Operators on Streams

IoTPy/example_operators.py

A stream is a list, or array, of arbitrary length which can be modified only by appending values to the tail. This description uses lists; exactly the same concepts apply to arrays. A good way to learn about operations on streams is to run the Jupyter notebook IoTPy/example_operators.ipynb.

The recent elements of a stream are stored in main memory while older elements may be stored on slower devices. Associated with a stream s is a list s.recent and integers s.offset and s.stop. The s.stop most recent elements of stream s are in s.recent[0 : s.stop], while the older elements, s[0 : s.offset], are stored on slower devices.

for i < s.stop: s[s.offset + i] = s.recent[i]
len(s) = s.offset + s.stop
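
The two relations above can be checked with a small sketch. ToyStream, its archive list and its move_to_slow_storage method are hypothetical names invented for this illustration; they are not IoTPy's Stream class, and the archive list merely stands in for a slower device.

```python
class ToyStream:
    """Hypothetical stand-in for a stream; NOT IoTPy's Stream class."""

    def __init__(self):
        self.recent = []    # elements currently held in main memory
        self.stop = 0       # number of valid elements in self.recent
        self.offset = 0     # number of older elements moved out of memory
        self.archive = []   # assumption: a list standing in for slow storage

    def append(self, value):
        # A stream is modified only by appending to its tail.
        self.recent.append(value)
        self.stop += 1

    def move_to_slow_storage(self, count):
        """Move the `count` oldest in-memory elements to the archive."""
        count = min(count, self.stop)
        self.archive.extend(self.recent[:count])
        del self.recent[:count]
        self.offset += count
        self.stop -= count

    def __len__(self):
        # len(s) = s.offset + s.stop
        return self.offset + self.stop

    def __getitem__(self, index):
        # Supports non-negative integer indices only.
        if index < self.offset:
            return self.archive[index]
        return self.recent[index - self.offset]


s = ToyStream()
for value in range(10):
    s.append(value)
s.move_to_slow_storage(6)

# for i < s.stop: s[s.offset + i] = s.recent[i]
invariant_holds = all(s[s.offset + i] == s.recent[i] for i in range(s.stop))
```

After six of the ten elements are moved out, s.offset is 6, s.stop is 4, and both relations still hold.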

Subscribers to a Stream

A set s.subscribers_set is associated with stream s. An element of s.subscribers_set is called a subscriber of stream s. A subscriber of a stream s is a callback function which is called when s is modified.

Associated with each subscriber f to a stream s is an integer s.start[f] where the system guarantees:

0 <= s.start[f] <= s.stop

The list s.recent[s.start[f] : s.stop] contains the most recent elements of stream s.

A subscriber f can increase s.start[f] but cannot decrease it. The system may change s.recent so that older items in the stream are moved out of main memory into slower storage. When the system changes s.recent it may change s.start[f] and s.stop; however, it guarantees that the list s.recent[s.start[f] : s.stop] is not modified by the changes. The list s.recent[s.start[f] : s.stop] always contains the most recent elements of stream s.
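
The guarantee can be illustrated with a minimal sketch. ToyStream and its compact method are hypothetical and written only for this illustration; how IoTPy actually decides when and how to move items out of s.recent is not described here, so the compaction rule below (drop everything that all subscribers have passed) is an assumption.

```python
class ToyStream:
    """Hypothetical stand-in for a stream; NOT IoTPy's Stream class."""

    def __init__(self):
        self.recent = []
        self.offset = 0
        self.stop = 0
        self.subscribers_set = set()
        self.start = {}     # start pointer for each subscriber

    def subscribe(self, f):
        self.subscribers_set.add(f)
        self.start[f] = 0

    def append(self, value):
        self.recent.append(value)
        self.stop += 1
        # Every subscriber is called back when the stream is modified.
        for f in list(self.subscribers_set):
            f()

    def compact(self):
        """Assumed rule: drop items that every subscriber has passed."""
        shift = min(self.start.values(), default=self.stop)
        del self.recent[:shift]
        self.offset += shift
        self.stop -= shift
        for f in self.start:
            self.start[f] -= shift


s = ToyStream()


def keep_last_two():
    # The subscriber only ever increases its own start pointer,
    # leaving the two newest elements unread.
    s.start[keep_last_two] = max(s.start[keep_last_two], s.stop - 2)


s.subscribe(keep_last_two)
for value in range(6):
    s.append(value)

unread_before = s.recent[s.start[keep_last_two]:s.stop]
s.compact()
unread_after = s.recent[s.start[keep_last_two]:s.stop]
```

Compaction changes s.recent, s.stop and s.start[keep_last_two], but the slice s.recent[s.start[f] : s.stop] seen by the subscriber is the same list of values before and after.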

EXAMPLES OF CALLBACK FUNCTIONS

This section gives examples of callback functions. The goal of this section is to give you examples that will help you create your own callback functions. The examples given are useful in several applications. Examples for specific application areas are given in other pages of this website.

Operations on Each Item of a Stream

We will define a class called single_item with parameters in_stream and func, which are a stream and function, respectively. single_item may also have positional arguments args and keyword arguments, kwargs, like many Python functions.

The first parameter of func is an element of the stream. The function may also have additional positional and keyword arguments, args and kwargs, respectively. The call to single_item causes a persistent object to be created which executes func on every item of in_stream.

Example

def f(v): print(2*v)
x = Stream(name='x')
single_item(in_stream=x, func=f)

Function f is applied to each element of stream x. So, 2*x[i] is printed for all i, forever.

Example

def f(v, out_stream, multiple): out_stream.append(v*multiple)
x, y = Stream(name='x'), Stream(name='y')
single_item(in_stream=x, func=f, out_stream=y, multiple=2)

The parameters out_stream and multiple are keyword arguments of function f.

The call to single_item causes a persistent object to be created which executes func on every item of in_stream and so:

for all i < len(x): y[i] = x[i]*multiple

Next let’s look at an implementation of the single_item class.

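class single_item(object):
    def __init__(self, in_stream, func, *args, **kwargs):
        self.s = in_stream
        self.func, self.args, self.kwargs = func, args, kwargs
        # Register the callback so it runs whenever the stream is modified.
        self.s.subscribe(self.callback)
    def callback(self):
        # Apply func to every item that has not been processed yet.
        for v in self.s.recent[
                self.s.start[self.callback] : self.s.stop]:
            self.func(v, *self.args, **self.kwargs)
        # Record that every item up to s.stop has been processed.
        self.s.start[self.callback] = self.s.stop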

Here s is the stream in_stream. Whenever the callback function is called, func has already been executed on every item of s up to s.start[callback]. The callback function is called when one or more items are added to the tail of stream s. The callback function executes func on every item of the list

s.recent[s.start[callback] : s.stop]

and then it sets s.start[callback] to s.stop because it has now finished executing func on every item of the stream up to s.stop.
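
To try the class without IoTPy installed, the following sketch pairs it with a toy stream. ToyStream, its extend method and the out_list argument are hypothetical and exist only for this illustration; the real Stream class is richer, and its method names may differ. The single_item class mirrors the one above.

```python
class ToyStream:
    """Hypothetical stand-in for a stream; NOT IoTPy's Stream class."""

    def __init__(self, name=None):
        self.name = name
        self.recent, self.stop = [], 0
        self.start = {}     # start pointer for each subscriber

    def subscribe(self, callback):
        self.start[callback] = 0

    def extend(self, values):
        # Append values to the tail, then notify every subscriber.
        self.recent.extend(values)
        self.stop = len(self.recent)
        for callback in list(self.start):
            callback()


class single_item:
    def __init__(self, in_stream, func, *args, **kwargs):
        self.s = in_stream
        self.func, self.args, self.kwargs = func, args, kwargs
        self.s.subscribe(self.callback)

    def callback(self):
        s = self.s
        # Process only the items added since the previous call.
        for v in s.recent[s.start[self.callback]:s.stop]:
            self.func(v, *self.args, **self.kwargs)
        s.start[self.callback] = s.stop


def f(v, out_list, multiple):
    # out_list is a plain Python list used here in place of an output stream.
    out_list.append(v * multiple)


x = ToyStream(name='x')
y = []
single_item(x, f, out_list=y, multiple=2)
x.extend([1, 2, 3])   # callback processes 1, 2, 3
x.extend([4, 5])      # callback processes only 4, 5
```

Because the callback advances s.start[callback] to s.stop each time, the second call to extend does not reprocess the first three items, and y ends up holding each element of x multiplied by 2, in order.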

Operations on Sliding Windows

Sliding windows are specified by a window size and a step size which are positive integers. The sliding windows of a stream s with window size W and step size N are:

for all i: s[N*i : N*i + W]

The callback function is applied to each window of the stream. For example, if the window size is 5 and the step size is 2, the stream is s and the callback function is f then:

f(s[0:5]), f(s[2:7]), f(s[4:9]), f(s[6:11]), ...

is executed.
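
The window positions can be reproduced on an ordinary list. The helper sliding_windows below is a hypothetical function written for this illustration and is not part of IoTPy; it yields only complete windows.

```python
def sliding_windows(values, window_size, step_size):
    """Yield every complete window values[N*i : N*i + W] of a list."""
    start = 0
    while start + window_size <= len(values):
        yield values[start:start + window_size]
        start += step_size


s = list(range(12))
windows = list(sliding_windows(s, window_size=5, step_size=2))
# windows holds s[0:5], s[2:7], s[4:9] and s[6:11]; the next window,
# s[8:13], is not produced because s has only 12 elements so far.
```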

The first argument of the callback function is a window (a list) and the function may also have additional positional and keyword arguments args and kwargs, respectively.

Example

def callback(window, out_stream):  out_stream.append(sum(window))
x, y = Stream(name='x'), Stream(name='y')
sliding_window(in_stream=x, window_size=5, step_size=2, 
               func=callback, out_stream=y)

In this example, out_stream is a keyword argument of the callback function. The call to sliding_window creates a persistent object which sets:

y[0] = sum(x[0:5])
y[1] = sum(x[2:7])
for all i for which x[2*i+4] exists: y[i] = sum(x[2*i:2*i+5])

Next, let’s design a class that operates on sliding windows.

class sliding_window(object):
    def __init__(self, in_stream, window_size, step_size,
                 func, *args, **kwargs):
         ......
    def callback(self):
        while ((self.in_stream.start[self.callback] +
               self.window_size) <= self.in_stream.stop):
            self.func(
                self.in_stream.recent[
                    # Start of window
                    self.in_stream.start[self.callback] :
                    # End of window
                    (self.in_stream.start[self.callback] +
                     self.window_size)],
                *self.args, **self.kwargs)
            # Slide window forward
            self.in_stream.start[self.callback] += self.step_size
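
The sliding-window callback can be exercised end to end with the sketch below. ToyStream, sliding_window_sketch and the out_list argument are hypothetical names for this illustration only. In particular, the body of __init__ is an assumption, since the class above elides it; it simply stores the arguments and subscribes the callback.

```python
class ToyStream:
    """Hypothetical stand-in for a stream; NOT IoTPy's Stream class."""

    def __init__(self):
        self.recent, self.stop = [], 0
        self.start = {}     # start pointer for each subscriber

    def subscribe(self, callback):
        self.start[callback] = 0

    def extend(self, values):
        # Append values to the tail, then notify every subscriber.
        self.recent.extend(values)
        self.stop = len(self.recent)
        for callback in list(self.start):
            callback()


class sliding_window_sketch:
    def __init__(self, in_stream, window_size, step_size,
                 func, *args, **kwargs):
        # Assumed initialisation; the original class elides this body.
        self.in_stream = in_stream
        self.window_size, self.step_size = window_size, step_size
        self.func, self.args, self.kwargs = func, args, kwargs
        self.in_stream.subscribe(self.callback)

    def callback(self):
        s = self.in_stream
        # Run func on each complete window that is now available.
        while s.start[self.callback] + self.window_size <= s.stop:
            begin = s.start[self.callback]
            self.func(s.recent[begin:begin + self.window_size],
                      *self.args, **self.kwargs)
            # Slide the window forward.
            s.start[self.callback] += self.step_size


def total(window, out_list):
    # out_list is a plain Python list used in place of an output stream.
    out_list.append(sum(window))


x = ToyStream()
y = []
sliding_window_sketch(x, 5, 2, total, out_list=y)
x.extend(range(4))       # only 4 items: no complete window yet
x.extend(range(4, 12))   # 12 items: windows start at 0, 2, 4 and 6
```

The first call to extend produces no output because fewer than five items are available; after the second call, y holds the sums of x[0:5], x[2:7], x[4:9] and x[6:11].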

More examples of callback functions are in the Jupyter notebooks at the GitHub website. The goal of these examples is to help you create your own callback functions.