Skip to content
Snippets Groups Projects
Commit 62d5e6f1 authored by Radek Hušek's avatar Radek Hušek Committed by Radek Hušek
Browse files

parmap: add multimap & out_chunksize options

parent c6046692
No related branches found
No related tags found
No related merge requests found
# modified version of code written by klaus se from # based on code written by klaus se from
# http://stackoverflow.com/a/16071616 # http://stackoverflow.com/a/16071616
# but now almost completly rewritten
import multiprocessing import multiprocessing
from itertools import chain from itertools import chain
...@@ -22,19 +23,36 @@ def producer_fun(X, q_in, cont, nprocs, chunksize): ...@@ -22,19 +23,36 @@ def producer_fun(X, q_in, cont, nprocs, chunksize):
for _ in range(nprocs): for _ in range(nprocs):
q_in.put((None, None)) q_in.put((None, None))
def worker_fun(f, q_in, q_out): def worker_fun(f, q_in, q_out, multimap, chunksize):
while True: while True:
i, chunk = q_in.get() i, chunk = q_in.get()
if i is None: if i is None:
q_out.put((None, None)) q_out.put((None, None))
break break
q_out.put((i, [ f(x) for x in chunk ]))
ret = (f(x) for x in chunk)
if multimap:
ret = chain.from_iterable(ret)
if chunksize is not None:
for x in chunkme(ret, chunksize):
q_out.put((i, x))
q_out.put((i, None))
else:
q_out.put((i, list(ret)))
def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None, def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None,
inOrder = True): inOrder = True, multimap = False, out_chunksize = None):
if nprocs is None: if nprocs is None:
nprocs = multiprocessing.cpu_count() nprocs = multiprocessing.cpu_count()
if inOrder:
out_chunksize = None
if out_chunksize is not None:
out_chunksize = max(out_chunksize, 1)
if chunks_in_flight is None: if chunks_in_flight is None:
chunks_in_flight = 10 + 3 * nprocs chunks_in_flight = 10 + 3 * nprocs
...@@ -45,7 +63,8 @@ def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None, ...@@ -45,7 +63,8 @@ def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None,
q_out = multiprocessing.Queue() q_out = multiprocessing.Queue()
proc = [ multiprocessing.Process( proc = [ multiprocessing.Process(
target = worker_fun, args = (f, q_in, q_out) target = worker_fun,
args = (f, q_in, q_out, multimap, out_chunksize)
) for _ in range(nprocs)] ) for _ in range(nprocs)]
proc.append(multiprocessing.Process( proc.append(multiprocessing.Process(
...@@ -69,8 +88,11 @@ def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None, ...@@ -69,8 +88,11 @@ def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None,
continue continue
if not inOrder: if not inOrder:
chunk_index += 1 if out_chunksize is None:
cont.release() cont.release()
if val is None:
cont.release()
continue
yield val yield val
continue continue
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment