Commit b48c5d51 authored by Gregory P. Smith's avatar Gregory P. Smith

Fixes issue19081: When a zipimport .zip file in sys.path being imported

from is modified during the lifetime of the Python process after
zipimport has already opened and cached the zip's table of contents
it now fstat's the file after opening it upon every attempt to access
anything within and will re-read the table of contents if the .zip file
inode, size or mtime have changed.

It would've been nicer to hold any .zip file used by zipimport open for the
duration of the process but that would be more invasive and add an additional
open file descriptor to all zipimport using processes.  It also would likely
not fix the problem on Windows due to different filesystem semantics.
parent 875565bb
import io
import sys
import os
import marshal
......@@ -55,6 +56,27 @@ TESTPACK2 = "ziptestpackage2"
TEMP_ZIP = os.path.abspath("junk95142" + os.extsep + "zip")
def _write_zip_package(zipname, files,
data_to_prepend=b"", compression=ZIP_STORED):
z = ZipFile(zipname, "w")
try:
for name, (mtime, data) in files.items():
zinfo = ZipInfo(name, time.localtime(mtime))
zinfo.compress_type = compression
z.writestr(zinfo, data)
finally:
z.close()
if data_to_prepend:
# Prepend data to the start of the zipfile
with open(zipname, "rb") as f:
zip_data = f.read()
with open(zipname, "wb") as f:
f.write(data_to_prepend)
f.write(zip_data)
class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
compression = ZIP_STORED
......@@ -67,26 +89,9 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
ImportHooksBaseTestCase.setUp(self)
def doTest(self, expected_ext, files, *modules, **kw):
z = ZipFile(TEMP_ZIP, "w")
_write_zip_package(TEMP_ZIP, files, data_to_prepend=kw.get("stuff"),
compression=self.compression)
try:
for name, (mtime, data) in files.items():
zinfo = ZipInfo(name, time.localtime(mtime))
zinfo.compress_type = self.compression
z.writestr(zinfo, data)
z.close()
stuff = kw.get("stuff", None)
if stuff is not None:
# Prepend 'stuff' to the start of the zipfile
f = open(TEMP_ZIP, "rb")
data = f.read()
f.close()
f = open(TEMP_ZIP, "wb")
f.write(stuff)
f.write(data)
f.close()
sys.path.insert(0, TEMP_ZIP)
mod = __import__(".".join(modules), globals(), locals(),
......@@ -101,7 +106,6 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
self.assertEqual(file, os.path.join(TEMP_ZIP,
*modules) + expected_ext)
finally:
z.close()
os.remove(TEMP_ZIP)
def testAFakeZlib(self):
......@@ -387,6 +391,64 @@ class CompressedZipImportTestCase(UncompressedZipImportTestCase):
compression = ZIP_DEFLATED
class ZipFileModifiedAfterImportTestCase(ImportHooksBaseTestCase):
def setUp(self):
zipimport._zip_directory_cache.clear()
zipimport._zip_stat_cache.clear()
ImportHooksBaseTestCase.setUp(self)
def tearDown(self):
ImportHooksBaseTestCase.tearDown(self)
if os.path.exists(TEMP_ZIP):
os.remove(TEMP_ZIP)
def testZipFileChangesAfterFirstImport(self):
"""Alter the zip file after caching its index and try an import."""
packdir = TESTPACK + os.sep
files = {packdir + "__init__" + pyc_ext: (NOW, test_pyc),
packdir + TESTMOD + ".py": (NOW, "test_value = 38\n"),
"ziptest_a.py": (NOW, "test_value = 23\n"),
"ziptest_b.py": (NOW, "test_value = 42\n"),
"ziptest_c.py": (NOW, "test_value = 1337\n")}
zipfile_path = TEMP_ZIP
_write_zip_package(zipfile_path, files)
self.assertTrue(os.path.exists(zipfile_path))
sys.path.insert(0, zipfile_path)
# Import something out of the zipfile and confirm it is correct.
testmod = __import__(TESTPACK + "." + TESTMOD,
globals(), locals(), ["__dummy__"])
self.assertEqual(testmod.test_value, 38)
# Import something else out of the zipfile and confirm it is correct.
ziptest_b = __import__("ziptest_b", globals(), locals(), ["test_value"])
self.assertEqual(ziptest_b.test_value, 42)
# Truncate and fill the zip file with non-zip garbage.
with io.open(zipfile_path, "rb") as orig_zip_file:
orig_zip_file_contents = orig_zip_file.read()
with io.open(zipfile_path, "wb") as byebye_valid_zip_file:
byebye_valid_zip_file.write(b"Tear down this wall!\n"*1987)
# Now that the zipfile has been replaced, import something else from it
# which should fail as the file contents are now garbage.
with self.assertRaises(ImportError):
ziptest_a = __import__("ziptest_a", globals(), locals(),
["test_value"])
# Now lets make it a valid zipfile that has some garbage at the start.
# This alters all of the offsets within the file
with io.open(zipfile_path, "wb") as new_zip_file:
new_zip_file.write(b"X"*1991) # The year Python was created.
new_zip_file.write(orig_zip_file_contents)
# Now that the zip file has been "restored" to a valid but different
# zipfile the zipimporter should *successfully* re-read the new zip
# file's end of file central index and be able to import from it again.
ziptest_a = __import__("ziptest_a", globals(), locals(), ["test_value"])
self.assertEqual(ziptest_a.test_value, 23)
ziptest_c = __import__("ziptest_c", globals(), locals(), ["test_value"])
self.assertEqual(ziptest_c.test_value, 1337)
class BadFileZipImportTestCase(unittest.TestCase):
def assertZipFailure(self, filename):
self.assertRaises(zipimport.ZipImportError,
......@@ -464,6 +526,7 @@ def test_main():
UncompressedZipImportTestCase,
CompressedZipImportTestCase,
BadFileZipImportTestCase,
ZipFileModifiedAfterImportTestCase,
)
finally:
test_support.unlink(TESTMOD)
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment