Commit 090b360a authored by Jason Madden's avatar Jason Madden

Add monitoring for memory usage, emitting events as it moves around the threshold.

Needs specific tests.
parent 320be8f0
...@@ -49,6 +49,10 @@ ...@@ -49,6 +49,10 @@
events when it detects certain conditions, like loop blocked or events when it detects certain conditions, like loop blocked or
memory limits exceeded. memory limits exceeded.
- Add settings for monitoring memory usage and emitting events when a
threshold is exceeded and then corrected. gevent currently supplies
no policy for what to do when memory exceeds the configured limit.
1.3a2 (2018-03-06) 1.3a2 (2018-03-06)
================== ==================
......
...@@ -279,17 +279,41 @@ class IntSettingMixin(object): ...@@ -279,17 +279,41 @@ class IntSettingMixin(object):
validate = staticmethod(validate_anything) validate = staticmethod(validate_anything)
class FloatSettingMixin(object): class _PositiveValueMixin(object):
def _convert(self, value):
if value:
return float(value)
def validate(self, value): def validate(self, value):
if value is not None and value <= 0: if value is not None and value <= 0:
raise ValueError("Must be > 0") raise ValueError("Must be positive")
return value return value
class FloatSettingMixin(_PositiveValueMixin):
def _convert(self, value):
if value:
return float(value)
class ByteCountSettingMixin(_PositiveValueMixin):
_MULTIPLES = {
# All keys must be the same size.
'kb': 1024,
'mb': 1024 * 1024,
'gb': 1024 * 1024 * 1024,
}
_SUFFIX_SIZE = 2
def _convert(self, value):
if not value:
return
value = value.lower()
for s, m in self._MULTIPLES.items():
if value[-self._SUFFIX_SIZE:] == s:
return int(value[:-self._SUFFIX_SIZE]) * m
return int(value)
class Resolver(ImportableSetting, Setting): class Resolver(ImportableSetting, Setting):
desc = """\ desc = """\
...@@ -493,6 +517,36 @@ class MaxBlockingTime(FloatSettingMixin, Setting): ...@@ -493,6 +517,36 @@ class MaxBlockingTime(FloatSettingMixin, Setting):
.. versionadded:: 1.3b1 .. versionadded:: 1.3b1
""" """
class MonitorMemoryPeriod(FloatSettingMixin, Setting):
name = 'memory_monitor_period'
environment_key = 'GEVENT_MONITOR_MEMORY_PERIOD'
default = 5
desc = """\
If `monitor_thread` is enabled, this is approximately how long
(in seconds) we will go between checking the processes memory usage.
Checking the memory usage is relatively expensive on some operating
systems, so this should not be too low. gevent will place a floor
value on it.
"""
class MonitorMemoryMaxUsage(ByteCountSettingMixin, Setting):
name = 'max_memory_usage'
environment_key = 'GEVENT_MONITOR_MEMORY_MAX'
default = None
desc = """\
If `monitor_thread` is enabled,
then if memory usage exceeds this amount (in bytes), events will
be emitted. See `gevent.events`.
There is no default value for this setting. If you wish to
cap memory usage, you must choose a value.
"""
# The ares settings are all interpreted by # The ares settings are all interpreted by
# gevent/resolver/ares.pyx, so we don't do # gevent/resolver/ares.pyx, so we don't do
# any validation here. # any validation here.
......
...@@ -14,6 +14,8 @@ from gevent.monkey import get_original ...@@ -14,6 +14,8 @@ from gevent.monkey import get_original
from gevent.util import format_run_info from gevent.util import format_run_info
from gevent.events import notify from gevent.events import notify
from gevent.events import EventLoopBlocked from gevent.events import EventLoopBlocked
from gevent.events import MemoryUsageThresholdExceeded
from gevent.events import MemoryUsageUnderThreshold
from gevent._compat import thread_mod_name from gevent._compat import thread_mod_name
from gevent._util import gmctime from gevent._util import gmctime
...@@ -39,6 +41,24 @@ get_thread_ident = get_original(thread_mod_name, 'get_ident') ...@@ -39,6 +41,24 @@ get_thread_ident = get_original(thread_mod_name, 'get_ident')
start_new_thread = get_original(thread_mod_name, 'start_new_thread') start_new_thread = get_original(thread_mod_name, 'start_new_thread')
thread_sleep = get_original('time', 'sleep') thread_sleep = get_original('time', 'sleep')
try:
# The standard library 'resource' module doesn't provide
# a standard way to get the RSS measure, only the maximum.
# You might be tempted to try to compute something by adding
# together text and data sizes, but on many systems those come back
# zero. So our only option is psutil.
from psutil import Process, AccessDenied
# Make sure it works (why would we be denied access to our own process?)
try:
Process().memory_full_info()
except AccessDenied:
Process = None
except ImportError:
pass
class MonitorWarning(RuntimeWarning):
"""The type of warnings we emit."""
class _MonitorEntry(object): class _MonitorEntry(object):
__slots__ = ('function', 'period', 'last_run_time') __slots__ = ('function', 'period', 'last_run_time')
...@@ -76,6 +96,10 @@ class PeriodicMonitoringThread(object): ...@@ -76,6 +96,10 @@ class PeriodicMonitoringThread(object):
# what particular monitoring functions want to say. # what particular monitoring functions want to say.
min_sleep_time = 0.005 min_sleep_time = 0.005
# The minimum period in seconds at which we will check memory usage.
# Getting memory usage is fairly expensive.
min_memory_monitor_period = 2
# A list of _MonitorEntry objects: [(function(hub), period, last_run_time))] # A list of _MonitorEntry objects: [(function(hub), period, last_run_time))]
# The first entry is always our entry for self.monitor_blocking # The first entry is always our entry for self.monitor_blocking
_monitoring_functions = None _monitoring_functions = None
...@@ -83,6 +107,11 @@ class PeriodicMonitoringThread(object): ...@@ -83,6 +107,11 @@ class PeriodicMonitoringThread(object):
# The calculated min sleep time for the monitoring functions list. # The calculated min sleep time for the monitoring functions list.
_calculated_sleep_time = None _calculated_sleep_time = None
# A boolean value that also happens to capture the
# memory usage at the time we exceeded the threshold. Reset
# to 0 when we go back below.
_memory_exceeded = 0
def __init__(self, hub): def __init__(self, hub):
self._hub_wref = wref(hub, self._on_hub_gc) self._hub_wref = wref(hub, self._on_hub_gc)
self.should_run = True self.should_run = True
...@@ -288,6 +317,46 @@ class PeriodicMonitoringThread(object): ...@@ -288,6 +317,46 @@ class PeriodicMonitoringThread(object):
def monitor_current_greenlet_blocking(self): def monitor_current_greenlet_blocking(self):
self._active_greenlet = getcurrent() self._active_greenlet = getcurrent()
def can_monitor_memory_usage(self):
return Process is not None
def install_monitor_memory_usage(self):
# Start monitoring memory usage, if possible.
# If not possible, emit a warning.
if not self.can_monitor_memory_usage:
import warnings
warnings.warn("Unable to monitor memory usage. Install psutil.",
MonitorWarning)
return
self.add_monitoring_function(self.monitor_memory_usage,
max(GEVENT_CONFIG.memory_monitor_period,
self.min_memory_monitor_period))
def monitor_memory_usage(self, _hub):
max_allowed = GEVENT_CONFIG.max_memory_usage
if not max_allowed:
# They disabled it.
return
rusage = Process().memory_full_info()
# uss only documented available on Windows, Linux, and OS X.
# If not available, fall back to rss as an aproximation.
mem_usage = getattr(rusage, 'uss', 0) or rusage.rss
if mem_usage > max_allowed:
if mem_usage > self._memory_exceeded:
# We're still growing
notify(MemoryUsageThresholdExceeded(
mem_usage, max_allowed, rusage))
self._memory_exceeded = mem_usage
else:
# we're below. Were we above it last time?
if self._memory_exceeded:
notify(MemoryUsageUnderThreshold(
mem_usage, max_allowed, rusage, self._memory_exceeded))
self._memory_exceeded = 0
def __repr__(self): def __repr__(self):
return '<%s at %s in thread %s greenlet %r for %r>' % ( return '<%s at %s in thread %s greenlet %r for %r>' % (
self.__class__.__name__, self.__class__.__name__,
......
...@@ -30,6 +30,10 @@ __all__ = [ ...@@ -30,6 +30,10 @@ __all__ = [
'subscribers', 'subscribers',
'IEventLoopBlocked', 'IEventLoopBlocked',
'EventLoopBlocked', 'EventLoopBlocked',
'IMemoryUsageThresholdExceeded',
'MemoryUsageThresholdExceeded',
'IMemoryUsageUnderThreshold',
'MemoryUsageUnderThreshold',
] ]
try: try:
...@@ -94,3 +98,70 @@ class EventLoopBlocked(object): ...@@ -94,3 +98,70 @@ class EventLoopBlocked(object):
self.greenlet = greenlet self.greenlet = greenlet
self.blocking_time = blocking_time self.blocking_time = blocking_time
self.info = info self.info = info
class IMemoryUsageThresholdExceeded(Interface):
"""
The event emitted when the memory usage threshold is exceeded.
This event is emitted only while memory continues to grow
above the threshold. Only if the condition or stabilized is corrected (memory
usage drops) will the event be emitted in the future.
This event is emitted in the monitor thread.
"""
mem_usage = Attribute("The current process memory usage, in bytes.")
max_allowed = Attribute("The maximum allowed memory usage, in bytes.")
memory_info = Attribute("The tuple of memory usage stats return by psutil.")
class _AbstractMemoryEvent(object):
def __init__(self, mem_usage, max_allowed, memory_info):
self.mem_usage = mem_usage
self.max_allowed = max_allowed
self.memory_info = memory_info
def __repr__(self):
return "<%s used=%d max=%d details=%r>" % (
self.__class__.__name__,
self.mem_usage,
self.max_allowed,
self.memory_info,
)
@implementer(IMemoryUsageThresholdExceeded)
class MemoryUsageThresholdExceeded(_AbstractMemoryEvent):
"""
Implementation of `IMemoryUsageThresholdExceeded`.
"""
class IMemoryUsageUnderThreshold(Interface):
"""
The event emitted when the memory usage drops below the
threshold after having previously been above it.
This event is emitted only the first time memory usage is detected
to be below the threshold after having previously been above it.
If memory usage climbs again, a `IMemoryUsageThresholdExceeded`
event will be broadcast, and then this event could be broadcast again.
This event is emitted in the monitor thread.
"""
mem_usage = Attribute("The current process memory usage, in bytes.")
max_allowed = Attribute("The maximum allowed memory usage, in bytes.")
max_memory_usage = Attribute("The memory usage that caused the previous "
"IMemoryUsageThresholdExceeded event.")
memory_info = Attribute("The tuple of memory usage stats return by psutil.")
@implementer(IMemoryUsageUnderThreshold)
class MemoryUsageUnderThreshold(_AbstractMemoryEvent):
"""
Implementation of `IMemoryUsageUnderThreshold`.
"""
def __init__(self, mem_usage, max_allowed, memory_info, max_usage):
super(MemoryUsageUnderThreshold, self).__init__(mem_usage, max_allowed, memory_info)
self.max_memory_usage = max_usage
...@@ -61,7 +61,7 @@ class _Threadlocal(get_original(thread_mod_name, '_local')): ...@@ -61,7 +61,7 @@ class _Threadlocal(get_original(thread_mod_name, '_local')):
_threadlocal = _Threadlocal() _threadlocal = _Threadlocal()
get_thread_ident = get_original(thread_mod_name, 'get_ident') get_thread_ident = get_original(thread_mod_name, 'get_ident')
MAIN_THREAD = get_thread_ident() # XXX: Assuming import is done on the main thread. MAIN_THREAD_IDENT = get_thread_ident() # XXX: Assuming import is done on the main thread.
class LoopExit(Exception): class LoopExit(Exception):
...@@ -507,7 +507,7 @@ class Hub(TrackedRawGreenlet): ...@@ -507,7 +507,7 @@ class Hub(TrackedRawGreenlet):
# loop. See #237 and #238. # loop. See #237 and #238.
self.loop = _threadlocal.loop self.loop = _threadlocal.loop
else: else:
if default is None and self.thread_ident != MAIN_THREAD: if default is None and self.thread_ident != MAIN_THREAD_IDENT:
default = False default = False
if loop is None: if loop is None:
...@@ -537,7 +537,7 @@ class Hub(TrackedRawGreenlet): ...@@ -537,7 +537,7 @@ class Hub(TrackedRawGreenlet):
.. versionadded:: 1.3b1 .. versionadded:: 1.3b1
""" """
return self.thread_ident == MAIN_THREAD return self.thread_ident == MAIN_THREAD_IDENT
def __repr__(self): def __repr__(self):
...@@ -742,9 +742,6 @@ class Hub(TrackedRawGreenlet): ...@@ -742,9 +742,6 @@ class Hub(TrackedRawGreenlet):
def start_periodic_monitoring_thread(self): def start_periodic_monitoring_thread(self):
if self.periodic_monitoring_thread is None and GEVENT_CONFIG.monitor_thread: if self.periodic_monitoring_thread is None and GEVENT_CONFIG.monitor_thread:
# TODO: If we're the main thread, then add the memory monitoring
# function.
# Note that it is possible for one real thread to # Note that it is possible for one real thread to
# (temporarily) wind up with multiple monitoring threads, # (temporarily) wind up with multiple monitoring threads,
# if hubs are started and stopped within the thread. This shows up # if hubs are started and stopped within the thread. This shows up
...@@ -752,6 +749,10 @@ class Hub(TrackedRawGreenlet): ...@@ -752,6 +749,10 @@ class Hub(TrackedRawGreenlet):
# hub object is gone. # hub object is gone.
from gevent._monitor import PeriodicMonitoringThread from gevent._monitor import PeriodicMonitoringThread
self.periodic_monitoring_thread = PeriodicMonitoringThread(self) self.periodic_monitoring_thread = PeriodicMonitoringThread(self)
if self.main_hub:
self.periodic_monitoring_thread.install_monitor_memory_usage()
return self.periodic_monitoring_thread return self.periodic_monitoring_thread
def join(self, timeout=None): def join(self, timeout=None):
......
...@@ -15,6 +15,19 @@ class TestImplements(unittest.TestCase): ...@@ -15,6 +15,19 @@ class TestImplements(unittest.TestCase):
def test_event_loop_blocked(self): def test_event_loop_blocked(self):
verify.verifyClass(events.IEventLoopBlocked, events.EventLoopBlocked) verify.verifyClass(events.IEventLoopBlocked, events.EventLoopBlocked)
def test_mem_threshold(self):
verify.verifyClass(events.IMemoryUsageThresholdExceeded,
events.MemoryUsageThresholdExceeded)
verify.verifyObject(events.IMemoryUsageThresholdExceeded,
events.MemoryUsageThresholdExceeded(0, 0, 0))
def test_mem_decreased(self):
verify.verifyClass(events.IMemoryUsageUnderThreshold,
events.MemoryUsageUnderThreshold)
verify.verifyObject(events.IMemoryUsageUnderThreshold,
events.MemoryUsageUnderThreshold(0, 0, 0, 0))
class TestEvents(unittest.TestCase): class TestEvents(unittest.TestCase):
def test_is_zope(self): def test_is_zope(self):
......
...@@ -199,11 +199,11 @@ class TestPeriodicMonitoringThread(greentest.TestCase): ...@@ -199,11 +199,11 @@ class TestPeriodicMonitoringThread(greentest.TestCase):
monitor = hub.start_periodic_monitoring_thread() monitor = hub.start_periodic_monitoring_thread()
self.assertIsNotNone(monitor) self.assertIsNotNone(monitor)
self.assertEqual(1, len(monitor.monitoring_functions()))
monitor.add_monitoring_function(self._monitor, 0.1)
self.assertEqual(2, len(monitor.monitoring_functions())) self.assertEqual(2, len(monitor.monitoring_functions()))
self.assertEqual(self._monitor, monitor.monitoring_functions()[1].function) monitor.add_monitoring_function(self._monitor, 0.1)
self.assertEqual(0.1, monitor.monitoring_functions()[1].period) self.assertEqual(3, len(monitor.monitoring_functions()))
self.assertEqual(self._monitor, monitor.monitoring_functions()[-1].function)
self.assertEqual(0.1, monitor.monitoring_functions()[-1].period)
# We must make sure we have switched greenlets at least once, # We must make sure we have switched greenlets at least once,
# otherwise we can't detect a failure. # otherwise we can't detect a failure.
...@@ -214,7 +214,7 @@ class TestPeriodicMonitoringThread(greentest.TestCase): ...@@ -214,7 +214,7 @@ class TestPeriodicMonitoringThread(greentest.TestCase):
self._run_monitoring_threads(monitor) self._run_monitoring_threads(monitor)
finally: finally:
monitor.add_monitoring_function(self._monitor, None) monitor.add_monitoring_function(self._monitor, None)
self.assertEqual(1, len(monitor._monitoring_functions)) self.assertEqual(2, len(monitor._monitoring_functions))
assert hub.exception_stream is stream assert hub.exception_stream is stream
monitor.kill() monitor.kill()
del hub.exception_stream del hub.exception_stream
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment