Commit 708e0fd1 authored by Denis Bilenko's avatar Denis Bilenko

core: make io class similar to all other watchers, use the same callback...

core: make io class similar to all other watchers, use the same callback function for all watchers; make loop and watcher classes public

- the way io calls its callback is changed: it used to pass (events, self) always, now it passes *args like the other watchers do.
  in order to find out which events fired, start the watcher as watcher.start(func, EVENTS, *args) - func will be called as func(<EVENTS FIRED>, args)
- do a type check that callback passed to start() or to watcher.callback.__set__ is a callable;
- same for 'args' attribute, except allow it to be None
- declare some of the methods as cpdef for speed when used in C code
- make '_incref' readonly rather than public
- add cdef methods _start() and _feed() because start() and feed() use *args and cannot be "cpdef"
- update modules affect by API change: gevent.select and gevent.server
parent 41d09e7f
static void gevent_handle_error(PyObject* loop, PyObject* where) {
static void gevent_handle_error(struct PyGeventLoopObject* loop, PyObject* where) {
PyThreadState *tstate;
PyObject *type, *value, *traceback, *handler, *result, *tuple;
int reported;
tstate = PyThreadState_GET();
type = tstate->curexc_type;
if (!type)
......@@ -10,106 +9,34 @@ static void gevent_handle_error(PyObject* loop, PyObject* where) {
traceback = tstate->curexc_traceback;
if (!value) value = Py_None;
if (!traceback) traceback = Py_None;
Py_INCREF(type);
Py_INCREF(value);
Py_INCREF(traceback);
reported = 0;
handler = PyObject_GetAttr(loop, __pyx_n_s__handle_error);
if (handler) {
if (handler != Py_None) {
tuple = PyTuple_New(4);
if (tuple) {
reported = 1;
Py_INCREF(where);
PyTuple_SET_ITEM(tuple, 0, where);
PyTuple_SET_ITEM(tuple, 1, type);
PyTuple_SET_ITEM(tuple, 2, value);
PyTuple_SET_ITEM(tuple, 3, traceback);
PyErr_Clear();
result = PyObject_Call(handler, tuple, NULL);
if (result) {
Py_DECREF(result);
}
else {
PyErr_WriteUnraisable(handler);
}
Py_DECREF(tuple);
}
Py_DECREF(handler);
}
}
if (!reported) {
PyErr_WriteUnraisable(loop);
Py_DECREF(type);
Py_DECREF(value);
Py_DECREF(traceback);
}
PyErr_Clear();
}
static inline void gevent_handle_signal_error(PyObject* loop) {
gevent_handle_error(loop, Py_None);
}
/* Calls callback(watcher, revents) and reports errors.
* Returns 1 on success, 0 on failure
* */
static inline int gevent_callback(PyObject* callback, PyObject* watcher, int revents, PyObject* loop) {
int success;
PyObject *py_revents, *tuple, *result;
PyErr_CheckSignals();
if (PyErr_Occurred()) gevent_handle_signal_error(loop);
success = 0;
py_revents = PyInt_FromLong(revents);
if (py_revents) {
tuple = PyTuple_New(2);
if (tuple) {
Py_INCREF(watcher);
PyTuple_SET_ITEM(tuple, 0, watcher);
PyTuple_SET_ITEM(tuple, 1, py_revents);
result = PyObject_Call(callback, tuple, NULL);
if (result) {
success = 1;
Py_DECREF(result);
}
else {
gevent_handle_error(loop, watcher);
}
Py_DECREF(tuple);
}
else {
Py_DECREF(py_revents);
}
}
PyErr_Clear();
return success;
}
/* Calls callback(*args) and reports errors */
static void gevent_callback_simple(PyObject* callback, PyObject* watcher, PyObject* args, PyObject* loop) {
PyObject* result;
PyErr_CheckSignals();
if (PyErr_Occurred()) gevent_handle_signal_error(loop);
result = PyObject_Call(callback, args, NULL);
result = ((struct __pyx_vtabstruct_6gevent_4core_loop *)loop->__pyx_vtab)->handle_error(loop, where, type, value, traceback, 0);
if (result) {
Py_DECREF(result);
}
else {
gevent_handle_error(loop, watcher);
PyErr_Print();
PyErr_Clear();
}
PyErr_Clear();
Py_DECREF(type);
Py_DECREF(value);
Py_DECREF(traceback);
}
static inline void gevent_check_signals(struct PyGeventLoopObject* loop) {
PyErr_CheckSignals();
if (PyErr_Occurred()) gevent_handle_error(loop, Py_None);
}
#define GET_OBJECT(EV_PTR, PY_TYPE, MEMBER) \
((struct PY_TYPE *)(((char *)EV_PTR) - offsetof(struct PY_TYPE, MEMBER)))
......@@ -123,76 +50,88 @@ static void gevent_callback_simple(PyObject* callback, PyObject* watcher, PyObje
#endif
static inline void gevent_stop(PyObject* self) {
static inline void gevent_stop(struct PyGeventTimerObject* watcher) {
PyObject *result, *callable;
callable = PyObject_GetAttr(self, __pyx_n_s__stop);
if (callable) {
result = PyObject_Call(callable, __pyx_empty_tuple, NULL);
if (result) {
Py_DECREF(result);
}
else {
PyErr_WriteUnraisable(callable);
}
Py_DECREF(callable);
result = ((struct __pyx_vtabstruct_6gevent_4core_timer *)watcher->__pyx_vtab)->stop(watcher, 0);
if (result) {
Py_DECREF(result);
}
else {
gevent_handle_error(watcher->loop, watcher);
}
}
#define timer_offsetof offsetof(struct __pyx_obj_6gevent_4core_timer, _watcher)
#define signal_offsetof offsetof(struct __pyx_obj_6gevent_4core_signal, _watcher)
#define idle_offsetof offsetof(struct __pyx_obj_6gevent_4core_idle, _watcher)
#define prepare_offsetof offsetof(struct __pyx_obj_6gevent_4core_prepare, _watcher)
#define callback_offsetof offsetof(struct __pyx_obj_6gevent_4core_callback, _watcher)
#define io_offsetof offsetof(struct PyGeventIOObject, _watcher)
#define timer_offsetof offsetof(struct PyGeventTimerObject, _watcher)
#define signal_offsetof offsetof(struct PyGeventSignalObject, _watcher)
#define idle_offsetof offsetof(struct PyGeventIdleObject, _watcher)
#define prepare_offsetof offsetof(struct PyGeventPrepareObject, _watcher)
#define callback_offsetof offsetof(struct PyGeventCallbackObject, _watcher)
#define CHECK_OFFSETOF (timer_offsetof == signal_offsetof) && (timer_offsetof == idle_offsetof) && (timer_offsetof == prepare_offsetof) && (timer_offsetof == callback_offsetof)
#define CHECK_OFFSETOF (timer_offsetof == signal_offsetof) && (timer_offsetof == idle_offsetof) && (timer_offsetof == prepare_offsetof) && (timer_offsetof == callback_offsetof) && (timer_offsetof == io_offsetof)
static void gevent_simple_callback(struct ev_loop *_loop, void *watcher, int revents) {
char STATIC_ASSERTION__same_offsetof[(CHECK_OFFSETOF)?1:-1];
struct __pyx_obj_6gevent_4core_timer *self;
static void gevent_callback(struct ev_loop *_loop, void *c_watcher, int revents) {
struct PyGeventTimerObject *watcher;
PyObject *result, *py_events;
GIL_ENSURE;
/* we use this callback for all watchers, not just timer
* we can do this, because layout of struct members is the same for all watchers */
self = ((struct __pyx_obj_6gevent_4core_timer *)(((char *)watcher) - timer_offsetof));
Py_INCREF(self);
gevent_callback_simple(self->callback, (PyObject*)self, self->args, (PyObject*)self->loop);
if (!ev_is_active(watcher)) {
gevent_stop((PyObject*)self);
watcher = ((struct PyGeventTimerObject *)(((char *)c_watcher) - timer_offsetof));
Py_INCREF(watcher);
gevent_check_signals(watcher->loop);
if (PyTuple_Size(watcher->args) > 0 && PyTuple_GET_ITEM(watcher->args, 0) == GEVENT_CORE_EVENTS) {
py_events = PyInt_FromLong(revents);
if (py_events) {
Py_DECREF(GEVENT_CORE_EVENTS);
PyTuple_SET_ITEM(watcher->args, 0, py_events);
}
else {
gevent_handle_error(watcher->loop, (PyObject*)watcher);
goto end;
}
}
Py_DECREF(self);
GIL_RELEASE;
}
static void gevent_io_callback(struct ev_loop *loop, struct ev_io *watcher, int revents) {
struct __pyx_obj_6gevent_4core_io *self;
GIL_ENSURE;
self = GET_OBJECT(watcher, __pyx_obj_6gevent_4core_io, _watcher);
Py_INCREF(self);
if (!gevent_callback(self->callback, (PyObject*)self, revents, (PyObject*)self->loop)) {
gevent_stop((PyObject*)self);
else {
py_events = NULL;
}
result = PyObject_Call(watcher->_callback, watcher->args != Py_None ? watcher->args : __pyx_empty_tuple, NULL);
if (result) {
Py_DECREF(result);
}
else {
gevent_handle_error(watcher->loop, (PyObject*)watcher);
if (revents & (EV_READ|EV_WRITE)) {
/* this was an 'io' watcher: not stopping it will likely to cause the failing callback to be called repeatedly */
gevent_stop((PyObject*)watcher);
}
}
Py_DECREF(self);
if (py_events) {
Py_INCREF(GEVENT_CORE_EVENTS);
PyTuple_SET_ITEM(watcher->args, 0, GEVENT_CORE_EVENTS);
}
if (!ev_is_active(c_watcher)) {
/* watcher will never be run again: calling stop() will clear 'callback' and 'args' */
gevent_stop((PyObject*)watcher);
}
end:
Py_DECREF(watcher);
GIL_RELEASE;
}
static void gevent_signal_check(struct ev_loop *_loop, void *watcher, int revents) {
struct __pyx_obj_6gevent_4core_loop *loop;
char STATIC_ASSERTION__same_offsetof[(CHECK_OFFSETOF)?1:-1];
GIL_ENSURE;
PyErr_CheckSignals();
loop = GET_OBJECT(watcher, __pyx_obj_6gevent_4core_loop, _signal_checker);
if (PyErr_Occurred()) gevent_handle_signal_error((PyObject*)loop);
gevent_check_signals(GET_OBJECT(watcher, PyGeventLoopObject, _signal_checker));
GIL_RELEASE;
}
#if defined(GEVENT_WINDOWS)
#if defined(_WIN32)
static void gevent_periodic_signal_check(struct ev_loop *_loop, void *watcher, int revents) {
struct __pyx_obj_6gevent_4core_loop *loop;
GIL_ENSURE;
PyErr_CheckSignals();
loop = GET_OBJECT(watcher, __pyx_obj_6gevent_4core_loop, _periodic_signal_checker);
if (PyErr_Occurred()) gevent_handle_signal_error((PyObject*)loop);
gevent_check_signals(GET_OBJECT(watcher, PyGeventLoopObject, _periodic_signal_checker));
GIL_RELEASE;
}
......
static void gevent_io_callback(struct ev_loop*, struct ev_io*, int);
static void gevent_simple_callback(struct ev_loop *, void *, int);
static void gevent_callback(struct ev_loop *, void *, int);
static void gevent_signal_check(struct ev_loop *, void *, int);
#if defined(GEVENT_WINDOWS)
#if defined(_WIN32)
static void gevent_periodic_signal_check(struct ev_loop *, void *, int);
#endif
`#' DO NOT EDIT -- this file is auto generated from __file__ on syscmd(date)
cimport cython
cimport libev
......@@ -11,11 +13,12 @@ __all__ = ['get_version',
cdef extern from "Python.h":
void Py_INCREF(void* o)
void Py_DECREF(void* o)
void Py_XDECREF(void* o)
int Py_ReprEnter(void* o)
void Py_ReprLeave(void* o)
void Py_INCREF(void*)
void Py_DECREF(void*)
void Py_XDECREF(void*)
int Py_ReprEnter(void*)
void Py_ReprLeave(void*)
int PyCallable_Check(void*)
cdef extern from "frameobject.h":
ctypedef struct PyThreadState:
......@@ -25,15 +28,14 @@ cdef extern from "frameobject.h":
PyThreadState* PyThreadState_GET()
cdef extern from "callbacks.h":
void gevent_io_callback(libev.ev_loop, libev.ev_io, int)
void gevent_simple_callback(libev.ev_loop, void*, int)
void gevent_callback(libev.ev_loop, void*, int)
void gevent_signal_check(libev.ev_loop, void*, int)
void gevent_periodic_signal_check(libev.ev_loop, void*, int)
cdef extern from *:
int FD_SETSIZE
int _open_osfhandle(int, int)
cdef void IFDEF_WINDOWS "#if defined(GEVENT_WINDOWS) //" ()
cdef void IFDEF_WINDOWS "#if defined(_WIN32) //" ()
cdef void IFDEF_EV_STANDALONE "#if defined(EV_STANDALONE) //" ()
cdef void ENDIF "#endif //" ()
int ioctlsocket(int, int, unsigned long*)
int FIONREAD
......@@ -75,6 +77,26 @@ SIGNALFD = libev.EVFLAG_SIGNALFD
NOSIGMASK = libev.EVFLAG_NOSIGMASK
@cython.internal
cdef class EVENTSType:
"""A special object to pass to watcher.start which gets replaced by *events* that fired.
For example, if watcher is started as:
>>> io = loop.io(1, READ|WRITE)
>>> io.start(callback, EVENTS, 'hello')
Then the callback will be called with 2 arguments:
1) integer representing the event fired (READ, WRITE, READ|WRITE)
2) 'hello'
"""
def __repr__(self):
return 'gevent.core.EVENTS'
cdef public object GEVENT_CORE_EVENTS = EVENTSType()
EVENTS = GEVENT_CORE_EVENTS
def get_version():
return 'libev-%d.%02d' % (libev.ev_version_major(), libev.ev_version_minor())
......@@ -177,7 +199,7 @@ def time(self):
return libev.ev_time()
cdef class loop:
cdef public class loop [object PyGeventLoopObject, type PyGeventLoop_Type]:
cdef libev.ev_loop* _ptr
cdef public object error_handler
cdef libev.ev_prepare _signal_checker
......@@ -312,27 +334,29 @@ cdef class loop:
return value
return backend
property activecnt: # XXX only available if we embed libev
property activecnt:
def __get__(self):
IFDEF_EV_STANDALONE()
return self._ptr.activecnt
ENDIF()
def io(self, int fd, int events):
cpdef io(self, int fd, int events):
return io(self, fd, events)
def timer(self, double after, double repeat=0.0):
cpdef timer(self, double after, double repeat=0.0):
return timer(self, after, repeat)
def signal(self, int signum):
cpdef signal(self, int signum):
return signal(self, signum)
def idle(self):
cpdef idle(self):
return idle(self)
def prepare(self):
cpdef prepare(self):
return prepare(self)
def callback(self):
cpdef callback(self):
return callback(self)
def run_callback(self, func, *args):
......@@ -347,14 +371,27 @@ define(INCREF, ``if self._incref == 0:
define(WATCHER_BASE, `cdef public loop loop
cdef public object callback
cdef public object args
cdef public int _incref # 1 - increfed, 0 - not increfed
cdef object _callback
cdef public tuple args
cdef readonly int _incref # 1 - increfed, 0 - not increfed
cdef libev.ev_$1 _watcher
def stop(self):
property callback:
def __get__(self):
return self._callback
def __set__(self, object callback):
if not PyCallable_Check(<void*>callback):
raise TypeError("Expected callable, not %r" % callback)
self._callback = callback
def __del__(self):
self._callback = None
cpdef stop(self):
libev.ev_$1_stop(self.loop._ptr, &self._watcher)
self.callback = None
self._callback = None
self.args = None
if self._incref == 1:
Py_DECREF(<void*>self)
......@@ -376,6 +413,12 @@ define(WATCHER_BASE, `cdef public loop loop
libev.ev_set_priority(&self._watcher, priority)
def feed(self, int revents, object callback, *args):
self.callback = callback
self.args = args
libev.ev_feed_event(self.loop._ptr, &self._watcher, revents)
INCREF
cdef _feed(self, int revents, object callback, tuple args=()):
self.callback = callback
self.args = args
libev.ev_feed_event(self.loop._ptr, &self._watcher, revents)
......@@ -389,6 +432,12 @@ define(ACTIVE, `property active:
define(START, `def start(self, object callback, *args):
self.callback = callback
self.args = args
libev.ev_$1_start(self.loop._ptr, &self._watcher)
INCREF
cdef _start(self, object callback, tuple args=()):
self.callback = callback
self.args = args
libev.ev_$1_start(self.loop._ptr, &self._watcher)
......@@ -403,12 +452,12 @@ define(WATCHER, `WATCHER_BASE($1)
define(INIT, `def __init__(self, loop loop$2):
libev.ev_$1_init(&self._watcher, <void *>gevent_simple_callback$3)
libev.ev_$1_init(&self._watcher, <void *>gevent_callback$3)
self.loop = loop
self._incref = 0')
cdef class watcher:
cdef public class watcher [object PyGeventWatcherObject, type PyGeventWatcher_Type]:
"""Abstract base class for all the watchers"""
def __repr__(self):
......@@ -452,7 +501,7 @@ cdef int _open_osfhandle(int handle) except -1:
return fd
cdef class io(watcher):
cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]:
WATCHER(io)
......@@ -460,7 +509,7 @@ cdef class io(watcher):
IFDEF_WINDOWS()
fd = _open_osfhandle(fd)
ENDIF()
libev.ev_io_init(&self._watcher, <void *>gevent_io_callback, fd, events)
libev.ev_io_init(&self._watcher, <void *>gevent_callback, fd, events)
self.loop = loop
self._incref = 0
......@@ -472,7 +521,7 @@ cdef class io(watcher):
def __set__(self, int fd):
if libev.ev_is_active(&self._watcher):
raise AttributeError("'io' watcher attribute 'fd' is read-only while watcher is active")
libev.ev_io_init(&self._watcher, <void *>gevent_io_callback, fd, self._watcher.events)
libev.ev_io_init(&self._watcher, <void *>gevent_callback, fd, self._watcher.events)
property events:
......@@ -482,7 +531,7 @@ cdef class io(watcher):
def __set__(self, int events):
if libev.ev_is_active(&self._watcher):
raise AttributeError("'io' watcher attribute 'events' is read-only while watcher is active")
libev.ev_io_init(&self._watcher, <void *>gevent_io_callback, self._watcher.fd, events)
libev.ev_io_init(&self._watcher, <void *>gevent_callback, self._watcher.fd, events)
property events_str:
......@@ -493,7 +542,7 @@ cdef class io(watcher):
return ' fd=%s events=%s' % (self._watcher.fd, self.events_str)
cdef class timer(watcher):
cdef public class timer(watcher) [object PyGeventTimerObject, type PyGeventTimer_Type]:
WATCHER(timer)
......@@ -505,28 +554,28 @@ cdef class timer(watcher):
return self._watcher.at
cdef class signal(watcher):
cdef public class signal(watcher) [object PyGeventSignalObject, type PyGeventSignal_Type]:
WATCHER(signal)
INIT(signal, ``, int signalnum'', ``, signalnum'')
cdef class idle(watcher):
cdef public class idle(watcher) [object PyGeventIdleObject, type PyGeventIdle_Type]:
WATCHER(idle)
INIT(idle)
cdef class prepare(watcher):
cdef public class prepare(watcher) [object PyGeventPrepareObject, type PyGeventPrepare_Type]:
WATCHER(prepare)
INIT(prepare)
cdef class callback(watcher):
cdef public class callback(watcher) [object PyGeventCallbackObject, type PyGeventCallback_Type]:
"""Pseudo-watcher used to execute a callback in the loop as soon as possible."""
# does not matter which type we actually use, since we are going to feed() events, not start watchers
......@@ -540,6 +589,12 @@ cdef class callback(watcher):
libev.ev_feed_event(self.loop._ptr, &self._watcher, libev.EV_CUSTOM)
INCREF
cdef _start(self, object callback, tuple args=()):
self.callback = callback
self.args = args
libev.ev_feed_event(self.loop._ptr, &self._watcher, libev.EV_CUSTOM)
INCREF
property active:
def __get__(self):
......
......@@ -2,7 +2,7 @@
from gevent.timeout import Timeout
from gevent.event import Event
from gevent.core import MAXPRI
from gevent.core import MAXPRI, READ, WRITE, EVENTS
from gevent.hub import get_hub
__implements__ = ['select']
......@@ -32,12 +32,12 @@ class SelectResult(object):
self.write = []
self.event = Event()
def update(self, watcher, event):
if event & 1:
self.read.append(watcher.args[0])
def update(self, events, socket):
if events & READ:
self.read.append(socket)
self.event.set()
elif event & 2:
self.write.append(watcher.args[0])
elif events & WRITE:
self.write.append(socket)
self.event.set()
......@@ -55,12 +55,12 @@ def select(rlist, wlist, xlist, timeout=None):
for readfd in rlist:
watcher = io(get_fileno(readfd), 1)
watcher.priority = MAXPRI
watcher.start(result.update, readfd)
watcher.start(result.update, EVENTS, readfd)
watchers.append(watcher)
for writefd in wlist:
watcher = io(get_fileno(writefd), 2)
watcher.priority = MAXPRI
watcher.start(result.update, writefd)
watcher.start(result.update, EVENTS, writefd)
watchers.append(watcher)
except IOError, ex:
raise error(*ex.args)
......
......@@ -102,8 +102,7 @@ class StreamServer(BaseServer):
self._start_accepting_timer.stop()
self._start_accepting_timer = None
def _do_accept(self, event, _evtype):
assert event is self._accept_event
def _do_accept(self):
address = None
try:
if self.full():
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment