Getting Started

Installing

You can install the library through pip

pip install streampie

or get it from the streampie repository on GitHub. The whole library is contained in a single file.

Streams

Streams (Stream) are the basic building block of the library. Each stream can be thought of as an object that takes an element from its input iterator, performs some work on it, and then passes the result on to the next stream in line. Streampie overloads the >> operator for this, as it intuitively captures the notion of “passing on” the results from one stream into another.

Let’s start by importing the library

In [1]: from streampie import *

To illustrate the concept of streams and processors, let’s take the following example

In [2]: [0, 1, 2, 3] >> take(2) >> list
Out[2]: [0, 1]

In this example, we took our list and passed it to the take processor that took the first two elements of the list and discarded the rest. The final outcome was then converted to a plain list. Another common task in stream processing is splitting inputs into equally-sized chunks. For that purpose we can use chop.

In [3]: range(4) >> chop(2) >> list
Out[3]: [[0, 1], [2, 3]]
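For intuition, the chunking shown above can be sketched in plain Python. This is only an illustration of the idea, not streampie's implementation: the helper name chunks is hypothetical, and keeping a shorter final chunk when the input does not divide evenly is an assumption of this sketch rather than documented chop behavior.

```python
from itertools import islice

def chunks(iterable, n):
    """Yield successive lists of up to n items (hypothetical helper)."""
    it = iter(iterable)
    while True:
        # Pull the next n items; an empty list means the input is exhausted.
        chunk = list(islice(it, n))
        if not chunk:
            return
        yield chunk

result = list(chunks(range(4), 2))  # [[0, 1], [2, 3]]
```

Because the helper is a generator, chunks are produced lazily, one at a time, which matches the element-by-element flow described for streams.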

We are not limited to a single processor; we can chain arbitrarily many of them

In [4]: range(4) >> chop(2) >> take(2) >> list
Out[4]: [[0, 1], [2, 3]]
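How can a plain list or range appear on the left of >>? One way to get this behavior in Python is the reflected operator method __rrshift__. The sketch below is a toy model under that assumption and is not taken from streampie's source; ToyStream, toy_take and toy_double are hypothetical names used only for illustration.

```python
from itertools import islice

class ToyStream(object):
    """Toy stand-in for a stream; not streampie's Stream class."""

    def __init__(self, transform):
        self.transform = transform   # function: iterator -> iterator
        self.source = iter(())       # replaced once something is piped in

    def __rrshift__(self, left):
        # Handles `left >> self` when `left` (a list, a range, ...) has no >> of its own.
        self.source = iter(left)
        return self

    def __rshift__(self, right):
        # Handles `self >> right`: feed another ToyStream,
        # or call a plain callable such as `list` on our output.
        if isinstance(right, ToyStream):
            right.source = iter(self)
            return right
        return right(iter(self))

    def __iter__(self):
        return iter(self.transform(self.source))

def toy_take(n):
    # Pass on only the first n elements.
    return ToyStream(lambda items: islice(items, n))

def toy_double():
    # Pass on every element multiplied by two.
    return ToyStream(lambda items: (2 * x for x in items))

first_two = [0, 1, 2, 3] >> toy_take(2) >> list           # [0, 1]
chained = range(4) >> toy_double() >> toy_take(3) >> list  # [0, 2, 4]
```

Since >> is left-associative, each step hands an iterator to the next one, and nothing is computed until the final list call consumes the chain.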

For a full list of processors, see the streampie module documentation. To illustrate where Streampie is useful, let’s consider two examples that naturally benefit from parallelism.

URL Retrieval

Retrieving URLs is I/O-bound rather than CPU-intensive, so threads are a good fit. To retrieve URLs in parallel with four threads, we can use a ThreadPool

import urllib2
from streampie import *

URLs = [
   "http://www.cnn.com/",
   "http://www.bbc.co.uk/",
   "http://www.economist.com/",
   "http://nonexistant.website.at.baddomain/",
   "http://slashdot.org/",
   "http://reddit.com/",
   "http://news.ycombinator.com/",
]

def retrieve(wid, items):
   for url in items:
      yield url, urllib2.urlopen(url).read()

for url, content in URLs >> ThreadPool(retrieve, poolsize=4):
   print url, len(content)
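Note that urllib2.urlopen raises an exception for an unreachable host such as the fourth URL in the list. How ThreadPool surfaces such a failure is not covered in this section. As a point of comparison, the sketch below shows the same fan-out pattern using only the standard library's multiprocessing.pool.ThreadPool, with failures captured per URL. The names fetch_all, safe_fetch and fake_fetch are hypothetical, and the fetcher is passed in as a parameter so the sketch can be tried without network access.

```python
from multiprocessing.pool import ThreadPool as StdThreadPool

def fetch_all(urls, fetch, poolsize=4):
    """Call fetch(url) for every URL on a pool of threads.

    Returns a dict mapping each url to (content, error);
    error is None when the fetch succeeded.
    """
    def safe_fetch(url):
        try:
            return url, fetch(url), None
        except Exception as exc:  # e.g. a URLError for an unresolvable domain
            return url, None, exc

    pool = StdThreadPool(poolsize)
    try:
        # imap_unordered yields results as workers finish, in no fixed order.
        return dict((url, (content, err))
                    for url, content, err in pool.imap_unordered(safe_fetch, urls))
    finally:
        pool.close()
        pool.join()

# Stand-in fetcher for demonstration; a real one could be
# lambda url: urllib2.urlopen(url).read()
def fake_fetch(url):
    if "baddomain" in url:
        raise IOError("cannot resolve " + url)
    return "<html>%s</html>" % url

results = fetch_all(["http://slashdot.org/",
                     "http://nonexistant.website.at.baddomain/"], fake_fetch)
```

Returning the error alongside the content keeps one bad URL from aborting the whole batch, at the cost of having to check each result.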

Integer Factorization

The second example is integer factorization, which is CPU-intensive. A ThreadPool would not yield large performance gains here because of Python’s global interpreter lock (GIL), which lets only one thread execute Python bytecode at a time. However, we can use a ProcessPool. Let’s first look at a simple, iterative solution.

# A set of integers, each a product of two primes
ints = [2498834631017, 14536621517459, 6528633441793, 1941760544137, 7311548077279, 
        8567757849149, 5012823744127, 806981130983, 15687248010773, 7750678781801, 
        2703878052163, 3581512537619, 12656415588017, 468180585877, 19268446801283, 
        5719647740869, 11493581481859, 366611086739]

def factor(n):
   """
      Return the divisors of n in ascending order, excluding n itself.
      For a product of two primes p * q this is [1, p, q].
   """
   result = set()
   for i in range(1, int(n ** 0.5) + 1):
      div, mod = divmod(n, i)
      if mod == 0:
         result |= {i, div}
   return sorted(list(result))[:-1]

print map(factor, ints)
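A quick check on small numbers shows what factor returns: it collects every divisor of n (including 1) and then drops the largest one, which is n itself. The function body is repeated from above so that the snippet runs on its own; the variable names small and prime are just for illustration.

```python
def factor(n):
    result = set()
    # Trial division up to sqrt(n); each hit contributes the pair (i, n / i).
    for i in range(1, int(n ** 0.5) + 1):
        div, mod = divmod(n, i)
        if mod == 0:
            result |= {i, div}
    # Sort the divisors and drop the last one, which is n itself.
    return sorted(list(result))[:-1]

small = factor(15)   # 15 = 3 * 5, so the result is [1, 3, 5]
prime = factor(13)   # for a prime, only the divisor 1 remains: [1]
```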

The program factors the composite integers one after another in a single process. We can rewrite the example with streampie in the following way.

from streampie import *

ints = [2498834631017, 14536621517459, 6528633441793, 1941760544137, 7311548077279, 
        8567757849149, 5012823744127, 806981130983, 15687248010773, 7750678781801, 
        2703878052163, 3581512537619, 12656415588017, 468180585877, 19268446801283, 
        5719647740869, 11493581481859, 366611086739]

def factor(n):
   result = set()
   for i in range(1, int(n ** 0.5) + 1):
      div, mod = divmod(n, i)
      if mod == 0:
         result |= {i, div}
   return sorted(list(result))[:-1]

def do_work(wid, items):
   for i in items:
      yield factor(i)

print ints >> ProcessPool(do_work, poolsize=8) >> list
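With poolsize=8 and 18 numbers, the work cannot be split perfectly evenly. The back-of-the-envelope simulation below estimates the best-case speedup under two assumptions that are not taken from streampie: each number is handed to whichever worker becomes free first, and factoring n costs about int(n ** 0.5) loop iterations, as in factor above. How ProcessPool actually schedules items is not specified in this section, and simulated_speedup is a hypothetical helper.

```python
import heapq

ints = [2498834631017, 14536621517459, 6528633441793, 1941760544137, 7311548077279,
        8567757849149, 5012823744127, 806981130983, 15687248010773, 7750678781801,
        2703878052163, 3581512537619, 12656415588017, 468180585877, 19268446801283,
        5719647740869, 11493581481859, 366611086739]

def simulated_speedup(costs, workers):
    """Serial time divided by parallel time under greedy scheduling."""
    loads = [0] * workers   # accumulated cost per worker (already a valid heap)
    for cost in costs:
        # Give the next task to the currently least-loaded worker.
        heapq.heappush(loads, heapq.heappop(loads) + cost)
    return sum(costs) / float(max(loads))

costs = [int(n ** 0.5) for n in ints]          # loop iterations of factor(n)
estimate = simulated_speedup(costs, 8)         # at most 8.0 by construction
equal_tasks = simulated_speedup([1] * 18, 8)   # 18 equal tasks on 8 workers: 6.0
```

The estimate ignores process start-up and communication overhead and assumes at least eight free cores, so measured speedups will typically be lower.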

We now use 8 parallel local processes, so on a machine with at least 8 cores the factoring can run up to roughly 8 times as fast. But what if we want to compute the same task on a small cluster (e.g., two machines)? For that purpose, we can use the DistributedPool.

from streampie import *

ints = [2498834631017, 14536621517459, 6528633441793, 1941760544137, 7311548077279, 
        8567757849149, 5012823744127, 806981130983, 15687248010773, 7750678781801, 
        2703878052163, 3581512537619, 12656415588017, 468180585877, 19268446801283, 
        5719647740869, 11493581481859, 366611086739]

def factor(n):
   result = set()
   for i in range(1, int(n ** 0.5) + 1):
      div, mod = divmod(n, i)
      if mod == 0:
         result |= {i, div}
   return sorted(list(result))[:-1]

print ints >> DistributedPool(factor) >> list

This code will now wait for workers to perform the job. We can start a single-process worker with

python streampie.py