Commit 8b562920 authored by Serhiy Storchaka's avatar Serhiy Storchaka

Issue #18223: Refactor test_tarfile.

* Use mixins for generating tests for different compression types.
* Make test_tarfile discoverable.
* Use more special tests (i.e. assertEqual, assertIs) instead of assertTrue.
* Add explicit test skips instead of reporting skipped tests as passed.
* Wrap long lines.
* Correct a comment for test_hardlink_extraction1.
* Add support.requires_gzip.

and some other minor enhancements.
parent a269d821
...@@ -42,6 +42,11 @@ try: ...@@ -42,6 +42,11 @@ try:
except ImportError: except ImportError:
zlib = None zlib = None
try:
import gzip
except ImportError:
gzip = None
try: try:
import bz2 import bz2
except ImportError: except ImportError:
...@@ -71,7 +76,7 @@ __all__ = [ ...@@ -71,7 +76,7 @@ __all__ = [
"TestHandler", "Matcher", "can_symlink", "skip_unless_symlink", "TestHandler", "Matcher", "can_symlink", "skip_unless_symlink",
"skip_unless_xattr", "import_fresh_module", "requires_zlib", "skip_unless_xattr", "import_fresh_module", "requires_zlib",
"PIPE_MAX_SIZE", "failfast", "anticipate_failure", "run_with_tz", "PIPE_MAX_SIZE", "failfast", "anticipate_failure", "run_with_tz",
"requires_bz2", "requires_lzma", "suppress_crash_popup", "requires_gzip", "requires_bz2", "requires_lzma", "suppress_crash_popup",
] ]
class Error(Exception): class Error(Exception):
...@@ -588,6 +593,8 @@ requires_IEEE_754 = unittest.skipUnless( ...@@ -588,6 +593,8 @@ requires_IEEE_754 = unittest.skipUnless(
requires_zlib = unittest.skipUnless(zlib, 'requires zlib') requires_zlib = unittest.skipUnless(zlib, 'requires zlib')
requires_gzip = unittest.skipUnless(gzip, 'requires gzip')
requires_bz2 = unittest.skipUnless(bz2, 'requires bz2') requires_bz2 = unittest.skipUnless(bz2, 'requires bz2')
requires_lzma = unittest.skipUnless(lzma, 'requires lzma') requires_lzma = unittest.skipUnless(lzma, 'requires lzma')
......
...@@ -2,9 +2,7 @@ import sys ...@@ -2,9 +2,7 @@ import sys
import os import os
import io import io
import shutil import shutil
import io
from hashlib import md5 from hashlib import md5
import errno
import unittest import unittest
import tarfile import tarfile
...@@ -14,8 +12,7 @@ from test import support ...@@ -14,8 +12,7 @@ from test import support
# Check for our compression modules. # Check for our compression modules.
try: try:
import gzip import gzip
gzip.GzipFile except ImportError:
except (ImportError, AttributeError):
gzip = None gzip = None
try: try:
import bz2 import bz2
...@@ -40,25 +37,55 @@ md5_regtype = "65f477c818ad9e15f7feab0c6d37742f" ...@@ -40,25 +37,55 @@ md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6" md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
class ReadTest(unittest.TestCase): class TarTest:
tarname = tarname tarname = tarname
mode = "r:" suffix = ''
open = io.FileIO
@property
def mode(self):
return self.prefix + self.suffix
@support.requires_gzip
class GzipTest:
tarname = gzipname
suffix = 'gz'
open = gzip.GzipFile if gzip else None
@support.requires_bz2
class Bz2Test:
tarname = bz2name
suffix = 'bz2'
open = bz2.BZ2File if bz2 else None
@support.requires_lzma
class LzmaTest:
tarname = xzname
suffix = 'xz'
open = lzma.LZMAFile if lzma else None
class ReadTest(TarTest):
prefix = "r:"
def setUp(self): def setUp(self):
self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") self.tar = tarfile.open(self.tarname, mode=self.mode,
encoding="iso8859-1")
def tearDown(self): def tearDown(self):
self.tar.close() self.tar.close()
class UstarReadTest(ReadTest): class UstarReadTest(ReadTest, unittest.TestCase):
def test_fileobj_regular_file(self): def test_fileobj_regular_file(self):
tarinfo = self.tar.getmember("ustar/regtype") tarinfo = self.tar.getmember("ustar/regtype")
with self.tar.extractfile(tarinfo) as fobj: with self.tar.extractfile(tarinfo) as fobj:
data = fobj.read() data = fobj.read()
self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), self.assertEqual(len(data), tarinfo.size,
"regular file extraction failed")
self.assertEqual(md5sum(data), md5_regtype,
"regular file extraction failed") "regular file extraction failed")
def test_fileobj_readlines(self): def test_fileobj_readlines(self):
...@@ -70,12 +97,13 @@ class UstarReadTest(ReadTest): ...@@ -70,12 +97,13 @@ class UstarReadTest(ReadTest):
with self.tar.extractfile(tarinfo) as fobj: with self.tar.extractfile(tarinfo) as fobj:
fobj2 = io.TextIOWrapper(fobj) fobj2 = io.TextIOWrapper(fobj)
lines2 = fobj2.readlines() lines2 = fobj2.readlines()
self.assertTrue(lines1 == lines2, self.assertEqual(lines1, lines2,
"fileobj.readlines() failed") "fileobj.readlines() failed")
self.assertTrue(len(lines2) == 114, self.assertEqual(len(lines2), 114,
"fileobj.readlines() failed") "fileobj.readlines() failed")
self.assertTrue(lines2[83] == self.assertEqual(lines2[83],
"I will gladly admit that Python is not the fastest running scripting language.\n", "I will gladly admit that Python is not the fastest "
"running scripting language.\n",
"fileobj.readlines() failed") "fileobj.readlines() failed")
def test_fileobj_iter(self): def test_fileobj_iter(self):
...@@ -85,8 +113,8 @@ class UstarReadTest(ReadTest): ...@@ -85,8 +113,8 @@ class UstarReadTest(ReadTest):
lines1 = fobj1.readlines() lines1 = fobj1.readlines()
with self.tar.extractfile(tarinfo) as fobj2: with self.tar.extractfile(tarinfo) as fobj2:
lines2 = list(io.TextIOWrapper(fobj2)) lines2 = list(io.TextIOWrapper(fobj2))
self.assertTrue(lines1 == lines2, self.assertEqual(lines1, lines2,
"fileobj.__iter__() failed") "fileobj.__iter__() failed")
def test_fileobj_seek(self): def test_fileobj_seek(self):
self.tar.extract("ustar/regtype", TEMPDIR) self.tar.extract("ustar/regtype", TEMPDIR)
...@@ -110,12 +138,12 @@ class UstarReadTest(ReadTest): ...@@ -110,12 +138,12 @@ class UstarReadTest(ReadTest):
self.assertEqual(2048, fobj.tell(), self.assertEqual(2048, fobj.tell(),
"seek() to positive relative position failed") "seek() to positive relative position failed")
s = fobj.read(10) s = fobj.read(10)
self.assertTrue(s == data[2048:2058], self.assertEqual(s, data[2048:2058],
"read() after seek failed") "read() after seek failed")
fobj.seek(0, 2) fobj.seek(0, 2)
self.assertEqual(tarinfo.size, fobj.tell(), self.assertEqual(tarinfo.size, fobj.tell(),
"seek() to file's end failed") "seek() to file's end failed")
self.assertTrue(fobj.read() == b"", self.assertEqual(fobj.read(), b"",
"read() at file's end did not return empty string") "read() at file's end did not return empty string")
fobj.seek(-tarinfo.size, 2) fobj.seek(-tarinfo.size, 2)
self.assertEqual(0, fobj.tell(), self.assertEqual(0, fobj.tell(),
...@@ -124,13 +152,13 @@ class UstarReadTest(ReadTest): ...@@ -124,13 +152,13 @@ class UstarReadTest(ReadTest):
s1 = fobj.readlines() s1 = fobj.readlines()
fobj.seek(512) fobj.seek(512)
s2 = fobj.readlines() s2 = fobj.readlines()
self.assertTrue(s1 == s2, self.assertEqual(s1, s2,
"readlines() after seek failed") "readlines() after seek failed")
fobj.seek(0) fobj.seek(0)
self.assertEqual(len(fobj.readline()), fobj.tell(), self.assertEqual(len(fobj.readline()), fobj.tell(),
"tell() after readline() failed") "tell() after readline() failed")
fobj.seek(512) fobj.seek(512)
self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(), self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
"tell() after seek() and readline() failed") "tell() after seek() and readline() failed")
fobj.seek(0) fobj.seek(0)
line = fobj.readline() line = fobj.readline()
...@@ -154,24 +182,36 @@ class UstarReadTest(ReadTest): ...@@ -154,24 +182,36 @@ class UstarReadTest(ReadTest):
# test link members each point to a regular member whose data is # test link members each point to a regular member whose data is
# supposed to be exported. # supposed to be exported.
def _test_fileobj_link(self, lnktype, regtype): def _test_fileobj_link(self, lnktype, regtype):
with self.tar.extractfile(lnktype) as a, self.tar.extractfile(regtype) as b: with self.tar.extractfile(lnktype) as a, \
self.tar.extractfile(regtype) as b:
self.assertEqual(a.name, b.name) self.assertEqual(a.name, b.name)
def test_fileobj_link1(self): def test_fileobj_link1(self):
self._test_fileobj_link("ustar/lnktype", "ustar/regtype") self._test_fileobj_link("ustar/lnktype", "ustar/regtype")
def test_fileobj_link2(self): def test_fileobj_link2(self):
self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype") self._test_fileobj_link("./ustar/linktest2/lnktype",
"ustar/linktest1/regtype")
def test_fileobj_symlink1(self): def test_fileobj_symlink1(self):
self._test_fileobj_link("ustar/symtype", "ustar/regtype") self._test_fileobj_link("ustar/symtype", "ustar/regtype")
def test_fileobj_symlink2(self): def test_fileobj_symlink2(self):
self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype") self._test_fileobj_link("./ustar/linktest2/symtype",
"ustar/linktest1/regtype")
def test_issue14160(self): def test_issue14160(self):
self._test_fileobj_link("symtype2", "ustar/regtype") self._test_fileobj_link("symtype2", "ustar/regtype")
class GzipUstarReadTest(GzipTest, UstarReadTest):
pass
class Bz2UstarReadTest(Bz2Test, UstarReadTest):
pass
class LzmaUstarReadTest(LzmaTest, UstarReadTest):
pass
class CommonReadTest(ReadTest): class CommonReadTest(ReadTest):
...@@ -203,37 +243,24 @@ class CommonReadTest(ReadTest): ...@@ -203,37 +243,24 @@ class CommonReadTest(ReadTest):
def test_ignore_zeros(self): def test_ignore_zeros(self):
# Test TarFile's ignore_zeros option. # Test TarFile's ignore_zeros option.
if self.mode.endswith(":gz"):
_open = gzip.GzipFile
elif self.mode.endswith(":bz2"):
_open = bz2.BZ2File
elif self.mode.endswith(":xz"):
_open = lzma.LZMAFile
else:
_open = io.FileIO
for char in (b'\0', b'a'): for char in (b'\0', b'a'):
# Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
# are ignored correctly. # are ignored correctly.
with _open(tmpname, "w") as fobj: with self.open(tmpname, "w") as fobj:
fobj.write(char * 1024) fobj.write(char * 1024)
fobj.write(tarfile.TarInfo("foo").tobuf()) fobj.write(tarfile.TarInfo("foo").tobuf())
tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
try: try:
self.assertListEqual(tar.getnames(), ["foo"], self.assertListEqual(tar.getnames(), ["foo"],
"ignore_zeros=True should have skipped the %r-blocks" % char) "ignore_zeros=True should have skipped the %r-blocks" %
char)
finally: finally:
tar.close() tar.close()
class MiscReadTest(CommonReadTest): class MiscReadTestBase(CommonReadTest):
def test_no_name_argument(self): def test_no_name_argument(self):
if self.mode.endswith(("bz2", "xz")):
# BZ2File and LZMAFile have no name attribute.
self.skipTest("no name attribute")
with open(self.tarname, "rb") as fobj: with open(self.tarname, "rb") as fobj:
tar = tarfile.open(fileobj=fobj, mode=self.mode) tar = tarfile.open(fileobj=fobj, mode=self.mode)
self.assertEqual(tar.name, os.path.abspath(fobj.name)) self.assertEqual(tar.name, os.path.abspath(fobj.name))
...@@ -269,16 +296,7 @@ class MiscReadTest(CommonReadTest): ...@@ -269,16 +296,7 @@ class MiscReadTest(CommonReadTest):
tar.close() tar.close()
# Open the testtar and seek to the offset of the second member. # Open the testtar and seek to the offset of the second member.
if self.mode.endswith(":gz"): with self.open(self.tarname) as fobj:
_open = gzip.GzipFile
elif self.mode.endswith(":bz2"):
_open = bz2.BZ2File
elif self.mode.endswith(":xz"):
_open = lzma.LZMAFile
else:
_open = io.FileIO
with _open(self.tarname) as fobj:
fobj.seek(offset) fobj.seek(offset)
# Test if the tarfile starts with the second member. # Test if the tarfile starts with the second member.
...@@ -294,8 +312,6 @@ class MiscReadTest(CommonReadTest): ...@@ -294,8 +312,6 @@ class MiscReadTest(CommonReadTest):
def test_fail_comp(self): def test_fail_comp(self):
# For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
if self.mode == "r:":
return
self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
with open(tarname, "rb") as fobj: with open(tarname, "rb") as fobj:
self.assertRaises(tarfile.ReadError, tarfile.open, self.assertRaises(tarfile.ReadError, tarfile.open,
...@@ -306,7 +322,7 @@ class MiscReadTest(CommonReadTest): ...@@ -306,7 +322,7 @@ class MiscReadTest(CommonReadTest):
# Old V7 tars create directory members using an AREGTYPE # Old V7 tars create directory members using an AREGTYPE
# header with a "/" appended to the filename field. # header with a "/" appended to the filename field.
tarinfo = self.tar.getmember("misc/dirtype-old-v7") tarinfo = self.tar.getmember("misc/dirtype-old-v7")
self.assertTrue(tarinfo.type == tarfile.DIRTYPE, self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
"v7 dirtype failed") "v7 dirtype failed")
def test_xstar_type(self): def test_xstar_type(self):
...@@ -320,15 +336,15 @@ class MiscReadTest(CommonReadTest): ...@@ -320,15 +336,15 @@ class MiscReadTest(CommonReadTest):
def test_check_members(self): def test_check_members(self):
for tarinfo in self.tar: for tarinfo in self.tar:
self.assertTrue(int(tarinfo.mtime) == 0o7606136617, self.assertEqual(int(tarinfo.mtime), 0o7606136617,
"wrong mtime for %s" % tarinfo.name) "wrong mtime for %s" % tarinfo.name)
if not tarinfo.name.startswith("ustar/"): if not tarinfo.name.startswith("ustar/"):
continue continue
self.assertTrue(tarinfo.uname == "tarfile", self.assertEqual(tarinfo.uname, "tarfile",
"wrong uname for %s" % tarinfo.name) "wrong uname for %s" % tarinfo.name)
def test_find_members(self): def test_find_members(self):
self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof", self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
"could not find all members") "could not find all members")
@unittest.skipUnless(hasattr(os, "link"), @unittest.skipUnless(hasattr(os, "link"),
...@@ -365,7 +381,8 @@ class MiscReadTest(CommonReadTest): ...@@ -365,7 +381,8 @@ class MiscReadTest(CommonReadTest):
path = os.path.join(DIR, tarinfo.name) path = os.path.join(DIR, tarinfo.name)
if sys.platform != "win32": if sys.platform != "win32":
# Win32 has no support for fine grained permissions. # Win32 has no support for fine grained permissions.
self.assertEqual(tarinfo.mode & 0o777, os.stat(path).st_mode & 0o777) self.assertEqual(tarinfo.mode & 0o777,
os.stat(path).st_mode & 0o777)
def format_mtime(mtime): def format_mtime(mtime):
if isinstance(mtime, float): if isinstance(mtime, float):
return "{} ({})".format(mtime, mtime.hex()) return "{} ({})".format(mtime, mtime.hex())
...@@ -423,10 +440,28 @@ class MiscReadTest(CommonReadTest): ...@@ -423,10 +440,28 @@ class MiscReadTest(CommonReadTest):
self.assertEqual(m1.offset, m2.offset) self.assertEqual(m1.offset, m2.offset)
self.assertEqual(m1.get_info(), m2.get_info()) self.assertEqual(m1.get_info(), m2.get_info())
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
test_fail_comp = None
class StreamReadTest(CommonReadTest): class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
def test_non_existent_targz_file(self):
# Test for issue11513: prevent non-existent gzipped tarfiles raising
# multiple exceptions.
with self.assertRaisesRegex(FileNotFoundError, "xxx"):
tarfile.open("xxx", self.mode)
class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
def test_no_name_argument(self):
self.skipTest("BZ2File have no name attribute")
class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
def test_no_name_argument(self):
self.skipTest("LZMAFile have no name attribute")
mode="r|"
class StreamReadTest(CommonReadTest, unittest.TestCase):
prefix="r|"
def test_read_through(self): def test_read_through(self):
# Issue #11224: A poorly designed _FileInFile.read() method # Issue #11224: A poorly designed _FileInFile.read() method
...@@ -439,7 +474,8 @@ class StreamReadTest(CommonReadTest): ...@@ -439,7 +474,8 @@ class StreamReadTest(CommonReadTest):
try: try:
buf = fobj.read(512) buf = fobj.read(512)
except tarfile.StreamError: except tarfile.StreamError:
self.fail("simple read-through using TarFile.extractfile() failed") self.fail("simple read-through using "
"TarFile.extractfile() failed")
if not buf: if not buf:
break break
...@@ -447,7 +483,9 @@ class StreamReadTest(CommonReadTest): ...@@ -447,7 +483,9 @@ class StreamReadTest(CommonReadTest):
tarinfo = self.tar.next() # get "regtype" (can't use getmember) tarinfo = self.tar.next() # get "regtype" (can't use getmember)
with self.tar.extractfile(tarinfo) as fobj: with self.tar.extractfile(tarinfo) as fobj:
data = fobj.read() data = fobj.read()
self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), self.assertEqual(len(data), tarinfo.size,
"regular file extraction failed")
self.assertEqual(md5sum(data), md5_regtype,
"regular file extraction failed") "regular file extraction failed")
def test_provoke_stream_error(self): def test_provoke_stream_error(self):
...@@ -465,24 +503,34 @@ class StreamReadTest(CommonReadTest): ...@@ -465,24 +503,34 @@ class StreamReadTest(CommonReadTest):
t2 = tar2.next() t2 = tar2.next()
if t1 is None: if t1 is None:
break break
self.assertTrue(t2 is not None, "stream.next() failed.") self.assertIsNotNone(t2, "stream.next() failed.")
if t2.islnk() or t2.issym(): if t2.islnk() or t2.issym():
self.assertRaises(tarfile.StreamError, tar2.extractfile, t2) with self.assertRaises(tarfile.StreamError):
tar2.extractfile(t2)
continue continue
v1 = tar1.extractfile(t1) v1 = tar1.extractfile(t1)
v2 = tar2.extractfile(t2) v2 = tar2.extractfile(t2)
if v1 is None: if v1 is None:
continue continue
self.assertTrue(v2 is not None, "stream.extractfile() failed") self.assertIsNotNone(v2, "stream.extractfile() failed")
self.assertEqual(v1.read(), v2.read(), "stream extraction failed") self.assertEqual(v1.read(), v2.read(),
"stream extraction failed")
finally: finally:
tar1.close() tar1.close()
class GzipStreamReadTest(GzipTest, StreamReadTest):
pass
class DetectReadTest(unittest.TestCase): class Bz2StreamReadTest(Bz2Test, StreamReadTest):
pass
class LzmaStreamReadTest(LzmaTest, StreamReadTest):
pass
class DetectReadTest(TarTest, unittest.TestCase):
def _testfunc_file(self, name, mode): def _testfunc_file(self, name, mode):
try: try:
tar = tarfile.open(name, mode) tar = tarfile.open(name, mode)
...@@ -501,47 +549,20 @@ class DetectReadTest(unittest.TestCase): ...@@ -501,47 +549,20 @@ class DetectReadTest(unittest.TestCase):
tar.close() tar.close()
def _test_modes(self, testfunc): def _test_modes(self, testfunc):
testfunc(tarname, "r") if self.suffix:
testfunc(tarname, "r:") with self.assertRaises(tarfile.ReadError):
testfunc(tarname, "r:*") tarfile.open(tarname, mode="r:" + self.suffix)
testfunc(tarname, "r|") with self.assertRaises(tarfile.ReadError):
testfunc(tarname, "r|*") tarfile.open(tarname, mode="r|" + self.suffix)
with self.assertRaises(tarfile.ReadError):
if gzip: tarfile.open(self.tarname, mode="r:")
self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz") with self.assertRaises(tarfile.ReadError):
self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz") tarfile.open(self.tarname, mode="r|")
self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:") testfunc(self.tarname, "r")
self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|") testfunc(self.tarname, "r:" + self.suffix)
testfunc(self.tarname, "r:*")
testfunc(gzipname, "r") testfunc(self.tarname, "r|" + self.suffix)
testfunc(gzipname, "r:*") testfunc(self.tarname, "r|*")
testfunc(gzipname, "r:gz")
testfunc(gzipname, "r|*")
testfunc(gzipname, "r|gz")
if bz2:
self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2")
self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2")
self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:")
self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|")
testfunc(bz2name, "r")
testfunc(bz2name, "r:*")
testfunc(bz2name, "r:bz2")
testfunc(bz2name, "r|*")
testfunc(bz2name, "r|bz2")
if lzma:
self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:xz")
self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|xz")
self.assertRaises(tarfile.ReadError, tarfile.open, xzname, mode="r:")
self.assertRaises(tarfile.ReadError, tarfile.open, xzname, mode="r|")
testfunc(xzname, "r")
testfunc(xzname, "r:*")
testfunc(xzname, "r:xz")
testfunc(xzname, "r|*")
testfunc(xzname, "r|xz")
def test_detect_file(self): def test_detect_file(self):
self._test_modes(self._testfunc_file) self._test_modes(self._testfunc_file)
...@@ -549,14 +570,15 @@ class DetectReadTest(unittest.TestCase): ...@@ -549,14 +570,15 @@ class DetectReadTest(unittest.TestCase):
def test_detect_fileobj(self): def test_detect_fileobj(self):
self._test_modes(self._testfunc_fileobj) self._test_modes(self._testfunc_fileobj)
class GzipDetectReadTest(GzipTest, DetectReadTest):
pass
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
def test_detect_stream_bz2(self): def test_detect_stream_bz2(self):
# Originally, tarfile's stream detection looked for the string # Originally, tarfile's stream detection looked for the string
# "BZh91" at the start of the file. This is incorrect because # "BZh91" at the start of the file. This is incorrect because
# the '9' represents the blocksize (900kB). If the file was # the '9' represents the blocksize (900kB). If the file was
# compressed using another blocksize autodetection fails. # compressed using another blocksize autodetection fails.
if not bz2:
return
with open(tarname, "rb") as fobj: with open(tarname, "rb") as fobj:
data = fobj.read() data = fobj.read()
...@@ -566,13 +588,17 @@ class DetectReadTest(unittest.TestCase): ...@@ -566,13 +588,17 @@ class DetectReadTest(unittest.TestCase):
self._testfunc_file(tmpname, "r|*") self._testfunc_file(tmpname, "r|*")
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
pass
class MemberReadTest(ReadTest): class MemberReadTest(ReadTest, unittest.TestCase):
def _test_member(self, tarinfo, chksum=None, **kwargs): def _test_member(self, tarinfo, chksum=None, **kwargs):
if chksum is not None: if chksum is not None:
self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum, with self.tar.extractfile(tarinfo) as f:
"wrong md5sum for %s" % tarinfo.name) self.assertEqual(md5sum(f.read()), chksum,
"wrong md5sum for %s" % tarinfo.name)
kwargs["mtime"] = 0o7606136617 kwargs["mtime"] = 0o7606136617
kwargs["uid"] = 1000 kwargs["uid"] = 1000
...@@ -582,7 +608,7 @@ class MemberReadTest(ReadTest): ...@@ -582,7 +608,7 @@ class MemberReadTest(ReadTest):
kwargs["uname"] = "tarfile" kwargs["uname"] = "tarfile"
kwargs["gname"] = "tarfile" kwargs["gname"] = "tarfile"
for k, v in kwargs.items(): for k, v in kwargs.items():
self.assertTrue(getattr(tarinfo, k) == v, self.assertEqual(getattr(tarinfo, k), v,
"wrong value in %s field of %s" % (k, tarinfo.name)) "wrong value in %s field of %s" % (k, tarinfo.name))
def test_find_regtype(self): def test_find_regtype(self):
...@@ -642,7 +668,8 @@ class MemberReadTest(ReadTest): ...@@ -642,7 +668,8 @@ class MemberReadTest(ReadTest):
self._test_member(tarinfo, size=86016, chksum=md5_sparse) self._test_member(tarinfo, size=86016, chksum=md5_sparse)
def test_find_umlauts(self): def test_find_umlauts(self):
tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") tarinfo = self.tar.getmember("ustar/umlauts-"
"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
self._test_member(tarinfo, size=7011, chksum=md5_regtype) self._test_member(tarinfo, size=7011, chksum=md5_regtype)
def test_find_ustar_longname(self): def test_find_ustar_longname(self):
...@@ -655,12 +682,14 @@ class MemberReadTest(ReadTest): ...@@ -655,12 +682,14 @@ class MemberReadTest(ReadTest):
def test_find_pax_umlauts(self): def test_find_pax_umlauts(self):
self.tar.close() self.tar.close()
self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") self.tar = tarfile.open(self.tarname, mode=self.mode,
tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") encoding="iso8859-1")
tarinfo = self.tar.getmember("pax/umlauts-"
"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
self._test_member(tarinfo, size=7011, chksum=md5_regtype) self._test_member(tarinfo, size=7011, chksum=md5_regtype)
class LongnameTest(ReadTest): class LongnameTest:
def test_read_longname(self): def test_read_longname(self):
# Test reading of longname (bug #1471427). # Test reading of longname (bug #1471427).
...@@ -669,7 +698,8 @@ class LongnameTest(ReadTest): ...@@ -669,7 +698,8 @@ class LongnameTest(ReadTest):
tarinfo = self.tar.getmember(longname) tarinfo = self.tar.getmember(longname)
except KeyError: except KeyError:
self.fail("longname not found") self.fail("longname not found")
self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype") self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE,
"read longname as dirtype")
def test_read_longlink(self): def test_read_longlink(self):
longname = self.subdir + "/" + "123/" * 125 + "longname" longname = self.subdir + "/" + "123/" * 125 + "longname"
...@@ -678,7 +708,7 @@ class LongnameTest(ReadTest): ...@@ -678,7 +708,7 @@ class LongnameTest(ReadTest):
tarinfo = self.tar.getmember(longlink) tarinfo = self.tar.getmember(longlink)
except KeyError: except KeyError:
self.fail("longlink not found") self.fail("longlink not found")
self.assertTrue(tarinfo.linkname == longname, "linkname wrong") self.assertEqual(tarinfo.linkname, longname, "linkname wrong")
def test_truncated_longname(self): def test_truncated_longname(self):
longname = self.subdir + "/" + "123/" * 125 + "longname" longname = self.subdir + "/" + "123/" * 125 + "longname"
...@@ -686,7 +716,8 @@ class LongnameTest(ReadTest): ...@@ -686,7 +716,8 @@ class LongnameTest(ReadTest):
offset = tarinfo.offset offset = tarinfo.offset
self.tar.fileobj.seek(offset) self.tar.fileobj.seek(offset)
fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) fobj = io.BytesIO(self.tar.fileobj.read(3 * 512))
self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj) with self.assertRaises(tarfile.ReadError):
tarfile.open(name="foo.tar", fileobj=fobj)
def test_header_offset(self): def test_header_offset(self):
# Test if the start offset of the TarInfo object includes # Test if the start offset of the TarInfo object includes
...@@ -695,11 +726,12 @@ class LongnameTest(ReadTest): ...@@ -695,11 +726,12 @@ class LongnameTest(ReadTest):
offset = self.tar.getmember(longname).offset offset = self.tar.getmember(longname).offset
with open(tarname, "rb") as fobj: with open(tarname, "rb") as fobj:
fobj.seek(offset) fobj.seek(offset)
tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), "iso8859-1", "strict") tarinfo = tarfile.TarInfo.frombuf(fobj.read(512),
"iso8859-1", "strict")
self.assertEqual(tarinfo.type, self.longnametype) self.assertEqual(tarinfo.type, self.longnametype)
class GNUReadTest(LongnameTest): class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):
subdir = "gnu" subdir = "gnu"
longnametype = tarfile.GNUTYPE_LONGNAME longnametype = tarfile.GNUTYPE_LONGNAME
...@@ -721,7 +753,7 @@ class GNUReadTest(LongnameTest): ...@@ -721,7 +753,7 @@ class GNUReadTest(LongnameTest):
if self._fs_supports_holes(): if self._fs_supports_holes():
s = os.stat(filename) s = os.stat(filename)
self.assertTrue(s.st_blocks * 512 < s.st_size) self.assertLess(s.st_blocks * 512, s.st_size)
def test_sparse_file_old(self): def test_sparse_file_old(self):
self._test_sparse_file("gnu/sparse") self._test_sparse_file("gnu/sparse")
...@@ -753,7 +785,7 @@ class GNUReadTest(LongnameTest): ...@@ -753,7 +785,7 @@ class GNUReadTest(LongnameTest):
return False return False
class PaxReadTest(LongnameTest): class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):
subdir = "pax" subdir = "pax"
longnametype = tarfile.XHDTYPE longnametype = tarfile.XHDTYPE
...@@ -764,17 +796,20 @@ class PaxReadTest(LongnameTest): ...@@ -764,17 +796,20 @@ class PaxReadTest(LongnameTest):
tarinfo = tar.getmember("pax/regtype1") tarinfo = tar.getmember("pax/regtype1")
self.assertEqual(tarinfo.uname, "foo") self.assertEqual(tarinfo.uname, "foo")
self.assertEqual(tarinfo.gname, "bar") self.assertEqual(tarinfo.gname, "bar")
self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
tarinfo = tar.getmember("pax/regtype2") tarinfo = tar.getmember("pax/regtype2")
self.assertEqual(tarinfo.uname, "") self.assertEqual(tarinfo.uname, "")
self.assertEqual(tarinfo.gname, "bar") self.assertEqual(tarinfo.gname, "bar")
self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
tarinfo = tar.getmember("pax/regtype3") tarinfo = tar.getmember("pax/regtype3")
self.assertEqual(tarinfo.uname, "tarfile") self.assertEqual(tarinfo.uname, "tarfile")
self.assertEqual(tarinfo.gname, "tarfile") self.assertEqual(tarinfo.gname, "tarfile")
self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
finally: finally:
tar.close() tar.close()
...@@ -794,7 +829,7 @@ class PaxReadTest(LongnameTest): ...@@ -794,7 +829,7 @@ class PaxReadTest(LongnameTest):
tar.close() tar.close()
class WriteTestBase(unittest.TestCase): class WriteTestBase(TarTest):
# Put all write tests in here that are supposed to be tested # Put all write tests in here that are supposed to be tested
# in all possible mode combinations. # in all possible mode combinations.
...@@ -803,12 +838,12 @@ class WriteTestBase(unittest.TestCase): ...@@ -803,12 +838,12 @@ class WriteTestBase(unittest.TestCase):
tar = tarfile.open(fileobj=fobj, mode=self.mode) tar = tarfile.open(fileobj=fobj, mode=self.mode)
tar.addfile(tarfile.TarInfo("foo")) tar.addfile(tarfile.TarInfo("foo"))
tar.close() tar.close()
self.assertTrue(fobj.closed is False, "external fileobjs must never closed") self.assertFalse(fobj.closed, "external fileobjs must never closed")
class WriteTest(WriteTestBase): class WriteTest(WriteTestBase, unittest.TestCase):
mode = "w:" prefix = "w:"
def test_100_char_name(self): def test_100_char_name(self):
# The name field in a tar header stores strings of at most 100 chars. # The name field in a tar header stores strings of at most 100 chars.
...@@ -825,7 +860,7 @@ class WriteTest(WriteTestBase): ...@@ -825,7 +860,7 @@ class WriteTest(WriteTestBase):
tar = tarfile.open(tmpname) tar = tarfile.open(tmpname)
try: try:
self.assertTrue(tar.getnames()[0] == name, self.assertEqual(tar.getnames()[0], name,
"failed to store 100 char filename") "failed to store 100 char filename")
finally: finally:
tar.close() tar.close()
...@@ -840,7 +875,7 @@ class WriteTest(WriteTestBase): ...@@ -840,7 +875,7 @@ class WriteTest(WriteTestBase):
tar.add(path) tar.add(path)
finally: finally:
tar.close() tar.close()
self.assertTrue(os.path.getsize(tmpname) > 0, self.assertGreater(os.path.getsize(tmpname), 0,
"tarfile is empty") "tarfile is empty")
# The test_*_size tests test for bug #1167128. # The test_*_size tests test for bug #1167128.
...@@ -873,25 +908,26 @@ class WriteTest(WriteTestBase): ...@@ -873,25 +908,26 @@ class WriteTest(WriteTestBase):
finally: finally:
os.rmdir(path) os.rmdir(path)
@unittest.skipUnless(hasattr(os, "link"),
"Missing hardlink implementation")
def test_link_size(self): def test_link_size(self):
if hasattr(os, "link"): link = os.path.join(TEMPDIR, "link")
link = os.path.join(TEMPDIR, "link") target = os.path.join(TEMPDIR, "link_target")
target = os.path.join(TEMPDIR, "link_target") with open(target, "wb") as fobj:
with open(target, "wb") as fobj: fobj.write(b"aaa")
fobj.write(b"aaa") os.link(target, link)
os.link(target, link) try:
tar = tarfile.open(tmpname, self.mode)
try: try:
tar = tarfile.open(tmpname, self.mode) # Record the link target in the inodes list.
try: tar.gettarinfo(target)
# Record the link target in the inodes list. tarinfo = tar.gettarinfo(link)
tar.gettarinfo(target) self.assertEqual(tarinfo.size, 0)
tarinfo = tar.gettarinfo(link)
self.assertEqual(tarinfo.size, 0)
finally:
tar.close()
finally: finally:
os.remove(target) tar.close()
os.remove(link) finally:
os.remove(target)
os.remove(link)
@support.skip_unless_symlink @support.skip_unless_symlink
def test_symlink_size(self): def test_symlink_size(self):
...@@ -912,15 +948,18 @@ class WriteTest(WriteTestBase): ...@@ -912,15 +948,18 @@ class WriteTest(WriteTestBase):
dstname = os.path.abspath(tmpname) dstname = os.path.abspath(tmpname)
tar = tarfile.open(tmpname, self.mode) tar = tarfile.open(tmpname, self.mode)
try: try:
self.assertTrue(tar.name == dstname, "archive name must be absolute") self.assertEqual(tar.name, dstname,
"archive name must be absolute")
tar.add(dstname) tar.add(dstname)
self.assertTrue(tar.getnames() == [], "added the archive to itself") self.assertEqual(tar.getnames(), [],
"added the archive to itself")
cwd = os.getcwd() cwd = os.getcwd()
os.chdir(TEMPDIR) os.chdir(TEMPDIR)
tar.add(dstname) tar.add(dstname)
os.chdir(cwd) os.chdir(cwd)
self.assertTrue(tar.getnames() == [], "added the archive to itself") self.assertEqual(tar.getnames(), [],
"added the archive to itself")
finally: finally:
tar.close() tar.close()
...@@ -1087,48 +1126,49 @@ class WriteTest(WriteTestBase): ...@@ -1087,48 +1126,49 @@ class WriteTest(WriteTestBase):
tar = tarfile.open(tmpname, "r") tar = tarfile.open(tmpname, "r")
try: try:
for t in tar: for t in tar:
self.assertTrue(t.name == "." or t.name.startswith("./")) if t.name != ".":
self.assertTrue(t.name.startswith("./"), t.name)
finally: finally:
tar.close() tar.close()
finally: finally:
os.chdir(cwd) os.chdir(cwd)
class GzipWriteTest(GzipTest, WriteTest):
pass
class Bz2WriteTest(Bz2Test, WriteTest):
pass
class LzmaWriteTest(LzmaTest, WriteTest):
pass
class StreamWriteTest(WriteTestBase):
mode = "w|" class StreamWriteTest(WriteTestBase, unittest.TestCase):
prefix = "w|"
decompressor = None
def test_stream_padding(self): def test_stream_padding(self):
# Test for bug #1543303. # Test for bug #1543303.
tar = tarfile.open(tmpname, self.mode) tar = tarfile.open(tmpname, self.mode)
tar.close() tar.close()
if self.decompressor:
if self.mode.endswith("gz"): dec = self.decompressor()
with gzip.GzipFile(tmpname) as fobj:
data = fobj.read()
elif self.mode.endswith("bz2"):
dec = bz2.BZ2Decompressor()
with open(tmpname, "rb") as fobj: with open(tmpname, "rb") as fobj:
data = fobj.read() data = fobj.read()
data = dec.decompress(data) data = dec.decompress(data)
self.assertTrue(len(dec.unused_data) == 0, self.assertFalse(dec.unused_data, "found trailing data")
"found trailing data")
elif self.mode.endswith("xz"):
with lzma.LZMAFile(tmpname) as fobj:
data = fobj.read()
else: else:
with open(tmpname, "rb") as fobj: with self.open(tmpname) as fobj:
data = fobj.read() data = fobj.read()
self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
"incorrect zero padding")
self.assertTrue(data.count(b"\0") == tarfile.RECORDSIZE, @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
"incorrect zero padding") "Missing umask implementation")
def test_file_mode(self): def test_file_mode(self):
# Test for issue #8464: Create files with correct # Test for issue #8464: Create files with correct
# permissions. # permissions.
if sys.platform == "win32" or not hasattr(os, "umask"):
return
if os.path.exists(tmpname): if os.path.exists(tmpname):
os.remove(tmpname) os.remove(tmpname)
...@@ -1141,15 +1181,22 @@ class StreamWriteTest(WriteTestBase): ...@@ -1141,15 +1181,22 @@ class StreamWriteTest(WriteTestBase):
finally: finally:
os.umask(original_umask) os.umask(original_umask)
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
pass
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
decompressor = bz2.BZ2Decompressor if bz2 else None
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
decompressor = lzma.LZMADecompressor if lzma else None
class GNUWriteTest(unittest.TestCase): class GNUWriteTest(unittest.TestCase):
# This testcase checks for correct creation of GNU Longname # This testcase checks for correct creation of GNU Longname
# and Longlink extended headers (cp. bug #812325). # and Longlink extended headers (cp. bug #812325).
def _length(self, s): def _length(self, s):
blocks, remainder = divmod(len(s) + 1, 512) blocks = len(s) // 512 + 1
if remainder:
blocks += 1
return blocks * 512 return blocks * 512
def _calc_size(self, name, link=None): def _calc_size(self, name, link=None):
...@@ -1179,7 +1226,7 @@ class GNUWriteTest(unittest.TestCase): ...@@ -1179,7 +1226,7 @@ class GNUWriteTest(unittest.TestCase):
v1 = self._calc_size(name, link) v1 = self._calc_size(name, link)
v2 = tar.offset v2 = tar.offset
self.assertTrue(v1 == v2, "GNU longname/longlink creation failed") self.assertEqual(v1, v2, "GNU longname/longlink creation failed")
finally: finally:
tar.close() tar.close()
...@@ -1226,6 +1273,7 @@ class GNUWriteTest(unittest.TestCase): ...@@ -1226,6 +1273,7 @@ class GNUWriteTest(unittest.TestCase):
("longlnk/" * 127) + "longlink_") ("longlnk/" * 127) + "longlink_")
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase): class HardlinkTest(unittest.TestCase):
# Test the creation of LNKTYPE (hardlink) members in an archive. # Test the creation of LNKTYPE (hardlink) members in an archive.
...@@ -1250,18 +1298,18 @@ class HardlinkTest(unittest.TestCase): ...@@ -1250,18 +1298,18 @@ class HardlinkTest(unittest.TestCase):
# The same name will be added as a REGTYPE every # The same name will be added as a REGTYPE every
# time regardless of st_nlink. # time regardless of st_nlink.
tarinfo = self.tar.gettarinfo(self.foo) tarinfo = self.tar.gettarinfo(self.foo)
self.assertTrue(tarinfo.type == tarfile.REGTYPE, self.assertEqual(tarinfo.type, tarfile.REGTYPE,
"add file as regular failed") "add file as regular failed")
def test_add_hardlink(self): def test_add_hardlink(self):
tarinfo = self.tar.gettarinfo(self.bar) tarinfo = self.tar.gettarinfo(self.bar)
self.assertTrue(tarinfo.type == tarfile.LNKTYPE, self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
"add file as hardlink failed") "add file as hardlink failed")
def test_dereference_hardlink(self): def test_dereference_hardlink(self):
self.tar.dereference = True self.tar.dereference = True
tarinfo = self.tar.gettarinfo(self.bar) tarinfo = self.tar.gettarinfo(self.bar)
self.assertTrue(tarinfo.type == tarfile.REGTYPE, self.assertEqual(tarinfo.type, tarfile.REGTYPE,
"dereferencing hardlink failed") "dereferencing hardlink failed")
...@@ -1284,10 +1332,10 @@ class PaxWriteTest(GNUWriteTest): ...@@ -1284,10 +1332,10 @@ class PaxWriteTest(GNUWriteTest):
try: try:
if link: if link:
l = tar.getmembers()[0].linkname l = tar.getmembers()[0].linkname
self.assertTrue(link == l, "PAX longlink creation failed") self.assertEqual(link, l, "PAX longlink creation failed")
else: else:
n = tar.getmembers()[0].name n = tar.getmembers()[0].name
self.assertTrue(name == n, "PAX longname creation failed") self.assertEqual(name, n, "PAX longname creation failed")
finally: finally:
tar.close() tar.close()
...@@ -1313,8 +1361,8 @@ class PaxWriteTest(GNUWriteTest): ...@@ -1313,8 +1361,8 @@ class PaxWriteTest(GNUWriteTest):
self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
# Test if all the fields are strings. # Test if all the fields are strings.
for key, val in tar.pax_headers.items(): for key, val in tar.pax_headers.items():
self.assertTrue(type(key) is not bytes) self.assertIsNot(type(key), bytes)
self.assertTrue(type(val) is not bytes) self.assertIsNot(type(val), bytes)
if key in tarfile.PAX_NUMBER_FIELDS: if key in tarfile.PAX_NUMBER_FIELDS:
try: try:
tarfile.PAX_NUMBER_FIELDS[key](val) tarfile.PAX_NUMBER_FIELDS[key](val)
...@@ -1328,7 +1376,8 @@ class PaxWriteTest(GNUWriteTest): ...@@ -1328,7 +1376,8 @@ class PaxWriteTest(GNUWriteTest):
# TarInfo. # TarInfo.
pax_headers = {"path": "foo", "uid": "123"} pax_headers = {"path": "foo", "uid": "123"}
tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
encoding="iso8859-1")
try: try:
t = tarfile.TarInfo() t = tarfile.TarInfo()
t.name = "\xe4\xf6\xfc" # non-ASCII t.name = "\xe4\xf6\xfc" # non-ASCII
...@@ -1362,7 +1411,8 @@ class UstarUnicodeTest(unittest.TestCase): ...@@ -1362,7 +1411,8 @@ class UstarUnicodeTest(unittest.TestCase):
self._test_unicode_filename("utf-8") self._test_unicode_filename("utf-8")
def _test_unicode_filename(self, encoding): def _test_unicode_filename(self, encoding):
tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict") tar = tarfile.open(tmpname, "w", format=self.format,
encoding=encoding, errors="strict")
try: try:
name = "\xe4\xf6\xfc" name = "\xe4\xf6\xfc"
tar.addfile(tarfile.TarInfo(name)) tar.addfile(tarfile.TarInfo(name))
...@@ -1376,11 +1426,8 @@ class UstarUnicodeTest(unittest.TestCase): ...@@ -1376,11 +1426,8 @@ class UstarUnicodeTest(unittest.TestCase):
tar.close() tar.close()
def test_unicode_filename_error(self): def test_unicode_filename_error(self):
if self.format == tarfile.PAX_FORMAT: tar = tarfile.open(tmpname, "w", format=self.format,
# PAX_FORMAT ignores encoding in write mode. encoding="ascii", errors="strict")
return
tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
try: try:
tarinfo = tarfile.TarInfo() tarinfo = tarfile.TarInfo()
...@@ -1394,13 +1441,14 @@ class UstarUnicodeTest(unittest.TestCase): ...@@ -1394,13 +1441,14 @@ class UstarUnicodeTest(unittest.TestCase):
tar.close() tar.close()
def test_unicode_argument(self): def test_unicode_argument(self):
tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict") tar = tarfile.open(tarname, "r",
encoding="iso8859-1", errors="strict")
try: try:
for t in tar: for t in tar:
self.assertTrue(type(t.name) is str) self.assertIs(type(t.name), str)
self.assertTrue(type(t.linkname) is str) self.assertIs(type(t.linkname), str)
self.assertTrue(type(t.uname) is str) self.assertIs(type(t.uname), str)
self.assertTrue(type(t.gname) is str) self.assertIs(type(t.gname), str)
finally: finally:
tar.close() tar.close()
...@@ -1409,7 +1457,8 @@ class UstarUnicodeTest(unittest.TestCase): ...@@ -1409,7 +1457,8 @@ class UstarUnicodeTest(unittest.TestCase):
t.uname = "\xe4\xf6\xfc" t.uname = "\xe4\xf6\xfc"
t.gname = "\xe4\xf6\xfc" t.gname = "\xe4\xf6\xfc"
tar = tarfile.open(tmpname, mode="w", format=self.format, encoding="iso8859-1") tar = tarfile.open(tmpname, mode="w", format=self.format,
encoding="iso8859-1")
try: try:
tar.addfile(t) tar.addfile(t)
finally: finally:
...@@ -1438,9 +1487,11 @@ class GNUUnicodeTest(UstarUnicodeTest): ...@@ -1438,9 +1487,11 @@ class GNUUnicodeTest(UstarUnicodeTest):
def test_bad_pax_header(self): def test_bad_pax_header(self):
# Test for issue #8633. GNU tar <= 1.23 creates raw binary fields # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
# without a hdrcharset=BINARY header. # without a hdrcharset=BINARY header.
for encoding, name in (("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), for encoding, name in (
("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),):
with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: with tarfile.open(tarname, encoding=encoding,
errors="surrogateescape") as tar:
try: try:
t = tar.getmember(name) t = tar.getmember(name)
except KeyError: except KeyError:
...@@ -1451,18 +1502,23 @@ class PAXUnicodeTest(UstarUnicodeTest): ...@@ -1451,18 +1502,23 @@ class PAXUnicodeTest(UstarUnicodeTest):
format = tarfile.PAX_FORMAT format = tarfile.PAX_FORMAT
# PAX_FORMAT ignores encoding in write mode.
test_unicode_filename_error = None
def test_binary_header(self): def test_binary_header(self):
# Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
for encoding, name in (("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), for encoding, name in (
("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),):
with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: with tarfile.open(tarname, encoding=encoding,
errors="surrogateescape") as tar:
try: try:
t = tar.getmember(name) t = tar.getmember(name)
except KeyError: except KeyError:
self.fail("unable to read POSIX.1-2008 binary header") self.fail("unable to read POSIX.1-2008 binary header")
class AppendTest(unittest.TestCase): class AppendTestBase:
# Test append mode (cp. patch #1652681). # Test append mode (cp. patch #1652681).
def setUp(self): def setUp(self):
...@@ -1470,10 +1526,6 @@ class AppendTest(unittest.TestCase): ...@@ -1470,10 +1526,6 @@ class AppendTest(unittest.TestCase):
if os.path.exists(self.tarname): if os.path.exists(self.tarname):
os.remove(self.tarname) os.remove(self.tarname)
def _add_testfile(self, fileobj=None):
with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
tar.addfile(tarfile.TarInfo("bar"))
def _create_testtar(self, mode="w:"): def _create_testtar(self, mode="w:"):
with tarfile.open(tarname, encoding="iso8859-1") as src: with tarfile.open(tarname, encoding="iso8859-1") as src:
t = src.getmember("ustar/regtype") t = src.getmember("ustar/regtype")
...@@ -1482,6 +1534,17 @@ class AppendTest(unittest.TestCase): ...@@ -1482,6 +1534,17 @@ class AppendTest(unittest.TestCase):
with tarfile.open(self.tarname, mode) as tar: with tarfile.open(self.tarname, mode) as tar:
tar.addfile(t, f) tar.addfile(t, f)
def test_append_compressed(self):
self._create_testtar("w:" + self.suffix)
self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
class AppendTest(AppendTestBase, unittest.TestCase):
test_append_compressed = None
def _add_testfile(self, fileobj=None):
with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
tar.addfile(tarfile.TarInfo("bar"))
def _test(self, names=["bar"], fileobj=None): def _test(self, names=["bar"], fileobj=None):
with tarfile.open(self.tarname, fileobj=fileobj) as tar: with tarfile.open(self.tarname, fileobj=fileobj) as tar:
self.assertEqual(tar.getnames(), names) self.assertEqual(tar.getnames(), names)
...@@ -1515,24 +1578,6 @@ class AppendTest(unittest.TestCase): ...@@ -1515,24 +1578,6 @@ class AppendTest(unittest.TestCase):
self._add_testfile() self._add_testfile()
self._test(names=["foo", "bar"]) self._test(names=["foo", "bar"])
def test_append_gz(self):
if gzip is None:
return
self._create_testtar("w:gz")
self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
def test_append_bz2(self):
if bz2 is None:
return
self._create_testtar("w:bz2")
self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
def test_append_lzma(self):
if lzma is None:
self.skipTest("lzma module not available")
self._create_testtar("w:xz")
self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
# Append mode is supposed to fail if the tarfile to append to # Append mode is supposed to fail if the tarfile to append to
# does not end with a zero block. # does not end with a zero block.
def _test_error(self, data): def _test_error(self, data):
...@@ -1557,6 +1602,15 @@ class AppendTest(unittest.TestCase): ...@@ -1557,6 +1602,15 @@ class AppendTest(unittest.TestCase):
def test_invalid(self): def test_invalid(self):
self._test_error(b"a" * 512) self._test_error(b"a" * 512)
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
pass
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
pass
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
pass
class LimitsTest(unittest.TestCase): class LimitsTest(unittest.TestCase):
...@@ -1620,36 +1674,54 @@ class LimitsTest(unittest.TestCase): ...@@ -1620,36 +1674,54 @@ class LimitsTest(unittest.TestCase):
class MiscTest(unittest.TestCase): class MiscTest(unittest.TestCase):
def test_char_fields(self): def test_char_fields(self):
self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), b"foo\0\0\0\0\0") self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"),
self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), b"foo") b"foo\0\0\0\0\0")
self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), "foo") self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"),
self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), "foo") b"foo")
self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"),
"foo")
self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"),
"foo")
def test_read_number_fields(self): def test_read_number_fields(self):
# Issue 13158: Test if GNU tar specific base-256 number fields # Issue 13158: Test if GNU tar specific base-256 number fields
# are decoded correctly. # are decoded correctly.
self.assertEqual(tarfile.nti(b"0000001\x00"), 1) self.assertEqual(tarfile.nti(b"0000001\x00"), 1)
self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777)
self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 0o10000000) self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"),
self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 0xffffffff) 0o10000000)
self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), -1) self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"),
self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), -100) 0xffffffff)
self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), -0x100000000000000) self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"),
-1)
self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
-100)
self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"),
-0x100000000000000)
def test_write_number_fields(self): def test_write_number_fields(self):
self.assertEqual(tarfile.itn(1), b"0000001\x00") self.assertEqual(tarfile.itn(1), b"0000001\x00")
self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00")
self.assertEqual(tarfile.itn(0o10000000), b"\x80\x00\x00\x00\x00\x20\x00\x00") self.assertEqual(tarfile.itn(0o10000000),
self.assertEqual(tarfile.itn(0xffffffff), b"\x80\x00\x00\x00\xff\xff\xff\xff") b"\x80\x00\x00\x00\x00\x20\x00\x00")
self.assertEqual(tarfile.itn(-1), b"\xff\xff\xff\xff\xff\xff\xff\xff") self.assertEqual(tarfile.itn(0xffffffff),
self.assertEqual(tarfile.itn(-100), b"\xff\xff\xff\xff\xff\xff\xff\x9c") b"\x80\x00\x00\x00\xff\xff\xff\xff")
self.assertEqual(tarfile.itn(-0x100000000000000), b"\xff\x00\x00\x00\x00\x00\x00\x00") self.assertEqual(tarfile.itn(-1),
b"\xff\xff\xff\xff\xff\xff\xff\xff")
self.assertEqual(tarfile.itn(-100),
b"\xff\xff\xff\xff\xff\xff\xff\x9c")
self.assertEqual(tarfile.itn(-0x100000000000000),
b"\xff\x00\x00\x00\x00\x00\x00\x00")
def test_number_field_limits(self): def test_number_field_limits(self):
self.assertRaises(ValueError, tarfile.itn, -1, 8, tarfile.USTAR_FORMAT) with self.assertRaises(ValueError):
self.assertRaises(ValueError, tarfile.itn, 0o10000000, 8, tarfile.USTAR_FORMAT) tarfile.itn(-1, 8, tarfile.USTAR_FORMAT)
self.assertRaises(ValueError, tarfile.itn, -0x10000000001, 6, tarfile.GNU_FORMAT) with self.assertRaises(ValueError):
self.assertRaises(ValueError, tarfile.itn, 0x10000000000, 6, tarfile.GNU_FORMAT) tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT)
with self.assertRaises(ValueError):
tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT)
with self.assertRaises(ValueError):
tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT)
class ContextManagerTest(unittest.TestCase): class ContextManagerTest(unittest.TestCase):
...@@ -1710,19 +1782,19 @@ class ContextManagerTest(unittest.TestCase): ...@@ -1710,19 +1782,19 @@ class ContextManagerTest(unittest.TestCase):
self.assertTrue(tar.closed, "context manager failed") self.assertTrue(tar.closed, "context manager failed")
class LinkEmulationTest(ReadTest): @unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):
# Test for issue #8741 regression. On platforms that do not support # Test for issue #8741 regression. On platforms that do not support
# symbolic or hard links tarfile tries to extract these types of members as # symbolic or hard links tarfile tries to extract these types of members
# the regular files they point to. # as the regular files they point to.
def _test_link_extraction(self, name): def _test_link_extraction(self, name):
self.tar.extract(name, TEMPDIR) self.tar.extract(name, TEMPDIR)
data = open(os.path.join(TEMPDIR, name), "rb").read() with open(os.path.join(TEMPDIR, name), "rb") as f:
data = f.read()
self.assertEqual(md5sum(data), md5_regtype) self.assertEqual(md5sum(data), md5_regtype)
# When 8879 gets fixed, this will need to change. Currently on Windows # See issues #1578269, #8879, and #17689 for some history on these skips
# we have os.path.islink but no os.link, so these tests fail without the
# following skip until link is completed.
@unittest.skipIf(hasattr(os.path, "islink"), @unittest.skipIf(hasattr(os.path, "islink"),
"Skip emulation - has os.path.islink but not os.link") "Skip emulation - has os.path.islink but not os.link")
def test_hardlink_extraction1(self): def test_hardlink_extraction1(self):
...@@ -1744,44 +1816,7 @@ class LinkEmulationTest(ReadTest): ...@@ -1744,44 +1816,7 @@ class LinkEmulationTest(ReadTest):
self._test_link_extraction("./ustar/linktest2/symtype") self._test_link_extraction("./ustar/linktest2/symtype")
class GzipMiscReadTest(MiscReadTest): class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
tarname = gzipname
mode = "r:gz"
def test_non_existent_targz_file(self):
# Test for issue11513: prevent non-existent gzipped tarfiles raising
# multiple exceptions.
with self.assertRaisesRegex(IOError, "xxx") as ex:
tarfile.open("xxx", self.mode)
self.assertEqual(ex.exception.errno, errno.ENOENT)
class GzipUstarReadTest(UstarReadTest):
tarname = gzipname
mode = "r:gz"
class GzipStreamReadTest(StreamReadTest):
tarname = gzipname
mode = "r|gz"
class GzipWriteTest(WriteTest):
mode = "w:gz"
class GzipStreamWriteTest(StreamWriteTest):
mode = "w|gz"
class Bz2MiscReadTest(MiscReadTest):
tarname = bz2name
mode = "r:bz2"
class Bz2UstarReadTest(UstarReadTest):
tarname = bz2name
mode = "r:bz2"
class Bz2StreamReadTest(StreamReadTest):
tarname = bz2name
mode = "r|bz2"
class Bz2WriteTest(WriteTest):
mode = "w:bz2"
class Bz2StreamWriteTest(StreamWriteTest):
mode = "w|bz2"
class Bz2PartialReadTest(unittest.TestCase):
# Issue5068: The _BZ2Proxy.read() method loops forever # Issue5068: The _BZ2Proxy.read() method loops forever
# on an empty or partial bzipped file. # on an empty or partial bzipped file.
...@@ -1790,7 +1825,8 @@ class Bz2PartialReadTest(unittest.TestCase): ...@@ -1790,7 +1825,8 @@ class Bz2PartialReadTest(unittest.TestCase):
hit_eof = False hit_eof = False
def read(self, n): def read(self, n):
if self.hit_eof: if self.hit_eof:
raise AssertionError("infinite loop detected in tarfile.open()") raise AssertionError("infinite loop detected in "
"tarfile.open()")
self.hit_eof = self.tell() == len(self.getvalue()) self.hit_eof = self.tell() == len(self.getvalue())
return super(MyBytesIO, self).read(n) return super(MyBytesIO, self).read(n)
def seek(self, *args): def seek(self, *args):
...@@ -1811,102 +1847,23 @@ class Bz2PartialReadTest(unittest.TestCase): ...@@ -1811,102 +1847,23 @@ class Bz2PartialReadTest(unittest.TestCase):
self._test_partial_input("r:bz2") self._test_partial_input("r:bz2")
class LzmaMiscReadTest(MiscReadTest): def setUpModule():
tarname = xzname
mode = "r:xz"
class LzmaUstarReadTest(UstarReadTest):
tarname = xzname
mode = "r:xz"
class LzmaStreamReadTest(StreamReadTest):
tarname = xzname
mode = "r|xz"
class LzmaWriteTest(WriteTest):
mode = "w:xz"
class LzmaStreamWriteTest(StreamWriteTest):
mode = "w|xz"
def test_main():
support.unlink(TEMPDIR) support.unlink(TEMPDIR)
os.makedirs(TEMPDIR) os.makedirs(TEMPDIR)
tests = [
UstarReadTest,
MiscReadTest,
StreamReadTest,
DetectReadTest,
MemberReadTest,
GNUReadTest,
PaxReadTest,
WriteTest,
StreamWriteTest,
GNUWriteTest,
PaxWriteTest,
UstarUnicodeTest,
GNUUnicodeTest,
PAXUnicodeTest,
AppendTest,
LimitsTest,
MiscTest,
ContextManagerTest,
]
if hasattr(os, "link"):
tests.append(HardlinkTest)
else:
tests.append(LinkEmulationTest)
with open(tarname, "rb") as fobj: with open(tarname, "rb") as fobj:
data = fobj.read() data = fobj.read()
if gzip: # Create compressed tarfiles.
# Create testtar.tar.gz and add gzip-specific tests. for c in GzipTest, Bz2Test, LzmaTest:
support.unlink(gzipname) if c.open:
with gzip.open(gzipname, "wb") as tar: support.unlink(c.tarname)
tar.write(data) with c.open(c.tarname, "wb") as tar:
tar.write(data)
tests += [
GzipMiscReadTest, def tearDownModule():
GzipUstarReadTest, if os.path.exists(TEMPDIR):
GzipStreamReadTest, shutil.rmtree(TEMPDIR)
GzipWriteTest,
GzipStreamWriteTest,
]
if bz2:
# Create testtar.tar.bz2 and add bz2-specific tests.
support.unlink(bz2name)
with bz2.BZ2File(bz2name, "wb") as tar:
tar.write(data)
tests += [
Bz2MiscReadTest,
Bz2UstarReadTest,
Bz2StreamReadTest,
Bz2WriteTest,
Bz2StreamWriteTest,
Bz2PartialReadTest,
]
if lzma:
# Create testtar.tar.xz and add lzma-specific tests.
support.unlink(xzname)
with lzma.LZMAFile(xzname, "w") as tar:
tar.write(data)
tests += [
LzmaMiscReadTest,
LzmaUstarReadTest,
LzmaStreamReadTest,
LzmaWriteTest,
LzmaStreamWriteTest,
]
try:
support.run_unittest(*tests)
finally:
if os.path.exists(TEMPDIR):
shutil.rmtree(TEMPDIR)
if __name__ == "__main__": if __name__ == "__main__":
test_main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment