# Software Design Patterns for Streams

Design patterns for programs based on streams are similar to patterns for distributed systems with message passing, programming with single-assignment variables and functional programming. Here are a few examples of patterns.

## Pattern: multiple agents modify the same stream

In the examples given earlier, a stream is modified by exactly one agent. When multiple agents modify the same stream, the modifications take place in arbitrary order. Note that all the agents modifying the same stream must be in the same process. In some algorithms it is helpful to have multiple agents modify the same stream while guaranteeing that at most one agent can modify the stream at any point; this guarantee helps ensure determinacy (multiple runs of the algorithm give the same result). An example of such an algorithm is the sieve of Eratosthenes for generating prime numbers. The code for the algorithm is given later on this page.

## Pattern: agents create agents during computation

Most of the examples given earlier create a static network of agents: once the network is created, no new agents are created. Some algorithms can benefit from creating agents during computation. An example of such an algorithm is the sieve of Eratosthenes for generating prime numbers. The literature has different versions of this algorithm; let's look at the code for finding all the primes, in increasing order, that are less than N, where N is a parameter: see `primes_example_1` in `prime_number_sieve.py`.

## Example: agents creating agents & multiple agents modifying streams

`sieve(in_stream, prime_stream)`

The function sieve has parameters in_stream and prime_stream, which are streams created outside sieve and passed to it. in_stream is assumed to be a sequence of numbers that begins with a prime number p and whose later elements are greater than p. prime_stream is assumed to be the sequence of prime numbers 2, 3, 5, 7, 11, ..., p', where p' is the prime number just before p. sieve appends p to prime_stream, and then creates:

`sieve(out_stream, prime_stream)`

where out_stream is a stream that is local to sieve(in_stream, prime_stream). Then sieve(in_stream, prime_stream) appends to out_stream the elements of in_stream that are not multiples of p.

For example, if in_stream is the sequence [3, 5, 7, 9, 11, ...] and prime_stream is currently [2], then p is 3, and sieve(in_stream, prime_stream) will append 3 to prime_stream and create sieve(out_stream, prime_stream), where out_stream will be the sequence [5, 7, 11, ...].

The source stream is numbers, and the computation is started by extending numbers with 2, ..., N.

This example illustrates two patterns:

1. Multiple agents modify the same stream: The stream, prime_stream, is modified by multiple agents, but at most one agent modifies the stream at any point in the computation. An agent appends 2 to prime_stream and only then creates the agent that appends 3 to prime_stream, which only then creates the agent that appends 5 to prime_stream. So, there is no race condition between agents modifying prime_stream, and primes are appended to prime_stream in increasing order.
2. An agent creates another agent during computation: sieve(in_stream, prime_stream) creates another agent sieve(out_stream, prime_stream).

Next, we describe the code in more detail. sieve(in_stream, prime_stream) creates the agent

```python
sink(func=f, in_stream=in_stream, state=0,
     prime_stream=prime_stream, out_stream=Stream())
```

The sink agent encapsulates function f where f has an initial state equal to 0, and has parameters prime_stream and out_stream. A sink agent has an input stream but no output stream; here prime_stream and out_stream are parameters of the encapsulated function.

The state is initially 0; it changes to a prime number p when the first element of in_stream is read, and remains unchanged thereafter. So the state is 0 only if the sink has not yet read any element of in_stream. When an element v of in_stream is read and the state is 0, v is the first element of in_stream and is therefore a prime number: v is appended to prime_stream, the state becomes v, and f creates the agent sieve(out_stream, prime_stream). When an element v of in_stream is read and the state is not 0, the state is a prime number p, and v is appended to out_stream if v is not a multiple of p.
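The behavior described above can be sketched in plain Python without the library. This is only an illustration under stated assumptions: the `Stream` class here is a toy stand-in with a hypothetical `subscribe` method, not the library's `Stream`, and `primes_up_to` is a hypothetical driver that plays the role of extending numbers with 2, ..., N. The sketch keeps the two patterns: each sieve agent creates the next one, and all of them append to the same prime_stream, one at a time.

```python
class Stream:
    """Toy append-only stream (an assumption for this sketch)."""

    def __init__(self):
        self.values = []
        self.subscribers = []

    def subscribe(self, callback):
        # Register an agent function to be called on every new element.
        self.subscribers.append(callback)

    def append(self, value):
        self.values.append(value)
        for callback in list(self.subscribers):
            callback(value)


def sieve(in_stream, prime_stream):
    out_stream = Stream()  # local to this sieve agent
    p = 0                  # state: 0 until the first element is read

    def f(v):
        nonlocal p
        if p == 0:
            # The first element read is a prime: record it, then
            # create the next agent in the chain.
            p = v
            prime_stream.append(v)
            sieve(out_stream, prime_stream)
        elif v % p != 0:
            # Pass on only the elements that are not multiples of p.
            out_stream.append(v)

    in_stream.subscribe(f)


def primes_up_to(n):
    numbers = Stream()       # the source stream
    prime_stream = Stream()  # shared by all sieve agents
    sieve(numbers, prime_stream)
    for v in range(2, n + 1):
        numbers.append(v)
    return prime_stream.values


print(primes_up_to(30))
```

In this sketch an agent appends its prime to prime_stream before it creates the next agent, so the primes appear in increasing order even though many agents write to the same stream.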

## Pattern: cycles in agent graphs and storing state in streams

The agent graph can have cycles. If the system has a cycle of agents, then we either want the computation to run forever or we need a mechanism to stop computation. One way to stop computation in a cycle of agents is for an agent to detect that computation has run for long enough that the desired results have been obtained. This agent then sends a stop message to the other agents in the cycle, and agents stop computation when they receive the stop message.

Storing the state of an agent in a stream can be helpful in some applications. The agent both outputs and inputs this stream and thus creates a cycle. The j-th element of the stream is the j-th state of the agent, for all j. The stream is initiated by appending the initial state of the agent to the stream. The stream can be read by other agents enabling them to read the sequence of states of this agent.

## Example: same stream as input and output

The state of an agent is stored in a stream called state_stream which is both an output and an input of the agent. The agent also has an input stream called stop_stream. When the agent reads any value on stop_stream the agent stops execution. The agent has a state - a Boolean value - which indicates whether the agent has read any value on stop_stream. The state is initially False and becomes True if the agent reads a value on stop_stream. Once the state becomes True it remains True forever thereafter.

The agent is a merge_asynch agent, which operates on elements of both state_stream and stop_stream as they arrive. The merge is asynchronous: in different runs of the same program the agent may read its input elements from state_stream and stop_stream in different orders. The agent encapsulates a function, execute_until_stop, which reads and then outputs a sequence of states until it reads a value from stop_stream. The elements that the function execute_until_stop operates on are 2-tuples (index, value), where index is 0 for values read from state_stream and 1 for values read from stop_stream.
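The feedback loop described above can be sketched in plain Python. This is a minimal illustration, not the library's merge_asynch agent: the function `run_until_stop`, its parameters `next_state` and `stop_after`, and the single FIFO queue standing in for the asynchronous merge are all assumptions made for the sketch. A stop value is queued once state_stream holds `stop_after` elements, which plays the role of some other agent deciding that computation has run long enough.

```python
from collections import deque


def run_until_stop(initial_state, next_state, stop_after):
    state_stream = []  # the j-th element is the j-th state
    pending = deque()  # merged input: (index, value) 2-tuples
    stopped = False    # becomes True once a stop value is read

    def append_state(value):
        state_stream.append(value)
        pending.append((0, value))       # the output is fed back as input
        if len(state_stream) == stop_after:
            pending.append((1, "stop"))  # a value arrives on stop_stream

    append_state(initial_state)          # start the stream with the initial state
    while pending:
        index, value = pending.popleft()
        if index == 1:
            stopped = True               # remains True from now on
        elif not stopped:
            append_state(next_state(value))
    return state_stream


print(run_until_stop(1, lambda s: 2 * s, stop_after=4))
```

With these arguments the stop value is queued behind the fourth state, so the agent still reads that state and emits a fifth one before it reads the stop value. With a different arrival order the agent could stop earlier, which is the kind of variation an asynchronous merge allows.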

## Pattern: Using Timing Services - NTP

Many streaming applications require streams of timestamps. Estimates of the current time can be obtained by reading the system clock of the computer. More accurate times can be obtained by using Network Time Protocol (NTP) services.
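As a rough sketch of how an NTP measurement might be used to correct system-clock timestamps, the code below applies the standard NTP offset and round-trip delay formulas to the four timestamps of one client-server exchange. The function names `ntp_offset`, `ntp_delay` and `corrected_time` are hypothetical, and how the four timestamps are obtained (for example, from an NTP client library) is outside the sketch.

```python
import time


def ntp_offset(t0, t1, t2, t3):
    """Estimated offset of the local clock from the server clock.

    t0: client send time, t1: server receive time,
    t2: server send time, t3: client receive time.
    """
    return ((t1 - t0) + (t2 - t3)) / 2.0


def ntp_delay(t0, t1, t2, t3):
    """Round-trip network delay of the same exchange."""
    return (t3 - t0) - (t2 - t1)


def corrected_time(offset, clock=time.time):
    # System clock reading adjusted by the most recent offset estimate.
    return clock() + offset


# Hypothetical timestamps from one exchange, in seconds.
offset = ntp_offset(10.0, 15.1, 15.2, 10.3)
print(offset, corrected_time(offset))
```

A timestamp stream could then be produced by appending `corrected_time(offset)` whenever a timestamp is needed, refreshing `offset` from time to time.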