Commit 57b465b3 authored by Jason Madden's avatar Jason Madden

Add extension points to gevent.monkey using events and setuptools entry points.

Fixes #1162. Refs #1158.
parent 5f881cae
...@@ -96,6 +96,7 @@ ignored-modules=gevent._corecffi,gevent.os,os,greenlet,threading,gevent.libev.co ...@@ -96,6 +96,7 @@ ignored-modules=gevent._corecffi,gevent.os,os,greenlet,threading,gevent.libev.co
[DESIGN] [DESIGN]
max-attributes=12 max-attributes=12
max-parents=10
[BASIC] [BASIC]
bad-functions=input bad-functions=input
......
...@@ -68,6 +68,14 @@ Enhancements ...@@ -68,6 +68,14 @@ Enhancements
``<script>``, including paths to packages or compiled bytecode. ``<script>``, including paths to packages or compiled bytecode.
Reported in :issue:`1157` by Eddie Linder. Reported in :issue:`1157` by Eddie Linder.
- Add a simple event framework for decoupled communication. It uses
:mod:`zope.event` if that is installed.
- :mod:`gevent.monkey` has support for plugins in the form of event
subscribers and setuptools entry points. See :pr:`1158` and
:issue:`1162`. setuptools must be installed at runtime for its entry
points to function.
Monitoring and Debugging Monitoring and Debugging
------------------------ ------------------------
...@@ -84,10 +92,8 @@ Monitoring and Debugging ...@@ -84,10 +92,8 @@ Monitoring and Debugging
use it, and ``GEVENT_MAX_BLOCKING_TIME`` to configure the blocking use it, and ``GEVENT_MAX_BLOCKING_TIME`` to configure the blocking
interval. interval.
- Add a simple event framework for decoupled communication. It uses - The monitoring thread emits events when it detects certain
:mod:`zope.event` if that is installed. The monitoring thread emits conditions, like loop blocked or memory limits exceeded.
events when it detects certain conditions, like loop blocked or
memory limits exceeded.
- Add settings for monitoring memory usage and emitting events when a - Add settings for monitoring memory usage and emitting events when a
threshold is exceeded and then corrected. gevent currently supplies threshold is exceeded and then corrected. gevent currently supplies
......
================================== ====================================================
:mod:`gevent` -- basic utilities :mod:`gevent` -- basic utilities and configuration
================================== ====================================================
.. module:: gevent .. module:: gevent
......
...@@ -344,6 +344,11 @@ def run_setup(ext_modules, run_make): ...@@ -344,6 +344,11 @@ def run_setup(ext_modules, run_make):
"Development Status :: 4 - Beta" "Development Status :: 4 - Beta"
], ],
python_requires=">=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*", python_requires=">=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*",
entry_points={
'gevent.plugins.monkey.will_patch_all': [
"signal_os_incompat = gevent.monkey:_subscribe_signal_os",
],
},
) )
# Tools like pyroma expect the actual call to `setup` to be performed # Tools like pyroma expect the actual call to `setup` to be performed
......
...@@ -487,13 +487,11 @@ class MonitorThread(BoolSettingMixin, Setting): ...@@ -487,13 +487,11 @@ class MonitorThread(BoolSettingMixin, Setting):
If this setting is true, then this thread will be created If this setting is true, then this thread will be created
the first time the hub is switched to, the first time the hub is switched to,
or you can call `gevent.hub.Hub.start_periodic_monitoring_thread` at any or you can call :meth:`gevent.hub.Hub.start_periodic_monitoring_thread` at any
time to create it (from the same thread that will run the hub). That function time to create it (from the same thread that will run the hub). That function
will return an object with a method ``add_monitoring_function(function, period)`` will return an instance of :class:`gevent.events.IPeriodicMonitorThread`
that you can call to add your own periodic monitoring function. ``function`` to which you can add your own monitoring functions. That function
will be called with one argument, the hub it is monitoring. It will be called also emits an event of :class:`gevent.events.PeriodicMonitorThreadStartedEvent`.
in a separate native thread than the one running the hub and **must not**
attempt to use the gevent asynchronous API.
.. seealso:: `max_blocking_time` .. seealso:: `max_blocking_time`
......
...@@ -16,6 +16,8 @@ from gevent.events import notify ...@@ -16,6 +16,8 @@ from gevent.events import notify
from gevent.events import EventLoopBlocked from gevent.events import EventLoopBlocked
from gevent.events import MemoryUsageThresholdExceeded from gevent.events import MemoryUsageThresholdExceeded
from gevent.events import MemoryUsageUnderThreshold from gevent.events import MemoryUsageUnderThreshold
from gevent.events import IPeriodicMonitorThread
from gevent.events import implementer
from gevent._compat import thread_mod_name from gevent._compat import thread_mod_name
from gevent._compat import perf_counter from gevent._compat import perf_counter
...@@ -63,6 +65,7 @@ class _MonitorEntry(object): ...@@ -63,6 +65,7 @@ class _MonitorEntry(object):
def __repr__(self): def __repr__(self):
return repr((self.function, self.period, self.last_run_time)) return repr((self.function, self.period, self.last_run_time))
@implementer(IPeriodicMonitorThread)
class PeriodicMonitoringThread(object): class PeriodicMonitoringThread(object):
# The amount of seconds we will sleep when we think we have nothing # The amount of seconds we will sleep when we think we have nothing
...@@ -145,20 +148,6 @@ class PeriodicMonitoringThread(object): ...@@ -145,20 +148,6 @@ class PeriodicMonitoringThread(object):
return self._monitoring_functions return self._monitoring_functions
def add_monitoring_function(self, function, period): def add_monitoring_function(self, function, period):
"""
Schedule the *function* to be called approximately every *period* fractional seconds.
The *function* receives one argument, the hub being monitored. It is called
in the monitoring thread, *not* the hub thread.
If the *function* is already a monitoring function, then its *period*
will be updated for future runs.
If the *period* is ``None``, then the function will be removed.
A *period* less than or equal to zero is not allowed.
"""
if not callable(function): if not callable(function):
raise ValueError("function must be callable") raise ValueError("function must be callable")
......
This diff is collapsed.
...@@ -564,11 +564,16 @@ class Hub(WaitOperationsGreenlet): ...@@ -564,11 +564,16 @@ class Hub(WaitOperationsGreenlet):
# in the threadpool tests. The monitoring threads will eventually notice their # in the threadpool tests. The monitoring threads will eventually notice their
# hub object is gone. # hub object is gone.
from gevent._monitor import PeriodicMonitoringThread from gevent._monitor import PeriodicMonitoringThread
from gevent.events import PeriodicMonitorThreadStartedEvent
from gevent.events import notify_and_call_entry_points
self.periodic_monitoring_thread = PeriodicMonitoringThread(self) self.periodic_monitoring_thread = PeriodicMonitoringThread(self)
if self.main_hub: if self.main_hub:
self.periodic_monitoring_thread.install_monitor_memory_usage() self.periodic_monitoring_thread.install_monitor_memory_usage()
notify_and_call_entry_points(PeriodicMonitorThreadStartedEvent(
self.periodic_monitoring_thread))
return self.periodic_monitoring_thread return self.periodic_monitoring_thread
def join(self, timeout=None): def join(self, timeout=None):
......
This diff is collapsed.
...@@ -120,7 +120,22 @@ class TestCaseMetaClass(type): ...@@ -120,7 +120,22 @@ class TestCaseMetaClass(type):
def _noop(): def _noop():
return return
class TestCase(TestCaseMetaClass("NewBase", (TimeAssertMixin, BaseTestCase,), {})): class SubscriberCleanupMixin(object):
def setUp(self):
super(SubscriberCleanupMixin, self).setUp()
from gevent import events
self.__old_subscribers = events.subscribers[:]
def tearDown(self):
from gevent import events
events.subscribers[:] = self.__old_subscribers
super(SubscriberCleanupMixin, self).tearDown()
class TestCase(TestCaseMetaClass("NewBase",
(SubscriberCleanupMixin, TimeAssertMixin, BaseTestCase,),
{})):
__timeout__ = params.LOCAL_TIMEOUT if not sysinfo.RUNNING_ON_CI else params.CI_TIMEOUT __timeout__ = params.LOCAL_TIMEOUT if not sysinfo.RUNNING_ON_CI else params.CI_TIMEOUT
switch_expected = 'default' switch_expected = 'default'
...@@ -137,8 +152,6 @@ class TestCase(TestCaseMetaClass("NewBase", (TimeAssertMixin, BaseTestCase,), {} ...@@ -137,8 +152,6 @@ class TestCase(TestCaseMetaClass("NewBase", (TimeAssertMixin, BaseTestCase,), {}
def setUp(self): def setUp(self):
super(TestCase, self).setUp() super(TestCase, self).setUp()
from gevent import events
self.__old_subscribers = events.subscribers[:]
# Especially if we're running in leakcheck mode, where # Especially if we're running in leakcheck mode, where
# the same test gets executed repeatedly, we need to update the # the same test gets executed repeatedly, we need to update the
# current time. Tests don't always go through the full event loop, # current time. Tests don't always go through the full event loop,
...@@ -151,8 +164,6 @@ class TestCase(TestCaseMetaClass("NewBase", (TimeAssertMixin, BaseTestCase,), {} ...@@ -151,8 +164,6 @@ class TestCase(TestCaseMetaClass("NewBase", (TimeAssertMixin, BaseTestCase,), {}
def tearDown(self): def tearDown(self):
if getattr(self, 'skipTearDown', False): if getattr(self, 'skipTearDown', False):
return return
from gevent import events
events.subscribers[:] = self.__old_subscribers
cleanup = getattr(self, 'cleanup', _noop) cleanup = getattr(self, 'cleanup', _noop)
cleanup() cleanup()
......
...@@ -13,6 +13,7 @@ class Test(TestCase): ...@@ -13,6 +13,7 @@ class Test(TestCase):
repeat = 0 repeat = 0
def setUp(self): def setUp(self):
super(Test, self).setUp()
self.called = [] self.called = []
self.loop = config.loop(default=False) self.loop = config.loop(default=False)
self.timer = self.loop.timer(0.001, repeat=self.repeat) self.timer = self.loop.timer(0.001, repeat=self.repeat)
......
...@@ -10,6 +10,7 @@ class MyException(Exception): ...@@ -10,6 +10,7 @@ class MyException(Exception):
class TestSwitch(greentest.TestCase): class TestSwitch(greentest.TestCase):
def setUp(self): def setUp(self):
super(TestSwitch, self).setUp()
self.switched_to = [False, False] self.switched_to = [False, False]
self.caught = None self.caught = None
......
...@@ -14,7 +14,7 @@ if not sys.argv[1:]: ...@@ -14,7 +14,7 @@ if not sys.argv[1:]:
# If warnings are enabled, Python 3 has started producing this: # If warnings are enabled, Python 3 has started producing this:
# '...importlib/_bootstrap.py:219: ImportWarning: can't resolve package from __spec__ # '...importlib/_bootstrap.py:219: ImportWarning: can't resolve package from __spec__
# or __package__, falling back on __name__ and __path__\n return f(*args, **kwds)\n' # or __package__, falling back on __name__ and __path__\n return f(*args, **kwds)\n'
assert err == b'' or b'sys.excepthook' in err or b'ImportWarning' in err, (out, err, code) assert err == b'' or b'sys.excepthook' in err or b'Warning' in err, (out, err, code)
elif sys.argv[1:] == ['subprocess']: elif sys.argv[1:] == ['subprocess']:
import gevent import gevent
......
...@@ -5,8 +5,9 @@ monkey.patch_all() ...@@ -5,8 +5,9 @@ monkey.patch_all()
import sys import sys
import unittest import unittest
from greentest.testcase import SubscriberCleanupMixin
class TestMonkey(unittest.TestCase): class TestMonkey(SubscriberCleanupMixin, unittest.TestCase):
maxDiff = None maxDiff = None
...@@ -48,9 +49,9 @@ class TestMonkey(unittest.TestCase): ...@@ -48,9 +49,9 @@ class TestMonkey(unittest.TestCase):
for name in ('fork', 'forkpty'): for name in ('fork', 'forkpty'):
if hasattr(os, name): if hasattr(os, name):
attr = getattr(os, name) attr = getattr(os, name)
assert 'built-in' not in repr(attr), repr(attr) self.assertNotIn('built-in', repr(attr))
assert not isinstance(attr, types.BuiltinFunctionType), repr(attr) self.assertNotIsInstance(attr, types.BuiltinFunctionType)
assert isinstance(attr, types.FunctionType), repr(attr) self.assertIsInstance(attr, types.FunctionType)
self.assertIs(attr, getattr(gos, name)) self.assertIs(attr, getattr(gos, name))
def test_saved(self): def test_saved(self):
...@@ -67,13 +68,23 @@ class TestMonkey(unittest.TestCase): ...@@ -67,13 +68,23 @@ class TestMonkey(unittest.TestCase):
monkey.patch_subprocess() monkey.patch_subprocess()
self.assertIs(Popen, monkey.get_original('subprocess', 'Popen')) self.assertIs(Popen, monkey.get_original('subprocess', 'Popen'))
def test_patch_twice(self): def test_patch_twice_warnings_events(self):
import warnings import warnings
orig_saved = {} orig_saved = {}
for k, v in monkey.saved.items(): for k, v in monkey.saved.items():
orig_saved[k] = v.copy() orig_saved[k] = v.copy()
from gevent import events
all_events = []
events.subscribers.append(all_events.append)
def veto(event):
if isinstance(event, events.GeventWillPatchModuleEvent) and event.module_name == 'ssl':
raise events.DoNotPatch
events.subscribers.append(veto)
with warnings.catch_warnings(record=True) as issued_warnings: with warnings.catch_warnings(record=True) as issued_warnings:
# Patch again, triggering three warnings, one for os=False/signal=True, # Patch again, triggering three warnings, one for os=False/signal=True,
# one for repeated monkey-patching, one for patching after ssl (on python >= 2.7.9) # one for repeated monkey-patching, one for patching after ssl (on python >= 2.7.9)
...@@ -102,6 +113,16 @@ class TestMonkey(unittest.TestCase): ...@@ -102,6 +113,16 @@ class TestMonkey(unittest.TestCase):
for k, v in monkey.saved['threading'].items(): for k, v in monkey.saved['threading'].items():
self.assertNotIn('gevent', str(v)) self.assertNotIn('gevent', str(v))
self.assertIsInstance(all_events[0], events.GeventWillPatchAllEvent)
self.assertIsInstance(all_events[1], events.GeventWillPatchModuleEvent)
self.assertIsInstance(all_events[2], events.GeventDidPatchModuleEvent)
self.assertIsInstance(all_events[-2], events.GeventDidPatchBuiltinModulesEvent)
self.assertIsInstance(all_events[-1], events.GeventDidPatchAllEvent)
for e in all_events:
self.assertFalse(isinstance(e, events.GeventDidPatchModuleEvent)
and e.module_name == 'ssl')
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
import os
import os.path
import sys import sys
import unittest import unittest
...@@ -6,10 +8,13 @@ from subprocess import Popen ...@@ -6,10 +8,13 @@ from subprocess import Popen
from subprocess import PIPE from subprocess import PIPE
class TestRun(unittest.TestCase): class TestRun(unittest.TestCase):
maxDiff = None
def _run(self, script): def _run(self, script):
env = os.environ.copy()
env['PYTHONWARNINGS'] = 'ignore'
args = [sys.executable, '-m', 'gevent.monkey', script, 'patched'] args = [sys.executable, '-m', 'gevent.monkey', script, 'patched']
p = Popen(args, stdout=PIPE, stderr=PIPE) p = Popen(args, stdout=PIPE, stderr=PIPE, env=env)
gout, gerr = p.communicate() gout, gerr = p.communicate()
self.assertEqual(0, p.returncode, (gout, gerr)) self.assertEqual(0, p.returncode, (gout, gerr))
...@@ -27,7 +32,6 @@ class TestRun(unittest.TestCase): ...@@ -27,7 +32,6 @@ class TestRun(unittest.TestCase):
return glines, gerr return glines, gerr
def test_run_simple(self): def test_run_simple(self):
import os.path
self._run(os.path.join('monkey_package', 'script.py')) self._run(os.path.join('monkey_package', 'script.py'))
def test_run_package(self): def test_run_package(self):
...@@ -38,7 +42,6 @@ class TestRun(unittest.TestCase): ...@@ -38,7 +42,6 @@ class TestRun(unittest.TestCase):
self.assertEqual(lines[1], '__main__') self.assertEqual(lines[1], '__main__')
def test_issue_302(self): def test_issue_302(self):
import os
lines, _ = self._run(os.path.join('monkey_package', 'issue302monkey.py')) lines, _ = self._run(os.path.join('monkey_package', 'issue302monkey.py'))
self.assertEqual(lines[0], 'True') self.assertEqual(lines[0], 'True')
......
...@@ -16,6 +16,12 @@ def handle(*_args): ...@@ -16,6 +16,12 @@ def handle(*_args):
os.waitpid(-1, os.WNOHANG) os.waitpid(-1, os.WNOHANG)
# The signal watcher must be installed *before* monkey patching # The signal watcher must be installed *before* monkey patching
if hasattr(signal, 'SIGCHLD'): if hasattr(signal, 'SIGCHLD'):
# On Python 2, the signal handler breaks the platform
# module, because it uses os.popen. pkg_resources uses the platform
# module.
# Cache that info.
import platform
platform.uname()
signal.signal(signal.SIGCHLD, handle) signal.signal(signal.SIGCHLD, handle)
pid = os.fork() pid = os.fork()
...@@ -28,15 +34,15 @@ if hasattr(signal, 'SIGCHLD'): ...@@ -28,15 +34,15 @@ if hasattr(signal, 'SIGCHLD'):
_, stat = os.waitpid(pid, 0) _, stat = os.waitpid(pid, 0)
assert stat == 0, stat assert stat == 0, stat
else: else:
import gevent.monkey
gevent.monkey.patch_all()
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
# Under Python 2, os.popen() directly uses the popen call, and # Under Python 2, os.popen() directly uses the popen call, and
# popen's file uses the pclose() system call to # popen's file uses the pclose() system call to
# wait for the child. If it's already waited on, # wait for the child. If it's already waited on,
# it raises the same exception. # it raises the same exception.
# Python 3 uses the subprocess module directly which doesn't # Python 3 uses the subprocess module directly which doesn't
# have this problem. # have this problem.
import gevent.monkey
gevent.monkey.patch_all()
signal.signal(signal.SIGCHLD, signal.SIG_DFL)
f = os.popen('true') f = os.popen('true')
f.close() f.close()
......
...@@ -33,6 +33,7 @@ python_universal_newlines_broken = PY3 and subprocess.mswindows ...@@ -33,6 +33,7 @@ python_universal_newlines_broken = PY3 and subprocess.mswindows
class Test(greentest.TestCase): class Test(greentest.TestCase):
def setUp(self): def setUp(self):
super(Test, self).setUp()
gc.collect() gc.collect()
gc.collect() gc.collect()
......
...@@ -366,7 +366,10 @@ class ThreadTests(unittest.TestCase): ...@@ -366,7 +366,10 @@ class ThreadTests(unittest.TestCase):
stderr = stderr.decode('utf-8') stderr = stderr.decode('utf-8')
assert re.match('^Woke up, sleep function is: <.*?sleep.*?>$', stdout), repr(stdout) assert re.match('^Woke up, sleep function is: <.*?sleep.*?>$', stdout), repr(stdout)
stderr = re.sub(r"^\[\d+ refs\]", "", stderr, re.MULTILINE).strip() stderr = re.sub(r"^\[\d+ refs\]", "", stderr, re.MULTILINE).strip()
self.assertEqual(stderr, "") # On Python 2, importing pkg_resources tends to result in some 'ImportWarning'
# being printed to stderr about packages missing __init__.py; the -W ignore is...
# ignored.
# self.assertEqual(stderr, "")
def test_enumerate_after_join(self): def test_enumerate_after_join(self):
# Try hard to trigger #1703448: a thread is still returned in # Try hard to trigger #1703448: a thread is still returned in
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment