Distributed Applications

See IoTPy/IoTPy/multiprocessing/distributed.py and IoTPy/IoTPy/tests/VM_test, VM_publisher_test, VM_subscriber_test.

An IoTPy distributed application runs on computers that communicate with each other across the Internet. IoTPy implements distributed applications by using AMQP, the Advanced Message Queuing Protocol. IoTPy code uses RabbitMQ, a message broker that implements AMQP, together with Pika, a Python client library for AMQP. Install Pika on each computer running a distributed application; the computers communicate through a RabbitMQ broker. You could also use an implementation of AMQP other than RabbitMQ, but the IoTPy code has been tested only with RabbitMQ.

You need AMQP only for processes that are part of distributed applications. If your application consists of a single multiprocess program, in which processes communicate only through shared memory, then you don’t need AMQP.

IoTPy has a stream layer on top of AMQP. The stream layer allows developers to design their applications in a single, uniform way, by connecting streams, whether the underlying execution environment is a single-core computer, a single multicore computer, or a network of multicore computers.

A VM (virtual machine) is a program that runs on a single computer and communicates with other VMs by publishing streams and subscribing to streams. A VM is implemented as a set of distributed processes, which can execute in parallel on different cores of the computer. All the distributed processes within a single VM can communicate through the shared memory of that VM. Processes in different VMs communicate by publishing and subscribing to streams.

Publishing and subscribing to streams

[Diagram: publishing and subscribing to streams]

In the above diagram, the output stream named w in VM 1 publishes stream A, so the contents of the stream named w are copied into stream A. The output stream named z in VM 2 publishes stream B. The input stream named t of VM 1 subscribes to stream B, so the contents of B are copied into the stream named t in VM 1. The input streams u and v of VM 2 subscribe to stream A, so the contents of stream A are copied into the streams named u and v in VM 2. The output stream x of VM 1 and the output stream y of VM 2 are not published, and the input stream s of VM 2 is not subscribed to any stream.
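The routing in the diagram can be derived mechanically from the publish and subscribe triples. The sketch below is a hypothetical, plain-Python model, not IoTPy code: each triple is (VM label, local stream name, global stream name), and the helper route (an illustrative name) lists, for each published local stream, the subscribed local streams that receive copies of it. Streams x, y, and s appear in no triple, so they take no part in the routing.

```python
# Hypothetical model of the diagram (not IoTPy code).
# Each triple is (VM label, local stream name, global stream name).
from collections import defaultdict

publications = [("VM 1", "w", "A"), ("VM 2", "z", "B")]
subscriptions = [("VM 1", "t", "B"), ("VM 2", "u", "A"), ("VM 2", "v", "A")]


def route(publications, subscriptions):
    """Map each published (VM, stream) to the (VM, stream) pairs that receive copies."""
    receivers = defaultdict(list)
    for vm, stream, global_name in subscriptions:
        receivers[global_name].append((vm, stream))
    return {(vm, stream): receivers[global_name]
            for vm, stream, global_name in publications}


for source, targets in route(publications, subscriptions).items():
    print(source, "->", targets)
# ('VM 1', 'w') -> [('VM 2', 'u'), ('VM 2', 'v')]
# ('VM 2', 'z') -> [('VM 1', 't')]
```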

Distributed Process

A distributed process has exactly the same attributes as a shared memory process and has additional methods for publishing and subscribing to streams. For convenience, the attributes are given again below. A distributed process has:

  1. in_stream_names: A list of names of its input streams

  2. out_stream_names: A list of names of its output streams

  3. connect_sources: A list of connections to sources

  4. connect_actuators: A list of connections to actuators

  5. compute_func: A function that reads the process’ input streams and extends its output streams.

A distributed process is created by calling the function distributed_process.

Virtual Machine

A VM (virtual machine) is a set of distributed processes. The attributes of a VM are:

  1. processes: a list of component processes

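  2. connections: a list of 4-tuples, as in a shared-memory multiprocess application; each tuple is (sending process, output stream name, receiving process, input stream name)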

  3. publishers: a list of 3-tuples, each of which specifies a publication of a stream.

  4. subscribers: a list of 3-tuples, each of which specifies a subscription to a stream.
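One way to picture these four attributes is as a plain record. The sketch below is hypothetical: VMSpec and its check method are illustrative names, not part of IoTPy, strings stand in for process objects, and the field names publishers and subscribers follow the keyword arguments used to create vm_0 and vm_1 in the example at the end of this section. check only verifies that every tuple refers to a component process of the VM.

```python
# Hypothetical stand-in for a VM specification (not IoTPy's VM class).
# Strings play the role of process objects so the sketch runs on its own.
from dataclasses import dataclass, field


@dataclass
class VMSpec:
    processes: list
    # (sending process, output stream name, receiving process, input stream name)
    connections: list = field(default_factory=list)
    # (component process, local output stream name, global stream name)
    publishers: list = field(default_factory=list)
    # (component process, local input stream name, global stream name)
    subscribers: list = field(default_factory=list)

    def check(self):
        """Raise ValueError if any tuple names a process outside this VM."""
        members = set(self.processes)
        for sender, _, receiver, _ in self.connections:
            if sender not in members or receiver not in members:
                raise ValueError("connection uses a process outside this VM")
        for proc, _, _ in self.publishers + self.subscribers:
            if proc not in members:
                raise ValueError("publication or subscription uses a process outside this VM")
        return True


# The vm_0 example below, with process names written as strings.
vm_0 = VMSpec(
    processes=["proc_0", "proc_1", "proc_2"],
    connections=[("proc_0", "out", "proc_1", "in")],
    publishers=[("proc_1", "rand", "random")],
    subscribers=[("proc_2", "in", "random_sums")],
)
print(vm_0.check())  # True
```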

For example, we may have the following statement in a program running on one computer:

    vm_0 = VM(
        processes=[proc_0, proc_1, proc_2],
        connections=[(proc_0, 'out', proc_1, 'in')],
        publishers=[(proc_1, 'rand', 'random')],
        subscribers=[(proc_2, 'in', 'random_sums')])

[Diagram: virtual machine vm_0 consists of processes proc_0, proc_1, and proc_2]

Execution of the above statement creates a VM, vm_0, with three component processes: proc_0, proc_1, and proc_2. The VM has a single shared-memory connection, (proc_0, 'out', proc_1, 'in'), in which elements of the output stream named ‘out’ of proc_0 are put into the input stream named ‘in’ of proc_1. This VM has a single publication, (proc_1, 'rand', 'random'), and a single subscription, (proc_2, 'in', 'random_sums'). The output stream named ‘rand’ of proc_1 is copied into the stream with the global name ‘random’, and the stream with the global name ‘random_sums’ is copied into the input stream named ‘in’ of proc_2.

We may have the following statement in a program running on a different computer or on the same computer:

    vm_1 = VM(
        processes=[proc_0],
        connections=[],
        publishers=[(proc_0, 'out', 'random_sums')],
        subscribers=[(proc_0, 'in', 'random')])

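This statement creates a VM, vm_1, which has a single component process, proc_0. This VM has a single publication, (proc_0, 'out', 'random_sums'), and a single subscription, (proc_0, 'in', 'random'). Together, vm_0 and vm_1 form a loop: vm_1 subscribes to the stream ‘random’ published by vm_0, and vm_0 subscribes to the stream ‘random_sums’ published by vm_1. The proc_0 of vm_1 is a different process from the proc_0 of vm_0; each program defines its own.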

Publishing Streams and Subscribing to Streams

A VM publishes a stream by specifying a publication, which is a triple consisting of:

  1. a component process of the VM,

  2. a local stream name of the component process,

  3. a global stream name.

The attribute publishers of a VM is a list of triples such as:

    publishers=[(proc_0, 'out', 'random_sums')]

which, in this example, specifies that the stream called ‘out’ in the component process proc_0 of the VM has the global name ‘random_sums’.

Likewise, a VM subscribes to a stream by specifying a subscription, which is a triple of the same form as a publication: a component process, a local stream name of that process, and a global stream name. The attribute subscribers of a VM is a list of triples such as:

    subscribers=[(proc_2, 'in', 'random_sums')]

which, in this example, specifies that the stream called ‘in’ in the component process proc_2 of the VM is the stream with the global name ‘random_sums’. Elements of the stream called ‘out’ in proc_0 of one VM are therefore copied into the stream called ‘in’ in proc_2 of the other VM, because both streams have the same global name, ‘random_sums’.

You can have multiple publishers with the same global name, such as:

    (proc_0, 'out', 'A'), (proc_1, 'out', 'A')

In this case, the published stream is a merge of the output streams that feed it. In the example above, the stream called ‘A’ will contain elements copied from the stream called ‘out’ of proc_0 and also from the stream called ‘out’ of proc_1.
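The merge can be pictured with a small, single-computer model. In this hypothetical sketch a thread-safe queue.Queue stands in for the published stream ‘A’, and two threads stand in for the output streams ‘out’ of proc_0 and proc_1; it is not how IoTPy or AMQP implement the merge. In this model each publisher's elements keep their own order, but the way elements from the two publishers are interleaved can change from run to run.

```python
# Hypothetical single-computer model of two publishers feeding one global
# stream 'A'. A thread-safe queue stands in for the published stream.
import queue
import threading

stream_A = queue.Queue()


def publish(origin, elements):
    """Put each element on stream_A, tagged with the stream it came from."""
    for x in elements:
        stream_A.put((origin, x))


threads = [
    threading.Thread(target=publish, args=("proc_0.out", [1, 2, 3])),
    threading.Thread(target=publish, args=("proc_1.out", [10, 20, 30])),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Both publishers have finished, so draining the queue sees every element.
merged = []
while not stream_A.empty():
    merged.append(stream_A.get())
print(merged)
```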

Publications and subscriptions of streams are implemented as publications and subscriptions of topics (routing keys) in an AMQP direct exchange.
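In an AMQP direct exchange, a message is delivered to every queue bound with a routing key equal to the message's routing key. The following in-memory model (no broker and no Pika; DirectExchange is an illustrative class, not a library API) assumes that the global stream name plays the role of the routing key, and replays the first diagram in this section: u and v both receive copies of A, and t receives B. In this model a message whose key matches no binding is simply dropped.

```python
# Minimal in-memory model of direct-exchange routing (hypothetical, no broker).
from collections import defaultdict, deque


class DirectExchange:
    def __init__(self):
        self.bindings = defaultdict(list)  # routing key -> bound queues

    def bind(self, q, routing_key):
        self.bindings[routing_key].append(q)

    def publish(self, routing_key, body):
        # Deliver a copy to every queue whose binding key equals routing_key.
        for q in self.bindings.get(routing_key, []):
            q.append(body)


exchange = DirectExchange()
queue_u, queue_v, queue_t = deque(), deque(), deque()
exchange.bind(queue_u, "A")  # VM 2's input stream u subscribes to A
exchange.bind(queue_v, "A")  # VM 2's input stream v subscribes to A
exchange.bind(queue_t, "B")  # VM 1's input stream t subscribes to B

exchange.publish("A", 5)     # an element of w, published as A
exchange.publish("B", 8)     # an element of z, published as B
exchange.publish("C", 0)     # nothing is bound to C, so it is dropped
print(list(queue_u), list(queue_v), list(queue_t))  # [5] [5] [8]
```

With a real RabbitMQ broker and Pika 1.x, the corresponding calls are channel.exchange_declare(exchange=..., exchange_type='direct'), channel.queue_bind(queue=..., exchange=..., routing_key=...), and channel.basic_publish(exchange=..., routing_key=..., body=...); the exchange and queue names that IoTPy itself uses may differ.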

Example

This example creates two VMs on different computers or in different terminal shells on the same computer. One VM, vm_0, publishes a stream with the global name “sequence”, and the other VM, vm_1, subscribes to that stream. The stream published by vm_0 is 1*p, 2*p, 3*p, …, n*p, where p and n are numbers; in the code, p = 7 and n = 10. vm_1 merely prints the stream.

[Diagram: processes within the virtual machines of the example]

The Publisher: vm_0

This VM consists of a single process, proc_0, which has a single input stream called ‘in’ and a single output stream called ‘out’. proc_0 has a single source, which generates the stream 1, 2, 3, …, 10 and feeds it into the input stream ‘in’ of the computational function. The computational function multiplies these values by p = 7 to produce the stream 7, 14, 21, …, 70 on the output stream ‘out’. The VM publishes the output stream ‘out’ of proc_0 with the global name ‘sequence’.

The source

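The source outputs the sequence 1, 2, 3, …, 10. Starting from state 0, each call of generate_sequence returns state+1 both as the next element and as the new state.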

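    def source(out_stream):
        # Receives the current state and returns state+1 twice:
        # once as the next element and once as the new state.
        def generate_sequence(state):
            return state+1, state+1
        # Call generate_sequence num_steps=10 times, starting from state 0,
        # with time_interval=0.1 between calls, and put the elements on out_stream.
        return source_func_to_stream(
            func=generate_sequence, out_stream=out_stream,
            time_interval=0.1, num_steps=10, state=0)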
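The behaviour of this source can be imitated without IoTPy. In the hypothetical driver run_source below, each step calls func with the current state, treats the returned pair as (element, next state), and sleeps for time_interval. That ordering of the pair is an assumption, but since generate_sequence returns state+1 for both, the output 1, 2, …, 10 does not depend on it. run_source is not IoTPy's source_func_to_stream, which puts elements onto a stream rather than returning a list.

```python
# Hypothetical driver that imitates the source above without IoTPy.
# Assumption: func(state) returns (element, next_state).
import time


def run_source(func, state, num_steps, time_interval=0.0):
    """Call func num_steps times, threading the state through, and collect the elements."""
    elements = []
    for _ in range(num_steps):
        element, state = func(state)
        elements.append(element)
        time.sleep(time_interval)  # wait between elements, like time_interval
    return elements


def generate_sequence(state):
    return state + 1, state + 1


print(run_source(generate_sequence, state=0, num_steps=10))
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```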

The Computational Function: compute_func

This function has a single input stream and a single output stream. The elements of its output stream are p times the corresponding elements of its input stream where p is 7.

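    def f(in_streams, out_streams):
        # Multiply each element of the single input stream by p = 7
        # and append the product to the single output stream.
        map_element(
            func=lambda x: 7*x,
            in_stream=in_streams[0],
            out_stream=out_streams[0])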
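Streams grow over time, so mapping has to happen incrementally: elements are processed as they arrive rather than all at once. The sketch below models that idea with plain lists; MapAgent and step are hypothetical names, and IoTPy's stream and agent classes work differently in detail. The agent remembers how far it has read, so each step processes only the elements appended since the previous step.

```python
# Hypothetical model of incremental mapping over a growing stream (not IoTPy code).
class MapAgent:
    def __init__(self, func, in_list, out_list):
        self.func = func
        self.in_list = in_list
        self.out_list = out_list
        self.read = 0  # index of the first input element not yet processed

    def step(self):
        """Apply func to the elements appended since the last step."""
        new = self.in_list[self.read:]
        self.out_list.extend(self.func(x) for x in new)
        self.read += len(new)


in_stream, out_stream = [], []
agent = MapAgent(lambda x: 7 * x, in_stream, out_stream)

in_stream.extend([1, 2, 3])  # first batch arrives
agent.step()
in_stream.extend([4, 5])     # more elements arrive later
agent.step()
print(out_stream)  # [7, 14, 21, 28, 35]
```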

Make a distributed process

This distributed process has a single input stream named ‘in’ and a single output stream named ‘out’. The sequence 1, 2, 3, …, 10 generated by the source is fed into the input stream of compute_func (i.e., f), and its output stream consists of the elements 7, 14, 21, …, 70. This process has no actuators.

    proc_0 = distributed_process(
        compute_func=f,
        in_stream_names=['in'],
        out_stream_names=['out'],
        connect_sources=[('in', source)],
        connect_actuators=[],
        name='proc_0')

Make and start a VM

This VM consists of a single process, proc_0. It has no shared-memory connections between processes. The output stream named ‘out’ of proc_0 is published with the global name ‘sequence’.

    vm_0 = VM(
        processes=[proc_0],
        connections=[],
        publishers=[(proc_0, 'out', 'sequence')])
    vm_0.start()

The Subscriber: vm_1

This VM also consists of a single process, proc_1, which has no sources and no actuators. Its computational function has a single input stream, named ‘in’, and no output streams. The input stream ‘in’ of proc_1 subscribes to the published stream with the global name “sequence”.

The Computational Function: compute_func

This computational function has a single input stream and no output stream. It merely prints the elements of its input stream.

    def g(in_streams, out_streams):
        # Print each element of the single input stream; g has no output streams.
        def print_element(v):
            print('stream element is {}'.format(v))
        sink_element(
            func=print_element, in_stream=in_streams[0])

Make a distributed process

This distributed process has a single input stream, called ‘in’, and no output streams. The number of input stream names (that is, the length of in_stream_names) must equal the number of input streams of compute_func. Likewise, the number of output stream names must equal the number of output streams of compute_func. This process has no sources or actuators.

    proc_1 = distributed_process(
        compute_func=g,
        in_stream_names=['in'],
        out_stream_names=[],
        connect_sources=[],
        connect_actuators=[],
        name='proc_1')

Make and start a VM

This VM consists of a single process, proc_1. It has no shared memory connections among its component processes. It has no publications. The input stream named ‘in’ of its component process proc_1 subscribes to the stream with the global name ‘sequence’.

    vm_1 = VM(
        processes=[proc_1],
        connections=[],
        subscribers=[(proc_1, 'in', 'sequence')])
    vm_1.start()
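The whole example can be rehearsed in a single Python process before running it across two computers. In this hypothetical simulation a queue.Queue stands in for the published stream ‘sequence’, one thread plays vm_0 (source plus f) and another plays vm_1 (g). The DONE sentinel exists only so that the demo terminates; it is an assumption of the sketch, not an IoTPy feature.

```python
# Hypothetical single-process rehearsal of the example (not IoTPy code).
import queue
import threading

P, N = 7, 10
sequence = queue.Queue()  # stands in for the stream with global name 'sequence'
DONE = object()           # end-of-stream marker, used only so the demo stops
received = []


def vm_0():
    for i in range(1, N + 1):  # the source: 1, 2, ..., N
        sequence.put(P * i)    # f multiplies by P; the result is "published"
    sequence.put(DONE)


def vm_1():
    while True:
        v = sequence.get()
        if v is DONE:
            break
        print('stream element is {}'.format(v))  # g prints each element
        received.append(v)


threads = [threading.Thread(target=vm_1), threading.Thread(target=vm_0)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```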