Commit 7e5973a9 authored by Denis Bilenko's avatar Denis Bilenko

major change: use libev for an event loop, dns, http and wsgi modules currently don't work

- core module gets a completely new interface, incompatible with gevent-0.13
- SystemExit, KeyboardInterrupt and SystemExit are now re-raised in the main greenlet
- Python's signal module works

__init__.py:
  - add get_hub to __all__
  - remove shutdown()

get_hub() now accepts arguments to pass the loop (e.g. get_hub('select'))

Hub class:

  - new methods: handle_error, wait, cancel_wait, join
  - removed method: shutdown (replaced by join())

Timeout class:

  - allow passing string as exception

coros, event, greenlet:

  - use hub's handle_error to report_error rather than traceback module
  - save hub as an attribute rather than calling get_hub() all the time

--HG--
rename : gevent/core.pyx => gevent/core_.pyx
parent b145eef6
......@@ -9,7 +9,8 @@ See http://www.gevent.org/ for the documentation.
version_info = (0, 14, 0)
__version__ = '0.14.0dev'
__all__ = ['Greenlet',
__all__ = ['get_hub',
'Greenlet',
'GreenletExit',
'spawn',
'spawn_later',
......@@ -26,7 +27,6 @@ __all__ = ['Greenlet',
'kill',
'signal',
'fork',
'shutdown',
'core',
'reinit']
......@@ -45,13 +45,13 @@ spawn_link = Greenlet.spawn_link
spawn_link_value = Greenlet.spawn_link_value
spawn_link_exception = Greenlet.spawn_link_exception
from gevent.timeout import Timeout, with_timeout
from gevent.hub import getcurrent, GreenletExit, spawn_raw, sleep, kill, signal, shutdown
from gevent.hub import getcurrent, GreenletExit, spawn_raw, sleep, kill, signal
try:
from gevent.hub import fork
except ImportError:
__all__.remove('fork')
from gevent.hub import get_hub
def reinit():
from gevent.hub import get_hub
return get_hub().reinit()
return get_hub().loop.fork()
static void handle_error(PyObject* loop, PyObject* arg) {
PyThreadState *tstate;
PyObject *type, *value, *traceback, *handler, *result;
int reported;
if (!loop)
return;
tstate = PyThreadState_GET();
type = tstate->curexc_type;
if (!type)
return;
value = tstate->curexc_value;
traceback = tstate->curexc_traceback;
if (!value) value = Py_None;
if (!traceback) traceback = Py_None;
Py_INCREF(type);
Py_INCREF(value);
Py_INCREF(traceback);
reported = 0;
handler = PyObject_GetAttrString(loop, "handle_error");
if (handler) {
if (handler != Py_None) {
PyObject* tuple;
if (arg) {
tuple = PyTuple_New(4);
}
else {
tuple = PyTuple_New(3);
}
if (tuple) {
reported = 1;
if (arg) {
Py_INCREF(arg);
PyTuple_SET_ITEM(tuple, 0, arg);
PyTuple_SET_ITEM(tuple, 1, type);
PyTuple_SET_ITEM(tuple, 2, value);
PyTuple_SET_ITEM(tuple, 3, traceback);
}
else {
PyTuple_SET_ITEM(tuple, 0, type);
PyTuple_SET_ITEM(tuple, 1, value);
PyTuple_SET_ITEM(tuple, 2, traceback);
}
PyErr_Clear();
result = PyObject_Call(handler, tuple, NULL);
if (result) {
Py_DECREF(result);
}
else {
PyErr_WriteUnraisable(handler);
}
Py_DECREF(tuple);
}
Py_DECREF(handler);
}
}
if (!reported) {
PyErr_WriteUnraisable(loop);
Py_DECREF(type);
Py_DECREF(value);
Py_DECREF(traceback);
}
PyErr_Clear();
}
static inline void handle_signal_error(PyObject* loop) {
Py_INCREF(Py_None);
handle_error(loop, Py_None);
Py_DECREF(Py_None);
}
/* Calls callback(watcher, revents) and reports errors.
* Returns 1 on success, 0 on failure
* */
static inline int _callback(PyObject* callback, PyObject* watcher, int revents, PyObject* loop) {
int success;
PyObject *py_revents, *tuple, *result;
PyErr_CheckSignals();
if (PyErr_Occurred()) handle_signal_error(loop);
success = 0;
py_revents = PyInt_FromLong(revents);
if (py_revents) {
tuple = PyTuple_New(2);
if (tuple) {
Py_INCREF(watcher);
PyTuple_SET_ITEM(tuple, 0, watcher);
PyTuple_SET_ITEM(tuple, 1, py_revents);
result = PyObject_Call(callback, tuple, NULL);
if (result) {
success = 1;
Py_DECREF(result);
}
else {
handle_error(loop, watcher);
}
Py_DECREF(tuple);
}
else {
Py_DECREF(py_revents);
}
}
PyErr_Clear();
return success;
}
/* Calls callback(*args) and reports errors */
static void _callback_simple(PyObject* callback, PyObject* watcher, PyObject* args, PyObject* loop) {
PyObject* result;
PyErr_CheckSignals();
if (PyErr_Occurred()) handle_signal_error(loop);
result = PyObject_Call(callback, args, NULL);
if (result) {
Py_DECREF(result);
}
else {
handle_error(loop, watcher);
}
PyErr_Clear();
}
#define GET_OBJECT(EV_PTR, PY_TYPE, MEMBER) \
((struct PY_TYPE *)(((char *)EV_PTR) - offsetof(struct PY_TYPE, MEMBER)))
#ifdef WITH_THREAD
#define GIL_ENSURE PyGILState_STATE ___save = PyGILState_Ensure();
#define GIL_RELEASE PyGILState_Release(___save);
#else
#define GIL_ENSURE
#define GIL_RELEASE
#endif
static inline void stop(PyObject* self) {
PyObject *result, *callable;
callable = PyObject_GetAttrString(self, "stop");
if (callable) {
result = PyObject_Call(callable, __pyx_empty_tuple, NULL);
if (result) {
Py_DECREF(result);
}
else {
PyErr_WriteUnraisable(callable);
}
Py_DECREF(callable);
}
}
#define timer_offsetof offsetof(struct __pyx_obj_6gevent_4core_timer, _watcher)
#define signal_offsetof offsetof(struct __pyx_obj_6gevent_4core_signal, _watcher)
#define idle_offsetof offsetof(struct __pyx_obj_6gevent_4core_idle, _watcher)
#define prepare_offsetof offsetof(struct __pyx_obj_6gevent_4core_prepare, _watcher)
#define callback_offsetof offsetof(struct __pyx_obj_6gevent_4core_callback, _watcher)
#define CHECK_OFFSETOF (timer_offsetof == signal_offsetof) && (timer_offsetof == idle_offsetof) && (timer_offsetof == prepare_offsetof) && (timer_offsetof == callback_offsetof)
static void simple_callback(struct ev_loop *_loop, void *watcher, int revents) {
char STATIC_ASSERTION__same_offsetof[(CHECK_OFFSETOF)?1:-1];
struct __pyx_obj_6gevent_4core_timer *self;
GIL_ENSURE;
/* we use this callback for all watchers, not just timer
* we can do this, because layout of struct members is the same for all watchers */
self = ((struct __pyx_obj_6gevent_4core_timer *)(((char *)watcher) - timer_offsetof));
Py_INCREF(self);
_callback_simple(self->callback, (PyObject*)self, self->args, (PyObject*)self->loop);
if (!ev_active(watcher)) {
stop((PyObject*)self);
}
Py_DECREF(self);
GIL_RELEASE;
}
static void io_callback(struct ev_loop *loop, struct ev_io *watcher, int revents) {
struct __pyx_obj_6gevent_4core_io *self;
GIL_ENSURE;
self = GET_OBJECT(watcher, __pyx_obj_6gevent_4core_io, _watcher);
Py_INCREF(self);
if (!_callback(self->callback, (PyObject*)self, revents, (PyObject*)self->loop)) {
stop((PyObject*)self);
}
Py_DECREF(self);
GIL_RELEASE;
}
static void signal_check(struct ev_loop *_loop, void *watcher, int revents) {
struct __pyx_obj_6gevent_4core_loop *loop;
GIL_ENSURE;
PyErr_CheckSignals();
loop = GET_OBJECT(watcher, __pyx_obj_6gevent_4core_loop, _signal_checker);
if (PyErr_Occurred()) handle_signal_error((PyObject*)loop);
GIL_RELEASE;
}
static void periodic_signal_check(struct ev_loop *_loop, void *watcher, int revents) {
struct __pyx_obj_6gevent_4core_loop *loop;
GIL_ENSURE;
PyErr_CheckSignals();
loop = GET_OBJECT(watcher, __pyx_obj_6gevent_4core_loop, _periodic_signal_checker);
if (PyErr_Occurred()) handle_signal_error((PyObject*)loop);
GIL_RELEASE;
}
static void io_callback(struct ev_loop*, struct ev_io*, int);
static void simple_callback(struct ev_loop *, void *, int);
static void signal_check(struct ev_loop *, void *, int);
static void periodic_signal_check(struct ev_loop *, void *, int);
This diff is collapsed.
This diff is collapsed.
......@@ -2,7 +2,6 @@
# Copyright (c) 2009-2010 Denis Bilenko. See LICENSE for details.
import sys
import traceback
from gevent.hub import get_hub, getcurrent
from gevent.timeout import Timeout
......@@ -22,7 +21,8 @@ class Semaphore(object):
raise ValueError("semaphore initial value must be >= 0")
self._links = []
self.counter = value
self._notifier = None
self.hub = get_hub()
self._notifier = self.hub.loop.callback()
def __str__(self):
params = (self.__class__.__name__, self.counter, len(self._links))
......@@ -33,25 +33,19 @@ class Semaphore(object):
def release(self):
self.counter += 1
if self._links and self.counter > 0 and self._notifier is None:
self._notifier = get_hub().reactor.active_event(self._notify_links, list(self._links))
if self._links and self.counter > 0 and not self._notifier.active:
self._notifier.start(self._notify_links, list(self._links))
# XXX so what if there was another release() with different self._links? it would be ignored then?
def _notify_links(self, links):
try:
for link in links:
if self.counter <= 0:
return
if link in self._links:
try:
link(self)
except:
traceback.print_exc()
try:
sys.stderr.write('Failed to notify link %r of %r\n\n' % (link, self))
except:
traceback.print_exc()
finally:
self._notifier = None
for link in links:
if self.counter <= 0:
return
if link in self._links:
try:
link(self)
except:
self.hub.handle_error((link, self), *sys.exc_info())
def rawlink(self, callback):
"""Register a callback to call when a counter is more than zero.
......@@ -62,8 +56,8 @@ class Semaphore(object):
if not callable(callback):
raise TypeError('Expected callable: %r' % (callback, ))
self._links.append(callback)
if self.counter > 0 and self._notifier is None:
self._notifier = get_hub().reactor.active_event(self._notify_links, list(self._links))
if self.counter > 0 and not self._notifier.active:
self._notifier.start(self._notify_links, list(self._links))
def unlink(self, callback):
"""Remove the callback set by :meth:`rawlink`"""
......@@ -82,7 +76,7 @@ class Semaphore(object):
timer = Timeout.start_new(timeout)
try:
try:
result = get_hub().switch()
result = self.hub.switch()
assert result is self, 'Invalid switch into Semaphore.wait(): %r' % (result, )
except Timeout, ex:
if ex is not timer:
......@@ -106,7 +100,7 @@ class Semaphore(object):
timer = Timeout.start_new(timeout)
try:
try:
result = get_hub().switch()
result = self.hub.switch()
assert result is self, 'Invalid switch into Semaphore.acquire(): %r' % (result, )
except Timeout, ex:
if ex is timer:
......
......@@ -2,7 +2,6 @@
"""Basic synchronization primitives: Event and AsyncResult"""
import sys
import traceback
from gevent.hub import get_hub, getcurrent, _NONE
from gevent.timeout import Timeout
......@@ -19,8 +18,11 @@ class Event(object):
"""
def __init__(self):
self._links = []
self._links = set()
self._todo = set()
self._flag = False
self.hub = get_hub()
self._notifier = self.hub.loop.callback()
def __str__(self):
return '<%s %s>' % (self.__class__.__name__, (self._flag and 'set') or 'clear')
......@@ -37,9 +39,10 @@ class Event(object):
Greenlets that call :meth:`wait` once the flag is true will not block at all.
"""
self._flag = True
if self._links:
self._todo.update(self._links)
if self._todo and not self._notifier.active:
# schedule a job to notify the links already set
get_hub().reactor.active_event(self._notify_links, list(self._links))
self._notifier.start(self._notify_links)
def clear(self):
"""Reset the internal flag to false.
......@@ -69,7 +72,7 @@ class Event(object):
timer = Timeout.start_new(timeout)
try:
try:
result = get_hub().switch()
result = self.hub.switch()
assert result is self, 'Invalid switch into Event.wait(): %r' % (result, )
except Timeout, ex:
if ex is not timer:
......@@ -88,9 +91,10 @@ class Event(object):
"""
if not callable(callback):
raise TypeError('Expected callable: %r' % (callback, ))
self._links.append(callback)
if self._flag:
get_hub().reactor.active_event(self._notify_links, list(self._links)) # XXX just pass [callback]
self._links.add(callback)
if self._flag and not self._notifier.active:
self._todo.add(callback)
self._notifier.start(self._notify_links)
def unlink(self, callback):
"""Remove the callback set by :meth:`rawlink`"""
......@@ -99,18 +103,14 @@ class Event(object):
except ValueError:
pass
def _notify_links(self, links):
assert getcurrent() is get_hub()
for link in links:
def _notify_links(self):
while self._todo:
link = self._todo.pop()
if link in self._links: # check that link was not notified yet and was not removed by the client
try:
link(self)
except:
traceback.print_exc()
try:
sys.stderr.write('Failed to notify link %r of %r\n\n' % (link, self))
except:
traceback.print_exc()
self.hub.handle_error((link, self), *sys.exc_info())
class AsyncResult(object):
......@@ -150,7 +150,8 @@ class AsyncResult(object):
self._links = set()
self.value = None
self._exception = _NONE
self._notifier = None
self.hub = get_hub()
self._notifier = self.hub.loop.callback()
def ready(self):
"""Return true if and only if it holds a value or an exception"""
......@@ -175,8 +176,8 @@ class AsyncResult(object):
"""
self.value = value
self._exception = None
if self._links and self._notifier is None:
self._notifier = get_hub().reactor.active_event(self._notify_links)
if self._links and not self._notifier.active:
self._notifier.start(self._notify_links)
def set_exception(self, exception):
"""Store the exception. Wake up the waiters.
......@@ -185,8 +186,8 @@ class AsyncResult(object):
Sequential calls to :meth:`wait` and :meth:`get` will not block at all.
"""
self._exception = exception
if self._links and self._notifier is None:
self._notifier = get_hub().reactor.active_event(self._notify_links)
if self._links and not self._notifier.active:
self._notifier.start(self._notify_links)
def get(self, block=True, timeout=None):
"""Return the stored value or raise the exception.
......@@ -209,7 +210,7 @@ class AsyncResult(object):
try:
timer = Timeout.start_new(timeout)
try:
result = get_hub().switch()
result = self.hub.switch()
assert result is self, 'Invalid switch into AsyncResult.get(): %r' % (result, )
finally:
timer.cancel()
......@@ -250,7 +251,7 @@ class AsyncResult(object):
try:
timer = Timeout.start_new(timeout)
try:
result = get_hub().switch()
result = self.hub.switch()
assert result is self, 'Invalid switch into AsyncResult.wait(): %r' % (result, )
finally:
timer.cancel()
......@@ -266,20 +267,12 @@ class AsyncResult(object):
return self.value
def _notify_links(self):
try:
assert getcurrent() is get_hub()
while self._links:
link = self._links.pop()
try:
link(self)
except:
traceback.print_exc()
try:
sys.stderr.write('Failed to notify link %r of %r\n\n' % (link, self))
except:
traceback.print_exc()
finally:
self._notifier = None
while self._links:
link = self._links.pop()
try:
link(self)
except:
self.hub.handle_error((link, self), *sys.exc_info())
def rawlink(self, callback):
"""Register a callback to call when a value or an exception is set.
......@@ -290,8 +283,8 @@ class AsyncResult(object):
if not callable(callback):
raise TypeError('Expected callable: %r' % (callback, ))
self._links.add(callback)
if self.ready() and self._notifier is None:
self._notifier = get_hub().reactor.active_event(self._notify_links)
if self.ready() and not self._notifier.active:
self._notifier.start(self._notify_links)
def unlink(self, callback):
"""Remove the callback set by :meth:`rawlink`"""
......
......@@ -142,12 +142,13 @@ class Greenlet(greenlet):
self._links = set()
self.value = None
self._exception = _NONE
self._notifier = None
self._start_event = None
loop = self.parent.loop
self._notifier = loop.callback_ref()
self._start_event = loop.callback_ref()
@property
def started(self):
return self._start_event is not None or bool(self)
return self._start_event.pending or bool(self)
def ready(self):
"""Return true if and only if the greenlet has finished execution."""
......@@ -206,9 +207,7 @@ class Greenlet(greenlet):
a) cancel the event that will start it
b) fire the notifications as if an exception was raised in a greenlet
"""
if self._start_event is not None:
self._start_event.cancel()
self._start_event = None
self._start_event.stop()
try:
greenlet.throw(self, *args)
finally:
......@@ -231,12 +230,13 @@ class Greenlet(greenlet):
def start(self):
"""Schedule the greenlet to run in this loop iteration"""
assert not self.started, 'Greenlet already started'
self._start_event = get_hub().reactor.active_event(self.switch)
self._start_event.start(self.switch)
def start_later(self, seconds):
"""Schedule the greenlet to run in the future loop iteration *seconds* later"""
assert not self.started, 'Greenlet already started'
self._start_event = get_hub().reactor.timer(seconds, self.switch)
self._start_event = self.parent.loop.timer_ref(seconds)
self._start_event.start(self.switch)
@classmethod
def spawn(cls, *args, **kwargs):
......@@ -286,12 +286,10 @@ class Greenlet(greenlet):
`Changed in version 0.13.0:` *block* is now ``True`` by default.
"""
if self._start_event is not None:
self._start_event.cancel()
self._start_event = None
self._start_event.stop()
if not self.dead:
waiter = Waiter()
get_hub().reactor.active_event(_kill, self, exception, waiter)
self.parent.loop.run_callback(_kill, self, exception, waiter)
if block:
waiter.get()
self.join(timeout)
......@@ -363,33 +361,24 @@ class Greenlet(greenlet):
def _report_result(self, result):
self._exception = None
self.value = result
if self._links and self._notifier is None:
self._notifier = get_hub().reactor.active_event(self._notify_links)
if self._links and not self._notifier.active:
self._notifier.start(self._notify_links)
def _report_error(self, exc_info):
exception = exc_info[1]
if isinstance(exception, GreenletExit):
self._report_result(exception)
return
try:
traceback.print_exception(*exc_info)
except:
pass
self._exception = exception
if self._links and self._notifier is None:
self._notifier = get_hub().reactor.active_event(self._notify_links)
if self._links and not self._notifier.active:
self._notifier.start(self._notify_links)
info = str(self) + ' failed with '
try:
info += self._exception.__class__.__name__
except Exception:
info += str(self._exception) or repr(self._exception)
sys.stderr.write(info + '\n\n')
self.parent.handle_error(self, *exc_info)
def run(self):
try:
self._start_event = None
self._start_event.stop()
try:
result = self._run(*self.args, **self.kwargs)
except:
......@@ -409,8 +398,8 @@ class Greenlet(greenlet):
if not callable(callback):
raise TypeError('Expected callable: %r' % (callback, ))
self._links.add(callback)
if self.ready() and self._notifier is None:
self._notifier = get_hub().reactor.active_event(self._notify_links)
if self.ready() and not self._notifier.active:
self._notifier.start(self._notify_links)
def link(self, receiver=None, GreenletLink=GreenletLink, SpawnedLink=SpawnedLink):
"""Link greenlet's completion to callable or another greenlet.
......@@ -461,26 +450,19 @@ class Greenlet(greenlet):
self.link(receiver=receiver, GreenletLink=GreenletLink, SpawnedLink=SpawnedLink)
def _notify_links(self):
try:
while self._links:
link = self._links.pop()
try:
link(self)
except:
traceback.print_exc()
try:
sys.stderr.write('Failed to notify link %s of %r\n\n' % (getfuncname(link), self))
except:
traceback.print_exc()
finally:
self._notifier = None
while self._links:
link = self._links.pop()
try:
link(self)
except:
self.parent.handle_error((link, self), *sys.exc_info())
def _kill(greenlet, exception, waiter):
try:
greenlet.throw(exception)
except:
traceback.print_exc()
greenlet.parent.handle_error(greenlet, *sys.exc_info())
waiter.switch()
......@@ -517,7 +499,7 @@ def _killall3(greenlets, exception, waiter):
try:
g.throw(exception)
except:
traceback.print_exc()
g.parent.handle_error(g, *sys.exc_info())
if not g.dead:
diehards.append(g)
waiter.switch(diehards)
......@@ -529,23 +511,26 @@ def _killall(greenlets, exception):
try:
g.throw(exception)
except:
traceback.print_exc()
g.parent.handle_error(g, *sys.exc_info())
def killall(greenlets, exception=GreenletExit, block=True, timeout=None):
if not greenlets:
return
loop = greenlets[0].loop
if block:
waiter = Waiter()
get_hub().reactor.active_event(_killall3, greenlets, exception, waiter)
if block:
t = Timeout.start_new(timeout)
try:
alive = waiter.get()
if alive:
joinall(alive, raise_error=False)
finally:
t.cancel()
x = loop.callback()
x.start(_killall3, greenlets, exception, waiter)
t = Timeout.start_new(timeout)
try:
alive = waiter.get()
if alive:
joinall(alive, raise_error=False)
finally:
t.cancel()
else:
get_hub().reactor.active_event(_killall, greenlets, exception)
loop.run_callback(_killall, greenlets, exception)
class LinkedExited(Exception):
......
This diff is collapsed.
cdef extern from "ev.c":
int EV_MINPRI
int EV_MAXPRI
int EV_VERSION_MAJOR
int EV_VERSION_MINOR
int EV_UNDEF
int EV_NONE
int EV_READ
int EV_WRITE
int EV__IOFDSET
int EV_TIMER
int EV_PERIODIC
int EV_SIGNAL
int EV_CHILD
int EV_STAT
int EV_IDLE
int EV_PREPARE
int EV_CHECK
int EV_EMBED
int EV_FORK
int EV_CLEANUP
int EV_ASYNC
int EV_CUSTOM
int EV_ERROR
int EVFLAG_AUTO
int EVFLAG_NOENV
int EVFLAG_FORKCHECK
int EVFLAG_NOINOTIFY
int EVFLAG_SIGNALFD
int EVFLAG_NOSIGMASK
int EVBACKEND_SELECT
int EVBACKEND_POLL
int EVBACKEND_EPOLL
int EVBACKEND_KQUEUE
int EVBACKEND_DEVPOLL
int EVBACKEND_PORT
int EVBACKEND_IOCP
int EVBACKEND_ALL
int EVBACKEND_MASK
int EVRUN_NOWAIT
int EVRUN_ONCE
int EVBREAK_CANCEL
int EVBREAK_ONE
int EVBREAK_ALL
struct ev_loop:
int activecnt
struct ev_io:
int fd
int events
struct ev_timer:
double at
struct ev_signal:
pass
struct ev_idle:
pass
struct ev_prepare:
pass
int ev_version_major()
int ev_version_minor()
unsigned int ev_supported_backends()
unsigned int ev_recommended_backends()
unsigned int ev_embeddable_backends()
double ev_time()
void ev_set_syserr_cb(void *)
int ev_priority(void*)
void ev_set_priority(void*, int)
int ev_is_pending(void*)
int ev_is_active(void*)
void ev_io_init(ev_io*, void* callback, int fd, int events)
void ev_io_start(ev_loop*, ev_io*)
void ev_io_stop(ev_loop*, ev_io*)
void ev_feed_event(ev_loop*, void*, int)
void ev_timer_init(ev_timer*, void* callback, double, double)
void ev_timer_start(ev_loop*, ev_timer*)
void ev_timer_stop(ev_loop*, ev_timer*)
void ev_signal_init(ev_signal*, void* callback, int)
void ev_signal_start(ev_loop*, ev_signal*)
void ev_signal_stop(ev_loop*, ev_signal*)
void ev_idle_init(ev_idle*, void* callback)
void ev_idle_start(ev_loop*, ev_idle*)
void ev_idle_stop(ev_loop*, ev_idle*)
void ev_prepare_init(ev_prepare*, void* callback)
void ev_prepare_start(ev_loop*, ev_prepare*)
void ev_prepare_stop(ev_loop*, ev_prepare*)
ev_loop* ev_default_loop(unsigned int flags)
ev_loop* ev_loop_new(unsigned int flags)
void ev_loop_destroy(ev_loop*)
void ev_loop_fork(ev_loop*)
int ev_is_default_loop(ev_loop*)
unsigned int ev_iteration(ev_loop*)
unsigned int ev_depth(ev_loop*)
unsigned int ev_backend(ev_loop*)
void ev_verify(ev_loop*)
void ev_run(ev_loop*, int flags) nogil
double ev_now(ev_loop*)
void ev_now_update(ev_loop*)
void ev_ref(ev_loop*)
void ev_unref(ev_loop*)
void ev_break(ev_loop*, int)
......@@ -56,7 +56,8 @@ class Queue(object):
self.maxsize = maxsize
self.getters = set()
self.putters = set()
self._event_unlock = None
self.hub = get_hub()
self._event_unlock = self.hub.loop.callback()
self._init(maxsize)
# QQQ make maxsize into a property with setter that schedules unlock if necessary
......@@ -84,8 +85,6 @@ class Queue(object):
result += ' getters[%s]' % len(self.getters)
if self.putters:
result += ' putters[%s]' % len(self.putters)
if self._event_unlock is not None:
result += ' unlocking'
return result
def qsize(self):
......@@ -119,7 +118,7 @@ class Queue(object):
self._put(item)
if self.getters:
self._schedule_unlock()
elif not block and get_hub() is getcurrent():
elif not block and self.hub is getcurrent():
# we're in the mainloop, so we cannot wait; we can switch() to other greenlets though
# find a getter and deliver an item to it
while self.getters:
......@@ -169,7 +168,7 @@ class Queue(object):
if self.putters:
self._schedule_unlock()
return self._get()
elif not block and get_hub() is getcurrent():
elif not block and self.hub is getcurrent():
# special case to make get_nowait() runnable in the mainloop greenlet
# there are no items in the queue; try to fix the situation by unlocking putters
while self.putters:
......@@ -202,45 +201,39 @@ class Queue(object):
return self.get(False)
def _unlock(self):
try:
while True:
if self.qsize() and self.getters:
while True:
if self.qsize() and self.getters:
getter = self.getters.pop()
if getter:
try:
item = self._get()
except:
getter.throw(*sys.exc_info())
else:
getter.switch(item)
elif self.putters and self.getters:
putter = self.putters.pop()
if putter:
getter = self.getters.pop()
if getter:
try:
item = self._get()
except:
getter.throw(*sys.exc_info())
else:
getter.switch(item)
elif self.putters and self.getters:
putter = self.putters.pop()
if putter:
getter = self.getters.pop()
if getter:
item = putter.item
putter.item = _NONE # this makes greenlet calling put() not to call _put() again
self._put(item)
item = self._get()
getter.switch(item)
putter.switch(putter)
else:
self.putters.add(putter)
elif self.putters and (self.getters or self.qsize() < self.maxsize):
putter = self.putters.pop()
putter.switch(putter)
else:
break
finally:
self._event_unlock = None # QQQ maybe it's possible to obtain this info from libevent?
# i.e. whether this event is pending _OR_ currently executing
item = putter.item
putter.item = _NONE # this makes greenlet calling put() not to call _put() again
self._put(item)
item = self._get()
getter.switch(item)
putter.switch(putter)
else:
self.putters.add(putter)
elif self.putters and (self.getters or self.qsize() < self.maxsize):
putter = self.putters.pop()
putter.switch(putter)
else:
break
# testcase: 2 greenlets: while True: q.put(q.get()) - nothing else has a change to execute
# to avoid this, schedule unlock with timer(0, ...) once in a while
def _schedule_unlock(self):
if self._event_unlock is None:
self._event_unlock = get_hub().reactor.active_event(self._unlock)
# QQQ re-activate event (with event_active libevent call) instead of creating a new one each time
self._event_unlock.start(self._unlock)
def __iter__(self):
return self
......
......@@ -2,7 +2,7 @@
from gevent.timeout import Timeout
from gevent.event import Event
from gevent import core
from gevent.core import MAXPRI
from gevent.hub import get_hub
__implements__ = ['select']
......@@ -25,24 +25,20 @@ def get_fileno(obj):
class SelectResult(object):
__slots__ = ['read', 'write', 'event', 'timer']
__slots__ = ['read', 'write', 'event']
def __init__(self):
self.read = []
self.write = []
self.event = Event()
self.timer = None
def update(self, event, evtype):
if evtype & core.EV_READ:
self.read.append(event.arg)
if self.timer is None:
self.timer = get_hub().reactor.timer(0, self.event.set)
elif evtype & core.EV_WRITE:
self.write.append(event.arg)
if self.timer is None:
self.timer = get_hub().reactor.timer(0, self.event.set)
# using timer(0, ...) to let other active events call update() before Event.wait() returns
def update(self, watcher, event):
if event & 1:
self.read.append(watcher.args[0])
self.event.set()
elif event & 2:
self.write.append(watcher.args[0])
self.event.set()
def select(rlist, wlist, xlist, timeout=None):
......@@ -50,25 +46,27 @@ def select(rlist, wlist, xlist, timeout=None):
Note: *xlist* is ignored.
"""
allevents = []
watchers = []
timeout = Timeout.start_new(timeout)
io = get_hub().loop.io
result = SelectResult()
try:
try:
reactor = get_hub().reactor
for readfd in rlist:
event = reactor.read_event(get_fileno(readfd))
event.add(None, result.update, arg=readfd)
allevents.append(event)
watcher = io(get_fileno(readfd), 1)
watcher.start(result.update, readfd)
watcher.priority = MAXPRI
watchers.append(watcher)
for writefd in wlist:
event = reactor.write_event(get_fileno(writefd))
event.add(None, result.update, arg=writefd)
allevents.append(event)
watcher = io(get_fileno(writefd), 2)
watcher.start(result.update, writefd)
watcher.priority = MAXPRI
watchers.append(watcher)
except IOError, ex:
raise error(*ex.args)
result.event.wait(timeout=timeout)
return result.read, result.write, []
finally:
for evt in allevents:
evt.cancel()
for watcher in watchers:
watcher.stop()
timeout.cancel()
......@@ -54,6 +54,7 @@ class StreamServer(BaseServer):
self.delay = self.min_delay
self._accept_event = None
self._start_accepting_timer = None
self.loop = get_hub().loop
def set_listener(self, listener, backlog=None):
BaseServer.set_listener(self, listener, backlog=backlog)
......@@ -86,8 +87,8 @@ class StreamServer(BaseServer):
def start_accepting(self):
if self._accept_event is None:
self._accept_event = get_hub().reactor.read_event(self.socket.fileno(), persist=True)
self._accept_event.add(None, self._do_accept)
self._accept_event = self.loop.io(self.socket.fileno(), 1)
self._accept_event.start(self._do_accept)
def _start_accepting_if_started(self, _event=None):
if self.started:
......@@ -95,10 +96,10 @@ class StreamServer(BaseServer):
def stop_accepting(self):
if self._accept_event is not None:
self._accept_event.cancel()
self._accept_event.stop()
self._accept_event = None
if self._start_accepting_timer is not None:
self._start_accepting_timer.cancel()
self._start_accepting_timer.stop()
self._start_accepting_timer = None
def _do_accept(self, event, _evtype):
......@@ -138,7 +139,8 @@ class StreamServer(BaseServer):
traceback.print_exc()
if self.delay >= 0:
self.stop_accepting()
self._start_accepting_timer = get_hub().reactor.timer(self.delay, self._start_accepting_if_started)
self._start_accepting_timer = self.loop.timer(self.delay)
self._start_accepting_timer.start(self._start_accepting_if_started)
self.delay = min(self.max_delay, self.delay * 2)
def is_fatal_error(self, ex):
......
This diff is collapsed.
......@@ -20,7 +20,7 @@ except AttributeError:
import sys
import errno
from gevent.socket import socket, _fileobject, wait_read, wait_write, timeout_default
from gevent.socket import socket, _fileobject, timeout_default
from gevent.socket import error as socket_error, EBADF
__implements__ = ['SSLSocket',
......@@ -113,7 +113,7 @@ class SSLSocket(socket):
raise
sys.exc_clear()
try:
wait_read(self.fileno(), timeout=self.timeout, timeout_exc=_SSLErrorReadTimeout, event=self._read_event)
self._wait(self._read_event, timeout_exc=_SSLErrorReadTimeout)
except socket_error, ex:
if ex[0] == EBADF:
return ''
......@@ -124,7 +124,7 @@ class SSLSocket(socket):
sys.exc_clear()
try:
# note: using _SSLErrorReadTimeout rather than _SSLErrorWriteTimeout below is intentional
wait_write(self.fileno(), timeout=self.timeout, timeout_exc=_SSLErrorReadTimeout, event=self._write_event)
self._wait(self._write_event, timeout_exc=_SSLErrorReadTimeout)
except socket_error, ex:
if ex[0] == EBADF:
return ''
......@@ -144,7 +144,7 @@ class SSLSocket(socket):
raise
sys.exc_clear()
try:
wait_read(self.fileno(), timeout=self.timeout, timeout_exc=_SSLErrorWriteTimeout, event=self._read_event)
self._wait(self._read_event, timeout_exc=_SSLErrorWriteTimeout)
except socket_error, ex:
if ex[0] == EBADF:
return 0
......@@ -154,7 +154,7 @@ class SSLSocket(socket):
raise
sys.exc_clear()
try:
wait_write(self.fileno(), timeout=self.timeout, timeout_exc=_SSLErrorWriteTimeout, event=self._write_event)
self._wait(self._write_event, timeout_exc=_SSLErrorWriteTimeout)
except socket_error, ex:
if ex[0] == EBADF:
return 0
......@@ -192,7 +192,7 @@ class SSLSocket(socket):
return 0
sys.exc_clear()
try:
wait_read(self.fileno(), timeout=timeout, event=self._read_event)
self._wait(self._read_event)
except socket_error, ex:
if ex[0] == EBADF:
return 0
......@@ -202,7 +202,7 @@ class SSLSocket(socket):
return 0
sys.exc_clear()
try:
wait_write(self.fileno(), timeout=timeout, event=self._write_event)
self._wait(self._write_event)
except socket_error, ex:
if ex[0] == EBADF:
return 0
......@@ -255,7 +255,7 @@ class SSLSocket(socket):
if self.timeout == 0.0:
raise
try:
wait_read(self.fileno(), timeout=self.timeout, event=self._read_event)
self._wait(self._read_event)
except socket_error, ex:
if ex[0] == EBADF:
return 0
......@@ -298,7 +298,7 @@ class SSLSocket(socket):
raise
sys.exc_clear()
try:
wait_read(self.fileno(), timeout=self.timeout, timeout_exc=_SSLErrorReadTimeout, event=self._read_event)
self._wait(self._read_event, timeout_exc=_SSLErrorReadTimeout)
except socket_error, ex:
if ex[0] == EBADF:
return ''
......@@ -308,7 +308,7 @@ class SSLSocket(socket):
raise
sys.exc_clear()
try:
wait_write(self.fileno(), timeout=self.timeout, timeout_exc=_SSLErrorWriteTimeout, event=self._write_event)
self._wait(self._write_event, timeout_exc=_SSLErrorWriteTimeout)
except socket_error, ex:
if ex[0] == EBADF:
return ''
......@@ -345,12 +345,12 @@ class SSLSocket(socket):
if self.timeout == 0.0:
raise
sys.exc_clear()
wait_read(self.fileno(), timeout=self.timeout, timeout_exc=_SSLErrorHandshakeTimeout, event=self._read_event)
self._wait(self._read_event, timeout_exc=_SSLErrorHandshakeTimeout)
elif ex.args[0] == SSL_ERROR_WANT_WRITE:
if self.timeout == 0.0:
raise
sys.exc_clear()
wait_write(self.fileno(), timeout=self.timeout, timeout_exc=_SSLErrorHandshakeTimeout, event=self._write_event)
self._wait(self._write_event, timeout_exc=_SSLErrorHandshakeTimeout)
else:
raise
......
......@@ -85,17 +85,21 @@ class Timeout(BaseException):
def __init__(self, seconds=None, exception=None):
self.seconds = seconds
self.exception = exception
self.timer = get_hub().reactor.timer()
if seconds is not None:
self.timer = get_hub().loop.timer_ref(seconds)
else:
self.timer = get_hub().loop.timer_ref(0.0)
def start(self):
"""Schedule the timeout."""
assert not self.pending, '%r is already started; to restart it, cancel it first' % self
if self.seconds is None: # "fake" timeout (never expires)
pass
elif self.exception is None or self.exception is False: # timeout that raises self
self.timer.add(self.seconds, getcurrent().throw, self)
elif self.exception is None or self.exception is False or isinstance(self.exception, basestring):
# timeout that raises self
self.timer.start(getcurrent().throw, self)
else: # regular timeout with user-provided exception
self.timer.add(self.seconds, getcurrent().throw, self.exception)
self.timer.start(getcurrent().throw, self.exception)
@classmethod
def start_new(cls, timeout=None, exception=None):
......@@ -120,15 +124,11 @@ class Timeout(BaseException):
@property
def pending(self):
"""Return True if the timeout is scheduled to be raised."""
if self.timer is not None:
return self.timer.pending
else:
return False
return self.timer.pending or self.timer.active
def cancel(self):
"""If the timeout is pending, cancel it. Otherwise, do nothing."""
if self.timer is not None:
self.timer.cancel()
self.timer.stop()
def __repr__(self):
try:
......@@ -163,10 +163,10 @@ class Timeout(BaseException):
elif self.exception is False:
return '%s second%s (silent)' % (self.seconds, suffix)
else:
return '%s second%s (%s)' % (self.seconds, suffix, self.exception)
return '%s second%s: %s' % (self.seconds, suffix, self.exception)
def __enter__(self):
if not self.timer.pending:
if not self.pending:
self.start()
return self
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment