Commit ad0f8c7c authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1061 from gevent/close_io_watcher

Explicitly close IO watchers 
parents fa3b489f b4cb1bcf
......@@ -12,6 +12,7 @@ env:
# first group of parallel runs (4) as posible
- TASK=test-py27-libuv
- TASK=test-py36-libuv
- TASK=test-pypy-libuv
- TASK=test-py27-noembed
- TASK=test-pypy
- TASK=test-py36
......
......@@ -187,9 +187,11 @@ test-py36-libuv: $(PY36)
GEVENT_CORE_CFFI_ONLY=libuv make test-py36
test-pypy: $(PYPY)
ls $(BUILD_RUNTIMES)/versions/pypy590/bin/
PYTHON=$(PYPY) PATH=$(BUILD_RUNTIMES)/versions/pypy590/bin:$(PATH) make develop toxtest
test-pypy-libuv: $(PYPY)
GEVENT_CORE_CFFI_ONLY=libuv make test-pypy
test-pypy3: $(PYPY3)
PYTHON=$(PYPY3) PATH=$(BUILD_RUNTIMES)/versions/pypy3.5_590/bin:$(PATH) make develop toxtest
......
......@@ -10,8 +10,10 @@ environment:
# Pre-installed Python versions, which Appveyor may upgrade to
# a later point release.
# We're not quite ready for PyPy+libuv, it doesn't even
# work correctly on Posix.
# We're not quite ready for PyPy+libuv.
# It works correctly on POSIX (linux and darwin),
# but has some strange errors and many timeouts on Windows.
# Most recent build: https://ci.appveyor.com/project/denik/gevent/build/1.0.1174/job/cv63181yj3ebb9cs
# - PYTHON: "C:\\pypy2-v5.9.0-win32"
# PYTHON_ID: "pypy"
# PYTHON_EXE: pypy
......
......@@ -144,7 +144,7 @@ class _Callbacks(object):
# The normal, expected scenario when we find the watcher still
# in the keepaliveset is that it is still active at the event loop
# level, so we don't expect that python_stop gets called.
_dbg("The watcher has not stopped itself, possibly still active", the_watcher)
#_dbg("The watcher has not stopped itself, possibly still active", the_watcher)
return 1
return 2 # it stopped itself
......@@ -296,10 +296,11 @@ class AbstractLoop(object):
@classmethod
def __make_watcher_ref_callback(cls, typ, active_watchers, ffi_watcher):
def __make_watcher_ref_callback(cls, typ, active_watchers, ffi_watcher, debug):
# separate method to make sure we have no ref to the watcher
def callback(_):
active_watchers.pop(ffi_watcher)
_dbg("Python weakref callback closing", debug)
typ._watcher_ffi_close(ffi_watcher)
return callback
......@@ -309,7 +310,8 @@ class AbstractLoop(object):
self.__make_watcher_ref_callback(
type(python_watcher),
self._active_watchers,
ffi_watcher))
ffi_watcher,
repr(python_watcher)))
def _init_loop_and_aux_watchers(self, flags=None, default=None):
......
......@@ -176,21 +176,14 @@ class watcher(object):
_handle = None # FFI object to self. This is a GC cycle. See _watcher_create
_watcher = None
# Do we create the native resources when this class is created?
# If so, we call _watcher_full_init from the constructor.
# Otherwise, it must be called before we are started.
# If a subclass sets this to false, they must make that call.
# Currently unused. Experimental functionality for libuv.
_watcher_init_on_init = True
_watcher_registers_with_loop_on_create = True
def __init__(self, _loop, ref=True, priority=None, args=_NOARGS):
self.loop = _loop
self.__init_priority = priority
self.__init_args = args
self.__init_ref = ref
if self._watcher_init_on_init:
self._watcher_full_init()
self._watcher_full_init()
def _watcher_full_init(self):
priority = self.__init_priority
......@@ -226,8 +219,13 @@ class watcher(object):
self._watcher = self._watcher_new()
# This call takes care of calling _watcher_ffi_close when
# self goes away, making sure self._watcher stays alive
# that long
self.loop._register_watcher(self, self._watcher)
# that long.
# XXX: All watchers should go to a model like libuv's
# IO watcher that gets explicitly closed so that we can always
# have control over when this gets done.
if self._watcher_registers_with_loop_on_create:
self.loop._register_watcher(self, self._watcher)
self._watcher.data = self._handle
......@@ -401,6 +399,7 @@ class IoMixin(object):
raise ValueError('fd must be non-negative: %r' % fd)
if events & ~self.EVENT_MASK:
raise ValueError('illegal event mask: %r' % events)
self._fd = fd
super(IoMixin, self).__init__(loop, ref=ref, priority=priority,
args=_args or (fd, events))
......@@ -410,6 +409,11 @@ class IoMixin(object):
args = (GEVENT_CORE_EVENTS, ) + args
super(IoMixin, self).start(callback, *args)
def close(self):
pass
def _format(self):
return ' fd=%d' % self._fd
class TimerMixin(object):
_watcher_type = 'timer'
......
......@@ -208,9 +208,11 @@ class socket(object):
if self._read_event is not None:
self.hub.cancel_wait(self._read_event, cancel_wait_ex)
self._read_event.close()
self._read_event = None
if self._write_event is not None:
self.hub.cancel_wait(self._write_event, cancel_wait_ex)
self._write_event.close()
self._write_event = None
s = self._sock
self._sock = _closedsocket()
......
......@@ -252,9 +252,11 @@ class socket(object):
if self._read_event is not None:
self.hub.cancel_wait(self._read_event, cancel_wait_ex)
self._read_event.close()
self._read_event = None
if self._write_event is not None:
self.hub.cancel_wait(self._write_event, cancel_wait_ex)
self._write_event.close()
self._write_event = None
_ss.close(self._sock)
......
......@@ -179,7 +179,10 @@ def wait_read(fileno, timeout=None, timeout_exc=_NONE):
.. seealso:: :func:`cancel_wait`
"""
io = get_hub().loop.io(fileno, 1)
return wait(io, timeout, timeout_exc)
try:
return wait(io, timeout, timeout_exc)
finally:
io.close()
def wait_write(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
......@@ -196,7 +199,10 @@ def wait_write(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
"""
# pylint:disable=unused-argument
io = get_hub().loop.io(fileno, 2)
return wait(io, timeout, timeout_exc)
try:
return wait(io, timeout, timeout_exc)
finally:
io.close()
def wait_readwrite(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
......@@ -214,7 +220,10 @@ def wait_readwrite(fileno, timeout=None, timeout_exc=_NONE, event=_NONE):
"""
# pylint:disable=unused-argument
io = get_hub().loop.io(fileno, 3)
return wait(io, timeout, timeout_exc)
try:
return wait(io, timeout, timeout_exc)
finally:
io.close()
#: The exception raised by default on a call to :func:`cancel_wait`
class cancel_wait_ex(error): # pylint: disable=undefined-variable
......
......@@ -381,6 +381,7 @@ cdef public class channel [object PyGeventAresChannelObject, type PyGeventAresCh
watcher.events = events
else:
watcher.stop()
watcher.close()
self._watchers.pop(socket, None)
if not self._watchers:
self._timer.stop()
......
......@@ -808,6 +808,9 @@ cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]:
PENDING
def close(self):
pass
#ifdef _WIN32
def __init__(self, loop loop, libev.vfd_socket_t fd, int events, ref=True, priority=None):
......
......@@ -41,7 +41,9 @@ enum uv_poll_event {
UV_READABLE = 1,
UV_WRITABLE = 2,
/* new in 1.9 */
UV_DISCONNECT = 4
UV_DISCONNECT = 4,
/* new in 1.14.0 */
UV_PRIORITIZED = 8,
};
enum uv_fs_event {
......
......@@ -7,8 +7,9 @@ from __future__ import absolute_import, print_function
import os
from collections import defaultdict
from collections import namedtuple
from operator import delitem
import signal
from weakref import WeakValueDictionary
from gevent._compat import PYPY
from gevent._ffi.loop import AbstractLoop
......@@ -81,7 +82,7 @@ class loop(AbstractLoop):
AbstractLoop.__init__(self, ffi, libuv, _watchers, flags, default)
self.__loop_pid = os.getpid()
self._child_watchers = defaultdict(list)
self._io_watchers = WeakValueDictionary()
self._io_watchers = dict()
self._fork_watchers = set()
self._pid = os.getpid()
......@@ -351,20 +352,20 @@ class loop(AbstractLoop):
@gcBefore
def io(self, fd, events, ref=True, priority=None):
# We don't keep a hard ref to the root object;
# the caller must keep the multiplexed watcher
# alive as long as its in use.
# We go to great pains to avoid GC cycles here, otherwise
# CPython tests (e.g., test_asyncore) fail on Windows.
# For PyPy, though, avoiding cycles isn't enough and we must
# do a GC to force cleaning up old objects.
# We rely on hard references here and explicit calls to
# close() on the returned object to correctly manage
# the watcher lifetimes.
io_watchers = self._io_watchers
try:
io_watcher = io_watchers[fd]
assert io_watcher._multiplex_watchers, ("IO Watcher %s unclosed but should be dead" % io_watcher)
except KeyError:
io_watcher = self._watchers.io(self, fd, self._watchers.io.EVENT_MASK)
# Start the watcher with just the events that we're interested in.
# as multiplexers are added, the real event mask will be updated to keep in sync.
# If we watch for too much, we get spurious wakeups and busy loops.
io_watcher = self._watchers.io(self, fd, 0)
io_watchers[fd] = io_watcher
io_watcher._no_more_watchers = lambda: delitem(io_watchers, fd)
return io_watcher.multiplex(events)
......@@ -4,7 +4,6 @@ from __future__ import absolute_import, print_function
import functools
import sys
import weakref
import gevent.libuv._corecffi as _corecffi # pylint:disable=no-name-in-module,import-error
......@@ -208,41 +207,29 @@ class io(_base.IoMixin, watcher):
# a native watcher until the object is started, and we shut it down
# when the object is stopped.
# XXX: I was able to solve at least Windows test_ftplib.py issues with more of a
# careful use of io objects in socket.py, so delaying this entirely is at least
# temporarily on hold. Instead sticking with the _watcher_create
# function override for the moment.
# XXX: I was able to solve at least Windows test_ftplib.py issues
# with more of a careful use of io objects in socket.py, so
# delaying this entirely is at least temporarily on hold. Instead
# sticking with the _watcher_create function override for the
# moment.
#_watcher_init_on_init = False
# XXX: Note 2: Moving to a deterministic close model, which was necessary
# for PyPy, also seems to solve the Windows issues. So we're completely taking
# this object out of the loop's registration; we don't want GC callbacks and
# uv_close anywhere *near* this object.
_watcher_registers_with_loop_on_create = False
EVENT_MASK = libuv.UV_READABLE | libuv.UV_WRITABLE | libuv.UV_DISCONNECT
_multiplex_watchers = ()
def __init__(self, loop, fd, events, ref=True, priority=None):
super(io, self).__init__(loop, fd, events, ref=ref, priority=priority, _args=(fd,))
self._fd = fd
self._events = events
self._multiplex_watchers = []
def _watcher_create(self, ref):
super(io, self)._watcher_create(ref)
# Immediately break the GC cycle. We restore the cycle before
# we are started and break it again when we are stopped.
# On Windows is critical to be able to garbage collect these
# objects in a timely fashion so that they don't get reused
# for multiplexing completely different sockets. This is because
# uv_poll_init_socket does a lot of setup for the socket to make
# polling work. If get reused for another socket that has the same
# fileno, things break badly. (In theory this could be a problem
# on posix too, but in practice it isn't).
# TODO: We should probably generalize this to all
# ffi watchers. Avoiding GC cycles as much as possible
# is a good thing, and potentially allocating new handles
# as needed gets us better memory locality.
self._handle = None
self._watcher.data = ffi.NULL
def _get_fd(self):
return self._fd
......@@ -254,35 +241,21 @@ class io(_base.IoMixin, watcher):
def _get_events(self):
return self._events
@_base.not_while_active
def _set_events(self, events):
if events == self._events:
return
_dbg("Changing event mask for", self, "from", self._events, "to", events)
self._events = events
# This is what we'd do if we set _watcher_init_on_init to False:
# def start(self, *args, **kwargs):
# assert self._watcher is None
# self._watcher_full_init()
# super(io, self).start(*args, **kwargs)
# Along with disposing of self._watcher in _watcher_ffi_stop.
# In that method, it's possible that we might be started again right after this,
# in which case we will create a new set of FFI objects.
# TODO: Does anything leak in that case? Verify...
if self.active:
# We're running but libuv specifically says we can
# call start again to change our event mask.
assert self._handle is not None
self._watcher_start(self._watcher, self._events, self._watcher_callback)
def _watcher_ffi_start(self):
assert self._handle is None
self._handle = self._watcher.data = type(self).new_handle(self)
_dbg("Starting watcher", self, "with events", self._events)
self._watcher_start(self._watcher, self._events, self._watcher_callback)
def _watcher_ffi_stop(self):
# We may or may not have been started yet, so
# we may or may not have a handle; either way,
# drop it.
_dbg("Stopping io watcher", self)
self._handle = None
self._watcher.data = ffi.NULL
super(io, self)._watcher_ffi_stop()
if sys.platform.startswith('win32'):
# uv_poll can only handle sockets on Windows, but the plain
# uv_poll_init we call on POSIX assumes that the fileno
......@@ -320,19 +293,22 @@ class io(_base.IoMixin, watcher):
ref = True
def __init__(self, events, watcher):
self.events = events
self._events = events
# References:
# These objects keep the original IO object alive;
# These objects must keep the original IO object alive;
# the IO object SHOULD NOT keep these alive to avoid cycles
# When they all go away, the original IO object can go
# away. *Hopefully* that means that the FD they were opened for
# has also gone away.
# We MUST NOT rely on GC to clean up the IO objects, but the explicit
# calls to close(); see _multiplex_closed.
self._watcher_ref = watcher
events = property(
lambda self: self._events,
_base.not_while_active(lambda self, nv: setattr(self, '_events', nv)))
def start(self, callback, *args, **kwargs):
_dbg("Starting IO multiplex watcher for", self.fd,
"callback", callback,
"callback", callback, "events", self.events,
"owner", self._watcher_ref)
self.pass_events = kwargs.get("pass_events")
self.callback = callback
......@@ -344,13 +320,19 @@ class io(_base.IoMixin, watcher):
def stop(self):
_dbg("Stopping IO multiplex watcher for", self.fd,
"callback", self.callback,
"callback", self.callback, "events", self.events,
"owner", self._watcher_ref)
self.callback = None
self.pass_events = None
self.args = None
watcher = self._watcher_ref
watcher._io_maybe_stop()
if watcher is not None:
watcher._io_maybe_stop()
def close(self):
if self._watcher_ref is not None:
self._watcher_ref._multiplex_closed(self)
self._watcher_ref = None
@property
def active(self):
......@@ -363,13 +345,12 @@ class io(_base.IoMixin, watcher):
# ares.pyx depends on this property,
# and test__core uses it too
fd = property(lambda self: self._watcher_ref._fd,
fd = property(lambda self: getattr(self._watcher_ref, '_fd', -1),
lambda self, nv: self._watcher_ref._set_fd(nv))
def _io_maybe_stop(self):
for r in self._multiplex_watchers:
w = r()
if w is not None and w.callback is not None:
for w in self._multiplex_watchers:
if w.callback is not None:
# There's still a reference to it, and it's started,
# so we can't stop.
return
......@@ -378,20 +359,51 @@ class io(_base.IoMixin, watcher):
self.stop()
def _io_start(self):
_dbg("IO start on behalf of multiplex", self)
_dbg("IO start on behalf of multiplex", self, "fd", self._fd, "events", self._events)
self.start(self._io_callback, pass_events=True)
def multiplex(self, events):
if not self._multiplex_watchers:
self._multiplex_watchers = []
def _calc_and_update_events(self):
events = 0
for watcher in self._multiplex_watchers:
events |= watcher.events
self._set_events(events)
watcher = self._multiplexwatcher(events, self)
watcher_ref = weakref.ref(watcher, self._multiplex_watchers.remove)
# We must not keep a hard ref to the returned object.
self._multiplex_watchers.append(watcher_ref)
def multiplex(self, events):
watcher = self._multiplexwatcher(events, self)
self._multiplex_watchers.append(watcher)
self._calc_and_update_events()
return watcher
def _multiplex_closed(self, watcher):
self._multiplex_watchers.remove(watcher)
if not self._multiplex_watchers:
_dbg("IO Watcher", self, "has no more multiplexes")
self.stop() # should already be stopped
self._no_more_watchers()
# It is absolutely critical that we control when the call
# to uv_close() gets made. uv_close() of a uv_poll_t
# handle winds up calling uv__platform_invalidate_fd,
# which, as the name implies, destroys any outstanding
# events for the *fd* that haven't been delivered yet, and also removes
# the *fd* from the poll set. So if this happens later, at some
# non-deterministic time when (cyclic or otherwise) GC runs,
# *and* we've opened a new watcher for the fd, that watcher will
# suddenly and mysteriously stop seeing events. So we do this now;
# this method is smart enough not to close the handle twice.
self._watcher_ffi_close(self._watcher)
self._watcher = None
del self._multiplex_watchers
else:
self._calc_and_update_events()
_dbg("IO Watcher", self, "has remaining multiplex:",
self._multiplex_watchers)
def _no_more_watchers(self):
# The loop sets this on an individual watcher to delete it from
# the active list where it keeps hard references.
pass
def _io_callback(self, events):
if events < 0:
# actually a status error code
......@@ -412,13 +424,14 @@ class io(_base.IoMixin, watcher):
# See test__makefile_ref.TestSSL for examples.
# return
#_dbg("Callback event for watcher", self._fd, "event", events)
for watcher_ref in self._multiplex_watchers:
watcher = watcher_ref()
if not watcher or not watcher.callback:
_dbg("Callback event for watcher", self._fd, "event", events)
for watcher in self._multiplex_watchers:
if not watcher.callback:
# Stopped
_dbg("Watcher", self, "has stopped multiplex", watcher)
continue
#_dbg("Event for watcher", self._fd, events, watcher.events, events & watcher.events)
assert watcher._watcher_ref is self, (self, watcher._watcher_ref)
_dbg("Event for watcher", self._fd, events, watcher.events, events & watcher.events)
send_event = (events & watcher.events) or events < 0
if send_event:
......
......@@ -90,7 +90,10 @@ if fcntl:
hub, event = None, None
while True:
try:
return _read(fd, n)
result = _read(fd, n)
if event is not None:
event.close()
return result
except OSError as e:
if e.errno not in ignored_errors:
raise
......@@ -101,6 +104,7 @@ if fcntl:
event = hub.loop.io(fd, 1)
hub.wait(event)
def nb_write(fd, buf):
"""Write bytes from buffer `buf` to file descriptor `fd`. Return the
number of bytes written.
......@@ -110,7 +114,10 @@ if fcntl:
hub, event = None, None
while True:
try:
return _write(fd, buf)
result = _write(fd, buf)
if event is not None:
event.close()
return result
except OSError as e:
if e.errno not in ignored_errors:
raise
......
......@@ -101,6 +101,7 @@ class SelectResult(object):
def _closeall(self, watchers):
for watcher in watchers:
watcher.stop()
watcher.close()
del watchers[:]
def select(self, rlist, wlist, timeout):
......@@ -208,6 +209,9 @@ if original_poll is not None:
# that. Should we raise an error?
fileno = get_fileno(fd)
if fileno in self.fds:
self.fds[fileno].close()
watcher = self.loop.io(fileno, flags)
watcher.priority = self.loop.MAXPRI
self.fds[fileno] = watcher
......@@ -243,6 +247,8 @@ if original_poll is not None:
library. Previously gevent did nothing.
"""
fileno = get_fileno(fd)
io = self.fds[fileno]
io.close()
del self.fds[fileno]
del original_poll
......@@ -390,7 +390,7 @@ if (PY3 and PYPY) or (PYPY and WIN and LIBUV):
# pypy3 is very slow right now,
# as is PyPy2 on windows (which only has libuv)
CI_TIMEOUT = 15
if PYPY and WIN and LIBUV:
if PYPY and LIBUV:
# slow and flaky timeouts
LOCAL_TIMEOUT = CI_TIMEOUT
else:
......
......@@ -158,6 +158,14 @@ disabled_tests = [
'test_subprocess.ProcessTestCase.test_zombie_fast_process_del',
# relies on subprocess._active which we don't use
# Very slow, tries to open lots and lots of subprocess and files,
# tends to timeout on CI.
'test_subprocess.ProcessTestCase.test_no_leaking',
# This test is also very slow, and has been timing out on Travis
# since November of 2016 on Python 3, but now also seen on Python 2/Pypy.
'test_subprocess.ProcessTestCase.test_leaking_fds_on_error',
'test_ssl.ThreadedTests.test_default_ciphers',
'test_ssl.ThreadedTests.test_empty_cert',
'test_ssl.ThreadedTests.test_malformed_cert',
......@@ -558,8 +566,6 @@ if sys.version_info[0] == 3:
'test_subprocess.ProcessTestCaseNoPoll.test_cwd_with_relative_arg',
'test_subprocess.ProcessTestCase.test_cwd_with_relative_executable',
# This test tends to timeout, starting at the end of November 2016
'test_subprocess.ProcessTestCase.test_leaking_fds_on_error',
]
wrapped_tests.update({
......
......@@ -22,7 +22,7 @@ class TestWatchers(unittest.TestCase):
def test_io(self):
if sys.platform == 'win32':
# libev raises IOError, libuv raises ValueError
Error = (IOError,ValueError)
Error = (IOError, ValueError)
win32 = True
else:
Error = ValueError
......@@ -47,6 +47,7 @@ class TestWatchers(unittest.TestCase):
self.assertEqual(core._events_to_str(io.events), 'WRITE|_IOFDSET')
else:
self.assertEqual(core._events_to_str(io.events), 'WRITE')
io.close()
def test_timer_constructor(self):
with self.assertRaises(ValueError):
......
......@@ -73,10 +73,10 @@ class Test(greentest.TestCase):
loop = core.loop(default=False)
# Watchers aren't reused once all outstanding
# refs go away
# refs go away BUT THEY MUST BE CLOSED
tty_watcher = loop.io(1, core.WRITE)
watcher_handle = tty_watcher._watcher if IS_CFFI else tty_watcher
tty_watcher.close()
del tty_watcher
# XXX: Note there is a cycle in the CFFI code
# from watcher_handle._handle -> watcher_handle.
......@@ -86,7 +86,7 @@ class Test(greentest.TestCase):
tty_watcher = loop.io(1, core.WRITE)
self.assertIsNot(tty_watcher._watcher if IS_CFFI else tty_watcher, watcher_handle)
tty_watcher.close()
loop.destroy()
def reset(watcher, lst):
......
......@@ -67,7 +67,7 @@ if hasattr(os, 'make_nonblocking'):
class TestOS_nb(TestOS_tp):
def pipe(self):
r, w = pipe()
r, w = super(TestOS_nb, self).pipe()
os.make_nonblocking(r)
os.make_nonblocking(w)
return r, w
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment