merge encapsulators:

multiple inputs, single output

See IoTPy/IoTPy/agent_types/merge.py

This section describes several merge encapsulators (such as zip_stream and mix) with multiple inputs and a single output. We use the convention that plural (e.g., in_streams) refers to lists of streams and singular (e.g., out_stream) refers to a single stream.

Get Started

Get started with only zip_stream and mix from merge and their inverses unzip and unmix (also called separate) from split. If you use timed streams, then timed_zip and timed_unzip are helpful.

zip_stream

zip_stream(in_streams, out_stream)

where in_streams is a list of streams and out_stream is a single stream. out_stream[j] is the tuple (in_streams[0][j], in_streams[1][j], ...) where in_streams[k][j] is the j-th element of the stream in_streams[k].

Example

zip_stream is like zip except that zip_stream operates on streams rather than lists.

Example of zip: list(zip([0, 10, 20], ["A", "B", "C"])) is [(0, "A"), (10, "B"), (20, "C")]

Example of zip_stream: If u and v are the streams [0, 10, 20, ...] and ["A", "B", "C", ...] then zip_stream([u, v], w) sets w to [(0, "A"), (10, "B"), (20, "C"), ...]
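The element-wise pairing described above can be modeled with ordinary Python lists. This is a minimal sketch, not IoTPy code: zip_stream_model is a hypothetical helper that treats each list as the part of a stream seen so far, and it assumes that an output tuple is produced only once every input stream has a j-th element.

```python
def zip_stream_model(in_lists):
    """Model zip_stream on finite prefixes of streams (plain lists).

    Hypothetical helper for illustration only; it does not use IoTPy.
    """
    # zip stops at the shortest input, which mirrors the assumption that
    # out_stream[j] exists only when every input has a j-th element.
    return list(zip(*in_lists))


u = [0, 10, 20, 30]      # u has one more element than v so far
v = ["A", "B", "C"]
w = zip_stream_model([u, v])
print(w)  # [(0, 'A'), (10, 'B'), (20, 'C')]
```

In this model the fourth element of u stays unpaired until v receives a fourth element.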


mix

mix(in_streams, out_stream)

where in_streams is a list of streams and out_stream is a stream. The elements of out_stream are of the form (index, value) where index is an index into in_streams and value is an element of in_streams[index]. The elements of the streams in_streams[0], in_streams[1], ... are copied into out_stream in the order in which they are read. mix is a nondeterministic fair merge of its input streams, and so out_stream may be different on different runs of the same program with the same inputs.

For example, suppose mix([x, y], z) is called. If the elements of x are 10, 11, 12, 13, ... and those of y are "A", "B", "C", "D", ... then z could be (1, "A"), (1, "B"), (0, 10), (1, "C"), (0, 11), (0, 12), (1, "D"), ... or it could be (0, 10), (1, "A"), (0, 11), (0, 12), (1, "B"), (1, "C"), (1, "D"), ... or any other fair merge of the input streams.
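The dependence of the output on the order in which elements are read can be made concrete with a small model. This is a sketch in plain Python, not IoTPy code: mix_model and its schedule argument (a list saying which input is read next) are hypothetical names introduced only for illustration.

```python
def mix_model(in_lists, schedule):
    """Model mix: tag each element with the index of its input.

    schedule is a hypothetical list of input indices giving the order
    in which elements are read; it stands in for nondeterministic timing.
    """
    positions = [0] * len(in_lists)   # next unread position per input
    out = []
    for index in schedule:
        value = in_lists[index][positions[index]]
        positions[index] += 1
        out.append((index, value))
    return out


x = [10, 11, 12]
y = ["A", "B", "C", "D"]

# Two different read orders give two different, equally valid outputs.
run_1 = mix_model([x, y], [1, 1, 0, 1, 0, 0, 1])
run_2 = mix_model([x, y], [0, 1, 0, 0, 1, 1, 1])
print(run_1)
print(run_2)
```

Both runs reproduce the two orderings given in the text, and in each run the elements of any one input keep their original relative order.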


merge_asynch

merge_asynch(func, in_streams, out_stream, 
    state=None, **kwargs)

in_streams is a list of streams, out_stream is a single stream, and func is a function. This agent accepts elements in the order in which they arrive on its input streams and tags each element with the index of the stream on which it arrived. For example, suppose the elements arrive in this order: 'a' from in_streams[0], 'b' from in_streams[0], 'A' from in_streams[1], 'c' from in_streams[0], 'B' from in_streams[1], ... Then the agent forms a stream of pairs (stream index, element value) and applies func to each pair to produce the output stream: [func((0, 'a')), func((0, 'b')), func((1, 'A')), func((0, 'c')), func((1, 'B')), ...].

state and kwargs are as in the agents described earlier.

This agent is nondeterministic: the output, out_stream, may be different on different runs of the same program with the same input.

Example

def fahrenheit_and_celsius(pair):
    index, value = pair
    if index == 0:
        # Value is in Fahrenheit because
        # index is 0.
        # Convert to Celsius.
        return (value - 32.0)/1.8
    else:
        # value is in Celsius
        # Leave the value as is
        return value

x = Stream('input fahrenheit temperatures')
y = Stream('input celsius temperatures')
z = Stream('output celsius temperatures')

merge_asynch(fahrenheit_and_celsius, [x, y], z)

In this example, the elements that are in stream x are assumed to be in Fahrenheit and those in stream y are assumed to be in Celsius. Since the merge_asynch function has a list [x, y] of input streams, elements from stream x are tagged with 0 and those from stream y are tagged with 1.

If the elements that arrive at this agent, in the order in which they arrive, are 32.0 from x, 50.0 from x, 68.0 from x, 25.0 from y, ... then z will be [0.0, 10.0, 20.0, 25.0, ...]
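The arrival sequence above can be traced with a small model written in plain Python. This is a sketch rather than IoTPy code: merge_asynch_model is a hypothetical helper that takes the arrivals as an explicit list of (index, value) pairs, and it ignores the optional state argument.

```python
def merge_asynch_model(func, arrivals):
    """Apply func to each (index, value) pair in arrival order.

    Hypothetical stand-in for the agent; arrivals is a list of
    (stream index, element value) pairs. State is not modeled.
    """
    return [func((index, value)) for index, value in arrivals]


def fahrenheit_and_celsius(pair):
    index, value = pair
    if index == 0:
        # Elements tagged 0 are in Fahrenheit: convert to Celsius.
        return (value - 32.0) / 1.8
    # Elements tagged 1 are already in Celsius.
    return value


# 32.0, 50.0, 68.0 arrive on x (index 0), then 25.0 arrives on y (index 1).
arrivals = [(0, 32.0), (0, 50.0), (0, 68.0), (1, 25.0)]
z = merge_asynch_model(fahrenheit_and_celsius, arrivals)
print(z)  # approximately [0.0, 10.0, 20.0, 25.0]
```

A different arrival order for the same inputs would produce the same values in a different order, which is the nondeterminism described above.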


merge_window

merge_window(
    func, in_streams, out_stream, window_size, step_size,
    state=None, **kwargs)

func is a function on a list of windows, with one window for each input stream, where a window is a list of length window_size. in_streams is a list of input streams; out_stream is a single output stream; window_size and step_size are positive integers; state and kwargs are as in the agents described earlier. func reads a list of windows, one from each stream in in_streams, and returns a single element of out_stream.

Example

def f(two_windows, threshold): 
    # two_windows is a list consisting of two lists
    # threshold is a number
    expected_window, measured_window = two_windows 
    return (abs(sum(measured_window) - sum(expected_window)) >
             threshold)

merge_window(
    func=f,
    in_streams=[expected_stream, measured_stream],
    out_stream=anomalies, 
    window_size=100, 
    step_size=100, 
    threshold=0.1)

In this example expected_stream and measured_stream are partitioned into windows of size 100, and the output stream, anomalies, consists of booleans whose value is True when the sum of the measured window deviates from that of the expected window by more than threshold.
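The windowing can be modeled on plain lists to see what func receives. This is a sketch under stated assumptions, not IoTPy code: merge_window_model is a hypothetical helper, and it assumes that windows start at position 0, advance by step_size, and are processed only when every input has a full window. Small windows of size 3 replace the size-100 windows of the example.

```python
def merge_window_model(func, in_lists, window_size, step_size, **kwargs):
    """Model merge_window on finite prefixes of streams (plain lists)."""
    shortest = min(len(lst) for lst in in_lists)
    out = []
    start = 0
    # Assumption: a window is processed only when all inputs can fill it.
    while start + window_size <= shortest:
        windows = [lst[start:start + window_size] for lst in in_lists]
        out.append(func(windows, **kwargs))
        start += step_size
    return out


def f(two_windows, threshold):
    expected_window, measured_window = two_windows
    return abs(sum(measured_window) - sum(expected_window)) > threshold


expected = [1, 1, 1, 1, 1, 1]
measured = [1, 1, 1, 2, 1, 1]   # the second window deviates by 1
anomalies = merge_window_model(
    f, [expected, measured], window_size=3, step_size=3, threshold=0.5)
print(anomalies)  # [False, True]
```

With step_size equal to window_size the windows partition the inputs; a smaller step_size would give overlapping windows in this model.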


zip_map

zip_map(func, in_streams, out_stream, state=None, **kwargs)

zip_map applies func, with its optional state and keyword arguments (kwargs), to each element produced by zip_stream on in_streams, and places the results on out_stream. Note that func is a function on a list containing one element from each input stream.

Example

zip_map(sum, [u, v], w)

If u and v are the streams [0, 10, 20, 30, ...] and [100, 101, 102, 103, ...] then zip_map(sum, [u, v], w) makes w the stream [sum([0, 100]), sum([10, 101]), sum([20, 102]), ...] or [100, 111, 122, 133, ...].

Example

def f(two_tuple): 
  return two_tuple[0]*10 + two_tuple[1] 
zip_map(f, [u,v], x)

makes x the stream [f([0, 100]), f([10, 101]), f([20, 102]), ...] or [100, 201, 302, 403, ...]

Example with additional keyword argument

def f(lst, multiplier):
   return sum(lst)*multiplier

zip_map(f, [u,v], y, multiplier=2)

If u and v are as in the above example, then y is the stream [200, 222, 244, 266, ...]
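The three zip_map examples can be checked with a model on plain lists. This is a minimal sketch, not IoTPy code: zip_map_model is a hypothetical helper that ignores state and works on finite prefixes of the streams, and the function names weighted and scaled_sum are chosen here only to keep the examples apart.

```python
def zip_map_model(func, in_lists, **kwargs):
    """Apply func to the list of j-th elements of every input, for each j."""
    return [func(list(elements), **kwargs) for elements in zip(*in_lists)]


def weighted(pair):
    return pair[0] * 10 + pair[1]


def scaled_sum(lst, multiplier):
    return sum(lst) * multiplier


u = [0, 10, 20, 30]
v = [100, 101, 102, 103]

w = zip_map_model(sum, [u, v])
x = zip_map_model(weighted, [u, v])
y = zip_map_model(scaled_sum, [u, v], multiplier=2)
print(w)  # [100, 111, 122, 133]
print(x)  # [100, 201, 302, 403]
print(y)  # [200, 222, 244, 266]
```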


zip_map_list

zip_map_list(func, in_streams, out_stream, 
    state=None, **kwargs)

zip_map_list is the same as zip_map except that in zip_map_list, func operates on a list of lists and returns a list whereas in zip_map, func operates on a single list and returns an element. zip_map_list is helpful when working with NumPy arrays as in the following example:

import numpy

def f(two_lists):
    return numpy.array(two_lists[0]) > numpy.array(two_lists[1])

zip_map_list(f, [u, v], w)

(Function f requires that you install NumPy. This example uses NumPy, but of course, you can write agents that don't use NumPy.) In this example,

f([[3, 5, 7], [8, 4, 1]]) = array([3>8, 5>4, 7>1]).

So:

f([[3, 5, 7], [8, 4, 1]]) = array([False, True, True])

If u is a stream [3, 5, 7, ...] and v is a stream [8, 4, 1, ...] then the call to zip_map_list makes w the stream [False, True, True, ...]

 


timed_zip

timed_zip(in_streams, out_stream)

where in_streams is a list of streams and out_stream is a stream.

timed_zip is similar to a zip function. The streams in in_streams must be timed streams. An element of out_stream is (t, v) where t is a timestamp and v is a list with the same length as in_streams. out_stream has an element (T, [V[0], ..., V[N-1]]) when for each j:

  • either the stream in_streams[j] has an element (T, V[j]) or

  • V[j] is None and in_streams[j] has no non-None value with timestamp T.

out_stream has an element with timestamp T if at least one input stream has an element with timestamp T.

Example

timed_zip([x, y], z)

where

  • x is [(1, "A"), (5, "B"), (9, "C"), (12, "D"), ....], and

  • y is [(5, "a"), (7, "b"), (9, "c"), (10, "d"), ....]

Then z is:

[(1, ["A", None]), (5, ["B", "a"]), (7, [None, "b"]), (9, ["C", "c"]), (10, [None, "d"]), ...]
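The rule for timed_zip can be modeled on finite lists of (timestamp, value) pairs. This is a sketch under stated assumptions, not IoTPy code: timed_zip_model is a hypothetical helper, and it assumes that an output element with timestamp T is produced only once every input has reached a timestamp of at least T, since only then is it known which inputs have no element at T.

```python
def timed_zip_model(timed_lists):
    """Model timed_zip on finite prefixes of timed streams."""
    if any(len(lst) == 0 for lst in timed_lists):
        return []
    # Assumption: only timestamps that every input has reached are emitted.
    horizon = min(lst[-1][0] for lst in timed_lists)
    lookups = [dict(lst) for lst in timed_lists]
    timestamps = sorted(
        {t for lst in timed_lists for t, _ in lst if t <= horizon})
    # dict.get returns None when an input has no element at timestamp t.
    return [(t, [lookup.get(t) for lookup in lookups]) for t in timestamps]


x = [(1, "A"), (5, "B"), (9, "C"), (12, "D")]
y = [(5, "a"), (7, "b"), (9, "c"), (10, "d")]
z = timed_zip_model([x, y])
print(z)
```

In this model the element (12, "D") of x is held back because y has only reached timestamp 10, so the output ends at timestamp 10 as in the example.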

 


blend

blend(func, in_streams, out_stream, state=None, **kwargs)

where in_streams is a list of streams and out_stream is a stream. The specified function, func, possibly with state and kwargs, is applied to each element of each stream in in_streams. As the elements of the streams in in_streams arrive, func is applied to each element and the result is placed on out_stream. Since the elements of the input streams may arrive in a different order in different runs of the same program, the output is nondeterministic: it may change from one run to the next even if the inputs are the same.

Note that func reads a single element of a single stream; it does not read a list of elements, one for each input stream.

For example, suppose blend(double, [x, y], z) is called, where double(v) returns 2*v. If the elements of x are 100, 101, 102, ... and those of y are 1, 2, 3, ... then possible values of stream z include [200, 202, 2, 4, 204, 206, 208, 6, ...] and [2, 4, 6, 200, 202, 8, 204, 206, 208, ...].

Example with additional keyword argument

def double_and_add(v, addend):
    return 2*v + addend

blend(double_and_add, [x, y], z, addend=10)

With x and y as in the above example, possible values of z include [210, 212, 12, 14, 214, 216, 218, 16, ...] and [12, 14, 16, 210, 212, 18, 214, 216, 218, ...]
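The blend examples can be reproduced with a small model in which the arrival order is given explicitly. This is a sketch in plain Python, not IoTPy code: blend_model and its schedule argument (which input delivers the next element) are hypothetical, and state is not modeled.

```python
def blend_model(func, in_lists, schedule, **kwargs):
    """Apply func to single elements in the order given by schedule."""
    positions = [0] * len(in_lists)   # next unread position per input
    out = []
    for index in schedule:
        value = in_lists[index][positions[index]]
        positions[index] += 1
        # func sees one element of one stream, not one element per stream.
        out.append(func(value, **kwargs))
    return out


def double(v):
    return 2 * v


def double_and_add(v, addend):
    return 2 * v + addend


x = [100, 101, 102, 103, 104]
y = [1, 2, 3]
schedule = [0, 0, 1, 1, 0, 0, 0, 1]   # one possible arrival order

z_double = blend_model(double, [x, y], schedule)
z_added = blend_model(double_and_add, [x, y], schedule, addend=10)
print(z_double)  # [200, 202, 2, 4, 204, 206, 208, 6]
print(z_added)   # [210, 212, 12, 14, 214, 216, 218, 16]
```

Unlike mix, the output carries no stream index, so a reader of the output cannot tell which input an element came from.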


timed_mix

timed_mix(in_streams, out_stream)

where in_streams is a list of timed streams and out_stream is a stream. Each element of a timed stream is a tuple, list or array whose zeroth element is a timestamp. Successive timestamps on a timed stream are monotone increasing. Timestamps are non-negative.  timed_mix is nondeterministic: different runs of the same program may give different outputs.

Associated with timed_mix is the last timestamp which has been placed on out_stream. Initially out_stream is empty and we treat the last timestamp placed on out_stream as -1. When an element arrives on any input stream, the timestamp of this element is compared with the last timestamp on out_stream. If the timestamp of the incoming element is less than or equal to the last timestamp on out_stream then the incoming element is not copied on to out_stream. If the timestamp of the incoming element is greater than the last timestamp on out_stream then the incoming element is copied on to out_stream. Thus each successive element of out_stream has a higher timestamp than its preceding element.

timed_mix is nondeterministic, just as mix is nondeterministic, because the output stream depends on the order in which elements arrive at the input streams. For example, consider the following statement:

timed_mix([x, y], z)

Suppose the first element that appears at this timed_mix function is an element on stream x, and let this element be [10, 'a'], i.e. the timestamp is 10. This element is copied to out_stream because there are no elements on out_stream, and so we treat the timestamp of the last element on out_stream as -1. The index 0 is added because this input element arrived on the x stream. So, the output stream z is now [[10, (0, 'a')]], where the single element is [timestamp: 10, (index: 0, element value: 'a')].

Suppose the next element that appears at this timed_mix function is an element on stream y, and let this element be [5, 'A'], i.e. the timestamp is 5. This element is not copied out to out_stream because the timestamp (5) of this element is less than the latest timestamp (10) on out_stream.

If the next element that appears is [15, 'B'] on stream y then this element is copied to out_stream. Thus the elements on out_stream at this point are [[10, (0, 'a')], [15, (1, 'B')]].

Now consider another sequence of arrivals: [5, 'A'] from y, [10, 'a'] from x, and then [15, 'B'] from y. In this case, at this point out_stream is [[5, (1, 'A')], [10, (0, 'a')], [15, (1, 'B')]].
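The two arrival sequences above can be traced with a short model of the timestamp rule. This is a sketch in plain Python, not IoTPy code: timed_mix_model is a hypothetical helper that takes arrivals as (stream index, [timestamp, value]) pairs in arrival order.

```python
def timed_mix_model(arrivals):
    """Copy an arrival to the output only if its timestamp is newer."""
    last_timestamp = -1   # treated as -1 while the output is empty
    out = []
    for index, (timestamp, value) in arrivals:
        if timestamp > last_timestamp:
            out.append([timestamp, (index, value)])
            last_timestamp = timestamp
        # Otherwise the element is tardy and is dropped.
    return out


# x is input 0 and y is input 1.
run_1 = timed_mix_model([(0, [10, 'a']), (1, [5, 'A']), (1, [15, 'B'])])
run_2 = timed_mix_model([(1, [5, 'A']), (0, [10, 'a']), (1, [15, 'B'])])
print(run_1)  # [[10, (0, 'a')], [15, (1, 'B')]]
print(run_2)  # [[5, (1, 'A')], [10, (0, 'a')], [15, (1, 'B')]]
```

The same three elements give two different outputs, and in both the output timestamps are strictly increasing.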

A timed_mix function is used in managing timed streams from sources that may be delayed due to congestion or other reasons. In some cases, discarding tardy data is better than waiting for tardy data to arrive. You can discard tardy data using timed_mix in the following way. timed_mix has two input streams: (1) from the (possibly tardy) sensor and (2) a clock stream that sends periodic null messages. At time n*T, the clock stream sends a message [n*T - D, None], for n = 1, 2, 3, ... where T is the period and D is the maximum acceptable delay. If the timestamp of the last message from the source is greater than or equal to n*T-D then the null message is not copied to the output stream. If, however, the timestamp of the last message from the source is less than n*T-D then the null message is copied to the output stream. So, at time n*T, the timed_mix function does not copy messages with timestamps earlier than n*T - D from the source into the output. Therefore, at time n*T, messages from the source that are delayed by more than D time units are not copied into the output stream.
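The clock-stream technique can be traced with the same kind of model. This is a sketch under stated assumptions, not IoTPy code: timed_mix_model is a hypothetical helper, the period T = 10 and maximum delay D = 3 are illustrative values, and the arrival order is written out by hand.

```python
def timed_mix_model(arrivals):
    """Copy an arrival to the output only if its timestamp is newer."""
    last_timestamp = -1
    out = []
    for index, (timestamp, value) in arrivals:
        if timestamp > last_timestamp:
            out.append([timestamp, (index, value)])
            last_timestamp = timestamp
    return out


T, D = 10, 3          # illustrative period and maximum acceptable delay
SENSOR, CLOCK = 0, 1  # positions of the two inputs

arrivals = [
    (SENSOR, [5, 'p']),          # on time
    (SENSOR, [8, 'q']),          # on time
    (CLOCK, [1 * T - D, None]),  # sent at time 10: 7 <= 8, not copied
    (CLOCK, [2 * T - D, None]),  # sent at time 20: 17 > 8, copied
    (SENSOR, [12, 'r']),         # arrives after time 20: 12 <= 17, dropped
    (SENSOR, [19, 's']),         # 19 > 17, copied
]
out = timed_mix_model(arrivals)
print(out)
```

The sensor reading with timestamp 12 is dropped because the null message with timestamp 17 reached the output first, which is the discarding of tardy data described above.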

Code repository

agent_types/merge.py

Tests are in:

tests/sink_test.py

 
