This section describes stream programs for filters. We begin with FIR and IIR (Finite and Infinite Impulse Response) bandpass filters. We first describe the code using windowing agents (map_window and merge_window). These examples illustrate the use of stream arrays and NumPy.

Later, to illustrate the use of classes to store historical data, we give the program using the map_element agent. Using windows gives shorter, more elegant code than code that uses map_element and which reads only one element of an input stream at a time. The code using map_element is provided merely as an illustration.

The code for this section was written by Deepak Narayanan, IIT Gandhinagar, and Mani Chandy, Caltech.

BANDPASS FIR FILTER OF STREAMING DATA

You can download this code by clicking on IoTPy/examples/signal_processing_examples/dsp_filters.py.

Let y be the output stream and x the input stream. The formula for a Finite Impulse Response (FIR) filter is found in https://en.wikipedia.org/wiki/Finite_impulse_response and is:

y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[M]*x[n-M]

where b is a parameter of the filter and its length is M+1. So, y[n] is the dot product of a window into x with the reverse of b.

def bandpass_FIR(in_stream, out_stream, b):
   reverse_b = reverse_array(b)
   window_dot_product(
      in_stream, out_stream, multiplicand_vector=reverse_b)

out_stream will be a filtered version of in_stream based on FIR parameter b.

BANDPASS IIR FILTER OF STREAMING DATA

You can download this code by clicking on: IoTPy/examples/signal_processing_examples/dsp_filters.py.

The formula for an Infinite Impulse Response (IIR) filter is found in https://en.wikipedia.org/wiki/Infinite_impulse_response and is:

y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[P]*x[n-P] -
   (a[1]*y[n-1] + a[2]*y[n-2] + ... + a[Q]*y[n-Q]

assuming that a[0] is 1, and where b and a are parameters of the filter, and P and Q are their respective lengths. For simplicity, we set M to the larger of P and Q, and we pad a or b with zeros, as necessary so that both vectors are of length M.

We implement this equation by feeding the output back to the input. The agent is merge_window with two input streams, the in_stream, x, and the output stream, y. The function to compute y[n] is the dot product of the reverse of b and the x-window minus the dot product of the reverse of a and the y-window.

Bandpass IIR Filter with feedback of the output stream

We initialize both the input and output streams with M zeros so that the merge_window agent starts with two windows of zero values.

def bandpass_IIR(in_stream, out_stream, b, a):
    def f(windows, b, a):
        x_window, y_window = windows
        return np.dot(b, x_window) - np.dot(a, y_window[1:])
    reverse_b = reverse_array(b)
    reverse_a = reverse_array(a[1:])
    M = len(b)
    out_stream.extend(np.zeros(M))
    in_stream.extend(np.zeros(M))
    # Create the agent.
    merge_window(
        func=f, in_streams=[in_stream, out_stream],
        out_stream=out_stream,
        window_size=M, step_size=1, b=reverse_b, a=reverse_a)

EXAMPLES OF CLASSES THAT STORE HISTORICAL DATA

Next we give examples of code in which the historical stream data are saved as attributes of classes. These are merely presented as illustrations of programs that use classes to store historical data from streams. The window operations are simpler.

class BP_IIR(object):
    """
    Bandpass IIR Filter

    Parameters
    ----------
    a, b: list of float
      Parameters that define an IIR filter

    Attributes
    ----------
    x, y: array of float
      Local variables of IIR calculations.

    """
    def __init__(self, a, b):
        assert len(b) == len(a)
        self.b = np.array(b)
        self.a = np.array(a)
        self.N = len(a)
        self.x = np.zeros(self.N)
        self.y = np.zeros(self.N)

    def filter_sample(self, sample):
        """
        This is the standard IIR calculation.
        Parameters
        ----------
        sample: float or int
          The next element of the stream.
        """
        # Shift x and y to the right by 1
        self.x[1:] = self.x[:- 1]
        self.y[1:] = self.y[:-1]
        # Update x[0] and y[0]
        self.x[0] = sample
        self.y[0] = self.a[0] * self.x[0]
        self.y[0] += sum(self.a[1:]*self.x[1:] - self.b[1:]*self.y[1:])
        return self.y[0]

    def filter_stream(self, in_stream, out_stream):
        """
        Filters the input stream to get the output stream
        using filter_sample().

        """
        map_element(self.filter_sample, in_stream, out_stream)

The filter_sample() code is a straightforward implementation of the equation for the operation of a bandpass filter when a new sample arrives on the stream. The filter_stream() function uses the code to obtain a function that filters its input stream to produce a filtered output stream.

Next, we define a function with parameters that include the low cutoff and high cutoff frequencies and the order of the bandpass filter, and which filters an input stream to produce an output stream. This function uses the butter_bandpass function from scipy.signal to get the parameters b, a used in the filter.

def bandpass_filter_stream(in_stream, out_stream, lowcut, highcut, fs, order):
    """
    Parameters
    ----------
    in_stream, out_stream: Stream
       The input and output streams of the agent
    low_cut, highcut: int or float
       The lower and upper frequencies of the bandpass filter.
    fs: int or float
       The sample rate in number per second.
    order: int, positive
        The order of the filter.

    """
    # butter_bandpass is imported from scipy.sigal
    b, a = butter_bandpass(lowcut, highcut, fs, order)
    bp = BP_IIR(b, a)
    bp.filter_stream(in_stream, out_stream)