Multiprocess Examples

The code for these examples is in:

IoTPy/examples/multiprocess_examples/simple_multiprocess_examples.py

Example 1

This simple example has two processes. The first process, proc_0, has a source that generates a stream; the second process, proc_1, writes the contents of that stream to a file.

def example_1():
    """Two-process example: proc_0 produces a stream, proc_1 files it.

    proc_0 has one input stream, 'in', fed by a source built from
    source_list, and one output stream, 'out'. Its computation maps
    the identity function over 'in' to produce 'out'.

    proc_1 has one input stream, 'in', and no outputs or sources. Its
    computation passes that stream to stream_to_file with the filename
    'result_1.dat'.

    The application connects proc_0's 'out' to proc_1's 'in' and runs.
    """
    source_list = range(10)

    # ---- proc_0: a source plus a pass-through computation ----
    def source(out_stream):
        # Feed source_list into the stream supplied by the caller.
        return source_list_to_stream(source_list, out_stream)

    def forward_unchanged(in_streams, out_streams):
        # Identity map from the first input to the first output.
        map_element(
            func=lambda element: element,
            in_stream=in_streams[0],
            out_stream=out_streams[0])

    proc_0 = shared_memory_process(
        name='process_0',
        compute_func=forward_unchanged,
        in_stream_names=['in'],
        out_stream_names=['out'],
        connect_sources=[('in', source)])

    # ---- proc_1: hands its input stream to stream_to_file ----
    def write_input_to_file(in_streams, out_streams):
        # out_streams is unused: this process declares no outputs.
        stream_to_file(
            in_stream=in_streams[0],
            filename='result_1.dat')

    proc_1 = shared_memory_process(
        name='process_1',
        compute_func=write_input_to_file,
        in_stream_names=['in'],
        out_stream_names=[],
        connect_sources=[])

    # ---- Wire the two processes together and run the application ----
    # Each connection is (sender, sender output, receiver, receiver input).
    application = Multiprocess(
        processes=[proc_0, proc_1],
        connections=[(proc_0, 'out', proc_1, 'in')])
    application.run()

In this example, the process proc_0 has a single input called ‘in’ and a single output called ‘out’ as specified in the lines:

in_stream_names = ['in']
out_stream_names = ['out']

It has a single source which feeds the input called ‘in’ as specified by the line:

connect_sources = [('in', source)]

The process proc_1 has a single input, also called ‘in’, and it has no outputs and no sources. The multiprocess application consists of the processes proc_0 and proc_1, as specified in the line:

processes = [proc_0, proc_1]

The output called ‘out’ of proc_0 is connected to the input called ‘in’ of proc_1 as specified in the line:

connections = [(proc_0, 'out', proc_1, 'in')]

Running this application writes the elements of source_list to the file result_1.dat.

Example 2

This example has four processes, proc_0, proc_1, proc_2, and proc_3. The output stream from proc_0 feeds the input streams of both proc_1 and proc_2. The output streams of proc_1 and proc_2 feed the two input streams of proc_3.

def example_2():
    """Four-process example: fan out from proc_0, fan in at proc_3.

    proc_0 has a source built from source_list feeding its input 'in',
    and maps the identity function over 'in' to produce its output
    'out'. That output is connected to the inputs of both proc_1 and
    proc_2.

    proc_1 maps x -> 10*x and proc_2 maps x -> 1000*x, each from its
    input 'in' to its output 'out'.

    proc_3 has two inputs, 'in_1' (from proc_1) and 'in_2' (from
    proc_2). It zips them into a local stream and passes that stream
    to stream_to_file with the filename 'result_2.dat'.
    """
    source_list = range(10)

    # ---- proc_0: a source plus a pass-through computation ----
    def source(out_stream):
        # Feed source_list into the stream supplied by the caller.
        return source_list_to_stream(source_list, out_stream)

    def forward_unchanged(in_streams, out_streams):
        # Identity map from the first input to the first output.
        map_element(
            func=lambda element: element,
            in_stream=in_streams[0],
            out_stream=out_streams[0])

    proc_0 = shared_memory_process(
        name='process_0',
        compute_func=forward_unchanged,
        in_stream_names=['in'],
        out_stream_names=['out'],
        connect_sources=[('in', source)])

    # ---- proc_1: multiplies each element by 10 ----
    def times_ten(element):
        return 10*element

    def scale_by_ten(in_streams, out_streams):
        map_element(
            func=times_ten,
            in_stream=in_streams[0],
            out_stream=out_streams[0])

    proc_1 = shared_memory_process(
        name='process_1',
        compute_func=scale_by_ten,
        in_stream_names=['in'],
        out_stream_names=['out'],
        connect_sources=[])

    # ---- proc_2: multiplies each element by 1000 ----
    def times_thousand(element):
        return 1000*element

    def scale_by_thousand(in_streams, out_streams):
        map_element(
            func=times_thousand,
            in_stream=in_streams[0],
            out_stream=out_streams[0])

    proc_2 = shared_memory_process(
        name='process_2',
        compute_func=scale_by_thousand,
        in_stream_names=['in'],
        out_stream_names=['out'],
        connect_sources=[])

    # ---- proc_3: zips its two inputs and files the result ----
    def zip_inputs_to_file(in_streams, out_streams):
        # out_streams is unused: this process declares no outputs.
        zipped = Stream()
        zip_stream(in_streams, zipped)
        stream_to_file(zipped, 'result_2.dat')

    proc_3 = shared_memory_process(
        name='process_3',
        compute_func=zip_inputs_to_file,
        in_stream_names=['in_1', 'in_2'],
        out_stream_names=[],
        connect_sources=[])

    # ---- Wire the four processes together and run the application ----
    # Each connection is (sender, sender output, receiver, receiver input).
    application = Multiprocess(
        processes=[proc_0, proc_1, proc_2, proc_3],
        connections=[
            (proc_0, 'out', proc_1, 'in'),
            (proc_0, 'out', proc_2, 'in'),
            (proc_1, 'out', proc_3, 'in_1'),
            (proc_2, 'out', proc_3, 'in_2'),
        ])
    application.run()

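Running this application writes the zipped stream to the file result_2.dat: for each element x of source_list, the pair (10*x, 1000*x), where the first value comes from proc_1 and the second from proc_2.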