The code for this example is in IoTPy/examples/gunshots/. This is a simple example of a distributed system. Though the data for this example was obtained in an experiment to detect gunshots, the program developed in this example can be used to detect unusual shaking due to an earthquake.
The system has a collection of sensors distributed over a region, with a small single board computer, such as a Raspberry Pi, connected to each sensor. These sensor-processor pairs are said to be at the “edge” of the sensor network. Each edge process detects anomalies in the stream generated by the sensor to which it is connected. In this example, the sensors are accelerometers and the anomalies are high accelerations. Each edge process sends information about the anomalies that it detects to processes that aggregate information about anomalies from all edge processes. The aggregators detect global anomalies — in this case multiple sensors recording anomalously high accelerations at about the same time. The aggregator processes may be provided by a cloud service such as Amazon EC2.
An edge process may receive data from sensors at a high sample rate, say 250 samples per second. An edge process usually detects anomalies rarely, perhaps as rarely as once per day. So the rate of messages from the edge processes to the aggregation processes is usually low even when there are thousands of sensors.
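To make the difference in rates concrete, here is a back-of-the-envelope calculation. The sample rate comes from the text; the count of 1000 sensors and the rate of one anomaly per sensor per day are illustrative assumptions, not measurements from the experiment.

```python
# Sample rate quoted in the text.
SAMPLES_PER_SECOND = 250
# Illustrative assumptions (not from the experiment).
NUM_SENSORS = 1000
ANOMALIES_PER_SENSOR_PER_DAY = 1

SECONDS_PER_DAY = 24 * 60 * 60

# Samples produced by all sensors in one day.
raw_samples_per_day = SAMPLES_PER_SECOND * SECONDS_PER_DAY * NUM_SENSORS
# Anomaly messages sent by all edge processes in one day.
anomaly_messages_per_day = ANOMALIES_PER_SENSOR_PER_DAY * NUM_SENSORS

print("raw samples per day:", raw_samples_per_day)
print("anomaly messages per day:", anomaly_messages_per_day)
print("ratio:", raw_samples_per_day // anomaly_messages_per_day)
```

Under these assumptions each anomaly message stands in for millions of raw samples, which is why detection at the edge keeps traffic to the aggregators low.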
In applications such as earthquake monitoring we want to store all the information collected by sensors in addition to collecting information about anomalies. The data from many sensors collected over months can provide information about soil and building structures. Sending 250 samples per second from each of thousands of sensors to a storage device is expensive; besides, real-time data is needed primarily in anomalous situations. So edge processes store all the information locally for some period of time, such as a minute or an hour, and then send a single large message for each time period containing all the information for the period.
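The store-then-send idea can be sketched in a few lines of plain Python. The class PeriodBuffer and its send callback are hypothetical names used only for illustration; they are not part of IoTPy, and a real edge process would measure periods in time rather than in a fixed number of samples.

```python
class PeriodBuffer(object):
    """Collects samples and emits one batch per period (hypothetical helper)."""

    def __init__(self, samples_per_period, send):
        self.samples_per_period = samples_per_period
        self.send = send      # callback that receives one batch (a list)
        self.buffer = []

    def add(self, sample):
        self.buffer.append(sample)
        # When a full period has been collected, send it as one message.
        if len(self.buffer) == self.samples_per_period:
            self.send(list(self.buffer))
            self.buffer = []


sent_messages = []
period_buffer = PeriodBuffer(samples_per_period=5, send=sent_messages.append)
for sample in range(12):
    period_buffer.add(sample)

print(sent_messages)         # two complete batches of five samples
print(period_buffer.buffer)  # samples still waiting for the period to fill
```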
In this simple example, the data from a tri-axial sensor is stored in three files identified by e, n and z for the East, North and Vertical directions. This example has data from three sensors called S1, S2, and S3, and the files are S1.e.txt, S1.n.txt, S1.z.txt, …, S3.z.txt. The data is synchronized, so the n-th items generated by all sensors were generated at the same time. We had to carry out some preprocessing to synchronize the data for this simple example. Later we will look at more complete examples of sensor networks used to monitor earthquakes.
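As a small illustration of the naming scheme, the nine file names can be generated from the sensor names and directions. The variable names below are chosen for this sketch only.

```python
sensor_names = ['S1', 'S2', 'S3']
directions = ['e', 'n', 'z']

# Map each sensor name to its three data files, e.g. 'S1' -> S1.e.txt, ...
filenames = {
    name: ['{}.{}.txt'.format(name, direction) for direction in directions]
    for name in sensor_names}

for name in sensor_names:
    print(name, filenames[name])
```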
The data was generated at 250 samples per second and was gathered for a long time. We shortened the files so that they can be downloaded easily; the files are abbreviated by downsampling to one sample per second and keeping data for a small duration. Algorithms for downsampling are in IoTPy/examples/gunshots/shorten_files.py. The goals of the example are not diluted by using reduced data.
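The actual algorithms are in shorten_files.py and are not reproduced here. As a rough sketch of the idea, and assuming the simplest possible scheme (keep every 250th sample, then truncate), downsampling could look like this. The function names are hypothetical.

```python
def downsample(samples, factor):
    """Keep every factor-th sample, starting with the first."""
    return samples[::factor]


def shorten(samples, factor, max_items):
    """Downsample, then keep only the first max_items values."""
    return downsample(samples, factor)[:max_items]


# Stand-in for 4 seconds of data recorded at 250 samples per second.
raw = list(range(1000))
short = shorten(raw, factor=250, max_items=3)
print(short)
```

Real downsampling code often filters or averages before discarding samples; this sketch skips that step.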
The logical structure of the application is shown in the diagram below.
The logical structure is a tree with aggregation of anomalies over larger regions as one moves away from the leaves of the tree and towards the root. For example, a local anomaly is an unusually high magnitude at a sensor, a regional anomaly could be multiple local anomalies in a building, and the next level may have multiple regional anomalies in a campus or city. In this example, we use only two levels: local anomalies and a single regional aggregator.
You first determine the logical structure as a network of agents. Then you decide how many processes you want, and finally you decide how to partition the network of agents among processes. In this example, the partitioning of the logical network among processes is self-evident: detect local anomalies in edge processes and aggregate anomalies in separate remote processes. The edge processes are situated at sensor locations and so communication from the edge processes to the aggregator is carried out by messages rather than by shared memory. Since communication among these processes is by message passing, these processes are distributed processes. If communication among the processes were exclusively through shared memory, then the processes would be shared-memory processes.
The code for this process is in: IoTPy/examples/accelerometer_example/detect_large_magnitude.py
An edge process has an input for each direction: e, n, and z. It has a single output. Let’s call the input stream names ‘e’, ‘n’, and ‘z’, and let’s call the output stream name ‘out’. We will connect the sources that read data from files ‘S1.e.txt’, ‘S1.n.txt’, and ‘S1.z.txt’ to the inputs e, n, and z, respectively. Later, we will specify the function, compute_func, that carries out the computation on the input streams to produce the output stream. We define the edge process as follows.
directions = ['e', 'n', 'z']
proc_0 = distributed_process(
    compute_func=compute_func,
    in_stream_names=directions,
    out_stream_names=['out'],
    connect_sources=[
        (directions[i], source(filenames[i]))
        for i in range(len(directions))])
Next we make a virtual machine which sends or receives messages from other virtual machines. This virtual machine (vm) consists of a single distributed process, proc_0. Since the vm has only one process there are no connections among processes within the vm. This vm publishes its output stream, and let’s call the publication by the sensor name. For example the local anomaly stream generated by the process connected to sensor S1 is published as ‘S1’. This process does not receive streams from other processes, and so it has no subscribers. So, the specification of the vm is as follows, where vm_0.start() starts execution of the vm.
vm_0 = VM(
    processes=[proc_0],
    connections=[],
    publishers=[(proc_0, 'out', sensor_name)],
    subscribers=[])
vm_0.start()
You can run each VM on a separate computer or in a separate terminal window of one computer, for example by entering:
python detect_large_magnitude.py S1 S1.e.txt S1.n.txt S1.z.txt
To enable running this command from the terminal, we add the following:
import sys

if __name__ == '__main__':
    # sys.argv[1] is the sensor name; sys.argv[2:] are the three data files.
    args = sys.argv
    detect_large_magnitude(args[1], args[2:])
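For the example command line, the arguments split into a sensor name and a list of three file names. The helper parse_command_line below is a hypothetical name used only to show that split and a basic check; it is not part of the example code.

```python
def parse_command_line(argv):
    """Split argv into (sensor_name, filenames) and check the count."""
    if len(argv) != 5:
        raise ValueError(
            'usage: python detect_large_magnitude.py '
            'SENSOR E_FILE N_FILE Z_FILE')
    return argv[1], argv[2:]


# argv as it would look for the command shown above.
example_argv = ['detect_large_magnitude.py',
                'S1', 'S1.e.txt', 'S1.n.txt', 'S1.z.txt']
sensor_name, filenames = parse_command_line(example_argv)
print(sensor_name, filenames)
```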
Note: We will describe the computational functions (compute_func) for the edge process and aggregation process later. For the time being, let’s restrict attention to the creation of processes and virtual machines.
The code for this process is in IoTPy/examples/accelerometer_example/global_aggregator.py
We will run the example with only two (rather than three) edge processes because setting up three computers (or terminal windows) for the edge processes and one more for the aggregation process is cumbersome. The aggregation process has two inputs, one from each edge process. Let’s call these inputs ‘in_1’ and ‘in_2’.
We define a distributed process for the aggregator in the same way that we did for the edge process.
proc = distributed_process(
    compute_func=compute_func,
    in_stream_names=['in_1', 'in_2'],
    out_stream_names=[],
    connect_sources=[],
    name='global aggregator')
We will specify its compute function later. The process has two input streams and no output streams. The aggregator has no source threads; all its inputs come from other processes.
Next, we define the virtual machine in terms of the distributed processes that constitute it; in this instance, the virtual machine consists of a single distributed process, proc.
vm = VM(
    processes=[proc],
    connections=[],
    subscribers=[(proc, 'in_1', 'S1'), (proc, 'in_2', 'S2')])
vm.start()
Since this VM consists of a single process it has no connections to other processes. This VM does not send information to other processes; hence it has no publishers. For this VM, subscribers is a list of two triples, where the first triple says that the input called ‘in_1’ for process, proc, is obtained from the publication called ‘S1’. Likewise, the second triple of subscribers says that the input called ‘in_2’ for process, proc, is obtained from the publication called ‘S2’.
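The text does not show how IoTPy matches publications to subscriptions internally. The following plain-Python sketch only illustrates the idea of matching by publication name. The function route and the string stand-ins for processes are hypothetical.

```python
def route(publishers, subscribers):
    """Pair each subscription with the publisher of the same publication.

    publishers:  list of (process, output stream name, publication name)
    subscribers: list of (process, input stream name, publication name)
    Returns a list of (sender, output stream, receiver, input stream).
    """
    by_publication = {
        publication: (process, stream)
        for process, stream, publication in publishers}
    routes = []
    for process, in_stream, publication in subscribers:
        if publication in by_publication:
            sender, out_stream = by_publication[publication]
            routes.append((sender, out_stream, process, in_stream))
    return routes


# Strings stand in for the process objects of the two edge VMs
# and the aggregator VM.
publishers = [('edge_S1', 'out', 'S1'), ('edge_S2', 'out', 'S2')]
subscribers = [('aggregator', 'in_1', 'S1'), ('aggregator', 'in_2', 'S2')]
routes = route(publishers, subscribers)
for r in routes:
    print(r)
```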
You can run the aggregator process on a separate computer or in its own terminal window by running global_aggregator.py with Python.
When you run the three VMs (on three computers or in three terminal windows), the computation in each one stops with the output:
stopped: No more input
and the results will be stored in two files, local_anomalies.txt and global_anomalies.txt. The local anomalies file contains the local anomalies detected by the two sensors as pairs: the time of the anomaly and the magnitude of the anomaly. The global anomalies file contains the times of the global anomalies detected by the aggregation process. In this simple example, a global anomaly is detected when both sensors detect local anomalies at approximately the same time.
The Computational Functions for the Edge and Aggregator Processes
The Computational Function for the Edge Process
Many applications are required to analyze and aggregate multiple streams. For example, a network of agents can ingest streams from sensors in a building and output a stream that captures the vibration and movement of the building. The building’s behavior is important during an earthquake or a fire. Analyzing the streams that capture the behavior of each building in a block can produce a stream that captures the behavior of the block. And analyzing the streams capturing the behavior of a block can generate a stream that represents the behavior of a town.
An application may be a distributed system consisting of multiple multicore computers. In this example we consider a multiprocess program for a single computer. We discuss distributed systems and cloud computing elsewhere.
This example does not have actuators. An actuator is an agent that runs in its own thread and controls devices: for example it may turn an audible alarm off and on.
The Three Central Ideas in the Example
You create non-terminating agents operating on endless streams by wrapping terminating functions that operate on conventional objects such as lists, numbers and strings.
You build an application as a network of agents. You create the network by connecting output streams of agents to the input streams of agents.
You build an application for a multicore computer by creating a network of processes. A process is a network of agents where some agents may execute in separate threads. You build a network of processes in the same way that you build a network of agents: connect output streams of processes to input streams of processes.
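The first two ideas can be illustrated without IoTPy by using Python generators as stand-ins for streams. This is only an analogy: IoTPy agents and streams are not generators, and the name wrap_element_function is hypothetical.

```python
import itertools


def wrap_element_function(func, in_stream):
    """Turn a terminating function on one value into a non-terminating
    computation on a (possibly endless) stream of values."""
    for element in in_stream:
        yield func(element)


def double(x):
    return 2 * x


def add_one(x):
    return x + 1


# An endless source stream: 1, 2, 3, ...
source = itertools.count(1)
# A network of two agents: the output of the first is the input of the second.
doubled = wrap_element_function(double, source)
incremented = wrap_element_function(add_one, doubled)

first_five = list(itertools.islice(incremented, 5))
print(first_five)
```

The functions double and add_one terminate on every call, while the wrapped computations keep producing values for as long as the source does.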
The Process Structure
The process structure is shown in the figure below which only shows one level of aggregation. For simplicity, the example has only two types of processes: source processes that acquire data from sources and an aggregator process that analyzes streams generated by the source processes. The aggregator process generates alerts or controls actuators, and so it does not have output streams. A source process executes a function f on a list of streams generated by sources, and the aggregator process executes a function g on a list of streams produced by the edge processes.
Creating a network of agents
Next, let’s implement function f in a source agent and later implement function g of the aggregator. The function is implemented as a network of agents. Next, we develop an application for a specific example: detecting anomalously high accelerations such as those produced by earthquakes or explosions.
You can test your program on data which consists of accelerometer recordings made in a laboratory at the California Institute of Technology by Julian Bunn, Monica Kohler, and Richard Guy on February 16, 2017 to detect explosions such as gunshots. Each sensor generates three streams of accelerations, one for each direction: east, north and vertical. The j-th values in all streams of all sensors are generated at precisely the same time. Let’s create a multiprocess program which ingests data from 3 accelerometers and detects anomalously high accelerations. Later, we will modify this application to detect earthquakes.
The compute function for the source process
Each source process ingests the 3 streams (east, north, vertical) generated by a single accelerometer. We will develop a program using 3 accelerometers. The application has one source process for each accelerometer. A source process acquires 3 streams of data (east, north, vertical) from an accelerometer and carries out a computation on the streams. The acquisition of data from each source stream is executed in its own thread, and the computation is also carried out in its own thread. The computation is implemented by a network of agents which is depicted below.
Next, we describe the agents in the network. We subtract the mean from each stream of accelerations to eliminate the acceleration due to gravity. We then compute the magnitude of the zero-mean accelerations and detect a local anomaly, that is, an unusually high acceleration detected by this accelerometer.
The aggregation process, which is described later, receives streams indicating the presence or absence of local anomalies from each of the 3 source processes, and it detects an anomalously high acceleration if at least 2 of the 3 source processes detect an anomalously high acceleration.
This program is simple and its purpose is to illustrate how to construct networks of agents. You can create more sophisticated networks in the same way.
Next, let’s design function f. The parameters of f are in_streams and out_streams which are lists of the input streams and output streams of the network of agents that implements f. We first declare the streams of the network.
Declare streams of the network
# DECLARE STREAMS
zero_means = [
    Stream('zero_means_' + str(i))
    for i in range(len(in_streams))]
magnitudes = Stream('magnitudes')
In addition to in_streams and out_streams the network has a zero-mean stream for each acceleration direction and a magnitude stream. We call the three zero-mean streams ‘zero_means_0’, ‘zero_means_1’, and ‘zero_means_2’. These names are optional, but they can be helpful for debugging. magnitudes is a stream of magnitudes of accelerations.
We create an agent in two steps: first we write a terminating function and then wrap the function with an IoTPy wrapper to create the agent.
Step 1: Subtract the mean
def subtract_mean(window):
    return window[-1] - sum(window)/float(len(window))
Next we wrap this function using the map_window wrapper to create 3 agents, one for each input stream.
for i in range(len(in_streams)):
    map_window(
        func=subtract_mean,
        in_stream=in_streams[i],
        out_stream=zero_means[i],
        window_size=4, step_size=1,
        initial_value=0.0)
For purposes of illustration consider the following example: the input stream consists of the elements 0, 1, 0, 1, 0, 1, …, a repetition of the pair 0, 1. Assume that the window size is 4, because illustrations with larger window sizes are lengthy. (In the program to detect gunshots the window size is much larger, 500 in the complete code for f, so that no single acceleration value significantly affects the mean of the window.) The first window is filled only after the input stream has at least window_size elements. Until then, each value of the output stream is initial_value (0.0 in this case).
The first window is [0, 1, 0, 1], and when the subtract_mean function is applied to this window it returns 1 - (0+1+0+1)/4 = 0.5. So the output stream is now 0.0, 0.0, 0.0, 0.5.
The next window is [1, 0, 1, 0], and when the subtract_mean function is applied to this window it returns 0 - (1+0+1+0)/4 = -0.5. So when the input stream is 0, 1, 0, 1, 0 the output stream is 0.0, 0.0, 0.0, 0.5, -0.5. The values of the input stream and the corresponding values of the output stream are:
Input stream        Output stream
[0]                 [0.0]
[0, 1]              [0.0, 0.0]
[0, 1, 0]           [0.0, 0.0, 0.0]
[0, 1, 0, 1]        [0.0, 0.0, 0.0, 0.5]
[0, 1, 0, 1, 0]     [0.0, 0.0, 0.0, 0.5, -0.5]
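The windowing behavior described above can be checked with a plain-Python emulation that works on a list instead of a stream. The function sliding_window_map is a hypothetical stand-in written for this sketch. It assumes a step size of 1 and follows the description in the text, not the internals of map_window.

```python
def subtract_mean(window):
    return window[-1] - sum(window) / float(len(window))


def sliding_window_map(func, values, window_size, initial_value):
    """Emulate a sliding window with step size 1 on a finite list.

    Until window_size values have arrived the output is initial_value;
    after that, each output is func applied to the latest window.
    """
    output = []
    for end in range(1, len(values) + 1):
        if end < window_size:
            output.append(initial_value)
        else:
            output.append(func(values[end - window_size:end]))
    return output


result = sliding_window_map(
    subtract_mean, [0, 1, 0, 1, 0, 1], window_size=4, initial_value=0.0)
print(result)
```

The output alternates in sign once the window is full, because the last element of the window alternates between 1 and 0 while the window mean stays at 0.5.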
Step 2: Compute the magnitude of the acceleration vector.
import math

def magnitude_of_vector(coordinates):
    return math.sqrt(sum([v*v for v in coordinates]))
We wrap this function using the zip_map wrapper, which zips the i-th element of each of the input streams together and applies the specified function (magnitude_of_vector) to the zipped elements:
zip_map(
    func=magnitude_of_vector,
    in_streams=zero_means,
    out_stream=magnitudes)
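On finite lists, the effect of zipping the streams and applying magnitude_of_vector can be emulated with Python's built-in zip. The sample values below are made up for illustration.

```python
import math


def magnitude_of_vector(coordinates):
    return math.sqrt(sum(v * v for v in coordinates))


# Made-up zero-mean accelerations for the three directions.
east = [3.0, 0.0, 1.0]
north = [4.0, 0.0, 2.0]
vertical = [0.0, 0.0, 2.0]

# The i-th output is the magnitude of the i-th (east, north, vertical) triple.
magnitudes = [
    magnitude_of_vector(triple) for triple in zip(east, north, vertical)]
print(magnitudes)
```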
Step 3: Detect anomalously high magnitudes.
The simplest algorithm is to detect an anomaly when the magnitude exceeds a threshold, which is 0.01 in this example. The algorithm outputs 1.0 to signal an anomaly, and 0.0 to signal its absence.
def simple_anomaly(value):
    return 1.0 if value > 0.01 else 0.0
We wrap this function with the map_element wrapper which applies the function to each element of its input stream to obtain each element of its output stream.
map_element(
    func=simple_anomaly,
    in_stream=magnitudes,
    out_stream=out_streams[0])
The program components that we described above are put together below. This is the entire code for f. Note that it uses a window size of 500 and an anomaly threshold of 1, rather than the illustrative values of 4 and 0.01 used above.
import math

def f(in_streams, out_streams):
    # DECLARE STREAMS
    zero_means = [
        Stream('zero_means_' + str(i))
        for i in range(len(in_streams))]
    magnitudes = Stream('magnitudes')

    # CREATE AGENTS
    # create subtract_mean agents
    # Define the terminating function
    def subtract_mean(window):
        return window[-1] - sum(window)/float(len(window))
    # Wrap the terminating function to create an agent
    for i in range(len(in_streams)):
        map_window(
            func=subtract_mean,
            in_stream=in_streams[i],
            out_stream=zero_means[i],
            window_size=500, step_size=1,
            initial_value=0.0)

    # Create the magnitude agent
    # Define the terminating function
    def magnitude_of_vector(coordinates):
        return math.sqrt(sum([v*v for v in coordinates]))
    # Wrap the terminating function to create an agent
    zip_map(
        func=magnitude_of_vector,
        in_streams=zero_means,
        out_stream=magnitudes)

    # Create the local anomaly agent
    # Define the terminating function
    def simple_anomaly(value):
        return 1.0 if value > 1 else 0.0
    # Wrap the terminating function to create an agent
    # out_streams is a list; the single output stream is out_streams[0].
    map_element(
        func=simple_anomaly,
        in_stream=magnitudes,
        out_stream=out_streams[0])
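To experiment with the three steps without installing IoTPy, the whole computation of f can be emulated on finite lists. This is a sketch under stated assumptions: the helper names, the tiny window size, the threshold and the sample data are all chosen for illustration, and the windowing follows the description in the text, not the internals of the IoTPy wrappers.

```python
import math


def sliding_window_map(func, values, window_size, initial_value):
    """Sliding window with step size 1; pad the start with initial_value."""
    return [
        func(values[end - window_size:end]) if end >= window_size
        else initial_value
        for end in range(1, len(values) + 1)]


def subtract_mean(window):
    return window[-1] - sum(window) / float(len(window))


def local_anomalies(streams, window_size, threshold):
    # Step 1: subtract the windowed mean from each direction.
    zero_means = [
        sliding_window_map(subtract_mean, stream, window_size, 0.0)
        for stream in streams]
    # Step 2: magnitude of each zero-mean acceleration vector.
    magnitudes = [
        math.sqrt(sum(v * v for v in vector))
        for vector in zip(*zero_means)]
    # Step 3: 1.0 signals an anomaly, 0.0 signals its absence.
    return [1.0 if m > threshold else 0.0 for m in magnitudes]


# Made-up data: a spike at index 5 and constant gravity on the vertical axis.
east = [0.0, 0.0, 0.0, 0.0, 0.0, 3.0, 0.0, 0.0]
north = [0.0, 0.0, 0.0, 0.0, 0.0, 4.0, 0.0, 0.0]
vertical = [1.0] * 8
anomalies = local_anomalies(
    [east, north, vertical], window_size=4, threshold=1.0)
print(anomalies)
```

With a window this small, the spike at index 5 also shifts the mean of the next few windows, so the samples right after the spike are flagged as well. A large window reduces this effect.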
The compute function for the aggregator
The aggregator detects an anomalously high acceleration if two or more of the source processes detect anomalously high accelerations.
As usual, we define the process by defining its streams and its agents. And, as usual, we create an agent by defining a terminating function and then wrapping it with an IoTPy wrapper. The wrapper used in this example is merge_window.
The terminating function, aggregate, is given below.
def aggregate(windows):
    number_local_anomalies = [
        any(window) for window in windows].count(True)
    return 1.0 if number_local_anomalies > 1 else 0.0
In the above function, windows is a list of lists: one window per input stream. In our example, windows is a list of 3 lists, one from each source process. any(window) is True if any element of the window is 1.0 (a local anomaly). So number_local_anomalies is the number of windows that contain at least one local anomaly. The aggregate function returns 1.0, signaling an anomaly, if 2 or more windows contain a local anomaly, and 0.0 otherwise.
We wrap the terminating function, aggregate, with the IoTPy wrapper, merge_window, to create the agent.
merge_window(
    func=aggregate,
    in_streams=in_streams, out_stream=regional_anomalies,
    window_size=250, step_size=1, initial_value=0.0)
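The effect of applying aggregate to aligned windows of several streams can be emulated on finite lists. The helper merge_sliding_windows is a hypothetical stand-in written for this sketch. It assumes a step size of 1 and an initial_value emitted until the first window is full, as in the description of map_window earlier. The window size of 3 and the sample streams are made up.

```python
def aggregate(windows):
    number_local_anomalies = [
        any(window) for window in windows].count(True)
    return 1.0 if number_local_anomalies > 1 else 0.0


def merge_sliding_windows(func, streams, window_size, initial_value):
    """Apply func to the list of aligned windows, one per stream."""
    length = min(len(stream) for stream in streams)
    output = []
    for end in range(1, length + 1):
        if end < window_size:
            output.append(initial_value)
        else:
            windows = [s[end - window_size:end] for s in streams]
            output.append(func(windows))
    return output


# Local anomaly streams from three sensors (1.0 marks a local anomaly).
s1 = [0.0, 1.0, 0.0, 0.0, 0.0, 0.0]
s2 = [0.0, 0.0, 0.0, 1.0, 0.0, 0.0]
s3 = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
regional = merge_sliding_windows(
    aggregate, [s1, s2, s3], window_size=3, initial_value=0.0)
print(regional)
```

The two local anomalies occur two samples apart, so a regional anomaly is reported only for the one window that contains both. This is how the window size defines "approximately the same time".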
Copying streams to files
Copying streams to files is helpful while testing an application. Plotting and analyzing the files can help us understand whether the agents are designed correctly. Calling stream_to_file with a stream and a file name creates an agent that copies the stream to the specified file. In this example, we copy the output of the aggregator to a file. In an application that acquired data from sensors, rather than from files, the agent would issue an alert such as a text message or an audible alarm rather than store results in a file.
You will find it helpful to drive the application with different streams and copy intermediate streams to files to see how the application works. The complete code for g follows.
def g(in_streams, out_streams):
    # DECLARE STREAMS
    regional_anomalies = Stream('Regional anomalies')

    # CREATE AGENTS
    # Create the aggregation agent
    # Define the terminating function
    def aggregate(windows):
        number_local_anomalies = [
            any(window) for window in windows].count(True)
        return 1.0 if number_local_anomalies > 1 else 0.0
    # Wrap the terminating function to create an agent
    merge_window(
        func=aggregate,
        in_streams=in_streams, out_stream=regional_anomalies,
        window_size=250, step_size=1, initial_value=0.0)

    # Agent that copies a stream to a file
    # Plot these files to understand the application.
    for i in range(len(in_streams)):
        stream_to_file(in_streams[i], 'Anomalies_'+str(i+1)+'_.txt')
    stream_to_file(regional_anomalies, 'regional_anomalies.txt')
Create and run a network of processes
We next define the list of processes in our application and the connections between processes.
The list of source processes
We give names to the input and output streams of function f in the source process so that we can connect sources and actuators (if any) to the function. We can give arbitrary names (character strings) to the streams, but no two input/output streams may have the same name. Let’s call the input streams ‘e’, ‘n’, and ‘z’ for the east, north and vertical streams from a sensor. Let’s call the output stream ‘out’.
Now we connect the input stream called ‘e’ to the source generated by this sensor in the east direction, and likewise for the other directions. (If the application had actuators we would connect output streams to actuators as well.)
Assume that we are given a dictionary called source_sensor_direction, where source_sensor_direction[sensor_name][direction] is a source that generates the stream in the specified direction for the specified sensor, and a dictionary called sensors whose keys are the sensor names. For the process that ingests streams from a sensor called ‘S1’ we connect the input streams called ‘e’, ‘n’ and ‘z’ to the sources source_sensor_direction[‘S1’][‘e’], source_sensor_direction[‘S1’][‘n’], and source_sensor_direction[‘S1’][‘z’], respectively. Likewise, connect sources generated by the other sensors to the processes that ingest streams from them.
source_processes = [
    make_process(
        compute_func=f,
        in_stream_names=['e', 'n', 'z'],
        out_stream_names=['out'],
        connect_sources=[
            (direction, source_sensor_direction[sensor_name][direction])
            for direction in ['e', 'n', 'z']])
    for sensor_name in sensors.keys()]
The process operates on its input data using function f. We call the input streams ‘e’, ‘n’ and ‘z’, and we call the output stream ‘out’. We specify the connections of sources to the input streams of f by connect_sources, which is a list of pairs (stream name, source). For a sensor called ‘S1’ the list would be:
[('e', source_sensor_direction['S1']['e']),
 ('n', source_sensor_direction['S1']['n']),
 ('z', source_sensor_direction['S1']['z'])]
The aggregation process
The aggregation process operates on its input data using function g. We call its input streams ‘in_0’, ‘in_1’, ‘in_2’, … The aggregation process has no output streams. It has no sources and so connect_sources is the empty list.
aggregation_process = make_process(
    compute_func=g,
    in_stream_names=[
        'in_' + str(i) for i in range(len(sensors))],
    out_stream_names=[],
    connect_sources=[])
Running the multiprocess application
Next, we specify the connections between processes. The function f in a source process has an output stream called ‘out’. We connect the output stream of the j-th source process to the input stream ‘in_j’ of the aggregation process. A connection has four elements: (1) the sending process, (2) the name of an output stream of the sending process, (3) the receiving process, and (4) the name of an input stream of the receiving process. For example, a connection (p, ‘out’, q, ‘in’) says that the output stream called ‘out’ in process p is the input stream called ‘in’ in process q. The list of connections in this example is:
connections = [
    (source_processes[i], 'out', aggregation_process, 'in_'+str(i))
    for i in range(len(sensors))]
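To see what the list of connections looks like for three sensors, the comprehension can be run with strings standing in for the process objects. The stand-in names below are hypothetical and serve only to make the structure visible.

```python
sensor_names = ['S1', 'S2', 'S3']

# Strings stand in for the process objects created by make_process.
source_processes = ['source_' + name for name in sensor_names]
aggregation_process = 'aggregator'

# One connection per source process:
# (sender, sender's output stream, receiver, receiver's input stream)
connections = [
    (source_processes[i], 'out', aggregation_process, 'in_' + str(i))
    for i in range(len(sensor_names))]

for connection in connections:
    print(connection)
```

Each source process feeds a distinct input stream of the aggregator, so no two connections share an input stream name.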
The processes in the application are the source processes and the aggregation process. We run the multiprocess application with the following code:
run_multiprocess(
    processes=source_processes + [aggregation_process],
    connections=[
        (source_processes[i], 'out', aggregation_process, 'in_'+str(i))
        for i in range(len(sensors))])
Computer Science Course Examples
distributed systems algorithms
Written by Juliette Xiong, undergraduate student at Caltech.