Commit 32426779 authored by Denis Bilenko's avatar Denis Bilenko

tests: better implementation of timeouts (does not rely on setUp and tearDown)

parent 576f8f56
......@@ -46,7 +46,19 @@ else:
gettotalrefcount = getattr(sys, 'gettotalrefcount', None)
def wrap(method):
def wrap_timeout(timeout, method):
if timeout is None:
return method
@wraps(method)
def wrapped(self, *args, **kwargs):
with gevent.Timeout(timeout, 'test timed out'):
return method(self, *args, **kwargs)
return wrapped
def wrap_refcount(method):
if gettotalrefcount is None:
return method
@wraps(method)
def wrapped(self, *args, **kwargs):
import gc
......@@ -74,25 +86,37 @@ def wrap(method):
return wrapped
class CheckRefcountMetaClass(type):
class TestCaseMetaClass(type):
# wrap each test method with
# a) timeout check
# b) totalrefcount check
def __new__(meta, classname, bases, classDict):
if classDict.get('check_totalrefcount', True):
for key, value in classDict.items():
if (key.startswith('test_') or key == 'test') and callable(value):
classDict.pop(key)
classDict[key] = wrap(value)
timeout = classDict.get('__timeout__', 'NONE')
if timeout == 'NONE':
timeout = getattr(bases[0], '__timeout__', None)
check_totalrefcount = classDict.get('check_totalrefcount')
if check_totalrefcount is None:
check_totalrefcount = getattr(bases[0], 'check_totalrefcount', True)
for key, value in classDict.items():
if (key.startswith('test_') or key == 'test') and callable(value):
classDict.pop(key)
value = wrap_timeout(timeout, value)
if check_totalrefcount:
value = wrap_refcount(value)
classDict[key] = value
return type.__new__(meta, classname, bases, classDict)
class TestCase0(BaseTestCase):
class TestCase(BaseTestCase):
__metaclass__ = TestCaseMetaClass
__timeout__ = 1
switch_expected = 'default'
_switch_count = None
check_totalrefcount = True
def __init__(self, *args, **kwargs):
BaseTestCase.__init__(self, *args, **kwargs)
self._timer = None
self._hub = gevent.hub.get_hub()
self._switch_count = None
......@@ -105,7 +129,6 @@ class TestCase0(BaseTestCase):
gevent.sleep(0) # switch at least once to setup signal handlers
if hasattr(self._hub, 'switch_count'):
self._switch_count = self._hub.switch_count
self._timer = gevent.Timeout.start_new(self.__timeout__, RuntimeError('test is taking too long'))
def tearDown(self):
if hasattr(self, 'cleanup'):
......@@ -117,23 +140,6 @@ class TestCase0(BaseTestCase):
sys.__stderr__.write(self.stderr)
except:
traceback.print_exc()
if getattr(self, '_timer', None) is not None:
self._timer.cancel()
self._timer = None
if self._switch_count is not None and hasattr(self._hub, 'switch_count'):
msg = ''
if self._hub.switch_count < self._switch_count:
msg = 'hub.switch_count decreased?\n'
elif self._hub.switch_count == self._switch_count:
if self.switch_expected:
msg = '%s.%s did not switch\n' % (type(self).__name__, self.testname)
elif self._hub.switch_count > self._switch_count:
if not self.switch_expected:
msg = '%s.%s switched but expected not to\n' % (type(self).__name__, self.testname)
if msg:
print >> sys.stderr, 'WARNING: ' + msg
else:
sys.stderr.write('WARNING: %s.setUp does not call base class setUp\n' % (type(self).__name__, ))
@property
def testname(self):
......
......@@ -24,7 +24,8 @@ def patch_all(timeout=None):
monkey.patch_all(aggressive=True)
import unittest
import greentest
unittest.TestCase = greentest.TestCase0
unittest.TestCase = greentest.TestCase
unittest.TestCase.check_totalrefcount = False
if timeout is not None:
unittest.TestCase.__timeout__ = timeout
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment