IoTPy Wrappers: Functions that Create Agents
The structure of an IoTPy wrapper is:
wrapper(func, in_streams, out_streams, state=None, **kwargs)
where wrapper is a function from the IoTPy wrapper library or a wrapper function that you write, func is a terminating function, in_streams and out_streams are lists of streams, state is an arbitrary object, and **kwargs is the set of additional keyword arguments, if any, of func. Note that state is optional.
The parameter for some wrappers may be in_stream instead of in_streams, or out_stream instead of out_streams. When the parameter name is plural, as in in_streams, then the corresponding argument must be a list of streams, whereas when the parameter name is singular, as in in_stream, the corresponding argument must be a single stream.
IoTPy Wrapper Library Organization.
Wrappers are in categories based on the numbers of inputs and outputs of the agents that they create.
Single input, single output are in category op for operator. IoTPy/IoTPy/agent_types/op.py
Multiple inputs, single output in merge. IoTPy/IoTPy/agent_types/merge.py
Single input, multiple outputs in split. IoTPy/IoTPy/agent_types/split.py
No inputs, single output in source. IoTPy/IoTPy/agent_types/source.py
No outputs, single input in sink or actuator. IoTPy/IoTPy/agent_types/sink.py
And multiple inputs, multiple outputs in multi. IoTPy/IoTPy/agent_types/merge.py
An example of a wrapper: map_element
Next we use the simplest wrapper - map_element - an instance of the op category, as an example. We consider the stateless case (i.e. the state parameter has its default value, None).
Note: The j-th element of a stream s is referred to as s[j], and the number of elements in s is len(s).
ag = map_element(func=f, in_stream=x, out_stream=y)
creates an agent ag. Assuming that y is not an output stream of any other agent, and y is initialized to the empty stream, this agent ensures that:
len(y) <= len(x)
y[j] = f(x[j]), for all j, where j <= len(x).
The operation of the agent is as follows. While len(x) = len(y), the agent is idle. While len(x) > len(y), the agent appends additional values to stream y. (See theory for a temporal logic specification.)
For example, with
def f(v): return 2*v
the elements of stream y will be double those of the corresponding stream x. So, if x is [0, 1, 2] at some point then from some later point onwards, the first 3 elements of y will be [0, 2, 4].
The statement that creates an agent is used just like any other Python statement. For example, you use standard Python programming to create agents that, in turn, create other agents. Also, exactly as Python allows you to define a more powerful function by embedding other functions, you can create more powerful agents by embedding component agents. All you need to develop streaming applications is Python and the library of IoTPy wrapper functions.
Example: illustrating standard python programming for streams
Here is an example that shows that all you need to create applications consisting of complex networks of agents is standard Python programming.
Assume that we are given functions:
local_monitor(sensor_stream, local_alarm, threshold)
where the parameters sensor_stream, local_alarm and regional_alarm are streams, local_alarms is a list of streams, and where threshold is a float. We are given that calling the function local_monitor creates an agent that reads sensor_stream and appends values to local_alarm depending on the value of threshold. And, we are given that calling the function aggregator creates an agent that reads streams in local_alarms and appends values to regional_alarm.
Let’s construct a function that reads multiple sensor streams in a region and generates regional alarms. We are given a parameter sensor_streams which is a list of streams, where each stream in the list contains a sequence of sensor measurements for a sensor in the region. Let’s write a function, regional_monitor which reads the streams in sensor_streams and appends values to regional_alarm, where the value added (either alarm or no_alarm) depends on threshold.
def regional_monitor(sensor_streams, regional_alarm, threshold): num_sensors = len(sensor_streams) local_alarms = [Stream() for i in range(num_sensors)] local_agents = [ local_monitor(sensor_streams[i], local_alarms[i], threshold) for i in range(num_sensors)] aggregation_agent = aggregator(local_alarms, regional_alarm)