Streampie Module
class Stream(obj=None)

    This is our generic stream class. It is iterable and it overloads the
    >> operator for convenience.
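To make the `>>` chaining concrete, here is a minimal self-contained sketch of how such operator overloading can work, using Python's reflected-shift protocol. This is an illustration only, not streampie's actual implementation (and it is written in Python 3, while the doctests in this document are Python 2):

```python
import itertools

class Stream:
    # Minimal sketch of >>-based chaining (illustrative, not streampie's code).
    def __init__(self, iterable=()):
        self.iterator = iter(iterable)

    def __iter__(self):
        return self.iterator

    def __rshift__(self, other):
        # stream >> stage, or stream >> callable (e.g. `>> list`)
        if isinstance(other, Stream):
            other.iterator = other.transform(self.iterator)
            return other
        return other(self.iterator)

    def __rrshift__(self, left):
        # plain iterable >> stage: wrap the iterable first, then chain
        return Stream(left) >> self

    def transform(self, iterator):
        return iterator  # identity; subclasses override

class take(Stream):
    # Toy version of the take stage documented below.
    def __init__(self, n):
        super().__init__()
        self.n = n

    def transform(self, iterator):
        return itertools.islice(iterator, self.n)

print(range(4) >> take(2) >> list)  # prints [0, 1]
```

Because `range` objects do not define `__rshift__`, Python falls back to the stage's `__rrshift__`, which is what lets a plain iterable start a chain.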
class take(n)

    Take the first n elements, and drop the rest.

    >>> range(4) >> take(2) >> list
    [0, 1]
class takei(indices)

    Take only the elements whose indices are given in the indices list.

    >>> range(4) >> takei([0, 1]) >> list
    [0, 1]
class drop(n)

    Drop the first n elements, and take the rest.

    >>> range(4) >> drop(2) >> list
    [2, 3]
class dropi(indices)

    Drop only the elements whose indices are given in the indices list.

    >>> range(4) >> dropi([0, 1]) >> list
    [2, 3]
class chop(n)

    Split the stream into n-sized chunks.

    >>> range(4) >> chop(2) >> list
    [[0, 1], [2, 3]]
class map(function)

    Call function for every element, with the element as input.

    >>> square = lambda x: x**2
    >>> range(4) >> map(square) >> list
    [0, 1, 4, 9]
class filter(function)

    Return only the elements for which the predicate function evaluates
    to True.

    >>> even = lambda x: x % 2 == 0
    >>> range(4) >> filter(even) >> list
    [0, 2]
class apply(function)

    Call function for every element, unpacking the element as the
    argument list.

    >>> sum = lambda x, y: x + y
    >>> range(4) >> chop(2) >> apply(sum) >> list
    [1, 5]
class takewhile(predicate)

    Take elements while the predicate is True; stop at the first element
    for which it is False.

    >>> range(4) >> takewhile(lambda x: x < 3) >> list
    [0, 1, 2]
class dropwhile(predicate)

    Drop elements while the predicate is True, then take the rest.

    >>> range(4) >> dropwhile(lambda x: x < 3) >> list
    [3]
class prepend(prep_iterator)

    Prepend elements to a stream.

    >>> range(4) >> prepend([10, 9]) >> list
    [10, 9, 0, 1, 2, 3]
class flatten(obj=None)

    Flatten an arbitrarily-deep list of lists into a single list.

    >>> [0, [1, [2, [3]]]] >> flatten() >> list
    [0, 1, 2, 3]
class LocalPool(function, poolsize=None, args=[])

    A generic base class shared by all local pools (those executed on the
    same machine).
class ProcessPool(function, poolsize=None, args=[])

    Create a process pool.

    Parameters:
        - function (callable) – Function that each worker executes
        - poolsize (int) – How many workers the pool should create
        - args (list) – List of additional arguments to pass to the worker function

    A simple example that calls the sum function for every pair of inputs:

    >>> def sum(wid, items):
    ...     # wid is the worker id
    ...     # items is an iterator over the inputs to the stream
    ...     for x, y in items:
    ...         yield x + y
    >>> range(6) >> chop(2) >> ProcessPool(sum) >> list
    [1, 5, 9]

    Note that the order of the output list is not guaranteed, since it
    depends on the order in which the elements were consumed. By default,
    the class creates as many workers as there are cores. Here is a more
    advanced example showing poolsize control and passing additional
    arguments:

    >>> def sum(wid, items, arg1, arg2):
    ...     # arg1 and arg2 are additional arguments passed to the function
    ...     for x, y in items:
    ...         yield x + y
    >>> sorted(range(6) >> chop(2) >> ProcessPool(sum, poolsize=8, args=[0, 1]) >> list)
    [1, 5, 9]

    The function can yield arbitrarily many results. For example, a single
    input can produce two or more yields:

    >>> def sum(wid, items):
    ...     for x, y in items:
    ...         yield x + y
    ...         yield x + y
    >>> sorted(range(6) >> chop(2) >> ProcessPool(sum) >> list)
    [1, 1, 5, 5, 9, 9]
class ThreadPool(function, poolsize=None, args=[])

    Create a thread pool.

    Parameters:
        - function (callable) – Function that each worker executes
        - poolsize (int) – How many workers the pool should create
        - args (list) – List of additional arguments to pass to the worker function

    >>> def sum(wid, items):
    ...     # wid is the worker id
    ...     # items is an iterator over the inputs to the stream
    ...     for x, y in items:
    ...         yield x + y
    >>> range(6) >> chop(2) >> ThreadPool(sum) >> list
    [1, 5, 9]
class StandaloneProcessPool(function, poolsize=None, args=[])

    The standalone process pool is exactly like the ProcessPool class,
    except that it takes no input and constantly yields output.

    Parameters:
        - function (callable) – Function that each worker executes
        - poolsize (int) – How many workers the pool should create
        - args (list) – List of additional arguments to pass to the worker function

    To illustrate, here is an example of a worker that constantly yields
    random numbers. Since there is no input stream, the pool needs to be
    terminated manually.

    >>> import random
    >>> def do_work(wid):
    ...     yield random.random()
    >>> pool = StandaloneProcessPool(do_work)
    >>> for x, r in enumerate(pool):
    ...     if x == 2:
    ...         pool.stop()
    ...         break
    ...     print r
    0.600151963181
    0.144348185086
class Job(target_id, args=[])

    This class is our unit of work. It is fetched by a Worker, its target
    is executed, and the result (ret) and exception (if any) are stored
    and sent back to the JobQueue.

    Parameters:
        - target_id (int) – ID of the code to execute. See the source of JobQueue.enqueue for details.
        - args (list) – List of arguments to pass to the worker function
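The lifecycle described above (execute the target, capture either the result or the exception) can be pictured with a small hypothetical sketch. The names here are illustrative: it holds a callable target directly rather than resolving a target_id through the JobQueue, and the run method stands in for what a Worker does with a fetched job.

```python
class JobSketch:
    # Hypothetical stand-in for Job: holds a callable target instead of a
    # target_id, and captures either the result (ret) or the exception.
    def __init__(self, target, args=None):
        self.target = target
        self.args = args or []
        self.ret = None
        self.exception = None

    def run(self):
        # What a Worker does with a fetched job: execute the target and
        # record the outcome so it can be sent back to the JobQueue.
        try:
            self.ret = self.target(*self.args)
        except Exception as e:
            self.exception = e

ok = JobSketch(lambda x, y: x + y, [1, 2])
ok.run()      # ok.ret == 3, ok.exception is None

bad = JobSketch(lambda: 1 / 0)
bad.run()     # bad.ret is None, bad.exception is a ZeroDivisionError
```

Storing the exception instead of raising it is what lets a failure travel back to the submitter rather than killing the worker.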
class Worker(host='localhost', port=6379, db=10)

    The workhorse of our implementation. Each worker fetches a job from
    Redis, executes it, then stores the results back into Redis.

    Parameters:
        - host (str) – Redis hostname
        - port (int) – Redis port
        - db (int) – Redis database number
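The fetch-execute-store cycle can be sketched with standard-library queues standing in for Redis. This is illustrative only: the real Worker talks to a Redis server, and the None sentinel used to stop the loop is an assumption of this sketch, not part of streampie's protocol.

```python
import queue

def worker_loop(jobs, results):
    # Fetch jobs until a None sentinel arrives; execute each one and
    # store the result (or the exception) in the results queue.
    while True:
        job = jobs.get()
        if job is None:
            break
        fn, args = job
        try:
            results.put(fn(*args))
        except Exception as e:
            results.put(e)

jobs, results = queue.Queue(), queue.Queue()
jobs.put((pow, (2, 10)))
jobs.put((pow, (3, 2)))
jobs.put(None)
worker_loop(jobs, results)
out = [results.get(), results.get()]
print(out)  # prints [1024, 9]
```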
class JobQueue(host='localhost', port=6379, db=10)

    The queue that allows submission and fetching of completed jobs.

    Warning: The JobQueue flushes the selected Redis database! Be sure to
    specify an unused database!

    Parameters:
        - host (str) – Redis hostname
        - port (int) – Redis port
        - db (int) – Redis database number

    That being said, here is an example of how to use the queue.

    >>> def sum(x, y):
    ...     return x + y
    >>> q = JobQueue()
    >>> q.enqueue(sum, (1, 2))
    >>> q.enqueue(sum, (2, 3))
    >>> q.enqueue(sum, (3, 4))
    >>> q.finalize()
    >>> for r in q:
    ...     print r.ret
    3
    5
    7
class DistributedPool(function, host='localhost', port=6379, db=10)

    The distributed pool is a simple wrapper around the JobQueue that
    makes it even more convenient to use, just like ProcessPool and
    ThreadPool.

    Parameters:
        - host (str) – Redis hostname
        - port (int) – Redis port
        - db (int) – Redis database number

    First, on one machine, start a single worker:

        python streampie.py

    We then execute:

    >>> def mul(x, y):
    ...     return x * y
    >>> range(4) >> chop(2) >> DistributedPool(mul) >> list
    [0, 6]