Commit b43f8686 authored by Jason Madden's avatar Jason Madden

I solved the test_ftplib.py windows failures locally by more careful handling...

I solved the test_ftplib.py windows failures locally by more careful handling of lifetimes of io watchers: they should be disposed when the fileno is. I investigated a more drastic approach using new watchers entirely every time we restart, but it doesn't seem necessary, at least on CPython (I can't even run PyPy+libuv locally on darwin though, so there are more problems lurking there).
parent 3198560e
"""
Internal helpers for FFI implementations.
"""
from __future__ import print_function, absolute_import
import os
import sys
def _dbg(*args, **kwargs):
# pylint:disable=unused-argument
pass
#_dbg = print
def _pid_dbg(*args, **kwargs):
kwargs['file'] = sys.stderr
print(os.getpid(), *args, **kwargs)
GEVENT_DEBUG = 0
TRACE = 9
if os.getenv('GEVENT_DEBUG') == 'trace':
_dbg = _pid_dbg
GEVENT_DEBUG = TRACE
......@@ -8,6 +8,9 @@ import os
import traceback
from weakref import ref as WeakRef
from gevent._ffi import _dbg
from gevent._ffi import GEVENT_DEBUG
from gevent._ffi import TRACE
from gevent._ffi.callback import callback
__all__ = [
......@@ -59,10 +62,24 @@ EVENTS = GEVENT_CORE_EVENTS = _EVENTSType()
####
class _Callbacks(object):
if GEVENT_DEBUG >= TRACE:
def __init__(self, ffi):
self.ffi = ffi
self.callbacks = []
def from_handle(self, handle): # pylint:disable=method-hidden
_dbg("Getting from handle", handle)
traceback.print_stack()
x = self.ffi.from_handle(handle)
_dbg("Got from handle", handle, x)
return x
else:
def __init__(self, ffi):
self.ffi = ffi
self.callbacks = []
self.from_handle = self.ffi.from_handle
def python_callback(self, handle, revents):
"""
Returns an integer having one of three values:
......@@ -79,13 +96,15 @@ class _Callbacks(object):
Everything went according to plan, but the watcher has already
been stopped. Its memory may no longer be valid.
"""
orig_ffi_watcher = None
try:
# Even dereferencing the handle needs to be inside the try/except;
# if we don't return normally (e.g., a signal) then we wind up going
# to the 'onerror' handler, which
# is not what we want; that can permanently wedge the loop depending
# on which callback was executing
the_watcher = self.ffi.from_handle(handle)
the_watcher = self.from_handle(handle)
orig_ffi_watcher = the_watcher._watcher
args = the_watcher.args
if args is None:
# Legacy behaviour from corecext: convert None into ()
......@@ -101,7 +120,7 @@ class _Callbacks(object):
try:
the_watcher
except UnboundLocalError:
the_watcher = self.ffi.from_handle(handle)
the_watcher = self.from_handle(handle)
the_watcher._exc_info = sys.exc_info()
# Depending on when the exception happened, the watcher
# may or may not have been stopped. We need to make sure its
......@@ -109,14 +128,17 @@ class _Callbacks(object):
the_watcher.loop._keepaliveset.add(the_watcher)
return -1
else:
if the_watcher in the_watcher.loop._keepaliveset:
# It didn't stop itself
if the_watcher in the_watcher.loop._keepaliveset and the_watcher._watcher is orig_ffi_watcher:
# It didn't stop itself, *and* it didn't stop itself, reset
# its watcher, and start itself again. libuv's io watchers MAY
# do that.
_dbg("The watcher has not stopped itself", the_watcher)
return 0
return 1 # It stopped itself
def python_handle_error(self, handle, _revents):
try:
watcher = self.ffi.from_handle(handle)
watcher = self.from_handle(handle)
exc_info = watcher._exc_info
del watcher._exc_info
# In the past, we passed the ``watcher`` itself as the context,
......@@ -147,13 +169,13 @@ class _Callbacks(object):
# This is supposed to be called for signals, etc.
if tb is not None:
handle = tb.tb_frame.f_locals['handle']
watcher = self.ffi.from_handle(handle)
watcher = self.from_handle(handle)
watcher.loop._check_callback_handle_error(t, v, tb)
else:
raise v
def python_stop(self, handle):
watcher = self.ffi.from_handle(handle)
watcher = self.from_handle(handle)
watcher.stop()
......
......@@ -9,6 +9,7 @@ import os
import signal as signalmodule
import functools
from gevent._ffi import _dbg
from gevent._ffi.loop import GEVENT_CORE_EVENTS
from gevent._ffi.loop import _NOARGS
......@@ -16,6 +17,12 @@ __all__ = [
]
class _NoWatcherResult(int):
def __repr__(self):
return "<NoWatcher>"
_NoWatcherResult = _NoWatcherResult(0)
def events_to_str(event_field, all_events):
result = []
......@@ -35,10 +42,25 @@ def not_while_active(func):
@functools.wraps(func)
def nw(self, *args, **kwargs):
if self.active:
raise AttributeError("not while active")
raise ValueError("not while active")
func(self, *args, **kwargs)
return nw
def only_if_watcher(func):
@functools.wraps(func)
def if_w(self):
if self._watcher:
return func(self)
return _NoWatcherResult
return if_w
def error_if_no_watcher(func):
@functools.wraps(func)
def no_w(self):
if not self._watcher:
raise ValueError("No watcher present", self)
func(self)
return no_w
class LazyOnClass(object):
......@@ -154,8 +176,26 @@ class watcher(object):
_handle = None # FFI object to self. This is a GC cycle. See _watcher_create
_watcher = None
# Do we create the native resources when this class is created?
# If so, we call _watcher_full_init from the constructor.
# Otherwise, it must be called before we are started.
# If a subclass sets this to false, they must make that call.
# Currently unused. Experimental functionality for libuv.
_watcher_init_on_init = True
def __init__(self, _loop, ref=True, priority=None, args=_NOARGS):
self.loop = _loop
self.__init_priority = priority
self.__init_args = args
self.__init_ref = ref
if self._watcher_init_on_init:
self._watcher_full_init()
def _watcher_full_init(self):
priority = self.__init_priority
ref = self.__init_ref
args = self.__init_args
self._watcher_create(ref)
......@@ -266,12 +306,14 @@ class watcher(object):
result += " args=%r" % (self.args, )
if self.callback is None and self.args is None:
result += " stopped"
result += " watcher=%s" % (self._watcher)
result += " handle=%s" % (self._watcher_handle)
result += " ref=%s" % (self.ref)
return result + ">"
@property
def _watcher_handle(self):
if self._watcher:
return self._watcher.data
def _format(self):
......@@ -318,10 +360,11 @@ class watcher(object):
self._watcher_ffi_start_unref()
def stop(self):
_dbg("Main stop for", self, "keepalive?", self in self.loop._keepaliveset)
self._watcher_ffi_stop_ref()
self._watcher_ffi_stop()
self.loop._keepaliveset.discard(self)
_dbg("Finished main stop for", self, "keepalive?", self in self.loop._keepaliveset)
self.callback = None
self.args = None
......@@ -337,7 +380,9 @@ class watcher(object):
@property
def active(self):
return True if self._watcher_is_active(self._watcher) else False
if self._watcher is not None and self._watcher_is_active(self._watcher):
return True
return False
@property
def pending(self):
......
......@@ -202,13 +202,22 @@ class socket(object):
def close(self, _closedsocket=_closedsocket, cancel_wait_ex=cancel_wait_ex):
# This function should not reference any globals. See Python issue #808164.
# Also break any reference to the loop.io objects. Our fileno, which they were
# tied to, is now free to be reused, so these objects are no longer functional.
if self._read_event is not None:
self.hub.cancel_wait(self._read_event, cancel_wait_ex)
self._read_event = None
if self._write_event is not None:
self.hub.cancel_wait(self._write_event, cancel_wait_ex)
self._write_event = None
s = self._sock
self._sock = _closedsocket()
if PYPY:
s._drop()
@property
def closed(self):
return isinstance(self._sock, _closedsocket)
......@@ -261,7 +270,10 @@ class socket(object):
def makefile(self, mode='r', bufsize=-1):
# Two things to look out for:
# 1) Closing the original socket object should not close the
# socket (hence creating a new instance)
# socket (hence creating a new socket instance);
# An alternate approach is what _socket3.py does, which is to
# keep count of the times makefile objects have been opened (Py3's
# SocketIO helps with that).
# 2) The resulting fileobject must keep the timeout in order
# to be compatible with the stdlib's socket.makefile.
# Pass self as _sock to preserve timeout.
......
......@@ -237,6 +237,7 @@ class socket(object):
return text
def _decref_socketios(self):
# Called by SocketIO when it is closed.
if self._io_refs > 0:
self._io_refs -= 1
if self._closed:
......@@ -244,8 +245,17 @@ class socket(object):
def _real_close(self, _ss=_socket.socket, cancel_wait_ex=cancel_wait_ex):
# This function should not reference any globals. See Python issue #808164.
# Break any reference to the loop.io objects. Our fileno,
# which they were tied to, is now free to be reused, so these
# objects are no longer functional.
if self._read_event is not None:
self.hub.cancel_wait(self._read_event, cancel_wait_ex)
self._read_event = None
if self._write_event is not None:
self.hub.cancel_wait(self._write_event, cancel_wait_ex)
self._write_event = None
_ss.close(self._sock)
# Break any references to the underlying socket object. Tested
......
......@@ -10,9 +10,9 @@ static void (*gevent_noop)(void* handle) = &_gevent_noop;
static void _gevent_generic_callback1(void* vwatcher, int arg)
{
uv_handle_t* watcher = (uv_handle_t*)vwatcher;
void* handle = watcher->data;
int cb_result = python_callback(handle, arg);
const uv_handle_t* watcher = (uv_handle_t*)vwatcher;
const void* handle = watcher->data; // Python code may set this to NULL.
const int cb_result = python_callback(handle, arg);
switch(cb_result) {
case -1:
// in case of exception, call self.loop.handle_error;
......
......@@ -11,7 +11,6 @@ import signal
from weakref import WeakValueDictionary
from gevent._compat import PYPY
from gevent._compat import WIN
from gevent._ffi.loop import AbstractLoop
from gevent.libuv import _corecffi # pylint:disable=no-name-in-module,import-error
from gevent._ffi.loop import assign_standard_callbacks
......@@ -50,8 +49,8 @@ def get_header_version():
def supported_backends():
return ['default']
if PYPY and WIN:
def gcsOnPyPy(f):
if PYPY:
def gcBefore(f):
import functools
import gc
......@@ -61,7 +60,7 @@ if PYPY and WIN:
return f(self, *args)
return m
else:
def gcsOnPyPy(f):
def gcBefore(f):
return f
class loop(AbstractLoop):
......@@ -350,7 +349,7 @@ class loop(AbstractLoop):
watcher._set_status(status)
@gcsOnPyPy
@gcBefore
def io(self, fd, events, ref=True, priority=None):
# We don't keep a hard ref to the root object;
# the caller must keep the multiplexed watcher
......
......@@ -12,6 +12,7 @@ ffi = _corecffi.ffi
libuv = _corecffi.lib
from gevent._ffi import watcher as _base
from gevent._ffi import _dbg
_closing_handles = set()
......@@ -20,18 +21,6 @@ _closing_handles = set()
def _uv_close_callback(handle):
_closing_handles.remove(handle)
def _dbg(*args, **kwargs):
# pylint:disable=unused-argument
pass
#_dbg = print
def _pid_dbg(*args, **kwargs):
import os
kwargs['file'] = sys.stderr
print(os.getpid(), *args, **kwargs)
# _dbg = _pid_dbg
_events = [(libuv.UV_READABLE, "READ"),
(libuv.UV_WRITABLE, "WRITE")]
......@@ -157,14 +146,16 @@ class watcher(_base.watcher):
_dbg("\tStarted", self)
def _watcher_ffi_stop(self):
_dbg("Stoping", self)
_dbg("Stopping", self, self._watcher_stop)
self._watcher_stop(self._watcher)
_dbg("Stoped", self)
_dbg("Stopped", self)
@_base.only_if_watcher
def _watcher_ffi_ref(self):
_dbg("Reffing", self)
libuv.uv_ref(self._watcher)
@_base.only_if_watcher
def _watcher_ffi_unref(self):
_dbg("Unreffing", self)
libuv.uv_unref(self._watcher)
......@@ -196,6 +187,34 @@ class io(_base.IoMixin, watcher):
_watcher_type = 'poll'
_watcher_callback_name = '_gevent_poll_callback2'
# On Windows is critical to be able to garbage collect these
# objects in a timely fashion so that they don't get reused
# for multiplexing completely different sockets. This is because
# uv_poll_init_socket does a lot of setup for the socket to make
# polling work. If get reused for another socket that has the same
# fileno, things break badly. (In theory this could be a problem
# on posix too, but in practice it isn't).
# TODO: We should probably generalize this to all
# ffi watchers. Avoiding GC cycles as much as possible
# is a good thing, and potentially allocating new handles
# as needed gets us better memory locality.
# Especially on Windows, we must also account for the case that a
# reference to this object has leaked (e.g., the socket object is
# still around), but the fileno has been closed and a new one
# opened. We must still get a new native watcher at that point. We
# handle this case by simply making sure that we don't even have
# a native watcher until the object is started, and we shut it down
# when the object is stopped.
# XXX: I was able to solve at least Windows test_ftplib.py issues with more of a
# careful use of io objects in socket.py, so delaying this entirely is at least
# temporarily on hold. Instead sticking with the _watcher_create
# function override for the moment.
#_watcher_init_on_init = False
EVENT_MASK = libuv.UV_READABLE | libuv.UV_WRITABLE | libuv.UV_DISCONNECT
def __init__(self, loop, fd, events, ref=True, priority=None):
......@@ -239,6 +258,17 @@ class io(_base.IoMixin, watcher):
def _set_events(self, events):
self._events = events
# This is what we'd do if we set _watcher_init_on_init to False:
# def start(self, *args, **kwargs):
# assert self._watcher is None
# self._watcher_full_init()
# super(io, self).start(*args, **kwargs)
# Along with disposing of self._watcher in _watcher_ffi_stop.
# In that method, it's possible that we might be started again right after this,
# in which case we will create a new set of FFI objects.
# TODO: Does anything leak in that case? Verify...
def _watcher_ffi_start(self):
assert self._handle is None
self._handle = self._watcher.data = type(self).new_handle(self)
......@@ -248,6 +278,7 @@ class io(_base.IoMixin, watcher):
# We may or may not have been started yet, so
# we may or may not have a handle; either way,
# drop it.
_dbg("Stopping io watcher", self)
self._handle = None
self._watcher.data = ffi.NULL
super(io, self)._watcher_ffi_stop()
......@@ -347,6 +378,7 @@ class io(_base.IoMixin, watcher):
self.stop()
def _io_start(self):
_dbg("IO start on behalf of multiplex", self)
self.start(self._io_callback, pass_events=True)
def multiplex(self, events):
......
......@@ -14,6 +14,7 @@ import os
import re
TRAVIS = os.environ.get("TRAVIS") == "true"
APPVEYOR = os.environ.get('APPVEYOR')
OSX = sys.platform == 'darwin'
PYPY = hasattr(sys, 'pypy_version_info')
WIN = sys.platform.startswith("win")
......@@ -307,6 +308,11 @@ if LIBUV:
disabled_tests += [
]
if APPVEYOR:
disabled_tests += [
]
def _make_run_with_original(mod_name, func_name):
@contextlib.contextmanager
def with_orig():
......
......@@ -20,7 +20,7 @@ def main():
try:
x.priority = 1
raise AssertionError('must not be able to change priority of active watcher')
except AttributeError:
except (AttributeError, ValueError):
pass
loop.run()
assert x.pending == 0, x.pending
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment