Commit 61432c16 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1582 from gevent/combine-tests

Group some tests in the same process
parents 3f41248c d5f8e220
Fix destroying the libuv default loop and then using the default loop
again.
libuv loops that have watched children can now exit. Previously, the
SIGCHLD watcher kept the loop alive even if there were no longer any
watched children.
......@@ -30,6 +30,12 @@ class _EVENTSType(object):
EVENTS = GEVENT_CORE_EVENTS = _EVENTSType()
class _DiscardedSet(frozenset):
__slots__ = ()
def discard(self, o):
"Does nothing."
#####
## Note on CFFI objects, callbacks and the lifecycle of watcher objects
#
......@@ -383,6 +389,8 @@ class AbstractLoop(object):
# whether they were the default loop.
_default = None
_keepaliveset = _DiscardedSet()
def __init__(self, ffi, lib, watchers, flags=None, default=None):
self._ffi = ffi
self._lib = lib
......@@ -397,10 +405,8 @@ class AbstractLoop(object):
def _init_loop_and_aux_watchers(self, flags=None, default=None):
self._ptr = self._init_loop(flags, default)
# self._check is a watcher that runs in each iteration of the
# mainloop, just after the blocking call. It's point is to handle
# signals. It doesn't run watchers or callbacks, it just exists to give
......@@ -541,12 +547,13 @@ class AbstractLoop(object):
raise NotImplementedError()
def destroy(self):
if self._ptr:
ptr = self.ptr
if ptr:
try:
if not self._can_destroy_loop(self._ptr):
if not self._can_destroy_loop(ptr):
return False
self._stop_aux_watchers()
self._destroy_loop(self._ptr)
self._destroy_loop(ptr)
finally:
# not ffi.NULL, we don't want something that can be
# passed to C and crash later. This will create nice friendly
......@@ -566,6 +573,7 @@ class AbstractLoop(object):
@property
def ptr(self):
# Use this when you need to be sure the pointer is valid.
return self._ptr
@property
......@@ -650,7 +658,7 @@ class AbstractLoop(object):
@property
def default(self):
return self._default if self._ptr else False
return self._default if self.ptr else False
@property
def iteration(self):
......@@ -730,9 +738,11 @@ class AbstractLoop(object):
return cb
def _format(self):
if not self._ptr:
ptr = self.ptr
if not ptr:
return 'destroyed'
msg = self.backend
msg = "backend=" + self.backend
msg += ' ptr=' + str(ptr)
if self.default:
msg += ' default'
msg += ' pending=%s' % self.pendingcnt
......@@ -759,6 +769,6 @@ class AbstractLoop(object):
@property
def activecnt(self):
if not self._ptr:
if not self.ptr:
raise ValueError('operation on destroyed loop')
return 0
......@@ -268,7 +268,7 @@ class watcher(object):
raise NotImplementedError()
def _watcher_ffi_stop(self):
self._watcher_stop(self.loop._ptr, self._watcher)
self._watcher_stop(self.loop.ptr, self._watcher)
def _watcher_ffi_ref(self):
raise NotImplementedError()
......@@ -345,9 +345,18 @@ class watcher(object):
# may fail if __init__ did; will be harmlessly printed
self.close()
__in_repr = False
def __repr__(self):
formats = self._format()
result = "<%s at 0x%x%s" % (self.__class__.__name__, id(self), formats)
basic = "<%s at 0x%x" % (self.__class__.__name__, id(self))
if self.__in_repr:
return basic + '>'
# Running child watchers have been seen to have a
# recursive repr in ``self.args``, thanks to ``gevent.os.fork_and_watch``
# passing the watcher as an argument to its callback.
self.__in_repr = True
try:
result = '%s%s' % (basic, self._format())
if self.pending:
result += " pending"
if self.callback is not None:
......@@ -364,6 +373,8 @@ class watcher(object):
result += " handle=%s" % (self._watcher_handle)
result += " ref=%s" % (self.ref)
return result + ">"
finally:
self.__in_repr = False
@property
def _watcher_handle(self):
......
cimport cython
from gevent.__waiter cimport Waiter
from gevent._event cimport Event
from gevent.__hub_local cimport get_hub_noargs as get_hub
cdef bint _greenlet_imported
cdef _heappush
cdef _heappop
cdef _heapify
cdef _Empty
cdef _Full
cdef Timeout
cdef InvalidSwitchError
cdef extern from "greenlet/greenlet.h":
ctypedef class greenlet.greenlet [object PyGreenlet]:
pass
# These are actually macros and so much be included
# (defined) in each .pxd, as are the two functions
# that call them.
greenlet PyGreenlet_GetCurrent()
void PyGreenlet_Import()
cdef inline greenlet getcurrent():
return PyGreenlet_GetCurrent()
cdef inline void greenlet_init():
global _greenlet_imported
if not _greenlet_imported:
PyGreenlet_Import()
_greenlet_imported = True
@cython.final
cdef _safe_remove(deq, item)
@cython.final
@cython.internal
cdef class ItemWaiter(Waiter):
cdef readonly item
cdef readonly queue
cdef _safe_remove(deq, item)
cdef class Queue:
cdef __weakref__
......@@ -33,6 +54,7 @@ cdef class Queue:
cpdef Py_ssize_t qsize(self)
cpdef bint empty(self)
cpdef bint full(self)
cpdef _create_queue(self, items=*)
cpdef put(self, item, block=*, timeout=*)
cpdef put_nowait(self, item)
......@@ -46,6 +68,13 @@ cdef class Queue:
cdef _schedule_unlock(self)
@cython.final
@cython.internal
cdef class ItemWaiter(Waiter):
cdef readonly item
cdef readonly Queue queue
@cython.final
cdef class UnboundQueue(Queue):
pass
......
......@@ -101,6 +101,7 @@ static void gevent_uv_walk_callback_close(uv_handle_t* handle, void* arg)
{
if( handle && !uv_is_closing(handle) ) {
uv_close(handle, NULL);
handle->data = NULL;
}
}
......
......@@ -117,7 +117,7 @@ class loop(AbstractLoop):
self._io_watchers = dict()
self._fork_watchers = set()
self._pid = os.getpid()
self._default = self._ptr == libuv.uv_default_loop()
self._default = (self._ptr == libuv.uv_default_loop())
self._queued_callbacks = []
def _queue_callback(self, watcher_ptr, revents):
......@@ -148,8 +148,18 @@ class loop(AbstractLoop):
_signal_idle = None
@property
def ptr(self):
if not self._ptr:
return None
if self._ptr and not self._ptr.data:
# Another instance of the Python loop destroyed
# the C loop. It was probably the default.
self._ptr = None
return self._ptr
def _init_and_start_check(self):
libuv.uv_check_init(self._ptr, self._check)
libuv.uv_check_init(self.ptr, self._check)
libuv.uv_check_start(self._check, libuv.python_check_callback)
libuv.uv_unref(self._check)
......@@ -167,7 +177,7 @@ class loop(AbstractLoop):
# scheduled, this should also be the same and unnecessary?
# libev does takes this basic approach on Windows.
self._signal_idle = ffi.new("uv_timer_t*")
libuv.uv_timer_init(self._ptr, self._signal_idle)
libuv.uv_timer_init(self.ptr, self._signal_idle)
self._signal_idle.data = self._handle_to_self
sig_cb = ffi.cast('void(*)(uv_timer_t*)', libuv.python_check_callback)
libuv.uv_timer_start(self._signal_idle,
......@@ -210,12 +220,12 @@ class loop(AbstractLoop):
super(loop, self)._run_callbacks()
def _init_and_start_prepare(self):
libuv.uv_prepare_init(self._ptr, self._prepare)
libuv.uv_prepare_init(self.ptr, self._prepare)
libuv.uv_prepare_start(self._prepare, libuv.python_prepare_callback)
libuv.uv_unref(self._prepare)
def _init_callback_timer(self):
libuv.uv_check_init(self._ptr, self._timer0)
libuv.uv_check_init(self.ptr, self._timer0)
def _stop_callback_timer(self):
libuv.uv_check_stop(self._timer0)
......@@ -328,47 +338,60 @@ class loop(AbstractLoop):
self._start_callback_timer()
libuv.uv_ref(self._timer0)
def _can_destroy_loop(self, ptr):
# We're being asked to destroy a loop that's,
# at the time it was constructed, was the default loop.
# If loop objects were constructed more than once,
# it may have already been destroyed, though.
# We track this in the data member.
return ptr.data
def _destroy_loop(self, ptr):
ptr.data = ffi.NULL
libuv.uv_stop(ptr)
return ptr
libuv.gevent_close_all_handles(ptr)
def __close_loop(self, ptr):
closed_failed = 1
while closed_failed:
closed_failed = libuv.uv_loop_close(ptr)
if closed_failed:
assert closed_failed == libuv.UV_EBUSY
if not closed_failed:
break
if closed_failed != libuv.UV_EBUSY:
raise SystemError("Unknown close failure reason", closed_failed)
# We already closed all the handles. Run the loop
# once to let them be cut off from the loop.
ran_has_more_callbacks = libuv.uv_run(ptr, libuv.UV_RUN_ONCE)
if ran_has_more_callbacks:
libuv.uv_run(ptr, libuv.UV_RUN_NOWAIT)
closed_failed = libuv.uv_loop_close(ptr)
assert closed_failed == 0, closed_failed
def _destroy_loop(self, ptr):
# We're being asked to destroy a loop that's, potentially, at
# the time it was constructed, was the default loop. If loop
# objects were constructed more than once, it may have already
# been destroyed, though. We track this in the data member.
data = ptr.data
ptr.data = ffi.NULL
try:
if data:
libuv.uv_stop(ptr)
libuv.gevent_close_all_handles(ptr)
finally:
ptr.data = ffi.NULL
try:
if data:
self.__close_loop(ptr)
finally:
# Destroy the native resources *after* we have closed
# the loop. If we do it before, walking the handles
# attached to the loop is likely to segfault.
# Note that these may have been closed already if the default loop was shared.
if data:
libuv.gevent_zero_check(self._check)
libuv.gevent_zero_check(self._timer0)
libuv.gevent_zero_prepare(self._prepare)
libuv.gevent_zero_timer(self._signal_idle)
libuv.gevent_zero_loop(ptr)
del self._check
del self._prepare
del self._signal_idle
del self._timer0
libuv.gevent_zero_loop(ptr)
# Destroy any watchers we're still holding on to.
del self._io_watchers
del self._fork_watchers
......@@ -402,7 +425,7 @@ class loop(AbstractLoop):
libuv.uv_is_active(handle),
libuv.uv_is_closing(handle)))
libuv.uv_walk(self._ptr,
libuv.uv_walk(self.ptr,
ffi.callback("void(*)(uv_handle_t*,void*)",
walk),
ffi.NULL)
......@@ -416,7 +439,7 @@ class loop(AbstractLoop):
pass
def break_(self, how=None):
libuv.uv_stop(self._ptr)
libuv.uv_stop(self.ptr)
def reinit(self):
# TODO: How to implement? We probably have to simply
......@@ -440,7 +463,7 @@ class loop(AbstractLoop):
# (https://github.com/joyent/libuv/issues/1405)
# In 1.12, the uv_loop_fork function was added (by gevent!)
libuv.uv_loop_fork(self._ptr)
libuv.uv_loop_fork(self.ptr)
_prepare_ran_callbacks = False
......@@ -540,14 +563,14 @@ class loop(AbstractLoop):
# libuv's now is expressed as an integer number of
# milliseconds, so to get it compatible with time.time units
# that this method is supposed to return, we have to divide by 1000.0
now = libuv.uv_now(self._ptr)
now = libuv.uv_now(self.ptr)
return now / 1000.0
def update_now(self):
libuv.uv_update_time(self._ptr)
libuv.uv_update_time(self.ptr)
def fileno(self):
if self._ptr:
if self.ptr:
fd = libuv.uv_backend_fd(self._ptr)
if fd >= 0:
return fd
......@@ -563,8 +586,10 @@ class loop(AbstractLoop):
return
self._sigchld_watcher = ffi.new('uv_signal_t*')
libuv.uv_signal_init(self._ptr, self._sigchld_watcher)
libuv.uv_signal_init(self.ptr, self._sigchld_watcher)
self._sigchld_watcher.data = self._handle_to_self
# Don't let this keep the loop alive
libuv.uv_unref(self._sigchld_watcher)
libuv.uv_signal_start(self._sigchld_watcher,
libuv.python_sigchld_callback,
......@@ -612,7 +637,7 @@ class loop(AbstractLoop):
except ValueError:
pass
# Now's a good time to clean up any dead lists we don't need
# Now's a good time to clean up any dead watchers we don't need
# anymore
for pid in list(self._child_watchers):
if not self._child_watchers[pid]:
......
......@@ -474,22 +474,25 @@ class _SimulatedWithAsyncMixin(object):
return self._async.active
def start(self, cb, *args):
assert self._async is not None
self._register_loop_callback()
self.callback = cb
self.args = args
self._async.start(cb, *args)
#watcher.start(self, cb, *args)
def stop(self):
self._unregister_loop_callback()
self.callback = None
self.args = None
if self._async is not None:
# If we're stop() after close().
# That should be allowed.
self._async.stop()
def close(self):
if self._async is not None:
a = self._async
#self._async = None
self._async = None
a.close()
def _register_loop_callback(self):
......@@ -503,9 +506,7 @@ class _SimulatedWithAsyncMixin(object):
class fork(_SimulatedWithAsyncMixin,
_base.ForkMixin,
watcher):
# We'll have to implement this one completely manually
# Right now it doesn't matter much since libuv doesn't survive
# a fork anyway. (That's a work in progress)
# We'll have to implement this one completely manually.
_watcher_skip_ffi = False
def _register_loop_callback(self):
......@@ -619,7 +620,7 @@ class timer(_base.TimerMixin, watcher):
_again = False
def _watcher_ffi_init(self, args):
self._watcher_init(self.loop._ptr, self._watcher)
self._watcher_init(self.loop.ptr, self._watcher)
self._after, self._repeat = args
if self._after and self._after < 0.001:
import warnings
......@@ -674,7 +675,7 @@ class stat(_base.StatMixin, watcher):
return data
def _watcher_ffi_init(self, args):
return self._watcher_init(self.loop._ptr, self._watcher)
return self._watcher_init(self.loop.ptr, self._watcher)
MIN_STAT_INTERVAL = 0.1074891 # match libev; 0.0 is default
......@@ -707,7 +708,7 @@ class signal(_base.SignalMixin, watcher):
_watcher_callback_name = '_gevent_signal_callback1'
def _watcher_ffi_init(self, args):
self._watcher_init(self.loop._ptr, self._watcher)
self._watcher_init(self.loop.ptr, self._watcher)
self.ref = False # libev doesn't ref these by default
......
......@@ -277,9 +277,11 @@ if hasattr(os, 'fork'):
# just not waited on yet.
now = time.time()
oldest_allowed = now - timeout
dead = [pid for pid, val
dead = [
pid for pid, val
in _watched_children.items()
if isinstance(val, tuple) and val[2] < oldest_allowed]
if isinstance(val, tuple) and val[2] < oldest_allowed
]
for pid in dead:
del _watched_children[pid]
......@@ -296,7 +298,10 @@ if hasattr(os, 'fork'):
:func:`os.waitpid`. Some combinations of *options* may not
be supported cooperatively (as of 1.1 that includes
WUNTRACED). Using a *pid* of 0 to request waiting on only processes
from the current process group is not cooperative.
from the current process group is not cooperative. A *pid* of -1
to wait for any child is non-blocking, but may or may not
require a trip around the event loop, depending on whether any children
have already terminated but not been waited on.
Availability: POSIX.
......@@ -316,12 +321,19 @@ if hasattr(os, 'fork'):
if pid <= 0:
# magic functions for multiple children.
if pid == -1:
# Any child. If we have one that we're watching and that finished,
# we will use that one. Otherwise, let the OS take care of it.
# Any child. If we have one that we're watching
# and that finished, we will use that one,
# preferring the oldest. Otherwise, let the OS
# take care of it.
finished_at = None
for k, v in _watched_children.items():
if isinstance(v, tuple):
if (
isinstance(v, tuple)
and (finished_at is None or v[2] < finished_at)
):
pid = k
break
finished_at = v[2]
if pid <= 0:
# We didn't have one that was ready. If there are
# no funky options set, and the pid was -1
......
......@@ -38,12 +38,14 @@ if sys.version_info[0] == 2:
import Queue as __queue__ # python 3: pylint:disable=import-error
else:
import queue as __queue__ # python 2: pylint:disable=import-error
Full = __queue__.Full
Empty = __queue__.Empty
# We re-export these exceptions to client modules.
# But we also want fast access to them from Cython with a cdef,
# and we do that with the _ definition.
_Full = Full = __queue__.Full
_Empty = Empty = __queue__.Empty
from gevent.timeout import Timeout
from gevent._hub_local import get_hub_noargs as get_hub
from greenlet import getcurrent
from gevent.exceptions import InvalidSwitchError
__all__ = []
......@@ -71,6 +73,8 @@ def _safe_remove(deq, item):
import gevent._waiter
locals()['Waiter'] = gevent._waiter.Waiter
locals()['getcurrent'] = __import__('greenlet').getcurrent
locals()['greenlet_init'] = lambda: None
class ItemWaiter(Waiter): # pylint:disable=undefined-variable
# pylint:disable=assigning-non-slot
......@@ -256,7 +260,7 @@ class Queue(object):
self._put(item)
if self.getters:
self._schedule_unlock()
elif self.hub is getcurrent():
elif self.hub is getcurrent(): # pylint:disable=undefined-variable
# We're in the mainloop, so we cannot wait; we can switch to other greenlets though.
# Check if possible to get a free slot in the queue.
while self.getters and self.qsize() and self.qsize() >= self._maxsize:
......@@ -290,13 +294,14 @@ class Queue(object):
"""
self.put(item, False)
def __get_or_peek(self, method, block, timeout):
# Internal helper method. The `method` should be either
# self._get when called from self.get() or self._peek when
# called from self.peek(). Call this after the initial check
# to see if there are items in the queue.
if self.hub is getcurrent():
if self.hub is getcurrent(): # pylint:disable=undefined-variable
# special case to make get_nowait() or peek_nowait() runnable in the mainloop greenlet
# there are no items in the queue; try to fix the situation by unlocking putters
while self.putters:
......@@ -305,12 +310,12 @@ class Queue(object):
self.putters.popleft().put_and_switch()
if self.qsize():
return method()
raise Empty()
raise Empty
if not block:
# We can't block, we're not the hub, and we have nothing
# to return. No choice...
raise Empty()
raise Empty
waiter = Waiter() # pylint:disable=undefined-variable
timeout = Timeout._start_new_or_dummy(timeout, Empty)
......@@ -362,7 +367,8 @@ class Queue(object):
(*timeout* is ignored in that case).
"""
if self.qsize():
# XXX: Why doesn't this schedule an unlock like get() does?
# This doesn't schedule an unlock like get() does because we're not
# actually making any space.
return self._peek()
return self.__get_or_peek(self._peek, block, timeout)
......@@ -604,7 +610,7 @@ class Channel(object):
return True
def put(self, item, block=True, timeout=None):
if self.hub is getcurrent():
if self.hub is getcurrent(): # pylint:disable=undefined-variable
if self.getters:
getter = self.getters.popleft()
getter.switch(item)
......@@ -634,7 +640,7 @@ class Channel(object):
self.put(item, False)
def get(self, block=True, timeout=None):
if self.hub is getcurrent():
if self.hub is getcurrent(): # pylint:disable=undefined-variable
if self.putters:
item, putter = self.putters.popleft()
self.hub.loop.run_callback(putter.switch, putter)
......@@ -681,5 +687,11 @@ class Channel(object):
next = __next__ # Py2
def _init():
greenlet_init() # pylint:disable=undefined-variable
_init()
from gevent._util import import_c_accel
import_c_accel(globals(), 'gevent._queue')
......@@ -17,7 +17,7 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from __future__ import print_function
from functools import wraps
......@@ -29,7 +29,6 @@ def wrap_error_fatal(method):
def wrapper(self, *args, **kwargs):
# XXX should also be able to do gevent.SYSTEM_ERROR = object
# which is a global default to all hubs
gevent.get_hub().SYSTEM_ERROR = object
try:
return method(self, *args, **kwargs)
......
......@@ -25,6 +25,8 @@ from gevent.hub import Hub
from .exception import ExpectedException
class QuietHub(Hub):
_resolver = None
_threadpool = None
EXPECTED_TEST_ERROR = (ExpectedException,)
......
......@@ -1247,7 +1247,7 @@ disabled_tests += [
'test_ssl.BasicSocketTests.test_openssl_version'
]
if TRAVIS and OSX:
if OSX:
disabled_tests += [
# This sometimes produces OSError: Errno 40: Message too long
......
......@@ -32,6 +32,8 @@ def _do_not_skip(reason):
return _identity
skipOnMac = _do_not_skip
skipOnMacOnCI = _do_not_skip
skipOnWindows = _do_not_skip
skipOnAppVeyor = _do_not_skip
skipOnCI = _do_not_skip
......@@ -62,6 +64,8 @@ skipOnLibev = _do_not_skip
if sysinfo.WIN:
skipOnWindows = unittest.skip
if sysinfo.OSX:
skipOnMac = unittest.skip
if sysinfo.RUNNING_ON_APPVEYOR:
# See comments scattered around about timeouts and the timer
......@@ -76,6 +80,8 @@ if sysinfo.RUNNING_ON_APPVEYOR:
if sysinfo.RUNNING_ON_CI:
skipOnCI = unittest.skip
if sysinfo.OSX:
skipOnMacOnCI = unittest.skip
if sysinfo.RUNNING_ON_MANYLINUX:
skipOnManylinux = unittest.skip
......
......@@ -29,6 +29,7 @@ import gevent
from gevent._util import LazyOnClass
from gevent._compat import perf_counter
from gevent._compat import get_clock_info
from gevent._hub_local import get_hub_if_exists
from . import sysinfo
from . import params
......@@ -146,12 +147,24 @@ class StringAssertMixin(object):
class TestTimeout(gevent.Timeout):
_expire_info = ''
def __init__(self, timeout):
gevent.Timeout.__init__(self, timeout, 'test timed out\n', ref=False)
def __init__(self, timeout, method='Not Given'):
gevent.Timeout.__init__(
self,
timeout,
'%r: test timed out\n' % (method,),
ref=False
)
def _on_expiration(self, prev_greenlet, ex):
from gevent.util import format_run_info
self._expire_info = '\n'.join(format_run_info())
loop = gevent.get_hub().loop
debug_info = 'N/A'
if hasattr(loop, 'debug'):
debug_info = [str(s) for s in loop.debug()]
run_info = format_run_info()
self._expire_info = 'Loop Debug:\n%s\nRun Info:\n%s' % (
'\n'.join(debug_info), '\n'.join(run_info)
)
gevent.Timeout._on_expiration(self, prev_greenlet, ex)
def __str__(self):
......@@ -165,7 +178,7 @@ def _wrap_timeout(timeout, method):
@wraps(method)
def wrapper(self, *args, **kwargs):
with TestTimeout(timeout):
with TestTimeout(timeout, method):
return method(self, *args, **kwargs)
return wrapper
......@@ -277,7 +290,9 @@ class TestCase(TestCaseMetaClass("NewBase",
# so that doesn't always happen. test__pool.py:TestPoolYYY.test_async
# tends to show timeouts that are too short if we don't.
# XXX: Should some core part of the loop call this?
gevent.get_hub().loop.update_now()
hub = get_hub_if_exists()
if hub and hub.loop:
hub.loop.update_now()
self.close_on_teardown = []
self.addCleanup(self._tearDownCloseOnTearDown)
......
This diff is collapsed.
......@@ -364,7 +364,6 @@ class Definitions(DefinitionsBase):
it seems highly load dependent. Observed with both libev and libuv.
""",
when=TRAVIS & (PYPY | OSX),
run_alone=ALWAYS,
# This often takes much longer on PyPy on CI.
options={'timeout': (CI & PYPY, 180)},
)
......@@ -438,11 +437,11 @@ class Definitions(DefinitionsBase):
""",
)
test__pool = test__queue = RunAlone(
test__pool = RunAlone(
"""
On a heavily loaded box, these can all take upwards of 200s.
""",
when=CI & LEAKTEST | PY3
when=CI & LEAKTEST
)
test_socket = RunAlone(
......@@ -453,7 +452,7 @@ class Definitions(DefinitionsBase):
test__refcount = Ignored(
"Sometimes fails to connect for no reason",
when=(CI & OSX) | (CI & PYPY),
when=(CI & OSX) | (CI & PYPY) | APPVEYOR,
ignore_coverage=PYPY
)
......
......@@ -110,7 +110,6 @@ class AbstractTestMixin(object):
except ImportError:
if modname in modules.OPTIONAL_MODULES:
msg = "Unable to import %s" % modname
warnings.warn(msg) # make the testrunner print it
raise unittest.SkipTest(msg)
raise
......
......@@ -29,7 +29,8 @@ class TestTimeout(greentest.TestCase):
while True:
listener.recvfrom(10000)
gevent.spawn(reader)
greader = gevent.spawn(reader)
self._close_on_teardown(greader.kill)
r = Resolver(servers=[address[0]], timeout=0.001, tries=1,
udp_port=address[-1])
......
......@@ -158,4 +158,4 @@ class Test(greentest.TestCase):
if __name__ == '__main__':
greentest.main()
greentest.main() # pragma: testrunner-no-combine
......@@ -7,18 +7,25 @@ try:
except ImportError:
import _thread as thread
from gevent import testing as greentest
hub = gevent.get_hub()
watcher = hub.loop.async_()
class Test(greentest.TestCase):
def test(self):
hub = gevent.get_hub()
watcher = hub.loop.async_()
# BWC for <3.7: This should still be an attribute
assert hasattr(hub.loop, 'async')
# BWC for <3.7: This should still be an attribute
assert hasattr(hub.loop, 'async')
gevent.spawn_later(0.1, thread.start_new_thread, watcher.send, ())
gevent.spawn_later(0.1, thread.start_new_thread, watcher.send, ())
start = time.time()
start = time.time()
with gevent.Timeout(1.0): # Large timeout for appveyor
with gevent.Timeout(1.0): # Large timeout for appveyor
hub.wait(watcher)
print('Watcher %r reacted after %.6f seconds' % (watcher, time.time() - start - 0.1))
print('Watcher %r reacted after %.6f seconds' % (watcher, time.time() - start - 0.1))
if __name__ == '__main__':
greentest.main()
import gevent
from gevent.hub import get_hub
called = []
from gevent import testing as greentest
class Test(greentest.TestCase):
def f():
called.append(1)
def test(self):
loop = get_hub().loop
called = []
def f():
called.append(1)
def main():
loop = get_hub().loop
x = loop.run_callback(f)
assert x, x
......@@ -27,5 +29,4 @@ def main():
if __name__ == '__main__':
called[:] = []
main()
greentest.main()
......@@ -48,4 +48,4 @@ class TestDestroyHub(unittest.TestCase):
hub.destroy()
if __name__ == '__main__':
unittest.main()
unittest.main() # pragma: testrunner-no-combine
......@@ -5,6 +5,20 @@ import unittest
class TestDestroyDefaultLoop(unittest.TestCase):
def tearDown(self):
self._reset_hub()
super(TestDestroyDefaultLoop, self).tearDown()
def _reset_hub(self):
from gevent._hub_local import set_hub
from gevent._hub_local import set_loop
from gevent._hub_local import get_hub_if_exists
hub = get_hub_if_exists()
if hub is not None:
hub.destroy(destroy_loop=True)
set_hub(None)
set_loop(None)
def test_destroy_gc(self):
# Issue 1098: destroying the default loop
# while using the C extension could crash
......@@ -19,6 +33,7 @@ class TestDestroyDefaultLoop(unittest.TestCase):
loop = gevent.config.loop(default=True)
self.assertTrue(loop.default)
# Destroy it
loop.destroy()
# It no longer claims to be the default
self.assertFalse(loop.default)
......@@ -31,10 +46,7 @@ class TestDestroyDefaultLoop(unittest.TestCase):
# crash only happened when that greenlet object
# was collected at exit time, which was most common
# in CPython 3.5)
from gevent._hub_local import set_hub
set_hub(None)
self._reset_hub()
def test_destroy_two(self):
# Get two new loop object, but using the default
......@@ -50,6 +62,10 @@ class TestDestroyDefaultLoop(unittest.TestCase):
# Destroy the second. This doesn't crash.
loop2.destroy()
self.assertFalse(loop2.default)
self.assertFalse(loop2.ptr)
self._reset_hub()
self.assertTrue(gevent.get_hub().loop.ptr)
if __name__ == '__main__':
......
from __future__ import print_function, absolute_import
from gevent import monkey; monkey.patch_all(subprocess=True)
from gevent import monkey; monkey.patch_all()
import signal
import socket
......
from gevent import monkey
monkey.patch_all(subprocess=True)
monkey.patch_all()
from gevent.server import DatagramServer
......@@ -30,4 +30,6 @@ class Test_udp_client(util.TestServer):
if __name__ == '__main__':
main()
# Running this following test__example_portforwarder on Appveyor
# doesn't work in the same process for some reason.
main() # pragma: testrunner-no-combine
......@@ -21,7 +21,8 @@ import sys
import gevent
from gevent import socket
from gevent.testing import TestCase, main, tcp_listener
from gevent import testing as greentest
from gevent.testing import TestCase, tcp_listener
from gevent.testing import gc_collect_if_needed
from gevent.testing import skipOnPyPy
from gevent.testing import params
......@@ -142,4 +143,4 @@ class TestGreenIo(TestCase):
if __name__ == '__main__':
main()
greentest.main()
import sys
import unittest
import threading
import gevent
import gevent.monkey
gevent.monkey.patch_all()
@unittest.skipUnless(
sys.version_info[0] == 2,
......@@ -8,11 +13,6 @@ import unittest
class Test(unittest.TestCase):
def test(self):
import threading
import gevent.monkey
gevent.monkey.patch_all()
import gevent
self.assertIs(threading._sleep, gevent.sleep)
if __name__ == '__main__':
......
......@@ -4,6 +4,8 @@ gevent.monkey.patch_all()
import socket
import multiprocessing
from gevent import testing as greentest
# Make sure that using the resolver in a forked process
# doesn't hang forever.
......@@ -12,7 +14,9 @@ def block():
socket.getaddrinfo('localhost', 8001)
def main():
class Test(greentest.TestCase):
def test(self):
socket.getaddrinfo('localhost', 8001)
p = multiprocessing.Process(target=block)
......@@ -20,4 +24,4 @@ def main():
p.join()
if __name__ == '__main__':
main()
greentest.main()
import gevent
from gevent import testing as greentest
#import socket # on windows
# iwait should not raise `LoopExit: This operation would block forever`
......@@ -13,8 +15,8 @@ def worker(i):
raise ValueError(i)
return i
def main():
class Test(greentest.TestCase):
def test(self):
finished = 0
# Wait on a group that includes one that will already be
# done, plus some that will finish as we watch
......@@ -27,9 +29,12 @@ def main():
finished += 1
# Simulate doing something that causes greenlets to switch;
# a non-zero timeout is crucial
try:
gevent.sleep(0.01)
except ValueError as ex:
self.assertEqual(ex.args[0], 2)
assert finished == 4
self.assertEqual(finished, 4)
if __name__ == '__main__':
main()
greentest.main()
......@@ -10,7 +10,7 @@ import sys
from multiprocessing import Process
from subprocess import Popen, PIPE
import gevent.testing as greentest
from gevent import testing as greentest
def f(sleep_sec):
gevent.sleep(sleep_sec)
......
# Test idle
import gevent
gevent.sleep()
gevent.idle()
from gevent import testing as greentest
class Test(greentest.TestCase):
def test(self):
gevent.sleep()
gevent.idle()
if __name__ == '__main__':
greentest.main()
import gevent
from gevent import testing as greentest
def func():
class Test(greentest.TestCase):
def test(self):
def func():
pass
a = gevent.spawn(func)
b = gevent.spawn(func)
gevent.joinall([a, b, a])
a = gevent.spawn(func)
b = gevent.spawn(func)
gevent.joinall([a, b, a])
if __name__ == '__main__':
greentest.main()
from gevent.core import loop
from gevent import get_hub
from gevent import testing as greentest
count = 0
class Test(greentest.TestCase):
def test(self):
count = [0]
def incr():
count[0] += 1
def incr():
global count
count += 1
loop = get_hub().loop
loop.run_callback(incr)
loop.run()
self.assertEqual(count, [1])
loop = loop()
loop.run_callback(incr)
loop.run()
assert count == 1, count
if __name__ == '__main__':
greentest.main()
import sys
import unittest
from gevent.testing import TestCase, main
from gevent.testing import TestCase
import gevent
from gevent.timeout import Timeout
......@@ -53,4 +53,4 @@ class TestQueue(TestCase): # pragma: no cover
if __name__ == '__main__':
main()
unittest.main()
from subprocess import Popen
from gevent import monkey
monkey.patch_all()
......@@ -79,6 +77,7 @@ class TestMonkey(SubscriberCleanupMixin, unittest.TestCase):
self.assertTrue(monkey.is_object_patched(modname, objname))
def test_patch_subprocess_twice(self):
Popen = monkey.get_original('subprocess', 'Popen')
self.assertNotIn('gevent', repr(Popen))
self.assertIs(Popen, monkey.get_original('subprocess', 'Popen'))
monkey.patch_subprocess()
......
......@@ -160,11 +160,13 @@ class TestForkAndWatch(greentest.TestCase):
pid = os.fork_and_watch()
if pid:
os.waitpid(-1, 0)
# Can't assert on what the pid actually was,
# Can't assert on what the found pid actually was,
# our testrunner may have spawned multiple children.
os._reap_children(0) # make the leakchecker happy
else: # pragma: no cover
gevent.sleep(2)
# The test framework will catch a regular SystemExit
# from sys.exit(), we need to just kill the process.
os._exit(0)
def test_waitpid_wrong_neg(self):
......
......@@ -22,7 +22,7 @@ from __future__ import print_function
from gevent import monkey
monkey.patch_all(thread=False)
monkey.patch_all()
from contextlib import contextmanager
try:
......
import unittest
import gevent.testing as greentest
from gevent.testing import TestCase, main
from gevent.testing import TestCase
import gevent
from gevent.hub import get_hub, LoopExit
from gevent import util
......@@ -24,7 +24,8 @@ class TestQueue(TestCase):
def test_peek_empty(self):
q = queue.Queue()
# No putters waiting, in the main loop: LoopExit
self.assertRaises(LoopExit, q.peek)
with self.assertRaises(LoopExit):
q.peek()
def waiter(q):
self.assertRaises(Empty, q.peek, timeout=0.01)
......@@ -323,6 +324,8 @@ class TestNoWait(TestCase):
assert q.empty(), q
def test_get_nowait_unlock_channel(self):
# get_nowait runs fine in the hub, and
# it switches to a waiting putter if needed.
result = []
q = queue.Channel()
p = gevent.spawn(q.put, 5)
......@@ -330,19 +333,21 @@ class TestNoWait(TestCase):
def store_result(func, *args):
result.append(func(*args))
assert q.empty(), q
assert q.full(), q
self.assertTrue(q.empty())
self.assertTrue(q.full())
gevent.sleep(0.001)
assert q.empty(), q
assert q.full(), q
self.assertTrue(q.empty())
self.assertTrue(q.full())
get_hub().loop.run_callback(store_result, q.get_nowait)
gevent.sleep(0.001)
assert q.empty(), q
assert q.full(), q
assert result == [5], result
assert p.ready(), p
assert p.dead, p
assert q.empty(), q
self.assertTrue(q.empty())
self.assertTrue(q.full())
self.assertEqual(result, [5])
self.assertTrue(p.ready())
self.assertTrue(p.dead)
self.assertTrue(q.empty())
# put_nowait must work from the mainloop
def test_put_nowait_unlock(self):
......@@ -462,4 +467,4 @@ del AbstractGenericGetTestCase
if __name__ == '__main__':
main()
greentest.main()
......@@ -6,24 +6,29 @@ Fails with PyPy 2.2.1
"""
from __future__ import print_function
import sys
import greenlet
from gevent import testing as greentest
print('Your greenlet version: %s' % (getattr(greenlet, '__version__', None), ))
class Test(greentest.TestCase):
def test(self):
import greenlet
result = []
print('Your greenlet version: %s' % (getattr(greenlet, '__version__', None), ))
result = []
def func():
def func():
result.append(repr(sys.exc_info()))
g = greenlet.greenlet(func)
try:
g = greenlet.greenlet(func)
try:
1 / 0
except ZeroDivisionError:
except ZeroDivisionError:
g.switch()
assert result == ['(None, None, None)'], result
self.assertEqual(result, ['(None, None, None)'])
if __name__ == '__main__':
greentest.main()
import sys
import weakref
from gevent import testing as greentest
class Dummy:
class Dummy(object):
def __init__(self):
__import__('gevent.core')
try:
assert weakref.ref(Dummy())() is None
@greentest.skipIf(weakref.ref(Dummy())() is not None,
"Relies on refcounting for fast weakref cleanup")
class Test(greentest.TestCase):
def test(self):
from gevent import socket
s = socket.socket()
r = weakref.ref(s)
s.close()
del s
assert r() is None
except AssertionError: # pragma: no cover
import sys
if hasattr(sys, 'pypy_version_info'):
# PyPy uses a non refcounted GC which may defer
# the collection of the weakref, unlike CPython
pass
else:
raise
self.assertIsNone(r())
assert weakref.ref(Dummy())() is None or hasattr(sys, 'pypy_version_info')
if __name__ == '__main__':
greentest.main()
......@@ -140,7 +140,10 @@ class TestCase(greentest.TestCase):
conn.close()
ex = exc.exception
self.assertIn(ex.args[0], (errno.ECONNREFUSED, errno.EADDRNOTAVAIL), ex)
self.assertIn(ex.args[0],
(errno.ECONNREFUSED, errno.EADDRNOTAVAIL,
errno.ECONNRESET, errno.ECONNABORTED),
(ex, ex.args))
def assert500(self):
self.Settings.assert500(self)
......
......@@ -22,6 +22,7 @@ from gevent.testing import support
from gevent.testing import params
from gevent.testing.sockets import tcp_listener
from gevent.testing.skipping import skipWithoutExternalNetwork
from gevent.testing.skipping import skipOnMacOnCI
# we use threading on purpose so that we can test both regular and
# gevent sockets with the same code
......@@ -49,7 +50,6 @@ class Thread(_Thread):
class TestTCP(greentest.TestCase):
__timeout__ = None
TIMEOUT_ERROR = socket.timeout
long_data = ", ".join([str(x) for x in range(20000)])
......@@ -210,7 +210,9 @@ class TestTCP(greentest.TestCase):
if match_data is None:
match_data = self.long_data
self.assertEqual(read_data, [match_data])
read_data = read_data[0].split(b',')
match_data = match_data.split(b',')
self.assertEqual(read_data, match_data)
def test_sendall_str(self):
self._test_sendall(self.long_data)
......@@ -219,6 +221,7 @@ class TestTCP(greentest.TestCase):
def test_sendall_unicode(self):
self._test_sendall(six.text_type(self.long_data))
@skipOnMacOnCI("Sometimes fails for no apparent reason (buffering?)")
def test_sendall_array(self):
data = array.array("B", self.long_data)
self._test_sendall(data)
......
......@@ -41,6 +41,8 @@ class Test(greentest.TestCase):
raise AssertionError('must raise KeyboardInterrupt')
def test_keyboard_interrupt_stderr_patched(self):
# XXX: This one non-top-level call prevents us from being
# run in a process with other tests.
from gevent import monkey
monkey.patch_sys(stdin=False, stdout=False, stderr=True)
try:
......
"""
Tests specifically for the monkey-patched threading module.
"""
from gevent import monkey; monkey.patch_all()
from gevent import monkey; monkey.patch_all() # pragma: testrunner-no-monkey-combine
import gevent.hub
# check that the locks initialized by 'threading' did not init the hub
......
......@@ -3,7 +3,7 @@
import threading
from gevent import monkey
monkey.patch_all()
monkey.patch_all() # pragma: testrunner-no-monkey-combine
import gevent.testing as greentest
......
......@@ -5,4 +5,4 @@ import threading
# in python code, this used to throw RuntimeErro("Cannot release un-acquired lock")
# See https://github.com/gevent/gevent/issues/615
with threading.RLock():
monkey.patch_all()
monkey.patch_all() # pragma: testrunner-no-monkey-combine
......@@ -25,7 +25,7 @@ class Test(greentest.TestCase):
def target():
tcurrent = threading.current_thread()
monkey.patch_all()
monkey.patch_all() # pragma: testrunner-no-monkey-combine
tcurrent2 = threading.current_thread()
self.assertIsNot(tcurrent, current)
# We get a dummy thread now
......
......@@ -52,6 +52,6 @@ if __name__ == '__main__':
# Only patch after we're running
from gevent import monkey
monkey.patch_all()
monkey.patch_all() # pragma: testrunner-no-monkey-combine
greentest.main()
......@@ -7,7 +7,7 @@ import gevent.testing as greentest
script = """
from gevent import monkey
monkey.patch_all()
monkey.patch_all() # pragma: testrunner-no-monkey-combine
import sys, os, threading, time
......
......@@ -86,7 +86,8 @@ class TestTree(greentest.TestCase):
# so perhaps we need a GC?
for _ in range(3):
gc.collect()
gevent.get_hub().resolver = None # Reset resolver, don't need to see it
gevent.get_hub().threadpool = None # ditto the pool
glets = []
l = MyLocal(42)
assert l
......@@ -135,6 +136,10 @@ class TestTree(greentest.TestCase):
def _normalize_tree_format(self, value):
import re
hexobj = re.compile('0x[0123456789abcdef]+L?', re.I)
hub_repr = repr(gevent.get_hub())
value = value.replace(hub_repr, "<HUB>")
value = hexobj.sub('X', value)
value = value.replace('epoll', 'select')
value = value.replace('select', 'default')
......@@ -142,6 +147,7 @@ class TestTree(greentest.TestCase):
value = re.compile(' fileno=.').sub('', value)
value = value.replace('ref=-1', 'ref=0')
value = value.replace("type.current_tree", 'GreenletTree.current_tree')
value = value.replace('gevent.tests.__main__.MyLocal', '__main__.MyLocal')
return value
@greentest.ignores_leakcheck
......@@ -159,26 +165,26 @@ class TestTree(greentest.TestCase):
: Greenlet Locals:
: Local <class '__main__.MyLocal'> at X
: {'foo': 42}
+--- <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
+--- <HUB>
: Parent: <greenlet.greenlet object at X>
+--- <Greenlet "Greenlet-1" at X: t2>; finished with value <Greenlet "CustomName-0" at 0x
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <HUB>
| +--- <Greenlet "CustomName-0" at X: t1>; finished with exception ExpectedException()
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <HUB>
+--- <Greenlet "Greenlet-2" at X: t2>; finished with value <Greenlet "CustomName-4" at 0x
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <HUB>
| +--- <Greenlet "CustomName-4" at X: t1>; finished with exception ExpectedException()
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <HUB>
+--- <Greenlet "Greenlet-3" at X: t3>; finished with value <Greenlet "Greenlet-5" at X
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <HUB>
: Spawn Tree Locals
: {'stl': 'STL'}
| +--- <Greenlet "Greenlet-5" at X: t2>; finished with value <Greenlet "CustomName-6" at 0x
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <HUB>
| +--- <Greenlet "CustomName-6" at X: t1>; finished with exception ExpectedException()
: Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
: Parent: <HUB>
+--- <Greenlet "Greenlet-7" at X: <bound method GreenletTree.current_tree of <class 'gevent.util.GreenletTree'>>>; finished with value <gevent.util.GreenletTree obje
Parent: <QuietHub '' at X default default pending=0 ref=0 thread_ident=X>
Parent: <HUB>
""".strip()
self.assertEqual(expected, value)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment