Commit 0eefec3a authored by Jason Madden, committed by GitHub

Merge pull request #1722 from gevent/issue1670

Install thread profiling/tracing hooks in ThreadPool worker threads while the task runs
parents abcd3a95 60cc6b0f
Make worker threads created by :class:`gevent.threadpool.ThreadPool` install
the :func:`threading.setprofile` and :func:`threading.settrace` hooks
while tasks are running. This provides visibility to profiling and
tracing tools like yappi.
Reported by Suhail Muhammed.
...@@ -721,5 +721,104 @@ class TestThreadResult(greentest.TestCase): ...@@ -721,5 +721,104 @@ class TestThreadResult(greentest.TestCase):
self.assertIsNotNone(tr.receiver) self.assertIsNotNone(tr.receiver)
class TestWorkerProfileAndTrace(TestCase):
    # Worker threads should execute the profile and trace functions.
    # (When running the user code.)
    # https://github.com/gevent/gevent/issues/1670

    # Hooks that were installed before each test; restored in tearDown.
    old_profile = None
    old_trace = None

    def setUp(self):
        super(TestWorkerProfileAndTrace, self).setUp()
        self.old_profile = gevent.threadpool._get_thread_profile()
        self.old_trace = gevent.threadpool._get_thread_trace()

    def tearDown(self):
        import threading
        # Put back whatever hooks were present before the test ran so
        # we don't leak our callbacks into threads started by later tests.
        threading.setprofile(self.old_profile)
        threading.settrace(self.old_trace)
        # setUp chains to the base class, so tearDown must as well;
        # otherwise any cleanup the base class performs is skipped.
        super(TestWorkerProfileAndTrace, self).tearDown()

    def test_get_profile(self):
        import threading
        threading.setprofile(self)
        self.assertIs(gevent.threadpool._get_thread_profile(), self)

    def test_get_trace(self):
        import threading
        threading.settrace(self)
        self.assertIs(gevent.threadpool._get_thread_trace(), self)

    def _test_func_called_in_task(self, func):
        # *func* is either ``'profile'`` or ``'trace'``; it selects the
        # ``threading.set<func>`` / ``sys.get<func>`` pair to exercise.
        import threading
        import sys

        setter = getattr(threading, 'set' + func)
        getter = getattr(sys, 'get' + func)

        # A one-element list so the nested function can mutate the
        # counter without ``nonlocal`` (unavailable on Python 2).
        called = [0]

        def callback(*_args):
            called[0] += 1

        def task():
            # Runs in the worker thread: the hook must actually be
            # installed there while user code executes. (Call the
            # getter; the getter function itself is never None.)
            test.assertIs(getter(), callback)
            return 1701

        before_task = []
        after_task = []
        test = self

        class Pool(ThreadPool):
            class _WorkerGreenlet(ThreadPool._WorkerGreenlet):
                # Record the hook active in the worker thread on either
                # side of the real before/after implementations.
                # pylint:disable=signature-differs

                def _before_run_task(self, func, *args):
                    before_task.append(func)
                    before_task.append(getter())
                    ThreadPool._WorkerGreenlet._before_run_task(self, func, *args)
                    before_task.append(getter())

                def _after_run_task(self, func, *args):
                    after_task.append(func)
                    after_task.append(getter())
                    ThreadPool._WorkerGreenlet._after_run_task(self, func, *args)
                    after_task.append(getter())

        self.ClassUnderTest = Pool

        pool = self._makeOne(1, create_all_worker_threads=True)
        self.assertIsInstance(pool, Pool)

        # Do this after creating the pool and its thread to verify we don't
        # capture the function at thread creation time.
        setter(callback)

        res = pool.apply(task)
        self.assertEqual(res, 1701)
        self.assertGreaterEqual(called[0], 1)

        # Shutdown the pool. PyPy2.7-7.3.1 on Windows/Appveyor was
        # properly seeing the before_task value, but after_task was empty.
        # That suggested a memory consistency type issue, where the updates
        # written by the other thread weren't fully visible to this thread
        # yet. Try to kill it to see if that helps. (Couldn't reproduce
        # on macOS).
        #
        # https://ci.appveyor.com/project/jamadden/gevent/build/job/wo9likk85cduui7n#L867
        pool.kill()

        # The function is active only for the scope of the function
        self.assertEqual(before_task, [task, None, callback])
        self.assertEqual(after_task, [task, callback, None])

    def test_profile_called_in_task(self):
        self._test_func_called_in_task('profile')

    def test_trace_called_in_task(self):
        self._test_func_called_in_task('trace')
