Your Code + IoTPy: Multiple Threads 

This section tells you how to integrate your code running in one or more threads with IoTPy code running in separate threads.

You can create a thread running IoTPy as an instance of the class iThread. Its constructor takes two parameters:

  1. in_streams: a list of input streams of the agents executing in the thread. Each input stream must have a name.

  2. output_queues: a list of queues into which the agent in the thread puts results.

Here is an example that creates an instance of iThread.

ithread = iThread(in_streams=[x], output_queues=[x_q])

The agent executing in an iThread has input streams but no output stream. It puts output stream values into output queues. A thread interacts with an IoTPy thread in the following ways:

  1. Extend input streams of the agent in the IoTPy thread.

  2. Get data from output queues of the agent.

  3. Execute a terminated command if an input stream of the IoTPy thread will no longer be extended. If the terminated command has been executed for all input streams of the IoTPy thread then the thread terminates.

  4. Start and join the IoTPy thread, and get shared data from the thread.
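The four interactions listed above can be sketched with the standard library alone. This is only an illustration of the pattern, not IoTPy's implementation: the class ToyStreamThread, its methods, and the _TERMINATED marker are hypothetical stand-ins, and the worker simply copies its input to the output queue.

```python
import queue
import threading

_TERMINATED = object()  # hypothetical end-of-stream marker


class ToyStreamThread:
    """Stand-in for illustration only; this is not IoTPy's iThread."""

    def __init__(self, output_queue):
        self.in_queue = queue.Queue()      # carries data into the worker
        self.output_queue = output_queue   # carries results back out
        self.thread = threading.Thread(target=self._run)

    def _run(self):
        # Worker loop: copy each element to the output queue until
        # the end-of-stream marker arrives.
        while True:
            item = self.in_queue.get()
            if item is _TERMINATED:
                break
            for element in item:
                self.output_queue.put(element)

    def extend(self, list_of_elements):
        # Hand a batch of elements to the worker.
        self.in_queue.put(list(list_of_elements))

    def terminated(self):
        # Tell the worker that no more input will arrive.
        self.in_queue.put(_TERMINATED)


out_q = queue.Queue()
toy = ToyStreamThread(out_q)
toy.thread.start()            # 4. start the thread
toy.extend([0, 1])            # 1. extend the input stream
toy.extend([2])
toy.terminated()              # 3. signal that no more input will arrive
toy.thread.join()             # 4. join the thread

results = []
while not out_q.empty():      # 2. get data from the output queue
    results.append(out_q.get())
print(results)                # [0, 1, 2]
```

Reading the output queue after the join keeps the sketch simple, because the worker has finished every put by then. A caller could instead read results while the thread is still running.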

Putting data into an IoTPy thread

ithread.extend(in_stream_name='x', list_of_elements=[0, 1])

In the above call ithread is an instance of iThread. ‘x’ is the name of an input stream of the agent executing in the IoTPy thread. The call causes the stream called ‘x’ to be extended by the list of elements.

ithread.append(in_stream_name='x', element=0)

This call causes the element 0 to be appended to the stream named ‘x’.

Indicating that an input stream of an IoTPy thread will get no further data

A thread tells an IoTPy thread that one of the agent’s input streams will no longer be extended by calling:

ithread.terminated(in_stream_name='x')

The IoTPy thread terminates execution after all of its input streams have terminated.
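The termination rule can be pictured as bookkeeping over the set of input streams that are still open. The sketch below is not IoTPy code: mark_terminated and open_streams are hypothetical names, and a second stream 'y' is assumed purely for illustration.

```python
def mark_terminated(open_streams, in_stream_name):
    """Remove a stream name from the set of still-open inputs and
    report whether every input has now terminated."""
    open_streams.discard(in_stream_name)
    return len(open_streams) == 0


# Assume a thread with two input streams, 'x' and 'y'.
open_streams = {'x', 'y'}
first = mark_terminated(open_streams, 'x')   # 'y' is still open
second = mark_terminated(open_streams, 'y')  # nothing is left open
print(first, second)                         # False True
```

Under this reading, the thread would keep running after the first call and stop only after the second.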

Indicating that all input streams of an IoTPy thread will get no further data

In some cases it is convenient to tell the IoTPy thread that no further data will arrive on any of its inputs. A thread tells an IoTPy thread that all of the agent’s input streams will no longer be extended by calling:

ithread.finished()

You can also start and join an iThread much like other threads. The example below does this with ithread.start() and ithread.thread.join().

Example

This is a very simple example in which the IoTPy thread merely copies values that it receives in its input stream called ‘x’ into its output queue called x_q.

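# Stream, stream_to_queue, and iThread are assumed to have been
# imported from IoTPy already.
import queue
# Create streams and agents that will execute
# in iThread.
x = Stream('x')
# Create the output queue (assumed here to be a standard queue.Queue).
x_q = queue.Queue()
# Create agent which puts values in x into queue x_q
stream_to_queue(x, x_q)
# Create thread which receives data in input stream x
ithread = iThread(in_streams=[x], output_queues=[x_q])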
# Start thread.
ithread.start()
# Put data into streams.
test_data = list(range(5))
ithread.extend(in_stream_name='x', list_of_elements=test_data)
# Indicate stream is finished
ithread.finished()
# Join threads
ithread.thread.join()
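The example stops at the join and does not show how to read the results. One way to collect them afterwards is sketched below, assuming x_q is a standard queue.Queue that holds only the copied values. The helper drain is a hypothetical name, and the queue here is filled by hand as a stand-in for what the IoTPy thread would have produced.

```python
import queue


def drain(q):
    """Return every item currently in q without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


# Stand-in for the x_q of the example after the thread has been joined.
x_q = queue.Queue()
for value in range(5):
    x_q.put(value)

received = drain(x_q)
print(received)  # [0, 1, 2, 3, 4]
```

If the thread copied test_data as described, the drained list would be expected to match test_data.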