diff --git a/parmap.py b/parmap.py index 7b266f6dc4a29c43ed4be2787d812ac9c6759793..f5ae309ce4b457f54358a520b7c0996613ce0e15 100644 --- a/parmap.py +++ b/parmap.py @@ -6,6 +6,7 @@ import multiprocessing from itertools import chain def chunkme(X, chunksize): + """Return items generated by X in chunks (lists) of size chunksize.""" chunk = [] for x in X: chunk.append(x) @@ -16,6 +17,7 @@ def chunkme(X, chunksize): yield chunk def producer_fun(X, q_in, cont, nprocs, chunksize): + """parmap internal helper""" for i, x in enumerate(chunkme(X, chunksize)): cont.acquire() q_in.put((i, x)) @@ -24,6 +26,7 @@ def producer_fun(X, q_in, cont, nprocs, chunksize): q_in.put((None, None)) def worker_fun(f, q_in, q_out, multimap, chunksize): + """parmap internal helper""" while True: i, chunk = q_in.get() if i is None: @@ -44,6 +47,22 @@ def worker_fun(f, q_in, q_out, multimap, chunksize): def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None, in_order = True, multimap = False, out_chunksize = None): + """Implements a parallel version of map. + + Spawns a producer and (multiple) worker processes. Works as a generator, + calculating new values as needed (see below for fine-tuning of how far + ahead values are computed). Prevents memory exhaustion in case of fast + workers and slow consumption of generated values. + + f -- function to map on each element of X + X -- generator + nprocs -- number of worker processes to spawn + chunksize -- number of items retrieved by a worker from the queue at once + chunks_in_flight -- maximal number of chunks being processed at once + in_order -- if False, output may be reordered relative to the input sequence + multimap -- if True, f is expected to return an Iterable and flattening is done + out_chunksize -- size of chunks sent from workers to the consumer + """ if nprocs is None: nprocs = multiprocessing.cpu_count()