EXAMPLES

See: IoTPy/examples/

The goal of these examples is to demonstrate code reuse. The examples show how code for terminating functions can be wrapped in IoTPy to get persistent agents that operate on streams.
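
To make the idea of wrapping concrete, here is a minimal sketch in plain Python that does not use IoTPy. The names ToyStream and toy_map_element are hypothetical stand-ins, not IoTPy classes or functions. The sketch only illustrates how an ordinary terminating function such as times_ten can be reused, unchanged, by a persistent agent that reacts each time its input stream is extended.

```python
class ToyStream(object):
    """Hypothetical stand-in for a stream: an append-only list with subscribers."""

    def __init__(self):
        self.values = []
        self.subscribers = []

    def extend(self, new_values):
        # Append the new values, then wake every agent reading this stream.
        new_values = list(new_values)
        self.values.extend(new_values)
        for callback in self.subscribers:
            callback(new_values)


def toy_map_element(func, in_stream, out_stream):
    """Wrap the terminating function func as a persistent agent."""
    def on_new_values(new_values):
        # func is called once per element and returns each time; the
        # agent persists for as long as the input stream is extended.
        out_stream.extend([func(v) for v in new_values])
    in_stream.subscribers.append(on_new_values)


def times_ten(x):
    # An ordinary terminating function, written with no knowledge of streams.
    return x * 10


s, t = ToyStream(), ToyStream()
toy_map_element(func=times_ten, in_stream=s, out_stream=t)
s.extend([0, 1, 2])   # drive the agent step by step
s.extend([3])
print(t.values)       # [0, 10, 20, 30]
```

The function times_ten is never modified; only the wrapper knows about streams. That separation is the kind of code reuse the examples in this section aim to demonstrate.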

Examples of agents that you can drive step by step (as opposed to agents driven by sources executing in their own threads) are found in the section on testing an agent. Applications are generally driven by sources of data, and an application consists of one or more processes. This section contains applications consisting of a single process, of multiple shared-memory processes, and of distributed systems.

The examples in this section are organized as follows:

  • Single process examples: Simple examples that illustrate applications consisting of a single process. See IoTPy/examples/single_process_examples/

  • Multiprocess examples: Simple examples that show how to build applications consisting of multiple processes executing within a common shared memory. See IoTPy/examples/multiprocess_examples/

  • Distributed system examples: Simple examples of distributed applications with multiple virtual machines where each virtual machine may consist of multiple shared-memory processes. See IoTPy/examples/distributed_examples/

  • Text streams, Twitter: Examples of analysis of text streams such as Twitter. See IoTPy/examples/Twitter/

  • Numeric streams from accelerometers: A simple example that illustrates the detection of gunshots. See IoTPy/examples/gunshots/

  • Synchronizing clocks: Examples of generating timestamped streams using NTP servers. These examples illustrate interactions between streams and servers that reply to requests. See IoTPy/examples/timing/

  • Basic CS algorithms: Examples such as the sieve of Eratosthenes for finding prime numbers. See IoTPy/examples/BasicCS/

  • UNITY: Algorithms from the book on the UNITY way of proving concurrent programs. See IoTPy/examples/UNITY/

  • Acoustics: Examples that illustrate the use of IoTPy for creating networks of agents that process streams of acoustic data. See IoTPy/examples/acoustics/

  • Signal processing, filters: Examples that illustrate the use of SciPy and other open-source libraries for carrying out signal processing applications, such as filtering, on streams of data.

  • Counting items in streams: Examples that illustrate the use of open-source libraries such as PyProbables to build applications, such as heavy hitters, on streams of data.

  • Unsupervised learning, clustering: An example that applies the k-means algorithm to streams. It shows how what was learned from one value of a stream can reduce the time required to learn when the stream is extended.

A few trivial examples

These examples are described in detail in the single process examples and the multiprocess examples.
The first trivial example writes ten values (10, 20, 30, ..., 100) to a file called test.dat. Of course, you don't need to use multiple threads for this problem.

The source agent is created by source_function, which encapsulates the function generate_sequence (see the code below). The source agent puts the next element of the sequence on stream s and then suspends execution for 0.1 seconds. The computation is a composition of two agents. The first agent, created by the map_element wrapper, wraps a function f that multiplies each element by 10; this agent has input stream s and output stream t. The second is a sink agent, created by stream_to_file, that writes the values in its input stream t to the file.

The code for the application is given below and is also in the test suite:

IoTPy/IoTPy/tests/multicore_test.py.

def test_single_process_single_source():

    from source import source_function

    def g(s):
        # A simple source which outputs 1, 2, 3, ... on
        # stream s. If num_steps is positive then the source
        # generates num_steps values.
        def generate_sequence(state): return state+1, state+1

        # Return a thread object which takes 10 steps,
        # sleeps for 0.1 seconds between successive steps,
        # puts the next element of the sequence in stream s,
        # and starts with state 0.
        return source_function(
            func=generate_sequence, out_stream=s,
            time_interval=0.1, num_steps=10, state=0)

    def h(in_streams, out_streams):
        # A trivial example of a composition of agents consisting
        # of two agents where the network has a single input
        # stream, in_streams[0], which is fed by the source.
        # The first agent applies function f to each element
        # of the input stream and puts the result in its
        # output stream t.
        # The second agent puts values in its input stream t
        # on a file called test.dat.
        from op import map_element
        from sink import stream_to_file

        def f(x): return x*10
        t = Stream()
        map_element(
            func=f, in_stream=in_streams[0], out_stream=t)
        stream_to_file(in_stream=t, filename='test.dat')

    # Create a process with two threads: a source thread and
    # a compute thread. The source thread executes the function
    # g, and the compute thread executes function h.
    proc = make_process(
        compute_func=h,
        in_stream_names=['in'],
        out_stream_names=['out'],
        connect_sources=[('in', g)],
        connect_actuators=[])
    run_multiprocess(processes=[proc], connections=[])
    # Note: You can replace the last two statements with
    # single_process_single_source(source_func=g, compute_func=h)

In this example, the source function puts the elements of the sequence 1, 2, 3, ..., 10 on stream s, one element every 0.1 seconds, and then stops. The map_element agent in the compute thread applies function f to each element of s and puts the results 10, 20, ..., 100 on stream t, while stream_to_file writes these results to a file called test.dat.

You may want to try running some variations of this code. Replace h with other networks of agents and try different values for the parameters time_interval and num_steps.
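
For readers who want to see the threading pattern without IoTPy, the following is a rough sketch written for Python 3 with only the standard library. It is not how IoTPy is implemented; the function run_pipeline, the sentinel DONE, and the use of queue.Queue in place of a stream are all assumptions made for illustration. One thread plays the role of the source and another plays the role of the compute thread that multiplies by 10 and writes to a file.

```python
import os
import queue
import tempfile
import threading
import time

DONE = object()  # sentinel marking the end of the source's output


def run_pipeline(filename, values, time_interval=0.01):
    q = queue.Queue()  # stands in for the stream between the two threads

    def source():
        # Emit the values one at a time, pausing between elements.
        for value in values:
            q.put(value)
            time.sleep(time_interval)
        q.put(DONE)

    def compute():
        # Multiply each element by 10 and write the result to the file.
        with open(filename, 'w') as f:
            while True:
                value = q.get()
                if value is DONE:
                    break
                f.write('%d\n' % (value * 10))

    threads = [threading.Thread(target=source),
               threading.Thread(target=compute)]
    for th in threads:
        th.start()
    for th in threads:
        th.join()


# Write to a temporary directory so that no existing file is overwritten.
path = os.path.join(tempfile.mkdtemp(), 'test.dat')
run_pipeline(path, values=[1, 2, 3])
with open(path) as f:
    lines = f.read().split()
print(lines)  # ['10', '20', '30']
```

The sketch needs an explicit end-of-data sentinel because the compute thread otherwise has no way to know that the source has stopped.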

Applications consisting of a single process with multiple sources

Example: single process with multiple sources

This trivial example writes (1, ran_1), (2, ran_2), (3, ran_3), ... to a file called output.dat, where ran_1, ran_2, ran_3, ... are generated by a random number function. The program has two sources. One source puts the sequence 1, 2, 3, ... on a stream and the other source puts the sequence ran_1, ran_2, ran_3, ... on another stream. Each source suspends execution after putting an item on its stream. The computational network consists of two agents: (1) an agent that zips the two streams to create the stream t consisting of (1, ran_1), (2, ran_2), (3, ran_3), ... and (2) a sink agent that reads stream t and writes it to a file called output.dat.

The code for the application is given below and is also in the test suite: multicore_test.py.

def test_single_process_multiple_sources():
    from source import source_function
    import random

    def g_0(s):
        # A simple source which outputs 1, 2, 3, ... on
        # stream s.
        def generate_sequence(state):
            return state+1, state+1

        # Return a thread object which takes 10 steps,
        # sleeps for 0.1 seconds between successive steps,
        # puts the next element of the sequence in stream s,
        # and starts with state 0.
        return source_function(
            func=generate_sequence, out_stream=s,
            time_interval=0.1, num_steps=10, state=0)

    def g_1(s):
        # A simple source which outputs random numbers on
        # stream s.

        # Return a thread object which takes 12 steps,
        # sleeps for 0.05 seconds between successive steps, and
        # puts the next element of the sequence in stream s.
        return source_function(
            func=random.random, out_stream=s,
            time_interval=0.05, num_steps=12)

    def h(list_of_two_streams):
        # A trivial example of a composition of agents consisting
        # of two agents where the composition has two input
        # streams, one from each source.
        # The first agent zips the two input streams and puts
        # the result on stream t.
        # The second agent puts values in its input stream t
        # on a file called output.dat.
        from sink import stream_to_file
        t = Stream()
        zip_stream(in_streams=list_of_two_streams, out_stream=t)
        stream_to_file(in_stream=t, filename='output.dat')

    # Create a process with three threads: two source threads and
    # a compute thread. The source threads execute the functions
    # g_0 and g_1, and the compute thread executes function h.
    proc = make_process(
        compute_func=h,
        in_stream_names=['sequence', 'random'],
        out_stream_names=['out'],
        connect_sources=[('sequence', g_0), ('random', g_1)],
        connect_actuators=[])
    run_multiprocess(processes=[proc], connections=[])

    # You can replace the last two statements with:
    # single_process_multiple_sources(
    #   list_source_func=[g_0, g_1], compute_func=h)
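
The two sources run at different rates, so a zipping agent has to hold values from the faster source until the slower one catches up. The sketch below shows one way such buffering can work in plain Python. ToyZip is a hypothetical class written for this illustration, and the assumption that zipping pairs elements by position, as Python's built-in zip does, should be checked against the IoTPy source.

```python
from collections import deque


class ToyZip(object):
    """Hypothetical incremental zip over two inputs arriving at different rates."""

    def __init__(self):
        self.buffers = [deque(), deque()]
        self.output = []

    def put(self, index, value):
        # Buffer the new value, then emit one tuple for every position
        # at which both inputs now have a value waiting.
        self.buffers[index].append(value)
        while all(self.buffers):
            self.output.append(tuple(b.popleft() for b in self.buffers))


z = ToyZip()
z.put(1, 0.25)   # the faster source runs ahead
z.put(1, 0.75)
z.put(0, 0)      # the first integer arrives and pairs with 0.25
z.put(1, 0.5)
z.put(0, 1)      # the second integer pairs with 0.75
print(z.output)             # [(0, 0.25), (1, 0.75)]
print(list(z.buffers[1]))   # [0.5] is still waiting for a partner
```

Under this positional pairing, a source that takes 12 steps zipped with a source that takes 10 steps yields at most 10 tuples; the last two values of the longer input are never paired.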

Twitter Source

See IoTPy/examples/Twitter/twitter.py

For brevity we don't show the three steps of application development, which have been described in detail for the previous two examples. Exactly as in the previous two examples, you can use a combination of twitter_to_stream and single_process_single_source. You can also use twitter_analysis which is simpler to use because it has already combined twitter_to_stream and single_process_single_source.

Using twitter_to_stream with single_process_single_source

The example creates a network of two agents, each running in its own thread. The source agent gets tweets from Twitter and puts the tweets on a stream. The other agent, which is a sink, merely prints the tweets.

Note: If you don't have valid access tokens and consumer secrets, the stream may return a sequence of "401" (unauthorized) errors instead of tweets.

def test():
    # Variables that contain the user credentials to access Twitter API 
    access_token = "Your value here"
    access_token_secret = "Your value here"
    consumer_key = "Your value here"
    consumer_secret = "Your value here"

    # Define the compute function.
    # In this example, the compute function consists of a single
    # sink agent which prints values.
    def g(s):
        def h(v):
            # print(...) with one argument works in Python 2 and 3.
            if 'text' in v: print(v['text'])
        sink_element(func=h, in_stream=s)

    # Define the source
    # This source puts 10 tweets that include the word 'Trump'
    # to stream s.
    def f(s):
        return twitter_to_stream(
            consumer_key, consumer_secret,
            access_token, access_token_secret,
            trackwords=['Trump'], source_stream=s, num_steps=10)

    # Use the encapsulator to connect the source and compute
    # functions
    single_process_single_source(
        source_func=f, compute_func=g)

Using twitter_analysis

You can also use twitter_analysis which is simpler than twitter_to_stream. twitter_analysis calls twitter_to_stream and single_process_single_source so that you don't have to.

twitter_analysis(
        consumer_key, consumer_secret,
        access_token, access_token_secret,
        trackwords, compute_func, num_steps=10)

Here compute_func is a function (rather than an agent) that operates on a single tweet. For example:

def simple_test():
    # The function that carries out a computation on a single tweet.
    def h(tweet):
        # print(...) with one argument works in Python 2 and 3.
        if 'text' in tweet: print(tweet['text'])
        
    # Variables that contain the user credentials to access Twitter API 
    access_token = "Your value here"
    access_token_secret = "Your value here"
    consumer_key = "Your value here"
    consumer_secret = "Your value here"
    # trackwords is the list of words that you want to track on Twitter.
    trackwords=['Trump']
    # Create the network of agents that gets tweets from Twitter
    # on one thread and carries out a computation on each tweet
    # in another thread.
    twitter_analysis(
        consumer_key, consumer_secret,
        access_token, access_token_secret,
        trackwords, compute_func=h, num_steps=10)

In this example, the compute_func h merely prints the text of each tweet.
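
A compute function can do more than print. The sketch below counts how many tweets mention each tracked word. It treats a tweet as a dictionary with an optional 'text' key, which is the only structure the examples above rely on. The helper make_word_counter and the sample dictionaries are hypothetical, and nothing here contacts Twitter.

```python
from collections import Counter


def make_word_counter(trackwords):
    """Return (compute_func, counts); compute_func handles one tweet."""
    counts = Counter()
    lowered = [w.lower() for w in trackwords]

    def compute_func(tweet):
        # Ignore messages that carry no 'text' field.
        if 'text' not in tweet:
            return
        text = tweet['text'].lower()
        for word in lowered:
            if word in text:
                counts[word] += 1

    return compute_func, counts


h, counts = make_word_counter(['Python', 'streams'])
# Made-up messages standing in for what a Twitter source might deliver.
for fake_tweet in [{'text': 'Python streams are fun'},
                   {'limit': {'track': 1}},
                   {'text': 'I like python'}]:
    h(fake_tweet)
print(dict(counts))
```

A function built this way takes a single tweet as its argument, so it has the same shape as the function h passed as compute_func above.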

NTP Source

We use exactly the same steps for getting timing data from NTP sources and then carrying out computations on the data:

  1. Specify a function f for extracting data from a source.

  2. Specify the source (function g in these examples) by using the encapsulator source_function to encapsulate f.

  3. Specify the composition of agents (h in these examples) which takes in streams obtained from sources and carries out computations on them.

  4. Use the wrapper single_process_single_source or single_process_multiple_sources to create a network of agents executing on multiple threads.

The first example prints the clock drift obtained from an NTP server.

Step 1

The function f for extracting data from the source is clock_drift_from_ntp_server which calls an NTP server and gets a single value of the clock drift.

import time
import ntplib
import logging
# If an ntp service is unavailable then this is logged in the log file
# 'ntp_service_operation.txt'
logging.basicConfig(
    filename='ntp_service_operation.txt', level=logging.DEBUG)
ntp_client = ntplib.NTPClient()
ntp_server = "Put the name of an ntp server here"
# An example of a name of an ntp server is "0.us.pool.ntp.org"
def clock_drift_from_ntp_server(ntp_server):
    """
    Returns the system clock drift computed from the specified
    ntp_server 

    Parameters
    ----------
    ntp_server: str
        The name of an ntp server.

    Returns
    -------
    response.offset: float
       system clock drift

    filenames
    ---------
    'ntp_service_operation.txt': logging error file

    """
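    try:
        # response.offset is the system clock offset from ntp time.
        response = ntp_client.request(ntp_server, version=3)
        return response.offset
    except Exception:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt and SystemExit still propagate.
        logging.warning('No response from NTP server: %s', ntp_server)
        return None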
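
For background, an NTP clock offset is conventionally derived from four timestamps taken during one request and reply. The sketch below works through that standard arithmetic on made-up numbers. The function name ntp_offset_and_delay is hypothetical, and the sketch makes no claim about how ntplib computes response.offset internally.

```python
def ntp_offset_and_delay(t0, t1, t2, t3):
    """
    t0: time the client sent the request (client clock)
    t1: time the server received the request (server clock)
    t2: time the server sent the reply (server clock)
    t3: time the client received the reply (client clock)
    """
    # Average of the apparent clock difference on the outbound and
    # return trips; the network delay cancels if it is symmetric.
    offset = ((t1 - t0) + (t2 - t3)) / 2.0
    # Round-trip time minus the time the server spent processing.
    delay = (t3 - t0) - (t2 - t1)
    return offset, delay


# Made-up timestamps: the server clock is 5 seconds ahead of the client,
# each network leg takes 0.5 seconds, and the server takes 0.25 seconds.
offset, delay = ntp_offset_and_delay(t0=100.0, t1=105.5, t2=105.75, t3=101.25)
print(offset, delay)  # 5.0 1.0
```

A positive offset means the server's clock is ahead of the local clock.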

Step 2

Specify the source using the encapsulator source_function to encapsulate the function clock_drift_from_ntp_server. Since clock_drift_from_ntp_server has a parameter ntp_server, it is also a keyword parameter in the call to source_function: see the last line.

def g(out_stream):
    return source_function(
        func=clock_drift_from_ntp_server,
        out_stream=out_stream,
        time_interval=0.1, num_steps=3,
        # Specify keyword arguments of the
        # encapsulated function
        ntp_server=ntp_server
        )

Step 3

The network of agents, h, consists of a single sink agent that merely prints its input stream. We create the sink agent by using the encapsulator sink_element to encapsulate the (terminating) function print_output.

# This is the network of agents that operates on the data
# acquired from the source. This network consists of a single
# sink agent. Create the sink agent by using the encapsulator
# sink_element to encapsulate terminating function
# print_output.
def h(in_stream):
    # print(...) with one argument works in Python 2 and 3.
    def print_output(v): print(v)
    sink_element(func=print_output, in_stream=in_stream)

Step 4

Finally, we create the network of agents executing in two threads, one for the source and one for the computation.

single_process_single_source(source_func=g, compute_func=h)

Another NTP Example

This example has a source that queries multiple NTP servers in order and returns the offset from the first one that responds. The steps are the same as in the previous example; only step 1, the function f, changes. Step 2 is also the same, except that the keyword argument passed to function f in this example is list_of_ntp_servers rather than ntp_server.

def clock_drift_from_first_ntp_server(list_of_ntp_servers):
    """
    Returns the system clock drift computed from the first functioning
    ntp_server in list_of_ntp_servers. 

    Parameters
    ----------
    list_of_ntp_servers: list
        List of names of ntp servers which are called in order.

    Returns
    -------
    response.offset: float
       system clock drift

    filenames
    ---------
    'ntp_service_operation.txt': logging error file

    """
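    for ntp_server in list_of_ntp_servers:
        drift = clock_drift_from_ntp_server(ntp_server)
        # Test against None explicitly: a drift of exactly 0.0 is a
        # valid reading and must not be skipped.
        if drift is not None:
            return drift
    # None of the ntp servers in the list returned values.
    return 0.0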
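
The try-each-server-in-order pattern can be exercised without a network by passing in the lookup function. The sketch below uses canned replies in place of real NTP calls. The name first_available_drift, the server names, and the values are all hypothetical. Unlike the function above, this sketch returns None when no server replies, so that a caller can tell "no answer" apart from "no drift".

```python
def first_available_drift(servers, lookup):
    """Return the first non-None drift from lookup(server), else None."""
    for server in servers:
        drift = lookup(server)
        # Compare against None explicitly: 0.0 is a legitimate drift
        # but is falsy, so a plain truth test would skip it.
        if drift is not None:
            return drift
    return None


# Canned replies standing in for real NTP lookups.
canned = {'down.example': None, 'exact.example': 0.0, 'late.example': 0.25}
result = first_available_drift(
    ['down.example', 'exact.example', 'late.example'], canned.get)
print(result)  # 0.0, taken from the second server
```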