Commit 352eb4f4 authored by Benjamin Peterson's avatar Benjamin Peterson

properly close files in test_zipfile (#20887)

Patch by Matti Picus.
parent 07681001
......@@ -8,7 +8,6 @@ import os
import io
import sys
import time
import shutil
import struct
import zipfile
import unittest
......@@ -19,7 +18,7 @@ from random import randint, random
from unittest import skipUnless
from test.test_support import TESTFN, TESTFN_UNICODE, TESTFN_ENCODING, \
run_unittest, findfile, unlink, check_warnings
run_unittest, findfile, unlink, rmtree, check_warnings
try:
TESTFN_UNICODE.encode(TESTFN_ENCODING)
except (UnicodeError, TypeError):
......@@ -367,7 +366,8 @@ class TestsWithSourceFile(unittest.TestCase):
produces the expected result."""
with zipfile.ZipFile(TESTFN2, "w") as zipfp:
zipfp.write(TESTFN)
self.assertEqual(zipfp.read(TESTFN), open(TESTFN).read())
with open(TESTFN,'r') as fid:
self.assertEqual(zipfp.read(TESTFN), fid.read())
@skipUnless(zlib, "requires zlib")
def test_per_file_compression(self):
......@@ -406,11 +406,12 @@ class TestsWithSourceFile(unittest.TestCase):
self.assertEqual(writtenfile, correctfile)
# make sure correct data is in correct file
self.assertEqual(fdata, open(writtenfile, "rb").read())
with open(writtenfile, "rb") as fid:
self.assertEqual(fdata, fid.read())
os.remove(writtenfile)
# remove the test file subdirectories
shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
def test_extract_all(self):
with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp:
......@@ -422,11 +423,12 @@ class TestsWithSourceFile(unittest.TestCase):
for fpath, fdata in SMALL_TEST_DATA:
outfile = os.path.join(os.getcwd(), fpath)
self.assertEqual(fdata, open(outfile, "rb").read())
with open(outfile, "rb") as fid:
self.assertEqual(fdata, fid.read())
os.remove(outfile)
# remove the test file subdirectories
shutil.rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
rmtree(os.path.join(os.getcwd(), 'ziptest2dir'))
def check_file(self, filename, content):
self.assertTrue(os.path.isfile(filename))
......@@ -511,12 +513,12 @@ class TestsWithSourceFile(unittest.TestCase):
self.assertEqual(writtenfile, correctfile,
msg="extract %r" % arcname)
self.check_file(correctfile, content)
shutil.rmtree('target')
rmtree('target')
with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
zipfp.extractall(targetpath)
self.check_file(correctfile, content)
shutil.rmtree('target')
rmtree('target')
correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))
......@@ -525,12 +527,12 @@ class TestsWithSourceFile(unittest.TestCase):
self.assertEqual(writtenfile, correctfile,
msg="extract %r" % arcname)
self.check_file(correctfile, content)
shutil.rmtree(fixedname.split('/')[0])
rmtree(fixedname.split('/')[0])
with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
zipfp.extractall()
self.check_file(correctfile, content)
shutil.rmtree(fixedname.split('/')[0])
rmtree(fixedname.split('/')[0])
os.remove(TESTFN2)
......@@ -775,11 +777,12 @@ class PyZipFileTests(unittest.TestCase):
self.assertNotIn('mod2.txt', names)
finally:
shutil.rmtree(TESTFN2)
rmtree(TESTFN2)
def test_write_non_pyfile(self):
with zipfile.PyZipFile(TemporaryFile(), "w") as zipfp:
open(TESTFN, 'w').write('most definitely not a python file')
with open(TESTFN, 'w') as fid:
fid.write('most definitely not a python file')
self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
os.remove(TESTFN)
......@@ -942,8 +945,9 @@ class OtherTests(unittest.TestCase):
self.assertRaises(RuntimeError, zipf.open, "foo.txt")
self.assertRaises(RuntimeError, zipf.testzip)
self.assertRaises(RuntimeError, zipf.writestr, "bogus.txt", "bogus")
open(TESTFN, 'w').write('zipfile test data')
self.assertRaises(RuntimeError, zipf.write, TESTFN)
with open(TESTFN, 'w') as fid:
fid.write('zipfile test data')
self.assertRaises(RuntimeError, zipf.write, TESTFN)
def test_bad_constructor_mode(self):
"""Check that bad modes passed to ZipFile constructor are caught."""
......@@ -1129,6 +1133,7 @@ class OtherTests(unittest.TestCase):
pass
try:
zipf = zipfile.ZipFile(TESTFN, mode="r")
zipf.close()
except zipfile.BadZipfile:
self.fail("Unable to create empty ZIP file in 'w' mode")
......@@ -1136,6 +1141,7 @@ class OtherTests(unittest.TestCase):
pass
try:
zipf = zipfile.ZipFile(TESTFN, mode="r")
zipf.close()
except:
self.fail("Unable to create empty ZIP file in 'a' mode")
......@@ -1332,12 +1338,11 @@ class TestsWithMultipleOpens(unittest.TestCase):
# Verify that (when the ZipFile is in control of creating file objects)
# multiple open() calls can be made without interfering with each other.
with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
zopen1 = zipf.open('ones')
zopen2 = zipf.open('ones')
data1 = zopen1.read(500)
data2 = zopen2.read(500)
data1 += zopen1.read(500)
data2 += zopen2.read(500)
with zipf.open('ones') as zopen1, zipf.open('ones') as zopen2:
data1 = zopen1.read(500)
data2 = zopen2.read(500)
data1 += zopen1.read(500)
data2 += zopen2.read(500)
self.assertEqual(data1, data2)
def test_different_file(self):
......@@ -1364,6 +1369,17 @@ class TestsWithMultipleOpens(unittest.TestCase):
self.assertEqual(data1, '1'*FIXEDTEST_SIZE)
self.assertEqual(data2, '2'*FIXEDTEST_SIZE)
def test_many_opens(self):
# Verify that read() and open() promptly close the file descriptor,
# and don't rely on the garbage collector to free resources.
with zipfile.ZipFile(TESTFN2, mode="r") as zipf:
for x in range(100):
zipf.read('ones')
with zipf.open('ones') as zopen1:
pass
with open(os.devnull) as f:
self.assertLess(f.fileno(), 100)
def tearDown(self):
unlink(TESTFN2)
......@@ -1386,12 +1402,12 @@ class TestWithDirectory(unittest.TestCase):
def test_store_dir(self):
os.mkdir(os.path.join(TESTFN2, "x"))
zipf = zipfile.ZipFile(TESTFN, "w")
zipf.write(os.path.join(TESTFN2, "x"), "x")
self.assertTrue(zipf.filelist[0].filename.endswith("x/"))
with zipfile.ZipFile(TESTFN, "w") as zipf:
zipf.write(os.path.join(TESTFN2, "x"), "x")
self.assertTrue(zipf.filelist[0].filename.endswith("x/"))
def tearDown(self):
shutil.rmtree(TESTFN2)
rmtree(TESTFN2)
if os.path.exists(TESTFN):
unlink(TESTFN)
......@@ -1405,7 +1421,8 @@ class UniversalNewlineTests(unittest.TestCase):
for n, s in enumerate(self.seps):
self.arcdata[s] = s.join(self.line_gen) + s
self.arcfiles[s] = '%s-%d' % (TESTFN, n)
open(self.arcfiles[s], "wb").write(self.arcdata[s])
with open(self.arcfiles[s], "wb") as fid:
fid.write(self.arcdata[s])
def make_test_archive(self, f, compression):
# Create the ZIP archive
......@@ -1474,8 +1491,9 @@ class UniversalNewlineTests(unittest.TestCase):
# Read the ZIP archive
with zipfile.ZipFile(f, "r") as zipfp:
for sep, fn in self.arcfiles.items():
for line, zipline in zip(self.line_gen, zipfp.open(fn, "rU")):
self.assertEqual(zipline, line + '\n')
with zipfp.open(fn, "rU") as fid:
for line, zipline in zip(self.line_gen, fid):
self.assertEqual(zipline, line + '\n')
def test_read_stored(self):
for f in (TESTFN2, TemporaryFile(), StringIO()):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment