Commit 32e23e73 authored by Serhiy Storchaka's avatar Serhiy Storchaka

Issue #18702: All skipped tests now reported as skipped.

parent 68f518ce
......@@ -9,6 +9,7 @@ from test import test_support
from weakref import proxy
import array, cStringIO
from cPickle import loads, dumps, HIGHEST_PROTOCOL
import sys
class ArraySubclass(array.array):
pass
......@@ -772,15 +773,15 @@ class BaseTest(unittest.TestCase):
s = None
self.assertRaises(ReferenceError, len, p)
@unittest.skipUnless(hasattr(sys, 'getrefcount'),
'test needs sys.getrefcount()')
def test_bug_782369(self):
import sys
if hasattr(sys, "getrefcount"):
for i in range(10):
b = array.array('B', range(64))
rc = sys.getrefcount(10)
for i in range(10):
b = array.array('B', range(64))
self.assertEqual(rc, sys.getrefcount(10))
for i in range(10):
b = array.array('B', range(64))
rc = sys.getrefcount(10)
for i in range(10):
b = array.array('B', range(64))
self.assertEqual(rc, sys.getrefcount(10))
def test_subclass_with_kwargs(self):
# SF bug #1486663 -- this used to erroneously raise a TypeError
......
......@@ -31,11 +31,10 @@ class CompileallTests(unittest.TestCase):
compare = struct.pack('<4sl', imp.get_magic(), mtime)
return data, compare
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def recreation_check(self, metadata):
"""Check that compileall recreates bytecode when the new metadata is
used."""
if not hasattr(os, 'stat'):
return
py_compile.compile(self.source_path)
self.assertEqual(*self.data())
with open(self.bc_path, 'rb') as file:
......
......@@ -1014,78 +1014,77 @@ Stonecutters Seafood and Chop House+ Lemont+ IL+ 12/19/02+ Week Back
dialect = sniffer.sniff(self.sample9)
self.assertTrue(dialect.doublequote)
if not hasattr(sys, "gettotalrefcount"):
if test_support.verbose: print "*** skipping leakage tests ***"
else:
class NUL:
def write(s, *args):
pass
writelines = write
class TestLeaks(unittest.TestCase):
def test_create_read(self):
delta = 0
lastrc = sys.gettotalrefcount()
for i in xrange(20):
gc.collect()
self.assertEqual(gc.garbage, [])
rc = sys.gettotalrefcount()
csv.reader(["a,b,c\r\n"])
csv.reader(["a,b,c\r\n"])
csv.reader(["a,b,c\r\n"])
delta = rc-lastrc
lastrc = rc
# if csv.reader() leaks, last delta should be 3 or more
self.assertEqual(delta < 3, True)
def test_create_write(self):
delta = 0
lastrc = sys.gettotalrefcount()
s = NUL()
for i in xrange(20):
gc.collect()
self.assertEqual(gc.garbage, [])
rc = sys.gettotalrefcount()
csv.writer(s)
csv.writer(s)
csv.writer(s)
delta = rc-lastrc
lastrc = rc
# if csv.writer() leaks, last delta should be 3 or more
self.assertEqual(delta < 3, True)
def test_read(self):
delta = 0
rows = ["a,b,c\r\n"]*5
lastrc = sys.gettotalrefcount()
for i in xrange(20):
gc.collect()
self.assertEqual(gc.garbage, [])
rc = sys.gettotalrefcount()
rdr = csv.reader(rows)
for row in rdr:
pass
delta = rc-lastrc
lastrc = rc
# if reader leaks during read, delta should be 5 or more
self.assertEqual(delta < 5, True)
def test_write(self):
delta = 0
rows = [[1,2,3]]*5
s = NUL()
lastrc = sys.gettotalrefcount()
for i in xrange(20):
gc.collect()
self.assertEqual(gc.garbage, [])
rc = sys.gettotalrefcount()
writer = csv.writer(s)
for row in rows:
writer.writerow(row)
delta = rc-lastrc
lastrc = rc
# if writer leaks during write, last delta should be 5 or more
self.assertEqual(delta < 5, True)
class NUL:
def write(s, *args):
pass
writelines = write
@unittest.skipUnless(hasattr(sys, "gettotalrefcount"),
'requires sys.gettotalrefcount()')
class TestLeaks(unittest.TestCase):
def test_create_read(self):
delta = 0
lastrc = sys.gettotalrefcount()
for i in xrange(20):
gc.collect()
self.assertEqual(gc.garbage, [])
rc = sys.gettotalrefcount()
csv.reader(["a,b,c\r\n"])
csv.reader(["a,b,c\r\n"])
csv.reader(["a,b,c\r\n"])
delta = rc-lastrc
lastrc = rc
# if csv.reader() leaks, last delta should be 3 or more
self.assertEqual(delta < 3, True)
def test_create_write(self):
delta = 0
lastrc = sys.gettotalrefcount()
s = NUL()
for i in xrange(20):
gc.collect()
self.assertEqual(gc.garbage, [])
rc = sys.gettotalrefcount()
csv.writer(s)
csv.writer(s)
csv.writer(s)
delta = rc-lastrc
lastrc = rc
# if csv.writer() leaks, last delta should be 3 or more
self.assertEqual(delta < 3, True)
def test_read(self):
delta = 0
rows = ["a,b,c\r\n"]*5
lastrc = sys.gettotalrefcount()
for i in xrange(20):
gc.collect()
self.assertEqual(gc.garbage, [])
rc = sys.gettotalrefcount()
rdr = csv.reader(rows)
for row in rdr:
pass
delta = rc-lastrc
lastrc = rc
# if reader leaks during read, delta should be 5 or more
self.assertEqual(delta < 5, True)
def test_write(self):
delta = 0
rows = [[1,2,3]]*5
s = NUL()
lastrc = sys.gettotalrefcount()
for i in xrange(20):
gc.collect()
self.assertEqual(gc.garbage, [])
rc = sys.gettotalrefcount()
writer = csv.writer(s)
for row in rows:
writer.writerow(row)
delta = rc-lastrc
lastrc = rc
# if writer leaks during write, last delta should be 5 or more
self.assertEqual(delta < 5, True)
# commented out for now - csv module doesn't yet support Unicode
## class TestUnicode(unittest.TestCase):
......
......@@ -188,11 +188,10 @@ class TestReversed(unittest.TestCase):
self.assertRaises(TypeError, reversed)
self.assertRaises(TypeError, reversed, [], 'extra')
@unittest.skipUnless(hasattr(sys, 'getrefcount'), 'test needs sys.getrefcount()')
def test_bug1229429(self):
# this bug was never in reversed, it was in
# PyObject_CallMethod, and reversed_new calls that sometimes.
if not hasattr(sys, "getrefcount"):
return
def f():
pass
r = f.__reversed__ = object()
......
......@@ -15,7 +15,7 @@ try:
except ImportError:
ssl = None
from unittest import TestCase
from unittest import TestCase, SkipTest, skipUnless
from test import test_support
from test.test_support import HOST, HOSTv6
threading = test_support.import_module('threading')
......@@ -579,8 +579,16 @@ class TestFTPClass(TestCase):
self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
@skipUnless(socket.has_ipv6, "IPv6 not enabled")
class TestIPv6Environment(TestCase):
@classmethod
def setUpClass(cls):
try:
DummyFTPServer((HOST, 0), af=socket.AF_INET6)
except socket.error:
raise SkipTest("IPv6 not enabled")
def setUp(self):
self.server = DummyFTPServer((HOSTv6, 0), af=socket.AF_INET6)
self.server.start()
......@@ -615,6 +623,7 @@ class TestIPv6Environment(TestCase):
retr()
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClassMixin(TestFTPClass):
"""Repeat TestFTPClass tests starting the TLS layer for both control
and data connections first.
......@@ -630,6 +639,7 @@ class TestTLS_FTPClassMixin(TestFTPClass):
self.client.prot_p()
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClass(TestCase):
"""Specific TLS_FTP class tests."""
......@@ -783,17 +793,9 @@ class TestTimeouts(TestCase):
def test_main():
tests = [TestFTPClass, TestTimeouts]
if socket.has_ipv6:
try:
DummyFTPServer((HOST, 0), af=socket.AF_INET6)
except socket.error:
pass
else:
tests.append(TestIPv6Environment)
if ssl is not None:
tests.extend([TestTLS_FTPClassMixin, TestTLS_FTPClass])
tests = [TestFTPClass, TestTimeouts,
TestIPv6Environment,
TestTLS_FTPClassMixin, TestTLS_FTPClass]
thread_info = test_support.threading_setup()
try:
......
......@@ -772,10 +772,10 @@ class TestMaildir(TestMailbox, unittest.TestCase):
for msg in self._box:
pass
@unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def test_file_permissions(self):
# Verify that message files are created without execute permissions
if not hasattr(os, "stat") or not hasattr(os, "umask"):
return
msg = mailbox.MaildirMessage(self._template % 0)
orig_umask = os.umask(0)
try:
......@@ -786,12 +786,11 @@ class TestMaildir(TestMailbox, unittest.TestCase):
mode = os.stat(path).st_mode
self.assertEqual(mode & 0111, 0)
@unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def test_folder_file_perms(self):
# From bug #3228, we want to verify that the file created inside a Maildir
# subfolder isn't marked as executable.
if not hasattr(os, "stat") or not hasattr(os, "umask"):
return
orig_umask = os.umask(0)
try:
subfolder = self._box.add_folder('subfolder')
......@@ -991,24 +990,25 @@ class TestMbox(_TestMboxMMDF, unittest.TestCase):
_factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
@unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def test_file_perms(self):
# From bug #3228, we want to verify that the mailbox file isn't executable,
# even if the umask is set to something that would leave executable bits set.
# We only run this test on platforms that support umask.
if hasattr(os, 'umask') and hasattr(os, 'stat'):
try:
old_umask = os.umask(0077)
self._box.close()
os.unlink(self._path)
self._box = mailbox.mbox(self._path, create=True)
self._box.add('')
self._box.close()
finally:
os.umask(old_umask)
try:
old_umask = os.umask(0077)
self._box.close()
os.unlink(self._path)
self._box = mailbox.mbox(self._path, create=True)
self._box.add('')
self._box.close()
finally:
os.umask(old_umask)
st = os.stat(self._path)
perms = st.st_mode
self.assertFalse((perms & 0111)) # Execute bits should all be off.
st = os.stat(self._path)
perms = st.st_mode
self.assertFalse((perms & 0111)) # Execute bits should all be off.
def test_terminating_newline(self):
message = email.message.Message()
......
......@@ -906,38 +906,37 @@ class MathTests(unittest.TestCase):
# still fails this part of the test on some platforms. For now, we only
# *run* test_exceptions() in verbose mode, so that this isn't normally
# tested.
@unittest.skipUnless(verbose, 'requires verbose mode')
def test_exceptions(self):
try:
x = math.exp(-1000000000)
except:
# mathmodule.c is failing to weed out underflows from libm, or
# we've got an fp format with huge dynamic range
self.fail("underflowing exp() should not have raised "
"an exception")
if x != 0:
self.fail("underflowing exp() should have returned 0")
# If this fails, probably using a strict IEEE-754 conforming libm, and x
# is +Inf afterwards. But Python wants overflows detected by default.
try:
x = math.exp(1000000000)
except OverflowError:
pass
else:
self.fail("overflowing exp() didn't trigger OverflowError")
if verbose:
def test_exceptions(self):
try:
x = math.exp(-1000000000)
except:
# mathmodule.c is failing to weed out underflows from libm, or
# we've got an fp format with huge dynamic range
self.fail("underflowing exp() should not have raised "
"an exception")
if x != 0:
self.fail("underflowing exp() should have returned 0")
# If this fails, probably using a strict IEEE-754 conforming libm, and x
# is +Inf afterwards. But Python wants overflows detected by default.
try:
x = math.exp(1000000000)
except OverflowError:
pass
else:
self.fail("overflowing exp() didn't trigger OverflowError")
# If this fails, it could be a puzzle. One odd possibility is that
# mathmodule.c's macros are getting confused while comparing
# Inf (HUGE_VAL) to a NaN, and artificially setting errno to ERANGE
# as a result (and so raising OverflowError instead).
try:
x = math.sqrt(-1.0)
except ValueError:
pass
else:
self.fail("sqrt(-1) didn't raise ValueError")
# If this fails, it could be a puzzle. One odd possibility is that
# mathmodule.c's macros are getting confused while comparing
# Inf (HUGE_VAL) to a NaN, and artificially setting errno to ERANGE
# as a result (and so raising OverflowError instead).
try:
x = math.sqrt(-1.0)
except ValueError:
pass
else:
self.fail("sqrt(-1) didn't raise ValueError")
@requires_IEEE_754
def test_testfile(self):
......
......@@ -320,26 +320,25 @@ class MmapTests(unittest.TestCase):
mf.close()
f.close()
@unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()")
def test_entire_file(self):
# test mapping of entire file by passing 0 for map length
if hasattr(os, "stat"):
f = open(TESTFN, "w+")
f = open(TESTFN, "w+")
f.write(2**16 * 'm') # Arbitrary character
f.close()
f.write(2**16 * 'm') # Arbitrary character
f.close()
f = open(TESTFN, "rb+")
mf = mmap.mmap(f.fileno(), 0)
self.assertEqual(len(mf), 2**16, "Map size should equal file size.")
self.assertEqual(mf.read(2**16), 2**16 * "m")
mf.close()
f.close()
f = open(TESTFN, "rb+")
mf = mmap.mmap(f.fileno(), 0)
self.assertEqual(len(mf), 2**16, "Map size should equal file size.")
self.assertEqual(mf.read(2**16), 2**16 * "m")
mf.close()
f.close()
@unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()")
def test_length_0_offset(self):
# Issue #10916: test mapping of remainder of file by passing 0 for
# map length with an offset doesn't cause a segfault.
if not hasattr(os, "stat"):
self.skipTest("needs os.stat")
# NOTE: allocation granularity is currently 65536 under Win64,
# and therefore the minimum offset alignment.
with open(TESTFN, "wb") as f:
......@@ -352,12 +351,10 @@ class MmapTests(unittest.TestCase):
finally:
mf.close()
@unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()")
def test_length_0_large_offset(self):
# Issue #10959: test mapping of a file by passing 0 for
# map length with a large offset doesn't cause a segfault.
if not hasattr(os, "stat"):
self.skipTest("needs os.stat")
with open(TESTFN, "wb") as f:
f.write(115699 * b'm') # Arbitrary character
......@@ -538,9 +535,8 @@ class MmapTests(unittest.TestCase):
return mmap.mmap.__new__(klass, -1, *args, **kwargs)
anon_mmap(PAGESIZE)
@unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ")
def test_prot_readonly(self):
if not hasattr(mmap, 'PROT_READ'):
return
mapsize = 10
open(TESTFN, "wb").write("a"*mapsize)
f = open(TESTFN, "rb")
......@@ -584,66 +580,68 @@ class MmapTests(unittest.TestCase):
m.seek(8)
self.assertRaises(ValueError, m.write, "bar")
if os.name == 'nt':
def test_tagname(self):
data1 = "0123456789"
data2 = "abcdefghij"
assert len(data1) == len(data2)
# Test same tag
m1 = mmap.mmap(-1, len(data1), tagname="foo")
m1[:] = data1
m2 = mmap.mmap(-1, len(data2), tagname="foo")
m2[:] = data2
self.assertEqual(m1[:], data2)
self.assertEqual(m2[:], data2)
m2.close()
m1.close()
# Test different tag
m1 = mmap.mmap(-1, len(data1), tagname="foo")
m1[:] = data1
m2 = mmap.mmap(-1, len(data2), tagname="boo")
m2[:] = data2
self.assertEqual(m1[:], data1)
self.assertEqual(m2[:], data2)
m2.close()
m1.close()
def test_crasher_on_windows(self):
# Should not crash (Issue 1733986)
m = mmap.mmap(-1, 1000, tagname="foo")
try:
mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size
except:
pass
m.close()
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_tagname(self):
data1 = "0123456789"
data2 = "abcdefghij"
assert len(data1) == len(data2)
# Test same tag
m1 = mmap.mmap(-1, len(data1), tagname="foo")
m1[:] = data1
m2 = mmap.mmap(-1, len(data2), tagname="foo")
m2[:] = data2
self.assertEqual(m1[:], data2)
self.assertEqual(m2[:], data2)
m2.close()
m1.close()
# Test different tag
m1 = mmap.mmap(-1, len(data1), tagname="foo")
m1[:] = data1
m2 = mmap.mmap(-1, len(data2), tagname="boo")
m2[:] = data2
self.assertEqual(m1[:], data1)
self.assertEqual(m2[:], data2)
m2.close()
m1.close()
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_crasher_on_windows(self):
# Should not crash (Issue 1733986)
m = mmap.mmap(-1, 1000, tagname="foo")
try:
mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size
except:
pass
m.close()
# Should not crash (Issue 5385)
open(TESTFN, "wb").write("x"*10)
f = open(TESTFN, "r+b")
m = mmap.mmap(f.fileno(), 0)
f.close()
try:
m.resize(0) # will raise WindowsError
except:
pass
try:
m[:]
except:
pass
m.close()
# Should not crash (Issue 5385)
open(TESTFN, "wb").write("x"*10)
f = open(TESTFN, "r+b")
m = mmap.mmap(f.fileno(), 0)
f.close()
try:
m.resize(0) # will raise WindowsError
except:
pass
try:
m[:]
except:
pass
m.close()
def test_invalid_descriptor(self):
# socket file descriptors are valid, but out of range
# for _get_osfhandle, causing a crash when validating the
# parameters to _get_osfhandle.
s = socket.socket()
try:
with self.assertRaises(mmap.error):
m = mmap.mmap(s.fileno(), 10)
finally:
s.close()
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_invalid_descriptor(self):
# socket file descriptors are valid, but out of range
# for _get_osfhandle, causing a crash when validating the
# parameters to _get_osfhandle.
s = socket.socket()
try:
with self.assertRaises(mmap.error):
m = mmap.mmap(s.fileno(), 10)
finally:
s.close()
class LargeMmapTests(unittest.TestCase):
......
This diff is collapsed.
......@@ -11,7 +11,7 @@ import os
import time
import errno
from unittest import TestCase
from unittest import TestCase, skipUnless
from test import test_support
from test.test_support import HOST
threading = test_support.import_module('threading')
......@@ -263,17 +263,20 @@ if hasattr(poplib, 'POP3_SSL'):
else:
DummyPOP3Handler.handle_read(self)
class TestPOP3_SSLClass(TestPOP3Class):
# repeat previous tests by using poplib.POP3_SSL
requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported')
def setUp(self):
self.server = DummyPOP3Server((HOST, 0))
self.server.handler = DummyPOP3_SSLHandler
self.server.start()
self.client = poplib.POP3_SSL(self.server.host, self.server.port)
@requires_ssl
class TestPOP3_SSLClass(TestPOP3Class):
# repeat previous tests by using poplib.POP3_SSL
def test__all__(self):
self.assertIn('POP3_SSL', poplib.__all__)
def setUp(self):
self.server = DummyPOP3Server((HOST, 0))
self.server.handler = DummyPOP3_SSLHandler
self.server.start()
self.client = poplib.POP3_SSL(self.server.host, self.server.port)
def test__all__(self):
self.assertIn('POP3_SSL', poplib.__all__)
class TestTimeouts(TestCase):
......@@ -331,9 +334,8 @@ class TestTimeouts(TestCase):
def test_main():
tests = [TestPOP3Class, TestTimeouts]
if SUPPORTS_SSL:
tests.append(TestPOP3_SSLClass)
tests = [TestPOP3Class, TestTimeouts,
TestPOP3_SSLClass]
thread_info = test_support.threading_setup()
try:
test_support.run_unittest(*tests)
......
This diff is collapsed.
......@@ -561,10 +561,10 @@ class TestSet(TestJointOps):
s = None
self.assertRaises(ReferenceError, str, p)
# C API test only available in a debug build
if hasattr(set, "test_c_api"):
def test_c_api(self):
self.assertEqual(set().test_c_api(), True)
@unittest.skipUnless(hasattr(set, "test_c_api"),
'C API test only available in a debug build')
def test_c_api(self):
self.assertEqual(set().test_c_api(), True)
class SetSubclass(set):
pass
......
......@@ -78,33 +78,34 @@ class TestShutil(unittest.TestCase):
filename = tempfile.mktemp()
self.assertRaises(OSError, shutil.rmtree, filename)
# See bug #1071513 for why we don't run this on cygwin
# and bug #1076467 for why we don't run this as root.
if (hasattr(os, 'chmod') and sys.platform[:6] != 'cygwin'
and not (hasattr(os, 'geteuid') and os.geteuid() == 0)):
def test_on_error(self):
self.errorState = 0
os.mkdir(TESTFN)
self.childpath = os.path.join(TESTFN, 'a')
f = open(self.childpath, 'w')
f.close()
old_dir_mode = os.stat(TESTFN).st_mode
old_child_mode = os.stat(self.childpath).st_mode
# Make unwritable.
os.chmod(self.childpath, stat.S_IREAD)
os.chmod(TESTFN, stat.S_IREAD)
shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
# Test whether onerror has actually been called.
self.assertEqual(self.errorState, 2,
"Expected call to onerror function did not happen.")
# Make writable again.
os.chmod(TESTFN, old_dir_mode)
os.chmod(self.childpath, old_child_mode)
# Clean up.
shutil.rmtree(TESTFN)
@unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()')
@unittest.skipIf(sys.platform[:6] == 'cygwin',
"This test can't be run on Cygwin (issue #1071513).")
@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
"This test can't be run reliably as root (issue #1076467).")
def test_on_error(self):
self.errorState = 0
os.mkdir(TESTFN)
self.childpath = os.path.join(TESTFN, 'a')
f = open(self.childpath, 'w')
f.close()
old_dir_mode = os.stat(TESTFN).st_mode
old_child_mode = os.stat(self.childpath).st_mode
# Make unwritable.
os.chmod(self.childpath, stat.S_IREAD)
os.chmod(TESTFN, stat.S_IREAD)
shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
# Test whether onerror has actually been called.
self.assertEqual(self.errorState, 2,
"Expected call to onerror function did not happen.")
# Make writable again.
os.chmod(TESTFN, old_dir_mode)
os.chmod(self.childpath, old_child_mode)
# Clean up.
shutil.rmtree(TESTFN)
def check_args_to_onerror(self, func, arg, exc):
# test_rmtree_errors deliberately runs rmtree
......@@ -308,37 +309,38 @@ class TestShutil(unittest.TestCase):
finally:
shutil.rmtree(TESTFN, ignore_errors=True)
if hasattr(os, "mkfifo"):
# Issue #3002: copyfile and copytree block indefinitely on named pipes
def test_copyfile_named_pipe(self):
os.mkfifo(TESTFN)
try:
self.assertRaises(shutil.SpecialFileError,
shutil.copyfile, TESTFN, TESTFN2)
self.assertRaises(shutil.SpecialFileError,
shutil.copyfile, __file__, TESTFN)
finally:
os.remove(TESTFN)
# Issue #3002: copyfile and copytree block indefinitely on named pipes
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
def test_copyfile_named_pipe(self):
os.mkfifo(TESTFN)
try:
self.assertRaises(shutil.SpecialFileError,
shutil.copyfile, TESTFN, TESTFN2)
self.assertRaises(shutil.SpecialFileError,
shutil.copyfile, __file__, TESTFN)
finally:
os.remove(TESTFN)
def test_copytree_named_pipe(self):
os.mkdir(TESTFN)
@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
def test_copytree_named_pipe(self):
os.mkdir(TESTFN)
try:
subdir = os.path.join(TESTFN, "subdir")
os.mkdir(subdir)
pipe = os.path.join(subdir, "mypipe")
os.mkfifo(pipe)
try:
subdir = os.path.join(TESTFN, "subdir")
os.mkdir(subdir)
pipe = os.path.join(subdir, "mypipe")
os.mkfifo(pipe)
try:
shutil.copytree(TESTFN, TESTFN2)
except shutil.Error as e:
errors = e.args[0]
self.assertEqual(len(errors), 1)
src, dst, error_msg = errors[0]
self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
else:
self.fail("shutil.Error should have been raised")
finally:
shutil.rmtree(TESTFN, ignore_errors=True)
shutil.rmtree(TESTFN2, ignore_errors=True)
shutil.copytree(TESTFN, TESTFN2)
except shutil.Error as e:
errors = e.args[0]
self.assertEqual(len(errors), 1)
src, dst, error_msg = errors[0]
self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
else:
self.fail("shutil.Error should have been raised")
finally:
shutil.rmtree(TESTFN, ignore_errors=True)
shutil.rmtree(TESTFN2, ignore_errors=True)
@unittest.skipUnless(hasattr(os, 'chflags') and
hasattr(errno, 'EOPNOTSUPP') and
......
......@@ -343,16 +343,17 @@ class GeneralModuleTests(unittest.TestCase):
if not fqhn in all_host_names:
self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
@unittest.skipUnless(hasattr(sys, 'getrefcount'),
'test needs sys.getrefcount()')
def testRefCountGetNameInfo(self):
# Testing reference count for getnameinfo
if hasattr(sys, "getrefcount"):
try:
# On some versions, this loses a reference
orig = sys.getrefcount(__name__)
socket.getnameinfo(__name__,0)
except TypeError:
self.assertEqual(sys.getrefcount(__name__), orig,
"socket.getnameinfo loses a reference")
try:
# On some versions, this loses a reference
orig = sys.getrefcount(__name__)
socket.getnameinfo(__name__,0)
except TypeError:
self.assertEqual(sys.getrefcount(__name__), orig,
"socket.getnameinfo loses a reference")
def testInterpreterCrash(self):
# Making sure getnameinfo doesn't crash the interpreter
......@@ -459,17 +460,17 @@ class GeneralModuleTests(unittest.TestCase):
# Check that setting it to an invalid type raises TypeError
self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
@unittest.skipUnless(hasattr(socket, 'inet_aton'),
'test needs socket.inet_aton()')
def testIPv4_inet_aton_fourbytes(self):
if not hasattr(socket, 'inet_aton'):
return # No inet_aton, nothing to check
# Test that issue1008086 and issue767150 are fixed.
# It must return 4 bytes.
self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0'))
self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255'))
@unittest.skipUnless(hasattr(socket, 'inet_pton'),
'test needs socket.inet_pton()')
def testIPv4toString(self):
if not hasattr(socket, 'inet_pton'):
return # No inet_pton() on this platform
from socket import inet_aton as f, inet_pton, AF_INET
g = lambda a: inet_pton(AF_INET, a)
......@@ -484,9 +485,9 @@ class GeneralModuleTests(unittest.TestCase):
self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255'))
@unittest.skipUnless(hasattr(socket, 'inet_pton'),
'test needs socket.inet_pton()')
def testIPv6toString(self):
if not hasattr(socket, 'inet_pton'):
return # No inet_pton() on this platform
try:
from socket import inet_pton, AF_INET6, has_ipv6
if not has_ipv6:
......@@ -503,9 +504,9 @@ class GeneralModuleTests(unittest.TestCase):
f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
)
@unittest.skipUnless(hasattr(socket, 'inet_ntop'),
'test needs socket.inet_ntop()')
def testStringToIPv4(self):
if not hasattr(socket, 'inet_ntop'):
return # No inet_ntop() on this platform
from socket import inet_ntoa as f, inet_ntop, AF_INET
g = lambda a: inet_ntop(AF_INET, a)
......@@ -518,9 +519,9 @@ class GeneralModuleTests(unittest.TestCase):
self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55'))
self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff'))
@unittest.skipUnless(hasattr(socket, 'inet_ntop'),
'test needs socket.inet_ntop()')
def testStringToIPv6(self):
if not hasattr(socket, 'inet_ntop'):
return # No inet_ntop() on this platform
try:
from socket import inet_ntop, AF_INET6, has_ipv6
if not has_ipv6:
......@@ -871,6 +872,8 @@ class TCPCloserTest(ThreadedTCPSocketTest):
self.cli.connect((HOST, self.port))
time.sleep(1.0)
@unittest.skipUnless(hasattr(socket, 'socketpair'),
'test needs socket.socketpair()')
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicSocketPairTest(SocketPairTest):
......@@ -1456,12 +1459,12 @@ class TCPTimeoutTest(SocketTCPTest):
if not ok:
self.fail("accept() returned success when we did not expect it")
@unittest.skipUnless(hasattr(signal, 'alarm'),
'test needs signal.alarm()')
def testInterruptedTimeout(self):
# XXX I don't know how to do this test on MSWindows or any other
# plaform that doesn't support signal.alarm() or os.kill(), though
# the bug should have existed on all platforms.
if not hasattr(signal, "alarm"):
return # can only test on *nix
self.serv.settimeout(5.0) # must be longer than alarm
class Alarm(Exception):
pass
......@@ -1521,6 +1524,7 @@ class TestExceptions(unittest.TestCase):
self.assertTrue(issubclass(socket.gaierror, socket.error))
self.assertTrue(issubclass(socket.timeout, socket.error))
@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
class TestLinuxAbstractNamespace(unittest.TestCase):
UNIX_PATH_MAX = 108
......@@ -1635,11 +1639,11 @@ def isTipcAvailable():
for line in f:
if line.startswith("tipc "):
return True
if test_support.verbose:
print "TIPC module is not loaded, please 'sudo modprobe tipc'"
return False
class TIPCTest (unittest.TestCase):
@unittest.skipUnless(isTipcAvailable(),
"TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCTest(unittest.TestCase):
def testRDM(self):
srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
......@@ -1659,7 +1663,9 @@ class TIPCTest (unittest.TestCase):
self.assertEqual(msg, MSG)
class TIPCThreadableTest (unittest.TestCase, ThreadableTest):
@unittest.skipUnless(isTipcAvailable(),
"TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
def __init__(self, methodName = 'runTest'):
unittest.TestCase.__init__(self, methodName = methodName)
ThreadableTest.__init__(self)
......@@ -1712,13 +1718,9 @@ def test_main():
NetworkConnectionAttributesTest,
NetworkConnectionBehaviourTest,
])
if hasattr(socket, "socketpair"):
tests.append(BasicSocketPairTest)
if sys.platform == 'linux2':
tests.append(TestLinuxAbstractNamespace)
if isTipcAvailable():
tests.append(TIPCTest)
tests.append(TIPCThreadableTest)
tests.append(BasicSocketPairTest)
tests.append(TestLinuxAbstractNamespace)
tests.extend([TIPCTest, TIPCThreadableTest])
thread_info = test_support.threading_setup()
test_support.run_unittest(*tests)
......
......@@ -27,7 +27,10 @@ TEST_STR = "hello world\n"
HOST = test.test_support.HOST
HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
requires_unix_sockets = unittest.skipUnless(HAVE_UNIX_SOCKETS,
'requires Unix sockets')
HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
requires_forking = unittest.skipUnless(HAVE_FORKING, 'requires forking')
def signal_alarm(n):
"""Call signal.alarm when it exists (i.e. not on Windows)."""
......@@ -188,31 +191,33 @@ class SocketServerTest(unittest.TestCase):
SocketServer.StreamRequestHandler,
self.stream_examine)
if HAVE_FORKING:
def test_ForkingTCPServer(self):
with simple_subprocess(self):
self.run_server(SocketServer.ForkingTCPServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
if HAVE_UNIX_SOCKETS:
def test_UnixStreamServer(self):
self.run_server(SocketServer.UnixStreamServer,
@requires_forking
def test_ForkingTCPServer(self):
with simple_subprocess(self):
self.run_server(SocketServer.ForkingTCPServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
def test_ThreadingUnixStreamServer(self):
self.run_server(SocketServer.ThreadingUnixStreamServer,
@requires_unix_sockets
def test_UnixStreamServer(self):
self.run_server(SocketServer.UnixStreamServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
@requires_unix_sockets
def test_ThreadingUnixStreamServer(self):
self.run_server(SocketServer.ThreadingUnixStreamServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
@requires_unix_sockets
@requires_forking
def test_ForkingUnixStreamServer(self):
with simple_subprocess(self):
self.run_server(ForkingUnixStreamServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
if HAVE_FORKING:
def test_ForkingUnixStreamServer(self):
with simple_subprocess(self):
self.run_server(ForkingUnixStreamServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
def test_UDPServer(self):
self.run_server(SocketServer.UDPServer,
SocketServer.DatagramRequestHandler,
......@@ -223,12 +228,12 @@ class SocketServerTest(unittest.TestCase):
SocketServer.DatagramRequestHandler,
self.dgram_examine)
if HAVE_FORKING:
def test_ForkingUDPServer(self):
with simple_subprocess(self):
self.run_server(SocketServer.ForkingUDPServer,
SocketServer.DatagramRequestHandler,
self.dgram_examine)
@requires_forking
def test_ForkingUDPServer(self):
with simple_subprocess(self):
self.run_server(SocketServer.ForkingUDPServer,
SocketServer.DatagramRequestHandler,
self.dgram_examine)
@contextlib.contextmanager
def mocked_select_module(self):
......@@ -265,22 +270,24 @@ class SocketServerTest(unittest.TestCase):
# Alas, on Linux (at least) recvfrom() doesn't return a meaningful
# client address so this cannot work:
# if HAVE_UNIX_SOCKETS:
# def test_UnixDatagramServer(self):
# self.run_server(SocketServer.UnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
# @requires_unix_sockets
# def test_UnixDatagramServer(self):
# self.run_server(SocketServer.UnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
#
# def test_ThreadingUnixDatagramServer(self):
# self.run_server(SocketServer.ThreadingUnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
# @requires_unix_sockets
# def test_ThreadingUnixDatagramServer(self):
# self.run_server(SocketServer.ThreadingUnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
#
# if HAVE_FORKING:
# def test_ForkingUnixDatagramServer(self):
# self.run_server(SocketServer.ForkingUnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
# @requires_unix_sockets
# @requires_forking
# def test_ForkingUnixDatagramServer(self):
# self.run_server(SocketServer.ForkingUnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
@reap_threads
def test_shutdown(self):
......
......@@ -266,15 +266,16 @@ class SysModuleTest(unittest.TestCase):
# still has 5 elements
maj, min, buildno, plat, csd = sys.getwindowsversion()
@unittest.skipUnless(hasattr(sys, "setdlopenflags"),
'test needs sys.setdlopenflags()')
def test_dlopenflags(self):
if hasattr(sys, "setdlopenflags"):
self.assertTrue(hasattr(sys, "getdlopenflags"))
self.assertRaises(TypeError, sys.getdlopenflags, 42)
oldflags = sys.getdlopenflags()
self.assertRaises(TypeError, sys.setdlopenflags)
sys.setdlopenflags(oldflags+1)
self.assertEqual(sys.getdlopenflags(), oldflags+1)
sys.setdlopenflags(oldflags)
self.assertTrue(hasattr(sys, "getdlopenflags"))
self.assertRaises(TypeError, sys.getdlopenflags, 42)
oldflags = sys.getdlopenflags()
self.assertRaises(TypeError, sys.setdlopenflags)
sys.setdlopenflags(oldflags+1)
self.assertEqual(sys.getdlopenflags(), oldflags+1)
sys.setdlopenflags(oldflags)
def test_refcount(self):
# n here must be a global in order for this test to pass while
......
......@@ -259,11 +259,10 @@ class WarnTests(unittest.TestCase):
finally:
warning_tests.__file__ = filename
@unittest.skipUnless(hasattr(sys, 'argv'), 'test needs sys.argv')
def test_missing_filename_main_with_argv(self):
# If __file__ is not specified and the caller is __main__ and sys.argv
# exists, then use sys.argv[0] as the file.
if not hasattr(sys, 'argv'):
return
filename = warning_tests.__file__
module_name = warning_tests.__name__
try:
......
......@@ -12,6 +12,13 @@ except ImportError:
zlib = import_module('zlib')
requires_Compress_copy = unittest.skipUnless(
hasattr(zlib.compressobj(), "copy"),
'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
hasattr(zlib.decompressobj(), "copy"),
'requires Decompress.copy()')
class ChecksumTestCase(unittest.TestCase):
# checksum test cases
......@@ -339,39 +346,39 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
"mode=%i, level=%i") % (sync, level))
del obj
@unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
'requires zlib.Z_SYNC_FLUSH')
def test_odd_flush(self):
# Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
import random
# Testing on 17K of "random" data
if hasattr(zlib, 'Z_SYNC_FLUSH'):
# Testing on 17K of "random" data
# Create compressor and decompressor objects
co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
dco = zlib.decompressobj()
# Create compressor and decompressor objects
co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
dco = zlib.decompressobj()
# Try 17K of data
# generate random data stream
# Try 17K of data
# generate random data stream
try:
# In 2.3 and later, WichmannHill is the RNG of the bug report
gen = random.WichmannHill()
except AttributeError:
try:
# In 2.3 and later, WichmannHill is the RNG of the bug report
gen = random.WichmannHill()
# 2.2 called it Random
gen = random.Random()
except AttributeError:
try:
# 2.2 called it Random
gen = random.Random()
except AttributeError:
# others might simply have a single RNG
gen = random
gen.seed(1)
data = genblock(1, 17 * 1024, generator=gen)
# compress, sync-flush, and decompress
first = co.compress(data)
second = co.flush(zlib.Z_SYNC_FLUSH)
expanded = dco.decompress(first + second)
# if decompressed data is different from the input data, choke.
self.assertEqual(expanded, data, "17K random source doesn't match")
# others might simply have a single RNG
gen = random
gen.seed(1)
data = genblock(1, 17 * 1024, generator=gen)
# compress, sync-flush, and decompress
first = co.compress(data)
second = co.flush(zlib.Z_SYNC_FLUSH)
expanded = dco.decompress(first + second)
# if decompressed data is different from the input data, choke.
self.assertEqual(expanded, data, "17K random source doesn't match")
def test_empty_flush(self):
# Test that calling .flush() on unused objects works.
......@@ -408,35 +415,36 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
data = zlib.compress(input2)
self.assertEqual(dco.flush(), input1[1:])
if hasattr(zlib.compressobj(), "copy"):
def test_compresscopy(self):
# Test copying a compression object
data0 = HAMLET_SCENE
data1 = HAMLET_SCENE.swapcase()
c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
bufs0 = []
bufs0.append(c0.compress(data0))
c1 = c0.copy()
bufs1 = bufs0[:]
bufs0.append(c0.compress(data0))
bufs0.append(c0.flush())
s0 = ''.join(bufs0)
bufs1.append(c1.compress(data1))
bufs1.append(c1.flush())
s1 = ''.join(bufs1)
self.assertEqual(zlib.decompress(s0),data0+data0)
self.assertEqual(zlib.decompress(s1),data0+data1)
def test_badcompresscopy(self):
# Test copying a compression object in an inconsistent state
c = zlib.compressobj()
c.compress(HAMLET_SCENE)
c.flush()
self.assertRaises(ValueError, c.copy)
@requires_Compress_copy
def test_compresscopy(self):
# Test copying a compression object
data0 = HAMLET_SCENE
data1 = HAMLET_SCENE.swapcase()
c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
bufs0 = []
bufs0.append(c0.compress(data0))
c1 = c0.copy()
bufs1 = bufs0[:]
bufs0.append(c0.compress(data0))
bufs0.append(c0.flush())
s0 = ''.join(bufs0)
bufs1.append(c1.compress(data1))
bufs1.append(c1.flush())
s1 = ''.join(bufs1)
self.assertEqual(zlib.decompress(s0),data0+data0)
self.assertEqual(zlib.decompress(s1),data0+data1)
@requires_Compress_copy
def test_badcompresscopy(self):
# Test copying a compression object in an inconsistent state
c = zlib.compressobj()
c.compress(HAMLET_SCENE)
c.flush()
self.assertRaises(ValueError, c.copy)
def test_decompress_unused_data(self):
# Repeated calls to decompress() after EOF should accumulate data in
......@@ -463,35 +471,36 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
self.assertEqual(dco.unconsumed_tail, b'')
self.assertEqual(dco.unused_data, remainder)
if hasattr(zlib.decompressobj(), "copy"):
def test_decompresscopy(self):
# Test copying a decompression object
data = HAMLET_SCENE
comp = zlib.compress(data)
@requires_Decompress_copy
def test_decompresscopy(self):
# Test copying a decompression object
data = HAMLET_SCENE
comp = zlib.compress(data)
d0 = zlib.decompressobj()
bufs0 = []
bufs0.append(d0.decompress(comp[:32]))
d0 = zlib.decompressobj()
bufs0 = []
bufs0.append(d0.decompress(comp[:32]))
d1 = d0.copy()
bufs1 = bufs0[:]
d1 = d0.copy()
bufs1 = bufs0[:]
bufs0.append(d0.decompress(comp[32:]))
s0 = ''.join(bufs0)
bufs0.append(d0.decompress(comp[32:]))
s0 = ''.join(bufs0)
bufs1.append(d1.decompress(comp[32:]))
s1 = ''.join(bufs1)
bufs1.append(d1.decompress(comp[32:]))
s1 = ''.join(bufs1)
self.assertEqual(s0,s1)
self.assertEqual(s0,data)
self.assertEqual(s0,s1)
self.assertEqual(s0,data)
def test_baddecompresscopy(self):
# Test copying a compression object in an inconsistent state
data = zlib.compress(HAMLET_SCENE)
d = zlib.decompressobj()
d.decompress(data)
d.flush()
self.assertRaises(ValueError, d.copy)
@requires_Decompress_copy
def test_baddecompresscopy(self):
# Test copying a compression object in an inconsistent state
data = zlib.compress(HAMLET_SCENE)
d = zlib.decompressobj()
d.decompress(data)
d.flush()
self.assertRaises(ValueError, d.copy)
# Memory use of the following functions takes into account overallocation
......
......@@ -23,6 +23,8 @@ Library
Tests
-----
- Issue #18702: All skipped tests now reported as skipped.
- Issue #19085: Added basic tests for all tkinter widget options.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment