Commit 34d56d26 authored by Jason Madden's avatar Jason Madden

Move PeriodicMonitoringThread to its own file.

And give it unittest coverage. (Using it as an actual thread didn't
produce coverage metrics because we use 'greenlet' concurrency.)

Add a rudimentary scheduler so that functions that don't want to run
as often as the minimum stay (very roughly) on their period.
parent 6f9e51fe
This diff is collapsed.
......@@ -139,3 +139,10 @@ class readproperty(object):
return self
return self.func(inst)
def gmctime():
"""
Returns the current time as a string in RFC3339 format.
"""
import time
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
......@@ -14,7 +14,7 @@ from weakref import ref as wref
from greenlet import greenlet as RawGreenlet
from greenlet import getcurrent
from greenlet import GreenletExit
from greenlet import settrace
__all__ = [
......@@ -37,10 +37,11 @@ from gevent._compat import thread_mod_name
from gevent._util import _NONE
from gevent._util import readproperty
from gevent._util import Lazy
from gevent._util import gmctime
from gevent._ident import IdentRegistry
from gevent.monkey import get_original
from gevent.util import format_run_info
# These must be the "real" native thread versions,
......@@ -453,174 +454,6 @@ def _config(default, envvar):
hub_ident_registry = IdentRegistry()
def _gmctime():
import time
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
class PeriodicMonitoringThread(object):
# A counter, incremented by the greenlet trace function
# we install on every greenlet switch. This is reset when the
# periodic monitoring thread runs.
_greenlet_switch_counter = 0
# The greenlet being switched to.
_active_greenlet = None
# The trace function that was previously installed,
# if any.
previous_trace_function = None
# The absolute minimum we will sleep, regardless of
# what particular monitoring functions want to say.
min_sleep_time = 0.005
# A list of tuples: [(function(hub), period))]
_additional_monitoring_functions = ()
def __init__(self, hub):
self._hub_wref = wref(hub, self.kill)
self.should_run = True
# Must be installed in the thread that the hub is running in;
# the trace function is threadlocal
assert get_thread_ident() == hub.thread_ident
prev_trace = settrace(self.greenlet_trace)
self.previous_trace_function = prev_trace
# Create the actual monitoring thread. This is effectively a "daemon"
# thread.
self.monitor_thread_ident = get_original(thread_mod_name, 'start_new_thread')(self, ())
def greenlet_trace(self, event, args):
# This function runs in the thread we are monitoring.
self._greenlet_switch_counter += 1
if event in ('switch', 'throw'):
# args is (origin, target). This is the only defined
# case
self._active_greenlet = args[1]
else:
self._active_greenlet = None
if self.previous_trace_function is not None:
self.previous_trace_function(event, args)
def monitoring_functions(self):
# Return a list of tuples: [(function, period)]
# Calculate our hardcoded list every time so that changes to max_blocking_time can
# happen at runtime.
monitoring_functions = [(self.monitor_blocking, GEVENT_CONFIG.max_blocking_time)]
monitoring_functions.extend(self._additional_monitoring_functions)
return monitoring_functions
def add_monitoring_function(self, function, period):
self._additional_monitoring_functions += ((function, period),)
def calculate_sleep_time(self, monitoring_functions):
min_sleep = min(x[1] or 0 for x in monitoring_functions)
return max((min_sleep, self.min_sleep_time))
def kill(self):
# Stop this monitoring thread from running.
self.should_run = False
# Uninstall our tracing hook
settrace(self.previous_trace_function)
def __call__(self):
# The function that runs in the monitoring thread.
# We cannot use threading.current_thread because it would
# create an immortal DummyThread object.
getcurrent().gevent_monitoring_thread = wref(self)
thread_sleep = get_original('time', 'sleep')
try:
while self.should_run:
functions = self.monitoring_functions()
assert functions
sleep_time = self.calculate_sleep_time(functions)
thread_sleep(sleep_time)
# Make sure the hub is still around, and still active,
# and keep it around while we are here.
hub = self._hub_wref()
if not hub:
self.kill()
if self.should_run:
for f, period in functions:
if period:
f(hub)
except SystemExit:
pass
except: # pylint:disable=bare-except
# We're a daemon thread, so swallow any exceptions that get here
# during interpreter shutdown.
if not sys or not sys.stderr:
# Interpreter is shutting down
pass
else:
hub = self._hub_wref()
if hub is not None:
hub.handle_error(self, *sys.exc_info())
def monitor_blocking(self, hub):
# Called periodically to see if the trace function has
# fired to switch greenlets. If not, we will print
# the greenlet tree.
# There is a race condition with this being incremented in the
# thread we're monitoring, but probably not often enough to lead
# to annoying false positives.
did_switch = self._greenlet_switch_counter != 0
self._greenlet_switch_counter = 0
if did_switch or self._active_greenlet is None or isinstance(self._active_greenlet, Hub):
# Either we switched, or nothing is running (we got a
# trace event we don't know about or were requested to
# ignore), or we spent the whole time in the hub, blocked
# for IO. Nothing to report.
return
if (self._active_greenlet is getcurrent()
or getattr(self._active_greenlet, 'gevent_monitoring_thread', None)):
# Ourself or another monitoring thread for the same hub thread.
# This happens if multiple hubs are used in quick succession (probably only in tests)
# Ignore it. (XXX: Maybe we should kill() ourself?)
return
report = ['\n%s : Greenlet %s appears to be blocked' %
(_gmctime(), self._active_greenlet)]
report.append(" Reported by %s" % (self,))
try:
frame = sys._current_frames()[hub.thread_ident]
except KeyError:
# The thread holding the hub has died. Perhaps we shouldn't
# even report this?
stack = ["Unknown: No thread found for hub %r\n" % (hub,)]
else:
stack = traceback.format_stack(frame)
report.append('Blocked Stack (for thread id %s):' % (hex(hub.thread_ident),))
report.append(''.join(stack))
report.append("Info:")
report.extend(format_run_info(self.monitor_thread_ident))
hub.exception_stream.write('\n'.join(report))
def ignore_current_greenlet_blocking(self):
# Don't pay attention to the current greenlet.
self._active_greenlet = None
def monitor_current_greenlet_blocking(self):
self._active_greenlet = getcurrent()
def __repr__(self):
return '<%s at %s in thread %s greenlet %r for %r>' % (
self.__class__.__name__,
hex(id(self)),
hex(self.monitor_thread_ident),
getcurrent(),
self._hub_wref())
class Hub(TrackedRawGreenlet):
"""
A greenlet that runs the event loop.
......@@ -798,7 +631,7 @@ class Hub(TrackedRawGreenlet):
del tb
try:
errstream.write(_gmctime())
errstream.write(gmctime())
errstream.write(' ' if context is not None else '\n')
except: # pylint:disable=bare-except
# Possible not safe to import under certain
......@@ -917,6 +750,7 @@ class Hub(TrackedRawGreenlet):
# if hubs are started and stopped within the thread. This shows up
# in the threadpool tests. The monitoring threads will eventually notice their
# hub object is gone.
from gevent._monitor import PeriodicMonitoringThread
self.periodic_monitoring_thread = PeriodicMonitoringThread(self)
return self.periodic_monitoring_thread
......
......@@ -81,9 +81,11 @@ class wrap_errors(object):
def __getattr__(self, name):
return getattr(self.__func, name)
def format_run_info(_current_thread_ident=None):
def format_run_info(thread_stacks=True,
greenlet_stacks=True,
current_thread_ident=None):
"""
format_run_info() -> []
format_run_info(thread_stacks=True, greenlet_stacks=True) -> [str]
Request information about the running threads of the current process.
......@@ -107,11 +109,11 @@ def format_run_info(_current_thread_ident=None):
lines = []
_format_thread_info(lines, _current_thread_ident)
_format_greenlet_info(lines)
_format_thread_info(lines, thread_stacks, current_thread_ident)
_format_greenlet_info(lines, greenlet_stacks)
return lines
def _format_thread_info(lines, current_thread_ident):
def _format_thread_info(lines, thread_stacks, current_thread_ident):
import threading
import sys
......@@ -131,7 +133,10 @@ def _format_thread_info(lines, current_thread_ident):
if current_thread_ident == thread_ident:
name = '%s) (CURRENT' % (name,)
lines.append('Thread 0x%x (%s)\n' % (thread_ident, name))
lines.append(''.join(traceback.format_stack(frame, _STACK_LIMIT)))
if thread_stacks:
lines.append(''.join(traceback.format_stack(frame, _STACK_LIMIT)))
else:
lines.append('\t...stack elided...')
# We may have captured our own frame, creating a reference
# cycle, so clear it out.
......@@ -140,14 +145,14 @@ def _format_thread_info(lines, current_thread_ident):
del lines
del threads
def _format_greenlet_info(lines):
def _format_greenlet_info(lines, greenlet_stacks):
# Use the gc module to inspect all objects to find the greenlets
# since there isn't a global registry
lines.append('*' * 80)
lines.append('* Greenlets')
lines.append('*' * 80)
for tree in GreenletTree.forest():
lines.extend(tree.format_lines(details=True))
lines.extend(tree.format_lines(details={'running_stacks': greenlet_stacks}))
del lines
......@@ -266,6 +271,12 @@ class GreenletTree(object):
def __getattr__(self, name):
return getattr(self.greenlet, name)
DEFAULT_DETAILS = {
'running_stacks': True,
'spawning_stacks': True,
'locals': True,
}
def format_lines(self, details=True):
"""
Return a sequence of lines for the greenlet tree.
......@@ -277,7 +288,11 @@ class GreenletTree(object):
if not details:
details = {}
else:
details = {'stacks': True, 'locals': True}
details = self.DEFAULT_DETAILS.copy()
else:
params = details
details = self.DEFAULT_DETAILS.copy()
details.update(params)
tree = _TreeFormatter(details, depth=0)
lines = [l[0] if isinstance(l, tuple) else l
for l in self._render(tree)]
......@@ -333,12 +348,12 @@ class GreenletTree(object):
if getattr(self.greenlet, 'gevent_monitoring_thread', None) is not None:
tree.child_data('Monitoring Thread:' + repr(self.greenlet.gevent_monitoring_thread()))
if self.greenlet and tree.details and tree.details['stacks']:
if self.greenlet and tree.details and tree.details['running_stacks']:
self.__render_tb(tree, 'Running:', self.greenlet.gr_frame)
spawning_stack = getattr(self.greenlet, 'spawning_stack', None)
if spawning_stack and tree.details and tree.details['stacks']:
if spawning_stack and tree.details and tree.details['spawning_stacks']:
self.__render_tb(tree, 'Spawned at:', spawning_stack)
spawning_parent = self.__spawning_parent(self.greenlet)
......
# Copyright 2018 gevent contributors. See LICENSE for details.
import gc
import unittest
from greenlet import gettrace
from greenlet import settrace
from gevent.monkey import get_original
from gevent._compat import thread_mod_name
from gevent._compat import NativeStrIO
from gevent import _monitor as monitor
get_ident = get_original(thread_mod_name, 'get_ident')
class MockHub(object):
def __init__(self):
self.thread_ident = get_ident()
self.exception_stream = NativeStrIO()
self.dead = False
def __bool__(self):
return not self.dead
__nonzero__ = __bool__
def handle_error(self, *args): # pylint:disable=unused-argument
raise # pylint:disable=misplaced-bare-raise
class TestPeriodicMonitoringThread(unittest.TestCase):
def setUp(self):
self._orig_start_new_thread = monitor.start_new_thread
self._orig_thread_sleep = monitor.thread_sleep
monitor.thread_sleep = lambda _s: gc.collect() # For PyPy
monitor.start_new_thread = lambda _f, _a: 0xDEADBEEF
self.hub = MockHub()
self.pmt = monitor.PeriodicMonitoringThread(self.hub)
def tearDown(self):
monitor.start_new_thread = self._orig_start_new_thread
monitor.thread_sleep = self._orig_thread_sleep
prev = self.pmt.previous_trace_function
self.pmt.kill()
assert gettrace() is prev, (gettrace(), prev)
settrace(None)
def test_constructor(self):
self.assertEqual(0xDEADBEEF, self.pmt.monitor_thread_ident)
self.assertEqual(gettrace(), self.pmt.greenlet_trace)
def test_hub_wref(self):
self.assertIs(self.hub, self.pmt.hub)
del self.hub
import gc
gc.collect()
self.assertIsNone(self.pmt.hub)
# And it killed itself.
self.assertFalse(self.pmt.should_run)
self.assertIsNone(gettrace())
def test_previous_trace(self):
self.pmt.kill()
self.assertIsNone(gettrace())
called = []
def f(*args):
called.append(args)
settrace(f)
self.pmt = monitor.PeriodicMonitoringThread(self.hub)
self.assertEqual(gettrace(), self.pmt.greenlet_trace)
self.assertIs(self.pmt.previous_trace_function, f)
self.pmt.greenlet_trace('event', 'args')
self.assertEqual([('event', 'args')], called)
def test_greenlet_trace(self):
self.assertEqual(0, self.pmt._greenlet_switch_counter)
# Unknown event still counts as a switch (should it?)
self.pmt.greenlet_trace('unknown', None)
self.assertEqual(1, self.pmt._greenlet_switch_counter)
self.assertIsNone(self.pmt._active_greenlet)
origin = object()
target = object()
self.pmt.greenlet_trace('switch', (origin, target))
self.assertEqual(2, self.pmt._greenlet_switch_counter)
self.assertIs(target, self.pmt._active_greenlet)
# Unknown event removes active greenlet
self.pmt.greenlet_trace('unknown', self)
self.assertEqual(3, self.pmt._greenlet_switch_counter)
self.assertIsNone(self.pmt._active_greenlet)
def test_add_monitoring_function(self):
self.assertRaises(ValueError, self.pmt.add_monitoring_function, None, 1)
self.assertRaises(ValueError, self.pmt.add_monitoring_function, lambda: None, -1)
def f():
pass
# Add
self.pmt.add_monitoring_function(f, 1)
self.assertEqual(2, len(self.pmt.monitoring_functions()))
self.assertEqual(1, self.pmt.monitoring_functions()[1].period)
# Update
self.pmt.add_monitoring_function(f, 2)
self.assertEqual(2, len(self.pmt.monitoring_functions()))
self.assertEqual(2, self.pmt.monitoring_functions()[1].period)
# Remove
self.pmt.add_monitoring_function(f, None)
self.assertEqual(1, len(self.pmt.monitoring_functions()))
def test_calculate_sleep_time(self):
self.assertEqual(
self.pmt.monitoring_functions()[0].period,
self.pmt.calculate_sleep_time())
# Pretend that GEVENT_CONFIG.max_blocking_time was set to 0,
# to disable this monitor.
self.pmt._calculated_sleep_time = 0
self.assertEqual(
self.pmt.inactive_sleep_time,
self.pmt.calculate_sleep_time()
)
# Getting the list of monitoring functions will also
# do this, if it looks like it has changed
self.pmt.monitoring_functions()[0].period = -1
self.pmt._calculated_sleep_time = 0
self.pmt.monitoring_functions()
self.assertEqual(
self.pmt.monitoring_functions()[0].period,
self.pmt.calculate_sleep_time())
self.assertEqual(
self.pmt.monitoring_functions()[0].period,
self.pmt._calculated_sleep_time)
def test_monitor_blocking(self):
# Initially there's no active greenlet and no switches,
# so nothing is considered blocked
self.assertFalse(self.pmt.monitor_blocking(self.hub))
# Give it an active greenlet
origin = object()
target = object()
self.pmt.greenlet_trace('switch', (origin, target))
# We've switched, so we're not blocked
self.assertFalse(self.pmt.monitor_blocking(self.hub))
# Again without switching is a problem.
self.assertTrue(self.pmt.monitor_blocking(self.hub))
# But we can order it not to be a problem
self.pmt.ignore_current_greenlet_blocking()
self.assertFalse(self.pmt.monitor_blocking(self.hub))
# And back again
self.pmt.monitor_current_greenlet_blocking()
self.assertTrue(self.pmt.monitor_blocking(self.hub))
# A bad thread_ident in the hub doesn't mess things up
self.hub.thread_ident = -1
self.assertTrue(self.pmt.monitor_blocking(self.hub))
def test_call_destroyed_hub(self):
# Add a function that destroys the hub so we break out (eventually)
# This clears the wref, which eventually calls kill()
def f(_hub):
_hub = None
self.hub = None
gc.collect()
self.pmt.add_monitoring_function(f, 0.1)
self.pmt()
self.assertFalse(self.pmt.should_run)
def test_call_dead_hub(self):
# Add a function that makes the hub go false (e.g., it quit)
# This causes the function to kill itself.
def f(hub):
hub.dead = True
self.pmt.add_monitoring_function(f, 0.1)
self.pmt()
self.assertFalse(self.pmt.should_run)
def test_call_SystemExit(self):
# breaks the loop
def f(_hub):
raise SystemExit()
self.pmt.add_monitoring_function(f, 0.1)
self.pmt()
def test_call_other_error(self):
class MyException(Exception):
pass
def f(_hub):
raise MyException()
self.pmt.add_monitoring_function(f, 0.1)
with self.assertRaises(MyException):
self.pmt()
if __name__ == '__main__':
unittest.main()
......@@ -159,7 +159,7 @@ class TestPeriodicMonitoringThread(greentest.TestCase):
def test_config(self):
self.assertEqual(0.1, gevent.config.max_blocking_time)
def _run_monitoring_threads(self, monitor):
def _run_monitoring_threads(self, monitor, kill=True):
self.assertTrue(monitor.should_run)
from threading import Condition
cond = Condition()
......@@ -167,20 +167,18 @@ class TestPeriodicMonitoringThread(greentest.TestCase):
def monitor_cond(_hub):
cond.acquire()
# Only run once. Especially helpful on PyPy, where
# formatting stacks is expensive.
monitor.kill()
cond.notifyAll()
cond.release()
if kill:
# Only run once. Especially helpful on PyPy, where
# formatting stacks is expensive.
monitor.kill()
before_funs = monitor._additional_monitoring_functions
monitor.add_monitoring_function(
monitor_cond,
0.01)
monitor.add_monitoring_function(monitor_cond, 0.01)
cond.wait()
cond.release()
monitor._additional_monitoring_functions = before_funs
monitor.add_monitoring_function(monitor_cond, None)
@greentest.ignores_leakcheck
def test_kill_removes_trace(self):
......@@ -197,10 +195,12 @@ class TestPeriodicMonitoringThread(greentest.TestCase):
stream = hub.exception_stream = NativeStrIO()
monitor = hub.start_periodic_monitoring_thread()
self.assertIsNotNone(monitor)
before_funs = monitor._additional_monitoring_functions
self.assertEqual(1, len(monitor.monitoring_functions()))
monitor.add_monitoring_function(self._monitor, 0.1)
self.assertIn((self._monitor, 0.1), monitor.monitoring_functions())
self.assertEqual(2, len(monitor.monitoring_functions()))
self.assertEqual(self._monitor, monitor.monitoring_functions()[1].function)
self.assertEqual(0.1, monitor.monitoring_functions()[1].period)
# We must make sure we have switched greenlets at least once,
# otherwise we can't detect a failure.
......@@ -210,7 +210,8 @@ class TestPeriodicMonitoringThread(greentest.TestCase):
time.sleep(0.3) # Thrice the default
self._run_monitoring_threads(monitor)
finally:
monitor._additional_monitoring_functions = before_funs
monitor.add_monitoring_function(self._monitor, None)
self.assertEqual(1, len(monitor._monitoring_functions))
assert hub.exception_stream is stream
monitor.kill()
del hub.exception_stream
......@@ -295,7 +296,7 @@ class TestPeriodicMonitoringThread(greentest.TestCase):
task = threadpool.spawn(task)
# Now wait until the monitoring threads have run.
self._run_monitoring_threads(worker_monitor)
self._run_monitoring_threads(worker_monitor, kill=False)
# and be sure the task ran
task.get()
worker_monitor.kill()
......
......@@ -113,7 +113,8 @@ class TestTree(greentest.TestCase):
s4.join()
tree = s4.value
return tree, str(tree), tree.format(details={'stacks': False})
return tree, str(tree), tree.format(details={'running_stacks': False,
'spawning_stacks': False})
@greentest.ignores_leakcheck
def test_tree(self):
......
test___monkey_patching.py
test__monkey_ssl_warning.py
test___monitor.py
......@@ -15,3 +15,5 @@ test__socket_timeout.py
test__examples.py
test__issue330.py
test___ident.py
test___config.py
test___monitor.py
......@@ -132,3 +132,4 @@ test__issue639.py
test__issue_728.py
test__refcount_core.py
test__api.py
test__monitor.py
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment