Commit de911b29 authored by Antoine Pitrou's avatar Antoine Pitrou

Issue #12708: Add starmap() and starmap_async() methods (similar to...

Issue #12708: Add starmap() and starmap_async() methods (similar to itertools.starmap()) to multiprocessing.Pool.
Patch by Hynek Schlawack.
parent 12f65d1f
...@@ -1669,6 +1669,24 @@ with the :class:`Pool` class. ...@@ -1669,6 +1669,24 @@ with the :class:`Pool` class.
returned iterator should be considered arbitrary. (Only when there is returned iterator should be considered arbitrary. (Only when there is
only one worker process is the order guaranteed to be "correct".) only one worker process is the order guaranteed to be "correct".)
.. method:: starmap(func, iterable[, chunksize])
Like :meth:`map` except that the elements of the `iterable` are expected
to be iterables that are unpacked as arguments.
Hence an `iterable` of `[(1,2), (3, 4)]` results in `[func(1,2),
func(3,4)]`.
.. versionadded:: 3.3
.. method:: starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
A combination of :meth:`starmap` and :meth:`map_async` that iterates over
`iterable` of iterables and calls `func` with the iterables unpacked.
Returns a result object.
.. versionadded:: 3.3
.. method:: close() .. method:: close()
Prevents any more tasks from being submitted to the pool. Once all the Prevents any more tasks from being submitted to the pool. Once all the
......
...@@ -1066,11 +1066,12 @@ ArrayProxy = MakeProxyType('ArrayProxy', ( ...@@ -1066,11 +1066,12 @@ ArrayProxy = MakeProxyType('ArrayProxy', (
PoolProxy = MakeProxyType('PoolProxy', ( PoolProxy = MakeProxyType('PoolProxy', (
'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join',
'map', 'map_async', 'terminate' 'map', 'map_async', 'starmap', 'starmap_async', 'terminate'
)) ))
PoolProxy._method_to_typeid_ = { PoolProxy._method_to_typeid_ = {
'apply_async': 'AsyncResult', 'apply_async': 'AsyncResult',
'map_async': 'AsyncResult', 'map_async': 'AsyncResult',
'starmap_async': 'AsyncResult',
'imap': 'Iterator', 'imap': 'Iterator',
'imap_unordered': 'Iterator' 'imap_unordered': 'Iterator'
} }
......
...@@ -64,6 +64,9 @@ job_counter = itertools.count() ...@@ -64,6 +64,9 @@ job_counter = itertools.count()
def mapstar(args): def mapstar(args):
return list(map(*args)) return list(map(*args))
def starmapstar(args):
return list(itertools.starmap(args[0], args[1]))
# #
# Code run by worker processes # Code run by worker processes
# #
...@@ -248,7 +251,25 @@ class Pool(object): ...@@ -248,7 +251,25 @@ class Pool(object):
in a list that is returned. in a list that is returned.
''' '''
assert self._state == RUN assert self._state == RUN
return self.map_async(func, iterable, chunksize).get() return self._map_async(func, iterable, mapstar, chunksize).get()
def starmap(self, func, iterable, chunksize=None):
'''
Like `map()` method but the elements of the `iterable` are expected to
be iterables as well and will be unpacked as arguments. Hence
`func` and (a, b) becomes func(a, b).
'''
assert self._state == RUN
return self._map_async(func, iterable, starmapstar, chunksize).get()
def starmap_async(self, func, iterable, chunksize=None, callback=None,
error_callback=None):
'''
Asynchronous version of `starmap()` method.
'''
assert self._state == RUN
return self._map_async(func, iterable, starmapstar, chunksize,
callback, error_callback)
def imap(self, func, iterable, chunksize=1): def imap(self, func, iterable, chunksize=1):
''' '''
...@@ -302,6 +323,13 @@ class Pool(object): ...@@ -302,6 +323,13 @@ class Pool(object):
Asynchronous version of `map()` method. Asynchronous version of `map()` method.
''' '''
assert self._state == RUN assert self._state == RUN
return self._map_async(func, iterable, mapstar, chunksize)
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
error_callback=None):
'''
Helper function to implement map, starmap and their async counterparts.
'''
if not hasattr(iterable, '__len__'): if not hasattr(iterable, '__len__'):
iterable = list(iterable) iterable = list(iterable)
...@@ -315,7 +343,7 @@ class Pool(object): ...@@ -315,7 +343,7 @@ class Pool(object):
task_batches = Pool._get_tasks(func, iterable, chunksize) task_batches = Pool._get_tasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback, result = MapResult(self._cache, chunksize, len(iterable), callback,
error_callback=error_callback) error_callback=error_callback)
self._taskqueue.put((((result._job, i, mapstar, (x,), {}) self._taskqueue.put((((result._job, i, mapper, (x,), {})
for i, x in enumerate(task_batches)), None)) for i, x in enumerate(task_batches)), None))
return result return result
......
...@@ -8,6 +8,7 @@ import unittest ...@@ -8,6 +8,7 @@ import unittest
import queue as pyqueue import queue as pyqueue
import time import time
import io import io
import itertools
import sys import sys
import os import os
import gc import gc
...@@ -1125,6 +1126,9 @@ def sqr(x, wait=0.0): ...@@ -1125,6 +1126,9 @@ def sqr(x, wait=0.0):
time.sleep(wait) time.sleep(wait)
return x*x return x*x
def mul(x, y):
return x*y
class _TestPool(BaseTestCase): class _TestPool(BaseTestCase):
def test_apply(self): def test_apply(self):
...@@ -1138,6 +1142,20 @@ class _TestPool(BaseTestCase): ...@@ -1138,6 +1142,20 @@ class _TestPool(BaseTestCase):
self.assertEqual(pmap(sqr, list(range(100)), chunksize=20), self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
list(map(sqr, list(range(100))))) list(map(sqr, list(range(100)))))
def test_starmap(self):
psmap = self.pool.starmap
tuples = list(zip(range(10), range(9,-1, -1)))
self.assertEqual(psmap(mul, tuples),
list(itertools.starmap(mul, tuples)))
tuples = list(zip(range(100), range(99,-1, -1)))
self.assertEqual(psmap(mul, tuples, chunksize=20),
list(itertools.starmap(mul, tuples)))
def test_starmap_async(self):
tuples = list(zip(range(100), range(99,-1, -1)))
self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
list(itertools.starmap(mul, tuples)))
def test_map_chunksize(self): def test_map_chunksize(self):
try: try:
self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
......
...@@ -878,6 +878,7 @@ Michael Scharf ...@@ -878,6 +878,7 @@ Michael Scharf
Andreas Schawo Andreas Schawo
Neil Schemenauer Neil Schemenauer
David Scherer David Scherer
Hynek Schlawack
Bob Schmertz Bob Schmertz
Gregor Schmid Gregor Schmid
Ralf Schmitt Ralf Schmitt
......
...@@ -419,6 +419,9 @@ Core and Builtins ...@@ -419,6 +419,9 @@ Core and Builtins
Library Library
------- -------
- Issue #12708: Add starmap() and starmap_async() methods (similar to
itertools.starmap()) to multiprocessing.Pool. Patch by Hynek Schlawack.
- Issue #1785: Fix inspect and pydoc with misbehaving descriptors. - Issue #1785: Fix inspect and pydoc with misbehaving descriptors.
- Issue #13637: "a2b" functions in the binascii module now accept ASCII-only - Issue #13637: "a2b" functions in the binascii module now accept ASCII-only
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment