Commit bd1899e9 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1604 from gevent/issue1601

Let the greenlets be freed when destroying a hub
parents 3294ebbc 42ae54c3
Destroying a hub after joining it didn't necessarily clean up all
resources associated with the hub, especially if the hub had been
created in a secondary thread that was exiting. The hub and its parent
greenlet could be kept alive.
Now, destroying a hub drops the reference to the hub and ensures it
cannot be switched to again. (Though using a new blocking API call may
still create a new hub.)
Joining a hub also cleans up some (small) memory resources that might
have stuck around for longer before as well.
......@@ -15,8 +15,6 @@ from greenlet import greenlet as RawGreenlet
from greenlet import getcurrent
from greenlet import GreenletExit
__all__ = [
'getcurrent',
'GreenletExit',
......@@ -58,6 +56,7 @@ from gevent.exceptions import LoopExit
from gevent._waiter import Waiter
# Need the real get_ident. We're imported early enough (by gevent/__init__.py)
# that we can be sure nothing is monkey patched yet.
get_thread_ident = __import__(thread_mod_name).get_ident
......@@ -618,14 +617,44 @@ class Hub(WaitOperationsGreenlet):
loop.run()
finally:
loop.error_handler = None # break the refcount cycle
# This function must never return, as it will cause
# switch() in the parent greenlet to return an unexpected
# value. This can show up as unexpected failures e.g.,
# from Waiters raising AssertionError or MulitpleWaiter
# raising invalid IndexError.
#
# It is still possible to kill this greenlet with throw.
# However, in that case switching to it is no longer safe,
# as switch will return immediately.
#
# Note that there's a problem with simply doing
# ``self.parent.throw()`` and never actually exiting this
# greenlet: The greenlet tends to stay alive. This is
# because throwing the exception captures stack frames
# (regardless of what we do with the argument) and those
# get saved. In addition to this object having
# ``gr_frame`` pointing to this method, which contains
# ``self``, which points to the parent, and both of which point to
# an internal thread state dict that points back to the current greenlet for the thread,
# which is likely to be the parent: a cycle.
#
# We can't have ``join()`` tell us to finish, because we
# need to be able to resume after this throw. The only way
# to dispose of the greenlet is to use ``self.destroy()``.
debug = []
if hasattr(loop, 'debug'):
debug = loop.debug()
self.parent.throw(LoopExit('This operation would block forever', self, debug))
# this function must never return, as it will cause switch() in the parent greenlet
# to return an unexpected value
# It is still possible to kill this greenlet with throw. However, in that case
# switching to it is no longer safe, as switch will return immediately
loop = None
self.parent.throw(LoopExit('This operation would block forever',
self,
debug))
# Execution could resume here if another blocking API call is made
# in the same thread and the hub hasn't been destroyed, so clean
# up anything left.
debug = None
def start_periodic_monitoring_thread(self):
if self.periodic_monitoring_thread is None and GEVENT_CONFIG.monitor_thread:
......@@ -648,13 +677,20 @@ class Hub(WaitOperationsGreenlet):
return self.periodic_monitoring_thread
def join(self, timeout=None):
"""Wait for the event loop to finish. Exits only when there are
no more spawned greenlets, started servers, active timeouts or watchers.
"""
Wait for the event loop to finish. Exits only when there
are no more spawned greenlets, started servers, active
timeouts or watchers.
If *timeout* is provided, wait no longer for the specified number of seconds.
.. caution:: This doesn't clean up all resources associated
with the hub. For that, see :meth:`destroy`.
Returns True if exited because the loop finished execution.
Returns False if exited because of timeout expired.
:param float timeout: If *timeout* is provided, wait no longer
than the specified number of seconds.
:return: `True` if this method returns because the loop
finished execution. Or `False` if the timeout
expired.
"""
assert getcurrent() is self.parent, "only possible from the MAIN greenlet"
if self.dead:
......@@ -668,21 +704,46 @@ class Hub(WaitOperationsGreenlet):
try:
try:
# Switch to the hub greenlet and let it continue.
# Since we're the parent greenlet of the hub, when it exits
# by `parent.throw(LoopExit)`, control will resume here.
# If the timer elapses, however, ``waiter.switch()`` is called and
# again control resumes here, but without an exception.
waiter.get()
except LoopExit:
# Control will immediately be returned to this greenlet.
return True
finally:
# Clean up as much junk as we can. There is a small cycle in the frames,
# and it won't be GC'd.
# this greenlet -> this frame
# this greenlet -> the exception that was thrown
# the exception that was thrown -> a bunch of other frames, including this frame.
# some frame calling self.run() -> self
del waiter # this frame -> waiter -> self
del self # this frame -> self
if timeout is not None:
timeout.stop()
timeout.close()
del timeout
return False
def destroy(self, destroy_loop=None):
"""
Destroy this hub and clean up its resources.
If you manually create hubs, you *should* call this
If you manually create hubs, or you use a hub or the gevent
blocking API from multiple native threads, you *should* call this
method before disposing of the hub object reference.
Once this is done, it is impossible to continue running the
hub. Attempts to use the blocking gevent API with pre-existing
objects from this native thread and bound to this hub will fail.
.. versionchanged:: NEXT
Ensure that Python stack frames and greenlets referenced by this
hub are cleaned up. This guarantees that switching to the hub again
is not safe after this. (It was never safe, but it's even less safe.)
"""
if self.periodic_monitoring_thread is not None:
self.periodic_monitoring_thread.kill()
......@@ -693,6 +754,19 @@ class Hub(WaitOperationsGreenlet):
if self._threadpool is not None:
self._threadpool.kill()
del self._threadpool
# Let the frame be cleaned up by causing the run() function to
# exit. This is the only way to guarantee that the hub itself
# and the main greenlet, if this was a secondary thread, get
# cleaned up. Otherwise there are likely to be reference
# cycles still around. We MUST do this before we destroy the
# loop; if we destroy the loop and then switch into the hub,
# things will go VERY, VERY wrong.
try:
self.throw(GreenletExit)
except LoopExit:
pass
if destroy_loop is None:
destroy_loop = not self.loop.default
if destroy_loop:
......@@ -705,12 +779,12 @@ class Hub(WaitOperationsGreenlet):
# thread.
set_loop(self.loop)
self.loop = None
if _get_hub() is self:
set_hub(None)
# XXX: We can probably simplify the resolver and threadpool properties.
@property
......
......@@ -10,6 +10,9 @@ from collections import namedtuple
from operator import delitem
import signal
from gevent import getcurrent
from gevent.exceptions import LoopExit
from gevent._ffi import _dbg # pylint: disable=unused-import
from gevent._ffi.loop import AbstractLoop
from gevent._ffi.loop import assign_standard_callbacks
......@@ -186,7 +189,17 @@ class loop(AbstractLoop):
self.SIGNAL_CHECK_INTERVAL_MS)
libuv.uv_unref(self._signal_idle)
def __check_and_die(self):
if not self.ptr:
# We've been destroyed during the middle of self.run().
# This method is being called into from C, and it's not
# safe to go back to C (Windows in particular can abort
# the process with "GetQueuedCompletionStatusEx: (6) The
# handle is invalid.") So switch to the parent greenlet.
getcurrent().parent.throw(LoopExit('Destroyed during run'))
def _run_callbacks(self):
self.__check_and_die()
# Manually handle fork watchers.
curpid = os.getpid()
if curpid != self._pid:
......@@ -397,18 +410,21 @@ class loop(AbstractLoop):
del self._fork_watchers
del self._child_watchers
def debug(self):
"""
Return all the handles that are open and their ref status.
"""
handle_state = namedtuple("HandleState",
_HandleState = namedtuple("HandleState",
['handle',
'type',
'watcher',
'ref',
'active',
'closing'])
def debug(self):
"""
Return all the handles that are open and their ref status.
"""
if not self.ptr:
return ["Loop has been destroyed"]
handle_state = self._HandleState
handles = []
# XXX: Convert this to a modern callback.
......@@ -560,6 +576,7 @@ class loop(AbstractLoop):
return result
def now(self):
self.__check_and_die()
# libuv's now is expressed as an integer number of
# milliseconds, so to get it compatible with time.time units
# that this method is supposed to return, we have to divide by 1000.0
......@@ -567,6 +584,7 @@ class loop(AbstractLoop):
return now / 1000.0
def update_now(self):
self.__check_and_die()
libuv.uv_update_time(self.ptr)
def fileno(self):
......
......@@ -262,12 +262,13 @@ class Discovery(object):
coverage=False,
package=None,
config=None,
allow_combine=True,
):
self.config = config or {}
self.ignore = set(ignored or ())
self.tests = tests
self.configured_test_options = config.get('TEST_FILE_OPTIONS', set())
self.allow_combine = allow_combine
if ignore_files:
ignore_files = ignore_files.split(',')
for f in ignore_files:
......@@ -281,12 +282,13 @@ class Discovery(object):
self.package_dir = _dir_from_package_name(package)
class Discovered(object):
def __init__(self, package, configured_test_options, ignore, config):
def __init__(self, package, configured_test_options, ignore, config, allow_combine):
self.orig_dir = os.getcwd()
self.configured_run_alone = config['RUN_ALONE']
self.configured_failing_tests = config['FAILING_TESTS']
self.package = package
self.configured_test_options = configured_test_options
self.allow_combine = allow_combine
self.ignore = ignore
self.to_import = []
......@@ -343,7 +345,8 @@ class Discovery(object):
def __can_monkey_combine(self, filename, contents):
return (
not self.__has_config(filename)
self.allow_combine
and not self.__has_config(filename)
and self.__makes_simple_monkey_patch(contents)
and self.__file_allows_monkey_combine(contents)
and self.__file_allows_combine(contents)
......@@ -356,7 +359,8 @@ class Discovery(object):
def __can_nonmonkey_combine(self, filename, contents):
return (
not self.__has_config(filename)
self.allow_combine
and not self.__has_config(filename)
and self.__makes_no_monkey_patch(contents)
and self.__file_allows_combine(contents)
and self.__calls_unittest_main_toplevel(contents)
......@@ -485,7 +489,7 @@ class Discovery(object):
def discovered(self):
tests = self.tests
discovered = self.Discovered(self.package, self.configured_test_options,
self.ignore, self.config)
self.ignore, self.config, self.allow_combine)
# We need to glob relative names, our config is based on filenames still
with self._in_dir(self.package_dir):
......@@ -684,7 +688,7 @@ def main():
parser.add_argument('--discover', action='store_true')
parser.add_argument('--full', action='store_true')
parser.add_argument('--config', default='known_failures.py')
parser.add_argument('--failfast', action='store_true')
parser.add_argument('--failfast', '-x', action='store_true')
parser.add_argument("--coverage", action="store_true")
parser.add_argument("--quiet", action="store_true", default=True)
parser.add_argument("--verbose", action="store_false", dest='quiet')
......@@ -695,6 +699,10 @@ def main():
help="Use up to the given number of parallel processes to execute tests. "
"Defaults to %(default)s."
)
parser.add_argument(
'--no-combine', default=True, action='store_false',
help="Do not combine tests into process groups."
)
parser.add_argument('-u', '--use', metavar='RES1,RES2,...',
action='store', type=parse_resources,
help='specify which special resource intensive tests '
......@@ -773,6 +781,7 @@ def main():
coverage=coverage,
package=options.package,
config=config,
allow_combine=options.no_combine,
)
if options.discover:
for cmd, options in tests:
......
import unittest
import gevent
from gevent.testing import ignores_leakcheck
class Test(unittest.TestCase):
class TestJoin(unittest.TestCase):
def test(self):
def test_join_many_times(self):
# hub.join() guarantees that loop has exited cleanly
res = gevent.get_hub().join()
self.assertTrue(res)
self.assertFalse(gevent.get_hub().dead)
res = gevent.get_hub().join()
self.assertTrue(res)
......@@ -18,6 +20,50 @@ class Test(unittest.TestCase):
res = gevent.get_hub().join()
self.assertTrue(res)
@ignores_leakcheck
def test_join_in_new_thread_doesnt_leak_hub_or_greenlet(self):
# https://github.com/gevent/gevent/issues/1601
import threading
import gc
from gevent._greenlet_primitives import get_reachable_greenlets
def _clean():
for _ in range(2):
while gc.collect():
pass
_clean()
count_before = len(get_reachable_greenlets())
def thread_main():
g = gevent.Greenlet(run=lambda: 0)
g.start()
g.join()
hub = gevent.get_hub()
hub.join()
hub.destroy(destroy_loop=True)
del hub
def tester(main):
t = threading.Thread(target=main)
t.start()
t.join()
_clean()
for _ in range(10):
tester(thread_main)
del tester
del thread_main
count_after = len(get_reachable_greenlets())
if count_after > count_before:
# We could be off by exactly 1. Not entirely clear where.
# But it only happens the first time.
count_after -= 1
# If we were run in multiple process, our count could actually have
# gone down due to the GC's we did.
self.assertEqual(count_after, count_before)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment