Commit 71ca7758 authored by Neal Norwitz's avatar Neal Norwitz

SF #651082, tarfile module implementation from Lars Gustäbel

parent c16072c7
...@@ -329,6 +329,7 @@ LIBFILES= $(MANSTYLES) $(INDEXSTYLES) $(COMMONTEX) \ ...@@ -329,6 +329,7 @@ LIBFILES= $(MANSTYLES) $(INDEXSTYLES) $(COMMONTEX) \
lib/libmmap.tex \ lib/libmmap.tex \
lib/tkinter.tex \ lib/tkinter.tex \
lib/libturtle.tex \ lib/libturtle.tex \
lib/libtarfile.tex \
lib/libcfgparser.tex lib/libcfgparser.tex
# LaTeX source files for Macintosh Library Modules. # LaTeX source files for Macintosh Library Modules.
......
...@@ -128,6 +128,7 @@ and how to embed it in other applications. ...@@ -128,6 +128,7 @@ and how to embed it in other applications.
\input{libcfgparser} \input{libcfgparser}
\input{libfileinput} \input{libfileinput}
\input{libxreadlines} \input{libxreadlines}
\input{libtarfile}
\input{libcalendar} \input{libcalendar}
\input{libcmd} \input{libcmd}
\input{libshlex} \input{libshlex}
......
This diff is collapsed.
This diff is collapsed.
import sys
import os
import shutil
import unittest
import tarfile
from test import test_support
# Check for our compression modules.
try:
import gzip
except ImportError:
gzip = None
try:
import bz2
except ImportError:
bz2 = None
def path(path):
return test_support.findfile(path)
testtar = path("testtar.tar")
tempdir = path("testtar.dir")
tempname = path("testtar.tmp")
membercount = 10
def tarname(comp=""):
if not comp:
return testtar
return "%s.%s" % (testtar, comp)
def dirname():
if not os.path.exists(tempdir):
os.mkdir(tempdir)
return tempdir
def tmpname():
return tempname
class BaseTest(unittest.TestCase):
comp = ''
mode = 'r'
sep = ':'
def setUp(self):
mode = self.mode + self.sep + self.comp
self.tar = tarfile.open(tarname(self.comp), mode)
def tearDown(self):
self.tar.close()
class ReadTest(BaseTest):
def test(self):
"""Test member extraction.
"""
members = 0
for tarinfo in self.tar:
members += 1
if not tarinfo.isreg():
continue
f = self.tar.extractfile(tarinfo)
self.assert_(len(f.read()) == tarinfo.size,
"size read does not match expected size")
f.close()
self.assert_(members == membercount,
"could not find all members")
def test_sparse(self):
"""Test sparse member extraction.
"""
if self.sep != "|":
f1 = self.tar.extractfile("S-SPARSE")
f2 = self.tar.extractfile("S-SPARSE-WITH-NULLS")
self.assert_(f1.read() == f2.read(),
"_FileObject failed on sparse file member")
def test_readlines(self):
"""Test readlines() method of _FileObject.
"""
if self.sep != "|":
filename = "0-REGTYPE-TEXT"
self.tar.extract(filename, dirname())
lines1 = file(os.path.join(dirname(), filename), "r").readlines()
lines2 = self.tar.extractfile(filename).readlines()
self.assert_(lines1 == lines2,
"_FileObject.readline() does not work correctly")
def test_seek(self):
"""Test seek() method of _FileObject, incl. random reading.
"""
if self.sep != "|":
filename = "0-REGTYPE"
self.tar.extract(filename, dirname())
data = file(os.path.join(dirname(), filename), "rb").read()
tarinfo = self.tar.getmember(filename)
fobj = self.tar.extractfile(tarinfo)
text = fobj.read()
fobj.seek(0)
self.assert_(0 == fobj.tell(),
"seek() to file's start failed")
fobj.seek(2048, 0)
self.assert_(2048 == fobj.tell(),
"seek() to absolute position failed")
fobj.seek(-1024, 1)
self.assert_(1024 == fobj.tell(),
"seek() to negative relative position failed")
fobj.seek(1024, 1)
self.assert_(2048 == fobj.tell(),
"seek() to positive relative position failed")
s = fobj.read(10)
self.assert_(s == data[2048:2058],
"read() after seek failed")
fobj.seek(0, 2)
self.assert_(tarinfo.size == fobj.tell(),
"seek() to file's end failed")
self.assert_(fobj.read() == "",
"read() at file's end did not return empty string")
fobj.seek(-tarinfo.size, 2)
self.assert_(0 == fobj.tell(),
"relative seek() to file's start failed")
fobj.seek(512)
s1 = fobj.readlines()
fobj.seek(512)
s2 = fobj.readlines()
self.assert_(s1 == s2,
"readlines() after seek failed")
fobj.close()
class ReadStreamTest(ReadTest):
sep = "|"
def test(self):
"""Test member extraction, and for StreamError when
seeking backwards.
"""
ReadTest.test(self)
tarinfo = self.tar.getmembers()[0]
f = self.tar.extractfile(tarinfo)
self.assertRaises(tarfile.StreamError, f.read)
def test_stream(self):
"""Compare the normal tar and the stream tar.
"""
stream = self.tar
tar = tarfile.open(tarname(), 'r')
while 1:
t1 = tar.next()
t2 = stream.next()
if t1 is None:
break
self.assert_(t2 is not None, "stream.next() failed.")
if t2.islnk() or t2.issym():
self.assertRaises(tarfile.StreamError, stream.extractfile, t2)
continue
v1 = tar.extractfile(t1)
v2 = stream.extractfile(t2)
if v1 is None:
continue
self.assert_(v2 is not None, "stream.extractfile() failed")
self.assert_(v1.read() == v2.read(), "stream extraction failed")
stream.close()
class WriteTest(BaseTest):
mode = 'w'
def setUp(self):
mode = self.mode + self.sep + self.comp
self.src = tarfile.open(tarname(self.comp), 'r')
self.dst = tarfile.open(tmpname(), mode)
def tearDown(self):
self.src.close()
self.dst.close()
def test_posix(self):
self.dst.posix = 1
self._test()
def test_nonposix(self):
self.dst.posix = 0
self._test()
def _test(self):
for tarinfo in self.src:
if not tarinfo.isreg():
continue
f = self.src.extractfile(tarinfo)
if self.dst.posix and len(tarinfo.name) > tarfile.LENGTH_NAME:
self.assertRaises(ValueError, self.dst.addfile,
tarinfo, f)
else:
self.dst.addfile(tarinfo, f)
class WriteStreamTest(WriteTest):
sep = '|'
# Gzip TestCases
class ReadTestGzip(ReadTest):
comp = "gz"
class ReadStreamTestGzip(ReadStreamTest):
comp = "gz"
class WriteTestGzip(WriteTest):
comp = "gz"
class WriteStreamTestGzip(WriteStreamTest):
comp = "gz"
if bz2:
# Bzip2 TestCases
class ReadTestBzip2(ReadTestGzip):
comp = "bz2"
class ReadStreamTestBzip2(ReadStreamTestGzip):
comp = "bz2"
class WriteTestBzip2(WriteTest):
comp = "bz2"
class WriteStreamTestBzip2(WriteStreamTestGzip):
comp = "bz2"
# If importing gzip failed, discard the Gzip TestCases.
if not gzip:
del ReadTestGzip
del ReadStreamTestGzip
del WriteTestGzip
del WriteStreamTestGzip
if __name__ == "__main__":
if gzip:
# create testtar.tar.gz
gzip.open(tarname("gz"), "wb").write(file(tarname(), "rb").read())
if bz2:
# create testtar.tar.bz2
bz2.BZ2File(tarname("bz2"), "wb").write(file(tarname(), "rb").read())
try:
unittest.main()
finally:
if gzip:
os.remove(tarname("gz"))
if bz2:
os.remove(tarname("bz2"))
if os.path.exists(tempdir):
shutil.rmtree(tempdir)
if os.path.exists(tempname):
os.remove(tempname)
This diff was suppressed by a .gitattributes entry.
...@@ -196,6 +196,7 @@ Eddy De Greef ...@@ -196,6 +196,7 @@ Eddy De Greef
Duncan Grisby Duncan Grisby
Dag Gruneau Dag Gruneau
Michael Guravage Michael Guravage
Lars Gustbel
Barry Haddow Barry Haddow
Paul ten Hagen Paul ten Hagen
Rasmus Hahn Rasmus Hahn
......
...@@ -63,6 +63,10 @@ Library ...@@ -63,6 +63,10 @@ Library
It is also exported for ntpath, macpath, and os2emxpath. It is also exported for ntpath, macpath, and os2emxpath.
See SF bug #659228. See SF bug #659228.
- New module tarfile from Lars Gustäbel provides a comprehensive interface
to tar archive files with transparent gzip and bzip2 compression.
See SF patch #651082.
Tools/Demos Tools/Demos
----------- -----------
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment