Commit 9e8efe72 authored by Jason Madden

Add support for a background monitoring thread to be associated with each hub.

Right now, it is used to detect blocked event loops, and it is extensible by users. In the future there will be some more default monitoring options (e.g., memory).

Refs #1021.
parent 61a5f2ef
......@@ -23,6 +23,21 @@
- Add additional optimizations for spawning greenlets, making it
faster than 1.3a2.
- Add an optional monitoring thread for each hub. When enabled, this
thread (by default) looks for greenlets that block the event loop
for more than 0.1s. You can add your own periodic monitoring
functions to this thread.
- When gevent prints a timestamp as part of an error message, it is
now in UTC format as specified by RFC3339.
- Threadpool threads that exit now always destroy their hub (if one
was created). This prevents some forms of resource leaks (notably
visible as blocking functions reported by the new monitoring abilities).
- Hub objects now include the value of their ``name`` attribute in
their repr.
1.3a2 (2018-03-06)
==================
......
......@@ -23,6 +23,7 @@ if PY3:
integer_types = (int,)
text_type = str
native_path_types = (str, bytes)
thread_mod_name = '_thread'
else:
import __builtin__ # pylint:disable=import-error
......@@ -30,6 +31,7 @@ else:
text_type = __builtin__.unicode
integer_types = (int, __builtin__.long)
native_path_types = string_types
thread_mod_name = 'thread'
## Exceptions
......
......@@ -265,6 +265,30 @@ class ImportableSetting(object):
return value
return self._import([self.shortname_map.get(x, x) for x in value])
class BoolSettingMixin(object):
    # Mixin for boolean-valued settings: conversion and validation are
    # both delegated to module-level helpers.

    # Don't do string-to-list conversion.
    _convert = staticmethod(convert_str_value_as_is)

    # Values are checked by ``validate_bool``.
    validate = staticmethod(validate_bool)
class IntSettingMixin(object):
    # Mixin for integer-valued settings.

    # No validation beyond what ``validate_anything`` performs.
    validate = staticmethod(validate_anything)

    def _convert(self, value):
        # Don't do string-to-list conversion. A falsy value (None, '', 0,
        # False) produces None; anything else is coerced with int().
        return int(value) if value else None
class FloatSettingMixin(object):
    # Mixin for settings holding a float that must be greater than zero,
    # or None.

    def _convert(self, value):
        # A falsy value (None, '', 0) produces None; anything else is
        # coerced with float().
        return float(value) if value else None

    def validate(self, value):
        # None is accepted as-is; any other value must not be <= 0.
        non_positive = value is not None and value <= 0
        if non_positive:
            raise ValueError("Must be > 0")
        return value
class Resolver(ImportableSetting, Setting):
......@@ -364,7 +388,7 @@ class FileObject(ImportableSetting, Setting):
}
class WatchChildren(Setting):
class WatchChildren(BoolSettingMixin, Setting):
desc = """\
Should we *not* watch children with the event loop watchers?
......@@ -376,14 +400,11 @@ class WatchChildren(Setting):
environment_key = 'GEVENT_NOWAITPID'
default = False
validate = staticmethod(validate_bool)
class TraceMalloc(Setting):
class TraceMalloc(IntSettingMixin, Setting):
name = 'trace_malloc'
environment_key = 'PYTHONTRACEMALLOC'
default = False
validate = staticmethod(validate_bool)
desc = """\
Should FFI objects track their allocation?
......@@ -399,15 +420,11 @@ class TraceMalloc(Setting):
"""
class TrackGreenletTree(Setting):
class TrackGreenletTree(BoolSettingMixin, Setting):
name = 'track_greenlet_tree'
environment_key = 'GEVENT_TRACK_GREENLET_TREE'
default = True
validate = staticmethod(validate_bool)
# Don't do string-to-list conversion.
_convert = staticmethod(convert_str_value_as_is)
desc = """\
Should `Greenlet` objects track their spawning tree?
......@@ -419,6 +436,57 @@ class TrackGreenletTree(Setting):
.. versionadded:: 1.3b1
"""
class MonitorThread(BoolSettingMixin, Setting):
# Boolean setting (conversion/validation come from BoolSettingMixin) named
# ``monitor_thread``. ``environment_key`` names the associated environment
# variable; the default is False, i.e. no monitoring thread.
# NOTE(review): how ``Setting`` consumes ``environment_key`` and ``desc`` is
# outside this view -- confirm against the base class.
name = 'monitor_thread'
environment_key = 'GEVENT_ENABLE_MONITOR_THREAD'
default = False
desc = """\
Should each hub start a native OS thread to monitor
for problems?
Such a thread will periodically check to see if the event loop
is blocked for longer than `max_blocking_time`, producing output on
the hub's exception stream (stderr by default) if it detects this condition.
If this setting is true, then this thread will be created
the first time the hub is switched to,
or you can call `gevent.hub.Hub.start_periodic_monitoring_thread` at any
time to create it (from the same thread that will run the hub). That function
will return an object with a method ``add_monitoring_function(function, period)``
that you can call to add your own periodic monitoring function. ``function``
will be called with one argument, the hub it is monitoring. It will be called
in a separate native thread than the one running the hub and **must not**
attempt to use the gevent asynchronous API.
.. seealso:: `max_blocking_time`
.. versionadded:: 1.3b1
"""
class MaxBlockingTime(FloatSettingMixin, Setting):
# Float setting named ``max_blocking_time`` with a default of 0.1 (seconds,
# per ``desc``). FloatSettingMixin supplies conversion via float() and a
# validator that rejects values <= 0 with ValueError.
# ``environment_key`` names the associated environment variable.
name = 'max_blocking_time'
environment_key = 'GEVENT_MAX_BLOCKING_TIME'
default = 0.1
desc = """\
If the `monitor_thread` is enabled, this is
approximately how long (in seconds)
the event loop will be allowed to block before a warning is issued.
This function depends on using `greenlet.settrace`, so installing
your own trace function after starting the monitoring thread will
cause this feature to misbehave unless you call the function
returned by `greenlet.settrace`. If you install a tracing function *before*
the monitoring thread is started, it will still be called.
.. note:: In the unlikely event of creating and using multiple different
gevent hubs in the same native thread in a short period of time,
especially without destroying the hubs, false positives may be reported.
.. versionadded:: 1.3b1
"""
# The ares settings are all interpreted by
# gevent/resolver/ares.pyx, so we don't do
# any validation here.
......@@ -540,7 +608,7 @@ class ResolverNameservers(AresSettingMixin, Setting):
return 'servers'
# Generic timeout, works for dnspython and ares
class ResolverTimeout(AresSettingMixin, Setting):
class ResolverTimeout(FloatSettingMixin, AresSettingMixin, Setting):
document = True
name = 'resolver_timeout'
environment_key = 'GEVENT_RESOLVER_TIMEOUT'
......@@ -552,10 +620,6 @@ class ResolverTimeout(AresSettingMixin, Setting):
.. versionadded:: 1.3a2
"""
def _convert(self, value):
if value:
return float(value)
@property
def kwarg_name(self):
return 'timeout'
......
......@@ -83,7 +83,7 @@ class Condition(object):
self.__waiters.append(waiter)
saved_state = self._release_save()
try: # restore state no matter what (e.g., KeyboardInterrupt)
waiter.acquire()
waiter.acquire() # Block on the native lock
finally:
self._acquire_restore(saved_state)
......
This diff is collapsed.
......@@ -197,13 +197,18 @@ class ThreadPool(GroupMappingMixin):
with _lock:
self._size -= 1
_destroy_worker_hub = False
# XXX: This used to be false by default. It really seems like
# it should be true to avoid leaking resources.
_destroy_worker_hub = True
def _worker(self):
# pylint:disable=too-many-branches
need_decrease = True
try:
while 1: # tiny bit faster than True on Py2
h = _get_hub()
if h is not None:
h.name = 'ThreadPool Worker Hub'
task_queue = self.task_queue
task = task_queue.get()
try:
......
......@@ -13,7 +13,6 @@ import traceback
from greenlet import getcurrent
from greenlet import greenlet as RawGreenlet
from gevent.local import all_local_dicts_for_greenlet
__all__ = [
'wrap_errors',
......@@ -76,8 +75,10 @@ class wrap_errors(object):
def __getattr__(self, name):
return getattr(self.__func, name)
def format_run_info():
def format_run_info(_current_thread_ident=None):
"""
format_run_info() -> []
Request information about the running threads of the current process.
This is a debugging utility. Its output has no guarantees other than being
......@@ -85,7 +86,10 @@ def format_run_info():
:return: A sequence of text lines detailing the stacks of running
threads and greenlets. (One greenlet will duplicate one thread,
the current thread and greenlet.) Extra information about
the current thread and greenlet. If there are multiple running threads,
the stack for the current greenlet may be incorrectly duplicated in multiple
greenlets.)
Extra information about
:class:`gevent.greenlet.Greenlet` object will also be returned.
.. versionadded:: 1.3a1
......@@ -97,24 +101,30 @@ def format_run_info():
lines = []
_format_thread_info(lines)
_format_thread_info(lines, _current_thread_ident)
_format_greenlet_info(lines)
return lines
def _format_thread_info(lines):
def _format_thread_info(lines, current_thread_ident):
import threading
import sys
threads = {th.ident: th.name for th in threading.enumerate()}
threads = {th.ident: th for th in threading.enumerate()}
lines.append('*' * 80)
lines.append('* Threads')
thread = None
frame = None
for thread, frame in sys._current_frames().items():
for thread_ident, frame in sys._current_frames().items():
lines.append("*" * 80)
lines.append('Thread 0x%x (%s)\n' % (thread, threads.get(thread)))
thread = threads.get(thread_ident)
name = thread.name if thread else None
if getattr(thread, 'gevent_monitoring_thread', None):
name = repr(thread.gevent_monitoring_thread())
if current_thread_ident == thread_ident:
name = '%s) (CURRENT' % (name,)
lines.append('Thread 0x%x (%s)\n' % (thread_ident, name))
lines.append(''.join(traceback.format_stack(frame)))
# We may have captured our own frame, creating a reference
......@@ -288,6 +298,9 @@ class GreenletTree(object):
return (getattr(greenlet, 'spawning_greenlet', None) or _noop)()
def __render_locals(self, tree):
# Defer the import to avoid cycles
from gevent.local import all_local_dicts_for_greenlet
gr_locals = all_local_dicts_for_greenlet(self.greenlet)
if gr_locals:
tree.child_data("Greenlet Locals:")
......@@ -309,9 +322,11 @@ class GreenletTree(object):
label += '; not running'
tree.node_label(label)
if self.greenlet.parent is not None:
tree.child_data('Parent: ' + repr(self.greenlet.parent))
if getattr(self.greenlet, 'gevent_monitoring_thread', None) is not None:
tree.child_data('Monitoring Thread:' + repr(self.greenlet.gevent_monitoring_thread()))
if self.greenlet and tree.details and tree.details['stacks']:
self.__render_tb(tree, 'Running:', self.greenlet.gr_frame)
......
......@@ -26,6 +26,7 @@ import re
import gevent
from gevent import socket
from gevent.hub import Waiter, get_hub
from gevent._compat import PYPY
DELAY = 0.1
......@@ -115,5 +116,57 @@ class TestWaiter(greentest.TestCase):
g.kill()
class TestPeriodicMonitoringThread(greentest.TestCase):
    # Exercises the hub's periodic monitoring thread: the config default
    # for ``max_blocking_time`` and the report written to the hub's
    # exception stream when the event loop is blocked.

    def setUp(self):
        super(TestPeriodicMonitoringThread, self).setUp()
        # Remember the configured value so tearDown can restore it, then
        # force monitoring on for the duration of the test.
        self.monitor_thread = gevent.config.monitor_thread
        gevent.config.monitor_thread = True
        # Incremented by ``_monitor`` each time the monitor invokes it.
        self.monitor_fired = 0

    def tearDown(self):
        try:
            hub = get_hub()
            if not self.monitor_thread and hub.periodic_monitoring_thread:
                # If it was true, nothing to do. If it was false, tear things down.
                hub.periodic_monitoring_thread.kill()
                hub.periodic_monitoring_thread = None
        finally:
            # Restore the config and run the base-class cleanup even if
            # stopping the monitor failed; setUp chained to the base class,
            # so tearDown must as well.
            gevent.config.monitor_thread = self.monitor_thread
            super(TestPeriodicMonitoringThread, self).tearDown()

    def _monitor(self, _hub):
        # Custom monitoring function; the hub argument is unused.
        self.monitor_fired += 1

    def test_config(self):
        self.assertEqual(0.1, gevent.config.max_blocking_time)

    def test_blocking(self):
        import io
        hub = get_hub()
        monitor = hub.start_periodic_monitoring_thread()
        self.assertIsNotNone(monitor)
        # Saved so the monitor's extra functions can be put back afterwards.
        before_funs = monitor._additional_monitoring_functions
        monitor.add_monitoring_function(self._monitor, 0)
        self.assertIn((self._monitor, 0), monitor.monitoring_functions())

        # We must make sure we have switched greenlets at least once,
        # otherwise we can't detect a failure.
        gevent.sleep(0.01)
        # Capture the hub's error output; bytes buffer when str is bytes
        # (Python 2), text buffer otherwise.
        stream = hub.exception_stream = io.BytesIO() if str is bytes else io.StringIO()
        assert hub.exception_stream is stream
        try:
            time.sleep(0.3) # Thrice the default; PyPy is very slow to format stacks
            # XXX: This is racy even on CPython
        finally:
            monitor._additional_monitoring_functions = before_funs
            assert hub.exception_stream is stream
            del hub.exception_stream

        if not PYPY:
            # PyPy may still be formatting the stacks in the other thread.
            self.assertGreaterEqual(self.monitor_fired, 1)
            data = stream.getvalue()
            self.assertIn('appears to be blocked', data)
            self.assertIn('PeriodicMonitoringThread', data)
if __name__ == '__main__':
greentest.main()
......@@ -135,31 +135,32 @@ class TestTree(greentest.TestCase):
self.maxDiff = None
expected = """\
<greenlet.greenlet object at X>
: Parent: None
: Greenlet Locals:
: Local <class '__main__.MyLocal'> at X
: {'foo': 42}
+--- <QuietHub at X default default pending=0 ref=0>
+--- <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <greenlet.greenlet object at X>
+--- <Greenlet "Greenlet-1" at X: _run>; finished with value <Greenlet "Greenlet-0" at X
: Parent: <QuietHub at X default default pending=0 ref=0>
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
| +--- <Greenlet "Greenlet-0" at X: _run>; finished with exception ExpectedException()
: Parent: <QuietHub at X default default pending=0 ref=0>
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
+--- <Greenlet "Greenlet-2" at X: _run>; finished with value <Greenlet "Greenlet-4" at X
: Parent: <QuietHub at X default default pending=0 ref=0>
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
| +--- <Greenlet "Greenlet-4" at X: _run>; finished with exception ExpectedException()
: Parent: <QuietHub at X default default pending=0 ref=0>
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
+--- <Greenlet "Greenlet-3" at X: _run>; finished with value <Greenlet "Greenlet-5" at X
: Parent: <QuietHub at X default default pending=0 ref=0>
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Spawn Tree Locals
: {'stl': 'STL'}
| +--- <Greenlet "Greenlet-5" at X: _run>; finished with value <Greenlet "Greenlet-6" at X
: Parent: <QuietHub at X default default pending=0 ref=0>
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
| +--- <Greenlet "Greenlet-6" at X: _run>; finished with exception ExpectedException()
: Parent: <QuietHub at X default default pending=0 ref=0>
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
+--- <Greenlet "Greenlet-7" at X: _run>; finished with value <gevent.util.GreenletTree obje
Parent: <QuietHub at X default default pending=0 ref=0>
Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
""".strip()
self.assertEqual(value, expected)
self.assertEqual(expected, value)
@greentest.ignores_leakcheck
def test_tree_no_track(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment