Sink encapsulators: single input, no output

See IoTPy/IoTPy/agent_types/

A sink encapsulator encapsulates a function which reads each element of its single input stream, as it arrives, and carries out operations on the element and other objects. For example, a sink agent may read a stream and change the setting of a thermostat based on the value it reads.

A sink agent may interact with an actuator that requires the agent to suspend execution. In that case, implementing the sink agent as a separate thread improves performance. If the sink agent only writes to a printer or an in-memory file then computation does not have to be suspended. The examples given here are for the case where the sink agent's computation is not suspended.

The package includes the following sink functions:

  • sink_element. Operates on a single element of the input stream.

  • sink_list. Operates on a list of elements of the input stream.

  • sink_window. Operates on a sliding window of the input stream.

  • signal_sink. Operates only on additional keyword arguments and does not operate on the input stream. The arrival of a new element on the input stream is a signal for this agent to take a step which only reads and modifies additional keyword arguments. The values of the elements in the input stream are irrelevant.
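The idea behind signal_sink can be illustrated with a small pure-Python model. This is only a sketch: run_signal_sink_model, count_arrival and counter are hypothetical names invented here, and the model does not use IoTPy or reproduce the real signal_sink signature. It shows only that each arrival triggers one step and that the step touches the keyword arguments, never the element.

```python
def run_signal_sink_model(func, elements, **kwargs):
    """Hypothetical stand-in, not IoTPy code: each arriving element
    triggers one call of func, which sees only the keyword arguments."""
    for _ in elements:
        func(**kwargs)


# A mutable object passed as a keyword argument; the step modifies it.
counter = {'arrivals': 0}


def count_arrival(counter):
    # The element's value is never looked at; only its arrival matters.
    counter['arrivals'] += 1


run_signal_sink_model(count_arrival, ['a', None, 3.5], counter=counter)
print(counter['arrivals'])  # 3
```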

In many cases, you want to copy a stream to a target object such as a list, file, queue or buffer, and so the package includes the following functions:

  • stream_to_list. Copies a stream to a list.

  • stream_to_file. Copies a stream to a file.

  • stream_to_queue. Copies a stream to a queue.

  • stream_to_buffer. Copies a stream to a buffer.

In addition to copying an input stream to a target object, you can also optionally apply a function to each element of the stream and append the results of the function calls to the target.

Structure of sink: element, list and window

The signature of a sink function is:

     sink_function_name(func, in_stream, state=None, name=None, **kwargs)

where sink_function_name is sink_element, sink_list or sink_window.

The types of the arguments of sink_function_name are as follows: func is a function, in_stream is a Stream, and state is an object. sink_function_name does not return any value (or equivalently, returns None). If state is None, then func has only one argument (an element, list or window of in_stream) and it returns None. If state is not None, then func has two arguments, (1) an element, list or window of in_stream and (2) the current state, and it returns a single value: the next state. func may also have additional keyword (kwargs) arguments. Typically func has a side effect such as modifying an actuator or writing to a file.
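The calling convention described above can be sketched in plain Python. The helper run_sink_model and the functions record and record_indexed are hypothetical names used only for illustration; the sketch does not import IoTPy and is not how the library is implemented. It assumes a finite list of elements in place of a stream.

```python
def run_sink_model(func, elements, state=None, **kwargs):
    """Hypothetical stand-in, not IoTPy code: call func on each element
    the way the text describes for stateless and stateful sinks."""
    for element in elements:
        if state is None:
            # Stateless: func takes the element and returns None.
            func(element, **kwargs)
        else:
            # Stateful: func takes (element, state), returns the next state.
            state = func(element, state, **kwargs)
    return state  # returned here only so the final state can be inspected


log = []  # stands in for an actuator or a file


def record(v, prefix):
    log.append(prefix + str(v))


def record_indexed(v, state, prefix):
    log.append(prefix + str(state) + ':' + str(v))
    return state + 1  # next state


run_sink_model(record, [7, 8], prefix='> ')
final_state = run_sink_model(record_indexed, [7, 8], state=0, prefix='> ')
print(log)          # ['> 7', '> 8', '> 0:7', '> 1:8']
print(final_state)  # 2
```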


sink_element(func, in_stream, state=None, name=None, **kwargs)

func operates on a single element of the input stream, in_stream.

Example of sink_element

Print every value of a stream s. 

def print_value(v): print(v)
sink_element(func=print_value, in_stream=s)

Example of sink_element with additional keyword argument

We want to print each element of an input stream s on a separate line, with each line beginning with the text string "::: ". 

def print_with_descriptor(v, descriptor):
   print(descriptor + str(v))
sink_element(func=print_with_descriptor, in_stream=s,
             descriptor='::: ')

Example of sink_element with state

Print, on each line, the index (0, 1, 2, ...) of each element of an input stream s followed by ":" and then the value of the element. Since the call to sink_element passes a value for the parameter state, this function's computation has state. The state of the computation is the index of the next element to be printed. The initial state is 0, as specified in the call to sink_element.

# Set up parameters for call to sink_element
def print_index(v, state):
  print(str(state) + ':' + str(v))
  return state+1 # next state
s = Stream('s')

# Call sink_element with initial state of 0
sink_element(func=print_index, in_stream=s, state=0)

# A test
s.extend(list(range(100, 103)))
# Prints 0:100, 1:101, 2:102 on separate lines

Example with state and additional keyword argument

This example has a state and an additional keyword argument, delimiter. It prints the same values as the sink in the previous example.

# Set up parameters for call to sink_element
def print_index(v, state, delimiter):
  print(str(state) + delimiter + str(v))
  return state+1 # next state
s = Stream('s')
# Call sink_element with an initial state of 0 and
# a keyword argument, delimiter, set to ':'
sink_element(
   func=print_index, in_stream=s,
   state=0, delimiter=':')


sink_list(func, in_stream, state=None, name=None, **kwargs)

sink_list is the same as sink_element except that func operates on a list rather than a single element of in_stream.


sink_window(func, in_stream, window_size, step_size=1,
  state=None, name=None, **kwargs)

sink_window is the same as sink_element except that func operates on a moving window (which is a list or array) rather than a single element of in_stream. The moving window is moved forward by step_size at each step.
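The effect of window_size and step_size can be illustrated with a plain-Python sketch. The helper windows_model is a hypothetical name, not part of IoTPy, and the sketch assumes that func is called only on full windows of a finite list; check the library for its exact behavior at the ends of a stream.

```python
def windows_model(elements, window_size, step_size=1):
    """Hypothetical helper, not IoTPy code: return the successive full
    windows of a finite list, moving forward step_size each time."""
    last_start = len(elements) - window_size
    return [elements[start:start + window_size]
            for start in range(0, last_start + 1, step_size)]


# A func that sums each window, applied with window_size=3, step_size=2.
sums = []
for window in windows_model([1, 2, 3, 4, 5, 6, 7], window_size=3, step_size=2):
    sums.append(sum(window))

print(sums)  # [6, 12, 18], from windows [1,2,3], [3,4,5], [5,6,7]
```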


Copy streams to target objects

We next describe functions that create agents that copy a stream to a target object such as a list, file, queue or ring buffer. The functions are stream_to_list, stream_to_file, stream_to_queue, and stream_to_buffer.


stream_to_list(in_stream, target_list, element_function=None, 
               state=None, **kwargs)

If element_function is None then in_stream is copied to target_list; otherwise, element_function (possibly with state and keyword arguments) is applied to each element of in_stream and the result is copied to target_list.

Unlike almost all the other agent-creating functions, the encapsulated function is optional and is called element_function as opposed to func. This is because the default case is to copy the stream directly into the target rather than first apply a function to the elements of the stream.
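The three cases (no function, a function without state, a function with state) can be modeled in plain Python. This is a sketch under assumptions: copy_to_list_model and scale_running are hypothetical names, the model takes a finite list instead of a Stream, and it is not the library's implementation.

```python
def copy_to_list_model(elements, target_list, element_function=None,
                       state=None, **kwargs):
    """Hypothetical stand-in, not IoTPy code, for the copying rules."""
    for element in elements:
        if element_function is None:
            # Default case: copy the element unchanged.
            target_list.append(element)
        elif state is None:
            # Stateless function: append its result.
            target_list.append(element_function(element, **kwargs))
        else:
            # Stateful function: it returns (result, next state).
            result, state = element_function(element, state, **kwargs)
            target_list.append(result)


def square(v):
    return v * v


def scale_running(v, state, multiplier):
    # state holds the sum of the elements seen so far
    return v * multiplier + state, state + v


plain, squared, running = [], [], []
copy_to_list_model([1, 2, 3], plain)
copy_to_list_model([1, 2, 3], squared, element_function=square)
copy_to_list_model([1, 2, 3], running, element_function=scale_running,
                   state=0, multiplier=10)
print(plain, squared, running)  # [1, 2, 3] [1, 4, 9] [10, 21, 33]
```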

Example without element_function

# Set up parameters for call to stream_to_list
input_stream = Stream('input stream')
output_list = []
# Call stream_to_list with no function
stream_to_list(input_stream, output_list)
# A test: put values on the input stream; the check
# holds once the agent has processed them
a_test_list = list(range(100, 105))
input_stream.extend(a_test_list)
assert output_list == a_test_list


Example with element_function

def square(v): return v*v
stream_to_list(in_stream=input_stream, target_list=output_list,
               element_function=square)
# A test
test_list = list(range(5))
input_stream.extend(test_list)
assert output_list == [square(v) for v in test_list]


Example with state

def f(element, current_state):
   return element*current_state, current_state+element
stream_to_list(in_stream=input_stream, target_list=output_list,
               element_function=f, state=0)

In this example, the initial state is 0. Each call to the function f returns two values: (1) the value, element*current_state, to be appended to output_list, and (2) the next state, current_state+element.

For example, if the elements of input_stream are 0, 1, 2, 3, 4, 5, ... then the sequence of states will be 0, 0+0, 0+1, 1+2, 3+3, 6+4, 10+5, ... and the elements of output_list will be 0*0, 1*0, 2*1, 3*3, 4*6, 5*10, ... If output_list is the empty list [] initially, then when the first element of input_stream appears, output_list becomes [0]; when the next element appears, output_list becomes [0, 0]; and as successive elements appear, output_list becomes [0, 0, 2, 9, 24, 50, ...].
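The arithmetic above can be checked with a short trace in plain Python. The loop below is only a stand-in for the agent: it applies f by hand to the elements 0 through 5 and records each state and each appended value. It does not use IoTPy.

```python
def f(element, current_state):
    return element * current_state, current_state + element


state = 0            # initial state
states = [state]     # every state the computation passes through
output_list = []

for element in range(6):      # elements 0, 1, 2, 3, 4, 5
    value, state = f(element, state)
    output_list.append(value)
    states.append(state)

print(states)       # [0, 0, 1, 3, 6, 10, 15]
print(output_list)  # [0, 0, 2, 9, 24, 50]
```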

Example with additional keyword arguments

This example has no state and has two additional keyword arguments, multiplier and addend.

# Set up parameters for call to stream_to_list
def h(v, multiplier, addend): return v*multiplier+addend
s = Stream()
lst = []

# Call to stream_to_list with keyword arguments
stream_to_list(in_stream=s, target_list=lst, element_function=h,
               multiplier=2, addend=100)

# A test
a_test_list = [3, 23, 14]
s.extend(a_test_list)
assert lst == [h(v, 2, 100) for v in a_test_list]


This example has a state and additional keyword arguments, multiplier and addend.

# Set up parameters for the call to stream_to_list
def g(v, state, multiplier, addend):
   return v*multiplier+addend+state, v+state
input_stream = Stream()
output_list = []

# Call stream_to_list with initial state 0
# and keyword arguments.
stream_to_list(
   in_stream=input_stream, target_list=output_list,
   element_function=g, state=0,
   multiplier=2, addend=100)

# A test
a_test_list = [3, 23, 14]
input_stream.extend(a_test_list)
assert output_list == [106, 149, 154]


stream_to_file(in_stream, filename, element_function=None,
               state=None, **kwargs)

stream_to_file operates in exactly the same way as stream_to_list except that each result of a function call is written to a file as a separate line rather than appended to a list. The file handle is released after each line is written. The examples below are the file versions of the examples using lists given above.

stream_to_file(
   in_stream=stream, filename='test_file')
stream_to_file(
   in_stream=stream, filename='test_file',
   element_function=square)
stream_to_file(
    in_stream=stream, filename='test_file',
    element_function=f, state=0)
stream_to_file(
    in_stream=stream, filename='test_file',
    element_function=h, multiplier=2, addend=100)
stream_to_file(
    in_stream=stream, filename='test_file',
    element_function=g, state=0, multiplier=2, addend=100)
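The remark that the file handle is released after each line can be pictured with a plain-Python sketch. The helper append_line is a hypothetical name and the temporary directory is used only to keep the example self-contained; the code shows one possible way to write one line per result, not how stream_to_file is implemented.

```python
import os
import tempfile


def append_line(filename, value):
    # Open, write one line, and close again, so that the file handle
    # is released after every line.
    with open(filename, 'a') as output_file:
        output_file.write(str(value) + '\n')


def square(v):
    return v * v


filename = os.path.join(tempfile.mkdtemp(), 'test_file')
for element in [1, 2, 3]:          # stands in for elements of a stream
    append_line(filename, square(element))

with open(filename) as input_file:
    lines = input_file.read().splitlines()
print(lines)  # ['1', '4', '9']
```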


stream_to_queue(in_stream, queue, element_function=None,
                state=None, **kwargs)

stream_to_queue is identical to stream_to_file except that values returned by element_function are placed in a queue. The examples below are the queue versions of the examples using lists given above.

import queue  # this module is named Queue in Python 2
q = queue.Queue()
stream_to_queue(
   in_stream=stream, queue=q)
stream_to_queue(
   in_stream=stream, queue=q, element_function=square)
stream_to_queue(
   in_stream=stream, queue=q, element_function=f,
   state=0)
stream_to_queue(
    in_stream=stream, queue=q, element_function=h,
    multiplier=2, addend=100)
stream_to_queue(
    in_stream=stream, queue=q, element_function=g,
    state=0, multiplier=2, addend=100)

You can also use Queue from the multiprocessing package as in:

import multiprocessing
q = multiprocessing.Queue()
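A queue is a natural hand-off point when the consumer of the data may block, for instance on a slow actuator, and therefore runs in its own thread. The sketch below uses only the standard library. The STOP marker and the consume function are hypothetical names introduced here, and the loop that fills the queue merely stands in for an agent created by stream_to_queue.

```python
import queue
import threading

STOP = object()  # hypothetical end-of-data marker, not part of IoTPy


def consume(q, results):
    # Runs in its own thread, so blocking on q.get() (or on a slow
    # actuator) does not hold up the code that fills the queue.
    while True:
        item = q.get()
        if item is STOP:
            break
        results.append(item * 2)  # stands in for an actuator call


q = queue.Queue()
results = []
consumer = threading.Thread(target=consume, args=(q, results))
consumer.start()

for value in [1, 2, 3]:   # stands in for values copied from a stream
    q.put(value)
q.put(STOP)
consumer.join()
print(results)  # [2, 4, 6]
```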

The code repository for all the functions listed here are in


A buffer is a repository of data from streams and other sources. A buffer is usually of a fixed size and so a buffer may discard or compress older data. The function stream_to_buffer copies a stream into a buffer; it does not have a parameter element_function, and so it is used only for direct copying.

stream_to_buffer(in_stream, target_buffer)
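The fixed-size behavior described above, where older data is discarded, can be sketched with collections.deque from the standard library. BufferModel is a hypothetical class written for this illustration; it is not IoTPy's Buffer class and its method names are assumptions.

```python
from collections import deque


class BufferModel(object):
    """Hypothetical fixed-size buffer, not IoTPy's Buffer class."""

    def __init__(self, max_size):
        # A deque with maxlen drops the oldest item when it is full.
        self.data = deque(maxlen=max_size)

    def append(self, value):
        self.data.append(value)

    def read_all(self):
        return list(self.data)


buffer0 = BufferModel(max_size=3)
for value in range(5):      # stands in for elements of a stream
    buffer0.append(value)
print(buffer0.read_all())   # [2, 3, 4]: the oldest values 0 and 1 are gone
```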

For example, you could copy a stream into a buffer and then dynamically plot data from one or more buffers as the contents of the buffers change.


from Buffer import Buffer
#..... code that computes streams s0 and s1

# Create buffers
buffer0 = Buffer(max_size=1000)
buffer1 = Buffer(max_size=1000)

# Specify buffer input
stream_to_buffer(s0, buffer0)
stream_to_buffer(s1, buffer1)

# Plot data from buffers
   buffers=[buffer0, buffer1],