Commit d5942f40 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Make the monitor thread survive a fork. (#1186)

* Make the monitor thread survive a fork.

* Add coverage for the new get_process.

* psutil is not on windows, move the test to a protected location.

* No, really.
parent 3b12c0e8
...@@ -29,6 +29,9 @@ ...@@ -29,6 +29,9 @@
- Add `gevent.util.assert_switches` to build on the monitoring - Add `gevent.util.assert_switches` to build on the monitoring
functions. Fixes :issue:`1182`. functions. Fixes :issue:`1182`.
- A started monitor thread for the active hub now survives a fork. See
:issue:`1185`.
1.3b1 (2018-04-13) 1.3b1 (2018-04-13)
================== ==================
......
# Copyright (c) 2018 gevent. See LICENSE for details. # Copyright (c) 2018 gevent. See LICENSE for details.
from __future__ import print_function, absolute_import, division from __future__ import print_function, absolute_import, division
import os
import sys import sys
import traceback import traceback
...@@ -32,20 +33,7 @@ get_thread_ident = get_original(thread_mod_name, 'get_ident') ...@@ -32,20 +33,7 @@ get_thread_ident = get_original(thread_mod_name, 'get_ident')
start_new_thread = get_original(thread_mod_name, 'start_new_thread') start_new_thread = get_original(thread_mod_name, 'start_new_thread')
thread_sleep = get_original('time', 'sleep') thread_sleep = get_original('time', 'sleep')
try:
# The standard library 'resource' module doesn't provide
# a standard way to get the RSS measure, only the maximum.
# You might be tempted to try to compute something by adding
# together text and data sizes, but on many systems those come back
# zero. So our only option is psutil.
from psutil import Process, AccessDenied
# Make sure it works (why would we be denied access to our own process?)
try:
Process().memory_full_info()
except AccessDenied: # pragma: no cover
Process = None
except ImportError:
Process = None
class MonitorWarning(RuntimeWarning): class MonitorWarning(RuntimeWarning):
"""The type of warnings we emit.""" """The type of warnings we emit."""
...@@ -257,6 +245,17 @@ class PeriodicMonitoringThread(object): ...@@ -257,6 +245,17 @@ class PeriodicMonitoringThread(object):
# thread. # thread.
self.monitor_thread_ident = start_new_thread(self, ()) self.monitor_thread_ident = start_new_thread(self, ())
# We must track the PID to know if your thread has died after a fork
self.pid = os.getpid()
def _on_fork(self):
# Pseudo-standard method that resolver_ares and threadpool
# also have, called by hub.reinit()
pid = os.getpid()
if pid != self.pid:
self.pid = pid
self.monitor_thread_ident = start_new_thread(self, ())
@property @property
def hub(self): def hub(self):
return self._hub_wref() return self._hub_wref()
...@@ -395,8 +394,28 @@ class PeriodicMonitoringThread(object): ...@@ -395,8 +394,28 @@ class PeriodicMonitoringThread(object):
def monitor_current_greenlet_blocking(self): def monitor_current_greenlet_blocking(self):
self._greenlet_tracer.monitor_current_greenlet_blocking() self._greenlet_tracer.monitor_current_greenlet_blocking()
def _get_process(self): # pylint:disable=method-hidden
try:
# The standard library 'resource' module doesn't provide
# a standard way to get the RSS measure, only the maximum.
# You might be tempted to try to compute something by adding
# together text and data sizes, but on many systems those come back
# zero. So our only option is psutil.
from psutil import Process, AccessDenied
# Make sure it works (why would we be denied access to our own process?)
try:
proc = Process()
proc.memory_full_info()
except AccessDenied: # pragma: no cover
proc = None
except ImportError:
proc = None
self._get_process = lambda: proc
return proc
def can_monitor_memory_usage(self): def can_monitor_memory_usage(self):
return Process is not None return self._get_process() is not None
def install_monitor_memory_usage(self): def install_monitor_memory_usage(self):
# Start monitoring memory usage, if possible. # Start monitoring memory usage, if possible.
...@@ -417,7 +436,7 @@ class PeriodicMonitoringThread(object): ...@@ -417,7 +436,7 @@ class PeriodicMonitoringThread(object):
# They disabled it. # They disabled it.
return -1 # value for tests return -1 # value for tests
rusage = Process().memory_full_info() rusage = self._get_process().memory_full_info()
# uss only documented available on Windows, Linux, and OS X. # uss only documented available on Windows, Linux, and OS X.
# If not available, fall back to rss as an aproximation. # If not available, fall back to rss as an aproximation.
mem_usage = getattr(rusage, 'uss', 0) or rusage.rss mem_usage = getattr(rusage, 'uss', 0) or rusage.rss
......
...@@ -267,8 +267,10 @@ class signal(object): ...@@ -267,8 +267,10 @@ class signal(object):
self.hub.handle_error(None, *sys.exc_info()) self.hub.handle_error(None, *sys.exc_info())
def reinit(): def reinit(hub=None):
""" """
reinit() -> None
Prepare the gevent hub to run in a new (forked) process. Prepare the gevent hub to run in a new (forked) process.
This should be called *immediately* after :func:`os.fork` in the This should be called *immediately* after :func:`os.fork` in the
...@@ -290,47 +292,47 @@ def reinit(): ...@@ -290,47 +292,47 @@ def reinit():
if the fork process can be more smoothly managed. if the fork process can be more smoothly managed.
.. warning:: See remarks in :func:`gevent.os.fork` about greenlets .. warning:: See remarks in :func:`gevent.os.fork` about greenlets
and libev watchers in the child process. and event loop watchers in the child process.
""" """
# Note the signature line in the docstring: hub is not a public param.
# The loop reinit function in turn calls libev's ev_loop_fork # The loop reinit function in turn calls libev's ev_loop_fork
# function. # function.
hub = _get_hub() hub = _get_hub() if hub is None else hub
if hub is None:
if hub is not None: return
# Note that we reinit the existing loop, not destroy it.
# See https://github.com/gevent/gevent/issues/200. # Note that we reinit the existing loop, not destroy it.
hub.loop.reinit() # See https://github.com/gevent/gevent/issues/200.
# libev's fork watchers are slow to fire because the only fire hub.loop.reinit()
# at the beginning of a loop; due to our use of callbacks that # libev's fork watchers are slow to fire because the only fire
# run at the end of the loop, that may be too late. The # at the beginning of a loop; due to our use of callbacks that
# threadpool and resolvers depend on the fork handlers being # run at the end of the loop, that may be too late. The
# run (specifically, the threadpool will fail in the forked # threadpool and resolvers depend on the fork handlers being
# child if there were any threads in it, which there will be # run (specifically, the threadpool will fail in the forked
# if the resolver_thread was in use (the default) before the # child if there were any threads in it, which there will be
# fork.) # if the resolver_thread was in use (the default) before the
# # fork.)
# If the forked process wants to use the threadpool or #
# resolver immediately (in a queued callback), it would hang. # If the forked process wants to use the threadpool or
# # resolver immediately (in a queued callback), it would hang.
# The below is a workaround. Fortunately, both of these #
# methods are idempotent and can be called multiple times # The below is a workaround. Fortunately, all of these
# following a fork if the suddenly started working, or were # methods are idempotent and can be called multiple times
# already working on some platforms. Other threadpools and fork handlers # following a fork if the suddenly started working, or were
# will be called at an arbitrary time later ('soon') # already working on some platforms. Other threadpools and fork handlers
if hasattr(hub.threadpool, '_on_fork'): # will be called at an arbitrary time later ('soon')
hub.threadpool._on_fork() for obj in (hub._threadpool, hub._resolver, hub.periodic_monitoring_thread):
# resolver_ares also has a fork watcher that's not firing getattr(obj, '_on_fork', lambda: None)()
if hasattr(hub.resolver, '_on_fork'):
hub.resolver._on_fork() # TODO: We'd like to sleep for a non-zero amount of time to force the loop to make a
# pass around before returning to this greenlet. That will allow any
# TODO: We'd like to sleep for a non-zero amount of time to force the loop to make a # user-provided fork watchers to run. (Two calls are necessary.) HOWEVER, if
# pass around before returning to this greenlet. That will allow any # we do this, certain tests that heavily mix threads and forking,
# user-provided fork watchers to run. (Two calls are necessary.) HOWEVER, if # like 2.7/test_threading:test_reinit_tls_after_fork, fail. It's not immediately clear
# we do this, certain tests that heavily mix threads and forking, # why.
# like 2.7/test_threading:test_reinit_tls_after_fork, fail. It's not immediately clear #sleep(0.00001)
# why. #sleep(0.00001)
#sleep(0.00001)
#sleep(0.00001)
hub_ident_registry = IdentRegistry() hub_ident_registry = IdentRegistry()
......
...@@ -19,6 +19,9 @@ from gevent import config as GEVENT_CONFIG ...@@ -19,6 +19,9 @@ from gevent import config as GEVENT_CONFIG
get_ident = get_original(thread_mod_name, 'get_ident') get_ident = get_original(thread_mod_name, 'get_ident')
class MockHub(object): class MockHub(object):
_threadpool = None
_resolver = None
def __init__(self): def __init__(self):
self.thread_ident = get_ident() self.thread_ident = get_ident()
self.exception_stream = NativeStrIO() self.exception_stream = NativeStrIO()
...@@ -32,7 +35,15 @@ class MockHub(object): ...@@ -32,7 +35,15 @@ class MockHub(object):
def handle_error(self, *args): # pylint:disable=unused-argument def handle_error(self, *args): # pylint:disable=unused-argument
raise # pylint:disable=misplaced-bare-raise raise # pylint:disable=misplaced-bare-raise
@property
def loop(self):
return self
def reinit(self):
"mock loop.reinit"
class _AbstractTestPeriodicMonitoringThread(object): class _AbstractTestPeriodicMonitoringThread(object):
# Makes sure we don't actually spin up a new monitoring thread.
# pylint:disable=no-member # pylint:disable=no-member
...@@ -41,9 +52,16 @@ class _AbstractTestPeriodicMonitoringThread(object): ...@@ -41,9 +52,16 @@ class _AbstractTestPeriodicMonitoringThread(object):
self._orig_start_new_thread = monitor.start_new_thread self._orig_start_new_thread = monitor.start_new_thread
self._orig_thread_sleep = monitor.thread_sleep self._orig_thread_sleep = monitor.thread_sleep
monitor.thread_sleep = lambda _s: gc.collect() # For PyPy monitor.thread_sleep = lambda _s: gc.collect() # For PyPy
monitor.start_new_thread = lambda _f, _a: 0xDEADBEEF self.tid = 0xDEADBEEF
def start_new_thread(_f, _a):
r = self.tid
self.tid += 1
return r
monitor.start_new_thread = start_new_thread
self.hub = MockHub() self.hub = MockHub()
self.pmt = monitor.PeriodicMonitoringThread(self.hub) self.pmt = monitor.PeriodicMonitoringThread(self.hub)
self.hub.periodic_monitoring_thread = self.pmt
self.pmt_default_funcs = self.pmt.monitoring_functions()[:] self.pmt_default_funcs = self.pmt.monitoring_functions()[:]
self.len_pmt_default_funcs = len(self.pmt_default_funcs) self.len_pmt_default_funcs = len(self.pmt_default_funcs)
...@@ -64,6 +82,12 @@ class TestPeriodicMonitoringThread(_AbstractTestPeriodicMonitoringThread, ...@@ -64,6 +82,12 @@ class TestPeriodicMonitoringThread(_AbstractTestPeriodicMonitoringThread,
self.assertEqual(0xDEADBEEF, self.pmt.monitor_thread_ident) self.assertEqual(0xDEADBEEF, self.pmt.monitor_thread_ident)
self.assertEqual(gettrace(), self.pmt._greenlet_tracer) self.assertEqual(gettrace(), self.pmt._greenlet_tracer)
@skipOnPyPyOnWindows("psutil doesn't install on PyPy on Win")
def test_get_process(self):
proc = self.pmt._get_process()
self.assertIsNotNone(proc)
self.assertIs(proc, self.pmt._get_process())
def test_hub_wref(self): def test_hub_wref(self):
self.assertIs(self.hub, self.pmt.hub) self.assertIs(self.hub, self.pmt.hub)
del self.hub del self.hub
...@@ -163,6 +187,18 @@ class TestPeriodicMonitoringThread(_AbstractTestPeriodicMonitoringThread, ...@@ -163,6 +187,18 @@ class TestPeriodicMonitoringThread(_AbstractTestPeriodicMonitoringThread,
with self.assertRaises(MyException): with self.assertRaises(MyException):
self.pmt() self.pmt()
def test_hub_reinit(self):
import os
from gevent.hub import reinit
self.pmt.pid = -1
old_tid = self.pmt.monitor_thread_ident
reinit(self.hub)
self.assertEqual(os.getpid(), self.pmt.pid)
self.assertEqual(old_tid + 1, self.pmt.monitor_thread_ident)
class TestPeriodicMonitorBlocking(_AbstractTestPeriodicMonitoringThread, class TestPeriodicMonitorBlocking(_AbstractTestPeriodicMonitoringThread,
unittest.TestCase): unittest.TestCase):
...@@ -264,12 +300,10 @@ class TestPeriodicMonitorMemory(_AbstractTestPeriodicMonitoringThread, ...@@ -264,12 +300,10 @@ class TestPeriodicMonitorMemory(_AbstractTestPeriodicMonitoringThread,
self._old_max = GEVENT_CONFIG.max_memory_usage self._old_max = GEVENT_CONFIG.max_memory_usage
GEVENT_CONFIG.max_memory_usage = None GEVENT_CONFIG.max_memory_usage = None
self._old_process = monitor.Process self.pmt._get_process = lambda: MockProcess(self.rss)
monitor.Process = lambda: MockProcess(self.rss)
def tearDown(self): def tearDown(self):
GEVENT_CONFIG.max_memory_usage = self._old_max GEVENT_CONFIG.max_memory_usage = self._old_max
monitor.Process = self._old_process
super(TestPeriodicMonitorMemory, self).tearDown() super(TestPeriodicMonitorMemory, self).tearDown()
def test_can_monitor_and_install(self): def test_can_monitor_and_install(self):
...@@ -284,7 +318,7 @@ class TestPeriodicMonitorMemory(_AbstractTestPeriodicMonitoringThread, ...@@ -284,7 +318,7 @@ class TestPeriodicMonitorMemory(_AbstractTestPeriodicMonitoringThread,
def test_cannot_monitor_and_install(self): def test_cannot_monitor_and_install(self):
import warnings import warnings
monitor.Process = None self.pmt._get_process = lambda: None
self.assertFalse(self.pmt.can_monitor_memory_usage()) self.assertFalse(self.pmt.can_monitor_memory_usage())
# This emits a warning, visible by default # This emits a warning, visible by default
......
...@@ -141,16 +141,18 @@ class Test(greentest.TestCase): ...@@ -141,16 +141,18 @@ class Test(greentest.TestCase):
timeout.start() timeout.start()
timeout.cancel() timeout.cancel()
gevent.sleep(SHOULD_NOT_EXPIRE) gevent.sleep(SHOULD_NOT_EXPIRE)
assert not timeout.pending, timeout self.assertFalse(timeout.pending, timeout)
timeout.close() timeout.close()
@greentest.ignores_leakcheck
def test_with_timeout(self): def test_with_timeout(self):
self.assertRaises(gevent.Timeout, gevent.with_timeout, SHOULD_EXPIRE, gevent.sleep, SHOULD_NOT_EXPIRE) with self.assertRaises(gevent.Timeout):
gevent.with_timeout(SHOULD_EXPIRE, gevent.sleep, SHOULD_NOT_EXPIRE)
X = object() X = object()
r = gevent.with_timeout(SHOULD_EXPIRE, gevent.sleep, SHOULD_NOT_EXPIRE, timeout_value=X) r = gevent.with_timeout(SHOULD_EXPIRE, gevent.sleep, SHOULD_NOT_EXPIRE, timeout_value=X)
assert r is X, (r, X) self.assertIs(r, X)
r = gevent.with_timeout(SHOULD_NOT_EXPIRE, gevent.sleep, SHOULD_EXPIRE, timeout_value=X) r = gevent.with_timeout(SHOULD_NOT_EXPIRE, gevent.sleep, SHOULD_EXPIRE, timeout_value=X)
assert r is None, r self.assertIsNone(r)
if __name__ == '__main__': if __name__ == '__main__':
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment