Commit 8916cda1 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1154 from gevent/threadpool-opts

Compile IMap[Unordered] with Cython
parents b4db40b8 6c30ef65
...@@ -11,6 +11,8 @@ src/gevent/_semaphore.c ...@@ -11,6 +11,8 @@ src/gevent/_semaphore.c
src/gevent/local.c src/gevent/local.c
src/gevent/greenlet.c src/gevent/greenlet.c
src/gevent/_ident.c src/gevent/_ident.c
src/gevent/_imap.c
src/gevent/event.c
src/gevent/libev/corecext.c src/gevent/libev/corecext.c
src/gevent/libev/corecext.h src/gevent/libev/corecext.h
src/gevent/libev/_corecffi.c src/gevent/libev/_corecffi.c
......
...@@ -45,6 +45,14 @@ Enhancements ...@@ -45,6 +45,14 @@ Enhancements
- Pools for greenlets and threads have lower overhead, especially for - Pools for greenlets and threads have lower overhead, especially for
``map``. See :pr:`1153`. ``map``. See :pr:`1153`.
- The undocumented, internal implementation classes ``IMap`` and
``IMapUnordered`` classes are now compiled with Cython, further
reducing the overhead of ``[Thread]Pool.imap``.
- The classes `gevent.event.Event` and `gevent.event.AsyncResult`
are compiled with Cython for improved performance. Please report any
compatibility issues.
Monitoring and Debugging Monitoring and Debugging
------------------------ ------------------------
......
...@@ -93,11 +93,25 @@ IDENT = Extension(name="gevent.__ident", ...@@ -93,11 +93,25 @@ IDENT = Extension(name="gevent.__ident",
depends=['src/gevent/__ident.pxd'], depends=['src/gevent/__ident.pxd'],
include_dirs=include_dirs) include_dirs=include_dirs)
IMAP = Extension(name="gevent.__imap",
sources=["src/gevent/_imap.py"],
depends=['src/gevent/__imap.pxd'],
include_dirs=include_dirs)
EVENT = Extension(name="gevent._event",
sources=["src/gevent/event.py"],
depends=['src/gevent/_event.pxd'],
include_dirs=include_dirs)
_to_cythonize = [ _to_cythonize = [
SEMAPHORE, SEMAPHORE,
LOCAL, LOCAL,
GREENLET, GREENLET,
IDENT, IDENT,
IMAP,
EVENT,
] ]
EXT_MODULES = [ EXT_MODULES = [
...@@ -107,6 +121,8 @@ EXT_MODULES = [ ...@@ -107,6 +121,8 @@ EXT_MODULES = [
LOCAL, LOCAL,
GREENLET, GREENLET,
IDENT, IDENT,
IMAP,
EVENT,
] ]
LIBEV_CFFI_MODULE = 'src/gevent/libev/_corecffi_build.py:ffi' LIBEV_CFFI_MODULE = 'src/gevent/libev/_corecffi_build.py:ffi'
...@@ -169,6 +185,13 @@ if PYPY: ...@@ -169,6 +185,13 @@ if PYPY:
_to_cythonize.remove(SEMAPHORE) _to_cythonize.remove(SEMAPHORE)
_to_cythonize.remove(IDENT) _to_cythonize.remove(IDENT)
EXT_MODULES.remove(IMAP)
_to_cythonize.remove(IMAP)
EXT_MODULES.remove(EVENT)
_to_cythonize.remove(EVENT)
for mod in _to_cythonize: for mod in _to_cythonize:
EXT_MODULES.remove(mod) EXT_MODULES.remove(mod)
EXT_MODULES.append(cythonize1(mod)) EXT_MODULES.append(cythonize1(mod))
......
cimport cython
from gevent._greenlet cimport Greenlet
from gevent.__semaphore cimport Semaphore
@cython.freelist(100)
cdef class Failure:
cdef readonly exc
cdef raise_exception
cdef inline _raise_exc(Failure failure)
cdef class IMapUnordered(Greenlet):
cdef bint _zipped
cdef func
cdef iterable
cdef spawn
cdef Semaphore _result_semaphore
cdef int _outstanding_tasks
cdef int _max_index
cdef _queue_get
cdef _queue_put
cdef readonly queue
cdef readonly bint finished
cdef _inext(self)
cdef _ispawn(self, func, item, int item_index)
# Passed to greenlet.link
cpdef _on_result(self, greenlet)
# Called directly
cdef _on_finish(self, exception)
cdef _iqueue_value_for_success(self, greenlet)
cdef _iqueue_value_for_failure(self, greenlet)
cdef _iqueue_value_for_self_finished(self)
cdef _iqueue_value_for_self_failure(self, exception)
cdef class IMap(IMapUnordered):
cdef int index
cdef dict _results
@cython.locals(index=int)
cdef _inext(self)
...@@ -71,10 +71,14 @@ if PY3: ...@@ -71,10 +71,14 @@ if PY3:
iteritems = dict.items iteritems = dict.items
itervalues = dict.values itervalues = dict.values
xrange = range xrange = range
izip = zip
else: else:
iteritems = dict.iteritems # python 3: pylint:disable=no-member iteritems = dict.iteritems # python 3: pylint:disable=no-member
itervalues = dict.itervalues # python 3: pylint:disable=no-member itervalues = dict.itervalues # python 3: pylint:disable=no-member
xrange = __builtin__.xrange xrange = __builtin__.xrange
from itertools import izip # python 3: pylint:disable=no-member
izip = izip
# fspath from 3.6 os.py, but modified to raise the same exceptions as the # fspath from 3.6 os.py, but modified to raise the same exceptions as the
# real native implementation. # real native implementation.
......
cimport cython
cdef _None
cdef reraise
cdef dump_traceback
cdef load_traceback
cdef get_hub
cdef InvalidSwitchError
cdef Timeout
cdef bint _greenlet_imported
cdef extern from "greenlet/greenlet.h":
ctypedef class greenlet.greenlet [object PyGreenlet]:
pass
# These are actually macros and so much be included
# (defined) in each .pxd, as are the two functions
# that call them.
greenlet PyGreenlet_GetCurrent()
void PyGreenlet_Import()
cdef inline greenlet getcurrent():
return PyGreenlet_GetCurrent()
cdef inline void greenlet_init():
global _greenlet_imported
if not _greenlet_imported:
PyGreenlet_Import()
_greenlet_imported = True
cdef void _init()
cdef class _AbstractLinkable:
cdef _notifier
cdef set _links
cdef readonly hub
cpdef rawlink(self, callback)
cpdef bint ready(self)
cpdef unlink(self, callback)
cdef _check_and_notify(self)
@cython.locals(todo=set)
cpdef _notify_links(self)
cdef _wait_core(self, timeout, catch=*)
cdef _wait_return_value(self, waited, wait_success)
cdef _wait(self, timeout=*)
cdef class Event(_AbstractLinkable):
cdef bint _flag
cdef class AsyncResult(_AbstractLinkable):
cdef readonly _value
cdef readonly tuple _exc_info
# For the use of _imap.py
cdef public int _imap_task_index
cpdef get(self, block=*, timeout=*)
cpdef bint successful(self)
cpdef wait(self, timeout=*)
cpdef bint done(self)
cpdef bint cancel(self)
cpdef bint cancelled(self)
...@@ -43,7 +43,6 @@ cdef extern from "frameobject.h": ...@@ -43,7 +43,6 @@ cdef extern from "frameobject.h":
# proper None instead. # proper None instead.
# cdef FrameType f_back # cdef FrameType f_back
cdef void _init() cdef void _init()
cdef class SpawnedLink: cdef class SpawnedLink:
...@@ -119,9 +118,12 @@ cdef class Greenlet(greenlet): ...@@ -119,9 +118,12 @@ cdef class Greenlet(greenlet):
# This is used as the target of a callback # This is used as the target of a callback
# from the loop, and so needs to be a cpdef # from the loop, and so needs to be a cpdef
cpdef _notify_links(self) cpdef _notify_links(self)
# IMapUnordered greenlets in pools need to access this
# method # Hmm, declaring _raise_exception causes issues when _imap
cpdef _raise_exception(self) # is also compiled.
# TypeError: wrap() takes exactly one argument (0 given)
# cpdef _raise_exception(self)
@cython.final @cython.final
cdef greenlet get_hub() cdef greenlet get_hub()
......
# -*- coding: utf-8 -*-
# Copyright (c) 2018 gevent
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False,infer_types=True
"""
Iterators across greenlets or AsyncResult objects.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from gevent import _semaphore
__all__ = [
'IMapUnordered',
'IMap',
]
locals()['Greenlet'] = __import__('gevent').Greenlet
locals()['Semaphore'] = _semaphore.Semaphore
class Failure(object):
__slots__ = ('exc', 'raise_exception')
def __init__(self, exc, raise_exception=None):
self.exc = exc
self.raise_exception = raise_exception
def _raise_exc(failure):
# For cython.
if failure.raise_exception:
failure.raise_exception()
else:
raise failure.exc
class IMapUnordered(Greenlet): # pylint:disable=undefined-variable
"""
At iterator of map results.
"""
def __init__(self, func, iterable, spawn, maxsize=None, _zipped=False):
"""
An iterator that.
:param callable spawn: The function we use to create new greenlets.
:keyword int maxsize: If given and not-None, specifies the maximum number of
finished results that will be allowed to accumulated awaiting the reader;
more than that number of results will cause map function greenlets to begin
to block. This is most useful is there is a great disparity in the speed of
the mapping code and the consumer and the results consume a great deal of resources.
Using a bound is more computationally expensive than not using a bound.
.. versionchanged:: 1.1b3
Added the *maxsize* parameter.
"""
from gevent.queue import Queue
super(IMapUnordered, self).__init__()
self.spawn = spawn
self._zipped = _zipped
self.func = func
self.iterable = iterable
self.queue = Queue()
self._queue_get = self.queue.get
self._queue_put = self.queue.put
if maxsize:
# Bounding the queue is not enough if we want to keep from
# accumulating objects; the result value will be around as
# the greenlet's result, blocked on self.queue.put(), and
# we'll go on to spawn another greenlet, which in turn can
# create the result. So we need a semaphore to prevent a
# greenlet from exiting while the queue is full so that we
# don't spawn the next greenlet (assuming that self.spawn
# is of course bounded). (Alternatively we could have the
# greenlet itself do the insert into the pool, but that
# takes some rework).
#
# Given the use of a semaphore at this level, sizing the queue becomes
# redundant, and that lets us avoid having to use self.link() instead
# of self.rawlink() to avoid having blocking methods called in the
# hub greenlet.
self._result_semaphore = Semaphore(maxsize) # pylint:disable=undefined-variable
else:
self._result_semaphore = None
self._outstanding_tasks = 0
# The index (zero based) of the maximum number of
# results we will have.
self._max_index = -1
self.finished = False
# We're iterating in a different greenlet than we're running.
def __iter__(self):
return self
def __next__(self):
if self._result_semaphore is not None:
self._result_semaphore.release()
value = self._inext()
if isinstance(value, Failure):
_raise_exc(value)
return value
next = __next__ # Py2
def _inext(self):
return self._queue_get()
def _ispawn(self, func, item, item_index):
if self._result_semaphore is not None:
self._result_semaphore.acquire()
self._outstanding_tasks += 1
g = self.spawn(func, item) if not self._zipped else self.spawn(func, *item)
g._imap_task_index = item_index
g.rawlink(self._on_result)
return g
def _run(self): # pylint:disable=method-hidden
try:
func = self.func
for item in self.iterable:
self._max_index += 1
self._ispawn(func, item, self._max_index)
self._on_finish(None)
except BaseException as e:
self._on_finish(e)
raise
finally:
self.spawn = None
self.func = None
self.iterable = None
self._result_semaphore = None
def _on_result(self, greenlet):
# This method will be called in the hub greenlet (we rawlink)
self._outstanding_tasks -= 1
count = self._outstanding_tasks
finished = self.finished
ready = self.ready()
put_finished = False
if ready and count <= 0 and not finished:
finished = self.finished = True
put_finished = True
if greenlet.successful():
self._queue_put(self._iqueue_value_for_success(greenlet))
else:
self._queue_put(self._iqueue_value_for_failure(greenlet))
if put_finished:
self._queue_put(self._iqueue_value_for_self_finished())
def _on_finish(self, exception):
# Called in this greenlet.
if self.finished:
return
if exception is not None:
self.finished = True
self._queue_put(self._iqueue_value_for_self_failure(exception))
return
if self._outstanding_tasks <= 0:
self.finished = True
self._queue_put(self._iqueue_value_for_self_finished())
def _iqueue_value_for_success(self, greenlet):
return greenlet.value
def _iqueue_value_for_failure(self, greenlet):
return Failure(greenlet.exception, getattr(greenlet, '_raise_exception'))
def _iqueue_value_for_self_finished(self):
return Failure(StopIteration())
def _iqueue_value_for_self_failure(self, exception):
return Failure(exception, self._raise_exception)
class IMap(IMapUnordered):
# A specialization of IMapUnordered that returns items
# in the order in which they were generated, not
# the order in which they finish.
def __init__(self, *args, **kwargs):
# The result dictionary: {index: value}
self._results = {}
# The index of the result to return next.
self.index = 0
IMapUnordered.__init__(self, *args, **kwargs)
def _inext(self):
try:
value = self._results.pop(self.index)
except KeyError:
# Wait for our index to finish.
while 1:
index, value = self._queue_get()
if index == self.index:
break
else:
self._results[index] = value
self.index += 1
return value
def _iqueue_value_for_success(self, greenlet):
return (greenlet._imap_task_index, IMapUnordered._iqueue_value_for_success(self, greenlet))
def _iqueue_value_for_failure(self, greenlet):
return (greenlet._imap_task_index, IMapUnordered._iqueue_value_for_failure(self, greenlet))
def _iqueue_value_for_self_finished(self):
return (self._max_index + 1, IMapUnordered._iqueue_value_for_self_finished(self))
def _iqueue_value_for_self_failure(self, exception):
return (self._max_index + 1, IMapUnordered._iqueue_value_for_self_failure(self, exception))
from gevent._util import import_c_accel
import_c_accel(globals(), 'gevent.__imap')
# Copyright (c) 2009-2016 Denis Bilenko, gevent contributors. See LICENSE for details. # Copyright (c) 2009-2016 Denis Bilenko, gevent contributors. See LICENSE for details.
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False,infer_types=True
"""Basic synchronization primitives: Event and AsyncResult""" """Basic synchronization primitives: Event and AsyncResult"""
from __future__ import print_function from __future__ import print_function
import sys import sys
...@@ -8,19 +10,27 @@ from gevent._compat import reraise ...@@ -8,19 +10,27 @@ from gevent._compat import reraise
from gevent._tblib import dump_traceback, load_traceback from gevent._tblib import dump_traceback, load_traceback
from gevent.hub import _get_hub_noargs as get_hub from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import getcurrent
from gevent.hub import InvalidSwitchError from gevent.hub import InvalidSwitchError
from gevent.timeout import Timeout from gevent.timeout import Timeout
__all__ = ['Event', 'AsyncResult'] __all__ = [
'Event',
'AsyncResult',
]
locals()['getcurrent'] = __import__('greenlet').getcurrent
locals()['greenlet_init'] = lambda: None
import cython
class _AbstractLinkable(object): class _AbstractLinkable(object):
# Encapsulates the standard parts of the linking and notifying protocol # Encapsulates the standard parts of the linking and notifying protocol
# common to both repeatable events and one-time events (AsyncResult). # common to both repeatable events and one-time events (AsyncResult).
_notifier = None __slots__ = ('_links', 'hub', '_notifier')
def __init__(self): def __init__(self):
# Also previously, AsyncResult maintained the order of notifications, but Event # Also previously, AsyncResult maintained the order of notifications, but Event
...@@ -35,6 +45,7 @@ class _AbstractLinkable(object): ...@@ -35,6 +45,7 @@ class _AbstractLinkable(object):
# uniqueness would be with a 2.7+ OrderedDict.) # uniqueness would be with a 2.7+ OrderedDict.)
self._links = set() self._links = set()
self.hub = get_hub() self.hub = get_hub()
self._notifier = None
def ready(self): def ready(self):
# Instances must define this # Instances must define this
...@@ -95,14 +106,14 @@ class _AbstractLinkable(object): ...@@ -95,14 +106,14 @@ class _AbstractLinkable(object):
# bool(self._notifier) would turn to False as soon as we exit this # bool(self._notifier) would turn to False as soon as we exit this
# method anyway. # method anyway.
del todo del todo
del self._notifier self._notifier = None
def _wait_core(self, timeout, catch=Timeout): def _wait_core(self, timeout, catch=Timeout):
# The core of the wait implementation, handling # The core of the wait implementation, handling
# switching and linking. If *catch* is set to (), # switching and linking. If *catch* is set to (),
# a timeout that elapses will be allowed to be raised. # a timeout that elapses will be allowed to be raised.
# Returns a true value if the wait succeeded without timing out. # Returns a true value if the wait succeeded without timing out.
switch = getcurrent().switch switch = getcurrent().switch # pylint:disable=undefined-variable
self.rawlink(switch) self.rawlink(switch)
try: try:
with Timeout._start_new_or_dummy(timeout) as timer: with Timeout._start_new_or_dummy(timeout) as timer:
...@@ -148,7 +159,11 @@ class Event(_AbstractLinkable): ...@@ -148,7 +159,11 @@ class Event(_AbstractLinkable):
the waiting greenlets being awakened. These details may change in the future. the waiting greenlets being awakened. These details may change in the future.
""" """
_flag = False __slots__ = ('_flag',)
def __init__(self):
_AbstractLinkable.__init__(self)
self._flag = False
def __str__(self): def __str__(self):
return '<%s %s _links[%s]>' % (self.__class__.__name__, (self._flag and 'set') or 'clear', len(self._links)) return '<%s %s _links[%s]>' % (self.__class__.__name__, (self._flag and 'set') or 'clear', len(self._links))
...@@ -157,8 +172,14 @@ class Event(_AbstractLinkable): ...@@ -157,8 +172,14 @@ class Event(_AbstractLinkable):
"""Return true if and only if the internal flag is true.""" """Return true if and only if the internal flag is true."""
return self._flag return self._flag
isSet = is_set # makes it a better drop-in replacement for threading.Event def isSet(self):
ready = is_set # makes it compatible with AsyncResult and Greenlet (for example in wait()) # makes it a better drop-in replacement for threading.Event
return self._flag
def ready(self):
# makes it compatible with AsyncResult and Greenlet (for
# example in wait())
return self._flag
def set(self): def set(self):
""" """
...@@ -221,7 +242,7 @@ class Event(_AbstractLinkable): ...@@ -221,7 +242,7 @@ class Event(_AbstractLinkable):
return self._wait(timeout) return self._wait(timeout)
def _reset_internal_locks(self): # pragma: no cover def _reset_internal_locks(self): # pragma: no cover
# for compatibility with threading.Event (only in case of patch_all(Event=True), by default Event is not patched) # for compatibility with threading.Event
# Exception AttributeError: AttributeError("'Event' object has no attribute '_reset_internal_locks'",) # Exception AttributeError: AttributeError("'Event' object has no attribute '_reset_internal_locks'",)
# in <module 'threading' from '/usr/lib/python2.7/threading.pyc'> ignored # in <module 'threading' from '/usr/lib/python2.7/threading.pyc'> ignored
pass pass
...@@ -277,9 +298,12 @@ class AsyncResult(_AbstractLinkable): ...@@ -277,9 +298,12 @@ class AsyncResult(_AbstractLinkable):
merged. merged.
""" """
_value = _NONE __slots__ = ('_value', '_exc_info', '_imap_task_index')
_exc_info = ()
_notifier = None def __init__(self):
_AbstractLinkable.__init__(self)
self._value = _NONE
self._exc_info = ()
@property @property
def _exception(self): def _exception(self):
...@@ -448,3 +472,12 @@ class AsyncResult(_AbstractLinkable): ...@@ -448,3 +472,12 @@ class AsyncResult(_AbstractLinkable):
return False return False
# exception is a method, we use it as a property # exception is a method, we use it as a property
def _init():
greenlet_init() # pylint:disable=undefined-variable
_init()
from gevent._util import import_c_accel
import_c_accel(globals(), 'gevent._event')
# Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details. # Copyright (c) 2009-2012 Denis Bilenko. See LICENSE for details.
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False # cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False
from __future__ import absolute_import, print_function, division from __future__ import absolute_import, print_function, division
from sys import _getframe as sys_getframe from sys import _getframe as sys_getframe
......
...@@ -15,11 +15,6 @@ reached the limit, until there is a free slot. ...@@ -15,11 +15,6 @@ reached the limit, until there is a free slot.
""" """
from __future__ import print_function, absolute_import, division from __future__ import print_function, absolute_import, division
try:
from itertools import izip
except ImportError:
# Python 3
izip = zip
from gevent.hub import GreenletExit, getcurrent, kill as _kill from gevent.hub import GreenletExit, getcurrent, kill as _kill
from gevent.greenlet import joinall, Greenlet from gevent.greenlet import joinall, Greenlet
...@@ -28,6 +23,10 @@ from gevent.timeout import Timeout ...@@ -28,6 +23,10 @@ from gevent.timeout import Timeout
from gevent.event import Event from gevent.event import Event
from gevent.lock import Semaphore, DummySemaphore from gevent.lock import Semaphore, DummySemaphore
from gevent._compat import izip
from gevent._imap import IMap
from gevent._imap import IMapUnordered
__all__ = [ __all__ = [
'Group', 'Group',
'Pool', 'Pool',
...@@ -35,185 +34,6 @@ __all__ = [ ...@@ -35,185 +34,6 @@ __all__ = [
] ]
class IMapUnordered(Greenlet):
"""
At iterator of map results.
"""
def __init__(self, func, iterable, spawn=None, maxsize=None, _zipped=False):
"""
An iterator that.
:keyword callable spawn: The function we use to
:keyword int maxsize: If given and not-None, specifies the maximum number of
finished results that will be allowed to accumulated awaiting the reader;
more than that number of results will cause map function greenlets to begin
to block. This is most useful is there is a great disparity in the speed of
the mapping code and the consumer and the results consume a great deal of resources.
Using a bound is more computationally expensive than not using a bound.
.. versionchanged:: 1.1b3
Added the *maxsize* parameter.
"""
from gevent.queue import Queue
Greenlet.__init__(self)
if spawn is not None:
self.spawn = spawn
self._zipped = _zipped
self.func = func
self.iterable = iterable
self.queue = Queue()
if maxsize:
# Bounding the queue is not enough if we want to keep from
# accumulating objects; the result value will be around as
# the greenlet's result, blocked on self.queue.put(), and
# we'll go on to spawn another greenlet, which in turn can
# create the result. So we need a semaphore to prevent a
# greenlet from exiting while the queue is full so that we
# don't spawn the next greenlet (assuming that self.spawn
# is of course bounded). (Alternatively we could have the
# greenlet itself do the insert into the pool, but that
# takes some rework).
#
# Given the use of a semaphore at this level, sizing the queue becomes
# redundant, and that lets us avoid having to use self.link() instead
# of self.rawlink() to avoid having blocking methods called in the
# hub greenlet.
factory = Semaphore
else:
factory = DummySemaphore
self._result_semaphore = factory(maxsize)
self._outstanding_tasks = 0
# The index (zero based) of the maximum number of
# results we will have.
self._max_index = -1
self.finished = False
# We're iterating in a different greenlet than we're running.
def __iter__(self):
return self
def next(self):
self._result_semaphore.release()
value = self._inext()
if isinstance(value, Failure):
raise value.exc
return value
__next__ = next
def _inext(self):
return self.queue.get()
def _ispawn(self, func, item, item_index):
self._result_semaphore.acquire()
self._outstanding_tasks += 1
g = self.spawn(func, item) if not self._zipped else self.spawn(func, *item)
g._imap_task_index = item_index
g.rawlink(self._on_result)
return g
def _run(self): # pylint:disable=method-hidden
try:
func = self.func
for item in self.iterable:
self._max_index += 1
self._ispawn(func, item, self._max_index)
self._on_finish(None)
except BaseException as e:
self._on_finish(e)
raise
finally:
self.__dict__.pop('spawn', None)
self.__dict__.pop('func', None)
self.__dict__.pop('iterable', None)
def _on_result(self, greenlet):
# This method will be called in the hub greenlet (we rawlink)
self._outstanding_tasks -= 1
count = self._outstanding_tasks
finished = self.finished
ready = self.ready()
put_finished = False
if ready and count <= 0 and not finished:
finished = self.finished = True
put_finished = True
if greenlet.successful():
self.queue.put(self._iqueue_value_for_success(greenlet))
else:
self.queue.put(self._iqueue_value_for_failure(greenlet))
if put_finished:
self.queue.put(self._iqueue_value_for_self_finished())
def _on_finish(self, exception):
# Called in this greenlet.
if self.finished:
return
if exception is not None:
self.finished = True
self.queue.put(self._iqueue_value_for_self_failure(exception))
return
if self._outstanding_tasks <= 0:
self.finished = True
self.queue.put(self._iqueue_value_for_self_finished())
def _iqueue_value_for_success(self, greenlet):
return greenlet.value
def _iqueue_value_for_failure(self, greenlet):
return Failure(greenlet.exception, getattr(greenlet, '_raise_exception'))
def _iqueue_value_for_self_finished(self):
return Failure(StopIteration)
def _iqueue_value_for_self_failure(self, exception):
return Failure(exception, self._raise_exception)
class IMap(IMapUnordered):
# A specialization of IMapUnordered that returns items
# in the order in which they were generated, not
# the order in which they finish.
def __init__(self, *args, **kwargs):
# The result dictionary: {index: value}
self._results = {}
# The index of the result to return next.
self.index = 0
IMapUnordered.__init__(self, *args, **kwargs)
def _inext(self):
try:
value = self._results.pop(self.index)
except KeyError:
# Wait for our index to finish.
while 1:
index, value = self.queue.get()
if index == self.index:
break
else:
self._results[index] = value
self.index += 1
return value
def _iqueue_value_for_success(self, greenlet):
return (greenlet._imap_task_index, IMapUnordered._iqueue_value_for_success(self, greenlet))
def _iqueue_value_for_failure(self, greenlet):
return (greenlet._imap_task_index, IMapUnordered._iqueue_value_for_failure(self, greenlet))
def _iqueue_value_for_self_finished(self):
return (self._max_index + 1, IMapUnordered._iqueue_value_for_self_finished(self))
def _iqueue_value_for_self_failure(self, exception):
return (self._max_index + 1, IMapUnordered._iqueue_value_for_self_failure(self, exception))
class GroupMappingMixin(object): class GroupMappingMixin(object):
...@@ -683,19 +503,6 @@ class Group(GroupMappingMixin): ...@@ -683,19 +503,6 @@ class Group(GroupMappingMixin):
return self.full() return self.full()
class Failure(object):
__slots__ = ['exc', '_raise_exception']
def __init__(self, exc, raise_exception=None):
self.exc = exc
self._raise_exception = raise_exception
def raise_exc(self):
if self._raise_exception:
self._raise_exception()
else:
raise self.exc
class PoolFull(QueueFull): class PoolFull(QueueFull):
""" """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment