From 62d5e6f1af69c035467f73439112e960615e9fa8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Radek=20Hu=C5=A1ek?= <PitelVonSacek@gmail.com>
Date: Sun, 13 Dec 2015 13:28:32 +0100
Subject: [PATCH] parmap: add multimap & out_chunksize options

---
 parmap.py | 36 +++++++++++++++++++++++++++++-------
 1 file changed, 29 insertions(+), 7 deletions(-)

diff --git a/parmap.py b/parmap.py
index 31e0cf8..63d77f0 100644
--- a/parmap.py
+++ b/parmap.py
@@ -1,5 +1,6 @@
-# modified version of code written by klaus se from
+# based on code written by klaus se from
 # http://stackoverflow.com/a/16071616
+# but now almost completly rewritten
 
 import multiprocessing
 from itertools import chain
@@ -22,19 +23,36 @@ def producer_fun(X, q_in, cont, nprocs, chunksize):
   for _ in range(nprocs):
     q_in.put((None, None))
 
-def worker_fun(f, q_in, q_out):
+def worker_fun(f, q_in, q_out, multimap, chunksize):
   while True:
     i, chunk = q_in.get()
     if i is None:
       q_out.put((None, None))
       break
-    q_out.put((i, [ f(x) for x in chunk ]))
+
+    ret = (f(x) for x in chunk)
+    if multimap:
+      ret = chain.from_iterable(ret)
+
+    if chunksize is not None:
+      for x in chunkme(ret, chunksize):
+        q_out.put((i, x))
+      q_out.put((i, None))
+    else:
+      q_out.put((i, list(ret)))
+
 
 def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None,
-           inOrder = True):
+           inOrder = True, multimap = False, out_chunksize = None):
     if nprocs is None:
       nprocs = multiprocessing.cpu_count()
 
+    if inOrder:
+      out_chunksize = None
+
+    if out_chunksize is not None:
+      out_chunksize = max(out_chunksize, 1)
+
     if chunks_in_flight is None:
       chunks_in_flight = 10 + 3 * nprocs
 
@@ -45,7 +63,8 @@ def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None,
     q_out  = multiprocessing.Queue()
 
     proc = [ multiprocessing.Process(
-        target = worker_fun, args = (f, q_in, q_out)
+        target = worker_fun,
+        args = (f, q_in, q_out, multimap, out_chunksize)
       ) for _ in range(nprocs)]
 
     proc.append(multiprocessing.Process(
@@ -69,8 +88,11 @@ def parmap(f, X, nprocs = None, chunksize = 1, chunks_in_flight = None,
           continue
 
         if not inOrder:
-          chunk_index += 1
-          cont.release()
+          if out_chunksize is None:
+            cont.release()
+          if val is None:
+            cont.release()
+            continue
           yield val
           continue
 
-- 
GitLab