diff --git a/parmap.py b/parmap.py index a19c36d88d12588b4298a5c0e24b5179f2bd921e..31e0cf8652294aa44a5ad1eef0dfd6e95440544f 100644 --- a/parmap.py +++ b/parmap.py @@ -14,24 +14,21 @@ def chunkme(X, chunksize): if len(chunk): yield chunk -def producer_fun(X, q_in, q_out, cont, nprocs, chunksize): - sent = 0 +def producer_fun(X, q_in, cont, nprocs, chunksize): for i, x in enumerate(chunkme(X, chunksize)): cont.acquire() q_in.put((i, x)) - sent += 1 for _ in range(nprocs): q_in.put((None, None)) - q_out.put((None, sent)) - def worker_fun(f, q_in, q_out): - while True: - i, chunk = q_in.get() - if i is None: - break - q_out.put((i, [ f(x) for x in chunk ])) + while True: + i, chunk = q_in.get() + if i is None: + q_out.put((None, None)) + break + q_out.put((i, [ f(x) for x in chunk ])) def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None, inOrder = True): @@ -53,7 +50,7 @@ def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None, proc.append(multiprocessing.Process( target = producer_fun, - args = (X, q_in, q_out, cont, nprocs, chunksize) + args = (X, q_in, cont, nprocs, chunksize) )) for p in proc: @@ -63,12 +60,12 @@ def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None, def get_chunk(): ret = {} chunk_index = 0 - jobs = None + running_workers = nprocs - while jobs is None or chunk_index < jobs: + while running_workers > 0: i, val = q_out.get() if i is None: - jobs = val + running_workers -= 1 continue if not inOrder: