These examples illustrate algorithms implemented by nondeterministic atomic actions. They are taken from the book "Parallel Program Design: A Foundation" by Chandy & Misra, and they show how agents manipulate shared variables.
The examples also illustrate signal streams. A value is appended to a signal stream when a shared variable changes value, and agents read signal streams to determine when shared variables have changed. Usually True or 1 is the only message appended to a signal stream, though any other value can be used.
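As a rough illustration of the idea, independent of the library used in the examples, the sketch below models a signal stream as an append-only list with reader callbacks. The names SignalStream, set_x and log are hypothetical and exist only for this sketch; synchronous callbacks stand in for the scheduler that would normally wake agents.

```python
class SignalStream:
    """Hypothetical stand-in for a signal stream: an append-only list of signals."""

    def __init__(self):
        self.values = []    # every signal appended so far
        self.readers = []   # callbacks run whenever a signal is appended

    def append(self, value=1):
        self.values.append(value)
        for reader in self.readers:
            reader(value)


shared = {"x": 0}            # the shared variable
x_changed = SignalStream()   # signals that shared["x"] has changed
log = []

# A reading "agent": on each signal it records the current value of x.
x_changed.readers.append(lambda signal: log.append(shared["x"]))


def set_x(new_value):
    """Write x and signal only when the value actually changes."""
    if new_value != shared["x"]:
        shared["x"] = new_value
        x_changed.append(1)


set_x(5)
set_x(5)   # no change, so no signal
set_x(7)
print(log)               # [5, 7]
print(x_changed.values)  # [1, 1]
```

The signal itself carries no data: the reader looks at the shared variable to find out what changed.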
Sorting

The first example sorts a list in increasing order by flipping any adjacent pair of elements that is out of order. It has one agent for each adjacent pair, indexed (i, i+1), of the list, and this agent is responsible for ensuring that its pair is in increasing order.

Shortest Path

The second example finds the matrix of shortest-path lengths in a graph, given the edge-weight matrix of the graph. It has an agent for every triple (i,j,k), where i, j and k are indices into the matrix. The agent associated with the triple (i,j,k) is responsible for ensuring that the triangle inequality holds for this triple, i.e. d[i,k] <= d[i,j] + d[j,k].

Stopping execution of an agent

The third example shows how a shared variable, stop, can be used by one agent to stop the execution of another. This example illustrates the nondeterministic aspect of these programs.
    # Stream, run, _no_value, signal_element and weave_f are assumed
    # to have been imported from the stream library.
    def sort(lst):
        # flip elements that are out of order.
        def flip(i):
            if lst[i-1] > lst[i]:
                lst[i-1], lst[i] = lst[i], lst[i-1]
                return 1
            else:
                return _no_value

        # Create streams
        S = [Stream() for i in range(len(lst)+1)]

        # Create agents
        for i in range(1, len(lst)):
            signal_element(
                func=flip,
                in_stream=weave_f([S[i-1], S[i+1]]),
                out_stream=S[i],
                i=i)

        # Insert data to start agents
        for stream in S:
            stream.append(1)
        run()
The flip() function returns 1 if its execution changes the list and returns _no_value otherwise. The agents are indexed by i in the interval [1, …, N-1], where N is the length of the list. Agent i puts lst[i-1] and lst[i] in increasing order, and it outputs a value on stream S[i] whenever it changes lst[i-1] or lst[i]. Agent i reads an input stream which is an asynchronous merge of streams S[i-1] and S[i+1]. If either S[i-1] or S[i+1] has a new value appended to it, agent i is woken up and takes a step. So agent i is woken up if agent i-1 changed lst[i-1], and it is also woken up if agent i+1 changed lst[i].
Initially, all the agents are woken up because 1 is appended to all streams. The computation terminates when no streams are extended and therefore all agents become quiescent.
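The wake-up pattern described above can be sketched in plain Python without the stream library. This is only an illustrative simulation under stated assumptions: the function sort_by_flips and its FIFO queue of pending agents are hypothetical, and the queue fixes one particular execution order where the real program is nondeterministic.

```python
from collections import deque


def sort_by_flips(lst):
    """Sort lst in place by waking agents that flip adjacent pairs."""
    n = len(lst)
    # Agent i (1 <= i <= n-1) is responsible for the pair (lst[i-1], lst[i]).
    pending = deque(range(1, n))   # initially every agent is woken up
    queued = set(pending)          # agents that already have a pending wake-up
    while pending:
        i = pending.popleft()
        queued.discard(i)
        if lst[i-1] > lst[i]:
            lst[i-1], lst[i] = lst[i], lst[i-1]
            # A flip plays the role of appending 1 to S[i]:
            # it wakes the neighbouring agents i-1 and i+1.
            for j in (i - 1, i + 1):
                if 1 <= j <= n - 1 and j not in queued:
                    pending.append(j)
                    queued.add(j)
    # The queue is empty: no agent has work left, so all are quiescent.
    return lst


data = [5, 2, 4, 1, 3]
sort_by_flips(data)
print(data)  # [1, 2, 3, 4, 5]
```

Every flip removes one inversion from the list, so only finitely many flips can occur and the queue eventually empties.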
We first define the central function which is encapsulated by an agent. For a triple i, j, k, this function makes the triangle inequality hold, i.e., D[i,k] <= D[i,j] + D[j,k], where D is initially the matrix of edge distances.
    # STEP 1. DEFINE FUNCTION TO BE ENCAPSULATED
    def triangle_inequality(triple):
        i, j, k = triple
        if D[i][j] + D[j][k] < D[i][k]:
            D[i][k] = D[i][j] + D[j][k]
            return 1
        else:
            return _no_value
The next step is to create streams that indicate when an agent has changed the data structure. We have one agent for each triple (i, j, k), but an agent changes only the entry D[i][k], so we need only one stream for each pair of indices.
    # STEP 2. CREATE STREAMS
    indices = range(len(D))
    changed = [[Stream() for i in indices] for j in indices]
The next step is to create agents, one for each triple (i, j, k):
    # STEP 3. CREATE AGENTS
    for i in indices:
        for j in indices:
            for k in indices:
                signal_element(
                    func=triangle_inequality,
                    in_stream=weave_f([changed[i][j], changed[j][k]]),
                    out_stream=changed[i][k],
                    triple=[i, j, k])
The agent for triple (i,j,k) gets an input stream which is an asynchronous merge of streams changed[i][j] and changed[j][k]. So, this agent is woken up whenever D[i][j] or D[j][k] decreases in value. If this agent changes D[i][k] then it appends a value (1) to the stream changed[i][k].
The final step is to start the computation by putting a value into each of the streams, which causes all the agents to be woken up.
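To make the overall flow concrete, here is a plain-Python simulation of the steps above that does not use the stream library. It is a sketch under assumptions: the function shortest_paths and its queue of pending signals are hypothetical, edge weights are assumed to be nonnegative with a zero diagonal, and float("inf") marks a missing edge. A pending entry (a, b) stands for a value appended to changed[a][b].

```python
from collections import deque

INF = float("inf")


def shortest_paths(D):
    """Relax triangle inequalities on D in place until no agent has work."""
    n = len(D)
    # Start the computation: one signal on every stream changed[a][b].
    pending = deque((a, b) for a in range(n) for b in range(n))
    queued = set(pending)
    while pending:
        a, b = pending.popleft()
        queued.discard((a, b))
        # A signal on changed[a][b] wakes the agents that read it:
        # triples (a, b, k) for every k, and triples (i, a, b) for every i.
        woken = [(a, b, k) for k in range(n)] + [(i, a, b) for i in range(n)]
        for i, j, k in woken:
            if D[i][j] + D[j][k] < D[i][k]:
                D[i][k] = D[i][j] + D[j][k]
                # D[i][k] decreased, so signal on changed[i][k].
                if (i, k) not in queued:
                    pending.append((i, k))
                    queued.add((i, k))
    return D


D = [[0, 1, INF, 7],
     [1, 0, 2, INF],
     [INF, 2, 0, 1],
     [7, INF, 1, 0]]
shortest_paths(D)
print(D)  # [[0, 1, 3, 4], [1, 0, 2, 3], [3, 2, 0, 1], [4, 3, 1, 0]]
```

When the queue is empty, no triple violates the triangle inequality, which corresponds to all agents being quiescent.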