SIMPLE EXAMPLES

This page has a few simple examples. These programs consist of a single thread in a single process. Some of the examples are used in applications such as detecting gunshots and detecting extreme shaking in earthquakes. These examples show how Python and NumPy functions, from standard libraries. are decorated to get functions that create persistent agents that operate on streams that may be unending.

Example: Evaluate Polynomial on elements of a stream

We will create an agent, evaluate_polynomial, with a single input stream x and a single output stream y, and a parameter, polynomial, where the elements of both streams are floats, and y[j] = f(x[j]) where f is a polynomial specified by a list of its parameters. For example, if the polynomial is specified as [2.0, 4.0, 3.0] then f(2.0) = 2.0*2.0**2 + 4.0*2**1 + 3.0*2**0.

@map_e
def evaluate_polynomial(number, polynomial):
    return np.polyval(polynomial, number)

The function np.polyval(polynomial, number) returns the polynomial evaluated at number.

The undecorated function, evaluate_polynomial, has two parameters: number and polynomial where number is a float and polynomial is a list. The decorated function has three parameters, in_stream, out_stream, and polynomial. Each element of in_stream is passed to the undecorated function, and the result returned by the function is appended to out_stream. The parameter, polynomial, is a keyword argument of both the undecorated and decorated function.

We can test the agent (the decorated function) using the following function.

def test_evaluate_polynomial():
    # Declare streams
    x = StreamArray('x')
    y = StreamArray('y')
    # Create agent
    evaluate_polynomial(x, y, polynomial=[1, 0, 1])
    # Put data in the input stream and run
    x.extend(np.array([1.0, 4.0, 3.0, 0.0]))
    run()
    # Check the output stream.
    assert np.array_equal(
        recent_values(y), np.array([2.0, 17.0, 10.0, 1.0]))

The undecorated function, evaluate_polynomial, has two parameters: number and polynomial where number is a float and polynomial is a list. The decorated function has three parameters, in_stream, out_stream, and polynomial. Each element of in_stream is passed to the undecorated function, and the result returned by the function is appended to out_stream. The parameter, polynomial, is a keyword argument of both the undecorated and decorated function.

Example: Subtract the mean in a stream

@map_w
def subtract_mean(window):
    return window[-1] - np.mean(window)

The function subtract_mean has a single argument, window, which is an array or a list. It returns a single float which is the last element of the window minus its mean. When the function is decorated it becomes an agent which is created with the call:

subtract_mean(in_stream, out_stream, window_size, step_size)

The window parameter of the undecorated subtract_mean is a window into the stream, in_stream, of the decorated subtract_mean. The result returned by the undecorated subtract_mean is an element appended to the stream, out_stream, of the decorated subtract_mean. The window size and step size are required parameters for a sliding-window agent.

We test the agent with the following program:

def test_subtract_mean():
    x = StreamArray('x')
    y = StreamArray('y')
    subtract_mean(x, y, window_size=3, step_size=1)
    x.extend(np.array([1.0, 2.0, 0.0, 4.0, 2.0,
                       0.0, 1.0, 2.0, 0.0]))
    run()
    assert (np.array_equal(
        recent_values(y),
        np.array([-1.0,  2.0,  0.0, -2.0,  0.0,  1.0, -1.0])))

In the test, we extended stream array x with the sequence:

[1.0, 2.0, 0.0, 4.0, 2.0, 0.0, 1.0, 2.0, 0.0]

The windows of size 3, with a step size of 1 into this stream are:

[1.0, 2.0, 0.0], [2.0, 0.0, 4.0], [0.0, 4.0, 2.0], ….

and the means of these windows are:

[1.0, 2.0, 2.0, …]

Subtracting the mean of each window from the last element of each window gives the stream:

[-1.0, 2.0, 0.0, ….]

which is the value of the stream y as checked in the assert statement.

If the window is large, subtracting the mean gives the deviation from the long-term average. For example, if the stream contains acceleration measurements from an accelerometer, then subtracting the mean removes gravity and other long-term effects.

Setting initial values of a stream array

Compare the lengths of the input and output stream in the previous example on subtracting the mean. The first element of the output stream corresponds to the first window of the input stream, and so the output stream is shorter than the input stream by the length of the window - 1. We can make the output stream the same length as the input stream by specifying initial values of the output stream, y, as in the following statement.

y = StreamArray(name='y', initial_value=np.zeros(window_size-1))

The lengths of x and y are now equal as we see:

x = [1.0, 2.0, 0.0, 4.0, 2.0, 0.0, 1.0, 2.0, 0.0]

and

y = [0.0, 0.0, -1.0, 2.0, 0.0, -2.0, 0.0, 1.0, -1.0]

incremental computation of streams

Successive windows overlap, and so summing each successive window is less efficient than modifying the sum of the previous window to get the sum of the current window. Incremental computation is efficient only when the window sizes are large. We will, however, use the example given earlier, in which the stream size is only 3 because calculations with small sizes are easier.

Consider the sequence of windows:

[1.0, 2.0, 0.0], [2.0, 0.0, 4.0], [0.0, 4.0, 2.0]

We have to compute the sum of the very first window directly (i.e. not incrementally) because it has no previous window. We can compute the sums of succeeding windows incrementally. Comparing the first two windows, we see that the earliest element (1.0) of the first window drops out, and the last element of the next window (4.0) enters as the window slides forward. So, the sum of the current window can be obtained from that of the previous window by subtracting the element that drops out and adding the element that enters the window. This computation is carried out in the following class.

class Incremental(object):
    def __init__(self):
        self.starting = True
        self.total = 0.0
        self.leaving_window = 0.0
    def subtract_mean(self, window):
        if self.starting:
            self.starting = False
            self.total = np.sum(window)
            self.leaving_window = window[0]
        else:
            self.total += window[-1] - self.leaving_window
            self.leaving_window = window[0]
        return window[-1] - self.total/float(len(window))

We can test this program by first creating an object of the class and then using a decorator to create an agent.

x = StreamArray('x')
    y = StreamArray('y')
    incremental = Incremental()
    @map_w
    def incremental_subtract_mean(window):
        return incremental.subtract_mean(window)
    
    incremental_subtract_mean(x, y, window_size=3, step_size=1)
    x.extend(np.array([1.0, 2.0, 0.0, 4.0, 2.0,
                       0.0, 1.0, 2.0, 0.0]))
    run()
    assert (np.array_equal(
        recent_values(y),
        np.array([-1.0,  2.0,  0.0, -2.0,  0.0,  1.0, -1.0])))

Computing the magnitudes of vectors of streams

An accelerometer may produce streams with measurements of acceleration in x, y, z direction and we may want to compute a stream with the overall magnitudes of the accelerations. The following program does so.

@merge_e 
def magnitude_of_vector(coordinates):
    return np.linalg.norm(coordinates)

The function np.linalg.norm returns the magnitude of the vector with the specified coordinates. The function has a single parameter which is a list or an array of floats, and it returns a single float. The decorated function has two parameters, in_streams and out_stream which are a list of streams and a single stream, respectively. The argument, coordinates, of the undecorated function corresponds to the list of the j-th elements of each stream in in_streams; the value returned corresponds to the j-th element of out_stream.

We can test the function with the following program.

def test_magnitude_of_vector():
    # Declare streams
    x = StreamArray('x')
    y = StreamArray('y')
    z = StreamArray('z')
    magnitude_stream = StreamArray('magnitudes')
    # Declare agent
    magnitude_of_vector([x,y,z], magnitude_stream)
    # Put data in streams and run
    x.extend(np.array([1.0, 4.0, 3.0]))
    y.extend(np.array([2.0, 4.0, 0.0]))
    z.extend(np.array([2.0, 2.0, 4.0]))
    run()
    assert (np.array_equal(recent_values(magnitude_stream),
                           np.array([3.0, 6.0, 5.0])))

Times of Crossing thresholds

Let us create an agent that determines the times at which an input stream crosses a given threshold. For an input stream x we want an agent to generate an output stream y whose elements are integers and are the indexes, j, at which:

|x[j]| > threshold

For example, if an input stream x contains [1.0, 2.0, 4.0, 3.0, 1.5, 5.0, 10.0, -2.0, -3.0], and the threshold is 2.5, then the output stream y should be [2 , 3 , 5, 6, 8]. The following function creates the agent with this specification.

@map_e
def time_of_crossing_threshold_f(value, state, threshold):
    signal = state if abs(value) > threshold else _no_value
    state += 1
    return (signal, state)

state is the number of elements of the input stream processed so far. Initially state is 0. To call the decorated function we have to pass the initial state of 0 as in:

