Commit 11707488 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1143 from gevent/monitor-threads

Add support for a background monitoring thread to be associated with each hub.
parents c7491589 34d56d26
......@@ -23,6 +23,21 @@
- Add additional optimizations for spawning greenlets, making it
faster than 1.3a2.
- Add an optional monitoring thread for each hub. When enabled, this
thread (by default) looks for greenlets that block the event loop
for more than 0.1s. You can add your own periodic monitoring
functions to this thread.
- When gevent prints a timestamp as part of an error message, it is
now in UTC format as specified by RFC3339.
- Threadpool threads that exit now always destroy their hub (if one
was created). This prevents some forms of resource leaks (notably
visible as blocking functions reported by the new monitoring abilities).
- Hub objects now include the value of their ``name`` attribute in
their repr.
1.3a2 (2018-03-06)
==================
......
# cython: auto_pickle=False
cdef Timeout
cdef get_hub
from _greenlet cimport get_hub
cdef bint _greenlet_imported
......
......@@ -13,6 +13,9 @@ PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] >= 3
PYPY = hasattr(sys, 'pypy_version_info')
WIN = sys.platform.startswith("win")
LINUX = sys.platform.startswith('linux')
OSX = sys.platform == 'darwin'
PURE_PYTHON = PYPY or os.getenv('PURE_PYTHON')
......@@ -23,6 +26,7 @@ if PY3:
integer_types = (int,)
text_type = str
native_path_types = (str, bytes)
thread_mod_name = '_thread'
else:
import __builtin__ # pylint:disable=import-error
......@@ -30,7 +34,11 @@ else:
text_type = __builtin__.unicode
integer_types = (int, __builtin__.long)
native_path_types = string_types
thread_mod_name = 'thread'
def NativeStrIO():
import io
return io.BytesIO() if str is bytes else io.StringIO()
## Exceptions
if PY3:
......
......@@ -265,6 +265,30 @@ class ImportableSetting(object):
return value
return self._import([self.shortname_map.get(x, x) for x in value])
class BoolSettingMixin(object):
validate = staticmethod(validate_bool)
# Don't do string-to-list conversion.
_convert = staticmethod(convert_str_value_as_is)
class IntSettingMixin(object):
# Don't do string-to-list conversion.
def _convert(self, value):
if value:
return int(value)
validate = staticmethod(validate_anything)
class FloatSettingMixin(object):
def _convert(self, value):
if value:
return float(value)
def validate(self, value):
if value is not None and value <= 0:
raise ValueError("Must be > 0")
return value
class Resolver(ImportableSetting, Setting):
......@@ -364,7 +388,7 @@ class FileObject(ImportableSetting, Setting):
}
class WatchChildren(Setting):
class WatchChildren(BoolSettingMixin, Setting):
desc = """\
Should we *not* watch children with the event loop watchers?
......@@ -376,14 +400,11 @@ class WatchChildren(Setting):
environment_key = 'GEVENT_NOWAITPID'
default = False
validate = staticmethod(validate_bool)
class TraceMalloc(Setting):
class TraceMalloc(IntSettingMixin, Setting):
name = 'trace_malloc'
environment_key = 'PYTHONTRACEMALLOC'
default = False
validate = staticmethod(validate_bool)
desc = """\
Should FFI objects track their allocation?
......@@ -399,15 +420,11 @@ class TraceMalloc(Setting):
"""
class TrackGreenletTree(Setting):
class TrackGreenletTree(BoolSettingMixin, Setting):
name = 'track_greenlet_tree'
environment_key = 'GEVENT_TRACK_GREENLET_TREE'
default = True
validate = staticmethod(validate_bool)
# Don't do string-to-list conversion.
_convert = staticmethod(convert_str_value_as_is)
desc = """\
Should `Greenlet` objects track their spawning tree?
......@@ -419,6 +436,57 @@ class TrackGreenletTree(Setting):
.. versionadded:: 1.3b1
"""
class MonitorThread(BoolSettingMixin, Setting):
name = 'monitor_thread'
environment_key = 'GEVENT_ENABLE_MONITOR_THREAD'
default = False
desc = """\
Should each hub start a native OS thread to monitor
for problems?
Such a thread will periodically check to see if the event loop
is blocked for longer than `max_blocking_time`, producing output on
the hub's exception stream (stderr by default) if it detects this condition.
If this setting is true, then this thread will be created
the first time the hub is switched to,
or you can call `gevent.hub.Hub.start_periodic_monitoring_thread` at any
time to create it (from the same thread that will run the hub). That function
will return an object with a method ``add_monitoring_function(function, period)``
that you can call to add your own periodic monitoring function. ``function``
will be called with one argument, the hub it is monitoring. It will be called
in a separate native thread than the one running the hub and **must not**
attempt to use the gevent asynchronous API.
.. seealso:: `max_blocking_time`
.. versionadded:: 1.3b1
"""
class MaxBlockingTime(FloatSettingMixin, Setting):
name = 'max_blocking_time'
environment_key = 'GEVENT_MAX_BLOCKING_TIME'
default = 0.1
desc = """\
If the `monitor_thread` is enabled, this is
approximately how long (in seconds)
the event loop will be allowed to block before a warning is issued.
This function depends on using `greenlet.settrace`, so installing
your own trace function after starting the monitoring thread will
cause this feature to misbehave unless you call the function
returned by `greenlet.settrace`. If you install a tracing function *before*
the monitoring thread is started, it will still be called.
.. note:: In the unlikely event of creating and using multiple different
gevent hubs in the same native thread in a short period of time,
especially without destroying the hubs, false positives may be reported.
.. versionadded:: 1.3b1
"""
# The ares settings are all interpreted by
# gevent/resolver/ares.pyx, so we don't do
# any validation here.
......@@ -540,7 +608,7 @@ class ResolverNameservers(AresSettingMixin, Setting):
return 'servers'
# Generic timeout, works for dnspython and ares
class ResolverTimeout(AresSettingMixin, Setting):
class ResolverTimeout(FloatSettingMixin, AresSettingMixin, Setting):
document = True
name = 'resolver_timeout'
environment_key = 'GEVENT_RESOLVER_TIMEOUT'
......@@ -552,10 +620,6 @@ class ResolverTimeout(AresSettingMixin, Setting):
.. versionadded:: 1.3a2
"""
def _convert(self, value):
if value:
return float(value)
@property
def kwarg_name(self):
return 'timeout'
......
......@@ -11,7 +11,7 @@ import functools
import sys
from gevent.hub import get_hub
from gevent.hub import _get_hub_noargs as get_hub
from gevent._compat import integer_types
from gevent._compat import reraise
from gevent.lock import Semaphore, DummySemaphore
......
......@@ -75,9 +75,6 @@ cdef inline list _extract_stack(int limit)
@cython.locals(previous=_Frame, frame=tuple, f=_Frame)
cdef _Frame _Frame_from_list(list frames)
@cython.final
cdef inline get_hub()
cdef class Greenlet(greenlet):
cdef readonly object value
......@@ -126,6 +123,12 @@ cdef class Greenlet(greenlet):
# method
cpdef _raise_exception(self)
@cython.final
cdef greenlet get_hub()
# XXX: TODO: Move the definition of TrackedRawGreenlet
# into a file that can be cython compiled so get_hub can
# return that.
# Declare a bunch of imports as cdefs so they can
# be accessed directly as static vars without
# doing a module global lookup. This is especially important
......
# Copyright (c) 2018 gevent. See LICENSE for details.
from __future__ import print_function, absolute_import, division
import sys
import traceback
from weakref import ref as wref
from greenlet import settrace
from greenlet import getcurrent
from gevent import config as GEVENT_CONFIG
from gevent.monkey import get_original
from gevent.util import format_run_info
from gevent._compat import thread_mod_name
from gevent._util import gmctime
# Clocks
try:
# Python 3.3+ (PEP 418)
from time import perf_counter
except ImportError:
import time
if sys.platform == "win32":
perf_counter = time.clock
else:
perf_counter = time.time
__all__ = [
'PeriodicMonitoringThread',
]
get_thread_ident = get_original(thread_mod_name, 'get_ident')
start_new_thread = get_original(thread_mod_name, 'start_new_thread')
thread_sleep = get_original('time', 'sleep')
class _MonitorEntry(object):
__slots__ = ('function', 'period', 'last_run_time')
def __init__(self, function, period):
self.function = function
self.period = period
self.last_run_time = 0
def __eq__(self, other):
return self.function == other.function and self.period == other.period
def __repr__(self):
return repr((self.function, self.period, self.last_run_time))
class PeriodicMonitoringThread(object):
# The amount of seconds we will sleep when we think we have nothing
# to do.
inactive_sleep_time = 2.0
# A counter, incremented by the greenlet trace function
# we install on every greenlet switch. This is reset when the
# periodic monitoring thread runs.
_greenlet_switch_counter = 0
# The greenlet being switched to.
_active_greenlet = None
# The trace function that was previously installed,
# if any.
previous_trace_function = None
# The absolute minimum we will sleep, regardless of
# what particular monitoring functions want to say.
min_sleep_time = 0.005
# A list of _MonitorEntry objects: [(function(hub), period, last_run_time))]
# The first entry is always our entry for self.monitor_blocking
_monitoring_functions = None
# The calculated min sleep time for the monitoring functions list.
_calculated_sleep_time = None
def __init__(self, hub):
self._hub_wref = wref(hub, self._on_hub_gc)
self.should_run = True
# Must be installed in the thread that the hub is running in;
# the trace function is threadlocal
assert get_thread_ident() == hub.thread_ident
prev_trace = settrace(self.greenlet_trace)
self.previous_trace_function = prev_trace
self._monitoring_functions = [_MonitorEntry(self.monitor_blocking,
GEVENT_CONFIG.max_blocking_time)]
self._calculated_sleep_time = GEVENT_CONFIG.max_blocking_time
# Create the actual monitoring thread. This is effectively a "daemon"
# thread.
self.monitor_thread_ident = start_new_thread(self, ())
@property
def hub(self):
return self._hub_wref()
def greenlet_trace(self, event, args):
# This function runs in the thread we are monitoring.
self._greenlet_switch_counter += 1
if event in ('switch', 'throw'):
# args is (origin, target). This is the only defined
# case
self._active_greenlet = args[1]
else:
self._active_greenlet = None
if self.previous_trace_function is not None:
self.previous_trace_function(event, args)
def monitoring_functions(self):
# Return a list of _MonitorEntry objects
# Update max_blocking_time each time.
mbt = GEVENT_CONFIG.max_blocking_time # XXX: Events so we know when this changes.
if mbt != self._monitoring_functions[0].period:
self._monitoring_functions[0].period = mbt
self._calculated_sleep_time = min(x.period for x in self._monitoring_functions)
return self._monitoring_functions
def add_monitoring_function(self, function, period):
"""
Schedule the *function* to be called approximately every *period* fractional seconds.
The *function* receives one argument, the hub being monitored. It is called
in the monitoring thread, *not* the hub thread.
If the *function* is already a monitoring function, then its *period*
will be updated for future runs.
If the *period* is ``None``, then the function will be removed.
A *period* less than or equal to zero is not allowed.
"""
if not callable(function):
raise ValueError("function must be callable")
if period is None:
# Remove.
self._monitoring_functions = [
x for x in self._monitoring_functions
if x.function != function
]
elif period <= 0:
raise ValueError("Period must be positive.")
else:
# Add or update period
entry = _MonitorEntry(function, period)
self._monitoring_functions = [
x if x.function != function else entry
for x in self._monitoring_functions
]
if entry not in self._monitoring_functions:
self._monitoring_functions.append(entry)
self._calculated_sleep_time = min(x.period for x in self._monitoring_functions)
def calculate_sleep_time(self):
min_sleep = self._calculated_sleep_time
if min_sleep <= 0:
# Everyone wants to be disabled. Sleep for a longer period of
# time than usual so we don't spin unnecessarily. We might be
# enabled again in the future.
return self.inactive_sleep_time
return max((min_sleep, self.min_sleep_time))
def kill(self):
if not self.should_run:
# Prevent overwriting trace functions.
return
# Stop this monitoring thread from running.
self.should_run = False
# Uninstall our tracing hook
settrace(self.previous_trace_function)
self.previous_trace_function = None
def _on_hub_gc(self, _):
self.kill()
def __call__(self):
# The function that runs in the monitoring thread.
# We cannot use threading.current_thread because it would
# create an immortal DummyThread object.
getcurrent().gevent_monitoring_thread = wref(self)
try:
while self.should_run:
functions = self.monitoring_functions()
assert functions
sleep_time = self.calculate_sleep_time()
thread_sleep(sleep_time)
# Make sure the hub is still around, and still active,
# and keep it around while we are here.
hub = self.hub
if not hub:
self.kill()
if self.should_run:
this_run = perf_counter()
for entry in functions:
f = entry.function
period = entry.period
last_run = entry.last_run_time
if period and last_run + period <= this_run:
entry.last_run_time = this_run
f(hub)
del hub # break our reference to hub while we sleep
except SystemExit:
pass
except: # pylint:disable=bare-except
# We're a daemon thread, so swallow any exceptions that get here
# during interpreter shutdown.
if not sys or not sys.stderr: # pragma: no cover
# Interpreter is shutting down
pass
else:
hub = self.hub
if hub is not None:
# XXX: This tends to do bad things like end the process, because we
# try to switch *threads*, which can't happen. Need something better.
hub.handle_error(self, *sys.exc_info())
def monitor_blocking(self, hub):
# Called periodically to see if the trace function has
# fired to switch greenlets. If not, we will print
# the greenlet tree.
# For tests, we return a true value when we think we found something
# blocking
# There is a race condition with this being incremented in the
# thread we're monitoring, but probably not often enough to lead
# to annoying false positives.
active_greenlet = self._active_greenlet
did_switch = self._greenlet_switch_counter != 0
self._greenlet_switch_counter = 0
if did_switch or active_greenlet is None or active_greenlet is hub:
# Either we switched, or nothing is running (we got a
# trace event we don't know about or were requested to
# ignore), or we spent the whole time in the hub, blocked
# for IO. Nothing to report.
return
report = ['=' * 80,
'\n%s : Greenlet %s appears to be blocked' %
(gmctime(), active_greenlet)]
report.append(" Reported by %s" % (self,))
try:
frame = sys._current_frames()[hub.thread_ident]
except KeyError:
# The thread holding the hub has died. Perhaps we shouldn't
# even report this?
stack = ["Unknown: No thread found for hub %r\n" % (hub,)]
else:
stack = traceback.format_stack(frame)
report.append('Blocked Stack (for thread id %s):' % (hex(hub.thread_ident),))
report.append(''.join(stack))
report.append("Info:")
report.extend(format_run_info(greenlet_stacks=False,
current_thread_ident=self.monitor_thread_ident))
report.append(report[0])
hub.exception_stream.write('\n'.join(report))
return (active_greenlet, report)
def ignore_current_greenlet_blocking(self):
# Don't pay attention to the current greenlet.
self._active_greenlet = None
def monitor_current_greenlet_blocking(self):
self._active_greenlet = getcurrent()
def __repr__(self):
return '<%s at %s in thread %s greenlet %r for %r>' % (
self.__class__.__name__,
hex(id(self)),
hex(self.monitor_thread_ident),
getcurrent(),
self._hub_wref())
......@@ -2,7 +2,6 @@
from __future__ import print_function, absolute_import, division
import sys
from gevent.hub import get_hub
from gevent.timeout import Timeout
......@@ -11,13 +10,13 @@ __all__ = [
'BoundedSemaphore',
]
# In Cython, we define these as 'cdef inline' functions. The
# In Cython, we define these as 'cdef [inline]' functions. The
# compilation unit cannot have a direct assignment to them (import
# is assignment) without generating a 'lvalue is not valid target'
# error.
locals()['getcurrent'] = __import__('greenlet').getcurrent
locals()['greenlet_init'] = lambda: None
locals()['get_hub'] = __import__('gevent').get_hub
class Semaphore(object):
"""
......@@ -88,7 +87,8 @@ class Semaphore(object):
# NOTE: Passing the bound method will cause a memory leak on PyPy
# with Cython <= 0.23.3. You must use >= 0.23.4.
# See https://bitbucket.org/pypy/pypy/issues/2149/memory-leak-for-python-subclass-of-cpyext#comment-22371546
self._notifier = get_hub().loop.run_callback(self._notify_links)
hub = get_hub() # pylint:disable=undefined-variable
self._notifier = hub.loop.run_callback(self._notify_links)
def _notify_links(self):
# Subclasses CANNOT override. This is a cdef method.
......@@ -176,7 +176,7 @@ class Semaphore(object):
timer = Timeout._start_new_or_dummy(timeout)
try:
try:
result = get_hub().switch()
result = get_hub().switch() # pylint:disable=undefined-variable
assert result is self, 'Invalid switch into Semaphore.wait/acquire(): %r' % (result, )
except Timeout as ex:
if ex is not timer:
......
......@@ -70,7 +70,7 @@ __imports__.extend(__py3_imports__)
import time
import sys
from gevent.hub import get_hub
from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import ConcurrentObjectUseError
from gevent.timeout import Timeout
from gevent._compat import string_types, integer_types, PY3
......
......@@ -83,7 +83,7 @@ class Condition(object):
self.__waiters.append(waiter)
saved_state = self._release_save()
try: # restore state no matter what (e.g., KeyboardInterrupt)
waiter.acquire()
waiter.acquire() # Block on the native lock
finally:
self._acquire_restore(saved_state)
......
......@@ -139,3 +139,10 @@ class readproperty(object):
return self
return self.func(inst)
def gmctime():
"""
Returns the current time as a string in RFC3339 format.
"""
import time
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
......@@ -2,11 +2,16 @@
"""Basic synchronization primitives: Event and AsyncResult"""
from __future__ import print_function
import sys
from gevent.hub import get_hub, getcurrent, _NONE
from gevent._util import _NONE
from gevent._compat import reraise
from gevent._tblib import dump_traceback, load_traceback
from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import getcurrent
from gevent.hub import InvalidSwitchError
from gevent.timeout import Timeout
from gevent._tblib import dump_traceback, load_traceback
__all__ = ['Event', 'AsyncResult']
......
......@@ -13,6 +13,7 @@ from gevent._compat import reraise
from gevent._compat import PYPY as _PYPY
from gevent._tblib import dump_traceback
from gevent._tblib import load_traceback
from gevent.hub import GreenletExit
from gevent.hub import InvalidSwitchError
from gevent.hub import Waiter
......@@ -20,6 +21,7 @@ from gevent.hub import _threadlocal
from gevent.hub import get_hub_class
from gevent.hub import iwait
from gevent.hub import wait
from gevent.timeout import Timeout
from gevent._config import config as GEVENT_CONFIG
......@@ -45,6 +47,7 @@ locals()['greenlet_init'] = lambda: None
def get_hub():
# This is identical to gevent.hub._get_hub_noargs so that it
# can be inlined for greenlet spawning by cython.
# It is also cimported in semaphore.pxd
hub = _threadlocal.hub
if hub is None:
hubtype = get_hub_class()
......
......@@ -11,7 +11,10 @@ import sys
import traceback
from weakref import ref as wref
from greenlet import greenlet as RawGreenlet, getcurrent, GreenletExit
from greenlet import greenlet as RawGreenlet
from greenlet import getcurrent
from greenlet import GreenletExit
__all__ = [
......@@ -30,37 +33,35 @@ __all__ = [
from gevent._config import config as GEVENT_CONFIG
from gevent._compat import string_types
from gevent._compat import xrange
from gevent._compat import thread_mod_name
from gevent._util import _NONE
from gevent._util import readproperty
from gevent._util import Lazy
from gevent._util import gmctime
from gevent._ident import IdentRegistry
if sys.version_info[0] <= 2:
import thread # pylint:disable=import-error
else:
import _thread as thread # python 2 pylint:disable=import-error
from gevent.monkey import get_original
# These must be the "real" native thread versions,
# not monkey-patched.
threadlocal = thread._local
class _threadlocal(threadlocal):
# These must be the "real" native thread versions,
# not monkey-patched.
class _Threadlocal(get_original(thread_mod_name, '_local')):
def __init__(self):
# Use a class with an initializer so that we can test
# for 'is None' instead of catching AttributeError, making
# the code cleaner and possibly solving some corner cases
# (like #687)
threadlocal.__init__(self)
super(_Threadlocal, self).__init__()
self.Hub = None
self.loop = None
self.hub = None
_threadlocal = _threadlocal()
_threadlocal = _Threadlocal()
get_ident = thread.get_ident
MAIN_THREAD = get_ident()
get_thread_ident = get_original(thread_mod_name, 'get_ident')
MAIN_THREAD = get_thread_ident() # XXX: Assuming import is done on the main thread.
class LoopExit(Exception):
......@@ -480,9 +481,22 @@ class Hub(TrackedRawGreenlet):
threadpool_size = 10
# An instance of PeriodicMonitoringThread, if started.
periodic_monitoring_thread = None
# The ident of the thread we were created in, which should be the
# thread that we run in.
thread_ident = None
#: A string giving the name of this hub. Useful for associating hubs
#: with particular threads. Printed as part of the default repr.
#:
#: .. versionadded:: 1.3b1
name = ''
def __init__(self, loop=None, default=None):
TrackedRawGreenlet.__init__(self, None, None)
self.thread_ident = get_thread_ident()
if hasattr(loop, 'run'):
if default is not None:
raise TypeError("Unexpected argument: default")
......@@ -493,7 +507,7 @@ class Hub(TrackedRawGreenlet):
# loop. See #237 and #238.
self.loop = _threadlocal.loop
else:
if default is None and get_ident() != MAIN_THREAD:
if default is None and self.thread_ident != MAIN_THREAD:
default = False
if loop is None:
......@@ -516,6 +530,16 @@ class Hub(TrackedRawGreenlet):
def backend(self):
return GEVENT_CONFIG.libev_backend
@property
def main_hub(self):
"""
Is this the hub for the main thread?
.. versionadded:: 1.3b1
"""
return self.thread_ident == MAIN_THREAD
def __repr__(self):
if self.loop is None:
info = 'destroyed'
......@@ -524,11 +548,16 @@ class Hub(TrackedRawGreenlet):
info = self.loop._format()
except Exception as ex: # pylint:disable=broad-except
info = str(ex) or repr(ex) or 'error'
result = '<%s at 0x%x %s' % (self.__class__.__name__, id(self), info)
result = '<%s %r at 0x%x %s' % (
self.__class__.__name__,
self.name,
id(self),
info)
if self._resolver is not None:
result += ' resolver=%r' % self._resolver
if self._threadpool is not None:
result += ' threadpool=%r' % self._threadpool
result += ' thread_ident=%s' % (hex(self.thread_ident), )
return result + '>'
def handle_error(self, context, type, value, tb):
......@@ -602,8 +631,7 @@ class Hub(TrackedRawGreenlet):
del tb
try:
import time
errstream.write(time.ctime())
errstream.write(gmctime())
errstream.write(' ' if context is not None else '\n')
except: # pylint:disable=bare-except
# Possible not safe to import under certain
......@@ -695,7 +723,8 @@ class Hub(TrackedRawGreenlet):
programming error.
"""
assert self is getcurrent(), 'Do not call Hub.run() directly'
while True:
self.start_periodic_monitoring_thread()
while 1:
loop = self.loop
loop.error_handler = self
try:
......@@ -711,6 +740,20 @@ class Hub(TrackedRawGreenlet):
# It is still possible to kill this greenlet with throw. However, in that case
# switching to it is no longer safe, as switch will return immediately
def start_periodic_monitoring_thread(self):
if self.periodic_monitoring_thread is None and GEVENT_CONFIG.monitor_thread:
# TODO: If we're the main thread, then add the memory monitoring
# function.
# Note that it is possible for one real thread to
# (temporarily) wind up with multiple monitoring threads,
# if hubs are started and stopped within the thread. This shows up
# in the threadpool tests. The monitoring threads will eventually notice their
# hub object is gone.
from gevent._monitor import PeriodicMonitoringThread
self.periodic_monitoring_thread = PeriodicMonitoringThread(self)
return self.periodic_monitoring_thread
def join(self, timeout=None):
"""Wait for the event loop to finish. Exits only when there are
no more spawned greenlets, started servers, active timeouts or watchers.
......@@ -742,6 +785,9 @@ class Hub(TrackedRawGreenlet):
return False
def destroy(self, destroy_loop=None):
if self.periodic_monitoring_thread is not None:
self.periodic_monitoring_thread.kill()
self.periodic_monitoring_thread = None
if self._resolver is not None:
self._resolver.close()
del self._resolver
......@@ -764,6 +810,8 @@ class Hub(TrackedRawGreenlet):
if _threadlocal.hub is self:
_threadlocal.hub = None
# XXX: We can probably simplify the resolver and threadpool properties.
@property
def resolver_class(self):
return GEVENT_CONFIG.resolver
......@@ -777,7 +825,7 @@ class Hub(TrackedRawGreenlet):
self._resolver = value
def _del_resolver(self):
del self._resolver
self._resolver = None
resolver = property(_get_resolver, _set_resolver, _del_resolver)
......@@ -796,7 +844,7 @@ class Hub(TrackedRawGreenlet):
self._threadpool = value
def _del_threadpool(self):
del self._threadpool
self._threadpool = None
threadpool = property(_get_threadpool, _set_threadpool, _del_threadpool)
......
......@@ -45,7 +45,8 @@ from __future__ import absolute_import
import os
import sys
from gevent.hub import get_hub, reinit
from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import reinit
from gevent._config import config
from gevent._compat import PY3
from gevent._util import copy_globals
......
......@@ -38,7 +38,9 @@ Full = __queue__.Full
Empty = __queue__.Empty
from gevent.timeout import Timeout
from gevent.hub import get_hub, Waiter, getcurrent
from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import Waiter
from gevent.hub import getcurrent
from gevent.hub import InvalidSwitchError
......
......@@ -7,7 +7,7 @@ from __future__ import absolute_import, division, print_function
import sys
from gevent.event import Event
from gevent.hub import get_hub
from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import sleep as _g_sleep
from gevent._compat import integer_types
from gevent._compat import iteritems
......
......@@ -100,7 +100,7 @@ def signal(signalnum, handler):
# Note that this conflicts with gevent.subprocess and other users
# of child watchers, until the next time gevent.subprocess/loop.install_sigchld()
# is called.
from gevent import get_hub # Are we always safe to import here?
from gevent.hub import get_hub # Are we always safe to import here?
_signal_signal(signalnum, handler)
get_hub().loop.reset_sigchld()
return old_handler
......
......@@ -40,7 +40,10 @@ import signal
import sys
import traceback
from gevent.event import AsyncResult
from gevent.hub import get_hub, linkproxy, sleep, getcurrent
from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import linkproxy
from gevent.hub import sleep
from gevent.hub import getcurrent
from gevent._compat import integer_types, string_types, xrange
from gevent._compat import PY3
from gevent._compat import reraise
......
......@@ -3,7 +3,10 @@ from __future__ import absolute_import
import sys
import os
from gevent._compat import integer_types
from gevent.hub import get_hub, getcurrent, sleep, _get_hub
from gevent.hub import _get_hub_noargs as get_hub
from gevent.hub import getcurrent
from gevent.hub import sleep
from gevent.hub import _get_hub
from gevent.event import AsyncResult
from gevent.greenlet import Greenlet
from gevent.pool import GroupMappingMixin
......@@ -19,6 +22,8 @@ class ThreadPool(GroupMappingMixin):
"""
.. note:: The method :meth:`apply_async` will always return a new
greenlet, bypassing the threadpool entirely.
.. caution:: Instances of this class are only true if they have
unfinished tasks.
"""
def __init__(self, maxsize, hub=None):
......@@ -57,6 +62,8 @@ class ThreadPool(GroupMappingMixin):
def __len__(self):
# XXX just do unfinished_tasks property
# Note that this becomes the boolean value of this class,
# that's probably not what we want!
return self.task_queue.unfinished_tasks
def _get_size(self):
......@@ -162,7 +169,7 @@ class ThreadPool(GroupMappingMixin):
:return: A :class:`gevent.event.AsyncResult`.
"""
while True:
while 1:
semaphore = self._semaphore
semaphore.acquire()
if semaphore is self._semaphore:
......@@ -194,14 +201,29 @@ class ThreadPool(GroupMappingMixin):
with _lock:
self._size -= 1
_destroy_worker_hub = False
# XXX: This used to be false by default. It really seems like
# it should be true to avoid leaking resources.
_destroy_worker_hub = True
def __ignore_current_greenlet_blocking(self, hub):
if hub is not None and hub.periodic_monitoring_thread is not None:
hub.periodic_monitoring_thread.ignore_current_greenlet_blocking()
def _worker(self):
# pylint:disable=too-many-branches
need_decrease = True
try:
while True:
while 1: # tiny bit faster than True on Py2
h = _get_hub()
if h is not None:
h.name = 'ThreadPool Worker Hub'
task_queue = self.task_queue
# While we block, don't let the monitoring thread, if any,
# report us as blocked. Indeed, so long as we never
# try to switch greenlets, don't report us as blocked---
# the threadpool is *meant* to run blocking tasks
self.__ignore_current_greenlet_blocking(h)
task = task_queue.get()
try:
if task is None:
......
......@@ -14,8 +14,12 @@ module add timeouts to arbitrary code.
which no switches occur, :class:`Timeout` is powerless.
"""
from __future__ import absolute_import, print_function, division
from gevent._compat import string_types
from gevent.hub import getcurrent, _NONE, get_hub
from gevent._util import _NONE
from gevent.hub import getcurrent
from gevent.hub import _get_hub_noargs as get_hub
__all__ = [
'Timeout',
......
......@@ -13,7 +13,7 @@ import traceback
from greenlet import getcurrent
from greenlet import greenlet as RawGreenlet
from gevent.local import all_local_dicts_for_greenlet
from gevent._compat import PYPY
__all__ = [
'wrap_errors',
......@@ -21,6 +21,11 @@ __all__ = [
'GreenletTree',
]
# PyPy is very slow at formatting stacks
# for some reason.
_STACK_LIMIT = 20 if PYPY else None
def _noop():
return None
......@@ -76,8 +81,12 @@ class wrap_errors(object):
def __getattr__(self, name):
return getattr(self.__func, name)
def format_run_info():
def format_run_info(thread_stacks=True,
greenlet_stacks=True,
current_thread_ident=None):
"""
format_run_info(thread_stacks=True, greenlet_stacks=True) -> [str]
Request information about the running threads of the current process.
This is a debugging utility. Its output has no guarantees other than being
......@@ -85,7 +94,10 @@ def format_run_info():
:return: A sequence of text lines detailing the stacks of running
threads and greenlets. (One greenlet will duplicate one thread,
the current thread and greenlet.) Extra information about
the current thread and greenlet. If there are multiple running threads,
the stack for the current greenlet may be incorrectly duplicated in multiple
greenlets.)
Extra information about
:class:`gevent.greenlet.Greenlet` object will also be returned.
.. versionadded:: 1.3a1
......@@ -97,25 +109,34 @@ def format_run_info():
lines = []
_format_thread_info(lines)
_format_greenlet_info(lines)
_format_thread_info(lines, thread_stacks, current_thread_ident)
_format_greenlet_info(lines, greenlet_stacks)
return lines
def _format_thread_info(lines):
def _format_thread_info(lines, thread_stacks, current_thread_ident):
import threading
import sys
threads = {th.ident: th.name for th in threading.enumerate()}
threads = {th.ident: th for th in threading.enumerate()}
lines.append('*' * 80)
lines.append('* Threads')
thread = None
frame = None
for thread, frame in sys._current_frames().items():
for thread_ident, frame in sys._current_frames().items():
lines.append("*" * 80)
lines.append('Thread 0x%x (%s)\n' % (thread, threads.get(thread)))
lines.append(''.join(traceback.format_stack(frame)))
thread = threads.get(thread_ident)
name = thread.name if thread else None
if getattr(thread, 'gevent_monitoring_thread', None):
name = repr(thread.gevent_monitoring_thread())
if current_thread_ident == thread_ident:
name = '%s) (CURRENT' % (name,)
lines.append('Thread 0x%x (%s)\n' % (thread_ident, name))
if thread_stacks:
lines.append(''.join(traceback.format_stack(frame, _STACK_LIMIT)))
else:
lines.append('\t...stack elided...')
# We may have captured our own frame, creating a reference
# cycle, so clear it out.
......@@ -124,14 +145,14 @@ def _format_thread_info(lines):
del lines
del threads
def _format_greenlet_info(lines):
def _format_greenlet_info(lines, greenlet_stacks):
# Use the gc module to inspect all objects to find the greenlets
# since there isn't a global registry
lines.append('*' * 80)
lines.append('* Greenlets')
lines.append('*' * 80)
for tree in GreenletTree.forest():
lines.extend(tree.format_lines(details=True))
lines.extend(tree.format_lines(details={'running_stacks': greenlet_stacks}))
del lines
......@@ -250,6 +271,12 @@ class GreenletTree(object):
def __getattr__(self, name):
return getattr(self.greenlet, name)
DEFAULT_DETAILS = {
'running_stacks': True,
'spawning_stacks': True,
'locals': True,
}
def format_lines(self, details=True):
"""
Return a sequence of lines for the greenlet tree.
......@@ -261,7 +288,11 @@ class GreenletTree(object):
if not details:
details = {}
else:
details = {'stacks': True, 'locals': True}
details = self.DEFAULT_DETAILS.copy()
else:
params = details
details = self.DEFAULT_DETAILS.copy()
details.update(params)
tree = _TreeFormatter(details, depth=0)
lines = [l[0] if isinstance(l, tuple) else l
for l in self._render(tree)]
......@@ -280,7 +311,7 @@ class GreenletTree(object):
@staticmethod
def __render_tb(tree, label, frame):
tree.child_data(label)
tb = ''.join(traceback.format_stack(frame))
tb = ''.join(traceback.format_stack(frame, _STACK_LIMIT))
tree.child_multidata(tb)
@staticmethod
......@@ -288,6 +319,9 @@ class GreenletTree(object):
return (getattr(greenlet, 'spawning_greenlet', None) or _noop)()
def __render_locals(self, tree):
# Defer the import to avoid cycles
from gevent.local import all_local_dicts_for_greenlet
gr_locals = all_local_dicts_for_greenlet(self.greenlet)
if gr_locals:
tree.child_data("Greenlet Locals:")
......@@ -309,15 +343,17 @@ class GreenletTree(object):
label += '; not running'
tree.node_label(label)
if self.greenlet.parent is not None:
tree.child_data('Parent: ' + repr(self.greenlet.parent))
tree.child_data('Parent: ' + repr(self.greenlet.parent))
if getattr(self.greenlet, 'gevent_monitoring_thread', None) is not None:
tree.child_data('Monitoring Thread:' + repr(self.greenlet.gevent_monitoring_thread()))
if self.greenlet and tree.details and tree.details['stacks']:
if self.greenlet and tree.details and tree.details['running_stacks']:
self.__render_tb(tree, 'Running:', self.greenlet.gr_frame)
spawning_stack = getattr(self.greenlet, 'spawning_stack', None)
if spawning_stack and tree.details and tree.details['stacks']:
if spawning_stack and tree.details and tree.details['spawning_stacks']:
self.__render_tb(tree, 'Spawned at:', spawning_stack)
spawning_parent = self.__spawning_parent(self.greenlet)
......
......@@ -28,8 +28,8 @@ PYPY = gsysinfo.PYPY
CPYTHON = not PYPY
VERBOSE = sys.argv.count('-v') > 1
WIN = gsysinfo.WIN
LINUX = sys.platform.startswith('linux')
OSX = sys.platform == 'darwin'
LINUX = gsysinfo.LINUX
OSX = gsysinfo.OSX
PURE_PYTHON = gsysinfo.PURE_PYTHON
......
# Copyright 2018 gevent contributors. See LICENSE for details.
import gc
import unittest
from greenlet import gettrace
from greenlet import settrace
from gevent.monkey import get_original
from gevent._compat import thread_mod_name
from gevent._compat import NativeStrIO
from gevent import _monitor as monitor
get_ident = get_original(thread_mod_name, 'get_ident')
class MockHub(object):
def __init__(self):
self.thread_ident = get_ident()
self.exception_stream = NativeStrIO()
self.dead = False
def __bool__(self):
return not self.dead
__nonzero__ = __bool__
def handle_error(self, *args): # pylint:disable=unused-argument
raise # pylint:disable=misplaced-bare-raise
class TestPeriodicMonitoringThread(unittest.TestCase):
def setUp(self):
self._orig_start_new_thread = monitor.start_new_thread
self._orig_thread_sleep = monitor.thread_sleep
monitor.thread_sleep = lambda _s: gc.collect() # For PyPy
monitor.start_new_thread = lambda _f, _a: 0xDEADBEEF
self.hub = MockHub()
self.pmt = monitor.PeriodicMonitoringThread(self.hub)
def tearDown(self):
monitor.start_new_thread = self._orig_start_new_thread
monitor.thread_sleep = self._orig_thread_sleep
prev = self.pmt.previous_trace_function
self.pmt.kill()
assert gettrace() is prev, (gettrace(), prev)
settrace(None)
def test_constructor(self):
self.assertEqual(0xDEADBEEF, self.pmt.monitor_thread_ident)
self.assertEqual(gettrace(), self.pmt.greenlet_trace)
def test_hub_wref(self):
self.assertIs(self.hub, self.pmt.hub)
del self.hub
import gc
gc.collect()
self.assertIsNone(self.pmt.hub)
# And it killed itself.
self.assertFalse(self.pmt.should_run)
self.assertIsNone(gettrace())
def test_previous_trace(self):
self.pmt.kill()
self.assertIsNone(gettrace())
called = []
def f(*args):
called.append(args)
settrace(f)
self.pmt = monitor.PeriodicMonitoringThread(self.hub)
self.assertEqual(gettrace(), self.pmt.greenlet_trace)
self.assertIs(self.pmt.previous_trace_function, f)
self.pmt.greenlet_trace('event', 'args')
self.assertEqual([('event', 'args')], called)
def test_greenlet_trace(self):
self.assertEqual(0, self.pmt._greenlet_switch_counter)
# Unknown event still counts as a switch (should it?)
self.pmt.greenlet_trace('unknown', None)
self.assertEqual(1, self.pmt._greenlet_switch_counter)
self.assertIsNone(self.pmt._active_greenlet)
origin = object()
target = object()
self.pmt.greenlet_trace('switch', (origin, target))
self.assertEqual(2, self.pmt._greenlet_switch_counter)
self.assertIs(target, self.pmt._active_greenlet)
# Unknown event removes active greenlet
self.pmt.greenlet_trace('unknown', self)
self.assertEqual(3, self.pmt._greenlet_switch_counter)
self.assertIsNone(self.pmt._active_greenlet)
def test_add_monitoring_function(self):
self.assertRaises(ValueError, self.pmt.add_monitoring_function, None, 1)
self.assertRaises(ValueError, self.pmt.add_monitoring_function, lambda: None, -1)
def f():
pass
# Add
self.pmt.add_monitoring_function(f, 1)
self.assertEqual(2, len(self.pmt.monitoring_functions()))
self.assertEqual(1, self.pmt.monitoring_functions()[1].period)
# Update
self.pmt.add_monitoring_function(f, 2)
self.assertEqual(2, len(self.pmt.monitoring_functions()))
self.assertEqual(2, self.pmt.monitoring_functions()[1].period)
# Remove
self.pmt.add_monitoring_function(f, None)
self.assertEqual(1, len(self.pmt.monitoring_functions()))
def test_calculate_sleep_time(self):
self.assertEqual(
self.pmt.monitoring_functions()[0].period,
self.pmt.calculate_sleep_time())
# Pretend that GEVENT_CONFIG.max_blocking_time was set to 0,
# to disable this monitor.
self.pmt._calculated_sleep_time = 0
self.assertEqual(
self.pmt.inactive_sleep_time,
self.pmt.calculate_sleep_time()
)
# Getting the list of monitoring functions will also
# do this, if it looks like it has changed
self.pmt.monitoring_functions()[0].period = -1
self.pmt._calculated_sleep_time = 0
self.pmt.monitoring_functions()
self.assertEqual(
self.pmt.monitoring_functions()[0].period,
self.pmt.calculate_sleep_time())
self.assertEqual(
self.pmt.monitoring_functions()[0].period,
self.pmt._calculated_sleep_time)
def test_monitor_blocking(self):
# Initially there's no active greenlet and no switches,
# so nothing is considered blocked
self.assertFalse(self.pmt.monitor_blocking(self.hub))
# Give it an active greenlet
origin = object()
target = object()
self.pmt.greenlet_trace('switch', (origin, target))
# We've switched, so we're not blocked
self.assertFalse(self.pmt.monitor_blocking(self.hub))
# Again without switching is a problem.
self.assertTrue(self.pmt.monitor_blocking(self.hub))
# But we can order it not to be a problem
self.pmt.ignore_current_greenlet_blocking()
self.assertFalse(self.pmt.monitor_blocking(self.hub))
# And back again
self.pmt.monitor_current_greenlet_blocking()
self.assertTrue(self.pmt.monitor_blocking(self.hub))
# A bad thread_ident in the hub doesn't mess things up
self.hub.thread_ident = -1
self.assertTrue(self.pmt.monitor_blocking(self.hub))
def test_call_destroyed_hub(self):
# Add a function that destroys the hub so we break out (eventually)
# This clears the wref, which eventually calls kill()
def f(_hub):
_hub = None
self.hub = None
gc.collect()
self.pmt.add_monitoring_function(f, 0.1)
self.pmt()
self.assertFalse(self.pmt.should_run)
def test_call_dead_hub(self):
# Add a function that makes the hub go false (e.g., it quit)
# This causes the function to kill itself.
def f(hub):
hub.dead = True
self.pmt.add_monitoring_function(f, 0.1)
self.pmt()
self.assertFalse(self.pmt.should_run)
def test_call_SystemExit(self):
# breaks the loop
def f(_hub):
raise SystemExit()
self.pmt.add_monitoring_function(f, 0.1)
self.pmt()
def test_call_other_error(self):
class MyException(Exception):
pass
def f(_hub):
raise MyException()
self.pmt.add_monitoring_function(f, 0.1)
with self.assertRaises(MyException):
self.pmt()
if __name__ == '__main__':
unittest.main()
......@@ -19,13 +19,16 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
import time
import greentest
import greentest.timing
import time
import re
import gevent
from gevent import socket
from gevent.hub import Waiter, get_hub
from gevent._compat import NativeStrIO
DELAY = 0.1
......@@ -115,5 +118,196 @@ class TestWaiter(greentest.TestCase):
g.kill()
@greentest.skipOnCI("Racy on CI")
class TestPeriodicMonitoringThread(greentest.TestCase):
def _reset_hub(self):
hub = get_hub()
try:
del hub.exception_stream
except AttributeError:
pass
if hub._threadpool is not None:
hub.threadpool.join()
hub.threadpool.kill()
del hub.threadpool
def setUp(self):
super(TestPeriodicMonitoringThread, self).setUp()
self.monitor_thread = gevent.config.monitor_thread
gevent.config.monitor_thread = True
self.monitor_fired = 0
self.monitored_hubs = set()
self._reset_hub()
def tearDown(self):
hub = get_hub()
if not self.monitor_thread and hub.periodic_monitoring_thread:
# If it was true, nothing to do. If it was false, tear things down.
hub.periodic_monitoring_thread.kill()
hub.periodic_monitoring_thread = None
gevent.config.monitor_thread = self.monitor_thread
self.monitored_hubs = None
self._reset_hub()
def _monitor(self, hub):
self.monitor_fired += 1
if self.monitored_hubs is not None:
self.monitored_hubs.add(hub)
def test_config(self):
self.assertEqual(0.1, gevent.config.max_blocking_time)
def _run_monitoring_threads(self, monitor, kill=True):
self.assertTrue(monitor.should_run)
from threading import Condition
cond = Condition()
cond.acquire()
def monitor_cond(_hub):
cond.acquire()
cond.notifyAll()
cond.release()
if kill:
# Only run once. Especially helpful on PyPy, where
# formatting stacks is expensive.
monitor.kill()
monitor.add_monitoring_function(monitor_cond, 0.01)
cond.wait()
cond.release()
monitor.add_monitoring_function(monitor_cond, None)
@greentest.ignores_leakcheck
def test_kill_removes_trace(self):
from greenlet import gettrace
hub = get_hub()
hub.start_periodic_monitoring_thread()
self.assertIsNotNone(gettrace())
hub.periodic_monitoring_thread.kill()
self.assertIsNone(gettrace())
@greentest.ignores_leakcheck
def test_blocking_this_thread(self):
hub = get_hub()
stream = hub.exception_stream = NativeStrIO()
monitor = hub.start_periodic_monitoring_thread()
self.assertIsNotNone(monitor)
self.assertEqual(1, len(monitor.monitoring_functions()))
monitor.add_monitoring_function(self._monitor, 0.1)
self.assertEqual(2, len(monitor.monitoring_functions()))
self.assertEqual(self._monitor, monitor.monitoring_functions()[1].function)
self.assertEqual(0.1, monitor.monitoring_functions()[1].period)
# We must make sure we have switched greenlets at least once,
# otherwise we can't detect a failure.
gevent.sleep(0.0001)
assert hub.exception_stream is stream
try:
time.sleep(0.3) # Thrice the default
self._run_monitoring_threads(monitor)
finally:
monitor.add_monitoring_function(self._monitor, None)
self.assertEqual(1, len(monitor._monitoring_functions))
assert hub.exception_stream is stream
monitor.kill()
del hub.exception_stream
self.assertGreaterEqual(self.monitor_fired, 1)
data = stream.getvalue()
self.assertIn('appears to be blocked', data)
self.assertIn('PeriodicMonitoringThread', data)
def _prep_worker_thread(self):
hub = get_hub()
threadpool = hub.threadpool
worker_hub = threadpool.apply(get_hub)
stream = worker_hub.exception_stream = NativeStrIO()
# It does not have a monitoring thread yet
self.assertIsNone(worker_hub.periodic_monitoring_thread)
# So switch to it and give it one.
threadpool.apply(gevent.sleep, (0.01,))
self.assertIsNotNone(worker_hub.periodic_monitoring_thread)
worker_monitor = worker_hub.periodic_monitoring_thread
worker_monitor.add_monitoring_function(self._monitor, 0.1)
return worker_hub, stream, worker_monitor
@greentest.ignores_leakcheck
def test_blocking_threadpool_thread_task_queue(self):
# A threadpool thread spends much of its time
# blocked on the native Lock object. Unless we take
# care, if that thread had created a hub, it will constantly
# be reported as blocked.
worker_hub, stream, worker_monitor = self._prep_worker_thread()
# Now wait until the monitoring threads have run.
self._run_monitoring_threads(worker_monitor)
worker_monitor.kill()
# We did run the monitor in the worker thread, but it
# did NOT report itself blocked by the worker thread sitting there.
self.assertIn(worker_hub, self.monitored_hubs)
self.assertEqual(stream.getvalue(), '')
@greentest.ignores_leakcheck
def test_blocking_threadpool_thread_one_greenlet(self):
# If the background threadpool thread has no other greenlets to run
# and never switches, then even if it has a hub
# we don't report it blocking. The threadpool is *meant* to run
# tasks that block.
hub = get_hub()
threadpool = hub.threadpool
worker_hub, stream, worker_monitor = self._prep_worker_thread()
task = threadpool.spawn(time.sleep, 0.3)
# Now wait until the monitoring threads have run.
self._run_monitoring_threads(worker_monitor)
# and be sure the task ran
task.get()
worker_monitor.kill()
# We did run the monitor in the worker thread, but it
# did NOT report itself blocked by the worker thread
self.assertIn(worker_hub, self.monitored_hubs)
self.assertEqual(stream.getvalue(), '')
@greentest.ignores_leakcheck
def test_blocking_threadpool_thread_multi_greenlet(self):
# If the background threadpool thread ever switches
# greenlets, monitoring goes into affect.
hub = get_hub()
threadpool = hub.threadpool
worker_hub, stream, worker_monitor = self._prep_worker_thread()
def task():
g = gevent.spawn(time.sleep, 0.7)
g.join()
task = threadpool.spawn(task)
# Now wait until the monitoring threads have run.
self._run_monitoring_threads(worker_monitor, kill=False)
# and be sure the task ran
task.get()
worker_monitor.kill()
# We did run the monitor in the worker thread, and it
# DID report itself blocked by the worker thread
self.assertIn(worker_hub, self.monitored_hubs)
data = stream.getvalue()
self.assertIn('appears to be blocked', data)
self.assertIn('PeriodicMonitoringThread', data)
if __name__ == '__main__':
greentest.main()
......@@ -113,7 +113,8 @@ class TestTree(greentest.TestCase):
s4.join()
tree = s4.value
return tree, str(tree), tree.format(details={'stacks': False})
return tree, str(tree), tree.format(details={'running_stacks': False,
'spawning_stacks': False})
@greentest.ignores_leakcheck
def test_tree(self):
......@@ -135,31 +136,32 @@ class TestTree(greentest.TestCase):
self.maxDiff = None
expected = """\
<greenlet.greenlet object at X>
: Parent: None
: Greenlet Locals:
: Local <class '__main__.MyLocal'> at X
: {'foo': 42}
+--- <QuietHub at X default default pending=0 ref=0>
+--- <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <greenlet.greenlet object at X>
+--- <Greenlet "Greenlet-1" at X: _run>; finished with value <Greenlet "Greenlet-0" at X
: Parent: <QuietHub at X default default pending=0 ref=0>
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
| +--- <Greenlet "Greenlet-0" at X: _run>; finished with exception ExpectedException()
: Parent: <QuietHub at X default default pending=0 ref=0>
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
+--- <Greenlet "Greenlet-2" at X: _run>; finished with value <Greenlet "Greenlet-4" at X
: Parent: <QuietHub at X default default pending=0 ref=0>
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
| +--- <Greenlet "Greenlet-4" at X: _run>; finished with exception ExpectedException()
: Parent: <QuietHub at X default default pending=0 ref=0>
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
+--- <Greenlet "Greenlet-3" at X: _run>; finished with value <Greenlet "Greenlet-5" at X
: Parent: <QuietHub at X default default pending=0 ref=0>
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Spawn Tree Locals
: {'stl': 'STL'}
| +--- <Greenlet "Greenlet-5" at X: _run>; finished with value <Greenlet "Greenlet-6" at X
: Parent: <QuietHub at X default default pending=0 ref=0>
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
| +--- <Greenlet "Greenlet-6" at X: _run>; finished with exception ExpectedException()
: Parent: <QuietHub at X default default pending=0 ref=0>
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
+--- <Greenlet "Greenlet-7" at X: _run>; finished with value <gevent.util.GreenletTree obje
Parent: <QuietHub at X default default pending=0 ref=0>
Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
""".strip()
self.assertEqual(value, expected)
self.assertEqual(expected, value)
@greentest.ignores_leakcheck
def test_tree_no_track(self):
......
test___monkey_patching.py
test__monkey_ssl_warning.py
test___monitor.py
......@@ -15,3 +15,5 @@ test__socket_timeout.py
test__examples.py
test__issue330.py
test___ident.py
test___config.py
test___monitor.py
......@@ -132,3 +132,4 @@ test__issue639.py
test__issue_728.py
test__refcount_core.py
test__api.py
test__monitor.py
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment