Commit 8eb62985 authored by Jason Madden's avatar Jason Madden

Checkpoint on making io watchers deterministic

By closing them when we're done, we can remove reliance on GC to clean
up multiplexed watchers and solve problems for PyPy and Windows (and
probably eventually simplify the handle cleanup we were doing.)

This makes many more tests on PyPy pass on Darwin; down from a score
of failures to about 4, but a baffling 4. test__os:TestOS_nb times
out; investigation shows that both the read and write sides of the
pipe are waiting on events, both are in libuv, and the write side
should be writable, but it's not getting an event. Not clear why this
is. I also see it with CPython 3.6, and I don't think I did before.
parent fa3b489f
......@@ -144,7 +144,7 @@ class _Callbacks(object):
# The normal, expected scenario when we find the watcher still
# in the keepaliveset is that it is still active at the event loop
# level, so we don't expect that python_stop gets called.
_dbg("The watcher has not stopped itself, possibly still active", the_watcher)
#_dbg("The watcher has not stopped itself, possibly still active", the_watcher)
return 1
return 2 # it stopped itself
......
......@@ -410,6 +410,9 @@ class IoMixin(object):
args = (GEVENT_CORE_EVENTS, ) + args
super(IoMixin, self).start(callback, *args)
def close(self):
pass
class TimerMixin(object):
_watcher_type = 'timer'
......
......@@ -208,9 +208,11 @@ class socket(object):
if self._read_event is not None:
self.hub.cancel_wait(self._read_event, cancel_wait_ex)
self._read_event.close()
self._read_event = None
if self._write_event is not None:
self.hub.cancel_wait(self._write_event, cancel_wait_ex)
self._write_event.close()
self._write_event = None
s = self._sock
self._sock = _closedsocket()
......
......@@ -252,9 +252,11 @@ class socket(object):
if self._read_event is not None:
self.hub.cancel_wait(self._read_event, cancel_wait_ex)
self._read_event.close()
self._read_event = None
if self._write_event is not None:
self.hub.cancel_wait(self._write_event, cancel_wait_ex)
self._write_event.close()
self._write_event = None
_ss.close(self._sock)
......
......@@ -179,7 +179,10 @@ def wait_read(fileno, timeout=None, timeout_exc=_NONE):
.. seealso:: :func:`cancel_wait`
"""
io = get_hub().loop.io(fileno, 1)
return wait(io, timeout, timeout_exc)
try:
return wait(io, timeout, timeout_exc)
finally:
io.close()
def wait_write(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
......@@ -196,7 +199,10 @@ def wait_write(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
"""
# pylint:disable=unused-argument
io = get_hub().loop.io(fileno, 2)
return wait(io, timeout, timeout_exc)
try:
return wait(io, timeout, timeout_exc)
finally:
io.close()
def wait_readwrite(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
......@@ -214,7 +220,10 @@ def wait_readwrite(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
"""
# pylint:disable=unused-argument
io = get_hub().loop.io(fileno, 3)
return wait(io, timeout, timeout_exc)
try:
return wait(io, timeout, timeout_exc)
finally:
io.close()
#: The exception raised by default on a call to :func:`cancel_wait`
class cancel_wait_ex(error): # pylint: disable=undefined-variable
......
......@@ -381,6 +381,7 @@ cdef public class channel [object PyGeventAresChannelObject, type PyGeventAresCh
watcher.events = events
else:
watcher.stop()
watcher.close()
self._watchers.pop(socket, None)
if not self._watchers:
self._timer.stop()
......
......@@ -808,6 +808,9 @@ cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]:
PENDING
def close(self):
pass
#ifdef _WIN32
def __init__(self, loop loop, libev.vfd_socket_t fd, int events, ref=True, priority=None):
......
......@@ -41,7 +41,9 @@ enum uv_poll_event {
UV_READABLE = 1,
UV_WRITABLE = 2,
/* new in 1.9 */
UV_DISCONNECT = 4
UV_DISCONNECT = 4,
/* new in 1.14.0 */
UV_PRIORITIZED = 8,
};
enum uv_fs_event {
......
......@@ -7,8 +7,9 @@ from __future__ import absolute_import, print_function
import os
from collections import defaultdict
from collections import namedtuple
from operator import delitem
import signal
from weakref import WeakValueDictionary
from gevent._compat import PYPY
from gevent._ffi.loop import AbstractLoop
......@@ -81,7 +82,7 @@ class loop(AbstractLoop):
AbstractLoop.__init__(self, ffi, libuv, _watchers, flags, default)
self.__loop_pid = os.getpid()
self._child_watchers = defaultdict(list)
self._io_watchers = WeakValueDictionary()
self._io_watchers = dict()
self._fork_watchers = set()
self._pid = os.getpid()
......@@ -363,8 +364,13 @@ class loop(AbstractLoop):
io_watchers = self._io_watchers
try:
io_watcher = io_watchers[fd]
assert io_watcher._multiplex_watchers, ("IO Watcher %s unclosed but should be dead" % io_watcher)
except KeyError:
io_watcher = self._watchers.io(self, fd, self._watchers.io.EVENT_MASK)
# Start the watcher with just the events that we're interested in.
# as multiplexers are added, the real event mask will be updated to keep in sync.
# If we watch for too much, we get spurious wakeups and busy loops.
io_watcher = self._watchers.io(self, fd, 0)
io_watchers[fd] = io_watcher
io_watcher._no_more_watchers = lambda: delitem(io_watchers, fd)
return io_watcher.multiplex(events)
......@@ -217,6 +217,8 @@ class io(_base.IoMixin, watcher):
EVENT_MASK = libuv.UV_READABLE | libuv.UV_WRITABLE | libuv.UV_DISCONNECT
_multiplex_watchers = ()
def __init__(self, loop, fd, events, ref=True, priority=None):
super(io, self).__init__(loop, fd, events, ref=ref, priority=priority, _args=(fd,))
self._fd = fd
......@@ -254,9 +256,16 @@ class io(_base.IoMixin, watcher):
def _get_events(self):
return self._events
@_base.not_while_active
def _set_events(self, events):
if events == self._events:
return
_dbg("Changing event mask for", self, "from", self._events, "to", events)
self._events = events
if self.active:
# We're running but libuv specifically says we can
# call start again to change our event mask.
assert self._handle is not None
self._watcher_start(self._watcher, self._events, self._watcher_callback)
# This is what we'd do if we set _watcher_init_on_init to False:
# def start(self, *args, **kwargs):
......@@ -272,6 +281,7 @@ class io(_base.IoMixin, watcher):
def _watcher_ffi_start(self):
assert self._handle is None
self._handle = self._watcher.data = type(self).new_handle(self)
_dbg("Starting watcher", self, "with events", self._events)
self._watcher_start(self._watcher, self._events, self._watcher_callback)
def _watcher_ffi_stop(self):
......@@ -320,7 +330,7 @@ class io(_base.IoMixin, watcher):
ref = True
def __init__(self, events, watcher):
self.events = events
self._events = events
# References:
# These objects keep the original IO object alive;
......@@ -330,9 +340,13 @@ class io(_base.IoMixin, watcher):
# has also gone away.
self._watcher_ref = watcher
@property
def events(self):
return self._events
def start(self, callback, *args, **kwargs):
_dbg("Starting IO multiplex watcher for", self.fd,
"callback", callback,
"callback", callback, "events", self.events,
"owner", self._watcher_ref)
self.pass_events = kwargs.get("pass_events")
self.callback = callback
......@@ -344,7 +358,7 @@ class io(_base.IoMixin, watcher):
def stop(self):
_dbg("Stopping IO multiplex watcher for", self.fd,
"callback", self.callback,
"callback", self.callback, "events", self.events,
"owner", self._watcher_ref)
self.callback = None
self.pass_events = None
......@@ -352,6 +366,11 @@ class io(_base.IoMixin, watcher):
watcher = self._watcher_ref
watcher._io_maybe_stop()
def close(self):
if self._watcher_ref is not None:
self._watcher_ref._multiplex_closed(self)
self._watcher_ref = None
@property
def active(self):
return self.callback is not None
......@@ -367,9 +386,8 @@ class io(_base.IoMixin, watcher):
lambda self, nv: self._watcher_ref._set_fd(nv))
def _io_maybe_stop(self):
for r in self._multiplex_watchers:
w = r()
if w is not None and w.callback is not None:
for w in self._multiplex_watchers:
if w.callback is not None:
# There's still a reference to it, and it's started,
# so we can't stop.
return
......@@ -378,20 +396,41 @@ class io(_base.IoMixin, watcher):
self.stop()
def _io_start(self):
_dbg("IO start on behalf of multiplex", self)
_dbg("IO start on behalf of multiplex", self, "fd", self._fd, "events", self._events)
self.start(self._io_callback, pass_events=True)
def multiplex(self, events):
if not self._multiplex_watchers:
self._multiplex_watchers = []
def _calc_and_update_events(self):
events = 0
for watcher in self._multiplex_watchers:
events |= watcher.events
self._set_events(events)
def multiplex(self, events):
watcher = self._multiplexwatcher(events, self)
watcher_ref = weakref.ref(watcher, self._multiplex_watchers.remove)
#watcher_ref = weakref.ref(watcher, self._multiplex_watchers.remove)
# We must not keep a hard ref to the returned object.
self._multiplex_watchers.append(watcher_ref)
#self._multiplex_watchers.append(watcher_ref)
self._multiplex_watchers.append(watcher)
self._calc_and_update_events()
return watcher
def _multiplex_closed(self, watcher):
self._multiplex_watchers.remove(watcher)
if not self._multiplex_watchers:
_dbg("IO Watcher", self, "has no more multiplexes")
self.stop() # should already be stopped
self._no_more_watchers()
del self._multiplex_watchers
else:
self._calc_and_update_events()
_dbg("IO Watcher", self, "has remaining multiplex:",
self._multiplex_watchers)
def _no_more_watchers(self):
pass
def _io_callback(self, events):
if events < 0:
# actually a status error code
......@@ -412,13 +451,14 @@ class io(_base.IoMixin, watcher):
# See test__makefile_ref.TestSSL for examples.
# return
#_dbg("Callback event for watcher", self._fd, "event", events)
for watcher_ref in self._multiplex_watchers:
watcher = watcher_ref()
if not watcher or not watcher.callback:
_dbg("Callback event for watcher", self._fd, "event", events)
for watcher in self._multiplex_watchers:
if not watcher.callback:
# Stopped
_dbg("Watcher", self, "has stopped multiplex", watcher)
continue
#_dbg("Event for watcher", self._fd, events, watcher.events, events & watcher.events)
assert watcher._watcher_ref is self, (self, watcher._watcher_ref)
_dbg("Event for watcher", self._fd, events, watcher.events, events & watcher.events)
send_event = (events & watcher.events) or events < 0
if send_event:
......
......@@ -90,7 +90,10 @@ if fcntl:
hub, event = None, None
while True:
try:
return _read(fd, n)
result = _read(fd, n)
if event is not None:
event.close()
return result
except OSError as e:
if e.errno not in ignored_errors:
raise
......@@ -101,6 +104,7 @@ if fcntl:
event = hub.loop.io(fd, 1)
hub.wait(event)
def nb_write(fd, buf):
"""Write bytes from buffer `buf` to file descriptor `fd`. Return the
number of bytes written.
......@@ -110,7 +114,10 @@ if fcntl:
hub, event = None, None
while True:
try:
return _write(fd, buf)
result = _write(fd, buf)
if event is not None:
event.close()
return result
except OSError as e:
if e.errno not in ignored_errors:
raise
......
......@@ -101,6 +101,7 @@ class SelectResult(object):
def _closeall(self, watchers):
for watcher in watchers:
watcher.stop()
watcher.close()
del watchers[:]
def select(self, rlist, wlist, timeout):
......@@ -208,6 +209,9 @@ if original_poll is not None:
# that. Should we raise an error?
fileno = get_fileno(fd)
if fileno in self.fds:
self.fds[fileno].close()
watcher = self.loop.io(fileno, flags)
watcher.priority = self.loop.MAXPRI
self.fds[fileno] = watcher
......@@ -243,6 +247,8 @@ if original_poll is not None:
library. Previously gevent did nothing.
"""
fileno = get_fileno(fd)
io = self.fds[fileno]
io.close()
del self.fds[fileno]
del original_poll
......@@ -390,7 +390,7 @@ if (PY3 and PYPY) or (PYPY and WIN and LIBUV):
# pypy3 is very slow right now,
# as is PyPy2 on windows (which only has libuv)
CI_TIMEOUT = 15
if PYPY and WIN and LIBUV:
if PYPY and LIBUV:
# slow and flaky timeouts
LOCAL_TIMEOUT = CI_TIMEOUT
else:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment