Commit 50d7d3d7 authored by Jason Madden's avatar Jason Madden

Use cffi's extern Python callbacks. Fixes #1049

parent 64f28b5b
......@@ -158,7 +158,7 @@
See :issue:`790` for history and more in-depth discussion.
- gevent is now tested on Python 3.6.3. This includes the following
- gevent is now tested on Python 3.6.4. This includes the following
fixes and changes:
- Errors raised from :mod:`gevent.subprocess` will have a
......@@ -174,6 +174,9 @@
on POSIX platforms under Python 3.6. Now it also works on Windows under
Python 3.6 (as expected) and is backported to all previous versions.
- gevent now uses cffi's "extern 'Python'" callbacks. These should be
faster and more stable. This requires at least cffi 1.4.0. See :issue:`1049`.
1.2.2 (2017-06-05)
==================
......
......@@ -12,6 +12,7 @@ pylint>=1.8.0
prospector[with_pyroma] ; python_version < '3.7'
coverage>=4.0
coveralls>=1.0
# See version requirements in setup.py
cffi
futures
# Makes tests faster
......
......@@ -105,13 +105,15 @@ else:
# Note that we don't add cffi to install_requires, it's
# optional. We tend to build and distribute wheels with the CFFI
# modules built and they can be imported if CFFI is installed.
# install_requires.append('cffi >= 1.3.0')
# We need cffi 1.4.0 for new style callbacks;
# we need cffi 1.11.3 (on CPython 3) to avoid test errors
# install_requires.append('cffi >= 1.4.0')
pass
if IGNORE_CFFI and not PYPY:
# Allow distributors to turn off CFFI builds
# even if it's available, because CFFI always embeds
# our copy of libev and they may not want that.
# our copy of libev/libuv and they may not want that.
del cffi_modules[:]
# If we are running info / help commands, or we're being imported by
......
......@@ -61,7 +61,7 @@ EVENTS = GEVENT_CORE_EVENTS = _EVENTSType()
# to Python again.
# See also https://github.com/gevent/gevent/issues/676
####
class _Callbacks(object):
class AbstractCallbacks(object):
def __init__(self, ffi):
......@@ -194,14 +194,12 @@ class _Callbacks(object):
# a value that causes the C loop to try the callback again?
# at least for signals under libuv, which are delivered at very odd times.
# Hopefully the event still shows up when we poll the next time.
watcher = None
if tb is not None:
handle = tb.tb_frame.f_locals['handle']
if handle: # handle could be NULL
watcher = self.from_handle(handle)
handle = tb.tb_frame.f_locals['handle'] if tb is not None else None
if handle: # handle could be NULL
watcher = self.from_handle(handle)
if watcher is not None:
watcher.loop._check_callback_handle_error(t, v, tb)
watcher.loop.handle_error(None, t, v, tb)
return 1
else:
raise v
......@@ -218,23 +216,62 @@ class _Callbacks(object):
# will just produce some exceptions.
if GEVENT_DEBUG < CRITICAL:
return
import pdb; pdb.set_trace()
_dbg("python_stop: stopping watcher with handle", handle)
watcher = self.from_handle(handle)
watcher.stop()
def python_check_callback(self, watcher_ptr):
# If we have the onerror callback, this is a no-op; all the real
# work to rethrow the exception is done by the onerror callback
# NOTE: Unlike the rest of the functions, this is called with a pointer
# to the C level structure, *not* a pointer to the void* that represents a
# <cdata> for the Python Watcher object.
pass
def python_prepare_callback(self, watcher_ptr):
loop = self._find_loop_from_c_watcher(watcher_ptr)
loop._run_callbacks()
def check_callback_onerror(self, t, v, tb):
watcher_ptr = tb.tb_frame.f_locals['watcher_ptr'] if tb is not None else None
if watcher_ptr:
loop = self._find_loop_from_c_watcher(watcher_ptr)
if loop is not None:
# None as the context argument causes the exception to be raised
# in the main greenlet.
loop.handle_error(None, t, v, tb)
return None
else:
raise v # Let CFFI print
def _find_loop_from_c_watcher(self, watcher_ptr):
raise NotImplementedError()
def assign_standard_callbacks(ffi, lib):
# ns keeps these cdata objects alive at the python level
callbacks = _Callbacks(ffi)
for sig, func in (("int(void* handle, int revents)", callbacks.python_callback),
("void(void* handle, int revents)", callbacks.python_handle_error),
("void(void* handle)", callbacks.python_stop)):
callback = ffi.callback(sig, onerror=callbacks.unhandled_onerror)(func)
def assign_standard_callbacks(ffi, lib, callbacks_class, extras=()): # pylint:disable=unused-argument
# callbacks keeps these cdata objects alive at the python level
callbacks = callbacks_class(ffi)
extras = tuple([(getattr(callbacks, name), error) for name, error in extras])
for (func, error_func) in ((callbacks.python_callback, None),
(callbacks.python_handle_error, None),
(callbacks.python_stop, None),
(callbacks.python_check_callback,
callbacks.check_callback_onerror),
(callbacks.python_prepare_callback,
callbacks.check_callback_onerror)) + extras:
# The name of the callback function matches the 'extern Python' declaration.
error_func = error_func or callbacks.unhandled_onerror
callback = ffi.def_extern(onerror=error_func)(func)
# keep alive the cdata
# (def_extern returns the original function, and it requests that
# the function be "global", so maybe it keeps a hard reference to it somewhere now
# unlike ffi.callback(), and we don't need to do this?)
callbacks.callbacks.append(callback)
# pass to the library C variable
setattr(lib, func.__name__, callback)
# At this point, the library C variable (static function, actually)
# is filled in.
return callbacks
......@@ -252,9 +289,6 @@ else:
_default_loop_destroyed = False
def _loop_callback(ffi, *args, **kwargs):
return ffi.callback(*args, **kwargs)
_NOARGS = ()
......@@ -264,18 +298,17 @@ class AbstractLoop(object):
error_handler = None
_CHECK_POINTER = None
_CHECK_CALLBACK_SIG = None
_TIMER_POINTER = None
_TIMER_CALLBACK_SIG = None
_PREPARE_POINTER = None
_PREPARE_CALLBACK_SIG = None
def __init__(self, ffi, lib, watchers, flags=None, default=None):
self._ffi = ffi
self._lib = lib
self._ptr = None
self._handle_to_self = self._ffi.new_handle(self) # XXX: Reference cycle?
self._watchers = watchers
self._in_callback = False
self._callbacks = []
......@@ -317,22 +350,17 @@ class AbstractLoop(object):
self._ptr = self._init_loop(flags, default)
# self._check is a watcher that runs in each iteration of the
# mainloop, just after the blocking call
self._check = self._ffi.new(self._CHECK_POINTER)
self._check_callback_ffi = _loop_callback(self._ffi,
self._CHECK_CALLBACK_SIG,
self._check_callback,
onerror=self._check_callback_handle_error)
self._check.data = self._handle_to_self
self._init_and_start_check()
# self._prepare is a watcher that runs in each iteration of the mainloop,
# just before the blocking call
self._prepare = self._ffi.new(self._PREPARE_POINTER)
self._prepare_callback_ffi = _loop_callback(self._ffi,
self._PREPARE_CALLBACK_SIG,
self._run_callbacks,
onerror=self._check_callback_handle_error)
self._prepare.data = self._handle_to_self
self._init_and_start_prepare()
# A timer we start and stop on demand. If we have callbacks,
......@@ -343,6 +371,7 @@ class AbstractLoop(object):
# see the "ev_timer" section of the ev manpage (http://linux.die.net/man/3/ev)
# Alternatively, setting the ev maximum block time may also work.
self._timer0 = self._ffi.new(self._TIMER_POINTER)
self._timer0.data = self._handle_to_self
self._init_callback_timer()
# TODO: We may be able to do something nicer and use the existing python_callback
......@@ -372,16 +401,9 @@ class AbstractLoop(object):
raise NotImplementedError()
def _check_callback_handle_error(self, t, v, tb):
# None as the context argument causes the exception to be raised
# in the main greenlet.
self.handle_error(None, t, v, tb)
def _check_callback(self, *args):
# If we have the onerror callback, this is a no-op; all the real
# work to rethrow the exception is done by the onerror callback
pass
def _run_callbacks(self, *_args):
def _run_callbacks(self):
# libuv and libev have different signatures for their prepare/check/timer
# watchers, which is what calls this. We should probably have different methods.
count = 1000
......
......@@ -64,7 +64,8 @@ struct ev_loop {
// Watcher types
// base for all watchers
struct ev_watcher{
GEVENT_STRUCT_DONE _;
void* data;
GEVENT_STRUCT_DONE _;
};
struct ev_io {
......@@ -137,6 +138,9 @@ unsigned int ev_embeddable_backends (void);
ev_tstamp ev_time (void);
void ev_set_syserr_cb(void *);
void ev_set_userdata(struct ev_loop*, void*);
void* ev_userdata(struct ev_loop*);
int ev_priority(void*);
void ev_set_priority(void*, int);
......@@ -212,10 +216,19 @@ void (*gevent_noop)(struct ev_loop *_loop, struct ev_timer *w, int revents);
void ev_sleep (ev_tstamp delay); /* sleep for a while */
/* gevent callbacks */
static int (*python_callback)(void* handle, int revents);
static void (*python_handle_error)(void* handle, int revents);
static void (*python_stop)(void* handle);
/* These will be created as static functions at the end of the
* _source.c and must be declared there too.
*/
extern "Python" {
int python_callback(void* handle, int revents);
void python_handle_error(void* handle, int revents);
void python_stop(void* handle);
void python_check_callback(struct ev_loop*, void*, int);
void python_prepare_callback(struct ev_loop*, void*, int);
// libev specific
void _syserr_cb(char*);
}
/*
* We use a single C callback for every watcher type, which in turn calls the
* Python callbacks. The ev_watcher pointer type can be used for every watcher type
......
......@@ -13,9 +13,10 @@ static void
_gevent_noop(struct ev_loop *_loop, struct ev_timer *w, int revents) { }
void (*gevent_noop)(struct ev_loop *, struct ev_timer *, int) = &_gevent_noop;
static int (*python_callback)(void* handle, int revents);
static void (*python_handle_error)(void* handle, int revents);
static void (*python_stop)(void* handle);
static int python_callback(void* handle, int revents);
static void python_handle_error(void* handle, int revents);
static void python_stop(void* handle);
static void _gevent_generic_callback(struct ev_loop* loop,
struct ev_watcher* watcher,
......
......@@ -42,9 +42,23 @@ else:
#####
from gevent._ffi.loop import AbstractCallbacks
from gevent._ffi.loop import assign_standard_callbacks
_callbacks = assign_standard_callbacks(ffi, libev)
class _Callbacks(AbstractCallbacks):
# pylint:disable=arguments-differ
def python_check_callback(self, _loop, watcher_ptr, _events):
pass
def python_prepare_callback(self, _loop_ptr, watcher_ptr, _events):
AbstractCallbacks.python_prepare_callback(self, watcher_ptr)
def _find_loop_from_c_watcher(self, watcher_ptr):
loop_handle = ffi.cast('struct ev_watcher*', watcher_ptr).data
return self.from_handle(loop_handle)
_callbacks = assign_standard_callbacks(ffi, libev, _Callbacks)
UNDEF = libev.EV_UNDEF
......@@ -206,10 +220,8 @@ class loop(AbstractLoop):
error_handler = None
_CHECK_POINTER = 'struct ev_check *'
_CHECK_CALLBACK_SIG = "void(*)(struct ev_loop *, void*, int)"
_PREPARE_POINTER = 'struct ev_prepare *'
_PREPARE_CALLBACK_SIG = "void(*)(struct ev_loop *, void*, int)"
_TIMER_POINTER = 'struct ev_timer *'
......@@ -240,12 +252,13 @@ class loop(AbstractLoop):
return ptr
def _init_and_start_check(self):
libev.ev_check_init(self._check, self._check_callback_ffi)
libev.ev_check_init(self._check, libev.python_check_callback)
self._check.data = self._handle_to_self
libev.ev_check_start(self._ptr, self._check)
self.unref()
def _init_and_start_prepare(self):
libev.ev_prepare_init(self._prepare, self._prepare_callback_ffi)
libev.ev_prepare_init(self._prepare, libev.python_prepare_callback)
libev.ev_prepare_start(self._ptr, self._prepare)
self.unref()
......@@ -376,7 +389,7 @@ class loop(AbstractLoop):
return self._ptr.activecnt
@ffi.def_extern()
def _syserr_cb(msg):
try:
msg = ffi.string(msg)
......@@ -385,8 +398,6 @@ def _syserr_cb(msg):
set_syserr_cb(None)
raise # let cffi print the traceback
_syserr_cb._cb = ffi.callback("void(*)(char *msg)", _syserr_cb)
def set_syserr_cb(callback):
global __SYSERR_CALLBACK
......@@ -394,7 +405,7 @@ def set_syserr_cb(callback):
libev.ev_set_syserr_cb(ffi.NULL)
__SYSERR_CALLBACK = None
elif callable(callback):
libev.ev_set_syserr_cb(_syserr_cb._cb)
libev.ev_set_syserr_cb(libev._syserr_cb)
__SYSERR_CALLBACK = callback
else:
raise TypeError('Expected callable or None, got %r' % (callback, ))
......
......@@ -341,12 +341,25 @@ int uv_fs_poll_stop(void*);
/* gevent callbacks */
// variables that we fill in. In the case of poll callbacks and fs
// Implemented in Python code as 'def_extern'. In the case of poll callbacks and fs
// callbacks, if *status* is less than 0, it will be passed in the revents
// field. In cases of no extra arguments, revents will be 0.
static int (*python_callback)(void* handle, int revents);
static void (*python_handle_error)(void* handle, int revents);
static void (*python_stop)(void* handle);
// These will be created as static functions at the end of the
// _source.c and must be pre-declared at the top of that file if we
// call them
extern "Python" {
// Standard gevent._ffi.loop callbacks.
int python_callback(void* handle, int revents);
void python_handle_error(void* handle, int revents);
void python_stop(void* handle);
void python_check_callback(void* handle);
void python_prepare_callback(void* handle);
// libuv specific callback
void _uv_close_callback(uv_handle_t* handle);
void python_sigchld_callback(void* handle, int signum);
}
// A variable we fill in.
static void (*gevent_noop)(void* handle);
/*
* We use a single C callback for every watcher type that shares the same signature, which in turn calls the
......
#include "uv.h"
static int (*python_callback)(void* handle, int revents);
static void (*python_handle_error)(void* handle, int revents);
static void (*python_stop)(void* handle);
static int python_callback(void* handle, int revents);
static void python_handle_error(void* handle, int revents);
static void python_stop(void* handle);
static void _gevent_noop(void*handle) {}
......
......@@ -15,6 +15,7 @@ from gevent._compat import PYPY
from gevent._ffi.loop import AbstractLoop
from gevent.libuv import _corecffi # pylint:disable=no-name-in-module,import-error
from gevent._ffi.loop import assign_standard_callbacks
from gevent._ffi.loop import AbstractCallbacks
ffi = _corecffi.ffi
libuv = _corecffi.lib
......@@ -22,7 +23,18 @@ libuv = _corecffi.lib
__all__ = [
]
_callbacks = assign_standard_callbacks(ffi, libuv)
class _Callbacks(AbstractCallbacks):
def _find_loop_from_c_watcher(self, watcher_ptr):
loop_handle = ffi.cast('uv_handle_t*', watcher_ptr).data
return self.from_handle(loop_handle)
def python_sigchld_callback(self, watcher_ptr, _signum):
self.from_handle(ffi.cast('uv_handle_t*', watcher_ptr).data)._sigchld_callback()
_callbacks = assign_standard_callbacks(ffi, libuv, _Callbacks,
[('python_sigchld_callback', None)])
from gevent._ffi.loop import EVENTS
GEVENT_CORE_EVENTS = EVENTS # export
......@@ -71,7 +83,6 @@ class loop(AbstractLoop):
error_handler = None
_CHECK_POINTER = 'uv_check_t *'
_CHECK_CALLBACK_SIG = "void(*)(void*)"
_PREPARE_POINTER = 'uv_prepare_t *'
_PREPARE_CALLBACK_SIG = "void(*)(void*)"
......@@ -107,8 +118,9 @@ class loop(AbstractLoop):
def _init_and_start_check(self):
libuv.uv_check_init(self._ptr, self._check)
libuv.uv_check_start(self._check, self._check_callback_ffi)
libuv.uv_check_start(self._check, libuv.python_check_callback)
libuv.uv_unref(self._check)
_dbg("Started check watcher", ffi.cast('void*', self._check))
# We also have to have an idle watcher to be able to handle
# signals in a timely manner. Without them, libuv won't loop again
......@@ -124,23 +136,24 @@ class loop(AbstractLoop):
# scheduled, this should also be the same and unnecessary?
self._signal_idle = ffi.new("uv_timer_t*")
libuv.uv_timer_init(self._ptr, self._signal_idle)
libuv.uv_timer_start(self._signal_idle, self._check_callback_ffi,
self._signal_idle.data = self._handle_to_self
libuv.uv_timer_start(self._signal_idle, libuv.python_check_callback,
1000,
1000)
libuv.uv_unref(self._signal_idle)
def _run_callbacks(self, handle): # pylint:disable=arguments-differ
def _run_callbacks(self):
# Manually handle fork watchers.
curpid = os.getpid()
if curpid != self._pid:
self._pid = curpid
for watcher in self._fork_watchers:
watcher._on_fork()
super(loop, self)._run_callbacks(handle)
super(loop, self)._run_callbacks()
def _init_and_start_prepare(self):
libuv.uv_prepare_init(self._ptr, self._prepare)
libuv.uv_prepare_start(self._prepare, self._prepare_callback_ffi)
libuv.uv_prepare_start(self._prepare, libuv.python_prepare_callback)
libuv.uv_unref(self._prepare)
def _init_callback_timer(self):
......@@ -197,7 +210,7 @@ class loop(AbstractLoop):
# If we use a 0 duration timer, we can get stuck in a timer loop.
# Python 3.6 fails in test_ftplib.py
libuv.uv_check_start(self._timer0, self._prepare_callback_ffi)
libuv.uv_check_start(self._timer0, libuv.python_prepare_callback)
def _stop_aux_watchers(self):
......@@ -366,10 +379,10 @@ class loop(AbstractLoop):
self._sigchld_watcher = ffi.new('uv_signal_t*')
libuv.uv_signal_init(self._ptr, self._sigchld_watcher)
self._sigchld_callback_ffi = ffi.callback('void(*)(void*, int)',
self.__sigchld_callback)
self._sigchld_watcher.data = self._handle_to_self
libuv.uv_signal_start(self._sigchld_watcher,
self._sigchld_callback_ffi,
libuv.python_sigchld_callback,
signal.SIGCHLD)
def reset_sigchld(self):
......@@ -382,9 +395,9 @@ class loop(AbstractLoop):
# it in install_sigchld?
_watchers.watcher._watcher_ffi_close(self._sigchld_watcher)
del self._sigchld_watcher
del self._sigchld_callback_ffi
def __sigchld_callback(self, _handler, _signum):
def _sigchld_callback(self):
while True:
try:
pid, status, _usage = os.wait3(os.WNOHANG)
......
......@@ -16,7 +16,7 @@ from gevent._ffi import _dbg
_closing_handles = set()
@ffi.callback("void(*)(uv_handle_t*)")
@ffi.def_extern()
def _uv_close_callback(handle):
_closing_handles.remove(handle)
......@@ -120,7 +120,7 @@ class watcher(_base.watcher):
# closed.
_dbg("Closing handle", ffi_watcher, ffi_watcher.type)
_closing_handles.add(ffi_watcher)
libuv.uv_close(ffi_watcher, _uv_close_callback)
libuv.uv_close(ffi_watcher, libuv._uv_close_callback)
ffi_watcher.data = ffi.NULL
......@@ -146,7 +146,12 @@ class watcher(_base.watcher):
def _watcher_ffi_stop(self):
_dbg("Stopping", self, self._watcher_stop)
self._watcher_stop(self._watcher)
if self._watcher:
# The multiplexed io watcher deletes self._watcher
# when it closes down. If that's in the process of
# an error handler, AbstractCallbacks.unhandled_onerror
# will try to close us again.
self._watcher_stop(self._watcher)
_dbg("Stopped", self)
@_base.only_if_watcher
......
......@@ -451,6 +451,8 @@ if PYPY and LIBUV:
else:
LOCAL_TIMEOUT = 1
LARGE_TIMEOUT = max(LOCAL_TIMEOUT, CI_TIMEOUT)
DEFAULT_LOCAL_HOST_ADDR = 'localhost'
DEFAULT_LOCAL_HOST_ADDR6 = DEFAULT_LOCAL_HOST_ADDR
DEFAULT_BIND_ADDR = ''
......
......@@ -299,7 +299,7 @@ TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.082, 0.035, 0.14
class TestPool(greentest.TestCase):
__timeout__ = max(greentest.TestCase.__timeout__, 5)
__timeout__ = greentest.LARGE_TIMEOUT
size = 1
def setUp(self):
......@@ -327,7 +327,7 @@ class TestPool(greentest.TestCase):
def test_async_callback(self):
result = []
res = self.pool.apply_async(sqr, (7, TIMEOUT1,), callback=lambda x: result.append(x))
res = self.pool.apply_async(sqr, (7, TIMEOUT1,), callback=result.append)
get = TimingWrapper(res.get)
self.assertEqual(get(), 49)
self.assertTimeoutAlmostEqual(get.elapsed, TIMEOUT1, 1)
......
......@@ -9,7 +9,6 @@ import os
# Timeouts very flaky on appveyor and PyPy3
_DEFAULT_SOCKET_TIMEOUT = 0.1 if not greentest.EXPECT_POOR_TIMER_RESOLUTION else 1.0
_DEFAULT_TEST_TIMEOUT = 5 if not greentest.EXPECT_POOR_TIMER_RESOLUTION else 10
class SimpleStreamServer(StreamServer):
......@@ -71,7 +70,7 @@ class Settings:
class TestCase(greentest.TestCase):
__timeout__ = _DEFAULT_TEST_TIMEOUT
__timeout__ = greentest.LARGE_TIMEOUT
def cleanup(self):
if getattr(self, 'server', None) is not None:
......
......@@ -2,7 +2,14 @@ from __future__ import print_function
import signal
import greentest
import gevent
import pkg_resources
try:
cffi_version = pkg_resources.get_distribution('cffi').parsed_version
except Exception:
# No cffi installed. Shouldn't happen to gevent standard tests,
# but maybe some downstream distributor removed it.
cffi_version = None
class Expected(Exception):
pass
......@@ -17,7 +24,7 @@ if hasattr(signal, 'SIGALRM'):
class TestSignal(greentest.TestCase):
error_fatal = False
__timeout__ = 5
__timeout__ = greentest.LARGE_TIMEOUT
def test_handler(self):
self.assertRaises(TypeError, gevent.signal, signal.SIGALRM, 1)
......@@ -46,7 +53,9 @@ if hasattr(signal, 'SIGALRM'):
sig.cancel()
@greentest.skipIf(greentest.PY3 and greentest.CFFI_BACKEND and greentest.RUNNING_ON_CI,
@greentest.skipIf((greentest.PY3
and greentest.CFFI_BACKEND
and cffi_version < pkg_resources.parse_version('1.11.3')),
"https://bitbucket.org/cffi/cffi/issues/352/systemerror-returned-a-result-with-an")
@greentest.ignores_leakcheck
def test_reload(self):
......@@ -75,6 +84,8 @@ if hasattr(signal, 'SIGALRM'):
# See https://bitbucket.org/cffi/cffi/issues/352/systemerror-returned-a-result-with-an
# This is fixed in 1.11.3
import gevent.signal # make sure it's in sys.modules pylint:disable=redefined-outer-name
assert gevent.signal
import site
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment