Streams

These initial sections deal with the first goal: obtaining non-terminating functions that operate on endless streams by reusing terminating functions that operate on conventional data structures such as lists. IoTPy provides many such functions, and you can also write your own. These sections describe a few functions to get you started.

A stream is a list of arbitrary length that can be modified only by appending values to its tail. A stream can also be treated as a NumPy array to which rows can be appended and which cannot be modified in any other way; see NumPy: Stream Arrays. The value of a stream, at any point, is a list or array.

Stream creation

s = Stream(name='temperature', initial_value=[20, 21, 20])

creates a stream with the specified name and initial value. Both the name and initial value can be omitted.

Appending and extending a stream

You can append to or extend a stream in the same way as a list:

s.append(value)
s.extend(a_list)

Standard Python operators, such as +, -, *, …, on streams

Let op be one of the following Python operators: +, -, *, /, //, <, <=, ==, !=, >, >=. Let x and y be streams; then x op y is a stream whose j-th element is x[j] op y[j]. For example, given streams x and y, x + y is a stream whose j-th element is x[j] + y[j], and x - y is a stream whose j-th element is x[j] - y[j].

# x, y are streams
z = x + y
# Creates a stream z where z[n] = x[n] + y[n]
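The element-wise rule can be illustrated on ordinary lists. The following is a minimal sketch in plain Python, not IoTPy code: the helper name elementwise is hypothetical, and the lists stand in for the values that streams x and y hold at some point in time. It assumes, as the examples later in this section state, that the result is as long as the shorter input.

```python
import operator

def elementwise(op, x_values, y_values):
    """Apply op to corresponding elements of two lists.

    Hypothetical helper for illustration only; it is not part of IoTPy.
    zip stops at the shorter input, so the result has the length of the
    shorter list.
    """
    return [op(a, b) for a, b in zip(x_values, y_values)]

# Snapshots of the values held by two streams.
x_values = [0, 1, 2]
y_values = [100, 101, 102, 103, 104]

z_sum = elementwise(operator.add, x_values, y_values)   # models x + y
z_less = elementwise(operator.lt, x_values, y_values)   # models x < y
```

The same helper covers every operator in the list above, because each one has a counterpart in Python's operator module (operator.sub, operator.mul, operator.ge, and so on).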

Operations between streams and scalars

Functions

In IoTPy the functions f_add, f_mul, f_sub, … correspond to the Python magic operators __add__, __mul__, __sub__, … respectively. For example:

y = f_add(in_stream=x, arg=2)

where x is a stream, creates a stream y where y[j] = x[j] + 2. Likewise,

y = f_mul(x, 2)
z = f_sub(x, 2)

creates streams y, z where y[j] = x[j] * 2 and z[j] = x[j] - 2. You can use similar functions for the other magic operators.

Procedures

In some cases, a procedural (or relational) form is preferable to the functional form. We use the prefix r for procedures and f for functions. With x and y declared as streams:

r_add(in_stream=x, out_stream=y, arg=2)

creates a procedure that does the following: when a value v is appended to stream x, the procedure appends v + 2 to stream y. Let Y[n] be the n-th value appended to stream y by this procedure. Then:

Y[n] = x[n] + 2

The other magic operators have functional and procedural representations for streams. For example, r_mul and r_sub correspond to f_mul and f_sub.

A difference between functions and procedures — between f_add and r_add, for example — is that stream y is created by the call to f_add whereas it is declared before the call to r_add. (For a more detailed description of procedures, see procedures and nondeterminism.)
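The distinction can be sketched with a toy model. Everything below is hypothetical and written only for illustration: MiniStream, mini_r_add and mini_f_add are not IoTPy names, and this model pushes each value to its subscribers immediately, whereas the IoTPy examples in this section call run() to process pending values.

```python
class MiniStream:
    """Hypothetical append-only stream; not the IoTPy Stream class."""

    def __init__(self):
        self.values = []
        self._subscribers = []

    def subscribe(self, callback):
        # Register a function to call with each newly appended value.
        self._subscribers.append(callback)

    def append(self, value):
        self.values.append(value)
        for callback in self._subscribers:
            callback(value)

    def extend(self, values):
        for value in values:
            self.append(value)


def mini_r_add(in_stream, out_stream, arg):
    """Procedural form: the caller declares out_stream beforehand."""
    in_stream.subscribe(lambda v: out_stream.append(v + arg))


def mini_f_add(in_stream, arg):
    """Functional form: the output stream is created here and returned."""
    out_stream = MiniStream()
    mini_r_add(in_stream, out_stream, arg)
    return out_stream


x = MiniStream()
y = MiniStream()                      # declared before the procedure call
mini_r_add(in_stream=x, out_stream=y, arg=2)
w = mini_f_add(x, 10)                 # created by the function call

x.extend([1, 2, 3])
# y.values is now [3, 4, 5] and w.values is [11, 12, 13]
```

In this sketch the functional form is just a thin wrapper that creates the output stream and then calls the procedural form; whether IoTPy is organized the same way internally is an assumption.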

The folder, IoTPy/IoTPy/helpful_functions, contains several functions and procedures in addition to the magic operators.

Examples

Example of a stream operator

def test_plus_operator():
    x = Stream()
    y = Stream()
    z = x + y
    
    x.extend(list(range(3)))
    y.extend(list(range(100, 105)))
    run()
    assert recent_values(z) == [
        100, 102, 104]

    x.extend(list(range(3, 7)))
    run()
    assert recent_values(z) == [
        100, 102, 104, 106, 108]

    run()
    assert recent_values(z) == [
        100, 102, 104, 106, 108]

The function run() causes the statements above it to be executed. The function is described in the section, Testing an Agent.

recent_values(z), where z is a stream, returns a list of the most recent values of the stream. A stream can be arbitrarily large, and the function print_stream(z) executes forever, printing each successive element of stream z to standard output; so it is more convenient to inspect only the most recent values in these small examples.

When run() is first executed, the streams x and y contain [0, 1, 2] and [100, 101, 102, 103, 104] respectively, so z is a stream that contains the element-wise sum: [100, 102, 104]. The length of z is the smaller of the lengths of x and y.

When run() is next executed, x contains [0, 1, 2, 3, 4, 5, 6], while y remains unchanged. Again, z contains the element-wise sum, and the length of z is the length of y, which is now the shorter stream. In the following call to run(), neither x nor y has changed, and so z remains unchanged.
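The three calls to run() can be traced with a small plain-Python model. This is a sketch under stated assumptions, not IoTPy's scheduler: the hypothetical function step_sum plays the role of one run() call, and ordinary lists stand in for the streams.

```python
def step_sum(x_values, y_values, z_values):
    """Append to z_values the sums for indices not yet processed.

    Hypothetical stand-in for one run() call in the example above.
    Only indices present in both inputs can be summed, so z_values
    grows to the length of the shorter input and never shrinks.
    """
    start = len(z_values)
    stop = min(len(x_values), len(y_values))
    z_values.extend(x_values[j] + y_values[j] for j in range(start, stop))


x_values, y_values, z_values = [], [], []

x_values.extend(range(3))
y_values.extend(range(100, 105))
step_sum(x_values, y_values, z_values)
first = list(z_values)      # [100, 102, 104]

x_values.extend(range(3, 7))
step_sum(x_values, y_values, z_values)
second = list(z_values)     # [100, 102, 104, 106, 108]

step_sum(x_values, y_values, z_values)
third = list(z_values)      # unchanged: no new input arrived
```

Tracking the number of elements already produced is what makes the third step a no-op: the range from start to stop is empty when neither input has grown.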

Example: Operation between a stream and a scalar

def test_f_add():
    x = Stream()
    K = 5
    y = f_add(x, K)

    x.extend(list(range(3)))
    run()
    assert recent_values(y) == [5, 6, 7]

The statement y = f_add(x, K) creates a stream y whose elements are the sums of K and the corresponding elements of x. Later sections discuss issues such as what happens if K changes while f_add is being executed.

Next, look at a related data structure: StreamArray.