tests: review report and mark known failures as expected

import time
import sys
import neo
import os
from collections import Counter, defaultdict
from cStringIO import StringIO
from unittest.runner import _WritelnDecorator
from neo.tests import getTempDirectory, __dict__ as neo_tests__dict__
from neo.tests.benchmark import BenchmarkRunner
ZODB_TEST_MODULES = [
class NeoTestRunner(unittest.TestResult):
class NeoTestRunner(unittest.TextTestResult):
""" Custom result class to build report with statistics per module """
def __init__(self, title):
def __init__(self, title, verbosity):
super(NeoTestRunner, self).__init__(
_WritelnDecorator(sys.stderr), False, verbosity)
self._title = title
self.modulesStats = {}
self.failedImports = {}
self.lastStart = None
self.run_dict = defaultdict(int)
self.time_dict = defaultdict(int)
self.temp_directory = getTempDirectory()
def wasSuccessful(self):
return not (self.failures or self.errors or self.unexpectedSuccesses)
def run(self, name, modules):
print '\n', name
suite = unittest.TestSuite()
class ModuleStats(object):
class ModuleStats(object):
run = 0
errors = 0
success = 0
failures = 0
time = 0.0
def _getModuleStats(self, test):
module = test.__class__.__module__
module = tuple(module.split('.'))
return self.modulesStats[module]
except KeyError:
self.modulesStats[module] = self.ModuleStats()
return self.modulesStats[module]
def _updateTimer(self, stats):
stats.time += time.time() - self.lastStart
def startTest(self, test):
unittest.TestResult.startTest(self, test)" * TEST %s", test)
stats = self._getModuleStats(test) += 1
self.lastStart = time.time()
super(NeoTestRunner, self).startTest(test)
self.run_dict[test.__class__.__module__] += 1
self.start_time = time.time()
def addSuccess(self, test):
print "OK"
unittest.TestResult.addSuccess(self, test)
stats = self._getModuleStats(test)
stats.success += 1
def addError(self, test, err):
print "ERROR"
unittest.TestResult.addError(self, test, err)
stats = self._getModuleStats(test)
stats.errors += 1
def addFailure(self, test, err):
print "FAIL"
unittest.TestResult.addFailure(self, test, err)
stats = self._getModuleStats(test)
stats.failures += 1
def stopTest(self, test):
self.time_dict[test.__class__.__module__] += \
time.time() - self.start_time
super(NeoTestRunner, self).stopTest(test)
def _buildSummary(self, add_status):
success = self.testsRun - len(self.errors) - len(self.failures)
unexpected_count = len(self.errors) + len(self.failures) \
+ len(self.unexpectedSuccesses)
expected_count = len(self.expectedFailures)
success = self.testsRun - unexpected_count - expected_count
add_status('Directory', self.temp_directory)
if self.testsRun:
add_status('Status', '%.3f%%' % (success * 100.0 / self.testsRun))
for var in os.environ.iterkeys():
for var in os.environ:
if var.startswith('NEO_TEST'):
add_status(var, os.environ[var])
# visual
header = "%25s | run | success | errors | fails | time \n" % 'Test Module'
separator = "%25s-+---------+---------+---------+---------+----------\n" % ('-' * 25)
format = "%25s | %3s | %3s | %3s | %3s | %6.2fs \n"
group_f = "%25s | | | | | \n"
header = "%25s | run | unexpected | expected | skipped | time \n" % 'Test Module'
separator = "%25s-+-------+------------+----------+---------+----------\n" % ('-' * 25)
format = "%25s | %3s | %3s | %3s | %3s | %6.2fs \n"
group_f = "%25s | | | | | \n"
# header
s = ' ' * 30 + ' NEO TESTS REPORT'
s += '\n'
s += '\n' + header + separator
s = ' ' * 30 + ' NEO TESTS REPORT\n\n' + header + separator
group = None
t_success = 0
unexpected = Counter(x[0].__class__.__module__
for x in (self.errors, self.failures)
for x in x)
for x in self.unexpectedSuccesses)
expected = Counter(x[0].__class__.__module__
for x in self.expectedFailures)
skipped = Counter(x[0].__class__.__module__
for x in self.skipped)
total_time = 0
# for each test case
for k, v in sorted(self.modulesStats.items()):
for k, v in sorted(self.run_dict.iteritems()):
# display group below its content
_group = '.'.join(k[:-1])
if group is None:
group = _group
_group, name = k.rsplit('.', 1)
if _group != group:
s += separator + group_f % group + separator
if group:
s += separator + group_f % group + separator
group = _group
# test case stats
t_success += v.success
run, success = or '.', v.success or '.'
errors, failures = v.errors or '.', v.failures or '.'
name = k[-1].lstrip('test')
args = (name, run, success, errors, failures, v.time)
s += format % args
t = self.time_dict[k]
total_time += t
s += format % (name.lstrip('test'), v, unexpected.get(k, '.'),
expected.get(k, '.'), skipped.get(k, '.'), t)
# the last group
s += separator + group_f % group + separator
# the final summary
errors, failures = len(self.errors) or '.', len(self.failures) or '.'
args = ("Summary", self.testsRun, t_success, errors, failures, self.time)
s += format % args + separator + '\n'
return s
def _buildErrors(self):
s = ''
test_formatter = lambda t:
if len(self.errors):
s += '\nERRORS:\n'
for test, trace in self.errors:
s += "%s\n" % test_formatter(test)
s += "-------------------------------------------------------------\n"
s += trace
s += "-------------------------------------------------------------\n"
s += '\n'
if len(self.failures):
s += '\nFAILURES:\n'
for test, trace in self.failures:
s += "%s\n" % test_formatter(test)
s += "-------------------------------------------------------------\n"
s += trace
s += "-------------------------------------------------------------\n"
s += '\n'
return s
def _buildWarnings(self):
s = '\n'
if self.failedImports:
s += 'Failed imports :\n'
for module, err in self.failedImports.items():
s += '%s:\n%s' % (module, err)
s += '\n'
return s
s += format % ("Summary", self.testsRun, unexpected_count or '.',
expected_count or '.', len(self.skipped) or '.',
total_time) + separator + '\n'
return "%s Tests, %s Failed" % (self.testsRun, unexpected_count), s
def buildReport(self, add_status):
self.time = sum([s.time for s in self.modulesStats.values()])
# TODO: Add 'Broken' for known failures (not a regression)
# and 'Fixed' for unexpected successes.
self.subject = "%s Tests, %s Failed" % (
self.testsRun, len(self.errors) + len(self.failures))
summary = self._buildSummary(add_status)
errors = self._buildErrors()
warnings = self._buildWarnings()
report = '\n'.join([summary, errors, warnings])
return (self.subject, report)
subject, summary = self._buildSummary(add_status)
body = StringIO()
for test in self.unexpectedSuccesses:
body.write("UNEXPECTED SUCCESS: %s\n" % self.getDescription(test)) = _WritelnDecorator(body)
return subject, body.getvalue()
class TestRunner(BenchmarkRunner):
class TestRunner(BenchmarkRunner):
help='Unit & threaded tests')
parser.add_option('-z', '--zodb', action='store_true',
help='ZODB test suite running on a NEO')
parser.add_option('-v', '--verbose', action='store_true',
help='Verbose output')
parser.format_epilog = lambda _: """
Environment Variables:
NEO_TESTS_ADAPTER Default is SQLite for threaded clusters,
......@@ -291,14 +237,13 @@ Environment Variables:
unit = options.unit,
functional = options.functional,
zodb = options.zodb,
verbosity = 2 if options.verbose else 1,
def start(self):
config = self._config
# run requested tests
runner = NeoTestRunner(
title=config.title or 'Neo',
runner = NeoTestRunner(config.title or 'Neo', config.verbosity)
if config.unit:'Unit tests', UNIT_TEST_MODULES)
import __builtin__
import __builtin__
import errno
import functools
import os
import random
import socket
from neo.lib.protocol import NodeTypes, Packets, UUID_NAMESPACES
from neo.lib.util import getAddressType
from time import time
from struct import pack, unpack
from import _ExpectedFailure, _UnexpectedSuccess
from ZODB.utils import newTid
except ImportError:
def expectedFailure(exception=AssertionError):
def decorator(func):
def wrapper(*args, **kw):
func(*args, **kw)
except exception, e:
# XXX: passing sys.exc_info() causes deadlocks
raise _ExpectedFailure((type(e), None, None))
raise _UnexpectedSuccess
return functools.wraps(func)(wrapper)
if callable(exception) and not isinstance(exception, type):
func = exception
exception = Exception
return decorator(func)
return decorator
DB_PREFIX = os.getenv('NEO_DB_PREFIX', 'test_neo')
DB_ADMIN = os.getenv('NEO_DB_ADMIN', 'root')
DB_PASSWD = os.getenv('NEO_DB_PASSWD', '')
class NeoTestBase(unittest.TestCase):
class NeoTestBase(unittest.TestCase):
def setUp(self):
sys.stdout.write(' * %s ' % (, ))
sys.stdout.flush() = self.setupLog()
......@@ -126,17 +142,15 @@ class NeoTestBase(unittest.TestCase):
test_case, ='.', 1)
logging.setup(os.path.join(getTempDirectory(), test_case + '.log'))
def tearDown(self,
success='ok' if sys.version_info < (2, 7) else 'success'):
def tearDown(self):
assert self.tearDown.im_func is NeoTestBase.tearDown.im_func
def _tearDown(self, success):
# Kill all unfinished transactions for next test.
# Note we don't even abort them because it may require a valid
# connection to a master node (see Storage.sync()).
class failureException(AssertionError):
def __init__(self, msg=None):
from ZODB.POSException import ConflictError
from ZODB.POSException import ConflictError
from ZODB.tests.StorageTestBase import zodb_pickle
from persistent import Persistent
from .. import expectedFailure
from . import NEOCluster, NEOFunctionalTest
def testExportFileStorageBug(self):
def testExportFileStorageBug(self):
# currently fails due to a bug in ZODB.FileStorage
from import TransactionManager, \
from neo.lib.connection import ConnectionClosed, MTClientConnection
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \
from .. import expectedFailure, _UnexpectedSuccess
from . import ClientApplication, NEOCluster, NEOThreadedTest, Patch
from neo.lib.util import add64, makeChecksum
from neo.client.pool import CELL_CONNECTED, CELL_GOOD
self.assertEqual(self._testDeadlockAvoidance([2, 4]),
self.assertEqual(self._testDeadlockAvoidance([2, 4]),
[DelayedError, DelayedError, ConflictError, ConflictError])
def testDeadlockAvoidance(self):
# This test fail because deadlock avoidance is not fully implemented.
# 0: C1 -> S1
# XXX: This is an expected failure. A ttid column was added to
# XXX: This is an expected failure. A ttid column was added to
# 'trans' table to permit recovery, by checking that the
# transaction was really committed.
self.assertRaises(ConnectionClosed, t.commit)
raise _UnexpectedSuccess
except ConnectionClosed:
expectedFailure(self.assertIn)('x', c.root())
import unittest
# along with this program. If not, see <>.
import unittest
from ZODB.tests.PackableStorage import PackableStorageWithOptionalGC
except ImportError:
from ZODB.tests.PackableStorage import PackableStorage as \
from ZODB.tests.PackableStorage import PackableUndoStorage
from ZODB.tests.PackableStorage import \
PackableStorageWithOptionalGC, PackableUndoStorage
from . import ZODBTestCase
from .. import expectedFailure
from . import ZODBTestCase
class PackableTests(ZODBTestCase, StorageTestBase,
......@@ -32,6 +29,10 @@ class PackableTests(ZODBTestCase, StorageTestBase,
def setUp(self):
super(PackableTests, self).setUp(cluster_kw={'adapter': 'MySQL'})
checkPackAllRevisions = expectedFailure()(
checkPackUndoLog = expectedFailure()(PackableUndoStorage.checkPackUndoLog)
if __name__ == "__main__":
suite = unittest.makeSuite(PackableTests, 'check')
from ZODB.tests.StorageTestBase import StorageTestBase
from ZODB.tests.TransactionalUndoStorage import TransactionalUndoStorage
from ZODB.tests.ConflictResolution import ConflictResolvingTransUndoStorage
from .. import expectedFailure
from . import ZODBTestCase
class UndoTests(ZODBTestCase, StorageTestBase, TransactionalUndoStorage,
checkTransactionalUndoAfterPack = expectedFailure()(
# Don't run this test. It cannot run with pipelined store, and is not executed
# on Zeo - but because Zeo doesn't have an iterator, while Neo has.
revision[:7], os.path.basename(test_home), backend)
revision[:7], os.path.basename(test_home), backend)
if tests:[os.path.join(bin, 'neotestrunner'),
'-' + tests, '--title', 'NEO tests ' + title,
'-v' + tests, '--title', 'NEO tests ' + title,
] + sys.argv[1:arg_count])
if 'm' in tasks:[os.path.join(bin, 'python'),
