Commit f2f08825 authored by Jason Madden's avatar Jason Madden

Queue all libuv callbacks and run them at the end.

This should eliminate the need to patch and the need to avoid 0 duration timers.

This works more like libev. We could theoretically implement priorities using this system.
parent ab5272c5
......@@ -32,6 +32,10 @@
- A started monitor thread for the active hub now survives a fork. See
:issue:`1185`.
- libuv now collects all pending watchers and runs their callbacks at
the end of the loop iteration using UV_RUN_ONCE. This eliminates the
need to patch libuv to be greenlet-safe.
1.3b1 (2018-04-13)
==================
......
......@@ -98,6 +98,7 @@ class AbstractCallbacks(object):
This function should never return 0, as that's the default value that
Python exceptions will produce.
"""
#print("Running callback", handle)
orig_ffi_watcher = None
try:
# Even dereferencing the handle needs to be inside the try/except;
......@@ -121,6 +122,7 @@ class AbstractCallbacks(object):
args = _NOARGS
if args and args[0] == GEVENT_CORE_EVENTS:
args = (revents, ) + args[1:]
#print("Calling function", the_watcher.callback, args)
the_watcher.callback(*args)
except: # pylint:disable=bare-except
_dbg("Got exception servicing watcher with handle", handle, sys.exc_info())
......
......@@ -36,6 +36,17 @@ class LoopExit(Exception):
"""
def __repr__(self):
if len(self.args) == 3: # From the hub
import pprint
return "%s\n\tHub: %s\n\tHandles:\n%s" % (
self.args[0], self.args[1],
pprint.pformat(self.args[2])
)
return Exception.__repr__(self)
def __str__(self):
return repr(self)
class BlockingSwitchOutError(AssertionError):
"""
......
......@@ -79,6 +79,7 @@ enum uv_fs_event_flags {
const char* uv_strerror(int);
const char* uv_err_name(int);
const char* uv_version_string(void);
const char* uv_handle_type_name(uv_handle_type type);
// handle structs and types
struct uv_loop_s {
......@@ -362,6 +363,7 @@ extern "Python" {
// libuv specific callback
void _uv_close_callback(uv_handle_t* handle);
void python_sigchld_callback(uv_signal_t* handle, int signum);
void python_queue_callback(uv_handle_t* handle, int revents);
}
// A variable we fill in.
static void (*gevent_noop)(void* handle);
......
......@@ -4,6 +4,7 @@
typedef void* GeventWatcherObject;
static int python_callback(GeventWatcherObject handle, int revents);
static void python_queue_callback(uv_handle_t* watcher_ptr, int revents);
static void python_handle_error(GeventWatcherObject handle, int revents);
static void python_stop(GeventWatcherObject handle);
......@@ -11,7 +12,7 @@ static void _gevent_noop(void* handle) {}
static void (*gevent_noop)(void* handle) = &_gevent_noop;
static void _gevent_generic_callback1(uv_handle_t* watcher, int arg)
static void _gevent_generic_callback1_unused(uv_handle_t* watcher, int arg)
{
// Python code may set this to NULL or even change it
// out from under us, which would tend to break things.
......@@ -66,6 +67,11 @@ static void _gevent_generic_callback1(uv_handle_t* watcher, int arg)
}
static void _gevent_generic_callback1(uv_handle_t* watcher, int arg)
{
python_queue_callback(watcher, arg);
}
static void _gevent_generic_callback0(uv_handle_t* handle)
{
_gevent_generic_callback1(handle, 0);
......
......@@ -37,10 +37,18 @@ class _Callbacks(AbstractCallbacks):
def python_timer0_callback(self, watcher_ptr):
return self.python_prepare_callback(watcher_ptr)
def python_queue_callback(self, watcher_ptr, revents):
watcher_handle = watcher_ptr.data
the_watcher = self.from_handle(watcher_handle)
the_watcher.loop._queue_callback(watcher_ptr, revents)
_callbacks = assign_standard_callbacks(
ffi, libuv, _Callbacks,
[('python_sigchld_callback', None),
('python_timer0_callback', None)])
('python_timer0_callback', None),
('python_queue_callback', None)])
from gevent._ffi.loop import EVENTS
GEVENT_CORE_EVENTS = EVENTS # export
......@@ -92,6 +100,10 @@ class loop(AbstractLoop):
self._fork_watchers = set()
self._pid = os.getpid()
self._default = self._ptr == libuv.uv_default_loop()
self._queued_callbacks = []
def _queue_callback(self, watcher_ptr, revents):
self._queued_callbacks.append((watcher_ptr, revents))
def _init_loop(self, flags, default):
if default is None:
......@@ -324,15 +336,9 @@ class loop(AbstractLoop):
"""
Return all the handles that are open and their ref status.
"""
# XXX: Disabled because, at least on Windows, the times this
# gets called often produce `SystemError: ffi.from_handle():
# dead or bogus handle object`, and sometimes that crashes the process.
return []
def _really_debug(self):
handle_state = namedtuple("HandleState",
['handle',
'type',
'watcher',
'ref',
'active',
......@@ -347,6 +353,7 @@ class loop(AbstractLoop):
else:
watcher = None
handles.append(handle_state(handle,
ffi.string(libuv.uv_handle_type_name(handle.type)),
watcher,
libuv.uv_has_ref(handle),
libuv.uv_is_active(handle),
......@@ -392,6 +399,23 @@ class loop(AbstractLoop):
# In 1.12, the uv_loop_fork function was added (by gevent!)
libuv.uv_loop_fork(self._ptr)
def __run_queued_callbacks(self):
cbs = list(self._queued_callbacks)
self._queued_callbacks = []
for watcher_ptr, arg in cbs:
handle = watcher_ptr.data
val = _callbacks.python_callback(handle, arg)
if val == -1:
_callbacks.python_handle_error(handle, arg)
elif val == 1:
if not libuv.uv_is_active(watcher_ptr):
if watcher_ptr.data != handle:
if watcher_ptr.data:
_callbacks.python_stop(None)
else:
_callbacks.python_stop(handle)
return bool(cbs)
def run(self, nowait=False, once=False):
# we can only respect one flag or the other.
......@@ -402,17 +426,24 @@ class loop(AbstractLoop):
if nowait:
mode = libuv.UV_RUN_NOWAIT
# if mode == libuv.UV_RUN_DEFAULT:
# print("looping in python")
# ptr = self._ptr
# ran_error = 0
# while ran_error == 0:
# ran_error = libuv.uv_run(ptr, libuv.UV_RUN_ONCE)
# if ran_error != 0:
# print("Error running loop", libuv.uv_err_name(ran_error),
# libuv.uv_strerror(ran_error))
# return ran_error
return libuv.uv_run(self._ptr, mode)
if mode == libuv.UV_RUN_DEFAULT:
while self._ptr:
#print('Looping in python', self._ptr)
ran_status = libuv.uv_run(self._ptr, libuv.UV_RUN_ONCE)
#print("Looped in python", ran_status, self._queued_callbacks)
ran_callbacks = self.__run_queued_callbacks()
if not ran_status and not ran_callbacks:
# A return of 0 means there are no referenced and
# active handles. The loop is over.
# If we didn't run any callbacks, then we couldn't schedule
# anything to switch in the future, so there's no point
# running again.
return ran_status
return 0 # Somebody closed the loop
result = libuv.uv_run(self._ptr, mode)
self.__run_queued_callbacks()
return result
def now(self):
# libuv's now is expressed as an integer number of
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment