Commit 3d802b85 authored by Jason Madden's avatar Jason Madden

Working on an improved loop exit protocol.

Addressing #1601.

This solution works, but breaks anything that tries to use the hub again after joining it once, because the hub greenlet dies. So that can't be right.
parent 3294ebbc
......@@ -35,6 +35,7 @@ from gevent._compat import thread_mod_name
from gevent._util import readproperty
from gevent._util import Lazy
from gevent._util import gmctime
from gevent._util import _NONE as _FINISHED
from gevent._ident import IdentRegistry
from gevent._hub_local import get_hub
......@@ -58,6 +59,7 @@ from gevent.exceptions import LoopExit
from gevent._waiter import Waiter
# Need the real get_ident. We're imported early enough (by gevent/__init__.py)
# that we can be sure nothing is monkey patched yet.
get_thread_ident = __import__(thread_mod_name).get_ident
......@@ -618,14 +620,38 @@ class Hub(WaitOperationsGreenlet):
loop.run()
finally:
loop.error_handler = None # break the refcount cycle
# this function must never return, as it will cause switch() in the parent greenlet
# to return an unexpected value
# It is still possible to kill this greenlet with throw. However, in that case
# switching to it is no longer safe, as switch will return immediately.
# However, there's a problem with simply doing `self.parent.throw()` and never actually
# exiting this greenlet: The greenlet tends to stay alive. This is because throwing
# the exception captures stack frames (regardless of what we do with the argument)
# and those get saved. In addition to this object having `gr_frame` pointing to this
# method, which contains ``self``, which is a cycle.
#
# To properly clean this up from join(), we have a
# two-step protocol. First, we throw the exception, as
# normal. If we were blocked in ``join()`` waiting to come
# back here, it sends us a sentinel value that tells us,
# no, really, you should exit, and we return. Further
# calls to ``join`` will still succeed. Attempting to
# switch back to this object is undefined, as always.
debug = []
if hasattr(loop, 'debug'):
debug = loop.debug()
self.parent.throw(LoopExit('This operation would block forever', self, debug))
# this function must never return, as it will cause switch() in the parent greenlet
# to return an unexpected value
# It is still possible to kill this greenlet with throw. However, in that case
# switching to it is no longer safe, as switch will return immediately
loop = None
x = self.parent.throw(LoopExit(
'This operation would block forever',
self,
debug
))
if x is _FINISHED:
return
def start_periodic_monitoring_thread(self):
if self.periodic_monitoring_thread is None and GEVENT_CONFIG.monitor_thread:
......@@ -668,8 +694,16 @@ class Hub(WaitOperationsGreenlet):
try:
try:
# Switch to the hub greenlet and let it continue.
# Since we're the parent greenlet of the hub, when it exits
# by `parent.throw(LoopExit)`, control will resume here.
# If the timer elapses, however, ``waiter.switch()`` is called and
# again control resumes here, but without an exception.
waiter.get()
except LoopExit:
# Control will immediately be returned to this greenlet.
# We can't use ``self.switch`` because it doesn't take parameters.
RawGreenlet.switch(self, _FINISHED)
return True
finally:
if timeout is not None:
......
import unittest
import gevent
from gevent.testing import ignores_leakcheck
class Test(unittest.TestCase):
class TestJoin(unittest.TestCase):
def test(self):
def test_join_many_times(self):
# hub.join() guarantees that loop has exited cleanly
res = gevent.get_hub().join()
self.assertTrue(res)
self.assertFalse(gevent.get_hub().dead)
res = gevent.get_hub().join()
self.assertTrue(res)
......@@ -18,6 +20,47 @@ class Test(unittest.TestCase):
res = gevent.get_hub().join()
self.assertTrue(res)
@ignores_leakcheck
def test_join_in_new_thread_doesnt_leak_hub_or_greenlet(self):
# https://github.com/gevent/gevent/issues/1601
import threading
import gc
from gevent._greenlet_primitives import get_reachable_greenlets
count_before = len(get_reachable_greenlets())
def thread_main():
g = gevent.Greenlet(run=lambda: 0)
g.start()
g.join()
hub = gevent.get_hub()
hub.join()
hub.destroy(destroy_loop=True)
del hub
def tester():
t = threading.Thread(target=thread_main)
t.start()
t.join()
while gc.collect():
pass
for _ in range(10):
tester()
del tester
del thread_main
count_after = len(get_reachable_greenlets())
if count_after > count_before:
# We could be off by exactly 1. Not entirely clear where.
# But it only happens the first time.
count_after -= 1
self.assertEqual(count_after, count_before)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment