Commit 56cfd118 authored by Jason Madden's avatar Jason Madden

Add gevent.util.assert_switches

Based on refactoring existing code in _monitor.py

Flexibly allows checking for any switching, switches that take longer
than a set amount of time, or switches that exceed a time limit
between getting back to the hub.

Fixes #1182.
parent 46a133db
...@@ -26,6 +26,9 @@ ...@@ -26,6 +26,9 @@
- The long-deprecated and undocumented module ``gevent.wsgi`` was removed. - The long-deprecated and undocumented module ``gevent.wsgi`` was removed.
- Add `gevent.util.assert_switches` to build on the monitoring
functions. Fixes :issue:`1182`.
1.3b1 (2018-04-13) 1.3b1 (2018-04-13)
================== ==================
......
...@@ -27,6 +27,9 @@ interval. When such a blocking greenlet is detected, it will print ...@@ -27,6 +27,9 @@ interval. When such a blocking greenlet is detected, it will print
:attr:`~gevent.hub.Hub.exception_stream`. It will also emit the :attr:`~gevent.hub.Hub.exception_stream`. It will also emit the
:class:`gevent.events.EventLoopBlocked` event. :class:`gevent.events.EventLoopBlocked` event.
.. seealso:: :func:`gevent.util.assert_switches`
For a scoped version of this.
Memory Usage Memory Usage
------------ ------------
......
...@@ -50,6 +50,149 @@ except ImportError: ...@@ -50,6 +50,149 @@ except ImportError:
class MonitorWarning(RuntimeWarning): class MonitorWarning(RuntimeWarning):
"""The type of warnings we emit.""" """The type of warnings we emit."""
class GreenletTracer(object):
# A counter, incremented by the greenlet trace function
# we install on every greenlet switch. This is reset when the
# periodic monitoring thread runs.
greenlet_switch_counter = 0
# The greenlet last switched to.
active_greenlet = None
# The trace function that was previously installed,
# if any.
previous_trace_function = None
def __init__(self):
prev_trace = settrace(self)
self.previous_trace_function = prev_trace
def kill(self): # pylint:disable=method-hidden
# Must be called in the monitored thread.
settrace(self.previous_trace_function)
self.previous_trace_function = None
# Become a no-op
self.kill = lambda: None
def __call__(self, event, args):
# This function runs in the thread we are monitoring.
self.greenlet_switch_counter += 1
if event in ('switch', 'throw'):
# args is (origin, target). This is the only defined
# case
self.active_greenlet = args[1]
else:
self.active_greenlet = None
if self.previous_trace_function is not None:
self.previous_trace_function(event, args)
def did_block_hub(self, hub):
# Check to see if we have blocked since the last call to this
# method. Returns a true value if we blocked (not in the hub),
# a false value if everything is fine.
# This may be called in the same thread being traced or a
# different thread; if a different thread, there is a race
# condition with this being incremented in the thread we're
# monitoring, but probably not often enough to lead to
# annoying false positives.
active_greenlet = self.active_greenlet
did_switch = self.greenlet_switch_counter != 0
self.greenlet_switch_counter = 0
if did_switch or active_greenlet is None or active_greenlet is hub:
# Either we switched, or nothing is running (we got a
# trace event we don't know about or were requested to
# ignore), or we spent the whole time in the hub, blocked
# for IO. Nothing to report.
return False
return True, active_greenlet
def ignore_current_greenlet_blocking(self):
# Don't pay attention to the current greenlet.
self.active_greenlet = None
def monitor_current_greenlet_blocking(self):
self.active_greenlet = getcurrent()
def did_block_hub_report(self, hub, active_greenlet, format_kwargs):
report = ['=' * 80,
'\n%s : Greenlet %s appears to be blocked' %
(gmctime(), active_greenlet)]
report.append(" Reported by %s" % (self,))
try:
frame = sys._current_frames()[hub.thread_ident]
except KeyError:
# The thread holding the hub has died. Perhaps we shouldn't
# even report this?
stack = ["Unknown: No thread found for hub %r\n" % (hub,)]
else:
stack = traceback.format_stack(frame)
report.append('Blocked Stack (for thread id %s):' % (hex(hub.thread_ident),))
report.append(''.join(stack))
report.append("Info:")
report.extend(format_run_info(**format_kwargs))
return report
class _HubTracer(GreenletTracer):
def __init__(self, hub, max_blocking_time):
GreenletTracer.__init__(self)
self.max_blocking_time = max_blocking_time
self.hub = hub
def kill(self): # pylint:disable=method-hidden
self.hub = None
GreenletTracer.kill(self)
class HubSwitchTracer(_HubTracer):
# A greenlet tracer that records the last time we switched *into* the hub.
last_entered_hub = 0
def __call__(self, event, args):
GreenletTracer.__call__(self, event, args)
if self.active_greenlet is self.hub:
self.last_entered_hub = perf_counter()
def did_block_hub(self, hub):
if perf_counter() - self.last_entered_hub > self.max_blocking_time:
return True, self.active_greenlet
class MaxSwitchTracer(_HubTracer):
# A greenlet tracer that records the maximum time between switches,
# not including time spent in the hub.
max_blocking = 0
def __init__(self, hub, max_blocking_time):
_HubTracer.__init__(self, hub, max_blocking_time)
self.last_switch = perf_counter()
def __call__(self, event, args):
old_active = self.active_greenlet
GreenletTracer.__call__(self, event, args)
if old_active is not self.hub and old_active is not None:
# If we're switching out of the hub, the blocking
# time doesn't count.
switched_at = perf_counter()
self.max_blocking = max(self.max_blocking,
switched_at - self.last_switch)
def did_block_hub(self, hub):
if self.max_blocking == 0:
# We never switched. Check the time now
self.max_blocking = perf_counter() - self.last_switch
if self.max_blocking > self.max_blocking_time:
return True, self.active_greenlet
class _MonitorEntry(object): class _MonitorEntry(object):
__slots__ = ('function', 'period', 'last_run_time') __slots__ = ('function', 'period', 'last_run_time')
...@@ -65,25 +208,16 @@ class _MonitorEntry(object): ...@@ -65,25 +208,16 @@ class _MonitorEntry(object):
def __repr__(self): def __repr__(self):
return repr((self.function, self.period, self.last_run_time)) return repr((self.function, self.period, self.last_run_time))
@implementer(IPeriodicMonitorThread) @implementer(IPeriodicMonitorThread)
class PeriodicMonitoringThread(object): class PeriodicMonitoringThread(object):
# This doesn't extend threading.Thread because that gets monkey-patched.
# We use the low-level 'start_new_thread' primitive instead.
# The amount of seconds we will sleep when we think we have nothing # The amount of seconds we will sleep when we think we have nothing
# to do. # to do.
inactive_sleep_time = 2.0 inactive_sleep_time = 2.0
# A counter, incremented by the greenlet trace function
# we install on every greenlet switch. This is reset when the
# periodic monitoring thread runs.
_greenlet_switch_counter = 0
# The greenlet being switched to.
_active_greenlet = None
# The trace function that was previously installed,
# if any.
previous_trace_function = None
# The absolute minimum we will sleep, regardless of # The absolute minimum we will sleep, regardless of
# what particular monitoring functions want to say. # what particular monitoring functions want to say.
min_sleep_time = 0.005 min_sleep_time = 0.005
...@@ -104,6 +238,9 @@ class PeriodicMonitoringThread(object): ...@@ -104,6 +238,9 @@ class PeriodicMonitoringThread(object):
# to 0 when we go back below. # to 0 when we go back below.
_memory_exceeded = 0 _memory_exceeded = 0
# The instance of GreenletTracer we're using
_greenlet_tracer = None
def __init__(self, hub): def __init__(self, hub):
self._hub_wref = wref(hub, self._on_hub_gc) self._hub_wref = wref(hub, self._on_hub_gc)
self.should_run = True self.should_run = True
...@@ -111,8 +248,7 @@ class PeriodicMonitoringThread(object): ...@@ -111,8 +248,7 @@ class PeriodicMonitoringThread(object):
# Must be installed in the thread that the hub is running in; # Must be installed in the thread that the hub is running in;
# the trace function is threadlocal # the trace function is threadlocal
assert get_thread_ident() == hub.thread_ident assert get_thread_ident() == hub.thread_ident
prev_trace = settrace(self.greenlet_trace) self._greenlet_tracer = GreenletTracer()
self.previous_trace_function = prev_trace
self._monitoring_functions = [_MonitorEntry(self.monitor_blocking, self._monitoring_functions = [_MonitorEntry(self.monitor_blocking,
GEVENT_CONFIG.max_blocking_time)] GEVENT_CONFIG.max_blocking_time)]
...@@ -125,17 +261,6 @@ class PeriodicMonitoringThread(object): ...@@ -125,17 +261,6 @@ class PeriodicMonitoringThread(object):
def hub(self): def hub(self):
return self._hub_wref() return self._hub_wref()
def greenlet_trace(self, event, args):
# This function runs in the thread we are monitoring.
self._greenlet_switch_counter += 1
if event in ('switch', 'throw'):
# args is (origin, target). This is the only defined
# case
self._active_greenlet = args[1]
else:
self._active_greenlet = None
if self.previous_trace_function is not None:
self.previous_trace_function(event, args)
def monitoring_functions(self): def monitoring_functions(self):
# Return a list of _MonitorEntry objects # Return a list of _MonitorEntry objects
...@@ -186,8 +311,7 @@ class PeriodicMonitoringThread(object): ...@@ -186,8 +311,7 @@ class PeriodicMonitoringThread(object):
# Stop this monitoring thread from running. # Stop this monitoring thread from running.
self.should_run = False self.should_run = False
# Uninstall our tracing hook # Uninstall our tracing hook
settrace(self.previous_trace_function) self._greenlet_tracer.kill()
self.previous_trace_function = None
def _on_hub_gc(self, _): def _on_hub_gc(self, _):
self.kill() self.kill()
...@@ -246,38 +370,15 @@ class PeriodicMonitoringThread(object): ...@@ -246,38 +370,15 @@ class PeriodicMonitoringThread(object):
# For tests, we return a true value when we think we found something # For tests, we return a true value when we think we found something
# blocking # blocking
# There is a race condition with this being incremented in the did_block = self._greenlet_tracer.did_block_hub(hub)
# thread we're monitoring, but probably not often enough to lead if not did_block:
# to annoying false positives.
active_greenlet = self._active_greenlet
did_switch = self._greenlet_switch_counter != 0
self._greenlet_switch_counter = 0
if did_switch or active_greenlet is None or active_greenlet is hub:
# Either we switched, or nothing is running (we got a
# trace event we don't know about or were requested to
# ignore), or we spent the whole time in the hub, blocked
# for IO. Nothing to report.
return return
report = ['=' * 80, active_greenlet = did_block[1]
'\n%s : Greenlet %s appears to be blocked' % report = self._greenlet_tracer.did_block_hub_report(
(gmctime(), active_greenlet)] hub, active_greenlet,
report.append(" Reported by %s" % (self,)) dict(greenlet_stacks=False, current_thread_ident=self.monitor_thread_ident))
try:
frame = sys._current_frames()[hub.thread_ident]
except KeyError:
# The thread holding the hub has died. Perhaps we shouldn't
# even report this?
stack = ["Unknown: No thread found for hub %r\n" % (hub,)]
else:
stack = traceback.format_stack(frame)
report.append('Blocked Stack (for thread id %s):' % (hex(hub.thread_ident),))
report.append(''.join(stack))
report.append("Info:")
report.extend(format_run_info(greenlet_stacks=False,
current_thread_ident=self.monitor_thread_ident))
report.append(report[0])
stream = hub.exception_stream stream = hub.exception_stream
for line in report: for line in report:
# Printing line by line may interleave with other things, # Printing line by line may interleave with other things,
...@@ -289,11 +390,10 @@ class PeriodicMonitoringThread(object): ...@@ -289,11 +390,10 @@ class PeriodicMonitoringThread(object):
return (active_greenlet, report) return (active_greenlet, report)
def ignore_current_greenlet_blocking(self): def ignore_current_greenlet_blocking(self):
# Don't pay attention to the current greenlet. self._greenlet_tracer.ignore_current_greenlet_blocking()
self._active_greenlet = None
def monitor_current_greenlet_blocking(self): def monitor_current_greenlet_blocking(self):
self._active_greenlet = getcurrent() self._greenlet_tracer.monitor_current_greenlet_blocking()
def can_monitor_memory_usage(self): def can_monitor_memory_usage(self):
return Process is not None return Process is not None
......
...@@ -23,6 +23,7 @@ __all__ = [ ...@@ -23,6 +23,7 @@ __all__ = [
'print_run_info', 'print_run_info',
'GreenletTree', 'GreenletTree',
'wrap_errors', 'wrap_errors',
'assert_switches',
] ]
# PyPy is very slow at formatting stacks # PyPy is very slow at formatting stacks
...@@ -507,3 +508,85 @@ class GreenletTree(object): ...@@ -507,3 +508,85 @@ class GreenletTree(object):
Returns the `GreenletTree` for the current thread. Returns the `GreenletTree` for the current thread.
""" """
return cls._forest()[1] return cls._forest()[1]
class _FailedToSwitch(AssertionError):
pass
class assert_switches(object):
"""
A context manager for ensuring a block of code switches greenlets.
This performs a similar function as the :doc:`monitoring thread
</monitoring>`, but the scope is limited to the body of the with
statement. If the code within the body doesn't yield to the hub
(and doesn't raise an exception), then upon exiting the
context manager an :exc:`AssertionError` will be raised.
This is useful in unit tests and for debugging purposes.
:keyword float max_blocking_time: If given, the body is allowed
to block for up to this many fractional seconds before
an error is raised.
:keyword bool hub_only: If True, then *max_blocking_time* only
refers to the amount of time spent between switches into the
hub. If False, then it refers to the maximum time between
*any* switches. If *max_blocking_time* is not given, has no
effect.
Example::
# This will always raise an exception: nothing switched
with assert_switches():
pass
# This will never raise an exception; nothing switched,
# but it happened very fast
with assert_switches(max_blocking_time=1.0):
pass
.. versionadded:: 1.3
"""
hub = None
tracer = None
def __init__(self, max_blocking_time=None, hub_only=False):
self.max_blocking_time = max_blocking_time
self.hub_only = hub_only
def __enter__(self):
from gevent import get_hub
from gevent import _monitor
self.hub = hub = get_hub()
# TODO: We could optimize this to use the GreenletTracer
# installed by the monitoring thread, if there is one.
# As it is, we will chain trace calls back to it.
if not self.max_blocking_time:
self.tracer = _monitor.GreenletTracer()
elif self.hub_only:
self.tracer = _monitor.HubSwitchTracer(hub, self.max_blocking_time)
else:
self.tracer = _monitor.MaxSwitchTracer(hub, self.max_blocking_time)
self.tracer.monitor_current_greenlet_blocking()
return self
def __exit__(self, t, v, tb):
self.tracer.kill()
hub = self.hub; self.hub = None
tracer = self.tracer; self.tracer = None
# Only check if there was no exception raised, we
# don't want to hide anything
if t is not None:
return
did_block = tracer.did_block_hub(hub)
if did_block:
active_greenlet = did_block[1]
report_lines = tracer.did_block_hub_report(hub, active_greenlet, {})
raise _FailedToSwitch('\n'.join(report_lines))
...@@ -50,7 +50,7 @@ class _AbstractTestPeriodicMonitoringThread(object): ...@@ -50,7 +50,7 @@ class _AbstractTestPeriodicMonitoringThread(object):
def tearDown(self): def tearDown(self):
monitor.start_new_thread = self._orig_start_new_thread monitor.start_new_thread = self._orig_start_new_thread
monitor.thread_sleep = self._orig_thread_sleep monitor.thread_sleep = self._orig_thread_sleep
prev = self.pmt.previous_trace_function prev = self.pmt._greenlet_tracer.previous_trace_function
self.pmt.kill() self.pmt.kill()
assert gettrace() is prev, (gettrace(), prev) assert gettrace() is prev, (gettrace(), prev)
settrace(None) settrace(None)
...@@ -62,7 +62,7 @@ class TestPeriodicMonitoringThread(_AbstractTestPeriodicMonitoringThread, ...@@ -62,7 +62,7 @@ class TestPeriodicMonitoringThread(_AbstractTestPeriodicMonitoringThread,
def test_constructor(self): def test_constructor(self):
self.assertEqual(0xDEADBEEF, self.pmt.monitor_thread_ident) self.assertEqual(0xDEADBEEF, self.pmt.monitor_thread_ident)
self.assertEqual(gettrace(), self.pmt.greenlet_trace) self.assertEqual(gettrace(), self.pmt._greenlet_tracer)
def test_hub_wref(self): def test_hub_wref(self):
self.assertIs(self.hub, self.pmt.hub) self.assertIs(self.hub, self.pmt.hub)
...@@ -178,31 +178,31 @@ class TestPeriodicMonitorBlocking(_AbstractTestPeriodicMonitoringThread, ...@@ -178,31 +178,31 @@ class TestPeriodicMonitorBlocking(_AbstractTestPeriodicMonitoringThread,
settrace(f) settrace(f)
self.pmt = monitor.PeriodicMonitoringThread(self.hub) self.pmt = monitor.PeriodicMonitoringThread(self.hub)
self.assertEqual(gettrace(), self.pmt.greenlet_trace) self.assertEqual(gettrace(), self.pmt._greenlet_tracer)
self.assertIs(self.pmt.previous_trace_function, f) self.assertIs(self.pmt._greenlet_tracer.previous_trace_function, f)
self.pmt.greenlet_trace('event', 'args') self.pmt._greenlet_tracer('event', 'args')
self.assertEqual([('event', 'args')], called) self.assertEqual([('event', 'args')], called)
def test_greenlet_trace(self): def test__greenlet_tracer(self):
self.assertEqual(0, self.pmt._greenlet_switch_counter) self.assertEqual(0, self.pmt._greenlet_tracer.greenlet_switch_counter)
# Unknown event still counts as a switch (should it?) # Unknown event still counts as a switch (should it?)
self.pmt.greenlet_trace('unknown', None) self.pmt._greenlet_tracer('unknown', None)
self.assertEqual(1, self.pmt._greenlet_switch_counter) self.assertEqual(1, self.pmt._greenlet_tracer.greenlet_switch_counter)
self.assertIsNone(self.pmt._active_greenlet) self.assertIsNone(self.pmt._greenlet_tracer.active_greenlet)
origin = object() origin = object()
target = object() target = object()
self.pmt.greenlet_trace('switch', (origin, target)) self.pmt._greenlet_tracer('switch', (origin, target))
self.assertEqual(2, self.pmt._greenlet_switch_counter) self.assertEqual(2, self.pmt._greenlet_tracer.greenlet_switch_counter)
self.assertIs(target, self.pmt._active_greenlet) self.assertIs(target, self.pmt._greenlet_tracer.active_greenlet)
# Unknown event removes active greenlet # Unknown event removes active greenlet
self.pmt.greenlet_trace('unknown', self) self.pmt._greenlet_tracer('unknown', self)
self.assertEqual(3, self.pmt._greenlet_switch_counter) self.assertEqual(3, self.pmt._greenlet_tracer.greenlet_switch_counter)
self.assertIsNone(self.pmt._active_greenlet) self.assertIsNone(self.pmt._greenlet_tracer.active_greenlet)
def test_monitor_blocking(self): def test_monitor_blocking(self):
# Initially there's no active greenlet and no switches, # Initially there's no active greenlet and no switches,
...@@ -218,7 +218,7 @@ class TestPeriodicMonitorBlocking(_AbstractTestPeriodicMonitoringThread, ...@@ -218,7 +218,7 @@ class TestPeriodicMonitorBlocking(_AbstractTestPeriodicMonitoringThread,
# Give it an active greenlet # Give it an active greenlet
origin = object() origin = object()
target = object() target = object()
self.pmt.greenlet_trace('switch', (origin, target)) self.pmt._greenlet_tracer('switch', (origin, target))
# We've switched, so we're not blocked # We've switched, so we're not blocked
self.assertFalse(self.pmt.monitor_blocking(self.hub)) self.assertFalse(self.pmt.monitor_blocking(self.hub))
......
...@@ -7,6 +7,7 @@ from __future__ import division ...@@ -7,6 +7,7 @@ from __future__ import division
from __future__ import print_function from __future__ import print_function
import gc import gc
import unittest
import greentest import greentest
...@@ -208,5 +209,64 @@ class TestTree(greentest.TestCase): ...@@ -208,5 +209,64 @@ class TestTree(greentest.TestCase):
self.assertEqual(expected, value) self.assertEqual(expected, value)
class TestAssertSwitches(unittest.TestCase):
def test_time_sleep(self):
# A real blocking function
from time import sleep
with self.assertRaises(util._FailedToSwitch):
with util.assert_switches():
sleep(0.001)
# Supply a max allowed and exceed it
with self.assertRaises(util._FailedToSwitch):
with util.assert_switches(0.001):
sleep(0.1)
# Stay within it, but don't switch to the hub
with self.assertRaises(util._FailedToSwitch):
with util.assert_switches(0.001, hub_only=True):
sleep(0)
# Stay within it, and we only watch for any switch
with util.assert_switches(0.001, hub_only=False):
sleep(0)
def test_no_switches_no_function(self):
# No blocking time given, no switch performed: exception
with self.assertRaises(util._FailedToSwitch):
with util.assert_switches():
pass
# blocking time given, for all greenlets, no switch performed: nothing
with util.assert_switches(max_blocking_time=1, hub_only=False):
pass
def test_exception_not_supressed(self):
with self.assertRaises(NameError):
with util.assert_switches():
raise NameError()
def test_nested(self):
from greenlet import gettrace
with util.assert_switches() as outer:
self.assertEqual(gettrace(), outer.tracer)
self.assertIsNotNone(outer.tracer.active_greenlet)
with util.assert_switches() as inner:
self.assertEqual(gettrace(), inner.tracer)
self.assertEqual(inner.tracer.previous_trace_function, outer.tracer)
inner.tracer('switch', (self, self))
self.assertIs(self, inner.tracer.active_greenlet)
self.assertIs(self, outer.tracer.active_greenlet)
self.assertEqual(gettrace(), outer.tracer)
if __name__ == '__main__': if __name__ == '__main__':
greentest.main() greentest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment