This section describes stream programs for filters. We begin with FIR and IIR (Finite and Infinite Impulse Response) bandpass filters. We first describe the code using windowing agents (map_window and merge_window or their decorators @map_w and @merge_w). These examples illustrate the use of stream arrays and NumPy. They also show how IoTPy allows you to reuse terminating functions from popular libraries to build non-terminating functions on endless streams.
Later, to illustrate the use of classes to store historical data, we give a program that uses the map_element agent (or the @map_e decorator). Window-based code is shorter and more elegant than map_element code, which reads only one element of an input stream at a time. The map_element version is provided merely as an illustration.
The code for this section was written by Deepak Narayanan and Mani Chandy, Caltech.
BANDPASS FIR FILTER OF STREAMING DATA
This code is in IoTPy/examples/signal_processing_examples/dsp_filters.py.
Let y be the output stream and x the input stream. The formula for a Finite Impulse Response (FIR) filter is found in https://en.wikipedia.org/wiki/Finite_impulse_response and is:
y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[M]*x[n-M]
where b is the vector of filter coefficients, of length M+1. So y[n] is the dot product of the window x[n-M], ..., x[n] with the reverse of b.
    def bandpass_FIR(in_stream, b):
        return dot_product(in_stream, reverse(b))
dot_product returns a stream whose elements are the dot products of its second argument, here reverse(b), with successive sliding windows of in_stream. The code for dot_product is:
    def dot_product(in_stream, vector):
        @fmap_w
        def f(window, vector):
            return np.dot(window, vector)
        return f(in_stream, window_size=len(vector), step_size=1, vector=vector)
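To see why b is reversed, the sliding-window dot product can be checked against NumPy's convolution on a finite array. This is a minimal sketch in plain NumPy rather than IoTPy; the function name fir_by_windows and the sample values are made up for illustration.

```python
import numpy as np

def fir_by_windows(x, b):
    """Apply an FIR filter to a finite array using sliding windows.

    Hypothetical helper for illustration; it is not part of IoTPy.
    """
    x = np.asarray(x, dtype=float)
    b_reversed = np.asarray(b, dtype=float)[::-1]
    size = len(b_reversed)
    # Window i is x[i], ..., x[i + size - 1], oldest sample first, so
    # its dot product with reversed b is y[n] for n = i + size - 1.
    return np.array([np.dot(x[i:i + size], b_reversed)
                     for i in range(len(x) - size + 1)])

x = np.array([1.0, 2.0, 4.0, 8.0, 16.0])
b = np.array([0.5, 0.3, 0.2])
y = fir_by_windows(x, b)
# mode="valid" keeps only the outputs computed from full windows.
reference = np.convolve(x, b, mode="valid")
```

The two results agree because a window stores its oldest sample first, while the FIR formula pairs b[0] with the newest sample.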
BANDPASS IIR FILTER OF STREAMING DATA
This code is also in IoTPy/examples/signal_processing_examples/dsp_filters.py.
The formula for an Infinite Impulse Response (IIR) filter is found in https://en.wikipedia.org/wiki/Infinite_impulse_response and is:
y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[P]*x[n-P] - (a[1]*y[n-1] + a[2]*y[n-2] + ... + a[Q]*y[n-Q])
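We assume that a and b have the same length, M. Function f in the code below returns y[n], where x_window holds the M most recent inputs, ending with x[n], and y_window holds the M most recent outputs, ending with y[n-1]. Because a[0] does not appear in the formula, f uses only y_window[1:] and the reverse of a[1:].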
    def bandpass_IIR(in_stream, b, a):
        M = len(b)
        out_stream = Stream(initial_value=np.zeros(M))
        in_stream.extend(np.zeros(M))

        @merge_w
        def f(windows, b, a):
            x_window, y_window = windows
            return np.dot(b, x_window) - np.dot(a, y_window[1:])

        f(in_streams=[in_stream, out_stream], out_stream=out_stream,
          window_size=M, step_size=1, b=reverse(b), a=reverse(a[1:]))
        return out_stream
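The same window arithmetic can be traced on a finite array without streams. The sketch below is a plain NumPy stand-in, not the IoTPy implementation; the name iir_by_windows is hypothetical, and it assumes a[0] is 1 and that all history before the first sample is zero.

```python
import numpy as np

def iir_by_windows(x, b, a):
    """IIR difference equation on a finite array.

    Hypothetical helper for illustration. Assumes a[0] == 1 and
    zero initial conditions.
    """
    x = np.asarray(x, dtype=float)
    b_reversed = np.asarray(b, dtype=float)[::-1]
    a_reversed = np.asarray(a, dtype=float)[1:][::-1]
    M = len(b_reversed)
    assert len(a_reversed) == M - 1
    # Pad with M - 1 zeros so the first window is already full.
    x_padded = np.concatenate([np.zeros(M - 1), x])
    y_padded = np.zeros(M - 1 + len(x))
    for n in range(len(x)):
        x_window = x_padded[n:n + M]      # inputs up to and including x[n]
        y_window = y_padded[n:n + M - 1]  # outputs up to y[n-1]
        y_padded[n + M - 1] = (np.dot(b_reversed, x_window)
                               - np.dot(a_reversed, y_window))
    return y_padded[M - 1:]

# y[n] = x[n] + 0.5*y[n-1] applied to a unit impulse.
impulse = np.array([1.0, 0.0, 0.0, 0.0])
response = iir_by_windows(impulse, b=[1.0, 0.0], a=[1.0, -0.5])
```

With these coefficients each output is half the previous one, which makes the feedback term easy to verify by hand.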
EXAMPLES OF CLASSES THAT STORE HISTORICAL DATA
Next we give examples of code in which the historical stream data are saved as attributes of classes.
    class BP_IIR(object):
        """
        Bandpass IIR Filter

        Parameters
        ----------
        a, b: list of float
            Parameters that define an IIR filter. In this class,
            a weights the inputs x and b weights the past outputs y
            (the reverse of the naming in the IIR formula above).

        Attributes
        ----------
        x, y: array of float
            Local variables of IIR calculations.

        """
        def __init__(self, a, b):
            assert len(b) == len(a)
            self.b = np.array(b)
            self.a = np.array(a)
            self.N = len(a)
            self.x = np.zeros(self.N)
            self.y = np.zeros(self.N)

        def filter_sample(self, sample):
            """
            This is the standard IIR calculation.

            Parameters
            ----------
            sample: float or int
                The next element of the stream.

            """
            # Shift x and y to the right by 1
            self.x[1:] = self.x[:-1]
            self.y[1:] = self.y[:-1]
            # Update x[0] and y[0]
            self.x[0] = sample
            self.y[0] = self.a[0] * self.x[0]
            self.y[0] += sum(self.a[1:]*self.x[1:] - self.b[1:]*self.y[1:])
            return self.y[0]

        def filter_stream(self, in_stream, out_stream):
            """
            Filters the input stream to get the output stream
            using filter_sample().

            """
            map_element(self.filter_sample, in_stream, out_stream)
The filter_sample() code is a straightforward implementation of the IIR equation, applied each time a new sample arrives on the stream. The filter_stream() function uses map_element to apply filter_sample() to every element of its input stream, producing the filtered output stream.
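The idea of keeping history in attributes between calls can be tried without IoTPy. In this sketch Python's built-in map stands in for map_element, the class name SampleIIR is hypothetical, and the coefficients follow the naming of the IIR formula (b weights inputs, a weights past outputs, a[0] assumed to be 1).

```python
import numpy as np

class SampleIIR:
    """Per-sample IIR filter that keeps its history as attributes.

    Hypothetical class for illustration. b weights inputs, a weights
    past outputs, and a[0] is assumed to be 1.
    """
    def __init__(self, b, a):
        assert len(b) == len(a)
        self.b = np.asarray(b, dtype=float)
        self.a = np.asarray(a, dtype=float)
        self.x = np.zeros(len(b))  # x[0] is the newest input
        self.y = np.zeros(len(a))  # y[0] is the newest output

    def filter_sample(self, sample):
        # Shift history right by one; np.roll returns a new array.
        self.x = np.roll(self.x, 1)
        self.y = np.roll(self.y, 1)
        self.x[0] = sample
        self.y[0] = (np.dot(self.b, self.x)
                     - np.dot(self.a[1:], self.y[1:]))
        return float(self.y[0])

# y[n] = x[n] + y[n-1] is a running sum.
running_sum = SampleIIR(b=[1.0, 0.0], a=[1.0, -1.0])
outputs = list(map(running_sum.filter_sample, [1, 2, 3, 4]))
```

Each call sees the state left by the previous call, which is what lets a one-element-at-a-time agent compute a filter that depends on past samples.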
Next, we define a function whose parameters include the low and high cutoff frequencies and the order of the bandpass filter, and which filters an input stream to produce an output stream. It calls butter_bandpass to get the filter parameters b, a. Note that scipy.signal has no function of that name; butter_bandpass is assumed here to be a helper built on scipy.signal.butter.
    def bandpass_filter_stream(in_stream, out_stream, lowcut, highcut, fs, order):
        """
        Parameters
        ----------
        in_stream, out_stream: Stream
            The input and output streams of the agent
        lowcut, highcut: int or float
            The lower and upper frequencies of the bandpass filter.
        fs: int or float
            The sample rate in samples per second.
        order: int, positive
            The order of the filter.

        """
        # butter_bandpass is a helper built on scipy.signal.butter
        b, a = butter_bandpass(lowcut, highcut, fs, order)
        bp = BP_IIR(b, a)
        bp.filter_stream(in_stream, out_stream)
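For readers who want to run the function above, one plausible form of the butter_bandpass helper is sketched here. It is an assumption based on the common Butterworth design pattern with scipy.signal.butter; the helper in dsp_filters.py may differ, and the cutoff values are made up for illustration.

```python
from scipy.signal import butter

def butter_bandpass(lowcut, highcut, fs, order):
    """Return (b, a) for a Butterworth bandpass filter.

    Assumed implementation for illustration; the helper in
    dsp_filters.py may differ.
    """
    nyquist = 0.5 * fs
    # butter expects cutoffs as fractions of the Nyquist frequency
    # when no fs argument is given.
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype="band")
    return b, a

b, a = butter_bandpass(lowcut=1.0, highcut=5.0, fs=50.0, order=2)
```

A bandpass design of order N yields 2N + 1 coefficients in each of b and a, so the equal-length check in BP_IIR holds.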