Commit 6c7b8ee8 authored by Denis Bilenko's avatar Denis Bilenko

major backward-incompatible changes to core; it is now libevent-2 based

- libevent1.4 currently does not compile
- interface is not compatible with previous version
- core no longer initializes the event base (Hub does it)
- core.event and core.http accept event_base
- core.event clears 'callback' and 'arg' after callback is executed or it was cancelled (unless it was persistent). this helps to avoid creating GC cycles
- core: read_event/write_event/timer/active_event/.. are gone (but now available as methods on event_base, with different interface, however)
parent 29aceb20
......@@ -44,7 +44,6 @@ core.EV_WRITE = 0x04
core.EV_SIGNAL = 0x08
core.EV_PERSIST = 0x10
from gevent.core import reinit
from gevent.greenlet import Greenlet, joinall, killall
spawn = Greenlet.spawn
spawn_later = Greenlet.spawn_later
......@@ -57,3 +56,8 @@ try:
from gevent.hub import fork
except ImportError:
__all__.remove('fork')
def reinit():
from gevent.hub import get_hub
return get_hub().reinit()
This diff is collapsed.
......@@ -3,7 +3,6 @@
import sys
import traceback
from gevent.core import active_event
from gevent.hub import get_hub, getcurrent
from gevent.timeout import Timeout
......@@ -35,7 +34,7 @@ class Semaphore(object):
def release(self):
self.counter += 1
if self._links and self.counter > 0 and self._notifier is None:
self._notifier = active_event(self._notify_links, list(self._links))
self._notifier = get_hub().reactor.active_event(self._notify_links, list(self._links))
def _notify_links(self, links):
try:
......@@ -64,7 +63,7 @@ class Semaphore(object):
raise TypeError('Expected callable: %r' % (callback, ))
self._links.append(callback)
if self.counter > 0 and self._notifier is None:
self._notifier = active_event(self._notify_links, list(self._links))
self._notifier = get_hub().reactor.active_event(self._notify_links, list(self._links))
def unlink(self, callback):
"""Remove the callback set by :meth:`rawlink`"""
......
......@@ -20,10 +20,6 @@ __all__ = ['DNSError',
'QUERY_NO_SEARCH']
# move from here into Hub.__init__ (once event_init() is move here as well)
core.dns_init()
class DNSError(gaierror):
"""A subclass of :class:`socket.gaierror` used by :mod:`evdns` functions to report errors.
......
......@@ -3,7 +3,6 @@
import sys
import traceback
from gevent import core
from gevent.hub import get_hub, getcurrent, _NONE
from gevent.timeout import Timeout
......@@ -40,7 +39,7 @@ class Event(object):
self._flag = True
if self._links:
# schedule a job to notify the links already set
core.active_event(self._notify_links, list(self._links))
get_hub().reactor.active_event(self._notify_links, list(self._links))
def clear(self):
"""Reset the internal flag to false.
......@@ -92,7 +91,7 @@ class Event(object):
raise TypeError('Expected callable: %r' % (callback, ))
self._links.append(callback)
if self._flag:
core.active_event(self._notify_links, list(self._links)) # XXX just pass [callback]
get_hub().reactor.active_event(self._notify_links, list(self._links)) # XXX just pass [callback]
def unlink(self, callback):
"""Remove the callback set by :meth:`rawlink`"""
......@@ -178,7 +177,7 @@ class AsyncResult(object):
self.value = value
self._exception = None
if self._links and self._notifier is None:
self._notifier = core.active_event(self._notify_links)
self._notifier = get_hub().reactor.active_event(self._notify_links)
def set_exception(self, exception):
"""Store the exception. Wake up the waiters.
......@@ -188,7 +187,7 @@ class AsyncResult(object):
"""
self._exception = exception
if self._links and self._notifier is None:
self._notifier = core.active_event(self._notify_links)
self._notifier = get_hub().reactor.active_event(self._notify_links)
def get(self, block=True, timeout=None):
"""Return the stored value or raise the exception.
......@@ -293,7 +292,7 @@ class AsyncResult(object):
raise TypeError('Expected callable: %r' % (callback, ))
self._links.add(callback)
if self.ready() and self._notifier is None:
self._notifier = core.active_event(self._notify_links)
self._notifier = get_hub().reactor.active_event(self._notify_links)
def unlink(self, callback):
"""Remove the callback set by :meth:`rawlink`"""
......
......@@ -54,7 +54,7 @@ cdef extern from "libevent.h":
# evhttp
ctypedef void (*evhttp_handler)(evhttp_request *, void *arg)
evhttp* evhttp_new(event_base *base)
evhttp* evhttp_new(void *base)
int evhttp_bind_socket(evhttp *http, char* address, int port)
int evhttp_accept_socket(evhttp *http, int fd)
void evhttp_free(evhttp* http)
......@@ -511,21 +511,27 @@ cdef class http:
cdef public object default_response_headers
cdef public dict _requests
def __init__(self, object handle, object default_response_headers=None):
def __init__(self, object handle, object default_response_headers=None, event_base base=None):
self.handle = handle
if default_response_headers is None:
self.default_response_headers = []
else:
self.default_response_headers = default_response_headers
self._requests = {} # maps connection id to WeakKeyDictionary which holds requests
self.__obj = evhttp_new(current_base)
self._requests = {} # maps connection id to WeakKeyDictionary which holds requests
if base is None:
if levent.current_base:
self.__obj = evhttp_new(levent.current_base)
else:
raise ValueError('Please provide event_base')
else:
self.__obj = evhttp_new(base._ptr)
evhttp_set_gencb(self.__obj, _http_cb_handler, <void *>self)
def __dealloc__(self):
if self.__obj != NULL:
evhttp_set_gencb(self.__obj, _http_cb_reply_error, NULL)
evhttp_free(self.__obj)
self.__obj = NULL
self.__obj = NULL
property _obj:
......
......@@ -2,7 +2,6 @@
import sys
import traceback
from gevent import core
from gevent.hub import greenlet, getcurrent, get_hub, GreenletExit, Waiter, kill
from gevent.timeout import Timeout
......@@ -242,12 +241,12 @@ class Greenlet(greenlet):
def start(self):
"""Schedule the greenlet to run in this loop iteration"""
assert not self.started, 'Greenlet already started'
self._start_event = core.active_event(self.switch)
self._start_event = get_hub().reactor.active_event(self.switch)
def start_later(self, seconds):
"""Schedule the greenlet to run in the future loop iteration *seconds* later"""
assert not self.started, 'Greenlet already started'
self._start_event = core.timer(seconds, self.switch)
self._start_event = get_hub().reactor.timer(seconds, self.switch)
@classmethod
def spawn(cls, *args, **kwargs):
......@@ -302,7 +301,7 @@ class Greenlet(greenlet):
self._start_event = None
if not self.dead:
waiter = Waiter()
core.active_event(_kill, self, exception, waiter)
get_hub().reactor.active_event(_kill, self, exception, waiter)
if block:
waiter.get()
self.join(timeout)
......@@ -375,7 +374,7 @@ class Greenlet(greenlet):
self._exception = None
self.value = result
if self._links and self._notifier is None:
self._notifier = core.active_event(self._notify_links)
self._notifier = get_hub().reactor.active_event(self._notify_links)
def _report_error(self, exc_info):
exception = exc_info[1]
......@@ -389,7 +388,7 @@ class Greenlet(greenlet):
self._exception = exception
if self._links and self._notifier is None:
self._notifier = core.active_event(self._notify_links)
self._notifier = get_hub().reactor.active_event(self._notify_links)
info = str(self) + ' failed with '
try:
......@@ -421,7 +420,7 @@ class Greenlet(greenlet):
raise TypeError('Expected callable: %r' % (callback, ))
self._links.add(callback)
if self.ready() and self._notifier is None:
self._notifier = core.active_event(self._notify_links)
self._notifier = get_hub().reactor.active_event(self._notify_links)
def link(self, receiver=None, GreenletLink=GreenletLink, SpawnedLink=SpawnedLink):
"""Link greenlet's completion to callable or another greenlet.
......@@ -546,7 +545,7 @@ def _killall(greenlets, exception):
def killall(greenlets, exception=GreenletExit, block=True, timeout=None):
if block:
waiter = Waiter()
core.active_event(_killall3, greenlets, exception, waiter)
get_hub().reactor.active_event(_killall3, greenlets, exception, waiter)
if block:
t = Timeout.start_new(timeout)
try:
......@@ -556,7 +555,7 @@ def killall(greenlets, exception=GreenletExit, block=True, timeout=None):
finally:
t.cancel()
else:
core.active_event(_killall, greenlets, exception)
get_hub().reactor.active_event(_killall, greenlets, exception)
class LinkedExited(Exception):
......
# Copyright (c) 2009-2010 Denis Bilenko. See LICENSE for details.
from gevent import core
from gevent.baseserver import BaseServer
from gevent.hub import get_hub
__all__ = ['HTTPServer']
......@@ -38,7 +39,8 @@ class HTTPServer(BaseServer):
request.send_reply(503, 'Service Unavailable', msg)
def start_accepting(self):
self.http = core.http(self._on_request, self.default_response_headers)
reactor = get_hub().reactor
self.http = core.http(self._on_request, self.default_response_headers, base=reactor)
self.http.accept(self.socket.fileno())
def stop_accepting(self):
......
......@@ -56,11 +56,11 @@ def _switch_helper(function, args, kwargs):
def spawn_raw(function, *args, **kwargs):
if kwargs:
g = greenlet(_switch_helper, get_hub())
core.active_event(g.switch, function, args, kwargs)
get_hub().reactor.active_event(g.switch, function, args, kwargs)
return g
else:
g = greenlet(function, get_hub())
core.active_event(g.switch, *args)
get_hub().reactor.active_event(g.switch, *args)
return g
......@@ -74,7 +74,7 @@ def sleep(seconds=0):
unique_mark = object()
if not seconds >= 0:
raise IOError(22, 'Invalid argument')
timer = core.timer(seconds, getcurrent().switch, unique_mark)
timer = get_hub().reactor.timer(seconds, getcurrent().switch, unique_mark)
try:
switch_result = get_hub().switch()
assert switch_result is unique_mark, 'Invalid switch into sleep(): %r' % (switch_result, )
......@@ -91,25 +91,25 @@ def kill(greenlet, exception=GreenletExit):
so you have to use this function.
"""
if not greenlet.dead:
core.active_event(greenlet.throw, exception)
get_hub().reactor.active_event(greenlet.throw, exception)
def _wrap_signal_handler(handler, args, kwargs):
try:
handler(*args, **kwargs)
except:
core.active_event(MAIN.throw, *sys.exc_info())
get_hub().reactor.active_event(MAIN.throw, *sys.exc_info())
def signal(signalnum, handler, *args, **kwargs):
return core.signal(signalnum, lambda: spawn_raw(_wrap_signal_handler, handler, args, kwargs))
return get_hub().reactor.signal(signalnum, lambda: spawn_raw(_wrap_signal_handler, handler, args, kwargs))
if _original_fork is not None:
def fork():
result = _original_fork()
core.reinit()
get_hub().reactor.reinit()
return result
......@@ -121,19 +121,31 @@ def shutdown():
hub.shutdown()
def get_hub_class():
"""Return the type of hub to use for the current thread.
If there's no type of hub for the current thread yet, 'gevent.hub.Hub' is used.
"""
global _threadlocal
try:
hubtype = _threadlocal.Hub
except AttributeError:
hubtype = None
if hubtype is None:
hubtype = _threadlocal.Hub = Hub
return hubtype
def get_hub():
"""Return the hub for the current thread.
If hub does not exists in the current thread, the new one is created with call to :meth:`get_hub_class`.
"""
global _threadlocal
try:
return _threadlocal.hub
except AttributeError:
try:
hubtype = _threadlocal.Hub
except AttributeError:
# do not pretend to support multiple threads because it's not implemented properly by core.pyx
# this may change in the future, although currently I don't have a strong need for this
raise NotImplementedError('gevent is only usable from a single thread')
if hubtype is None:
hubtype = Hub
hubtype = get_hub_class()
hub = _threadlocal.hub = hubtype()
return hub
......@@ -144,13 +156,19 @@ class Hub(greenlet):
It is created automatically by :func:`get_hub`.
"""
def __init__(self):
reactor_class = core.event_base
def __init__(self, reactor=None):
greenlet.__init__(self)
self.keyboard_interrupt_signal = None
if reactor is None:
self.reactor = self.reactor_class()
else:
self.reactor = reactor
def switch(self):
cur = getcurrent()
assert cur is not self, 'Cannot switch to MAINLOOP from MAINLOOP'
assert cur is not self, 'Impossible to call blocking function in the event loop callback'
exc_info = sys.exc_info()
try:
sys.exc_clear()
......@@ -166,16 +184,16 @@ class Hub(greenlet):
def run(self):
global _threadlocal
assert self is getcurrent(), 'Do not call run() directly'
assert self is getcurrent(), 'Do not call Hub.run() directly'
try:
self.keyboard_interrupt_signal = signal(2, core.active_event, MAIN.throw, KeyboardInterrupt)
self.keyboard_interrupt_signal = signal(2, self.reactor.active_event, MAIN.throw, KeyboardInterrupt)
except IOError:
pass # no signal() on windows
try:
loop_count = 0
while True:
try:
result = core.dispatch()
result = self.reactor.dispatch()
except IOError, ex:
loop_count += 1
if loop_count > 15:
......@@ -196,7 +214,9 @@ class Hub(greenlet):
if self.keyboard_interrupt_signal is not None:
self.keyboard_interrupt_signal.cancel()
self.keyboard_interrupt_signal = None
core.dns_shutdown()
dns = self.reactor.dns
if dns is not None:
dns.free()
if not self or self.dead:
if _threadlocal.__dict__.get('hub') is self:
_threadlocal.__dict__.pop('hub')
......@@ -231,7 +251,7 @@ class Waiter(object):
The :meth:`get` method must be called from a greenlet other than :class:`Hub`.
>>> result = Waiter()
>>> _ = core.timer(0.1, result.switch, 'hello from Waiter')
>>> _ = get_hub().reactor.timer(0.1, result.switch, 'hello from Waiter')
>>> result.get() # blocks for 0.1 seconds
'hello from Waiter'
......@@ -239,7 +259,7 @@ class Waiter(object):
:class:`Waiter` stores the value.
>>> result = Waiter()
>>> _ = core.timer(0.1, result.switch, 'hi from Waiter')
>>> _ = get_hub().reactor.timer(0.1, result.switch, 'hi from Waiter')
>>> sleep(0.2)
>>> result.get() # returns immediatelly without blocking
'hi from Waiter'
......
ctypedef void (*event_handler)(int fd, short evtype, void *arg)
ctypedef void (*event_log_cb)(int severity, char *msg)
cdef extern from "libevent.h":
# event.h:
struct timeval:
unsigned int tv_sec
unsigned int tv_usec
struct event:
void* ev_base
int ev_fd
short ev_events
int ev_flags
void *ev_arg
char* event_get_version()
int EV_TIMEOUT
int EV_READ
int EV_WRITE
int EV_SIGNAL
int EV_PERSIST
int EVLIST_TIMEOUT
int EVLIST_INSERTED
int EVLIST_SIGNAL
int EVLIST_ACTIVE
int EVLIST_INTERNAL
int EVLIST_INIT
int DNS_ERR_NONE
int DNS_ERR_FORMAT
int DNS_ERR_SERVERFAILED
int DNS_ERR_NOTEXIST
int DNS_ERR_NOTIMPL
int DNS_ERR_REFUSED
int DNS_ERR_TRUNCATED
int DNS_ERR_UNKNOWN
int DNS_ERR_TIMEOUT
int DNS_ERR_SHUTDOWN
int DNS_IPv4_A
int DNS_PTR
int DNS_IPv6_AAAA
int DNS_QUERY_NO_SEARCH
void* event_base_new()
int event_reinit(void *base)
int event_base_dispatch(void*) nogil
char* event_base_get_method(void*)
void event_base_free(void*)
int event_base_set(void *, event*)
void event_set(event *ev, int fd, short event, event_handler handler, void *arg)
int event_add(event *ev, timeval *tv)
int event_del(event *ev)
int event_pending(event *ev, short, timeval *tv)
void event_active(event *ev, int res, short ncalls)
int EVLOOP_ONCE
int EVLOOP_NONBLOCK
char* _EVENT_VERSION
struct evutil_addrinfo:
int ai_flags # AI_PASSIVE, AI_CANONNAME, AI_NUMERICHOST
int ai_family # PF_xxx
int ai_socktype # SOCK_xxx
int ai_protocol # 0 or IPPROTO_xxx for IPv4 and IPv6
size_t ai_addrlen # length of ai_addr
char *ai_canonname # canonical name for nodename
void *ai_addr # binary address
evutil_addrinfo *ai_next # next structure in linked list
ctypedef void (*evdns_callback_type)(int result, char t, int count, int ttl, void *addrs, void *arg)
ctypedef void (*evdns_getaddrinfo_cb)(int result, evutil_addrinfo *res, void *arg)
void* evdns_base_new(void *event_base, int initialize_nameservers)
void* current_base
void evdns_base_free(void *dns_base, int fail_requests)
char* evdns_err_to_string(int err)
int evdns_base_nameserver_ip_add(void *dns_base, char *ip_as_string)
int evdns_base_count_nameservers(void *base)
void *evdns_base_resolve_ipv4(void *dns_base, char *name, int flags, evdns_callback_type callback, void *ptr)
void *evdns_base_resolve_ipv6(void *dns_base, char *name, int flags, evdns_callback_type callback, void *ptr)
void *evdns_base_resolve_reverse(void *dns_base, void *, int flags, evdns_callback_type callback, void *ptr)
void *evdns_base_resolve_reverse_ipv6(void *dns_base, void *, int flags, evdns_callback_type callback, void *ptr)
void evdns_cancel_request(void *dns_base, void *req)
int evdns_base_set_option(void *dns_base, char *option, char *val)
void *evdns_getaddrinfo(void *dns_base, char *nodename, char *servname, evutil_addrinfo *hints_in, evdns_getaddrinfo_cb cb, void *arg)
void evdns_getaddrinfo_cancel(void*)
int EVUTIL_EAI_CANCEL
......@@ -20,6 +20,7 @@
#include "event2/buffer_compat.h"
#include "event2/dns.h"
#include "event2/dns_compat.h"
#include "event2/util.h"
#define EVBUFFER_DRAIN evbuffer_drain
#define EVHTTP_SET_CB evhttp_set_cb
......
......@@ -34,7 +34,6 @@ from Queue import Full, Empty
from gevent.timeout import Timeout
from gevent.hub import get_hub, Waiter, getcurrent, _NONE
from gevent import core
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue']
......@@ -240,7 +239,7 @@ class Queue(object):
def _schedule_unlock(self):
if self._event_unlock is None:
self._event_unlock = core.active_event(self._unlock)
self._event_unlock = get_hub().reactor.active_event(self._unlock)
# QQQ re-activate event (with event_active libevent call) instead of creating a new one each time
def __iter__(self):
......
# Copyright (c) 2009-2010 Denis Bilenko. See LICENSE for details.
from gevent import core
from gevent.timeout import Timeout
from gevent.event import Event
from gevent import core
from gevent.hub import get_hub
__implements__ = ['select']
__all__ = ['error'] + __implements__
......@@ -36,12 +37,12 @@ class SelectResult(object):
if evtype & core.EV_READ:
self.read.append(event.arg)
if self.timer is None:
self.timer = core.timer(0, self.event.set)
self.timer = get_hub().reactor.timer(0, self.event.set)
elif evtype & core.EV_WRITE:
self.write.append(event.arg)
if self.timer is None:
self.timer = core.timer(0, self.event.set)
# using core.timer(0, ...) to let other active events call update() before Event.wait() returns
self.timer = get_hub().reactor.timer(0, self.event.set)
# using timer(0, ...) to let other active events call update() before Event.wait() returns
def select(rlist, wlist, xlist, timeout=None):
......@@ -54,10 +55,15 @@ def select(rlist, wlist, xlist, timeout=None):
result = SelectResult()
try:
try:
reactor = get_hub().reactor
for readfd in rlist:
allevents.append(core.read_event(get_fileno(readfd), result.update, arg=readfd))
event = reactor.read_event(get_fileno(readfd))
event.add(None, result.update, arg=readfd)
allevents.append(event)
for writefd in wlist:
allevents.append(core.write_event(get_fileno(writefd), result.update, arg=writefd))
event = reactor.write_event(get_fileno(writefd))
event.add(None, result.update, arg=writefd)
allevents.append(event)
except IOError, ex:
raise error(*ex.args)
result.event.wait(timeout=timeout)
......
......@@ -4,8 +4,8 @@ import sys
import errno
import traceback
from gevent import socket
from gevent import core
from gevent.baseserver import BaseServer
from gevent.hub import get_hub
__all__ = ['StreamServer']
......@@ -88,7 +88,8 @@ class StreamServer(BaseServer):
def start_accepting(self):
if self._accept_event is None:
self._accept_event = core.read_event(self.socket.fileno(), self._do_accept, persist=True)
self._accept_event = get_hub().reactor.read_event(self.socket.fileno(), persist=True)
self._accept_event.add(None, self._do_accept)
def _start_accepting_if_started(self, _event=None):
if self.started:
......@@ -139,7 +140,7 @@ class StreamServer(BaseServer):
traceback.print_exc()
if self.delay >= 0:
self.stop_accepting()
self._start_accepting_timer = core.timer(self.delay, self.start_accepting)
self._start_accepting_timer = get_hub().reactor.timer(self.delay, self._start_accepting_if_started)
self.delay = min(self.max_delay, self.delay * 2)
def is_fatal_error(self, ex):
......
This diff is collapsed.
......@@ -13,8 +13,7 @@ to arbitrary code.
which no switches occur, :class:`Timeout` is powerless.
"""
from gevent import core
from gevent.hub import getcurrent, _NONE
from gevent.hub import getcurrent, _NONE, get_hub
__all__ = ['Timeout',
'with_timeout']
......@@ -86,17 +85,17 @@ class Timeout(BaseException):
def __init__(self, seconds=None, exception=None):
self.seconds = seconds
self.exception = exception
self.timer = None
self.timer = get_hub().reactor.timer()
def start(self):
"""Schedule the timeout."""
assert not self.pending, '%r is already started; to restart it, cancel it first' % self
if self.seconds is None: # "fake" timeout (never expires)
self.timer = None
pass
elif self.exception is None or self.exception is False: # timeout that raises self
self.timer = core.timer(self.seconds, getcurrent().throw, self)
self.timer.add(self.seconds, getcurrent().throw, self)
else: # regular timeout with user-provided exception
self.timer = core.timer(self.seconds, getcurrent().throw, self.exception)
self.timer.add(self.seconds, getcurrent().throw, self.exception)
@classmethod
def start_new(cls, timeout=None, exception=None):
......@@ -167,7 +166,7 @@ class Timeout(BaseException):
return '%s second%s (%s)' % (self.seconds, suffix, self.exception)
def __enter__(self):
if self.timer is None:
if not self.timer.pending:
self.start()
return self
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment