Commit 41d1f6bb authored by Denis Bilenko's avatar Denis Bilenko

greentest: add hook_stderr(), assert_stderr() methods to TestCase

parent ff37a2c0
......@@ -21,10 +21,10 @@
# package is named greentest, not test, so it won't be confused with test in stdlib
import sys
import os
import errno
import unittest
import time
import traceback
import re
import gevent
......@@ -36,6 +36,14 @@ def exit_unless_25():
if sys.version_info[:2] < (2, 5):
exit_disabled()
VERBOSE = sys.argv.count('-v') > 1
if '--debug-greentest' in sys.argv:
sys.argv.remove('--debug-greentest')
DEBUG = True
else:
DEBUG = False
class TestCase(unittest.TestCase):
......@@ -53,6 +61,13 @@ class TestCase(unittest.TestCase):
self._timer = gevent.Timeout.start_new(self.__timeout__, RuntimeError('test is taking too long'))
def tearDown(self):
try:
if not hasattr(self, 'stderr'):
self.unhook_stderr()
if hasattr(self, 'stderr'):
sys.__stderr__.write(self.stderr)
except:
traceback.print_exc()
if hasattr(self, '_timer'):
self._timer.cancel()
hub = gevent.hub.get_hub()
......@@ -86,13 +101,78 @@ class TestCase(unittest.TestCase):
def testcasename(self):
return self.__class__.__name__ + '.' + self.testname
def hook_stderr(self):
if VERBOSE:
return
from cStringIO import StringIO
self.new_stderr = StringIO()
self.old_stderr = sys.stderr
sys.stderr = self.new_stderr
def unhook_stderr(self):
if VERBOSE:
return
try:
value = self.new_stderr.getvalue()
except AttributeError:
return None
sys.stderr = self.old_stderr
if value:
self.stderr = value
return value
def assert_stderr_traceback(self, typ, value=None):
if VERBOSE:
return
if isinstance(typ, Exception):
if value is None:
value = str(typ)
typ = typ.__class__.__name__
stderr = self.unhook_stderr()
assert stderr is not None, repr(stderr)
traceback_re = '^Traceback \\(most recent call last\\):\n( +.*?\n)+^(?P<type>\w+): (?P<value>.*?)$'
self.extract_re(traceback_re, type=typ, value=value)
def assert_stderr(self, message):
if VERBOSE:
return
exact_re = '^' + message + '.*?\n$.*'
if re.match(exact_re, self.stderr):
self.extract_re(exact_re)
else:
words_re = '^' + '.*?'.join(message.split()) + '.*?\n$'
if re.match(words_re, self.stderr):
self.extract_re(words_re)
else:
if message.endswith('...'):
another_re = '^' + '.*?'.join(message.split()) + '.*?(\n +.*?$){2,5}\n\n'
self.extract_re(another_re)
else:
raise AssertionError('%r did not match:\n%r' % (message, self.stderr))
def assert_mainloop_assertion(self, message=None):
self.assert_stderr_traceback('AssertionError', 'Cannot switch to MAINLOOP from MAINLOOP')
if message is not None:
self.assert_stderr(message)
def extract_re(self, regex, **kwargs):
assert self.stderr is not None
m = re.search(regex, self.stderr, re.DOTALL|re.M)
if m is None:
raise AssertionError('%r did not match:\n%r' % (regex, self.stderr))
for key, expected_value in kwargs.items():
real_value = m.group(key)
if expected_value is not None:
try:
self.assertEqual(real_value, expected_value)
except AssertionError:
print 'failed to process: %s' % self.stderr
raise
if DEBUG:
ate = '\n#ATE#: ' + self.stderr[m.start(0):m.end(0)].replace('\n', '\n#ATE#: ') + '\n'
sys.__stderr__.write(ate)
self.stderr = self.stderr[:m.start(0)] + self.stderr[m.end(0)+1:]
def find_command(command):
for path in os.getenv('PATH', '/usr/bin:/usr/sbin').split(os.pathsep):
path = os.path.join(dir, command)
if os.access(path, os.X_OK):
return path
raise IOError(errno.ENOENT, 'Command not found: %r' % command)
main = unittest.main
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment