Commit 71c29c15 authored by Jason Madden's avatar Jason Madden

Clean hub resources in a compatible way.

join() still allows the hub to be re-entered, so we have to move the cleanup to destroy()
parent 3d802b85
Destroying a hub after joining it didn't necessarily clean up all
resources associated with the hub, especially if the hub had been
created in a secondary thread that was exiting. The hub and its parent
greenlet could be kept alive.
Now, destroying a hub drops the reference to the hub and ensures it
cannot be switched to again. (Though using a new blocking API call may
still create a new hub.)
Joining a hub also cleans up some (small) memory resources that might
have stuck around for longer before as well.
......@@ -15,8 +15,6 @@ from greenlet import greenlet as RawGreenlet
from greenlet import getcurrent
from greenlet import GreenletExit
__all__ = [
'getcurrent',
'GreenletExit',
......@@ -35,7 +33,6 @@ from gevent._compat import thread_mod_name
from gevent._util import readproperty
from gevent._util import Lazy
from gevent._util import gmctime
from gevent._util import _NONE as _FINISHED
from gevent._ident import IdentRegistry
from gevent._hub_local import get_hub
......@@ -621,37 +618,43 @@ class Hub(WaitOperationsGreenlet):
finally:
loop.error_handler = None # break the refcount cycle
# this function must never return, as it will cause switch() in the parent greenlet
# to return an unexpected value
# It is still possible to kill this greenlet with throw. However, in that case
# switching to it is no longer safe, as switch will return immediately.
# However, there's a problem with simply doing `self.parent.throw()` and never actually
# exiting this greenlet: The greenlet tends to stay alive. This is because throwing
# the exception captures stack frames (regardless of what we do with the argument)
# and those get saved. In addition to this object having `gr_frame` pointing to this
# method, which contains ``self``, which is a cycle.
# This function must never return, as it will cause
# switch() in the parent greenlet to return an unexpected
# value. This can show up as unexpected failures e.g.,
# from Waiters raising AssertionError or MultipleWaiter
# raising invalid IndexError.
#
# It is still possible to kill this greenlet with throw.
# However, in that case switching to it is no longer safe,
# as switch will return immediately.
#
# To properly clean this up from join(), we have a
# two-step protocol. First, we throw the exception, as
# normal. If we were blocked in ``join()`` waiting to come
# back here, it sends us a sentinel value that tells us,
# no, really, you should exit, and we return. Further
# calls to ``join`` will still succeed. Attempting to
# switch back to this object is undefined, as always.
# Note that there's a problem with simply doing
# ``self.parent.throw()`` and never actually exiting this
# greenlet: The greenlet tends to stay alive. This is
# because throwing the exception captures stack frames
# (regardless of what we do with the argument) and those
# get saved. In addition to this object having
# ``gr_frame`` pointing to this method, which contains
# ``self``, which points to the parent, and both of which point to
# an internal thread state dict that points back to the current greenlet for the thread,
# which is likely to be the parent: a cycle.
#
# We can't have ``join()`` tell us to finish, because we
# need to be able to resume after this throw. The only way
# to dispose of the greenlet is to use ``self.destroy()``.
debug = []
if hasattr(loop, 'debug'):
debug = loop.debug()
loop = None
x = self.parent.throw(LoopExit(
'This operation would block forever',
self,
debug
))
if x is _FINISHED:
return
self.parent.throw(LoopExit('This operation would block forever',
self,
debug))
# Execution could resume here if another blocking API call is made
# in the same thread and the hub hasn't been destroyed, so clean
# up anything left.
debug = None
def start_periodic_monitoring_thread(self):
if self.periodic_monitoring_thread is None and GEVENT_CONFIG.monitor_thread:
......@@ -674,13 +677,20 @@ class Hub(WaitOperationsGreenlet):
return self.periodic_monitoring_thread
def join(self, timeout=None):
"""Wait for the event loop to finish. Exits only when there are
no more spawned greenlets, started servers, active timeouts or watchers.
"""
Wait for the event loop to finish. Exits only when there
are no more spawned greenlets, started servers, active
timeouts or watchers.
If *timeout* is provided, wait no longer than the specified number of seconds.
.. caution:: This doesn't clean up all resources associated
with the hub. For that, see :meth:`destroy`.
Returns True if exited because the loop finished execution.
Returns False if exited because of timeout expired.
:param float timeout: If *timeout* is provided, wait no longer
than the specified number of seconds.
:return: `True` if this method returns because the loop
finished execution. Or `False` if the timeout
expired.
"""
assert getcurrent() is self.parent, "only possible from the MAIN greenlet"
if self.dead:
......@@ -702,21 +712,38 @@ class Hub(WaitOperationsGreenlet):
waiter.get()
except LoopExit:
# Control will immediately be returned to this greenlet.
# We can't use ``self.switch`` because it doesn't take parameters.
RawGreenlet.switch(self, _FINISHED)
return True
finally:
# Clean up as much junk as we can. There is a small cycle in the frames,
# and it won't be GC'd.
# this greenlet -> this frame
# this greenlet -> the exception that was thrown
# the exception that was thrown -> a bunch of other frames, including this frame.
# some frame calling self.run() -> self
del waiter # this frame -> waiter -> self
del self # this frame -> self
if timeout is not None:
timeout.stop()
timeout.close()
del timeout
return False
def destroy(self, destroy_loop=None):
"""
Destroy this hub and clean up its resources.
If you manually create hubs, you *should* call this
If you manually create hubs, or you use a hub or the gevent
blocking API from multiple native threads, you *should* call this
method before disposing of the hub object reference.
Once this is done, it is impossible to continue running the
hub. Attempts to use the blocking gevent API with pre-existing
objects from this native thread and bound to this hub will fail.
.. versionchanged:: NEXT
Ensure that Python stack frames and greenlets referenced by this
hub are cleaned up. This guarantees that switching to the hub again
is not safe after this. (It was never safe, but it's even less safe.)
"""
if self.periodic_monitoring_thread is not None:
self.periodic_monitoring_thread.kill()
......@@ -727,6 +754,19 @@ class Hub(WaitOperationsGreenlet):
if self._threadpool is not None:
self._threadpool.kill()
del self._threadpool
# Let the frame be cleaned up by causing the run() function to
# exit. This is the only way to guarantee that the hub itself
# and the main greenlet, if this was a secondary thread, get
# cleaned up. Otherwise there are likely to be reference
# cycles still around. We MUST do this before we destroy the
# loop; if we destroy the loop and then switch into the hub,
# things will go VERY, VERY wrong.
try:
self.throw(GreenletExit)
except LoopExit:
pass
if destroy_loop is None:
destroy_loop = not self.loop.default
if destroy_loop:
......@@ -739,12 +779,12 @@ class Hub(WaitOperationsGreenlet):
# thread.
set_loop(self.loop)
self.loop = None
if _get_hub() is self:
set_hub(None)
# XXX: We can probably simplify the resolver and threadpool properties.
@property
......
......@@ -262,12 +262,13 @@ class Discovery(object):
coverage=False,
package=None,
config=None,
allow_combine=True,
):
self.config = config or {}
self.ignore = set(ignored or ())
self.tests = tests
self.configured_test_options = config.get('TEST_FILE_OPTIONS', set())
self.allow_combine = allow_combine
if ignore_files:
ignore_files = ignore_files.split(',')
for f in ignore_files:
......@@ -281,12 +282,13 @@ class Discovery(object):
self.package_dir = _dir_from_package_name(package)
class Discovered(object):
def __init__(self, package, configured_test_options, ignore, config):
def __init__(self, package, configured_test_options, ignore, config, allow_combine):
self.orig_dir = os.getcwd()
self.configured_run_alone = config['RUN_ALONE']
self.configured_failing_tests = config['FAILING_TESTS']
self.package = package
self.configured_test_options = configured_test_options
self.allow_combine = allow_combine
self.ignore = ignore
self.to_import = []
......@@ -343,7 +345,8 @@ class Discovery(object):
def __can_monkey_combine(self, filename, contents):
return (
not self.__has_config(filename)
self.allow_combine
and not self.__has_config(filename)
and self.__makes_simple_monkey_patch(contents)
and self.__file_allows_monkey_combine(contents)
and self.__file_allows_combine(contents)
......@@ -356,7 +359,8 @@ class Discovery(object):
def __can_nonmonkey_combine(self, filename, contents):
return (
not self.__has_config(filename)
self.allow_combine
and not self.__has_config(filename)
and self.__makes_no_monkey_patch(contents)
and self.__file_allows_combine(contents)
and self.__calls_unittest_main_toplevel(contents)
......@@ -485,7 +489,7 @@ class Discovery(object):
def discovered(self):
tests = self.tests
discovered = self.Discovered(self.package, self.configured_test_options,
self.ignore, self.config)
self.ignore, self.config, self.allow_combine)
# We need to glob relative names, our config is based on filenames still
with self._in_dir(self.package_dir):
......@@ -684,7 +688,7 @@ def main():
parser.add_argument('--discover', action='store_true')
parser.add_argument('--full', action='store_true')
parser.add_argument('--config', default='known_failures.py')
parser.add_argument('--failfast', action='store_true')
parser.add_argument('--failfast', '-x', action='store_true')
parser.add_argument("--coverage", action="store_true")
parser.add_argument("--quiet", action="store_true", default=True)
parser.add_argument("--verbose", action="store_false", dest='quiet')
......@@ -695,6 +699,10 @@ def main():
help="Use up to the given number of parallel processes to execute tests. "
"Defaults to %(default)s."
)
parser.add_argument(
'--no-combine', default=True, action='store_false',
help="Do not combine tests into process groups."
)
parser.add_argument('-u', '--use', metavar='RES1,RES2,...',
action='store', type=parse_resources,
help='specify which special resource intensive tests '
......@@ -773,6 +781,7 @@ def main():
coverage=coverage,
package=options.package,
config=config,
allow_combine=options.no_combine,
)
if options.discover:
for cmd, options in tests:
......
......@@ -26,6 +26,11 @@ class TestJoin(unittest.TestCase):
import threading
import gc
from gevent._greenlet_primitives import get_reachable_greenlets
def _clean():
for _ in range(2):
while gc.collect():
pass
_clean()
count_before = len(get_reachable_greenlets())
def thread_main():
......@@ -37,17 +42,15 @@ class TestJoin(unittest.TestCase):
hub.destroy(destroy_loop=True)
del hub
def tester():
t = threading.Thread(target=thread_main)
def tester(main):
t = threading.Thread(target=main)
t.start()
t.join()
while gc.collect():
pass
_clean()
for _ in range(10):
tester()
tester(thread_main)
del tester
del thread_main
......@@ -57,10 +60,10 @@ class TestJoin(unittest.TestCase):
# We could be off by exactly 1. Not entirely clear where.
# But it only happens the first time.
count_after -= 1
# If we were run in multiple processes, our count could actually have
# gone down due to the GCs we did.
self.assertEqual(count_after, count_before)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment