Commit a17456ab authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1632 from gevent/issue1631

Catch greenlet.error when destroying a hub from another thread.
parents 30f63ed2 0dbb6c60
Forking a process that had use the threadpool to run tasks that
created their own hub would fail to clean up the threadpool by raising
``greenlet.error``.
...@@ -14,6 +14,7 @@ import traceback ...@@ -14,6 +14,7 @@ import traceback
from greenlet import greenlet as RawGreenlet from greenlet import greenlet as RawGreenlet
from greenlet import getcurrent from greenlet import getcurrent
from greenlet import GreenletExit from greenlet import GreenletExit
from greenlet import error as GreenletError
__all__ = [ __all__ = [
'getcurrent', 'getcurrent',
...@@ -755,16 +756,22 @@ class Hub(WaitOperationsGreenlet): ...@@ -755,16 +756,22 @@ class Hub(WaitOperationsGreenlet):
If you manually create hubs, or you use a hub or the gevent If you manually create hubs, or you use a hub or the gevent
blocking API from multiple native threads, you *should* call this blocking API from multiple native threads, you *should* call this
method before disposing of the hub object reference. method before disposing of the hub object reference. Ideally,
this should be called from the same thread running the hub, but
it can be called from other threads after that thread has exited.
Once this is done, it is impossible to continue running the Once this is done, it is impossible to continue running the
hub. Attempts to use the blocking gevent API with pre-existing hub. Attempts to use the blocking gevent API with pre-existing
objects from this native thread and bound to this hub will fail. objects from this native thread and bound to this hub will fail.
.. versionchanged:: 20.5.1 .. versionchanged:: 20.5.1
Ensure that Python stack frames and greenlets referenced by this Attempt to ensure that Python stack frames and greenlets referenced by this
hub are cleaned up. This guarantees that switching to the hub again hub are cleaned up. This guarantees that switching to the hub again
is not safe after this. (It was never safe, but it's even less safe.) is not safe after this. (It was never safe, but it's even less safe.)
Note that this only works if the hub is destroyed in the same thread it
is running in. If the hub is destroyed by a different thread
after a ``fork()``, for example, expect some garbage to leak.
""" """
if self.periodic_monitoring_thread is not None: if self.periodic_monitoring_thread is not None:
self.periodic_monitoring_thread.kill() self.periodic_monitoring_thread.kill()
...@@ -786,6 +793,12 @@ class Hub(WaitOperationsGreenlet): ...@@ -786,6 +793,12 @@ class Hub(WaitOperationsGreenlet):
try: try:
self.throw(GreenletExit) self.throw(GreenletExit)
except LoopExit: except LoopExit:
# Expected.
pass
except GreenletError:
# Must be coming from a different thread.
# Note that python stack frames are likely to leak
# in this case.
pass pass
if destroy_loop is None: if destroy_loop is None:
......
...@@ -55,6 +55,7 @@ else: ...@@ -55,6 +55,7 @@ else:
from .sysinfo import VERBOSE from .sysinfo import VERBOSE
from .sysinfo import WIN from .sysinfo import WIN
from .sysinfo import LINUX from .sysinfo import LINUX
from .sysinfo import OSX
from .sysinfo import LIBUV from .sysinfo import LIBUV
from .sysinfo import CFFI_BACKEND from .sysinfo import CFFI_BACKEND
from .sysinfo import DEBUG from .sysinfo import DEBUG
...@@ -110,6 +111,7 @@ from .skipping import skipWithoutResource ...@@ -110,6 +111,7 @@ from .skipping import skipWithoutResource
from .skipping import skipWithoutExternalNetwork from .skipping import skipWithoutExternalNetwork
from .skipping import skipOnPy2 from .skipping import skipOnPy2
from .skipping import skipOnManylinux from .skipping import skipOnManylinux
from .skipping import skipOnMacOnCI
from .exception import ExpectedException from .exception import ExpectedException
...@@ -177,15 +179,4 @@ mock = mock ...@@ -177,15 +179,4 @@ mock = mock
# zope.interface # zope.interface
try: from zope.interface import verify
from zope.interface import verify
except ImportError:
class verify(object):
@staticmethod
def verifyObject(*_):
import warnings
warnings.warn("zope.interface is not installed; not verifying")
return
verify = verify
...@@ -1273,6 +1273,14 @@ if OSX: ...@@ -1273,6 +1273,14 @@ if OSX:
'test_socket.RecvmsgIntoTCPTest.testRecvmsgIntoGenerator', 'test_socket.RecvmsgIntoTCPTest.testRecvmsgIntoGenerator',
] ]
if RUNNING_ON_CI:
disabled_tests += [
# These sometime timeout. Cannot reproduce locally.
'test_ftp.TestTLS_FTPClassMixin.test_mlsd',
'test_ftp.TestTLS_FTPClassMixin.test_retrlines_too_long',
]
if RESOLVER_ARES and PY38 and not RUNNING_ON_CI: if RESOLVER_ARES and PY38 and not RUNNING_ON_CI:
disabled_tests += [ disabled_tests += [
# When updating to 1.16.0 this was seen locally, but not on CI. # When updating to 1.16.0 this was seen locally, but not on CI.
......
...@@ -623,14 +623,25 @@ def print_list(lst): ...@@ -623,14 +623,25 @@ def print_list(lst):
util.log(' - %s', name) util.log(' - %s', name)
def _setup_environ(debug=False): def _setup_environ(debug=False):
if ('PYTHONWARNINGS' not in os.environ def not_set(key):
return not bool(os.environ.get(key))
if (not_set('PYTHONWARNINGS')
and (not sys.warnoptions and (not sys.warnoptions
# Python 3.7 goes from [] to ['default'] for nothing # Python 3.7 goes from [] to ['default'] for nothing
or sys.warnoptions == ['default'])): or sys.warnoptions == ['default'])):
# action:message:category:module:line # action:message:category:module:line
# - when a warning matches
# more than one option, the action for the last matching
# option is performed.
# - action is one of : ignore, default, all, module, once, error
os.environ['PYTHONWARNINGS'] = ','.join([ os.environ['PYTHONWARNINGS'] = ','.join([
# Enable default warnings such as ResourceWarning. # Enable default warnings such as ResourceWarning.
'default', 'default',
'default::DeprecationWarning',
'default::ResourceWarning',
# On Python 3[.6], the system site.py module has # On Python 3[.6], the system site.py module has
# "open(fullname, 'rU')" which produces the warning that # "open(fullname, 'rU')" which produces the warning that
# 'U' is deprecated, so ignore warnings from site.py # 'U' is deprecated, so ignore warnings from site.py
...@@ -655,22 +666,22 @@ def _setup_environ(debug=False): ...@@ -655,22 +666,22 @@ def _setup_environ(debug=False):
'ignore:::dns.zone:', 'ignore:::dns.zone:',
]) ])
if 'PYTHONFAULTHANDLER' not in os.environ: if not_set('PYTHONFAULTHANDLER'):
os.environ['PYTHONFAULTHANDLER'] = 'true' os.environ['PYTHONFAULTHANDLER'] = 'true'
if 'GEVENT_DEBUG' not in os.environ and debug: if not_set('GEVENT_DEBUG') and debug:
os.environ['GEVENT_DEBUG'] = 'debug' os.environ['GEVENT_DEBUG'] = 'debug'
if 'PYTHONTRACEMALLOC' not in os.environ and debug: if not_set('PYTHONTRACEMALLOC') and debug:
# This slows the tests down quite a bit. Reserve # This slows the tests down quite a bit. Reserve
# for debugging. # for debugging.
os.environ['PYTHONTRACEMALLOC'] = '10' os.environ['PYTHONTRACEMALLOC'] = '10'
if 'PYTHONDEVMODE' not in os.environ: if not_set('PYTHONDEVMODE'):
# Python 3.7 and above. # Python 3.7 and above.
os.environ['PYTHONDEVMODE'] = '1' os.environ['PYTHONDEVMODE'] = '1'
if 'PYTHONMALLOC' not in os.environ and debug: if not_set('PYTHONMALLOC') and debug:
# Python 3.6 and above. # Python 3.6 and above.
# This slows the tests down some, but # This slows the tests down some, but
# can detect memory corruption. Unfortunately # can detect memory corruption. Unfortunately
...@@ -682,6 +693,24 @@ def _setup_environ(debug=False): ...@@ -682,6 +693,24 @@ def _setup_environ(debug=False):
os.environ['PYTHONMALLOC'] = 'default' os.environ['PYTHONMALLOC'] = 'default'
os.environ['PYTHONDEVMODE'] = '' os.environ['PYTHONDEVMODE'] = ''
interesting_envs = {
k: os.environ[k]
for k in os.environ
if k.startswith(('PYTHON', 'GEVENT'))
}
widest_k = max(len(k) for k in interesting_envs)
for k, v in sorted(interesting_envs.items()):
util.log('%*s\t=\t%s', widest_k, k, v, color="debug")
util.run(
[
sys.executable,
'-c',
'from __future__ import print_function; '
'import sys; print("sys.warnoptions:\t", sys.warnoptions)',
],
# Don't log the beginning and end of the subprocess.
quiet=True, nested=True)
def main(): def main():
# pylint:disable=too-many-locals,too-many-statements # pylint:disable=too-many-locals,too-many-statements
......
...@@ -377,7 +377,7 @@ def run(command, **kwargs): # pylint:disable=too-many-locals ...@@ -377,7 +377,7 @@ def run(command, **kwargs): # pylint:disable=too-many-locals
assert 'stdout' not in kwargs and 'stderr' not in kwargs, kwargs assert 'stdout' not in kwargs and 'stderr' not in kwargs, kwargs
kwargs['stderr'] = subprocess.STDOUT kwargs['stderr'] = subprocess.STDOUT
kwargs['stdout'] = subprocess.PIPE kwargs['stdout'] = subprocess.PIPE
popen = start(command, quiet=nested, **kwargs) popen = start(command, quiet=quiet, **kwargs)
name = popen.name name = popen.name
try: try:
......
...@@ -86,6 +86,10 @@ class Test(greentest.TestCase): ...@@ -86,6 +86,10 @@ class Test(greentest.TestCase):
self.assertEqual(line, '') self.assertEqual(line, '')
conn.close() conn.close()
@greentest.skipOnMacOnCI(
"Sometimes fails to get the right answers; "
"https://travis-ci.org/github/gevent/gevent/jobs/692184822"
)
@greentest.skipOnLibuvOnTravisOnCPython27( @greentest.skipOnLibuvOnTravisOnCPython27(
"segfaults; " "segfaults; "
"See https://github.com/gevent/gevent/pull/1156") "See https://github.com/gevent/gevent/pull/1156")
......
from contextlib import contextmanager
import unittest import unittest
import gevent import gevent
from gevent.testing import ignores_leakcheck from gevent.testing import ignores_leakcheck
...@@ -20,19 +22,37 @@ class TestJoin(unittest.TestCase): ...@@ -20,19 +22,37 @@ class TestJoin(unittest.TestCase):
res = gevent.get_hub().join() res = gevent.get_hub().join()
self.assertTrue(res) self.assertTrue(res)
@ignores_leakcheck @staticmethod
def test_join_in_new_thread_doesnt_leak_hub_or_greenlet(self): def __clean():
# https://github.com/gevent/gevent/issues/1601
import threading
import gc import gc
from gevent._greenlet_primitives import get_reachable_greenlets
def _clean():
for _ in range(2): for _ in range(2):
while gc.collect(): while gc.collect():
pass pass
_clean()
@contextmanager
def assert_no_greenlet_growth(self):
from gevent._greenlet_primitives import get_reachable_greenlets
clean = self.__clean
clean()
count_before = len(get_reachable_greenlets()) count_before = len(get_reachable_greenlets())
yield
count_after = len(get_reachable_greenlets())
if count_after > count_before:
# We could be off by exactly 1. Not entirely clear where.
# But it only happens the first time.
count_after -= 1
# If we were run in multiple process, our count could actually have
# gone down due to the GC's we did.
self.assertEqual(count_after, count_before)
@ignores_leakcheck
def test_join_in_new_thread_doesnt_leak_hub_or_greenlet(self):
# https://github.com/gevent/gevent/issues/1601
import threading
clean = self.__clean
def thread_main(): def thread_main():
g = gevent.Greenlet(run=lambda: 0) g = gevent.Greenlet(run=lambda: 0)
g.start() g.start()
...@@ -47,22 +67,50 @@ class TestJoin(unittest.TestCase): ...@@ -47,22 +67,50 @@ class TestJoin(unittest.TestCase):
t.start() t.start()
t.join() t.join()
_clean() clean()
with self.assert_no_greenlet_growth():
for _ in range(10): for _ in range(10):
tester(thread_main) tester(thread_main)
del tester del tester
del thread_main del thread_main
count_after = len(get_reachable_greenlets()) @ignores_leakcheck
if count_after > count_before: def test_destroy_in_main_thread_from_new_thread(self):
# We could be off by exactly 1. Not entirely clear where. # https://github.com/gevent/gevent/issues/1631
# But it only happens the first time. import threading
count_after -= 1
# If we were run in multiple process, our count could actually have clean = self.__clean
# gone down due to the GC's we did. class Thread(threading.Thread):
self.assertEqual(count_after, count_before) hub = None
def run(self):
g = gevent.Greenlet(run=lambda: 0)
g.start()
g.join()
del g
hub = gevent.get_hub()
hub.join()
self.hub = hub
def tester(Thread, clean):
t = Thread()
t.start()
t.join()
t.hub.destroy(destroy_loop=True)
t.hub = None
del t
clean()
# Unfortunately, this WILL leak greenlets,
# at least on CPython. The frames of the dead threads
# are referenced by the hub in some sort of cycle, and
# greenlets don't particpate in GC.
for _ in range(10):
tester(Thread, clean)
del tester
del Thread
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -134,6 +134,11 @@ class TestCase(greentest.TestCase): ...@@ -134,6 +134,11 @@ class TestCase(greentest.TestCase):
fd.write(('GET %s HTTP/1.0\r\n\r\n' % url).encode('latin-1')) fd.write(('GET %s HTTP/1.0\r\n\r\n' % url).encode('latin-1'))
fd.flush() fd.flush()
LOCAL_CONN_REFUSED_ERRORS = ()
if greentest.OSX:
# A kernel bug in OS X sometimes results in this
LOCAL_CONN_REFUSED_ERRORS = (errno.EPROTOTYPE,)
def assertConnectionRefused(self): def assertConnectionRefused(self):
with self.assertRaises(socket.error) as exc: with self.assertRaises(socket.error) as exc:
with self.makefile() as conn: with self.makefile() as conn:
...@@ -142,7 +147,7 @@ class TestCase(greentest.TestCase): ...@@ -142,7 +147,7 @@ class TestCase(greentest.TestCase):
ex = exc.exception ex = exc.exception
self.assertIn(ex.args[0], self.assertIn(ex.args[0],
(errno.ECONNREFUSED, errno.EADDRNOTAVAIL, (errno.ECONNREFUSED, errno.EADDRNOTAVAIL,
errno.ECONNRESET, errno.ECONNABORTED), errno.ECONNRESET, errno.ECONNABORTED) + self.LOCAL_CONN_REFUSED_ERRORS,
(ex, ex.args)) (ex, ex.args))
def assert500(self): def assert500(self):
......
...@@ -49,12 +49,17 @@ class TestSSL(test__socket.TestTCP): ...@@ -49,12 +49,17 @@ class TestSSL(test__socket.TestTCP):
# to send a very large amount to make it timeout # to send a very large amount to make it timeout
_test_sendall_data = data_sent = b'hello' * 100000000 _test_sendall_data = data_sent = b'hello' * 100000000
test_sendall_array = greentest.skipOnManylinux("Sometimes misses data")( test_sendall_array = greentest.skipOnMacOnCI("Sometimes misses data")(
greentest.skipOnManylinux("Sometimes misses data")(
test__socket.TestTCP.test_sendall_array test__socket.TestTCP.test_sendall_array
) )
test_sendall_str = greentest.skipOnManylinux("Sometimes misses data")( )
test_sendall_str = greentest.skipOnMacOnCI("Sometimes misses data")(
greentest.skipOnManylinux("Sometimes misses data")(
test__socket.TestTCP.test_sendall_str test__socket.TestTCP.test_sendall_str
) )
)
@greentest.skipOnWindows("Not clear why we're skipping") @greentest.skipOnWindows("Not clear why we're skipping")
def test_ssl_sendall_timeout0(self): def test_ssl_sendall_timeout0(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment