Commit 56d26492 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1110 from gevent/issue1004

Fix race conditions in libuv child watchers.
parents 9bef8a3b c6d869f4
...@@ -80,6 +80,8 @@ ...@@ -80,6 +80,8 @@
- Be more careful about issuing a warning about patching SSL on - Be more careful about issuing a warning about patching SSL on
Python 2. See :issue:`1108`. Python 2. See :issue:`1108`.
- Fix a race condition in libuv child callbacks. See :issue:`1104`.
1.3a1 (2018-01-27) 1.3a1 (2018-01-27)
================== ==================
......
...@@ -236,6 +236,10 @@ class AbstractCallbacks(object): ...@@ -236,6 +236,10 @@ class AbstractCallbacks(object):
def python_prepare_callback(self, watcher_ptr): def python_prepare_callback(self, watcher_ptr):
loop = self._find_loop_from_c_watcher(watcher_ptr) loop = self._find_loop_from_c_watcher(watcher_ptr)
if loop is None: # pragma: no cover
print("WARNING: gevent: running prepare callbacks from a destroyed handle: ",
watcher_ptr)
return
loop._run_callbacks() loop._run_callbacks()
def check_callback_onerror(self, t, v, tb): def check_callback_onerror(self, t, v, tb):
......
...@@ -5,7 +5,6 @@ watchers will depend on the specific event loop. ...@@ -5,7 +5,6 @@ watchers will depend on the specific event loop.
# pylint:disable=not-callable # pylint:disable=not-callable
from __future__ import absolute_import, print_function from __future__ import absolute_import, print_function
import os
import signal as signalmodule import signal as signalmodule
import functools import functools
import warnings import warnings
...@@ -589,8 +588,10 @@ class ChildMixin(object): ...@@ -589,8 +588,10 @@ class ChildMixin(object):
@property @property
def rpid(self): def rpid(self):
return os.getpid() # The received pid, the result of the waitpid() call.
return self._rpid
_rpid = None
_rstatus = 0 _rstatus = 0
@property @property
......
...@@ -380,3 +380,10 @@ typedef struct _gevent_fs_poll_s { ...@@ -380,3 +380,10 @@ typedef struct _gevent_fs_poll_s {
} gevent_fs_poll_t; } gevent_fs_poll_t;
static void _gevent_fs_poll_callback3(void* handle, int status, const uv_stat_t* prev, const uv_stat_t* curr); static void _gevent_fs_poll_callback3(void* handle, int status, const uv_stat_t* prev, const uv_stat_t* curr);
static void gevent_uv_walk_callback_close(uv_handle_t* handle, void* arg);
static void gevent_close_all_handles(uv_loop_t* loop);
static void gevent_zero_timer(uv_timer_t* handle);
static void gevent_zero_prepare(uv_prepare_t* handle);
static void gevent_zero_check(uv_check_t* handle);
static void gevent_zero_loop(uv_loop_t* handle);
...@@ -108,3 +108,35 @@ static void _gevent_fs_poll_callback3(void* handlep, int status, const uv_stat_t ...@@ -108,3 +108,35 @@ static void _gevent_fs_poll_callback3(void* handlep, int status, const uv_stat_t
_gevent_generic_callback1((uv_handle_t*)handle, 0); _gevent_generic_callback1((uv_handle_t*)handle, 0);
} }
static void gevent_uv_walk_callback_close(uv_handle_t* handle, void* arg)
{
if( handle && !uv_is_closing(handle) ) {
uv_close(handle, NULL);
}
}
static void gevent_close_all_handles(uv_loop_t* loop)
{
uv_walk(loop, gevent_uv_walk_callback_close, NULL);
}
static void gevent_zero_timer(uv_timer_t* handle)
{
memset(handle, 0, sizeof(uv_timer_t));
}
static void gevent_zero_check(uv_check_t* handle)
{
memset(handle, 0, sizeof(uv_check_t));
}
static void gevent_zero_prepare(uv_prepare_t* handle)
{
memset(handle, 0, sizeof(uv_prepare_t));
}
static void gevent_zero_loop(uv_loop_t* handle)
{
memset(handle, 0, sizeof(uv_loop_t));
}
...@@ -27,7 +27,7 @@ class _Callbacks(AbstractCallbacks): ...@@ -27,7 +27,7 @@ class _Callbacks(AbstractCallbacks):
def _find_loop_from_c_watcher(self, watcher_ptr): def _find_loop_from_c_watcher(self, watcher_ptr):
loop_handle = ffi.cast('uv_handle_t*', watcher_ptr).data loop_handle = ffi.cast('uv_handle_t*', watcher_ptr).data
return self.from_handle(loop_handle) return self.from_handle(loop_handle) if loop_handle else None
def python_sigchld_callback(self, watcher_ptr, _signum): def python_sigchld_callback(self, watcher_ptr, _signum):
self.from_handle(ffi.cast('uv_handle_t*', watcher_ptr).data)._sigchld_callback() self.from_handle(ffi.cast('uv_handle_t*', watcher_ptr).data)._sigchld_callback()
...@@ -248,14 +248,20 @@ class loop(AbstractLoop): ...@@ -248,14 +248,20 @@ class loop(AbstractLoop):
def _stop_aux_watchers(self): def _stop_aux_watchers(self):
assert self._prepare
assert self._check
assert self._signal_idle
libuv.uv_prepare_stop(self._prepare) libuv.uv_prepare_stop(self._prepare)
libuv.uv_ref(self._prepare) # Why are we doing this? libuv.uv_ref(self._prepare) # Why are we doing this?
libuv.uv_check_stop(self._check) libuv.uv_check_stop(self._check)
libuv.uv_ref(self._check) libuv.uv_ref(self._check)
libuv.uv_timer_stop(self._signal_idle) libuv.uv_timer_stop(self._signal_idle)
libuv.uv_ref(self._signal_idle) libuv.uv_ref(self._signal_idle)
libuv.uv_check_stop(self._timer0)
def _setup_for_run_callback(self): def _setup_for_run_callback(self):
self._start_callback_timer() self._start_callback_timer()
libuv.uv_ref(self._timer0) libuv.uv_ref(self._timer0)
...@@ -272,28 +278,40 @@ class loop(AbstractLoop): ...@@ -272,28 +278,40 @@ class loop(AbstractLoop):
# libuv likes to abort() the process in this case. # libuv likes to abort() the process in this case.
return return
libuv.gevent_close_all_handles(ptr)
closed_failed = libuv.uv_loop_close(ptr) closed_failed = libuv.uv_loop_close(ptr)
if closed_failed: if closed_failed:
assert closed_failed == libuv.UV_EBUSY assert closed_failed == libuv.UV_EBUSY
# Walk the open handlers, close them, then # We already closed all the handles. Run the loop
# run the loop once to clear them out and # once to let them be cut off from the loop.
# close again.
def walk(handle, _arg):
if not libuv.uv_is_closing(handle):
libuv.uv_close(handle, ffi.NULL)
libuv.uv_walk(ptr,
ffi.callback("void(*)(uv_handle_t*,void*)",
walk),
ffi.NULL)
ran_has_more_callbacks = libuv.uv_run(ptr, libuv.UV_RUN_ONCE) ran_has_more_callbacks = libuv.uv_run(ptr, libuv.UV_RUN_ONCE)
if ran_has_more_callbacks: if ran_has_more_callbacks:
libuv.uv_run(ptr, libuv.UV_RUN_NOWAIT) libuv.uv_run(ptr, libuv.UV_RUN_NOWAIT)
closed_failed = libuv.uv_loop_close(ptr) closed_failed = libuv.uv_loop_close(ptr)
assert closed_failed == 0, closed_failed assert closed_failed == 0, closed_failed
# Destroy the native resources *after* we have closed
# the loop. If we do it before, walking the handles
# attached to the loop is likely to segfault.
libuv.gevent_zero_check(self._check)
libuv.gevent_zero_check(self._timer0)
libuv.gevent_zero_prepare(self._prepare)
libuv.gevent_zero_timer(self._signal_idle)
del self._check
del self._prepare
del self._signal_idle
del self._timer0
libuv.gevent_zero_loop(ptr)
# Destroy any watchers we're still holding on to.
del self._io_watchers
del self._fork_watchers
del self._child_watchers
def _can_destroy_loop(self, ptr): def _can_destroy_loop(self, ptr):
# We're being asked to destroy a loop that's, # We're being asked to destroy a loop that's,
# at the time it was constructed, was the default loop. # at the time it was constructed, was the default loop.
...@@ -324,6 +342,7 @@ class loop(AbstractLoop): ...@@ -324,6 +342,7 @@ class loop(AbstractLoop):
'closing']) 'closing'])
handles = [] handles = []
# XXX: Convert this to a modern callback.
def walk(handle, _arg): def walk(handle, _arg):
data = handle.data data = handle.data
if data: if data:
...@@ -446,6 +465,9 @@ class loop(AbstractLoop): ...@@ -446,6 +465,9 @@ class loop(AbstractLoop):
def _sigchld_callback(self): def _sigchld_callback(self):
# Signals can arrive at (relatively) any time. To eliminate
# race conditions, and behave more like libev, we "queue"
# sigchld to run when we run callbacks.
while True: while True:
try: try:
pid, status, _usage = os.wait3(os.WNOHANG) pid, status, _usage = os.wait3(os.WNOHANG)
...@@ -457,8 +479,26 @@ class loop(AbstractLoop): ...@@ -457,8 +479,26 @@ class loop(AbstractLoop):
break break
children_watchers = self._child_watchers.get(pid, []) + self._child_watchers.get(0, []) children_watchers = self._child_watchers.get(pid, []) + self._child_watchers.get(0, [])
for watcher in children_watchers: for watcher in children_watchers:
watcher._set_status(status) self.run_callback(watcher._set_waitpid_status, pid, status)
# Don't invoke child watchers for 0 more than once
self._child_watchers[0] = []
def _register_child_watcher(self, watcher):
self._child_watchers[watcher._pid].append(watcher)
def _unregister_child_watcher(self, watcher):
try:
# stop() should be idempotent
self._child_watchers[watcher._pid].remove(watcher)
except ValueError:
pass
# Now's a good time to clean up any dead lists we don't need
# anymore
for pid in list(self._child_watchers):
if not self._child_watchers[pid]:
del self._child_watchers[pid]
def io(self, fd, events, ref=True, priority=None): def io(self, fd, events, ref=True, priority=None):
# We rely on hard references here and explicit calls to # We rely on hard references here and explicit calls to
......
...@@ -541,16 +541,13 @@ class child(_SimulatedWithAsyncMixin, ...@@ -541,16 +541,13 @@ class child(_SimulatedWithAsyncMixin,
def _register_loop_callback(self): def _register_loop_callback(self):
self.loop._child_watchers[self._pid].append(self) self.loop._register_child_watcher(self)
def _unregister_loop_callback(self): def _unregister_loop_callback(self):
try: self.loop._unregister_child_watcher(self)
# stop() should be idempotent
self.loop._child_watchers[self._pid].remove(self)
except ValueError:
pass
def _set_status(self, status): def _set_waitpid_status(self, pid, status):
self._rpid = pid
self._rstatus = status self._rstatus = status
self._async.send() self._async.send()
......
...@@ -323,6 +323,11 @@ if hasattr(os, 'fork'): ...@@ -323,6 +323,11 @@ if hasattr(os, 'fork'):
if pid in _watched_children: if pid in _watched_children:
# yes, we're watching it # yes, we're watching it
# Note that the remainder of this code must be careful to NOT
# yield to the event loop except at well known times, or
# we have a race condition between the _on_child callback and the
# code here that could lead to a process to hang.
if options & _WNOHANG or isinstance(_watched_children[pid], tuple): if options & _WNOHANG or isinstance(_watched_children[pid], tuple):
# We're either asked not to block, or it already finished, in which # We're either asked not to block, or it already finished, in which
# case blocking doesn't matter # case blocking doesn't matter
...@@ -339,7 +344,10 @@ if hasattr(os, 'fork'): ...@@ -339,7 +344,10 @@ if hasattr(os, 'fork'):
# cooperative. We know it's our child, etc, so this should work. # cooperative. We know it's our child, etc, so this should work.
watcher = _watched_children[pid] watcher = _watched_children[pid]
# We can't start a watcher that's already started, # We can't start a watcher that's already started,
# so we can't reuse the existing watcher. # so we can't reuse the existing watcher. Notice that the
# old watcher must not have fired already, or during this time, but
# only after we successfully `start()` the watcher. So this must
# not yield to the event loop.
with watcher.loop.child(pid, False) as new_watcher: with watcher.loop.child(pid, False) as new_watcher:
get_hub().wait(new_watcher) get_hub().wait(new_watcher)
# Ok, so now the new watcher is done. That means # Ok, so now the new watcher is done. That means
......
...@@ -69,6 +69,7 @@ from greentest.skipping import skipOnPyPyOnCI ...@@ -69,6 +69,7 @@ from greentest.skipping import skipOnPyPyOnCI
from greentest.skipping import skipOnPyPy3 from greentest.skipping import skipOnPyPy3
from greentest.skipping import skipIf from greentest.skipping import skipIf
from greentest.skipping import skipOnLibuv from greentest.skipping import skipOnLibuv
from greentest.skipping import skipOnLibuvOnWin
from greentest.skipping import skipOnLibuvOnCI from greentest.skipping import skipOnLibuvOnCI
from greentest.skipping import skipOnLibuvOnCIOnPyPy from greentest.skipping import skipOnLibuvOnCIOnPyPy
from greentest.skipping import skipOnLibuvOnPyPyOnWin from greentest.skipping import skipOnLibuvOnPyPyOnWin
......
...@@ -41,6 +41,7 @@ skipOnPyPy3OnCI = _do_not_skip ...@@ -41,6 +41,7 @@ skipOnPyPy3OnCI = _do_not_skip
skipOnPyPy3 = _do_not_skip skipOnPyPy3 = _do_not_skip
skipOnLibuv = _do_not_skip skipOnLibuv = _do_not_skip
skipOnLibuvOnWin = _do_not_skip
skipOnLibuvOnCI = _do_not_skip skipOnLibuvOnCI = _do_not_skip
skipOnLibuvOnCIOnPyPy = _do_not_skip skipOnLibuvOnCIOnPyPy = _do_not_skip
skipOnLibuvOnPyPyOnWin = _do_not_skip skipOnLibuvOnPyPyOnWin = _do_not_skip
...@@ -91,5 +92,7 @@ if sysinfo.LIBUV: ...@@ -91,5 +92,7 @@ if sysinfo.LIBUV:
if sysinfo.PYPY: if sysinfo.PYPY:
skipOnLibuvOnCIOnPyPy = unittest.skip skipOnLibuvOnCIOnPyPy = unittest.skip
if sysinfo.PYPY and sysinfo.WIN: if sysinfo.WIN:
skipOnLibuvOnWin = unittest.skip
if sysinfo.PYPY:
skipOnLibuvOnPyPyOnWin = unittest.skip skipOnLibuvOnPyPyOnWin = unittest.skip
# pylint:disable=no-member
from __future__ import absolute_import, print_function, division
import sys import sys
import unittest import unittest
import greentest import greentest
...@@ -6,13 +7,14 @@ import greentest ...@@ -6,13 +7,14 @@ import greentest
from gevent import core from gevent import core
class TestCore(unittest.TestCase): class TestCore(unittest.TestCase):
def test_get_version(self): def test_get_version(self):
version = core.get_version() version = core.get_version() # pylint: disable=no-member
self.assertIsInstance(version, str) self.assertIsInstance(version, str)
self.assertTrue(version) self.assertTrue(version)
header_version = core.get_header_version() header_version = core.get_header_version() # pylint: disable=no-member
self.assertIsInstance(header_version, str) self.assertIsInstance(header_version, str)
self.assertTrue(header_version) self.assertTrue(header_version)
self.assertEqual(version, header_version) self.assertEqual(version, header_version)
...@@ -20,8 +22,18 @@ class TestCore(unittest.TestCase): ...@@ -20,8 +22,18 @@ class TestCore(unittest.TestCase):
class TestWatchers(unittest.TestCase): class TestWatchers(unittest.TestCase):
def makeOne(self): def _makeOne(self):
return core.loop() return core.loop() # pylint:disable=no-member
def destroyOne(self, loop):
loop.destroy()
def setUp(self):
self.loop = self._makeOne()
def tearDown(self):
self.destroyOne(self.loop)
del self.loop
def test_io(self): def test_io(self):
if sys.platform == 'win32': if sys.platform == 'win32':
...@@ -31,47 +43,62 @@ class TestWatchers(unittest.TestCase): ...@@ -31,47 +43,62 @@ class TestWatchers(unittest.TestCase):
else: else:
Error = ValueError Error = ValueError
win32 = False win32 = False
with self.assertRaises(Error): with self.assertRaises(Error):
self.makeOne().io(-1, 1) self.loop.io(-1, 1)
if hasattr(core, 'TIMER'): if hasattr(core, 'TIMER'):
# libev # libev
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
self.makeOne().io(1, core.TIMER) self.loop.io(1, core.TIMER) # pylint:disable=no-member
# Test we can set events and io before it's started # Test we can set events and io before it's started
if not win32: if not win32:
# We can't do this with arbitrary FDs on windows; # We can't do this with arbitrary FDs on windows;
# see libev_vfd.h # see libev_vfd.h
io = self.makeOne().io(1, core.READ) io = self.loop.io(1, core.READ) # pylint:disable=no-member
io.fd = 2 io.fd = 2
self.assertEqual(io.fd, 2) self.assertEqual(io.fd, 2)
io.events = core.WRITE io.events = core.WRITE # pylint:disable=no-member
if not hasattr(core, 'libuv'): if not hasattr(core, 'libuv'):
# libev # libev
# pylint:disable=no-member
self.assertEqual(core._events_to_str(io.events), 'WRITE|_IOFDSET') self.assertEqual(core._events_to_str(io.events), 'WRITE|_IOFDSET')
else: else:
self.assertEqual(core._events_to_str(io.events), 'WRITE')
self.assertEqual(core._events_to_str(io.events), # pylint:disable=no-member
'WRITE')
io.start(lambda: None) io.start(lambda: None)
io.close() io.close()
def test_timer_constructor(self): def test_timer_constructor(self):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
self.makeOne().timer(1, -1) self.loop.timer(1, -1)
def test_signal_constructor(self): def test_signal_constructor(self):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
self.makeOne().signal(1000) self.loop.signal(1000)
class TestWatchersDefault(TestWatchers): class TestWatchersDefault(TestWatchers):
def makeOne(self): def _makeOne(self):
return core.loop(default=True) return core.loop(default=True) # pylint:disable=no-member
def destroyOne(self, loop):
return
@greentest.skipOnLibuvOnPyPyOnWin("This crashes with PyPy 5.10.0, only on Windows. " # XXX: The crash may be fixed? The hang showed up after the crash was
"See https://ci.appveyor.com/project/denik/gevent/build/1.0.1380/job/lrlvid6mkjtyrhn5#L1103") # reproduced and fixed on linux and OS X.
@greentest.skipOnLibuvOnWin(
"This crashes with PyPy 5.10.0, only on Windows. "
"See https://ci.appveyor.com/project/denik/gevent/build/1.0.1380/job/lrlvid6mkjtyrhn5#L1103 "
"It has also timed out, but only on Appveyor CPython 3.6; local CPython 3.6 does not. "
"See https://ci.appveyor.com/project/denik/gevent/build/1.0.1414/job/yn7yi8b53vtqs8lw#L1523")
class TestWatchersDefaultDestroyed(TestWatchers): class TestWatchersDefaultDestroyed(TestWatchers):
def makeOne(self): def _makeOne(self):
# pylint: disable=no-member
l = core.loop(default=True) l = core.loop(default=True)
l.destroy() l.destroy()
del l del l
...@@ -81,6 +108,7 @@ class TestWatchersDefaultDestroyed(TestWatchers): ...@@ -81,6 +108,7 @@ class TestWatchersDefaultDestroyed(TestWatchers):
class TestLibev(unittest.TestCase): class TestLibev(unittest.TestCase):
def test_flags_conversion(self): def test_flags_conversion(self):
# pylint: disable=no-member
if sys.platform != 'win32': if sys.platform != 'win32':
self.assertEqual(core.loop(2, default=False).backend_int, 2) self.assertEqual(core.loop(2, default=False).backend_int, 2)
self.assertEqual(core.loop('select', default=False).backend, 'select') self.assertEqual(core.loop('select', default=False).backend, 'select')
...@@ -94,11 +122,14 @@ class TestLibev(unittest.TestCase): ...@@ -94,11 +122,14 @@ class TestLibev(unittest.TestCase):
class TestEvents(unittest.TestCase): class TestEvents(unittest.TestCase):
def test_events_conversion(self): def test_events_conversion(self):
self.assertEqual(core._events_to_str(core.READ | core.WRITE), 'READ|WRITE') self.assertEqual(core._events_to_str(core.READ | core.WRITE), # pylint: disable=no-member
'READ|WRITE')
def test_EVENTS(self): def test_EVENTS(self):
self.assertEqual(str(core.EVENTS), 'gevent.core.EVENTS') self.assertEqual(str(core.EVENTS), # pylint: disable=no-member
self.assertEqual(repr(core.EVENTS), 'gevent.core.EVENTS') 'gevent.core.EVENTS')
self.assertEqual(repr(core.EVENTS), # pylint: disable=no-member
'gevent.core.EVENTS')
if __name__ == '__main__': if __name__ == '__main__':
greentest.main() greentest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment