Commit 69fed3c6 authored by Jason Madden's avatar Jason Madden

multiplex io. fixes test__socket.

parent e4a04e89
......@@ -7,7 +7,7 @@ from __future__ import absolute_import, print_function
import os
from collections import defaultdict
import signal
import sys
from gevent._ffi.loop import AbstractLoop
from gevent.libuv import _corecffi # pylint:disable=no-name-in-module,import-error
......@@ -39,6 +39,7 @@ class loop(AbstractLoop):
AbstractLoop.__init__(self, ffi, libuv, _watchers, flags, default)
self.__loop_pid = os.getpid()
self._child_watchers = defaultdict(list)
self._io_watchers = dict()
def _init_loop(self, flags, default):
if default is None:
......@@ -212,3 +213,16 @@ class loop(AbstractLoop):
children_watchers = self._child_watchers.get(pid, []) + self._child_watchers.get(0, [])
for watcher in children_watchers:
watcher._set_status(status)
def io(self, fd, events, ref=True, priority=None):
# XXX: Lifetime management. When can this root watcher
# go away?
io_watchers = self._io_watchers
try:
io_watcher = io_watchers[fd]
except KeyError:
io_watcher = self._watchers.io(self, fd, self._watchers.io.EVENT_MASK)
io_watchers[fd] = io_watcher
return io_watcher.multiplex(events)
# pylint: disable=too-many-lines, protected-access, redefined-outer-name, not-callable
from __future__ import absolute_import, print_function
import weakref
import gevent.libuv._corecffi as _corecffi # pylint:disable=no-name-in-module,import-error
ffi = _corecffi.ffi # pylint:disable=no-member
......@@ -13,6 +15,7 @@ _closing_handles = set()
@ffi.callback("void(*)(uv_handle_t*)")
def _uv_close_callback(handle):
#print("Handle callback", handle)
_closing_handles.remove(handle)
......@@ -41,10 +44,10 @@ class watcher(_base.watcher):
# Sadly, doing this causes crashes if there were multiple
# watchers for a given FD. See https://github.com/gevent/gevent/issues/790#issuecomment-208076604
#print("Del", ffi.cast('void*', self._watcher), 'started', libuv.uv_is_active(self._watcher), type(self), id(self))
#if hasattr(self, '_fd'):
# print("FD", self._fd)
if not libuv.uv_is_closing(self._watcher):
print("Closing handle", self._watcher)
self._watcher.data = ffi.NULL
_closing_handles.add(self._watcher)
libuv.uv_close(self._watcher, _uv_close_callback)
......@@ -100,6 +103,8 @@ class io(_base.IoMixin, watcher):
EVENT_MASK = libuv.UV_READABLE | libuv.UV_WRITABLE
_multiplex_watchers = None
def __init__(self, loop, fd, events, ref=True, priority=None):
super(io, self).__init__(loop, fd, events, ref=ref, priority=priority, _args=(fd,))
self._fd = fd
......@@ -123,6 +128,82 @@ class io(_base.IoMixin, watcher):
def _watcher_ffi_start(self):
self._watcher_start(self._watcher, self._events, self._watcher_callback)
class _multiplexwatcher(object):
callback = None
args = ()
pass_events = False
events = 0
# XXX: Can't handle ref counting like libev does
# when we multiplex.
ref = True
def __init__(self, events, watcher):
self.events = events
self._watcher_ref = weakref.ref(watcher)
def start(self, callback, *args, **kwargs):
self.pass_events = kwargs.get("pass_events")
self.callback = callback
self.args = args
watcher = self._watcher_ref()
if watcher is not None and not watcher.active:
watcher._io_start()
def stop(self):
self.callback = None
self.pass_events = None
self.args = None
watcher = self._watcher_ref()
watcher._io_maybe_stop()
@property
def active(self):
return self.callback is not None
def _io_maybe_stop(self):
for r in self._multiplex_watchers:
w = r()
if w is None:
continue
if w.callback is not None:
return
# If we get here, nothing was started
# so we can take ourself out of the polling set
self.stop()
def _io_start(self):
self.start(self._io_callback, pass_events=True)
def multiplex(self, events):
if not self._multiplex_watchers:
self._multiplex_watchers = []
watcher = self._multiplexwatcher(events, self)
watcher_ref = weakref.ref(watcher, self._multiplex_watchers.remove)
self._multiplex_watchers.append(watcher_ref)
return watcher
def _io_callback(self, events):
if events < 0:
# actually a status error code
print("Callback error",
ffi.string(libuv.uv_err_name(events)),
ffi.string(libuv.uv_strerror(events)))
return
for watcher_ref in self._multiplex_watchers:
watcher = watcher_ref()
if not watcher or not watcher.callback:
continue
if events & watcher.events:
if not watcher.pass_events:
watcher.callback(*watcher.args)
else:
watcher.callback(events, *watcher.args)
class fork(_base.ForkMixin):
# We'll have to implement this one completely manually
......@@ -184,6 +265,7 @@ class child(_base.ChildMixin, watcher):
self._rstatus = status
self._async.send()
class async(_base.AsyncMixin, watcher):
def _watcher_ffi_init(self, args):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment