Multiple Threads: Sensors and Actuators

See IoTPy/multiprocessing/multicore.py for code

and IoTPy/tests/multicore_test.py for tests and examples.

IoTPy’s Two Goals: The earlier pages deal with the first IoTPy goal: Get non-terminating functions that operate on endless streams by reusing terminating functions that operate on conventional data structures such as lists and arrays. Now we describe how IoTPy satisfies its other goal: Easily build multithreaded, multicore, distributed applications that connect sensors and social media to the cloud.

In this section we describe multithreaded applications. The examples in the directory listed above have extensive documentation. You may find the example code sufficient to use the concepts described below.

Multiple threads are used for getting input from sensors, social media and other sources and for controlling actuators and other devices. A source thread acquires data from sensors and other sources and puts the data into a stream. Data from each output stream of a computation is put into a queue, with one queue for each output stream. An actuator thread reads data from a queue and uses the data to control a device or some other purpose. The application has one source thread for each input stream of the computation, one actuator thread and queue for each output stream, and one thread for the computation itself.

A multithreaded, single process application is specified by:

  1. sources: a list of functions where each function interfaces with a sensor or some other data source and returns values which are put in an input stream of the computation.

  2. actuators: a list of functions where each function gets data from a queue and then carries out some operation on the data.

  3. compute_func, a function that processes data on its input streams to produce data on its output stream. The parameters of compute_func are (1) a list of input streams and (2) a list of output streams, where the numbers of input and output streams equal the numbers of sources and actuators (respectively). Each source feeds one input stream of the computation. Likewise, each output stream of the computation feeds a queue from which data is acquired by an actuator thread.

A multithreaded application is created and run by the command:

multithread(sources, actuators, compute_func)

Example

def multithread_example():
    # STEP 1: DEFINE SOURCES.
    # Each function has a single parameter which is a stream.
    def source_file(s):
        return source_file_to_stream(
            lambda x: int(x), s, filename='test.dat',
            time_interval=0.005, num_steps=10)

    # STEP 2: DEFINE ACTUATORS
    # Each function has a single parameter which is a queue.
    def print_from_queue(q):
        stopped = False
        while True:
            try:
                v = q.get(timeout=0.05)
            except:
                return
            print 'next value in queue is ', v

    # STEP 3: DEFINE THE COMPUTATION
    # A function with parameters in_streams and out_streams which
    # are lists of streams.
    def f(in_streams, out_streams):
        map_element(lambda v: v, in_streams[0], out_streams[0])

    # STEP 4: CALL multithread(sources, actuators, compute_func)
    # sources and actuators are lists of functions. Their lengths
    # are equal to the lengths of in_streams and out_streams.
    multithread([source_file], [print_from_queue], f)

The function source_file_to_stream returns a thread which reads a line of the file with the specified file name and converts the line to int and puts the integer on the specified stream s, waits for time_interval seconds, and then repeats the step for a total of num_steps times. The function source_file (see step 1) which is passed to the computation - see step 4 - is a function with a single parameter which is a single stream.

Each actuator is a function with a single parameter which is a queue. In this example, the actuator is the function print_from_queue which gets data from its queue, waiting for at most timeout seconds, and prints the data to the screen.

The computation - see step 3 - is a single function f with two parameters: in_streams and out_streams which are lists of streams. In this example, the computation merely copies its single input stream to its single output stream.

The call to multithread - see step 4 - starts all the source and actuator threads and the computation thread. In this example, the application has a single source (source_file) and a single actuator (print_from_queue), and a computation, f.

Click here to download examples of multithread applications or you can download these examples from GitHub at IoTPy/examples/multiprocess_examples/examples_multithread.py.

Next, let’s look at threads for sources such as sensors and social media: Source Threads, Sensors, Social Media.