Commit a62fab36 authored by Serhiy Storchaka's avatar Serhiy Storchaka

Issue #18702: All skipped tests now reported as skipped.

parent 144dc0bb
...@@ -11,6 +11,7 @@ import operator ...@@ -11,6 +11,7 @@ import operator
import io import io
import math import math
import struct import struct
import sys
import warnings import warnings
import array import array
...@@ -993,15 +994,15 @@ class BaseTest: ...@@ -993,15 +994,15 @@ class BaseTest:
s = None s = None
self.assertRaises(ReferenceError, len, p) self.assertRaises(ReferenceError, len, p)
@unittest.skipUnless(hasattr(sys, 'getrefcount'),
'test needs sys.getrefcount()')
def test_bug_782369(self): def test_bug_782369(self):
import sys for i in range(10):
if hasattr(sys, "getrefcount"): b = array.array('B', range(64))
for i in range(10): rc = sys.getrefcount(10)
b = array.array('B', range(64)) for i in range(10):
rc = sys.getrefcount(10) b = array.array('B', range(64))
for i in range(10): self.assertEqual(rc, sys.getrefcount(10))
b = array.array('B', range(64))
self.assertEqual(rc, sys.getrefcount(10))
def test_subclass_with_kwargs(self): def test_subclass_with_kwargs(self):
# SF bug #1486663 -- this used to erroneously raise a TypeError # SF bug #1486663 -- this used to erroneously raise a TypeError
......
...@@ -39,11 +39,10 @@ class CompileallTests(unittest.TestCase): ...@@ -39,11 +39,10 @@ class CompileallTests(unittest.TestCase):
compare = struct.pack('<4sl', imp.get_magic(), mtime) compare = struct.pack('<4sl', imp.get_magic(), mtime)
return data, compare return data, compare
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def recreation_check(self, metadata): def recreation_check(self, metadata):
"""Check that compileall recreates bytecode when the new metadata is """Check that compileall recreates bytecode when the new metadata is
used.""" used."""
if not hasattr(os, 'stat'):
return
py_compile.compile(self.source_path) py_compile.compile(self.source_path)
self.assertEqual(*self.data()) self.assertEqual(*self.data())
with open(self.bc_path, 'rb') as file: with open(self.bc_path, 'rb') as file:
......
...@@ -896,78 +896,77 @@ Stonecutters Seafood and Chop House+ Lemont+ IL+ 12/19/02+ Week Back ...@@ -896,78 +896,77 @@ Stonecutters Seafood and Chop House+ Lemont+ IL+ 12/19/02+ Week Back
dialect = sniffer.sniff(self.sample9) dialect = sniffer.sniff(self.sample9)
self.assertTrue(dialect.doublequote) self.assertTrue(dialect.doublequote)
if not hasattr(sys, "gettotalrefcount"): class NUL:
if support.verbose: print("*** skipping leakage tests ***") def write(s, *args):
else: pass
class NUL: writelines = write
def write(s, *args):
pass @unittest.skipUnless(hasattr(sys, "gettotalrefcount"),
writelines = write 'requires sys.gettotalrefcount()')
class TestLeaks(unittest.TestCase):
class TestLeaks(unittest.TestCase): def test_create_read(self):
def test_create_read(self): delta = 0
delta = 0 lastrc = sys.gettotalrefcount()
lastrc = sys.gettotalrefcount() for i in range(20):
for i in range(20): gc.collect()
gc.collect() self.assertEqual(gc.garbage, [])
self.assertEqual(gc.garbage, []) rc = sys.gettotalrefcount()
rc = sys.gettotalrefcount() csv.reader(["a,b,c\r\n"])
csv.reader(["a,b,c\r\n"]) csv.reader(["a,b,c\r\n"])
csv.reader(["a,b,c\r\n"]) csv.reader(["a,b,c\r\n"])
csv.reader(["a,b,c\r\n"]) delta = rc-lastrc
delta = rc-lastrc lastrc = rc
lastrc = rc # if csv.reader() leaks, last delta should be 3 or more
# if csv.reader() leaks, last delta should be 3 or more self.assertEqual(delta < 3, True)
self.assertEqual(delta < 3, True)
def test_create_write(self):
def test_create_write(self): delta = 0
delta = 0 lastrc = sys.gettotalrefcount()
lastrc = sys.gettotalrefcount() s = NUL()
s = NUL() for i in range(20):
for i in range(20): gc.collect()
gc.collect() self.assertEqual(gc.garbage, [])
self.assertEqual(gc.garbage, []) rc = sys.gettotalrefcount()
rc = sys.gettotalrefcount() csv.writer(s)
csv.writer(s) csv.writer(s)
csv.writer(s) csv.writer(s)
csv.writer(s) delta = rc-lastrc
delta = rc-lastrc lastrc = rc
lastrc = rc # if csv.writer() leaks, last delta should be 3 or more
# if csv.writer() leaks, last delta should be 3 or more self.assertEqual(delta < 3, True)
self.assertEqual(delta < 3, True)
def test_read(self):
def test_read(self): delta = 0
delta = 0 rows = ["a,b,c\r\n"]*5
rows = ["a,b,c\r\n"]*5 lastrc = sys.gettotalrefcount()
lastrc = sys.gettotalrefcount() for i in range(20):
for i in range(20): gc.collect()
gc.collect() self.assertEqual(gc.garbage, [])
self.assertEqual(gc.garbage, []) rc = sys.gettotalrefcount()
rc = sys.gettotalrefcount() rdr = csv.reader(rows)
rdr = csv.reader(rows) for row in rdr:
for row in rdr: pass
pass delta = rc-lastrc
delta = rc-lastrc lastrc = rc
lastrc = rc # if reader leaks during read, delta should be 5 or more
# if reader leaks during read, delta should be 5 or more self.assertEqual(delta < 5, True)
self.assertEqual(delta < 5, True)
def test_write(self):
def test_write(self): delta = 0
delta = 0 rows = [[1,2,3]]*5
rows = [[1,2,3]]*5 s = NUL()
s = NUL() lastrc = sys.gettotalrefcount()
lastrc = sys.gettotalrefcount() for i in range(20):
for i in range(20): gc.collect()
gc.collect() self.assertEqual(gc.garbage, [])
self.assertEqual(gc.garbage, []) rc = sys.gettotalrefcount()
rc = sys.gettotalrefcount() writer = csv.writer(s)
writer = csv.writer(s) for row in rows:
for row in rows: writer.writerow(row)
writer.writerow(row) delta = rc-lastrc
delta = rc-lastrc lastrc = rc
lastrc = rc # if writer leaks during write, last delta should be 5 or more
# if writer leaks during write, last delta should be 5 or more self.assertEqual(delta < 5, True)
self.assertEqual(delta < 5, True)
class TestUnicode(unittest.TestCase): class TestUnicode(unittest.TestCase):
......
...@@ -37,11 +37,9 @@ class DumbDBMTestCase(unittest.TestCase): ...@@ -37,11 +37,9 @@ class DumbDBMTestCase(unittest.TestCase):
self.read_helper(f) self.read_helper(f)
f.close() f.close()
@unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
@unittest.skipUnless(hasattr(os, 'chmod'), 'test needs os.chmod()')
def test_dumbdbm_creation_mode(self): def test_dumbdbm_creation_mode(self):
# On platforms without chmod, don't do anything.
if not (hasattr(os, 'chmod') and hasattr(os, 'umask')):
return
try: try:
old_umask = os.umask(0o002) old_umask = os.umask(0o002)
f = dumbdbm.open(_fname, 'c', 0o637) f = dumbdbm.open(_fname, 'c', 0o637)
......
...@@ -204,11 +204,10 @@ class TestReversed(unittest.TestCase, PickleTest): ...@@ -204,11 +204,10 @@ class TestReversed(unittest.TestCase, PickleTest):
self.assertRaises(TypeError, reversed) self.assertRaises(TypeError, reversed)
self.assertRaises(TypeError, reversed, [], 'extra') self.assertRaises(TypeError, reversed, [], 'extra')
@unittest.skipUnless(hasattr(sys, 'getrefcount'), 'test needs sys.getrefcount()')
def test_bug1229429(self): def test_bug1229429(self):
# this bug was never in reversed, it was in # this bug was never in reversed, it was in
# PyObject_CallMethod, and reversed_new calls that sometimes. # PyObject_CallMethod, and reversed_new calls that sometimes.
if not hasattr(sys, "getrefcount"):
return
def f(): def f():
pass pass
r = f.__reversed__ = object() r = f.__reversed__ = object()
......
...@@ -16,7 +16,7 @@ try: ...@@ -16,7 +16,7 @@ try:
except ImportError: except ImportError:
ssl = None ssl = None
from unittest import TestCase from unittest import TestCase, skipUnless
from test import support from test import support
from test.support import HOST, HOSTv6 from test.support import HOST, HOSTv6
threading = support.import_module('threading') threading = support.import_module('threading')
...@@ -779,6 +779,7 @@ class TestFTPClass(TestCase): ...@@ -779,6 +779,7 @@ class TestFTPClass(TestCase):
self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f) self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
@skipUnless(support.IPV6_ENABLED, "IPv6 not enabled")
class TestIPv6Environment(TestCase): class TestIPv6Environment(TestCase):
def setUp(self): def setUp(self):
...@@ -819,6 +820,7 @@ class TestIPv6Environment(TestCase): ...@@ -819,6 +820,7 @@ class TestIPv6Environment(TestCase):
retr() retr()
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClassMixin(TestFTPClass): class TestTLS_FTPClassMixin(TestFTPClass):
"""Repeat TestFTPClass tests starting the TLS layer for both control """Repeat TestFTPClass tests starting the TLS layer for both control
and data connections first. and data connections first.
...@@ -834,6 +836,7 @@ class TestTLS_FTPClassMixin(TestFTPClass): ...@@ -834,6 +836,7 @@ class TestTLS_FTPClassMixin(TestFTPClass):
self.client.prot_p() self.client.prot_p()
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClass(TestCase): class TestTLS_FTPClass(TestCase):
"""Specific TLS_FTP class tests.""" """Specific TLS_FTP class tests."""
...@@ -1015,12 +1018,9 @@ class TestTimeouts(TestCase): ...@@ -1015,12 +1018,9 @@ class TestTimeouts(TestCase):
def test_main(): def test_main():
tests = [TestFTPClass, TestTimeouts] tests = [TestFTPClass, TestTimeouts,
if support.IPV6_ENABLED: TestIPv6Environment,
tests.append(TestIPv6Environment) TestTLS_FTPClassMixin, TestTLS_FTPClass]
if ssl is not None:
tests.extend([TestTLS_FTPClassMixin, TestTLS_FTPClass])
thread_info = support.threading_setup() thread_info = support.threading_setup()
try: try:
......
...@@ -868,10 +868,10 @@ class TestMaildir(TestMailbox, unittest.TestCase): ...@@ -868,10 +868,10 @@ class TestMaildir(TestMailbox, unittest.TestCase):
for msg in self._box: for msg in self._box:
pass pass
@unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def test_file_permissions(self): def test_file_permissions(self):
# Verify that message files are created without execute permissions # Verify that message files are created without execute permissions
if not hasattr(os, "stat") or not hasattr(os, "umask"):
return
msg = mailbox.MaildirMessage(self._template % 0) msg = mailbox.MaildirMessage(self._template % 0)
orig_umask = os.umask(0) orig_umask = os.umask(0)
try: try:
...@@ -882,12 +882,11 @@ class TestMaildir(TestMailbox, unittest.TestCase): ...@@ -882,12 +882,11 @@ class TestMaildir(TestMailbox, unittest.TestCase):
mode = os.stat(path).st_mode mode = os.stat(path).st_mode
self.assertFalse(mode & 0o111) self.assertFalse(mode & 0o111)
@unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def test_folder_file_perms(self): def test_folder_file_perms(self):
# From bug #3228, we want to verify that the file created inside a Maildir # From bug #3228, we want to verify that the file created inside a Maildir
# subfolder isn't marked as executable. # subfolder isn't marked as executable.
if not hasattr(os, "stat") or not hasattr(os, "umask"):
return
orig_umask = os.umask(0) orig_umask = os.umask(0)
try: try:
subfolder = self._box.add_folder('subfolder') subfolder = self._box.add_folder('subfolder')
...@@ -1097,24 +1096,25 @@ class TestMbox(_TestMboxMMDF, unittest.TestCase): ...@@ -1097,24 +1096,25 @@ class TestMbox(_TestMboxMMDF, unittest.TestCase):
_factory = lambda self, path, factory=None: mailbox.mbox(path, factory) _factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
@unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def test_file_perms(self): def test_file_perms(self):
# From bug #3228, we want to verify that the mailbox file isn't executable, # From bug #3228, we want to verify that the mailbox file isn't executable,
# even if the umask is set to something that would leave executable bits set. # even if the umask is set to something that would leave executable bits set.
# We only run this test on platforms that support umask. # We only run this test on platforms that support umask.
if hasattr(os, 'umask') and hasattr(os, 'stat'): try:
try: old_umask = os.umask(0o077)
old_umask = os.umask(0o077) self._box.close()
self._box.close() os.unlink(self._path)
os.unlink(self._path) self._box = mailbox.mbox(self._path, create=True)
self._box = mailbox.mbox(self._path, create=True) self._box.add('')
self._box.add('') self._box.close()
self._box.close() finally:
finally: os.umask(old_umask)
os.umask(old_umask)
st = os.stat(self._path) st = os.stat(self._path)
perms = st.st_mode perms = st.st_mode
self.assertFalse((perms & 0o111)) # Execute bits should all be off. self.assertFalse((perms & 0o111)) # Execute bits should all be off.
def test_terminating_newline(self): def test_terminating_newline(self):
message = email.message.Message() message = email.message.Message()
......
...@@ -980,38 +980,37 @@ class MathTests(unittest.TestCase): ...@@ -980,38 +980,37 @@ class MathTests(unittest.TestCase):
# still fails this part of the test on some platforms. For now, we only # still fails this part of the test on some platforms. For now, we only
# *run* test_exceptions() in verbose mode, so that this isn't normally # *run* test_exceptions() in verbose mode, so that this isn't normally
# tested. # tested.
@unittest.skipUnless(verbose, 'requires verbose mode')
def test_exceptions(self):
try:
x = math.exp(-1000000000)
except:
# mathmodule.c is failing to weed out underflows from libm, or
# we've got an fp format with huge dynamic range
self.fail("underflowing exp() should not have raised "
"an exception")
if x != 0:
self.fail("underflowing exp() should have returned 0")
# If this fails, probably using a strict IEEE-754 conforming libm, and x
# is +Inf afterwards. But Python wants overflows detected by default.
try:
x = math.exp(1000000000)
except OverflowError:
pass
else:
self.fail("overflowing exp() didn't trigger OverflowError")
if verbose: # If this fails, it could be a puzzle. One odd possibility is that
def test_exceptions(self): # mathmodule.c's macros are getting confused while comparing
try: # Inf (HUGE_VAL) to a NaN, and artificially setting errno to ERANGE
x = math.exp(-1000000000) # as a result (and so raising OverflowError instead).
except: try:
# mathmodule.c is failing to weed out underflows from libm, or x = math.sqrt(-1.0)
# we've got an fp format with huge dynamic range except ValueError:
self.fail("underflowing exp() should not have raised " pass
"an exception") else:
if x != 0: self.fail("sqrt(-1) didn't raise ValueError")
self.fail("underflowing exp() should have returned 0")
# If this fails, probably using a strict IEEE-754 conforming libm, and x
# is +Inf afterwards. But Python wants overflows detected by default.
try:
x = math.exp(1000000000)
except OverflowError:
pass
else:
self.fail("overflowing exp() didn't trigger OverflowError")
# If this fails, it could be a puzzle. One odd possibility is that
# mathmodule.c's macros are getting confused while comparing
# Inf (HUGE_VAL) to a NaN, and artificially setting errno to ERANGE
# as a result (and so raising OverflowError instead).
try:
x = math.sqrt(-1.0)
except ValueError:
pass
else:
self.fail("sqrt(-1) didn't raise ValueError")
@requires_IEEE_754 @requires_IEEE_754
def test_testfile(self): def test_testfile(self):
......
...@@ -314,26 +314,25 @@ class MmapTests(unittest.TestCase): ...@@ -314,26 +314,25 @@ class MmapTests(unittest.TestCase):
mf.close() mf.close()
f.close() f.close()
@unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()")
def test_entire_file(self): def test_entire_file(self):
# test mapping of entire file by passing 0 for map length # test mapping of entire file by passing 0 for map length
if hasattr(os, "stat"): f = open(TESTFN, "wb+")
f = open(TESTFN, "wb+")
f.write(2**16 * b'm') # Arbitrary character f.write(2**16 * b'm') # Arbitrary character
f.close() f.close()
f = open(TESTFN, "rb+") f = open(TESTFN, "rb+")
mf = mmap.mmap(f.fileno(), 0) mf = mmap.mmap(f.fileno(), 0)
self.assertEqual(len(mf), 2**16, "Map size should equal file size.") self.assertEqual(len(mf), 2**16, "Map size should equal file size.")
self.assertEqual(mf.read(2**16), 2**16 * b"m") self.assertEqual(mf.read(2**16), 2**16 * b"m")
mf.close() mf.close()
f.close() f.close()
@unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()")
def test_length_0_offset(self): def test_length_0_offset(self):
# Issue #10916: test mapping of remainder of file by passing 0 for # Issue #10916: test mapping of remainder of file by passing 0 for
# map length with an offset doesn't cause a segfault. # map length with an offset doesn't cause a segfault.
if not hasattr(os, "stat"):
self.skipTest("needs os.stat")
# NOTE: allocation granularity is currently 65536 under Win64, # NOTE: allocation granularity is currently 65536 under Win64,
# and therefore the minimum offset alignment. # and therefore the minimum offset alignment.
with open(TESTFN, "wb") as f: with open(TESTFN, "wb") as f:
...@@ -343,12 +342,10 @@ class MmapTests(unittest.TestCase): ...@@ -343,12 +342,10 @@ class MmapTests(unittest.TestCase):
with mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ) as mf: with mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ) as mf:
self.assertRaises(IndexError, mf.__getitem__, 80000) self.assertRaises(IndexError, mf.__getitem__, 80000)
@unittest.skipUnless(hasattr(os, "stat"), "needs os.stat()")
def test_length_0_large_offset(self): def test_length_0_large_offset(self):
# Issue #10959: test mapping of a file by passing 0 for # Issue #10959: test mapping of a file by passing 0 for
# map length with a large offset doesn't cause a segfault. # map length with a large offset doesn't cause a segfault.
if not hasattr(os, "stat"):
self.skipTest("needs os.stat")
with open(TESTFN, "wb") as f: with open(TESTFN, "wb") as f:
f.write(115699 * b'm') # Arbitrary character f.write(115699 * b'm') # Arbitrary character
...@@ -560,9 +557,8 @@ class MmapTests(unittest.TestCase): ...@@ -560,9 +557,8 @@ class MmapTests(unittest.TestCase):
return mmap.mmap.__new__(klass, -1, *args, **kwargs) return mmap.mmap.__new__(klass, -1, *args, **kwargs)
anon_mmap(PAGESIZE) anon_mmap(PAGESIZE)
@unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ")
def test_prot_readonly(self): def test_prot_readonly(self):
if not hasattr(mmap, 'PROT_READ'):
return
mapsize = 10 mapsize = 10
with open(TESTFN, "wb") as fp: with open(TESTFN, "wb") as fp:
fp.write(b"a"*mapsize) fp.write(b"a"*mapsize)
...@@ -616,67 +612,69 @@ class MmapTests(unittest.TestCase): ...@@ -616,67 +612,69 @@ class MmapTests(unittest.TestCase):
self.assertEqual(m.read_byte(), b) self.assertEqual(m.read_byte(), b)
m.close() m.close()
if os.name == 'nt': @unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_tagname(self): def test_tagname(self):
data1 = b"0123456789" data1 = b"0123456789"
data2 = b"abcdefghij" data2 = b"abcdefghij"
assert len(data1) == len(data2) assert len(data1) == len(data2)
# Test same tag # Test same tag
m1 = mmap.mmap(-1, len(data1), tagname="foo") m1 = mmap.mmap(-1, len(data1), tagname="foo")
m1[:] = data1 m1[:] = data1
m2 = mmap.mmap(-1, len(data2), tagname="foo") m2 = mmap.mmap(-1, len(data2), tagname="foo")
m2[:] = data2 m2[:] = data2
self.assertEqual(m1[:], data2) self.assertEqual(m1[:], data2)
self.assertEqual(m2[:], data2) self.assertEqual(m2[:], data2)
m2.close() m2.close()
m1.close() m1.close()
# Test different tag # Test different tag
m1 = mmap.mmap(-1, len(data1), tagname="foo") m1 = mmap.mmap(-1, len(data1), tagname="foo")
m1[:] = data1 m1[:] = data1
m2 = mmap.mmap(-1, len(data2), tagname="boo") m2 = mmap.mmap(-1, len(data2), tagname="boo")
m2[:] = data2 m2[:] = data2
self.assertEqual(m1[:], data1) self.assertEqual(m1[:], data1)
self.assertEqual(m2[:], data2) self.assertEqual(m2[:], data2)
m2.close() m2.close()
m1.close() m1.close()
def test_crasher_on_windows(self): @unittest.skipUnless(os.name == 'nt', 'requires Windows')
# Should not crash (Issue 1733986) def test_crasher_on_windows(self):
m = mmap.mmap(-1, 1000, tagname="foo") # Should not crash (Issue 1733986)
try: m = mmap.mmap(-1, 1000, tagname="foo")
mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size try:
except: mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size
pass except:
m.close() pass
m.close()
# Should not crash (Issue 5385) # Should not crash (Issue 5385)
with open(TESTFN, "wb") as fp: with open(TESTFN, "wb") as fp:
fp.write(b"x"*10) fp.write(b"x"*10)
f = open(TESTFN, "r+b") f = open(TESTFN, "r+b")
m = mmap.mmap(f.fileno(), 0) m = mmap.mmap(f.fileno(), 0)
f.close() f.close()
try: try:
m.resize(0) # will raise WindowsError m.resize(0) # will raise WindowsError
except: except:
pass pass
try: try:
m[:] m[:]
except: except:
pass pass
m.close() m.close()
def test_invalid_descriptor(self): @unittest.skipUnless(os.name == 'nt', 'requires Windows')
# socket file descriptors are valid, but out of range def test_invalid_descriptor(self):
# for _get_osfhandle, causing a crash when validating the # socket file descriptors are valid, but out of range
# parameters to _get_osfhandle. # for _get_osfhandle, causing a crash when validating the
s = socket.socket() # parameters to _get_osfhandle.
try: s = socket.socket()
with self.assertRaises(mmap.error): try:
m = mmap.mmap(s.fileno(), 10) with self.assertRaises(mmap.error):
finally: m = mmap.mmap(s.fileno(), 10)
s.close() finally:
s.close()
def test_context_manager(self): def test_context_manager(self):
with mmap.mmap(-1, 10) as m: with mmap.mmap(-1, 10) as m:
......
...@@ -6,10 +6,12 @@ import unittest ...@@ -6,10 +6,12 @@ import unittest
import functools import functools
import contextlib import contextlib
from test import support from test import support
from nntplib import NNTP, GroupInfo, _have_ssl from nntplib import NNTP, GroupInfo
import nntplib import nntplib
if _have_ssl: try:
import ssl import ssl
except ImportError:
ssl = None
TIMEOUT = 30 TIMEOUT = 30
...@@ -199,23 +201,23 @@ class NetworkedNNTPTestsMixin: ...@@ -199,23 +201,23 @@ class NetworkedNNTPTestsMixin:
resp, caps = self.server.capabilities() resp, caps = self.server.capabilities()
_check_caps(caps) _check_caps(caps)
if _have_ssl: @unittest.skipUnless(ssl, 'requires SSL support')
def test_starttls(self): def test_starttls(self):
file = self.server.file file = self.server.file
sock = self.server.sock sock = self.server.sock
try: try:
self.server.starttls() self.server.starttls()
except nntplib.NNTPPermanentError: except nntplib.NNTPPermanentError:
self.skipTest("STARTTLS not supported by server.") self.skipTest("STARTTLS not supported by server.")
else: else:
# Check that the socket and internal pseudo-file really were # Check that the socket and internal pseudo-file really were
# changed. # changed.
self.assertNotEqual(file, self.server.file) self.assertNotEqual(file, self.server.file)
self.assertNotEqual(sock, self.server.sock) self.assertNotEqual(sock, self.server.sock)
# Check that the new socket really is an SSL one # Check that the new socket really is an SSL one
self.assertIsInstance(self.server.sock, ssl.SSLSocket) self.assertIsInstance(self.server.sock, ssl.SSLSocket)
# Check that trying starttls when it's already active fails. # Check that trying starttls when it's already active fails.
self.assertRaises(ValueError, self.server.starttls) self.assertRaises(ValueError, self.server.starttls)
def test_zlogin(self): def test_zlogin(self):
# This test must be the penultimate because further commands will be # This test must be the penultimate because further commands will be
...@@ -300,25 +302,24 @@ class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase): ...@@ -300,25 +302,24 @@ class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
if cls.server is not None: if cls.server is not None:
cls.server.quit() cls.server.quit()
@unittest.skipUnless(ssl, 'requires SSL support')
class NetworkedNNTP_SSLTests(NetworkedNNTPTests):
if _have_ssl: # Technical limits for this public NNTP server (see http://www.aioe.org):
class NetworkedNNTP_SSLTests(NetworkedNNTPTests): # "Only two concurrent connections per IP address are allowed and
# 400 connections per day are accepted from each IP address."
# Technical limits for this public NNTP server (see http://www.aioe.org):
# "Only two concurrent connections per IP address are allowed and
# 400 connections per day are accepted from each IP address."
NNTP_HOST = 'nntp.aioe.org' NNTP_HOST = 'nntp.aioe.org'
GROUP_NAME = 'comp.lang.python' GROUP_NAME = 'comp.lang.python'
GROUP_PAT = 'comp.lang.*' GROUP_PAT = 'comp.lang.*'
NNTP_CLASS = nntplib.NNTP_SSL NNTP_CLASS = getattr(nntplib, 'NNTP_SSL', None)
# Disabled as it produces too much data # Disabled as it produces too much data
test_list = None test_list = None
# Disabled as the connection will already be encrypted. # Disabled as the connection will already be encrypted.
test_starttls = None test_starttls = None
# #
...@@ -1407,12 +1408,13 @@ class MiscTests(unittest.TestCase): ...@@ -1407,12 +1408,13 @@ class MiscTests(unittest.TestCase):
gives(2000, 6, 23, "000623", "000000") gives(2000, 6, 23, "000623", "000000")
gives(2010, 6, 5, "100605", "000000") gives(2010, 6, 5, "100605", "000000")
@unittest.skipUnless(ssl, 'requires SSL support')
def test_ssl_support(self):
self.assertTrue(hasattr(nntplib, 'NNTP_SSL'))
def test_main(): def test_main():
tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, CapsAfterLoginNNTPv2Tests, tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, CapsAfterLoginNNTPv2Tests,
SendReaderNNTPv2Tests, NetworkedNNTPTests] SendReaderNNTPv2Tests, NetworkedNNTPTests, NetworkedNNTP_SSLTests]
if _have_ssl:
tests.append(NetworkedNNTP_SSLTests)
support.run_unittest(*tests) support.run_unittest(*tests)
......
This diff is collapsed.
...@@ -11,7 +11,7 @@ import os ...@@ -11,7 +11,7 @@ import os
import time import time
import errno import errno
from unittest import TestCase from unittest import TestCase, skipUnless
from test import support as test_support from test import support as test_support
threading = test_support.import_module('threading') threading = test_support.import_module('threading')
...@@ -288,35 +288,37 @@ if hasattr(poplib, 'POP3_SSL'): ...@@ -288,35 +288,37 @@ if hasattr(poplib, 'POP3_SSL'):
else: else:
DummyPOP3Handler.handle_read(self) DummyPOP3Handler.handle_read(self)
requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported')
class TestPOP3_SSLClass(TestPOP3Class): @requires_ssl
# repeat previous tests by using poplib.POP3_SSL class TestPOP3_SSLClass(TestPOP3Class):
# repeat previous tests by using poplib.POP3_SSL
def setUp(self): def setUp(self):
self.server = DummyPOP3Server((HOST, PORT)) self.server = DummyPOP3Server((HOST, PORT))
self.server.handler = DummyPOP3_SSLHandler self.server.handler = DummyPOP3_SSLHandler
self.server.start() self.server.start()
self.client = poplib.POP3_SSL(self.server.host, self.server.port) self.client = poplib.POP3_SSL(self.server.host, self.server.port)
def test__all__(self): def test__all__(self):
self.assertIn('POP3_SSL', poplib.__all__) self.assertIn('POP3_SSL', poplib.__all__)
def test_context(self): def test_context(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
self.server.port, keyfile=CERTFILE, context=ctx) self.server.port, keyfile=CERTFILE, context=ctx)
self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
self.server.port, certfile=CERTFILE, context=ctx) self.server.port, certfile=CERTFILE, context=ctx)
self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
self.server.port, keyfile=CERTFILE, self.server.port, keyfile=CERTFILE,
certfile=CERTFILE, context=ctx) certfile=CERTFILE, context=ctx)
self.client.quit() self.client.quit()
self.client = poplib.POP3_SSL(self.server.host, self.server.port, self.client = poplib.POP3_SSL(self.server.host, self.server.port,
context=ctx) context=ctx)
self.assertIsInstance(self.client.sock, ssl.SSLSocket) self.assertIsInstance(self.client.sock, ssl.SSLSocket)
self.assertIs(self.client.sock.context, ctx) self.assertIs(self.client.sock.context, ctx)
self.assertTrue(self.client.noop().startswith(b'+OK')) self.assertTrue(self.client.noop().startswith(b'+OK'))
class TestTimeouts(TestCase): class TestTimeouts(TestCase):
...@@ -374,9 +376,8 @@ class TestTimeouts(TestCase): ...@@ -374,9 +376,8 @@ class TestTimeouts(TestCase):
def test_main(): def test_main():
tests = [TestPOP3Class, TestTimeouts] tests = [TestPOP3Class, TestTimeouts,
if SUPPORTS_SSL: TestPOP3_SSLClass]
tests.append(TestPOP3_SSLClass)
thread_info = test_support.threading_setup() thread_info = test_support.threading_setup()
try: try:
test_support.run_unittest(*tests) test_support.run_unittest(*tests)
......
This diff is collapsed.
...@@ -625,10 +625,10 @@ class TestSet(TestJointOps, unittest.TestCase): ...@@ -625,10 +625,10 @@ class TestSet(TestJointOps, unittest.TestCase):
myset >= myobj myset >= myobj
self.assertTrue(myobj.le_called) self.assertTrue(myobj.le_called)
# C API test only available in a debug build @unittest.skipUnless(hasattr(set, "test_c_api"),
if hasattr(set, "test_c_api"): 'C API test only available in a debug build')
def test_c_api(self): def test_c_api(self):
self.assertEqual(set().test_c_api(), True) self.assertEqual(set().test_c_api(), True)
class SetSubclass(set): class SetSubclass(set):
pass pass
......
...@@ -194,37 +194,37 @@ class TestShutil(unittest.TestCase): ...@@ -194,37 +194,37 @@ class TestShutil(unittest.TestCase):
self.assertIn(errors[1][2][1].filename, possible_args) self.assertIn(errors[1][2][1].filename, possible_args)
# See bug #1071513 for why we don't run this on cygwin @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()')
# and bug #1076467 for why we don't run this as root. @unittest.skipIf(sys.platform[:6] == 'cygwin',
if (hasattr(os, 'chmod') and sys.platform[:6] != 'cygwin' "This test can't be run on Cygwin (issue #1071513).")
and not (hasattr(os, 'geteuid') and os.geteuid() == 0)): @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
def test_on_error(self): "This test can't be run reliably as root (issue #1076467).")
self.errorState = 0 def test_on_error(self):
os.mkdir(TESTFN) self.errorState = 0
self.addCleanup(shutil.rmtree, TESTFN) os.mkdir(TESTFN)
self.addCleanup(shutil.rmtree, TESTFN)
self.child_file_path = os.path.join(TESTFN, 'a')
self.child_dir_path = os.path.join(TESTFN, 'b') self.child_file_path = os.path.join(TESTFN, 'a')
support.create_empty_file(self.child_file_path) self.child_dir_path = os.path.join(TESTFN, 'b')
os.mkdir(self.child_dir_path) support.create_empty_file(self.child_file_path)
old_dir_mode = os.stat(TESTFN).st_mode os.mkdir(self.child_dir_path)
old_child_file_mode = os.stat(self.child_file_path).st_mode old_dir_mode = os.stat(TESTFN).st_mode
old_child_dir_mode = os.stat(self.child_dir_path).st_mode old_child_file_mode = os.stat(self.child_file_path).st_mode
# Make unwritable. old_child_dir_mode = os.stat(self.child_dir_path).st_mode
new_mode = stat.S_IREAD|stat.S_IEXEC # Make unwritable.
os.chmod(self.child_file_path, new_mode) new_mode = stat.S_IREAD|stat.S_IEXEC
os.chmod(self.child_dir_path, new_mode) os.chmod(self.child_file_path, new_mode)
os.chmod(TESTFN, new_mode) os.chmod(self.child_dir_path, new_mode)
os.chmod(TESTFN, new_mode)
self.addCleanup(os.chmod, TESTFN, old_dir_mode)
self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode) self.addCleanup(os.chmod, TESTFN, old_dir_mode)
self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode) self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode)
self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode)
shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
# Test whether onerror has actually been called. shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
self.assertEqual(self.errorState, 3, # Test whether onerror has actually been called.
"Expected call to onerror function did not " self.assertEqual(self.errorState, 3,
"happen.") "Expected call to onerror function did not happen.")
def check_args_to_onerror(self, func, arg, exc): def check_args_to_onerror(self, func, arg, exc):
# test_rmtree_errors deliberately runs rmtree # test_rmtree_errors deliberately runs rmtree
...@@ -806,38 +806,39 @@ class TestShutil(unittest.TestCase): ...@@ -806,38 +806,39 @@ class TestShutil(unittest.TestCase):
finally: finally:
shutil.rmtree(TESTFN, ignore_errors=True) shutil.rmtree(TESTFN, ignore_errors=True)
if hasattr(os, "mkfifo"): # Issue #3002: copyfile and copytree block indefinitely on named pipes
# Issue #3002: copyfile and copytree block indefinitely on named pipes @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
def test_copyfile_named_pipe(self): def test_copyfile_named_pipe(self):
os.mkfifo(TESTFN) os.mkfifo(TESTFN)
try: try:
self.assertRaises(shutil.SpecialFileError, self.assertRaises(shutil.SpecialFileError,
shutil.copyfile, TESTFN, TESTFN2) shutil.copyfile, TESTFN, TESTFN2)
self.assertRaises(shutil.SpecialFileError, self.assertRaises(shutil.SpecialFileError,
shutil.copyfile, __file__, TESTFN) shutil.copyfile, __file__, TESTFN)
finally: finally:
os.remove(TESTFN) os.remove(TESTFN)
@support.skip_unless_symlink @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
def test_copytree_named_pipe(self): @support.skip_unless_symlink
os.mkdir(TESTFN) def test_copytree_named_pipe(self):
os.mkdir(TESTFN)
try:
subdir = os.path.join(TESTFN, "subdir")
os.mkdir(subdir)
pipe = os.path.join(subdir, "mypipe")
os.mkfifo(pipe)
try: try:
subdir = os.path.join(TESTFN, "subdir") shutil.copytree(TESTFN, TESTFN2)
os.mkdir(subdir) except shutil.Error as e:
pipe = os.path.join(subdir, "mypipe") errors = e.args[0]
os.mkfifo(pipe) self.assertEqual(len(errors), 1)
try: src, dst, error_msg = errors[0]
shutil.copytree(TESTFN, TESTFN2) self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
except shutil.Error as e: else:
errors = e.args[0] self.fail("shutil.Error should have been raised")
self.assertEqual(len(errors), 1) finally:
src, dst, error_msg = errors[0] shutil.rmtree(TESTFN, ignore_errors=True)
self.assertEqual("`%s` is a named pipe" % pipe, error_msg) shutil.rmtree(TESTFN2, ignore_errors=True)
else:
self.fail("shutil.Error should have been raised")
finally:
shutil.rmtree(TESTFN, ignore_errors=True)
shutil.rmtree(TESTFN2, ignore_errors=True)
def test_copytree_special_func(self): def test_copytree_special_func(self):
......
...@@ -772,16 +772,17 @@ class GeneralModuleTests(unittest.TestCase): ...@@ -772,16 +772,17 @@ class GeneralModuleTests(unittest.TestCase):
self.assertRaises(TypeError, socket.if_nametoindex, 0) self.assertRaises(TypeError, socket.if_nametoindex, 0)
self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF') self.assertRaises(TypeError, socket.if_indextoname, '_DEADBEEF')
@unittest.skipUnless(hasattr(sys, 'getrefcount'),
'test needs sys.getrefcount()')
def testRefCountGetNameInfo(self): def testRefCountGetNameInfo(self):
# Testing reference count for getnameinfo # Testing reference count for getnameinfo
if hasattr(sys, "getrefcount"): try:
try: # On some versions, this loses a reference
# On some versions, this loses a reference orig = sys.getrefcount(__name__)
orig = sys.getrefcount(__name__) socket.getnameinfo(__name__,0)
socket.getnameinfo(__name__,0) except TypeError:
except TypeError: if sys.getrefcount(__name__) != orig:
if sys.getrefcount(__name__) != orig: self.fail("socket.getnameinfo loses a reference")
self.fail("socket.getnameinfo loses a reference")
def testInterpreterCrash(self): def testInterpreterCrash(self):
# Making sure getnameinfo doesn't crash the interpreter # Making sure getnameinfo doesn't crash the interpreter
...@@ -886,17 +887,17 @@ class GeneralModuleTests(unittest.TestCase): ...@@ -886,17 +887,17 @@ class GeneralModuleTests(unittest.TestCase):
# Check that setting it to an invalid type raises TypeError # Check that setting it to an invalid type raises TypeError
self.assertRaises(TypeError, socket.setdefaulttimeout, "spam") self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
@unittest.skipUnless(hasattr(socket, 'inet_aton'),
'test needs socket.inet_aton()')
def testIPv4_inet_aton_fourbytes(self): def testIPv4_inet_aton_fourbytes(self):
if not hasattr(socket, 'inet_aton'):
return # No inet_aton, nothing to check
# Test that issue1008086 and issue767150 are fixed. # Test that issue1008086 and issue767150 are fixed.
# It must return 4 bytes. # It must return 4 bytes.
self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0')) self.assertEqual(b'\x00'*4, socket.inet_aton('0.0.0.0'))
self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255')) self.assertEqual(b'\xff'*4, socket.inet_aton('255.255.255.255'))
@unittest.skipUnless(hasattr(socket, 'inet_pton'),
'test needs socket.inet_pton()')
def testIPv4toString(self): def testIPv4toString(self):
if not hasattr(socket, 'inet_pton'):
return # No inet_pton() on this platform
from socket import inet_aton as f, inet_pton, AF_INET from socket import inet_aton as f, inet_pton, AF_INET
g = lambda a: inet_pton(AF_INET, a) g = lambda a: inet_pton(AF_INET, a)
...@@ -925,9 +926,9 @@ class GeneralModuleTests(unittest.TestCase): ...@@ -925,9 +926,9 @@ class GeneralModuleTests(unittest.TestCase):
assertInvalid(g, '1.2.3.4.5') assertInvalid(g, '1.2.3.4.5')
assertInvalid(g, '::1') assertInvalid(g, '::1')
@unittest.skipUnless(hasattr(socket, 'inet_pton'),
'test needs socket.inet_pton()')
def testIPv6toString(self): def testIPv6toString(self):
if not hasattr(socket, 'inet_pton'):
return # No inet_pton() on this platform
try: try:
from socket import inet_pton, AF_INET6, has_ipv6 from socket import inet_pton, AF_INET6, has_ipv6
if not has_ipv6: if not has_ipv6:
...@@ -979,9 +980,9 @@ class GeneralModuleTests(unittest.TestCase): ...@@ -979,9 +980,9 @@ class GeneralModuleTests(unittest.TestCase):
assertInvalid('::1.2.3.4:0') assertInvalid('::1.2.3.4:0')
assertInvalid('0.100.200.0:3:4:5:6:7:8') assertInvalid('0.100.200.0:3:4:5:6:7:8')
@unittest.skipUnless(hasattr(socket, 'inet_ntop'),
'test needs socket.inet_ntop()')
def testStringToIPv4(self): def testStringToIPv4(self):
if not hasattr(socket, 'inet_ntop'):
return # No inet_ntop() on this platform
from socket import inet_ntoa as f, inet_ntop, AF_INET from socket import inet_ntoa as f, inet_ntop, AF_INET
g = lambda a: inet_ntop(AF_INET, a) g = lambda a: inet_ntop(AF_INET, a)
assertInvalid = lambda func,a: self.assertRaises( assertInvalid = lambda func,a: self.assertRaises(
...@@ -1003,9 +1004,9 @@ class GeneralModuleTests(unittest.TestCase): ...@@ -1003,9 +1004,9 @@ class GeneralModuleTests(unittest.TestCase):
assertInvalid(g, b'\x00' * 5) assertInvalid(g, b'\x00' * 5)
assertInvalid(g, b'\x00' * 16) assertInvalid(g, b'\x00' * 16)
@unittest.skipUnless(hasattr(socket, 'inet_ntop'),
'test needs socket.inet_ntop()')
def testStringToIPv6(self): def testStringToIPv6(self):
if not hasattr(socket, 'inet_ntop'):
return # No inet_ntop() on this platform
try: try:
from socket import inet_ntop, AF_INET6, has_ipv6 from socket import inet_ntop, AF_INET6, has_ipv6
if not has_ipv6: if not has_ipv6:
...@@ -3531,6 +3532,8 @@ class TCPCloserTest(ThreadedTCPSocketTest): ...@@ -3531,6 +3532,8 @@ class TCPCloserTest(ThreadedTCPSocketTest):
self.cli.connect((HOST, self.port)) self.cli.connect((HOST, self.port))
time.sleep(1.0) time.sleep(1.0)
@unittest.skipUnless(hasattr(socket, 'socketpair'),
'test needs socket.socketpair()')
@unittest.skipUnless(thread, 'Threading required for this test.') @unittest.skipUnless(thread, 'Threading required for this test.')
class BasicSocketPairTest(SocketPairTest): class BasicSocketPairTest(SocketPairTest):
...@@ -3593,26 +3596,27 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest): ...@@ -3593,26 +3596,27 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
def _testSetBlocking(self): def _testSetBlocking(self):
pass pass
if hasattr(socket, "SOCK_NONBLOCK"): @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
@support.requires_linux_version(2, 6, 28) 'test needs socket.SOCK_NONBLOCK')
def testInitNonBlocking(self): @support.requires_linux_version(2, 6, 28)
# reinit server socket def testInitNonBlocking(self):
self.serv.close() # reinit server socket
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM | self.serv.close()
socket.SOCK_NONBLOCK) self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM |
self.port = support.bind_port(self.serv) socket.SOCK_NONBLOCK)
self.serv.listen(1) self.port = support.bind_port(self.serv)
# actual testing self.serv.listen(1)
start = time.time() # actual testing
try: start = time.time()
self.serv.accept() try:
except socket.error: self.serv.accept()
pass except socket.error:
end = time.time()
self.assertTrue((end - start) < 1.0, "Error creating with non-blocking mode.")
def _testInitNonBlocking(self):
pass pass
end = time.time()
self.assertTrue((end - start) < 1.0, "Error creating with non-blocking mode.")
def _testInitNonBlocking(self):
pass
def testInheritFlags(self): def testInheritFlags(self):
# Issue #7995: when calling accept() on a listening socket with a # Issue #7995: when calling accept() on a listening socket with a
...@@ -4302,12 +4306,12 @@ class TCPTimeoutTest(SocketTCPTest): ...@@ -4302,12 +4306,12 @@ class TCPTimeoutTest(SocketTCPTest):
if not ok: if not ok:
self.fail("accept() returned success when we did not expect it") self.fail("accept() returned success when we did not expect it")
@unittest.skipUnless(hasattr(signal, 'alarm'),
'test needs signal.alarm()')
def testInterruptedTimeout(self): def testInterruptedTimeout(self):
# XXX I don't know how to do this test on MSWindows or any other # XXX I don't know how to do this test on MSWindows or any other
# plaform that doesn't support signal.alarm() or os.kill(), though # plaform that doesn't support signal.alarm() or os.kill(), though
# the bug should have existed on all platforms. # the bug should have existed on all platforms.
if not hasattr(signal, "alarm"):
return # can only test on *nix
self.serv.settimeout(5.0) # must be longer than alarm self.serv.settimeout(5.0) # must be longer than alarm
class Alarm(Exception): class Alarm(Exception):
pass pass
...@@ -4367,6 +4371,7 @@ class TestExceptions(unittest.TestCase): ...@@ -4367,6 +4371,7 @@ class TestExceptions(unittest.TestCase):
self.assertTrue(issubclass(socket.gaierror, socket.error)) self.assertTrue(issubclass(socket.gaierror, socket.error))
self.assertTrue(issubclass(socket.timeout, socket.error)) self.assertTrue(issubclass(socket.timeout, socket.error))
@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
class TestLinuxAbstractNamespace(unittest.TestCase): class TestLinuxAbstractNamespace(unittest.TestCase):
UNIX_PATH_MAX = 108 UNIX_PATH_MAX = 108
...@@ -4402,6 +4407,7 @@ class TestLinuxAbstractNamespace(unittest.TestCase): ...@@ -4402,6 +4407,7 @@ class TestLinuxAbstractNamespace(unittest.TestCase):
finally: finally:
s.close() s.close()
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'test needs socket.AF_UNIX')
class TestUnixDomain(unittest.TestCase): class TestUnixDomain(unittest.TestCase):
def setUp(self): def setUp(self):
...@@ -4551,10 +4557,10 @@ def isTipcAvailable(): ...@@ -4551,10 +4557,10 @@ def isTipcAvailable():
for line in f: for line in f:
if line.startswith("tipc "): if line.startswith("tipc "):
return True return True
if support.verbose:
print("TIPC module is not loaded, please 'sudo modprobe tipc'")
return False return False
@unittest.skipUnless(isTipcAvailable(),
"TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCTest(unittest.TestCase): class TIPCTest(unittest.TestCase):
def testRDM(self): def testRDM(self):
srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM) srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
...@@ -4577,6 +4583,8 @@ class TIPCTest(unittest.TestCase): ...@@ -4577,6 +4583,8 @@ class TIPCTest(unittest.TestCase):
self.assertEqual(msg, MSG) self.assertEqual(msg, MSG)
@unittest.skipUnless(isTipcAvailable(),
"TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCThreadableTest(unittest.TestCase, ThreadableTest): class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
def __init__(self, methodName = 'runTest'): def __init__(self, methodName = 'runTest'):
unittest.TestCase.__init__(self, methodName = methodName) unittest.TestCase.__init__(self, methodName = methodName)
...@@ -4842,15 +4850,10 @@ def test_main(): ...@@ -4842,15 +4850,10 @@ def test_main():
CloexecConstantTest, CloexecConstantTest,
NonblockConstantTest NonblockConstantTest
]) ])
if hasattr(socket, "socketpair"): tests.append(BasicSocketPairTest)
tests.append(BasicSocketPairTest) tests.append(TestUnixDomain)
if hasattr(socket, "AF_UNIX"): tests.append(TestLinuxAbstractNamespace)
tests.append(TestUnixDomain) tests.extend([TIPCTest, TIPCThreadableTest])
if sys.platform == 'linux':
tests.append(TestLinuxAbstractNamespace)
if isTipcAvailable():
tests.append(TIPCTest)
tests.append(TIPCThreadableTest)
tests.extend([BasicCANTest, CANTest]) tests.extend([BasicCANTest, CANTest])
tests.extend([BasicRDSTest, RDSTest]) tests.extend([BasicRDSTest, RDSTest])
tests.extend([ tests.extend([
......
...@@ -27,7 +27,10 @@ TEST_STR = b"hello world\n" ...@@ -27,7 +27,10 @@ TEST_STR = b"hello world\n"
HOST = test.support.HOST HOST = test.support.HOST
HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX") HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
requires_unix_sockets = unittest.skipUnless(HAVE_UNIX_SOCKETS,
'requires Unix sockets')
HAVE_FORKING = hasattr(os, "fork") and os.name != "os2" HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
requires_forking = unittest.skipUnless(HAVE_FORKING, 'requires forking')
def signal_alarm(n): def signal_alarm(n):
"""Call signal.alarm when it exists (i.e. not on Windows).""" """Call signal.alarm when it exists (i.e. not on Windows)."""
...@@ -189,31 +192,33 @@ class SocketServerTest(unittest.TestCase): ...@@ -189,31 +192,33 @@ class SocketServerTest(unittest.TestCase):
socketserver.StreamRequestHandler, socketserver.StreamRequestHandler,
self.stream_examine) self.stream_examine)
if HAVE_FORKING: @requires_forking
def test_ForkingTCPServer(self): def test_ForkingTCPServer(self):
with simple_subprocess(self): with simple_subprocess(self):
self.run_server(socketserver.ForkingTCPServer, self.run_server(socketserver.ForkingTCPServer,
socketserver.StreamRequestHandler,
self.stream_examine)
if HAVE_UNIX_SOCKETS:
def test_UnixStreamServer(self):
self.run_server(socketserver.UnixStreamServer,
socketserver.StreamRequestHandler, socketserver.StreamRequestHandler,
self.stream_examine) self.stream_examine)
def test_ThreadingUnixStreamServer(self): @requires_unix_sockets
self.run_server(socketserver.ThreadingUnixStreamServer, def test_UnixStreamServer(self):
self.run_server(socketserver.UnixStreamServer,
socketserver.StreamRequestHandler,
self.stream_examine)
@requires_unix_sockets
def test_ThreadingUnixStreamServer(self):
self.run_server(socketserver.ThreadingUnixStreamServer,
socketserver.StreamRequestHandler,
self.stream_examine)
@requires_unix_sockets
@requires_forking
def test_ForkingUnixStreamServer(self):
with simple_subprocess(self):
self.run_server(ForkingUnixStreamServer,
socketserver.StreamRequestHandler, socketserver.StreamRequestHandler,
self.stream_examine) self.stream_examine)
if HAVE_FORKING:
def test_ForkingUnixStreamServer(self):
with simple_subprocess(self):
self.run_server(ForkingUnixStreamServer,
socketserver.StreamRequestHandler,
self.stream_examine)
def test_UDPServer(self): def test_UDPServer(self):
self.run_server(socketserver.UDPServer, self.run_server(socketserver.UDPServer,
socketserver.DatagramRequestHandler, socketserver.DatagramRequestHandler,
...@@ -224,12 +229,12 @@ class SocketServerTest(unittest.TestCase): ...@@ -224,12 +229,12 @@ class SocketServerTest(unittest.TestCase):
socketserver.DatagramRequestHandler, socketserver.DatagramRequestHandler,
self.dgram_examine) self.dgram_examine)
if HAVE_FORKING: @requires_forking
def test_ForkingUDPServer(self): def test_ForkingUDPServer(self):
with simple_subprocess(self): with simple_subprocess(self):
self.run_server(socketserver.ForkingUDPServer, self.run_server(socketserver.ForkingUDPServer,
socketserver.DatagramRequestHandler, socketserver.DatagramRequestHandler,
self.dgram_examine) self.dgram_examine)
@contextlib.contextmanager @contextlib.contextmanager
def mocked_select_module(self): def mocked_select_module(self):
...@@ -266,22 +271,24 @@ class SocketServerTest(unittest.TestCase): ...@@ -266,22 +271,24 @@ class SocketServerTest(unittest.TestCase):
# Alas, on Linux (at least) recvfrom() doesn't return a meaningful # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
# client address so this cannot work: # client address so this cannot work:
# if HAVE_UNIX_SOCKETS: # @requires_unix_sockets
# def test_UnixDatagramServer(self): # def test_UnixDatagramServer(self):
# self.run_server(socketserver.UnixDatagramServer, # self.run_server(socketserver.UnixDatagramServer,
# socketserver.DatagramRequestHandler, # socketserver.DatagramRequestHandler,
# self.dgram_examine) # self.dgram_examine)
# #
# def test_ThreadingUnixDatagramServer(self): # @requires_unix_sockets
# self.run_server(socketserver.ThreadingUnixDatagramServer, # def test_ThreadingUnixDatagramServer(self):
# socketserver.DatagramRequestHandler, # self.run_server(socketserver.ThreadingUnixDatagramServer,
# self.dgram_examine) # socketserver.DatagramRequestHandler,
# self.dgram_examine)
# #
# if HAVE_FORKING: # @requires_unix_sockets
# def test_ForkingUnixDatagramServer(self): # @requires_forking
# self.run_server(socketserver.ForkingUnixDatagramServer, # def test_ForkingUnixDatagramServer(self):
# socketserver.DatagramRequestHandler, # self.run_server(socketserver.ForkingUnixDatagramServer,
# self.dgram_examine) # socketserver.DatagramRequestHandler,
# self.dgram_examine)
@reap_threads @reap_threads
def test_shutdown(self): def test_shutdown(self):
......
...@@ -291,15 +291,16 @@ class SysModuleTest(unittest.TestCase): ...@@ -291,15 +291,16 @@ class SysModuleTest(unittest.TestCase):
def test_call_tracing(self): def test_call_tracing(self):
self.assertRaises(TypeError, sys.call_tracing, type, 2) self.assertRaises(TypeError, sys.call_tracing, type, 2)
@unittest.skipUnless(hasattr(sys, "setdlopenflags"),
'test needs sys.setdlopenflags()')
def test_dlopenflags(self): def test_dlopenflags(self):
if hasattr(sys, "setdlopenflags"): self.assertTrue(hasattr(sys, "getdlopenflags"))
self.assertTrue(hasattr(sys, "getdlopenflags")) self.assertRaises(TypeError, sys.getdlopenflags, 42)
self.assertRaises(TypeError, sys.getdlopenflags, 42) oldflags = sys.getdlopenflags()
oldflags = sys.getdlopenflags() self.assertRaises(TypeError, sys.setdlopenflags)
self.assertRaises(TypeError, sys.setdlopenflags) sys.setdlopenflags(oldflags+1)
sys.setdlopenflags(oldflags+1) self.assertEqual(sys.getdlopenflags(), oldflags+1)
self.assertEqual(sys.getdlopenflags(), oldflags+1) sys.setdlopenflags(oldflags)
sys.setdlopenflags(oldflags)
@test.support.refcount_test @test.support.refcount_test
def test_refcount(self): def test_refcount(self):
......
...@@ -271,11 +271,10 @@ class WarnTests(BaseTest): ...@@ -271,11 +271,10 @@ class WarnTests(BaseTest):
finally: finally:
warning_tests.__file__ = filename warning_tests.__file__ = filename
@unittest.skipUnless(hasattr(sys, 'argv'), 'test needs sys.argv')
def test_missing_filename_main_with_argv(self): def test_missing_filename_main_with_argv(self):
# If __file__ is not specified and the caller is __main__ and sys.argv # If __file__ is not specified and the caller is __main__ and sys.argv
# exists, then use sys.argv[0] as the file. # exists, then use sys.argv[0] as the file.
if not hasattr(sys, 'argv'):
return
filename = warning_tests.__file__ filename = warning_tests.__file__
module_name = warning_tests.__name__ module_name = warning_tests.__name__
try: try:
......
...@@ -7,6 +7,13 @@ from test.support import bigmemtest, _1G, _4G ...@@ -7,6 +7,13 @@ from test.support import bigmemtest, _1G, _4G
zlib = support.import_module('zlib') zlib = support.import_module('zlib')
requires_Compress_copy = unittest.skipUnless(
hasattr(zlib.compressobj(), "copy"),
'requires Compress.copy()')
requires_Decompress_copy = unittest.skipUnless(
hasattr(zlib.decompressobj(), "copy"),
'requires Decompress.copy()')
class VersionTestCase(unittest.TestCase): class VersionTestCase(unittest.TestCase):
...@@ -381,39 +388,39 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): ...@@ -381,39 +388,39 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
"mode=%i, level=%i") % (sync, level)) "mode=%i, level=%i") % (sync, level))
del obj del obj
@unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
'requires zlib.Z_SYNC_FLUSH')
def test_odd_flush(self): def test_odd_flush(self):
# Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1 # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
import random import random
# Testing on 17K of "random" data
if hasattr(zlib, 'Z_SYNC_FLUSH'): # Create compressor and decompressor objects
# Testing on 17K of "random" data co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
dco = zlib.decompressobj()
# Create compressor and decompressor objects
co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
dco = zlib.decompressobj()
# Try 17K of data # Try 17K of data
# generate random data stream # generate random data stream
try:
# In 2.3 and later, WichmannHill is the RNG of the bug report
gen = random.WichmannHill()
except AttributeError:
try: try:
# In 2.3 and later, WichmannHill is the RNG of the bug report # 2.2 called it Random
gen = random.WichmannHill() gen = random.Random()
except AttributeError: except AttributeError:
try: # others might simply have a single RNG
# 2.2 called it Random gen = random
gen = random.Random() gen.seed(1)
except AttributeError: data = genblock(1, 17 * 1024, generator=gen)
# others might simply have a single RNG
gen = random # compress, sync-flush, and decompress
gen.seed(1) first = co.compress(data)
data = genblock(1, 17 * 1024, generator=gen) second = co.flush(zlib.Z_SYNC_FLUSH)
expanded = dco.decompress(first + second)
# compress, sync-flush, and decompress
first = co.compress(data) # if decompressed data is different from the input data, choke.
second = co.flush(zlib.Z_SYNC_FLUSH) self.assertEqual(expanded, data, "17K random source doesn't match")
expanded = dco.decompress(first + second)
# if decompressed data is different from the input data, choke.
self.assertEqual(expanded, data, "17K random source doesn't match")
def test_empty_flush(self): def test_empty_flush(self):
# Test that calling .flush() on unused objects works. # Test that calling .flush() on unused objects works.
...@@ -525,67 +532,69 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase): ...@@ -525,67 +532,69 @@ class CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
data = zlib.compress(input2) data = zlib.compress(input2)
self.assertEqual(dco.flush(), input1[1:]) self.assertEqual(dco.flush(), input1[1:])
if hasattr(zlib.compressobj(), "copy"): @requires_Compress_copy
def test_compresscopy(self): def test_compresscopy(self):
# Test copying a compression object # Test copying a compression object
data0 = HAMLET_SCENE data0 = HAMLET_SCENE
data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii") data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION) c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
bufs0 = [] bufs0 = []
bufs0.append(c0.compress(data0)) bufs0.append(c0.compress(data0))
c1 = c0.copy() c1 = c0.copy()
bufs1 = bufs0[:] bufs1 = bufs0[:]
bufs0.append(c0.compress(data0)) bufs0.append(c0.compress(data0))
bufs0.append(c0.flush()) bufs0.append(c0.flush())
s0 = b''.join(bufs0) s0 = b''.join(bufs0)
bufs1.append(c1.compress(data1)) bufs1.append(c1.compress(data1))
bufs1.append(c1.flush()) bufs1.append(c1.flush())
s1 = b''.join(bufs1) s1 = b''.join(bufs1)
self.assertEqual(zlib.decompress(s0),data0+data0) self.assertEqual(zlib.decompress(s0),data0+data0)
self.assertEqual(zlib.decompress(s1),data0+data1) self.assertEqual(zlib.decompress(s1),data0+data1)
def test_badcompresscopy(self): @requires_Compress_copy
# Test copying a compression object in an inconsistent state def test_badcompresscopy(self):
c = zlib.compressobj() # Test copying a compression object in an inconsistent state
c.compress(HAMLET_SCENE) c = zlib.compressobj()
c.flush() c.compress(HAMLET_SCENE)
self.assertRaises(ValueError, c.copy) c.flush()
self.assertRaises(ValueError, c.copy)
if hasattr(zlib.decompressobj(), "copy"):
def test_decompresscopy(self): @requires_Decompress_copy
# Test copying a decompression object def test_decompresscopy(self):
data = HAMLET_SCENE # Test copying a decompression object
comp = zlib.compress(data) data = HAMLET_SCENE
# Test type of return value comp = zlib.compress(data)
self.assertIsInstance(comp, bytes) # Test type of return value
self.assertIsInstance(comp, bytes)
d0 = zlib.decompressobj()
bufs0 = [] d0 = zlib.decompressobj()
bufs0.append(d0.decompress(comp[:32])) bufs0 = []
bufs0.append(d0.decompress(comp[:32]))
d1 = d0.copy()
bufs1 = bufs0[:] d1 = d0.copy()
bufs1 = bufs0[:]
bufs0.append(d0.decompress(comp[32:]))
s0 = b''.join(bufs0) bufs0.append(d0.decompress(comp[32:]))
s0 = b''.join(bufs0)
bufs1.append(d1.decompress(comp[32:]))
s1 = b''.join(bufs1) bufs1.append(d1.decompress(comp[32:]))
s1 = b''.join(bufs1)
self.assertEqual(s0,s1)
self.assertEqual(s0,data) self.assertEqual(s0,s1)
self.assertEqual(s0,data)
def test_baddecompresscopy(self):
# Test copying a compression object in an inconsistent state @requires_Decompress_copy
data = zlib.compress(HAMLET_SCENE) def test_baddecompresscopy(self):
d = zlib.decompressobj() # Test copying a compression object in an inconsistent state
d.decompress(data) data = zlib.compress(HAMLET_SCENE)
d.flush() d = zlib.decompressobj()
self.assertRaises(ValueError, d.copy) d.decompress(data)
d.flush()
self.assertRaises(ValueError, d.copy)
# Memory use of the following functions takes into account overallocation # Memory use of the following functions takes into account overallocation
......
...@@ -29,6 +29,8 @@ Library ...@@ -29,6 +29,8 @@ Library
Tests Tests
----- -----
- Issue #18702: All skipped tests now reported as skipped.
- Issue #19085: Added basic tests for all tkinter widget options. - Issue #19085: Added basic tests for all tkinter widget options.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment