Skip to content
Snippets Groups Projects
Commit cd4ed502 authored by Radek Hušek's avatar Radek Hušek Committed by Radek Hušek
Browse files

parmap: add queue_size option

Exposes another parameter for fine tuning parmap.
parent 8be21bb1
No related branches found
No related tags found
No related merge requests found
...@@ -139,7 +139,7 @@ def subdivisionIterator(G, edges = None): ...@@ -139,7 +139,7 @@ def subdivisionIterator(G, edges = None):
def generateSubdivisions(file, G, edges = None, def generateSubdivisions(file, G, edges = None,
function = lambda H: H.graph6_string(), function = lambda H: H.graph6_string(),
parallel = 0, chunksize = 1): parallel = 0, chunksize = 1, q_size = 3):
f = open(file, 'w+') f = open(file, 'w+')
f.write("# Subdivisions of edges '%s' of graph '%s' (%s)\n" % f.write("# Subdivisions of edges '%s' of graph '%s' (%s)\n" %
(edges, G.graph6_string(), G)) (edges, G.graph6_string(), G))
...@@ -148,7 +148,7 @@ def generateSubdivisions(file, G, edges = None, ...@@ -148,7 +148,7 @@ def generateSubdivisions(file, G, edges = None,
iterator = map(function, subdivisionIterator(G, edges)) iterator = map(function, subdivisionIterator(G, edges))
else: else:
iterator = parmap(function, subdivisionIterator(G, edges), iterator = parmap(function, subdivisionIterator(G, edges),
nprocs = parallel, chunksize = chunksize) nprocs = parallel, chunksize = chunksize, queue_size = q_size)
for line in iterator: for line in iterator:
f.write(line) f.write(line)
......
...@@ -32,11 +32,11 @@ def worker_fun(f, q_in, q_out): ...@@ -32,11 +32,11 @@ def worker_fun(f, q_in, q_out):
break break
q_out.put((i, [ f(x) for x in chunk ])) q_out.put((i, [ f(x) for x in chunk ]))
def parmap(f, X, nprocs = None, chunksize = 1): def parmap(f, X, nprocs = None, chunksize = 1, queue_size = 3):
if nprocs is None: if nprocs is None:
nprocs = multiprocessing.cpu_count() nprocs = multiprocessing.cpu_count()
q_in = multiprocessing.Queue(100) q_in = multiprocessing.Queue(queue_size)
q_out = multiprocessing.Queue() q_out = multiprocessing.Queue()
proc = [ multiprocessing.Process( proc = [ multiprocessing.Process(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment