This section describes stream programs for filters. We begin with FIR and IIR (Finite and Infinite Impulse Response) bandpass filters. We first describe the code using windowing agents (*map_window* and *merge_window*). These examples illustrate the use of stream arrays and NumPy.

Later, to illustrate the use of classes to store historical data, we give the program using the *map_element* agent. Using windows gives shorter, more elegant code than code that uses *map_element* and which reads only one element of an input stream at a time. The code using *map_element* is provided merely as an illustration.

The code for this section was written by Deepak Narayanan, IIT Gandhinagar, and Mani Chandy, Caltech.

**BANDPASS FIR FILTER OF STREAMING DATA**

You can download this code by clicking on IoTPy/examples/signal_processing_examples/dsp_filters.py*. *

Let *y* be the output stream and *x* the input stream. The formula for a Finite Impulse Response (FIR) filter is found in https://en.wikipedia.org/wiki/Finite_impulse_response and is:

`y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[M]*x[n-M]`

where *b* is a parameter of the filter and its length is *M*+1. So, *y*[*n*] is the dot product of a window into *x* with the reverse of *b*.

```
def bandpass_FIR(in_stream, out_stream, b):
reverse_b = reverse_array(b)
window_dot_product(
in_stream, out_stream, multiplicand_vector=reverse_b)
```

*out_stream* will be a filtered version of *in_stream* based on FIR parameter *b*.

**BANDPASS IIR FILTER OF STREAMING DATA**

You can download this code by clicking on: IoTPy/examples/signal_processing_examples/dsp_filters.py*. *

The formula for an Infinite Impulse Response (IIR) filter is found in https://en.wikipedia.org/wiki/Infinite_impulse_response and is:

```
y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[P]*x[n-P] -
(a[1]*y[n-1] + a[2]*y[n-2] + ... + a[Q]*y[n-Q]
```

assuming that *a*[0] is 1, and where *b* and *a* are parameters of the filter, and *P* and *Q* are their respective lengths. For simplicity, we set *M* to the larger of *P* and *Q*, and we pad *a* or *b* with zeros, as necessary so that both vectors are of length *M*.

We implement this equation by feeding the output back to the input. The agent is *merge_window* with two input streams, the *in_stream, x, *and the output stream, *y*. The function to compute *y*[*n*] is the dot product of the reverse of *b* and the *x*-window minus the dot product of the reverse of *a* and the *y*-window.

**Bandpass IIR Filter with feedback of the output stream**

We initialize both the input and output streams with *M* zeros so that the merge_window agent starts with two windows of zero values.

```
def bandpass_IIR(in_stream, out_stream, b, a):
def f(windows, b, a):
x_window, y_window = windows
return np.dot(b, x_window) - np.dot(a, y_window[1:])
reverse_b = reverse_array(b)
reverse_a = reverse_array(a[1:])
M = len(b)
out_stream.extend(np.zeros(M))
in_stream.extend(np.zeros(M))
# Create the agent.
merge_window(
func=f, in_streams=[in_stream, out_stream],
out_stream=out_stream,
window_size=M, step_size=1, b=reverse_b, a=reverse_a)
```

# EXAMPLES OF CLASSES THAT STORE HISTORICAL DATA

Next we give examples of code in which the historical stream data are saved as attributes of classes. *These are merely presented as illustrations of programs that use classes to store historical data from streams.* The window operations are simpler.

```
class BP_IIR(object):
"""
Bandpass IIR Filter
Parameters
----------
a, b: list of float
Parameters that define an IIR filter
Attributes
----------
x, y: array of float
Local variables of IIR calculations.
"""
def __init__(self, a, b):
assert len(b) == len(a)
self.b = np.array(b)
self.a = np.array(a)
self.N = len(a)
self.x = np.zeros(self.N)
self.y = np.zeros(self.N)
def filter_sample(self, sample):
"""
This is the standard IIR calculation.
Parameters
----------
sample: float or int
The next element of the stream.
"""
# Shift x and y to the right by 1
self.x[1:] = self.x[:- 1]
self.y[1:] = self.y[:-1]
# Update x[0] and y[0]
self.x[0] = sample
self.y[0] = self.a[0] * self.x[0]
self.y[0] += sum(self.a[1:]*self.x[1:] - self.b[1:]*self.y[1:])
return self.y[0]
def filter_stream(self, in_stream, out_stream):
"""
Filters the input stream to get the output stream
using filter_sample().
"""
map_element(self.filter_sample, in_stream, out_stream)
```

The *filter_sample*() code is a straightforward implementation of the equation for the operation of a bandpass filter when a new sample arrives on the stream. The *filter_stream*() function uses the code to obtain a function that filters its input stream to produce a filtered output stream.

Next, we define a function with parameters that include the low cutoff and high cutoff frequencies and the order of the bandpass filter, and which filters an input stream to produce an output stream. This function uses the *butter_bandpass* function from *scipy*.*signal* to get the parameters* b*, *a* used in the filter.

```
def bandpass_filter_stream(in_stream, out_stream, lowcut, highcut, fs, order):
"""
Parameters
----------
in_stream, out_stream: Stream
The input and output streams of the agent
low_cut, highcut: int or float
The lower and upper frequencies of the bandpass filter.
fs: int or float
The sample rate in number per second.
order: int, positive
The order of the filter.
"""
# butter_bandpass is imported from scipy.sigal
b, a = butter_bandpass(lowcut, highcut, fs, order)
bp = BP_IIR(b, a)
bp.filter_stream(in_stream, out_stream)
```