Commit 3b12c0e8 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1184 from gevent/issue1182

Add gevent.util.assert_switches 
parents f187fd05 5f33d756
......@@ -26,6 +26,9 @@
- The long-deprecated and undocumented module ``gevent.wsgi`` was removed.
- Add `gevent.util.assert_switches` to build on the monitoring
functions. Fixes :issue:`1182`.
1.3b1 (2018-04-13)
==================
......
......@@ -27,6 +27,9 @@ interval. When such a blocking greenlet is detected, it will print
:attr:`~gevent.hub.Hub.exception_stream`. It will also emit the
:class:`gevent.events.EventLoopBlocked` event.
.. seealso:: :func:`gevent.util.assert_switches`
For a scoped version of this.
Memory Usage
------------
......
......@@ -50,6 +50,149 @@ except ImportError:
class MonitorWarning(RuntimeWarning):
"""The type of warnings we emit."""
class GreenletTracer(object):
# A counter, incremented by the greenlet trace function
# we install on every greenlet switch. This is reset when the
# periodic monitoring thread runs.
greenlet_switch_counter = 0
# The greenlet last switched to.
active_greenlet = None
# The trace function that was previously installed,
# if any.
previous_trace_function = None
def __init__(self):
prev_trace = settrace(self)
self.previous_trace_function = prev_trace
def kill(self): # pylint:disable=method-hidden
# Must be called in the monitored thread.
settrace(self.previous_trace_function)
self.previous_trace_function = None
# Become a no-op
self.kill = lambda: None
def __call__(self, event, args):
# This function runs in the thread we are monitoring.
self.greenlet_switch_counter += 1
if event in ('switch', 'throw'):
# args is (origin, target). This is the only defined
# case
self.active_greenlet = args[1]
else:
self.active_greenlet = None
if self.previous_trace_function is not None:
self.previous_trace_function(event, args)
def did_block_hub(self, hub):
# Check to see if we have blocked since the last call to this
# method. Returns a true value if we blocked (not in the hub),
# a false value if everything is fine.
# This may be called in the same thread being traced or a
# different thread; if a different thread, there is a race
# condition with this being incremented in the thread we're
# monitoring, but probably not often enough to lead to
# annoying false positives.
active_greenlet = self.active_greenlet
did_switch = self.greenlet_switch_counter != 0
self.greenlet_switch_counter = 0
if did_switch or active_greenlet is None or active_greenlet is hub:
# Either we switched, or nothing is running (we got a
# trace event we don't know about or were requested to
# ignore), or we spent the whole time in the hub, blocked
# for IO. Nothing to report.
return False
return True, active_greenlet
def ignore_current_greenlet_blocking(self):
# Don't pay attention to the current greenlet.
self.active_greenlet = None
def monitor_current_greenlet_blocking(self):
self.active_greenlet = getcurrent()
def did_block_hub_report(self, hub, active_greenlet, format_kwargs):
report = ['=' * 80,
'\n%s : Greenlet %s appears to be blocked' %
(gmctime(), active_greenlet)]
report.append(" Reported by %s" % (self,))
try:
frame = sys._current_frames()[hub.thread_ident]
except KeyError:
# The thread holding the hub has died. Perhaps we shouldn't
# even report this?
stack = ["Unknown: No thread found for hub %r\n" % (hub,)]
else:
stack = traceback.format_stack(frame)
report.append('Blocked Stack (for thread id %s):' % (hex(hub.thread_ident),))
report.append(''.join(stack))
report.append("Info:")
report.extend(format_run_info(**format_kwargs))
return report
class _HubTracer(GreenletTracer):
def __init__(self, hub, max_blocking_time):
GreenletTracer.__init__(self)
self.max_blocking_time = max_blocking_time
self.hub = hub
def kill(self): # pylint:disable=method-hidden
self.hub = None
GreenletTracer.kill(self)
class HubSwitchTracer(_HubTracer):
# A greenlet tracer that records the last time we switched *into* the hub.
last_entered_hub = 0
def __call__(self, event, args):
GreenletTracer.__call__(self, event, args)
if self.active_greenlet is self.hub:
self.last_entered_hub = perf_counter()
def did_block_hub(self, hub):
if perf_counter() - self.last_entered_hub > self.max_blocking_time:
return True, self.active_greenlet
class MaxSwitchTracer(_HubTracer):
# A greenlet tracer that records the maximum time between switches,
# not including time spent in the hub.
max_blocking = 0
def __init__(self, hub, max_blocking_time):
_HubTracer.__init__(self, hub, max_blocking_time)
self.last_switch = perf_counter()
def __call__(self, event, args):
old_active = self.active_greenlet
GreenletTracer.__call__(self, event, args)
if old_active is not self.hub and old_active is not None:
# If we're switching out of the hub, the blocking
# time doesn't count.
switched_at = perf_counter()
self.max_blocking = max(self.max_blocking,
switched_at - self.last_switch)
def did_block_hub(self, hub):
if self.max_blocking == 0:
# We never switched. Check the time now
self.max_blocking = perf_counter() - self.last_switch
if self.max_blocking > self.max_blocking_time:
return True, self.active_greenlet
class _MonitorEntry(object):
__slots__ = ('function', 'period', 'last_run_time')
......@@ -65,25 +208,16 @@ class _MonitorEntry(object):
def __repr__(self):
return repr((self.function, self.period, self.last_run_time))
@implementer(IPeriodicMonitorThread)
class PeriodicMonitoringThread(object):
# This doesn't extend threading.Thread because that gets monkey-patched.
# We use the low-level 'start_new_thread' primitive instead.
# The amount of seconds we will sleep when we think we have nothing
# to do.
inactive_sleep_time = 2.0
# A counter, incremented by the greenlet trace function
# we install on every greenlet switch. This is reset when the
# periodic monitoring thread runs.
_greenlet_switch_counter = 0
# The greenlet being switched to.
_active_greenlet = None
# The trace function that was previously installed,
# if any.
previous_trace_function = None
# The absolute minimum we will sleep, regardless of
# what particular monitoring functions want to say.
min_sleep_time = 0.005
......@@ -104,6 +238,9 @@ class PeriodicMonitoringThread(object):
# to 0 when we go back below.
_memory_exceeded = 0
# The instance of GreenletTracer we're using
_greenlet_tracer = None
def __init__(self, hub):
self._hub_wref = wref(hub, self._on_hub_gc)
self.should_run = True
......@@ -111,8 +248,7 @@ class PeriodicMonitoringThread(object):
# Must be installed in the thread that the hub is running in;
# the trace function is threadlocal
assert get_thread_ident() == hub.thread_ident
prev_trace = settrace(self.greenlet_trace)
self.previous_trace_function = prev_trace
self._greenlet_tracer = GreenletTracer()
self._monitoring_functions = [_MonitorEntry(self.monitor_blocking,
GEVENT_CONFIG.max_blocking_time)]
......@@ -125,17 +261,6 @@ class PeriodicMonitoringThread(object):
def hub(self):
return self._hub_wref()
def greenlet_trace(self, event, args):
# This function runs in the thread we are monitoring.
self._greenlet_switch_counter += 1
if event in ('switch', 'throw'):
# args is (origin, target). This is the only defined
# case
self._active_greenlet = args[1]
else:
self._active_greenlet = None
if self.previous_trace_function is not None:
self.previous_trace_function(event, args)
def monitoring_functions(self):
# Return a list of _MonitorEntry objects
......@@ -186,8 +311,7 @@ class PeriodicMonitoringThread(object):
# Stop this monitoring thread from running.
self.should_run = False
# Uninstall our tracing hook
settrace(self.previous_trace_function)
self.previous_trace_function = None
self._greenlet_tracer.kill()
def _on_hub_gc(self, _):
self.kill()
......@@ -246,38 +370,15 @@ class PeriodicMonitoringThread(object):
# For tests, we return a true value when we think we found something
# blocking
# There is a race condition with this being incremented in the
# thread we're monitoring, but probably not often enough to lead
# to annoying false positives.
active_greenlet = self._active_greenlet
did_switch = self._greenlet_switch_counter != 0
self._greenlet_switch_counter = 0
if did_switch or active_greenlet is None or active_greenlet is hub:
# Either we switched, or nothing is running (we got a
# trace event we don't know about or were requested to
# ignore), or we spent the whole time in the hub, blocked
# for IO. Nothing to report.
did_block = self._greenlet_tracer.did_block_hub(hub)
if not did_block:
return
report = ['=' * 80,
'\n%s : Greenlet %s appears to be blocked' %
(gmctime(), active_greenlet)]
report.append(" Reported by %s" % (self,))
try:
frame = sys._current_frames()[hub.thread_ident]
except KeyError:
# The thread holding the hub has died. Perhaps we shouldn't
# even report this?
stack = ["Unknown: No thread found for hub %r\n" % (hub,)]
else:
stack = traceback.format_stack(frame)
report.append('Blocked Stack (for thread id %s):' % (hex(hub.thread_ident),))
report.append(''.join(stack))
report.append("Info:")
report.extend(format_run_info(greenlet_stacks=False,
current_thread_ident=self.monitor_thread_ident))
report.append(report[0])
active_greenlet = did_block[1]
report = self._greenlet_tracer.did_block_hub_report(
hub, active_greenlet,
dict(greenlet_stacks=False, current_thread_ident=self.monitor_thread_ident))
stream = hub.exception_stream
for line in report:
# Printing line by line may interleave with other things,
......@@ -289,11 +390,10 @@ class PeriodicMonitoringThread(object):
return (active_greenlet, report)
def ignore_current_greenlet_blocking(self):
# Don't pay attention to the current greenlet.
self._active_greenlet = None
self._greenlet_tracer.ignore_current_greenlet_blocking()
def monitor_current_greenlet_blocking(self):
self._active_greenlet = getcurrent()
self._greenlet_tracer.monitor_current_greenlet_blocking()
def can_monitor_memory_usage(self):
return Process is not None
......
......@@ -23,6 +23,7 @@ __all__ = [
'print_run_info',
'GreenletTree',
'wrap_errors',
'assert_switches',
]
# PyPy is very slow at formatting stacks
......@@ -507,3 +508,85 @@ class GreenletTree(object):
Returns the `GreenletTree` for the current thread.
"""
return cls._forest()[1]
class _FailedToSwitch(AssertionError):
pass
class assert_switches(object):
"""
A context manager for ensuring a block of code switches greenlets.
This performs a similar function as the :doc:`monitoring thread
</monitoring>`, but the scope is limited to the body of the with
statement. If the code within the body doesn't yield to the hub
(and doesn't raise an exception), then upon exiting the
context manager an :exc:`AssertionError` will be raised.
This is useful in unit tests and for debugging purposes.
:keyword float max_blocking_time: If given, the body is allowed
to block for up to this many fractional seconds before
an error is raised.
:keyword bool hub_only: If True, then *max_blocking_time* only
refers to the amount of time spent between switches into the
hub. If False, then it refers to the maximum time between
*any* switches. If *max_blocking_time* is not given, has no
effect.
Example::
# This will always raise an exception: nothing switched
with assert_switches():
pass
# This will never raise an exception; nothing switched,
# but it happened very fast
with assert_switches(max_blocking_time=1.0):
pass
.. versionadded:: 1.3
"""
hub = None
tracer = None
def __init__(self, max_blocking_time=None, hub_only=False):
self.max_blocking_time = max_blocking_time
self.hub_only = hub_only
def __enter__(self):
from gevent import get_hub
from gevent import _monitor
self.hub = hub = get_hub()
# TODO: We could optimize this to use the GreenletTracer
# installed by the monitoring thread, if there is one.
# As it is, we will chain trace calls back to it.
if not self.max_blocking_time:
self.tracer = _monitor.GreenletTracer()
elif self.hub_only:
self.tracer = _monitor.HubSwitchTracer(hub, self.max_blocking_time)
else:
self.tracer = _monitor.MaxSwitchTracer(hub, self.max_blocking_time)
self.tracer.monitor_current_greenlet_blocking()
return self
def __exit__(self, t, v, tb):
self.tracer.kill()
hub = self.hub; self.hub = None
tracer = self.tracer; self.tracer = None
# Only check if there was no exception raised, we
# don't want to hide anything
if t is not None:
return
did_block = tracer.did_block_hub(hub)
if did_block:
active_greenlet = did_block[1]
report_lines = tracer.did_block_hub_report(hub, active_greenlet, {})
raise _FailedToSwitch('\n'.join(report_lines))
......@@ -50,7 +50,7 @@ class _AbstractTestPeriodicMonitoringThread(object):
def tearDown(self):
monitor.start_new_thread = self._orig_start_new_thread
monitor.thread_sleep = self._orig_thread_sleep
prev = self.pmt.previous_trace_function
prev = self.pmt._greenlet_tracer.previous_trace_function
self.pmt.kill()
assert gettrace() is prev, (gettrace(), prev)
settrace(None)
......@@ -62,7 +62,7 @@ class TestPeriodicMonitoringThread(_AbstractTestPeriodicMonitoringThread,
def test_constructor(self):
self.assertEqual(0xDEADBEEF, self.pmt.monitor_thread_ident)
self.assertEqual(gettrace(), self.pmt.greenlet_trace)
self.assertEqual(gettrace(), self.pmt._greenlet_tracer)
def test_hub_wref(self):
self.assertIs(self.hub, self.pmt.hub)
......@@ -178,31 +178,31 @@ class TestPeriodicMonitorBlocking(_AbstractTestPeriodicMonitoringThread,
settrace(f)
self.pmt = monitor.PeriodicMonitoringThread(self.hub)
self.assertEqual(gettrace(), self.pmt.greenlet_trace)
self.assertIs(self.pmt.previous_trace_function, f)
self.assertEqual(gettrace(), self.pmt._greenlet_tracer)
self.assertIs(self.pmt._greenlet_tracer.previous_trace_function, f)
self.pmt.greenlet_trace('event', 'args')
self.pmt._greenlet_tracer('event', 'args')
self.assertEqual([('event', 'args')], called)
def test_greenlet_trace(self):
self.assertEqual(0, self.pmt._greenlet_switch_counter)
def test__greenlet_tracer(self):
self.assertEqual(0, self.pmt._greenlet_tracer.greenlet_switch_counter)
# Unknown event still counts as a switch (should it?)
self.pmt.greenlet_trace('unknown', None)
self.assertEqual(1, self.pmt._greenlet_switch_counter)
self.assertIsNone(self.pmt._active_greenlet)
self.pmt._greenlet_tracer('unknown', None)
self.assertEqual(1, self.pmt._greenlet_tracer.greenlet_switch_counter)
self.assertIsNone(self.pmt._greenlet_tracer.active_greenlet)
origin = object()
target = object()
self.pmt.greenlet_trace('switch', (origin, target))
self.assertEqual(2, self.pmt._greenlet_switch_counter)
self.assertIs(target, self.pmt._active_greenlet)
self.pmt._greenlet_tracer('switch', (origin, target))
self.assertEqual(2, self.pmt._greenlet_tracer.greenlet_switch_counter)
self.assertIs(target, self.pmt._greenlet_tracer.active_greenlet)
# Unknown event removes active greenlet
self.pmt.greenlet_trace('unknown', self)
self.assertEqual(3, self.pmt._greenlet_switch_counter)
self.assertIsNone(self.pmt._active_greenlet)
self.pmt._greenlet_tracer('unknown', self)
self.assertEqual(3, self.pmt._greenlet_tracer.greenlet_switch_counter)
self.assertIsNone(self.pmt._greenlet_tracer.active_greenlet)
def test_monitor_blocking(self):
# Initially there's no active greenlet and no switches,
......@@ -218,7 +218,7 @@ class TestPeriodicMonitorBlocking(_AbstractTestPeriodicMonitoringThread,
# Give it an active greenlet
origin = object()
target = object()
self.pmt.greenlet_trace('switch', (origin, target))
self.pmt._greenlet_tracer('switch', (origin, target))
# We've switched, so we're not blocked
self.assertFalse(self.pmt.monitor_blocking(self.hub))
......
......@@ -7,6 +7,7 @@ from __future__ import division
from __future__ import print_function
import gc
import unittest
import greentest
......@@ -208,5 +209,64 @@ class TestTree(greentest.TestCase):
self.assertEqual(expected, value)
class TestAssertSwitches(unittest.TestCase):
def test_time_sleep(self):
# A real blocking function
from time import sleep
with self.assertRaises(util._FailedToSwitch):
with util.assert_switches():
sleep(0.001)
# Supply a max allowed and exceed it
with self.assertRaises(util._FailedToSwitch):
with util.assert_switches(0.001):
sleep(0.1)
# Stay within it, but don't switch to the hub
with self.assertRaises(util._FailedToSwitch):
with util.assert_switches(0.001, hub_only=True):
sleep(0)
# Stay within it, and we only watch for any switch
with util.assert_switches(0.001, hub_only=False):
sleep(0)
def test_no_switches_no_function(self):
# No blocking time given, no switch performed: exception
with self.assertRaises(util._FailedToSwitch):
with util.assert_switches():
pass
# blocking time given, for all greenlets, no switch performed: nothing
with util.assert_switches(max_blocking_time=1, hub_only=False):
pass
def test_exception_not_supressed(self):
with self.assertRaises(NameError):
with util.assert_switches():
raise NameError()
def test_nested(self):
from greenlet import gettrace
with util.assert_switches() as outer:
self.assertEqual(gettrace(), outer.tracer)
self.assertIsNotNone(outer.tracer.active_greenlet)
with util.assert_switches() as inner:
self.assertEqual(gettrace(), inner.tracer)
self.assertEqual(inner.tracer.previous_trace_function, outer.tracer)
inner.tracer('switch', (self, self))
self.assertIs(self, inner.tracer.active_greenlet)
self.assertIs(self, outer.tracer.active_greenlet)
self.assertEqual(gettrace(), outer.tracer)
if __name__ == '__main__':
greentest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment