Commit abcebb96 authored by Radek Hušek's avatar Radek Hušek Committed by Radek Hušek
Browse files

parmap: decouple producer into separate thread

parent f20caa71
......@@ -3,9 +3,20 @@
import multiprocessing
def fun(f, q_in, q_out):
def producer_fun(X, q_in, q_control, nprocs):
sent = 0
for i, x in enumerate(X):
q_in.put((i, x))
sent += 1
for _ in range(nprocs):
q_in.put((None, None))
q_control.put((None, sent))
def worker_fun(f, q_in, q_out):
while True:
i,x = q_in.get()
i, x = q_in.get()
if i is None:
break
q_out.put((i, f(x)))
......@@ -17,20 +28,34 @@ def parmap(f, X, nprocs = None):
q_in = multiprocessing.Queue(100)
q_out = multiprocessing.Queue()
proc = [multiprocessing.Process(target=fun,args=(f,q_in,q_out)) for _ in range(nprocs)]
proc = [ multiprocessing.Process(
target = worker_fun, args = (f, q_in, q_out)
) for _ in range(nprocs)]
proc.append(multiprocessing.Process(
target = producer_fun,
args = (X, q_in, q_out, nprocs)
))
for p in proc:
p.daemon = True
p.start()
sent = [q_in.put((i,x)) for i,x in enumerate(X)]
[q_in.put((None,None)) for _ in range(nprocs)]
ret = {}
for _ in range(len(sent)):
jobs = None
while True:
i, val = q_out.get()
if i is None:
jobs = val
break
ret[i] = val
[ p.join() for p in proc ]
for _ in range(jobs - len(ret)):
i, val = q_out.get()
ret[i] = val
for p in proc:
p.join()
return [ ret[i] for i in range(len(sent)) ]
return [ ret[i] for i in range(len(ret)) ]
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment