Distributed Applications

See IoTPy/IoTPy/multiprocessing/distributed.py for the implementation. For examples, see VM_publisher_test, VM_subscriber_test, test_1, and test_2 in IoTPy/IoTPy/tests/.

An IoTPy distributed application runs on computers that communicate with each other across the Internet. IoTPy implements distributed applications by using AMQP, the Advanced Message Queuing Protocol. IoTPy code uses RabbitMQ, a message broker that implements AMQP, together with Pika, a Python client library for AMQP. Install Pika on each computer running a distributed application. You could also use an AMQP implementation other than RabbitMQ, but the IoTPy code has been tested only with RabbitMQ.

You need AMQP only for processes that are part of distributed applications. If your application consists of a single multiprocess program (in which processes communicate only through shared memory), then you don’t need AMQP.

IoTPy has a stream layer on top of AMQP. The stream layer allows developers to design their applications in a single, uniform way (by connecting streams), whether the underlying execution environment is a single-core computer, a single multicore computer, or a network of multicore computers.

A VM (virtual machine) is a program that runs on a single computer and communicates with other VMs by publishing streams and subscribing to streams. A VM is implemented as a set of distributed processes, which can execute in parallel on different cores of the computer. All the distributed processes within a single VM can communicate through the shared memory space of that VM. Processes in different VMs communicate by publishing and subscribing to streams.

Example

Figure: Publishing and subscribing to streams

The diagram above illustrates an example of communicating streams across the Internet by using publish/subscribe. The example has two VMs: VM_0 and VM_1. Each of these VMs could have a different IP address and port. VM_0 consists of a single process called proc_0, and VM_1 consists of a single process called proc_1; in general, a VM may have an arbitrary number of processes. Processes in different VMs (and therefore in different address spaces, possibly on different computers) communicate by publishing and subscribing to streams. By contrast, processes within the same VM (and therefore in the same shared address space) communicate by connecting streams directly, without publishing and subscribing.

In the above diagram, the output stream named w in proc_0 is published as stream A, so the contents of the stream named w are copied into stream A. The output stream named z in proc_1 is published as stream B. The input stream named t of proc_0 subscribes to stream B, so the contents of B are copied into the stream named t in proc_0; thus, the contents of stream z of proc_1 are copied into stream t of proc_0. The input streams u and v of proc_1 subscribe to stream A, so the contents of stream A (and therefore of stream w in proc_0) are copied into the streams named u and v in proc_1.
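The copying described above can be mimicked in a few lines of plain Python. This is a hypothetical in-memory model, not IoTPy's implementation: streams are ordinary lists, each process is a dictionary of its local streams, and the dictionary `published` stands in for the AMQP broker.

```python
# Hypothetical in-memory model of the diagram; not IoTPy code.
# Streams are plain lists; `published` stands in for the AMQP broker.

# Local streams of proc_0 and proc_1 (sample contents for the output streams).
proc_0 = {'w': [1, 2, 3], 'x': [], 't': []}
proc_1 = {'z': [10, 20], 'y': [], 'u': [], 'v': [], 's': []}

# Publications and subscriptions: (process, local stream name, global name).
publications = [(proc_0, 'w', 'A'), (proc_1, 'z', 'B')]
subscriptions = [(proc_0, 't', 'B'), (proc_1, 'u', 'A'), (proc_1, 'v', 'A')]

published = {}
# Copy each published output stream into its global stream.
for proc, local, global_name in publications:
    published.setdefault(global_name, []).extend(proc[local])

# Copy each global stream into every input stream that subscribes to it.
for proc, local, global_name in subscriptions:
    proc[local].extend(published.get(global_name, []))

print(proc_0['t'])               # [10, 20], the contents of z in proc_1
print(proc_1['u'], proc_1['v'])  # [1, 2, 3] [1, 2, 3], the contents of w in proc_0
```

The streams x, y and s stay untouched because they are neither published nor subscribed.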

In the diagram, the output stream x of proc_0 and the output stream y of proc_1 are not published, and the input stream s of proc_1 is not subscribed to any stream. These streams may be connected to sensor or actuator threads. Also, if there are multiple processes in the same computer (not shown in this example) then these streams may be connected to streams of other processes on the same computer.

The name of a published stream is global. In the above example, a process can subscribe to stream ‘A’ without specifying that ‘A’ is a copy of stream w in process proc_0.

Virtual Machine

A VM (virtual machine) is a set of distributed processes. The attributes of a VM are:

  1. processes: a list of component processes

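  2. connections: a list of 4-tuples (sending process, output stream name, receiving process, input stream name), as in a shared-memory process

  3. publishers: a list of 3-tuples (process, local stream name, global stream name), where each tuple specifies a publication of a stream.

  4. subscribers: a list of 3-tuples (process, local stream name, global stream name), where each tuple specifies a subscription to a stream.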

The above example is encoded as:

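# vm_0 and vm_1 would typically be created in separate programs;
# proc_0 and proc_1 are distributed processes made with distributed_process.
vm_0 = VM(
    processes=[proc_0], connections=[],
    publishers=[(proc_0, 'w', 'A')],
    subscribers=[(proc_0, 't', 'B')])
vm_1 = VM(
    processes=[proc_1], connections=[],
    publishers=[(proc_1, 'z', 'B')],
    subscribers=[(proc_1, 'u', 'A'), (proc_1, 'v', 'A')])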

In this example, vm_0 has a single publication, the triple

(proc_0, 'w', 'A')

which states that the published stream called ‘A’ is obtained by copying the output stream called ‘w’ in the process proc_0. Likewise, vm_0 has a single subscription, the triple

(proc_0, 't', 'B')

which states that the published stream called ‘B’ is copied into the input stream called ‘t’ in the process proc_0.
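Because global names are matched exactly, the triples of all the VMs together determine which local output stream feeds which local input stream. The sketch below resolves that wiring for vm_0 and vm_1. The function `resolve_wiring` is hypothetical (IoTPy does this matching through AMQP at run time), and processes are represented by their names rather than by distributed-process objects.

```python
# Hypothetical helper that resolves publish/subscribe wiring across VMs.
from collections import defaultdict

def resolve_wiring(vms):
    """Return sorted ((src_proc, src_stream), (dst_proc, dst_stream)) pairs.

    Each VM is a dict whose 'publishers' and 'subscribers' entries are lists
    of (process, local stream name, global stream name) triples.
    """
    sources = defaultdict(list)  # global name -> [(process, output stream)]
    for vm in vms:
        for proc, stream, global_name in vm['publishers']:
            sources[global_name].append((proc, stream))
    links = []
    for vm in vms:
        for proc, stream, global_name in vm['subscribers']:
            # Every publisher of this global name feeds this input stream.
            for src in sources[global_name]:
                links.append((src, (proc, stream)))
    return sorted(links)

vm_0 = {'publishers': [('proc_0', 'w', 'A')],
        'subscribers': [('proc_0', 't', 'B')]}
vm_1 = {'publishers': [('proc_1', 'z', 'B')],
        'subscribers': [('proc_1', 'u', 'A'), ('proc_1', 'v', 'A')]}

for src, dst in resolve_wiring([vm_0, vm_1]):
    print(src, '->', dst)
# ('proc_0', 'w') -> ('proc_1', 'u')
# ('proc_0', 'w') -> ('proc_1', 'v')
# ('proc_1', 'z') -> ('proc_0', 't')
```

With vm_0 alone, the subscription to ‘B’ has no publisher, so `resolve_wiring([vm_0])` returns an empty list.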

Distributed Process

A distributed process has exactly the same attributes as a shared memory process and has additional methods for publishing and subscribing to streams. For convenience, the attributes are given again below. A distributed process has:

  1. in_stream_names: A list of names of its input streams

  2. out_stream_names: A list of names of its output streams

  3. connect_sources: A list of connections to sources, each a pair (input stream name, source function)

  4. connect_actuators: A list of connections to actuators

  5. compute_func: A function that reads the process’s input streams and extends its output streams.
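The five attributes can be pictured as a plain record. The dataclass below is a hypothetical stand-in for what distributed_process receives, not IoTPy's own class. Its check that every source is attached to a declared input stream is an assumption suggested by the proc_0 example later on this page, where the source is connected to the input stream ‘in’; the element type of connect_actuators is left unspecified.

```python
# Hypothetical record of a distributed process's attributes; not IoTPy's class.
from dataclasses import dataclass, field
from typing import Callable, List

@dataclass
class ProcessSpec:
    compute_func: Callable
    in_stream_names: List[str]
    out_stream_names: List[str]
    connect_sources: List[tuple] = field(default_factory=list)    # (in stream name, source)
    connect_actuators: List[tuple] = field(default_factory=list)  # format not specified here
    name: str = 'unnamed'

    def __post_init__(self):
        # Assumed rule: each source feeds one of this process's input streams.
        for stream_name, _source in self.connect_sources:
            if stream_name not in self.in_stream_names:
                raise ValueError(
                    f"source connected to unknown input stream {stream_name!r}")

spec = ProcessSpec(
    compute_func=lambda in_streams, out_streams: None,
    in_stream_names=['in'], out_stream_names=['out'],
    connect_sources=[('in', lambda out_stream: None)],
    name='proc_0')
print(spec.name, spec.in_stream_names, spec.out_stream_names)
```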

A distributed process is created by calling the function distributed_process, as shown in the example later on this page. Distributed processes are then grouped into a VM. For example, we may have the following statement in a program running on one computer:

vm_0 = VM(
   processes=[proc_0, proc_1, proc_2],
   connections=[(proc_0, 'out', proc_1, 'in')],
   publishers=[(proc_1, 'rand', 'random')],
   subscribers=[(proc_2, 'in', 'random_sums')])
Figure: Virtual machine vm_0 consists of processes proc_0, proc_1, and proc_2

Execution of the above statement creates a VM, vm_0, with three component processes: proc_0, proc_1 and proc_2. The VM has a single shared-memory connection, (proc_0, 'out', proc_1, 'in'), in which elements of the output stream named ‘out’ in proc_0 are put into the input stream named ‘in’ of proc_1. This VM has a single publication, (proc_1, 'rand', 'random'), and a single subscription, (proc_2, 'in', 'random_sums'). The output stream named ‘rand’ in proc_1 is copied into the stream called ‘random’, and the stream called ‘random_sums’ is copied into the input stream named ‘in’ in proc_2.

We may have the following statement in a program running in a different computer or the same computer:

vm_1 = VM(
   processes=[proc_0],
   connections=[],
   publishers=[(proc_0, 'out', 'random_sums')],
   subscribers=[(proc_0, 'in', 'random')])

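This statement creates a VM, vm_1, which has a single component process, proc_0. This VM has a single publication, (proc_0, 'out', 'random_sums'), and a single subscription, (proc_0, 'in', 'random'). Thus vm_1 receives the stream ‘random’ published by vm_0, and vm_0 receives the stream ‘random_sums’ published by vm_1.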

If the description up to this point, together with the examples in IoTPy/IoTPy/tests/, is clear, you can skip the remainder of this page.

MORE DETAIL

Publishing Streams and Subscribing to Streams

A VM publishes a stream by specifying a publication which is a triple:

  1. a component process of the VM,

  2. a local stream name of the component process,

  3. a global stream name.

The attribute publishers of a VM is a list of triples such as:

publishers=[(proc_0, 'out', 'random_sums')]

which, in this example, identifies that the stream called ‘out’ within the component distributed process, proc_0, of the VM has a global name ‘random_sums’.

Likewise, a VM subscribes to a stream by specifying a subscriber, which is a triple with the same form as a publisher. The attribute subscribers of a VM is a list of triples such as:

subscribers=[(proc_2, 'in', 'random_sums')]

which, in this example, specifies that the stream called ‘in’ within the component distributed process proc_2 of the VM is the stream with the global name ‘random_sums’. In this example, elements of the stream called ‘out’ within the component process proc_0 of one VM are copied into the stream called ‘in’ within the component process proc_2 of the other VM, because both streams have the same global name ‘random_sums’.

You can have multiple publishers with the same publication name such as:

(proc_0, 'out', 'A'), (proc_1, 'out', 'A')

In this case, the contents of the publication are a merge of the contents of the output streams that feed the publication. In the example above, the stream called ‘A’ has elements copied from the stream called ‘out’ of proc_0 and also from the stream called ‘out’ of proc_1.

Publications and subscriptions of streams are implemented as publications and subscriptions of topics in a direct exchange in AMQP.
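The routing rule of an AMQP direct exchange (a message is delivered to every queue bound with exactly the message's routing key) is what makes global names work, and it also accounts for the merge described above. The class below is a hypothetical in-memory model of that rule, assuming the global stream name is used as the routing key; it is not Pika or IoTPy code, and a real deployment goes through a RabbitMQ broker.

```python
# Hypothetical in-memory model of an AMQP direct exchange; not Pika or IoTPy code.
# Assumption: the global stream name is used as the routing key.
from collections import defaultdict

class DirectExchange:
    def __init__(self):
        self.bindings = defaultdict(list)  # routing key -> bound subscriber lists

    def bind(self, routing_key, subscriber):
        self.bindings[routing_key].append(subscriber)

    def publish(self, routing_key, element):
        # Deliver to every subscriber bound with exactly this key; drop otherwise.
        for subscriber in self.bindings.get(routing_key, []):
            subscriber.append(element)

exchange = DirectExchange()
in_stream = []                 # an input stream that subscribes to 'A'
exchange.bind('A', in_stream)

# Elements from 'out' of proc_0 (1, 2, 3) and 'out' of proc_1 (100, 200),
# in one possible arrival order; the real order depends on timing.
arrivals = [('A', 1), ('A', 100), ('A', 2), ('A', 3), ('A', 200), ('B', 999)]
for routing_key, element in arrivals:
    exchange.publish(routing_key, element)

print(in_stream)  # [1, 100, 2, 3, 200]; 999 went to 'B', which has no subscriber
```

In this model the subscriber sees an interleaving of both publishers, while the elements from each individual publisher keep their relative order.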

Example

This example creates two VMs, on different computers or in different terminal shells on the same computer. One VM, vm_0, publishes a stream with the global name “sequence”, and the other VM, vm_1, subscribes to that stream. The stream published by vm_0 is 1*p, 2*p, 3*p, …, n*p, where p and n are numbers; in the code, p = 7 and n = 10. vm_1 merely prints the stream.

Figure: Diagram showing processes within virtual machines of the example

The Publisher: vm_0

This VM consists of a single process, proc_0, which has a single input stream called ‘in’ and a single output stream called ‘out’. proc_0 has a single source, which generates the stream 1, 2, 3, …, 10 and feeds the input stream ‘in’ of the computational function. The computational function multiplies these values by p = 7 to produce the output stream 7, 14, 21, …, 70 on the stream named ‘out’. The VM publishes the output stream named ‘out’ in proc_0 under the global name ‘sequence’.

The source

The source outputs the sequence 1, 2, 3, …, 10.

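    def source(out_stream):
        # Returns (element to output, next state); here both are state+1.
        def generate_sequence(state):
            return state+1, state+1
        # Run 10 steps starting from state 0, pausing time_interval between steps.
        return source_func_to_stream(
            func=generate_sequence, out_stream=out_stream,
            time_interval=0.1, num_steps=10, state=0)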
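To see what the source produces without a running VM, the loop below steps generate_sequence by hand. It is a plain-Python approximation of source_func_to_stream that ignores time_interval and appends to a list instead of a stream; the helper `run_source` is hypothetical, and it assumes the function returns a pair (output element, next state). In this example both parts equal state+1, so the order does not affect the result.

```python
# Hypothetical, list-based approximation of source_func_to_stream
# (ignores time_interval; appends to a list instead of a stream).
def run_source(func, num_steps, state):
    out = []
    for _ in range(num_steps):
        element, state = func(state)  # assumed (output, next state) order
        out.append(element)
    return out

def generate_sequence(state):
    return state + 1, state + 1

print(run_source(generate_sequence, num_steps=10, state=0))
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```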

The Computational Function: compute_func

This function has a single input stream and a single output stream. The elements of its output stream are p times the corresponding elements of its input stream where p is 7.

    def f(in_streams, out_streams):
        # Multiply each element of the single input stream by p = 7.
        map_element(
            func=lambda x: 7*x,
            in_stream=in_streams[0],
            out_stream=out_streams[0])
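map_element applies func to each element of its input stream and appends the result to its output stream. On a finite list, that behaviour amounts to the comprehension below. The helper `map_element_list` is hypothetical and only mimics the semantics; IoTPy processes stream elements as they arrive rather than a complete list.

```python
# Hypothetical list-based stand-in for map_element (IoTPy works on streams).
def map_element_list(func, in_list):
    return [func(x) for x in in_list]

in_list = list(range(1, 11))  # the sequence generated by the source
out_list = map_element_list(lambda x: 7 * x, in_list)
print(out_list)
# [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]
```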

Make a distributed process

This distributed process has a single input stream named ‘in’ and a single output stream named ‘out’. The sequence 1, 2, 3, …, 10 generated by the source is fed to the input stream of compute_func (i.e., f), and its output stream consists of elements 7, 14, 21, …, 70. This process has no actuators.

    proc_0 = distributed_process(
        compute_func=f,
        in_stream_names=['in'],
        out_stream_names=['out'],
        connect_sources=[('in', source)],
        connect_actuators=[],
        name='proc_0')

Make and start a VM

This VM consists of a single process, proc_0. It has no shared-memory connections between its processes. The output stream named ‘out’ of proc_0 is published under the global name ‘sequence’.

    vm_0 = VM(
        processes=[proc_0],
        connections=[],
        publishers=[(proc_0, 'out', 'sequence')])
    vm_0.start()

The Subscriber: vm_1

This VM also consists of a single process, proc_1, which has no sources and no actuators. Its computational function has a single input stream, named ‘in’, and no output streams. The input stream named ‘in’ of proc_1 subscribes to the published stream with the global name “sequence”.

The Computational Function: compute_func

This computational function has a single input stream and no output stream. It merely prints the elements of its input stream.

    def g(in_streams, out_streams):
        # Print each element of the single input stream.
        def print_element(v):
            print('stream element is', v)
        sink_element(
            func=print_element, in_stream=in_streams[0])
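sink_element calls func on each element of its input stream and produces no output stream. The hypothetical list-based stand-in below shows the printout that the subscriber would produce for the published sequence 7, 14, …, 70; unlike the original print_element, this version also records each element so that the result can be checked.

```python
# Hypothetical list-based stand-in for sink_element (IoTPy works on streams).
def sink_element_list(func, in_list):
    for v in in_list:
        func(v)

received = []

def print_element(v):
    print('stream element is', v)
    received.append(v)  # recorded only so the result can be checked

sink_element_list(print_element, [7 * k for k in range(1, 11)])
```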

Make a distributed process

This distributed process has a single input stream, which is called ‘in’, and it has no output streams. The number of input stream names - i.e. the length of in_stream_names - equals the number of input streams of compute_func. Likewise, the number of output stream names matches the number of output streams of compute_func. This process has no sources or actuators.

    proc_1 = distributed_process(
        compute_func=g,
        in_stream_names=['in'],
        out_stream_names=[],
        connect_sources=[],
        connect_actuators=[],
        name='proc_1')

Make and start a VM

This VM consists of a single process, proc_1. It has no shared memory connections among its component processes. It has no publications. The input stream named ‘in’ of its component process proc_1 subscribes to the stream with the global name ‘sequence’.

    vm_1 = VM(
        processes=[proc_1],
        connections=[],
        subscribers=[(proc_1, 'in', 'sequence')])
    vm_1.start()
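vm_0 and vm_1 are started independently and meet only through the global name ‘sequence’. The sketch below imitates that arrangement inside one Python program: two threads stand in for the two VMs, and a `queue.Queue` stands in for the RabbitMQ broker. The `END` sentinel that terminates the stream is an assumption of this sketch; the example above does not say how the subscriber learns that the publisher has finished.

```python
# Threads stand in for vm_0 and vm_1; a Queue stands in for the broker.
import queue
import threading

END = object()  # sentinel marking the end of the stream (sketch only)

def publisher(broker, p=7, n=10):
    for k in range(1, n + 1):
        broker.put(k * p)  # 7, 14, ..., 70 on the global stream 'sequence'
    broker.put(END)

def subscriber(broker, received):
    while True:
        v = broker.get()
        if v is END:
            break
        print('stream element is', v)
        received.append(v)

broker = queue.Queue()
received = []
vm_1_thread = threading.Thread(target=subscriber, args=(broker, received))
vm_0_thread = threading.Thread(target=publisher, args=(broker,))
vm_1_thread.start()
vm_0_thread.start()
vm_0_thread.join()
vm_1_thread.join()
```

Starting the subscriber first mirrors running vm_1 in one shell and vm_0 in another; because the queue buffers elements, the result here is the same in either start order.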