Merge: Multiple-In, 1-Out

Let v be a list of length k and let x be a list of streams where the length of x is also k. The n-th element of stream x[i] is x[i][n].

@merge_e
def f(v): return g(v)
f(in_streams=x, out_stream=y)

The above code ensures that

y[n] = g([x[0][n], …, x[k-1][n]])

For example, the following code ensures that z[n] = w[n] + x[n] + y[n].

@merge_e
def total(v): return sum(v)
total(in_streams=[w,x,y], out_stream=z)
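
The element-by-element rule can be checked without IoTPy by modeling each stream as a finite Python list. The helper merge_elements below is a hypothetical stand-in for illustration, not part of the IoTPy API; it is a minimal sketch that assumes the lists hold the elements that have arrived on each stream so far.

```python
def merge_elements(g, in_lists):
    """List-based model of an element-by-element merge.

    Hypothetical helper for illustration only (not an IoTPy function).
    Output element n is g([in_lists[0][n], ..., in_lists[k-1][n]]).
    """
    # zip stops at the shortest input: output element n can only be
    # computed once every input has produced its element n.
    return [g(list(values)) for values in zip(*in_lists)]


w = [1, 2, 3]
x = [10, 20, 30]
y = [100, 200, 300, 400]   # the fourth element has no partners yet
z = merge_elements(sum, [w, x, y])
print(z)  # [111, 222, 333]
```

In this model the fourth element of y produces no output until w and x each deliver a fourth element.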

Windows, Keyword Parameters, State, Functional form

You use windows, keyword parameters, state, and functional form with merge in exactly the same way as with map. For example, the decorator @merge_w merges streams window by window, just as @merge_e merges streams element by element. The decorators @fmerge_e and @fmerge_w give the functional forms of merge_e and merge_w, just as @fmap_e gives the functional form of map_e. For example,

@fmerge_e
def total(v): return sum(v)
z = total([w,x,y])

has the same effect as the previous example, except that the functional form creates and returns stream z, whereas the procedural form takes z as a parameter.

Let v be a list and let x be a list of streams where v and x have the same length k. Let the n-th element of stream x[i] be x[i][n]. With window size w and step size s, the code below makes y a stream where

y[n] = g([x[0][n*s : n*s+w], …, x[k-1][n*s : n*s+w]])

In other words, y[n] is the value of g applied to a list of k elements where the elements are the n-th windows into the streams x[0], …, x[k-1].

@merge_w
def f(v): return g(v)
f(in_streams=x, out_stream=y, window_size=w, step_size=s)

For example, the following code sets:

z[j] = x[j*3] + x[j*3+1] + max(y[j*3], y[j*3+1])

@merge_w
def f(two_windows):
   window_0, window_1 = two_windows
   return sum(window_0) + max(window_1)
f(in_streams=[x,y], out_stream=z, window_size=2, step_size=3)
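
The window arithmetic can be traced with a small list-based model. The helper merge_windows below is hypothetical (not an IoTPy function), and the input values are made up for illustration. With window_size=2 and step_size=3, output j uses elements 3j and 3j+1 of each input and skips element 3j+2.

```python
def merge_windows(g, in_lists, window_size, step_size):
    """List-based model of a window-by-window merge.

    Hypothetical helper for illustration only (not an IoTPy function).
    """
    shortest = min(len(s) for s in in_lists)
    out = []
    start = 0
    # Emit an output only when a full window is available on every input.
    while start + window_size <= shortest:
        windows = [s[start:start + window_size] for s in in_lists]
        out.append(g(windows))
        start += step_size
    return out


def f(two_windows):
    window_0, window_1 = two_windows
    return sum(window_0) + max(window_1)


x = list(range(10))                   # 0, 1, ..., 9
y = [5, 1, 0, 2, 7, 0, 3, 3, 0, 9]
z = merge_windows(f, [x, y], window_size=2, step_size=3)
print(z)  # [6, 14, 16]
```

Here z[1] = x[3] + x[4] + max(y[3], y[4]) = 3 + 4 + 7 = 14. No fourth output is produced because the window starting at index 9 is not yet complete.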

Nondeterminism: Asynchronous Merge

An asynchronous merge nondeterministically merges multiple input streams into a single stream and applies a specified function to each element of the resulting stream. The merge is fair in the sense that if a value appears on any input stream, then the corresponding value eventually appears on the output stream.

Let x be a list of streams. The following code ensures that output stream y is function g applied to each element of a fair merge of the streams in x.

@merge_asynch
def f(v): return g(v)
f(in_streams=x, out_stream=y)

For example, the following code fairly merges streams x and y, multiplies each element by 2, and outputs the result as stream z.

@merge_asynch
def merge_double(v): return 2*v
merge_double(in_streams=[x, y], out_stream=z)

For example, if the elements of x are 100, 101, 102, ... and those of y are 1, 2, 3, ... then two of the many possible values of stream z are [200, 202, 2, 4, 204, 206, 208, 6, ...] and [2, 4, 6, 202, 204, 8, 206, 208, 210, ...].
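
One way to see why many outputs are possible is to simulate the merge on finite lists. The function fair_merge below is a hypothetical model, not IoTPy code: it assumes that at each step any input with unread elements may be served next, and it uses a random choice to stand in for arrival order. The order within each input is preserved, and every element is eventually emitted.

```python
import random


def fair_merge(g, in_lists, seed=None):
    """List-based model of an asynchronous merge.

    Hypothetical helper for illustration only (not an IoTPy function).
    Repeatedly picks an input that still has unread elements, takes its
    next element, and appends g(element) to the output.
    """
    rng = random.Random(seed)
    positions = [0] * len(in_lists)
    out = []
    while True:
        # Indices of inputs that still have unread elements.
        ready = [i for i, s in enumerate(in_lists) if positions[i] < len(s)]
        if not ready:
            break
        i = rng.choice(ready)
        out.append(g(in_lists[i][positions[i]]))
        positions[i] += 1
    return out


x = [100, 101, 102, 103]
y = [1, 2, 3, 4]
z = fair_merge(lambda v: 2 * v, [x, y], seed=0)
print(z)  # one of many possible interleavings
```

Different seeds give different interleavings, but the doubled elements of x always appear in the order 200, 202, 204, 206 and those of y in the order 2, 4, 6, 8.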

Asynchronous merges are carried out only element by element (not window by window), and are available only in the procedural form, not the functional form.

Decorating Methods

You can decorate methods of objects as illustrated in the following example.

class average(object):
   def __init__(self, max_value):
      # Clip elements of the stream greater than max_value
      self.max_value = max_value
      self.count = 0
      self.total = 0.0
   def f(self, v):
      v = min(v, self.max_value)
      self.total += v
      self.count += 1
      return self.total/float(self.count)

c = average(max_value=10)
@fmap_e
def avg(v): return c.f(v)
y = avg(x)

Element y[n] of stream y is the average of x[0], …, x[n], with elements greater than max_value clipped to max_value.
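
The clipped running average can be worked through on a short list. The function clipped_running_average below is a hypothetical, IoTPy-free restatement of the same arithmetic, and the input values are made up for illustration.

```python
def clipped_running_average(values, max_value):
    """Return the running averages of values after clipping at max_value.

    Hypothetical helper for illustration only (not an IoTPy function).
    """
    total = 0.0
    averages = []
    for count, v in enumerate(values, start=1):
        total += min(v, max_value)     # clip before accumulating
        averages.append(total / count)
    return averages


x = [4, 20, 6, 10]
y = clipped_running_average(x, max_value=10)
print(y)
```

The second input, 20, is clipped to 10, so the second output is (4 + 10) / 2 = 7.0 rather than 12.0.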

Merging two streams

Purely for syntactic convenience, and because users often merge exactly two streams, you can handle this special case with @fmerge_2e instead of @fmerge_e, as shown in the following examples. The function in a standard merge takes a single list, whereas the function in the special case takes two parameters. The programs below are equivalent.

@fmerge_2e                      @fmerge_e
def f(x, y):                    def f(pair):
   return 2*x + 3*y                return 2*pair[0] + 3*pair[1]
z = f(x, y)                     z = f([x, y])
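
The equivalence rests on the two function bodies computing the same value from the same pair of elements. The sketch below checks this on plain lists without IoTPy; the names f_two and f_list and the input values are hypothetical, chosen only for illustration.

```python
def f_two(x, y):
    # Two-parameter style, as used with the two-stream special case.
    return 2 * x + 3 * y


def f_list(pair):
    # List-parameter style, as used with the general merge.
    return 2 * pair[0] + 3 * pair[1]


xs = [1, 2, 3]
ys = [10, 20, 30]
z_two = [f_two(a, b) for a, b in zip(xs, ys)]
z_list = [f_list([a, b]) for a, b in zip(xs, ys)]
print(z_two, z_list)  # [32, 64, 96] [32, 64, 96]
```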

Analogously, you can use @fmerge_2w instead of @fmerge_w for the special case of merging two streams.

Examples of @merge_e and @merge_w are available on GitHub at IoTPy/examples/merge/examples_merge.py.

Next, look at agents that split streams; each of these agents has a single input stream and multiple output streams. Also look at agents with multiple input and multiple output streams, and at sink agents, which have a single input stream and no output stream. See: Split, Multiple Input/Output, Sink.