This section describes stream programs for filters. We begin with FIR and IIR (Finite and Infinite Impulse Response) bandpass filters. We first describe the code using windowing agents (*map_window* and *merge_window *or their decorators @*map_w *and @*merge_w*). These examples illustrate the use of stream arrays and NumPy. They also show how IoTPy allows you to reuse terminating functions from popular libraries to build non-terminating functions on endless streams.

Later, to illustrate the use of classes to store historical data, we give the program using the *map_element* agent or the @*map_e *decorator. Using windows gives shorter, more elegant code than code that uses *map_element* and which reads only one element of an input stream at a time. The code using *map_element* is provided merely as an illustration.

The code for this section was written by Deepak Narayanan and Mani Chandy, Caltech.

**BANDPASS FIR FILTER OF STREAMING DATA**

You can download this code by clicking on IoTPy/examples/signal_processing_examples/dsp_filters.py*. *

Let *y* be the output stream and *x* the input stream. The formula for a Finite Impulse Response (FIR) filter is found in https://en.wikipedia.org/wiki/Finite_impulse_response and is:

`y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[M]*x[n-M]`

where *b* is a parameter of the filter and its length is *M*+1. So, *y*[*n*] is the dot product of a window into *x* with the reverse of *b*.

```
def bandpass_FIR(in_stream, b):
return dot_product(in_stream, reverse(b))
```

*dot_product *returns the dot product of the second argument, *reverse*(*b*)*, *on sliding windows of *in_stream. *The code for *dot_product *is:

def dot_product(in_stream, vector): @fmap_w def f(window, vector): return np.dot(window, vector) return f(in_stream, window_size=len(vector), step_size=1, vector=vector)

**BANDPASS IIR FILTER OF STREAMING DATA**

You can download this code by clicking on: IoTPy/examples/signal_processing_examples/dsp_filters.py*. *

The formula for an Infinite Impulse Response (IIR) filter is found in https://en.wikipedia.org/wiki/Infinite_impulse_response and is:

```
y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[P]*x[n-P] -
(a[1]*y[n-1] + a[2]*y[n-2] + ... + a[Q]*y[n-Q]
```

We assume that the lengths of *a* and *b* are equal, and we call the length *M*. Function f in the code below returns *y*[*n*] where *x_window* is *x*[*n-M*, …, *n*] and *y_window* is *y*[*n-M*, …, *n*].

def bandpass_IIR(in_stream, b, a): M = len(b) out_stream = Stream(initial_value = np.zeros(M)) in_stream.extend(np.zeros(M)) @merge_w def f(windows, b, a): x_window, y_window = windows return np.dot(b, x_window) - np.dot(a, y_window[1:]) f(in_streams=[in_stream, out_stream], out_stream=out_stream, window_size=M, step_size=1, b=reverse(b), a=reverse(a[1:])) return out_stream

# EXAMPLES OF CLASSES THAT STORE HISTORICAL DATA

Next we give examples of code in which the historical stream data are saved as attributes of classes.

```
class BP_IIR(object):
"""
Bandpass IIR Filter
Parameters
----------
a, b: list of float
Parameters that define an IIR filter
Attributes
----------
x, y: array of float
Local variables of IIR calculations.
"""
def __init__(self, a, b):
assert len(b) == len(a)
self.b = np.array(b)
self.a = np.array(a)
self.N = len(a)
self.x = np.zeros(self.N)
self.y = np.zeros(self.N)
def filter_sample(self, sample):
"""
This is the standard IIR calculation.
Parameters
----------
sample: float or int
The next element of the stream.
"""
# Shift x and y to the right by 1
self.x[1:] = self.x[:- 1]
self.y[1:] = self.y[:-1]
# Update x[0] and y[0]
self.x[0] = sample
self.y[0] = self.a[0] * self.x[0]
self.y[0] += sum(self.a[1:]*self.x[1:] - self.b[1:]*self.y[1:])
return self.y[0]
def filter_stream(self, in_stream, out_stream):
"""
Filters the input stream to get the output stream
using filter_sample().
"""
map_element(self.filter_sample, in_stream, out_stream)
```

The *filter_sample*() code is a straightforward implementation of the equation for the operation of a bandpass filter when a new sample arrives on the stream. The *filter_stream*() function uses the code to obtain a function that filters its input stream to produce a filtered output stream.

Next, we define a function with parameters that include the low cutoff and high cutoff frequencies and the order of the bandpass filter, and which filters an input stream to produce an output stream. This function uses the *butter_bandpass* function from *scipy*.*signal* to get the parameters* b*, *a* used in the filter.

```
def bandpass_filter_stream(in_stream, out_stream, lowcut, highcut, fs, order):
"""
Parameters
----------
in_stream, out_stream: Stream
The input and output streams of the agent
low_cut, highcut: int or float
The lower and upper frequencies of the bandpass filter.
fs: int or float
The sample rate in number per second.
order: int, positive
The order of the filter.
"""
# butter_bandpass is imported from scipy.sigal
b, a = butter_bandpass(lowcut, highcut, fs, order)
bp = BP_IIR(b, a)
bp.filter_stream(in_stream, out_stream)
```