time_of_crossing_threshold_f(
    in_stream, out_stream, state=0, threshold=threshold)

We can hide the state as shown in the following call.

def time_of_crossing_threshold(in_stream, out_stream, threshold):
    time_of_crossing_threshold_f(
        in_stream, out_stream, state=0, threshold=threshold)

We can test the function with the following program.

def test_time_of_crossing_threshold():
    # Declare streams
    x = StreamArray('x')
    y = StreamArray('y', dtype='int')
    # Declare agent
    time_of_crossing_threshold(x, y, threshold=2.5)
    # Put data in streams and run
    x.extend(np.array([1.0, 2.0, 4.0, 3.0, 1.5, 5.0, 10.0, -2.0, -3.0]))
    run()
    assert np.array_equal(recent_values(y),
                          np.array([2, 3, 5, 6, 8]))

The undecorated function has three parameters, value, state, and threshold. The call to the decorated function has parameters in_stream, out_stream, threshold and state, and since this call has an explicit keyword parameter, state, the undecorated function represents a transition with state. And so, the undecorated function returns both the element that is appended to the output and the next state.

On the j-th step of the agent, the j-th value of the input stream of the agent is passed to the undecorated function, as the parameter value, and the state at the end of the (j-1)th call is passed as the state. The value returned is the j-th element of the output stream of the agent. The keyword parameter, threshold, in the call to the decorated function is the same as that in the undecorated function.

Throttling Stream Rates: Quenching

You may want to reduce the stream rate in some applications. Next we discuss one of the many ways of throttling stream rates. Threat-detection alarms may go off repeatedly in applications that detect threats. Continuous alarms may not add value to the application. For example, a sensor in a fire-detection application may send signals that it detects smoke many times per minute. In the Community Seismic Network, streams of data from sensors are “quenched”; after an alarm is sent, succeeding alarms are discarded for a quench period. For example, if alarms are generated at times [2, 3, 5, 6, 8], and the quench period is 2 then the alarm at 3 will be deleted because it is within the quench period of the alarm at 2; likewise the alarm at 5 is deleted, and the resulting quenched alarms are [2, 5, 8]. A function for quenching is given next. This function is a variant of time_of_crossing_threshold_f because it quenches the reported times.

@map_e
def quenched_time_of_crossing_threshold_f(
        value, state, threshold, quench_duration):
    current_time, last_quench_time = state
    if ((current_time < last_quench_time + quench_duration) or
        (abs(value) <= threshold)):
        signal = _no_value
    else:
        signal = current_time
        last_quench_time = current_time
    next_state = (current_time+1, last_quench_time)
    return signal, next_state

In this function, threshold and quench_duration are constants. The state is a pair of times (current_time, last_quench_time) where current_time is the number of times that the function has been called, and last_quench_time is the time at which the most recent quench was started. Signals within a quench period are ignored because the function returns _no_value.

You can create an agent that detects times at which a stream crosses thresholds and then quenches reported detections by calling:

quenched_time_of_crossing_threshold_f(
     in_stream, out_stream, state=(0,0),
     threshold=threshold, quench_duration=quench_duration )

The value parameter in the undecorated function is an element of in_stream and the result of the function is an element of out_stream. The other keyword parameters are passed in the usual way. You can specify a function that does not specifying the state exactly as in the previous example:

def test_quenched_time_of_crossing_threshold():
    # Declare streams
    x = StreamArray('x')
    y = StreamArray('y', dtype='int')
    # Declare agent
    quenched_time_of_crossing_threshold(x, y, threshold=2.5, quench_duration=2)
    # Put data in streams and run
    x.extend(np.array([1.0, 2.0, 4.0, 3.0, 1.5, 5.0, 10.0, -2.0, -3.0]))
    run()
    assert np.array_equal(recent_values(y),
                          np.array([2, 5, 8]))

Here is an example of testing this function

def test_quenched_time_of_crossing_threshold():
    # Declare streams
    x = StreamArray('x')
    y = StreamArray('y', dtype='int')
    # Declare agent
    quenched_time_of_crossing_threshold(x, y, threshold=2.5, quench_duration=2)
    # Put data in streams and run
    x.extend(np.array([1.0, 2.0, 4.0, 3.0, 1.5, 5.0, 10.0, -2.0, -3.0]))
    run()
    assert np.array_equal(recent_values(y),
                          np.array([2, 5, 8]))