Commit 51461c84 authored by Jason Madden

WIP: All CFFI watchers have explicit lifetimes

And must be closed with close() when done. The IO watchers already
required this for libuv; doing it for everything simplifies memory
management and object lifetimes, and lets us get rid of a bunch of
weakref objects with callbacks.

This is a new requirement and will only affect those using the CFFI
backends (e.g., on PyPy). It is specifically needed for libuv; libev
doesn't really do much with it.

Some debugging help for watchers that are never closed piggybacks on
tracemalloc on Python 3.

test_ftplib.py fails in Python 3 because of some warnings about
unclosed objects.
test__backdoor.py fails in Python 2 for the same reason.
parent daf20462
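
For code that creates watchers directly on a CFFI backend, the new requirement looks roughly like the sketch below. The loop.timer constructor, hub.wait, stop(), and the context-manager form are the existing/added gevent APIs from this commit; the surrounding structure is illustrative only.

from gevent.hub import get_hub

hub = get_hub()

# Explicit lifetime: stop() alone no longer releases the native watcher;
# close() must be called once the watcher will never be used again.
timer = hub.loop.timer(0.5)
try:
    hub.wait(timer)              # wait() starts and stops the watcher
finally:
    timer.close()                # new requirement on CFFI backends (PyPy)

# Equivalent context-manager form; __exit__ calls close(), as added by
# this commit (compare the sleep() hunk in gevent/hub.py below).
with hub.loop.timer(0.5) as t:
    hub.wait(t)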
......@@ -16,18 +16,12 @@ def _pid_dbg(*args, **kwargs):
kwargs['file'] = sys.stderr
print(os.getpid(), *args, **kwargs)
GEVENT_DEBUG = 0
CRITICAL = 1
ERROR = 3
DEBUG = 5
TRACE = 9
if os.getenv("GEVENT_DEBUG") == 'critical':
GEVENT_DEBUG = CRITICAL
elif os.getenv("GEVENT_DEBUG") == 'error':
GEVENT_DEBUG = ERROR
elif os.getenv('GEVENT_DEBUG') == 'debug':
GEVENT_DEBUG = DEBUG
elif os.getenv('GEVENT_DEBUG') == 'trace':
GEVENT_DEBUG_LEVEL = vars()[os.getenv("GEVENT_DEBUG", 'CRITICAL').upper()]
if GEVENT_DEBUG_LEVEL >= TRACE:
_dbg = _pid_dbg
GEVENT_DEBUG = TRACE
......@@ -23,6 +23,8 @@ class callback(object):
self.callback = None
self.args = None
close = stop
# Note that __nonzero__ and pending are different
# bool() is used in contexts where we need to know whether to schedule another callback,
# so it's true if it's pending or currently running
......
......@@ -6,10 +6,9 @@ from __future__ import absolute_import, print_function
import sys
import os
import traceback
from weakref import ref as WeakRef
from gevent._ffi import _dbg
from gevent._ffi import GEVENT_DEBUG
from gevent._ffi import GEVENT_DEBUG_LEVEL
from gevent._ffi import TRACE
from gevent._ffi import CRITICAL
from gevent._ffi.callback import callback
......@@ -67,7 +66,7 @@ class AbstractCallbacks(object):
def __init__(self, ffi):
self.ffi = ffi
self.callbacks = []
if GEVENT_DEBUG < TRACE:
if GEVENT_DEBUG_LEVEL < TRACE:
self.from_handle = ffi.from_handle
def from_handle(self, handle): # pylint:disable=method-hidden
......@@ -122,7 +121,9 @@ class AbstractCallbacks(object):
args = (revents, ) + args[1:]
the_watcher.callback(*args)
except: # pylint:disable=bare-except
_dbg("Got exception servicing watcher with handle", handle)
_dbg("Got exception servicing watcher with handle", handle, sys.exc_info())
import traceback
traceback.print_exc()
# It's possible for ``the_watcher`` to be undefined (UnboundLocalError)
# if we threw an exception (signal) on the line that created that variable.
# This is typically the case with a signal under libuv
......@@ -137,7 +138,9 @@ class AbstractCallbacks(object):
the_watcher.loop._keepaliveset.add(the_watcher)
return -1
else:
if the_watcher in the_watcher.loop._keepaliveset and the_watcher._watcher is orig_ffi_watcher:
if (the_watcher.loop is not None
and the_watcher in the_watcher.loop._keepaliveset
and the_watcher._watcher is orig_ffi_watcher):
# It didn't stop itself, *and* it didn't reset
# its watcher and start itself again. libuv's io watchers MAY
# do that.
......@@ -205,7 +208,7 @@ class AbstractCallbacks(object):
raise v
def python_stop(self, handle):
if not handle:
if not handle: # pragma: no cover
print(
"WARNING: gevent: Unable to dereference handle; not stopping watcher. "
"Native resources may leak. This is most likely a bug in gevent.",
......@@ -214,7 +217,7 @@ class AbstractCallbacks(object):
# NOTE: Raising exceptions here does nothing, they're swallowed by CFFI.
# Since the C level passed in a null pointer, even dereferencing the handle
# will just produce some exceptions.
if GEVENT_DEBUG < CRITICAL:
if GEVENT_DEBUG_LEVEL < CRITICAL:
return
_dbg("python_stop: stopping watcher with handle", handle)
watcher = self.from_handle(handle)
......@@ -312,39 +315,10 @@ class AbstractLoop(object):
self._watchers = watchers
self._in_callback = False
self._callbacks = []
# Stores python watcher objects while they are started
self._keepaliveset = set()
self._init_loop_and_aux_watchers(flags, default)
# Stores a {ffi_watcher -> weakref(python watcher)} When the python watcher
# weakref goes away, the handle should be popped. The handle
# *must not* be involved in a cycle in the watcher instance
# (but it must not also be collected until the watcher is dead). Watcher constructors
# are responsible for calling loop._register_watcher.
# This stores ffi level objects (the ones that have .data = handle)
# to avoid any python-level cycles. Note that *anywhere* we keep the
# handle, we wind up with a ref cycle (unless we go through some crazy
# weakref gymnastics---maybe), so we do the simple thing and
# keep them on the python watcher
self._active_watchers = dict()
@classmethod
def __make_watcher_ref_callback(cls, typ, active_watchers, ffi_watcher, debug):
# separate method to make sure we have no ref to the watcher
def callback(_):
active_watchers.pop(ffi_watcher)
_dbg("Python weakref callback closing", debug)
typ._watcher_ffi_close(ffi_watcher)
return callback
def _register_watcher(self, python_watcher, ffi_watcher):
self._active_watchers[ffi_watcher] = WeakRef(python_watcher,
self.__make_watcher_ref_callback(
type(python_watcher),
self._active_watchers,
ffi_watcher,
repr(python_watcher)))
def _init_loop_and_aux_watchers(self, flags=None, default=None):
......
......@@ -8,8 +8,17 @@ import sys
import os
import signal as signalmodule
import functools
import warnings
try:
from tracemalloc import get_object_traceback
except ImportError:
def get_object_traceback(_obj):
return None
from gevent._ffi import _dbg
from gevent._ffi import GEVENT_DEBUG_LEVEL
from gevent._ffi import CRITICAL
from gevent._ffi.loop import GEVENT_CORE_EVENTS
from gevent._ffi.loop import _NOARGS
......@@ -17,6 +26,12 @@ __all__ = [
]
try:
ResourceWarning
except NameError:
class ResourceWarning(Warning):
"Python 2 fallback"
class _NoWatcherResult(int):
def __repr__(self):
......@@ -98,7 +113,7 @@ class AbstractWatcherType(type):
def __new__(cls, name, bases, cls_dict):
if name != 'watcher' and not cls_dict.get('_watcher_skip_ffi'):
cls._fill_watcher(name, bases, cls_dict)
if '__del__' in cls_dict:
if '__del__' in cls_dict and GEVENT_DEBUG_LEVEL < CRITICAL:
raise TypeError("CFFI watchers are not allowed to have __del__")
return type.__new__(cls, name, bases, cls_dict)
......@@ -178,10 +193,14 @@ class watcher(object):
_callback = None
_args = None
_handle = None # FFI object to self. This is a GC cycle. See _watcher_create
_watcher = None
_watcher_registers_with_loop_on_create = True
# self._handle has a reference to self, keeping it alive.
# We must keep self._handle alive for ffi.from_handle() to be
# able to work. We only fill this in when we are started,
# and when we are stopped we destroy it.
# NOTE: This is a GC cycle, so we keep it around for as short
# as possible.
_handle = None
def __init__(self, _loop, ref=True, priority=None, args=_NOARGS):
self.loop = _loop
......@@ -190,6 +209,7 @@ class watcher(object):
self.__init_ref = ref
self._watcher_full_init()
def _watcher_full_init(self):
priority = self.__init_priority
ref = self.__init_ref
......@@ -216,23 +236,7 @@ class watcher(object):
pass
def _watcher_create(self, ref): # pylint:disable=unused-argument
# self._handle has a reference to self, keeping it alive.
# We must keep self._handle alive for ffi.from_handle() to be
# able to work.
# THIS IS A GC CYCLE.
self._handle = type(self).new_handle(self) # pylint:disable=no-member
self._watcher = self._watcher_new()
# This call takes care of calling _watcher_ffi_close when
# self goes away, making sure self._watcher stays alive
# that long.
# XXX: All watchers should go to a model like libuv's
# IO watcher that gets explicitly closed so that we can always
# have control over when this gets done.
if self._watcher_registers_with_loop_on_create:
self.loop._register_watcher(self, self._watcher)
self._watcher.data = self._handle
def _watcher_new(self):
return type(self).new(self._watcher_struct_pointer_type) # pylint:disable=no-member
......@@ -289,10 +293,35 @@ class watcher(object):
_watcher_callback = None
_watcher_is_active = None
def close(self):
if self._watcher is None:
return
self.stop()
_watcher = self._watcher
self._watcher = None
_watcher.data = self._FFI.NULL # pylint: disable=no-member
self._watcher_ffi_close(_watcher)
self.loop = None
def __enter__(self):
return self
def __exit__(self, t, v, tb):
self.close()
if GEVENT_DEBUG_LEVEL >= CRITICAL:
def __del__(self):
if self._watcher:
tb = get_object_traceback(self)
tb_msg = ''
if tb is not None:
tb_msg = '\n'.join(tb.format())
tb_msg = '\nTraceback:\n' + tb_msg
warnings.warn("Failed to close watcher %r%s" % (self, tb_msg),
ResourceWarning)
self.close()
# this is not needed, since we keep alive the watcher while it's started
#def __del__(self):
# self._watcher_stop(self.loop._ptr, self._watcher)
def __repr__(self):
formats = self._format()
......@@ -359,14 +388,20 @@ class watcher(object):
self.callback = callback
self.args = args or _NOARGS
self.loop._keepaliveset.add(self)
self._handle = self._watcher.data = type(self).new_handle(self) # pylint:disable=no-member
self._watcher_ffi_start()
self._watcher_ffi_start_unref()
def stop(self):
if self._callback is None:
assert self.loop is None or self not in self.loop._keepaliveset
return
_dbg("Main stop for", self, "keepalive?", self in self.loop._keepaliveset)
self._watcher_ffi_stop_ref()
self._watcher_ffi_stop()
self.loop._keepaliveset.discard(self)
self._handle = None
self._watcher.data = self._FFI.NULL
_dbg("Finished main stop for", self, "keepalive?", self in self.loop._keepaliveset)
self.callback = None
self.args = None
......@@ -414,9 +449,6 @@ class IoMixin(object):
args = (GEVENT_CORE_EVENTS, ) + args
super(IoMixin, self).start(callback, *args)
def close(self):
pass
def _format(self):
return ' fd=%d' % self._fd
......
......@@ -33,24 +33,24 @@ class GreenFileDescriptorIO(RawIOBase):
self._closefd = closefd
self._fileno = fileno
make_nonblocking(fileno)
self._readable = 'r' in mode
self._writable = 'w' in mode
readable = 'r' in mode
writable = 'w' in mode
self.hub = get_hub()
io_watcher = self.hub.loop.io
if self._readable:
if readable:
self._read_event = io_watcher(fileno, 1)
if self._writable:
if writable:
self._write_event = io_watcher(fileno, 2)
self._seekable = None
def readable(self):
return self._readable
return self._read_event is not None
def writable(self):
return self._writable
return self._write_event is not None
def seekable(self):
if self._seekable is None:
......@@ -73,11 +73,20 @@ class GreenFileDescriptorIO(RawIOBase):
if self._closed:
return
self.flush()
# TODO: Can we use 'read_event is not None and write_event is
# not None' to mean _closed?
self._closed = True
if self._readable:
self.hub.cancel_wait(self._read_event, cancel_wait_ex)
if self._writable:
self.hub.cancel_wait(self._write_event, cancel_wait_ex)
read_event = self._read_event
write_event = self._write_event
self._read_event = self._write_event = None
if read_event is not None:
self.hub.cancel_wait(read_event, cancel_wait_ex)
read_event.close()
if write_event is not None:
self.hub.cancel_wait(write_event, cancel_wait_ex)
write_event.close()
fileno = self._fileno
if self._closefd:
self._fileno = None
......@@ -91,7 +100,7 @@ class GreenFileDescriptorIO(RawIOBase):
# this was fixed in Python 3.3, but we still need our workaround for 2.7. See
# https://github.com/gevent/gevent/issues/675)
def __read(self, n):
if not self._readable:
if self._read_event is None:
raise UnsupportedOperation('read')
while True:
try:
......@@ -123,7 +132,7 @@ class GreenFileDescriptorIO(RawIOBase):
return n
def write(self, b):
if not self._writable:
if self._write_event is None:
raise UnsupportedOperation('write')
while True:
try:
......
......@@ -174,15 +174,12 @@ class socket(object):
"""
if watcher.callback is not None:
raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, ))
if self.timeout is not None:
timeout = Timeout.start_new(self.timeout, timeout_exc, ref=False)
else:
timeout = None
timeout = Timeout._start_new_or_dummy(self.timeout, timeout_exc, ref=False)
try:
self.hub.wait(watcher)
finally:
if timeout is not None:
timeout.cancel()
timeout.close()
def accept(self):
sock = self._sock
......@@ -207,12 +204,10 @@ class socket(object):
# tied to, is now free to be reused, so these objects are no longer functional.
if self._read_event is not None:
self.hub.cancel_wait(self._read_event, cancel_wait_ex)
self._read_event.close()
self.hub.cancel_wait(self._read_event, cancel_wait_ex, True)
self._read_event = None
if self._write_event is not None:
self.hub.cancel_wait(self._write_event, cancel_wait_ex)
self._write_event.close()
self.hub.cancel_wait(self._write_event, cancel_wait_ex, True)
self._write_event = None
s = self._sock
self._sock = _closedsocket()
......@@ -231,10 +226,8 @@ class socket(object):
if isinstance(address, tuple):
r = getaddrinfo(address[0], address[1], sock.family)
address = r[0][-1]
if self.timeout is not None:
timer = Timeout.start_new(self.timeout, timeout('timed out'))
else:
timer = None
timer = Timeout._start_new_or_dummy(self.timeout, timeout('timed out'))
try:
while True:
err = sock.getsockopt(SOL_SOCKET, SO_ERROR)
......@@ -248,8 +241,7 @@ class socket(object):
else:
raise error(result, strerror(result))
finally:
if timer is not None:
timer.cancel()
timer.close()
def connect_ex(self, address):
try:
......
......@@ -148,15 +148,12 @@ class socket(object):
"""
if watcher.callback is not None:
raise _socketcommon.ConcurrentObjectUseError('This socket is already used by another greenlet: %r' % (watcher.callback, ))
if self.timeout is not None:
timeout = Timeout.start_new(self.timeout, timeout_exc, ref=False)
else:
timeout = None
timer = Timeout._start_new_or_dummy(self.timeout, timeout_exc, ref=False)
try:
self.hub.wait(watcher)
finally:
if timeout is not None:
timeout.cancel()
timer.close()
def dup(self):
"""dup() -> socket object
......@@ -251,12 +248,10 @@ class socket(object):
# objects are no longer functional.
if self._read_event is not None:
self.hub.cancel_wait(self._read_event, cancel_wait_ex)
self._read_event.close()
self.hub.cancel_wait(self._read_event, cancel_wait_ex, True)
self._read_event = None
if self._write_event is not None:
self.hub.cancel_wait(self._write_event, cancel_wait_ex)
self._write_event.close()
self.hub.cancel_wait(self._write_event, cancel_wait_ex, True)
self._write_event = None
_ss.close(self._sock)
......@@ -301,11 +296,8 @@ class socket(object):
if isinstance(address, tuple):
r = getaddrinfo(address[0], address[1], self.family)
address = r[0][-1]
if self.timeout is not None:
timer = Timeout.start_new(self.timeout, timeout('timed out'))
else:
timer = None
try:
with Timeout._start_new_or_dummy(self.timeout, timeout("timed out")):
while True:
err = self.getsockopt(SOL_SOCKET, SO_ERROR)
if err:
......@@ -317,9 +309,6 @@ class socket(object):
self._wait(self._write_event)
else:
raise error(result, strerror(result))
finally:
if timer is not None:
timer.cancel()
def connect_ex(self, address):
try:
......
......@@ -178,6 +178,10 @@ class _fileobject(object):
def __getattr__(self, name):
return getattr(self._fobj, name)
def close(self):
self._fobj.close()
self._sock.close()
def write(self, data):
if not isinstance(data, bytes):
data = data.encode('utf-8')
......
......@@ -163,9 +163,11 @@ class BaseServer(object):
def stop_accepting(self):
if self._watcher is not None:
self._watcher.stop()
self._watcher.close()
self._watcher = None
if self._timer is not None:
self._timer.stop()
self._timer.close()
self._timer = None
def do_handle(self, *args):
......
......@@ -219,6 +219,7 @@ class Greenlet(greenlet):
# variable copy of that list (in _run_callbacks). This isn't a problem,
# except for the leak-tests.
self._start_event.stop()
self._start_event.close()
def __handle_death_before_start(self, *args):
# args is (t, v, tb) or simply t or v
......@@ -620,6 +621,7 @@ class _dummy_event(object):
def start(self, cb): # pylint:disable=unused-argument
raise AssertionError("Cannot start the dummy event")
close = stop
_cancelled_start_event = _dummy_event()
_start_completed_event = _dummy_event()
......
......@@ -166,7 +166,8 @@ def sleep(seconds=0, ref=True):
loop.run_callback(waiter.switch)
waiter.get()
else:
hub.wait(loop.timer(seconds, ref=ref))
with loop.timer(seconds, ref=ref) as t:
hub.wait(t)
def idle(priority=0):
......@@ -661,17 +662,29 @@ class Hub(RawGreenlet):
finally:
watcher.stop()
def cancel_wait(self, watcher, error):
def cancel_wait(self, watcher, error, close_watcher=False):
"""
Cancel an in-progress call to :meth:`wait` by throwing the given *error*
in the waiting greenlet.
.. versionchanged:: 1.3a1
Added the *close_watcher* parameter. If true, the watcher
will be closed after the exception is thrown.
"""
if watcher.callback is not None:
self.loop.run_callback(self._cancel_wait, watcher, error)
def _cancel_wait(self, watcher, error):
if watcher.active:
switch = watcher.callback
self.loop.run_callback(self._cancel_wait, watcher, error, close_watcher)
elif close_watcher:
watcher.close()
def _cancel_wait(self, watcher, error, close_watcher):
# We have to check again to see if it was still active by the time
# our callback actually runs.
active = watcher.active
cb = watcher.callback
if close_watcher:
watcher.close()
if active:
switch = cb
if switch is not None:
greenlet = getattr(switch, '__self__', None)
if greenlet is not None:
......@@ -731,6 +744,7 @@ class Hub(RawGreenlet):
finally:
if timeout is not None:
timeout.stop()
timeout.close()
return False
def destroy(self, destroy_loop=None):
......@@ -995,7 +1009,7 @@ def iwait(objects, timeout=None, count=None):
yield item
finally:
if timeout is not None:
timer.stop()
timer.close()
for aobj in objects:
unlink = getattr(aobj, 'unlink', None)
if unlink:
......
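
The new close_watcher argument to Hub.cancel_wait (documented in the hunk above) is used by the socket implementations roughly as in this sketch. The _read_event/_write_event attributes and cancel_wait_ex come from the _socket2/_socket3 hunks earlier in this diff; the _drop_events helper name is hypothetical, for illustration only.

def _drop_events(self):
    # Pattern adopted by the socket close()/detach hunks: cancel any
    # waiter *and* close the watcher in the same call, instead of
    # calling watcher.close() separately afterwards.
    if self._read_event is not None:
        self.hub.cancel_wait(self._read_event, cancel_wait_ex, True)
        self._read_event = None
    if self._write_event is not None:
        self.hub.cancel_wait(self._write_event, cancel_wait_ex, True)
        self._write_event = None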
......@@ -602,6 +602,8 @@ cdef public class callback [object PyGeventCallbackObject, type PyGeventCallback
self.callback = None
self.args = None
close = stop
# Note that __nonzero__ and pending are different
# nonzero is used in contexts where we need to know whether to schedule another callback,
# so it's true if it's pending or currently running
......@@ -791,6 +793,16 @@ cdef public class watcher [object PyGeventWatcherObject, type PyGeventWatcher_Ty
def _format(self):
return ''
def close(self):
pass
def __enter__(self):
return self
def __exit__(self, t, v, tb):
self.close()
return
cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]:
......@@ -813,8 +825,6 @@ cdef public class io(watcher) [object PyGeventIOObject, type PyGeventIO_Type]:
PENDING
def close(self):
pass
#ifdef _WIN32
......
......@@ -140,7 +140,7 @@ class watcher(_base.watcher):
@property
def pending(self):
return True if libev.ev_is_pending(self._watcher) else False
return True if self._watcher and libev.ev_is_pending(self._watcher) else False
class io(_base.IoMixin, watcher):
......@@ -222,15 +222,34 @@ class async_(_base.AsyncMixin, watcher):
# Provide BWC for those that have async
locals()['async'] = async_
class _ClosedWatcher(object):
__slots__ = ('pid', 'rpid', 'rstatus')
def __init__(self, other):
self.pid = other.pid
self.rpid = other.rpid
self.rstatus = other.rstatus
def __bool__(self):
return False
__nonzero__ = __bool__
class child(_base.ChildMixin, watcher):
_watcher_type = 'child'
def close(self):
# Capture the properties we defer to our _watcher, because
# we're about to discard it.
closed_watcher = _ClosedWatcher(self._watcher)
super(child, self).close()
self._watcher = closed_watcher
@property
def pid(self):
return self._watcher.pid
@property
def rpid(self, ):
def rpid(self):
return self._watcher.rpid
@rpid.setter
......
......@@ -380,6 +380,10 @@ class io(_base.IoMixin, watcher):
self._calc_and_update_events()
return watcher
def close(self):
super(io, self).close()
del self._multiplex_watchers
def _multiplex_closed(self, watcher):
self._multiplex_watchers.remove(watcher)
if not self._multiplex_watchers:
......@@ -396,9 +400,7 @@ class io(_base.IoMixin, watcher):
# *and* we've opened a new watcher for the fd, that watcher will
# suddenly and mysteriously stop seeing events. So we do this now;
# this method is smart enough not to close the handle twice.
self._watcher_ffi_close(self._watcher)
self._watcher = None
del self._multiplex_watchers
self.close()
else:
self._calc_and_update_events()
_dbg("IO Watcher", self, "has remaining multiplex:",
......@@ -450,7 +452,11 @@ class _SimulatedWithAsyncMixin(object):
def __init__(self, loop, *args, **kwargs):
self._async = loop.async_()
try:
super(_SimulatedWithAsyncMixin, self).__init__(loop, *args, **kwargs)
except:
self._async.close()
raise
def _watcher_create(self, _args):
return
......@@ -482,6 +488,12 @@ class _SimulatedWithAsyncMixin(object):
self.args = None
self._async.stop()
def close(self):
if self._async is not None:
a = self._async
#self._async = None
a.close()
def _register_loop_callback(self):
# called from start()
raise NotImplementedError()
......
......@@ -231,6 +231,7 @@ if hasattr(os, 'fork'):
# XXX: Could handle tracing here by not stopping
# until the pid is terminated
watcher.stop()
try:
_watched_children[watcher.pid] = (watcher.pid, watcher.rstatus, time.time())
if callback:
callback(watcher)
......@@ -238,6 +239,8 @@ if hasattr(os, 'fork'):
_on_child_hook()
# now is as good a time as any to reap children
_reap_children()
finally:
watcher.close()
def _reap_children(timeout=60):
# Remove all the dead children that haven't been waited on
......@@ -311,7 +314,7 @@ if hasattr(os, 'fork'):
# pass through to the OS.
if pid == -1 and options == 0:
hub = get_hub()
watcher = hub.loop.child(0, False)
with hub.loop.child(0, False) as watcher:
hub.wait(watcher)
return watcher.rpid, watcher.rstatus
# There were funky options/pid, so we must go to the OS.
......@@ -336,7 +339,7 @@ if hasattr(os, 'fork'):
watcher = _watched_children[pid]
# We can't start a watcher that's already started,
# so we can't reuse the existing watcher.
new_watcher = watcher.loop.child(pid, False)
with watcher.loop.child(pid, False) as new_watcher:
get_hub().wait(new_watcher)
# Ok, so now the new watcher is done. That means
# the old watcher's callback (_on_child) should
......
......@@ -171,8 +171,11 @@ class StreamServer(BaseServer):
def wrap_socket_and_handle(self, client_socket, address):
# used in case of ssl sockets
try:
ssl_socket = self.wrap_socket(client_socket, **self.ssl_args)
return self.handle(ssl_socket, address)
finally:
ssl_socket.close()
class DatagramServer(BaseServer):
......
......@@ -29,7 +29,11 @@ class ThreadPool(GroupMappingMixin):
self.manager = None
self.pid = os.getpid()
self.fork_watcher = hub.loop.fork(ref=False)
try:
self._init(maxsize)
except:
self.fork_watcher.close()
raise
def _set_maxsize(self, maxsize):
if not isinstance(maxsize, integer_types):
......@@ -110,6 +114,7 @@ class ThreadPool(GroupMappingMixin):
def kill(self):
self.size = 0
self.fork_watcher.close()
def _adjust_step(self):
# if there is a possibility & necessity for adding a thread, do it
......@@ -288,6 +293,7 @@ class ThreadResult(object):
def _on_async(self):
self.async_watcher.stop()
self.async_watcher.close()
if self._call_when_ready:
# Typically this is pool.semaphore.release and we have to
# call this in the Hub; if we don't we get the dreaded
......@@ -311,6 +317,7 @@ class ThreadResult(object):
def destroy(self):
if self.async_watcher is not None:
self.async_watcher.stop()
self.async_watcher.close()
self.async_watcher = None
self.context = None
self.hub = None
......
......@@ -39,6 +39,14 @@ class _FakeTimer(object):
def cancel(self):
return
stop = close = cancel
def __enter__(self):
return self
def __exit__(self, _t, _v, _tb):
return
_FakeTimer = _FakeTimer()
......@@ -124,10 +132,12 @@ class Timeout(BaseException):
Add warning about negative *seconds* values.
"""
def __init__(self, seconds=None, exception=None, ref=True, priority=-1, _use_timer=True):
def __init__(self, seconds=None, exception=None, ref=True, priority=-1,
_use_timer=True, _one_shot=False):
BaseException.__init__(self)
self.seconds = seconds
self.exception = exception
self._one_shot = _one_shot
if seconds is None or not _use_timer:
# Avoid going through the timer codepath if no timeout is
# desired; this avoids some CFFI interactions on PyPy that can lead to a
......@@ -159,7 +169,7 @@ class Timeout(BaseException):
self.timer.start(getcurrent().throw, throws)
@classmethod
def start_new(cls, timeout=None, exception=None, ref=True):
def start_new(cls, timeout=None, exception=None, ref=True, _one_shot=False):
"""Create a started :class:`Timeout`.
This is a shortcut, the exact action depends on *timeout*'s type:
......@@ -175,12 +185,12 @@ class Timeout(BaseException):
if not timeout.pending:
timeout.start()
return timeout
timeout = cls(timeout, exception, ref=ref)
timeout = cls(timeout, exception, ref=ref, _one_shot=_one_shot)
timeout.start()
return timeout
@staticmethod
def _start_new_or_dummy(timeout, exception=None):
def _start_new_or_dummy(timeout, exception=None, ref=True):
# Internal use only in 1.1
# Return an object with a 'cancel' method; if timeout is None,
# this will be a shared instance object that does nothing. Otherwise,
......@@ -194,7 +204,7 @@ class Timeout(BaseException):
# under PyPy in synthetic benchmarks it makes no difference.
if timeout is None:
return _FakeTimer
return Timeout.start_new(timeout, exception)
return Timeout.start_new(timeout, exception, ref, _one_shot=True)
@property
def pending(self):
......@@ -204,6 +214,12 @@ class Timeout(BaseException):
def cancel(self):
"""If the timeout is pending, cancel it. Otherwise, do nothing."""
self.timer.stop()
if self._one_shot:
self.close()
def close(self):
self.timer.stop()
self.timer.close()
def __repr__(self):
classname = type(self).__name__
......@@ -241,9 +257,15 @@ class Timeout(BaseException):
return self
def __exit__(self, typ, value, tb):
self.cancel()
"""
Stop the timer.
.. versionchanged:: 1.3a1
The underlying native timer is also stopped. This object cannot be
used again.
"""
self.close()
if value is self and self.exception is False:
return True
return True # Suppress the exception
def with_timeout(seconds, function, *args, **kwds):
......@@ -256,7 +278,7 @@ def with_timeout(seconds, function, *args, **kwds):
Keyword argument *timeout_value* is not passed to *function*.
"""
timeout_value = kwds.pop("timeout_value", _NONE)
timeout = Timeout.start_new(seconds)
timeout = Timeout.start_new(seconds, _one_shot=True)
try:
try:
return function(*args, **kwds)
......
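
The net effect on Timeout users is roughly the following sketch, assuming only the gevent.Timeout API shown in the hunks above; the timings are arbitrary.

import gevent
from gevent import Timeout

# Context-manager form: __exit__ now calls close(), releasing the
# underlying native timer; the Timeout object cannot be reused after.
with Timeout(0.5):
    gevent.sleep(0.1)

# Manually started Timeouts now need close(); cancel() only stops a
# pending timer (unless the Timeout was created internally via
# _start_new_or_dummy, which marks it one-shot).
timeout = Timeout.start_new(0.5)
try:
    gevent.sleep(0.1)
finally:
    timeout.close()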
......@@ -24,6 +24,7 @@ import collections
import types
from functools import wraps
import gevent
import gevent.core
......@@ -119,6 +120,7 @@ def wrap_refcount(method):
break
elif len(deltas) >= 3 and deltas[-1] > 0 and deltas[-1] == deltas[-2] and deltas[-2] == deltas[-3]:
diff = report_diff(hist_before, hist_after)
print(gevent.get_hub().loop._active_watchers)
raise AssertionError('refcount increased by %r\n%s' % (deltas, diff))
# OK, we don't know for sure yet. Let's search for more
if sum(deltas[-3:]) <= 0 or sum(deltas[-4:]) <= 0 or deltas[-4:].count(0) >= 2:
......
......@@ -74,7 +74,7 @@ class _DelayWaitMixin(object):
self.wait(timeout=1)
self.assertIs(exc.exception, timeout)
finally:
timeout.cancel()
timeout.close()
class AbstractGenericWaitTestCase(_DelayWaitMixin, TestCase):
......@@ -116,13 +116,18 @@ class AbstractGenericGetTestCase(_DelayWaitMixin, TestCase):
self._wait_and_check(timeout=timeout)
except gevent.Timeout as ex:
self.assertIs(ex, timeout)
finally:
timeout.close()
self.cleanup()
def test_raises_timeout_Timeout_exc_customized(self):
error = RuntimeError('expected error')
timeout = gevent.Timeout(self._default_wait_timeout, exception=error)
try:
with self.assertRaises(RuntimeError) as exc:
self._wait_and_check(timeout=timeout)
self.assertIs(exc.exception, error)
self.cleanup()
finally:
timeout.close()
......@@ -35,6 +35,7 @@ class TestCoreStat(greentest.TestCase):
self.watcher = self.hub.loop.stat(self.temp_path, interval=-1)
def tearDown(self):
self.watcher.close()
if os.path.exists(self.temp_path):
os.unlink(self.temp_path)
super(TestCoreStat, self).tearDown()
......@@ -68,7 +69,7 @@ class TestCoreStat(greentest.TestCase):
def _wait_on_greenlet(self, func, *greenlet_args):
start = time.time()
self.hub.loop.update()
self.hub.loop.update_now()
greenlet = gevent.spawn_later(DELAY, func, *greenlet_args)
with gevent.Timeout(5 + DELAY + 0.5):
self.hub.wait(self.watcher)
......
from __future__ import print_function
from gevent import core
called = []
from greentest import TestCase
from greentest import main
from greentest import LARGE_TIMEOUT
from greentest.sysinfo import CFFI_BACKEND
def f(x=None):
called.append(1)
class Test(TestCase):
__timeout__ = LARGE_TIMEOUT
repeat = 0
def setUp(self):
self.called = []
self.loop = core.loop(default=True)
self.timer = self.loop.timer(0.001, repeat=self.repeat)
def tearDown(self):
self.timer.close()
def f(self, x=None):
self.called.append(1)
if x is not None:
x.stop()
def assertTimerInKeepalive(self):
if CFFI_BACKEND:
self.assertIn(self.timer, self.loop._keepaliveset)
def main():
loop = core.loop(default=True)
x = loop.timer(0.001)
x.start(f)
if hasattr(loop, '_keepaliveset'):
assert x in loop._keepaliveset
assert x.active, ("active", x.active, "pending", x.pending)
try:
def assertTimerNotInKeepalive(self):
if CFFI_BACKEND:
self.assertNotIn(self.timer, self.loop._keepaliveset)
def test_main(self):
loop = self.loop
x = self.timer
x.start(self.f)
self.assertTimerInKeepalive()
self.assertTrue(x.active, x)
with self.assertRaises((AttributeError, ValueError)):
x.priority = 1
raise AssertionError('must not be able to change priority of active watcher')
except (AttributeError, ValueError):
pass
loop.run()
assert x.pending == 0, x.pending
assert called == [1], called
assert x.callback is None, x.callback
assert x.args is None, x.args
self.assertEqual(x.pending, 0)
self.assertEqual(self.called, [1])
self.assertIsNone(x.callback)
self.assertIsNone(x.args)
if x.priority is not None:
assert x.priority == 0, (x, x.priority)
self.assertEqual(x.priority, 0)
x.priority = 1
assert x.priority == 1, x
self.assertEqual(x.priority, 1)
x.stop()
if hasattr(loop, '_keepaliveset'):
assert x not in loop._keepaliveset
self.assertTimerNotInKeepalive()
class TestAgain(Test):
repeat = 1
def test_main(self):
# Again works for a new timer
x = loop.timer(0.001, repeat=1)
x.again(f, x)
if hasattr(loop, '_keepaliveset'):
assert x in loop._keepaliveset
x = self.timer
x.again(self.f, x)
self.assertTimerInKeepalive()
assert x.args == (x,), x.args
loop.run()
assert called == [1, 1], called
self.assertEqual(x.args, (x,))
self.loop.run()
self.assertEqual(self.called, [1])
x.stop()
if hasattr(loop, '_keepaliveset'):
assert x not in loop._keepaliveset
self.assertTimerNotInKeepalive()
if __name__ == '__main__':
import sys
gettotalrefcount = getattr(sys, 'gettotalrefcount', None)
called[:] = []
if gettotalrefcount is not None:
print(gettotalrefcount())
main()
called[:] = []
if gettotalrefcount is not None:
print(gettotalrefcount())
......@@ -119,7 +119,7 @@ class TestAsyncResult(greentest.TestCase):
result, X,
'Nobody sent anything to event2 yet it received %r' % (result, ))
finally:
t.cancel()
t.close()
g.kill()
def test_nonblocking_get(self):
......
......@@ -99,14 +99,19 @@ class TestGreenIo(TestCase):
# XXX: This is not exactly true on Python 3.
# This produces a ResourceWarning.
oconn = None
try:
conn, _ = listener.accept()
if PY3:
oconn = conn
conn = conn.makefile(mode='wb')
conn.write(b'hello\n')
conn.close()
_write_to_closed(conn, b'a')
finally:
listener.close()
if oconn is not None:
oconn.close()
server = tcp_listener(('0.0.0.0', 0))
gevent.spawn(accept_once, server)
......@@ -116,7 +121,7 @@ class TestGreenIo(TestCase):
assert fd.read() == 'hello\n'
assert fd.read() == ''
timer.cancel()
timer.close()
if __name__ == '__main__':
main()
......@@ -118,11 +118,8 @@ class TestCoroutinePool(unittest.TestCase):
gevent.sleep(0)
self.assertEqual(pool.free_count(), 1)
# shouldn't block when trying to get
t = gevent.Timeout.start_new(0.1)
try:
with gevent.Timeout.start_new(0.1):
pool.apply(gevent.sleep, (0, ))
finally:
t.cancel()
finally:
sys.stderr = normal_err
pool.join()
......@@ -195,7 +192,7 @@ class PoolBasicTests(greentest.TestCase):
else:
raise AssertionError('Expected timeout')
finally:
timeout.cancel()
timeout.close()
self.assertEqual(p.free_count(), 0)
self.assertEqual(len(p), 1)
finally:
......
......@@ -31,7 +31,7 @@ class Test(greentest.TestCase):
gevent.sleep(0.001)
sock.close()
receiver.join(timeout=0.1)
assert receiver.ready(), receiver
self.assertTrue(receiver.ready(), receiver)
self.assertEqual(receiver.value, None)
self.assertIsInstance(receiver.exception, socket.error)
self.assertEqual(receiver.exception.errno, socket.EBADF)
......
......@@ -246,11 +246,10 @@ class Test(greentest.TestCase):
Thread = monkey.get_original('threading', 'Thread')
def fn():
try:
with self.assertRaises(TypeError) as exc:
gevent.subprocess.Popen('echo 123', shell=True)
raise AssertionError("Should not be able to construct Popen")
except Exception as e:
ex.append(e)
ex.append(exc.exception)
thread = Thread(target=fn)
thread.start()
......
......@@ -86,7 +86,8 @@ class PoolBasicTests(TestCase):
def test_init_valueerror(self):
self.switch_expected = False
self.assertRaises(ValueError, ThreadPool, -1)
with self.assertRaises(ValueError):
ThreadPool(-1)
self.pool = None
#
......
......@@ -30,21 +30,23 @@ class TestDirectRaise(greentest.TestCase):
class Test(greentest.TestCase):
def _test(self, timeout):
def _test(self, timeout, close):
try:
get_hub().switch()
self.fail('Must raise Timeout')
except gevent.Timeout as ex:
if ex is not timeout:
raise
if close:
ex.close()
return ex
def _check_expires(self, timeout):
timeout.start()
self._test(timeout)
self._test(timeout, False)
# Restart
timeout.start()
return self._test(timeout)
return self._test(timeout, True)
def test_expires(self):
timeout = gevent.Timeout(SHOULD_EXPIRE)
......@@ -75,7 +77,7 @@ class Test(greentest.TestCase):
self.fail("Most raise TypeError")
except TypeError as ex:
self.assert_type_err(ex)
timeout.cancel()
timeout.close()
class OldStyle:
pass
......@@ -91,7 +93,7 @@ class Test(greentest.TestCase):
self.assertTrue(greentest.PY2, "Old style classes can only be raised on Py2")
t = sys.exc_info()[0]
self.assertEqual(t, OldStyle)
timeout.cancel()
timeout.close()
timeout = gevent.Timeout(SHOULD_EXPIRE, OldStyle()) # instance
timeout.start()
......@@ -105,7 +107,7 @@ class Test(greentest.TestCase):
self.assertTrue(greentest.PY2, "Old style classes can only be raised on Py2")
t = sys.exc_info()[0]
self.assertEqual(t, OldStyle)
timeout.cancel()
timeout.close()
def _check_context_manager_expires(self, timeout, raises=True):
try:
......@@ -140,6 +142,7 @@ class Test(greentest.TestCase):
timeout.cancel()
gevent.sleep(SHOULD_NOT_EXPIRE)
assert not timeout.pending, timeout
timeout.close()
def test_with_timeout(self):
self.assertRaises(gevent.Timeout, gevent.with_timeout, SHOULD_EXPIRE, gevent.sleep, SHOULD_NOT_EXPIRE)
......