Commit 8181ce31 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1294 from gevent/issue1287

Make Semaphore.rawlink() start notifiers if needed.
parents b23342ce a0f41b7f
...@@ -19,6 +19,7 @@ src/gevent/_tracer.c ...@@ -19,6 +19,7 @@ src/gevent/_tracer.c
src/gevent/queue.c src/gevent/queue.c
src/gevent/_hub_primitives.c src/gevent/_hub_primitives.c
src/gevent/_greenlet_primitives.c src/gevent/_greenlet_primitives.c
src/gevent/_abstract_linkable.c
src/gevent/libev/corecext.c src/gevent/libev/corecext.c
src/gevent/libev/corecext.h src/gevent/libev/corecext.h
src/gevent/libev/_corecffi.c src/gevent/libev/_corecffi.c
......
...@@ -23,6 +23,11 @@ ...@@ -23,6 +23,11 @@
use it in a ``with`` block to avoid leaking resources. See use it in a ``with`` block to avoid leaking resources. See
:pr:`1290`, provided by Josh Snyder. :pr:`1290`, provided by Josh Snyder.
- Fix semaphores to immediately notify links if they are ready and
``rawlink()`` is called. This behaves like ``Event`` and
``AsyncEvent``. Note that the order in which semaphore links are
called is not specified. See :issue:`1287`, reported by Dan Milon.
1.3.7 (2018-10-12) 1.3.7 (2018-10-12)
================== ==================
......
...@@ -87,6 +87,11 @@ GREENLET = Extension(name="gevent._greenlet", ...@@ -87,6 +87,11 @@ GREENLET = Extension(name="gevent._greenlet",
], ],
include_dirs=include_dirs) include_dirs=include_dirs)
ABSTRACT_LINKABLE = Extension(name="gevent.__abstract_linkable",
sources=["src/gevent/_abstract_linkable.py"],
depends=['src/gevent/__abstract_linkable.pxd'],
include_dirs=include_dirs)
IDENT = Extension(name="gevent.__ident", IDENT = Extension(name="gevent.__ident",
sources=["src/gevent/_ident.py"], sources=["src/gevent/_ident.py"],
...@@ -143,6 +148,7 @@ _to_cythonize = [ ...@@ -143,6 +148,7 @@ _to_cythonize = [
GREENLET, GREENLET,
TRACER, TRACER,
ABSTRACT_LINKABLE,
SEMAPHORE, SEMAPHORE,
LOCAL, LOCAL,
...@@ -155,6 +161,7 @@ _to_cythonize = [ ...@@ -155,6 +161,7 @@ _to_cythonize = [
EXT_MODULES = [ EXT_MODULES = [
CORE, CORE,
ARES, ARES,
ABSTRACT_LINKABLE,
SEMAPHORE, SEMAPHORE,
LOCAL, LOCAL,
GREENLET, GREENLET,
...@@ -232,6 +239,7 @@ if PYPY: ...@@ -232,6 +239,7 @@ if PYPY:
EXT_MODULES.remove(LOCAL) EXT_MODULES.remove(LOCAL)
EXT_MODULES.remove(GREENLET) EXT_MODULES.remove(GREENLET)
EXT_MODULES.remove(SEMAPHORE) EXT_MODULES.remove(SEMAPHORE)
EXT_MODULES.remove(ABSTRACT_LINKABLE)
# As of PyPy 5.10, this builds, but won't import (missing _Py_ReprEnter) # As of PyPy 5.10, this builds, but won't import (missing _Py_ReprEnter)
EXT_MODULES.remove(CORE) EXT_MODULES.remove(CORE)
...@@ -243,6 +251,7 @@ if PYPY: ...@@ -243,6 +251,7 @@ if PYPY:
_to_cythonize.remove(GREENLET) _to_cythonize.remove(GREENLET)
_to_cythonize.remove(SEMAPHORE) _to_cythonize.remove(SEMAPHORE)
_to_cythonize.remove(IDENT) _to_cythonize.remove(IDENT)
_to_cythonize.remove(ABSTRACT_LINKABLE)
EXT_MODULES.remove(IMAP) EXT_MODULES.remove(IMAP)
_to_cythonize.remove(IMAP) _to_cythonize.remove(IMAP)
......
cimport cython
from gevent.__greenlet_primitives cimport SwitchOutGreenletWithLoop
from gevent.__hub_local cimport get_hub_noargs as get_hub
cdef InvalidSwitchError
cdef Timeout
cdef bint _greenlet_imported
cdef extern from "greenlet/greenlet.h":
ctypedef class greenlet.greenlet [object PyGreenlet]:
pass
# These are actually macros and so much be included
# (defined) in each .pxd, as are the two functions
# that call them.
greenlet PyGreenlet_GetCurrent()
void PyGreenlet_Import()
cdef inline greenlet getcurrent():
return PyGreenlet_GetCurrent()
cdef inline void greenlet_init():
global _greenlet_imported
if not _greenlet_imported:
PyGreenlet_Import()
_greenlet_imported = True
cdef void _init()
cdef class AbstractLinkable(object):
# We declare the __weakref__ here in the base (even though
# that's not really what we want) as a workaround for a Cython
# issue we see reliably on 3.7b4 and sometimes on 3.6. See
# https://github.com/cython/cython/issues/2270
cdef object __weakref__
cdef readonly SwitchOutGreenletWithLoop hub
cdef _notifier
cdef set _links
cdef bint _notify_all
cpdef rawlink(self, callback)
cpdef bint ready(self)
cpdef unlink(self, callback)
cdef _check_and_notify(self)
cpdef _notify_links(self)
cdef _wait_core(self, timeout, catch=*)
cdef _wait_return_value(self, waited, wait_success)
cdef _wait(self, timeout=*)
cimport cython cimport cython
from gevent.__hub_local cimport get_hub_noargs as get_hub from gevent.__abstract_linkable cimport AbstractLinkable
cdef Timeout cdef Timeout
cdef bint _greenlet_imported
cdef extern from "greenlet/greenlet.h": cdef class Semaphore(AbstractLinkable):
ctypedef class greenlet.greenlet [object PyGreenlet]:
pass
# These are actually macros and so much be included
# (defined) in each .pxd, as are the two functions
# that call them.
greenlet PyGreenlet_GetCurrent()
void PyGreenlet_Import()
cdef inline greenlet getcurrent():
return PyGreenlet_GetCurrent()
cdef inline void greenlet_init():
global _greenlet_imported
if not _greenlet_imported:
PyGreenlet_Import()
_greenlet_imported = True
cdef void _init()
cdef class Semaphore:
cdef public int counter cdef public int counter
cdef readonly list _links
cdef readonly object _notifier
cdef public int _dirty
cdef object __weakref__
cpdef bint locked(self) cpdef bint locked(self)
cpdef int release(self) except -1000 cpdef int release(self) except -1000
cpdef rawlink(self, object callback) # We don't really want this to be public, but
cpdef unlink(self, object callback) # threadpool uses it
cpdef _start_notify(self) cpdef _start_notify(self)
cpdef _notify_links(self)
cdef _do_wait(self, object timeout)
cpdef int wait(self, object timeout=*) except -1000 cpdef int wait(self, object timeout=*) except -1000
cpdef bint acquire(self, int blocking=*, object timeout=*) except -1000 cpdef bint acquire(self, int blocking=*, object timeout=*) except -1000
cpdef __enter__(self) cpdef __enter__(self)
......
# -*- coding: utf-8 -*-
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False
"""
Internal module, support for the linkable protocol for "event" like objects.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
from gevent._hub_local import get_hub_noargs as get_hub
from gevent.exceptions import InvalidSwitchError
from gevent.timeout import Timeout
locals()['getcurrent'] = __import__('greenlet').getcurrent
locals()['greenlet_init'] = lambda: None
__all__ = [
'AbstractLinkable',
]
class AbstractLinkable(object):
# Encapsulates the standard parts of the linking and notifying
# protocol common to both repeatable events (Event, Semaphore) and
# one-time events (AsyncResult).
__slots__ = ('hub', '_links', '_notifier', '_notify_all', '__weakref__')
def __init__(self):
# Before this implementation, AsyncResult and Semaphore
# maintained the order of notifications, but Event did not.
# In gevent 1.3, before Semaphore extended this class,
# that was changed to not maintain the order. It was done because
# Event guaranteed to only call callbacks once (a set) but
# AsyncResult had no such guarantees.
# Semaphore likes to maintain order of callbacks, though,
# so when it was added we went back to a list implementation
# for storing callbacks. But we want to preserve the unique callback
# property, so we manually check.
# We generally don't expect to have so many waiters (for any of those
# objects) that testing membership and removing is a bottleneck.
# In PyPy 2.6.1 with Cython 0.23, `cdef public` or `cdef
# readonly` or simply `cdef` attributes of type `object` can appear to leak if
# a Python subclass is used (this is visible simply
# instantiating this subclass if _links=[]). Our _links and
# _notifier are such attributes, and gevent.thread subclasses
# this class. Thus, we carefully manage the lifetime of the
# objects we put in these attributes so that, in the normal
# case of a semaphore used correctly (deallocated when it's not
# locked and no one is waiting), the leak goes away (because
# these objects are back to None). This can also be solved on PyPy
# by simply not declaring these objects in the pxd file, but that doesn't work for
# CPython ("No attribute...")
# See https://github.com/gevent/gevent/issues/660
self._links = set()
self._notifier = None
# This is conceptually a class attribute, defined here for ease of access in
# cython. If it's true, when notifiers fire, all existing callbacks are called.
# If its false, we only call callbacks as long as ready() returns true.
self._notify_all = True
# we don't want to do get_hub() here to allow defining module-level objects
# without initializing the hub
self.hub = None
def linkcount(self):
# For testing: how many objects are linked to this one?
return len(self._links)
def ready(self):
# Instances must define this
raise NotImplementedError
def _check_and_notify(self):
# If this object is ready to be notified, begin the process.
if self.ready() and self._links and not self._notifier:
if self.hub is None:
self.hub = get_hub()
self._notifier = self.hub.loop.run_callback(self._notify_links)
def rawlink(self, callback):
"""
Register a callback to call when this object is ready.
*callback* will be called in the :class:`Hub
<gevent.hub.Hub>`, so it must not use blocking gevent API.
*callback* will be passed one argument: this instance.
"""
if not callable(callback):
raise TypeError('Expected callable: %r' % (callback, ))
self._links.add(callback)
self._check_and_notify()
def unlink(self, callback):
"""Remove the callback set by :meth:`rawlink`"""
self._links.discard(callback)
if not self._links and self._notifier is not None:
# If we currently have one queued, de-queue it.
# This will break a reference cycle.
# (self._notifier -> self._notify_links -> self)
# But we can't set it to None in case it was actually running.
self._notifier.stop()
def _notify_links(self):
# We release self._notifier here. We are called by it
# at the end of the loop, and it is now false in a boolean way (as soon
# as this method returns).
notifier = self._notifier
# We were ready() at the time this callback was scheduled;
# we may not be anymore, and that status may change during
# callback processing. Some of our subclasses will want to
# notify everyone that the status was once true, even though not it
# may not be anymore.
todo = set(self._links)
try:
for link in todo:
if not self._notify_all and not self.ready():
break
if link not in self._links:
# Been removed already by some previous link. OK, fine.
continue
try:
link(self)
except: # pylint:disable=bare-except
# We're running in the hub, so getcurrent() returns
# a hub.
self.hub.handle_error((link, self), *sys.exc_info()) # pylint:disable=undefined-variable
finally:
if getattr(link, 'auto_unlink', None):
# This attribute can avoid having to keep a reference to the function
# *in* the function, which is a cycle
self.unlink(link)
finally:
# We should not have created a new notifier even if callbacks
# released us because we loop through *all* of our links on the
# same callback while self._notifier is still true.
assert self._notifier is notifier
self._notifier = None
# Our set of active links changed, and we were told to stop on the first
# time we went unready. See if we're ready, and if so, go around
# again.
if not self._notify_all and todo != self._links:
self._check_and_notify()
def _wait_core(self, timeout, catch=Timeout):
# The core of the wait implementation, handling
# switching and linking. If *catch* is set to (),
# a timeout that elapses will be allowed to be raised.
# Returns a true value if the wait succeeded without timing out.
switch = getcurrent().switch # pylint:disable=undefined-variable
self.rawlink(switch)
try:
with Timeout._start_new_or_dummy(timeout) as timer:
try:
if self.hub is None:
self.hub = get_hub()
result = self.hub.switch()
if result is not self: # pragma: no cover
raise InvalidSwitchError('Invalid switch into Event.wait(): %r' % (result, ))
return True
except catch as ex:
if ex is not timer:
raise
# test_set_and_clear and test_timeout in test_threading
# rely on the exact return values, not just truthish-ness
return False
finally:
self.unlink(switch)
def _wait_return_value(self, waited, wait_success):
# pylint:disable=unused-argument
return None
def _wait(self, timeout=None):
if self.ready():
return self._wait_return_value(False, False)
gotit = self._wait_core(timeout)
return self._wait_return_value(True, gotit)
def _init():
greenlet_init() # pylint:disable=undefined-variable
_init()
from gevent._util import import_c_accel
import_c_accel(globals(), 'gevent.__abstract_linkable')
cimport cython cimport cython
from gevent.__greenlet_primitives cimport SwitchOutGreenletWithLoop
from gevent.__hub_local cimport get_hub_noargs as get_hub from gevent.__hub_local cimport get_hub_noargs as get_hub
from gevent.__abstract_linkable cimport AbstractLinkable
cdef _None cdef _None
cdef reraise cdef reraise
cdef dump_traceback cdef dump_traceback
cdef load_traceback cdef load_traceback
cdef InvalidSwitchError
cdef Timeout cdef Timeout
cdef bint _greenlet_imported
cdef extern from "greenlet/greenlet.h":
ctypedef class greenlet.greenlet [object PyGreenlet]:
pass
# These are actually macros and so much be included
# (defined) in each .pxd, as are the two functions
# that call them.
greenlet PyGreenlet_GetCurrent()
void PyGreenlet_Import()
cdef inline greenlet getcurrent():
return PyGreenlet_GetCurrent()
cdef inline void greenlet_init():
global _greenlet_imported
if not _greenlet_imported:
PyGreenlet_Import()
_greenlet_imported = True
cdef void _init()
cdef class _AbstractLinkable:
# We declare the __weakref__ here in the base (even though
# that's not really what we want) as a workaround for a Cython
# issue we see reliably on 3.7b4 and sometimes on 3.6. See
# https://github.com/cython/cython/issues/2270
cdef object __weakref__
cdef _notifier
cdef set _links
cdef readonly SwitchOutGreenletWithLoop hub
cpdef rawlink(self, callback)
cpdef bint ready(self)
cpdef unlink(self, callback)
cdef _check_and_notify(self)
@cython.locals(todo=set)
cpdef _notify_links(self)
cdef _wait_core(self, timeout, catch=*)
cdef _wait_return_value(self, waited, wait_success)
cdef _wait(self, timeout=*)
cdef class Event(_AbstractLinkable): cdef class Event(AbstractLinkable):
cdef bint _flag cdef bint _flag
cdef class AsyncResult(_AbstractLinkable): cdef class AsyncResult(AbstractLinkable):
cdef readonly _value cdef readonly _value
cdef readonly tuple _exc_info cdef readonly tuple _exc_info
......
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False # cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False
from __future__ import print_function, absolute_import, division from __future__ import print_function, absolute_import, division
import sys
from gevent.timeout import Timeout
__all__ = [ __all__ = [
'Semaphore', 'Semaphore',
'BoundedSemaphore', 'BoundedSemaphore',
] ]
# In Cython, we define these as 'cdef [inline]' functions. The def _get_linkable():
# compilation unit cannot have a direct assignment to them (import x = __import__('gevent._abstract_linkable')
# is assignment) without generating a 'lvalue is not valid target' return x._abstract_linkable.AbstractLinkable
# error. locals()['AbstractLinkable'] = _get_linkable()
locals()['getcurrent'] = __import__('greenlet').getcurrent del _get_linkable
locals()['greenlet_init'] = lambda: None
locals()['get_hub'] = __import__('gevent').get_hub
class Semaphore(object): class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
""" """
Semaphore(value=1) -> Semaphore Semaphore(value=1) -> Semaphore
...@@ -36,33 +31,22 @@ class Semaphore(object): ...@@ -36,33 +31,22 @@ class Semaphore(object):
.. seealso:: :class:`BoundedSemaphore` for a safer version that prevents .. seealso:: :class:`BoundedSemaphore` for a safer version that prevents
some classes of bugs. some classes of bugs.
.. versionchanged:: 1.4.0
The order in which waiters are awakened is not specified. It was not
specified previously, but usually went in FIFO order.
""" """
def __init__(self, value=1): def __init__(self, value=1):
if value < 0: if value < 0:
raise ValueError("semaphore initial value must be >= 0") raise ValueError("semaphore initial value must be >= 0")
super(Semaphore, self).__init__()
self.counter = value self.counter = value
self._dirty = False self._notify_all = False
# In PyPy 2.6.1 with Cython 0.23, `cdef public` or `cdef
# readonly` or simply `cdef` attributes of type `object` can appear to leak if
# a Python subclass is used (this is visible simply
# instantiating this subclass if _links=[]). Our _links and
# _notifier are such attributes, and gevent.thread subclasses
# this class. Thus, we carefully manage the lifetime of the
# objects we put in these attributes so that, in the normal
# case of a semaphore used correctly (deallocated when it's not
# locked and no one is waiting), the leak goes away (because
# these objects are back to None). This can also be solved on PyPy
# by simply not declaring these objects in the pxd file, but that doesn't work for
# CPython ("No attribute...")
# See https://github.com/gevent/gevent/issues/660
self._links = None
self._notifier = None
# we don't want to do get_hub() here to allow defining module-level locks
# without initializing the hub
def __str__(self): def __str__(self):
params = (self.__class__.__name__, self.counter, len(self._links) if self._links else 0) params = (self.__class__.__name__, self.counter, self.linkcount())
return '<%s counter=%s _links[%s]>' % params return '<%s counter=%s _links[%s]>' % params
def locked(self): def locked(self):
...@@ -75,117 +59,22 @@ class Semaphore(object): ...@@ -75,117 +59,22 @@ class Semaphore(object):
Release the semaphore, notifying any waiters if needed. Release the semaphore, notifying any waiters if needed.
""" """
self.counter += 1 self.counter += 1
self._start_notify() self._check_and_notify()
return self.counter return self.counter
def _start_notify(self): def ready(self):
if self._links and self.counter > 0 and not self._notifier: return self.counter > 0
# We create a new self._notifier each time through the loop,
# if needed. (it has a __bool__ method that tells whether it has
# been run; once it's run once---at the end of the loop---it becomes
# false.)
# NOTE: Passing the bound method will cause a memory leak on PyPy
# with Cython <= 0.23.3. You must use >= 0.23.4.
# See https://bitbucket.org/pypy/pypy/issues/2149/memory-leak-for-python-subclass-of-cpyext#comment-22371546
hub = get_hub() # pylint:disable=undefined-variable
self._notifier = hub.loop.run_callback(self._notify_links)
def _notify_links(self):
# Subclasses CANNOT override. This is a cdef method.
# We release self._notifier here. We are called by it
# at the end of the loop, and it is now false in a boolean way (as soon
# as this method returns).
# If we get acquired/released again, we will create a new one, but there's
# no need to keep it around until that point (making it potentially climb
# into older GC generations, notably on PyPy)
notifier = self._notifier
try:
while True:
self._dirty = False
if not self._links:
# In case we were manually unlinked before
# the callback. Which shouldn't happen
return
for link in self._links:
if self.counter <= 0:
return
try:
link(self) # Must use Cython >= 0.23.4 on PyPy else this leaks memory
except: # pylint:disable=bare-except
getcurrent().handle_error((link, self), *sys.exc_info()) # pylint:disable=undefined-variable
if self._dirty:
# We mutated self._links so we need to start over
break
if not self._dirty:
return
finally:
# We should not have created a new notifier even if callbacks
# released us because we loop through *all* of our links on the
# same callback while self._notifier is still true.
assert self._notifier is notifier
self._notifier = None
def rawlink(self, callback):
"""
rawlink(callback) -> None
Register a callback to call when a counter is more than zero.
*callback* will be called in the :class:`Hub <gevent.hub.Hub>`, so it must not use blocking gevent API. def _start_notify(self):
*callback* will be passed one argument: this instance. self._check_and_notify()
This method is normally called automatically by :meth:`acquire` and :meth:`wait`; most code def _wait_return_value(self, waited, wait_success):
will not need to use it. if waited:
""" return wait_success
if not callable(callback): # We didn't even wait, we must be good to go.
raise TypeError('Expected callable:', callback) # XXX: This is probably dead code, we're careful not to go into the wait
if self._links is None: # state if we don't expect to need to
self._links = [callback] return True
else:
self._links.append(callback)
self._dirty = True
def unlink(self, callback):
"""
unlink(callback) -> None
Remove the callback set by :meth:`rawlink`.
This method is normally called automatically by :meth:`acquire` and :meth:`wait`; most
code will not need to use it.
"""
try:
self._links.remove(callback)
self._dirty = True
except (ValueError, AttributeError):
pass
if not self._links:
self._links = None
# TODO: Cancel a notifier if there are no links?
def _do_wait(self, timeout):
"""
Wait for up to *timeout* seconds to expire. If timeout
elapses, return the exception. Otherwise, return None.
Raises timeout if a different timer expires.
"""
switch = getcurrent().switch # pylint:disable=undefined-variable
self.rawlink(switch)
try:
timer = Timeout._start_new_or_dummy(timeout)
try:
try:
result = get_hub().switch() # pylint:disable=undefined-variable
assert result is self, 'Invalid switch into Semaphore.wait/acquire(): %r' % (result, )
except Timeout as ex:
if ex is not timer:
raise
return ex
finally:
timer.cancel()
finally:
self.unlink(switch)
def wait(self, timeout=None): def wait(self, timeout=None):
""" """
...@@ -205,7 +94,7 @@ class Semaphore(object): ...@@ -205,7 +94,7 @@ class Semaphore(object):
if self.counter > 0: if self.counter > 0:
return self.counter return self.counter
self._do_wait(timeout) # return value irrelevant, whether we got it or got a timeout self._wait(timeout) # return value irrelevant, whether we got it or got a timeout
return self.counter return self.counter
def acquire(self, blocking=True, timeout=None): def acquire(self, blocking=True, timeout=None):
...@@ -236,8 +125,8 @@ class Semaphore(object): ...@@ -236,8 +125,8 @@ class Semaphore(object):
if not blocking: if not blocking:
return False return False
timeout = self._do_wait(timeout) success = self._wait(timeout)
if timeout is not None: if not success:
# Our timer expired. # Our timer expired.
return False return False
...@@ -282,10 +171,6 @@ class BoundedSemaphore(Semaphore): ...@@ -282,10 +171,6 @@ class BoundedSemaphore(Semaphore):
Semaphore.release(self) Semaphore.release(self)
def _init():
greenlet_init() # pylint:disable=undefined-variable
_init()
# By building the semaphore with Cython under PyPy, we get # By building the semaphore with Cython under PyPy, we get
# atomic operations (specifically, exiting/releasing), at the # atomic operations (specifically, exiting/releasing), at the
......
...@@ -329,6 +329,11 @@ class BaseServer(object): ...@@ -329,6 +329,11 @@ class BaseServer(object):
self.__dict__.pop('full', None) self.__dict__.pop('full', None)
if self.pool is not None: if self.pool is not None:
self.pool._semaphore.unlink(self._start_accepting_if_started) self.pool._semaphore.unlink(self._start_accepting_if_started)
# If the pool's semaphore had a notifier already started,
# there's a reference cycle we're a part of
# (self->pool->semaphere-hub callback->semaphore)
# But we can't destroy self.pool, because self.stop()
# calls this method, and then wants to join self.pool()
@property @property
def closed(self): def closed(self):
...@@ -355,6 +360,7 @@ class BaseServer(object): ...@@ -355,6 +360,7 @@ class BaseServer(object):
self.pool.join(timeout=timeout) self.pool.join(timeout=timeout)
self.pool.kill(block=True, timeout=1) self.pool.kill(block=True, timeout=1)
def serve_forever(self, stop_timeout=None): def serve_forever(self, stop_timeout=None):
"""Start the server if it hasn't been already started and wait until it's stopped.""" """Start the server if it hasn't been already started and wait until it's stopped."""
# add test that serve_forever exists on stop() # add test that serve_forever exists on stop()
......
...@@ -3,15 +3,11 @@ ...@@ -3,15 +3,11 @@
"""Basic synchronization primitives: Event and AsyncResult""" """Basic synchronization primitives: Event and AsyncResult"""
from __future__ import print_function from __future__ import print_function
import sys
from gevent._util import _NONE from gevent._util import _NONE
from gevent._compat import reraise from gevent._compat import reraise
from gevent._tblib import dump_traceback, load_traceback from gevent._tblib import dump_traceback, load_traceback
from gevent._hub_local import get_hub_noargs as get_hub
from gevent.exceptions import InvalidSwitchError
from gevent.timeout import Timeout from gevent.timeout import Timeout
...@@ -20,128 +16,18 @@ __all__ = [ ...@@ -20,128 +16,18 @@ __all__ = [
'AsyncResult', 'AsyncResult',
] ]
locals()['getcurrent'] = __import__('greenlet').getcurrent def _get_linkable():
locals()['greenlet_init'] = lambda: None x = __import__('gevent._abstract_linkable')
return x._abstract_linkable.AbstractLinkable
locals()['AbstractLinkable'] = _get_linkable()
class _AbstractLinkable(object): del _get_linkable
# Encapsulates the standard parts of the linking and notifying protocol
# common to both repeatable events and one-time events (AsyncResult).
__slots__ = ('_links', 'hub', '_notifier')
def __init__(self):
# Also previously, AsyncResult maintained the order of notifications, but Event
# did not; this implementation does not. (Event also only call callbacks one
# time (set), but AsyncResult permitted duplicates.)
# HOWEVER, gevent.queue.Queue does guarantee the order of getters relative
# to putters. Some existing documentation out on the net likes to refer to
# gevent as "deterministic", such that running the same program twice will
# produce results in the same order (so long as I/O isn't involved). This could
# be an argument to maintain order. (One easy way to do that while guaranteeing
# uniqueness would be with a 2.7+ OrderedDict.)
self._links = set()
self.hub = get_hub()
self._notifier = None
def ready(self):
# Instances must define this
raise NotImplementedError()
def _check_and_notify(self):
# If this object is ready to be notified, begin the process.
if self.ready():
if self._links and not self._notifier:
self._notifier = self.hub.loop.run_callback(self._notify_links)
def rawlink(self, callback):
"""
Register a callback to call when this object is ready.
*callback* will be called in the :class:`Hub <gevent.hub.Hub>`, so it must not use blocking gevent API.
*callback* will be passed one argument: this instance.
"""
if not callable(callback):
raise TypeError('Expected callable: %r' % (callback, ))
self._links.add(callback)
self._check_and_notify()
def unlink(self, callback):
"""Remove the callback set by :meth:`rawlink`"""
try:
self._links.remove(callback)
except KeyError:
pass
def _notify_links(self):
# Actually call the notification callbacks. Those callbacks in todo that are
# still in _links are called. This method is careful to avoid iterating
# over self._links, because links could be added or removed while this
# method runs. Only links present when this method begins running
# will be called; if a callback adds a new link, it will not run
# until the next time notify_links is activated
# We don't need to capture self._links as todo when establishing
# this callback; any links removed between now and then are handled
# by the `if` below; any links added are also grabbed
todo = set(self._links)
for link in todo:
# check that link was not notified yet and was not removed by the client
# We have to do this here, and not as part of the 'for' statement because
# a previous link(self) call might have altered self._links
if link in self._links:
try:
link(self)
except: # pylint:disable=bare-except
self.hub.handle_error((link, self), *sys.exc_info())
if getattr(link, 'auto_unlink', None):
# This attribute can avoid having to keep a reference to the function
# *in* the function, which is a cycle
self.unlink(link)
# save a tiny bit of memory by letting _notifier be collected
# bool(self._notifier) would turn to False as soon as we exit this
# method anyway.
del todo
self._notifier = None
def _wait_core(self, timeout, catch=Timeout):
# The core of the wait implementation, handling
# switching and linking. If *catch* is set to (),
# a timeout that elapses will be allowed to be raised.
# Returns a true value if the wait succeeded without timing out.
switch = getcurrent().switch # pylint:disable=undefined-variable
self.rawlink(switch)
try:
with Timeout._start_new_or_dummy(timeout) as timer:
try:
result = self.hub.switch()
if result is not self: # pragma: no cover
raise InvalidSwitchError('Invalid switch into Event.wait(): %r' % (result, ))
return True
except catch as ex:
if ex is not timer:
raise
# test_set_and_clear and test_timeout in test_threading
# rely on the exact return values, not just truthish-ness
return False
finally:
self.unlink(switch)
def _wait_return_value(self, waited, wait_success): # Sadly, something about the way we have to "import" AbstractLinkable
# pylint:disable=unused-argument # breaks pylint's inference of slots, even though they're declared
return None # right here.
# pylint:disable=assigning-non-slot
def _wait(self, timeout=None): class Event(AbstractLinkable): # pylint:disable=undefined-variable
if self.ready():
return self._wait_return_value(False, False)
gotit = self._wait_core(timeout)
return self._wait_return_value(True, gotit)
class Event(_AbstractLinkable):
"""A synchronization primitive that allows one greenlet to wake up one or more others. """A synchronization primitive that allows one greenlet to wake up one or more others.
It has the same interface as :class:`threading.Event` but works across greenlets. It has the same interface as :class:`threading.Event` but works across greenlets.
...@@ -157,14 +43,15 @@ class Event(_AbstractLinkable): ...@@ -157,14 +43,15 @@ class Event(_AbstractLinkable):
the waiting greenlets being awakened. These details may change in the future. the waiting greenlets being awakened. These details may change in the future.
""" """
__slots__ = ('_flag', '__weakref__') __slots__ = ('_flag',)
def __init__(self): def __init__(self):
_AbstractLinkable.__init__(self) super(Event, self).__init__()
self._flag = False self._flag = False
def __str__(self): def __str__(self):
return '<%s %s _links[%s]>' % (self.__class__.__name__, (self._flag and 'set') or 'clear', len(self._links)) return '<%s %s _links[%s]>' % (self.__class__.__name__, (self._flag and 'set') or 'clear',
self.linkcount())
def is_set(self): def is_set(self):
"""Return true if and only if the internal flag is true.""" """Return true if and only if the internal flag is true."""
...@@ -246,7 +133,7 @@ class Event(_AbstractLinkable): ...@@ -246,7 +133,7 @@ class Event(_AbstractLinkable):
pass pass
class AsyncResult(_AbstractLinkable): class AsyncResult(AbstractLinkable): # pylint:disable=undefined-variable
"""A one-time event that stores a value or an exception. """A one-time event that stores a value or an exception.
Like :class:`Event` it wakes up all the waiters when :meth:`set` or :meth:`set_exception` Like :class:`Event` it wakes up all the waiters when :meth:`set` or :meth:`set_exception`
...@@ -299,7 +186,7 @@ class AsyncResult(_AbstractLinkable): ...@@ -299,7 +186,7 @@ class AsyncResult(_AbstractLinkable):
__slots__ = ('_value', '_exc_info', '_imap_task_index') __slots__ = ('_value', '_exc_info', '_imap_task_index')
def __init__(self): def __init__(self):
_AbstractLinkable.__init__(self) super(AsyncResult, self).__init__()
self._value = _NONE self._value = _NONE
self._exc_info = () self._exc_info = ()
...@@ -332,7 +219,7 @@ class AsyncResult(_AbstractLinkable): ...@@ -332,7 +219,7 @@ class AsyncResult(_AbstractLinkable):
result += 'exception=%r ' % self._exception result += 'exception=%r ' % self._exception
if self._exception is _NONE: if self._exception is _NONE:
result += 'unset ' result += 'unset '
return result + ' _links[%s]>' % len(self._links) return result + ' _links[%s]>' % self.linkcount()
def ready(self): def ready(self):
"""Return true if and only if it holds a value or an exception""" """Return true if and only if it holds a value or an exception"""
...@@ -471,11 +358,6 @@ class AsyncResult(_AbstractLinkable): ...@@ -471,11 +358,6 @@ class AsyncResult(_AbstractLinkable):
# exception is a method, we use it as a property # exception is a method, we use it as a property
def _init():
greenlet_init() # pylint:disable=undefined-variable
_init()
from gevent._util import import_c_accel from gevent._util import import_c_accel
import_c_accel(globals(), 'gevent._event') import_c_accel(globals(), 'gevent._event')
...@@ -108,7 +108,7 @@ if PYPY: ...@@ -108,7 +108,7 @@ if PYPY:
Semaphore._py3k_acquire = Semaphore.acquire = _decorate(Semaphore.acquire, '_lock_locked') Semaphore._py3k_acquire = Semaphore.acquire = _decorate(Semaphore.acquire, '_lock_locked')
Semaphore.release = _decorate(Semaphore.release, '_lock_locked') Semaphore.release = _decorate(Semaphore.release, '_lock_locked')
Semaphore.wait = _decorate(Semaphore.wait, '_lock_locked') Semaphore.wait = _decorate(Semaphore.wait, '_lock_locked')
Semaphore._do_wait = _decorate(Semaphore._do_wait, '_lock_unlocked') Semaphore._wait = _decorate(Semaphore._wait, '_lock_unlocked')
_Sem_init = Semaphore.__init__ _Sem_init = Semaphore.__init__
......
...@@ -32,8 +32,9 @@ class Testiwait(greentest.TestCase): ...@@ -32,8 +32,9 @@ class Testiwait(greentest.TestCase):
let = gevent.spawn(sem1.release) let = gevent.spawn(sem1.release)
with gevent.iwait((sem1, sem2)) as iterator: with gevent.iwait((sem1, sem2)) as iterator:
self.assertEqual(sem1, next(iterator)) self.assertEqual(sem1, next(iterator))
assert len(sem2._links) == 1 self.assertEqual(sem2.linkcount(), 1)
assert sem2._links is None or len(sem2._links) == 0
self.assertEqual(sem2.linkcount(), 0)
let.get() let.get()
......
...@@ -8,6 +8,7 @@ try: ...@@ -8,6 +8,7 @@ try:
except ImportError: # Py2 except ImportError: # Py2
from thread import allocate_lock as std_allocate_lock from thread import allocate_lock as std_allocate_lock
# pylint:disable=broad-except
class TestSemaphore(greentest.TestCase): class TestSemaphore(greentest.TestCase):
...@@ -25,7 +26,8 @@ class TestSemaphore(greentest.TestCase): ...@@ -25,7 +26,8 @@ class TestSemaphore(greentest.TestCase):
s.rawlink(lambda s: result.append('b')) s.rawlink(lambda s: result.append('b'))
s.release() s.release()
gevent.sleep(0.001) gevent.sleep(0.001)
self.assertEqual(result, ['a', 'b']) # The order, though, is not guaranteed.
self.assertEqual(sorted(result), ['a', 'b'])
def test_semaphore_weakref(self): def test_semaphore_weakref(self):
s = Semaphore() s = Semaphore()
...@@ -50,6 +52,13 @@ class TestSemaphore(greentest.TestCase): ...@@ -50,6 +52,13 @@ class TestSemaphore(greentest.TestCase):
test_semaphore_in_class_with_del.ignore_leakcheck = True test_semaphore_in_class_with_del.ignore_leakcheck = True
def test_rawlink_on_unacquired_runs_notifiers(self):
# https://github.com/gevent/gevent/issues/1287
# Rawlinking a ready semaphore should fire immediately,
# not raise LoopExit
s = Semaphore()
gevent.wait([s])
class TestLock(greentest.TestCase): class TestLock(greentest.TestCase):
......
...@@ -162,7 +162,7 @@ class TestCase(greentest.TestCase): ...@@ -162,7 +162,7 @@ class TestCase(greentest.TestCase):
conn = self.makefile() conn = self.makefile()
conn.write(b'GET / HTTP/1.0\r\n\r\n') conn.write(b'GET / HTTP/1.0\r\n\r\n')
conn.flush() conn.flush()
result = '' result = b''
try: try:
while True: while True:
data = conn._sock.recv(1) data = conn._sock.recv(1)
...@@ -170,9 +170,9 @@ class TestCase(greentest.TestCase): ...@@ -170,9 +170,9 @@ class TestCase(greentest.TestCase):
break break
result += data result += data
except socket.timeout: except socket.timeout:
assert not result, repr(result) self.assertFalse(result)
return return
assert result.startswith('HTTP/1.0 500 Internal Server Error'), repr(result) self.assertTrue(result.startswith(b'HTTP/1.0 500 Internal Server Error'), repr(result))
conn.close() conn.close()
def assertRequestSucceeded(self, timeout=_DEFAULT_SOCKET_TIMEOUT): def assertRequestSucceeded(self, timeout=_DEFAULT_SOCKET_TIMEOUT):
...@@ -259,7 +259,8 @@ class TestDefaultSpawn(TestCase): ...@@ -259,7 +259,8 @@ class TestDefaultSpawn(TestCase):
def test_backlog_is_not_accepted_for_socket(self): def test_backlog_is_not_accepted_for_socket(self):
self.switch_expected = False self.switch_expected = False
self.assertRaises(TypeError, self.ServerClass, self.get_listener(), backlog=25, handle=False) with self.assertRaises(TypeError):
self.ServerClass(self.get_listener(), backlog=25, handle=False)
def test_backlog_is_accepted_for_address(self): def test_backlog_is_accepted_for_address(self):
self.server = self.ServerSubClass((greentest.DEFAULT_BIND_ADDR, 0), backlog=25) self.server = self.ServerSubClass((greentest.DEFAULT_BIND_ADDR, 0), backlog=25)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment