Commit c6637d27 authored by Jason Madden's avatar Jason Madden

Fix race conditions in libuv child watchers.

Fixes #1104
parent 9bef8a3b
...@@ -80,6 +80,8 @@ ...@@ -80,6 +80,8 @@
- Be more careful about issuing a warning about patching SSL on - Be more careful about issuing a warning about patching SSL on
Python 2. See :issue:`1108`. Python 2. See :issue:`1108`.
- Fix a race condition in libuv child callbacks. See :issue:`1104`.
1.3a1 (2018-01-27) 1.3a1 (2018-01-27)
================== ==================
......
...@@ -5,7 +5,6 @@ watchers will depend on the specific event loop. ...@@ -5,7 +5,6 @@ watchers will depend on the specific event loop.
# pylint:disable=not-callable # pylint:disable=not-callable
from __future__ import absolute_import, print_function from __future__ import absolute_import, print_function
import os
import signal as signalmodule import signal as signalmodule
import functools import functools
import warnings import warnings
...@@ -589,8 +588,10 @@ class ChildMixin(object): ...@@ -589,8 +588,10 @@ class ChildMixin(object):
@property @property
def rpid(self): def rpid(self):
return os.getpid() # The received pid, the result of the waitpid() call.
return self._rpid
_rpid = None
_rstatus = 0 _rstatus = 0
@property @property
......
...@@ -380,3 +380,6 @@ typedef struct _gevent_fs_poll_s { ...@@ -380,3 +380,6 @@ typedef struct _gevent_fs_poll_s {
} gevent_fs_poll_t; } gevent_fs_poll_t;
static void _gevent_fs_poll_callback3(void* handle, int status, const uv_stat_t* prev, const uv_stat_t* curr); static void _gevent_fs_poll_callback3(void* handle, int status, const uv_stat_t* prev, const uv_stat_t* curr);
static void gevent_uv_walk_callback_close(uv_handle_t* handle, void* arg);
static void gevent_close_all_handles(uv_loop_t* loop);
...@@ -108,3 +108,15 @@ static void _gevent_fs_poll_callback3(void* handlep, int status, const uv_stat_t ...@@ -108,3 +108,15 @@ static void _gevent_fs_poll_callback3(void* handlep, int status, const uv_stat_t
_gevent_generic_callback1((uv_handle_t*)handle, 0); _gevent_generic_callback1((uv_handle_t*)handle, 0);
} }
static void gevent_uv_walk_callback_close(uv_handle_t* handle, void* arg)
{
if( handle && !uv_is_closing(handle) ) {
uv_close(handle, NULL);
}
}
static void gevent_close_all_handles(uv_loop_t* loop)
{
uv_walk(loop, gevent_uv_walk_callback_close, NULL);
}
...@@ -248,6 +248,9 @@ class loop(AbstractLoop): ...@@ -248,6 +248,9 @@ class loop(AbstractLoop):
def _stop_aux_watchers(self): def _stop_aux_watchers(self):
assert self._prepare
assert self._check
assert self._signal_idle
libuv.uv_prepare_stop(self._prepare) libuv.uv_prepare_stop(self._prepare)
libuv.uv_ref(self._prepare) # Why are we doing this? libuv.uv_ref(self._prepare) # Why are we doing this?
libuv.uv_check_stop(self._check) libuv.uv_check_stop(self._check)
...@@ -256,6 +259,8 @@ class loop(AbstractLoop): ...@@ -256,6 +259,8 @@ class loop(AbstractLoop):
libuv.uv_timer_stop(self._signal_idle) libuv.uv_timer_stop(self._signal_idle)
libuv.uv_ref(self._signal_idle) libuv.uv_ref(self._signal_idle)
libuv.uv_check_stop(self._timer0)
def _setup_for_run_callback(self): def _setup_for_run_callback(self):
self._start_callback_timer() self._start_callback_timer()
libuv.uv_ref(self._timer0) libuv.uv_ref(self._timer0)
...@@ -279,14 +284,7 @@ class loop(AbstractLoop): ...@@ -279,14 +284,7 @@ class loop(AbstractLoop):
# run the loop once to clear them out and # run the loop once to clear them out and
# close again. # close again.
def walk(handle, _arg): libuv.gevent_close_all_handles(ptr)
if not libuv.uv_is_closing(handle):
libuv.uv_close(handle, ffi.NULL)
libuv.uv_walk(ptr,
ffi.callback("void(*)(uv_handle_t*,void*)",
walk),
ffi.NULL)
ran_has_more_callbacks = libuv.uv_run(ptr, libuv.UV_RUN_ONCE) ran_has_more_callbacks = libuv.uv_run(ptr, libuv.UV_RUN_ONCE)
if ran_has_more_callbacks: if ran_has_more_callbacks:
...@@ -294,6 +292,20 @@ class loop(AbstractLoop): ...@@ -294,6 +292,20 @@ class loop(AbstractLoop):
closed_failed = libuv.uv_loop_close(ptr) closed_failed = libuv.uv_loop_close(ptr)
assert closed_failed == 0, closed_failed assert closed_failed == 0, closed_failed
# Destroy the native resources *after* we have closed
# the loop. If we do it before, walking the handles
# attached to the loop is likely to segfault.
del self._prepare
del self._check
del self._signal_idle
del self._timer0
# Destroy any watchers we're still holding on to.
del self._io_watchers
del self._fork_watchers
del self._child_watchers
def _can_destroy_loop(self, ptr): def _can_destroy_loop(self, ptr):
# We're being asked to destroy a loop that's, # We're being asked to destroy a loop that's,
# at the time it was constructed, was the default loop. # at the time it was constructed, was the default loop.
...@@ -324,6 +336,7 @@ class loop(AbstractLoop): ...@@ -324,6 +336,7 @@ class loop(AbstractLoop):
'closing']) 'closing'])
handles = [] handles = []
# XXX: Convert this to a modern callback.
def walk(handle, _arg): def walk(handle, _arg):
data = handle.data data = handle.data
if data: if data:
...@@ -446,6 +459,9 @@ class loop(AbstractLoop): ...@@ -446,6 +459,9 @@ class loop(AbstractLoop):
def _sigchld_callback(self): def _sigchld_callback(self):
# Signals can arrive at (relatively) any time. To eliminate
# race conditions, and behave more like libev, we "queue"
# sigchld to run when we run callbacks.
while True: while True:
try: try:
pid, status, _usage = os.wait3(os.WNOHANG) pid, status, _usage = os.wait3(os.WNOHANG)
...@@ -457,8 +473,26 @@ class loop(AbstractLoop): ...@@ -457,8 +473,26 @@ class loop(AbstractLoop):
break break
children_watchers = self._child_watchers.get(pid, []) + self._child_watchers.get(0, []) children_watchers = self._child_watchers.get(pid, []) + self._child_watchers.get(0, [])
for watcher in children_watchers: for watcher in children_watchers:
watcher._set_status(status) self.run_callback(watcher._set_waitpid_status, pid, status)
# Don't invoke child watchers for 0 more than once
self._child_watchers[0] = []
def _register_child_watcher(self, watcher):
self._child_watchers[watcher._pid].append(watcher)
def _unregister_child_watcher(self, watcher):
try:
# stop() should be idempotent
self._child_watchers[watcher._pid].remove(watcher)
except ValueError:
pass
# Now's a good time to clean up any dead lists we don't need
# anymore
for pid in list(self._child_watchers):
if not self._child_watchers[pid]:
del self._child_watchers[pid]
def io(self, fd, events, ref=True, priority=None): def io(self, fd, events, ref=True, priority=None):
# We rely on hard references here and explicit calls to # We rely on hard references here and explicit calls to
......
...@@ -541,16 +541,13 @@ class child(_SimulatedWithAsyncMixin, ...@@ -541,16 +541,13 @@ class child(_SimulatedWithAsyncMixin,
def _register_loop_callback(self): def _register_loop_callback(self):
self.loop._child_watchers[self._pid].append(self) self.loop._register_child_watcher(self)
def _unregister_loop_callback(self): def _unregister_loop_callback(self):
try: self.loop._unregister_child_watcher(self)
# stop() should be idempotent
self.loop._child_watchers[self._pid].remove(self)
except ValueError:
pass
def _set_status(self, status): def _set_waitpid_status(self, pid, status):
self._rpid = pid
self._rstatus = status self._rstatus = status
self._async.send() self._async.send()
......
...@@ -323,6 +323,11 @@ if hasattr(os, 'fork'): ...@@ -323,6 +323,11 @@ if hasattr(os, 'fork'):
if pid in _watched_children: if pid in _watched_children:
# yes, we're watching it # yes, we're watching it
# Note that the remainder of this code must be careful to NOT
# yield to the event loop except at well known times, or
# we have a race condition between the _on_child callback and the
# code here that could lead to a process to hang.
if options & _WNOHANG or isinstance(_watched_children[pid], tuple): if options & _WNOHANG or isinstance(_watched_children[pid], tuple):
# We're either asked not to block, or it already finished, in which # We're either asked not to block, or it already finished, in which
# case blocking doesn't matter # case blocking doesn't matter
...@@ -339,7 +344,10 @@ if hasattr(os, 'fork'): ...@@ -339,7 +344,10 @@ if hasattr(os, 'fork'):
# cooperative. We know it's our child, etc, so this should work. # cooperative. We know it's our child, etc, so this should work.
watcher = _watched_children[pid] watcher = _watched_children[pid]
# We can't start a watcher that's already started, # We can't start a watcher that's already started,
# so we can't reuse the existing watcher. # so we can't reuse the existing watcher. Notice that the
# old watcher must not have fired already, or during this time, but
# only after we successfully `start()` the watcher. So this must
# not yield to the event loop.
with watcher.loop.child(pid, False) as new_watcher: with watcher.loop.child(pid, False) as new_watcher:
get_hub().wait(new_watcher) get_hub().wait(new_watcher)
# Ok, so now the new watcher is done. That means # Ok, so now the new watcher is done. That means
......
# pylint:disable=no-member
from __future__ import absolute_import, print_function, division
import sys import sys
import unittest import unittest
import greentest import greentest
...@@ -6,13 +7,14 @@ import greentest ...@@ -6,13 +7,14 @@ import greentest
from gevent import core from gevent import core
class TestCore(unittest.TestCase): class TestCore(unittest.TestCase):
def test_get_version(self): def test_get_version(self):
version = core.get_version() version = core.get_version() # pylint: disable=no-member
self.assertIsInstance(version, str) self.assertIsInstance(version, str)
self.assertTrue(version) self.assertTrue(version)
header_version = core.get_header_version() header_version = core.get_header_version() # pylint: disable=no-member
self.assertIsInstance(header_version, str) self.assertIsInstance(header_version, str)
self.assertTrue(header_version) self.assertTrue(header_version)
self.assertEqual(version, header_version) self.assertEqual(version, header_version)
...@@ -20,8 +22,18 @@ class TestCore(unittest.TestCase): ...@@ -20,8 +22,18 @@ class TestCore(unittest.TestCase):
class TestWatchers(unittest.TestCase): class TestWatchers(unittest.TestCase):
def makeOne(self): def _makeOne(self):
return core.loop() return core.loop() # pylint:disable=no-member
def destroyOne(self, loop):
loop.destroy()
def setUp(self):
self.loop = self._makeOne()
def tearDown(self):
self.destroyOne(self.loop)
del self.loop
def test_io(self): def test_io(self):
if sys.platform == 'win32': if sys.platform == 'win32':
...@@ -31,47 +43,57 @@ class TestWatchers(unittest.TestCase): ...@@ -31,47 +43,57 @@ class TestWatchers(unittest.TestCase):
else: else:
Error = ValueError Error = ValueError
win32 = False win32 = False
with self.assertRaises(Error): with self.assertRaises(Error):
self.makeOne().io(-1, 1) self.loop.io(-1, 1)
if hasattr(core, 'TIMER'): if hasattr(core, 'TIMER'):
# libev # libev
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
self.makeOne().io(1, core.TIMER) self.loop.io(1, core.TIMER) # pylint:disable=no-member
# Test we can set events and io before it's started # Test we can set events and io before it's started
if not win32: if not win32:
# We can't do this with arbitrary FDs on windows; # We can't do this with arbitrary FDs on windows;
# see libev_vfd.h # see libev_vfd.h
io = self.makeOne().io(1, core.READ) io = self.loop.io(1, core.READ) # pylint:disable=no-member
io.fd = 2 io.fd = 2
self.assertEqual(io.fd, 2) self.assertEqual(io.fd, 2)
io.events = core.WRITE io.events = core.WRITE # pylint:disable=no-member
if not hasattr(core, 'libuv'): if not hasattr(core, 'libuv'):
# libev # libev
# pylint:disable=no-member
self.assertEqual(core._events_to_str(io.events), 'WRITE|_IOFDSET') self.assertEqual(core._events_to_str(io.events), 'WRITE|_IOFDSET')
else: else:
self.assertEqual(core._events_to_str(io.events), 'WRITE')
self.assertEqual(core._events_to_str(io.events), # pylint:disable=no-member
'WRITE')
io.start(lambda: None) io.start(lambda: None)
io.close() io.close()
def test_timer_constructor(self): def test_timer_constructor(self):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
self.makeOne().timer(1, -1) self.loop.timer(1, -1)
def test_signal_constructor(self): def test_signal_constructor(self):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
self.makeOne().signal(1000) self.loop.signal(1000)
class TestWatchersDefault(TestWatchers): class TestWatchersDefault(TestWatchers):
def makeOne(self): def _makeOne(self):
return core.loop(default=True) return core.loop(default=True) # pylint:disable=no-member
def destroyOne(self, loop):
return
@greentest.skipOnLibuvOnPyPyOnWin("This crashes with PyPy 5.10.0, only on Windows. " @greentest.skipOnLibuvOnPyPyOnWin("This crashes with PyPy 5.10.0, only on Windows. "
"See https://ci.appveyor.com/project/denik/gevent/build/1.0.1380/job/lrlvid6mkjtyrhn5#L1103") "See https://ci.appveyor.com/project/denik/gevent/build/1.0.1380/job/lrlvid6mkjtyrhn5#L1103")
class TestWatchersDefaultDestroyed(TestWatchers): class TestWatchersDefaultDestroyed(TestWatchers):
def makeOne(self): def makeOne(self):
# pylint: disable=no-member
l = core.loop(default=True) l = core.loop(default=True)
l.destroy() l.destroy()
del l del l
...@@ -81,6 +103,7 @@ class TestWatchersDefaultDestroyed(TestWatchers): ...@@ -81,6 +103,7 @@ class TestWatchersDefaultDestroyed(TestWatchers):
class TestLibev(unittest.TestCase): class TestLibev(unittest.TestCase):
def test_flags_conversion(self): def test_flags_conversion(self):
# pylint: disable=no-member
if sys.platform != 'win32': if sys.platform != 'win32':
self.assertEqual(core.loop(2, default=False).backend_int, 2) self.assertEqual(core.loop(2, default=False).backend_int, 2)
self.assertEqual(core.loop('select', default=False).backend, 'select') self.assertEqual(core.loop('select', default=False).backend, 'select')
...@@ -94,11 +117,14 @@ class TestLibev(unittest.TestCase): ...@@ -94,11 +117,14 @@ class TestLibev(unittest.TestCase):
class TestEvents(unittest.TestCase): class TestEvents(unittest.TestCase):
def test_events_conversion(self): def test_events_conversion(self):
self.assertEqual(core._events_to_str(core.READ | core.WRITE), 'READ|WRITE') self.assertEqual(core._events_to_str(core.READ | core.WRITE), # pylint: disable=no-member
'READ|WRITE')
def test_EVENTS(self): def test_EVENTS(self):
self.assertEqual(str(core.EVENTS), 'gevent.core.EVENTS') self.assertEqual(str(core.EVENTS), # pylint: disable=no-member
self.assertEqual(repr(core.EVENTS), 'gevent.core.EVENTS') 'gevent.core.EVENTS')
self.assertEqual(repr(core.EVENTS), # pylint: disable=no-member
'gevent.core.EVENTS')
if __name__ == '__main__': if __name__ == '__main__':
greentest.main() greentest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment