Commit b0f738c2 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1194 from gevent/libuv-queue

Queue all libuv callbacks and run them at the end of the iteration
parents d8e3b6c5 c337edea
......@@ -32,6 +32,10 @@
- A started monitor thread for the active hub now survives a fork. See
:issue:`1185`.
- libuv now collects all pending watchers and runs their callbacks at
the end of the loop iteration using UV_RUN_ONCE. This eliminates the
need to patch libuv to be greenlet-safe.
1.3b1 (2018-04-13)
==================
......
......@@ -12,18 +12,9 @@
Updating libuv
==============
- Apply the gevent-libuv.patch to updates of libuv.
[deps] $ patch -p0 < gevent-libuv.patch
- Clean up the libuv tree:
- rm -rf libuv/.github
- rm -rf libuv/docs
- rm -rf libuv/samples
- rm -rf libuv/test
- rm -rf libuv/tools
- Create new patches by downloading the source tarball:
[deps] $ tar -xf libuv-v1.20.1.tar.gz
[deps] $ diff -r -u libuv-v1.20.1/ libuv > gevent-libuv.patch
diff -r -u libuv-v1.20.1/include/uv-unix.h libuv/include/uv-unix.h
--- libuv-v1.20.1/include/uv-unix.h 2018-04-18 08:18:43.000000000 -0500
+++ libuv/include/uv-unix.h 2018-04-20 12:16:19.000000000 -0500
@@ -207,8 +207,11 @@
uv_handle_t* closing_handles; \
void* process_handles[2]; \
void* prepare_handles[2]; \
+ void* prepare_handles_queue[2]; \
void* check_handles[2]; \
+ void* check_handles_queue[2]; \
void* idle_handles[2]; \
+ void* idle_handles_queue[2]; \
void* async_handles[2]; \
void (*async_unused)(void); /* TODO(bnoordhuis) Remove in libuv v2. */ \
uv__io_t async_io_watcher; \
diff -r -u libuv-v1.20.1/src/unix/loop-watcher.c libuv/src/unix/loop-watcher.c
--- libuv-v1.20.1/src/unix/loop-watcher.c 2018-04-18 08:18:43.000000000 -0500
+++ libuv/src/unix/loop-watcher.c 2018-04-20 13:59:36.000000000 -0500
@@ -22,6 +22,20 @@
#include "uv.h"
#include "internal.h"
+/*
+ * gevent: Fix for https://github.com/gevent/gevent/issues/1126
+ *
+ * Using a stack-based queue variable in uv__run_* badly breaks for
+ * certain stack manipulations when greenlets switch. Windows keeps
+ * the stack in the loop. We originally used malloc/free in uv__run_
+ * to avoid changing any files but this one, but that benchmarked
+ * fairly slow and widely variable across processes
+ * (https://groups.google.com/d/msg/libuv/8BxOk40Dii4/Ke1yotOQBwAJ) so
+ * we moved them to the loop. We can't use global static variables
+ * because of multiple threads.
+ */
+#include <stdlib.h>
+
#define UV_LOOP_WATCHER_DEFINE(name, type) \
int uv_##name##_init(uv_loop_t* loop, uv_##name##_t* handle) { \
uv__handle_init(loop, (uv_handle_t*)handle, UV_##type); \
@@ -47,10 +61,10 @@
\
void uv__run_##name(uv_loop_t* loop) { \
uv_##name##_t* h; \
- QUEUE queue; \
+ QUEUE* queue = &loop->name##_handles_queue; \
QUEUE* q; \
- QUEUE_MOVE(&loop->name##_handles, &queue); \
- while (!QUEUE_EMPTY(&queue)) { \
+ QUEUE_MOVE(&loop->name##_handles, queue); \
+ while (!QUEUE_EMPTY(queue)) { \
q = QUEUE_HEAD(&queue); \
h = QUEUE_DATA(q, uv_##name##_t, queue); \
QUEUE_REMOVE(q); \
......@@ -207,11 +207,8 @@ typedef struct {
uv_handle_t* closing_handles; \
void* process_handles[2]; \
void* prepare_handles[2]; \
void* prepare_handles_queue[2]; \
void* check_handles[2]; \
void* check_handles_queue[2]; \
void* idle_handles[2]; \
void* idle_handles_queue[2]; \
void* async_handles[2]; \
void (*async_unused)(void); /* TODO(bnoordhuis) Remove in libuv v2. */ \
uv__io_t async_io_watcher; \
......
......@@ -22,20 +22,6 @@
#include "uv.h"
#include "internal.h"
/*
* gevent: Fix for https://github.com/gevent/gevent/issues/1126
*
* Using a stack-based queue variable in uv__run_* badly breaks for
* certain stack manipulations when greenlets switch. Windows keeps
* the stack in the loop. We originally used malloc/free in uv__run_
* to avoid changing any files but this one, but that benchmarked
* fairly slow and widely variable across processes
* (https://groups.google.com/d/msg/libuv/8BxOk40Dii4/Ke1yotOQBwAJ) so
* we moved them to the loop. We can't use global static variables
* because of multiple threads.
*/
#include <stdlib.h>
#define UV_LOOP_WATCHER_DEFINE(name, type) \
int uv_##name##_init(uv_loop_t* loop, uv_##name##_t* handle) { \
uv__handle_init(loop, (uv_handle_t*)handle, UV_##type); \
......@@ -61,10 +47,10 @@
\
void uv__run_##name(uv_loop_t* loop) { \
uv_##name##_t* h; \
QUEUE* queue = &loop->name##_handles_queue; \
QUEUE queue; \
QUEUE* q; \
QUEUE_MOVE(&loop->name##_handles, queue); \
while (!QUEUE_EMPTY(queue)) { \
QUEUE_MOVE(&loop->name##_handles, &queue); \
while (!QUEUE_EMPTY(&queue)) { \
q = QUEUE_HEAD(&queue); \
h = QUEUE_DATA(q, uv_##name##_t, queue); \
QUEUE_REMOVE(q); \
......
......@@ -241,6 +241,9 @@ differences in the way gevent behaves using libuv compared to libev.
has some support for priorities and this is exposed in the low-level
gevent API, but it was never documented.
- Low-level ``prepare`` watchers are not available. gevent uses
prepare watchers for internal purposes.
Performance
===========
......
......@@ -98,6 +98,7 @@ class AbstractCallbacks(object):
This function should never return 0, as that's the default value that
Python exceptions will produce.
"""
#print("Running callback", handle)
orig_ffi_watcher = None
try:
# Even dereferencing the handle needs to be inside the try/except;
......@@ -121,6 +122,7 @@ class AbstractCallbacks(object):
args = _NOARGS
if args and args[0] == GEVENT_CORE_EVENTS:
args = (revents, ) + args[1:]
#print("Calling function", the_watcher.callback, args)
the_watcher.callback(*args)
except: # pylint:disable=bare-except
_dbg("Got exception servicing watcher with handle", handle, sys.exc_info())
......
......@@ -124,6 +124,8 @@ class ILoop(Interface):
"""
Create and return a watcher that fires before the event loop
polls for IO.
.. caution:: This method is not supported by libuv.
"""
def check(ref=True, priority=None):
......
......@@ -36,6 +36,17 @@ class LoopExit(Exception):
"""
def __repr__(self):
if len(self.args) == 3: # From the hub
import pprint
return "%s\n\tHub: %s\n\tHandles:\n%s" % (
self.args[0], self.args[1],
pprint.pformat(self.args[2])
)
return Exception.__repr__(self)
def __str__(self):
return repr(self)
class BlockingSwitchOutError(AssertionError):
"""
......
......@@ -79,6 +79,7 @@ enum uv_fs_event_flags {
const char* uv_strerror(int);
const char* uv_err_name(int);
const char* uv_version_string(void);
const char* uv_handle_type_name(uv_handle_type type);
// handle structs and types
struct uv_loop_s {
......@@ -362,6 +363,7 @@ extern "Python" {
// libuv specific callback
void _uv_close_callback(uv_handle_t* handle);
void python_sigchld_callback(uv_signal_t* handle, int signum);
void python_queue_callback(uv_handle_t* handle, int revents);
}
// A variable we fill in.
static void (*gevent_noop)(void* handle);
......
......@@ -4,6 +4,7 @@
typedef void* GeventWatcherObject;
static int python_callback(GeventWatcherObject handle, int revents);
static void python_queue_callback(uv_handle_t* watcher_ptr, int revents);
static void python_handle_error(GeventWatcherObject handle, int revents);
static void python_stop(GeventWatcherObject handle);
......@@ -11,7 +12,7 @@ static void _gevent_noop(void* handle) {}
static void (*gevent_noop)(void* handle) = &_gevent_noop;
static void _gevent_generic_callback1(uv_handle_t* watcher, int arg)
static void _gevent_generic_callback1_unused(uv_handle_t* watcher, int arg)
{
// Python code may set this to NULL or even change it
// out from under us, which would tend to break things.
......@@ -66,6 +67,11 @@ static void _gevent_generic_callback1(uv_handle_t* watcher, int arg)
}
static void _gevent_generic_callback1(uv_handle_t* watcher, int arg)
{
python_queue_callback(watcher, arg);
}
static void _gevent_generic_callback0(uv_handle_t* handle)
{
_gevent_generic_callback1(handle, 0);
......
......@@ -37,10 +37,18 @@ class _Callbacks(AbstractCallbacks):
def python_timer0_callback(self, watcher_ptr):
return self.python_prepare_callback(watcher_ptr)
def python_queue_callback(self, watcher_ptr, revents):
watcher_handle = watcher_ptr.data
the_watcher = self.from_handle(watcher_handle)
the_watcher.loop._queue_callback(watcher_ptr, revents)
_callbacks = assign_standard_callbacks(
ffi, libuv, _Callbacks,
[('python_sigchld_callback', None),
('python_timer0_callback', None)])
('python_timer0_callback', None),
('python_queue_callback', None)])
from gevent._ffi.loop import EVENTS
GEVENT_CORE_EVENTS = EVENTS # export
......@@ -92,6 +100,10 @@ class loop(AbstractLoop):
self._fork_watchers = set()
self._pid = os.getpid()
self._default = self._ptr == libuv.uv_default_loop()
self._queued_callbacks = []
def _queue_callback(self, watcher_ptr, revents):
self._queued_callbacks.append((watcher_ptr, revents))
def _init_loop(self, flags, default):
if default is None:
......@@ -128,7 +140,7 @@ class loop(AbstractLoop):
# and call into its check and prepare handlers.
# Note that this basically forces us into a busy-loop
# XXX: As predicted, using an idle watcher causes our process
# to eat 100% CPU time. We instead use a timer with a max of a 1 second
# to eat 100% CPU time. We instead use a timer with a max of a .3 second
# delay to notice signals. Note that this timeout also implements fork
# watchers, effectively.
......@@ -152,6 +164,30 @@ class loop(AbstractLoop):
self._pid = curpid
for watcher in self._fork_watchers:
watcher._on_fork()
# The contents of queued_callbacks at this point should be timers
# that expired when the loop began along with any idle watchers.
# We need to run them so that any manual callbacks they want to schedule
# get added to the list and ran next before we go on to poll for IO.
# This is critical for libuv on linux: closing a socket schedules some manual
# callbacks to actually stop the watcher; if those don't run before
# we poll for IO, then libuv can abort the process for the closed file descriptor.
# XXX: There's still a race condition here because we may not run *all* the manual
# callbacks. We need a way to prioritize those.
# Running these before the manual callbacks lead to some
# random test failures. In test__event.TestEvent_SetThenClear
# we would get a LoopExit sometimes. The problem occurred when
# a timer expired on entering the first loop; we would process
# it there, and then process the callback that it created
# below, leaving nothing for the loop to do. Having the
# self.run() manually process manual callbacks before
# continuing solves the problem. (But we must still run callbacks
# here again.)
self._prepare_ran_callbacks = self.__run_queued_callbacks()
super(loop, self)._run_callbacks()
def _init_and_start_prepare(self):
......@@ -324,15 +360,9 @@ class loop(AbstractLoop):
"""
Return all the handles that are open and their ref status.
"""
# XXX: Disabled because, at least on Windows, the times this
# gets called often produce `SystemError: ffi.from_handle():
# dead or bogus handle object`, and sometimes that crashes the process.
return []
def _really_debug(self):
handle_state = namedtuple("HandleState",
['handle',
'type',
'watcher',
'ref',
'active',
......@@ -347,6 +377,7 @@ class loop(AbstractLoop):
else:
watcher = None
handles.append(handle_state(handle,
ffi.string(libuv.uv_handle_type_name(handle.type)),
watcher,
libuv.uv_has_ref(handle),
libuv.uv_is_active(handle),
......@@ -392,6 +423,33 @@ class loop(AbstractLoop):
# In 1.12, the uv_loop_fork function was added (by gevent!)
libuv.uv_loop_fork(self._ptr)
_prepare_ran_callbacks = False
def __run_queued_callbacks(self):
if not self._queued_callbacks:
return False
cbs = list(self._queued_callbacks)
self._queued_callbacks = []
for watcher_ptr, arg in cbs:
handle = watcher_ptr.data
if not handle:
# It's been stopped and possibly closed
assert not libuv.uv_is_active(watcher_ptr)
continue
val = _callbacks.python_callback(handle, arg)
if val == -1:
_callbacks.python_handle_error(handle, arg)
elif val == 1:
if not libuv.uv_is_active(watcher_ptr):
if watcher_ptr.data != handle:
if watcher_ptr.data:
_callbacks.python_stop(None)
else:
_callbacks.python_stop(handle)
return True
def run(self, nowait=False, once=False):
# we can only respect one flag or the other.
......@@ -402,17 +460,33 @@ class loop(AbstractLoop):
if nowait:
mode = libuv.UV_RUN_NOWAIT
# if mode == libuv.UV_RUN_DEFAULT:
# print("looping in python")
# ptr = self._ptr
# ran_error = 0
# while ran_error == 0:
# ran_error = libuv.uv_run(ptr, libuv.UV_RUN_ONCE)
# if ran_error != 0:
# print("Error running loop", libuv.uv_err_name(ran_error),
# libuv.uv_strerror(ran_error))
# return ran_error
return libuv.uv_run(self._ptr, mode)
if mode == libuv.UV_RUN_DEFAULT:
while self._ptr and self._ptr.data:
# This is here to better preserve order guarantees. See _run_callbacks
# for details.
# It may get run again from the prepare watcher, so potentially we
# could take twice as long as the switch interval.
self._run_callbacks()
self._prepare_ran_callbacks = False
ran_status = libuv.uv_run(self._ptr, libuv.UV_RUN_ONCE)
# Note that we run queued callbacks when the prepare watcher runs,
# thus accounting for timers that expired before polling for IO,
# and idle watchers. This next call should get IO callbacks and
# callbacks from timers that expired *after* polling for IO.
ran_callbacks = self.__run_queued_callbacks()
if not ran_status and not ran_callbacks and not self._prepare_ran_callbacks:
# A return of 0 means there are no referenced and
# active handles. The loop is over.
# If we didn't run any callbacks, then we couldn't schedule
# anything to switch in the future, so there's no point
# running again.
return ran_status
return 0 # Somebody closed the loop
result = libuv.uv_run(self._ptr, mode)
self.__run_queued_callbacks()
return result
def now(self):
# libuv's now is expressed as an integer number of
......@@ -515,9 +589,12 @@ class loop(AbstractLoop):
return io_watcher.multiplex(events)
def timer(self, after, repeat=0.0, ref=True, priority=None):
if after <= 0 and repeat <= 0:
# Make sure we can spin the loop. See timer.
# XXX: Note that this doesn't have a `again` method.
return self._watchers.OneShotCheck(self, ref, priority)
return super(loop, self).timer(after, repeat, ref, priority)
def prepare(self, ref=True, priority=None):
# We run arbitrary code in python_prepare_callback. That could switch
# greenlets. If it does that while also manipulating the active prepare
# watchers, we could corrupt the process state, since the prepare watcher
# queue is iterated on the stack (on unix). We could workaround this by implementing
# prepare watchers in pure Python.
# See https://github.com/gevent/gevent/issues/1126
raise TypeError("prepare watchers are not currently supported in libuv. "
"If you need them, please contact the maintainers.")
......@@ -332,23 +332,33 @@ def print_list(lst):
for name in lst:
log(' - %s', name)
def _setup_environ():
def _setup_environ(debug=False):
if 'PYTHONWARNINGS' not in os.environ and not sys.warnoptions:
# Enable default warnings such as ResourceWarning.
# On Python 3[.6], the system site.py module has
# "open(fullname, 'rU')" which produces the warning that
# 'U' is deprecated, so ignore warnings from site.py
# importlib/_bootstrap.py likes to spit out "ImportWarning:
# can't resolve package from __spec__ or __package__, falling
# back on __name__ and __path__". I have no idea what that means, but it seems harmless
# and is annoying.
os.environ['PYTHONWARNINGS'] = 'default,ignore:::site:,ignore:::importlib._bootstrap:,ignore:::importlib._bootstrap_external:'
# action:message:category:module:line
os.environ['PYTHONWARNINGS'] = ','.join([
# Enable default warnings such as ResourceWarning.
'default',
# On Python 3[.6], the system site.py module has
# "open(fullname, 'rU')" which produces the warning that
# 'U' is deprecated, so ignore warnings from site.py
'ignore:::site:',
# pkgutil on Python 2 complains about missing __init__.py
'ignore:::pkgutil',
# importlib/_bootstrap.py likes to spit out "ImportWarning:
# can't resolve package from __spec__ or __package__, falling
# back on __name__ and __path__". I have no idea what that means, but it seems harmless
# and is annoying.
'ignore:::importlib._bootstrap:',
'ignore:::importlib._bootstrap_external:',
# importing ABCs from collections, not collections.abc
'ignore:::pkg_resources._vendor.pyparsing:',
])
if 'PYTHONFAULTHANDLER' not in os.environ:
os.environ['PYTHONFAULTHANDLER'] = 'true'
if 'GEVENT_DEBUG' not in os.environ:
if 'GEVENT_DEBUG' not in os.environ and debug:
os.environ['GEVENT_DEBUG'] = 'debug'
if 'PYTHONTRACEMALLOC' not in os.environ:
......@@ -380,6 +390,7 @@ def main():
parser.add_argument("--coverage", action="store_true")
parser.add_argument("--quiet", action="store_true", default=True)
parser.add_argument("--verbose", action="store_false", dest='quiet')
parser.add_argument("--debug", action="store_true", default=False)
parser.add_argument('tests', nargs='*')
options = parser.parse_args()
FAILING_TESTS = []
......@@ -398,7 +409,7 @@ def main():
os.environ['COVERAGE_FILE'] = os.path.abspath(".") + os.sep + ".coverage"
print("Enabling coverage to", os.environ['COVERAGE_FILE'])
_setup_environ()
_setup_environ(debug=options.debug)
if options.config:
config = {}
......
......@@ -162,6 +162,15 @@ if PYPY:
'test__socket_dns.py',
]
if LIBUV:
IGNORED_TESTS += [
# This hangs for no apparent reason when run by the testrunner,
# even wher maked standalone
# when run standalone from the command line, it's fine.
# Issue in pypy2 6.0?
'test__monkey_sigchld_2.py',
]
if TRAVIS:
FAILING_TESTS += [
# This fails to get the correct results, sometimes. I can't reproduce locally
......
......@@ -424,7 +424,7 @@ class ConditionTests(BaseTestCase):
self.assertEqual(len(results), 5)
for dt in results:
# XXX: libuv sometimes produces 0.19958
self.assertTimeWithinRange(dt, 0.2, 2.0)
self.assertTimeWithinRange(dt, 0.19, 2.0)
class BaseSemaphoreTests(BaseTestCase):
......
from __future__ import print_function
from gevent import config
import greentest
from greentest import TestCase
from greentest import main
from greentest import LARGE_TIMEOUT
from greentest.sysinfo import CFFI_BACKEND
......@@ -11,12 +11,14 @@ class Test(TestCase):
__timeout__ = LARGE_TIMEOUT
repeat = 0
timer_duration = 0.001
def setUp(self):
super(Test, self).setUp()
self.called = []
self.loop = config.loop(default=False)
self.timer = self.loop.timer(0.001, repeat=self.repeat)
self.timer = self.loop.timer(self.timer_duration, repeat=self.repeat)
assert not self.loop.default
def cleanup(self):
# cleanup instead of tearDown to cooperate well with
......@@ -86,5 +88,67 @@ class TestAgain(Test):
self.assertTimerNotInKeepalive()
class TestTimerResolution(Test):
def test_resolution(self):
# Make sure that having an active IO watcher
# doesn't badly throw off our timer resolution.
# (This was a specific problem with libuv)
# https://github.com/gevent/gevent/pull/1194
from gevent._compat import perf_counter
import socket
s = socket.socket()
self._close_on_teardown(s)
fd = s.fileno()
ran_at_least_once = False
fired_at = []
def timer_counter():
fired_at.append(perf_counter())
loop = self.loop
timer_multiplier = 11
max_time = self.timer_duration * timer_multiplier
assert max_time < 0.3
for _ in range(150):
# in libuv, our signal timer fires every 300ms; depending on
# when this runs, we could artificially get a better
# resolution than we expect. Run it multiple times to be more sure.
io = loop.io(fd, 1)
io.start(lambda events=None: None)
now = perf_counter()
del fired_at[:]
timer = self.timer
timer.start(timer_counter)
loop.run(once=True)
io.stop()
io.close()
timer.stop()
if fired_at:
ran_at_least_once = True
self.assertEqual(1, len(fired_at))
self.assertTimeWithinRange(fired_at[0] - now,
0,
max_time)
if not greentest.RUNNING_ON_CI:
# Hmm, this always fires locally on mocOS but
# not an Travis?
self.assertTrue(ran_at_least_once)
if __name__ == '__main__':
main()
greentest.main()
......@@ -176,8 +176,8 @@ class TestEvent_SetThenClear(greentest.TestCase):
gevent.sleep(0.001)
e.set()
e.clear()
for t in waiters:
t.join()
for greenlet in waiters:
greenlet.join()
class TestEvent_SetThenClear100(TestEvent_SetThenClear):
......
......@@ -351,18 +351,22 @@ class TestNoWait(TestCase):
def store_result(func, *args):
result.append(func(*args))
assert q.empty(), q
assert not q.full(), q
self.assertTrue(q.empty(), q)
self.assertFalse(q.full(), q)
gevent.sleep(0.001)
assert q.empty(), q
assert not q.full(), q
self.assertTrue(q.empty(), q)
self.assertFalse(q.full(), q)
get_hub().loop.run_callback(store_result, q.put_nowait, 10)
assert not p.ready(), p
self.assertFalse(p.ready(), p)
gevent.sleep(0.001)
assert result == [None], result
assert p.ready(), p
assert not q.full(), q
assert q.empty(), q
self.assertEqual(result, [None])
self.assertTrue(p.ready(), p)
self.assertFalse(q.full(), q)
self.assertTrue(q.empty(), q)
class TestJoinEmpty(TestCase):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment