QUICK START

Read this page if you want to write programs using IoTPy immediately. For a more comprehensive treatment start with the Overview

The Two IoTPy Goals

  1. Get non-terminating functions that operate on endless streams by reusing terminating functions that operate on conventional data structures such as lists and arrays.

  2. Easily build multithreaded, multicore, distributed applications that connect sensors and social media to the cloud.

This section shows how IoTPy satisfies the first goal. IoTPy has many functions on streams and, in addition, you can write your own. This section describes only a few functions to get you started.

STREAMS

A stream is a list of arbitrary length which can be modified only by appending values to the tail of the list. Later, we will see that a stream can also be implemented treated as a NumPy array to which rows can be appended and which cannot be modified in any other way; See NumPy: Stream Arrays.

s = Stream()

creates a stream s. You can give a stream a name when it is created as in:

s = Stream(name='temperature')

which gives the name ‘temperature’ to the stream. A stream’s name can be useful for debugging but is not used in the code. You can append or extend a stream in the same was as for a list:

s.append(value)
s.extend(a_list)

Standard python operators, such as +, -, *,… on streams

Let op be one of the following Python operators +, -, *, /, //, <, <=, ==, !=, >, >=. Let x and y be streams; then x op y is a stream whose j-th element is x[j] op y[j]. For example, given streams x, y: x + y is a stream whose j-th element is x[j] + y[j], and x - y is a stream whose j-th element is x[j] - y[j].

# x, y are streams
z = x + y
# Creates a stream z where z[n] = x[n] + y[n]

Notational Conventions

The j-th element of a stream s is s[j]. The suffix e to refers to functions that operate on an element of a stream, and w for functions on sliding windows of streams. For example, map_e refers to the map function element by element whereas map_w refers to functions that operate on sliding windows of the stream.

We convert a terminating function f that operates on conventional Python bounded-sized to a non-terminating function that operates on endless streams by decorating f. IoTPy has many decorators. We start with map_e, map_w, and their functional variants fmap_e and fmap_w.

MAP

@map_e
def f(v): return g(v)
f(in_stream=x, out_stream=y)

The undecorated function f terminates and operates on a parameter v that is a conventional Python bounded-size object such as a list or a stream. The decorated function has two parameters, an input stream, in_stream, and an output stream, out_stream. Streams can be endless; the length of a stream is potentially unbounded.

In the above example, the undecorated function f terminates and v is a bounded-size object. Executing the decorated function ensures that y[n] = g(x[n]) for all n less than the length of x, for streams x, y. As more values are appended to x, more values are appended to y, and so the lengths of the streams can get arbitrarily large.

For example, given streams x and y, the decorated function double ensures that y[n] = 2 * x[n].

@map_e
def double(v): return 2*v
double(in_stream=x, out_stream=y)

The first goal of IoTPy is to reuse terminating functions on bounded sized objects - such as the undecorated double and v in the above example - to get non-terminating functions, such as the decorated double, operating on unbounded-sized streams x and y.

State

You can pass a state to a call to a function by including the parameter called state and providing its initial value, as in:

@map_e
def f(v, state): return g(v,state), h(v,state)
f(in_stream=x, out_stream=y, state=initial_state)

Let state[n] be the n-th value of state. Then:

state[0] = initial_state
y[n] = g(x[n], state[n])
state[n+1] = h(x[n], state[n])

For example, the following code ensures state[0] = 0, and state[n+1]=x[n], and y[n] = x[n] - state[n]. Therefore, y[0] = x[0], and y[n] = x[n] - x[n-1].

@map_e
def deltas(input_value, state):
  difference = input_value - state
  next_state = input_value
  return difference, next_state
deltas(in_stream=x, out_stream=y, state=0)

You may prefer a less verbose style as in:

@map_e
def deltas(u, v): return u - v, u
deltas(x, y, state=0)

Note that you must explicitly use the keyword state in the call. The state can be any object other than a stream (or other unbounded-size object).

Keyword Parameters

You can pass an arbitrary number of keyword parameters to a function as shown in the following example which has a keyword parameter called multiplicand. The call ensures y[n] = 3*x[n].

@map_e
def multiply(v, multiplicand): return v*multiplicand
multiply(in_stream=x, out_stream=y, multiplicand=3)

Keyword Parameters and State

You can have both keyword parameters and state as in the following example.

@map_e
def anomalous_change(v, state, max_change):
   delta = v - state
   next_state = v
   next_output = True if delta > max_change else False
return next_output, next_state

Given streams x and y, if you call:

anomalous_change(in_stream=x, out_stream=y, state=0, max_change=10)

Then the initial state, s[0] is 0 and the state s[n+1] after n elements of x have been read is x[n]. The output stream y contains Boolean values where y[n] is True to indicate that the change, x[n+1] - x[n], in x exceeds the keyword parameter max_change.

y[0] = (x[0] > max_change)
y[n+1] = ((x[n+1] - x[n]) > max_change)

Click here to download examples of @map_e and @fmap_e or download it from GitHub at IoTPy/examples/op/examples_map_e.py.

WINDOWS

IoTPy has multiple windowing operations. We begin with windows specified by a fixed size and fixed step size. A window is a list. (Later, we will describe windows that are NumPy arrays.) A window is specified by a window_size and step_size. The length of a window is its window_size. A sliding window is moved forward by its step_size. The j-th window specified of a stream x is:

x[j*step_size : j*step_size + window_size] for j >= 0

For example, if window_size is 3 and step_size is 2 then the sequence of windows is the following sequence of lists:

x[0, 1, 2],  x[2, 3, 4],  x[4, 5, 6], …

Use the decorator @map_w for mapping operations on windows.

@map_w
def f(window): return g(window)
f(in_stream=x, out_stream=y, window_size=w, step_size=s)

This call sets:

y[n] = g(x[n*s : n*s+w])

For example,

@map_w
def sum_window(window): return sum(window)
sum_window(in_stream=x, out_stream=y, window_size=2,      step_size=2)

sets

y[n] = x[2*n] + x[2*n+1]

You can use keywords and state for @map_w in exactly the same way as for @map_e.

Functional Form

The functional form obtained with the decorator @fmap_e can be more convenient in some cases (though not more powerful) than the relational form obtained by @map_e, as illustrated in the following example.

@fmap_e
def f(v): return g(v)
y=f(x)

has the same effect as

@map_e
def f(v): return g(v)
f(in_stream=x, out_stream=y)

with one difference: In the functional form with @fmap_e the output stream is created by the call to f whereas in the relational form with @map_e , the output stream must be declared before the call to f and is passed as a parameter to f. The functional form gives you the convenience of using functional notation such as:

z = f(g(x,y) * h(w,x,y))

where w, x, y, z are streams.

You can use keyword parameters and state with the functional form in the usual way.

Functional Form with Windows

The decorator @fmap_w is the functional form of @map_w in exactly the same way that @fmap_e is the functional form of @map_e.

@fmap_w
def f(window): return g(window)
y = f(x, window_size=w, step_size=s)

has the same effect as:

@map_w
def f(window): return g(window)
f(in_stream=x, out_stream=y, window_size=w, step_size=s)

except that the output stream y is created by the functional form whereas it must be created earlier and passed as a parameter in the relational form.

Click here to download examples of @map_w and @fmap_w or download the examples from GitHub at IoTPy/examples/op/examples_map_w.py. Another simple example is in IoTPy/examples/signal_processing_examples/window_dot_product.py.

MERGE

Let v be a list of length k and let x be a list of streams where the length of x is also k. The n-th element of stream x[i] is x[i][n].

@merge_e
def f(v): return g(v)
f(in_streams=x, out_stream=y)

The above code ensures that

y[n] = g([x[0][n], … ,x[k-1][n])

For example, the following code ensures that z[n] = w[n] + x[n] + y[n].

@merge_e
def total(v): return sum(v)
total(in_streams=[w,x,y], out_stream=z)

Windows, Keyword Parameters, State, Functional form

You use windows, keyword parameters, state, and functional form with merge exactly in the same way that you do for map. For example, we use the decorator @merge_w for merging streams, window by window, in the same way that we use @merge_e to merge streams, element by element. We use the decorator fmerge_e for the functional form of merge_e, and fmerge_w for the functional form of merge_w in exactly the same way that we use fmap_e for the functional form of map_e. For example,

@fmerge_e
def total(v): return sum(v)
z = total([w,x,y])

has the same effect as the previous example except that stream z is created in the functional form whereas it is passed as a parameter in the relational form.

Let v be a list and let x be a list of streams where v and x have the same length k. Let the n-th element of stream x[i] be x[i][n]. The following code makes y a stream where

y[n] = g([x[0][n*s : n*s+w], …, x[k-1][n*s : n*s+w]])

In other words, y[n] is the value of g applied to a list of k elements where the elements are the n-th windows into the streams x[0], …, x[k-1].

@merge_w
def f(v): return g(v)
f(in_streams=x, out_stream=y, window_size=w, step_size=s)

For example, the following code sets:

z[j] = x[j*3]+x[j*3+1] + max(y[j*3]+y[j*3+1])
@merge_w
def f(two_windows):
   window_0, window_1 = two_windows
   return sum(window_0) + max(window_1)
f(in_streams=[x,y], out_stream=z, window_size=2, step_size=3)

Nondeterminism. asynchronous Merge.

An asynchronous merge nondeterministically merges multiple input streams into a single stream and applies a specified function to the resultant stream. The merge is fair in the sense that if a value appears on the input stream then that value will eventually appear in the output stream.

Let x be a list of streams. The following code ensures that output stream y is function g applied to each element of a fair merge of the streams in x.

@merge_asynch
def f(v): return g(v)
y = f(x)

For example, the following code fairly merges streams x and y and outputs a stream z and multiplies each element by 2.

@merge_asynch
def merge_double(v): return 2*v
merge_double(in_streams=([x, y], out_stream=z)

For example, if the elements of x are 100, 101, 102, ... and those of y are 1, 2, 3 , ... then two of the many examples of possible values of stream z include [200, 202, 2, 4, 204, 206, 208, 6, ...] or [2, 4, 6, 202, 204, 8, 206, 208, 210,...].

Asynchronous merges are only carried out element by element (and not window by window), and it is only used in the relational form and not as a function.

Decorating Methods

You can decorate methods of objects as illustrated in the following example.

class average(object):
   def __init__(self, max_value):
      # Clip elements of the stream greater than max_value
      self.max_value = max_value
      self.count = 0
      self.total = 0.0
   def f(self, v):
      v = min(v, self.max_value)
      self.total += v
      self.count += 1
      return self.total/float(self.count)

c = average(max_value=10)
@fmap_e
def avg(v): return c.f(v)
y = avg(x)

The n-th element of stream y is the average of the first n elements of stream x with elements greater than max_value clipped at max_value.

Merging two streams

Merely for syntactic convenience, and since users often want to merge two streams, you can treat the special case of merging two streams as shown in the following examples that use @fmerge_2e and instead of @fmerge_e. The input to a standard merge is a list whereas the input with the special cases have two parameters. The programs below are equivalent.

@fmerge_2e                      @fmerge_e
def f(x, y):                    def f(pair):
   return 2*x + 3*y                return 2*pair[0] + 3*pair[1]
z = f(x, y)                      z = f([x, y])

Analogously you can use @fmerge_2w instead of @fmerge_w for the special case of merging two streams.

Click here to download examples of @merge_e and @merge_w, or download the examples from GitHub at IoTPy/examples/merge/examples_merge.py.

Next, look at another example.