NumPy: Stream Arrays

A stream whose elements are NumPy objects is a StreamArray. An element of a StreamArray is a NumPy type. The default type is a float.

Creating a stream array

We introduce the idea with an example. The statement:

s = StreamArray(name='s', dimension=3, dtype=int)

creates a stream s where s[i] is a NumPy array consisting of an unbounded number of rows and 3 (i.e. dimension) elements of type int. An item appended to this stream must be an array consisting of 3 integers; appending such a 3-element array appends a row to the tail of s. Stream s can be extended by an int array consisting of an arbitrary number of rows and 3 columns; doing so adds the array to the tail of s. We describe appending and extending streams later in this section.

The parameters of StreamArray are: name, dimension, dtype, initial_value, and num_in_memory

  • name is optional and is a string. The default is ‘no_name’.

  • dimension is optional and is the dimension of elements of the stream array. The default is 0.

  • dtype is optional and the type of the rows of the stream array. The default is float.

  • initial_value is optional and is the initial value of the stream array. The default is None.

  • num_in_memory is optional and can be ignored for the time being. It is used for memory management.

When dimension is 0

The dimension parameter can be a non-negative integer or a tuple or list. If dimension is 0 then each element of the stream array belongs to type dtype. In this case, think of the stream array as a 1-D array of unbounded length with elements of type dtype. For example:

t = StreamArray()

makes t a stream array where t is effectively an array of unbounded size where each element of t is a float.

When dimension is a positive integer

If dimension is a positive integer then each element of the stream is a 1-D array whose length is dimension. Stream array s, the first example, is an instance of such a dimension. Another example is,

u = StreamArray(name='u', dimension=2, dtype=float)

makes u a stream array called u. Think of u as an array with an unbounded number of rows where each row of u is an array consisting of 2 floats.

When dimension is a tuple or list

Each element of the tuple must be a positive integer. Think of the stream array as having an unbounded number of rows where each row is an N-dimensional array where N is the length of the tuple. The lengths of the N-dimensional array are given by the tuple. For example,

v = StreamArray(dimension=(3,4), dtype=int)

makes v a stream array where each row of v is a 3 x 4 array of integers. For example, an element of v could be the array a:

a = np.array([
        [0, 1, 2, 3],
        [4, 5, 6, 7],
        [8, 9, 10, 11]])

appending a numpy array to a stream array

You can append an array to a stream provided that they have the same dimensions. Here are examples of appending to stream arrays created in the previous paragraph.

Append a singleton array of float to stream array t

t.append(np.array(1.0))

Append a 1-D array of size 3 of floats to stream array s

a = np.zeros(3)
s.append(a)

makes a the NumPy array: np.array([ 0.,  0.,  0.]) and appends the array a to stream array s. At this point, the zeroth row of s is np.array([ 0.,  0.,  0.]).

Append a 1-D array of size 2 of ints to stream array u

u.append(np.array([0, 1])

Append a 3 x 4 array of integers to stream array v

v.append(np.array([
        [0, 1, 2, 3],
        [4, 5, 6, 7],
        [8, 9, 10, 11]]))

extending a stream array

You can extend a stream array by an array consisting of multiple rows provided that the dimensions of rows of the array and the stream are identical.

Extend stream array t by an array of floats

t.extend(np.array([2.0, 3.0]))

The statement is equivalent to:

t.append(np.array(2.0))
t.append(np.array(3.0))

Since t was np.array([1.0]) before the above statements are executed, its value after the statement is np.array([1.0, 2.0, 3.0]).

Extend stream array s by an array of arrays

An example of an array consisting of 2 rows where each row has 3 elements is b:

b = np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
s.extend(b)

The above statement is equivalent to:

s.append(np.array([1.0, 2.0, 3.0])
s.append(np.array([4.0, 5.0, 6.0])

Since s is np.array([[0.0, 0.0, 0.0]]) before the above statements are executed, then at this point, its content is:

np.array([[0.0, 0.0, 0.0],
          [1.0, 2.0, 3.0],
          [4.0, 5.0, 6.0]])

Extend stream array u by an array of arrays

u.extend(np.array([[2,3], [4,5], [6,7]]))

Extend stream array v

v.extend(np.array([
   [[12, 13, 14, 15],[16, 17, 18, 19], [20, 21, 22, 23]],
   [[24, 25, 26, 27], [28, 29, 30, 31], [32, 33, 34, 35]]]))

user-defined types

An example of a user-defined type is:

txyz_dtype = np.dtype([('time','int'), ('data', '3float')])

An example of an object, c, of this type is created by:

c = np.array((1, [0.0, 1.0, 2.0]), dtype=txyz_dtype)

Then, c[‘time’] is np.array(1), and c['data'] is np.array([ 0., 1., 2.]

Creating a stream array with user-defined types

y = StreamArray(dimension=0, dtype=txyz_dtype)

creates a stream array, y, whose elements are of type txyz_dtype.

Appending to a stream array with user-defined types

We can append c to the stream array,

y.append(c)

At this point the value of y is a singleton array containing c, i.e. y’s value is

np.array([(1, [0., 1., 2.])], dtype=txyz_dtype))

EXTending a stream array with user-defined types

d = np.array([
   (2, [3., 4., 5.]), (3, [6., 7., 8.])], dtype=txyz_dtype)

creates an array d with two elements of type txyz_dtype. We can extend y with d:

y.extend(d)

and at this point y’s value is:

np.array([
   (1, [0., 1., 2.]), 
   (2, [3., 4., 5.]), 
   (3, [6., 7., 8.])], 
   dtype=txyz_dtype))

You can also create stream arrays with user defined types and arbitrary dimensions.

Examples

Example: Operator on Stream Arrays

def test_plus_operator_with_arrays():
    x = StreamArray(dimension=2, dtype=int)
    y = StreamArray(dimension=2, dtype=int)
    z = x + y
    A = np.arange(6).reshape((3, 2))
    B = np.arange(100, 110).reshape((5, 2))
    x.extend(A)
    y.extend(B)
    run()
    assert np.array_equal(recent_values(z), np.array([
        [100, 102], [104, 106], [108, 110]]))

    C = np.arange(6, 12).reshape((3, 2))
    x.extend(C)
    run()
    assert np.array_equal(recent_values(z), np.array([
        [100, 102], [104, 106], [108, 110],
        [112, 114], [116, 118]]))

In the example, at the first execution of run(), stream-array x is extended by an array with 3 rows and 2 columns, and y is extended by an array with 5 rows and 2 columns. So, z becomes a stream array with 2 columns, and 3 rows. Then, at the next execution of run(), x is extended by another array with 3 rows and 2 columns while y remains unchanged. Now, x has more rows to add to y, and so z becomes an array with 5 rows.

Example: Operation between a Stream and a Scalar

def test_fmap_with_stream_array():
    x = StreamArray(dimension=2, dtype=int)
    @fmap_e
    def g(v): return 2*v
    y = g(x)

    A = np.array([[1, 10], [2, 20], [3, 30]])
    x.extend(A)
    run()
    assert np.array_equal(
        recent_values(y),
        [[2, 20], [4, 40], [6, 60]])

    x.append(np.array([4, 40]))
    run()
    assert np.array_equal(
        recent_values(y),
        [[2, 20], [4, 40], [6, 60], [8, 80]])