Commit ea5962f8 authored by Tim Peters's avatar Tim Peters

Whitespace normalization.

parent cb637c9e
...@@ -328,8 +328,8 @@ class TCPServer(BaseServer): ...@@ -328,8 +328,8 @@ class TCPServer(BaseServer):
self.socket = socket.socket(self.address_family, self.socket = socket.socket(self.address_family,
self.socket_type) self.socket_type)
if bind_and_activate: if bind_and_activate:
self.server_bind() self.server_bind()
self.server_activate() self.server_activate()
def server_bind(self): def server_bind(self):
"""Called by constructor to bind the socket. """Called by constructor to bind the socket.
......
...@@ -84,7 +84,7 @@ def commonprefix(m): ...@@ -84,7 +84,7 @@ def commonprefix(m):
# Generic implementation of splitext, to be parametrized with # Generic implementation of splitext, to be parametrized with
# the separators # the separators
def _splitext(p, sep, altsep, extsep): def _splitext(p, sep, altsep, extsep):
"""Split the extension from a pathname. """Split the extension from a pathname.
Extension is everything from the last dot to the end, ignoring Extension is everything from the last dot to the end, ignoring
leading dots. Returns "(root, ext)"; ext may be empty.""" leading dots. Returns "(root, ext)"; ext may be empty."""
......
...@@ -766,7 +766,7 @@ class LMTP(SMTP): ...@@ -766,7 +766,7 @@ class LMTP(SMTP):
authentication, but your mileage might vary.""" authentication, but your mileage might vary."""
ehlo_msg = "lhlo" ehlo_msg = "lhlo"
def __init__(self, host = '', port = LMTP_PORT, local_hostname = None): def __init__(self, host = '', port = LMTP_PORT, local_hostname = None):
"""Initialize a new instance.""" """Initialize a new instance."""
SMTP.__init__(self, host, port, local_hostname) SMTP.__init__(self, host, port, local_hostname)
......
...@@ -596,7 +596,7 @@ class Popen(object): ...@@ -596,7 +596,7 @@ class Popen(object):
# either have to redirect all three or none. If the subprocess # either have to redirect all three or none. If the subprocess
# user has only redirected one or two handles, we are # user has only redirected one or two handles, we are
# automatically creating PIPEs for the rest. We should close # automatically creating PIPEs for the rest. We should close
# these after the process is started. See bug #1124861. # these after the process is started. See bug #1124861.
if mswindows: if mswindows:
if stdin is None and p2cwrite is not None: if stdin is None and p2cwrite is not None:
os.close(p2cwrite) os.close(p2cwrite)
......
...@@ -245,7 +245,7 @@ def test_resize_term(stdscr): ...@@ -245,7 +245,7 @@ def test_resize_term(stdscr):
if hasattr(curses, 'resizeterm'): if hasattr(curses, 'resizeterm'):
lines, cols = curses.LINES, curses.COLS lines, cols = curses.LINES, curses.COLS
curses.resizeterm(lines - 1, cols + 1) curses.resizeterm(lines - 1, cols + 1)
if curses.LINES != lines - 1 or curses.COLS != cols + 1: if curses.LINES != lines - 1 or curses.COLS != cols + 1:
raise RuntimeError, "Expected resizeterm to update LINES and COLS" raise RuntimeError, "Expected resizeterm to update LINES and COLS"
......
...@@ -1466,7 +1466,7 @@ def errors(): ...@@ -1466,7 +1466,7 @@ def errors():
>>> class A(object): >>> class A(object):
... pass ... pass
>>> class B(A, type): >>> class B(A, type):
... pass ... pass
Traceback (most recent call last): Traceback (most recent call last):
...@@ -1494,7 +1494,7 @@ def errors(): ...@@ -1494,7 +1494,7 @@ def errors():
... pass ... pass
Also check that assignment to bases is safe. Also check that assignment to bases is safe.
>>> B.__bases__ = A1, A2 >>> B.__bases__ = A1, A2
Traceback (most recent call last): Traceback (most recent call last):
TypeError: metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases TypeError: metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases
......
...@@ -461,12 +461,12 @@ class DictTest(unittest.TestCase): ...@@ -461,12 +461,12 @@ class DictTest(unittest.TestCase):
self.assertEqual(e.args, ((1,),)) self.assertEqual(e.args, ((1,),))
else: else:
self.fail("missing KeyError") self.fail("missing KeyError")
def test_bad_key(self): def test_bad_key(self):
# Dictionary lookups should fail if __cmp__() raises an exception. # Dictionary lookups should fail if __cmp__() raises an exception.
class CustomException(Exception): class CustomException(Exception):
pass pass
class BadDictKey: class BadDictKey:
def __hash__(self): def __hash__(self):
return hash(self.__class__) return hash(self.__class__)
...@@ -475,7 +475,7 @@ class DictTest(unittest.TestCase): ...@@ -475,7 +475,7 @@ class DictTest(unittest.TestCase):
if isinstance(other, self.__class__): if isinstance(other, self.__class__):
raise CustomException raise CustomException
return other return other
d = {} d = {}
x1 = BadDictKey() x1 = BadDictKey()
x2 = BadDictKey() x2 = BadDictKey()
...@@ -502,7 +502,7 @@ class DictTest(unittest.TestCase): ...@@ -502,7 +502,7 @@ class DictTest(unittest.TestCase):
# a mix of inserts and deletes hitting exactly the right hash codes in # a mix of inserts and deletes hitting exactly the right hash codes in
# exactly the right order, and I can't think of a randomized approach # exactly the right order, and I can't think of a randomized approach
# that would be *likely* to hit a failing case in reasonable time. # that would be *likely* to hit a failing case in reasonable time.
d = {} d = {}
for i in range(5): for i in range(5):
d[i] = i d[i] = i
...@@ -514,7 +514,7 @@ class DictTest(unittest.TestCase): ...@@ -514,7 +514,7 @@ class DictTest(unittest.TestCase):
def test_resize2(self): def test_resize2(self):
# Another dict resizing bug (SF bug #1456209). # Another dict resizing bug (SF bug #1456209).
# This caused Segmentation faults or Illegal instructions. # This caused Segmentation faults or Illegal instructions.
class X(object): class X(object):
def __hash__(self): def __hash__(self):
return 5 return 5
......
...@@ -192,11 +192,11 @@ class ImportTest(unittest.TestCase): ...@@ -192,11 +192,11 @@ class ImportTest(unittest.TestCase):
remove_files(TESTFN) remove_files(TESTFN)
if TESTFN in sys.modules: if TESTFN in sys.modules:
del sys.modules[TESTFN] del sys.modules[TESTFN]
def test_infinite_reload(self): def test_infinite_reload(self):
# Bug #742342 reports that Python segfaults (infinite recursion in C) # Bug #742342 reports that Python segfaults (infinite recursion in C)
# when faced with self-recursive reload()ing. # when faced with self-recursive reload()ing.
sys.path.insert(0, os.path.dirname(__file__)) sys.path.insert(0, os.path.dirname(__file__))
try: try:
import infinite_reload import infinite_reload
......
...@@ -211,20 +211,20 @@ class TestBasicOps(unittest.TestCase): ...@@ -211,20 +211,20 @@ class TestBasicOps(unittest.TestCase):
self.assertEqual(list(izip_longest(*args, **{})), target) self.assertEqual(list(izip_longest(*args, **{})), target)
target = [tuple((e is None and 'X' or e) for e in t) for t in target] # Replace None fills with 'X' target = [tuple((e is None and 'X' or e) for e in t) for t in target] # Replace None fills with 'X'
self.assertEqual(list(izip_longest(*args, **dict(fillvalue='X'))), target) self.assertEqual(list(izip_longest(*args, **dict(fillvalue='X'))), target)
self.assertEqual(take(3,izip_longest('abcdef', count())), zip('abcdef', range(3))) # take 3 from infinite input self.assertEqual(take(3,izip_longest('abcdef', count())), zip('abcdef', range(3))) # take 3 from infinite input
self.assertEqual(list(izip_longest()), zip()) self.assertEqual(list(izip_longest()), zip())
self.assertEqual(list(izip_longest([])), zip([])) self.assertEqual(list(izip_longest([])), zip([]))
self.assertEqual(list(izip_longest('abcdef')), zip('abcdef')) self.assertEqual(list(izip_longest('abcdef')), zip('abcdef'))
self.assertEqual(list(izip_longest('abc', 'defg', **{})), map(None, 'abc', 'defg')) # empty keyword dict self.assertEqual(list(izip_longest('abc', 'defg', **{})), map(None, 'abc', 'defg')) # empty keyword dict
self.assertRaises(TypeError, izip_longest, 3) self.assertRaises(TypeError, izip_longest, 3)
self.assertRaises(TypeError, izip_longest, range(3), 3) self.assertRaises(TypeError, izip_longest, range(3), 3)
for stmt in [ for stmt in [
"izip_longest('abc', fv=1)", "izip_longest('abc', fv=1)",
"izip_longest('abc', fillvalue=1, bogus_keyword=None)", "izip_longest('abc', fillvalue=1, bogus_keyword=None)",
]: ]:
try: try:
eval(stmt, globals(), locals()) eval(stmt, globals(), locals())
...@@ -232,7 +232,7 @@ class TestBasicOps(unittest.TestCase): ...@@ -232,7 +232,7 @@ class TestBasicOps(unittest.TestCase):
pass pass
else: else:
self.fail('Did not raise Type in: ' + stmt) self.fail('Did not raise Type in: ' + stmt)
# Check tuple re-use (implementation detail) # Check tuple re-use (implementation detail)
self.assertEqual([tuple(list(pair)) for pair in izip_longest('abc', 'def')], self.assertEqual([tuple(list(pair)) for pair in izip_longest('abc', 'def')],
zip('abc', 'def')) zip('abc', 'def'))
......
...@@ -24,7 +24,7 @@ class PosixPathTest(unittest.TestCase): ...@@ -24,7 +24,7 @@ class PosixPathTest(unittest.TestCase):
for suffix in ["", "1", "2"]: for suffix in ["", "1", "2"]:
test_support.unlink(test_support.TESTFN + suffix) test_support.unlink(test_support.TESTFN + suffix)
safe_rmdir(test_support.TESTFN + suffix) safe_rmdir(test_support.TESTFN + suffix)
def assertIs(self, a, b): def assertIs(self, a, b):
self.assert_(a is b) self.assert_(a is b)
...@@ -161,7 +161,7 @@ class PosixPathTest(unittest.TestCase): ...@@ -161,7 +161,7 @@ class PosixPathTest(unittest.TestCase):
if not f.closed: if not f.closed:
f.close() f.close()
def test_islink(self): def test_islink(self):
self.assertIs(posixpath.islink(test_support.TESTFN + "1"), False) self.assertIs(posixpath.islink(test_support.TESTFN + "1"), False)
f = open(test_support.TESTFN + "1", "wb") f = open(test_support.TESTFN + "1", "wb")
try: try:
......
...@@ -41,19 +41,19 @@ def normalize_output(data): ...@@ -41,19 +41,19 @@ def normalize_output(data):
# because pty code is not too portable. # because pty code is not too portable.
class PtyTest(unittest.TestCase): class PtyTest(unittest.TestCase):
def setUp(self): def setUp(self):
# isatty() and close() can hang on some platforms. Set an alarm # isatty() and close() can hang on some platforms. Set an alarm
# before running the test to make sure we don't hang forever. # before running the test to make sure we don't hang forever.
self.old_alarm = signal.signal(signal.SIGALRM, self.handle_sig) self.old_alarm = signal.signal(signal.SIGALRM, self.handle_sig)
signal.alarm(10) signal.alarm(10)
def tearDown(self): def tearDown(self):
# remove alarm, restore old alarm handler # remove alarm, restore old alarm handler
signal.alarm(0) signal.alarm(0)
signal.signal(signal.SIGALRM, self.old_alarm) signal.signal(signal.SIGALRM, self.old_alarm)
def handle_sig(self, sig, frame): def handle_sig(self, sig, frame):
self.fail("isatty hung") self.fail("isatty hung")
def test_basic(self): def test_basic(self):
try: try:
debug("Calling master_open()") debug("Calling master_open()")
...@@ -68,19 +68,19 @@ class PtyTest(unittest.TestCase): ...@@ -68,19 +68,19 @@ class PtyTest(unittest.TestCase):
raise TestSkipped, "Pseudo-terminals (seemingly) not functional." raise TestSkipped, "Pseudo-terminals (seemingly) not functional."
self.assertTrue(os.isatty(slave_fd), 'slave_fd is not a tty') self.assertTrue(os.isatty(slave_fd), 'slave_fd is not a tty')
debug("Writing to slave_fd") debug("Writing to slave_fd")
os.write(slave_fd, TEST_STRING_1) os.write(slave_fd, TEST_STRING_1)
s1 = os.read(master_fd, 1024) s1 = os.read(master_fd, 1024)
self.assertEquals('I wish to buy a fish license.\n', self.assertEquals('I wish to buy a fish license.\n',
normalize_output(s1)) normalize_output(s1))
debug("Writing chunked output") debug("Writing chunked output")
os.write(slave_fd, TEST_STRING_2[:5]) os.write(slave_fd, TEST_STRING_2[:5])
os.write(slave_fd, TEST_STRING_2[5:]) os.write(slave_fd, TEST_STRING_2[5:])
s2 = os.read(master_fd, 1024) s2 = os.read(master_fd, 1024)
self.assertEquals('For my pet fish, Eric.\n', normalize_output(s2)) self.assertEquals('For my pet fish, Eric.\n', normalize_output(s2))
os.close(slave_fd) os.close(slave_fd)
os.close(master_fd) os.close(master_fd)
...@@ -93,7 +93,7 @@ class PtyTest(unittest.TestCase): ...@@ -93,7 +93,7 @@ class PtyTest(unittest.TestCase):
if not os.isatty(1): if not os.isatty(1):
debug("Child's fd 1 is not a tty?!") debug("Child's fd 1 is not a tty?!")
os._exit(3) os._exit(3)
# After pty.fork(), the child should already be a session leader. # After pty.fork(), the child should already be a session leader.
# (on those systems that have that concept.) # (on those systems that have that concept.)
debug("In child, calling os.setsid()") debug("In child, calling os.setsid()")
...@@ -125,7 +125,7 @@ class PtyTest(unittest.TestCase): ...@@ -125,7 +125,7 @@ class PtyTest(unittest.TestCase):
##if False and lines != ['In child, calling os.setsid()', ##if False and lines != ['In child, calling os.setsid()',
## 'Good: OSError was raised.', '']: ## 'Good: OSError was raised.', '']:
## raise TestFailed("Unexpected output from child: %r" % line) ## raise TestFailed("Unexpected output from child: %r" % line)
(pid, status) = os.waitpid(pid, 0) (pid, status) = os.waitpid(pid, 0)
res = status >> 8 res = status >> 8
debug("Child (%d) exited with status %d (%d)." % (pid, res, status)) debug("Child (%d) exited with status %d (%d)." % (pid, res, status))
...@@ -137,7 +137,7 @@ class PtyTest(unittest.TestCase): ...@@ -137,7 +137,7 @@ class PtyTest(unittest.TestCase):
self.fail("Child spawned by pty.fork() did not have a tty as stdout") self.fail("Child spawned by pty.fork() did not have a tty as stdout")
elif res != 4: elif res != 4:
self.fail("pty.fork() failed for unknown reasons.") self.fail("pty.fork() failed for unknown reasons.")
##debug("Reading from master_fd now that the child has exited") ##debug("Reading from master_fd now that the child has exited")
##try: ##try:
## s1 = os.read(master_fd, 1024) ## s1 = os.read(master_fd, 1024)
...@@ -145,9 +145,9 @@ class PtyTest(unittest.TestCase): ...@@ -145,9 +145,9 @@ class PtyTest(unittest.TestCase):
## pass ## pass
##else: ##else:
## raise TestFailed("Read from master_fd did not raise exception") ## raise TestFailed("Read from master_fd did not raise exception")
os.close(master_fd) os.close(master_fd)
# pty.fork() passed. # pty.fork() passed.
def test_main(verbose=None): def test_main(verbose=None):
......
...@@ -252,7 +252,7 @@ def test_1463026_3(): ...@@ -252,7 +252,7 @@ def test_1463026_3():
gen.endDocument() gen.endDocument()
return result.getvalue() == start+'<my:a xmlns:my="qux" b="c"></my:a>' return result.getvalue() == start+'<my:a xmlns:my="qux" b="c"></my:a>'
# ===== Xmlfilterbase # ===== Xmlfilterbase
def test_filter_basic(): def test_filter_basic():
......
...@@ -285,10 +285,10 @@ class TestJointOps(unittest.TestCase): ...@@ -285,10 +285,10 @@ class TestJointOps(unittest.TestCase):
s = self.thetype(d) s = self.thetype(d)
self.assertEqual(sum(elem.hash_count for elem in d), n) self.assertEqual(sum(elem.hash_count for elem in d), n)
s.difference(d) s.difference(d)
self.assertEqual(sum(elem.hash_count for elem in d), n) self.assertEqual(sum(elem.hash_count for elem in d), n)
if hasattr(s, 'symmetric_difference_update'): if hasattr(s, 'symmetric_difference_update'):
s.symmetric_difference_update(d) s.symmetric_difference_update(d)
self.assertEqual(sum(elem.hash_count for elem in d), n) self.assertEqual(sum(elem.hash_count for elem in d), n)
class TestSet(TestJointOps): class TestSet(TestJointOps):
thetype = set thetype = set
......
...@@ -49,14 +49,14 @@ class ThreadedTempFileTest(unittest.TestCase): ...@@ -49,14 +49,14 @@ class ThreadedTempFileTest(unittest.TestCase):
def test_main(self): def test_main(self):
threads = [] threads = []
thread_info = threading_setup() thread_info = threading_setup()
for i in range(NUM_THREADS): for i in range(NUM_THREADS):
t = TempFileGreedy() t = TempFileGreedy()
threads.append(t) threads.append(t)
t.start() t.start()
startEvent.set() startEvent.set()
ok = 0 ok = 0
errors = [] errors = []
for t in threads: for t in threads:
...@@ -66,8 +66,8 @@ class ThreadedTempFileTest(unittest.TestCase): ...@@ -66,8 +66,8 @@ class ThreadedTempFileTest(unittest.TestCase):
errors.append(str(t.getName()) + str(t.errors.getvalue())) errors.append(str(t.getName()) + str(t.errors.getvalue()))
threading_cleanup(*thread_info) threading_cleanup(*thread_info)
msg = "Errors: errors %d ok %d\n%s" % (len(errors), ok, msg = "Errors: errors %d ok %d\n%s" % (len(errors), ok,
'\n'.join(errors)) '\n'.join(errors))
self.assertEquals(errors, [], msg) self.assertEquals(errors, [], msg)
self.assertEquals(ok, NUM_THREADS * FILES_PER_THREAD) self.assertEquals(ok, NUM_THREADS * FILES_PER_THREAD)
......
This diff is collapsed.
...@@ -117,12 +117,12 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -117,12 +117,12 @@ class TestsWithSourceFile(unittest.TestCase):
if not read_data: if not read_data:
break break
zipdata2.append(read_data) zipdata2.append(read_data)
self.assertEqual(''.join(zipdata1), self.data) self.assertEqual(''.join(zipdata1), self.data)
self.assertEqual(''.join(zipdata2), self.data) self.assertEqual(''.join(zipdata2), self.data)
zipfp.close() zipfp.close()
def testOpenStored(self): def testOpenStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.zipOpenTest(f, zipfile.ZIP_STORED) self.zipOpenTest(f, zipfile.ZIP_STORED)
...@@ -141,11 +141,11 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -141,11 +141,11 @@ class TestsWithSourceFile(unittest.TestCase):
self.assertEqual(''.join(zipdata1), self.data) self.assertEqual(''.join(zipdata1), self.data)
zipfp.close() zipfp.close()
def testRandomOpenStored(self): def testRandomOpenStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.zipRandomOpenTest(f, zipfile.ZIP_STORED) self.zipRandomOpenTest(f, zipfile.ZIP_STORED)
def zipReadlineTest(self, f, compression): def zipReadlineTest(self, f, compression):
self.makeTestArchive(f, compression) self.makeTestArchive(f, compression)
...@@ -178,16 +178,16 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -178,16 +178,16 @@ class TestsWithSourceFile(unittest.TestCase):
self.assertEqual(zipline, line + '\n') self.assertEqual(zipline, line + '\n')
zipfp.close() zipfp.close()
def testReadlineStored(self): def testReadlineStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.zipReadlineTest(f, zipfile.ZIP_STORED) self.zipReadlineTest(f, zipfile.ZIP_STORED)
def testReadlinesStored(self): def testReadlinesStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.zipReadlinesTest(f, zipfile.ZIP_STORED) self.zipReadlinesTest(f, zipfile.ZIP_STORED)
def testIterlinesStored(self): def testIterlinesStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.zipIterlinesTest(f, zipfile.ZIP_STORED) self.zipIterlinesTest(f, zipfile.ZIP_STORED)
...@@ -204,18 +204,18 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -204,18 +204,18 @@ class TestsWithSourceFile(unittest.TestCase):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.zipRandomOpenTest(f, zipfile.ZIP_DEFLATED) self.zipRandomOpenTest(f, zipfile.ZIP_DEFLATED)
def testReadlineDeflated(self): def testReadlineDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.zipReadlineTest(f, zipfile.ZIP_DEFLATED) self.zipReadlineTest(f, zipfile.ZIP_DEFLATED)
def testReadlinesDeflated(self): def testReadlinesDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.zipReadlinesTest(f, zipfile.ZIP_DEFLATED) self.zipReadlinesTest(f, zipfile.ZIP_DEFLATED)
def testIterlinesDeflated(self): def testIterlinesDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.zipIterlinesTest(f, zipfile.ZIP_DEFLATED) self.zipIterlinesTest(f, zipfile.ZIP_DEFLATED)
def testLowCompression(self): def testLowCompression(self):
# Checks for cases where compressed data is larger than original # Checks for cases where compressed data is larger than original
# Create the ZIP archive # Create the ZIP archive
...@@ -437,10 +437,10 @@ class OtherTests(unittest.TestCase): ...@@ -437,10 +437,10 @@ class OtherTests(unittest.TestCase):
def testCreateNonExistentFileForAppend(self): def testCreateNonExistentFileForAppend(self):
if os.path.exists(TESTFN): if os.path.exists(TESTFN):
os.unlink(TESTFN) os.unlink(TESTFN)
filename = 'testfile.txt' filename = 'testfile.txt'
content = 'hello, world. this is some content.' content = 'hello, world. this is some content.'
try: try:
zf = zipfile.ZipFile(TESTFN, 'a') zf = zipfile.ZipFile(TESTFN, 'a')
zf.writestr(filename, content) zf.writestr(filename, content)
...@@ -453,9 +453,9 @@ class OtherTests(unittest.TestCase): ...@@ -453,9 +453,9 @@ class OtherTests(unittest.TestCase):
zf = zipfile.ZipFile(TESTFN, 'r') zf = zipfile.ZipFile(TESTFN, 'r')
self.assertEqual(zf.read(filename), content) self.assertEqual(zf.read(filename), content)
zf.close() zf.close()
os.unlink(TESTFN) os.unlink(TESTFN)
def testCloseErroneousFile(self): def testCloseErroneousFile(self):
# This test checks that the ZipFile constructor closes the file object # This test checks that the ZipFile constructor closes the file object
# it opens if there's an error in the file. If it doesn't, the traceback # it opens if there's an error in the file. If it doesn't, the traceback
...@@ -472,24 +472,24 @@ class OtherTests(unittest.TestCase): ...@@ -472,24 +472,24 @@ class OtherTests(unittest.TestCase):
os.unlink(TESTFN) os.unlink(TESTFN)
def testIsZipErroneousFile(self): def testIsZipErroneousFile(self):
# This test checks that the is_zipfile function correctly identifies # This test checks that the is_zipfile function correctly identifies
# a file that is not a zip file # a file that is not a zip file
fp = open(TESTFN, "w") fp = open(TESTFN, "w")
fp.write("this is not a legal zip file\n") fp.write("this is not a legal zip file\n")
fp.close() fp.close()
chk = zipfile.is_zipfile(TESTFN) chk = zipfile.is_zipfile(TESTFN)
os.unlink(TESTFN) os.unlink(TESTFN)
self.assert_(chk is False) self.assert_(chk is False)
def testIsZipValidFile(self): def testIsZipValidFile(self):
# This test checks that the is_zipfile function correctly identifies # This test checks that the is_zipfile function correctly identifies
# a file that is a zip file # a file that is a zip file
zipf = zipfile.ZipFile(TESTFN, mode="w") zipf = zipfile.ZipFile(TESTFN, mode="w")
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
zipf.close() zipf.close()
chk = zipfile.is_zipfile(TESTFN) chk = zipfile.is_zipfile(TESTFN)
os.unlink(TESTFN) os.unlink(TESTFN)
self.assert_(chk is True) self.assert_(chk is True)
def testNonExistentFileRaisesIOError(self): def testNonExistentFileRaisesIOError(self):
# make sure we don't raise an AttributeError when a partially-constructed # make sure we don't raise an AttributeError when a partially-constructed
...@@ -552,7 +552,7 @@ class DecryptionTests(unittest.TestCase): ...@@ -552,7 +552,7 @@ class DecryptionTests(unittest.TestCase):
def testBadPassword(self): def testBadPassword(self):
self.zip.setpassword("perl") self.zip.setpassword("perl")
self.assertRaises(RuntimeError, self.zip.read, "test.txt") self.assertRaises(RuntimeError, self.zip.read, "test.txt")
def testGoodPassword(self): def testGoodPassword(self):
self.zip.setpassword("python") self.zip.setpassword("python")
self.assertEquals(self.zip.read("test.txt"), self.plain) self.assertEquals(self.zip.read("test.txt"), self.plain)
...@@ -589,7 +589,7 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -589,7 +589,7 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
def testStored(self): def testStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.zipTest(f, zipfile.ZIP_STORED) self.zipTest(f, zipfile.ZIP_STORED)
def zipOpenTest(self, f, compression): def zipOpenTest(self, f, compression):
self.makeTestArchive(f, compression) self.makeTestArchive(f, compression)
...@@ -610,17 +610,17 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -610,17 +610,17 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
if not read_data: if not read_data:
break break
zipdata2.append(read_data) zipdata2.append(read_data)
testdata1 = ''.join(zipdata1) testdata1 = ''.join(zipdata1)
self.assertEqual(len(testdata1), len(self.data)) self.assertEqual(len(testdata1), len(self.data))
self.assertEqual(testdata1, self.data) self.assertEqual(testdata1, self.data)
testdata2 = ''.join(zipdata2) testdata2 = ''.join(zipdata2)
self.assertEqual(len(testdata1), len(self.data)) self.assertEqual(len(testdata1), len(self.data))
self.assertEqual(testdata1, self.data) self.assertEqual(testdata1, self.data)
zipfp.close() zipfp.close()
def testOpenStored(self): def testOpenStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.zipOpenTest(f, zipfile.ZIP_STORED) self.zipOpenTest(f, zipfile.ZIP_STORED)
...@@ -641,8 +641,8 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -641,8 +641,8 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
self.assertEqual(len(testdata), len(self.data)) self.assertEqual(len(testdata), len(self.data))
self.assertEqual(testdata, self.data) self.assertEqual(testdata, self.data)
zipfp.close() zipfp.close()
def testRandomOpenStored(self): def testRandomOpenStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.zipRandomOpenTest(f, zipfile.ZIP_STORED) self.zipRandomOpenTest(f, zipfile.ZIP_STORED)
...@@ -653,7 +653,7 @@ class TestsWithMultipleOpens(unittest.TestCase): ...@@ -653,7 +653,7 @@ class TestsWithMultipleOpens(unittest.TestCase):
zipfp.writestr('ones', '1'*FIXEDTEST_SIZE) zipfp.writestr('ones', '1'*FIXEDTEST_SIZE)
zipfp.writestr('twos', '2'*FIXEDTEST_SIZE) zipfp.writestr('twos', '2'*FIXEDTEST_SIZE)
zipfp.close() zipfp.close()
def testSameFile(self): def testSameFile(self):
# Verify that (when the ZipFile is in control of creating file objects) # Verify that (when the ZipFile is in control of creating file objects)
# multiple open() calls can be made without interfering with each other. # multiple open() calls can be made without interfering with each other.
...@@ -694,10 +694,10 @@ class TestsWithMultipleOpens(unittest.TestCase): ...@@ -694,10 +694,10 @@ class TestsWithMultipleOpens(unittest.TestCase):
self.assertEqual(data1, '1'*FIXEDTEST_SIZE) self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
self.assertEqual(data2, '2'*FIXEDTEST_SIZE) self.assertEqual(data2, '2'*FIXEDTEST_SIZE)
zipf.close() zipf.close()
def tearDown(self): def tearDown(self):
os.remove(TESTFN2) os.remove(TESTFN2)
class UniversalNewlineTests(unittest.TestCase): class UniversalNewlineTests(unittest.TestCase):
def setUp(self): def setUp(self):
...@@ -726,7 +726,7 @@ class UniversalNewlineTests(unittest.TestCase): ...@@ -726,7 +726,7 @@ class UniversalNewlineTests(unittest.TestCase):
self.assertEqual(self.arcdata[sep], zipdata) self.assertEqual(self.arcdata[sep], zipdata)
zipfp.close() zipfp.close()
def readlineTest(self, f, compression): def readlineTest(self, f, compression):
self.makeTestArchive(f, compression) self.makeTestArchive(f, compression)
...@@ -763,36 +763,36 @@ class UniversalNewlineTests(unittest.TestCase): ...@@ -763,36 +763,36 @@ class UniversalNewlineTests(unittest.TestCase):
zipfp.close() zipfp.close()
def testReadStored(self): def testReadStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.readTest(f, zipfile.ZIP_STORED) self.readTest(f, zipfile.ZIP_STORED)
def testReadlineStored(self): def testReadlineStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.readlineTest(f, zipfile.ZIP_STORED) self.readlineTest(f, zipfile.ZIP_STORED)
def testReadlinesStored(self): def testReadlinesStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.readlinesTest(f, zipfile.ZIP_STORED) self.readlinesTest(f, zipfile.ZIP_STORED)
def testIterlinesStored(self): def testIterlinesStored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.iterlinesTest(f, zipfile.ZIP_STORED) self.iterlinesTest(f, zipfile.ZIP_STORED)
if zlib: if zlib:
def testReadDeflated(self): def testReadDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.readTest(f, zipfile.ZIP_DEFLATED) self.readTest(f, zipfile.ZIP_DEFLATED)
def testReadlineDeflated(self): def testReadlineDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.readlineTest(f, zipfile.ZIP_DEFLATED) self.readlineTest(f, zipfile.ZIP_DEFLATED)
def testReadlinesDeflated(self): def testReadlinesDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.readlinesTest(f, zipfile.ZIP_DEFLATED) self.readlinesTest(f, zipfile.ZIP_DEFLATED)
def testIterlinesDeflated(self): def testIterlinesDeflated(self):
for f in (TESTFN2, TemporaryFile(), StringIO()): for f in (TESTFN2, TemporaryFile(), StringIO()):
self.iterlinesTest(f, zipfile.ZIP_DEFLATED) self.iterlinesTest(f, zipfile.ZIP_DEFLATED)
...@@ -802,8 +802,8 @@ class UniversalNewlineTests(unittest.TestCase): ...@@ -802,8 +802,8 @@ class UniversalNewlineTests(unittest.TestCase):
def test_main(): def test_main():
run_unittest(TestsWithSourceFile, TestZip64InSmallFiles, OtherTests, run_unittest(TestsWithSourceFile, TestZip64InSmallFiles, OtherTests,
PyZipFileTests, DecryptionTests, TestsWithMultipleOpens, PyZipFileTests, DecryptionTests, TestsWithMultipleOpens,
UniversalNewlineTests, TestsWithRandomBinaryFiles) UniversalNewlineTests, TestsWithRandomBinaryFiles)
#run_unittest(TestZip64InSmallFiles) #run_unittest(TestZip64InSmallFiles)
......
...@@ -30,7 +30,7 @@ handler, the argument will be installed instead of the default. ...@@ -30,7 +30,7 @@ handler, the argument will be installed instead of the default.
install_opener -- Installs a new opener as the default opener. install_opener -- Installs a new opener as the default opener.
objects of interest: objects of interest:
OpenerDirector -- OpenerDirector --
Request -- An object that encapsulates the state of a request. The Request -- An object that encapsulates the state of a request. The
state can be as simple as the URL. It can also include extra HTTP state can be as simple as the URL. It can also include extra HTTP
......
...@@ -357,9 +357,9 @@ class _ZipDecrypter: ...@@ -357,9 +357,9 @@ class _ZipDecrypter:
class ZipExtFile: class ZipExtFile:
"""File-like object for reading an archive member. """File-like object for reading an archive member.
Is returned by ZipFile.open(). Is returned by ZipFile.open().
""" """
def __init__(self, fileobj, zipinfo, decrypt=None): def __init__(self, fileobj, zipinfo, decrypt=None):
self.fileobj = fileobj self.fileobj = fileobj
self.decrypter = decrypt self.decrypter = decrypt
...@@ -374,7 +374,7 @@ class ZipExtFile: ...@@ -374,7 +374,7 @@ class ZipExtFile:
self.compress_type = zipinfo.compress_type self.compress_type = zipinfo.compress_type
self.compress_size = zipinfo.compress_size self.compress_size = zipinfo.compress_size
self.closed = False self.closed = False
self.mode = "r" self.mode = "r"
self.name = zipinfo.filename self.name = zipinfo.filename
...@@ -386,7 +386,7 @@ class ZipExtFile: ...@@ -386,7 +386,7 @@ class ZipExtFile:
def set_univ_newlines(self, univ_newlines): def set_univ_newlines(self, univ_newlines):
self.univ_newlines = univ_newlines self.univ_newlines = univ_newlines
# pick line separator char(s) based on universal newlines flag # pick line separator char(s) based on universal newlines flag
self.nlSeps = ("\n", ) self.nlSeps = ("\n", )
if self.univ_newlines: if self.univ_newlines:
...@@ -394,7 +394,7 @@ class ZipExtFile: ...@@ -394,7 +394,7 @@ class ZipExtFile:
def __iter__(self): def __iter__(self):
return self return self
def next(self): def next(self):
nextline = self.readline() nextline = self.readline()
if not nextline: if not nextline:
...@@ -414,17 +414,17 @@ class ZipExtFile: ...@@ -414,17 +414,17 @@ class ZipExtFile:
if (self.lastdiscard, self.linebuffer[0]) == ('\r','\n'): if (self.lastdiscard, self.linebuffer[0]) == ('\r','\n'):
self.linebuffer = self.linebuffer[1:] self.linebuffer = self.linebuffer[1:]
for sep in self.nlSeps: for sep in self.nlSeps:
nl = self.linebuffer.find(sep) nl = self.linebuffer.find(sep)
if nl >= 0: if nl >= 0:
nllen = len(sep) nllen = len(sep)
return nl, nllen return nl, nllen
return nl, nllen return nl, nllen
def readline(self, size = -1): def readline(self, size = -1):
"""Read a line with approx. size. If size is negative, """Read a line with approx. size. If size is negative,
read a whole line. read a whole line.
""" """
if size < 0: if size < 0:
size = sys.maxint size = sys.maxint
...@@ -433,7 +433,7 @@ class ZipExtFile: ...@@ -433,7 +433,7 @@ class ZipExtFile:
# check for a newline already in buffer # check for a newline already in buffer
nl, nllen = self._checkfornewline() nl, nllen = self._checkfornewline()
if nl >= 0: if nl >= 0:
# the next line was already in the buffer # the next line was already in the buffer
nl = min(nl, size) nl = min(nl, size)
...@@ -449,7 +449,7 @@ class ZipExtFile: ...@@ -449,7 +449,7 @@ class ZipExtFile:
# check for a newline in buffer # check for a newline in buffer
nl, nllen = self._checkfornewline() nl, nllen = self._checkfornewline()
# we either ran out of bytes in the file, or # we either ran out of bytes in the file, or
# met the specified size limit without finding a newline, # met the specified size limit without finding a newline,
# so return current buffer # so return current buffer
...@@ -528,8 +528,8 @@ class ZipExtFile: ...@@ -528,8 +528,8 @@ class ZipExtFile:
newdata = self.dc.decompress(newdata) newdata = self.dc.decompress(newdata)
self.rawbuffer = self.dc.unconsumed_tail self.rawbuffer = self.dc.unconsumed_tail
if self.eof and len(self.rawbuffer) == 0: if self.eof and len(self.rawbuffer) == 0:
# we're out of raw bytes (both from the file and # we're out of raw bytes (both from the file and
# the local buffer); flush just to make sure the # the local buffer); flush just to make sure the
# decompressor is done # decompressor is done
newdata += self.dc.flush() newdata += self.dc.flush()
# prevent decompressor from being used again # prevent decompressor from being used again
...@@ -547,7 +547,7 @@ class ZipExtFile: ...@@ -547,7 +547,7 @@ class ZipExtFile:
self.readbuffer = self.readbuffer[size:] self.readbuffer = self.readbuffer[size:]
return bytes return bytes
class ZipFile: class ZipFile:
""" Class with methods to open, read, write, close, list zip files. """ Class with methods to open, read, write, close, list zip files.
...@@ -738,7 +738,7 @@ class ZipFile: ...@@ -738,7 +738,7 @@ class ZipFile:
raise RuntimeError, \ raise RuntimeError, \
"Attempt to read ZIP archive that was already closed" "Attempt to read ZIP archive that was already closed"
# Only open a new file for instances where we were not # Only open a new file for instances where we were not
# given a file object in the constructor # given a file object in the constructor
if self._filePassed: if self._filePassed:
zef_file = self.fp zef_file = self.fp
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment