Commit c6046692 authored by Radek Hušek's avatar Radek Hušek Committed by Radek Hušek
Browse files

parmap: simplify generator termination

parent b8f62c17
......@@ -14,24 +14,21 @@ def chunkme(X, chunksize):
if len(chunk):
yield chunk
def producer_fun(X, q_in, q_out, cont, nprocs, chunksize):
sent = 0
def producer_fun(X, q_in, cont, nprocs, chunksize):
for i, x in enumerate(chunkme(X, chunksize)):
cont.acquire()
q_in.put((i, x))
sent += 1
for _ in range(nprocs):
q_in.put((None, None))
q_out.put((None, sent))
def worker_fun(f, q_in, q_out):
while True:
i, chunk = q_in.get()
if i is None:
break
q_out.put((i, [ f(x) for x in chunk ]))
while True:
i, chunk = q_in.get()
if i is None:
q_out.put((None, None))
break
q_out.put((i, [ f(x) for x in chunk ]))
def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None,
inOrder = True):
......@@ -53,7 +50,7 @@ def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None,
proc.append(multiprocessing.Process(
target = producer_fun,
args = (X, q_in, q_out, cont, nprocs, chunksize)
args = (X, q_in, cont, nprocs, chunksize)
))
for p in proc:
......@@ -63,12 +60,12 @@ def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None,
def get_chunk():
ret = {}
chunk_index = 0
jobs = None
running_workers = nprocs
while jobs is None or chunk_index < jobs:
while running_workers > 0:
i, val = q_out.get()
if i is None:
jobs = val
running_workers -= 1
continue
if not inOrder:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment