Commit 92b1a6b6 authored by Jason Madden's avatar Jason Madden

Compile the hub operations that use Waiters with Cython

Since we've come this far, might as well keep taking advantage of the
effort...

There are substantial improvements on the micro benchmarks for things
that wait and switch:

| Benchmark           | 27_hub_master2 | 27_hub_cython5               |
|---------------------|----------------|------------------------------|
| multiple wait ready | 1.96 us        | 1.10 us: 1.77x faster (-44%) |
| wait ready          | 1.47 us        | 897 ns: 1.64x faster (-39%)  |
| cancel wait         | 2.93 us        | 1.81 us: 1.61x faster (-38%) |
| switch              | 2.33 us        | 1.94 us: 1.20x faster (-17%) |

| Benchmark           | 36_hub_master2 | 36_hub_cython6 |
|---------------------|----------------|------------------------------|
| multiple wait ready | 1.28 us        | 820 ns: 1.56x faster (-36%)  |
| wait ready          | 939 ns         | 722 ns: 1.30x faster (-23%)  |
| cancel wait         | 1.76 us        | 1.37 us: 1.29x faster (-23%) |
| switch              | 1.60 us        | 1.35 us: 1.18x faster (-16%) |
parent 870e8e13
......@@ -16,6 +16,8 @@ src/gevent/event.c
src/gevent/_hub_local.c
src/gevent/_waiter.c
src/gevent/queue.c
src/gevent/_hub_primitives.c
src/gevent/_greenlet_primitives.c
src/gevent/libev/corecext.c
src/gevent/libev/corecext.h
src/gevent/libev/_corecffi.c
......
......@@ -51,7 +51,8 @@ Enhancements
- The classes `gevent.event.Event` and `gevent.event.AsyncResult`
are compiled with Cython for improved performance, as is the
``gevent.queue`` module and ``gevent.hub.Waiter``. Please report any
``gevent.queue`` module and ``gevent.hub.Waiter`` and certain
time-sensitive parts of the hub itself. Please report any
compatibility issues.
Monitoring and Debugging
......
# -*- coding: utf-8 -*-
"""
Benchmarks for hub primitive operations.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import perf
from perf import perf_counter
import gevent
from greenlet import greenlet
from greenlet import getcurrent
N = 1000
def bench_switch():
class Parent(type(gevent.get_hub())):
def run(self):
parent = self.parent
for _ in range(N):
parent.switch()
def child():
parent = getcurrent().parent
# Back to the hub, which in turn goes
# back to the main greenlet
for _ in range(N):
parent.switch()
hub = Parent(None, None)
child_greenlet = greenlet(child, hub)
for _ in range(N):
child_greenlet.switch()
def bench_wait_ready():
class Watcher(object):
def start(self, cb, obj):
# Immediately switch back to the waiter, mark as ready
cb(obj)
def stop(self):
pass
watcher = Watcher()
hub = gevent.get_hub()
for _ in range(1000):
hub.wait(watcher)
def bench_cancel_wait():
class Watcher(object):
active = True
callback = object()
def close(self):
pass
watcher = Watcher()
hub = gevent.get_hub()
loop = hub.loop
for _ in range(1000):
# Schedule all the callbacks.
hub.cancel_wait(watcher, None, True)
# Run them!
for cb in loop._callbacks:
if cb.callback:
cb.callback(*cb.args)
cb.stop() # so the real loop won't do it
# destroy the loop so we don't keep building these functions
# up
hub.destroy(True)
def bench_wait_func_ready():
from gevent.hub import wait
class ToWatch(object):
def rawlink(self, cb):
cb(self)
watched_objects = [ToWatch() for _ in range(N)]
t0 = perf_counter()
wait(watched_objects)
return perf_counter() - t0
def main():
runner = perf.Runner()
runner.bench_func('multiple wait ready',
bench_wait_func_ready,
inner_loops=N)
runner.bench_func('wait ready',
bench_wait_ready,
inner_loops=N)
runner.bench_func('cancel wait',
bench_cancel_wait,
inner_loops=N)
runner.bench_func('switch',
bench_switch,
inner_loops=N)
if __name__ == '__main__':
main()
......@@ -10,6 +10,9 @@
:members:
:undoc-members:
.. automethod:: wait
.. automethod:: cancel_wait
.. autoclass:: Waiter
.. autoclass:: LoopExit
......@@ -119,17 +119,32 @@ WAITER = Extension(name="gevent.__waiter",
depends=['src/gevent/__waiter.pxd'],
include_dirs=include_dirs)
HUB_PRIMITIVES = Extension(name="gevent.__hub_primitives",
sources=["src/gevent/_hub_primitives.py"],
depends=['src/gevent/__hub_primitives.pxd'],
include_dirs=include_dirs)
GLT_PRIMITIVES = Extension(name="gevent.__greenlet_primitives",
sources=["src/gevent/_greenlet_primitives.py"],
depends=['src/gevent/__greenlet_primitives.pxd'],
include_dirs=include_dirs)
_to_cythonize = [
GLT_PRIMITIVES,
HUB_PRIMITIVES,
HUB_LOCAL,
WAITER,
GREENLET,
SEMAPHORE,
LOCAL,
GREENLET,
IDENT,
IMAP,
EVENT,
QUEUE,
HUB_LOCAL,
WAITER,
]
EXT_MODULES = [
......@@ -144,6 +159,8 @@ EXT_MODULES = [
QUEUE,
HUB_LOCAL,
WAITER,
HUB_PRIMITIVES,
GLT_PRIMITIVES,
]
LIBEV_CFFI_MODULE = 'src/gevent/libev/_corecffi_build.py:ffi'
......@@ -221,6 +238,12 @@ if PYPY:
EXT_MODULES.remove(WAITER)
_to_cythonize.remove(WAITER)
EXT_MODULES.remove(GLT_PRIMITIVES)
_to_cythonize.remove(GLT_PRIMITIVES)
EXT_MODULES.remove(HUB_PRIMITIVES)
_to_cythonize.remove(HUB_PRIMITIVES)
for mod in _to_cythonize:
EXT_MODULES.remove(mod)
EXT_MODULES.append(cythonize1(mod))
......
cimport cython
# This file must not cimport anything from gevent.
cdef wref
cdef BlockingSwitchOutError
cdef extern from "greenlet/greenlet.h":
ctypedef class greenlet.greenlet [object PyGreenlet]:
pass
# These are actually macros and so much be included
# (defined) in each .pxd, as are the two functions
# that call them.
greenlet PyGreenlet_GetCurrent()
object PyGreenlet_Switch(greenlet self, void* args, void* kwargs)
void PyGreenlet_Import()
@cython.final
cdef inline greenlet getcurrent():
return PyGreenlet_GetCurrent()
cdef bint _greenlet_imported
cdef inline void greenlet_init():
global _greenlet_imported
if not _greenlet_imported:
PyGreenlet_Import()
_greenlet_imported = True
cdef inline object _greenlet_switch(greenlet self):
return PyGreenlet_Switch(self, NULL, NULL)
cdef class TrackedRawGreenlet(greenlet):
pass
cdef class SwitchOutGreenletWithLoop(TrackedRawGreenlet):
cdef public loop
cpdef switch(self)
cpdef switch_out(self)
from gevent.__greenlet_primitives cimport SwitchOutGreenletWithLoop
cdef _threadlocal
cpdef get_hub_class()
cpdef get_hub_if_exists()
cpdef set_hub(hub)
cpdef SwitchOutGreenletWithLoop get_hub_if_exists()
cpdef set_hub(SwitchOutGreenletWithLoop hub)
cpdef get_loop()
cpdef set_loop(loop)
# We can't cdef this, it won't do varargs.
# cpdef WaitOperationsGreenlet get_hub(*args, **kwargs)
# XXX: TODO: Move the definition of TrackedRawGreenlet
# into a file that can be cython compiled so get_hub can
# return that.
cpdef get_hub_noargs()
cpdef SwitchOutGreenletWithLoop get_hub_noargs()
cimport cython
from gevent.__greenlet_primitives cimport SwitchOutGreenletWithLoop
from gevent.__hub_local cimport get_hub_noargs as get_hub
from gevent.__waiter cimport Waiter
from gevent.__waiter cimport MultipleWaiter
cdef InvalidSwitchError
cdef _waiter
cdef _greenlet_primitives
cdef traceback
cdef extern from "greenlet/greenlet.h":
ctypedef class greenlet.greenlet [object PyGreenlet]:
pass
# These are actually macros and so much be included
# (defined) in each .pxd, as are the two functions
# that call them.
greenlet PyGreenlet_GetCurrent()
void PyGreenlet_Import()
@cython.final
cdef inline greenlet getcurrent():
return PyGreenlet_GetCurrent()
cdef bint _greenlet_imported
cdef inline void greenlet_init():
global _greenlet_imported
if not _greenlet_imported:
PyGreenlet_Import()
_greenlet_imported = True
cdef class WaitOperationsGreenlet(SwitchOutGreenletWithLoop):
cpdef wait(self, watcher)
cpdef cancel_wait(self, watcher, error, close_watcher=*)
cpdef _cancel_wait(self, watcher, error, close_watcher)
cdef class _WaitIterator:
cdef SwitchOutGreenletWithLoop _hub
cdef MultipleWaiter _waiter
cdef _switch
cdef _timeout
cdef _objects
cdef _timer
cdef Py_ssize_t _count
cdef bint _begun
cdef _cleanup(self)
cpdef iwait(objects, timeout=*, count=*)
cpdef wait(objects=*, timeout=*, count=*)
......@@ -20,5 +20,7 @@ cdef class IdentRegistry:
cdef object _registry
cdef list _available_idents
@cython.final
cpdef object get_ident(self, obj)
@cython.final
cpdef _return_ident(self, ValuedWeakRef ref)
# cython: auto_pickle=False
cimport cython
from gevent.__hub_local cimport get_hub_noargs as get_hub
cdef Timeout
......
cimport cython
from gevent.__greenlet_primitives cimport SwitchOutGreenletWithLoop
from gevent.__hub_local cimport get_hub_noargs as get_hub
cdef sys
cdef ConcurrentObjectUseError
from gevent.__hub_local cimport get_hub_noargs as get_hub
cdef bint _greenlet_imported
cdef _NONE
......@@ -29,11 +31,17 @@ cdef inline void greenlet_init():
_greenlet_imported = True
cdef class Waiter:
cdef readonly hub
cdef readonly greenlet
cdef readonly SwitchOutGreenletWithLoop hub
cdef readonly greenlet greenlet
cdef readonly value
cdef _exception
cpdef get(self)
cpdef clear(self)
# cpdef of switch leads to parameter errors...
#cpdef switch(self, value)
@cython.final
@cython.internal
cdef class MultipleWaiter(Waiter):
......
cimport cython
from gevent.__greenlet_primitives cimport SwitchOutGreenletWithLoop
from gevent.__hub_local cimport get_hub_noargs as get_hub
cdef _None
cdef reraise
cdef dump_traceback
cdef load_traceback
cdef get_hub
cdef InvalidSwitchError
cdef Timeout
......@@ -35,7 +38,7 @@ cdef void _init()
cdef class _AbstractLinkable:
cdef _notifier
cdef set _links
cdef readonly hub
cdef readonly SwitchOutGreenletWithLoop hub
cpdef rawlink(self, callback)
cpdef bint ready(self)
......
......@@ -9,6 +9,8 @@ cdef bint _PYPY
cdef sys_getframe
cdef sys_exc_info
cdef Timeout
cdef GreenletExit
cdef InvalidSwitchError
cdef extern from "greenlet/greenlet.h":
......@@ -141,14 +143,12 @@ cdef _threadlocal
cdef get_hub_class
cdef wref
cdef Timeout
cdef dump_traceback
cdef load_traceback
cdef Waiter
cdef wait
cdef iwait
cdef reraise
cdef InvalidSwitchError
cpdef GEVENT_CONFIG
......
# -*- coding: utf-8 -*-
# copyright (c) 2018 gevent. See LICENSE.
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False
"""
A collection of primitives used by the hub, and suitable for
compilation with Cython because of their frequency of use.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from weakref import ref as wref
from greenlet import greenlet
from gevent.exceptions import BlockingSwitchOutError
# In Cython, we define these as 'cdef inline' functions. The
# compilation unit cannot have a direct assignment to them (import
# is assignment) without generating a 'lvalue is not valid target'
# error.
locals()['getcurrent'] = __import__('greenlet').getcurrent
locals()['greenlet_init'] = lambda: None
locals()['_greenlet_switch'] = greenlet.switch
__all__ = [
'TrackedRawGreenlet',
'SwitchOutGreenletWithLoop',
]
class TrackedRawGreenlet(greenlet):
def __init__(self, function, parent):
greenlet.__init__(self, function, parent)
# See greenlet.py's Greenlet class. We capture the cheap
# parts to maintain the tree structure, but we do not capture
# the stack because that's too expensive for 'spawn_raw'.
current = getcurrent() # pylint:disable=undefined-variable
self.spawning_greenlet = wref(current)
# See Greenlet for how trees are maintained.
try:
self.spawn_tree_locals = current.spawn_tree_locals
except AttributeError:
self.spawn_tree_locals = {}
if current.parent:
current.spawn_tree_locals = self.spawn_tree_locals
class SwitchOutGreenletWithLoop(TrackedRawGreenlet):
# Subclasses must define:
# - self.loop
# This class defines loop in its .pxd for Cython. This lets us avoid
# circular dependencies with the hub.
def switch(self):
switch_out = getattr(getcurrent(), 'switch_out', None) # pylint:disable=undefined-variable
if switch_out is not None:
switch_out()
return _greenlet_switch(self) # pylint:disable=undefined-variable
def switch_out(self):
raise BlockingSwitchOutError('Impossible to call blocking function in the event loop callback')
def _init():
greenlet_init() # pylint:disable=undefined-variable
_init()
from gevent._util import import_c_accel
import_c_accel(globals(), 'gevent.__greenlet_primitives')
# -*- coding: utf-8 -*-
# copyright (c) 2018 gevent. See LICENSE.
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False
"""
A collection of primitives used by the hub, and suitable for
compilation with Cython because of their frequency of use.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import traceback
from gevent.exceptions import InvalidSwitchError
from gevent import _greenlet_primitives
from gevent import _waiter
from gevent._hub_local import get_hub_noargs as get_hub
# In Cython, we define these as 'cdef inline' functions. The
# compilation unit cannot have a direct assignment to them (import
# is assignment) without generating a 'lvalue is not valid target'
# error.
locals()['getcurrent'] = __import__('greenlet').getcurrent
locals()['greenlet_init'] = lambda: None
locals()['Waiter'] = _waiter.Waiter
locals()['MultipleWaiter'] = _waiter.MultipleWaiter
locals()['SwitchOutGreenletWithLoop'] = _greenlet_primitives.SwitchOutGreenletWithLoop
__all__ = [
'WaitOperationsGreenlet',
'iwait',
'wait',
]
class WaitOperationsGreenlet(SwitchOutGreenletWithLoop): # pylint:disable=undefined-variable
def wait(self, watcher):
"""
Wait until the *watcher* (which must not be started) is ready.
The current greenlet will be unscheduled during this time.
"""
waiter = Waiter(self) # pylint:disable=undefined-variable
watcher.start(waiter.switch, waiter)
try:
result = waiter.get()
if result is not waiter:
raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (
getcurrent(), # pylint:disable=undefined-variable
result, waiter))
finally:
watcher.stop()
def cancel_wait(self, watcher, error, close_watcher=False):
"""
Cancel an in-progress call to :meth:`wait` by throwing the given *error*
in the waiting greenlet.
.. versionchanged:: 1.3a1
Added the *close_watcher* parameter. If true, the watcher
will be closed after the exception is thrown. The watcher should then
be discarded. Closing the watcher is important to release native resources.
.. versionchanged:: 1.3a2
Allow the *watcher* to be ``None``. No action is taken in that case.
"""
if watcher is None:
# Presumably already closed.
# See https://github.com/gevent/gevent/issues/1089
return
if watcher.callback is not None:
self.loop.run_callback(self._cancel_wait, watcher, error, close_watcher)
elif close_watcher:
watcher.close()
def _cancel_wait(self, watcher, error, close_watcher):
# We have to check again to see if it was still active by the time
# our callback actually runs.
active = watcher.active
cb = watcher.callback
if close_watcher:
watcher.close()
if active:
# The callback should be greenlet.switch(). It may or may not be None.
glet = getattr(cb, '__self__', None)
if glet is not None:
glet.throw(error)
class _WaitIterator(object):
def __init__(self, objects, hub, timeout, count):
self._hub = hub
self._waiter = MultipleWaiter(hub) # pylint:disable=undefined-variable
self._switch = self._waiter.switch
self._timeout = timeout
self._objects = objects
self._timer = None
self._begun = False
# Even if we're only going to return 1 object,
# we must still rawlink() *all* of them, so that no
# matter which one finishes first we find it.
self._count = len(objects) if count is None else min(count, len(objects))
def __iter__(self):
# When we begin iterating, we begin the timer.
# XXX: If iteration doesn't actually happen, we
# could leave these links around!
if not self._begun:
self._begun = True
for obj in self._objects:
obj.rawlink(self._switch)
if self._timeout is not None:
self._timer = self._hub.loop.timer(self._timeout, priority=-1)
self._timer.start(self._switch, self)
return self
def __next__(self):
if self._count == 0:
# Exhausted
self._cleanup()
raise StopIteration()
self._count -= 1
try:
item = self._waiter.get()
self._waiter.clear()
if item is self:
# Timer expired, no more
self._cleanup()
raise StopIteration()
return item
except:
self._cleanup()
raise
next = __next__
def _cleanup(self):
if self._timer is not None:
self._timer.close()
self._timer = None
objs = self._objects
self._objects = ()
for aobj in objs:
unlink = getattr(aobj, 'unlink', None)
if unlink is not None:
try:
unlink(self._switch)
except: # pylint:disable=bare-except
traceback.print_exc()
def iwait(objects, timeout=None, count=None):
"""
Iteratively yield *objects* as they are ready, until all (or *count*) are ready
or *timeout* expired.
:param objects: A sequence (supporting :func:`len`) containing objects
implementing the wait protocol (rawlink() and unlink()).
:keyword int count: If not `None`, then a number specifying the maximum number
of objects to wait for. If ``None`` (the default), all objects
are waited for.
:keyword float timeout: If given, specifies a maximum number of seconds
to wait. If the timeout expires before the desired waited-for objects
are available, then this method returns immediately.
.. seealso:: :func:`wait`
.. versionchanged:: 1.1a1
Add the *count* parameter.
.. versionchanged:: 1.1a2
No longer raise :exc:`LoopExit` if our caller switches greenlets
in between items yielded by this function.
"""
# QQQ would be nice to support iterable here that can be generated slowly (why?)
hub = get_hub()
if objects is None:
return [hub.join(timeout=timeout)]
return _WaitIterator(objects, hub, timeout, count)
def wait(objects=None, timeout=None, count=None):
"""
Wait for ``objects`` to become ready or for event loop to finish.
If ``objects`` is provided, it must be a list containing objects
implementing the wait protocol (rawlink() and unlink() methods):
- :class:`gevent.Greenlet` instance
- :class:`gevent.event.Event` instance
- :class:`gevent.lock.Semaphore` instance
- :class:`gevent.subprocess.Popen` instance
If ``objects`` is ``None`` (the default), ``wait()`` blocks until
the current event loop has nothing to do (or until ``timeout`` passes):
- all greenlets have finished
- all servers were stopped
- all event loop watchers were stopped.
If ``count`` is ``None`` (the default), wait for all ``objects``
to become ready.
If ``count`` is a number, wait for (up to) ``count`` objects to become
ready. (For example, if count is ``1`` then the function exits
when any object in the list is ready).
If ``timeout`` is provided, it specifies the maximum number of
seconds ``wait()`` will block.
Returns the list of ready objects, in the order in which they were
ready.
.. seealso:: :func:`iwait`
"""
if objects is None:
hub = get_hub()
return hub.join(timeout=timeout) # pylint:disable=
return list(iwait(objects, timeout, count))
def _init():
greenlet_init() # pylint:disable=undefined-variable
_init()
from gevent._util import import_c_accel
import_c_accel(globals(), 'gevent.__hub_primitives')
......@@ -70,7 +70,11 @@ class IdentRegistry(object):
def _return_ident(self, vref):
# By the time this is called, self._registry has been
# updated
heappush(self._available_idents, vref.value)
if heappush is not None:
# Under some circumstances we can get called
# when the interpreter is shutting down, and globals
# aren't available any more.
heappush(self._available_idents, vref.value)
def __len__(self):
return len(self._registry)
......
......@@ -99,8 +99,14 @@ class Waiter(object):
if self._exception is not _NONE:
return self._exception
def switch(self, value=None):
"""Switch to the greenlet if one's available. Otherwise store the value."""
def switch(self, value):
"""
Switch to the greenlet if one's available. Otherwise store the
*value*.
.. versionchanged:: 1.3b1
The *value* is no longer optional.
"""
greenlet = self.greenlet
if greenlet is None:
self.value = value
......@@ -178,7 +184,7 @@ class MultipleWaiter(Waiter):
# here can be impractical (see https://github.com/gevent/gevent/issues/652)
self._values = list()
def switch(self, value): # pylint:disable=signature-differs
def switch(self, value):
self._values.append(value)
Waiter.switch(self, True)
......
......@@ -9,9 +9,9 @@ from gevent._util import _NONE
from gevent._compat import reraise
from gevent._tblib import dump_traceback, load_traceback
from gevent.hub import _get_hub_noargs as get_hub
from gevent._hub_local import get_hub_noargs as get_hub
from gevent.hub import InvalidSwitchError
from gevent.exceptions import InvalidSwitchError
from gevent.timeout import Timeout
......@@ -23,8 +23,6 @@ __all__ = [
locals()['getcurrent'] = __import__('greenlet').getcurrent
locals()['greenlet_init'] = lambda: None
import cython
class _AbstractLinkable(object):
# Encapsulates the standard parts of the linking and notifying protocol
......
......@@ -7,16 +7,18 @@ from sys import _getframe as sys_getframe
from sys import exc_info as sys_exc_info
from weakref import ref as wref
# XXX: How to get cython to let us rename this as RawGreenlet
# like we prefer?
from greenlet import greenlet
from greenlet import GreenletExit
from gevent._compat import reraise
from gevent._compat import PYPY as _PYPY
from gevent._tblib import dump_traceback
from gevent._tblib import load_traceback
from gevent.hub import GreenletExit
from gevent.hub import InvalidSwitchError
from gevent.exceptions import InvalidSwitchError
from gevent.hub import iwait
from gevent.hub import wait
......@@ -875,7 +877,7 @@ def _kill(glet, exception, waiter):
# XXX do we need this here?
glet.parent.handle_error(glet, *sys_exc_info())
if waiter is not None:
waiter.switch()
waiter.switch(None)
def joinall(greenlets, timeout=None, raise_error=False, count=None):
......
......@@ -9,7 +9,7 @@ from functools import partial as _functools_partial
import sys
import traceback
from weakref import ref as wref
from greenlet import greenlet as RawGreenlet
from greenlet import getcurrent
......@@ -31,9 +31,7 @@ __all__ = [
]
from gevent._config import config as GEVENT_CONFIG
from gevent._compat import xrange
from gevent._compat import thread_mod_name
from gevent._util import _NONE
from gevent._util import readproperty
from gevent._util import Lazy
from gevent._util import gmctime
......@@ -47,38 +45,24 @@ from gevent._hub_local import get_hub_if_exists as _get_hub
from gevent._hub_local import get_hub_noargs as _get_hub_noargs
from gevent._hub_local import set_default_hub_class
from gevent._greenlet_primitives import TrackedRawGreenlet
from gevent._hub_primitives import WaitOperationsGreenlet
# Export
from gevent import _hub_primitives
wait = _hub_primitives.wait
iwait = _hub_primitives.iwait
from gevent.monkey import get_original
from gevent.exceptions import LoopExit
from gevent.exceptions import BlockingSwitchOutError
from gevent.exceptions import InvalidSwitchError
from gevent._waiter import Waiter
from gevent._waiter import MultipleWaiter as _MultipleWaiter
get_thread_ident = get_original(thread_mod_name, 'get_ident')
MAIN_THREAD_IDENT = get_thread_ident() # XXX: Assuming import is done on the main thread.
class TrackedRawGreenlet(RawGreenlet):
def __init__(self, function, parent):
RawGreenlet.__init__(self, function, parent)
# See greenlet.py's Greenlet class. We capture the cheap
# parts to maintain the tree structure, but we do not capture
# the stack because that's too expensive for 'spawn_raw'.
current = getcurrent()
self.spawning_greenlet = wref(current)
# See Greenlet for how trees are maintained.
try:
self.spawn_tree_locals = current.spawn_tree_locals
except AttributeError:
self.spawn_tree_locals = {}
if current.parent:
current.spawn_tree_locals = self.spawn_tree_locals
def spawn_raw(function, *args, **kwargs):
"""
Create a new :class:`greenlet.greenlet` object and schedule it to
......@@ -163,7 +147,7 @@ def sleep(seconds=0, ref=True):
loop = hub.loop
if seconds <= 0:
waiter = Waiter(hub)
loop.run_callback(waiter.switch)
loop.run_callback(waiter.switch, None)
waiter.get()
else:
with loop.timer(seconds, ref=ref) as t:
......@@ -348,18 +332,9 @@ def reinit():
#sleep(0.00001)
class _dummy_greenlet(object):
def throw(self):
pass
_dummy_greenlet = _dummy_greenlet()
hub_ident_registry = IdentRegistry()
class Hub(TrackedRawGreenlet):
class Hub(WaitOperationsGreenlet):
"""
A greenlet that runs the event loop.
......@@ -400,7 +375,7 @@ class Hub(TrackedRawGreenlet):
name = ''
def __init__(self, loop=None, default=None):
TrackedRawGreenlet.__init__(self, None, None)
WaitOperationsGreenlet.__init__(self, None, None)
self.thread_ident = get_thread_ident()
if hasattr(loop, 'run'):
if default is not None:
......@@ -551,77 +526,13 @@ class Hub(TrackedRawGreenlet):
context = repr(context)
errstream.write('%s failed with %s\n\n' % (context, getattr(type, '__name__', 'exception'), ))
def switch(self):
switch_out = getattr(getcurrent(), 'switch_out', None)
if switch_out is not None:
switch_out()
return RawGreenlet.switch(self)
def switch_out(self):
raise BlockingSwitchOutError('Impossible to call blocking function in the event loop callback')
def wait(self, watcher):
"""
Wait until the *watcher* (which must not be started) is ready.
The current greenlet will be unscheduled during this time.
.. seealso:: :class:`gevent.core.io`, :class:`gevent.core.timer`,
:class:`gevent.core.signal`, :class:`gevent.core.idle`, :class:`gevent.core.prepare`,
:class:`gevent.core.check`, :class:`gevent.core.fork`, :class:`gevent.core.async`,
:class:`gevent.core.child`, :class:`gevent.core.stat`
"""
waiter = Waiter(self)
unique = object()
watcher.start(waiter.switch, unique)
try:
result = waiter.get()
if result is not unique:
raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique))
finally:
watcher.stop()
def cancel_wait(self, watcher, error, close_watcher=False):
"""
Cancel an in-progress call to :meth:`wait` by throwing the given *error*
in the waiting greenlet.
.. versionchanged:: 1.3a1
Added the *close_watcher* parameter. If true, the watcher
will be closed after the exception is thrown. The watcher should then
be discarded. Closing the watcher is important to release native resources.
.. versionchanged:: 1.3a2
Allow the *watcher* to be ``None``. No action is taken in that case.
"""
if watcher is None:
# Presumably already closed.
# See https://github.com/gevent/gevent/issues/1089
return
if watcher.callback is not None:
print('Scheduling close for', watcher, close_watcher)
self.loop.run_callback(self._cancel_wait, watcher, error, close_watcher)
elif close_watcher:
watcher.close()
def _cancel_wait(self, watcher, error, close_watcher, _dummy_greenlet=_dummy_greenlet):
# We have to check again to see if it was still active by the time
# our callback actually runs.
active = watcher.active
cb = watcher.callback
if close_watcher:
watcher.close()
if active:
# The callback should be greenlet.switch(). It may or may not be None.
greenlet = getattr(cb, '__self__', _dummy_greenlet)
greenlet.throw(error)
def run(self):
"""
Entry-point to running the loop. This method is called automatically
when the hub greenlet is scheduled; do not call it directly.
:raises LoopExit: If the loop finishes running. This means
:raises gevent.exceptions.LoopExit: If the loop finishes running. This means
that there are no other scheduled greenlets, and no active
watchers or servers. In some situations, this indicates a
programming error.
......@@ -676,7 +587,7 @@ class Hub(TrackedRawGreenlet):
if timeout is not None:
timeout = self.loop.timer(timeout, ref=False)
timeout.start(waiter.switch)
timeout.start(waiter.switch, None)
try:
try:
......@@ -759,102 +670,6 @@ class Hub(TrackedRawGreenlet):
set_default_hub_class(Hub)
def iwait(objects, timeout=None, count=None):
"""
Iteratively yield *objects* as they are ready, until all (or *count*) are ready
or *timeout* expired.
:param objects: A sequence (supporting :func:`len`) containing objects
implementing the wait protocol (rawlink() and unlink()).
:keyword int count: If not `None`, then a number specifying the maximum number
of objects to wait for. If ``None`` (the default), all objects
are waited for.
:keyword float timeout: If given, specifies a maximum number of seconds
to wait. If the timeout expires before the desired waited-for objects
are available, then this method returns immediately.
.. seealso:: :func:`wait`
.. versionchanged:: 1.1a1
Add the *count* parameter.
.. versionchanged:: 1.1a2
No longer raise :exc:`LoopExit` if our caller switches greenlets
in between items yielded by this function.
"""
# QQQ would be nice to support iterable here that can be generated slowly (why?)
hub = _get_hub_noargs()
if objects is None:
yield hub.join(timeout=timeout)
return
count = len(objects) if count is None else min(count, len(objects))
waiter = _MultipleWaiter(hub)
switch = waiter.switch
if timeout is not None:
timer = hub.loop.timer(timeout, priority=-1)
timer.start(switch, _NONE)
try:
for obj in objects:
obj.rawlink(switch)
for _ in xrange(count):
item = waiter.get()
waiter.clear()
if item is _NONE:
return
yield item
finally:
if timeout is not None:
timer.close()
for aobj in objects:
unlink = getattr(aobj, 'unlink', None)
if unlink:
try:
unlink(switch)
except: # pylint:disable=bare-except
traceback.print_exc()
def wait(objects=None, timeout=None, count=None):
"""
Wait for ``objects`` to become ready or for event loop to finish.
If ``objects`` is provided, it must be a list containing objects
implementing the wait protocol (rawlink() and unlink() methods):
- :class:`gevent.Greenlet` instance
- :class:`gevent.event.Event` instance
- :class:`gevent.lock.Semaphore` instance
- :class:`gevent.subprocess.Popen` instance
If ``objects`` is ``None`` (the default), ``wait()`` blocks until
the current event loop has nothing to do (or until ``timeout`` passes):
- all greenlets have finished
- all servers were stopped
- all event loop watchers were stopped.
If ``count`` is ``None`` (the default), wait for all ``objects``
to become ready.
If ``count`` is a number, wait for (up to) ``count`` objects to become
ready. (For example, if count is ``1`` then the function exits
when any object in the list is ready).
If ``timeout`` is provided, it specifies the maximum number of
seconds ``wait()`` will block.
Returns the list of ready objects, in the order in which they were
ready.
.. seealso:: :func:`iwait`
"""
if objects is None:
return _get_hub_noargs().join(timeout=timeout)
return list(iwait(objects, timeout, count))
class linkproxy(object):
__slots__ = ['callback', 'obj']
......
......@@ -208,8 +208,8 @@ class TestReturn_link(LinksTestCase):
self.assertFalse(p)
self.assertIsInstance(event.get(), greenlet.GreenletExit)
self.assertIsInstance(queue.get().get(), greenlet.GreenletExit)
self.assertIsInstance(event.get(), gevent.GreenletExit)
self.assertIsInstance(queue.get().get(), gevent.GreenletExit)
sleep(DELAY)
self.assertFalse(callback_flag)
......
......@@ -95,7 +95,7 @@ class TestWaiterGet(greentest.timing.AbstractGenericWaitTestCase):
def wait(self, timeout):
with get_hub().loop.timer(timeout) as evt:
evt.start(self.waiter.switch)
evt.start(self.waiter.switch, None)
return self.waiter.get()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment