From c6046692d1c9e0d59ba69f299410463e383befa6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Radek=20Hu=C5=A1ek?= <PitelVonSacek@gmail.com> Date: Sun, 13 Dec 2015 12:07:09 +0100 Subject: [PATCH] parmap: simplify generator termination --- parmap.py | 25 +++++++++++-------------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/parmap.py b/parmap.py index a19c36d..31e0cf8 100644 --- a/parmap.py +++ b/parmap.py @@ -14,24 +14,21 @@ def chunkme(X, chunksize): if len(chunk): yield chunk -def producer_fun(X, q_in, q_out, cont, nprocs, chunksize): - sent = 0 +def producer_fun(X, q_in, cont, nprocs, chunksize): for i, x in enumerate(chunkme(X, chunksize)): cont.acquire() q_in.put((i, x)) - sent += 1 for _ in range(nprocs): q_in.put((None, None)) - q_out.put((None, sent)) - def worker_fun(f, q_in, q_out): - while True: - i, chunk = q_in.get() - if i is None: - break - q_out.put((i, [ f(x) for x in chunk ])) + while True: + i, chunk = q_in.get() + if i is None: + q_out.put((None, None)) + break + q_out.put((i, [ f(x) for x in chunk ])) def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None, inOrder = True): @@ -53,7 +50,7 @@ def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None, proc.append(multiprocessing.Process( target = producer_fun, - args = (X, q_in, q_out, cont, nprocs, chunksize) + args = (X, q_in, cont, nprocs, chunksize) )) for p in proc: @@ -63,12 +60,12 @@ def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None, def get_chunk(): ret = {} chunk_index = 0 - jobs = None + running_workers = nprocs - while jobs is None or chunk_index < jobs: + while running_workers > 0: i, val = q_out.get() if i is None: - jobs = val + running_workers -= 1 continue if not inOrder: -- GitLab