See IoTPy/multiprocessing/multicore.py for code
and IoTPy/tests/multicore_test.py for tests and examples.
The examples in the directory listed above have extensive documentation. You may find the example code sufficient to use the concepts described below.
In this section we describe a multicore application: a stream-based framework for shared-memory multiprocess programs, i.e., programs consisting of multiple processes executing in the same memory space. A multiprocess application is a set of communicating shared memory processes. We first describe the shared memory process and then the multiprocess application. We discuss programs that run on distributed systems - networks of computers with different IP addresses - in the distributed application section.
SHARED MEMORY PROCESS
A shared memory process is a single Python process. A shared memory process has:
in_stream_names: A list of names or identifiers of its input streams
out_stream_names: A list of names or identifiers of its output streams
connect_sources: A list of connections to sources
connect_actuators: A list of connections to actuators
compute_func: A function that reads the process’ input streams and extends its output streams.
A shared memory process is created by calling the function shared_memory_process.
    shared_memory_process(
        compute_func, in_stream_names, out_stream_names,
        connect_sources, connect_actuators)
Each source and each actuator executes in its own thread, and compute_func also executes in its own thread. A source thread acquires data from a sensor or other data source and puts the data into the input stream of compute_func to which the source is connected. Sources are discussed in the section on sources; for convenience, another example is given below.
    import random

    def random_number_source(s):
        # Pass the function random.random itself, not the result of calling it.
        return source_function(
            func=random.random, out_stream=s, num_steps=1000)
This function puts random numbers on stream s.
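To make the role of a source thread concrete, here is a minimal sketch that uses only the Python standard library. It is not IoTPy's implementation: run_source is a hypothetical helper, and a plain list stands in for the stream s. It assumes a source calls func once per step and appends the result to its output.

```python
import random
import threading


def run_source(func, out_list, num_steps):
    """Hypothetical stand-in for a source: call func once per step and
    append each result to out_list (a plain list standing in for a stream)."""
    for _ in range(num_steps):
        out_list.append(func())


# A plain list stands in for the stream s.
s = []

# The source runs in its own thread. Note that the callable random.random
# is passed, not a value produced by calling it.
source_thread = threading.Thread(
    target=run_source, args=(random.random, s, 1000))
source_thread.start()
source_thread.join()
```

After the thread finishes, s holds 1000 values, each produced by a separate call to random.random.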
If an output stream of compute_func is connected to an actuator then data from that output stream is put in the queue that feeds that actuator. An actuator is any function that reads a queue and processes the data that it reads. Usually an actuator controls a device, such as a thermostat, based on the values it reads from its queue. An example of a simple actuator is:
    def print_from_queue(q):
        # Read values from the queue until the sentinel None arrives.
        while True:
            v = q.get()
            if v is None:
                return
            else:
                print('next value is', v)
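The way an actuator is driven can be sketched with the standard library alone. In the sketch below, which is an illustration and not IoTPy code, collect_from_queue is a hypothetical actuator that stores values instead of printing them, and the main thread plays the part of the output stream that feeds the queue. The value None is assumed to be the stop signal, as in the actuator above.

```python
import queue
import threading

received = []


def collect_from_queue(q):
    """Hypothetical actuator: store each value read from q until None arrives."""
    while True:
        v = q.get()
        if v is None:
            return
        received.append(v)


q = queue.Queue()

# The actuator runs in its own thread and blocks on q.get() until data arrives.
actuator_thread = threading.Thread(target=collect_from_queue, args=(q,))
actuator_thread.start()

# Stand-in for data arriving from an output stream of compute_func.
for value in [20.5, 21.0, 21.5]:
    q.put(value)
q.put(None)  # Sentinel: tells the actuator to stop.
actuator_thread.join()
```

Because a queue preserves insertion order, the actuator sees the values in the order in which they were put on the queue.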
The signature of compute_func is:
    compute_func(in_streams, out_streams)
where in_streams is a list of input streams of the function and out_streams is a list of output streams of the function. A simple example of a function with this signature is:
    def f(in_streams, out_streams):
        # in_streams and out_streams are lists; use their single elements.
        map_element(func=lambda v: v,
                    in_stream=in_streams[0], out_stream=out_streams[0])
This function merely copies its single input stream to its single output stream.
Names or identifiers of Streams
The j-th name in in_stream_names is the name or identifier associated with the j-th stream in in_streams, for all j. Likewise, the j-th name in out_stream_names is the name or identifier associated with the j-th stream in out_streams, for all j. The lengths of in_streams and out_streams should equal the lengths of in_stream_names and out_stream_names, respectively.
Each entry of in_stream_names must be unique. Likewise, each entry of out_stream_names must be unique. This is because the name of an input stream must identify exactly one input stream, and the name of an output stream must identify exactly one output stream.
As an example, consider a compute_func whose in_streams consists of three streams of accelerometer readings from the east, north and vertical directions. We could name these three streams 'east', 'north' and 'vertical', as in:
in_stream_names = ['east', 'north', 'vertical']
You could also use numbers for identifiers, as in:
in_stream_names = list(range(3))
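The naming rules in this subsection can be checked mechanically. The sketch below is a standard-library illustration only: check_stream_names is a hypothetical helper, not part of IoTPy, and plain lists stand in for streams. It rejects name lists that have the wrong length or contain duplicates, and otherwise pairs the j-th name with the j-th stream.

```python
def check_stream_names(stream_names, streams):
    """Hypothetical helper: raise ValueError if the names are not unique or
    do not match the streams one-to-one; otherwise return a dict that maps
    the j-th name to the j-th stream."""
    if len(stream_names) != len(streams):
        raise ValueError('need exactly one name per stream')
    if len(set(stream_names)) != len(stream_names):
        raise ValueError('stream names must be unique')
    return dict(zip(stream_names, streams))


# Plain lists stand in for the three accelerometer streams.
east, north, vertical = [], [], []
name_to_stream = check_stream_names(
    ['east', 'north', 'vertical'], [east, north, vertical])
```

With unique names, a lookup such as name_to_stream['north'] identifies exactly one stream, which is the property the uniqueness rule is meant to guarantee.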
Connecting sources and actuators
connect_sources is a list of pairs where each pair is: (in_stream_name, s) where in_stream_name is an element of in_stream_names and where s is a source. The data generated by s is placed on the input stream uniquely identified by the name in_stream_name.
connect_actuators is a list of pairs where each pair is: (out_stream_name, act) where out_stream_name is an element of out_stream_names and where act is an actuator. The data in the stream called out_stream_name is placed in the queue read by act.
Example: in_stream_names is ['temperature', 'pressure'], and we have two sources called t_source and p_source. We want to connect t_source to the input stream called 'temperature' and connect p_source to the input stream called 'pressure'. We do so with the following connect statement:
    connect_sources = [('temperature', t_source), ('pressure', p_source)]
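A misspelled stream name in connect_sources or connect_actuators is easy to miss. The following sketch, which uses only the standard library, shows one way to catch it before a process is created. group_by_stream_name is a hypothetical helper, not an IoTPy function, and t_source and p_source are placeholders that do nothing.

```python
from collections import defaultdict


def group_by_stream_name(stream_names, pairs):
    """Hypothetical helper: map each declared stream name to the list of
    sources (or actuators) paired with it, rejecting undeclared names."""
    grouped = defaultdict(list)
    for name, endpoint in pairs:
        if name not in stream_names:
            raise ValueError('unknown stream name: {!r}'.format(name))
        grouped[name].append(endpoint)
    return dict(grouped)


def t_source(s):
    """Placeholder temperature source."""


def p_source(s):
    """Placeholder pressure source."""


in_stream_names = ['temperature', 'pressure']
connect_sources = [('temperature', t_source), ('pressure', p_source)]
sources_by_name = group_by_stream_name(in_stream_names, connect_sources)
```

The same helper applies unchanged to connect_actuators, with out_stream_names as the first argument.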
An example of the creation of a shared memory process, proc, is shown below. Process proc merely prints random numbers. For more examples see IoTPy/IoTPy/tests/multiprocessing/multicore.py
    proc = shared_memory_process(
        compute_func=f,
        in_stream_names=['in'],
        out_stream_names=['out'],
        connect_sources=[('in', random_number_source)],
        connect_actuators=[('out', print_from_queue)])
A multiprocess program consists of a set of shared memory processes that communicate with each other using shared memory. An input stream of a shared memory process can be connected to an output stream of a shared memory process within the same multiprocess program. If an input stream t of a shared memory process p is connected to an output stream u of a shared memory process q, then the elements of u are copied into t.
An input stream of a shared memory process can also get data from a source, and an output stream can send data to an actuator.
Connections between shared memory processes
You create a multicore application by creating a set of shared memory processes, including their sources and actuators, and then connecting their output streams and input streams. A connection is specified by the identity of the sending process, the name of the output stream, the identity of the receiving process, and the name of the input stream. Note that a connection is defined by the names of streams, and names are character strings (str). An example of a connection is (p, 'x', r, 'v') where p and r are shared memory processes, 'x' is the name of an output stream of process p, and 'v' is the name of an input stream of process r.
The following example shows a multiprocess program with three shared memory processes, p, q and r, and three connections, as pictured in the next diagram. The same name can be used for streams in different shared memory processes; for example, 'x' is the name of an output stream of process p and also of an input stream of process q.
Execute a multiprocess application for the above example:
    # Create shared memory processes
    p = shared_memory_process(....)
    q = shared_memory_process(....)
    r = shared_memory_process(....)

    # Start and run the multicore application.
    mp = Multiprocess(
        processes=[p, q, r],
        connections=[(p, 'y', q, 'x'), (p, 'x', r, 'y'), (q, 'y', r, 'u')])
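One way to read the connections list is as a routing table from each output stream to the input streams it feeds. The sketch below builds such a table for the three connections of this example using only the standard library. It is an illustration, not a description of how Multiprocess works internally: build_routes is a hypothetical helper, and strings stand in for the process objects p, q and r.

```python
from collections import defaultdict

# Strings stand in for the shared memory process objects p, q and r.
p, q, r = 'p', 'q', 'r'
connections = [(p, 'y', q, 'x'), (p, 'x', r, 'y'), (q, 'y', r, 'u')]


def build_routes(connections):
    """Hypothetical helper: map each (sender, output stream name) to the
    list of (receiver, input stream name) pairs that it feeds."""
    routes = defaultdict(list)
    for sender, out_name, receiver, in_name in connections:
        routes[(sender, out_name)].append((receiver, in_name))
    return dict(routes)


routes = build_routes(connections)
```

Keying the table on the pair (process, stream name) rather than on the name alone is what allows the same name, such as 'x' or 'y', to be reused in different processes without ambiguity.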