test_support.py 20.9 KB
Newer Older
1
"""Supporting definitions for the Python regression tests."""
2

3 4 5
# This module only works when loaded as test.test_support; fail loudly for
# any other import path.
if __name__ != 'test.test_support':
    raise ImportError('test_support must be imported from the test package')

6 7 8
import contextlib
import errno
import socket
9
import sys
10
import warnings
11
import types
12

13
class Error(Exception):
    """Root of the exception hierarchy used by the regression tests."""
15 16

class TestFailed(Error):
    """Raised when a regression test fails."""
18 19

class TestSkipped(Error):
    """Test skipped.

    Raise this to signal that a test was deliberately skipped for a reason
    other than a missing feature.  For example, when some resource cannot
    be used (say, the network appears to be unavailable), raise this
    instead of TestFailed.
    """
Guido van Rossum's avatar
Guido van Rossum committed
28

29 30 31 32 33 34 35 36
class ResourceDenied(TestSkipped):
    """Test skipped because it requested a disallowed resource.

    Raised when a test calls requires() for a resource that has not been
    enabled.  Having a dedicated class lets callers tell expected skips
    from unexpected ones.
    """

37
verbose = 1              # regrtest.py resets this flag to 0
use_resources = None     # regrtest.py replaces this with a list ([])
max_memuse = 0           # 0 disables the bigmem tests (they still run with
                         # small sizes, to make sure they work)
41

42 43 44 45 46 47 48 49 50 51 52
# _original_stdout holds whatever stdout was when regrtest started.  It may
# be "the real" stdout, IDLE's emulation of stdout, or something else; the
# point is to keep a flavor of stdout that the user can actually see.
_original_stdout = None
def record_original_stdout(stdout):
    """Remember *stdout* as the stream the user can actually see."""
    global _original_stdout
    _original_stdout = stdout

def get_original_stdout():
    """Return the recorded original stdout, or sys.stdout if none is set."""
    if _original_stdout:
        return _original_stdout
    return sys.stdout

Guido van Rossum's avatar
Guido van Rossum committed
53
def unload(name):
    """Drop *name* from sys.modules; do nothing if it is not loaded."""
    sys.modules.pop(name, None)
Guido van Rossum's avatar
Guido van Rossum committed
58

59 60 61 62 63 64 65
def unlink(filename):
    """Delete *filename*, silently ignoring any OSError (e.g. no such file)."""
    # Imported locally: the module-level name 'os' is deleted further down.
    import os
    try:
        os.remove(filename)
    except OSError:
        pass

Guido van Rossum's avatar
Guido van Rossum committed
66
def forget(modname):
    '''"Forget" a module was ever imported by removing it from sys.modules and
    deleting any .pyc and .pyo files.'''
    import os
    unload(modname)
    for dirname in sys.path:
        # Each compiled file is removed independently: a missing .pyc must
        # not prevent the .pyo from being deleted.
        for suffix in ('pyc', 'pyo'):
            unlink(os.path.join(dirname, modname + os.extsep + suffix))
Guido van Rossum's avatar
Guido van Rossum committed
77

78
def is_resource_enabled(resource):
    """Return True if *resource* is in the enabled set configured by
    regrtest.py; False when it is not, or when no set was configured."""
    if use_resources is None:
        return False
    return resource in use_resources

83
def requires(resource, msg=None):
    """Raise ResourceDenied if the specified resource is not available.

    If the caller's module is __main__ the resource is treated as enabled
    and the function simply returns.  Otherwise ResourceDenied is raised
    (with *msg*, or a default message) unless the resource is enabled.
    Nothing useful is returned.
    """
    # The frame lookup must stay in this function: it inspects the globals
    # of whoever called requires().
    caller_globals = sys._getframe().f_back.f_globals
    if caller_globals.get("__name__") == "__main__":
        return
    if is_resource_enabled(resource):
        return
    if msg is None:
        msg = "Use of the `%s' resource not enabled" % resource
    raise ResourceDenied(msg)
96

97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
def bind_port(sock, host='', preferred_port=54321):
    """Bind *sock* to the first usable port from a short candidate list.

    Trying several ports makes tests more robust when several of them run
    at once.  *preferred_port* is tried first, then a few fixed fallbacks.

    Returns the port that was bound.  Raises TestFailed when every
    candidate is already in use; any socket error other than EADDRINUSE
    is re-raised unchanged.
    """
    import socket
    import errno
    # some random ports that hopefully no one is listening on.
    for port in [preferred_port, 9907, 10243, 32999]:
        try:
            sock.bind((host, port))
        except socket.error:
            exc = sys.exc_info()[1]
            # Look at args[0] instead of unpacking (errno, msg): a socket
            # error does not always carry exactly two arguments, and a
            # failed unpack would hide the real error.
            if not exc.args or exc.args[0] != errno.EADDRINUSE:
                raise
            sys.__stderr__.write(
                '  WARNING: failed to listen on port %d, trying another\n'
                % port)
        else:
            return port
    raise TestFailed('unable to find port to listen on')

115 116 117
FUZZ = 1e-6

def fcmp(x, y): # fuzzy comparison function
    """Compare x and y like cmp(), treating nearly-equal floats as equal.

    If either argument is a float, the pair is coerced and considered equal
    (0 is returned) when the difference is within FUZZ relative to their
    magnitudes.  Tuples and lists of the same type are compared element by
    element with the same rule, then by length.  Everything else falls
    back to cmp().
    """
    if type(x) == float or type(y) == float:
        try:
            x, y = coerce(x, y)
            tolerance = (abs(x) + abs(y)) * FUZZ
            if abs(x - y) <= tolerance:
                return 0
        except:
            # Best effort: on any failure use the plain comparison below.
            pass
    elif type(x) == type(y) and type(x) in (tuple, list):
        for left, right in zip(x, y):
            outcome = fcmp(left, right)
            if outcome != 0:
                return outcome
        return cmp(len(x), len(y))
    return cmp(x, y)
133

134 135 136 137 138 139
# have_unicode is 1 when the interpreter provides the 'unicode' builtin.
try:
    unicode
except NameError:
    have_unicode = 0
else:
    have_unicode = 1

140 141
# True when sys.platform starts with 'java' (i.e. running under Jython).
is_jython = sys.platform.startswith('java')

142
import os
143 144 145 146
# Filename used for testing
if os.name == 'java':
    # Jython disallows @ in module names
    TESTFN = '$test'
elif os.name == 'riscos':
    TESTFN = 'testfile'
else:
    TESTFN = '@test'
    # Unicode name only used if TESTFN_ENCODING exists for the platform.
    if have_unicode:
        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
        # TESTFN_UNICODE is a filename that can be encoded using the
        # file system encoding, but *not* with the default (ascii) encoding
        if isinstance('', unicode):
            # python -U
            # XXX perhaps unicode() should accept Unicode strings?
            TESTFN_UNICODE = "@test-\xe0\xf2"
        else:
            # 2 latin characters.
            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
        TESTFN_ENCODING = sys.getfilesystemencoding()
        # TESTFN_UNICODE_UNENCODEABLE is a filename that should *not* be
        # able to be encoded by *either* the default or filesystem encoding.
        # This test really only makes sense on Windows NT platforms
        # which have special Unicode support in posixmodule.
        if (hasattr(sys, "getwindowsversion") and
                sys.getwindowsversion()[3] >= 2):
            # Japanese characters (I think - from bug 846133)
            TESTFN_UNICODE_UNENCODEABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
            try:
                # XXX - Note - should be using TESTFN_ENCODING here - but for
                # Windows, "mbcs" currently always operates as if in
                # errors=ignore' mode - hence we get '?' characters rather than
                # the exception.  'Latin1' operates as we expect - ie, fails.
                # See [ 850997 ] mbcs encoding ignores errors
                TESTFN_UNICODE_UNENCODEABLE.encode("Latin1")
            except UnicodeEncodeError:
                pass
            else:
                print ('WARNING: The filename %r CAN be encoded by the filesystem.  '
                       'Unicode filename tests may not be effective'
                       % TESTFN_UNICODE_UNENCODEABLE)
        else:
            # Not Windows, or platform id 0=win32s or 1=9x/ME
            TESTFN_UNICODE_UNENCODEABLE = None
188 189 190 191 192 193 194 195 196 197 198 199

# Make sure we can write to TESTFN, try in /tmp if we can't
fp = None
try:
    fp = open(TESTFN, 'w+')
except IOError:
    TMP_TESTFN = os.path.join('/tmp', TESTFN)
    try:
        fp = open(TMP_TESTFN, 'w+')
    except IOError:
        print ('WARNING: tests will fail, unable to write to: %s or %s' %
                (TESTFN, TMP_TESTFN))
    else:
        # The fallback location is writable: use it from now on.
        TESTFN = TMP_TESTFN
        del TMP_TESTFN
if fp is not None:
    # Only a probe: close the file and remove it again.
    fp.close()
    unlink(TESTFN)
del os, fp
206

207
def findfile(file, here=__file__):
    """Try to find a file in the directory of *here* and then on sys.path.

    An absolute *file* is returned as is.  If the file is not found the
    argument is returned unchanged (this does not necessarily signal
    failure; it could still be the legitimate path).
    """
    import os
    if os.path.isabs(file):
        return file
    for directory in [os.path.dirname(here)] + sys.path:
        candidate = os.path.join(directory, file)
        if os.path.exists(candidate):
            return candidate
    return file
220 221

def verify(condition, reason='test failed'):
    """Raise TestFailed unless *condition* is true.

       The optional argument reason supplies a more helpful
       error text.
    """
    if condition:
        return
    raise TestFailed(reason)
230

231
def vereq(a, b):
    """Raise TestFailed if a == b is false.

    Preferable to verify(a == b): on failure the error message includes
    repr(a) and repr(b), so the inputs are visible.

    Note that "not (a == b)" isn't necessarily the same as "a != b"; the
    former is what gets tested.
    """
    if a == b:
        return
    raise TestFailed("%r == %r" % (a, b))

245 246 247 248 249 250 251 252
def sortdict(dict):
    """Like repr(dict), but with the items in sorted key order.

    Accepts any mapping whose items() yields (key, value) pairs; the
    result of items() does not have to be a list.  Returns a string of the
    form "{k1: v1, k2: v2}" built from the repr of each key and value.
    """
    pairs = ["%r: %r" % item for item in sorted(dict.items())]
    return "{%s}" % ", ".join(pairs)

253
def check_syntax_error(testcase, statement):
    """Fail *testcase* unless compiling *statement* raises SyntaxError."""
    try:
        compile(statement, '<test string>', 'exec')
    except SyntaxError:
        return
    testcase.fail('Missing SyntaxError: "%s"' % statement)
260

261 262 263
def open_urlresource(url):
    """Open the file named by the last path component of *url*.

    A copy found in the current or the parent directory is used directly.
    Otherwise the 'urlfetch' resource is required and the URL is downloaded
    under that name first.  Returns an open file object.
    """
    import urllib
    import urlparse
    import os.path

    # Split on '/' explicitly: this is a URL path, not a local path.
    filename = urlparse.urlparse(url)[2].split('/')[-1]

    for directory in [os.path.curdir, os.path.pardir]:
        candidate = os.path.join(directory, filename)
        if os.path.exists(candidate):
            return open(candidate)

    requires('urlfetch')
    print >> get_original_stdout(), '\tfetching %s ...' % url
    fetched, _ = urllib.urlretrieve(url, filename)
    return open(fetched)
Tim Peters's avatar
Tim Peters committed
276

277
@contextlib.contextmanager
def guard_warnings_filter():
    """Restore warnings.filters to its entry-time contents on exit."""
    saved_filters = list(warnings.filters)
    try:
        yield
    finally:
        warnings.filters = saved_filters
285

286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321
class WarningMessage(object):
    "Holds the result of the latest showwarning() call"
    def __init__(self):
        self.message = None
        self.category = None
        self.filename = None
        self.lineno = None
        self.line = None

    def _showwarning(self, message, category, filename, lineno, file=None,
                     line=None):
        """Record the details of a warning instead of displaying it.

        Intended as a replacement for warnings.showwarning.  *file* is
        accepted and ignored.  *line* is optional so the hook also works
        when the warnings machinery passes the source line as a sixth
        argument.
        """
        self.message = message
        self.category = category
        self.filename = filename
        self.lineno = lineno
        self.line = line

@contextlib.contextmanager
def catch_warning():
    """
    Protect the warnings machinery from permanent changes and capture the
    data of the most recently issued warning.

    Both warnings.filters and warnings.showwarning are restored on exit.
    Use like this:

        with catch_warning() as w:
            warnings.warn("foo")
            assert str(w.message) == "foo"
    """
    recorder = WarningMessage()
    saved_showwarning = warnings.showwarning
    saved_filters = warnings.filters[:]
    warnings.showwarning = recorder._showwarning
    try:
        yield recorder
    finally:
        warnings.showwarning = saved_showwarning
        warnings.filters = saved_filters

322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353
class EnvironmentVarGuard(object):

    """Class to help protect the environment variable properly.  Can be used as
    a context manager.

    Changes made through set() and unset() are undone by __exit__: variables
    that existed beforehand get their original value back, variables that
    did not exist are removed again.
    """

    def __init__(self):
        from os import environ
        self._environ = environ
        # Names that did not exist before this guard first touched them.
        self._unset = set()
        # Original values of names that did exist, keyed by name.
        self._reset = dict()

    def _remember(self, envvar):
        """Record the pre-guard state of envvar, the first time only."""
        if envvar in self._reset or envvar in self._unset:
            # Already recorded: a later call must not overwrite the true
            # original with a value this guard installed itself.
            return
        if envvar in self._environ:
            self._reset[envvar] = self._environ[envvar]
        else:
            self._unset.add(envvar)

    def set(self, envvar, value):
        """Set envvar to value, remembering how to undo the change."""
        self._remember(envvar)
        self._environ[envvar] = value

    def unset(self, envvar):
        """Remove envvar if present, remembering how to undo the change."""
        if envvar in self._environ:
            self._remember(envvar)
            del self._environ[envvar]

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        """Restore every variable touched through this guard."""
        for envvar, value in self._reset.items():
            self._environ[envvar] = value
        for envvar in self._unset:
            # The variable may already be gone again (e.g. after unset()).
            if envvar in self._environ:
                del self._environ[envvar]

354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378
class TransientResource(object):

    """Raise ResourceDenied if an exception is raised while the context manager
    is in effect that matches the specified exception and attributes."""

    def __init__(self, exc, **kwargs):
        """Match exceptions of class exc (or a subclass) whose attributes
        equal the given keyword arguments."""
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        # The raised type must be self.exc or one of its subclasses; a
        # superclass of self.exc is not a match.
        if type_ is not None and issubclass(type_, self.exc):
            for attr, attr_value in self.attrs.items():
                if not hasattr(value, attr):
                    break
                if getattr(value, attr) != attr_value:
                    break
            else:
                # Every requested attribute matched (or none was requested).
                raise ResourceDenied("an optional resource is not available")

379

380 381 382 383 384 385
def transient_internet():
    """Return a context manager that raises ResourceDenied when various issues
    with the Internet connection manifest themselves as exceptions."""
    guards = (
        # connection timed out
        TransientResource(IOError, errno=errno.ETIMEDOUT),
        # connection reset by peer, reported as socket.error
        TransientResource(socket.error, errno=errno.ECONNRESET),
        # connection reset by peer, reported as IOError
        TransientResource(IOError, errno=errno.ECONNRESET),
    )
    return contextlib.nested(*guards)
387 388


389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424
#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.

def run_with_locale(catstr, *locales):
    """Decorator factory: run the function under the first usable locale.

    catstr names a locale category attribute (e.g. 'LC_ALL').  Each entry
    of locales is tried in turn until one can be set.  The original locale
    is restored afterwards, whether or not the function raises.  An invalid
    category name raises AttributeError when the decorated function is
    called.  If the current locale cannot be determined, the function runs
    without any locale change.
    """
    def decorator(func):
        def inner(*args, **kwds):
            try:
                import locale
                category = getattr(locale, catstr)
                orig_locale = locale.setlocale(category)
            except AttributeError:
                # if the test author gives us an invalid category string
                raise
            except Exception:
                # cannot retrieve original locale, so do nothing
                locale = orig_locale = None
            else:
                for loc in locales:
                    try:
                        locale.setlocale(category, loc)
                        break
                    except Exception:
                        # this locale is unavailable; try the next one
                        pass

            # now run the function, resetting the locale on exceptions
            try:
                return func(*args, **kwds)
            finally:
                if locale and orig_locale:
                    locale.setlocale(category, orig_locale)
        inner.__name__ = func.__name__
        inner.__doc__ = func.__doc__
        return inner
    return decorator

425 426 427 428 429 430 431 432 433
#=======================================================================
# Big-memory-test support. Separate from 'resources' because memory use should be configurable.

# Some handy shorthands. Note that these are used for byte-limits as well
# as size-limits, in the various bigmem tests
_1M = 1024*1024   # 2**20
_1G = 1024 * _1M  # 2**30
_2G = 2 * _1G     # 2**31

434 435 436 437 438 439
# Hack to get at the maximum value an internal index can take.
# Slicing with both bounds omitted ([:]) goes through __getslice__, so the
# value received as j is whatever the interpreter substitutes for the
# missing end index.
class _Dummy:
    def __getslice__(self, i, j):
        # Hand back the upper bound that was filled in for the slice.
        return j
MAX_Py_ssize_t = _Dummy()[:]

440 441 442 443 444 445 446 447 448 449 450 451 452 453
def set_memlimit(limit):
    """Parse a limit such as '2.5G' or '4096Mb' and store it in max_memuse.

    The value is a number followed by K, M, G or T (case-insensitive) and
    an optional 'b'.  It is capped at MAX_Py_ssize_t.  ValueError is raised
    for malformed input and for limits below 2G - 1 bytes.
    """
    import re
    global max_memuse
    multipliers = {
        'k': 1024,
        'm': _1M,
        'g': _1G,
        't': 1024*_1G,
    }
    match = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
                     re.IGNORECASE | re.VERBOSE)
    if match is None:
        raise ValueError('Invalid memory limit %r' % (limit,))
    number = float(match.group(1))
    unit = match.group(3).lower()
    memlimit = min(int(number * multipliers[unit]), MAX_Py_ssize_t)
    if memlimit < _2G - 1:
        raise ValueError('Memory limit %r too low to be useful' % (limit,))
    max_memuse = memlimit

def bigmemtest(minsize, memuse, overhead=5*_1M):
    """Decorator for bigmem tests.

    'minsize' is the minimum useful size for the test (in arbitrary,
    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
    independent of the testsize, and defaults to 5Mb.

    The decorator tries to guess a good value for 'size' and passes it to
    the decorated test function. If minsize * memuse is more than the
    allowed memory use (as defined by max_memuse), the test is skipped.
    Otherwise, minsize is adjusted upward to use up to max_memuse.
    """
    def decorator(f):
        def wrapper(self):
            if max_memuse:
                maxsize = int((max_memuse - overhead) / memuse)
                if maxsize < minsize:
                    # Really ought to print 'test skipped' or something
                    if verbose:
                        sys.stderr.write("Skipping %s because of memory "
                                         "constraint\n" % (f.__name__,))
                    return
                # Try to keep some breathing room in memory use
                maxsize = max(maxsize - 50 * _1M, minsize)
            else:
                # max_memuse is 0 (the default): still run the test, with
                # size set to a few kb, to make sure it works.  Using too
                # much memory here is reported as a test failure.
                maxsize = 5147
                self.failIf(maxsize * memuse + overhead > 20 * _1M)
            return f(self, maxsize)
        # Expose the parameters on the wrapper for introspection.
        wrapper.minsize = minsize
        wrapper.memuse = memuse
        wrapper.overhead = overhead
        return wrapper
    return decorator

499 500 501 502 503 504 505 506 507 508 509
def bigaddrspacetest(f):
    """Decorator for tests that fill the address space.

    The test only runs when max_memuse is at least MAX_Py_ssize_t;
    otherwise it is skipped (with a note on stderr in verbose mode).
    """
    def wrapper(self):
        if max_memuse >= MAX_Py_ssize_t:
            return f(self)
        if verbose:
            sys.stderr.write("Skipping %s because of memory "
                             "constraint\n" % (f.__name__,))
    return wrapper

510 511 512 513 514 515
#=======================================================================
# Preliminary PyUNIT integration.

import unittest


516
class BasicTestRunner:
    """Minimal runner: run a test quietly and hand back its TestResult."""
    def run(self, test):
        outcome = unittest.TestResult()
        test(outcome)
        return outcome


523
def _run_suite(suite):
    """Run tests from a unittest.TestSuite-derived class.

    Raises TestFailed if the run was not successful.  When there is exactly
    one error or failure its traceback text is used as the message;
    otherwise a generic message is used.
    """
    if verbose:
        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
    else:
        runner = BasicTestRunner()

    result = runner.run(suite)
    if result.wasSuccessful():
        return
    errors, failures = result.errors, result.failures
    if len(errors) == 1 and not failures:
        err = errors[0][1]
    elif len(failures) == 1 and not errors:
        err = failures[0][1]
    else:
        raise TestFailed("errors occurred; run in verbose mode for details")
    raise TestFailed(err)
540

541

542 543
def run_unittest(*classes):
    """Run tests from unittest.TestCase-derived classes.

    Each argument may be a TestCase class, a TestSuite or TestCase instance,
    or the name of an already imported module whose test cases are
    collected.  A string that is not a key in sys.modules raises ValueError.
    """
    suite = unittest.TestSuite()
    for cls in classes:
        if isinstance(cls, str):
            if cls not in sys.modules:
                raise ValueError("str arguments must be keys in sys.modules")
            suite.addTest(unittest.findTestCases(sys.modules[cls]))
        elif isinstance(cls, (unittest.TestSuite, unittest.TestCase)):
            suite.addTest(cls)
        else:
            suite.addTest(unittest.makeSuite(cls))
    _run_suite(suite)
557

558

559 560 561 562
#=======================================================================
# doctest driver.

def run_doctest(module, verbosity=None):
    """Run doctest on the given module.  Return (#failures, #tests).

    If optional argument verbosity is not specified (or is None), pass
    test_support's belief about verbosity on to doctest.  Else doctest's
    usual behavior is used (it searches sys.argv for -v).

    Raises TestFailed if any doctest fails.
    """

    import doctest

    if verbosity is None:
        doctest_verbose = verbose
    else:
        # Any explicit value means "let doctest decide for itself".
        doctest_verbose = None

    # Direct doctest output (normally just errors) to real stdout; doctest
    # output shouldn't be compared by regrtest.
    save_stdout = sys.stdout
    sys.stdout = get_original_stdout()
    try:
        f, t = doctest.testmod(module, verbose=doctest_verbose)
        if f:
            raise TestFailed("%d of %d doctests failed" % (f, t))
    finally:
        sys.stdout = save_stdout
    if verbose:
        print ('doctest (%s) ... %d tests with zero failures' % (module.__name__, t))
    return f, t
590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611

#=======================================================================
# Threading support to prevent reporting refleaks when running regrtest.py -R

def threading_setup():
    """Return the current sizes of threading._active and threading._limbo,
    in the form expected by threading_cleanup()."""
    import threading
    active_count = len(threading._active)
    limbo_count = len(threading._limbo)
    return active_count, limbo_count

def threading_cleanup(num_active, num_limbo):
    """Wait briefly for the thread bookkeeping to return to earlier sizes.

    First threading._active is polled until it holds num_active entries,
    then threading._limbo until it holds num_limbo entries.  Each wait
    gives up after _MAX_COUNT sleeps of 0.1 seconds.
    """
    import threading
    import time

    _MAX_COUNT = 10
    for registry_name, expected in (('_active', num_active),
                                    ('_limbo', num_limbo)):
        for _attempt in range(_MAX_COUNT):
            # Look the registry up afresh on every poll.
            if len(getattr(threading, registry_name)) == expected:
                break
            time.sleep(0.1)
612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632

def reap_children():
    """Use this function at the end of test_main() whenever sub-processes
    are started.  This will help ensure that no extra children (zombies)
    stick around to hog resources and create problems when looking
    for refleaks.
    """

    # Reap all our dead child processes so we don't leave zombies around.
    # These hog resources and might be causing some of the buildbots to die.
    import os
    if not hasattr(os, 'waitpid'):
        return
    any_process = -1
    while True:
        try:
            # This will raise an exception on Windows.  That's ok.
            pid, status = os.waitpid(any_process, os.WNOHANG)
        except Exception:
            # Best effort: any ordinary error ends the loop.  Unlike a bare
            # except this lets KeyboardInterrupt and SystemExit through.
            break
        if pid == 0:
            # Children exist, but none has exited yet.
            break