Commit faa6b7f4 authored by Ezio Melotti's avatar Ezio Melotti

Merged revisions 77136 via svnmerge from

svn+ssh://pythondev@svn.python.org/python/trunk

........
  r77136 | ezio.melotti | 2009-12-30 08:14:51 +0200 (Wed, 30 Dec 2009) | 1 line

  #5511: Added the ability to use ZipFile as a context manager. Patch by Brian Curtin.
........
parent 588009e9
...@@ -102,25 +102,32 @@ ZipFile Objects ...@@ -102,25 +102,32 @@ ZipFile Objects
Open a ZIP file, where *file* can be either a path to a file (a string) or a Open a ZIP file, where *file* can be either a path to a file (a string) or a
file-like object. The *mode* parameter should be ``'r'`` to read an existing file-like object. The *mode* parameter should be ``'r'`` to read an existing
file, ``'w'`` to truncate and write a new file, or ``'a'`` to append to an file, ``'w'`` to truncate and write a new file, or ``'a'`` to append to an
existing file. If *mode* is ``'a'`` and *file* refers to an existing ZIP file, existing file. If *mode* is ``'a'`` and *file* refers to an existing ZIP
then additional files are added to it. If *file* does not refer to a ZIP file, file, then additional files are added to it. If *file* does not refer to a
then a new ZIP archive is appended to the file. This is meant for adding a ZIP ZIP file, then a new ZIP archive is appended to the file. This is meant for
archive to another file, such as :file:`python.exe`. Using :: adding a ZIP archive to another file (such as :file:`python.exe`). If
*mode* is ``a`` and the file does not exist at all, it is created.
cat myzip.zip >> python.exe *compression* is the ZIP compression method to use when writing the archive,
and should be :const:`ZIP_STORED` or :const:`ZIP_DEFLATED`; unrecognized
also works, and at least :program:`WinZip` can read such files. If *mode* is values will cause :exc:`RuntimeError` to be raised. If :const:`ZIP_DEFLATED`
``a`` and the file does not exist at all, it is created. *compression* is the is specified but the :mod:`zlib` module is not available, :exc:`RuntimeError`
ZIP compression method to use when writing the archive, and should be is also raised. The default is :const:`ZIP_STORED`. If *allowZip64* is
:const:`ZIP_STORED` or :const:`ZIP_DEFLATED`; unrecognized values will cause ``True`` zipfile will create ZIP files that use the ZIP64 extensions when
:exc:`RuntimeError` to be raised. If :const:`ZIP_DEFLATED` is specified but the the zipfile is larger than 2 GB. If it is false (the default) :mod:`zipfile`
:mod:`zlib` module is not available, :exc:`RuntimeError` is also raised. The will raise an exception when the ZIP file would require ZIP64 extensions.
default is :const:`ZIP_STORED`. If *allowZip64* is ``True`` zipfile will create ZIP64 extensions are disabled by default because the default :program:`zip`
ZIP files that use the ZIP64 extensions when the zipfile is larger than 2 GB. If and :program:`unzip` commands on Unix (the InfoZIP utilities) don't support
it is false (the default) :mod:`zipfile` will raise an exception when the ZIP these extensions.
file would require ZIP64 extensions. ZIP64 extensions are disabled by default
because the default :program:`zip` and :program:`unzip` commands on Unix (the ZipFile is also a context manager and therefore supports the
InfoZIP utilities) don't support these extensions. :keyword:`with` statement. In the example, *myzip* is closed after the
:keyword:`with` statement's suite is finished---even if an exception occurs::
with ZipFile('spam.zip', 'w') as myzip:
myzip.write('eggs.txt')
.. versionadded:: 3.2
Added the ability to use :class:`ZipFile` as a context manager.
.. method:: ZipFile.close() .. method:: ZipFile.close()
......
...@@ -42,17 +42,16 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -42,17 +42,16 @@ class TestsWithSourceFile(unittest.TestCase):
def make_test_archive(self, f, compression): def make_test_archive(self, f, compression):
# Create the ZIP archive # Create the ZIP archive
zipfp = zipfile.ZipFile(f, "w", compression) with zipfile.ZipFile(f, "w", compression) as zipfp:
zipfp.write(TESTFN, "another.name") zipfp.write(TESTFN, "another.name")
zipfp.write(TESTFN, TESTFN) zipfp.write(TESTFN, TESTFN)
zipfp.writestr("strfile", self.data) zipfp.writestr("strfile", self.data)
zipfp.close()
def zip_test(self, f, compression): def zip_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r", compression) with zipfile.ZipFile(f, "r", compression) as zipfp:
self.assertEqual(zipfp.read(TESTFN), self.data) self.assertEqual(zipfp.read(TESTFN), self.data)
self.assertEqual(zipfp.read("another.name"), self.data) self.assertEqual(zipfp.read("another.name"), self.data)
self.assertEqual(zipfp.read("strfile"), self.data) self.assertEqual(zipfp.read("strfile"), self.data)
...@@ -98,7 +97,6 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -98,7 +97,6 @@ class TestsWithSourceFile(unittest.TestCase):
# Check that testzip doesn't raise an exception # Check that testzip doesn't raise an exception
zipfp.testzip() zipfp.testzip()
zipfp.close()
def test_stored(self): def test_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
...@@ -108,10 +106,10 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -108,10 +106,10 @@ class TestsWithSourceFile(unittest.TestCase):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r", compression) with zipfile.ZipFile(f, "r", compression) as zipfp:
zipdata1 = [] zipdata1 = []
zipopen1 = zipfp.open(TESTFN) zipopen1 = zipfp.open(TESTFN)
while 1: while True:
read_data = zipopen1.read(256) read_data = zipopen1.read(256)
if not read_data: if not read_data:
break break
...@@ -119,7 +117,7 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -119,7 +117,7 @@ class TestsWithSourceFile(unittest.TestCase):
zipdata2 = [] zipdata2 = []
zipopen2 = zipfp.open("another.name") zipopen2 = zipfp.open("another.name")
while 1: while True:
read_data = zipopen2.read(256) read_data = zipopen2.read(256)
if not read_data: if not read_data:
break break
...@@ -127,7 +125,6 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -127,7 +125,6 @@ class TestsWithSourceFile(unittest.TestCase):
self.assertEqual(b''.join(zipdata1), self.data) self.assertEqual(b''.join(zipdata1), self.data)
self.assertEqual(b''.join(zipdata2), self.data) self.assertEqual(b''.join(zipdata2), self.data)
zipfp.close()
def test_open_stored(self): def test_open_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
...@@ -135,12 +132,11 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -135,12 +132,11 @@ class TestsWithSourceFile(unittest.TestCase):
def test_open_via_zip_info(self): def test_open_via_zip_info(self):
# Create the ZIP archive # Create the ZIP archive
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
zipfp.writestr("name", "foo") zipfp.writestr("name", "foo")
zipfp.writestr("name", "bar") zipfp.writestr("name", "bar")
zipfp.close()
zipfp = zipfile.ZipFile(TESTFN2, "r") with zipfile.ZipFile(TESTFN2, "r") as zipfp:
infos = zipfp.infolist() infos = zipfp.infolist()
data = b"" data = b""
for info in infos: for info in infos:
...@@ -150,23 +146,22 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -150,23 +146,22 @@ class TestsWithSourceFile(unittest.TestCase):
for info in infos: for info in infos:
data += zipfp.read(info) data += zipfp.read(info)
self.assertTrue(data == b"foobar" or data == b"barfoo") self.assertTrue(data == b"foobar" or data == b"barfoo")
zipfp.close()
def zip_random_open_test(self, f, compression): def zip_random_open_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r", compression) with zipfile.ZipFile(f, "r", compression) as zipfp:
zipdata1 = [] zipdata1 = []
zipopen1 = zipfp.open(TESTFN) zipopen1 = zipfp.open(TESTFN)
while 1: while True:
read_data = zipopen1.read(randint(1, 1024)) read_data = zipopen1.read(randint(1, 1024))
if not read_data: if not read_data:
break break
zipdata1.append(read_data) zipdata1.append(read_data)
self.assertEqual(b''.join(zipdata1), self.data) self.assertEqual(b''.join(zipdata1), self.data)
zipfp.close()
def test_random_open_stored(self): def test_random_open_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
...@@ -176,35 +171,29 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -176,35 +171,29 @@ class TestsWithSourceFile(unittest.TestCase):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
zipopen = zipfp.open(TESTFN) zipopen = zipfp.open(TESTFN)
for line in self.line_gen: for line in self.line_gen:
linedata = zipopen.readline() linedata = zipopen.readline()
self.assertEqual(linedata, line + '\n') self.assertEqual(linedata, line + '\n')
zipfp.close()
def zip_readlines_test(self, f, compression): def zip_readlines_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
ziplines = zipfp.open(TESTFN).readlines() ziplines = zipfp.open(TESTFN).readlines()
for line, zipline in zip(self.line_gen, ziplines): for line, zipline in zip(self.line_gen, ziplines):
self.assertEqual(zipline, line + '\n') self.assertEqual(zipline, line + '\n')
zipfp.close()
def zip_iterlines_test(self, f, compression): def zip_iterlines_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)): for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)):
self.assertEqual(zipline, line + '\n') self.assertEqual(zipline, line + '\n')
zipfp.close()
def test_readline_stored(self): def test_readline_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_readline_test(f, zipfile.ZIP_STORED) self.zip_readline_test(f, zipfile.ZIP_STORED)
...@@ -252,34 +241,30 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -252,34 +241,30 @@ class TestsWithSourceFile(unittest.TestCase):
def test_low_compression(self): def test_low_compression(self):
# Checks for cases where compressed data is larger than original # Checks for cases where compressed data is larger than original
# Create the ZIP archive # Create the ZIP archive
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
zipfp.writestr("strfile", '12') zipfp.writestr("strfile", '12')
zipfp.close()
# Get an open object for strfile # Get an open object for strfile
zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) as zipfp:
openobj = zipfp.open("strfile") openobj = zipfp.open("strfile")
self.assertEqual(openobj.read(1), b'1') self.assertEqual(openobj.read(1), b'1')
self.assertEqual(openobj.read(1), b'2') self.assertEqual(openobj.read(1), b'2')
def test_absolute_arcnames(self): def test_absolute_arcnames(self):
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
zipfp.write(TESTFN, "/absolute") zipfp.write(TESTFN, "/absolute")
zipfp.close()
zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
self.assertEqual(zipfp.namelist(), ["absolute"]) self.assertEqual(zipfp.namelist(), ["absolute"])
zipfp.close()
def test_append_to_zip_file(self): def test_append_to_zip_file(self):
# Test appending to an existing zipfile # Test appending to an existing zipfile
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
zipfp.write(TESTFN, TESTFN) zipfp.write(TESTFN, TESTFN)
zipfp.close()
zipfp = zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
zipfp.writestr("strfile", self.data) zipfp.writestr("strfile", self.data)
self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"])
zipfp.close()
def test_append_to_non_zip_file(self): def test_append_to_non_zip_file(self):
# Test appending to an existing file that is not a zipfile # Test appending to an existing file that is not a zipfile
...@@ -289,53 +274,46 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -289,53 +274,46 @@ class TestsWithSourceFile(unittest.TestCase):
f = open(TESTFN2, 'wb') f = open(TESTFN2, 'wb')
f.write(d) f.write(d)
f.close() f.close()
zipfp = zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp:
zipfp.write(TESTFN, TESTFN) zipfp.write(TESTFN, TESTFN)
zipfp.close()
f = open(TESTFN2, 'rb') f = open(TESTFN2, 'rb')
f.seek(len(d)) f.seek(len(d))
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
self.assertEqual(zipfp.namelist(), [TESTFN]) self.assertEqual(zipfp.namelist(), [TESTFN])
zipfp.close()
f.close()
def test_write_default_name(self): def test_write_default_name(self):
# Check that calling ZipFile.write without arcname specified produces the expected result # Check that calling ZipFile.write without arcname specified produces the expected result
zipfp = zipfile.ZipFile(TESTFN2, "w") with zipfile.ZipFile(TESTFN2, "w") as zipfp:
zipfp.write(TESTFN) zipfp.write(TESTFN)
self.assertEqual(zipfp.read(TESTFN), open(TESTFN, "rb").read()) self.assertEqual(zipfp.read(TESTFN), open(TESTFN, "rb").read())
zipfp.close()
@skipUnless(zlib, "requires zlib") @skipUnless(zlib, "requires zlib")
def test_per_file_compression(self): def test_per_file_compression(self):
# Check that files within a Zip archive can have different compression options # Check that files within a Zip archive can have different compression options
zipfp = zipfile.ZipFile(TESTFN2, "w") with zipfile.ZipFile(TESTFN2, "w") as zipfp:
zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED)
zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED)
sinfo = zipfp.getinfo('storeme') sinfo = zipfp.getinfo('storeme')
dinfo = zipfp.getinfo('deflateme') dinfo = zipfp.getinfo('deflateme')
self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED)
self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED)
zipfp.close()
def test_write_to_readonly(self): def test_write_to_readonly(self):
# Check that trying to call write() on a readonly ZipFile object # Check that trying to call write() on a readonly ZipFile object
# raises a RuntimeError # raises a RuntimeError
zipf = zipfile.ZipFile(TESTFN2, mode="w") with zipfile.ZipFile(TESTFN2, mode="w") as zipfp:
zipf.writestr("somefile.txt", "bogus") zipfp.writestr("somefile.txt", "bogus")
zipf.close()
zipf = zipfile.ZipFile(TESTFN2, mode="r") with zipfile.ZipFile(TESTFN2, mode="r") as zipfp:
self.assertRaises(RuntimeError, zipf.write, TESTFN) self.assertRaises(RuntimeError, zipfp.write, TESTFN)
zipf.close()
def test_extract(self): def test_extract(self):
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
for fpath, fdata in SMALL_TEST_DATA: for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata) zipfp.writestr(fpath, fdata)
zipfp.close()
zipfp = zipfile.ZipFile(TESTFN2, "r") with zipfile.ZipFile(TESTFN2, "r") as zipfp:
for fpath, fdata in SMALL_TEST_DATA: for fpath, fdata in SMALL_TEST_DATA:
writtenfile = zipfp.extract(fpath) writtenfile = zipfp.extract(fpath)
...@@ -353,18 +331,15 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -353,18 +331,15 @@ class TestsWithSourceFile(unittest.TestCase):
os.remove(writtenfile) os.remove(writtenfile)
zipfp.close()
# remove the test file subdirectories # remove the test file subdirectories
shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
def test_extract_all(self): def test_extract_all(self):
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
for fpath, fdata in SMALL_TEST_DATA: for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata) zipfp.writestr(fpath, fdata)
zipfp.close()
zipfp = zipfile.ZipFile(TESTFN2, "r") with zipfile.ZipFile(TESTFN2, "r") as zipfp:
zipfp.extractall() zipfp.extractall()
for fpath, fdata in SMALL_TEST_DATA: for fpath, fdata in SMALL_TEST_DATA:
if os.path.isabs(fpath): if os.path.isabs(fpath):
...@@ -376,8 +351,6 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -376,8 +351,6 @@ class TestsWithSourceFile(unittest.TestCase):
os.remove(outfile) os.remove(outfile)
zipfp.close()
# remove the test file subdirectories # remove the test file subdirectories
shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir')) shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
...@@ -386,7 +359,7 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -386,7 +359,7 @@ class TestsWithSourceFile(unittest.TestCase):
# when it is passed a name rather than a ZipInfo instance. # when it is passed a name rather than a ZipInfo instance.
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
zinfo = zipfp.getinfo('strfile') zinfo = zipfp.getinfo('strfile')
self.assertEqual(zinfo.external_attr, 0o600 << 16) self.assertEqual(zinfo.external_attr, 0o600 << 16)
...@@ -395,12 +368,36 @@ class TestsWithSourceFile(unittest.TestCase): ...@@ -395,12 +368,36 @@ class TestsWithSourceFile(unittest.TestCase):
self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED)
def test_writestr_extended_local_header_issue1202(self): def test_writestr_extended_local_header_issue1202(self):
orig_zip = zipfile.ZipFile(TESTFN2, 'w') with zipfile.ZipFile(TESTFN2, 'w') as orig_zip:
for data in 'abcdefghijklmnop': for data in 'abcdefghijklmnop':
zinfo = zipfile.ZipInfo(data) zinfo = zipfile.ZipInfo(data)
zinfo.flag_bits |= 0x08 # Include an extended local header. zinfo.flag_bits |= 0x08 # Include an extended local header.
orig_zip.writestr(zinfo, data) orig_zip.writestr(zinfo, data)
orig_zip.close()
def test_close(self):
"""Check that the zipfile is closed after the 'with' block."""
with zipfile.ZipFile(TESTFN2, "w") as zipfp:
for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata)
self.assertTrue(zipfp.fp is not None, 'zipfp is not open')
self.assertTrue(zipfp.fp is None, 'zipfp is not closed')
with zipfile.ZipFile(TESTFN2, "r") as zipfp:
self.assertTrue(zipfp.fp is not None, 'zipfp is not open')
self.assertTrue(zipfp.fp is None, 'zipfp is not closed')
def test_close_on_exception(self):
"""Check that the zipfile is closed if an exception is raised in the
'with' block."""
with zipfile.ZipFile(TESTFN2, "w") as zipfp:
for fpath, fdata in SMALL_TEST_DATA:
zipfp.writestr(fpath, fdata)
try:
with zipfile.ZipFile(TESTFN2, "r") as zipfp2:
raise zipfile.BadZipfile()
except zipfile.BadZipfile:
self.assertTrue(zipfp2.fp is None, 'zipfp is not closed')
def tearDown(self): def tearDown(self):
unlink(TESTFN) unlink(TESTFN)
...@@ -425,16 +422,14 @@ class TestZip64InSmallFiles(unittest.TestCase): ...@@ -425,16 +422,14 @@ class TestZip64InSmallFiles(unittest.TestCase):
fp.close() fp.close()
def large_file_exception_test(self, f, compression): def large_file_exception_test(self, f, compression):
zipfp = zipfile.ZipFile(f, "w", compression) with zipfile.ZipFile(f, "w", compression) as zipfp:
self.assertRaises(zipfile.LargeZipFile, self.assertRaises(zipfile.LargeZipFile,
zipfp.write, TESTFN, "another.name") zipfp.write, TESTFN, "another.name")
zipfp.close()
def large_file_exception_test2(self, f, compression): def large_file_exception_test2(self, f, compression):
zipfp = zipfile.ZipFile(f, "w", compression) with zipfile.ZipFile(f, "w", compression) as zipfp:
self.assertRaises(zipfile.LargeZipFile, self.assertRaises(zipfile.LargeZipFile,
zipfp.writestr, "another.name", self.data) zipfp.writestr, "another.name", self.data)
zipfp.close()
def test_large_file_exception(self): def test_large_file_exception(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
...@@ -443,14 +438,13 @@ class TestZip64InSmallFiles(unittest.TestCase): ...@@ -443,14 +438,13 @@ class TestZip64InSmallFiles(unittest.TestCase):
def zip_test(self, f, compression): def zip_test(self, f, compression):
# Create the ZIP archive # Create the ZIP archive
zipfp = zipfile.ZipFile(f, "w", compression, allowZip64=True) with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp:
zipfp.write(TESTFN, "another.name") zipfp.write(TESTFN, "another.name")
zipfp.write(TESTFN, TESTFN) zipfp.write(TESTFN, TESTFN)
zipfp.writestr("strfile", self.data) zipfp.writestr("strfile", self.data)
zipfp.close()
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r", compression) with zipfile.ZipFile(f, "r", compression) as zipfp:
self.assertEqual(zipfp.read(TESTFN), self.data) self.assertEqual(zipfp.read(TESTFN), self.data)
self.assertEqual(zipfp.read("another.name"), self.data) self.assertEqual(zipfp.read("another.name"), self.data)
self.assertEqual(zipfp.read("strfile"), self.data) self.assertEqual(zipfp.read("strfile"), self.data)
...@@ -498,8 +492,6 @@ class TestZip64InSmallFiles(unittest.TestCase): ...@@ -498,8 +492,6 @@ class TestZip64InSmallFiles(unittest.TestCase):
# Check that testzip doesn't raise an exception # Check that testzip doesn't raise an exception
zipfp.testzip() zipfp.testzip()
zipfp.close()
def test_stored(self): def test_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.zip_test(f, zipfile.ZIP_STORED) self.zip_test(f, zipfile.ZIP_STORED)
...@@ -510,13 +502,12 @@ class TestZip64InSmallFiles(unittest.TestCase): ...@@ -510,13 +502,12 @@ class TestZip64InSmallFiles(unittest.TestCase):
self.zip_test(f, zipfile.ZIP_DEFLATED) self.zip_test(f, zipfile.ZIP_DEFLATED)
def test_absolute_arcnames(self): def test_absolute_arcnames(self):
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, allowZip64=True) with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED,
allowZip64=True) as zipfp:
zipfp.write(TESTFN, "/absolute") zipfp.write(TESTFN, "/absolute")
zipfp.close()
zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp:
self.assertEqual(zipfp.namelist(), ["absolute"]) self.assertEqual(zipfp.namelist(), ["absolute"])
zipfp.close()
def tearDown(self): def tearDown(self):
zipfile.ZIP64_LIMIT = self._limit zipfile.ZIP64_LIMIT = self._limit
...@@ -526,7 +517,7 @@ class TestZip64InSmallFiles(unittest.TestCase): ...@@ -526,7 +517,7 @@ class TestZip64InSmallFiles(unittest.TestCase):
class PyZipFileTests(unittest.TestCase): class PyZipFileTests(unittest.TestCase):
def test_write_pyfile(self): def test_write_pyfile(self):
zipfp = zipfile.PyZipFile(TemporaryFile(), "w") with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
fn = __file__ fn = __file__
if fn.endswith('.pyc') or fn.endswith('.pyo'): if fn.endswith('.pyc') or fn.endswith('.pyo'):
fn = fn[:-1] fn = fn[:-1]
...@@ -535,10 +526,10 @@ class PyZipFileTests(unittest.TestCase): ...@@ -535,10 +526,10 @@ class PyZipFileTests(unittest.TestCase):
bn = os.path.basename(fn) bn = os.path.basename(fn)
self.assertTrue(bn not in zipfp.namelist()) self.assertTrue(bn not in zipfp.namelist())
self.assertTrue(bn + 'o' in zipfp.namelist() or bn + 'c' in zipfp.namelist()) self.assertTrue(bn + 'o' in zipfp.namelist() or
zipfp.close() bn + 'c' in zipfp.namelist())
zipfp = zipfile.PyZipFile(TemporaryFile(), "w") with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
fn = __file__ fn = __file__
if fn.endswith('.pyc') or fn.endswith('.pyo'): if fn.endswith('.pyc') or fn.endswith('.pyo'):
fn = fn[:-1] fn = fn[:-1]
...@@ -547,20 +538,22 @@ class PyZipFileTests(unittest.TestCase): ...@@ -547,20 +538,22 @@ class PyZipFileTests(unittest.TestCase):
bn = "%s/%s"%("testpackage", os.path.basename(fn)) bn = "%s/%s"%("testpackage", os.path.basename(fn))
self.assertTrue(bn not in zipfp.namelist()) self.assertTrue(bn not in zipfp.namelist())
self.assertTrue(bn + 'o' in zipfp.namelist() or bn + 'c' in zipfp.namelist()) self.assertTrue(bn + 'o' in zipfp.namelist() or
zipfp.close() bn + 'c' in zipfp.namelist())
def test_write_python_package(self): def test_write_python_package(self):
import email import email
packagedir = os.path.dirname(email.__file__) packagedir = os.path.dirname(email.__file__)
zipfp = zipfile.PyZipFile(TemporaryFile(), "w") with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
zipfp.writepy(packagedir) zipfp.writepy(packagedir)
# Check for a couple of modules at different levels of the hieararchy # Check for a couple of modules at different levels of the hieararchy
names = zipfp.namelist() names = zipfp.namelist()
self.assertTrue('email/__init__.pyo' in names or 'email/__init__.pyc' in names) self.assertTrue('email/__init__.pyo' in names or
self.assertTrue('email/mime/text.pyo' in names or 'email/mime/text.pyc' in names) 'email/__init__.pyc' in names)
self.assertTrue('email/mime/text.pyo' in names or
'email/mime/text.pyc' in names)
def test_write_python_directory(self): def test_write_python_directory(self):
os.mkdir(TESTFN2) os.mkdir(TESTFN2)
...@@ -589,22 +582,22 @@ class PyZipFileTests(unittest.TestCase): ...@@ -589,22 +582,22 @@ class PyZipFileTests(unittest.TestCase):
shutil.rmtree(TESTFN2) shutil.rmtree(TESTFN2)
def test_write_non_pyfile(self): def test_write_non_pyfile(self):
zipfp = zipfile.PyZipFile(TemporaryFile(), "w") with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
open(TESTFN, 'w').write('most definitely not a python file') open(TESTFN, 'w').write('most definitely not a python file')
self.assertRaises(RuntimeError, zipfp.writepy, TESTFN) self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
os.remove(TESTFN) os.remove(TESTFN)
class OtherTests(unittest.TestCase): class OtherTests(unittest.TestCase):
def test_unicode_filenames(self): def test_unicode_filenames(self):
zf = zipfile.ZipFile(TESTFN, "w") with zipfile.ZipFile(TESTFN, "w") as zf:
zf.writestr("foo.txt", "Test for unicode filename") zf.writestr("foo.txt", "Test for unicode filename")
zf.writestr("\xf6.txt", "Test for unicode filename") zf.writestr("\xf6.txt", "Test for unicode filename")
zf.close()
zf = zipfile.ZipFile(TESTFN, "r") with zipfile.ZipFile(TESTFN, "r") as zf:
self.assertEqual(zf.filelist[0].filename, "foo.txt") self.assertEqual(zf.filelist[0].filename, "foo.txt")
self.assertEqual(zf.filelist[1].filename, "\xf6.txt") self.assertEqual(zf.filelist[1].filename, "\xf6.txt")
zf.close()
def test_create_non_existent_file_for_append(self): def test_create_non_existent_file_for_append(self):
if os.path.exists(TESTFN): if os.path.exists(TESTFN):
...@@ -614,17 +607,15 @@ class OtherTests(unittest.TestCase): ...@@ -614,17 +607,15 @@ class OtherTests(unittest.TestCase):
content = b'hello, world. this is some content.' content = b'hello, world. this is some content.'
try: try:
zf = zipfile.ZipFile(TESTFN, 'a') with zipfile.ZipFile(TESTFN, 'a') as zf:
zf.writestr(filename, content) zf.writestr(filename, content)
zf.close()
except IOError: except IOError:
self.fail('Could not append data to a non-existent zip file.') self.fail('Could not append data to a non-existent zip file.')
self.assertTrue(os.path.exists(TESTFN)) self.assertTrue(os.path.exists(TESTFN))
zf = zipfile.ZipFile(TESTFN, 'r') with zipfile.ZipFile(TESTFN, 'r') as zf:
self.assertEqual(zf.read(filename), content) self.assertEqual(zf.read(filename), content)
zf.close()
def test_close_erroneous_file(self): def test_close_erroneous_file(self):
# This test checks that the ZipFile constructor closes the file object # This test checks that the ZipFile constructor closes the file object
...@@ -668,9 +659,9 @@ class OtherTests(unittest.TestCase): ...@@ -668,9 +659,9 @@ class OtherTests(unittest.TestCase):
# a file that is a zip file # a file that is a zip file
# - passing a filename # - passing a filename
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.writestr("foo.txt", b"O, for a Muse of Fire!") zipf.writestr("foo.txt", b"O, for a Muse of Fire!")
zipf.close()
chk = zipfile.is_zipfile(TESTFN) chk = zipfile.is_zipfile(TESTFN)
self.assertTrue(chk) self.assertTrue(chk)
# - passing a file object # - passing a file object
...@@ -715,9 +706,8 @@ class OtherTests(unittest.TestCase): ...@@ -715,9 +706,8 @@ class OtherTests(unittest.TestCase):
def test_closed_zip_raises_RuntimeError(self): def test_closed_zip_raises_RuntimeError(self):
# Verify that testzip() doesn't swallow inappropriate exceptions. # Verify that testzip() doesn't swallow inappropriate exceptions.
data = io.BytesIO() data = io.BytesIO()
zipf = zipfile.ZipFile(data, mode="w") with zipfile.ZipFile(data, mode="w") as zipf:
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
zipf.close()
# This is correct; calling .read on a closed ZipFile should throw # This is correct; calling .read on a closed ZipFile should throw
# a RuntimeError, and so should calling .testzip. An earlier # a RuntimeError, and so should calling .testzip. An earlier
...@@ -736,19 +726,18 @@ class OtherTests(unittest.TestCase): ...@@ -736,19 +726,18 @@ class OtherTests(unittest.TestCase):
def test_bad_open_mode(self): def test_bad_open_mode(self):
# Check that bad modes passed to ZipFile.open are caught # Check that bad modes passed to ZipFile.open are caught
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
zipf.close()
zipf = zipfile.ZipFile(TESTFN, mode="r") with zipfile.ZipFile(TESTFN, mode="r") as zipf:
# read the data to make sure the file is there # read the data to make sure the file is there
zipf.read("foo.txt") zipf.read("foo.txt")
self.assertRaises(RuntimeError, zipf.open, "foo.txt", "q") self.assertRaises(RuntimeError, zipf.open, "foo.txt", "q")
zipf.close()
def test_read0(self): def test_read0(self):
# Check that calling read(0) on a ZipExtFile object returns an empty # Check that calling read(0) on a ZipExtFile object returns an empty
# string and doesn't advance file pointer # string and doesn't advance file pointer
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
# read the data to make sure the file is there # read the data to make sure the file is there
f = zipf.open("foo.txt") f = zipf.open("foo.txt")
...@@ -756,12 +745,11 @@ class OtherTests(unittest.TestCase): ...@@ -756,12 +745,11 @@ class OtherTests(unittest.TestCase):
self.assertEqual(f.read(0), b'') self.assertEqual(f.read(0), b'')
self.assertEqual(f.read(), b"O, for a Muse of Fire!") self.assertEqual(f.read(), b"O, for a Muse of Fire!")
zipf.close()
def test_open_non_existent_item(self): def test_open_non_existent_item(self):
# Check that attempting to call open() for an item that doesn't # Check that attempting to call open() for an item that doesn't
# exist in the archive raises a RuntimeError # exist in the archive raises a RuntimeError
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
self.assertRaises(KeyError, zipf.open, "foo.txt", "r") self.assertRaises(KeyError, zipf.open, "foo.txt", "r")
def test_bad_compression_mode(self): def test_bad_compression_mode(self):
...@@ -770,7 +758,7 @@ class OtherTests(unittest.TestCase): ...@@ -770,7 +758,7 @@ class OtherTests(unittest.TestCase):
def test_null_byte_in_filename(self): def test_null_byte_in_filename(self):
# Check that a filename containing a null byte is properly terminated # Check that a filename containing a null byte is properly terminated
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!") zipf.writestr("foo.txt\x00qqq", b"O, for a Muse of Fire!")
self.assertEqual(zipf.namelist(), ['foo.txt']) self.assertEqual(zipf.namelist(), ['foo.txt'])
...@@ -785,43 +773,37 @@ class OtherTests(unittest.TestCase): ...@@ -785,43 +773,37 @@ class OtherTests(unittest.TestCase):
# This test checks that comments on the archive are handled properly # This test checks that comments on the archive are handled properly
# check default comment is empty # check default comment is empty
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
self.assertEqual(zipf.comment, b'') self.assertEqual(zipf.comment, b'')
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
zipf.close()
zipfr = zipfile.ZipFile(TESTFN, mode="r") with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
self.assertEqual(zipfr.comment, b'') self.assertEqual(zipfr.comment, b'')
zipfr.close()
# check a simple short comment # check a simple short comment
comment = b'Bravely taking to his feet, he beat a very brave retreat.' comment = b'Bravely taking to his feet, he beat a very brave retreat.'
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.comment = comment zipf.comment = comment
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
zipf.close() with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
zipfr = zipfile.ZipFile(TESTFN, mode="r") self.assertEqual(zipf.comment, comment)
self.assertEqual(zipfr.comment, comment)
zipfr.close()
# check a comment of max length # check a comment of max length
comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)]) comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)])
comment2 = comment2.encode("ascii") comment2 = comment2.encode("ascii")
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.comment = comment2 zipf.comment = comment2
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
zipf.close()
zipfr = zipfile.ZipFile(TESTFN, mode="r") with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
self.assertEqual(zipfr.comment, comment2) self.assertEqual(zipfr.comment, comment2)
zipfr.close()
# check a comment that is too long is truncated # check a comment that is too long is truncated
zipf = zipfile.ZipFile(TESTFN, mode="w") with zipfile.ZipFile(TESTFN, mode="w") as zipf:
zipf.comment = comment2 + b'oops' zipf.comment = comment2 + b'oops'
zipf.writestr("foo.txt", "O, for a Muse of Fire!") zipf.writestr("foo.txt", "O, for a Muse of Fire!")
zipf.close() with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
zipfr = zipfile.ZipFile(TESTFN, mode="r")
self.assertEqual(zipfr.comment, comment2) self.assertEqual(zipfr.comment, comment2)
zipfr.close()
def tearDown(self): def tearDown(self):
unlink(TESTFN) unlink(TESTFN)
...@@ -907,21 +889,19 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -907,21 +889,19 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
def make_test_archive(self, f, compression): def make_test_archive(self, f, compression):
# Create the ZIP archive # Create the ZIP archive
zipfp = zipfile.ZipFile(f, "w", compression) with zipfile.ZipFile(f, "w", compression) as zipfp:
zipfp.write(TESTFN, "another.name") zipfp.write(TESTFN, "another.name")
zipfp.write(TESTFN, TESTFN) zipfp.write(TESTFN, TESTFN)
zipfp.close()
def zip_test(self, f, compression): def zip_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r", compression) with zipfile.ZipFile(f, "r", compression) as zipfp:
testdata = zipfp.read(TESTFN) testdata = zipfp.read(TESTFN)
self.assertEqual(len(testdata), len(self.data)) self.assertEqual(len(testdata), len(self.data))
self.assertEqual(testdata, self.data) self.assertEqual(testdata, self.data)
self.assertEqual(zipfp.read("another.name"), self.data) self.assertEqual(zipfp.read("another.name"), self.data)
zipfp.close()
def test_stored(self): def test_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
...@@ -931,10 +911,10 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -931,10 +911,10 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r", compression) with zipfile.ZipFile(f, "r", compression) as zipfp:
zipdata1 = [] zipdata1 = []
zipopen1 = zipfp.open(TESTFN) zipopen1 = zipfp.open(TESTFN)
while 1: while True:
read_data = zipopen1.read(256) read_data = zipopen1.read(256)
if not read_data: if not read_data:
break break
...@@ -942,7 +922,7 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -942,7 +922,7 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
zipdata2 = [] zipdata2 = []
zipopen2 = zipfp.open("another.name") zipopen2 = zipfp.open("another.name")
while 1: while True:
read_data = zipopen2.read(256) read_data = zipopen2.read(256)
if not read_data: if not read_data:
break break
...@@ -955,7 +935,6 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -955,7 +935,6 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
testdata2 = b''.join(zipdata2) testdata2 = b''.join(zipdata2)
self.assertEqual(len(testdata1), len(self.data)) self.assertEqual(len(testdata1), len(self.data))
self.assertEqual(testdata1, self.data) self.assertEqual(testdata1, self.data)
zipfp.close()
def test_open_stored(self): def test_open_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
...@@ -965,10 +944,10 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -965,10 +944,10 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r", compression) with zipfile.ZipFile(f, "r", compression) as zipfp:
zipdata1 = [] zipdata1 = []
zipopen1 = zipfp.open(TESTFN) zipopen1 = zipfp.open(TESTFN)
while 1: while True:
read_data = zipopen1.read(randint(1, 1024)) read_data = zipopen1.read(randint(1, 1024))
if not read_data: if not read_data:
break break
...@@ -977,7 +956,6 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -977,7 +956,6 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
testdata = b''.join(zipdata1) testdata = b''.join(zipdata1)
self.assertEqual(len(testdata), len(self.data)) self.assertEqual(len(testdata), len(self.data))
self.assertEqual(testdata, self.data) self.assertEqual(testdata, self.data)
zipfp.close()
def test_random_open_stored(self): def test_random_open_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
...@@ -988,15 +966,14 @@ class TestsWithRandomBinaryFiles(unittest.TestCase): ...@@ -988,15 +966,14 @@ class TestsWithRandomBinaryFiles(unittest.TestCase):
class TestsWithMultipleOpens(unittest.TestCase): class TestsWithMultipleOpens(unittest.TestCase):
def setUp(self): def setUp(self):
# Create the ZIP archive # Create the ZIP archive
zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) as zipfp:
zipfp.writestr('ones', '1'*FIXEDTEST_SIZE) zipfp.writestr('ones', '1'*FIXEDTEST_SIZE)
zipfp.writestr('twos', '2'*FIXEDTEST_SIZE) zipfp.writestr('twos', '2'*FIXEDTEST_SIZE)
zipfp.close()
def test_same_file(self): def test_same_file(self):
# Verify that (when the ZipFile is in control of creating file objects) # Verify that (when the ZipFile is in control of creating file objects)
# multiple open() calls can be made without interfering with each other. # multiple open() calls can be made without interfering with each other.
zipf = zipfile.ZipFile(TESTFN2, mode="r") with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
zopen1 = zipf.open('ones') zopen1 = zipf.open('ones')
zopen2 = zipf.open('ones') zopen2 = zipf.open('ones')
data1 = zopen1.read(500) data1 = zopen1.read(500)
...@@ -1004,12 +981,11 @@ class TestsWithMultipleOpens(unittest.TestCase): ...@@ -1004,12 +981,11 @@ class TestsWithMultipleOpens(unittest.TestCase):
data1 += zopen1.read(500) data1 += zopen1.read(500)
data2 += zopen2.read(500) data2 += zopen2.read(500)
self.assertEqual(data1, data2) self.assertEqual(data1, data2)
zipf.close()
def test_different_file(self): def test_different_file(self):
# Verify that (when the ZipFile is in control of creating file objects) # Verify that (when the ZipFile is in control of creating file objects)
# multiple open() calls can be made without interfering with each other. # multiple open() calls can be made without interfering with each other.
zipf = zipfile.ZipFile(TESTFN2, mode="r") with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
zopen1 = zipf.open('ones') zopen1 = zipf.open('ones')
zopen2 = zipf.open('twos') zopen2 = zipf.open('twos')
data1 = zopen1.read(500) data1 = zopen1.read(500)
...@@ -1018,12 +994,11 @@ class TestsWithMultipleOpens(unittest.TestCase): ...@@ -1018,12 +994,11 @@ class TestsWithMultipleOpens(unittest.TestCase):
data2 += zopen2.read(500) data2 += zopen2.read(500)
self.assertEqual(data1, b'1'*FIXEDTEST_SIZE) self.assertEqual(data1, b'1'*FIXEDTEST_SIZE)
self.assertEqual(data2, b'2'*FIXEDTEST_SIZE) self.assertEqual(data2, b'2'*FIXEDTEST_SIZE)
zipf.close()
def test_interleaved(self): def test_interleaved(self):
# Verify that (when the ZipFile is in control of creating file objects) # Verify that (when the ZipFile is in control of creating file objects)
# multiple open() calls can be made without interfering with each other. # multiple open() calls can be made without interfering with each other.
zipf = zipfile.ZipFile(TESTFN2, mode="r") with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
zopen1 = zipf.open('ones') zopen1 = zipf.open('ones')
data1 = zopen1.read(500) data1 = zopen1.read(500)
zopen2 = zipf.open('twos') zopen2 = zipf.open('twos')
...@@ -1032,7 +1007,6 @@ class TestsWithMultipleOpens(unittest.TestCase): ...@@ -1032,7 +1007,6 @@ class TestsWithMultipleOpens(unittest.TestCase):
data2 += zopen2.read(500) data2 += zopen2.read(500)
self.assertEqual(data1, b'1'*FIXEDTEST_SIZE) self.assertEqual(data1, b'1'*FIXEDTEST_SIZE)
self.assertEqual(data2, b'2'*FIXEDTEST_SIZE) self.assertEqual(data2, b'2'*FIXEDTEST_SIZE)
zipf.close()
def tearDown(self): def tearDown(self):
unlink(TESTFN2) unlink(TESTFN2)
...@@ -1043,7 +1017,7 @@ class TestWithDirectory(unittest.TestCase): ...@@ -1043,7 +1017,7 @@ class TestWithDirectory(unittest.TestCase):
os.mkdir(TESTFN2) os.mkdir(TESTFN2)
def test_extract_dir(self): def test_extract_dir(self):
zipf = zipfile.ZipFile(findfile("zipdir.zip")) with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
zipf.extractall(TESTFN2) zipf.extractall(TESTFN2)
self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a"))) self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b"))) self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
...@@ -1084,58 +1058,49 @@ class UniversalNewlineTests(unittest.TestCase): ...@@ -1084,58 +1058,49 @@ class UniversalNewlineTests(unittest.TestCase):
def make_test_archive(self, f, compression): def make_test_archive(self, f, compression):
# Create the ZIP archive # Create the ZIP archive
zipfp = zipfile.ZipFile(f, "w", compression) with zipfile.ZipFile(f, "w", compression) as zipfp:
for fn in self.arcfiles.values(): for fn in self.arcfiles.values():
zipfp.write(fn, fn) zipfp.write(fn, fn)
zipfp.close()
def read_test(self, f, compression): def read_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items(): for sep, fn in self.arcfiles.items():
zipdata = zipfp.open(fn, "rU").read() zipdata = zipfp.open(fn, "rU").read()
self.assertEqual(self.arcdata[sep], zipdata) self.assertEqual(self.arcdata[sep], zipdata)
zipfp.close()
def readline_test(self, f, compression): def readline_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items(): for sep, fn in self.arcfiles.items():
zipopen = zipfp.open(fn, "rU") zipopen = zipfp.open(fn, "rU")
for line in self.line_gen: for line in self.line_gen:
linedata = zipopen.readline() linedata = zipopen.readline()
self.assertEqual(linedata, line + b'\n') self.assertEqual(linedata, line + b'\n')
zipfp.close()
def readlines_test(self, f, compression): def readlines_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items(): for sep, fn in self.arcfiles.items():
ziplines = zipfp.open(fn, "rU").readlines() ziplines = zipfp.open(fn, "rU").readlines()
for line, zipline in zip(self.line_gen, ziplines): for line, zipline in zip(self.line_gen, ziplines):
self.assertEqual(zipline, line + b'\n') self.assertEqual(zipline, line + b'\n')
zipfp.close()
def iterlines_test(self, f, compression): def iterlines_test(self, f, compression):
self.make_test_archive(f, compression) self.make_test_archive(f, compression)
# Read the ZIP archive # Read the ZIP archive
zipfp = zipfile.ZipFile(f, "r") with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items(): for sep, fn in self.arcfiles.items():
for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")): for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")):
self.assertEqual(zipline, line + b'\n') self.assertEqual(zipline, line + b'\n')
zipfp.close()
def test_read_stored(self): def test_read_stored(self):
for f in (TESTFN2, TemporaryFile(), io.BytesIO()): for f in (TESTFN2, TemporaryFile(), io.BytesIO()):
self.read_test(f, zipfile.ZIP_STORED) self.read_test(f, zipfile.ZIP_STORED)
......
...@@ -719,6 +719,12 @@ class ZipFile: ...@@ -719,6 +719,12 @@ class ZipFile:
self.fp = None self.fp = None
raise RuntimeError('Mode must be "r", "w" or "a"') raise RuntimeError('Mode must be "r", "w" or "a"')
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
def _GetContents(self): def _GetContents(self):
"""Read the directory, making sure we close the file if the format """Read the directory, making sure we close the file if the format
is bad.""" is bad."""
......
...@@ -171,6 +171,9 @@ C-API ...@@ -171,6 +171,9 @@ C-API
Library Library
------- -------
- Issue #5511: now zipfile.ZipFile can be used as a context manager.
Initial patch by Brian Curtin.
- Issue #7556: Make sure Distutils' msvc9compile reads and writes the - Issue #7556: Make sure Distutils' msvc9compile reads and writes the
MSVC XML Manifest file in text mode so string patterns can be used MSVC XML Manifest file in text mode so string patterns can be used
in regular expressions. in regular expressions.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment