Commit 62802671 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1156 from gevent/cython-waiter

Compile the important hub operations that use Waiters with Cython
parents 821e7fc8 a4fbd046
......@@ -16,6 +16,8 @@ src/gevent/event.c
src/gevent/_hub_local.c
src/gevent/_waiter.c
src/gevent/queue.c
src/gevent/_hub_primitives.c
src/gevent/_greenlet_primitives.c
src/gevent/libev/corecext.c
src/gevent/libev/corecext.h
src/gevent/libev/_corecffi.c
......
......@@ -51,7 +51,8 @@ Enhancements
- The classes `gevent.event.Event` and `gevent.event.AsyncResult`
are compiled with Cython for improved performance, as is the
``gevent.queue`` module and ``gevent.hub.Waiter``. Please report any
``gevent.queue`` module and ``gevent.hub.Waiter`` and certain
time-sensitive parts of the hub itself. Please report any
compatibility issues.
Monitoring and Debugging
......
# -*- coding: utf-8 -*-
"""
Benchmarks for hub primitive operations.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import perf
from perf import perf_counter
import gevent
from greenlet import greenlet
from greenlet import getcurrent
N = 1000
def bench_switch():
class Parent(type(gevent.get_hub())):
def run(self):
parent = self.parent
for _ in range(N):
parent.switch()
def child():
parent = getcurrent().parent
# Back to the hub, which in turn goes
# back to the main greenlet
for _ in range(N):
parent.switch()
hub = Parent(None, None)
child_greenlet = greenlet(child, hub)
for _ in range(N):
child_greenlet.switch()
def bench_wait_ready():
class Watcher(object):
def start(self, cb, obj):
# Immediately switch back to the waiter, mark as ready
cb(obj)
def stop(self):
pass
watcher = Watcher()
hub = gevent.get_hub()
for _ in range(1000):
hub.wait(watcher)
def bench_cancel_wait():
class Watcher(object):
active = True
callback = object()
def close(self):
pass
watcher = Watcher()
hub = gevent.get_hub()
loop = hub.loop
for _ in range(1000):
# Schedule all the callbacks.
hub.cancel_wait(watcher, None, True)
# Run them!
for cb in loop._callbacks:
if cb.callback:
cb.callback(*cb.args)
cb.stop() # so the real loop won't do it
# destroy the loop so we don't keep building these functions
# up
hub.destroy(True)
def bench_wait_func_ready():
from gevent.hub import wait
class ToWatch(object):
def rawlink(self, cb):
cb(self)
watched_objects = [ToWatch() for _ in range(N)]
t0 = perf_counter()
wait(watched_objects)
return perf_counter() - t0
def main():
runner = perf.Runner()
runner.bench_func('multiple wait ready',
bench_wait_func_ready,
inner_loops=N)
runner.bench_func('wait ready',
bench_wait_ready,
inner_loops=N)
runner.bench_func('cancel wait',
bench_cancel_wait,
inner_loops=N)
runner.bench_func('switch',
bench_switch,
inner_loops=N)
if __name__ == '__main__':
main()
......@@ -10,6 +10,9 @@
:members:
:undoc-members:
.. automethod:: wait
.. automethod:: cancel_wait
.. autoclass:: Waiter
.. autoclass:: LoopExit
......@@ -119,17 +119,32 @@ WAITER = Extension(name="gevent.__waiter",
depends=['src/gevent/__waiter.pxd'],
include_dirs=include_dirs)
HUB_PRIMITIVES = Extension(name="gevent.__hub_primitives",
sources=["src/gevent/_hub_primitives.py"],
depends=['src/gevent/__hub_primitives.pxd'],
include_dirs=include_dirs)
GLT_PRIMITIVES = Extension(name="gevent.__greenlet_primitives",
sources=["src/gevent/_greenlet_primitives.py"],
depends=['src/gevent/__greenlet_primitives.pxd'],
include_dirs=include_dirs)
_to_cythonize = [
GLT_PRIMITIVES,
HUB_PRIMITIVES,
HUB_LOCAL,
WAITER,
GREENLET,
SEMAPHORE,
LOCAL,
GREENLET,
IDENT,
IMAP,
EVENT,
QUEUE,
HUB_LOCAL,
WAITER,
]
EXT_MODULES = [
......@@ -144,6 +159,8 @@ EXT_MODULES = [
QUEUE,
HUB_LOCAL,
WAITER,
HUB_PRIMITIVES,
GLT_PRIMITIVES,
]
LIBEV_CFFI_MODULE = 'src/gevent/libev/_corecffi_build.py:ffi'
......@@ -221,6 +238,12 @@ if PYPY:
EXT_MODULES.remove(WAITER)
_to_cythonize.remove(WAITER)
EXT_MODULES.remove(GLT_PRIMITIVES)
_to_cythonize.remove(GLT_PRIMITIVES)
EXT_MODULES.remove(HUB_PRIMITIVES)
_to_cythonize.remove(HUB_PRIMITIVES)
for mod in _to_cythonize:
EXT_MODULES.remove(mod)
EXT_MODULES.append(cythonize1(mod))
......
cimport cython
# This file must not cimport anything from gevent.
cdef wref
cdef BlockingSwitchOutError
cdef extern from "greenlet/greenlet.h":
ctypedef class greenlet.greenlet [object PyGreenlet]:
pass
# These are actually macros and so much be included
# (defined) in each .pxd, as are the two functions
# that call them.
greenlet PyGreenlet_GetCurrent()
object PyGreenlet_Switch(greenlet self, void* args, void* kwargs)
void PyGreenlet_Import()
@cython.final
cdef inline greenlet getcurrent():
return PyGreenlet_GetCurrent()
cdef bint _greenlet_imported
cdef inline void greenlet_init():
global _greenlet_imported
if not _greenlet_imported:
PyGreenlet_Import()
_greenlet_imported = True
cdef inline object _greenlet_switch(greenlet self):
return PyGreenlet_Switch(self, NULL, NULL)
cdef class TrackedRawGreenlet(greenlet):
pass
cdef class SwitchOutGreenletWithLoop(TrackedRawGreenlet):
cdef public loop
cpdef switch(self)
cpdef switch_out(self)
from gevent.__greenlet_primitives cimport SwitchOutGreenletWithLoop
cdef _threadlocal
cpdef get_hub_class()
cpdef get_hub_if_exists()
cpdef set_hub(hub)
cpdef SwitchOutGreenletWithLoop get_hub_if_exists()
cpdef set_hub(SwitchOutGreenletWithLoop hub)
cpdef get_loop()
cpdef set_loop(loop)
# We can't cdef this, it won't do varargs.
# cpdef WaitOperationsGreenlet get_hub(*args, **kwargs)
# XXX: TODO: Move the definition of TrackedRawGreenlet
# into a file that can be cython compiled so get_hub can
# return that.
cpdef get_hub_noargs()
cpdef SwitchOutGreenletWithLoop get_hub_noargs()
cimport cython
from gevent.__greenlet_primitives cimport SwitchOutGreenletWithLoop
from gevent.__hub_local cimport get_hub_noargs as get_hub
from gevent.__waiter cimport Waiter
from gevent.__waiter cimport MultipleWaiter
cdef InvalidSwitchError
cdef _waiter
cdef _greenlet_primitives
cdef traceback
cdef _timeout_error
cdef Timeout
cdef extern from "greenlet/greenlet.h":
ctypedef class greenlet.greenlet [object PyGreenlet]:
pass
# These are actually macros and so much be included
# (defined) in each .pxd, as are the two functions
# that call them.
greenlet PyGreenlet_GetCurrent()
void PyGreenlet_Import()
@cython.final
cdef inline greenlet getcurrent():
return PyGreenlet_GetCurrent()
cdef bint _greenlet_imported
cdef inline void greenlet_init():
global _greenlet_imported
if not _greenlet_imported:
PyGreenlet_Import()
_greenlet_imported = True
cdef class WaitOperationsGreenlet(SwitchOutGreenletWithLoop):
cpdef wait(self, watcher)
cpdef cancel_wait(self, watcher, error, close_watcher=*)
cpdef _cancel_wait(self, watcher, error, close_watcher)
cdef class _WaitIterator:
cdef SwitchOutGreenletWithLoop _hub
cdef MultipleWaiter _waiter
cdef _switch
cdef _timeout
cdef _objects
cdef _timer
cdef Py_ssize_t _count
cdef bint _begun
cdef _cleanup(self)
cpdef iwait_on_objects(objects, timeout=*, count=*)
cpdef wait_on_objects(objects=*, timeout=*, count=*)
cdef _primitive_wait(watcher, timeout, timeout_exc, WaitOperationsGreenlet hub)
cpdef wait_on_watcher(watcher, timeout=*, timeout_exc=*, WaitOperationsGreenlet hub=*)
cpdef wait_read(fileno, timeout=*, timeout_exc=*)
cpdef wait_write(fileno, timeout=*, timeout_exc=*, event=*)
cpdef wait_readwrite(fileno, timeout=*, timeout_exc=*, event=*)
cpdef wait_on_socket(socket, watcher, timeout_exc=*)
......@@ -20,5 +20,7 @@ cdef class IdentRegistry:
cdef object _registry
cdef list _available_idents
@cython.final
cpdef object get_ident(self, obj)
@cython.final
cpdef _return_ident(self, ValuedWeakRef ref)
# cython: auto_pickle=False
cimport cython
from gevent.__hub_local cimport get_hub_noargs as get_hub
cdef Timeout
......
cimport cython
from gevent.__greenlet_primitives cimport SwitchOutGreenletWithLoop
from gevent.__hub_local cimport get_hub_noargs as get_hub
cdef sys
cdef ConcurrentObjectUseError
from gevent.__hub_local cimport get_hub_noargs as get_hub
cdef bint _greenlet_imported
cdef _NONE
......@@ -29,11 +31,17 @@ cdef inline void greenlet_init():
_greenlet_imported = True
cdef class Waiter:
cdef readonly hub
cdef readonly greenlet
cdef readonly SwitchOutGreenletWithLoop hub
cdef readonly greenlet greenlet
cdef readonly value
cdef _exception
cpdef get(self)
cpdef clear(self)
# cpdef of switch leads to parameter errors...
#cpdef switch(self, value)
@cython.final
@cython.internal
cdef class MultipleWaiter(Waiter):
......
cimport cython
from gevent.__greenlet_primitives cimport SwitchOutGreenletWithLoop
from gevent.__hub_local cimport get_hub_noargs as get_hub
cdef _None
cdef reraise
cdef dump_traceback
cdef load_traceback
cdef get_hub
cdef InvalidSwitchError
cdef Timeout
......@@ -35,7 +38,7 @@ cdef void _init()
cdef class _AbstractLinkable:
cdef _notifier
cdef set _links
cdef readonly hub
cdef readonly SwitchOutGreenletWithLoop hub
cpdef rawlink(self, callback)
cpdef bint ready(self)
......
......@@ -9,6 +9,8 @@ cdef bint _PYPY
cdef sys_getframe
cdef sys_exc_info
cdef Timeout
cdef GreenletExit
cdef InvalidSwitchError
cdef extern from "greenlet/greenlet.h":
......@@ -141,14 +143,12 @@ cdef _threadlocal
cdef get_hub_class
cdef wref
cdef Timeout
cdef dump_traceback
cdef load_traceback
cdef Waiter
cdef wait
cdef iwait
cdef reraise
cdef InvalidSwitchError
cpdef GEVENT_CONFIG
......
# -*- coding: utf-8 -*-
# copyright (c) 2018 gevent. See LICENSE.
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False
"""
A collection of primitives used by the hub, and suitable for
compilation with Cython because of their frequency of use.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from weakref import ref as wref
from greenlet import greenlet
from gevent.exceptions import BlockingSwitchOutError
# In Cython, we define these as 'cdef inline' functions. The
# compilation unit cannot have a direct assignment to them (import
# is assignment) without generating a 'lvalue is not valid target'
# error.
locals()['getcurrent'] = __import__('greenlet').getcurrent
locals()['greenlet_init'] = lambda: None
locals()['_greenlet_switch'] = greenlet.switch
__all__ = [
'TrackedRawGreenlet',
'SwitchOutGreenletWithLoop',
]
class TrackedRawGreenlet(greenlet):
def __init__(self, function, parent):
greenlet.__init__(self, function, parent)
# See greenlet.py's Greenlet class. We capture the cheap
# parts to maintain the tree structure, but we do not capture
# the stack because that's too expensive for 'spawn_raw'.
current = getcurrent() # pylint:disable=undefined-variable
self.spawning_greenlet = wref(current)
# See Greenlet for how trees are maintained.
try:
self.spawn_tree_locals = current.spawn_tree_locals
except AttributeError:
self.spawn_tree_locals = {}
if current.parent:
current.spawn_tree_locals = self.spawn_tree_locals
class SwitchOutGreenletWithLoop(TrackedRawGreenlet):
# Subclasses must define:
# - self.loop
# This class defines loop in its .pxd for Cython. This lets us avoid
# circular dependencies with the hub.
def switch(self):
switch_out = getattr(getcurrent(), 'switch_out', None) # pylint:disable=undefined-variable
if switch_out is not None:
switch_out()
return _greenlet_switch(self) # pylint:disable=undefined-variable
def switch_out(self):
raise BlockingSwitchOutError('Impossible to call blocking function in the event loop callback')
def _init():
greenlet_init() # pylint:disable=undefined-variable
_init()
from gevent._util import import_c_accel
import_c_accel(globals(), 'gevent.__greenlet_primitives')
# -*- coding: utf-8 -*-
# copyright (c) 2018 gevent. See LICENSE.
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False,binding=True
"""
A collection of primitives used by the hub, and suitable for
compilation with Cython because of their frequency of use.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import traceback
from gevent.exceptions import InvalidSwitchError
from gevent.exceptions import ConcurrentObjectUseError
from gevent import _greenlet_primitives
from gevent import _waiter
from gevent._util import _NONE
from gevent._hub_local import get_hub_noargs as get_hub
from gevent.timeout import Timeout
# In Cython, we define these as 'cdef inline' functions. The
# compilation unit cannot have a direct assignment to them (import
# is assignment) without generating a 'lvalue is not valid target'
# error.
locals()['getcurrent'] = __import__('greenlet').getcurrent
locals()['greenlet_init'] = lambda: None
locals()['Waiter'] = _waiter.Waiter
locals()['MultipleWaiter'] = _waiter.MultipleWaiter
locals()['SwitchOutGreenletWithLoop'] = _greenlet_primitives.SwitchOutGreenletWithLoop
__all__ = [
'WaitOperationsGreenlet',
'iwait_on_objects',
'wait_on_objects',
'wait_read',
'wait_write',
'wait_readwrite',
]
class WaitOperationsGreenlet(SwitchOutGreenletWithLoop): # pylint:disable=undefined-variable
def wait(self, watcher):
"""
Wait until the *watcher* (which must not be started) is ready.
The current greenlet will be unscheduled during this time.
"""
waiter = Waiter(self) # pylint:disable=undefined-variable
watcher.start(waiter.switch, waiter)
try:
result = waiter.get()
if result is not waiter:
raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (
getcurrent(), # pylint:disable=undefined-variable
result, waiter))
finally:
watcher.stop()
def cancel_wait(self, watcher, error, close_watcher=False):
"""
Cancel an in-progress call to :meth:`wait` by throwing the given *error*
in the waiting greenlet.
.. versionchanged:: 1.3a1
Added the *close_watcher* parameter. If true, the watcher
will be closed after the exception is thrown. The watcher should then
be discarded. Closing the watcher is important to release native resources.
.. versionchanged:: 1.3a2
Allow the *watcher* to be ``None``. No action is taken in that case.
"""
if watcher is None:
# Presumably already closed.
# See https://github.com/gevent/gevent/issues/1089
return
if watcher.callback is not None:
self.loop.run_callback(self._cancel_wait, watcher, error, close_watcher)
elif close_watcher:
watcher.close()
def _cancel_wait(self, watcher, error, close_watcher):
# We have to check again to see if it was still active by the time
# our callback actually runs.
active = watcher.active
cb = watcher.callback
if close_watcher:
watcher.close()
if active:
# The callback should be greenlet.switch(). It may or may not be None.
glet = getattr(cb, '__self__', None)
if glet is not None:
glet.throw(error)
class _WaitIterator(object):
def __init__(self, objects, hub, timeout, count):
self._hub = hub
self._waiter = MultipleWaiter(hub) # pylint:disable=undefined-variable
self._switch = self._waiter.switch
self._timeout = timeout
self._objects = objects
self._timer = None
self._begun = False
# Even if we're only going to return 1 object,
# we must still rawlink() *all* of them, so that no
# matter which one finishes first we find it.
self._count = len(objects) if count is None else min(count, len(objects))
def __iter__(self):
# When we begin iterating, we begin the timer.
# XXX: If iteration doesn't actually happen, we
# could leave these links around!
if not self._begun:
self._begun = True
for obj in self._objects:
obj.rawlink(self._switch)
if self._timeout is not None:
self._timer = self._hub.loop.timer(self._timeout, priority=-1)
self._timer.start(self._switch, self)
return self
def __next__(self):
if self._count == 0:
# Exhausted
self._cleanup()
raise StopIteration()
self._count -= 1
try:
item = self._waiter.get()
self._waiter.clear()
if item is self:
# Timer expired, no more
self._cleanup()
raise StopIteration()
return item
except:
self._cleanup()
raise
next = __next__
def _cleanup(self):
if self._timer is not None:
self._timer.close()
self._timer = None
objs = self._objects
self._objects = ()
for aobj in objs:
unlink = getattr(aobj, 'unlink', None)
if unlink is not None:
try:
unlink(self._switch)
except: # pylint:disable=bare-except
traceback.print_exc()
def iwait_on_objects(objects, timeout=None, count=None):
"""
Iteratively yield *objects* as they are ready, until all (or *count*) are ready
or *timeout* expired.
:param objects: A sequence (supporting :func:`len`) containing objects
implementing the wait protocol (rawlink() and unlink()).
:keyword int count: If not `None`, then a number specifying the maximum number
of objects to wait for. If ``None`` (the default), all objects
are waited for.
:keyword float timeout: If given, specifies a maximum number of seconds
to wait. If the timeout expires before the desired waited-for objects
are available, then this method returns immediately.
.. seealso:: :func:`wait`
.. versionchanged:: 1.1a1
Add the *count* parameter.
.. versionchanged:: 1.1a2
No longer raise :exc:`LoopExit` if our caller switches greenlets
in between items yielded by this function.
"""
# QQQ would be nice to support iterable here that can be generated slowly (why?)
hub = get_hub()
if objects is None:
return [hub.join(timeout=timeout)]
return _WaitIterator(objects, hub, timeout, count)
def wait_on_objects(objects=None, timeout=None, count=None):
"""
Wait for ``objects`` to become ready or for event loop to finish.
If ``objects`` is provided, it must be a list containing objects
implementing the wait protocol (rawlink() and unlink() methods):
- :class:`gevent.Greenlet` instance
- :class:`gevent.event.Event` instance
- :class:`gevent.lock.Semaphore` instance
- :class:`gevent.subprocess.Popen` instance
If ``objects`` is ``None`` (the default), ``wait()`` blocks until
the current event loop has nothing to do (or until ``timeout`` passes):
- all greenlets have finished
- all servers were stopped
- all event loop watchers were stopped.
If ``count`` is ``None`` (the default), wait for all ``objects``
to become ready.
If ``count`` is a number, wait for (up to) ``count`` objects to become
ready. (For example, if count is ``1`` then the function exits
when any object in the list is ready).
If ``timeout`` is provided, it specifies the maximum number of
seconds ``wait()`` will block.
Returns the list of ready objects, in the order in which they were
ready.
.. seealso:: :func:`iwait`
"""
if objects is None:
hub = get_hub()
return hub.join(timeout=timeout) # pylint:disable=
return list(iwait_on_objects(objects, timeout, count))
_timeout_error = Exception
def set_default_timeout_error(e):
global _timeout_error
_timeout_error = e
def _primitive_wait(watcher, timeout, timeout_exc, hub):
if watcher.callback is not None:
raise ConcurrentObjectUseError('This socket is already used by another greenlet: %r'
% (watcher.callback, ))
if hub is None:
hub = get_hub()
if timeout is None:
hub.wait(watcher)
return
timeout = Timeout._start_new_or_dummy(
timeout,
(timeout_exc
if timeout_exc is not _NONE or timeout is None
else _timeout_error('timed out')))
with timeout:
hub.wait(watcher)
# Suitable to be bound as an instance method
def wait_on_socket(socket, watcher, timeout_exc=None):
_primitive_wait(watcher, socket.timeout,
timeout_exc if timeout_exc is not None else _NONE,
socket.hub)
def wait_on_watcher(watcher, timeout=None, timeout_exc=_NONE, hub=None):
"""
wait(watcher, timeout=None, [timeout_exc=None]) -> None
Block the current greenlet until *watcher* is ready.
If *timeout* is non-negative, then *timeout_exc* is raised after
*timeout* second has passed.
If :func:`cancel_wait` is called on *io* by another greenlet,
raise an exception in this blocking greenlet
(``socket.error(EBADF, 'File descriptor was closed in another
greenlet')`` by default).
:param io: A libev watcher, most commonly an IO watcher obtained from
:meth:`gevent.core.loop.io`
:keyword timeout_exc: The exception to raise if the timeout expires.
By default, a :class:`socket.timeout` exception is raised.
If you pass a value for this keyword, it is interpreted as for
:class:`gevent.timeout.Timeout`.
"""
_primitive_wait(watcher, timeout, timeout_exc, hub)
def wait_read(fileno, timeout=None, timeout_exc=_NONE):
"""
wait_read(fileno, timeout=None, [timeout_exc=None]) -> None
Block the current greenlet until *fileno* is ready to read.
For the meaning of the other parameters and possible exceptions,
see :func:`wait`.
.. seealso:: :func:`cancel_wait`
"""
hub = get_hub()
io = hub.loop.io(fileno, 1)
try:
return wait_on_watcher(io, timeout, timeout_exc, hub)
finally:
io.close()
def wait_write(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
"""
wait_write(fileno, timeout=None, [timeout_exc=None]) -> None
Block the current greenlet until *fileno* is ready to write.
For the meaning of the other parameters and possible exceptions,
see :func:`wait`.
.. deprecated:: 1.1
The keyword argument *event* is ignored. Applications should not pass this parameter.
In the future, doing so will become an error.
.. seealso:: :func:`cancel_wait`
"""
# pylint:disable=unused-argument
hub = get_hub()
io = hub.loop.io(fileno, 2)
try:
return wait_on_watcher(io, timeout, timeout_exc, hub)
finally:
io.close()
def wait_readwrite(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
"""
wait_readwrite(fileno, timeout=None, [timeout_exc=None]) -> None
Block the current greenlet until *fileno* is ready to read or
write.
For the meaning of the other parameters and possible exceptions,
see :func:`wait`.
.. deprecated:: 1.1
The keyword argument *event* is ignored. Applications should not pass this parameter.
In the future, doing so will become an error.
.. seealso:: :func:`cancel_wait`
"""
# pylint:disable=unused-argument
hub = get_hub()
io = hub.loop.io(fileno, 3)
try:
return wait_on_watcher(io, timeout, timeout_exc, hub)
finally:
io.close()
def _init():
greenlet_init() # pylint:disable=undefined-variable
_init()
from gevent._util import import_c_accel
import_c_accel(globals(), 'gevent.__hub_primitives')
......@@ -70,7 +70,11 @@ class IdentRegistry(object):
def _return_ident(self, vref):
# By the time this is called, self._registry has been
# updated
heappush(self._available_idents, vref.value)
if heappush is not None:
# Under some circumstances we can get called
# when the interpreter is shutting down, and globals
# aren't available any more.
heappush(self._available_idents, vref.value)
def __len__(self):
return len(self._registry)
......
......@@ -103,6 +103,7 @@ class _closedsocket(object):
timeout_default = object()
from gevent._hub_primitives import wait_on_socket as _wait_on_socket
class socket(object):
"""
......@@ -180,22 +181,7 @@ class socket(object):
ref = property(_get_ref, _set_ref)
def _wait(self, watcher, timeout_exc=timeout('timed out')):
"""Block the current greenlet until *watcher* has pending events.
If *timeout* is non-negative, then *timeout_exc* is raised after *timeout* second has passed.
By default *timeout_exc* is ``socket.timeout('timed out')``.
If :func:`cancel_wait` is called, raise ``socket.error(EBADF, 'File descriptor was closed in another greenlet')``.
"""
if watcher.callback is not None:
raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, ))
timeout = Timeout._start_new_or_dummy(self.timeout, timeout_exc, ref=False)
try:
self.hub.wait(watcher)
finally:
timeout.close()
_wait = _wait_on_socket
def accept(self):
sock = self._sock
......
......@@ -67,6 +67,7 @@ class _wrefsocket(_socket.socket):
timeout = property(lambda s: s.gettimeout(),
lambda s, nv: s.settimeout(nv))
from gevent._hub_primitives import wait_on_socket as _wait_on_socket
class socket(object):
"""
......@@ -181,22 +182,7 @@ class socket(object):
ref = property(_get_ref, _set_ref)
def _wait(self, watcher, timeout_exc=timeout('timed out')):
"""Block the current greenlet until *watcher* has pending events.
If *timeout* is non-negative, then *timeout_exc* is raised after *timeout* second has passed.
By default *timeout_exc* is ``socket.timeout('timed out')``.
If :func:`cancel_wait` is called, raise ``socket.error(EBADF, 'File descriptor was closed in another greenlet')``.
"""
if watcher.callback is not None:
raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, ))
timer = Timeout._start_new_or_dummy(self.timeout, timeout_exc, ref=False)
try:
self.hub.wait(watcher)
finally:
timer.close()
_wait = _wait_on_socket
def dup(self):
"""dup() -> socket object
......
......@@ -134,96 +134,13 @@ del _name, _value
_timeout_error = timeout # pylint: disable=undefined-variable
from gevent import _hub_primitives
_hub_primitives.set_default_timeout_error(_timeout_error)
def wait(io, timeout=None, timeout_exc=_NONE):
"""
Block the current greenlet until *io* is ready.
If *timeout* is non-negative, then *timeout_exc* is raised after
*timeout* second has passed. By default *timeout_exc* is
``socket.timeout('timed out')``.
If :func:`cancel_wait` is called on *io* by another greenlet,
raise an exception in this blocking greenlet
(``socket.error(EBADF, 'File descriptor was closed in another
greenlet')`` by default).
:param io: A libev watcher, most commonly an IO watcher obtained from
:meth:`gevent.core.loop.io`
:keyword timeout_exc: The exception to raise if the timeout expires.
By default, a :class:`socket.timeout` exception is raised.
If you pass a value for this keyword, it is interpreted as for
:class:`gevent.timeout.Timeout`.
"""
if io.callback is not None:
raise ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (io.callback, ))
timeout = Timeout._start_new_or_dummy(
timeout,
(timeout_exc
if timeout_exc is not _NONE or timeout is None
else _timeout_error('timed out')))
with timeout:
return get_hub().wait(io)
# rename "io" to "watcher" because wait() works with any watcher
def wait_read(fileno, timeout=None, timeout_exc=_NONE):
"""
Block the current greenlet until *fileno* is ready to read.
For the meaning of the other parameters and possible exceptions,
see :func:`wait`.
.. seealso:: :func:`cancel_wait`
"""
io = get_hub().loop.io(fileno, 1)
try:
return wait(io, timeout, timeout_exc)
finally:
io.close()
def wait_write(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
"""
Block the current greenlet until *fileno* is ready to write.
For the meaning of the other parameters and possible exceptions,
see :func:`wait`.
:keyword event: Ignored. Applications should not pass this parameter.
In the future, it may become an error.
.. seealso:: :func:`cancel_wait`
"""
# pylint:disable=unused-argument
io = get_hub().loop.io(fileno, 2)
try:
return wait(io, timeout, timeout_exc)
finally:
io.close()
def wait_readwrite(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
"""
Block the current greenlet until *fileno* is ready to read or
write.
For the meaning of the other parameters and possible exceptions,
see :func:`wait`.
:keyword event: Ignored. Applications should not pass this parameter.
In the future, it may become an error.
.. seealso:: :func:`cancel_wait`
"""
# pylint:disable=unused-argument
io = get_hub().loop.io(fileno, 3)
try:
return wait(io, timeout, timeout_exc)
finally:
io.close()
wait = _hub_primitives.wait_on_watcher
wait_read = _hub_primitives.wait_read
wait_write = _hub_primitives.wait_write
wait_readwrite = _hub_primitives.wait_readwrite
#: The exception raised by default on a call to :func:`cancel_wait`
class cancel_wait_ex(error): # pylint: disable=undefined-variable
......@@ -238,8 +155,6 @@ def cancel_wait(watcher, error=cancel_wait_ex):
get_hub().cancel_wait(watcher, error)
def gethostbyname(hostname):
"""
gethostbyname(host) -> address
......
......@@ -99,8 +99,14 @@ class Waiter(object):
if self._exception is not _NONE:
return self._exception
def switch(self, value=None):
"""Switch to the greenlet if one's available. Otherwise store the value."""
def switch(self, value):
"""
Switch to the greenlet if one's available. Otherwise store the
*value*.
.. versionchanged:: 1.3b1
The *value* is no longer optional.
"""
greenlet = self.greenlet
if greenlet is None:
self.value = value
......@@ -178,7 +184,7 @@ class MultipleWaiter(Waiter):
# here can be impractical (see https://github.com/gevent/gevent/issues/652)
self._values = list()
def switch(self, value): # pylint:disable=signature-differs
def switch(self, value):
self._values.append(value)
Waiter.switch(self, True)
......
......@@ -9,9 +9,9 @@ from gevent._util import _NONE
from gevent._compat import reraise
from gevent._tblib import dump_traceback, load_traceback
from gevent.hub import _get_hub_noargs as get_hub
from gevent._hub_local import get_hub_noargs as get_hub
from gevent.hub import InvalidSwitchError
from gevent.exceptions import InvalidSwitchError
from gevent.timeout import Timeout
......@@ -23,8 +23,6 @@ __all__ = [
locals()['getcurrent'] = __import__('greenlet').getcurrent
locals()['greenlet_init'] = lambda: None
import cython
class _AbstractLinkable(object):
# Encapsulates the standard parts of the linking and notifying protocol
......
......@@ -7,16 +7,18 @@ from sys import _getframe as sys_getframe
from sys import exc_info as sys_exc_info
from weakref import ref as wref
# XXX: How to get cython to let us rename this as RawGreenlet
# like we prefer?
from greenlet import greenlet
from greenlet import GreenletExit
from gevent._compat import reraise
from gevent._compat import PYPY as _PYPY
from gevent._tblib import dump_traceback
from gevent._tblib import load_traceback
from gevent.hub import GreenletExit
from gevent.hub import InvalidSwitchError
from gevent.exceptions import InvalidSwitchError
from gevent.hub import iwait
from gevent.hub import wait
......@@ -875,7 +877,7 @@ def _kill(glet, exception, waiter):
# XXX do we need this here?
glet.parent.handle_error(glet, *sys_exc_info())
if waiter is not None:
waiter.switch()
waiter.switch(None)
def joinall(greenlets, timeout=None, raise_error=False, count=None):
......
......@@ -9,7 +9,7 @@ from functools import partial as _functools_partial
import sys
import traceback
from weakref import ref as wref
from greenlet import greenlet as RawGreenlet
from greenlet import getcurrent
......@@ -31,9 +31,7 @@ __all__ = [
]
from gevent._config import config as GEVENT_CONFIG
from gevent._compat import xrange
from gevent._compat import thread_mod_name
from gevent._util import _NONE
from gevent._util import readproperty
from gevent._util import Lazy
from gevent._util import gmctime
......@@ -47,38 +45,24 @@ from gevent._hub_local import get_hub_if_exists as _get_hub
from gevent._hub_local import get_hub_noargs as _get_hub_noargs
from gevent._hub_local import set_default_hub_class
from gevent._greenlet_primitives import TrackedRawGreenlet
from gevent._hub_primitives import WaitOperationsGreenlet
# Export
from gevent import _hub_primitives
wait = _hub_primitives.wait_on_objects
iwait = _hub_primitives.iwait_on_objects
from gevent.monkey import get_original
from gevent.exceptions import LoopExit
from gevent.exceptions import BlockingSwitchOutError
from gevent.exceptions import InvalidSwitchError
from gevent._waiter import Waiter
from gevent._waiter import MultipleWaiter as _MultipleWaiter
get_thread_ident = get_original(thread_mod_name, 'get_ident')
MAIN_THREAD_IDENT = get_thread_ident() # XXX: Assuming import is done on the main thread.
class TrackedRawGreenlet(RawGreenlet):
def __init__(self, function, parent):
RawGreenlet.__init__(self, function, parent)
# See greenlet.py's Greenlet class. We capture the cheap
# parts to maintain the tree structure, but we do not capture
# the stack because that's too expensive for 'spawn_raw'.
current = getcurrent()
self.spawning_greenlet = wref(current)
# See Greenlet for how trees are maintained.
try:
self.spawn_tree_locals = current.spawn_tree_locals
except AttributeError:
self.spawn_tree_locals = {}
if current.parent:
current.spawn_tree_locals = self.spawn_tree_locals
def spawn_raw(function, *args, **kwargs):
"""
Create a new :class:`greenlet.greenlet` object and schedule it to
......@@ -163,7 +147,7 @@ def sleep(seconds=0, ref=True):
loop = hub.loop
if seconds <= 0:
waiter = Waiter(hub)
loop.run_callback(waiter.switch)
loop.run_callback(waiter.switch, None)
waiter.get()
else:
with loop.timer(seconds, ref=ref) as t:
......@@ -348,18 +332,9 @@ def reinit():
#sleep(0.00001)
class _dummy_greenlet(object):
def throw(self):
pass
_dummy_greenlet = _dummy_greenlet()
hub_ident_registry = IdentRegistry()
class Hub(TrackedRawGreenlet):
class Hub(WaitOperationsGreenlet):
"""
A greenlet that runs the event loop.
......@@ -400,7 +375,7 @@ class Hub(TrackedRawGreenlet):
name = ''
def __init__(self, loop=None, default=None):
TrackedRawGreenlet.__init__(self, None, None)
WaitOperationsGreenlet.__init__(self, None, None)
self.thread_ident = get_thread_ident()
if hasattr(loop, 'run'):
if default is not None:
......@@ -551,77 +526,13 @@ class Hub(TrackedRawGreenlet):
context = repr(context)
errstream.write('%s failed with %s\n\n' % (context, getattr(type, '__name__', 'exception'), ))
def switch(self):
switch_out = getattr(getcurrent(), 'switch_out', None)
if switch_out is not None:
switch_out()
return RawGreenlet.switch(self)
def switch_out(self):
raise BlockingSwitchOutError('Impossible to call blocking function in the event loop callback')
def wait(self, watcher):
"""
Wait until the *watcher* (which must not be started) is ready.
The current greenlet will be unscheduled during this time.
.. seealso:: :class:`gevent.core.io`, :class:`gevent.core.timer`,
:class:`gevent.core.signal`, :class:`gevent.core.idle`, :class:`gevent.core.prepare`,
:class:`gevent.core.check`, :class:`gevent.core.fork`, :class:`gevent.core.async`,
:class:`gevent.core.child`, :class:`gevent.core.stat`
"""
waiter = Waiter(self)
unique = object()
watcher.start(waiter.switch, unique)
try:
result = waiter.get()
if result is not unique:
raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique))
finally:
watcher.stop()
def cancel_wait(self, watcher, error, close_watcher=False):
"""
Cancel an in-progress call to :meth:`wait` by throwing the given *error*
in the waiting greenlet.
.. versionchanged:: 1.3a1
Added the *close_watcher* parameter. If true, the watcher
will be closed after the exception is thrown. The watcher should then
be discarded. Closing the watcher is important to release native resources.
.. versionchanged:: 1.3a2
Allow the *watcher* to be ``None``. No action is taken in that case.
"""
if watcher is None:
# Presumably already closed.
# See https://github.com/gevent/gevent/issues/1089
return
if watcher.callback is not None:
print('Scheduling close for', watcher, close_watcher)
self.loop.run_callback(self._cancel_wait, watcher, error, close_watcher)
elif close_watcher:
watcher.close()
def _cancel_wait(self, watcher, error, close_watcher, _dummy_greenlet=_dummy_greenlet):
# We have to check again to see if it was still active by the time
# our callback actually runs.
active = watcher.active
cb = watcher.callback
if close_watcher:
watcher.close()
if active:
# The callback should be greenlet.switch(). It may or may not be None.
greenlet = getattr(cb, '__self__', _dummy_greenlet)
greenlet.throw(error)
def run(self):
"""
Entry-point to running the loop. This method is called automatically
when the hub greenlet is scheduled; do not call it directly.
:raises LoopExit: If the loop finishes running. This means
:raises gevent.exceptions.LoopExit: If the loop finishes running. This means
that there are no other scheduled greenlets, and no active
watchers or servers. In some situations, this indicates a
programming error.
......@@ -676,7 +587,7 @@ class Hub(TrackedRawGreenlet):
if timeout is not None:
timeout = self.loop.timer(timeout, ref=False)
timeout.start(waiter.switch)
timeout.start(waiter.switch, None)
try:
try:
......@@ -759,102 +670,6 @@ class Hub(TrackedRawGreenlet):
set_default_hub_class(Hub)
def iwait(objects, timeout=None, count=None):
"""
Iteratively yield *objects* as they are ready, until all (or *count*) are ready
or *timeout* expired.
:param objects: A sequence (supporting :func:`len`) containing objects
implementing the wait protocol (rawlink() and unlink()).
:keyword int count: If not `None`, then a number specifying the maximum number
of objects to wait for. If ``None`` (the default), all objects
are waited for.
:keyword float timeout: If given, specifies a maximum number of seconds
to wait. If the timeout expires before the desired waited-for objects
are available, then this method returns immediately.
.. seealso:: :func:`wait`
.. versionchanged:: 1.1a1
Add the *count* parameter.
.. versionchanged:: 1.1a2
No longer raise :exc:`LoopExit` if our caller switches greenlets
in between items yielded by this function.
"""
# QQQ would be nice to support iterable here that can be generated slowly (why?)
hub = _get_hub_noargs()
if objects is None:
yield hub.join(timeout=timeout)
return
count = len(objects) if count is None else min(count, len(objects))
waiter = _MultipleWaiter(hub)
switch = waiter.switch
if timeout is not None:
timer = hub.loop.timer(timeout, priority=-1)
timer.start(switch, _NONE)
try:
for obj in objects:
obj.rawlink(switch)
for _ in xrange(count):
item = waiter.get()
waiter.clear()
if item is _NONE:
return
yield item
finally:
if timeout is not None:
timer.close()
for aobj in objects:
unlink = getattr(aobj, 'unlink', None)
if unlink:
try:
unlink(switch)
except: # pylint:disable=bare-except
traceback.print_exc()
def wait(objects=None, timeout=None, count=None):
"""
Wait for ``objects`` to become ready or for event loop to finish.
If ``objects`` is provided, it must be a list containing objects
implementing the wait protocol (rawlink() and unlink() methods):
- :class:`gevent.Greenlet` instance
- :class:`gevent.event.Event` instance
- :class:`gevent.lock.Semaphore` instance
- :class:`gevent.subprocess.Popen` instance
If ``objects`` is ``None`` (the default), ``wait()`` blocks until
the current event loop has nothing to do (or until ``timeout`` passes):
- all greenlets have finished
- all servers were stopped
- all event loop watchers were stopped.
If ``count`` is ``None`` (the default), wait for all ``objects``
to become ready.
If ``count`` is a number, wait for (up to) ``count`` objects to become
ready. (For example, if count is ``1`` then the function exits
when any object in the list is ready).
If ``timeout`` is provided, it specifies the maximum number of
seconds ``wait()`` will block.
Returns the list of ready objects, in the order in which they were
ready.
.. seealso:: :func:`iwait`
"""
if objects is None:
return _get_hub_noargs().join(timeout=timeout)
return list(iwait(objects, timeout, count))
class linkproxy(object):
__slots__ = ['callback', 'obj']
......
......@@ -347,7 +347,7 @@ class Values(object):
else:
self.error = source.exception
if self.count <= 0:
self.waiter.switch()
self.waiter.switch(None)
def get(self):
self.waiter.get()
......
......@@ -18,8 +18,8 @@ from __future__ import absolute_import, print_function, division
from gevent._compat import string_types
from gevent._util import _NONE
from gevent.hub import getcurrent
from gevent.hub import _get_hub_noargs as get_hub
from greenlet import getcurrent
from gevent._hub_local import get_hub_noargs as get_hub
__all__ = [
'Timeout',
......
......@@ -77,7 +77,7 @@ from greentest.skipping import skipOnLibuvOnCIOnPyPy
from greentest.skipping import skipOnLibuvOnPyPyOnWin
from greentest.skipping import skipOnPurePython
from greentest.skipping import skipWithCExtensions
from greentest.skipping import skipOnLibuvOnTravisOnCPython27
from greentest.exception import ExpectedException
......
......@@ -49,6 +49,7 @@ skipOnLibuvOnWin = _do_not_skip
skipOnLibuvOnCI = _do_not_skip
skipOnLibuvOnCIOnPyPy = _do_not_skip
skipOnLibuvOnPyPyOnWin = _do_not_skip
skipOnLibuvOnTravisOnCPython27 = _do_not_skip
skipOnLibev = _do_not_skip
......@@ -99,6 +100,10 @@ if sysinfo.LIBUV:
skipOnLibuvOnCI = unittest.skip
if sysinfo.PYPY:
skipOnLibuvOnCIOnPyPy = unittest.skip
if sysinfo.RUNNING_ON_TRAVIS:
if sysinfo.CPYTHON:
if sysinfo.PY27_ONLY:
skipOnLibuvOnTravisOnCPython27 = unittest.skip
if sysinfo.WIN:
skipOnLibuvOnWin = unittest.skip
......
......@@ -87,6 +87,8 @@ elif sys.version_info[0] == 2:
PYPY3 = PYPY and PY3
PY27_ONLY = sys.version_info[0] == 2 and sys.version_info[1] == 7
PYGTE279 = (
sys.version_info[0] == 2
and sys.version_info[1] >= 7
......
......@@ -6,8 +6,8 @@ from gevent import backdoor
def read_until(conn, postfix):
read = b''
if not isinstance(postfix, bytes):
postfix = postfix.encode('utf-8')
assert isinstance(postfix, bytes)
while not read.endswith(postfix):
result = conn.recv(1)
if not result:
......@@ -53,7 +53,9 @@ class Test(greentest.TestCase):
conn.close()
self.close_on_teardown.remove(conn)
@greentest.skipOnAppVeyor("Times out")
@greentest.skipOnLibuvOnTravisOnCPython27(
"segfaults; "
"See https://github.com/gevent/gevent/pull/1156")
def test_multi(self):
self._make_server()
......@@ -100,7 +102,7 @@ class Test(greentest.TestCase):
conn = self._create_connection()
read_until(conn, b'>>> ')
conn.sendall(b'locals()["__builtins__"]\r\n')
response = read_until(conn, '>>> ')
response = read_until(conn, b'>>> ')
self.assertTrue(len(response) < 300, msg="locals() unusable: %s..." % response)
self._close(conn)
......@@ -123,7 +125,7 @@ class Test(greentest.TestCase):
conn = self._create_connection()
read_until(conn, b'>>> ')
conn.sendall(b'bad()\r\n')
response = read_until(conn, '>>> ')
response = read_until(conn, b'>>> ')
response = response.replace('\r\n', '\n')
self.assertEqual('switching out, then throwing in\nGot Empty\nswitching out\nswitched in\n>>> ', response)
......
......@@ -208,8 +208,8 @@ class TestReturn_link(LinksTestCase):
self.assertFalse(p)
self.assertIsInstance(event.get(), greenlet.GreenletExit)
self.assertIsInstance(queue.get().get(), greenlet.GreenletExit)
self.assertIsInstance(event.get(), gevent.GreenletExit)
self.assertIsInstance(queue.get().get(), gevent.GreenletExit)
sleep(DELAY)
self.assertFalse(callback_flag)
......
......@@ -95,7 +95,7 @@ class TestWaiterGet(greentest.timing.AbstractGenericWaitTestCase):
def wait(self, timeout):
with get_hub().loop.timer(timeout) as evt:
evt.start(self.waiter.switch)
evt.start(self.waiter.switch, None)
return self.waiter.get()
......
......@@ -408,25 +408,15 @@ class TestFunctions(greentest.TestCase):
# Issue #635
import gevent.socket
import gevent._socketcommon
orig_get_hub = gevent.socket.get_hub
class get_hub(object):
def wait(self, _io):
gevent.sleep(10)
class io(object):
callback = None
gevent._socketcommon.get_hub = get_hub
try:
try:
gevent.socket.wait(io(), timeout=0.01)
except gevent.socket.timeout:
pass
else:
self.fail("Should raise timeout error")
finally:
gevent._socketcommon.get_hub = orig_get_hub
def start(self, *_args):
gevent.sleep(10)
with self.assertRaises(gevent.socket.timeout):
gevent.socket.wait(io(), timeout=0.01)
def test_signatures(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment