Commit 95ee26cc authored by YOU's avatar YOU Committed by Dylan Trotter

Add unittest module (#140)

parent 23140d0c
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
""""Utilities for manipulating and inspecting OS paths.""" """"Utilities for manipulating and inspecting OS paths."""
from __go__.os import Stat from __go__.os import Stat
from __go__.path.filepath import Abs, Clean, Dir as dirname, IsAbs as isabs, Join # pylint: disable=g-multiple-import,unused-import from __go__.path.filepath import Abs, Base as basename, Clean, Dir as dirname, IsAbs as isabs, Join # pylint: disable=g-multiple-import,unused-import
def abspath(path): def abspath(path):
......
...@@ -12,10 +12,13 @@ import _functools ...@@ -12,10 +12,13 @@ import _functools
partial = _functools.partial partial = _functools.partial
reduce = _functools.reduce reduce = _functools.reduce
def setattr(d, k, v):
d.__dict__[k] = v
# update_wrapper() and wraps() are tools to help write # update_wrapper() and wraps() are tools to help write
# wrapper functions that can handle naive introspection # wrapper functions that can handle naive introspection
WRAPPER_ASSIGNMENTS = ('__module__', '__name__', '__doc__') WRAPPER_ASSIGNMENTS = ('__module__', '__name__') #, '__doc__'
WRAPPER_UPDATES = ('__dict__',) WRAPPER_UPDATES = ('__dict__',)
def update_wrapper(wrapper, def update_wrapper(wrapper,
wrapped, wrapped,
......
# Dummy file to make this directory a package.
"""
Tests common to tuple, list and UserList.UserList
"""
import unittest
import sys
from test import test_support as support
# Various iterables
# This is used for checking the constructor (here and in test_deque.py)
def iterfunc(seqn):
'Regular generator'
for i in seqn:
yield i
class Sequence(object):
'Sequence using __getitem__'
def __init__(self, seqn):
self.seqn = seqn
def __getitem__(self, i):
return self.seqn[i]
class IterFunc(object):
'Sequence using iterator protocol'
def __init__(self, seqn):
self.seqn = seqn
self.i = 0
def __iter__(self):
return self
def next(self):
if self.i >= len(self.seqn): raise StopIteration
v = self.seqn[self.i]
self.i += 1
return v
class IterGen(object):
'Sequence using iterator protocol defined with a generator'
def __init__(self, seqn):
self.seqn = seqn
self.i = 0
def __iter__(self):
for val in self.seqn:
yield val
class IterNextOnly(object):
'Missing __getitem__ and __iter__'
def __init__(self, seqn):
self.seqn = seqn
self.i = 0
def next(self):
if self.i >= len(self.seqn): raise StopIteration
v = self.seqn[self.i]
self.i += 1
return v
class IterNoNext(object):
'Iterator missing next()'
def __init__(self, seqn):
self.seqn = seqn
self.i = 0
def __iter__(self):
return self
class IterGenExc(object):
'Test propagation of exceptions'
def __init__(self, seqn):
self.seqn = seqn
self.i = 0
def __iter__(self):
return self
def next(self):
3 // 0
class IterFuncStop(object):
'Test immediate stop'
def __init__(self, seqn):
pass
def __iter__(self):
return self
def next(self):
raise StopIteration
# from itertools import chain, imap
# def itermulti(seqn):
# 'Test multiple tiers of iterators'
# return chain(imap(lambda x:x, iterfunc(IterGen(Sequence(seqn)))))
class LyingTuple(tuple):
def __iter__(self):
yield 1
class LyingList(list):
def __iter__(self):
yield 1
class CommonTest(unittest.TestCase):
# The type to be tested
type2test = None
def test_constructors(self):
l0 = []
l1 = [0]
l2 = [0, 1]
u = self.type2test()
u0 = self.type2test(l0)
u1 = self.type2test(l1)
u2 = self.type2test(l2)
uu = self.type2test(u)
uu0 = self.type2test(u0)
uu1 = self.type2test(u1)
uu2 = self.type2test(u2)
v = self.type2test(tuple(u))
class OtherSeq(object):
def __init__(self, initseq):
self.__data = initseq
def __len__(self):
return len(self.__data)
def __getitem__(self, i):
return self.__data[i]
s = OtherSeq(u0)
v0 = self.type2test(s)
self.assertEqual(len(v0), len(s))
s = "this is also a sequence"
vv = self.type2test(s)
self.assertEqual(len(vv), len(s))
# Create from various iteratables
for s in ("123", "", range(1000), ('do', 1.2), xrange(2000,2200,5)):
for g in (Sequence, IterFunc, IterGen,
# itermulti, iterfunc):
iterfunc):
self.assertEqual(self.type2test(g(s)), self.type2test(s))
self.assertEqual(self.type2test(IterFuncStop(s)), self.type2test())
self.assertEqual(self.type2test(c for c in "123"), self.type2test("123"))
self.assertRaises(TypeError, self.type2test, IterNextOnly(s))
self.assertRaises(TypeError, self.type2test, IterNoNext(s))
self.assertRaises(ZeroDivisionError, self.type2test, IterGenExc(s))
# Issue #23757
self.assertEqual(self.type2test(LyingTuple((2,))), self.type2test((1,)))
self.assertEqual(self.type2test(LyingList([2])), self.type2test([1]))
def test_truth(self):
self.assertFalse(self.type2test())
self.assertTrue(self.type2test([42]))
def test_getitem(self):
u = self.type2test([0, 1, 2, 3, 4])
for i in xrange(len(u)):
self.assertEqual(u[i], i)
self.assertEqual(u[long(i)], i)
for i in xrange(-len(u), -1):
self.assertEqual(u[i], len(u)+i)
self.assertEqual(u[long(i)], len(u)+i)
self.assertRaises(IndexError, u.__getitem__, -len(u)-1)
self.assertRaises(IndexError, u.__getitem__, len(u))
self.assertRaises(ValueError, u.__getitem__, slice(0,10,0))
u = self.type2test()
self.assertRaises(IndexError, u.__getitem__, 0)
self.assertRaises(IndexError, u.__getitem__, -1)
self.assertRaises(TypeError, u.__getitem__)
a = self.type2test([10, 11])
self.assertEqual(a[0], 10)
self.assertEqual(a[1], 11)
self.assertEqual(a[-2], 10)
self.assertEqual(a[-1], 11)
self.assertRaises(IndexError, a.__getitem__, -3)
self.assertRaises(IndexError, a.__getitem__, 3)
def test_getslice(self):
l = [0, 1, 2, 3, 4]
u = self.type2test(l)
self.assertEqual(u[0:0], self.type2test())
self.assertEqual(u[1:2], self.type2test([1]))
self.assertEqual(u[-2:-1], self.type2test([3]))
self.assertEqual(u[-1000:1000], u)
self.assertEqual(u[1000:-1000], self.type2test([]))
self.assertEqual(u[:], u)
self.assertEqual(u[1:None], self.type2test([1, 2, 3, 4]))
self.assertEqual(u[None:3], self.type2test([0, 1, 2]))
# Extended slices
self.assertEqual(u[::], u)
self.assertEqual(u[::2], self.type2test([0, 2, 4]))
self.assertEqual(u[1::2], self.type2test([1, 3]))
self.assertEqual(u[::-1], self.type2test([4, 3, 2, 1, 0]))
self.assertEqual(u[::-2], self.type2test([4, 2, 0]))
self.assertEqual(u[3::-2], self.type2test([3, 1]))
self.assertEqual(u[3:3:-2], self.type2test([]))
self.assertEqual(u[3:2:-2], self.type2test([3]))
self.assertEqual(u[3:1:-2], self.type2test([3]))
self.assertEqual(u[3:0:-2], self.type2test([3, 1]))
self.assertEqual(u[::-100], self.type2test([4]))
self.assertEqual(u[100:-100:], self.type2test([]))
self.assertEqual(u[-100:100:], u)
self.assertEqual(u[100:-100:-1], u[::-1])
self.assertEqual(u[-100:100:-1], self.type2test([]))
self.assertEqual(u[-100L:100L:2L], self.type2test([0, 2, 4]))
# Test extreme cases with long ints
a = self.type2test([0,1,2,3,4])
self.assertEqual(a[ -pow(2,128L): 3 ], self.type2test([0,1,2]))
self.assertEqual(a[ 3: pow(2,145L) ], self.type2test([3,4]))
self.assertRaises(TypeError, u.__getslice__)
def test_contains(self):
u = self.type2test([0, 1, 2])
for i in u:
self.assertIn(i, u)
for i in min(u)-1, max(u)+1:
self.assertNotIn(i, u)
self.assertRaises(TypeError, u.__contains__)
def test_contains_fake(self):
class AllEq(object):
# Sequences must use rich comparison against each item
# (unless "is" is true, or an earlier item answered)
# So instances of AllEq must be found in all non-empty sequences.
def __eq__(self, other):
return True
__hash__ = None # Can't meet hash invariant requirements
self.assertNotIn(AllEq(), self.type2test([]))
self.assertIn(AllEq(), self.type2test([1]))
def test_contains_order(self):
# Sequences must test in-order. If a rich comparison has side
# effects, these will be visible to tests against later members.
# In this test, the "side effect" is a short-circuiting raise.
class DoNotTestEq(Exception):
pass
class StopCompares(object):
def __eq__(self, other):
raise DoNotTestEq
checkfirst = self.type2test([1, StopCompares()])
self.assertIn(1, checkfirst)
checklast = self.type2test([StopCompares(), 1])
self.assertRaises(DoNotTestEq, checklast.__contains__, 1)
def test_len(self):
self.assertEqual(len(self.type2test()), 0)
self.assertEqual(len(self.type2test([])), 0)
self.assertEqual(len(self.type2test([0])), 1)
self.assertEqual(len(self.type2test([0, 1, 2])), 3)
def test_minmax(self):
u = self.type2test([0, 1, 2])
self.assertEqual(min(u), 0)
self.assertEqual(max(u), 2)
def test_addmul(self):
u1 = self.type2test([0])
u2 = self.type2test([0, 1])
self.assertEqual(u1, u1 + self.type2test())
self.assertEqual(u1, self.type2test() + u1)
self.assertEqual(u1 + self.type2test([1]), u2)
self.assertEqual(self.type2test([-1]) + u1, self.type2test([-1, 0]))
self.assertEqual(self.type2test(), u2*0)
self.assertEqual(self.type2test(), 0*u2)
self.assertEqual(self.type2test(), u2*0L)
self.assertEqual(self.type2test(), 0L*u2)
self.assertEqual(u2, u2*1)
self.assertEqual(u2, 1*u2)
self.assertEqual(u2, u2*1L)
self.assertEqual(u2, 1L*u2)
self.assertEqual(u2+u2, u2*2)
self.assertEqual(u2+u2, 2*u2)
self.assertEqual(u2+u2, u2*2L)
self.assertEqual(u2+u2, 2L*u2)
self.assertEqual(u2+u2+u2, u2*3)
self.assertEqual(u2+u2+u2, 3*u2)
class subclass(self.type2test):
pass
u3 = subclass([0, 1])
self.assertEqual(u3, u3*1)
self.assertIsNot(u3, u3*1)
def test_iadd(self):
u = self.type2test([0, 1])
u += self.type2test()
self.assertEqual(u, self.type2test([0, 1]))
u += self.type2test([2, 3])
self.assertEqual(u, self.type2test([0, 1, 2, 3]))
u += self.type2test([4, 5])
self.assertEqual(u, self.type2test([0, 1, 2, 3, 4, 5]))
u = self.type2test("spam")
u += self.type2test("eggs")
self.assertEqual(u, self.type2test("spameggs"))
def test_imul(self):
u = self.type2test([0, 1])
u *= 3
self.assertEqual(u, self.type2test([0, 1, 0, 1, 0, 1]))
def test_getitemoverwriteiter(self):
# Verify that __getitem__ overrides are not recognized by __iter__
class T(self.type2test):
def __getitem__(self, key):
return str(key) + '!!!'
self.assertEqual(iter(T((1,2))).next(), 1)
def test_repeat(self):
for m in xrange(4):
s = tuple(range(m))
for n in xrange(-3, 5):
self.assertEqual(self.type2test(s*n), self.type2test(s)*n)
self.assertEqual(self.type2test(s)*(-4), self.type2test([]))
self.assertEqual(id(s), id(s*1))
def test_bigrepeat(self):
import sys
if sys.maxint <= 2147483647:
x = self.type2test([0])
# x *= 2**16
# self.assertRaises(MemoryError, x.__mul__, 2**16)
# if hasattr(x, '__imul__'):
# self.assertRaises(MemoryError, x.__imul__, 2**16)
x *= 1<<16
self.assertRaises(MemoryError, x.__mul__, 1<<16)
if hasattr(x, '__imul__'):
self.assertRaises(MemoryError, x.__imul__, 1<<16)
def test_subscript(self):
a = self.type2test([10, 11])
self.assertEqual(a.__getitem__(0L), 10)
self.assertEqual(a.__getitem__(1L), 11)
self.assertEqual(a.__getitem__(-2L), 10)
self.assertEqual(a.__getitem__(-1L), 11)
self.assertRaises(IndexError, a.__getitem__, -3)
self.assertRaises(IndexError, a.__getitem__, 3)
self.assertEqual(a.__getitem__(slice(0,1)), self.type2test([10]))
self.assertEqual(a.__getitem__(slice(1,2)), self.type2test([11]))
self.assertEqual(a.__getitem__(slice(0,2)), self.type2test([10, 11]))
self.assertEqual(a.__getitem__(slice(0,3)), self.type2test([10, 11]))
self.assertEqual(a.__getitem__(slice(3,5)), self.type2test([]))
self.assertRaises(ValueError, a.__getitem__, slice(0, 10, 0))
self.assertRaises(TypeError, a.__getitem__, 'x')
def test_count(self):
a = self.type2test([0, 1, 2])*3
self.assertEqual(a.count(0), 3)
self.assertEqual(a.count(1), 3)
self.assertEqual(a.count(3), 0)
self.assertRaises(TypeError, a.count)
class BadExc(Exception):
pass
class BadCmp(object):
def __eq__(self, other):
if other == 2:
raise BadExc()
return False
self.assertRaises(BadExc, a.count, BadCmp())
def test_index(self):
u = self.type2test([0, 1])
self.assertEqual(u.index(0), 0)
self.assertEqual(u.index(1), 1)
self.assertRaises(ValueError, u.index, 2)
u = self.type2test([-2, -1, 0, 0, 1, 2])
self.assertEqual(u.count(0), 2)
self.assertEqual(u.index(0), 2)
self.assertEqual(u.index(0, 2), 2)
self.assertEqual(u.index(-2, -10), 0)
self.assertEqual(u.index(0, 3), 3)
self.assertEqual(u.index(0, 3, 4), 3)
self.assertRaises(ValueError, u.index, 2, 0, -10)
self.assertRaises(TypeError, u.index)
class BadExc(Exception):
pass
class BadCmp(object):
def __eq__(self, other):
if other == 2:
raise BadExc()
return False
a = self.type2test([0, 1, 2, 3])
self.assertRaises(BadExc, a.index, BadCmp())
a = self.type2test([-2, -1, 0, 0, 1, 2])
self.assertEqual(a.index(0), 2)
self.assertEqual(a.index(0, 2), 2)
self.assertEqual(a.index(0, -4), 2)
self.assertEqual(a.index(-2, -10), 0)
self.assertEqual(a.index(0, 3), 3)
self.assertEqual(a.index(0, -3), 3)
self.assertEqual(a.index(0, 3, 4), 3)
self.assertEqual(a.index(0, -3, -2), 3)
self.assertEqual(a.index(0, -4*sys.maxint, 4*sys.maxint), 2)
self.assertRaises(ValueError, a.index, 0, 4*sys.maxint,-4*sys.maxint)
self.assertRaises(ValueError, a.index, 2, 0, -10)
def test_free_after_iterating(self):
support.check_free_after_iterating(self, iter, self.type2test)
support.check_free_after_iterating(self, reversed, self.type2test)
"""Supporting definitions for the Python regression tests."""
# if __name__ != 'test.test_support':
# raise ImportError('test_support must be imported from the test package')
# import contextlib
# import errno
# import functools
# import gc
# import socket
import sys
# import os
# import platform
# import shutil
# import warnings
import unittest
# import importlib
# import UserDict
# import re
# import time
# import struct
# import sysconfig
# try:
# import thread
# except ImportError:
# thread = None
__all__ = ["Error", "TestFailed", "have_unicode", "BasicTestRunner", "run_unittest"]
# __all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
# "verbose", "use_resources", "max_memuse", "record_original_stdout",
# "get_original_stdout", "unload", "unlink", "rmtree", "forget",
# "is_resource_enabled", "requires", "requires_mac_ver",
# "find_unused_port", "bind_port",
# "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
# "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
# "open_urlresource", "check_warnings", "check_py3k_warnings",
# "CleanImport", "EnvironmentVarGuard", "captured_output",
# "captured_stdout", "TransientResource", "transient_internet",
# "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
# "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
# "threading_cleanup", "reap_threads", "start_threads", "cpython_only",
# "check_impl_detail", "get_attribute", "py3k_bytes",
# "import_fresh_module", "threading_cleanup", "reap_children",
# "strip_python_stderr", "IPV6_ENABLED", "run_with_tz"]
class Error(Exception):
"""Base class for regression test exceptions."""
class TestFailed(Error):
"""Test failed."""
# class ResourceDenied(unittest.SkipTest):
# """Test skipped because it requested a disallowed resource.
# This is raised when a test calls requires() for a resource that
# has not been enabled. It is used to distinguish between expected
# and unexpected skips.
# """
# @contextlib.contextmanager
# def _ignore_deprecated_imports(ignore=True):
# """Context manager to suppress package and module deprecation
# warnings when importing them.
# If ignore is False, this context manager has no effect."""
# if ignore:
# with warnings.catch_warnings():
# warnings.filterwarnings("ignore", ".+ (module|package)",
# DeprecationWarning)
# yield
# else:
# yield
# def import_module(name, deprecated=False):
# """Import and return the module to be tested, raising SkipTest if
# it is not available.
# If deprecated is True, any module or package deprecation messages
# will be suppressed."""
# with _ignore_deprecated_imports(deprecated):
# try:
# return importlib.import_module(name)
# except ImportError, msg:
# raise unittest.SkipTest(str(msg))
# def _save_and_remove_module(name, orig_modules):
# """Helper function to save and remove a module from sys.modules
# Raise ImportError if the module can't be imported."""
# # try to import the module and raise an error if it can't be imported
# if name not in sys.modules:
# __import__(name)
# del sys.modules[name]
# for modname in list(sys.modules):
# if modname == name or modname.startswith(name + '.'):
# orig_modules[modname] = sys.modules[modname]
# del sys.modules[modname]
# def _save_and_block_module(name, orig_modules):
# """Helper function to save and block a module in sys.modules
# Return True if the module was in sys.modules, False otherwise."""
# saved = True
# try:
# orig_modules[name] = sys.modules[name]
# except KeyError:
# saved = False
# sys.modules[name] = None
# return saved
# def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
# """Imports and returns a module, deliberately bypassing the sys.modules cache
# and importing a fresh copy of the module. Once the import is complete,
# the sys.modules cache is restored to its original state.
# Modules named in fresh are also imported anew if needed by the import.
# If one of these modules can't be imported, None is returned.
# Importing of modules named in blocked is prevented while the fresh import
# takes place.
# If deprecated is True, any module or package deprecation messages
# will be suppressed."""
# # NOTE: test_heapq, test_json, and test_warnings include extra sanity
# # checks to make sure that this utility function is working as expected
# with _ignore_deprecated_imports(deprecated):
# # Keep track of modules saved for later restoration as well
# # as those which just need a blocking entry removed
# orig_modules = {}
# names_to_remove = []
# _save_and_remove_module(name, orig_modules)
# try:
# for fresh_name in fresh:
# _save_and_remove_module(fresh_name, orig_modules)
# for blocked_name in blocked:
# if not _save_and_block_module(blocked_name, orig_modules):
# names_to_remove.append(blocked_name)
# fresh_module = importlib.import_module(name)
# except ImportError:
# fresh_module = None
# finally:
# for orig_name, module in orig_modules.items():
# sys.modules[orig_name] = module
# for name_to_remove in names_to_remove:
# del sys.modules[name_to_remove]
# return fresh_module
# def get_attribute(obj, name):
# """Get an attribute, raising SkipTest if AttributeError is raised."""
# try:
# attribute = getattr(obj, name)
# except AttributeError:
# raise unittest.SkipTest("module %s has no attribute %s" % (
# obj.__name__, name))
# else:
# return attribute
verbose = 1 # Flag set to 0 by regrtest.py
# use_resources = None # Flag set to [] by regrtest.py
# max_memuse = 0 # Disable bigmem tests (they will still be run with
# # small sizes, to make sure they work.)
# real_max_memuse = 0
# # _original_stdout is meant to hold stdout at the time regrtest began.
# # This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
# # The point is to have some flavor of stdout the user can actually see.
# _original_stdout = None
# def record_original_stdout(stdout):
# global _original_stdout
# _original_stdout = stdout
# def get_original_stdout():
# return _original_stdout or sys.stdout
# def unload(name):
# try:
# del sys.modules[name]
# except KeyError:
# pass
# if sys.platform.startswith("win"):
# def _waitfor(func, pathname, waitall=False):
# # Perform the operation
# func(pathname)
# # Now setup the wait loop
# if waitall:
# dirname = pathname
# else:
# dirname, name = os.path.split(pathname)
# dirname = dirname or '.'
# # Check for `pathname` to be removed from the filesystem.
# # The exponential backoff of the timeout amounts to a total
# # of ~1 second after which the deletion is probably an error
# # anyway.
# # Testing on a i7@4.3GHz shows that usually only 1 iteration is
# # required when contention occurs.
# timeout = 0.001
# while timeout < 1.0:
# # Note we are only testing for the existence of the file(s) in
# # the contents of the directory regardless of any security or
# # access rights. If we have made it this far, we have sufficient
# # permissions to do that much using Python's equivalent of the
# # Windows API FindFirstFile.
# # Other Windows APIs can fail or give incorrect results when
# # dealing with files that are pending deletion.
# L = os.listdir(dirname)
# if not (L if waitall else name in L):
# return
# # Increase the timeout and try again
# time.sleep(timeout)
# timeout *= 2
# warnings.warn('tests may fail, delete still pending for ' + pathname,
# RuntimeWarning, stacklevel=4)
# def _unlink(filename):
# _waitfor(os.unlink, filename)
# def _rmdir(dirname):
# _waitfor(os.rmdir, dirname)
# def _rmtree(path):
# def _rmtree_inner(path):
# for name in os.listdir(path):
# fullname = os.path.join(path, name)
# if os.path.isdir(fullname):
# _waitfor(_rmtree_inner, fullname, waitall=True)
# os.rmdir(fullname)
# else:
# os.unlink(fullname)
# _waitfor(_rmtree_inner, path, waitall=True)
# _waitfor(os.rmdir, path)
# else:
# _unlink = os.unlink
# _rmdir = os.rmdir
# _rmtree = shutil.rmtree
# def unlink(filename):
# try:
# _unlink(filename)
# except OSError:
# pass
# def rmdir(dirname):
# try:
# _rmdir(dirname)
# except OSError as error:
# # The directory need not exist.
# if error.errno != errno.ENOENT:
# raise
# def rmtree(path):
# try:
# _rmtree(path)
# except OSError, e:
# # Unix returns ENOENT, Windows returns ESRCH.
# if e.errno not in (errno.ENOENT, errno.ESRCH):
# raise
# def forget(modname):
# '''"Forget" a module was ever imported by removing it from sys.modules and
# deleting any .pyc and .pyo files.'''
# unload(modname)
# for dirname in sys.path:
# unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
# # Deleting the .pyo file cannot be within the 'try' for the .pyc since
# # the chance exists that there is no .pyc (and thus the 'try' statement
# # is exited) but there is a .pyo file.
# unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
# # Check whether a gui is actually available
# def _is_gui_available():
# if hasattr(_is_gui_available, 'result'):
# return _is_gui_available.result
# reason = None
# if sys.platform.startswith('win'):
# # if Python is running as a service (such as the buildbot service),
# # gui interaction may be disallowed
# import ctypes
# import ctypes.wintypes
# UOI_FLAGS = 1
# WSF_VISIBLE = 0x0001
# class USEROBJECTFLAGS(ctypes.Structure):
# _fields_ = [("fInherit", ctypes.wintypes.BOOL),
# ("fReserved", ctypes.wintypes.BOOL),
# ("dwFlags", ctypes.wintypes.DWORD)]
# dll = ctypes.windll.user32
# h = dll.GetProcessWindowStation()
# if not h:
# raise ctypes.WinError()
# uof = USEROBJECTFLAGS()
# needed = ctypes.wintypes.DWORD()
# res = dll.GetUserObjectInformationW(h,
# UOI_FLAGS,
# ctypes.byref(uof),
# ctypes.sizeof(uof),
# ctypes.byref(needed))
# if not res:
# raise ctypes.WinError()
# if not bool(uof.dwFlags & WSF_VISIBLE):
# reason = "gui not available (WSF_VISIBLE flag not set)"
# elif sys.platform == 'darwin':
# # The Aqua Tk implementations on OS X can abort the process if
# # being called in an environment where a window server connection
# # cannot be made, for instance when invoked by a buildbot or ssh
# # process not running under the same user id as the current console
# # user. To avoid that, raise an exception if the window manager
# # connection is not available.
# from ctypes import cdll, c_int, pointer, Structure
# from ctypes.util import find_library
# app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
# if app_services.CGMainDisplayID() == 0:
# reason = "gui tests cannot run without OS X window manager"
# else:
# class ProcessSerialNumber(Structure):
# _fields_ = [("highLongOfPSN", c_int),
# ("lowLongOfPSN", c_int)]
# psn = ProcessSerialNumber()
# psn_p = pointer(psn)
# if ( (app_services.GetCurrentProcess(psn_p) < 0) or
# (app_services.SetFrontProcess(psn_p) < 0) ):
# reason = "cannot run without OS X gui process"
# # check on every platform whether tkinter can actually do anything
# if not reason:
# try:
# from Tkinter import Tk
# root = Tk()
# root.update()
# root.destroy()
# except Exception as e:
# err_string = str(e)
# if len(err_string) > 50:
# err_string = err_string[:50] + ' [...]'
# reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
# err_string)
# _is_gui_available.reason = reason
# _is_gui_available.result = not reason
# return _is_gui_available.result
# def is_resource_enabled(resource):
# """Test whether a resource is enabled.
# Known resources are set by regrtest.py. If not running under regrtest.py,
# all resources are assumed enabled unless use_resources has been set.
# """
# return use_resources is None or resource in use_resources
# def requires(resource, msg=None):
# """Raise ResourceDenied if the specified resource is not available."""
# if resource == 'gui' and not _is_gui_available():
# raise ResourceDenied(_is_gui_available.reason)
# if not is_resource_enabled(resource):
# if msg is None:
# msg = "Use of the `%s' resource not enabled" % resource
# raise ResourceDenied(msg)
# def requires_mac_ver(*min_version):
# """Decorator raising SkipTest if the OS is Mac OS X and the OS X
# version if less than min_version.
# For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
# is lesser than 10.5.
# """
# def decorator(func):
# @functools.wraps(func)
# def wrapper(*args, **kw):
# if sys.platform == 'darwin':
# version_txt = platform.mac_ver()[0]
# try:
# version = tuple(map(int, version_txt.split('.')))
# except ValueError:
# pass
# else:
# if version < min_version:
# min_version_txt = '.'.join(map(str, min_version))
# raise unittest.SkipTest(
# "Mac OS X %s or higher required, not %s"
# % (min_version_txt, version_txt))
# return func(*args, **kw)
# wrapper.min_version = min_version
# return wrapper
# return decorator
# # Don't use "localhost", since resolving it uses the DNS under recent
# # Windows versions (see issue #18792).
# HOST = "127.0.0.1"
# HOSTv6 = "::1"
# def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
# """Returns an unused port that should be suitable for binding. This is
# achieved by creating a temporary socket with the same family and type as
# the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
# the specified host address (defaults to 0.0.0.0) with the port set to 0,
# eliciting an unused ephemeral port from the OS. The temporary socket is
# then closed and deleted, and the ephemeral port is returned.
# Either this method or bind_port() should be used for any tests where a
# server socket needs to be bound to a particular port for the duration of
# the test. Which one to use depends on whether the calling code is creating
# a python socket, or if an unused port needs to be provided in a constructor
# or passed to an external program (i.e. the -accept argument to openssl's
# s_server mode). Always prefer bind_port() over find_unused_port() where
# possible. Hard coded ports should *NEVER* be used. As soon as a server
# socket is bound to a hard coded port, the ability to run multiple instances
# of the test simultaneously on the same host is compromised, which makes the
# test a ticking time bomb in a buildbot environment. On Unix buildbots, this
# may simply manifest as a failed test, which can be recovered from without
# intervention in most cases, but on Windows, the entire python process can
# completely and utterly wedge, requiring someone to log in to the buildbot
# and manually kill the affected process.
# (This is easy to reproduce on Windows, unfortunately, and can be traced to
# the SO_REUSEADDR socket option having different semantics on Windows versus
# Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
# listen and then accept connections on identical host/ports. An EADDRINUSE
# socket.error will be raised at some point (depending on the platform and
# the order bind and listen were called on each socket).
# However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
# will ever be raised when attempting to bind two identical host/ports. When
# accept() is called on each socket, the second caller's process will steal
# the port from the first caller, leaving them both in an awkwardly wedged
# state where they'll no longer respond to any signals or graceful kills, and
# must be forcibly killed via OpenProcess()/TerminateProcess().
# The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
# instead of SO_REUSEADDR, which effectively affords the same semantics as
# SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
# Source world compared to Windows ones, this is a common mistake. A quick
# look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
# openssl.exe is called with the 's_server' option, for example. See
# http://bugs.python.org/issue2550 for more info. The following site also
# has a very thorough description about the implications of both REUSEADDR
# and EXCLUSIVEADDRUSE on Windows:
# http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
# XXX: although this approach is a vast improvement on previous attempts to
# elicit unused ports, it rests heavily on the assumption that the ephemeral
# port returned to us by the OS won't immediately be dished back out to some
# other process when we close and delete our temporary socket but before our
# calling code has a chance to bind the returned port. We can deal with this
# issue if/when we come across it."""
# tempsock = socket.socket(family, socktype)
# port = bind_port(tempsock)
# tempsock.close()
# del tempsock
# return port
# def bind_port(sock, host=HOST):
# """Bind the socket to a free port and return the port number. Relies on
# ephemeral ports in order to ensure we are using an unbound port. This is
# important as many tests may be running simultaneously, especially in a
# buildbot environment. This method raises an exception if the sock.family
# is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
# or SO_REUSEPORT set on it. Tests should *never* set these socket options
# for TCP/IP sockets. The only case for setting these options is testing
# multicasting via multiple UDP sockets.
# Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
# on Windows), it will be set on the socket. This will prevent anyone else
# from bind()'ing to our host/port for the duration of the test.
# """
# if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
# if hasattr(socket, 'SO_REUSEADDR'):
# if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
# raise TestFailed("tests should never set the SO_REUSEADDR " \
# "socket option on TCP/IP sockets!")
# if hasattr(socket, 'SO_REUSEPORT'):
# try:
# if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
# raise TestFailed("tests should never set the SO_REUSEPORT " \
# "socket option on TCP/IP sockets!")
# except EnvironmentError:
# # Python's socket module was compiled using modern headers
# # thus defining SO_REUSEPORT but this process is running
# # under an older kernel that does not support SO_REUSEPORT.
# pass
# if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
# sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
# sock.bind((host, 0))
# port = sock.getsockname()[1]
# return port
# def _is_ipv6_enabled():
# """Check whether IPv6 is enabled on this host."""
# if socket.has_ipv6:
# sock = None
# try:
# sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
# sock.bind((HOSTv6, 0))
# return True
# except socket.error:
# pass
# finally:
# if sock:
# sock.close()
# return False
# IPV6_ENABLED = _is_ipv6_enabled()
# def system_must_validate_cert(f):
# """Skip the test on TLS certificate validation failures."""
# @functools.wraps(f)
# def dec(*args, **kwargs):
# try:
# f(*args, **kwargs)
# except IOError as e:
# if "CERTIFICATE_VERIFY_FAILED" in str(e):
# raise unittest.SkipTest("system does not contain "
# "necessary certificates")
# raise
# return dec
# FUZZ = 1e-6
# def fcmp(x, y): # fuzzy comparison function
# if isinstance(x, float) or isinstance(y, float):
# try:
# fuzz = (abs(x) + abs(y)) * FUZZ
# if abs(x-y) <= fuzz:
# return 0
# except:
# pass
# elif type(x) == type(y) and isinstance(x, (tuple, list)):
# for i in range(min(len(x), len(y))):
# outcome = fcmp(x[i], y[i])
# if outcome != 0:
# return outcome
# return (len(x) > len(y)) - (len(x) < len(y))
# return (x > y) - (x < y)
# # A constant likely larger than the underlying OS pipe buffer size, to
# # make writes blocking.
# # Windows limit seems to be around 512 B, and many Unix kernels have a
# # 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
# # (see issue #17835 for a discussion of this number).
# PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
# # A constant likely larger than the underlying OS socket buffer size, to make
# # writes blocking.
# # The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
# # on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
# # for a discussion of this number).
# SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
# is_jython = sys.platform.startswith('java')
try:
unicode
have_unicode = True
except NameError:
have_unicode = False
# requires_unicode = unittest.skipUnless(have_unicode, 'no unicode support')
# def u(s):
# return unicode(s, 'unicode-escape')
# # FS_NONASCII: non-ASCII Unicode character encodable by
# # sys.getfilesystemencoding(), or None if there is no such character.
# FS_NONASCII = None
# if have_unicode:
# for character in (
# # First try printable and common characters to have a readable filename.
# # For each character, the encoding list are just example of encodings able
# # to encode the character (the list is not exhaustive).
# # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
# unichr(0x00E6),
# # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
# unichr(0x0130),
# # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
# unichr(0x0141),
# # U+03C6 (Greek Small Letter Phi): cp1253
# unichr(0x03C6),
# # U+041A (Cyrillic Capital Letter Ka): cp1251
# unichr(0x041A),
# # U+05D0 (Hebrew Letter Alef): Encodable to cp424
# unichr(0x05D0),
# # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
# unichr(0x060C),
# # U+062A (Arabic Letter Teh): cp720
# unichr(0x062A),
# # U+0E01 (Thai Character Ko Kai): cp874
# unichr(0x0E01),
# # Then try more "special" characters. "special" because they may be
# # interpreted or displayed differently depending on the exact locale
# # encoding and the font.
# # U+00A0 (No-Break Space)
# unichr(0x00A0),
# # U+20AC (Euro Sign)
# unichr(0x20AC),
# ):
# try:
# character.encode(sys.getfilesystemencoding())\
# .decode(sys.getfilesystemencoding())
# except UnicodeError:
# pass
# else:
# FS_NONASCII = character
# break
# # Filename used for testing
# if os.name == 'java':
# # Jython disallows @ in module names
# TESTFN = '$test'
# elif os.name == 'riscos':
# TESTFN = 'testfile'
# else:
# TESTFN = '@test'
# # Unicode name only used if TEST_FN_ENCODING exists for the platform.
# if have_unicode:
# # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
# # TESTFN_UNICODE is a filename that can be encoded using the
# # file system encoding, but *not* with the default (ascii) encoding
# if isinstance('', unicode):
# # python -U
# # XXX perhaps unicode() should accept Unicode strings?
# TESTFN_UNICODE = "@test-\xe0\xf2"
# else:
# # 2 latin characters.
# TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
# TESTFN_ENCODING = sys.getfilesystemencoding()
# # TESTFN_UNENCODABLE is a filename that should *not* be
# # able to be encoded by *either* the default or filesystem encoding.
# # This test really only makes sense on Windows NT platforms
# # which have special Unicode support in posixmodule.
# if (not hasattr(sys, "getwindowsversion") or
# sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME
# TESTFN_UNENCODABLE = None
# else:
# # Japanese characters (I think - from bug 846133)
# TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
# try:
# # XXX - Note - should be using TESTFN_ENCODING here - but for
# # Windows, "mbcs" currently always operates as if in
# # errors=ignore' mode - hence we get '?' characters rather than
# # the exception. 'Latin1' operates as we expect - ie, fails.
# # See [ 850997 ] mbcs encoding ignores errors
# TESTFN_UNENCODABLE.encode("Latin1")
# except UnicodeEncodeError:
# pass
# else:
# print \
# 'WARNING: The filename %r CAN be encoded by the filesystem. ' \
# 'Unicode filename tests may not be effective' \
# % TESTFN_UNENCODABLE
# # Disambiguate TESTFN for parallel testing, while letting it remain a valid
# # module name.
# TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
# # Save the initial cwd
# SAVEDCWD = os.getcwd()
# @contextlib.contextmanager
# def change_cwd(path, quiet=False):
# """Return a context manager that changes the current working directory.
# Arguments:
# path: the directory to use as the temporary current working directory.
# quiet: if False (the default), the context manager raises an exception
# on error. Otherwise, it issues only a warning and keeps the current
# working directory the same.
# """
# saved_dir = os.getcwd()
# try:
# os.chdir(path)
# except OSError:
# if not quiet:
# raise
# warnings.warn('tests may fail, unable to change CWD to: ' + path,
# RuntimeWarning, stacklevel=3)
# try:
# yield os.getcwd()
# finally:
# os.chdir(saved_dir)
# @contextlib.contextmanager
# def temp_cwd(name='tempcwd', quiet=False):
# """
# Context manager that creates a temporary directory and set it as CWD.
# The new CWD is created in the current directory and it's named *name*.
# If *quiet* is False (default) and it's not possible to create or change
# the CWD, an error is raised. If it's True, only a warning is raised
# and the original CWD is used.
# """
# if (have_unicode and isinstance(name, unicode) and
# not os.path.supports_unicode_filenames):
# try:
# name = name.encode(sys.getfilesystemencoding() or 'ascii')
# except UnicodeEncodeError:
# if not quiet:
# raise unittest.SkipTest('unable to encode the cwd name with '
# 'the filesystem encoding.')
# saved_dir = os.getcwd()
# is_temporary = False
# try:
# os.mkdir(name)
# os.chdir(name)
# is_temporary = True
# except OSError:
# if not quiet:
# raise
# warnings.warn('tests may fail, unable to change the CWD to ' + name,
# RuntimeWarning, stacklevel=3)
# try:
# yield os.getcwd()
# finally:
# os.chdir(saved_dir)
# if is_temporary:
# rmtree(name)
# def findfile(file, here=__file__, subdir=None):
# """Try to find a file on sys.path and the working directory. If it is not
# found the argument passed to the function is returned (this does not
# necessarily signal failure; could still be the legitimate path)."""
# if os.path.isabs(file):
# return file
# if subdir is not None:
# file = os.path.join(subdir, file)
# path = sys.path
# path = [os.path.dirname(here)] + path
# for dn in path:
# fn = os.path.join(dn, file)
# if os.path.exists(fn): return fn
# return file
# def sortdict(dict):
# "Like repr(dict), but in sorted order."
# items = dict.items()
# items.sort()
# reprpairs = ["%r: %r" % pair for pair in items]
# withcommas = ", ".join(reprpairs)
# return "{%s}" % withcommas
# def make_bad_fd():
# """
# Create an invalid file descriptor by opening and closing a file and return
# its fd.
# """
# file = open(TESTFN, "wb")
# try:
# return file.fileno()
# finally:
# file.close()
# unlink(TESTFN)
# def check_syntax_error(testcase, statement):
# testcase.assertRaises(SyntaxError, compile, statement,
# '<test string>', 'exec')
# def open_urlresource(url, check=None):
# import urlparse, urllib2
# filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
# fn = os.path.join(os.path.dirname(__file__), "data", filename)
# def check_valid_file(fn):
# f = open(fn)
# if check is None:
# return f
# elif check(f):
# f.seek(0)
# return f
# f.close()
# if os.path.exists(fn):
# f = check_valid_file(fn)
# if f is not None:
# return f
# unlink(fn)
# # Verify the requirement before downloading the file
# requires('urlfetch')
# print >> get_original_stdout(), '\tfetching %s ...' % url
# f = urllib2.urlopen(url, timeout=15)
# try:
# with open(fn, "wb") as out:
# s = f.read()
# while s:
# out.write(s)
# s = f.read()
# finally:
# f.close()
# f = check_valid_file(fn)
# if f is not None:
# return f
# raise TestFailed('invalid resource "%s"' % fn)
# class WarningsRecorder(object):
# """Convenience wrapper for the warnings list returned on
# entry to the warnings.catch_warnings() context manager.
# """
# def __init__(self, warnings_list):
# self._warnings = warnings_list
# self._last = 0
# def __getattr__(self, attr):
# if len(self._warnings) > self._last:
# return getattr(self._warnings[-1], attr)
# elif attr in warnings.WarningMessage._WARNING_DETAILS:
# return None
# raise AttributeError("%r has no attribute %r" % (self, attr))
# @property
# def warnings(self):
# return self._warnings[self._last:]
# def reset(self):
# self._last = len(self._warnings)
# def _filterwarnings(filters, quiet=False):
# """Catch the warnings, then check if all the expected
# warnings have been raised and re-raise unexpected warnings.
# If 'quiet' is True, only re-raise the unexpected warnings.
# """
# # Clear the warning registry of the calling module
# # in order to re-raise the warnings.
# frame = sys._getframe(2)
# registry = frame.f_globals.get('__warningregistry__')
# if registry:
# registry.clear()
# with warnings.catch_warnings(record=True) as w:
# # Set filter "always" to record all warnings. Because
# # test_warnings swap the module, we need to look up in
# # the sys.modules dictionary.
# sys.modules['warnings'].simplefilter("always")
# yield WarningsRecorder(w)
# # Filter the recorded warnings
# reraise = [warning.message for warning in w]
# missing = []
# for msg, cat in filters:
# seen = False
# for exc in reraise[:]:
# message = str(exc)
# # Filter out the matching messages
# if (re.match(msg, message, re.I) and
# issubclass(exc.__class__, cat)):
# seen = True
# reraise.remove(exc)
# if not seen and not quiet:
# # This filter caught nothing
# missing.append((msg, cat.__name__))
# if reraise:
# raise AssertionError("unhandled warning %r" % reraise[0])
# if missing:
# raise AssertionError("filter (%r, %s) did not catch any warning" %
# missing[0])
# @contextlib.contextmanager
# def check_warnings(*filters, **kwargs):
# """Context manager to silence warnings.
# Accept 2-tuples as positional arguments:
# ("message regexp", WarningCategory)
# Optional argument:
# - if 'quiet' is True, it does not fail if a filter catches nothing
# (default True without argument,
# default False if some filters are defined)
# Without argument, it defaults to:
# check_warnings(("", Warning), quiet=True)
# """
# quiet = kwargs.get('quiet')
# if not filters:
# filters = (("", Warning),)
# # Preserve backward compatibility
# if quiet is None:
# quiet = True
# return _filterwarnings(filters, quiet)
# @contextlib.contextmanager
# def check_py3k_warnings(*filters, **kwargs):
# """Context manager to silence py3k warnings.
# Accept 2-tuples as positional arguments:
# ("message regexp", WarningCategory)
# Optional argument:
# - if 'quiet' is True, it does not fail if a filter catches nothing
# (default False)
# Without argument, it defaults to:
# check_py3k_warnings(("", DeprecationWarning), quiet=False)
# """
# if sys.py3kwarning:
# if not filters:
# filters = (("", DeprecationWarning),)
# else:
# # It should not raise any py3k warning
# filters = ()
# return _filterwarnings(filters, kwargs.get('quiet'))
# class CleanImport(object):
# """Context manager to force import to return a new module reference.
# This is useful for testing module-level behaviours, such as
# the emission of a DeprecationWarning on import.
# Use like this:
# with CleanImport("foo"):
# importlib.import_module("foo") # new reference
# """
# def __init__(self, *module_names):
# self.original_modules = sys.modules.copy()
# for module_name in module_names:
# if module_name in sys.modules:
# module = sys.modules[module_name]
# # It is possible that module_name is just an alias for
# # another module (e.g. stub for modules renamed in 3.x).
# # In that case, we also need delete the real module to clear
# # the import cache.
# if module.__name__ != module_name:
# del sys.modules[module.__name__]
# del sys.modules[module_name]
# def __enter__(self):
# return self
# def __exit__(self, *ignore_exc):
# sys.modules.update(self.original_modules)
# class EnvironmentVarGuard(UserDict.DictMixin):
# """Class to help protect the environment variable properly. Can be used as
# a context manager."""
# def __init__(self):
# self._environ = os.environ
# self._changed = {}
# def __getitem__(self, envvar):
# return self._environ[envvar]
# def __setitem__(self, envvar, value):
# # Remember the initial value on the first access
# if envvar not in self._changed:
# self._changed[envvar] = self._environ.get(envvar)
# self._environ[envvar] = value
# def __delitem__(self, envvar):
# # Remember the initial value on the first access
# if envvar not in self._changed:
# self._changed[envvar] = self._environ.get(envvar)
# if envvar in self._environ:
# del self._environ[envvar]
# def keys(self):
# return self._environ.keys()
# def set(self, envvar, value):
# self[envvar] = value
# def unset(self, envvar):
# del self[envvar]
# def __enter__(self):
# return self
# def __exit__(self, *ignore_exc):
# for (k, v) in self._changed.items():
# if v is None:
# if k in self._environ:
# del self._environ[k]
# else:
# self._environ[k] = v
# os.environ = self._environ
# class DirsOnSysPath(object):
# """Context manager to temporarily add directories to sys.path.
# This makes a copy of sys.path, appends any directories given
# as positional arguments, then reverts sys.path to the copied
# settings when the context ends.
# Note that *all* sys.path modifications in the body of the
# context manager, including replacement of the object,
# will be reverted at the end of the block.
# """
# def __init__(self, *paths):
# self.original_value = sys.path[:]
# self.original_object = sys.path
# sys.path.extend(paths)
# def __enter__(self):
# return self
# def __exit__(self, *ignore_exc):
# sys.path = self.original_object
# sys.path[:] = self.original_value
# class TransientResource(object):
# """Raise ResourceDenied if an exception is raised while the context manager
# is in effect that matches the specified exception and attributes."""
# def __init__(self, exc, **kwargs):
# self.exc = exc
# self.attrs = kwargs
# def __enter__(self):
# return self
# def __exit__(self, type_=None, value=None, traceback=None):
# """If type_ is a subclass of self.exc and value has attributes matching
# self.attrs, raise ResourceDenied. Otherwise let the exception
# propagate (if any)."""
# if type_ is not None and issubclass(self.exc, type_):
# for attr, attr_value in self.attrs.iteritems():
# if not hasattr(value, attr):
# break
# if getattr(value, attr) != attr_value:
# break
# else:
# raise ResourceDenied("an optional resource is not available")
# @contextlib.contextmanager
# def transient_internet(resource_name, timeout=30.0, errnos=()):
# """Return a context manager that raises ResourceDenied when various issues
# with the Internet connection manifest themselves as exceptions."""
# default_errnos = [
# ('ECONNREFUSED', 111),
# ('ECONNRESET', 104),
# ('EHOSTUNREACH', 113),
# ('ENETUNREACH', 101),
# ('ETIMEDOUT', 110),
# ]
# default_gai_errnos = [
# ('EAI_AGAIN', -3),
# ('EAI_FAIL', -4),
# ('EAI_NONAME', -2),
# ('EAI_NODATA', -5),
# # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo()
# # implementation actually returns WSANO_DATA i.e. 11004.
# ('WSANO_DATA', 11004),
# ]
# denied = ResourceDenied("Resource '%s' is not available" % resource_name)
# captured_errnos = errnos
# gai_errnos = []
# if not captured_errnos:
# captured_errnos = [getattr(errno, name, num)
# for (name, num) in default_errnos]
# gai_errnos = [getattr(socket, name, num)
# for (name, num) in default_gai_errnos]
# def filter_error(err):
# n = getattr(err, 'errno', None)
# if (isinstance(err, socket.timeout) or
# (isinstance(err, socket.gaierror) and n in gai_errnos) or
# n in captured_errnos):
# if not verbose:
# sys.stderr.write(denied.args[0] + "\n")
# raise denied
# old_timeout = socket.getdefaulttimeout()
# try:
# if timeout is not None:
# socket.setdefaulttimeout(timeout)
# yield
# except IOError as err:
# # urllib can wrap original socket errors multiple times (!), we must
# # unwrap to get at the original error.
# while True:
# a = err.args
# if len(a) >= 1 and isinstance(a[0], IOError):
# err = a[0]
# # The error can also be wrapped as args[1]:
# # except socket.error as msg:
# # raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
# elif len(a) >= 2 and isinstance(a[1], IOError):
# err = a[1]
# else:
# break
# filter_error(err)
# raise
# # XXX should we catch generic exceptions and look for their
# # __cause__ or __context__?
# finally:
# socket.setdefaulttimeout(old_timeout)
# @contextlib.contextmanager
# def captured_output(stream_name):
# """Return a context manager used by captured_stdout and captured_stdin
# that temporarily replaces the sys stream *stream_name* with a StringIO."""
# import StringIO
# orig_stdout = getattr(sys, stream_name)
# setattr(sys, stream_name, StringIO.StringIO())
# try:
# yield getattr(sys, stream_name)
# finally:
# setattr(sys, stream_name, orig_stdout)
# def captured_stdout():
# """Capture the output of sys.stdout:
# with captured_stdout() as s:
# print "hello"
# self.assertEqual(s.getvalue(), "hello")
# """
# return captured_output("stdout")
# def captured_stderr():
# return captured_output("stderr")
# def captured_stdin():
# return captured_output("stdin")
# def gc_collect():
# """Force as many objects as possible to be collected.
# In non-CPython implementations of Python, this is needed because timely
# deallocation is not guaranteed by the garbage collector. (Even in CPython
# this can be the case in case of reference cycles.) This means that __del__
# methods may be called later than expected and weakrefs may remain alive for
# longer than expected. This function tries its best to force all garbage
# objects to disappear.
# """
# gc.collect()
# if is_jython:
# time.sleep(0.1)
# gc.collect()
# gc.collect()
# _header = '2P'
# if hasattr(sys, "gettotalrefcount"):
# _header = '2P' + _header
# _vheader = _header + 'P'
# def calcobjsize(fmt):
# return struct.calcsize(_header + fmt + '0P')
# def calcvobjsize(fmt):
# return struct.calcsize(_vheader + fmt + '0P')
# _TPFLAGS_HAVE_GC = 1<<14
# _TPFLAGS_HEAPTYPE = 1<<9
# def check_sizeof(test, o, size):
# import _testcapi
# result = sys.getsizeof(o)
# # add GC header size
# if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
# ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
# size += _testcapi.SIZEOF_PYGC_HEAD
# msg = 'wrong size for %s: got %d, expected %d' \
# % (type(o), result, size)
# test.assertEqual(result, size, msg)
# #=======================================================================
# # Decorator for running a function in a different locale, correctly resetting
# # it afterwards.
# def run_with_locale(catstr, *locales):
# def decorator(func):
# def inner(*args, **kwds):
# try:
# import locale
# category = getattr(locale, catstr)
# orig_locale = locale.setlocale(category)
# except AttributeError:
# # if the test author gives us an invalid category string
# raise
# except:
# # cannot retrieve original locale, so do nothing
# locale = orig_locale = None
# else:
# for loc in locales:
# try:
# locale.setlocale(category, loc)
# break
# except:
# pass
# # now run the function, resetting the locale on exceptions
# try:
# return func(*args, **kwds)
# finally:
# if locale and orig_locale:
# locale.setlocale(category, orig_locale)
# inner.func_name = func.func_name
# inner.__doc__ = func.__doc__
# return inner
# return decorator
# #=======================================================================
# # Decorator for running a function in a specific timezone, correctly
# # resetting it afterwards.
# def run_with_tz(tz):
# def decorator(func):
# def inner(*args, **kwds):
# try:
# tzset = time.tzset
# except AttributeError:
# raise unittest.SkipTest("tzset required")
# if 'TZ' in os.environ:
# orig_tz = os.environ['TZ']
# else:
# orig_tz = None
# os.environ['TZ'] = tz
# tzset()
# # now run the function, resetting the tz on exceptions
# try:
# return func(*args, **kwds)
# finally:
# if orig_tz is None:
# del os.environ['TZ']
# else:
# os.environ['TZ'] = orig_tz
# time.tzset()
# inner.__name__ = func.__name__
# inner.__doc__ = func.__doc__
# return inner
# return decorator
# #=======================================================================
# # Big-memory-test support. Separate from 'resources' because memory use should be configurable.
# # Some handy shorthands. Note that these are used for byte-limits as well
# # as size-limits, in the various bigmem tests
# _1M = 1024*1024
# _1G = 1024 * _1M
# _2G = 2 * _1G
# _4G = 4 * _1G
# MAX_Py_ssize_t = sys.maxsize
# def set_memlimit(limit):
# global max_memuse
# global real_max_memuse
# sizes = {
# 'k': 1024,
# 'm': _1M,
# 'g': _1G,
# 't': 1024*_1G,
# }
# m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
# re.IGNORECASE | re.VERBOSE)
# if m is None:
# raise ValueError('Invalid memory limit %r' % (limit,))
# memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
# real_max_memuse = memlimit
# if memlimit > MAX_Py_ssize_t:
# memlimit = MAX_Py_ssize_t
# if memlimit < _2G - 1:
# raise ValueError('Memory limit %r too low to be useful' % (limit,))
# max_memuse = memlimit
# def bigmemtest(minsize, memuse, overhead=5*_1M):
# """Decorator for bigmem tests.
# 'minsize' is the minimum useful size for the test (in arbitrary,
# test-interpreted units.) 'memuse' is the number of 'bytes per size' for
# the test, or a good estimate of it. 'overhead' specifies fixed overhead,
# independent of the testsize, and defaults to 5Mb.
# The decorator tries to guess a good value for 'size' and passes it to
# the decorated test function. If minsize * memuse is more than the
# allowed memory use (as defined by max_memuse), the test is skipped.
# Otherwise, minsize is adjusted upward to use up to max_memuse.
# """
# def decorator(f):
# def wrapper(self):
# if not max_memuse:
# # If max_memuse is 0 (the default),
# # we still want to run the tests with size set to a few kb,
# # to make sure they work. We still want to avoid using
# # too much memory, though, but we do that noisily.
# maxsize = 5147
# self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
# else:
# maxsize = int((max_memuse - overhead) / memuse)
# if maxsize < minsize:
# # Really ought to print 'test skipped' or something
# if verbose:
# sys.stderr.write("Skipping %s because of memory "
# "constraint\n" % (f.__name__,))
# return
# # Try to keep some breathing room in memory use
# maxsize = max(maxsize - 50 * _1M, minsize)
# return f(self, maxsize)
# wrapper.minsize = minsize
# wrapper.memuse = memuse
# wrapper.overhead = overhead
# return wrapper
# return decorator
# def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True):
# def decorator(f):
# def wrapper(self):
# if not real_max_memuse:
# maxsize = 5147
# else:
# maxsize = size
# if ((real_max_memuse or not dry_run)
# and real_max_memuse < maxsize * memuse):
# if verbose:
# sys.stderr.write("Skipping %s because of memory "
# "constraint\n" % (f.__name__,))
# return
# return f(self, maxsize)
# wrapper.size = size
# wrapper.memuse = memuse
# wrapper.overhead = overhead
# return wrapper
# return decorator
# def bigaddrspacetest(f):
# """Decorator for tests that fill the address space."""
# def wrapper(self):
# if max_memuse < MAX_Py_ssize_t:
# if verbose:
# sys.stderr.write("Skipping %s because of memory "
# "constraint\n" % (f.__name__,))
# else:
# return f(self)
# return wrapper
#=======================================================================
# unittest integration.
class BasicTestRunner(object):
def run(self, test):
result = unittest.TestResult()
test(result)
return result
def _id(obj):
return obj
# def requires_resource(resource):
# if resource == 'gui' and not _is_gui_available():
# return unittest.skip(_is_gui_available.reason)
# if is_resource_enabled(resource):
# return _id
# else:
# return unittest.skip("resource {0!r} is not enabled".format(resource))
def cpython_only(test):
return lambda: None
# def cpython_only(test):
# """
# Decorator for tests only applicable on CPython.
# """
# return impl_detail(cpython=True)(test)
# def impl_detail(msg=None, **guards):
# if check_impl_detail(**guards):
# return _id
# if msg is None:
# guardnames, default = _parse_guards(guards)
# if default:
# msg = "implementation detail not available on {0}"
# else:
# msg = "implementation detail specific to {0}"
# guardnames = sorted(guardnames.keys())
# msg = msg.format(' or '.join(guardnames))
# return unittest.skip(msg)
# def _parse_guards(guards):
# # Returns a tuple ({platform_name: run_me}, default_value)
# if not guards:
# return ({'cpython': True}, False)
# is_true = guards.values()[0]
# assert guards.values() == [is_true] * len(guards) # all True or all False
# return (guards, not is_true)
# # Use the following check to guard CPython's implementation-specific tests --
# # or to run them only on the implementation(s) guarded by the arguments.
# def check_impl_detail(**guards):
# """This function returns True or False depending on the host platform.
# Examples:
# if check_impl_detail(): # only on CPython (default)
# if check_impl_detail(jython=True): # only on Jython
# if check_impl_detail(cpython=False): # everywhere except on CPython
# """
# guards, default = _parse_guards(guards)
# return guards.get(platform.python_implementation().lower(), default)
def _run_suite(suite):
"""Run tests from a unittest.TestSuite-derived class."""
if verbose:
runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
else:
runner = BasicTestRunner()
result = runner.run(suite)
if not result.wasSuccessful():
if len(result.errors) == 1 and not result.failures:
err = result.errors[0][1]
elif len(result.failures) == 1 and not result.errors:
err = result.failures[0][1]
else:
err = "multiple errors occurred"
if not verbose:
err += "; run in verbose mode for details"
raise TestFailed(err)
def run_unittest(*classes):
"""Run tests from unittest.TestCase-derived classes."""
valid_types = (unittest.TestSuite, unittest.TestCase)
suite = unittest.TestSuite()
for cls in classes:
if isinstance(cls, str):
if cls in sys.modules:
suite.addTest(unittest.findTestCases(sys.modules[cls]))
else:
raise ValueError("str arguments must be keys in sys.modules")
elif isinstance(cls, valid_types):
suite.addTest(cls)
else:
suite.addTest(unittest.makeSuite(cls))
_run_suite(suite)
# #=======================================================================
# # Check for the presence of docstrings.
# HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
# sys.platform == 'win32' or
# sysconfig.get_config_var('WITH_DOC_STRINGS'))
# requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
# "test requires docstrings")
# #=======================================================================
# # doctest driver.
# def run_doctest(module, verbosity=None):
# """Run doctest on the given module. Return (#failures, #tests).
# If optional argument verbosity is not specified (or is None), pass
# test_support's belief about verbosity on to doctest. Else doctest's
# usual behavior is used (it searches sys.argv for -v).
# """
# import doctest
# if verbosity is None:
# verbosity = verbose
# else:
# verbosity = None
# # Direct doctest output (normally just errors) to real stdout; doctest
# # output shouldn't be compared by regrtest.
# save_stdout = sys.stdout
# sys.stdout = get_original_stdout()
# try:
# f, t = doctest.testmod(module, verbose=verbosity)
# if f:
# raise TestFailed("%d of %d doctests failed" % (f, t))
# finally:
# sys.stdout = save_stdout
# if verbose:
# print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
# return f, t
# #=======================================================================
# # Threading support to prevent reporting refleaks when running regrtest.py -R
# # NOTE: we use thread._count() rather than threading.enumerate() (or the
# # moral equivalent thereof) because a threading.Thread object is still alive
# # until its __bootstrap() method has returned, even after it has been
# # unregistered from the threading module.
# # thread._count(), on the other hand, only gets decremented *after* the
# # __bootstrap() method has returned, which gives us reliable reference counts
# # at the end of a test run.
# def threading_setup():
# if thread:
# return thread._count(),
# else:
# return 1,
# def threading_cleanup(nb_threads):
# if not thread:
# return
# _MAX_COUNT = 10
# for count in range(_MAX_COUNT):
# n = thread._count()
# if n == nb_threads:
# break
# time.sleep(0.1)
# # XXX print a warning in case of failure?
# def reap_threads(func):
# """Use this function when threads are being used. This will
# ensure that the threads are cleaned up even when the test fails.
# If threading is unavailable this function does nothing.
# """
# if not thread:
# return func
# @functools.wraps(func)
# def decorator(*args):
# key = threading_setup()
# try:
# return func(*args)
# finally:
# threading_cleanup(*key)
# return decorator
# def reap_children():
# """Use this function at the end of test_main() whenever sub-processes
# are started. This will help ensure that no extra children (zombies)
# stick around to hog resources and create problems when looking
# for refleaks.
# """
# # Reap all our dead child processes so we don't leave zombies around.
# # These hog resources and might be causing some of the buildbots to die.
# if hasattr(os, 'waitpid'):
# any_process = -1
# while True:
# try:
# # This will raise an exception on Windows. That's ok.
# pid, status = os.waitpid(any_process, os.WNOHANG)
# if pid == 0:
# break
# except:
# break
# @contextlib.contextmanager
# def start_threads(threads, unlock=None):
# threads = list(threads)
# started = []
# try:
# try:
# for t in threads:
# t.start()
# started.append(t)
# except:
# if verbose:
# print("Can't start %d threads, only %d threads started" %
# (len(threads), len(started)))
# raise
# yield
# finally:
# if unlock:
# unlock()
# endtime = starttime = time.time()
# for timeout in range(1, 16):
# endtime += 60
# for t in started:
# t.join(max(endtime - time.time(), 0.01))
# started = [t for t in started if t.isAlive()]
# if not started:
# break
# if verbose:
# print('Unable to join %d threads during a period of '
# '%d minutes' % (len(started), timeout))
# started = [t for t in started if t.isAlive()]
# if started:
# raise AssertionError('Unable to join %d threads' % len(started))
# @contextlib.contextmanager
# def swap_attr(obj, attr, new_val):
# """Temporary swap out an attribute with a new object.
# Usage:
# with swap_attr(obj, "attr", 5):
# ...
# This will set obj.attr to 5 for the duration of the with: block,
# restoring the old value at the end of the block. If `attr` doesn't
# exist on `obj`, it will be created and then deleted at the end of the
# block.
# """
# if hasattr(obj, attr):
# real_val = getattr(obj, attr)
# setattr(obj, attr, new_val)
# try:
# yield
# finally:
# setattr(obj, attr, real_val)
# else:
# setattr(obj, attr, new_val)
# try:
# yield
# finally:
# delattr(obj, attr)
# def py3k_bytes(b):
# """Emulate the py3k bytes() constructor.
# NOTE: This is only a best effort function.
# """
# try:
# # memoryview?
# return b.tobytes()
# except AttributeError:
# try:
# # iterable of ints?
# return b"".join(chr(x) for x in b)
# except TypeError:
# return bytes(b)
# def args_from_interpreter_flags():
# """Return a list of command-line arguments reproducing the current
# settings in sys.flags."""
# import subprocess
# return subprocess._args_from_interpreter_flags()
# def strip_python_stderr(stderr):
# """Strip the stderr of a Python process from potential debug output
# emitted by the interpreter.
# This will typically be run on the result of the communicate() method
# of a subprocess.Popen object.
# """
# stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
# return stderr
# def check_free_after_iterating(test, iter, cls, args=()):
# class A(cls):
# def __del__(self):
# done[0] = True
# try:
# next(it)
# except StopIteration:
# pass
# done = [False]
# it = iter(A(*args))
# # Issue 26494: Shouldn't crash
# test.assertRaises(StopIteration, next, it)
# # The sequence should be deallocated just after the end of iterating
# gc_collect()
# test.assertTrue(done[0])
from test import test_support, seq_tests
# import gc
class TupleTest(seq_tests.CommonTest):
type2test = tuple
def test_constructors(self):
super(TupleTest, self).test_constructors()
# calling built-in types without argument must return empty
self.assertEqual(tuple(), ())
t0_3 = (0, 1, 2, 3)
t0_3_bis = tuple(t0_3)
self.assertTrue(t0_3 is t0_3_bis)
self.assertEqual(tuple([]), ())
self.assertEqual(tuple([0, 1, 2, 3]), (0, 1, 2, 3))
self.assertEqual(tuple(''), ())
self.assertEqual(tuple('spam'), ('s', 'p', 'a', 'm'))
def test_truth(self):
super(TupleTest, self).test_truth()
self.assertTrue(not ())
self.assertTrue((42, ))
def test_len(self):
super(TupleTest, self).test_len()
self.assertEqual(len(()), 0)
self.assertEqual(len((0,)), 1)
self.assertEqual(len((0, 1, 2)), 3)
def test_iadd(self):
super(TupleTest, self).test_iadd()
u = (0, 1)
u2 = u
u += (2, 3)
self.assertTrue(u is not u2)
def test_imul(self):
super(TupleTest, self).test_imul()
u = (0, 1)
u2 = u
u *= 3
self.assertTrue(u is not u2)
def test_tupleresizebug(self):
# Check that a specific bug in _PyTuple_Resize() is squashed.
def f():
for i in range(1000):
yield i
self.assertEqual(list(tuple(f())), range(1000))
def test_hash(self):
# See SF bug 942952: Weakness in tuple hash
# The hash should:
# be non-commutative
# should spread-out closely spaced values
# should not exhibit cancellation in tuples like (x,(x,y))
# should be distinct from element hashes: hash(x)!=hash((x,))
# This test exercises those cases.
# For a pure random hash and N=50, the expected number of occupied
# buckets when tossing 252,600 balls into 2**32 buckets
# is 252,592.6, or about 7.4 expected collisions. The
# standard deviation is 2.73. On a box with 64-bit hash
# codes, no collisions are expected. Here we accept no
# more than 15 collisions. Any worse and the hash function
# is sorely suspect.
N=50
base = range(N)
xp = [(i, j) for i in base for j in base]
inps = base + [(i, j) for i in base for j in xp] + \
[(i, j) for i in xp for j in base] + xp + zip(base)
collisions = len(inps) - len(set(map(hash, inps)))
self.assertTrue(collisions <= 15)
def test_repr(self):
l0 = tuple()
l2 = (0, 1, 2)
a0 = self.type2test(l0)
a2 = self.type2test(l2)
self.assertEqual(str(a0), repr(l0))
self.assertEqual(str(a2), repr(l2))
self.assertEqual(repr(a0), "()")
self.assertEqual(repr(a2), "(0, 1, 2)")
# def _not_tracked(self, t):
# # Nested tuples can take several collections to untrack
# gc.collect()
# gc.collect()
# self.assertFalse(gc.is_tracked(t), t)
# def _tracked(self, t):
# self.assertTrue(gc.is_tracked(t), t)
# gc.collect()
# gc.collect()
# self.assertTrue(gc.is_tracked(t), t)
# @test_support.cpython_only
# def test_track_literals(self):
# # Test GC-optimization of tuple literals
# x, y, z = 1.5, "a", []
# self._not_tracked(())
# self._not_tracked((1,))
# self._not_tracked((1, 2))
# self._not_tracked((1, 2, "a"))
# self._not_tracked((1, 2, (None, True, False, ()), int))
# self._not_tracked((object(),))
# self._not_tracked(((1, x), y, (2, 3)))
# # Tuples with mutable elements are always tracked, even if those
# # elements are not tracked right now.
# self._tracked(([],))
# self._tracked(([1],))
# self._tracked(({},))
# self._tracked((set(),))
# self._tracked((x, y, z))
# def check_track_dynamic(self, tp, always_track):
# x, y, z = 1.5, "a", []
# check = self._tracked if always_track else self._not_tracked
# check(tp())
# check(tp([]))
# check(tp(set()))
# check(tp([1, x, y]))
# check(tp(obj for obj in [1, x, y]))
# check(tp(set([1, x, y])))
# check(tp(tuple([obj]) for obj in [1, x, y]))
# check(tuple(tp([obj]) for obj in [1, x, y]))
# self._tracked(tp([z]))
# self._tracked(tp([[x, y]]))
# self._tracked(tp([{x: y}]))
# self._tracked(tp(obj for obj in [x, y, z]))
# self._tracked(tp(tuple([obj]) for obj in [x, y, z]))
# self._tracked(tuple(tp([obj]) for obj in [x, y, z]))
# @test_support.cpython_only
# def test_track_dynamic(self):
# # Test GC-optimization of dynamically constructed tuples.
# self.check_track_dynamic(tuple, False)
# @test_support.cpython_only
# def test_track_subtypes(self):
# # Tuple subtypes must always be tracked
# class MyTuple(tuple):
# pass
# self.check_track_dynamic(MyTuple, True)
# @test_support.cpython_only
# def test_bug7466(self):
# # Trying to untrack an unfinished tuple could crash Python
# self._not_tracked(tuple(gc.collect() for i in range(101)))
def test_main():
test_support.run_unittest(TupleTest)
if __name__=="__main__":
test_main()
"""
Python unit testing framework, based on Erich Gamma's JUnit and Kent Beck's
Smalltalk testing framework.
This module contains the core framework classes that form the basis of
specific test cases and suites (TestCase, TestSuite etc.), and also a
text-based utility class for running the tests and reporting the results
(TextTestRunner).
Simple usage:
import unittest
class IntegerArithmeticTestCase(unittest.TestCase):
def testAdd(self): ## test method names begin 'test*'
self.assertEqual((1 + 2), 3)
self.assertEqual(0 + 1, 1)
def testMultiply(self):
self.assertEqual((0 * 10), 0)
self.assertEqual((5 * 8), 40)
if __name__ == '__main__':
unittest.main()
Further information is available in the bundled documentation, and from
http://docs.python.org/library/unittest.html
Copyright (c) 1999-2003 Steve Purcell
Copyright (c) 2003-2010 Python Software Foundation
This module is free software, and you may redistribute it and/or modify
it under the same terms as Python itself, so long as this copyright message
and disclaimer are retained in their original form.
IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
DAMAGE.
THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
"""
__all__ = ['TestResult', 'TestCase', 'TestSuite',
'TextTestRunner', 'TestLoader', 'FunctionTestCase', 'main',
'defaultTestLoader', 'SkipTest', 'skip', 'skipIf', 'skipUnless',
'expectedFailure', 'TextTestResult', 'installHandler',
'registerResult', 'removeResult', 'removeHandler']
# Expose obsolete functions for backwards compatibility
# __all__.extend(['getTestCaseNames', 'makeSuite', 'findTestCases'])
__all__ += (['getTestCaseNames', 'makeSuite', 'findTestCases'])
__unittest = True
import unittest_result
import unittest_case
import unittest_suite
import unittest_loader
# import unittest_main
import unittest_runner
import unittest_signals
# from .result import TestResult
# from .case import (TestCase, FunctionTestCase, SkipTest, skip, skipIf,
# skipUnless, expectedFailure)
# from .suite import BaseTestSuite, TestSuite
# from .loader import (TestLoader, defaultTestLoader, makeSuite, getTestCaseNames,
# findTestCases)
# from .main import TestProgram, main
# from .runner import TextTestRunner, TextTestResult
# from .signals import installHandler, registerResult, removeResult, removeHandler
TestResult = unittest_result.TestResult
TestCase, FunctionTestCase, SkipTest, skip, skipIf, skipUnless, expectedFailure = \
unittest_case.TestCase, unittest_case.FunctionTestCase, unittest_case.SkipTest, \
unittest_case.skip, unittest_case.skipIf, unittest_case.skipUnless, \
unittest_case.expectedFailure
BaseTestSuite, TestSuite = unittest_suite.BaseTestSuite, unittest_suite.TestSuite
TestLoader, defaultTestLoader, makeSuite, getTestCaseNames, findTestCases = \
unittest_loader.TestLoader, unittest_loader.defaultTestLoader, unittest_loader.makeSuite, \
unittest_loader.getTestCaseNames, unittest_loader.findTestCases
# TestProgram, main = unittest_main.TestProgram, unittest_main.main
TextTestRunner, TextTestResult = unittest_runner.TextTestRunner, unittest_runner.TextTestResult
installHandler, registerResult, removeResult, removeHandler = \
unittest_signals.installHandler, unittest_signals.registerResult, \
unittest_signals.removeResult, unittest_signals.removeHandler
# deprecated
_TextTestResult = TextTestResult
"""Test case implementation"""
import collections
import sys
import functools
import difflib
import pprint
import re
import types
import warnings
# from . import result
import unittest_result as result
import unittest_util as _util
# from .util import (
# strclass, safe_repr, unorderable_list_difference,
# _count_diff_all_purpose, _count_diff_hashable
# )
strclass, safe_repr, unorderable_list_difference, _count_diff_all_purpose, \
_count_diff_hashable = _util.strclass, _util.safe_repr, \
_util.unorderable_list_difference, _util._count_diff_all_purpose, \
_util._count_diff_hashable
class KeyboardInterrupt(BaseException):
pass
__unittest = True
DIFF_OMITTED = ('\nDiff is %s characters long. '
'Set self.maxDiff to None to see it.')
class SkipTest(Exception):
"""
Raise this exception in a test to skip it.
Usually you can use TestCase.skipTest() or one of the skipping decorators
instead of raising this directly.
"""
pass
class _ExpectedFailure(Exception):
"""
Raise this when a test is expected to fail.
This is an implementation detail.
"""
def __init__(self, exc_info):
super(_ExpectedFailure, self).__init__()
self.exc_info = exc_info
class _UnexpectedSuccess(Exception):
"""
The test was supposed to fail, but it didn't!
"""
pass
def _id(obj):
return obj
def skip(reason):
"""
Unconditionally skip a test.
"""
def decorator(test_item):
if not isinstance(test_item, (type, types.ClassType)):
# @functools.wraps(test_item)
def skip_wrapper(*args, **kwargs):
raise SkipTest(reason)
skip_wrapper = functools.wraps(test_item)(skip_wrapper)
test_item = skip_wrapper
test_item.__unittest_skip__ = True
test_item.__unittest_skip_why__ = reason
return test_item
return decorator
def skipIf(condition, reason):
"""
Skip a test if the condition is true.
"""
if condition:
return skip(reason)
return _id
def skipUnless(condition, reason):
"""
Skip a test unless the condition is true.
"""
if not condition:
return skip(reason)
return _id
def expectedFailure(func):
# @functools.wraps(func)
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
except Exception:
raise _ExpectedFailure(sys.exc_info())
raise _UnexpectedSuccess
wrapper = functools.wraps(func)(wrapper)
return wrapper
class _AssertRaisesContext(object):
"""A context manager used to implement TestCase.assertRaises* methods."""
def __init__(self, expected, test_case, expected_regexp=None):
self.expected = expected
self.failureException = test_case.failureException
self.expected_regexp = expected_regexp
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
if exc_type is None:
try:
exc_name = self.expected.__name__
except AttributeError:
exc_name = str(self.expected)
raise self.failureException(
"{0} not raised".format(exc_name))
if not issubclass(exc_type, self.expected):
# let unexpected exceptions pass through
return False
self.exception = exc_value # store for later retrieval
if self.expected_regexp is None:
return True
expected_regexp = self.expected_regexp
if not expected_regexp.search(str(exc_value)):
raise self.failureException('"%s" does not match "%s"' %
(expected_regexp.pattern, str(exc_value)))
return True
class TestCase(object):
"""A class whose instances are single test cases.
By default, the test code itself should be placed in a method named
'runTest'.
If the fixture may be used for many test cases, create as
many test methods as are needed. When instantiating such a TestCase
subclass, specify in the constructor arguments the name of the test method
that the instance is to execute.
Test authors should subclass TestCase for their own tests. Construction
and deconstruction of the test's environment ('fixture') can be
implemented by overriding the 'setUp' and 'tearDown' methods respectively.
If it is necessary to override the __init__ method, the base class
__init__ method must always be called. It is important that subclasses
should not change the signature of their __init__ method, since instances
of the classes are instantiated automatically by parts of the framework
in order to be run.
When subclassing TestCase, you can set these attributes:
* failureException: determines which exception will be raised when
the instance's assertion methods fail; test methods raising this
exception will be deemed to have 'failed' rather than 'errored'.
* longMessage: determines whether long messages (including repr of
objects used in assert methods) will be printed on failure in *addition*
to any explicit message passed.
* maxDiff: sets the maximum length of a diff in failure messages
by assert methods using difflib. It is looked up as an instance
attribute so can be configured by individual tests if required.
"""
failureException = AssertionError
longMessage = False
maxDiff = 80*8
# If a string is longer than _diffThreshold, use normal comparison instead
# of difflib. See #11763.
# _diffThreshold = 2**16
_diffThreshold = 1<<16
# Attribute used by TestSuite for classSetUp
_classSetupFailed = False
def __init__(self, methodName='runTest'):
"""Create an instance of the class that will use the named test
method when executed. Raises a ValueError if the instance does
not have a method with the specified name.
"""
self._testMethodName = methodName
self._resultForDoCleanups = None
try:
testMethod = getattr(self, methodName)
except AttributeError:
raise ValueError("no such test method in %s: %s" %
(self.__class__, methodName))
# self._testMethodDoc = testMethod.__doc__
self._cleanups = []
# Map types to custom assertEqual functions that will compare
# instances of said type in more detail to generate a more useful
# error message.
self._type_equality_funcs = {}
self.addTypeEqualityFunc(dict, 'assertDictEqual')
self.addTypeEqualityFunc(list, 'assertListEqual')
self.addTypeEqualityFunc(tuple, 'assertTupleEqual')
self.addTypeEqualityFunc(set, 'assertSetEqual')
self.addTypeEqualityFunc(frozenset, 'assertSetEqual')
try:
self.addTypeEqualityFunc(unicode, 'assertMultiLineEqual')
except NameError:
# No unicode support in this build
pass
def addTypeEqualityFunc(self, typeobj, function):
"""Add a type specific assertEqual style function to compare a type.
This method is for use by TestCase subclasses that need to register
their own type equality functions to provide nicer error messages.
Args:
typeobj: The data type to call this function on when both values
are of the same type in assertEqual().
function: The callable taking two arguments and an optional
msg= argument that raises self.failureException with a
useful error message when the two arguments are not equal.
"""
self._type_equality_funcs[typeobj] = function
def addCleanup(self, function, *args, **kwargs):
"""Add a function, with arguments, to be called when the test is
completed. Functions added are called on a LIFO basis and are
called after tearDown on test failure or success.
Cleanup items are called even if setUp fails (unlike tearDown)."""
self._cleanups.append((function, args, kwargs))
def setUp(self):
"Hook method for setting up the test fixture before exercising it."
pass
def tearDown(self):
"Hook method for deconstructing the test fixture after testing it."
pass
# @classmethod
def setUpClass(cls):
"Hook method for setting up class fixture before running tests in the class."
setUpClass = classmethod(setUpClass)
# @classmethod
def tearDownClass(cls):
"Hook method for deconstructing the class fixture after running all tests in the class."
tearDownClass = classmethod(tearDownClass)
def countTestCases(self):
return 1
def defaultTestResult(self):
return result.TestResult()
def shortDescription(self):
"""Returns a one-line description of the test, or None if no
description has been provided.
The default implementation of this method returns the first line of
the specified test method's docstring.
"""
# doc = self._testMethodDoc
# return doc and doc.split("\n")[0].strip() or None
return ''
def id(self):
return "%s.%s" % (strclass(self.__class__), self._testMethodName)
def __eq__(self, other):
if type(self) is not type(other):
return NotImplemented
return self._testMethodName == other._testMethodName
def __ne__(self, other):
return not self == other
def __hash__(self):
return hash((type(self), self._testMethodName))
def __str__(self):
return "%s (%s)" % (self._testMethodName, strclass(self.__class__))
def __repr__(self):
return "<%s testMethod=%s>" % \
(strclass(self.__class__), self._testMethodName)
def _addSkip(self, result, reason):
addSkip = getattr(result, 'addSkip', None)
if addSkip is not None:
addSkip(self, reason)
else:
warnings.warn("TestResult has no addSkip method, skips not reported",
RuntimeWarning, 2)
result.addSuccess(self)
def run(self, result=None):
orig_result = result
if result is None:
result = self.defaultTestResult()
startTestRun = getattr(result, 'startTestRun', None)
if startTestRun is not None:
startTestRun()
self._resultForDoCleanups = result
result.startTest(self)
testMethod = getattr(self, self._testMethodName)
if (getattr(self.__class__, "__unittest_skip__", False) or
getattr(testMethod, "__unittest_skip__", False)):
# If the class or method was skipped.
try:
skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
or getattr(testMethod, '__unittest_skip_why__', ''))
self._addSkip(result, skip_why)
finally:
result.stopTest(self)
return
try:
success = False
try:
self.setUp()
except SkipTest as e:
self._addSkip(result, str(e))
except KeyboardInterrupt:
raise
except:
result.addError(self, sys.exc_info())
else:
try:
testMethod()
except KeyboardInterrupt:
raise
except self.failureException:
result.addFailure(self, sys.exc_info())
except _ExpectedFailure as e:
addExpectedFailure = getattr(result, 'addExpectedFailure', None)
if addExpectedFailure is not None:
addExpectedFailure(self, e.exc_info)
else:
warnings.warn("TestResult has no addExpectedFailure method, reporting as passes",
RuntimeWarning)
result.addSuccess(self)
except _UnexpectedSuccess:
addUnexpectedSuccess = getattr(result, 'addUnexpectedSuccess', None)
if addUnexpectedSuccess is not None:
addUnexpectedSuccess(self)
else:
warnings.warn("TestResult has no addUnexpectedSuccess method, reporting as failures",
RuntimeWarning)
result.addFailure(self, sys.exc_info())
except SkipTest as e:
self._addSkip(result, str(e))
except:
result.addError(self, sys.exc_info())
else:
success = True
try:
self.tearDown()
except KeyboardInterrupt:
raise
except:
result.addError(self, sys.exc_info())
success = False
cleanUpSuccess = self.doCleanups()
success = success and cleanUpSuccess
if success:
result.addSuccess(self)
finally:
result.stopTest(self)
if orig_result is None:
stopTestRun = getattr(result, 'stopTestRun', None)
if stopTestRun is not None:
stopTestRun()
def doCleanups(self):
"""Execute all cleanup functions. Normally called for you after
tearDown."""
result = self._resultForDoCleanups
ok = True
while self._cleanups:
function, args, kwargs = self._cleanups.pop(-1)
try:
function(*args, **kwargs)
except KeyboardInterrupt:
raise
except:
ok = False
result.addError(self, sys.exc_info())
return ok
def __call__(self, *args, **kwds):
return self.run(*args, **kwds)
def debug(self):
"""Run the test without collecting errors in a TestResult"""
self.setUp()
getattr(self, self._testMethodName)()
self.tearDown()
while self._cleanups:
function, args, kwargs = self._cleanups.pop(-1)
function(*args, **kwargs)
def skipTest(self, reason):
"""Skip this test."""
raise SkipTest(reason)
def fail(self, msg=None):
"""Fail immediately, with the given message."""
raise self.failureException(msg)
def assertFalse(self, expr, msg=None):
"""Check that the expression is false."""
if expr:
msg = self._formatMessage(msg, "%s is not false" % safe_repr(expr))
raise self.failureException(msg)
def assertTrue(self, expr, msg=None):
"""Check that the expression is true."""
if not expr:
msg = self._formatMessage(msg, "%s is not true" % safe_repr(expr))
raise self.failureException(msg)
def _formatMessage(self, msg, standardMsg):
"""Honour the longMessage attribute when generating failure messages.
If longMessage is False this means:
* Use only an explicit message if it is provided
* Otherwise use the standard message for the assert
If longMessage is True:
* Use the standard message
* If an explicit message is provided, plus ' : ' and the explicit message
"""
if not self.longMessage:
return msg or standardMsg
if msg is None:
return standardMsg
try:
# don't switch to '{}' formatting in Python 2.X
# it changes the way unicode input is handled
return '%s : %s' % (standardMsg, msg)
except UnicodeDecodeError:
return '%s : %s' % (safe_repr(standardMsg), safe_repr(msg))
def assertRaises(self, excClass, callableObj=None, *args, **kwargs):
"""Fail unless an exception of class excClass is raised
by callableObj when invoked with arguments args and keyword
arguments kwargs. If a different type of exception is
raised, it will not be caught, and the test case will be
deemed to have suffered an error, exactly as for an
unexpected exception.
If called with callableObj omitted or None, will return a
context object used like this::
with self.assertRaises(SomeException):
do_something()
The context manager keeps a reference to the exception as
the 'exception' attribute. This allows you to inspect the
exception after the assertion::
with self.assertRaises(SomeException) as cm:
do_something()
the_exception = cm.exception
self.assertEqual(the_exception.error_code, 3)
"""
context = _AssertRaisesContext(excClass, self)
if callableObj is None:
return context
with context:
callableObj(*args, **kwargs)
def _getAssertEqualityFunc(self, first, second):
"""Get a detailed comparison function for the types of the two args.
Returns: A callable accepting (first, second, msg=None) that will
raise a failure exception if first != second with a useful human
readable error message for those types.
"""
#
# NOTE(gregory.p.smith): I considered isinstance(first, type(second))
# and vice versa. I opted for the conservative approach in case
# subclasses are not intended to be compared in detail to their super
# class instances using a type equality func. This means testing
# subtypes won't automagically use the detailed comparison. Callers
# should use their type specific assertSpamEqual method to compare
# subclasses if the detailed comparison is desired and appropriate.
# See the discussion in http://bugs.python.org/issue2578.
#
if type(first) is type(second):
asserter = self._type_equality_funcs.get(type(first))
if asserter is not None:
if isinstance(asserter, basestring):
asserter = getattr(self, asserter)
return asserter
return self._baseAssertEqual
def _baseAssertEqual(self, first, second, msg=None):
"""The default assertEqual implementation, not type specific."""
if not first == second:
standardMsg = '%s != %s' % (safe_repr(first), safe_repr(second))
msg = self._formatMessage(msg, standardMsg)
raise self.failureException(msg)
def assertEqual(self, first, second, msg=None):
"""Fail if the two objects are unequal as determined by the '=='
operator.
"""
assertion_func = self._getAssertEqualityFunc(first, second)
assertion_func(first, second, msg=msg)
def assertNotEqual(self, first, second, msg=None):
"""Fail if the two objects are equal as determined by the '!='
operator.
"""
if not first != second:
msg = self._formatMessage(msg, '%s == %s' % (safe_repr(first),
safe_repr(second)))
raise self.failureException(msg)
def assertAlmostEqual(self, first, second, places=None, msg=None, delta=None):
"""Fail if the two objects are unequal as determined by their
difference rounded to the given number of decimal places
(default 7) and comparing to zero, or by comparing that the
between the two objects is more than the given delta.
Note that decimal places (from zero) are usually not the same
as significant digits (measured from the most signficant digit).
If the two objects compare equal then they will automatically
compare almost equal.
"""
if first == second:
# shortcut
return
if delta is not None and places is not None:
raise TypeError("specify delta or places not both")
if delta is not None:
if abs(first - second) <= delta:
return
standardMsg = '%s != %s within %s delta' % (safe_repr(first),
safe_repr(second),
safe_repr(delta))
else:
if places is None:
places = 7
if round(abs(second-first), places) == 0:
return
standardMsg = '%s != %s within %r places' % (safe_repr(first),
safe_repr(second),
places)
msg = self._formatMessage(msg, standardMsg)
raise self.failureException(msg)
def assertNotAlmostEqual(self, first, second, places=None, msg=None, delta=None):
"""Fail if the two objects are equal as determined by their
difference rounded to the given number of decimal places
(default 7) and comparing to zero, or by comparing that the
between the two objects is less than the given delta.
Note that decimal places (from zero) are usually not the same
as significant digits (measured from the most signficant digit).
Objects that are equal automatically fail.
"""
if delta is not None and places is not None:
raise TypeError("specify delta or places not both")
if delta is not None:
if not (first == second) and abs(first - second) > delta:
return
standardMsg = '%s == %s within %s delta' % (safe_repr(first),
safe_repr(second),
safe_repr(delta))
else:
if places is None:
places = 7
if not (first == second) and round(abs(second-first), places) != 0:
return
standardMsg = '%s == %s within %r places' % (safe_repr(first),
safe_repr(second),
places)
msg = self._formatMessage(msg, standardMsg)
raise self.failureException(msg)
# Synonyms for assertion methods
# The plurals are undocumented. Keep them that way to discourage use.
# Do not add more. Do not remove.
# Going through a deprecation cycle on these would annoy many people.
assertEquals = assertEqual
assertNotEquals = assertNotEqual
assertAlmostEquals = assertAlmostEqual
assertNotAlmostEquals = assertNotAlmostEqual
assert_ = assertTrue
# These fail* assertion method names are pending deprecation and will
# be a DeprecationWarning in 3.2; http://bugs.python.org/issue2578
def _deprecate(original_func):
def deprecated_func(*args, **kwargs):
warnings.warn(
'Please use {0} instead.'.format(original_func.__name__),
PendingDeprecationWarning, 2)
return original_func(*args, **kwargs)
return deprecated_func
failUnlessEqual = _deprecate(assertEqual)
failIfEqual = _deprecate(assertNotEqual)
failUnlessAlmostEqual = _deprecate(assertAlmostEqual)
failIfAlmostEqual = _deprecate(assertNotAlmostEqual)
failUnless = _deprecate(assertTrue)
failUnlessRaises = _deprecate(assertRaises)
failIf = _deprecate(assertFalse)
def assertSequenceEqual(self, seq1, seq2, msg=None, seq_type=None):
"""An equality assertion for ordered sequences (like lists and tuples).
For the purposes of this function, a valid ordered sequence type is one
which can be indexed, has a length, and has an equality operator.
Args:
seq1: The first sequence to compare.
seq2: The second sequence to compare.
seq_type: The expected datatype of the sequences, or None if no
datatype should be enforced.
msg: Optional message to use on failure instead of a list of
differences.
"""
if seq_type is not None:
seq_type_name = seq_type.__name__
if not isinstance(seq1, seq_type):
raise self.failureException('First sequence is not a %s: %s'
% (seq_type_name, safe_repr(seq1)))
if not isinstance(seq2, seq_type):
raise self.failureException('Second sequence is not a %s: %s'
% (seq_type_name, safe_repr(seq2)))
else:
seq_type_name = "sequence"
differing = None
try:
len1 = len(seq1)
except (TypeError, NotImplementedError):
differing = 'First %s has no length. Non-sequence?' % (
seq_type_name)
if differing is None:
try:
len2 = len(seq2)
except (TypeError, NotImplementedError):
differing = 'Second %s has no length. Non-sequence?' % (
seq_type_name)
if differing is None:
if seq1 == seq2:
return
seq1_repr = safe_repr(seq1)
seq2_repr = safe_repr(seq2)
if len(seq1_repr) > 30:
seq1_repr = seq1_repr[:30] + '...'
if len(seq2_repr) > 30:
seq2_repr = seq2_repr[:30] + '...'
elements = (seq_type_name.capitalize(), seq1_repr, seq2_repr)
differing = '%ss differ: %s != %s\n' % elements
for i in xrange(min(len1, len2)):
try:
item1 = seq1[i]
except (TypeError, IndexError, NotImplementedError):
differing += ('\nUnable to index element %d of first %s\n' %
(i, seq_type_name))
break
try:
item2 = seq2[i]
except (TypeError, IndexError, NotImplementedError):
differing += ('\nUnable to index element %d of second %s\n' %
(i, seq_type_name))
break
if item1 != item2:
differing += ('\nFirst differing element %d:\n%s\n%s\n' %
(i, safe_repr(item1), safe_repr(item2)))
break
else:
if (len1 == len2 and seq_type is None and
type(seq1) != type(seq2)):
# The sequences are the same, but have differing types.
return
if len1 > len2:
differing += ('\nFirst %s contains %d additional '
'elements.\n' % (seq_type_name, len1 - len2))
try:
differing += ('First extra element %d:\n%s\n' %
(len2, safe_repr(seq1[len2])))
except (TypeError, IndexError, NotImplementedError):
differing += ('Unable to index element %d '
'of first %s\n' % (len2, seq_type_name))
elif len1 < len2:
differing += ('\nSecond %s contains %d additional '
'elements.\n' % (seq_type_name, len2 - len1))
try:
differing += ('First extra element %d:\n%s\n' %
(len1, safe_repr(seq2[len1])))
except (TypeError, IndexError, NotImplementedError):
differing += ('Unable to index element %d '
'of second %s\n' % (len1, seq_type_name))
standardMsg = differing
diffMsg = '\n' + '\n'.join(
difflib.ndiff(pprint.pformat(seq1).splitlines(),
pprint.pformat(seq2).splitlines()))
standardMsg = self._truncateMessage(standardMsg, diffMsg)
msg = self._formatMessage(msg, standardMsg)
self.fail(msg)
def _truncateMessage(self, message, diff):
max_diff = self.maxDiff
if max_diff is None or len(diff) <= max_diff:
return message + diff
return message + (DIFF_OMITTED % len(diff))
def assertListEqual(self, list1, list2, msg=None):
"""A list-specific equality assertion.
Args:
list1: The first list to compare.
list2: The second list to compare.
msg: Optional message to use on failure instead of a list of
differences.
"""
self.assertSequenceEqual(list1, list2, msg, seq_type=list)
def assertTupleEqual(self, tuple1, tuple2, msg=None):
"""A tuple-specific equality assertion.
Args:
tuple1: The first tuple to compare.
tuple2: The second tuple to compare.
msg: Optional message to use on failure instead of a list of
differences.
"""
self.assertSequenceEqual(tuple1, tuple2, msg, seq_type=tuple)
def assertSetEqual(self, set1, set2, msg=None):
"""A set-specific equality assertion.
Args:
set1: The first set to compare.
set2: The second set to compare.
msg: Optional message to use on failure instead of a list of
differences.
assertSetEqual uses ducktyping to support different types of sets, and
is optimized for sets specifically (parameters must support a
difference method).
"""
try:
difference1 = set1.difference(set2)
except TypeError, e:
self.fail('invalid type when attempting set difference: %s' % e)
except AttributeError, e:
self.fail('first argument does not support set difference: %s' % e)
try:
difference2 = set2.difference(set1)
except TypeError, e:
self.fail('invalid type when attempting set difference: %s' % e)
except AttributeError, e:
self.fail('second argument does not support set difference: %s' % e)
if not (difference1 or difference2):
return
lines = []
if difference1:
lines.append('Items in the first set but not the second:')
for item in difference1:
lines.append(repr(item))
if difference2:
lines.append('Items in the second set but not the first:')
for item in difference2:
lines.append(repr(item))
standardMsg = '\n'.join(lines)
self.fail(self._formatMessage(msg, standardMsg))
def assertIn(self, member, container, msg=None):
"""Just like self.assertTrue(a in b), but with a nicer default message."""
if member not in container:
standardMsg = '%s not found in %s' % (safe_repr(member),
safe_repr(container))
self.fail(self._formatMessage(msg, standardMsg))
def assertNotIn(self, member, container, msg=None):
"""Just like self.assertTrue(a not in b), but with a nicer default message."""
if member in container:
standardMsg = '%s unexpectedly found in %s' % (safe_repr(member),
safe_repr(container))
self.fail(self._formatMessage(msg, standardMsg))
def assertIs(self, expr1, expr2, msg=None):
"""Just like self.assertTrue(a is b), but with a nicer default message."""
if expr1 is not expr2:
standardMsg = '%s is not %s' % (safe_repr(expr1),
safe_repr(expr2))
self.fail(self._formatMessage(msg, standardMsg))
def assertIsNot(self, expr1, expr2, msg=None):
"""Just like self.assertTrue(a is not b), but with a nicer default message."""
if expr1 is expr2:
standardMsg = 'unexpectedly identical: %s' % (safe_repr(expr1),)
self.fail(self._formatMessage(msg, standardMsg))
def assertDictEqual(self, d1, d2, msg=None):
self.assertIsInstance(d1, dict, 'First argument is not a dictionary')
self.assertIsInstance(d2, dict, 'Second argument is not a dictionary')
if d1 != d2:
standardMsg = '%s != %s' % (safe_repr(d1, True), safe_repr(d2, True))
diff = ('\n' + '\n'.join(difflib.ndiff(
pprint.pformat(d1).splitlines(),
pprint.pformat(d2).splitlines())))
standardMsg = self._truncateMessage(standardMsg, diff)
self.fail(self._formatMessage(msg, standardMsg))
def assertDictContainsSubset(self, expected, actual, msg=None):
"""Checks whether actual is a superset of expected."""
missing = []
mismatched = []
for key, value in expected.iteritems():
if key not in actual:
missing.append(key)
elif value != actual[key]:
mismatched.append('%s, expected: %s, actual: %s' %
(safe_repr(key), safe_repr(value),
safe_repr(actual[key])))
if not (missing or mismatched):
return
standardMsg = ''
if missing:
standardMsg = 'Missing: %s' % ','.join(safe_repr(m) for m in
missing)
if mismatched:
if standardMsg:
standardMsg += '; '
standardMsg += 'Mismatched values: %s' % ','.join(mismatched)
self.fail(self._formatMessage(msg, standardMsg))
def assertItemsEqual(self, expected_seq, actual_seq, msg=None):
"""An unordered sequence specific comparison. It asserts that
actual_seq and expected_seq have the same element counts.
Equivalent to::
self.assertEqual(Counter(iter(actual_seq)),
Counter(iter(expected_seq)))
Asserts that each element has the same count in both sequences.
Example:
- [0, 1, 1] and [1, 0, 1] compare equal.
- [0, 0, 1] and [0, 1] compare unequal.
"""
first_seq, second_seq = list(expected_seq), list(actual_seq)
with warnings.catch_warnings():
if sys.py3kwarning:
# Silence Py3k warning raised during the sorting
for _msg in ["(code|dict|type) inequality comparisons",
"builtin_function_or_method order comparisons",
"comparing unequal types"]:
warnings.filterwarnings("ignore", _msg, DeprecationWarning)
try:
first = collections.Counter(first_seq)
second = collections.Counter(second_seq)
except TypeError:
# Handle case with unhashable elements
differences = _count_diff_all_purpose(first_seq, second_seq)
else:
if first == second:
return
differences = _count_diff_hashable(first_seq, second_seq)
if differences:
standardMsg = 'Element counts were not equal:\n'
lines = ['First has %d, Second has %d: %r' % diff for diff in differences]
diffMsg = '\n'.join(lines)
standardMsg = self._truncateMessage(standardMsg, diffMsg)
msg = self._formatMessage(msg, standardMsg)
self.fail(msg)
def assertMultiLineEqual(self, first, second, msg=None):
"""Assert that two multi-line strings are equal."""
self.assertIsInstance(first, basestring,
'First argument is not a string')
self.assertIsInstance(second, basestring,
'Second argument is not a string')
if first != second:
# don't use difflib if the strings are too long
if (len(first) > self._diffThreshold or
len(second) > self._diffThreshold):
self._baseAssertEqual(first, second, msg)
firstlines = first.splitlines(True)
secondlines = second.splitlines(True)
if len(firstlines) == 1 and first.strip('\r\n') == first:
firstlines = [first + '\n']
secondlines = [second + '\n']
standardMsg = '%s != %s' % (safe_repr(first, True),
safe_repr(second, True))
diff = '\n' + ''.join(difflib.ndiff(firstlines, secondlines))
standardMsg = self._truncateMessage(standardMsg, diff)
self.fail(self._formatMessage(msg, standardMsg))
def assertLess(self, a, b, msg=None):
"""Just like self.assertTrue(a < b), but with a nicer default message."""
if not a < b:
standardMsg = '%s not less than %s' % (safe_repr(a), safe_repr(b))
self.fail(self._formatMessage(msg, standardMsg))
def assertLessEqual(self, a, b, msg=None):
"""Just like self.assertTrue(a <= b), but with a nicer default message."""
if not a <= b:
standardMsg = '%s not less than or equal to %s' % (safe_repr(a), safe_repr(b))
self.fail(self._formatMessage(msg, standardMsg))
def assertGreater(self, a, b, msg=None):
"""Just like self.assertTrue(a > b), but with a nicer default message."""
if not a > b:
standardMsg = '%s not greater than %s' % (safe_repr(a), safe_repr(b))
self.fail(self._formatMessage(msg, standardMsg))
def assertGreaterEqual(self, a, b, msg=None):
"""Just like self.assertTrue(a >= b), but with a nicer default message."""
if not a >= b:
standardMsg = '%s not greater than or equal to %s' % (safe_repr(a), safe_repr(b))
self.fail(self._formatMessage(msg, standardMsg))
def assertIsNone(self, obj, msg=None):
"""Same as self.assertTrue(obj is None), with a nicer default message."""
if obj is not None:
standardMsg = '%s is not None' % (safe_repr(obj),)
self.fail(self._formatMessage(msg, standardMsg))
def assertIsNotNone(self, obj, msg=None):
"""Included for symmetry with assertIsNone."""
if obj is None:
standardMsg = 'unexpectedly None'
self.fail(self._formatMessage(msg, standardMsg))
def assertIsInstance(self, obj, cls, msg=None):
"""Same as self.assertTrue(isinstance(obj, cls)), with a nicer
default message."""
if not isinstance(obj, cls):
standardMsg = '%s is not an instance of %r' % (safe_repr(obj), cls)
self.fail(self._formatMessage(msg, standardMsg))
def assertNotIsInstance(self, obj, cls, msg=None):
"""Included for symmetry with assertIsInstance."""
if isinstance(obj, cls):
standardMsg = '%s is an instance of %r' % (safe_repr(obj), cls)
self.fail(self._formatMessage(msg, standardMsg))
def assertRaisesRegexp(self, expected_exception, expected_regexp,
callable_obj=None, *args, **kwargs):
"""Asserts that the message in a raised exception matches a regexp.
Args:
expected_exception: Exception class expected to be raised.
expected_regexp: Regexp (re pattern object or string) expected
to be found in error message.
callable_obj: Function to be called.
args: Extra args.
kwargs: Extra kwargs.
"""
if expected_regexp is not None:
expected_regexp = re.compile(expected_regexp)
context = _AssertRaisesContext(expected_exception, self, expected_regexp)
if callable_obj is None:
return context
with context:
callable_obj(*args, **kwargs)
def assertRegexpMatches(self, text, expected_regexp, msg=None):
"""Fail the test unless the text matches the regular expression."""
if isinstance(expected_regexp, basestring):
expected_regexp = re.compile(expected_regexp)
if not expected_regexp.search(text):
msg = msg or "Regexp didn't match"
msg = '%s: %r not found in %r' % (msg, expected_regexp.pattern, text)
raise self.failureException(msg)
def assertNotRegexpMatches(self, text, unexpected_regexp, msg=None):
"""Fail the test if the text matches the regular expression."""
if isinstance(unexpected_regexp, basestring):
unexpected_regexp = re.compile(unexpected_regexp)
match = unexpected_regexp.search(text)
if match:
msg = msg or "Regexp matched"
msg = '%s: %r matches %r in %r' % (msg,
text[match.start():match.end()],
unexpected_regexp.pattern,
text)
raise self.failureException(msg)
class FunctionTestCase(TestCase):
"""A test case that wraps a test function.
This is useful for slipping pre-existing test functions into the
unittest framework. Optionally, set-up and tidy-up functions can be
supplied. As with TestCase, the tidy-up ('tearDown') function will
always be called if the set-up ('setUp') function ran successfully.
"""
def __init__(self, testFunc, setUp=None, tearDown=None, description=None):
super(FunctionTestCase, self).__init__()
self._setUpFunc = setUp
self._tearDownFunc = tearDown
self._testFunc = testFunc
self._description = description
def setUp(self):
if self._setUpFunc is not None:
self._setUpFunc()
def tearDown(self):
if self._tearDownFunc is not None:
self._tearDownFunc()
def runTest(self):
self._testFunc()
def id(self):
return self._testFunc.__name__
def __eq__(self, other):
if not isinstance(other, self.__class__):
return NotImplemented
return self._setUpFunc == other._setUpFunc and \
self._tearDownFunc == other._tearDownFunc and \
self._testFunc == other._testFunc and \
self._description == other._description
def __ne__(self, other):
return not self == other
def __hash__(self):
return hash((type(self), self._setUpFunc, self._tearDownFunc,
self._testFunc, self._description))
def __str__(self):
return "%s (%s)" % (strclass(self.__class__),
self._testFunc.__name__)
def __repr__(self):
return "<%s tec=%s>" % (strclass(self.__class__),
self._testFunc)
def shortDescription(self):
if self._description is not None:
return self._description
# doc = self._testFunc.__doc__
return doc and doc.split("\n")[0].strip() or None
"""Loading unittests."""
import os
import re
import sys
import traceback
import types
# from functools import cmp_to_key as _CmpToKey
# from fnmatch import fnmatch
import functools
import fnmatch as _fnmatch
_CmpToKey = functools.cmp_to_key
fnmatch = _fnmatch.fnmatch
# from . import case, suite
import unittest_case as case
import unittest_suite as suite
__unittest = True
# what about .pyc or .pyo (etc)
# we would need to avoid loading the same tests multiple times
# from '.py', '.pyc' *and* '.pyo'
VALID_MODULE_NAME = re.compile(r'[_a-z]\w*\.py$', re.IGNORECASE)
def _make_failed_import_test(name, suiteClass):
message = 'Failed to import test module: %s\n%s' % (name, traceback.format_exc())
return _make_failed_test('ModuleImportFailure', name, ImportError(message),
suiteClass)
def _make_failed_load_tests(name, exception, suiteClass):
return _make_failed_test('LoadTestsFailure', name, exception, suiteClass)
def _make_failed_test(classname, methodname, exception, suiteClass):
def testFailure(self):
raise exception
attrs = {methodname: testFailure}
TestClass = type(classname, (case.TestCase,), attrs)
return suiteClass((TestClass(methodname),))
class TestLoader(object):
"""
This class is responsible for loading tests according to various criteria
and returning them wrapped in a TestSuite
"""
testMethodPrefix = 'test'
sortTestMethodsUsing = cmp
suiteClass = suite.TestSuite
_top_level_dir = None
def loadTestsFromTestCase(self, testCaseClass):
"""Return a suite of all tests cases contained in testCaseClass"""
if issubclass(testCaseClass, suite.TestSuite):
raise TypeError("Test cases should not be derived from TestSuite." \
" Maybe you meant to derive from TestCase?")
testCaseNames = self.getTestCaseNames(testCaseClass)
if not testCaseNames and hasattr(testCaseClass, 'runTest'):
testCaseNames = ['runTest']
loaded_suite = self.suiteClass(map(testCaseClass, testCaseNames))
return loaded_suite
def loadTestsFromModule(self, module, use_load_tests=True):
"""Return a suite of all tests cases contained in the given module"""
tests = []
for name in dir(module):
obj = getattr(module, name)
if isinstance(obj, type) and issubclass(obj, case.TestCase):
tests.append(self.loadTestsFromTestCase(obj))
load_tests = getattr(module, 'load_tests', None)
tests = self.suiteClass(tests)
if use_load_tests and load_tests is not None:
try:
return load_tests(self, tests, None)
except Exception, e:
return _make_failed_load_tests(module.__name__, e,
self.suiteClass)
return tests
def loadTestsFromName(self, name, module=None):
"""Return a suite of all tests cases given a string specifier.
The name may resolve either to a module, a test case class, a
test method within a test case class, or a callable object which
returns a TestCase or TestSuite instance.
The method optionally resolves the names relative to a given module.
"""
parts = name.split('.')
if module is None:
parts_copy = parts[:]
while parts_copy:
try:
module = __import__('.'.join(parts_copy))
break
except ImportError:
del parts_copy[-1]
if not parts_copy:
raise
parts = parts[1:]
obj = module
for part in parts:
parent, obj = obj, getattr(obj, part)
if isinstance(obj, types.ModuleType):
return self.loadTestsFromModule(obj)
elif isinstance(obj, type) and issubclass(obj, case.TestCase):
return self.loadTestsFromTestCase(obj)
elif (isinstance(obj, types.UnboundMethodType) and
isinstance(parent, type) and
issubclass(parent, case.TestCase)):
name = parts[-1]
inst = parent(name)
return self.suiteClass([inst])
elif isinstance(obj, suite.TestSuite):
return obj
elif hasattr(obj, '__call__'):
test = obj()
if isinstance(test, suite.TestSuite):
return test
elif isinstance(test, case.TestCase):
return self.suiteClass([test])
else:
raise TypeError("calling %s returned %s, not a test" %
(obj, test))
else:
raise TypeError("don't know how to make test from: %s" % obj)
def loadTestsFromNames(self, names, module=None):
"""Return a suite of all tests cases found using the given sequence
of string specifiers. See 'loadTestsFromName()'.
"""
suites = [self.loadTestsFromName(name, module) for name in names]
return self.suiteClass(suites)
def getTestCaseNames(self, testCaseClass):
"""Return a sorted sequence of method names found within testCaseClass
"""
def isTestMethod(attrname, testCaseClass=testCaseClass,
prefix=self.testMethodPrefix):
return attrname.startswith(prefix) and \
hasattr(getattr(testCaseClass, attrname), '__call__')
# testFnNames = filter(isTestMethod, dir(testCaseClass))
testFnNames = [x for x in dir(testCaseClass) if isTestMethod(x)]
if self.sortTestMethodsUsing:
testFnNames.sort(key=_CmpToKey(self.sortTestMethodsUsing))
return testFnNames
def discover(self, start_dir, pattern='test*.py', top_level_dir=None):
"""Find and return all test modules from the specified start
directory, recursing into subdirectories to find them. Only test files
that match the pattern will be loaded. (Using shell style pattern
matching.)
All test modules must be importable from the top level of the project.
If the start directory is not the top level directory then the top
level directory must be specified separately.
If a test package name (directory with '__init__.py') matches the
pattern then the package will be checked for a 'load_tests' function. If
this exists then it will be called with loader, tests, pattern.
If load_tests exists then discovery does *not* recurse into the package,
load_tests is responsible for loading all tests in the package.
The pattern is deliberately not stored as a loader attribute so that
packages can continue discovery themselves. top_level_dir is stored so
load_tests does not need to pass this argument in to loader.discover().
"""
set_implicit_top = False
if top_level_dir is None and self._top_level_dir is not None:
# make top_level_dir optional if called from load_tests in a package
top_level_dir = self._top_level_dir
elif top_level_dir is None:
set_implicit_top = True
top_level_dir = start_dir
top_level_dir = os.path.abspath(top_level_dir)
if not top_level_dir in sys.path:
# all test modules must be importable from the top level directory
# should we *unconditionally* put the start directory in first
# in sys.path to minimise likelihood of conflicts between installed
# modules and development versions?
sys.path.insert(0, top_level_dir)
self._top_level_dir = top_level_dir
is_not_importable = False
if os.path.isdir(os.path.abspath(start_dir)):
start_dir = os.path.abspath(start_dir)
if start_dir != top_level_dir:
is_not_importable = not os.path.isfile(os.path.join(start_dir, '__init__.py'))
else:
# support for discovery from dotted module names
try:
__import__(start_dir)
except ImportError:
is_not_importable = True
else:
the_module = sys.modules[start_dir]
top_part = start_dir.split('.')[0]
start_dir = os.path.abspath(os.path.dirname((the_module.__file__)))
if set_implicit_top:
self._top_level_dir = self._get_directory_containing_module(top_part)
sys.path.remove(top_level_dir)
if is_not_importable:
raise ImportError('Start directory is not importable: %r' % start_dir)
tests = list(self._find_tests(start_dir, pattern))
return self.suiteClass(tests)
def _get_directory_containing_module(self, module_name):
module = sys.modules[module_name]
full_path = os.path.abspath(module.__file__)
if os.path.basename(full_path).lower().startswith('__init__.py'):
return os.path.dirname(os.path.dirname(full_path))
else:
# here we have been given a module rather than a package - so
# all we can do is search the *same* directory the module is in
# should an exception be raised instead
return os.path.dirname(full_path)
def _get_name_from_path(self, path):
path = os.path.splitext(os.path.normpath(path))[0]
_relpath = os.path.relpath(path, self._top_level_dir)
assert not os.path.isabs(_relpath), "Path must be within the project"
assert not _relpath.startswith('..'), "Path must be within the project"
name = _relpath.replace(os.path.sep, '.')
return name
def _get_module_from_name(self, name):
__import__(name)
return sys.modules[name]
def _match_path(self, path, full_path, pattern):
# override this method to use alternative matching strategy
return fnmatch(path, pattern)
def _find_tests(self, start_dir, pattern):
"""Used by discovery. Yields test suites it loads."""
paths = os.listdir(start_dir)
for path in paths:
full_path = os.path.join(start_dir, path)
if os.path.isfile(full_path):
if not VALID_MODULE_NAME.match(path):
# valid Python identifiers only
continue
if not self._match_path(path, full_path, pattern):
continue
# if the test file matches, load it
name = self._get_name_from_path(full_path)
try:
module = self._get_module_from_name(name)
except:
yield _make_failed_import_test(name, self.suiteClass)
else:
mod_file = os.path.abspath(getattr(module, '__file__', full_path))
realpath = os.path.splitext(os.path.realpath(mod_file))[0]
fullpath_noext = os.path.splitext(os.path.realpath(full_path))[0]
if realpath.lower() != fullpath_noext.lower():
module_dir = os.path.dirname(realpath)
mod_name = os.path.splitext(os.path.basename(full_path))[0]
expected_dir = os.path.dirname(full_path)
msg = ("%r module incorrectly imported from %r. Expected %r. "
"Is this module globally installed?")
raise ImportError(msg % (mod_name, module_dir, expected_dir))
yield self.loadTestsFromModule(module)
elif os.path.isdir(full_path):
if not os.path.isfile(os.path.join(full_path, '__init__.py')):
continue
load_tests = None
tests = None
if fnmatch(path, pattern):
# only check load_tests if the package directory itself matches the filter
name = self._get_name_from_path(full_path)
package = self._get_module_from_name(name)
load_tests = getattr(package, 'load_tests', None)
tests = self.loadTestsFromModule(package, use_load_tests=False)
if load_tests is None:
if tests is not None:
# tests loaded from package file
yield tests
# recurse into the package
for test in self._find_tests(full_path, pattern):
yield test
else:
try:
yield load_tests(self, tests, pattern)
except Exception, e:
yield _make_failed_load_tests(package.__name__, e,
self.suiteClass)
defaultTestLoader = TestLoader()
def _makeLoader(prefix, sortUsing, suiteClass=None):
loader = TestLoader()
loader.sortTestMethodsUsing = sortUsing
loader.testMethodPrefix = prefix
if suiteClass:
loader.suiteClass = suiteClass
return loader
def getTestCaseNames(testCaseClass, prefix, sortUsing=cmp):
return _makeLoader(prefix, sortUsing).getTestCaseNames(testCaseClass)
def makeSuite(testCaseClass, prefix='test', sortUsing=cmp,
suiteClass=suite.TestSuite):
return _makeLoader(prefix, sortUsing, suiteClass).loadTestsFromTestCase(testCaseClass)
def findTestCases(module, prefix='test', sortUsing=cmp,
suiteClass=suite.TestSuite):
return _makeLoader(prefix, sortUsing, suiteClass).loadTestsFromModule(module)
"""Test result object"""
import os
import sys
import traceback
# from StringIO import StringIO
import StringIO as _StringIO
StringIO = _StringIO.StringIO
# from . import util
# from functools import wraps
import unittest_util as util
import functools
wraps = functools.wraps
__unittest = True
def failfast(method):
# @wraps(method)
def inner(self, *args, **kw):
if getattr(self, 'failfast', False):
self.stop()
return method(self, *args, **kw)
inner = wraps(method)(inner)
return inner
STDOUT_LINE = '\nStdout:\n%s'
STDERR_LINE = '\nStderr:\n%s'
class TestResult(object):
"""Holder for test result information.
Test results are automatically managed by the TestCase and TestSuite
classes, and do not need to be explicitly manipulated by writers of tests.
Each instance holds the total number of tests run, and collections of
failures and errors that occurred among those test runs. The collections
contain tuples of (testcase, exceptioninfo), where exceptioninfo is the
formatted traceback of the error that occurred.
"""
_previousTestClass = None
_testRunEntered = False
_moduleSetUpFailed = False
def __init__(self, stream=None, descriptions=None, verbosity=None):
self.failfast = False
self.failures = []
self.errors = []
self.testsRun = 0
self.skipped = []
self.expectedFailures = []
self.unexpectedSuccesses = []
self.shouldStop = False
self.buffer = False
self._stdout_buffer = None
self._stderr_buffer = None
self._original_stdout = sys.stdout
self._original_stderr = sys.stderr
self._mirrorOutput = False
def printErrors(self):
"Called by TestRunner after test run"
def startTest(self, test):
"Called when the given test is about to be run"
self.testsRun += 1
self._mirrorOutput = False
self._setupStdout()
def _setupStdout(self):
if self.buffer:
if self._stderr_buffer is None:
self._stderr_buffer = StringIO()
self._stdout_buffer = StringIO()
sys.stdout = self._stdout_buffer
sys.stderr = self._stderr_buffer
def startTestRun(self):
"""Called once before any tests are executed.
See startTest for a method called before each test.
"""
def stopTest(self, test):
"""Called when the given test has been run"""
self._restoreStdout()
self._mirrorOutput = False
def _restoreStdout(self):
if self.buffer:
if self._mirrorOutput:
output = sys.stdout.getvalue()
error = sys.stderr.getvalue()
if output:
if not output.endswith('\n'):
output += '\n'
self._original_stdout.write(STDOUT_LINE % output)
if error:
if not error.endswith('\n'):
error += '\n'
self._original_stderr.write(STDERR_LINE % error)
sys.stdout = self._original_stdout
sys.stderr = self._original_stderr
self._stdout_buffer.seek(0)
self._stdout_buffer.truncate()
self._stderr_buffer.seek(0)
self._stderr_buffer.truncate()
def stopTestRun(self):
"""Called once after all tests are executed.
See stopTest for a method called after each test.
"""
# @failfast
def addError(self, test, err):
"""Called when an error has occurred. 'err' is a tuple of values as
returned by sys.exc_info().
"""
self.errors.append((test, self._exc_info_to_string(err, test)))
self._mirrorOutput = True
addError = failfast(addError)
# @failfast
def addFailure(self, test, err):
"""Called when an error has occurred. 'err' is a tuple of values as
returned by sys.exc_info()."""
self.failures.append((test, self._exc_info_to_string(err, test)))
self._mirrorOutput = True
addFailure = failfast(addFailure)
def addSuccess(self, test):
"Called when a test has completed successfully"
pass
def addSkip(self, test, reason):
"""Called when a test is skipped."""
self.skipped.append((test, reason))
def addExpectedFailure(self, test, err):
"""Called when an expected failure/error occurred."""
self.expectedFailures.append(
(test, self._exc_info_to_string(err, test)))
# @failfast
def addUnexpectedSuccess(self, test):
"""Called when a test was expected to fail, but succeed."""
self.unexpectedSuccesses.append(test)
addUnexpectedSuccess = failfast(addUnexpectedSuccess)
def wasSuccessful(self):
"Tells whether or not this result was a success"
return len(self.failures) == len(self.errors) == 0
def stop(self):
"Indicates that the tests should be aborted"
self.shouldStop = True
def _exc_info_to_string(self, err, test):
"""Converts a sys.exc_info()-style tuple of values into a string."""
exctype, value, tb = err
# Skip test runner traceback levels
while tb and self._is_relevant_tb_level(tb):
tb = tb.tb_next
if exctype is test.failureException:
# Skip assert*() traceback levels
length = self._count_relevant_tb_levels(tb)
msgLines = traceback.format_exception(exctype, value, tb, length)
else:
msgLines = traceback.format_exception(exctype, value, tb)
if self.buffer:
output = sys.stdout.getvalue()
error = sys.stderr.getvalue()
if output:
if not output.endswith('\n'):
output += '\n'
msgLines.append(STDOUT_LINE % output)
if error:
if not error.endswith('\n'):
error += '\n'
msgLines.append(STDERR_LINE % error)
return ''.join(msgLines)
def _is_relevant_tb_level(self, tb):
return '__unittest' in tb.tb_frame.f_globals
def _count_relevant_tb_levels(self, tb):
length = 0
while tb and not self._is_relevant_tb_level(tb):
length += 1
tb = tb.tb_next
return length
def __repr__(self):
return ("<%s run=%i errors=%i failures=%i>" %
(util.strclass(self.__class__), self.testsRun, len(self.errors),
len(self.failures)))
"""Running tests"""
import sys
import time
# from . import result
# from .signals import registerResult
import unittest_result as result
import unittest_signals
registerResult = unittest_signals.registerResult
__unittest = True
class _WritelnDecorator(object):
"""Used to decorate file-like objects with a handy 'writeln' method"""
def __init__(self, stream):
self.stream = stream
def __getattr__(self, attr):
if attr in ('stream', '__getstate__'):
raise AttributeError(attr)
return getattr(self.stream,attr)
def writeln(self, arg=None):
if arg:
self.write(arg)
self.write('\n') # text-mode streams translate to \r\n if needed
def write(self, arg):
self.stream.write(arg)
def flush(self):
pass
class TextTestResult(result.TestResult):
"""A test result class that can print formatted text results to a stream.
Used by TextTestRunner.
"""
separator1 = '=' * 70
separator2 = '-' * 70
def __init__(self, stream, descriptions, verbosity):
super(TextTestResult, self).__init__(stream, descriptions, verbosity)
self.stream = stream
self.showAll = verbosity > 1
self.dots = verbosity == 1
self.descriptions = descriptions
def getDescription(self, test):
doc_first_line = test.shortDescription()
if self.descriptions and doc_first_line:
return '\n'.join((str(test), doc_first_line))
else:
return str(test)
def startTest(self, test):
super(TextTestResult, self).startTest(test)
if self.showAll:
self.stream.write(self.getDescription(test))
self.stream.write(" ... ")
self.stream.flush()
def addSuccess(self, test):
super(TextTestResult, self).addSuccess(test)
if self.showAll:
self.stream.writeln("ok")
elif self.dots:
self.stream.write('.')
self.stream.flush()
def addError(self, test, err):
super(TextTestResult, self).addError(test, err)
if self.showAll:
self.stream.writeln("ERROR")
elif self.dots:
self.stream.write('E')
self.stream.flush()
def addFailure(self, test, err):
super(TextTestResult, self).addFailure(test, err)
if self.showAll:
self.stream.writeln("FAIL")
elif self.dots:
self.stream.write('F')
self.stream.flush()
def addSkip(self, test, reason):
super(TextTestResult, self).addSkip(test, reason)
if self.showAll:
self.stream.writeln("skipped {0!r}".format(reason))
elif self.dots:
self.stream.write("s")
self.stream.flush()
def addExpectedFailure(self, test, err):
super(TextTestResult, self).addExpectedFailure(test, err)
if self.showAll:
self.stream.writeln("expected failure")
elif self.dots:
self.stream.write("x")
self.stream.flush()
def addUnexpectedSuccess(self, test):
super(TextTestResult, self).addUnexpectedSuccess(test)
if self.showAll:
self.stream.writeln("unexpected success")
elif self.dots:
self.stream.write("u")
self.stream.flush()
def printErrors(self):
if self.dots or self.showAll:
self.stream.writeln()
self.printErrorList('ERROR', self.errors)
self.printErrorList('FAIL', self.failures)
def printErrorList(self, flavour, errors):
for test, err in errors:
self.stream.writeln(self.separator1)
self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
self.stream.writeln(self.separator2)
self.stream.writeln("%s" % err)
class TextTestRunner(object):
"""A test runner class that displays results in textual form.
It prints out the names of tests as they are run, errors as they
occur, and a summary of the results at the end of the test run.
"""
resultclass = TextTestResult
def __init__(self, stream=sys.stderr, descriptions=True, verbosity=1,
failfast=False, buffer=False, resultclass=None):
self.stream = _WritelnDecorator(stream)
self.descriptions = descriptions
self.verbosity = verbosity
self.failfast = failfast
self.buffer = buffer
if resultclass is not None:
self.resultclass = resultclass
def _makeResult(self):
return self.resultclass(self.stream, self.descriptions, self.verbosity)
def run(self, test):
"Run the given test case or test suite."
result = self._makeResult()
registerResult(result)
result.failfast = self.failfast
result.buffer = self.buffer
startTime = time.time()
startTestRun = getattr(result, 'startTestRun', None)
if startTestRun is not None:
startTestRun()
try:
test(result)
finally:
stopTestRun = getattr(result, 'stopTestRun', None)
if stopTestRun is not None:
stopTestRun()
stopTime = time.time()
timeTaken = stopTime - startTime
result.printErrors()
if hasattr(result, 'separator2'):
self.stream.writeln(result.separator2)
run = result.testsRun
# self.stream.writeln("Ran %d test%s in %.3fs" %
self.stream.writeln("Ran %d test%s in %fs" %
(run, run != 1 and "s" or "", timeTaken))
self.stream.writeln()
expectedFails = unexpectedSuccesses = skipped = 0
try:
results = map(len, (result.expectedFailures,
result.unexpectedSuccesses,
result.skipped))
except AttributeError:
pass
else:
expectedFails, unexpectedSuccesses, skipped = results
infos = []
if not result.wasSuccessful():
self.stream.write("FAILED")
failed, errored = map(len, (result.failures, result.errors))
if failed:
infos.append("failures=%d" % failed)
if errored:
infos.append("errors=%d" % errored)
else:
self.stream.write("OK")
if skipped:
infos.append("skipped=%d" % skipped)
if expectedFails:
infos.append("expected failures=%d" % expectedFails)
if unexpectedSuccesses:
infos.append("unexpected successes=%d" % unexpectedSuccesses)
if infos:
self.stream.writeln(" (%s)" % (", ".join(infos),))
else:
self.stream.write("\n")
return result
# TODO: support signal
# import signal
import weakref
# from functools import wraps
import functools
wraps = functools.wraps
__unittest = True
class _InterruptHandler(object):
pass
# def __init__(self, default_handler):
# self.called = False
# self.original_handler = default_handler
# if isinstance(default_handler, int):
# print 'signal not supported yet'
# if default_handler == signal.SIG_DFL:
# # Pretend it's signal.default_int_handler instead.
# default_handler = signal.default_int_handler
# elif default_handler == signal.SIG_IGN:
# # Not quite the same thing as SIG_IGN, but the closest we
# # can make it: do nothing.
# def default_handler(unused_signum, unused_frame):
# pass
# else:
# raise TypeError("expected SIGINT signal handler to be "
# "signal.SIG_IGN, signal.SIG_DFL, or a "
# "callable object")
# self.default_handler = default_handler
# def __call__(self, signum, frame):
# installed_handler = signal.getsignal(signal.SIGINT)
# if installed_handler is not self:
# # if we aren't the installed handler, then delegate immediately
# # to the default handler
# self.default_handler(signum, frame)
# if self.called:
# self.default_handler(signum, frame)
# self.called = True
# for result in _results.keys():
# result.stop()
_results = weakref.WeakKeyDictionary()
def registerResult(result):
_results[result] = 1
def removeResult(result):
return bool(_results.pop(result, None))
_interrupt_handler = None
def installHandler():
global _interrupt_handler
pass
# if _interrupt_handler is None:
# default_handler = signal.getsignal(signal.SIGINT)
# _interrupt_handler = _InterruptHandler(default_handler)
# signal.signal(signal.SIGINT, _interrupt_handler)
def removeHandler(method=None):
pass
# if method is not None:
# # @wraps(method)
# def inner(*args, **kwargs):
# initial = signal.getsignal(signal.SIGINT)
# removeHandler()
# try:
# return method(*args, **kwargs)
# finally:
# signal.signal(signal.SIGINT, initial)
# inner = wraps(method)(inner)
# return inner
# global _interrupt_handler
# if _interrupt_handler is not None:
# signal.signal(signal.SIGINT, _interrupt_handler.original_handler)
"""TestSuite"""
import sys
# from . import case
# from . import util
import unittest_case as case
import unittest_util as util
__unittest = True
def _call_if_exists(parent, attr):
func = getattr(parent, attr, lambda: None)
func()
class BaseTestSuite(object):
"""A simple test suite that doesn't provide class or module shared fixtures.
"""
def __init__(self, tests=()):
self._tests = []
self.addTests(tests)
def __repr__(self):
return "<%s tests=%s>" % (util.strclass(self.__class__), list(self))
def __eq__(self, other):
if not isinstance(other, self.__class__):
return NotImplemented
return list(self) == list(other)
def __ne__(self, other):
return not self == other
# Can't guarantee hash invariant, so flag as unhashable
__hash__ = None
def __iter__(self):
return iter(self._tests)
def countTestCases(self):
cases = 0
for test in self:
cases += test.countTestCases()
return cases
def addTest(self, test):
# sanity checks
if not hasattr(test, '__call__'):
raise TypeError("{} is not callable".format(repr(test)))
if isinstance(test, type) and issubclass(test,
(case.TestCase, TestSuite)):
raise TypeError("TestCases and TestSuites must be instantiated "
"before passing them to addTest()")
self._tests.append(test)
def addTests(self, tests):
if isinstance(tests, basestring):
raise TypeError("tests must be an iterable of tests, not a string")
for test in tests:
self.addTest(test)
def run(self, result):
for test in self:
if result.shouldStop:
break
test(result)
return result
def __call__(self, *args, **kwds):
return self.run(*args, **kwds)
def debug(self):
"""Run the tests without collecting errors in a TestResult"""
for test in self:
test.debug()
class TestSuite(BaseTestSuite):
"""A test suite is a composite test consisting of a number of TestCases.
For use, create an instance of TestSuite, then add test case instances.
When all tests have been added, the suite can be passed to a test
runner, such as TextTestRunner. It will run the individual test cases
in the order in which they were added, aggregating the results. When
subclassing, do not forget to call the base class constructor.
"""
def run(self, result, debug=False):
topLevel = False
if getattr(result, '_testRunEntered', False) is False:
result._testRunEntered = topLevel = True
for test in self:
if result.shouldStop:
break
if _isnotsuite(test):
self._tearDownPreviousClass(test, result)
self._handleModuleFixture(test, result)
self._handleClassSetUp(test, result)
result._previousTestClass = test.__class__
if (getattr(test.__class__, '_classSetupFailed', False) or
getattr(result, '_moduleSetUpFailed', False)):
continue
if not debug:
test(result)
else:
test.debug()
if topLevel:
self._tearDownPreviousClass(None, result)
self._handleModuleTearDown(result)
result._testRunEntered = False
return result
def debug(self):
"""Run the tests without collecting errors in a TestResult"""
debug = _DebugResult()
self.run(debug, True)
################################
def _handleClassSetUp(self, test, result):
previousClass = getattr(result, '_previousTestClass', None)
currentClass = test.__class__
if currentClass == previousClass:
return
if result._moduleSetUpFailed:
return
if getattr(currentClass, "__unittest_skip__", False):
return
try:
currentClass._classSetupFailed = False
except TypeError:
# test may actually be a function
# so its class will be a builtin-type
pass
setUpClass = getattr(currentClass, 'setUpClass', None)
if setUpClass is not None:
_call_if_exists(result, '_setupStdout')
try:
setUpClass()
except Exception as e:
if isinstance(result, _DebugResult):
raise
currentClass._classSetupFailed = True
className = util.strclass(currentClass)
errorName = 'setUpClass (%s)' % className
self._addClassOrModuleLevelException(result, e, errorName)
finally:
_call_if_exists(result, '_restoreStdout')
def _get_previous_module(self, result):
previousModule = None
previousClass = getattr(result, '_previousTestClass', None)
if previousClass is not None:
previousModule = previousClass.__module__
return previousModule
def _handleModuleFixture(self, test, result):
previousModule = self._get_previous_module(result)
currentModule = test.__class__.__module__
if currentModule == previousModule:
return
self._handleModuleTearDown(result)
result._moduleSetUpFailed = False
try:
module = sys.modules[currentModule]
except KeyError:
return
setUpModule = getattr(module, 'setUpModule', None)
if setUpModule is not None:
_call_if_exists(result, '_setupStdout')
try:
setUpModule()
except Exception, e:
if isinstance(result, _DebugResult):
raise
result._moduleSetUpFailed = True
errorName = 'setUpModule (%s)' % currentModule
self._addClassOrModuleLevelException(result, e, errorName)
finally:
_call_if_exists(result, '_restoreStdout')
def _addClassOrModuleLevelException(self, result, exception, errorName):
error = _ErrorHolder(errorName)
addSkip = getattr(result, 'addSkip', None)
if addSkip is not None and isinstance(exception, case.SkipTest):
addSkip(error, str(exception))
else:
result.addError(error, sys.exc_info())
def _handleModuleTearDown(self, result):
previousModule = self._get_previous_module(result)
if previousModule is None:
return
if result._moduleSetUpFailed:
return
try:
module = sys.modules[previousModule]
except KeyError:
return
tearDownModule = getattr(module, 'tearDownModule', None)
if tearDownModule is not None:
_call_if_exists(result, '_setupStdout')
try:
tearDownModule()
except Exception as e:
if isinstance(result, _DebugResult):
raise
errorName = 'tearDownModule (%s)' % previousModule
self._addClassOrModuleLevelException(result, e, errorName)
finally:
_call_if_exists(result, '_restoreStdout')
def _tearDownPreviousClass(self, test, result):
previousClass = getattr(result, '_previousTestClass', None)
currentClass = test.__class__
if currentClass == previousClass:
return
if getattr(previousClass, '_classSetupFailed', False):
return
if getattr(result, '_moduleSetUpFailed', False):
return
if getattr(previousClass, "__unittest_skip__", False):
return
tearDownClass = getattr(previousClass, 'tearDownClass', None)
if tearDownClass is not None:
_call_if_exists(result, '_setupStdout')
try:
tearDownClass()
except Exception, e:
if isinstance(result, _DebugResult):
raise
className = util.strclass(previousClass)
errorName = 'tearDownClass (%s)' % className
self._addClassOrModuleLevelException(result, e, errorName)
finally:
_call_if_exists(result, '_restoreStdout')
class _ErrorHolder(object):
"""
Placeholder for a TestCase inside a result. As far as a TestResult
is concerned, this looks exactly like a unit test. Used to insert
arbitrary errors into a test suite run.
"""
# Inspired by the ErrorHolder from Twisted:
# http://twistedmatrix.com/trac/browser/trunk/twisted/trial/runner.py
# attribute used by TestResult._exc_info_to_string
failureException = None
def __init__(self, description):
self.description = description
def id(self):
return self.description
def shortDescription(self):
return None
def __repr__(self):
return "<ErrorHolder description=%r>" % (self.description,)
def __str__(self):
return self.id()
def run(self, result):
# could call result.addError(...) - but this test-like object
# shouldn't be run anyway
pass
def __call__(self, result):
return self.run(result)
def countTestCases(self):
return 0
def _isnotsuite(test):
"A crude way to tell apart testcases and suites with duck-typing"
try:
iter(test)
except TypeError:
return True
return False
class _DebugResult(object):
"Used by the TestSuite to hold previous class when running in debug."
_previousTestClass = None
_moduleSetUpFailed = False
shouldStop = False
"""Various utility functions."""
# from collections import namedtuple, OrderedDict
import operator
_itemgetter = operator.itemgetter
_property = property
_tuple = tuple
__unittest = True
_MAX_LENGTH = 80
def safe_repr(obj, short=False):
try:
result = repr(obj)
except Exception:
result = object.__repr__(obj)
if not short or len(result) < _MAX_LENGTH:
return result
return result[:_MAX_LENGTH] + ' [truncated]...'
def strclass(cls):
return "%s.%s" % (cls.__module__, cls.__name__)
def sorted_list_difference(expected, actual):
"""Finds elements in only one or the other of two, sorted input lists.
Returns a two-element tuple of lists. The first list contains those
elements in the "expected" list but not in the "actual" list, and the
second contains those elements in the "actual" list but not in the
"expected" list. Duplicate elements in either input list are ignored.
"""
i = j = 0
missing = []
unexpected = []
while True:
try:
e = expected[i]
a = actual[j]
if e < a:
missing.append(e)
i += 1
while expected[i] == e:
i += 1
elif e > a:
unexpected.append(a)
j += 1
while actual[j] == a:
j += 1
else:
i += 1
try:
while expected[i] == e:
i += 1
finally:
j += 1
while actual[j] == a:
j += 1
except IndexError:
missing.extend(expected[i:])
unexpected.extend(actual[j:])
break
return missing, unexpected
def unorderable_list_difference(expected, actual, ignore_duplicate=False):
"""Same behavior as sorted_list_difference but
for lists of unorderable items (like dicts).
As it does a linear search per item (remove) it
has O(n*n) performance.
"""
missing = []
unexpected = []
while expected:
item = expected.pop()
try:
actual.remove(item)
except ValueError:
missing.append(item)
if ignore_duplicate:
for lst in expected, actual:
try:
while True:
lst.remove(item)
except ValueError:
pass
if ignore_duplicate:
while actual:
item = actual.pop()
unexpected.append(item)
try:
while True:
actual.remove(item)
except ValueError:
pass
return missing, unexpected
# anything left in actual is unexpected
return missing, actual
# _Mismatch = namedtuple('Mismatch', 'actual expected value')
class _Mismatch(tuple):
'Mismatch(actual, expected, value)'
__slots__ = ()
_fields = ('actual', 'expected', 'value')
def __new__(_cls, actual, expected, value):
'Create new instance of Mismatch(actual, expected, value)'
return _tuple.__new__(_cls, (actual, expected, value))
# @classmethod
def _make(cls, iterable, new=tuple.__new__, len=len):
'Make a new Mismatch object from a sequence or iterable'
result = new(cls, iterable)
if len(result) != 3:
raise TypeError('Expected 3 arguments, got %d' % len(result))
return result
_make = classmethod(_make)
def __repr__(self):
'Return a nicely formatted representation string'
return 'Mismatch(actual=%r, expected=%r, value=%r)' % self
def _asdict(self):
'Return a new OrderedDict which maps field names to their values'
# return OrderedDict(zip(self._fields, self))
return dict(zip(self._fields, self))
def _replace(_self, **kwds):
'Return a new Mismatch object replacing specified fields with new values'
result = _self._make(map(kwds.pop, ('actual', 'expected', 'value'), _self))
if kwds:
raise ValueError('Got unexpected field names: %r' % kwds.keys())
return result
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return tuple(self)
__dict__ = _property(_asdict)
def __getstate__(self):
'Exclude the OrderedDict from pickling'
pass
actual = _property(_itemgetter(0), doc='Alias for field number 0')
expected = _property(_itemgetter(1), doc='Alias for field number 1')
value = _property(_itemgetter(2), doc='Alias for field number 2')
def _count_diff_all_purpose(actual, expected):
'Returns list of (cnt_act, cnt_exp, elem) triples where the counts differ'
# elements need not be hashable
s, t = list(actual), list(expected)
m, n = len(s), len(t)
NULL = object()
result = []
for i, elem in enumerate(s):
if elem is NULL:
continue
cnt_s = cnt_t = 0
for j in range(i, m):
if s[j] == elem:
cnt_s += 1
s[j] = NULL
for j, other_elem in enumerate(t):
if other_elem == elem:
cnt_t += 1
t[j] = NULL
if cnt_s != cnt_t:
diff = _Mismatch(cnt_s, cnt_t, elem)
result.append(diff)
for i, elem in enumerate(t):
if elem is NULL:
continue
cnt_t = 0
for j in range(i, n):
if t[j] == elem:
cnt_t += 1
t[j] = NULL
diff = _Mismatch(0, cnt_t, elem)
result.append(diff)
return result
def _ordered_count(iterable):
'Return dict of element counts, in the order they were first seen'
c = {} #OrderedDict()
for elem in iterable:
c[elem] = c.get(elem, 0) + 1
return c
def _count_diff_hashable(actual, expected):
'Returns list of (cnt_act, cnt_exp, elem) triples where the counts differ'
# elements must be hashable
s, t = _ordered_count(actual), _ordered_count(expected)
result = []
for elem, cnt_s in s.items():
cnt_t = t.get(elem, 0)
if cnt_s != cnt_t:
diff = _Mismatch(cnt_s, cnt_t, elem)
result.append(diff)
for elem, cnt_t in t.items():
if elem not in s:
diff = _Mismatch(0, cnt_t, elem)
result.append(diff)
return result
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment