Commit 277e34ca authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1150 from gevent/mem-monitor

Add monitoring for memory usage, emitting events as it moves around the threshold.
parents 320be8f0 0a56f56f
......@@ -4,31 +4,33 @@
.. currentmodule:: gevent
1.3a3 (unreleased)
1.3b1 (unreleased)
==================
- Use strongly typed watcher callbacks in the libuv CFFI extensions.
This prevents dozens of compiler warnings.
Dependencies
------------
- Cython 0.28.1 is now used to build gevent from a source checkout.
Bug Fixes
---------
- On Python 2, when monkey-patching `threading.Event`, also
monkey-patch the underlying class, ``threading._Event``. Some code
may be type-checking for that. See :issue:`1136`.
- Introduce the configuration variable
`gevent.config.track_greenlet_tree` (aka
``GEVENT_TRACK_GREENLET_TREE``) to allow disabling the greenlet tree
features for applications where greenlet spawning is performance
critical. This restores spawning performance to 1.2 levels.
- Fix libuv io watchers polling for events that only stopped watchers
are interested in, reducing CPU usage. Reported in :issue:`1144` by
wwqgtxx.
Enhancements
------------
- Add additional optimizations for spawning greenlets, making it
faster than 1.3a2.
- Add an optional monitoring thread for each hub. When enabled, this
thread (by default) looks for greenlets that block the event loop
for more than 0.1s. You can add your own periodic monitoring
functions to this thread. Set ``GEVENT_MONITOR_THREAD_ENABLE`` to
use it, and ``GEVENT_MAX_BLOCKING_TIME`` to configure the blocking
interval.
- Use strongly typed watcher callbacks in the libuv CFFI extensions.
This prevents dozens of compiler warnings.
- When gevent prints a timestamp as part of an error message, it is
now in UTC format as specified by RFC3339.
......@@ -40,15 +42,33 @@
- Hub objects now include the value of their ``name`` attribute in
their repr.
- Fix libuv io watchers polling for events that only stopped watchers
are interested in, reducing CPU usage. Reported in :issue:`1144` by
wwqgtxx.
Monitoring and Debugging
------------------------
- Introduce the configuration variable
`gevent.config.track_greenlet_tree` (aka
``GEVENT_TRACK_GREENLET_TREE``) to allow disabling the greenlet tree
features for applications where greenlet spawning is performance
critical. This restores spawning performance to 1.2 levels.
- Add an optional monitoring thread for each hub. When enabled, this
thread (by default) looks for greenlets that block the event loop
for more than 0.1s. You can add your own periodic monitoring
functions to this thread. Set ``GEVENT_MONITOR_THREAD_ENABLE`` to
use it, and ``GEVENT_MAX_BLOCKING_TIME`` to configure the blocking
interval.
- Add a simple event framework for decoupled communication. It uses
:mod:`zope.event` if that is installed. The monitoring thread emits
events when it detects certain conditions, like loop blocked or
memory limits exceeded.
- Add settings for monitoring memory usage and emitting events when a
threshold is exceeded and then corrected. gevent currently supplies
no policy for what to do when memory exceeds the configured limit.
``psutil`` must be installed to use this. See :pr:`1150`.
1.3a2 (2018-03-06)
==================
......
......@@ -5,7 +5,7 @@ wheel
# 0.28 is faster, and (important!) lets us specify the target module
# name to be created so that we can have both foo.py and _foo.so
# at the same time.
Cython >= 0.28
Cython >= 0.28.1
# Python 3.7b1 requires this.
greenlet>=0.4.13 ; platform_python_implementation == "CPython"
......
......@@ -18,11 +18,16 @@ _version_info = namedtuple('version_info',
#: The programatic version identifier. The fields have (roughly) the
#: same meaning as :data:`sys.version_info`
#: Deprecated in 1.2.
#: .. deprecated:: 1.2
#: Use ``pkg_resources.parse_version(__version__)`` (or the equivalent
#: ``packaging.version.Version(__version__)``).
version_info = _version_info(1, 3, 0, 'dev', 0)
#: The human-readable PEP 440 version identifier
__version__ = '1.3a3.dev0'
#: The human-readable PEP 440 version identifier.
#: Use ``pkg_resources.parse_version(__version__)`` or
#: ``packaging.version.Version(__version__)`` to get a machine-usable
#: value.
__version__ = '1.3b1.dev0'
__all__ = [
......
......@@ -140,3 +140,17 @@ except ImportError:
# exist. Probably on Python 2 with an astral character.
# Not sure how to handle this.
raise UnicodeEncodeError("Can't encode path to filesystem encoding")
## Clocks
try:
# Python 3.3+ (PEP 418)
from time import perf_counter
perf_counter = perf_counter
except ImportError:
import time
if sys.platform == "win32":
perf_counter = time.clock
else:
perf_counter = time.time
......@@ -279,17 +279,41 @@ class IntSettingMixin(object):
validate = staticmethod(validate_anything)
class FloatSettingMixin(object):
def _convert(self, value):
if value:
return float(value)
class _PositiveValueMixin(object):
def validate(self, value):
if value is not None and value <= 0:
raise ValueError("Must be > 0")
raise ValueError("Must be positive")
return value
class FloatSettingMixin(_PositiveValueMixin):
def _convert(self, value):
if value:
return float(value)
class ByteCountSettingMixin(_PositiveValueMixin):
_MULTIPLES = {
# All keys must be the same size.
'kb': 1024,
'mb': 1024 * 1024,
'gb': 1024 * 1024 * 1024,
}
_SUFFIX_SIZE = 2
def _convert(self, value):
if not value or not isinstance(value, str):
return value
value = value.lower()
for s, m in self._MULTIPLES.items():
if value[-self._SUFFIX_SIZE:] == s:
return int(value[:-self._SUFFIX_SIZE]) * m
return int(value)
class Resolver(ImportableSetting, Setting):
desc = """\
......@@ -493,6 +517,36 @@ class MaxBlockingTime(FloatSettingMixin, Setting):
.. versionadded:: 1.3b1
"""
class MonitorMemoryPeriod(FloatSettingMixin, Setting):
name = 'memory_monitor_period'
environment_key = 'GEVENT_MONITOR_MEMORY_PERIOD'
default = 5
desc = """\
If `monitor_thread` is enabled, this is approximately how long
(in seconds) we will go between checking the processes memory usage.
Checking the memory usage is relatively expensive on some operating
systems, so this should not be too low. gevent will place a floor
value on it.
"""
class MonitorMemoryMaxUsage(ByteCountSettingMixin, Setting):
name = 'max_memory_usage'
environment_key = 'GEVENT_MONITOR_MEMORY_MAX'
default = None
desc = """\
If `monitor_thread` is enabled,
then if memory usage exceeds this amount (in bytes), events will
be emitted. See `gevent.events`.
There is no default value for this setting. If you wish to
cap memory usage, you must choose a value.
"""
# The ares settings are all interpreted by
# gevent/resolver/ares.pyx, so we don't do
# any validation here.
......
......@@ -14,23 +14,14 @@ from gevent.monkey import get_original
from gevent.util import format_run_info
from gevent.events import notify
from gevent.events import EventLoopBlocked
from gevent.events import MemoryUsageThresholdExceeded
from gevent.events import MemoryUsageUnderThreshold
from gevent._compat import thread_mod_name
from gevent._compat import perf_counter
from gevent._util import gmctime
# Clocks
try:
# Python 3.3+ (PEP 418)
from time import perf_counter
except ImportError:
import time
if sys.platform == "win32":
perf_counter = time.clock
else:
perf_counter = time.time
__all__ = [
'PeriodicMonitoringThread',
]
......@@ -39,6 +30,24 @@ get_thread_ident = get_original(thread_mod_name, 'get_ident')
start_new_thread = get_original(thread_mod_name, 'start_new_thread')
thread_sleep = get_original('time', 'sleep')
try:
# The standard library 'resource' module doesn't provide
# a standard way to get the RSS measure, only the maximum.
# You might be tempted to try to compute something by adding
# together text and data sizes, but on many systems those come back
# zero. So our only option is psutil.
from psutil import Process, AccessDenied
# Make sure it works (why would we be denied access to our own process?)
try:
Process().memory_full_info()
except AccessDenied: # pragma: no cover
Process = None
except ImportError:
pass
class MonitorWarning(RuntimeWarning):
"""The type of warnings we emit."""
class _MonitorEntry(object):
__slots__ = ('function', 'period', 'last_run_time')
......@@ -76,6 +85,10 @@ class PeriodicMonitoringThread(object):
# what particular monitoring functions want to say.
min_sleep_time = 0.005
# The minimum period in seconds at which we will check memory usage.
# Getting memory usage is fairly expensive.
min_memory_monitor_period = 2
# A list of _MonitorEntry objects: [(function(hub), period, last_run_time))]
# The first entry is always our entry for self.monitor_blocking
_monitoring_functions = None
......@@ -83,6 +96,11 @@ class PeriodicMonitoringThread(object):
# The calculated min sleep time for the monitoring functions list.
_calculated_sleep_time = None
# A boolean value that also happens to capture the
# memory usage at the time we exceeded the threshold. Reset
# to 0 when we go back below.
_memory_exceeded = 0
def __init__(self, hub):
self._hub_wref = wref(hub, self._on_hub_gc)
self.should_run = True
......@@ -288,6 +306,52 @@ class PeriodicMonitoringThread(object):
def monitor_current_greenlet_blocking(self):
self._active_greenlet = getcurrent()
def can_monitor_memory_usage(self):
return Process is not None
def install_monitor_memory_usage(self):
# Start monitoring memory usage, if possible.
# If not possible, emit a warning.
if not self.can_monitor_memory_usage():
import warnings
warnings.warn("Unable to monitor memory usage. Install psutil.",
MonitorWarning)
return
self.add_monitoring_function(self.monitor_memory_usage,
max(GEVENT_CONFIG.memory_monitor_period,
self.min_memory_monitor_period))
def monitor_memory_usage(self, _hub):
max_allowed = GEVENT_CONFIG.max_memory_usage
if not max_allowed:
# They disabled it.
return -1 # value for tests
rusage = Process().memory_full_info()
# uss only documented available on Windows, Linux, and OS X.
# If not available, fall back to rss as an aproximation.
mem_usage = getattr(rusage, 'uss', 0) or rusage.rss
event = None # Return value for tests
if mem_usage > max_allowed:
if mem_usage > self._memory_exceeded:
# We're still growing
event = MemoryUsageThresholdExceeded(
mem_usage, max_allowed, rusage)
notify(event)
self._memory_exceeded = mem_usage
else:
# we're below. Were we above it last time?
if self._memory_exceeded:
event = MemoryUsageUnderThreshold(
mem_usage, max_allowed, rusage, self._memory_exceeded)
notify(event)
self._memory_exceeded = 0
return event
def __repr__(self):
return '<%s at %s in thread %s greenlet %r for %r>' % (
self.__class__.__name__,
......
......@@ -30,6 +30,10 @@ __all__ = [
'subscribers',
'IEventLoopBlocked',
'EventLoopBlocked',
'IMemoryUsageThresholdExceeded',
'MemoryUsageThresholdExceeded',
'IMemoryUsageUnderThreshold',
'MemoryUsageUnderThreshold',
]
try:
......@@ -94,3 +98,70 @@ class EventLoopBlocked(object):
self.greenlet = greenlet
self.blocking_time = blocking_time
self.info = info
class IMemoryUsageThresholdExceeded(Interface):
"""
The event emitted when the memory usage threshold is exceeded.
This event is emitted only while memory continues to grow
above the threshold. Only if the condition or stabilized is corrected (memory
usage drops) will the event be emitted in the future.
This event is emitted in the monitor thread.
"""
mem_usage = Attribute("The current process memory usage, in bytes.")
max_allowed = Attribute("The maximum allowed memory usage, in bytes.")
memory_info = Attribute("The tuple of memory usage stats return by psutil.")
class _AbstractMemoryEvent(object):
def __init__(self, mem_usage, max_allowed, memory_info):
self.mem_usage = mem_usage
self.max_allowed = max_allowed
self.memory_info = memory_info
def __repr__(self):
return "<%s used=%d max=%d details=%r>" % (
self.__class__.__name__,
self.mem_usage,
self.max_allowed,
self.memory_info,
)
@implementer(IMemoryUsageThresholdExceeded)
class MemoryUsageThresholdExceeded(_AbstractMemoryEvent):
"""
Implementation of `IMemoryUsageThresholdExceeded`.
"""
class IMemoryUsageUnderThreshold(Interface):
"""
The event emitted when the memory usage drops below the
threshold after having previously been above it.
This event is emitted only the first time memory usage is detected
to be below the threshold after having previously been above it.
If memory usage climbs again, a `IMemoryUsageThresholdExceeded`
event will be broadcast, and then this event could be broadcast again.
This event is emitted in the monitor thread.
"""
mem_usage = Attribute("The current process memory usage, in bytes.")
max_allowed = Attribute("The maximum allowed memory usage, in bytes.")
max_memory_usage = Attribute("The memory usage that caused the previous "
"IMemoryUsageThresholdExceeded event.")
memory_info = Attribute("The tuple of memory usage stats return by psutil.")
@implementer(IMemoryUsageUnderThreshold)
class MemoryUsageUnderThreshold(_AbstractMemoryEvent):
"""
Implementation of `IMemoryUsageUnderThreshold`.
"""
def __init__(self, mem_usage, max_allowed, memory_info, max_usage):
super(MemoryUsageUnderThreshold, self).__init__(mem_usage, max_allowed, memory_info)
self.max_memory_usage = max_usage
......@@ -61,7 +61,7 @@ class _Threadlocal(get_original(thread_mod_name, '_local')):
_threadlocal = _Threadlocal()
get_thread_ident = get_original(thread_mod_name, 'get_ident')
MAIN_THREAD = get_thread_ident() # XXX: Assuming import is done on the main thread.
MAIN_THREAD_IDENT = get_thread_ident() # XXX: Assuming import is done on the main thread.
class LoopExit(Exception):
......@@ -507,7 +507,7 @@ class Hub(TrackedRawGreenlet):
# loop. See #237 and #238.
self.loop = _threadlocal.loop
else:
if default is None and self.thread_ident != MAIN_THREAD:
if default is None and self.thread_ident != MAIN_THREAD_IDENT:
default = False
if loop is None:
......@@ -537,7 +537,7 @@ class Hub(TrackedRawGreenlet):
.. versionadded:: 1.3b1
"""
return self.thread_ident == MAIN_THREAD
return self.thread_ident == MAIN_THREAD_IDENT
def __repr__(self):
......@@ -742,9 +742,6 @@ class Hub(TrackedRawGreenlet):
def start_periodic_monitoring_thread(self):
if self.periodic_monitoring_thread is None and GEVENT_CONFIG.monitor_thread:
# TODO: If we're the main thread, then add the memory monitoring
# function.
# Note that it is possible for one real thread to
# (temporarily) wind up with multiple monitoring threads,
# if hubs are started and stopped within the thread. This shows up
......@@ -752,6 +749,10 @@ class Hub(TrackedRawGreenlet):
# hub object is gone.
from gevent._monitor import PeriodicMonitoringThread
self.periodic_monitoring_thread = PeriodicMonitoringThread(self)
if self.main_hub:
self.periodic_monitoring_thread.install_monitor_memory_usage()
return self.periodic_monitoring_thread
def join(self, timeout=None):
......
......@@ -39,6 +39,7 @@ skipOnPyPy = _do_not_skip
skipOnPyPyOnCI = _do_not_skip
skipOnPyPy3OnCI = _do_not_skip
skipOnPyPy3 = _do_not_skip
skipOnPyPyOnWindows = _do_not_skip
skipOnPurePython = unittest.skip if sysinfo.PURE_PYTHON else _do_not_skip
skipWithCExtensions = unittest.skip if not sysinfo.PURE_PYTHON else _do_not_skip
......@@ -75,6 +76,9 @@ if sysinfo.PYPY:
if sysinfo.RUNNING_ON_CI:
skipOnPyPyOnCI = unittest.skip
if sysinfo.WIN:
skipOnPyPyOnWindows = unittest.skip
if sysinfo.PYPY3:
skipOnPyPy3 = unittest.skip
if sysinfo.RUNNING_ON_CI:
......
This diff is collapsed.
......@@ -4,12 +4,14 @@ import gevent
from gevent.event import Event, AsyncResult
import greentest
from greentest.skipping import skipUnderCoverage
from greentest.six import xrange
from greentest.timing import AbstractGenericGetTestCase
from greentest.timing import AbstractGenericWaitTestCase
from greentest.timing import SMALL_TICK
from greentest.timing import SMALL_TICK_MAX_ADJ
DELAY = 0.01
DELAY = SMALL_TICK + SMALL_TICK_MAX_ADJ
class TestEventWait(AbstractGenericWaitTestCase):
......@@ -105,21 +107,20 @@ class TestAsyncResult(greentest.TestCase):
gevent.sleep(0)
self.assertEqual(log, [('caught', obj)])
@skipUnderCoverage("This test is racy and sometimes fails")
def test_set(self):
event1 = AsyncResult()
timer_exc = MyException('interrupted')
# Notice that this test is racy
# Notice that this test is racy:
# After DELAY, we set the event. We also try to immediately
# raise the exception with a timer of 0 --- but that depends
# on cycling the loop. Hence the fairly large value for DELAY.
g = gevent.spawn_later(DELAY, event1.set, 'hello event1')
t = gevent.Timeout.start_new(0, timer_exc)
try:
self._close_on_teardown(g.kill)
with gevent.Timeout.start_new(0, timer_exc):
with self.assertRaises(MyException) as exc:
event1.get()
self.assertEqual(timer_exc, exc.exception)
finally:
t.close()
g.kill()
self.assertIs(timer_exc, exc.exception)
def test_set_with_timeout(self):
event2 = AsyncResult()
......
......@@ -15,6 +15,19 @@ class TestImplements(unittest.TestCase):
def test_event_loop_blocked(self):
verify.verifyClass(events.IEventLoopBlocked, events.EventLoopBlocked)
def test_mem_threshold(self):
verify.verifyClass(events.IMemoryUsageThresholdExceeded,
events.MemoryUsageThresholdExceeded)
verify.verifyObject(events.IMemoryUsageThresholdExceeded,
events.MemoryUsageThresholdExceeded(0, 0, 0))
def test_mem_decreased(self):
verify.verifyClass(events.IMemoryUsageUnderThreshold,
events.MemoryUsageUnderThreshold)
verify.verifyObject(events.IMemoryUsageUnderThreshold,
events.MemoryUsageUnderThreshold(0, 0, 0, 0))
class TestEvents(unittest.TestCase):
def test_is_zope(self):
......
......@@ -199,11 +199,11 @@ class TestPeriodicMonitoringThread(greentest.TestCase):
monitor = hub.start_periodic_monitoring_thread()
self.assertIsNotNone(monitor)
self.assertEqual(1, len(monitor.monitoring_functions()))
monitor.add_monitoring_function(self._monitor, 0.1)
self.assertEqual(2, len(monitor.monitoring_functions()))
self.assertEqual(self._monitor, monitor.monitoring_functions()[1].function)
self.assertEqual(0.1, monitor.monitoring_functions()[1].period)
monitor.add_monitoring_function(self._monitor, 0.1)
self.assertEqual(3, len(monitor.monitoring_functions()))
self.assertEqual(self._monitor, monitor.monitoring_functions()[-1].function)
self.assertEqual(0.1, monitor.monitoring_functions()[-1].period)
# We must make sure we have switched greenlets at least once,
# otherwise we can't detect a failure.
......@@ -214,7 +214,7 @@ class TestPeriodicMonitoringThread(greentest.TestCase):
self._run_monitoring_threads(monitor)
finally:
monitor.add_monitoring_function(self._monitor, None)
self.assertEqual(1, len(monitor._monitoring_functions))
self.assertEqual(2, len(monitor._monitoring_functions))
assert hub.exception_stream is stream
monitor.kill()
del hub.exception_stream
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment