Commit dac56d64 authored by Jason Madden's avatar Jason Madden

Properly clean up native resources in gevent.signal_handler() when cancel is called.

Fixes #1606

Also fix reporting errors from threadpool tasks, and make waiting for threadpool tasks a bit cheaper.
parent bd1899e9
Fix some potential crashes under libuv when using
``gevent.signal_handler``. The crashes were seen running the test
suite and were non-deterministic.
......@@ -22,57 +22,66 @@ start_new_thread, Lock, get_thread_ident, = monkey.get_original(thread_mod_name,
])
# pylint 2.0.dev2 things collections.dequeue.popleft() doesn't return
# pylint:disable=assignment-from-no-return
class _Condition(object):
# pylint:disable=method-hidden
__slots__ = (
'_lock',
'_waiters',
)
def __init__(self, lock):
self.__lock = lock
self.__waiters = []
self._lock = lock
self._waiters = []
# No need to special case for _release_save and
# _acquire_restore; those are only used for RLock, and
# we don't use those.
def __enter__(self):
return self.__lock.__enter__()
return self._lock.__enter__()
def __exit__(self, t, v, tb):
return self.__lock.__exit__(t, v, tb)
return self._lock.__exit__(t, v, tb)
def __repr__(self):
return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
return "<Condition(%s, %d)>" % (self._lock, len(self._waiters))
def wait(self, wait_lock):
# TODO: It would be good to support timeouts here so that we can
# let idle threadpool threads die. Under Python 3, ``Lock.acquire``
# has that ability, but Python 2 doesn't expose that. We could use
# libuv's ``uv_cond_wait`` to implement this whole class and get timeouts
# everywhere.
def wait(self):
# This variable is for the monitoring utils to know that
# this is an idle frame and shouldn't be counted.
gevent_threadpool_worker_idle = True # pylint:disable=unused-variable
# Our __lock MUST be owned, but we don't check that.
waiter = Lock()
waiter.acquire()
self.__waiters.append(waiter)
self.__lock.release()
# Our ``_lock`` MUST be owned, but we don't check that.
# The ``wait_lock`` must be *un*owned.
wait_lock.acquire()
self._waiters.append(wait_lock)
self._lock.release()
try:
waiter.acquire() # Block on the native lock
wait_lock.acquire() # Block on the native lock
finally:
self.__lock.acquire()
self._lock.acquire()
# just good form to release the lock we're holding before it goes
# out of scope
waiter.release()
wait_lock.release()
def notify_one(self):
# The lock SHOULD be owned, but we don't check that.
try:
waiter = self.__waiters.pop()
waiter = self._waiters.pop()
except IndexError:
# Nobody around
pass
else:
# The owner of the ``waiter`` is blocked on
# acquiring it again, so when we ``release`` it, it
# is free to be scheduled and resume.
waiter.release()
......@@ -138,15 +147,26 @@ class Queue(object):
self.unfinished_tasks += 1
self._not_empty.notify_one()
def get(self):
def get(self, cookie):
"""Remove and return an item from the queue.
"""
with self._mutex:
while not self._queue:
self._not_empty.wait()
# Temporarily release our mutex and wait for someone
# to wake us up. There *should* be an item in the queue
# after that.
self._not_empty.wait(cookie)
item = self._queue.popleft()
return item
def allocate_cookie(self):
"""
Create and return the *cookie* to pass to `get()`.
Each thread that will use `get` needs a distinct cookie.
"""
return Lock()
def kill(self):
"""
Call to destroy this object.
......
......@@ -222,7 +222,8 @@ class signal(object):
This returns an object with the useful method ``cancel``, which,
when called, will prevent future deliveries of *signalnum* from
calling *handler*.
calling *handler*. It's best to keep the returned object alive
until you call ``cancel``.
.. note::
......@@ -235,6 +236,10 @@ class signal(object):
The ``handler`` argument is required to
be callable at construction time.
.. versionchanged:: NEXT
The ``cancel`` method now properly cleans up all native resources,
and drops references to all the arguments of this function.
"""
# This is documented as a function, not a class,
# so we're free to change implementation details.
......@@ -247,27 +252,39 @@ class signal(object):
self.hub = _get_hub_noargs()
self.watcher = self.hub.loop.signal(signalnum, ref=False)
self.watcher.start(self._start)
self.handler = handler
self.args = args
self.kwargs = kwargs
if self.greenlet_class is None:
from gevent import Greenlet
type(self).greenlet_class = Greenlet
self.greenlet_class = Greenlet
def _get_ref(self):
return self.watcher.ref
def _set_ref(self, value):
self.watcher.ref = value
self.watcher.start(self._start)
ref = property(_get_ref, _set_ref)
del _get_ref, _set_ref
ref = property(
lambda self: self.watcher.ref,
lambda self, nv: setattr(self.watcher, 'ref', nv)
)
def cancel(self):
if self.watcher is not None:
self.watcher.stop()
# Must close the watcher at a deterministic time, otherwise
# when CFFI reclaims the memory, the native loop might still
# have some reference to it; if anything tries to touch it
# we can wind up writing to memory that is no longer valid,
# leading to a wide variety of crashes.
self.watcher.close()
self.watcher = None
self.handler = None
self.args = None
self.kwargs = None
self.hub = None
self.greenlet_class = None
def _start(self):
# TODO: Maybe this should just be Greenlet.spawn()?
try:
greenlet = self.greenlet_class(self.handle)
greenlet.switch()
......
......@@ -32,26 +32,25 @@ class TestSignal(greentest.TestCase):
def test_alarm(self):
sig = gevent.signal_handler(signal.SIGALRM, raise_Expected)
assert sig.ref is False, repr(sig.ref)
self.assertFalse(sig.ref)
sig.ref = True
assert sig.ref is True
self.assertTrue(sig.ref)
sig.ref = False
try:
def test():
signal.alarm(1)
try:
with self.assertRaises(Expected) as exc:
gevent.sleep(2)
raise AssertionError('must raise Expected')
except Expected as ex:
assert str(ex) == 'TestSignal', ex
# also let's check that alarm is persistent
signal.alarm(1)
ex = exc.exception
self.assertEqual(str(ex), 'TestSignal')
try:
gevent.sleep(2)
raise AssertionError('must raise Expected')
except Expected as ex:
assert str(ex) == 'TestSignal', ex
test()
# also let's check that the handler stays installed.
test()
finally:
sig.cancel() # pylint:disable=no-member
sig.cancel()
@greentest.skipIf((greentest.PY3
......
......@@ -42,7 +42,8 @@ def _format_hub(hub):
class _WorkerGreenlet(RawGreenlet):
# Exists to produce a more useful repr for worker pool
# threads/greenlets.
# threads/greenlets, and manage the communication of the worker
# thread with the threadpool.
# Inform the gevent.util.GreenletTree that this should be
# considered the root (for printing purposes)
......@@ -57,6 +58,9 @@ class _WorkerGreenlet(RawGreenlet):
# The hub of the threadpool we're working for. Just for info.
_hub = None
# A cookie passed to task_queue.get()
_task_queue_cookie = None
def __init__(self, threadpool):
# Construct in the main thread (owner of the threadpool)
# The parent greenlet and thread identifier will be set once the
......@@ -76,8 +80,8 @@ class _WorkerGreenlet(RawGreenlet):
self._stderr = stderr
# We can capture the task_queue; even though it can change if the threadpool
# is re-innitted, we won't be running in that case
self._task_queue = threadpool.task_queue
self._task_queue = threadpool.task_queue # type:gevent._threading.Queue
self._task_queue_cookie = self._task_queue.allocate_cookie()
self._unregister_worker = threadpool._unregister_worker
threadpool._register_worker(self)
......@@ -131,6 +135,7 @@ class _WorkerGreenlet(RawGreenlet):
name = co.co_name
print(' File "%s", line %d, in %s' % (filename, lineno, name),
file=stderr)
tb = tb.tb_next
def __run_task(self, func, args, kwargs, thread_result):
try:
......@@ -146,20 +151,24 @@ class _WorkerGreenlet(RawGreenlet):
exc_info = sys.exc_info
fixup_hub_before_block = self.__fixup_hub_before_block
task_queue_get = self._task_queue.get
task_queue_cookie = self._task_queue_cookie
run_task = self.__run_task
task_queue_done = self._task_queue.task_done
try:
try: # pylint:disable=too-many-nested-blocks
while 1: # tiny bit faster than True on Py2
fixup_hub_before_block()
task = task_queue_get()
task = task_queue_get(task_queue_cookie)
try:
if task is None:
return
run_task(*task)
except:
task = repr(task)
raise
finally:
task = None
task = None if not isinstance(task, str) else task
task_queue_done()
except Exception as e: # pylint:disable=broad-except
print(
......@@ -179,12 +188,13 @@ class _WorkerGreenlet(RawGreenlet):
self._unregister_worker(self)
self._unregister_worker = lambda _: None
self._task_queue = None
self._task_queue_cookie = None
if hub_of_worker is not None:
hub_of_worker.destroy(True)
def __repr__(self, _format_hub=_format_hub):
return "<ThreadPoolWorker at 0x%x thread_ident=0x%x hub=%s>" % (
return "<ThreadPoolWorker at 0x%x thread_ident=0x%x threadpool-hub=%s>" % (
id(self),
self._thread_ident,
_format_hub(self._hub)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment