Commit 0d403f2f authored by Alexey Borzenkov's avatar Alexey Borzenkov

Implement vfd for libev on win32 (no-op on other platforms)

parent 4dd94990
...@@ -4,6 +4,9 @@ cimport libev ...@@ -4,6 +4,9 @@ cimport libev
from python cimport * from python cimport *
libev.vfd_init()
__all__ = ['get_version', __all__ = ['get_version',
'get_header_version', 'get_header_version',
'supported_backends', 'supported_backends',
...@@ -448,7 +451,7 @@ define(WATCHER, `WATCHER_BASE($1) ...@@ -448,7 +451,7 @@ define(WATCHER, `WATCHER_BASE($1)
ACTIVE($1)') ACTIVE($1)')
define(INIT, `def __cinit__(self, loop loop$2): define(INIT, `def __init__(self, loop loop$2):
libev.ev_$1_init(&self._watcher, <void *>gevent_callback_$1$3) libev.ev_$1_init(&self._watcher, <void *>gevent_callback_$1$3)
self.loop = loop self.loop = loop
self._incref = 0') self._incref = 0')
...@@ -479,59 +482,36 @@ cdef public class watcher [object PyGeventWatcherObject, type PyGeventWatcher_Ty ...@@ -479,59 +482,36 @@ cdef public class watcher [object PyGeventWatcherObject, type PyGeventWatcher_Ty
return '' return ''
cdef int _open_osfhandle(int handle) except -1:
cdef int fd = handle
IFDEF_WINDOWS()
fd = libev._open_osfhandle(fd, 0)
if fd < 0:
raise IOError("handle=%s does not have a valid file descriptor" % handle)
if fd >= FD_SETSIZE:
raise IOError("fd %s (handle=%s) is bigger than FD_SETSIZE (%s)" % (fd, handle, FD_SETSIZE))
cdef unsigned long arg
if ioctlsocket(handle, FIONREAD, &arg) != 0:
raise IOError("fd=%s (handle=%s) is not a socket descriptor (file descriptors are not supported)" % (fd, handle))
ENDIF()
return fd
cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]: cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]:
WATCHER(io) WATCHER(io)
cdef int _initialized # for __dealloc__, whether io is initialized cdef int _initialized # for __dealloc__, whether io is initialized
def __cinit__(self, loop loop, int fd, int events): def __init__(self, loop loop, long fd, int events):
IFDEF_WINDOWS() cdef int vfd = libev.vfd_open(fd)
fd = _open_osfhandle(fd) if self._initialized:
ENDIF() libev.vfd_free(self._watcher.fd)
libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, fd, events) libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, vfd, events)
self._initialized = 1 self._initialized = 1
self.loop = loop self.loop = loop
self._incref = 0 self._incref = 0
def __dealloc__(self): def __dealloc__(self):
IFDEF_WINDOWS()
if self._initialized: if self._initialized:
libev.close(self._watcher.fd) libev.vfd_free(self._watcher.fd)
ENDIF()
property fd: property fd:
def __get__(self): def __get__(self):
cdef int fd = self._watcher.fd return libev.vfd_get(self._watcher.fd)
IFDEF_WINDOWS()
fd = libev._get_osfhandle(fd)
ENDIF()
return fd
def __set__(self, int fd): def __set__(self, long fd):
if libev.ev_is_active(&self._watcher): if libev.ev_is_active(&self._watcher):
raise AttributeError("'io' watcher attribute 'fd' is read-only while watcher is active") raise AttributeError("'io' watcher attribute 'fd' is read-only while watcher is active")
IFDEF_WINDOWS() cdef int vfd = libev.vfd_open(fd)
fd = _open_osfhandle(fd) # careful, might raise libev.vfd_free(self._watcher.fd)
libev.close(self._watcher.fd) # close the old one libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, vfd, self._watcher.events)
ENDIF()
libev.ev_io_init(&self._watcher, <void *>gevent_callback_io, fd, self._watcher.events)
property events: property events:
...@@ -549,7 +529,7 @@ cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]: ...@@ -549,7 +529,7 @@ cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]:
return _events_to_str(self._watcher.events) return _events_to_str(self._watcher.events)
def _format(self): def _format(self):
return ' fd=%s events=%s' % (self._watcher.fd, self.events_str) return ' fd=%s events=%s' % (self.fd, self.events_str)
cdef public class timer(watcher) [object PyGeventTimerObject, type PyGeventTimer_Type]: cdef public class timer(watcher) [object PyGeventTimerObject, type PyGeventTimer_Type]:
......
cdef extern from "libev_vfd.h":
void vfd_init()
long vfd_get(int)
int vfd_open(long) except -1
void vfd_free(int)
cdef extern from "libev.h": cdef extern from "libev.h":
int EV_MINPRI int EV_MINPRI
int EV_MAXPRI int EV_MAXPRI
...@@ -128,8 +134,3 @@ cdef extern from "libev.h": ...@@ -128,8 +134,3 @@ cdef extern from "libev.h":
void ev_ref(ev_loop*) void ev_ref(ev_loop*)
void ev_unref(ev_loop*) void ev_unref(ev_loop*)
void ev_break(ev_loop*, int) void ev_break(ev_loop*, int)
# not from libev, but it's there to avoid collisions with gevent.core._open_osfhandle
int _open_osfhandle(int, int)
int _get_osfhandle(int)
void close(int)
#ifdef _WIN32
#ifdef EV_STANDALONE
/*
* If libev on win32 is embedded, then we can use an
* arbitrary mapping between integer fds and OS
* handles. Then by defining special macros libev
* will use our functions.
*/
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <winsock.h>
typedef struct vfd_entry_t
{
long handle; /* OS handle, i.e. SOCKET */
int count; /* Reference count, 0 if free */
int next; /* Next free fd, -1 if last */
} vfd_entry;
#define VFD_INCREMENT 128
static int vfd_num = 0; /* num allocated fds */
static int vfd_max = 0; /* max allocated fds */
static int vfd_next = -1; /* next free fd for reuse */
static PyObject* vfd_map = NULL; /* map OS handle -> virtual fd */
static vfd_entry* vfd_entries = NULL; /* list of virtual fd entries */
#ifdef WITH_THREAD
static CRITICAL_SECTION vfd_lock;
#define VFD_LOCK_INIT InitializeCriticalSection(&vfd_lock)
#define VFD_LOCK_ENTER EnterCriticalSection(&vfd_lock)
#define VFD_LOCK_LEAVE LeaveCriticalSection(&vfd_lock)
#define VFD_GIL_DECLARE PyGILState_STATE ___save
#define VFD_GIL_ENSURE ___save = PyGILState_Ensure()
#define VFD_GIL_RELEASE PyGILState_Release(___save)
#else
#define VFD_LOCK_INIT
#define VFD_LOCK_ENTER
#define VFD_LOCK_LEAVE
#define VFD_GIL_DECLARE
#define VFD_GIL_ENSURE
#define VFD_GIL_RELEASE
#endif
static void vfd_init()
{
VFD_LOCK_INIT;
}
/*
* Given a virtual fd returns an OS handle or -1
* This function is speed critical, so it cannot use GIL
*/
static long vfd_get(int fd)
{
int handle = -1;
VFD_LOCK_ENTER;
if (vfd_entries != NULL && fd >= 0 && fd < vfd_num)
handle = vfd_entries[fd].handle;
VFD_LOCK_LEAVE;
return handle;
}
#define EV_FD_TO_WIN32_HANDLE(fd) vfd_get((fd))
/*
* Given an OS handle finds or allocates a virtual fd
* Returns -1 on failure and sets Python exception if pyexc is non-zero
*/
static int vfd_open_(long handle, int pyexc)
{
VFD_GIL_DECLARE;
int fd = -1;
unsigned long arg;
PyObject* key = NULL;
PyObject* value;
VFD_GIL_ENSURE;
if (ioctlsocket(handle, FIONREAD, &arg) != 0) {
if (pyexc)
PyErr_Format(PyExc_IOError, "%ld is not a socket (files are not supported)", handle);
goto done;
}
if (vfd_map == NULL) {
vfd_map = PyDict_New();
if (vfd_map == NULL)
goto done;
}
if (vfd_entries == NULL) {
vfd_entry* entries = PyMem_Malloc(sizeof(vfd_entry) * VFD_INCREMENT);
if (entries == NULL) {
if (pyexc)
PyErr_NoMemory();
goto done;
}
VFD_LOCK_ENTER;
vfd_entries = entries;
vfd_max = VFD_INCREMENT;
vfd_num = 0;
VFD_LOCK_LEAVE;
}
key = PyLong_FromLong(handle);
/* check if it's already in the dict */
value = PyDict_GetItem(vfd_map, key);
if (value != NULL) {
/* is it safe to use PyInt_AS_LONG(value) here? */
fd = PyInt_AsLong(value);
if (fd >= 0) {
++vfd_entries[fd].count;
goto done;
}
}
/* use the free entry, if available */
if (vfd_next >= 0) {
fd = vfd_next;
vfd_next = vfd_entries[fd].next;
VFD_LOCK_ENTER;
goto allocated;
}
/* check if it would be out of bounds */
if (vfd_num >= FD_SETSIZE) {
/* libev's select doesn't support more that FD_SETSIZE fds */
if (pyexc)
PyErr_Format(PyExc_IOError, "cannot watch more than %d sockets", (int)FD_SETSIZE);
goto done;
}
/* allocate more space if needed */
VFD_LOCK_ENTER;
if (vfd_num >= vfd_max) {
int newsize = vfd_max + VFD_INCREMENT;
vfd_entry* entries = PyMem_Realloc(vfd_entries, sizeof(vfd_entry) * newsize);
if (entries == NULL) {
VFD_LOCK_LEAVE;
if (pyexc)
PyErr_NoMemory();
goto done;
}
vfd_entries = entries;
vfd_max += VFD_INCREMENT;
}
fd = vfd_num++;
allocated:
/* vfd_lock must be acquired when entering here */
vfd_entries[fd].handle = handle;
vfd_entries[fd].count = 1;
value = PyInt_FromLong(fd);
PyDict_SetItem(vfd_map, key, value);
Py_DECREF(value);
VFD_LOCK_LEAVE;
done:
Py_XDECREF(key);
VFD_GIL_RELEASE;
return fd;
}
#define vfd_open(fd) vfd_open_((fd), 1)
#define EV_WIN32_HANDLE_TO_FD(handle) vfd_open_((handle), 0)
static void vfd_free_(int fd, int needclose)
{
VFD_GIL_DECLARE;
PyObject* key;
VFD_GIL_ENSURE;
if (fd < 0 || fd >= vfd_num)
goto done; /* out of bounds */
if (vfd_entries[fd].count <= 0)
goto done; /* free entry, ignore */
if (!--vfd_entries[fd].count) {
/* fd has just been freed */
long handle = vfd_entries[fd].handle;
vfd_entries[fd].handle = -1;
vfd_entries[fd].next = vfd_next;
vfd_next = fd;
if (needclose)
closesocket(handle);
/* vfd_map is assumed to be != NULL */
key = PyLong_FromLong(handle);
PyDict_DelItem(vfd_map, key);
Py_DECREF(key);
}
done:
VFD_GIL_RELEASE;
}
#define vfd_free(fd) vfd_free_((fd), 0)
#define EV_WIN32_CLOSE_FD(fd) vfd_free_((fd), 1)
#else
/*
* If libev on win32 is not embedded in gevent, then
* the only way to map vfds is to use the default of
* using runtime fds in libev. Note that it will leak
* fds, because there's no way of closing them safely
*/
#define vfd_init() do {} while(0)
#define vfd_get(fd) _get_osfhandle((fd))
#define vfd_open(fd) _open_osfhandle((fd), 0)
#define vfd_free(fd)
#endif
#else
/*
* On non-win32 platforms vfd_* are noop macros
*/
#define vfd_init() do {} while(0)
#define vfd_get(fd) (fd)
#define vfd_open(fd) ((int)(fd))
#define vfd_free(fd)
#endif
...@@ -266,10 +266,6 @@ class socket(object): ...@@ -266,10 +266,6 @@ class socket(object):
io = self.hub.loop.io io = self.hub.loop.io
self._read_event = io(fileno, 1) self._read_event = io(fileno, 1)
self._write_event = io(fileno, 2) self._write_event = io(fileno, 2)
if is_windows:
self._connect_event = io(fileno, 3)
else:
self._connect_event = self._write_event
def __repr__(self): def __repr__(self):
return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._formatinfo()) return '<%s at %s %s>' % (type(self).__name__, hex(id(self)), self._formatinfo())
...@@ -337,8 +333,6 @@ class socket(object): ...@@ -337,8 +333,6 @@ class socket(object):
# use self._read_event.loop.run_callback # use self._read_event.loop.run_callback
self.hub.cancel_wait(self._read_event, cancel_wait_ex) self.hub.cancel_wait(self._read_event, cancel_wait_ex)
self.hub.cancel_wait(self._write_event, cancel_wait_ex) self.hub.cancel_wait(self._write_event, cancel_wait_ex)
if self._connect_event is not self._write_event:
self.hub.cancel_wait(self._connect_event, cancel_wait_ex)
self._sock = _closedsocket() self._sock = _closedsocket()
dummy = self._sock._dummy dummy = self._sock._dummy
for method in _delegate_methods: for method in _delegate_methods:
...@@ -364,7 +358,7 @@ class socket(object): ...@@ -364,7 +358,7 @@ class socket(object):
if not result or result == EISCONN: if not result or result == EISCONN:
break break
elif (result in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or (result == EINVAL and is_windows): elif (result in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or (result == EINVAL and is_windows):
self._wait(self._connect_event) self._wait(self._write_event)
else: else:
raise error(result, strerror(result)) raise error(result, strerror(result))
finally: finally:
...@@ -541,8 +535,6 @@ class socket(object): ...@@ -541,8 +535,6 @@ class socket(object):
else: else:
self.hub.cancel_wait(self._read_event, cancel_wait_ex) self.hub.cancel_wait(self._read_event, cancel_wait_ex)
self.hub.cancel_wait(self._write_event, cancel_wait_ex) self.hub.cancel_wait(self._write_event, cancel_wait_ex)
if self._connect_event is not self._write_event:
self.hub.cancel_wait(self._connect_event, cancel_wait_ex)
self._sock.shutdown(how) self._sock.shutdown(how)
family = property(lambda self: self._sock.family, doc="the socket family") family = property(lambda self: self._sock.family, doc="the socket family")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment