Distributed System Examples

You must install pika, a Python AMQP client used with RabbitMQ, to run these examples. You can also use another Python implementation of the AMQP protocol. Before running the examples given here, check that pika is working by running the tests that the pika website recommends.
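As a quick sanity check alongside the tests recommended by pika, the following minimal sketch reports whether pika can be imported and whether a broker accepts a connection. The helper names `pika_is_installed` and `broker_is_reachable` are hypothetical, and the sketch assumes a RabbitMQ broker listening on localhost at the default port.

```python
import importlib.util


def pika_is_installed():
    """Return True if the pika package can be found on this machine."""
    return importlib.util.find_spec("pika") is not None


def broker_is_reachable(host="localhost"):
    """Try to open and close a connection to an AMQP broker.

    Assumes a broker (for example RabbitMQ) is listening on `host`
    at the default port. Returns False if pika is missing or the
    connection attempt fails for any reason.
    """
    if not pika_is_installed():
        return False
    import pika  # imported lazily so pika_is_installed() works without it
    try:
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host))
        connection.close()
        return True
    except Exception:
        return False
```

Calling `broker_is_reachable()` on each computer before starting the examples gives an early hint about a missing install or a broker that is not running.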

Example 1

This simple example has two virtual machines (VMs), each with one process. One process publishes a stream that it generates and the other process subscribes to that stream. Run the publisher on one computer and the subscriber on another, after first making sure that the tests recommended for pika pass on both computers. You can also run the publisher and subscriber in different terminal shells on a single computer.

This example is similar to example 1 of Multiprocess Examples, and the code for the multiprocess and distributed cases is similar. The differences are that (1) the processes in the distributed case are distributed processes whereas in the multiprocess case they are shared-memory processes, and (2) in the distributed case, the sending process publishes a stream to which the receiving process subscribes, whereas in the multiprocess case, the sending and receiving streams are connected through shared memory.

The publication is called ‘copy_of_source_list’ in this example, with the publisher specifying:

publishers=[(proc_0, 'out', 'copy_of_source_list')]

and the subscriber specifying:

subscribers=[(proc_1, 'in', 'copy_of_source_list')]
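The publisher and subscriber are linked only by the publication name, so the two strings must match exactly. The following toy sketch illustrates that idea with an in-memory stand-in; `ToyBroker` is a hypothetical class for illustration and is not how the library or pika is implemented.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for a message broker (hypothetical)."""

    def __init__(self):
        # Maps a publication name to the callbacks subscribed to it.
        self._subscribers = defaultdict(list)

    def subscribe(self, publication, callback):
        self._subscribers[publication].append(callback)

    def publish(self, publication, value):
        # Deliver the value to every subscriber of this exact name.
        for callback in self._subscribers[publication]:
            callback(value)


broker = ToyBroker()
received = []
broker.subscribe('copy_of_source_list', received.append)

for value in range(3):
    broker.publish('copy_of_source_list', value)

# A misspelled publication name reaches no subscriber.
broker.publish('copy_of_source_lst', 99)

print(received)
```

In this sketch the value published under the misspelled name is silently dropped, which is why a typo in either name can leave the subscriber with no data.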

The code for VM_0, the publisher

def single_process_publisher():
    source_list = range(10)
    def source(out_stream):
        return source_list_to_stream(
            source_list, out_stream, time_interval=0.2)

    def compute_func(in_streams, out_streams):
        map_element(
            func=lambda x: x,
            in_stream=in_streams[0],
            out_stream=out_streams[0])

    proc_0 = distributed_process(
        compute_func=compute_func,
        in_stream_names=['in'],
        out_stream_names=['out'],
        connect_sources=[('in', source)],
        connect_actuators=[],
        name='proc_0')

    vm_0 = VM(
        processes=[proc_0],
        connections=[],
        publishers=[(proc_0, 'out', 'copy_of_source_list')])
    vm_0.start()

The code for VM_1, the subscriber

def single_process_subscriber():

    def compute_func(in_streams, out_streams):
        stream_to_file(in_streams[0], 'result.dat')

    proc_1 = distributed_process(
        compute_func=compute_func,
        in_stream_names=['in'],
        out_stream_names=[],
        connect_sources=[],
        connect_actuators=[],
        name='proc_1')

    vm_1 = VM(
        processes=[proc_1],
        connections=[],
        subscribers=[(proc_1, 'in', 'copy_of_source_list')])
    vm_1.start()

Run both virtual machines concurrently. On the computer running the subscriber, the elements of source_list are appended to a file called result.dat.
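After both VMs have run, the contents of result.dat can be compared with the source list. The sketch below assumes the file holds one integer per line, which is an assumption about the file layout; `read_results` and `matches_source` are hypothetical helper names.

```python
import os


def read_results(path='result.dat'):
    """Read one integer per non-empty line (assumed file layout)."""
    with open(path) as f:
        return [int(line) for line in f if line.strip()]


def matches_source(path='result.dat', expected=range(10)):
    """Return True if the file holds exactly the expected values in order."""
    if not os.path.exists(path):
        return False
    return read_results(path) == list(expected)
```

Because results are appended, values left over from an earlier run may make the comparison fail; deleting result.dat before each run avoids that.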

Example 2

This example has two virtual machines, each of which consists of two processes. The first VM, called vm_0, has processes proc_0 and proc_1. This VM publishes a stream called ‘publication’. The process proc_0 has a source which generates a stream. The other process, proc_1, reads the stream generated by proc_0 and carries out an operation on it; in the example it adds 10 to each element and publishes the result as ‘publication’.

The second VM, called vm_1, has processes proc_2 and proc_3. This VM subscribes to the stream called ‘publication’. The process proc_2 gets the stream from ‘publication’, carries out an operation on it (multiplies each element by 100), and passes the output stream to proc_3, which writes the results to a file.

Streams passed within a VM use shared memory whereas streams passed across VMs use the AMQP protocol. For instance, the stream passed from proc_0 to proc_1 in vm_0, and the stream passed from proc_2 to proc_3 in vm_1, use shared memory to communicate. By contrast, the stream passed from proc_1 in vm_0 to proc_2 in vm_1 uses AMQP.
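The rule above can be sketched as a small function that labels each link of a VM by its transport. Processes are written as plain strings here, and `classify_links` is a hypothetical helper for illustration, not part of the library; the tuple shapes follow the connections, publishers and subscribers arguments used in these examples.

```python
def classify_links(connections, publishers, subscribers):
    """Label each link as shared memory (within a VM) or AMQP (across VMs)."""
    links = []
    # Connections join two processes inside the same VM.
    for sender, out_name, receiver, in_name in connections:
        links.append((f'{sender}.{out_name} -> {receiver}.{in_name}',
                      'shared memory'))
    # Publications leave the VM.
    for proc, out_name, publication in publishers:
        links.append((f'{proc}.{out_name} -> {publication!r}', 'AMQP'))
    # Subscriptions enter the VM.
    for proc, in_name, publication in subscribers:
        links.append((f'{publication!r} -> {proc}.{in_name}', 'AMQP'))
    return links


vm_0_links = classify_links(
    connections=[('proc_0', 'out', 'proc_1', 'in')],
    publishers=[('proc_1', 'out', 'publication')],
    subscribers=[])

vm_1_links = classify_links(
    connections=[('proc_2', 'out', 'proc_3', 'in')],
    publishers=[],
    subscribers=[('proc_2', 'in', 'publication')])

for description, transport in vm_0_links + vm_1_links:
    print(f'{description}: {transport}')
```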

Note: You can have the same VM be both a publisher and a subscriber.

The code for VM_0

def two_process_publisher():
    source_list = range(10)
    def source(out_stream):
        return source_list_to_stream(source_list, out_stream)
    
    def compute_0(in_streams, out_streams):
        map_element(
            func=lambda x: x,
            in_stream=in_streams[0], out_stream=out_streams[0])

    proc_0 = distributed_process(
        compute_func=compute_0,
        in_stream_names=['in'],
        out_stream_names=['out'],
        connect_sources=[('in', source)],
        name='process_0')

    def compute_1(in_streams, out_streams):
        map_element(
            func=lambda x: x+10,
            in_stream=in_streams[0], out_stream=out_streams[0])
                    
    proc_1 = distributed_process(
        compute_func=compute_1,
        in_stream_names=['in'],
        out_stream_names=['out'],
        connect_sources=[],
        name='process_1'
        )

    vm_0 = VM(
        processes=[proc_0, proc_1],
        connections=[(proc_0, 'out', proc_1, 'in')],
        publishers=[(proc_1, 'out', 'publication')])
    vm_0.start()

The code for VM_1

def two_process_subscriber():
    def compute_2(in_streams, out_streams):
        map_element(
            func=lambda x: x*100,
            in_stream=in_streams[0], out_stream=out_streams[0])

    proc_2 = distributed_process(
        compute_func=compute_2,
        in_stream_names=['in'],
        out_stream_names=['out'],
        connect_sources=[],
        name='process_2')

    def compute_3(in_streams, out_streams):
        stream_to_file(
            in_streams[0], 'result.dat')

    proc_3 = distributed_process(
        compute_func=compute_3,
        in_stream_names=['in'],
        out_stream_names=[],
        connect_sources=[],
        name='process_3'
        )
    vm_1 = VM(
        processes=[proc_2, proc_3],
        connections=[(proc_2, 'out', proc_3, 'in')],
        subscribers=[(proc_2, 'in', 'publication')])
    vm_1.start()
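To check the output of this example, the values expected in result.dat can be computed without any streams or VMs by composing the two operations directly. This is a plain-Python sketch of the arithmetic only; the names `add_ten` and `times_hundred` are introduced here for illustration.

```python
source_list = range(10)


def add_ten(x):
    """The operation that compute_1 applies in vm_0."""
    return x + 10


def times_hundred(x):
    """The operation that compute_2 applies in vm_1."""
    return x * 100


# Each source element passes through proc_1 and then proc_2.
expected = [times_hundred(add_ten(x)) for x in source_list]
print(expected)
# prints [1000, 1100, 1200, 1300, 1400, 1500, 1600, 1700, 1800, 1900]
```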