Skip to content
Snippets Groups Projects
Commit 8be21bb1 authored by Radek Hušek's avatar Radek Hušek Committed by Radek Hušek
Browse files

parmap: add chunksize option

No measurable difference on my laptop :-(
parent abcebb96
No related branches found
No related tags found
No related merge requests found
...@@ -139,7 +139,7 @@ def subdivisionIterator(G, edges = None): ...@@ -139,7 +139,7 @@ def subdivisionIterator(G, edges = None):
def generateSubdivisions(file, G, edges = None, def generateSubdivisions(file, G, edges = None,
function = lambda H: H.graph6_string(), function = lambda H: H.graph6_string(),
parallel = 0): parallel = 0, chunksize = 1):
f = open(file, 'w+') f = open(file, 'w+')
f.write("# Subdivisions of edges '%s' of graph '%s' (%s)\n" % f.write("# Subdivisions of edges '%s' of graph '%s' (%s)\n" %
(edges, G.graph6_string(), G)) (edges, G.graph6_string(), G))
...@@ -147,7 +147,8 @@ def generateSubdivisions(file, G, edges = None, ...@@ -147,7 +147,8 @@ def generateSubdivisions(file, G, edges = None,
if parallel == 0: if parallel == 0:
iterator = map(function, subdivisionIterator(G, edges)) iterator = map(function, subdivisionIterator(G, edges))
else: else:
iterator = parmap(function, subdivisionIterator(G, edges), parallel) iterator = parmap(function, subdivisionIterator(G, edges),
nprocs = parallel, chunksize = chunksize)
for line in iterator: for line in iterator:
f.write(line) f.write(line)
......
...@@ -2,10 +2,21 @@ ...@@ -2,10 +2,21 @@
# http://stackoverflow.com/a/16071616 # http://stackoverflow.com/a/16071616
import multiprocessing import multiprocessing
from itertools import chain
def producer_fun(X, q_in, q_control, nprocs): def chunkme(X, chunksize):
chunk = []
for x in X:
if len(chunk) >= chunksize:
yield chunk
chunk = []
chunk.append(x)
if len(chunk):
yield chunk
def producer_fun(X, q_in, q_control, nprocs, chunksize):
sent = 0 sent = 0
for i, x in enumerate(X): for i, x in enumerate(chunkme(X, chunksize)):
q_in.put((i, x)) q_in.put((i, x))
sent += 1 sent += 1
...@@ -16,12 +27,12 @@ def producer_fun(X, q_in, q_control, nprocs): ...@@ -16,12 +27,12 @@ def producer_fun(X, q_in, q_control, nprocs):
def worker_fun(f, q_in, q_out): def worker_fun(f, q_in, q_out):
while True: while True:
i, x = q_in.get() i, chunk = q_in.get()
if i is None: if i is None:
break break
q_out.put((i, f(x))) q_out.put((i, [ f(x) for x in chunk ]))
def parmap(f, X, nprocs = None): def parmap(f, X, nprocs = None, chunksize = 1):
if nprocs is None: if nprocs is None:
nprocs = multiprocessing.cpu_count() nprocs = multiprocessing.cpu_count()
...@@ -34,7 +45,7 @@ def parmap(f, X, nprocs = None): ...@@ -34,7 +45,7 @@ def parmap(f, X, nprocs = None):
proc.append(multiprocessing.Process( proc.append(multiprocessing.Process(
target = producer_fun, target = producer_fun,
args = (X, q_in, q_out, nprocs) args = (X, q_in, q_out, nprocs, chunksize)
)) ))
for p in proc: for p in proc:
...@@ -57,5 +68,5 @@ def parmap(f, X, nprocs = None): ...@@ -57,5 +68,5 @@ def parmap(f, X, nprocs = None):
for p in proc: for p in proc:
p.join() p.join()
return [ ret[i] for i in range(len(ret)) ] return chain.from_iterable([ ret[i] for i in range(len(ret)) ])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment