Commit fcfc9304 authored by Jason Madden

More careful handling of threadpool threads that have a hub.

The threadpool worker greenlet itself is meant to spend much of its time blocking, so don't report on it if it does.

Only if we switch greenlets while running the user's code should we start looking for blocking.
parent 670cbf8b
...@@ -13,6 +13,9 @@ PY2 = sys.version_info[0] == 2 ...@@ -13,6 +13,9 @@ PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] >= 3 PY3 = sys.version_info[0] >= 3
PYPY = hasattr(sys, 'pypy_version_info') PYPY = hasattr(sys, 'pypy_version_info')
WIN = sys.platform.startswith("win") WIN = sys.platform.startswith("win")
LINUX = sys.platform.startswith('linux')
OSX = sys.platform == 'darwin'
PURE_PYTHON = PYPY or os.getenv('PURE_PYTHON') PURE_PYTHON = PYPY or os.getenv('PURE_PYTHON')
...@@ -33,6 +36,9 @@ else: ...@@ -33,6 +36,9 @@ else:
native_path_types = string_types native_path_types = string_types
thread_mod_name = 'thread' thread_mod_name = 'thread'
def NativeStrIO():
    """Return a new in-memory stream that accepts the native ``str`` type.

    When ``str`` is ``bytes`` (Python 2) the stream is an ``io.BytesIO``;
    otherwise it is an ``io.StringIO``.
    """
    # Function-scope import, kept exactly as in the original definition.
    import io
    return io.BytesIO() if str is bytes else io.StringIO()
## Exceptions ## Exceptions
if PY3: if PY3:
......
...@@ -539,7 +539,7 @@ class PeriodicMonitoringThread(object): ...@@ -539,7 +539,7 @@ class PeriodicMonitoringThread(object):
# Make sure the hub is still around, and still active, # Make sure the hub is still around, and still active,
# and keep it around while we are here. # and keep it around while we are here.
hub = self._hub_wref() hub = self._hub_wref()
if hub: if hub and self.should_run:
for f, _ in functions: for f, _ in functions:
f(hub) f(hub)
except SystemExit: except SystemExit:
...@@ -569,8 +569,9 @@ class PeriodicMonitoringThread(object): ...@@ -569,8 +569,9 @@ class PeriodicMonitoringThread(object):
if did_switch or self._active_greenlet is None or isinstance(self._active_greenlet, Hub): if did_switch or self._active_greenlet is None or isinstance(self._active_greenlet, Hub):
# Either we switched, or nothing is running (we got a # Either we switched, or nothing is running (we got a
# trace event we don't know about), or we spent the whole time in the hub, # trace event we don't know about or were requested to
# blocked for IO. Nothing to report. # ignore), or we spent the whole time in the hub, blocked
# for IO. Nothing to report.
return return
if (self._active_greenlet is getcurrent() if (self._active_greenlet is getcurrent()
...@@ -597,6 +598,13 @@ class PeriodicMonitoringThread(object): ...@@ -597,6 +598,13 @@ class PeriodicMonitoringThread(object):
report.extend(format_run_info(self.monitor_thread_ident)) report.extend(format_run_info(self.monitor_thread_ident))
hub.exception_stream.write('\n'.join(report)) hub.exception_stream.write('\n'.join(report))
def ignore_current_greenlet_blocking(self):
    """Stop paying attention to the current greenlet.

    Clears ``_active_greenlet``. The blocking check returns early with
    nothing to report while that attribute is ``None``.
    """
    self._active_greenlet = None
def monitor_current_greenlet_blocking(self):
    """Start paying attention to the calling greenlet.

    Stores the result of ``getcurrent()`` in ``_active_greenlet`` so the
    blocking check has a greenlet to compare against again.
    """
    self._active_greenlet = getcurrent()
def __repr__(self): def __repr__(self):
return '<%s at %s in thread %s greenlet %r for %r>' % ( return '<%s at %s in thread %s greenlet %r for %r>' % (
self.__class__.__name__, self.__class__.__name__,
...@@ -976,7 +984,7 @@ class Hub(TrackedRawGreenlet): ...@@ -976,7 +984,7 @@ class Hub(TrackedRawGreenlet):
self._resolver = value self._resolver = value
def _del_resolver(self): def _del_resolver(self):
del self._resolver self._resolver = None
resolver = property(_get_resolver, _set_resolver, _del_resolver) resolver = property(_get_resolver, _set_resolver, _del_resolver)
...@@ -995,7 +1003,7 @@ class Hub(TrackedRawGreenlet): ...@@ -995,7 +1003,7 @@ class Hub(TrackedRawGreenlet):
self._threadpool = value self._threadpool = value
def _del_threadpool(self): def _del_threadpool(self):
del self._threadpool self._threadpool = None
threadpool = property(_get_threadpool, _set_threadpool, _del_threadpool) threadpool = property(_get_threadpool, _set_threadpool, _del_threadpool)
......
...@@ -22,6 +22,8 @@ class ThreadPool(GroupMappingMixin): ...@@ -22,6 +22,8 @@ class ThreadPool(GroupMappingMixin):
""" """
.. note:: The method :meth:`apply_async` will always return a new .. note:: The method :meth:`apply_async` will always return a new
greenlet, bypassing the threadpool entirely. greenlet, bypassing the threadpool entirely.
.. caution:: Instances of this class are only true if they have
unfinished tasks.
""" """
def __init__(self, maxsize, hub=None): def __init__(self, maxsize, hub=None):
...@@ -60,6 +62,8 @@ class ThreadPool(GroupMappingMixin): ...@@ -60,6 +62,8 @@ class ThreadPool(GroupMappingMixin):
def __len__(self): def __len__(self):
# XXX just do unfinished_tasks property # XXX just do unfinished_tasks property
# Note that this becomes the boolean value of this class,
# that's probably not what we want!
return self.task_queue.unfinished_tasks return self.task_queue.unfinished_tasks
def _get_size(self): def _get_size(self):
...@@ -165,7 +169,7 @@ class ThreadPool(GroupMappingMixin): ...@@ -165,7 +169,7 @@ class ThreadPool(GroupMappingMixin):
:return: A :class:`gevent.event.AsyncResult`. :return: A :class:`gevent.event.AsyncResult`.
""" """
while True: while 1:
semaphore = self._semaphore semaphore = self._semaphore
semaphore.acquire() semaphore.acquire()
if semaphore is self._semaphore: if semaphore is self._semaphore:
...@@ -201,6 +205,11 @@ class ThreadPool(GroupMappingMixin): ...@@ -201,6 +205,11 @@ class ThreadPool(GroupMappingMixin):
# it should be true to avoid leaking resources. # it should be true to avoid leaking resources.
_destroy_worker_hub = True _destroy_worker_hub = True
def __ignore_current_greenlet_blocking(self, hub):
    """Tell *hub*'s monitoring thread, if any, to ignore the current greenlet.

    Does nothing when *hub* is ``None`` or when it has no
    ``periodic_monitoring_thread``.

    :param hub: The worker thread's hub, or ``None`` if it has none.
    """
    if hub is None:
        return
    # Read the attribute once so the object we test is the object we call.
    monitor = hub.periodic_monitoring_thread
    if monitor is not None:
        monitor.ignore_current_greenlet_blocking()
def _worker(self): def _worker(self):
# pylint:disable=too-many-branches # pylint:disable=too-many-branches
need_decrease = True need_decrease = True
...@@ -210,6 +219,11 @@ class ThreadPool(GroupMappingMixin): ...@@ -210,6 +219,11 @@ class ThreadPool(GroupMappingMixin):
if h is not None: if h is not None:
h.name = 'ThreadPool Worker Hub' h.name = 'ThreadPool Worker Hub'
task_queue = self.task_queue task_queue = self.task_queue
# While we block, don't let the monitoring thread, if any,
# report us as blocked. Indeed, so long as we never
# try to switch greenlets, don't report us as blocked---
# the threadpool is *meant* to run blocking tasks
self.__ignore_current_greenlet_blocking(h)
task = task_queue.get() task = task_queue.get()
try: try:
if task is None: if task is None:
......
...@@ -13,6 +13,7 @@ import traceback ...@@ -13,6 +13,7 @@ import traceback
from greenlet import getcurrent from greenlet import getcurrent
from greenlet import greenlet as RawGreenlet from greenlet import greenlet as RawGreenlet
from gevent._compat import PYPY
__all__ = [ __all__ = [
'wrap_errors', 'wrap_errors',
...@@ -20,6 +21,11 @@ __all__ = [ ...@@ -20,6 +21,11 @@ __all__ = [
'GreenletTree', 'GreenletTree',
] ]
# PyPy is very slow at formatting stacks
# for some reason.
_STACK_LIMIT = 20 if PYPY else None
def _noop(): def _noop():
return None return None
...@@ -125,7 +131,7 @@ def _format_thread_info(lines, current_thread_ident): ...@@ -125,7 +131,7 @@ def _format_thread_info(lines, current_thread_ident):
if current_thread_ident == thread_ident: if current_thread_ident == thread_ident:
name = '%s) (CURRENT' % (name,) name = '%s) (CURRENT' % (name,)
lines.append('Thread 0x%x (%s)\n' % (thread_ident, name)) lines.append('Thread 0x%x (%s)\n' % (thread_ident, name))
lines.append(''.join(traceback.format_stack(frame))) lines.append(''.join(traceback.format_stack(frame, _STACK_LIMIT)))
# We may have captured our own frame, creating a reference # We may have captured our own frame, creating a reference
# cycle, so clear it out. # cycle, so clear it out.
...@@ -290,7 +296,7 @@ class GreenletTree(object): ...@@ -290,7 +296,7 @@ class GreenletTree(object):
@staticmethod @staticmethod
def __render_tb(tree, label, frame): def __render_tb(tree, label, frame):
tree.child_data(label) tree.child_data(label)
tb = ''.join(traceback.format_stack(frame)) tb = ''.join(traceback.format_stack(frame, _STACK_LIMIT))
tree.child_multidata(tb) tree.child_multidata(tb)
@staticmethod @staticmethod
......
...@@ -28,8 +28,8 @@ PYPY = gsysinfo.PYPY ...@@ -28,8 +28,8 @@ PYPY = gsysinfo.PYPY
CPYTHON = not PYPY CPYTHON = not PYPY
VERBOSE = sys.argv.count('-v') > 1 VERBOSE = sys.argv.count('-v') > 1
WIN = gsysinfo.WIN WIN = gsysinfo.WIN
LINUX = sys.platform.startswith('linux') LINUX = gsysinfo.LINUX
OSX = sys.platform == 'darwin' OSX = gsysinfo.OSX
PURE_PYTHON = gsysinfo.PURE_PYTHON PURE_PYTHON = gsysinfo.PURE_PYTHON
......
...@@ -19,14 +19,16 @@ ...@@ -19,14 +19,16 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE. # THE SOFTWARE.
import re
import time
import greentest import greentest
import greentest.timing import greentest.timing
import time
import re
import gevent import gevent
from gevent import socket from gevent import socket
from gevent.hub import Waiter, get_hub from gevent.hub import Waiter, get_hub
from gevent._compat import PYPY from gevent._compat import NativeStrIO
DELAY = 0.1 DELAY = 0.1
...@@ -116,58 +118,187 @@ class TestWaiter(greentest.TestCase): ...@@ -116,58 +118,187 @@ class TestWaiter(greentest.TestCase):
g.kill() g.kill()
@greentest.skipOnCI("Racy on CI")
class TestPeriodicMonitoringThread(greentest.TestCase): class TestPeriodicMonitoringThread(greentest.TestCase):
def _reset_hub(self):
    """Clean per-test state off the current hub.

    Deletes the hub's ``exception_stream`` attribute if one was set and,
    when the hub already has a threadpool, joins, kills and deletes it.
    """
    hub = get_hub()
    try:
        del hub.exception_stream
    except AttributeError:
        # No stream had been assigned; nothing to undo.
        pass

    # Test the private attribute rather than the property; presumably
    # reading ``hub.threadpool`` would create a pool -- TODO confirm.
    if hub._threadpool is None:
        return
    hub.threadpool.join()
    hub.threadpool.kill()
    del hub.threadpool
def setUp(self): def setUp(self):
super(TestPeriodicMonitoringThread, self).setUp() super(TestPeriodicMonitoringThread, self).setUp()
self.monitor_thread = gevent.config.monitor_thread self.monitor_thread = gevent.config.monitor_thread
gevent.config.monitor_thread = True gevent.config.monitor_thread = True
self.monitor_fired = 0 self.monitor_fired = 0
self.monitored_hubs = set()
self._reset_hub()
def tearDown(self): def tearDown(self):
if not self.monitor_thread and get_hub().periodic_monitoring_thread: hub = get_hub()
if not self.monitor_thread and hub.periodic_monitoring_thread:
# If it was true, nothing to do. If it was false, tear things down. # If it was true, nothing to do. If it was false, tear things down.
get_hub().periodic_monitoring_thread.kill() hub.periodic_monitoring_thread.kill()
get_hub().periodic_monitoring_thread = None hub.periodic_monitoring_thread = None
gevent.config.monitor_thread = self.monitor_thread gevent.config.monitor_thread = self.monitor_thread
self.monitored_hubs = None
self._reset_hub()
def _monitor(self, _hub): def _monitor(self, hub):
self.monitor_fired += 1 self.monitor_fired += 1
if self.monitored_hubs is not None:
self.monitored_hubs.add(hub)
def test_config(self): def test_config(self):
self.assertEqual(0.1, gevent.config.max_blocking_time) self.assertEqual(0.1, gevent.config.max_blocking_time)
def _run_monitoring_threads(self, monitor):
    """Wait until *monitor* has invoked a temporary monitoring function.

    A one-shot function is registered with period ``0``. When it is
    called it kills the monitor and wakes this thread. The monitor's
    ``_additional_monitoring_functions`` is restored afterwards, even if
    registration or waiting raises.

    :param monitor: A monitoring thread object; it must still be running
        (``should_run`` true) on entry.
    """
    self.assertTrue(monitor.should_run)
    from threading import Condition
    cond = Condition()
    # Set by the callback; guards the wait below against spurious wakeups.
    fired = []

    def monitor_cond(_hub):
        with cond:
            # Only run once. Especially helpful on PyPy, where
            # formatting stacks is expensive.
            monitor.kill()
            fired.append(True)
            cond.notify_all()

    before_funs = monitor._additional_monitoring_functions
    try:
        # Hold the condition while registering so the callback cannot
        # notify before we are waiting.
        with cond:
            monitor.add_monitoring_function(monitor_cond, 0)
            while not fired:
                cond.wait()
    finally:
        monitor._additional_monitoring_functions = before_funs
@greentest.ignores_leakcheck @greentest.ignores_leakcheck
def test_blocking(self): def test_blocking_this_thread(self):
import io
hub = get_hub() hub = get_hub()
stream = hub.exception_stream = NativeStrIO()
monitor = hub.start_periodic_monitoring_thread() monitor = hub.start_periodic_monitoring_thread()
self.assertIsNotNone(monitor) self.assertIsNotNone(monitor)
before_funs = monitor._additional_monitoring_functions before_funs = monitor._additional_monitoring_functions
monitor.add_monitoring_function(self._monitor, 0) monitor.add_monitoring_function(self._monitor, 0.1)
self.assertIn((self._monitor, 0), monitor.monitoring_functions()) self.assertIn((self._monitor, 0.1), monitor.monitoring_functions())
# We must make sure we have switched greenlets at least once, # We must make sure we have switched greenlets at least once,
# otherwise we can't detect a failure. # otherwise we can't detect a failure.
gevent.sleep(0.01) gevent.sleep(0.0001)
stream = hub.exception_stream = io.BytesIO() if str is bytes else io.StringIO()
assert hub.exception_stream is stream assert hub.exception_stream is stream
try: try:
time.sleep(0.3) # Thrice the default; PyPy is very slow to format stacks time.sleep(0.3) # Thrice the default
# XXX: This is racy even on CPython self._run_monitoring_threads(monitor)
finally: finally:
monitor._additional_monitoring_functions = before_funs monitor._additional_monitoring_functions = before_funs
assert hub.exception_stream is stream assert hub.exception_stream is stream
monitor.kill()
del hub.exception_stream del hub.exception_stream
if not PYPY:
# PyPy may still be formatting the stacks in the other thread.
self.assertGreaterEqual(self.monitor_fired, 1) self.assertGreaterEqual(self.monitor_fired, 1)
data = stream.getvalue() data = stream.getvalue()
self.assertIn('appears to be blocked', data) self.assertIn('appears to be blocked', data)
self.assertIn('PeriodicMonitoringThread', data) self.assertIn('PeriodicMonitoringThread', data)
def _prep_worker_thread(self):
    # Prepare the threadpool worker for a monitoring test: capture its
    # hub's exception stream, make sure it has a monitoring thread, and
    # register ``self._monitor`` on it with a 0.1 period.
    #
    # Returns ``(worker_hub, stream, worker_monitor)``.
    threadpool = get_hub().threadpool

    worker_hub = threadpool.apply(get_hub)
    stream = NativeStrIO()
    worker_hub.exception_stream = stream

    # It does not have a monitoring thread yet
    self.assertIsNone(worker_hub.periodic_monitoring_thread)
    # So switch to it and give it one.
    threadpool.apply(gevent.sleep, (0.01,))
    worker_monitor = worker_hub.periodic_monitoring_thread
    self.assertIsNotNone(worker_monitor)

    worker_monitor.add_monitoring_function(self._monitor, 0.1)
    return worker_hub, stream, worker_monitor
@greentest.ignores_leakcheck
def test_blocking_threadpool_thread_task_queue(self):
    # A threadpool thread spends much of its time
    # blocked on the native Lock object. Unless we take
    # care, if that thread had created a hub, it will constantly
    # be reported as blocked.
    worker_hub, stream, worker_monitor = self._prep_worker_thread()

    # Now wait until the monitoring threads have run.
    self._run_monitoring_threads(worker_monitor)
    worker_monitor.kill()

    # We did run the monitor in the worker thread, but it
    # did NOT report itself blocked by the worker thread sitting there.
    self.assertIn(worker_hub, self.monitored_hubs)
    self.assertEqual(stream.getvalue(), '')
@greentest.ignores_leakcheck
def test_blocking_threadpool_thread_one_greenlet(self):
    # If the background threadpool thread has no other greenlets to run
    # and never switches, then even if it has a hub
    # we don't report it blocking. The threadpool is *meant* to run
    # tasks that block.
    threadpool = get_hub().threadpool
    worker_hub, stream, worker_monitor = self._prep_worker_thread()

    # A plain blocking call: the worker never switches greenlets.
    task = threadpool.spawn(time.sleep, 0.3)

    # Now wait until the monitoring threads have run.
    self._run_monitoring_threads(worker_monitor)
    # and be sure the task ran
    task.get()
    worker_monitor.kill()

    # We did run the monitor in the worker thread, but it
    # did NOT report itself blocked by the worker thread
    self.assertIn(worker_hub, self.monitored_hubs)
    self.assertEqual(stream.getvalue(), '')
@greentest.ignores_leakcheck
def test_blocking_threadpool_thread_multi_greenlet(self):
    # If the background threadpool thread ever switches
    # greenlets, monitoring goes into effect.
    threadpool = get_hub().threadpool
    worker_hub, stream, worker_monitor = self._prep_worker_thread()

    def blocking_in_child_greenlet():
        # Spawning and joining forces a greenlet switch in the worker;
        # the child then blocks in a native sleep.
        g = gevent.spawn(time.sleep, 0.7)
        g.join()

    task = threadpool.spawn(blocking_in_child_greenlet)

    # Now wait until the monitoring threads have run.
    self._run_monitoring_threads(worker_monitor)
    # and be sure the task ran
    task.get()
    worker_monitor.kill()

    # We did run the monitor in the worker thread, and it
    # DID report itself blocked by the worker thread
    self.assertIn(worker_hub, self.monitored_hubs)
    data = stream.getvalue()
    self.assertIn('appears to be blocked', data)
    self.assertIn('PeriodicMonitoringThread', data)
if __name__ == '__main__': if __name__ == '__main__':
greentest.main() greentest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment