Commit 8917d73a authored by Jason Madden's avatar Jason Madden

Start having the monitor thread emit events for monitored conditions.

This is the only reasonable path I could think of to enable memory monitoring, with gevent just being responsible for monitoring the memory and detecting overage conditions, while users plug in their own policies.
parent 76cbf2fa
......@@ -26,7 +26,9 @@
- Add an optional monitoring thread for each hub. When enabled, this
thread (by default) looks for greenlets that block the event loop
for more than 0.1s. You can add your own periodic monitoring
functions to this thread.
functions to this thread. Set ``GEVENT_MONITOR_THREAD_ENABLE`` to
use it, and ``GEVENT_MAX_BLOCKING_TIME`` to configure the blocking
interval.
- When gevent prints a timestamp as part of an error message, it is
now in UTC format as specified by RFC3339.
......@@ -42,6 +44,11 @@
are interested in, reducing CPU usage. Reported in :issue:`1144` by
wwqgtxx.
- Add a simple event framework for decoupled communication. It uses
:mod:`zope.event` if that is installed. The monitoring thread emits
events when it detects certain conditions, like loop blocked or
memory limits exceeded.
1.3a2 (2018-03-06)
==================
......
......@@ -131,7 +131,7 @@ install:
# pip will build them from source using the MSVC compiler matching the
# target Python version and architecture
# Note that psutil won't build under PyPy on Windows.
- "%CMD_IN_ENV% pip install -e git+https://github.com/cython/cython.git@63cd3bbb5eac22b92808eeb90b512359e3def20a#egg=cython"
- "%CMD_IN_ENV% pip install -U cython zope.interface zope.event"
- "%CMD_IN_ENV% pip install -U setuptools wheel greenlet cffi dnspython idna requests"
- ps:
......
......@@ -23,10 +23,15 @@ idna
psutil
# benchmarks use this
perf
# Used in a test
# Events
zope.event
zope.interface
# Tests
requests
# For viewing README.rst (restview --long-description),
# CONTRIBUTING.rst, etc.
# https://github.com/mgedmin/restview
restview
-r rtd-requirements.txt
......@@ -39,10 +39,16 @@ extensions = [
'sphinx.ext.intersphinx',
'mysphinxext',
'sphinx.ext.extlinks',
'sphinx.ext.viewcode',
'repoze.sphinx.autointerface',
]
intersphinx_mapping = {'http://docs.python.org/': None,
'https://greenlet.readthedocs.io/en/latest/': None}
intersphinx_mapping = {
'http://docs.python.org/': None,
'https://greenlet.readthedocs.io/en/latest/': None,
'https://zopeevent.readthedocs.io/en/latest/': None,
'https://zopecomponent.readthedocs.io/en/latest/': None,
}
extlinks = {'issue': ('https://github.com/gevent/gevent/issues/%s',
'issue #'),
......
......@@ -23,4 +23,5 @@ API reference
gevent.threadpool
gevent.time
gevent.util
gevent.events
lowlevel
......@@ -241,6 +241,10 @@ def run_setup(ext_modules, run_make):
'dnspython',
'idna',
],
'events': [
'zope.event',
'zope.interface',
],
},
# It's always safe to pass the CFFI keyword, even if
# cffi is not installed: it's just ignored in that case.
......
......@@ -436,9 +436,13 @@ class TrackGreenletTree(BoolSettingMixin, Setting):
.. versionadded:: 1.3b1
"""
## Monitoring settings
# All env keys should begin with GEVENT_MONITOR
class MonitorThread(BoolSettingMixin, Setting):
name = 'monitor_thread'
environment_key = 'GEVENT_ENABLE_MONITOR_THREAD'
environment_key = 'GEVENT_MONITOR_THREAD_ENABLE'
default = False
desc = """\
......@@ -466,6 +470,8 @@ class MonitorThread(BoolSettingMixin, Setting):
class MaxBlockingTime(FloatSettingMixin, Setting):
name = 'max_blocking_time'
# This environment key doesn't follow the convention because it's
# meant to match a key used by existing projects
environment_key = 'GEVENT_MAX_BLOCKING_TIME'
default = 0.1
......
......@@ -12,10 +12,13 @@ from greenlet import getcurrent
from gevent import config as GEVENT_CONFIG
from gevent.monkey import get_original
from gevent.util import format_run_info
from gevent.events import notify
from gevent.events import EventLoopBlocked
from gevent._compat import thread_mod_name
from gevent._util import gmctime
# Clocks
try:
# Python 3.3+ (PEP 418)
......@@ -268,7 +271,14 @@ class PeriodicMonitoringThread(object):
report.extend(format_run_info(greenlet_stacks=False,
current_thread_ident=self.monitor_thread_ident))
report.append(report[0])
hub.exception_stream.write('\n'.join(report))
stream = hub.exception_stream
for line in report:
# Printing line by line may interleave with other things,
# but it should also prevent a "reentrant call to print"
# when the report is large.
print(line, file=stream)
notify(EventLoopBlocked(active_greenlet, GEVENT_CONFIG.max_blocking_time, report))
return (active_greenlet, report)
def ignore_current_greenlet_blocking(self):
......
# -*- coding: utf-8 -*-
# Copyright 2018 gevent. See LICENSE for details.
"""
Publish/subscribe event infrastructure.
When certain "interesting" things happen during the lifetime of the
process, gevent will "publish" an event (an object). That event is
delivered to interested "subscribers" (functions that take one
parameter, the event object).
Higher level frameworks may take this foundation and build richer
models on it.
If :mod:`zope.event` is installed, then it will be used to provide the
functionality of `notify` and `subscribers`. See
:mod:`zope.event.classhandler` for a simple class-based approach to
subscribing to a filtered list of events, and see `zope.component
<https://zopecomponent.readthedocs.io/en/latest/event.html>`_ for a
much higher-level, flexible system. If you are using one of these systems,
you generally will not want to directly modify `subscribers`.
.. versionadded:: 1.3b1
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
__all__ = [
'subscribers',
'IEventLoopBlocked',
'EventLoopBlocked',
]
try:
from zope.event import subscribers
from zope.event import notify
except ImportError:
#: Applications may register for notification of events by appending a
#: callable to the ``subscribers`` list.
#:
#: Each subscriber takes a single argument, which is the event object
#: being published.
#:
#: Exceptions raised by subscribers will be propagated *without* running
#: any remaining subscribers.
subscribers = []
def notify(event):
"""
Notify all subscribers of ``event``.
"""
for subscriber in subscribers:
subscriber(event)
notify = notify # export
try:
from zope.interface import Interface
from zope.interface import implementer
from zope.interface import Attribute
except ImportError:
class Interface(object):
pass
def implementer(_iface):
def dec(c):
return c
return dec
def Attribute(s):
return s
class IEventLoopBlocked(Interface):
"""
The event emitted when the event loop is blocked.
This event is emitted in the monitor thread.
"""
greenlet = Attribute("The greenlet that appeared to be blocking the loop.")
blocking_time = Attribute("The approximate time in seconds the loop has been blocked.")
info = Attribute("A sequence of string lines providing extra info.")
@implementer(IEventLoopBlocked)
class EventLoopBlocked(object):
"""
The event emitted when the event loop is blocked.
Implements `IEventLoopBlocked`.
"""
def __init__(self, greenlet, blocking_time, info):
self.greenlet = greenlet
self.blocking_time = blocking_time
self.info = info
......@@ -611,6 +611,8 @@ class timer(_base.TimerMixin, watcher):
self._after, self._repeat = args
if self._after and self._after < 0.001:
import warnings
# XXX: The stack level is hard to determine, could be getting here
# through a number of different ways.
warnings.warn("libuv only supports millisecond timer resolution; "
"all times less will be set to 1 ms",
stacklevel=6)
......
......@@ -127,6 +127,7 @@ class TestCase(TestCaseMetaClass("NewBase", (TimeAssertMixin, BaseTestCase,), {}
error_fatal = True
uses_handle_error = True
close_on_teardown = ()
__old_subscribers = ()
def run(self, *args, **kwargs):
# pylint:disable=arguments-differ
......@@ -136,6 +137,8 @@ class TestCase(TestCaseMetaClass("NewBase", (TimeAssertMixin, BaseTestCase,), {}
def setUp(self):
super(TestCase, self).setUp()
from gevent import events
self.__old_subscribers = events.subscribers[:]
# Especially if we're running in leakcheck mode, where
# the same test gets executed repeatedly, we need to update the
# current time. Tests don't always go through the full event loop,
......@@ -148,6 +151,9 @@ class TestCase(TestCaseMetaClass("NewBase", (TimeAssertMixin, BaseTestCase,), {}
def tearDown(self):
if getattr(self, 'skipTearDown', False):
return
from gevent import events
events.subscribers[:] = self.__old_subscribers
cleanup = getattr(self, 'cleanup', _noop)
cleanup()
self._error = self._none
......
......@@ -55,7 +55,6 @@ class TestPeriodicMonitoringThread(unittest.TestCase):
self.assertIs(self.hub, self.pmt.hub)
del self.hub
import gc
gc.collect()
self.assertIsNone(self.pmt.hub)
......@@ -151,6 +150,12 @@ class TestPeriodicMonitoringThread(unittest.TestCase):
def test_monitor_blocking(self):
# Initially there's no active greenlet and no switches,
# so nothing is considered blocked
from gevent.events import subscribers
from gevent.events import IEventLoopBlocked
from zope.interface.verify import verifyObject
events = []
subscribers.append(events.append)
self.assertFalse(self.pmt.monitor_blocking(self.hub))
# Give it an active greenlet
......@@ -160,13 +165,18 @@ class TestPeriodicMonitoringThread(unittest.TestCase):
# We've switched, so we're not blocked
self.assertFalse(self.pmt.monitor_blocking(self.hub))
self.assertFalse(events)
# Again without switching is a problem.
self.assertTrue(self.pmt.monitor_blocking(self.hub))
self.assertTrue(events)
verifyObject(IEventLoopBlocked, events[0])
del events[:]
# But we can order it not to be a problem
self.pmt.ignore_current_greenlet_blocking()
self.assertFalse(self.pmt.monitor_blocking(self.hub))
self.assertFalse(events)
# And back again
self.pmt.monitor_current_greenlet_blocking()
......
# -*- coding: utf-8 -*-
# Copyright 2018 gevent. See LICENSE.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import unittest
from gevent import events
from zope.interface import verify
class TestImplements(unittest.TestCase):
def test_event_loop_blocked(self):
verify.verifyClass(events.IEventLoopBlocked, events.EventLoopBlocked)
class TestEvents(unittest.TestCase):
def test_is_zope(self):
from zope import event
self.assertIs(events.subscribers, event.subscribers)
self.assertIs(events.notify, event.notify)
if __name__ == '__main__':
unittest.main()
......@@ -50,8 +50,12 @@ class Test(greentest.TestCase):
def test2(self):
timer = gevent.get_hub().loop.timer(0)
timer.start(hello2)
gevent.sleep(0.1)
assert sys.exc_info() == (None, None, None), sys.exc_info()
try:
gevent.sleep(0.1)
self.assertEqual(sys.exc_info(), (None, None, None))
finally:
timer.close()
if __name__ == '__main__':
......
......@@ -39,8 +39,11 @@ class TestCloseSocketWhilePolling(greentest.TestCase):
with self.assertRaises(Exception):
sock = socket.socket()
self._close_on_teardown(sock)
get_hub().loop.timer(0, sock.close)
sock.connect(('python.org', 81))
t = get_hub().loop.timer(0, sock.close)
try:
sock.connect(('python.org', 81))
finally:
t.close()
gevent.sleep(0)
......
......@@ -17,3 +17,4 @@ test__issue330.py
test___ident.py
test___config.py
test___monitor.py
test__events.py
......@@ -133,3 +133,4 @@ test__issue_728.py
test__refcount_core.py
test__api.py
test__monitor.py
test__events.py
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment