Commit 1a6fdfdd authored by Jason Madden's avatar Jason Madden

Unify the implementation of Group/Pool imap/map/apply and ThreadPool imap/map/apply.

This ensures ThreadPool has the same API, including accepting multiple iterables in imap.

Also, capture the _raise_exception method of a failed greenlet and use it to raise the original traceback from those methods.
parent 31cc411d
......@@ -4,6 +4,15 @@
.. currentmodule:: gevent
Unreleased
==========
- ``gevent.threadpool.ThreadPool.imap`` and ``imap_unordered`` now
accept multiple iterables.
- (Experimental) Exceptions raised from iterating using the
``ThreadPool`` or ``Group`` mapping/application functions should now
have the original traceback.
1.1a1 (Jun 29, 2015)
====================
......
......@@ -25,7 +25,219 @@ from gevent.lock import Semaphore, DummySemaphore
__all__ = ['Group', 'Pool']
class Group(object):
class IMapUnordered(Greenlet):
_zipped = False
def __init__(self, func, iterable, spawn=None, _zipped=False):
from gevent.queue import Queue
Greenlet.__init__(self)
if spawn is not None:
self.spawn = spawn
if _zipped:
self._zipped = _zipped
self.func = func
self.iterable = iterable
self.queue = Queue()
self.count = 0
self.finished = False
self.rawlink(self._on_finish)
def __iter__(self):
return self
def next(self):
value = self._inext()
if isinstance(value, Failure):
raise value.exc
return value
__next__ = next
def _inext(self):
return self.queue.get()
def _ispawn(self, func, item):
self.count += 1
g = self.spawn(func, item) if not self._zipped else self.spawn(func, *item)
g.rawlink(self._on_result)
return g
def _run(self):
try:
func = self.func
for item in self.iterable:
self._ispawn(func, item)
finally:
self.__dict__.pop('spawn', None)
self.__dict__.pop('func', None)
self.__dict__.pop('iterable', None)
def _on_result(self, greenlet):
self.count -= 1
if greenlet.successful():
self.queue.put(self._iqueue_value_for_success(greenlet))
else:
self.queue.put(self._iqueue_value_for_failure(greenlet))
if self.ready() and self.count <= 0 and not self.finished:
self.queue.put(self._iqueue_value_for_finished())
self.finished = True
def _on_finish(self, _self):
if self.finished:
return
if not self.successful():
self.queue.put(self._iqueue_value_for_self_failure())
self.finished = True
return
if self.count <= 0:
self.queue.put(self._iqueue_value_for_finished())
self.finished = True
def _iqueue_value_for_success(self, greenlet):
return greenlet.value
def _iqueue_value_for_failure(self, greenlet):
return Failure(greenlet.exception, getattr(greenlet, '_raise_exception'))
def _iqueue_value_for_finished(self):
return Failure(StopIteration)
def _iqueue_value_for_self_failure(self):
return Failure(self.exception, self._raise_exception)
class IMap(IMapUnordered):
# A specialization of IMapUnordered that returns items
# in the order in which they were generated, not
# the order in which they finish.
# We do this by storing tuples (order, value) in the queue
# not just value.
def __init__(self, func, iterable, spawn=None, _zipped=False):
self.waiting = [] # QQQ maybe deque will work faster there?
self.index = 0
self.maxindex = -1
IMapUnordered.__init__(self, func, iterable, spawn, _zipped)
def _inext(self):
while True:
if self.waiting and self.waiting[0][0] <= self.index:
_, value = self.waiting.pop(0)
else:
index, value = self.queue.get()
if index > self.index:
insort_right(self.waiting, (index, value))
continue
self.index += 1
return value
def _ispawn(self, func, item):
g = IMapUnordered._ispawn(self, func, item)
self.maxindex += 1
g.index = self.maxindex
return g
def _iqueue_value_for_success(self, greenlet):
return (greenlet.index, IMapUnordered._iqueue_value_for_success(self, greenlet))
def _iqueue_value_for_failure(self, greenlet):
return (greenlet.index, IMapUnordered._iqueue_value_for_failure(self, greenlet))
def _iqueue_value_for_finished(self):
self.maxindex += 1
return (self.maxindex, IMapUnordered._iqueue_value_for_finished(self))
def _iqueue_value_for_self_failure(self):
self.maxindex += 1
return (self.maxindex, IMapUnordered._iqueue_value_for_self_failure(self))
class GroupMappingMixin(object):
# Internal, non-public API class.
# Provides mixin methods for implementing mapping pools. Subclasses must define:
# - self.spawn(func, *args, **kwargs): a function that runs `func` with `args`
# and `awargs`, potentially asynchronously. Return a value with a `get` method that
# blocks until the results of func are available
# - self._apply_immediately(): should the function passed to apply be called immediately,
# synchronously?
# - self._apply_async_use_greenlet(): Should apply_async directly call
# Greenlet.spawn(), bypassing self.spawn? Return true when self.spawn would block
# - self._apply_async_cb_spawn(callback, result): Run the given callback function, possiblly
# asynchronously, possibly synchronously.
def apply_cb(self, func, args=None, kwds=None, callback=None):
result = self.apply(func, args, kwds)
if callback is not None:
self._apply_async_cb_spawn(callback, result)
return result
def apply_async(self, func, args=None, kwds=None, callback=None):
"""A variant of the apply() method which returns a Greenlet object.
If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready
callback is applied to it (unless the call failed)."""
if args is None:
args = ()
if kwds is None:
kwds = {}
if self._apply_async_use_greenlet():
# cannot call spawn() directly because it will block
return Greenlet.spawn(self.apply_cb, func, args, kwds, callback)
greenlet = self.spawn(func, *args, **kwds)
if callback is not None:
greenlet.link(pass_value(callback))
return greenlet
def apply(self, func, args=None, kwds=None):
"""Equivalent of the apply() builtin function. It blocks till the result is ready."""
if args is None:
args = ()
if kwds is None:
kwds = {}
if self._apply_immediately():
return func(*args, **kwds)
else:
return self.spawn(func, *args, **kwds).get()
def map(self, func, iterable):
return list(self.imap(func, iterable))
def map_cb(self, func, iterable, callback=None):
result = self.map(func, iterable)
if callback is not None:
callback(result)
return result
def map_async(self, func, iterable, callback=None):
"""
A variant of the map() method which returns a Greenlet object.
If callback is specified then it should be a callable which accepts a
single argument.
"""
return Greenlet.spawn(self.map_cb, func, iterable, callback)
def imap(self, func, *iterables):
"""An equivalent of itertools.imap()"""
return IMap.spawn(func, izip(*iterables), spawn=self.spawn,
_zipped=True)
def imap_unordered(self, func, *iterables):
"""The same as imap() except that the ordering of the results from the
returned iterator should be considered in arbitrary order."""
return IMapUnordered.spawn(func, izip(*iterables), spawn=self.spawn,
_zipped=True)
class Group(GroupMappingMixin):
"""Maintain a group of greenlets that are still running.
Links to each item and removes it upon notification.
......@@ -133,205 +345,38 @@ class Group(object):
if block:
greenlet.join(timeout)
def apply(self, func, args=None, kwds=None):
"""Equivalent of the apply() builtin function. It blocks till the result is ready."""
if args is None:
args = ()
if kwds is None:
kwds = {}
if getcurrent() in self:
return func(*args, **kwds)
else:
return self.spawn(func, *args, **kwds).get()
def apply_cb(self, func, args=None, kwds=None, callback=None):
result = self.apply(func, args, kwds)
if callback is not None:
Greenlet.spawn(callback, result)
return result
def apply_async(self, func, args=None, kwds=None, callback=None):
"""A variant of the apply() method which returns a Greenlet object.
If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready
callback is applied to it (unless the call failed)."""
if args is None:
args = ()
if kwds is None:
kwds = {}
if self.full():
# cannot call spawn() directly because it will block
return Greenlet.spawn(self.apply_cb, func, args, kwds, callback)
else:
greenlet = self.spawn(func, *args, **kwds)
if callback is not None:
greenlet.link(pass_value(callback))
return greenlet
def map(self, func, iterable):
return list(self.imap(func, iterable))
def map_cb(self, func, iterable, callback=None):
result = self.map(func, iterable)
if callback is not None:
callback(result)
return result
def map_async(self, func, iterable, callback=None):
"""
A variant of the map() method which returns a Greenlet object.
If callback is specified then it should be a callable which accepts a
single argument.
"""
return Greenlet.spawn(self.map_cb, func, iterable, callback)
def imap(self, func, *iterables):
"""An equivalent of itertools.imap()"""
return IMap.spawn(func, izip(*iterables), spawn=self.spawn,
_zipped=True)
def imap_unordered(self, func, *iterables):
"""The same as imap() except that the ordering of the results from the
returned iterator should be considered in arbitrary order."""
return IMapUnordered.spawn(func, izip(*iterables), spawn=self.spawn,
_zipped=True)
def full(self):
return False
def wait_available(self):
pass
# MappingMixin methods
class _IMapBase(Greenlet):
_zipped = False
def __init__(self, func, iterable, spawn=None, _zipped=False):
from gevent.queue import Queue
Greenlet.__init__(self)
if spawn is not None:
self.spawn = spawn
if _zipped:
self._zipped = _zipped
self.func = func
self.iterable = iterable
self.queue = Queue()
self.count = 0
self.finished = False
self.rawlink(self._on_finish)
def __iter__(self):
return self
def _ispawn(self, func, item):
self.count += 1
g = self.spawn(func, item) if not self._zipped else self.spawn(func, *item)
g.rawlink(self._on_result)
return g
def _run(self):
try:
func = self.func
for item in self.iterable:
self._ispawn(func, item)
finally:
self.__dict__.pop('spawn', None)
self.__dict__.pop('func', None)
self.__dict__.pop('iterable', None)
class IMapUnordered(_IMapBase):
def next(self):
value = self.queue.get()
if isinstance(value, Failure):
raise value.exc
return value
__next__ = next
def _on_result(self, greenlet):
self.count -= 1
if greenlet.successful():
self.queue.put(greenlet.value)
else:
self.queue.put(Failure(greenlet.exception))
if self.ready() and self.count <= 0 and not self.finished:
self.queue.put(Failure(StopIteration))
self.finished = True
def _apply_immediately(self):
# If apply() is called from one of our own
# worker greenlets, don't spawn a new one
return getcurrent() in self
def _on_finish(self, _self):
if self.finished:
return
if not self.successful():
self.queue.put(Failure(self.exception))
self.finished = True
return
if self.count <= 0:
self.queue.put(Failure(StopIteration))
self.finished = True
def _apply_async_cb_spawn(self, callback, result):
Greenlet.spawn(callback, result)
class IMap(_IMapBase):
def __init__(self, func, iterable, spawn=None, _zipped=False):
self.waiting = [] # QQQ maybe deque will work faster there?
self.index = 0
self.maxindex = -1
_IMapBase.__init__(self, func, iterable, spawn, _zipped)
def next(self):
while True:
if self.waiting and self.waiting[0][0] <= self.index:
index, value = self.waiting.pop(0)
else:
index, value = self.queue.get()
if index > self.index:
insort_right(self.waiting, (index, value))
continue
self.index += 1
if isinstance(value, Failure):
raise value.exc
return value
__next__ = next
def _ispawn(self, func, item):
g = _IMapBase._ispawn(self, func, item)
self.maxindex += 1
g.index = self.maxindex
return g
def _on_result(self, greenlet):
self.count -= 1
if greenlet.successful():
self.queue.put((greenlet.index, greenlet.value))
else:
self.queue.put((greenlet.index, Failure(greenlet.exception)))
if self.ready() and self.count <= 0 and not self.finished:
self.maxindex += 1
self.queue.put((self.maxindex, Failure(StopIteration)))
self.finished = True
def _on_finish(self, _self):
if self.finished:
return
if not self.successful():
self.maxindex += 1
self.queue.put((self.maxindex, Failure(self.exception)))
self.finished = True
return
if self.count <= 0:
self.maxindex += 1
self.queue.put((self.maxindex, Failure(StopIteration)))
self.finished = True
def _apply_async_use_greenlet(self):
return self.full() # cannot call self.spawn() because it will block
class Failure(object):
__slots__ = ['exc']
__slots__ = ['exc', '_raise_exception']
def __init__(self, exc):
def __init__(self, exc, raise_exception=None):
self.exc = exc
self._raise_exception = raise_exception
def raise_exc(self):
if self._raise_exception:
self._raise_exception()
else:
raise self.exc
class Pool(Group):
......
......@@ -5,7 +5,7 @@ import os
from gevent.hub import get_hub, getcurrent, sleep, integer_types
from gevent.event import AsyncResult
from gevent.greenlet import Greenlet
from gevent.pool import IMap, IMapUnordered
from gevent.pool import GroupMappingMixin
from gevent.lock import Semaphore
from gevent._threading import Lock, Queue, start_new_thread
......@@ -17,7 +17,7 @@ __all__ = ['ThreadPool',
'ThreadResult']
class ThreadPool(object):
class ThreadPool(GroupMappingMixin):
def __init__(self, maxsize, hub=None):
if hub is None:
......@@ -224,57 +224,16 @@ class ThreadPool(object):
return result
raise result
def apply(self, func, args=None, kwds=None):
"""Equivalent of the apply() builtin function. It blocks till the result is ready."""
if args is None:
args = ()
if kwds is None:
kwds = {}
return self.spawn(func, *args, **kwds).get()
def apply_cb(self, func, args=None, kwds=None, callback=None):
result = self.apply(func, args, kwds)
if callback is not None:
callback(result)
return result
def apply_async(self, func, args=None, kwds=None, callback=None):
"""A variant of the apply() method which returns a Greenlet object.
If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready
callback is applied to it (unless the call failed)."""
if args is None:
args = ()
if kwds is None:
kwds = {}
return Greenlet.spawn(self.apply_cb, func, args, kwds, callback)
def _apply_immediately(self):
# we always pass apply() off to the threadpool
return False
def map(self, func, iterable):
return list(self.imap(func, iterable))
def _apply_async_cb_spawn(self, callback, result):
callback(result)
def map_cb(self, func, iterable, callback=None):
result = self.map(func, iterable)
if callback is not None:
callback(result)
return result
def map_async(self, func, iterable, callback=None):
"""
A variant of the map() method which returns a Greenlet object.
If callback is specified then it should be a callable which accepts a
single argument.
"""
return Greenlet.spawn(self.map_cb, func, iterable, callback)
def imap(self, func, iterable):
"""An equivalent of itertools.imap()"""
return IMap.spawn(func, iterable, spawn=self.spawn)
def imap_unordered(self, func, iterable):
"""The same as imap() except that the ordering of the results from the
returned iterator should be considered in arbitrary order."""
return IMapUnordered.spawn(func, iterable, spawn=self.spawn)
def _apply_async_use_greenlet(self):
# Always go to Greenlet because our self.spawn uses threads
return True
class ThreadResult(object):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment