single process examples


The code for these examples is found in IoTPy/examples/single_process_examples. One of the files in this folder is which has extensive documentation on creating and executing applications consisting of single processes. Another file in the same folder is This file sets up a harness for testing several simple examples.


A process has:

  1. one or more sources of data,

  2. a function that processes the data,

  3. possibly one or more actuators that consume the data, and

  4. possibly connections to other processes in the same memory space or other spaces.

Sources, actuators, and the function that processes data, all execute in their own threads.

We begin with examples consisting of a single process, a single source and no actuators. An application consisting of a single process has no connections to other processes. The output is written to a file or to the screen. A function that writes a file does not need to run in its own thread because it is unlikely to block; so, these examples have no actuators. These examples have two threads: (1) for the source and (2) for the compute function that processes the data generated by the source.

Examples with actuators are given later.

A harness for testing the code

We will use a harness for testing our code. The harness has an input list and and an expected (correct) output list or lists. The harness creates a single multiprocess application consisting of a single shared memory process which has a single source. The harness consists of the following components.

The source

source_list = range(10)
def source(out_stream):
    return source_list_to_stream(source_list, out_stream)

This source puts source_list on the stream out_stream. In this example source_list is 0, 1, .. , 9.

Checking correctness of outputs: the sink

def check_correctness_of_output(in_stream, check_list):
    def check(v, index, check_list):
        # v is in_stream[index]
        assert v == check_list[index]
        next_index = index + 1
        return next_index
    sink_element(func=check, in_stream=in_stream, state=0,

The first argument, v, of the function check, is the next element of in_stream. Its second argument, index, is the state of the agent. The value of v is in_stream[index]. The expected correct output is check_list. The function check_correctness_of_output creates a sink agent which throws an assertion exception if in_stream[index] is different from check_list[index].

Creating and running an application consisting of a single process with a single source

We create a shared memory process, proc, and then create a multiprocess application consisting of this single process, and then run the application by calling the function: make_and_run_process

def make_and_run_process(compute_func):
    proc = shared_memory_process(
        connect_sources=[('in', source)],
    mp = Multiprocess(processes=[proc], connections=[])

proc has a single input stream called ‘in’ and it has no output streams and no actuators. The single source is connected to this input stream. mp is a multiprocess application consisting of the single process, proc.

All that remains for us to test agents is to provide compute_func and use the given source_list (which is 0, 1, … , 9) or give a different list, and provide the correct output check_list.

examples of map_element

Example 1

def compute_func(in_streams, out_streams):
   def f(x): return x*10
   check_list = map(f, source_list)
   t = Stream()
      func=f, in_stream=in_streams[0], out_stream=t)
      in_stream=t, check_list=check_list)


For any function f, the expected output is:

map(f, source_list)

For a source_list of [0, 1, …, 9] and the given function f, the expected output is [0, 10, 20, …, 90]. If you want to see the output, put the output in a file using the agent stream stream_to_file by adding the following line to the code:

stream_to_file(in_stream=t, filename='example_output.txt')

Example 2: Character Strings

Let’s make source_list the string ‘hello world’ and so the input stream consists of the sequence of characters ‘h’ followed by ‘e’ followed by ‘l’, …. And let’s make f the function string.upper. Then the output sequence of the map_element agent is ‘HELLO WORLD’, i.e. 'the character ‘H’ followed by the character ‘E’, and so on.

Example 3: State and additional parameters

In this example, the state is the running sum of the input elements. With the input elements set to [0, 1, …, 9] the sequence of states is [0, 0, 1, 3, 6, 10, 15, 21, 28, 36] which gives the following stream of output values: [0, 2, 5, 9, 14, 20, 27, 35, 44, 54].

   class example(object):
        def __init__(self, multiplicand):
            self.multiplicand = multiplicand
            self.running_sum = 0
        def step(self, v):
            result = v * self.multiplicand + self.running_sum
            self.running_sum += v
            return result

    def compute_func(in_streams, out_streams):
        eg = example(multiplicand=2)
        check_list = [0, 2, 5, 9, 14, 20, 27, 35, 44, 54]
        t = Stream()
            func=eg.step, in_stream=in_streams[0], out_stream=t)
            in_stream=t, check_list=check_list)


examples of FILTER ELEMENT


In this example, the output is the same as the input except that elements x for which f(x) is True are deleted.

    def compute_func(in_streams, out_streams):
        def f(x): return x < 5
        check_list = filter(f, source_list)
        t = Stream()
            func=f, in_stream=in_streams[0], out_stream=t)
            in_stream=t, check_list=check_list)

In the example, with source_list = [0, 1, … , 9], and the given function f, the output of the filter_element agent will be [0, 1, 2, 3, 4]

Example 2: filter with state

This example passes in_stream[j] through the filter when in_stream[j] <= j. So, if the input stream is [1, 1, 3, 3, 5, 5, 7, 7, 9, 9] then the output stream from the filter_element agent is [1, 3, 5, 7, 9].

This agent has state which is n, the number of steps that the agent has taken. The parameters of the function, func, (which is the function less_than_n) are v and n, where v is in_stream[j] on the j-th step of the filter_element agent, and n is its state. The function returns a 2-tuple, the first element of which is used to determine the output stream of the agent, and the second element is the next state of the agent. Thus, the sequence of states is 0, 1, 2, 3, ….. The first element of the 2-tuple returned by the function on the j-th step is True exactly when in_stream[j] <= j, and this determines what elements of the input stream are passed through to the output stream.

     def compute_func(in_streams, out_streams):
        def less_than_n(v, n):
            return v <= n, n+1
        check_list = [1, 3, 5, 7, 9]
        t = Stream()
            func=less_than_n, in_stream=in_streams[0], out_stream=t,
            in_stream=t, check_list=check_list)

Many more examples, with extensive documentation, are found in IoTPy/examples/single_process_examples/