Commit 76cbf2fa authored by Jason Madden's avatar Jason Madden

Remove most routine calls to _dbg, leaving error/unexpected calls in place.

I wasn't even using them to debug anymore, they were too verbose to be blanket enabled.

Fixes #1146.

Also tweak the leakcheck tests for threadpools.
parent c6686730
......@@ -12,7 +12,6 @@ import traceback
from gevent._ffi import _dbg
from gevent._ffi import GEVENT_DEBUG_LEVEL
from gevent._ffi import TRACE
from gevent._ffi import CRITICAL
from gevent._ffi.callback import callback
from gevent._compat import PYPY
......@@ -75,9 +74,7 @@ class AbstractCallbacks(object):
self.from_handle = ffi.from_handle
def from_handle(self, handle): # pylint:disable=method-hidden
_dbg("Getting from handle", handle)
x = self.ffi.from_handle(handle)
_dbg("Got from handle", handle, x)
return x
def python_callback(self, handle, revents):
......@@ -224,7 +221,6 @@ class AbstractCallbacks(object):
# Since the C level passed in a null pointer, even dereferencing the handle
# will just produce some exceptions.
return
_dbg("python_stop: stopping watcher with handle", handle)
watcher = self.from_handle(handle)
watcher.stop()
......
......@@ -52,7 +52,7 @@ except ImportError: # Python < 3.4
from gevent._compat import fsencode
from gevent._ffi import _dbg
from gevent._ffi import _dbg # pylint:disable=unused-import
from gevent._ffi import GEVENT_DEBUG_LEVEL
from gevent._ffi import DEBUG
from gevent._ffi.loop import GEVENT_CORE_EVENTS
......@@ -438,13 +438,11 @@ class watcher(object):
if self._callback is None:
assert self.loop is None or self not in self.loop._keepaliveset
return
_dbg("Main stop for", self, "keepalive?", self in self.loop._keepaliveset)
self._watcher_ffi_stop_ref()
self._watcher_ffi_stop()
self.loop._keepaliveset.discard(self)
self._handle = None
self._watcher_set_data(self._watcher, self._FFI.NULL) # pylint:disable=no-member
_dbg("Finished main stop for", self, "keepalive?", self in self.loop._keepaliveset)
self.callback = None
self.args = None
......
......@@ -120,7 +120,6 @@ class loop(AbstractLoop):
libuv.uv_check_init(self._ptr, self._check)
libuv.uv_check_start(self._check, libuv.python_check_callback)
libuv.uv_unref(self._check)
_dbg("Started check watcher", ffi.cast('void*', self._check))
# We also have to have an idle watcher to be able to handle
# signals in a timely manner. Without them, libuv won't loop again
......@@ -393,7 +392,6 @@ class loop(AbstractLoop):
def run(self, nowait=False, once=False):
_dbg("Entering libuv.uv_run")
# we can only respect one flag or the other.
# nowait takes precedence because it can't block
mode = libuv.UV_RUN_DEFAULT
......
......@@ -118,7 +118,6 @@ class watcher(_base.watcher):
# Instead, this is arranged as a callback to GC when the
# watcher class dies. Obviously it's important to keep the ffi
# watcher alive.
_dbg("Request to close handle", ffi.cast('void*', ffi_watcher), ffi_watcher)
# We can pass in "subclasses" if uv_handle_t that line up at the C level,
# but that don't in CFFI without a cast. But be careful what we use the cast
# for, don't pass it back to C.
......@@ -128,7 +127,6 @@ class watcher(_base.watcher):
# and trying to close it results in libuv terminating the process.
# Sigh. Same thing if it's already in the process of being
# closed.
_dbg("Closing handle", ffi_watcher)
_closing_watchers.add(ffi_watcher)
libuv.uv_close(ffi_watcher, libuv._uv_close_callback)
......@@ -136,7 +134,6 @@ class watcher(_base.watcher):
def _watcher_ffi_set_init_ref(self, ref):
_dbg("Creating", type(self), "with ref", ref)
self.ref = ref
def _watcher_ffi_init(self, args):
......@@ -146,28 +143,22 @@ class watcher(_base.watcher):
*args)
def _watcher_ffi_start(self):
_dbg("Starting", self)
self._watcher_start(self._watcher, self._watcher_callback)
_dbg("\tStarted", self)
def _watcher_ffi_stop(self):
_dbg("Stopping", self, self._watcher_stop)
if self._watcher:
# The multiplexed io watcher deletes self._watcher
# when it closes down. If that's in the process of
# an error handler, AbstractCallbacks.unhandled_onerror
# will try to close us again.
self._watcher_stop(self._watcher)
_dbg("Stopped", self)
@_base.only_if_watcher
def _watcher_ffi_ref(self):
_dbg("Reffing", self)
libuv.uv_ref(self._watcher)
@_base.only_if_watcher
def _watcher_ffi_unref(self):
_dbg("Unreffing", self)
libuv.uv_unref(self._watcher)
def _watcher_ffi_start_unref(self):
......@@ -255,7 +246,6 @@ class io(_base.IoMixin, watcher):
def _set_events(self, events):
if events == self._events:
return
_dbg("Changing event mask for", self, "from", self._events, "to", events)
self._events = events
if self.active:
# We're running but libuv specifically says we can
......@@ -266,7 +256,6 @@ class io(_base.IoMixin, watcher):
events = property(_get_events, _set_events)
def _watcher_ffi_start(self):
_dbg("Starting watcher", self, "with events", self._events)
self._watcher_start(self._watcher, self._events, self._watcher_callback)
if sys.platform.startswith('win32'):
......@@ -320,9 +309,6 @@ class io(_base.IoMixin, watcher):
_base.not_while_active(lambda self, nv: setattr(self, '_events', nv)))
def start(self, callback, *args, **kwargs):
_dbg("Starting IO multiplex watcher for", self.fd,
"callback", callback, "events", self.events,
"owner", self._watcher_ref)
self.pass_events = kwargs.get("pass_events")
self.callback = callback
self.args = args
......@@ -336,9 +322,6 @@ class io(_base.IoMixin, watcher):
watcher._calc_and_update_events()
def stop(self):
_dbg("Stopping IO multiplex watcher for", self.fd,
"callback", self.callback, "events", self.events,
"owner", self._watcher_ref)
self.callback = None
self.pass_events = None
self.args = None
......@@ -366,7 +349,6 @@ class io(_base.IoMixin, watcher):
lambda self, nv: self._watcher_ref._set_fd(nv))
def _io_maybe_stop(self):
_dbg("IO maybe stop on behalf of multiplex", self, "fd", self._fd, "events", self._events)
self._calc_and_update_events()
for w in self._multiplex_watchers:
if w.callback is not None:
......@@ -378,7 +360,6 @@ class io(_base.IoMixin, watcher):
self.stop()
def _io_start(self):
_dbg("IO start on behalf of multiplex", self, "fd", self._fd, "events", self._events)
self._calc_and_update_events()
self.start(self._io_callback, pass_events=True)
......@@ -404,7 +385,6 @@ class io(_base.IoMixin, watcher):
def _multiplex_closed(self, watcher):
self._multiplex_watchers.remove(watcher)
if not self._multiplex_watchers:
_dbg("IO Watcher", self, "has no more multiplexes")
self.stop() # should already be stopped
self._no_more_watchers()
# It is absolutely critical that we control when the call
......@@ -420,8 +400,6 @@ class io(_base.IoMixin, watcher):
self.close()
else:
self._calc_and_update_events()
_dbg("IO Watcher", self, "has remaining multiplex:",
self._multiplex_watchers)
def _no_more_watchers(self):
# The loop sets this on an individual watcher to delete it from
......@@ -448,14 +426,11 @@ class io(_base.IoMixin, watcher):
# See test__makefile_ref.TestSSL for examples.
# return
_dbg("Callback event for watcher", self._fd, "event", events)
for watcher in self._multiplex_watchers:
if not watcher.callback:
# Stopped
_dbg("Watcher", self, "has stopped multiplex", watcher)
continue
assert watcher._watcher_ref is self, (self, watcher._watcher_ref)
_dbg("Event for watcher", self._fd, events, watcher.events, events & watcher.events)
send_event = (events & watcher.events) or events < 0
if send_event:
......
......@@ -3,12 +3,6 @@ import sys
PY3 = sys.version_info[0] >= 3
if PY3:
advance_iterator = next
else:
def advance_iterator(it):
return it.next()
if PY3:
import builtins
exec_ = getattr(builtins, "exec")
......
......@@ -8,7 +8,6 @@ import greentest
import greentest.timing
import random
from greentest import ExpectedException
from greentest import six
import unittest
......@@ -352,14 +351,14 @@ class TestPool(greentest.TestCase): # pylint:disable=too-many-public-methods
def test_imap_it_small(self):
it = self.pool.imap(sqr, range(SMALL_RANGE))
for i in range(SMALL_RANGE):
self.assertEqual(six.advance_iterator(it), i * i)
self.assertRaises(StopIteration, lambda: six.advance_iterator(it))
self.assertEqual(next(it), i * i)
self.assertRaises(StopIteration, next, it)
def test_imap_it_large(self):
it = self.pool.imap(sqr, range(LARGE_RANGE))
for i in range(LARGE_RANGE):
self.assertEqual(six.advance_iterator(it), i * i)
self.assertRaises(StopIteration, lambda: six.advance_iterator(it))
self.assertEqual(next(it), i * i)
self.assertRaises(StopIteration, next, it)
def test_imap_random(self):
it = self.pool.imap(sqr_random_sleep, range(SMALL_RANGE))
......
from __future__ import print_function
import sys
from time import time, sleep
import contextlib
import random
import weakref
import gc
import greentest
import gevent.threadpool
from gevent.threadpool import ThreadPool
import gevent
from greentest import ExpectedException
from greentest import six
from greentest import PYPY
import gc
# pylint:disable=too-many-ancestors
......@@ -33,20 +35,39 @@ class TestCase(greentest.TestCase):
__timeout__ = greentest.LARGE_TIMEOUT
pool = None
ClassUnderTest = ThreadPool
def _FUT(self):
return self.ClassUnderTest
def _makeOne(self, size, increase=greentest.RUN_LEAKCHECKS):
self.pool = pool = self._FUT()(size)
if increase:
# Max size to help eliminate false positives
self.pool.size = size
return pool
def cleanup(self):
pool = getattr(self, 'pool', None)
pool = self.pool
if pool is not None:
kill = getattr(pool, 'kill', None) or getattr(pool, 'shutdown', None)
kill = getattr(pool, 'kill', None) or getattr(pool, 'shutdown')
kill()
del kill
del self.pool
if greentest.RUN_LEAKCHECKS:
# Each worker thread created a greenlet object and switched to it.
# It's a custom subclass, but even if it's not, it appears that
# the root greenlet for the new thread sticks around until there's a
# gc. Simply calling 'getcurrent()' is enough to "leak" a greenlet.greenlet
# and a weakref.
for _ in range(3):
gc.collect()
class PoolBasicTests(TestCase):
def test_execute_async(self):
self.pool = pool = ThreadPool(2)
pool = self._makeOne(2)
r = []
first = pool.spawn(r.append, 1)
first.get()
......@@ -65,12 +86,12 @@ class PoolBasicTests(TestCase):
self.assertEqualFlakyRaceCondition(sorted(r), [1, 2, 3, 4])
def test_apply(self):
self.pool = pool = ThreadPool(1)
pool = self._makeOne(1)
result = pool.apply(lambda a: ('foo', a), (1, ))
self.assertEqual(result, ('foo', 1))
def test_apply_raises(self):
self.pool = pool = ThreadPool(1)
pool = self._makeOne(1)
def raiser():
raise ExpectedException()
......@@ -85,8 +106,7 @@ class PoolBasicTests(TestCase):
def test_init_valueerror(self):
self.switch_expected = False
with self.assertRaises(ValueError):
ThreadPool(-1)
self.pool = None
self._makeOne(-1)
#
# tests from standard library test/test_multiprocessing.py
......@@ -122,18 +142,17 @@ class _AbstractPoolTest(TestCase):
size = 1
ClassUnderTest = ThreadPool
MAP_IS_GEN = False
def setUp(self):
greentest.TestCase.setUp(self)
self.pool = self.ClassUnderTest(self.size)
self._makeOne(self.size)
@greentest.ignores_leakcheck
def test_map(self):
pmap = self.pool.map
if self.MAP_IS_GEN:
pmap = lambda *args: list(self.pool.map(*args))
pmap = lambda f, i: list(self.pool.map(f, i))
self.assertEqual(pmap(sqr, range(10)), list(map(sqr, range(10))))
self.assertEqual(pmap(sqr, range(100)), list(map(sqr, range(100))))
......@@ -202,30 +221,30 @@ class TestPool(_AbstractPoolTest):
def test_imap_it_small(self):
it = self.pool.imap(sqr, range(SMALL_RANGE))
for i in range(SMALL_RANGE):
self.assertEqual(six.advance_iterator(it), i * i)
self.assertRaises(StopIteration, lambda: six.advance_iterator(it))
self.assertEqual(next(it), i * i)
self.assertRaises(StopIteration, next, it)
def test_imap_it_large(self):
it = self.pool.imap(sqr, range(LARGE_RANGE))
for i in range(LARGE_RANGE):
self.assertEqual(six.advance_iterator(it), i * i)
self.assertRaises(StopIteration, lambda: six.advance_iterator(it))
self.assertEqual(next(it), i * i)
self.assertRaises(StopIteration, next, it)
def test_imap_gc(self):
it = self.pool.imap(sqr, range(SMALL_RANGE))
for i in range(SMALL_RANGE):
self.assertEqual(six.advance_iterator(it), i * i)
self.assertEqual(next(it), i * i)
gc.collect()
self.assertRaises(StopIteration, lambda: six.advance_iterator(it))
self.assertRaises(StopIteration, next, it)
def test_imap_unordered_gc(self):
it = self.pool.imap_unordered(sqr, range(SMALL_RANGE))
result = []
for _ in range(SMALL_RANGE):
result.append(six.advance_iterator(it))
result.append(next(it))
gc.collect()
with self.assertRaises(StopIteration):
six.advance_iterator(it)
next(it)
self.assertEqual(sorted(result), [x * x for x in range(SMALL_RANGE)])
def test_imap_random(self):
......@@ -253,7 +272,7 @@ class TestPool(_AbstractPoolTest):
result.join()
def sleep(self, x):
sleep(float(x) / 10.)
sleep(float(x) / 10.0)
return str(x)
def test_imap_unordered_sleep(self):
......@@ -269,6 +288,7 @@ class TestPool(_AbstractPoolTest):
class TestPool2(TestPool):
size = 2
@greentest.ignores_leakcheck # Asking for the hub in the new thread shows up as a "leak"
def test_recursive_apply(self):
p = self.pool
......@@ -285,8 +305,6 @@ class TestPool2(TestPool):
result = p.apply(a)
self.assertEqual(result, "B")
# Asking for the hub in the new thread shows up as a "leak"
test_recursive_apply.ignore_leakcheck = True
class TestPool3(TestPool):
......@@ -323,15 +341,15 @@ class TestJoinEmpty(TestCase):
# Running this test standalone doesn't crash PyPy, only when it's run
# as part of this whole file. Removing it does solve the crash though.
def test(self):
self.pool = ThreadPool(1)
self.pool.join()
pool = self._makeOne(1)
pool.join()
class TestSpawn(TestCase):
switch_expected = True
def test(self):
self.pool = pool = ThreadPool(1)
pool = self._makeOne(1)
self.assertEqual(len(pool), 0)
log = []
sleep_n_log = lambda item, seconds: [sleep(seconds), log.append(item)]
......@@ -360,12 +378,12 @@ class TestErrorInIterator(TestCase):
error_fatal = False
def test(self):
self.pool = ThreadPool(3)
self.pool = self._makeOne(3)
self.assertRaises(greentest.ExpectedException, self.pool.map, lambda x: None, error_iter())
gevent.sleep(0.001)
def test_unordered(self):
self.pool = ThreadPool(3)
self.pool = self._makeOne(3)
def unordered():
return list(self.pool.imap_unordered(lambda x: None, error_iter()))
......@@ -377,7 +395,7 @@ class TestErrorInIterator(TestCase):
class TestMaxsize(TestCase):
def test_inc(self):
self.pool = ThreadPool(0)
self.pool = self._makeOne(0)
done = []
# Try to be careful not to tick over the libuv timer.
# See libuv/loop.py:_start_callback_timer
......@@ -391,7 +409,7 @@ class TestMaxsize(TestCase):
self.assertEqualFlakyRaceCondition(done, [1, 2])
def test_setzero(self):
pool = self.pool = ThreadPool(3)
pool = self.pool = self._makeOne(3)
pool.spawn(sleep, 0.1)
pool.spawn(sleep, 0.2)
pool.spawn(sleep, 0.3)
......@@ -405,7 +423,7 @@ class TestMaxsize(TestCase):
class TestSize(TestCase):
def test(self):
pool = self.pool = ThreadPool(2)
pool = self.pool = self._makeOne(2, increase=False)
self.assertEqual(pool.size, 0)
pool.size = 1
self.assertEqual(pool.size, 1)
......@@ -414,15 +432,12 @@ class TestSize(TestCase):
pool.size = 1
self.assertEqual(pool.size, 1)
def set_neg():
with self.assertRaises(ValueError):
pool.size = -1
self.assertRaises(ValueError, set_neg)
def set_too_big():
with self.assertRaises(ValueError):
pool.size = 3
self.assertRaises(ValueError, set_too_big)
pool.size = 0
self.assertEqual(pool.size, 0)
pool.size = 2
......@@ -432,7 +447,7 @@ class TestSize(TestCase):
class TestRef(TestCase):
def test(self):
pool = self.pool = ThreadPool(2)
pool = self.pool = self._makeOne(2)
refs = []
obj = SomeClass()
......@@ -480,7 +495,7 @@ def noop():
class TestRefCount(TestCase):
def test(self):
pool = ThreadPool(1)
pool = self._makeOne(1)
pool.spawn(noop)
gevent.sleep(0)
pool.kill()
......@@ -518,7 +533,7 @@ if hasattr(gevent.threadpool, 'ThreadPoolExecutor'):
future.calledback += 1
raise greentest.ExpectedException("Expected, ignored")
future = pool.submit(fn)
future = pool.submit(fn) # pylint:disable=no-member
future.calledback = 0
future.add_done_callback(callback)
self.assertRaises(FutureTimeoutError, future.result, timeout=0.001)
......@@ -574,7 +589,7 @@ if hasattr(gevent.threadpool, 'ThreadPoolExecutor'):
gevent.sleep(0.5)
return 42
future = pool.submit(fn)
future = pool.submit(fn) # pylint:disable=no-member
if self.MONKEY_PATCHED:
# Things work as expected when monkey-patched
_done, not_done = cf_wait((future,), timeout=0.001)
......@@ -609,7 +624,7 @@ if hasattr(gevent.threadpool, 'ThreadPoolExecutor'):
gevent.sleep(0.5)
return 42
future = pool.submit(fn)
future = pool.submit(fn) # pylint:disable=no-member
def spawned():
return 2016
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment