Streampie Module

class Stream(obj=None)[source]

This is our generic stream class. It is iterable and it overloads the >> operator for convenience.

next()[source]
class take(n)[source]

Take the first n elements, and drop the rest.

>>> range(4) >> take(2) >> list
[0, 1]
class takei(indices)[source]

Take only the elements whose indices are given in the list.

>>> range(4) >> takei([0, 1]) >> list
[0, 1]
class drop(n)[source]

Drop the first n elements, and take the rest.

>>> range(4) >> drop(2) >> list
[2, 3]
class dropi(indices)[source]

Drop only the elements whose indices are given in the indices list.

>>> range(4) >> dropi([0, 1]) >> list
[2, 3]
class chop(n)[source]

Split the stream into n-sized chunks.

>>> range(4) >> chop(2) >> list
[[0, 1], [2, 3]]
class map(function)[source]

Call the function func for every element, with the element as input.

>>> square = lambda x: x**2
>>> range(4) >> map(square) >> list
[0, 1, 4, 9]
class filter(function)[source]

Return only the elements for which the predicate func evaluates to True.

>>> even = lambda x: x % 2 == 0
>>> range(4) >> filter(even) >> list
[0, 2]
class apply(function)[source]

Call the function func for every element, with the element as arguments.

>>> sum = lambda x,y: x+y
>>> range(4) >> chop(2) >> apply(sum) >> list
[1, 5]
class takewhile(predicate)[source]

Keep taking elements until the predicate func is True, then stop.

>>> range(4) >> takewhile(lambda x: x < 3) >> list
[0, 1, 2]
class dropwhile(predicate)[source]

Keep dropping elements until the predicate func is True, then stop.

>>> range(4) >> dropwhile(lambda x: x < 3) >> list
[3]
class prepend(prep_iterator)[source]

Prepend elements to a stream.

>>> range(4) >> prepend([10, 9]) >> list
[10, 9, 0, 1, 2, 3]
class flatten(obj=None)[source]

Flatten an arbitrarily-deep list of lists into a single list.

>>> [0,[1,[2,[3]]]] >> flatten() >> list
[0, 1, 2, 3]
class LocalPool(function, poolsize=None, args=[])[source]

A generic class shared by all local (executed on the same machine) pools.

stop()[source]

Terminate and wait for all workers to finish.

class ProcessPool(function, poolsize=None, args=[])[source]

Create a process pool.

Parameters:
  • function (int) – Function that each worker executes
  • poolsize (int) – How many workers the pool should make
  • args (list) – List of arguments to pass to the worker function

A simple that calls the sum function for every pair of inputs.

>>> def sum(wid, items):
...   # wid is the worker id
...   # items is an iterator for the inputs to the stream
...   for x, y in items:
...      yield x + y
>>> range(6) >> chop(2) >> ProcessPool(sum) >> list 
[1, 5, 9]

Note that the order of the output list is not guaranteed, as it depends in which order the elements were consumed. By default, the class creates as many workers as there are cores. Here is a more advanced examples showing poolsize control and passing additional arguments.

>>> def sum(wid, items, arg1, arg2):
...   # arg1 and arg2 are additional arguments passed to the function
...   for x, y in items:
...     yield x + y
>>> sorted(range(6) >> chop(2) >> ProcessPool(sum, poolsize=8, args=[0, 1]) >> list)
[1, 5, 9]

The function can yield arbitrarily many results. For example, for a single input, two or more yields can be made.

>>> def sum(wid, items):
...   for x, y in items:
...     yield x + y
...     yield x + y
>>> sorted(range(6) >> chop(2) >> ProcessPool(sum) >> list)
[1, 1, 5, 5, 9, 9]
class ThreadPool(function, poolsize=None, args=[])[source]

Create a thread pool.

Parameters:
  • function (int) – Function that each worker executes
  • poolsize (int) – How many workers the pool should make
  • args (list) – List of arguments to pass to the worker function
>>> def sum(wid, items):
...   # wid is the worker id
...   # items is an iterator for the inputs to the stream
...   for x, y in items:
...      yield x + y
>>> range(6) >> chop(2) >> ThreadPool(sum) >> list 
[1, 5, 9]
class StandaloneProcessPool(function, poolsize=None, args=[])[source]

The standalone process pool is exactly like the ProcessPool class, other than the fact that it does not take any input, but constantly yields output.

Parameters:
  • function (int) – Function that each worker executes
  • poolsize (int) – How many workers the pool should make
  • args (list) – List of arguments to pass to the worker function

To illustrate, here is an example of a worker that constantly returns random numbers. Since there is no input stream, the pool needs to be manually terminated.

>>> import random
>>> def do_work(wid):
...   yield random.random()
>>> pool = StandaloneProcessPool(do_work)
>>> for x, r in enumerate(pool): 
...   if x == 2:
...      pool.stop()
...      break
...   print r
0.600151963181
0.144348185086
class Job(target_id, args=[])[source]

This class is our unit of work. It it fetched by a Worker, it’s target is executed, the result (ret) and exception (if any) is stored and sent back to the JobQueue.

Parameters:
  • target_id (int) – ID of the code to execute. See the source of JobQueue.enqueue for details.
  • args (list) – List of arguments to pass to the worker function
class Worker(host='localhost', port=6379, db=10)[source]

The workhorse of our implementation. Each worker fetches a job from Redis, executes it, then stores the results back into Redis.

Parameters:
  • host (str) – Redis hostname
  • port (int) – Redis port
  • db (int) – Redis database number
run()[source]

In an infinite loop, wait for jobs, then execute them and return the results to Redis.

class JobQueue(host='localhost', port=6379, db=10)[source]

Warning

The JobQueue flushes the selected Redis database! Be sure to specify an unused database!

The queue that allows submission and fetching of completed jobs.

Parameters:
  • host (str) – Redis hostname
  • port (int) – Redis port
  • db (int) – Redis database number

That being said, here is an example of how to use the queue.

>>> def sum(x, y):
...   return x + y
>>> q = JobQueue()
>>> q.enqueue(sum, (1, 2)) 
>>> q.enqueue(sum, (2, 3)) 
>>> q.enqueue(sum, (3, 4)) 
>>> q.finalize()
>>> for r in q:   
...   print r.ret
3
5
7
next()[source]
enqueue(target, args)[source]

Add a job to the queue.

Parameters:
  • target (function) – Function to be executed
  • args (list) – Arguments provided to the job
finalize()[source]

Indicate to the queue that no more jobs will be submitted.

class DistributedPool(function, host='localhost', port=6379, db=10)[source]

The distributed pool is a simple wrapper around the JobQueue that makes is even more convenient to use, just like ProcessPool and ThreadPool.

Parameters:
  • host (str) – Redis hostname
  • port (int) – Redis port
  • db (int) – Redis database number

First, on one machine let’s start a single worker.

python streampie.py

We then execute:

>>> def mul(x, y):
...   return x * y
>>> range(4) >> chop(2) >> DistributedPool(mul) >> list 
[0, 6]
stop()[source]

Currently not implemented. Is it even needed?