IoTPy Wrappers: Functions that Create Agents

An IoTPy wrapper is a function that creates a non-terminating agent that reads and writes endless streams; the wrapper encapsulates a terminating function that operates on bounded-size objects such as lists and arrays. Wrappers are used by the decorators described in Quick Start.

An agent is specified by its input streams, output streams, state and a transition function. The transition function maps elements on its input streams and its state to elements appended to its output streams and its next state. An agent can be given a name which is useful for testing programs and logging output. An agent, ag, can also be halted by calling ag.halt().

(An agent, ag, that has been halted can be restarted by calling ag.restart(); however, the behavior of an agent that is restarted is nondeterministic — it may reread parts of stream that it had read before it was halted, and it may skip reading parts of streams. You will not need to restart agents in almost all the applications that you write.)

The structure of an IoTPy wrapper is:

wrapper(func, in_streams, out_streams, state=None, **kwargs)

where wrapper is a function from the IoTPy wrapper library or a wrapper function that you write, func is a terminating function, in_streams and out_streams are lists of streams, state is an arbitrary object, and **kwargs is the set of additional keyword arguments, if any, of func. Note that state is optional.

The parameter for some wrappers may be in_stream instead of in_streams, or out_stream instead of out_streams. When the parameter name is plural, as in in_streams, then the corresponding argument must be a list of streams, whereas when the parameter name is singular, as in in_stream, the corresponding argument must be a single stream. The wrapper returns an agent. For example,

ag = wrapper(func, ....)

allows us to manipulate the agent, ag. You can also get access to the agent that implements relations using decorators such as r_mul, r_add, r_sub, …, as well as other relational forms such as @map_e, @merge_e, @map_w, @split_e, ….For example, with

ag = r_mul(x, 10)

you can manipulate agent ag. Similarly, with

@map_e
def double(v): return 2*v
double_agent = double(x, y)

You can manipulate the agent, double_agent.

IoTPy Wrapper Library Organization.

See IoTPy/IoTPy/agent_types

Wrappers are in categories based on the numbers of inputs and outputs of the agents that they create.

  • Single input, single output are in category op for operator. IoTPy/IoTPy/agent_types/op.py

  • Multiple inputs, single output in merge. IoTPy/IoTPy/agent_types/merge.py

  • Single input, multiple outputs in split. IoTPy/IoTPy/agent_types/split.py

  • No inputs, single output in source. IoTPy/IoTPy/agent_types/source.py

  • No outputs, single input in sink or actuator. IoTPy/IoTPy/agent_types/sink.py

  • And multiple inputs, multiple outputs in multi. IoTPy/IoTPy/agent_types/merge.py

An example of a wrapper: map_element

Next we use the simplest wrapper - map_element - an instance of the op category, as an example. We consider the stateless case (i.e. the state parameter has its default value, None).

Note: The j-th element of a stream s is referred to as s[j], and the number of elements in s is len(s).

The statement:

ag = map_element(func=f, in_stream=x, out_stream=y)

creates an agent ag. The wrapper map_element returns an agent.

In most cases you won’t need to query the agent after it is created and so you don’t need to assign the agent returned by map_element to an agent. In these cases you could just write:

map_element(func=f, in_stream=x, out_stream=y)

Assuming that y is not an output stream of any other agent, and y is initialized to the empty stream, this agent ensures that:

len(y) <= len(x)
y[j] = f(x[j]), for all j, where j <= len(x).  

The operation of the agent is as follows. While len(x) = len(y), the agent is idle. While len(x) > len(y), the agent appends additional values to stream y. (See theory for a temporal logic specification.)

For example, with

def f(v): return 2*v

the elements of stream y will be double those of the corresponding stream x. So, if x is [0, 1, 2] at some point then from some later point onwards, the first 3 elements of y will be [0, 2, 4].

The statement that creates an agent is used just like any other Python statement. For example, you use standard Python programming to create agents that, in turn, create other agents. Also, exactly as Python allows you to define a more powerful function by embedding other functions, you can create more powerful agents by embedding component agents.

Functional Programming

You can write the above agent in a functional programming manner using a different wrapper.

y = map_element_f(f, x)

map_element_f returns the output stream whereas the output stream is passed as a parameter in to map_element. For each wrapper, wrap, in the agent style with the output stream as a parameter, there is a wrapper in functional programming style, wrap_f, where the output stream is returned by the wrapper. For example the output stream is passed as a parameter to map_window whereas the output stream is returned by map_window_f. Use whichever construct - agent or functional - you prefer.

Example: illustrating standard python programming for streams

Here is an example that shows that all you need to create applications consisting of complex networks of agents is standard Python programming.

Assume that we are given functions:

local_monitor(sensor_stream, local_alarm, threshold)
aggregator(local_alarms, regional_alarm)

where the parameters sensor_stream, local_alarm and regional_alarm are streams, local_alarms is a list of streams, and where threshold is a float. We are given that calling the function local_monitor creates an agent that reads sensor_stream and appends values to local_alarm depending on the value of threshold. And, we are given that calling the function aggregator creates an agent that reads streams in local_alarms and appends values to regional_alarm.

Let’s construct a function that reads multiple sensor streams in a region and generates regional alarms. We are given a parameter sensor_streams which is a list of streams, where each stream in the list contains a sequence of sensor measurements for a sensor in the region. Let’s write a function, regional_monitor which reads the streams in sensor_streams and appends values to regional_alarm, where the value added (either alarm or no_alarm) depends on threshold.

def regional_monitor(sensor_streams, regional_alarm, threshold):
   num_sensors = len(sensor_streams)
   local_alarms = [Stream() for i in range(num_sensors)]
   local_agents = [
     local_monitor(sensor_streams[i], local_alarms[i], threshold)
     for i in range(num_sensors)]
   aggregation_agent = aggregator(local_alarms, regional_alarm)

Next, skim through the library of wrappers rapidly; you can select the wrapper that you want later while you are coding. If you want to use NumPy then look at stream arrays.

Next:

Wrapper types or NumPy: Stream Arrays