Commit 9dc8b339 authored by Jason Madden

Fix leak resulting from adding a rawlink but not cycling the event loop.

Clean up the O(n^2) stuff. The best way I found to do this was using sets; even the old _dirty flag could call things multiple times, which was not good.
parent edcb14bc
......@@ -25,7 +25,8 @@
- Fix semaphores to immediately notify links if they are ready and
``rawlink()`` is called. This behaves like ``Event`` and
``AsyncEvent``. See :issue:`1287`, reported by Dan Milon.
``AsyncEvent``. Note that the order in which semaphore links are
called is not specified. See :issue:`1287`, reported by Dan Milon.
1.3.7 (2018-10-12)
......
......@@ -36,17 +36,17 @@ cdef class AbstractLinkable(object):
# https://github.com/cython/cython/issues/2270
cdef object __weakref__
cdef readonly SwitchOutGreenletWithLoop hub
cdef _notifier
cdef list _links
cdef set _links
cdef bint _notify_all
cdef readonly SwitchOutGreenletWithLoop hub
cpdef rawlink(self, callback)
cpdef bint ready(self)
cpdef unlink(self, callback)
cdef _check_and_notify(self)
@cython.locals(todo=list)
cpdef _notify_links(self)
cdef _wait_core(self, timeout, catch=*)
cdef _wait_return_value(self, waited, wait_success)
......
......@@ -27,7 +27,7 @@ class AbstractLinkable(object):
# protocol common to both repeatable events (Event, Semaphore) and
# one-time events (AsyncResult).
__slots__ = ('_links', 'hub', '_notifier', '_notify_all', '__weakref__')
__slots__ = ('hub', '_links', '_notifier', '_notify_all', '__weakref__')
def __init__(self):
# Before this implementation, AsyncResult and Semaphore
......@@ -59,19 +59,19 @@ class AbstractLinkable(object):
# by simply not declaring these objects in the pxd file, but that doesn't work for
# CPython ("No attribute...")
# See https://github.com/gevent/gevent/issues/660
self._links = None
# we don't want to do get_hub() here to allow defining module-level locks
# without initializing the hub
self.hub = None
self._links = set()
self._notifier = None
# This is conceptually a class attribute, defined here for ease of access in
# cython. If it's true, when notifiers fire, all existing callbacks are called.
# If it's false, we only call callbacks as long as ready() returns true.
self._notify_all = True
# we don't want to do get_hub() here to allow defining module-level objects
# without initializing the hub
self.hub = None
def linkcount(self):
# For testing: how many objects are linked to this one?
return len(self._links) if self._links is not None else 0
return len(self._links)
def ready(self):
# Instances must define this
......@@ -79,81 +79,81 @@ class AbstractLinkable(object):
def _check_and_notify(self):
# If this object is ready to be notified, begin the process.
if self.ready():
if self._links and not self._notifier:
if self.hub is None:
self.hub = get_hub()
self._notifier = self.hub.loop.run_callback(self._notify_links)
if self.ready() and self._links and not self._notifier:
if self.hub is None:
self.hub = get_hub()
self._notifier = self.hub.loop.run_callback(self._notify_links)
def rawlink(self, callback):
"""
Register a callback to call when this object is ready.
*callback* will be called in the :class:`Hub <gevent.hub.Hub>`, so it must not use blocking gevent API.
*callback* will be called in the :class:`Hub
<gevent.hub.Hub>`, so it must not use blocking gevent API.
*callback* will be passed one argument: this instance.
"""
if not callable(callback):
raise TypeError('Expected callable: %r' % (callback, ))
if self._links is None:
self._links = [callback]
else:
self._links.append(callback)
self._links.add(callback)
self._check_and_notify()
def unlink(self, callback):
"""Remove the callback set by :meth:`rawlink`"""
if self._links is not None:
try:
self._links.remove(callback)
except ValueError:
pass
if not self._links:
self._links = None
# TODO: Cancel a notifier if there are no links?
self._links.discard(callback)
if not self._links and self._notifier is not None:
# If we currently have one queued, de-queue it.
# This will break a reference cycle.
# (self._notifier -> self._notify_links -> self)
# But we can't set it to None in case it was actually running.
self._notifier.stop()
def _notify_links(self):
# Actually call the notification callbacks. Those callbacks in todo that are
# still in _links are called. This method is careful to avoid iterating
# over self._links, because links could be added or removed while this
# method runs. Only links present when this method begins running
# will be called; if a callback adds a new link, it will not run
# until the next time notify_links is activated
def _notify_links(self):
# We release self._notifier here. We are called by it
# at the end of the loop, and it is now false in a boolean way (as soon
# as this method returns).
notifier = self._notifier
# We don't need to capture self._links as todo when establishing
# this callback; any links removed between now and then are handled
# by the `if` below; any links added are also grabbed; note that if
# unlink() was called while we were waiting for the notifier to run,
# self._links could have gone to None.
todo = list(self._links) if self._links is not None else []
# We were ready() at the time this callback was scheduled;
# we may not be anymore, and that status may change during
# callback processing. Some of our subclasses will want to
# notify everyone that the status was once true, even though it
# may not be anymore.
todo = set(self._links)
try:
for link in todo:
# check that link was not notified yet and was not removed by the client
# We have to do this here, and not as part of the 'for' statement because
# a previous link(self) call might have altered self._links
if not self._notify_all and not self.ready():
break
if link in self._links:
try:
link(self)
except: # pylint:disable=bare-except
self.hub.handle_error((link, self), *sys.exc_info())
if link not in self._links:
# Been removed already by some previous link. OK, fine.
continue
try:
link(self)
except: # pylint:disable=bare-except
# We're running in the hub, so getcurrent() returns
# a hub.
self.hub.handle_error((link, self), *sys.exc_info()) # pylint:disable=undefined-variable
finally:
if getattr(link, 'auto_unlink', None):
# This attribute can avoid having to keep a reference to the function
# *in* the function, which is a cycle
self.unlink(link)
finally:
# save a tiny bit of memory by letting _notifier be collected
# bool(self._notifier) would turn to False as soon as we exit this
# method anyway.
del todo
# We should not have created a new notifier even if callbacks
# released us because we loop through *all* of our links on the
# same callback while self._notifier is still true.
assert self._notifier is notifier
self._notifier = None
# Our set of active links changed, and we were told to stop on the first
# time we went unready. See if we're ready, and if so, go around
# again.
if not self._notify_all and todo != self._links:
self._check_and_notify()
def _wait_core(self, timeout, catch=Timeout):
# The core of the wait implementation, handling
# switching and linking. If *catch* is set to (),
......
......@@ -31,17 +31,20 @@ class Semaphore(AbstractLinkable): # pylint:disable=undefined-variable
.. seealso:: :class:`BoundedSemaphore` for a safer version that prevents
some classes of bugs.
.. versionchanged:: 1.4.0
The order in which waiters are awakened is not specified. It was not
specified previously, but usually went in FIFO order.
"""
def __init__(self, value=1):
super(Semaphore, self).__init__()
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
super(Semaphore, self).__init__()
self.counter = value
self._notifier = None
self._notify_all = False
def __str__(self):
params = (self.__class__.__name__, self.counter, self.linkcount())
return '<%s counter=%s _links[%s]>' % params
......
......@@ -329,6 +329,11 @@ class BaseServer(object):
self.__dict__.pop('full', None)
if self.pool is not None:
self.pool._semaphore.unlink(self._start_accepting_if_started)
# If the pool's semaphore had a notifier already started,
# there's a reference cycle we're a part of
# (self->pool->semaphore->hub callback->semaphore)
# But we can't destroy self.pool, because self.stop()
# calls this method, and then wants to join self.pool
@property
def closed(self):
......@@ -355,6 +360,7 @@ class BaseServer(object):
self.pool.join(timeout=timeout)
self.pool.kill(block=True, timeout=1)
def serve_forever(self, stop_timeout=None):
"""Start the server if it hasn't been already started and wait until it's stopped."""
# add test that serve_forever exits on stop()
......
......@@ -108,7 +108,7 @@ if PYPY:
Semaphore._py3k_acquire = Semaphore.acquire = _decorate(Semaphore.acquire, '_lock_locked')
Semaphore.release = _decorate(Semaphore.release, '_lock_locked')
Semaphore.wait = _decorate(Semaphore.wait, '_lock_locked')
Semaphore._do_wait = _decorate(Semaphore._do_wait, '_lock_unlocked')
Semaphore._wait = _decorate(Semaphore._wait, '_lock_unlocked')
_Sem_init = Semaphore.__init__
......
......@@ -26,7 +26,8 @@ class TestSemaphore(greentest.TestCase):
s.rawlink(lambda s: result.append('b'))
s.release()
gevent.sleep(0.001)
self.assertEqual(result, ['a', 'b'])
# The order, though, is not guaranteed.
self.assertEqual(sorted(result), ['a', 'b'])
def test_semaphore_weakref(self):
s = Semaphore()
......
......@@ -162,7 +162,7 @@ class TestCase(greentest.TestCase):
conn = self.makefile()
conn.write(b'GET / HTTP/1.0\r\n\r\n')
conn.flush()
result = ''
result = b''
try:
while True:
data = conn._sock.recv(1)
......@@ -170,9 +170,9 @@ class TestCase(greentest.TestCase):
break
result += data
except socket.timeout:
assert not result, repr(result)
self.assertFalse(result)
return
assert result.startswith('HTTP/1.0 500 Internal Server Error'), repr(result)
self.assertTrue(result.startswith(b'HTTP/1.0 500 Internal Server Error'), repr(result))
conn.close()
def assertRequestSucceeded(self, timeout=_DEFAULT_SOCKET_TIMEOUT):
......@@ -259,7 +259,8 @@ class TestDefaultSpawn(TestCase):
def test_backlog_is_not_accepted_for_socket(self):
self.switch_expected = False
self.assertRaises(TypeError, self.ServerClass, self.get_listener(), backlog=25, handle=False)
with self.assertRaises(TypeError):
self.ServerClass(self.get_listener(), backlog=25, handle=False)
def test_backlog_is_accepted_for_address(self):
self.server = self.ServerSubClass((greentest.DEFAULT_BIND_ADDR, 0), backlog=25)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment