Commit d5f8e220 authored by Jason Madden's avatar Jason Madden

Let libuv loops exit even after starting a child watcher.

Fixes #1581.

This was exposed by running test__os and test__queue in the same process in that order. test__os watches children,
and test__queue has a test that expects a LoopExit. It wasn't raised because the child watcher (really the signal watcher)
was referenced, keeping the loop running.
parent 29b28f5f
libuv loops that have watched children can now exit. Previously, the
SIGCHLD watcher kept the loop alive even if there were no longer any
watched children.
......@@ -30,6 +30,12 @@ class _EVENTSType(object):
EVENTS = GEVENT_CORE_EVENTS = _EVENTSType()
class _DiscardedSet(frozenset):
__slots__ = ()
def discard(self, o):
"Does nothing."
#####
## Note on CFFI objects, callbacks and the lifecycle of watcher objects
#
......@@ -383,6 +389,8 @@ class AbstractLoop(object):
# whether they were the default loop.
_default = None
_keepaliveset = _DiscardedSet()
def __init__(self, ffi, lib, watchers, flags=None, default=None):
self._ffi = ffi
self._lib = lib
......
......@@ -345,25 +345,36 @@ class watcher(object):
# may fail if __init__ did; will be harmlessly printed
self.close()
__in_repr = False
def __repr__(self):
formats = self._format()
result = "<%s at 0x%x%s" % (self.__class__.__name__, id(self), formats)
if self.pending:
result += " pending"
if self.callback is not None:
fself = getattr(self.callback, '__self__', None)
if fself is self:
result += " callback=<bound method %s of self>" % (self.callback.__name__)
else:
result += " callback=%r" % (self.callback, )
if self.args is not None:
result += " args=%r" % (self.args, )
if self.callback is None and self.args is None:
result += " stopped"
result += " watcher=%s" % (self._watcher)
result += " handle=%s" % (self._watcher_handle)
result += " ref=%s" % (self.ref)
return result + ">"
basic = "<%s at 0x%x" % (self.__class__.__name__, id(self))
if self.__in_repr:
return basic + '>'
# Running child watchers have been seen to have a
# recursive repr in ``self.args``, thanks to ``gevent.os.fork_and_watch``
# passing the watcher as an argument to its callback.
self.__in_repr = True
try:
result = '%s%s' % (basic, self._format())
if self.pending:
result += " pending"
if self.callback is not None:
fself = getattr(self.callback, '__self__', None)
if fself is self:
result += " callback=<bound method %s of self>" % (self.callback.__name__)
else:
result += " callback=%r" % (self.callback, )
if self.args is not None:
result += " args=%r" % (self.args, )
if self.callback is None and self.args is None:
result += " stopped"
result += " watcher=%s" % (self._watcher)
result += " handle=%s" % (self._watcher_handle)
result += " ref=%s" % (self.ref)
return result + ">"
finally:
self.__in_repr = False
@property
def _watcher_handle(self):
......
cimport cython
from gevent.__waiter cimport Waiter
from gevent._event cimport Event
from gevent.__hub_local cimport get_hub_noargs as get_hub
cdef bint _greenlet_imported
cdef _heappush
cdef _heappop
cdef _heapify
cdef _Empty
cdef _Full
cdef Timeout
cdef InvalidSwitchError
cdef extern from "greenlet/greenlet.h":
ctypedef class greenlet.greenlet [object PyGreenlet]:
pass
# These are actually macros and so much be included
# (defined) in each .pxd, as are the two functions
# that call them.
greenlet PyGreenlet_GetCurrent()
void PyGreenlet_Import()
cdef inline greenlet getcurrent():
return PyGreenlet_GetCurrent()
cdef inline void greenlet_init():
global _greenlet_imported
if not _greenlet_imported:
PyGreenlet_Import()
_greenlet_imported = True
@cython.final
cdef _safe_remove(deq, item)
@cython.final
@cython.internal
cdef class ItemWaiter(Waiter):
cdef readonly item
cdef readonly queue
cdef _safe_remove(deq, item)
cdef class Queue:
cdef __weakref__
......@@ -33,6 +54,7 @@ cdef class Queue:
cpdef Py_ssize_t qsize(self)
cpdef bint empty(self)
cpdef bint full(self)
cpdef _create_queue(self, items=*)
cpdef put(self, item, block=*, timeout=*)
cpdef put_nowait(self, item)
......@@ -46,6 +68,13 @@ cdef class Queue:
cdef _schedule_unlock(self)
@cython.final
@cython.internal
cdef class ItemWaiter(Waiter):
cdef readonly item
cdef readonly Queue queue
@cython.final
cdef class UnboundQueue(Queue):
pass
......
......@@ -588,6 +588,8 @@ class loop(AbstractLoop):
self._sigchld_watcher = ffi.new('uv_signal_t*')
libuv.uv_signal_init(self.ptr, self._sigchld_watcher)
self._sigchld_watcher.data = self._handle_to_self
# Don't let this keep the loop alive
libuv.uv_unref(self._sigchld_watcher)
libuv.uv_signal_start(self._sigchld_watcher,
libuv.python_sigchld_callback,
......@@ -635,7 +637,7 @@ class loop(AbstractLoop):
except ValueError:
pass
# Now's a good time to clean up any dead lists we don't need
# Now's a good time to clean up any dead watchers we don't need
# anymore
for pid in list(self._child_watchers):
if not self._child_watchers[pid]:
......
......@@ -474,22 +474,25 @@ class _SimulatedWithAsyncMixin(object):
return self._async.active
def start(self, cb, *args):
assert self._async is not None
self._register_loop_callback()
self.callback = cb
self.args = args
self._async.start(cb, *args)
#watcher.start(self, cb, *args)
def stop(self):
self._unregister_loop_callback()
self.callback = None
self.args = None
self._async.stop()
if self._async is not None:
# If we're stop() after close().
# That should be allowed.
self._async.stop()
def close(self):
if self._async is not None:
a = self._async
#self._async = None
self._async = None
a.close()
def _register_loop_callback(self):
......@@ -503,9 +506,7 @@ class _SimulatedWithAsyncMixin(object):
class fork(_SimulatedWithAsyncMixin,
_base.ForkMixin,
watcher):
# We'll have to implement this one completely manually
# Right now it doesn't matter much since libuv doesn't survive
# a fork anyway. (That's a work in progress)
# We'll have to implement this one completely manually.
_watcher_skip_ffi = False
def _register_loop_callback(self):
......
......@@ -277,9 +277,11 @@ if hasattr(os, 'fork'):
# just not waited on yet.
now = time.time()
oldest_allowed = now - timeout
dead = [pid for pid, val
in _watched_children.items()
if isinstance(val, tuple) and val[2] < oldest_allowed]
dead = [
pid for pid, val
in _watched_children.items()
if isinstance(val, tuple) and val[2] < oldest_allowed
]
for pid in dead:
del _watched_children[pid]
......@@ -296,7 +298,10 @@ if hasattr(os, 'fork'):
:func:`os.waitpid`. Some combinations of *options* may not
be supported cooperatively (as of 1.1 that includes
WUNTRACED). Using a *pid* of 0 to request waiting on only processes
from the current process group is not cooperative.
from the current process group is not cooperative. A *pid* of -1
to wait for any child is non-blocking, but may or may not
require a trip around the event loop, depending on whether any children
have already terminated but not been waited on.
Availability: POSIX.
......@@ -316,12 +321,19 @@ if hasattr(os, 'fork'):
if pid <= 0:
# magic functions for multiple children.
if pid == -1:
# Any child. If we have one that we're watching and that finished,
# we will use that one. Otherwise, let the OS take care of it.
# Any child. If we have one that we're watching
# and that finished, we will use that one,
# preferring the oldest. Otherwise, let the OS
# take care of it.
finished_at = None
for k, v in _watched_children.items():
if isinstance(v, tuple):
if (
isinstance(v, tuple)
and (finished_at is None or v[2] < finished_at)
):
pid = k
break
finished_at = v[2]
if pid <= 0:
# We didn't have one that was ready. If there are
# no funky options set, and the pid was -1
......
......@@ -38,12 +38,14 @@ if sys.version_info[0] == 2:
import Queue as __queue__ # python 3: pylint:disable=import-error
else:
import queue as __queue__ # python 2: pylint:disable=import-error
Full = __queue__.Full
Empty = __queue__.Empty
# We re-export these exceptions to client modules.
# But we also want fast access to them from Cython with a cdef,
# and we do that with the _ definition.
_Full = Full = __queue__.Full
_Empty = Empty = __queue__.Empty
from gevent.timeout import Timeout
from gevent._hub_local import get_hub_noargs as get_hub
from greenlet import getcurrent
from gevent.exceptions import InvalidSwitchError
__all__ = []
......@@ -71,6 +73,8 @@ def _safe_remove(deq, item):
import gevent._waiter
locals()['Waiter'] = gevent._waiter.Waiter
locals()['getcurrent'] = __import__('greenlet').getcurrent
locals()['greenlet_init'] = lambda: None
class ItemWaiter(Waiter): # pylint:disable=undefined-variable
# pylint:disable=assigning-non-slot
......@@ -256,7 +260,7 @@ class Queue(object):
self._put(item)
if self.getters:
self._schedule_unlock()
elif self.hub is getcurrent():
elif self.hub is getcurrent(): # pylint:disable=undefined-variable
# We're in the mainloop, so we cannot wait; we can switch to other greenlets though.
# Check if possible to get a free slot in the queue.
while self.getters and self.qsize() and self.qsize() >= self._maxsize:
......@@ -290,13 +294,14 @@ class Queue(object):
"""
self.put(item, False)
def __get_or_peek(self, method, block, timeout):
# Internal helper method. The `method` should be either
# self._get when called from self.get() or self._peek when
# called from self.peek(). Call this after the initial check
# to see if there are items in the queue.
if self.hub is getcurrent():
if self.hub is getcurrent(): # pylint:disable=undefined-variable
# special case to make get_nowait() or peek_nowait() runnable in the mainloop greenlet
# there are no items in the queue; try to fix the situation by unlocking putters
while self.putters:
......@@ -305,12 +310,12 @@ class Queue(object):
self.putters.popleft().put_and_switch()
if self.qsize():
return method()
raise Empty()
raise Empty
if not block:
# We can't block, we're not the hub, and we have nothing
# to return. No choice...
raise Empty()
raise Empty
waiter = Waiter() # pylint:disable=undefined-variable
timeout = Timeout._start_new_or_dummy(timeout, Empty)
......@@ -362,7 +367,8 @@ class Queue(object):
(*timeout* is ignored in that case).
"""
if self.qsize():
# XXX: Why doesn't this schedule an unlock like get() does?
# This doesn't schedule an unlock like get() does because we're not
# actually making any space.
return self._peek()
return self.__get_or_peek(self._peek, block, timeout)
......@@ -604,7 +610,7 @@ class Channel(object):
return True
def put(self, item, block=True, timeout=None):
if self.hub is getcurrent():
if self.hub is getcurrent(): # pylint:disable=undefined-variable
if self.getters:
getter = self.getters.popleft()
getter.switch(item)
......@@ -634,7 +640,7 @@ class Channel(object):
self.put(item, False)
def get(self, block=True, timeout=None):
if self.hub is getcurrent():
if self.hub is getcurrent(): # pylint:disable=undefined-variable
if self.putters:
item, putter = self.putters.popleft()
self.hub.loop.run_callback(putter.switch, putter)
......@@ -681,5 +687,11 @@ class Channel(object):
next = __next__ # Py2
def _init():
greenlet_init() # pylint:disable=undefined-variable
_init()
from gevent._util import import_c_accel
import_c_accel(globals(), 'gevent._queue')
......@@ -147,12 +147,24 @@ class StringAssertMixin(object):
class TestTimeout(gevent.Timeout):
_expire_info = ''
def __init__(self, timeout):
gevent.Timeout.__init__(self, timeout, 'test timed out\n', ref=False)
def __init__(self, timeout, method='Not Given'):
gevent.Timeout.__init__(
self,
timeout,
'%r: test timed out\n' % (method,),
ref=False
)
def _on_expiration(self, prev_greenlet, ex):
from gevent.util import format_run_info
self._expire_info = '\n'.join(format_run_info())
loop = gevent.get_hub().loop
debug_info = 'N/A'
if hasattr(loop, 'debug'):
debug_info = [str(s) for s in loop.debug()]
run_info = format_run_info()
self._expire_info = 'Loop Debug:\n%s\nRun Info:\n%s' % (
'\n'.join(debug_info), '\n'.join(run_info)
)
gevent.Timeout._on_expiration(self, prev_greenlet, ex)
def __str__(self):
......@@ -166,7 +178,7 @@ def _wrap_timeout(timeout, method):
@wraps(method)
def wrapper(self, *args, **kwargs):
with TestTimeout(timeout):
with TestTimeout(timeout, method):
return method(self, *args, **kwargs)
return wrapper
......
......@@ -160,11 +160,13 @@ class TestForkAndWatch(greentest.TestCase):
pid = os.fork_and_watch()
if pid:
os.waitpid(-1, 0)
# Can't assert on what the pid actually was,
# Can't assert on what the found pid actually was,
# our testrunner may have spawned multiple children.
os._reap_children(0) # make the leakchecker happy
else: # pragma: no cover
gevent.sleep(2)
# The test framework will catch a regular SystemExit
# from sys.exit(), we need to just kill the process.
os._exit(0)
def test_waitpid_wrong_neg(self):
......
......@@ -24,7 +24,8 @@ class TestQueue(TestCase):
def test_peek_empty(self):
q = queue.Queue()
# No putters waiting, in the main loop: LoopExit
self.assertRaises(LoopExit, q.peek)
with self.assertRaises(LoopExit):
q.peek()
def waiter(q):
self.assertRaises(Empty, q.peek, timeout=0.01)
......@@ -323,6 +324,8 @@ class TestNoWait(TestCase):
assert q.empty(), q
def test_get_nowait_unlock_channel(self):
# get_nowait runs fine in the hub, and
# it switches to a waiting putter if needed.
result = []
q = queue.Channel()
p = gevent.spawn(q.put, 5)
......@@ -330,19 +333,21 @@ class TestNoWait(TestCase):
def store_result(func, *args):
result.append(func(*args))
assert q.empty(), q
assert q.full(), q
self.assertTrue(q.empty())
self.assertTrue(q.full())
gevent.sleep(0.001)
assert q.empty(), q
assert q.full(), q
self.assertTrue(q.empty())
self.assertTrue(q.full())
get_hub().loop.run_callback(store_result, q.get_nowait)
gevent.sleep(0.001)
assert q.empty(), q
assert q.full(), q
assert result == [5], result
assert p.ready(), p
assert p.dead, p
assert q.empty(), q
self.assertTrue(q.empty())
self.assertTrue(q.full())
self.assertEqual(result, [5])
self.assertTrue(p.ready())
self.assertTrue(p.dead)
self.assertTrue(q.empty())
# put_nowait must work from the mainloop
def test_put_nowait_unlock(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment