op: Operators. Single input, single output

See:

IoTPy/IoTPy/agent_types/op.py

IoTPy/examples/op/

You may prefer to look at the example files directly, or you may choose to read these pages alongside the examples.

The op (operator) encapsulators have the following form:

encapsulator_name(func, in_stream, out_stream, 
     state, name, **kwargs)

where:

  • func is a function applied to elements of the input stream to produce elements of the output stream. This is a terminating function that is encapsulated to create a non-terminating agent.

  • in_stream is the input stream

  • out_stream is the output stream

  • state is an optional argument giving the initial state of func, for the case where the function has state

  • kwargs: optional additional keyword arguments for func. We begin by considering the case where these keyword arguments are constants and later consider the case where they are variables.

  • name: optional string which is the name given to the agent. The name can be helpful for debugging.

Contents of this page

The IoTPy package has several encapsulators; these notes only describe the following:

  • map_element

  • filter_element

  • map_list

  • map_window

  • map_window_list

  • timed_window

These notes also describe two objects, _no_value and _multivalue, that are used for appending no value or multiple values to output streams. 

Functional Form

For each wrapper, wrap, that takes the output stream as a parameter and returns an agent, there is a functional form, wrap_f, that returns the output stream. An agent is created in exactly the same way by both wrap and wrap_f. The difference is that wrap returns the agent, whereas wrap_f returns the output stream.
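The relationship between a wrapper and its functional form can be illustrated with a toy, push-based stream model. Everything below (ToyStream, ToyAgent, toy_map_element, toy_map_element_f) is hypothetical and written only for illustration; it is not IoTPy's Stream class or its agent machinery. The point it shows is that both forms build the same agent and differ only in what they hand back.

```python
class ToyStream:
    """Hypothetical, minimal stand-in for a stream (not IoTPy's Stream)."""
    def __init__(self, name=None):
        self.name = name
        self.recent = []          # every element appended so far
        self._subscribers = []    # callables invoked after each extend

    def extend(self, values):
        self.recent.extend(values)
        for notify in self._subscribers:
            notify()

    def append(self, value):
        self.extend([value])

    def subscribe(self, callback):
        self._subscribers.append(callback)


class ToyAgent:
    """Hypothetical agent: applies func to each new element of in_stream."""
    def __init__(self, func, in_stream, out_stream, name=None, **kwargs):
        self.func, self.kwargs = func, kwargs
        self.in_stream, self.out_stream = in_stream, out_stream
        self.name = name
        self.next_index = 0       # index of the first unread input element
        in_stream.subscribe(self.step)
        self.step()               # process anything already in the stream

    def step(self):
        new = self.in_stream.recent[self.next_index:]
        self.next_index += len(new)
        if new:
            self.out_stream.extend([self.func(v, **self.kwargs) for v in new])


def toy_map_element(func, in_stream, out_stream, name=None, **kwargs):
    # Wrapper form: the caller supplies out_stream and gets the agent back.
    return ToyAgent(func, in_stream, out_stream, name=name, **kwargs)


def toy_map_element_f(func, in_stream, **kwargs):
    # Functional form: builds the agent the same way, returns the output stream.
    out_stream = ToyStream()
    toy_map_element(func, in_stream, out_stream, **kwargs)
    return out_stream


x = ToyStream('x')
y = ToyStream('y')
ag = toy_map_element(lambda v: 2 * v, x, y)
z = toy_map_element_f(lambda v: 2 * v, x)
x.extend([1, 2, 3])
x.append(4)
print(y.recent, z.recent)  # [2, 4, 6, 8] [2, 4, 6, 8]
```

In this sketch the agent remembers next_index, so each input element is processed exactly once no matter how many times the input stream is extended.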


map_element

map_element(func, in_stream, out_stream, state=None, name=None, **kwargs)

see

IoTPy/examples/op/map_element_examples.py

func is a function that either has state (memory) or has no state (no memory). If func has no state then it operates on an element of in_stream and returns an element of out_stream. If func has state then it operates on an element of in_stream and its current state, and returns an element of out_stream and its next state. name is an optional string which is the name given to the agent created by the encapsulator. kwargs are optional keyword arguments for func.

map_element is analogous to the map function in Python; the difference is that map_element operates on streams where map operates on lists.

Functions with no state

Example with no state and with no additional keyword arguments

In this case the arguments of func are a single element of in_stream and possibly additional keyword arguments, and the function returns a single element of the output stream. Example:

ag = map_element(func=f, in_stream=x, out_stream=y)

where x and y are streams and f is a function that has a single argument (an element of x) and returns a single value (an element of y). Here ag is the agent created by the call to map_element. The call to map_element creates an agent that continues execution forever. This agent makes

y[j] = f(x[j]), for all j where j < len(y)

For instance,

def double(v): return 2*v
ag=map_element(func=double, in_stream=x, out_stream=y)

creates an agent ag that makes

y[j] = 2*x[j], for all j < len(y)

Note the similarity between map_element and map. If x is a list, rather than a stream, then

y = list(map(double, x))

also sets

y[j] = 2*x[j], for all j < len(x)

Functional Form

The above example in functional form is:

y = map_element_f(double, x)

Example with no state and with additional keyword arguments

def g(v, multiplicand, addend):
  # The first argument, v, of func (i.e. g) is
  # an element of the input stream.
  # multiplicand, addend are constants which
  # are additional keyword arguments of
  # map_element.
  return v*multiplicand + addend
# Create an agent with in_stream, x, and out_stream, y.
map_element(func=g, in_stream=x, out_stream=y, 
  multiplicand=2, addend=10)

This call sets:

y[j] = x[j]*2 + 10, for all j < len(y)

Hereafter, we will assume that the clause, j < len(y), in equations of the above form holds implicitly and merely write:

y[j] = x[j]*2 + 10, for all j
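One way to picture how constant keyword arguments reach func is to bind them in advance with functools.partial, which yields a function of a single element. This is only an analogy on a finite list, assumed for illustration; it is not a description of how map_element passes kwargs internally.

```python
from functools import partial

def g(v, multiplicand, addend):
    return v * multiplicand + addend

# Binding the constant keyword arguments leaves a function of one element.
g_bound = partial(g, multiplicand=2, addend=10)

x = [0, 1, 2, 3]
y = [g_bound(v) for v in x]
print(y)  # [10, 12, 14, 16]
```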

Functional Form

The above example in functional form is:

y = map_element_f(g, x, multiplicand=2, addend=10)

Functions with state

In this case, the arguments of func are: (1) an element of the input stream and (2) the current state of the function, and possibly additional arguments. func returns a pair: (1) a single element of the out_stream and (2) the next state of the function.

The call to map_element must have an argument, state, whose value is the initial value of the state of the function. The way that agents with state are distinguished from agents without state is that the call that creates an agent with state has an explicit parameter, state, whereas a call that creates an agent without state has no parameter called state.

Example of map_element with state and no additional keyword arguments

def differences(current_value, previous_value):
 # current_value is an element of the input stream.
 # previous_value is the current state which,
 # in this example, is the previous element
 # of the input stream.
 # The function returns: (1) an element of the output stream 
 # and (2) the next state.
 return current_value-previous_value, current_value
# Create a map_element agent and set state to the initial state.
map_element(func=differences, in_stream=x, out_stream=y, state=0)

This call makes:
y[0] = x[0]
y[j] = x[j] - x[j-1] for 0 < j < len(y)

This is because initially j = 0, the state specified in the map_element call is 0, and the element of the input stream is x[0]. The state is mapped to the second argument of the function differences, so initially previous_value is 0. The element of the input stream is mapped to the first argument of the function, so current_value is x[0]. The first step of the map_element agent calls its parameter func, which is the function differences, and passes it the arguments x[0] and 0. The function returns a 2-tuple: the first element is appended to the output stream and the second element is the new state. So y[0] gets the value x[0] - 0, and the next state of the map_element agent becomes x[0].

For subsequent steps of the map_element agent, with j > 0, the first argument of differences is the next element of the input stream, i.e. x[j], and the second argument is the current state which is x[j-1].
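The step-by-step description above can be replayed on a finite list. The helper run_map_element_with_state is a hypothetical model written for this page, not part of IoTPy: it threads the state through successive calls of func exactly as described, feeding each returned next state into the following call.

```python
def run_map_element_with_state(func, values, state, **kwargs):
    """Hypothetical finite-list model of a stateful map_element agent."""
    output = []
    for v in values:
        # func receives (element, current state, **kwargs) and returns
        # (output element, next state).
        out, state = func(v, state, **kwargs)
        output.append(out)
    return output, state


def differences(current_value, previous_value):
    return current_value - previous_value, current_value


x = [5, 8, 6, 10]
y, final_state = run_map_element_with_state(differences, x, state=0)
print(y, final_state)  # [5, 3, -2, 4] 10
```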

Functional Form

The above example in functional form is:

y = map_element_f(differences, x, state=0)

Example with state and additional keyword arguments

def h(v, running_sum, multiplicand):
   # v is an element of the input stream.
   # Current state is running_sum.
   # Next state is v + running_sum.
   # The element of the output stream is:
   # v*multiplicand + running_sum.
   # multiplicand is an additional keyword argument 
   # of map_element.
   # Returns (1) output stream element and (2) next state.
   return v*multiplicand + running_sum, v+running_sum

# The initial state of the agent is 0
map_element(func=h, in_stream=x, out_stream=y, state=0, 
            multiplicand=2)

Initially, j = 0, and the first argument of h is x[0], the second argument is the initial state which is the argument of state in map_element, and the values of subsequent arguments are specified by keywords in the call to map_element. So, in the initial call to h, the parameters v, running_sum and multiplicand have the values x[0], 0, and 2 respectively. The function returns a 2-tuple, the first element of which is the next element of the output stream and the second is the next state. Thus

y[0] = x[0] * 2 + 0 and the next state is x[0] + 0. For subsequent steps,

y[j] = x[j] * 2 + x[0] + x[1] + ... + x[j-1]
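The closed form above can be checked numerically. This sketch steps h over a short sample list (the sample values are arbitrary) and compares the result with 2*x[j] plus the sum of the earlier elements, computed with itertools.accumulate.

```python
from itertools import accumulate

def h(v, running_sum, multiplicand):
    return v * multiplicand + running_sum, v + running_sum

x = [3, 1, 4, 1, 5]
state, y = 0, []
for v in x:
    out, state = h(v, state, multiplicand=2)
    y.append(out)

# Closed form: y[j] = 2*x[j] + (x[0] + ... + x[j-1]); the prefix sum for j = 0 is 0.
prefix = [0] + list(accumulate(x))[:-1]
expected = [2 * v + p for v, p in zip(x, prefix)]
print(y)  # [6, 5, 12, 10, 19]
```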

Using a class to record state and keyword arguments

You may prefer to use a class to implement state and additional arguments of func. The last example can be implemented as follows.

class example(object):
   # The object holds both the keyword argument and the state,
   # so map_element is called without a state parameter.
   def __init__(self, multiplicand):
      self.multiplicand = multiplicand
      self.running_sum = 0
   def step(self, v):
      result = v * self.multiplicand + self.running_sum
      self.running_sum += v
      return result
x = Stream()
y = Stream()
eg = example(multiplicand=2)
map_element(func=eg.step, in_stream=x, out_stream=y)

Returning no value

_no_value is an object that indicates that no value is to be appended to the output stream. The following example discards elements of the input stream that are less than or equal to the threshold.

Example of returning no value

def f(v, threshold):
   if v > threshold: return v
   else: return _no_value

map_element(func=f, in_stream=x, out_stream=y, threshold=10)

Note: The embedded function must return a value; an exception is raised if it does not. The else branch of the if statement above is therefore essential. If the else branch returns None, then the object None is appended to the output stream (because you may want to use None as an informational signal in a stream). So make sure that the embedded function always returns a value, and return the object _no_value when you want nothing appended to the output stream.
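The difference between returning a "nothing" sentinel and returning None can be modeled on a finite list. NO_VALUE and apply_with_no_value are hypothetical stand-ins invented for this sketch (they are not IoTPy's _no_value or its internals); the model drops only results that are the sentinel object itself, so None still gets through.

```python
class _NoValue:
    """Hypothetical sentinel standing in for IoTPy's _no_value."""
    def __repr__(self):
        return 'NO_VALUE'

NO_VALUE = _NoValue()

def apply_with_no_value(func, values, **kwargs):
    out = []
    for v in values:
        result = func(v, **kwargs)
        # Identity check: only the sentinel object is dropped; None is kept.
        if result is not NO_VALUE:
            out.append(result)
    return out

def f(v, threshold):
    if v > threshold:
        return v
    else:
        return NO_VALUE

def f_none(v, threshold):
    if v > threshold:
        return v
    else:
        return None

x = [3, 12, 10, 25]
print(apply_with_no_value(f, x, threshold=10))       # [12, 25]
print(apply_with_no_value(f_none, x, threshold=10))  # [None, 12, None, 25]
```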


Returning multiple values

In some cases, we want an output stream to be a stream of lists (see stream z below). In other cases, we want to open up lists returned by a function and append each element to the output stream (see stream y below).

def g(a_list):
   # The argument a_list is a list
   # The elements of a_list are appended individually
   # to the output stream.
   return _multivalue(a_list)
   
def h(a_list):
   # a_list is appended as a single element to the
   # output stream
   return a_list

map_element(func=g, in_stream=x, out_stream=y)
map_element(func=h, in_stream=x, out_stream=z)

If x is [['hello', 'bonjour'], ['bye', 'adieu', 'adios'], ...] then y will be ['hello', 'bonjour', 'bye', 'adieu', 'adios', ...] whereas z will be the same as x.
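The two behaviors can be reproduced on a finite list with a hypothetical wrapper class. MultiValue and apply_with_multivalue are assumptions made for this sketch, not IoTPy's _multivalue: a wrapped result is unpacked into separate output elements, while a plain list is kept as one element.

```python
class MultiValue:
    """Hypothetical stand-in for IoTPy's _multivalue wrapper."""
    def __init__(self, items):
        self.items = list(items)

def apply_with_multivalue(func, values):
    out = []
    for v in values:
        result = func(v)
        if isinstance(result, MultiValue):
            out.extend(result.items)   # each item becomes its own element
        else:
            out.append(result)         # one element, even if it is a list
    return out

def g(a_list):
    return MultiValue(a_list)

def h(a_list):
    return a_list

x = [['hello', 'bonjour'], ['bye', 'adieu', 'adios']]
y = apply_with_multivalue(g, x)
z = apply_with_multivalue(h, x)
print(y)       # ['hello', 'bonjour', 'bye', 'adieu', 'adios']
print(z == x)  # True
```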

NumPy and User-Defined Types

Operations on stream arrays are identical to those on streams except that the elements of the stream are NumPy objects. Here are some examples.

x = StreamArray('x')
y = StreamArray('y')
map_element(func=np.sin, in_stream=x, out_stream=y)

Here is an example where the input to an agent is a stream array and its output is a stream which is not a stream array.

x = StreamArray(name='x', dimension=3, dtype=float)
y = Stream()
map_element(func=np.median, in_stream=x, out_stream=y)

If we append the element [1., 2., 3.] to x, then 2. will be appended to y.
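The per-element computation can be checked with NumPy alone. In this sketch each row of a 2-D array stands in for one element of a StreamArray with dimension 3 (a modeling assumption), and np.median is applied row by row, as map_element would apply func to each element. The second row is an arbitrary extra sample.

```python
import numpy as np

# Each row models one element of a StreamArray with dimension=3.
rows = np.array([[1., 2., 3.],
                 [4., 0., 9.]])
medians = [float(np.median(row)) for row in rows]
print(medians)  # [2.0, 4.0]
```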


filter_element

filter_element is similar to filter in Python except that it operates on a stream rather than an iterable.

filter_element(func, in_stream, out_stream, state=None, name=None, **kwargs)

filter_element is the same as map_element except that func returns a boolean, and values for which func returns true are filtered out.

Example with no state and no additional arguments

def f(v): return v > 0
# Create an agent by encapsulating function f
filter_element(func=f, in_stream=x, out_stream=y)

y is a stream containing the elements of stream x for which f returns false; so, y is a stream consisting of the non-positive elements of x. The agent in this example has no state because the call that creates the agent does not have a parameter called state. Also, this agent has no keyword arguments that are used by the encapsulated function f.
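Because filter_element drops the elements for which func returns True, its effect on a finite list matches Python's itertools.filterfalse rather than the built-in filter. The comparison below is only an analogy on a list, not a description of how IoTPy implements the agent.

```python
from itertools import filterfalse

def f(v):
    return v > 0

x = [3, -1, 0, 7, -5]
y = list(filterfalse(f, x))          # keeps the elements for which f is False
kept_by_filter = list(filter(f, x))  # the built-in filter keeps the True ones
print(y, kept_by_filter)  # [-1, 0, -5] [3, 7]
```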

Example with no state and with an additional argument

def f(v, threshold): 
   # v is an element of the input stream
   # threshold is a constant
   return v > threshold
# Create an agent by encapsulating f.
filter_element(func=f, in_stream=x, out_stream=y, threshold=10)

In the above example, y is the stream consisting of the elements of x that do not exceed 10. The agent created in this example has no state because the call, filter_element(...), that creates the agent does not have a parameter called state. The call that creates the agent has a keyword argument, threshold, which is also a keyword argument of the encapsulated function f.

Example with state and no additional arguments

def f(v, w):
 # w is the current state.
 # v is the current element of the input stream.
 # Returns: (1) a boolean: v <= w
 # If this boolean is True, then
 # v does NOT appear in the output.
 # (2) the next state, v.
 # The next state is the current input, v.
 return v <= w, v
# Create an agent with an initial state of 0
filter_element(func=f, in_stream=x, out_stream=y, state=0)

The agent in this example has state because the call that creates the agent has a parameter called state. This agent puts x[0] into y if x[0] is positive (because for the first element f returns v <= w, which is x[0] <= 0, and this is False when x[0] is positive). The agent puts x[j] into y if x[j] > x[j-1], for j > 0.

Example with state and additional arguments

def f(v, w, increase):
  # v is an element of the input stream
  # w is the current state
  # increase is a constant parameter specified in the
  # call that creates the agent
  # return (1) a Boolean and (2) the next state.
  # In this example, the next state is the current input v. 
  return v <= w*increase, v

# Create an agent by encapsulating function f. This agent
# has an initial state of 0, and a parameter, increase, with constant value 1.01
filter_element(func=f, in_stream=x, out_stream=y, state=0, 
 increase=1.01)

This call to filter_element puts x[0] into y if x[0] is positive. The agent puts x[j] into y if x[j] > x[j-1]*1.01, for j > 0. Note that the call that creates this agent has a parameter called state, and so the agent has state. The keyword argument, increase, is also an argument of the encapsulated function f.
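A finite-list replay of this stateful filter makes the rule concrete. run_filter_element_with_state is a hypothetical helper written for this sketch, not an IoTPy function; as described above, an element is kept only when func's boolean is False, and the state is updated on every element whether or not it is kept.

```python
def run_filter_element_with_state(func, values, state, **kwargs):
    """Hypothetical finite-list model of a stateful filter_element agent."""
    kept = []
    for v in values:
        drop, state = func(v, state, **kwargs)
        if not drop:          # elements for which func is True are filtered out
            kept.append(v)
    return kept, state

def f(v, w, increase):
    return v <= w * increase, v

x = [5, 5, 5.06, 6, 3, 4]
y, last = run_filter_element_with_state(f, x, state=0, increase=1.01)
print(y)  # [5, 5.06, 6, 4]
```

Here the second 5 is dropped because it does not exceed 5 * 1.01, while 5.06 does and is kept.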


map_list

map_list(func, in_stream, out_stream, state=None, name=None, **kwargs)

map_list is the same as map_element except that func is a function from a list to a list rather than from an element to an element. In map_list, func operates on a list which is a slice of the input stream and it returns a list which is a slice of the output stream.

Example of map_list

def g(lst):
 # Double odd values; halve even values (integer division keeps ints).
 return [v*2 if v%2 else v//2 for v in lst]

map_list(g, x, y)

If the input stream, x, has values [0, 1, 2, 3, 4, ...], then the output stream, y, has values [0, 2, 1, 6, 2, ....]
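The sketch below models map_list on a finite list, under the assumption (made for illustration) that the agent may hand func slices of varying lengths depending on how data arrives. run_map_list is hypothetical. Because this g treats each element independently, the concatenated output is the same however the input is sliced, which is a reasonable property to aim for when writing func for map_list.

```python
def g(lst):
    # Double odd values; halve even values (integer division keeps ints).
    return [v * 2 if v % 2 else v // 2 for v in lst]

def run_map_list(func, slices):
    """Hypothetical model: func is applied to each slice as it arrives."""
    out = []
    for chunk in slices:
        out.extend(func(chunk))
    return out

x = [0, 1, 2, 3, 4, 5]
one_slice = run_map_list(g, [x])
three_slices = run_map_list(g, [x[:1], x[1:4], x[4:]])
print(one_slice)  # [0, 2, 1, 6, 2, 10]
```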


map_window

map_window(func, in_stream, out_stream, window_size, step_size, state=None, name=None, **kwargs)

The map_window function is similar to the map_element function with three differences:

  1. The map_window function has two additional arguments: window_size and step_size, both positive integers.

  2. func operates on a list of elements of the input stream (a window) as opposed to a single element of the input stream. As in map_element, func returns a single element of the output stream.

  3. func operates on a window and then the window moves forward by step_size.

Note: window_size and step_size must be positive integers.

Example with no state and no additional parameters

map_window(func=f, in_stream=x, out_stream=y, 
 window_size=w, step_size=s)

makes:

y[j] = f(x[j*step_size : j*step_size + window_size]) for all j. 

For example,

map_window(sum, x, y, window_size=2, step_size=4)

sets:

 y[0] = x[0]+x[1],
 y[1] = x[4]+x[5],
 y[2] = x[8]+x[9],...
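The formula y[j] = f(x[j*step_size : j*step_size + window_size]) can be modeled on a finite list. run_map_window is a hypothetical helper for this sketch, and it assumes that a result is produced only once a full window is available; with x[k] == k, the outputs can be compared directly against the sums listed above.

```python
def run_map_window(func, values, window_size, step_size):
    """Hypothetical finite-list model of stateless map_window."""
    out = []
    start = 0
    # Produce a result only when a full window is available.
    while start + window_size <= len(values):
        out.append(func(values[start:start + window_size]))
        start += step_size
    return out

x = list(range(12))       # x[k] == k
y = run_map_window(sum, x, window_size=2, step_size=4)
print(y)  # [1, 9, 17]
```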

Example with state and no additional parameters

def f(this_list, sum_of_previous_list):
   # this_list is a window (a list of elements) of the input stream
   # Current state is sum_of_previous_list
   # Next state will be sum(this_list)
   # return (1) a Boolean and (2) the next state.
   return sum(this_list) > sum_of_previous_list, sum(this_list)
# Create an agent by encapsulating function f.
# The initial state is 0
map_window(func=f, in_stream=x, out_stream=y, 
           state=0, window_size=2, step_size=2)

The call makes:

y[0] = True if x[0]+x[1] > 0 and False otherwise.
For k > 0:
y[k] = True if
 x[2*k]+x[2*k+1] > x[2*k-2]+x[2*k-1] and False otherwise.

Example with no state and with additional parameters

def f(this_list, threshold):
   return sum(this_list) > threshold
map_window(f, x, y, window_size=4, step_size=2, threshold=10)

For all j: y[j] is True if and only if sum(x[2*j : 2*j+4]) > 10.

Example with state and with additional parameters

def g(this_list, max_of_sums, alpha):
   # The current window is this_list
   # The current state is max_of_sums
   # max_of_sums is the max over all windows so far
   # of sum(window).
   # alpha is a parameter whose value is specified in
   # the call to map_window.
   # The next state is the max over all past windows
   # and the current window. So, the next state is:
   # max(max_of_sums, sum(this_list))
   return (sum(this_list) > alpha * max_of_sums,
           max(max_of_sums, sum(this_list)))
     
map_window(g, x, y, state=0, window_size=20, step_size=10, 
           alpha=0.5)

In this example, for j > 0, y[j] is True exactly when sum(x[10*j : 10*j+20]) exceeds 0.5 times the max over k = 0, 1,..., j-1 of sum(x[10*k : 10*k+20]). And y[0] is True exactly when sum(x[0:20]) is positive because the initial state is 0.
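The function g can be replayed over windows of a short list. run_map_window_with_state is a hypothetical helper for this sketch, and it uses window_size=4 and step_size=2 instead of 20 and 10 so the example stays small; the state is the running maximum of window sums, starting at 0.

```python
def run_map_window_with_state(func, values, window_size, step_size,
                              state, **kwargs):
    """Hypothetical finite-list model of a stateful map_window agent."""
    out = []
    start = 0
    while start + window_size <= len(values):
        result, state = func(values[start:start + window_size], state, **kwargs)
        out.append(result)
        start += step_size
    return out, state

def g(this_list, max_of_sums, alpha):
    return (sum(this_list) > alpha * max_of_sums,
            max(max_of_sums, sum(this_list)))

x = [1, 1, 1, 1, 3, 3, 0, 0, 0, 0]
y, final = run_map_window_with_state(g, x, window_size=4, step_size=2,
                                     state=0, alpha=0.5)
print(y, final)  # [True, True, True, False] 8
```

The window sums are 4, 8, 6 and 0; the last window fails because 0 does not exceed 0.5 * 8.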

Example with Stream Array

x = StreamArray('x', dimension=2)
y = StreamArray('y', dimension=2)
t = StreamArray('t', dimension=2)
# f() and ff() differ only in the axis.
def f(input_array):
    return np.mean(input_array, axis=0)
def ff(input_array):
    return np.mean(input_array, axis=1)
map_window(
    func=f, in_stream=x, out_stream=y,
    window_size=2, step_size=2, name='window_map_array_f')
map_window(
    func=ff, in_stream=x, out_stream=t,
    window_size=2, step_size=2, name='window_map_array_ff')

If we extend x with np.array([[1., 5.], [7., 11.]]) then np.array([4., 8.]) will be appended to y and np.array([3., 9.]) will be appended to t. This is because with axis=0, the mean vector over the array is [(1.0 + 7.0)/2.0, (5.0 + 11.0)/2], and with axis=1, the mean vector over the array is [(1.0+5.0)/2.0, (7.0 + 11.0)/2.0].
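The arithmetic above can be checked directly with NumPy on the 2x2 window: axis=0 averages down each column and axis=1 averages across each row.

```python
import numpy as np

window = np.array([[1., 5.],
                   [7., 11.]])
col_means = np.mean(window, axis=0)   # average down each column
row_means = np.mean(window, axis=1)   # average across each row
print(col_means, row_means)  # [4. 8.] [3. 9.]
```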

map_window_list

map_window_list is the same as map_window except that func reads a list --- a slice of the input stream --- and produces a list or tuple which is a slice of the output stream.

Example

def f(lst): 
 average = sum(lst)/float(len(lst))
 return [v - average for v in lst]
map_window_list(f, x, y, window_size=100, step_size=100)

The input stream, x, is processed in a sequence of non-overlapping windows, each of size 100, and each output element is the deviation of the corresponding element of x from the mean of its window.
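A finite-list model shows how map_window_list concatenates the lists returned for successive windows. run_map_window_list is a hypothetical helper for this sketch, and it uses windows of size 3 instead of 100. Because step_size equals window_size, the windows tile the input, so each input element yields exactly one output element.

```python
def run_map_window_list(func, values, window_size, step_size):
    """Hypothetical finite-list model of map_window_list."""
    out = []
    start = 0
    while start + window_size <= len(values):
        out.extend(func(values[start:start + window_size]))
        start += step_size
    return out

def f(lst):
    average = sum(lst) / len(lst)
    return [v - average for v in lst]

x = [1, 2, 3, 10, 20, 30]
y = run_map_window_list(f, x, window_size=3, step_size=3)
print(y)  # [-1.0, 0.0, 1.0, -10.0, 0.0, 10.0]
```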


timed_window

timed_window(func, in_stream, out_stream, window_duration, 
      step_time, state=None, name=None, **kwargs)

A timed_window agent is identical to the map_window agent except that the window size (called window_duration) and step size (called step_time) are in units of time rather than numbers of elements. The timed_window agent reads a stream that must be a timed stream: every element of the stream must have a timestamp, and timestamps in a stream must be monotonically increasing. Each element of the input stream must be a tuple or list, and the timestamp must be the first member of the tuple or list. The j-th window is a list consisting of all the elements with timestamps t in the interval:

step_time*j <= t < step_time*j + window_duration

For example, consider the timed_window version of the first window example given above:

timed_window(sum, x, y, window_duration=2, step_time=4)

The j-th window, in this case, consists of the elements of x with timestamps at least 4*j and less than 4*j+2. The output stream is a timed stream, and the timestamp of the j-th output is the time at the end of the window: step_time*j + window_duration; in this example, the timestamp of the j-th output is 4*j+2.

For example if stream x is [[1, 100], [3, 250], [5, 400], [5.5, 100], [7, 300], [11.0, 250], ...] then the windows are:

for the time window from 0 up to, but not including, 2: [[1, 100]]

for the time window from 4 up to, but not including, 6: [[5, 400], [5.5, 100]]

for the time window from 8 up to, but not including, 10: []

Therefore, stream y will be [[2.0, 100], [6.0, 500], [10.0, 0], ....].
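The windows and outputs above can be reproduced with a finite-list model. run_timed_window is a hypothetical helper, and it makes two labeled assumptions: func is applied to the values (the second members of the elements), so that sum yields the totals shown; and window j is produced once a timestamp at or beyond its end, step_time*j + window_duration, has arrived.

```python
def run_timed_window(func, timed_values, window_duration, step_time):
    """Hypothetical finite-list model of timed_window."""
    out = []
    if not timed_values:
        return out
    latest = timed_values[-1][0]
    j = 0
    # Assumption: window j is complete once a timestamp >= its end has arrived.
    while step_time * j + window_duration <= latest:
        start = step_time * j
        end = start + window_duration
        # Assumption: func sees only the values of elements in [start, end).
        window_values = [v for t, v in timed_values if start <= t < end]
        out.append([end, func(window_values)])
        j += 1
    return out

x = [[1, 100], [3, 250], [5, 400], [5.5, 100], [7, 300], [11.0, 250]]
y = run_timed_window(sum, x, window_duration=2, step_time=4)
print(y)  # [[2, 100], [6, 500], [10, 0]]
```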

Code repository

agent_types/op.py

Tests are in:

tests/element_test.py

tests/list_test.py

tests/window_test.py