if __name__ == '__main__': if __name__ == '__main__':
greentest.main() greentest.main()
...@@ -40,6 +40,17 @@ def _format_hub(hub): ...@@ -40,6 +40,17 @@ def _format_hub(hub):
hub.__class__.__name__, id(hub), hub.thread_ident hub.__class__.__name__, id(hub), hub.thread_ident
) )
def _get_thread_profile(_sys=sys):
if 'threading' in _sys.modules:
return _sys.modules['threading']._profile_hook
def _get_thread_trace(_sys=sys):
if 'threading' in _sys.modules:
return _sys.modules['threading']._trace_hook
class _WorkerGreenlet(RawGreenlet): class _WorkerGreenlet(RawGreenlet):
# Exists to produce a more useful repr for worker pool # Exists to produce a more useful repr for worker pool
# threads/greenlets, and manage the communication of the worker # threads/greenlets, and manage the communication of the worker
...@@ -137,12 +148,27 @@ class _WorkerGreenlet(RawGreenlet): ...@@ -137,12 +148,27 @@ class _WorkerGreenlet(RawGreenlet):
file=stderr) file=stderr)
tb = tb.tb_next tb = tb.tb_next
def _before_run_task(self, func, args, kwargs, thread_result,
                     _sys=sys,
                     _get_thread_profile=_get_thread_profile,
                     _get_thread_trace=_get_thread_trace):
    """
    Install the current thread-wide profile and trace hooks in this thread.

    Called immediately before the task runs. The task arguments are
    accepted but not used here. The helpers are bound as default
    arguments rather than looked up as globals at call time.
    """
    # pylint:disable=unused-argument
    # Look each hook up at call time (not thread-creation time) and
    # install it right away, profile first and then trace.
    profile_hook = _get_thread_profile()
    _sys.setprofile(profile_hook)
    trace_hook = _get_thread_trace()
    _sys.settrace(trace_hook)
def _after_run_task(self, func, args, kwargs, thread_result, _sys=sys):
# pylint:disable=unused-argument
_sys.setprofile(None)
_sys.settrace(None)
def __run_task(self, func, args, kwargs, thread_result): def __run_task(self, func, args, kwargs, thread_result):
self._before_run_task(func, args, kwargs, thread_result)
try: try:
thread_result.set(func(*args, **kwargs)) thread_result.set(func(*args, **kwargs))
except: # pylint:disable=bare-except except: # pylint:disable=bare-except
thread_result.handle_error((self, func), self._exc_info()) thread_result.handle_error((self, func), self._exc_info())
finally: finally:
self._after_run_task(func, args, kwargs, thread_result)
del func, args, kwargs, thread_result del func, args, kwargs, thread_result
def run(self): def run(self):
...@@ -236,12 +262,23 @@ class ThreadPool(GroupMappingMixin): ...@@ -236,12 +262,23 @@ class ThreadPool(GroupMappingMixin):
The `len` of instances of this class is the number of enqueued The `len` of instances of this class is the number of enqueued
(unfinished) tasks. (unfinished) tasks.
Just before a task starts running in a worker thread,
the hook functions most recently installed with :func:`threading.setprofile`
and :func:`threading.settrace` are consulted. Any hooks found are installed in
that thread for the duration of the task (using :func:`sys.setprofile` and
:func:`sys.settrace`, respectively). (Because worker threads are long-lived and
outlast any given task, this arrangement lets the hook functions change between
tasks, but does not let them see the bookkeeping done by the worker thread itself.)
.. caution:: Instances of this class are only true if they have .. caution:: Instances of this class are only true if they have
unfinished tasks. unfinished tasks.
.. versionchanged:: 1.5a3 .. versionchanged:: 1.5a3
The undocumented ``apply_e`` function, deprecated since 1.1, The undocumented ``apply_e`` function, deprecated since 1.1,
was removed. was removed.
.. versionchanged:: NEXT
Install the profile and trace functions in the worker thread while
the worker thread is running the supplied task.
""" """
__slots__ = ( __slots__ = (
...@@ -268,6 +305,8 @@ class ThreadPool(GroupMappingMixin): ...@@ -268,6 +305,8 @@ class ThreadPool(GroupMappingMixin):
'task_queue', 'task_queue',
) )
_WorkerGreenlet = _WorkerGreenlet
def __init__(self, maxsize, hub=None): def __init__(self, maxsize, hub=None):
if hub is None: if hub is None:
hub = get_hub() hub = get_hub()
...@@ -421,7 +460,7 @@ class ThreadPool(GroupMappingMixin): ...@@ -421,7 +460,7 @@ class ThreadPool(GroupMappingMixin):
self.fork_watcher.stop() self.fork_watcher.stop()
def _adjust_wait(self): def _adjust_wait(self):
delay = 0.0001 delay = self.hub.loop.approx_timer_resolution
while True: while True:
self._adjust_step() self._adjust_step()
if len(self._worker_greenlets) <= self._maxsize: if len(self._worker_greenlets) <= self._maxsize:
...@@ -437,7 +476,7 @@ class ThreadPool(GroupMappingMixin): ...@@ -437,7 +476,7 @@ class ThreadPool(GroupMappingMixin):
self.manager = Greenlet.spawn(self._adjust_wait) self.manager = Greenlet.spawn(self._adjust_wait)
def _add_thread(self): def _add_thread(self):
_WorkerGreenlet(self) self._WorkerGreenlet(self)
def spawn(self, func, *args, **kwargs): def spawn(self, func, *args, **kwargs):
""" """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment