Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
cpython
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
cpython
Commits
e6f4d808
Commit
e6f4d808
authored
Mar 06, 2007
by
Martin v. Löwis
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Patch #1121142: Implement ZipFile.open.
parent
e01b50d2
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
665 additions
and
49 deletions
+665
-49
Doc/lib/libzipfile.tex
Doc/lib/libzipfile.tex
+26
-0
Lib/test/test_zipfile.py
Lib/test/test_zipfile.py
+402
-10
Lib/zipfile.py
Lib/zipfile.py
+235
-39
Misc/NEWS
Misc/NEWS
+2
-0
No files found.
Doc/lib/libzipfile.tex
View file @
e6f4d808
...
...
@@ -141,6 +141,32 @@ cat myzip.zip >> python.exe
Return a list of archive members by name.
\end{methoddesc}
\begin{methoddesc}
{
open
}{
name
\optional
{
, mode
\optional
{
, pwd
}}}
Extract a member from the archive as a file-like object (ZipExtFile).
\var
{
name
}
is the name of the file in the archive. The
\var
{
mode
}
parameter, if included, must be one of the following:
\code
{
'r'
}
(the
default),
\code
{
'U'
}
, or
\code
{
'rU'
}
. Choosing
\code
{
'U'
}
or
\code
{
'rU'
}
will enable universal newline support in the read-only
object.
\var
{
pwd
}
is the password used for encrypted files.
\begin{notice}
The file-like object is read-only and provides the following methods:
\method
{
read()
}
,
\method
{
readline()
}
,
\method
{
readlines()
}
,
\method
{__
iter
__
()
}
,
\method
{
next()
}
.
\end{notice}
\begin{notice}
If the ZipFile was created by passing in a file-like object as the
first argument to the constructor, then the object returned by
\method
{
open()
}
shares the ZipFile's file pointer. Under these
circumstances, the object returned by
\method
{
open()
}
should not
be used after any additional operations are performed on the
ZipFile object. If the ZipFile was created by passing in a string
(the filename) as the first argument to the constructor, then
\method
{
open()
}
will create a new file object that will be held
by the ZipExtFile, allowing it to operate independently of the
ZipFile.
\end{notice}
\end{methoddesc}
\begin{methoddesc}
{
printdir
}{}
Print a table of contents for the archive to
\code
{
sys.stdout
}
.
\end{methoddesc}
...
...
Lib/test/test_zipfile.py
View file @
e6f4d808
...
...
@@ -4,26 +4,29 @@ try:
except
ImportError
:
zlib
=
None
import
zipfile
,
os
,
unittest
,
sys
,
shutil
import
zipfile
,
os
,
unittest
,
sys
,
shutil
,
struct
from
StringIO
import
StringIO
from
tempfile
import
TemporaryFile
from
random
import
randint
,
random
from
test.test_support
import
TESTFN
,
run_unittest
TESTFN2
=
TESTFN
+
"2"
FIXEDTEST_SIZE
=
10
class
TestsWithSourceFile
(
unittest
.
TestCase
):
def
setUp
(
self
):
line_gen
=
(
"Test of zipfile line %d."
%
i
for
i
in
range
(
0
,
1000
))
self
.
data
=
'
\
n
'
.
join
(
line_gen
)
self
.
line_gen
=
(
"Zipfile test line %d. random float: %f"
%
(
i
,
random
())
for
i
in
xrange
(
FIXEDTEST_SIZE
))
self
.
data
=
'
\
n
'
.
join
(
self
.
line_gen
)
+
'
\
n
'
# Make a source file with some lines
fp
=
open
(
TESTFN
,
"wb"
)
fp
.
write
(
self
.
data
)
fp
.
close
()
def
zipTest
(
self
,
f
,
compression
):
def
makeTestArchive
(
self
,
f
,
compression
):
# Create the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
f
,
"w"
,
compression
)
zipfp
.
write
(
TESTFN
,
"another"
+
os
.
extsep
+
"name"
)
...
...
@@ -31,6 +34,9 @@ class TestsWithSourceFile(unittest.TestCase):
zipfp
.
writestr
(
"strfile"
,
self
.
data
)
zipfp
.
close
()
def
zipTest
(
self
,
f
,
compression
):
self
.
makeTestArchive
(
f
,
compression
)
# Read the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
f
,
"r"
,
compression
)
self
.
assertEqual
(
zipfp
.
read
(
TESTFN
),
self
.
data
)
...
...
@@ -85,22 +91,144 @@ class TestsWithSourceFile(unittest.TestCase):
# Check that testzip doesn't raise an exception
zipfp
.
testzip
()
zipfp
.
close
()
def
testStored
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
zipTest
(
f
,
zipfile
.
ZIP_STORED
)
def
zipOpenTest
(
self
,
f
,
compression
):
self
.
makeTestArchive
(
f
,
compression
)
# Read the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
f
,
"r"
,
compression
)
zipdata1
=
[]
zipopen1
=
zipfp
.
open
(
TESTFN
)
while
1
:
read_data
=
zipopen1
.
read
(
256
)
if
not
read_data
:
break
zipdata1
.
append
(
read_data
)
zipdata2
=
[]
zipopen2
=
zipfp
.
open
(
"another"
+
os
.
extsep
+
"name"
)
while
1
:
read_data
=
zipopen2
.
read
(
256
)
if
not
read_data
:
break
zipdata2
.
append
(
read_data
)
self
.
assertEqual
(
''
.
join
(
zipdata1
),
self
.
data
)
self
.
assertEqual
(
''
.
join
(
zipdata2
),
self
.
data
)
zipfp
.
close
()
def
testOpenStored
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
zipOpenTest
(
f
,
zipfile
.
ZIP_STORED
)
def
zipRandomOpenTest
(
self
,
f
,
compression
):
self
.
makeTestArchive
(
f
,
compression
)
# Read the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
f
,
"r"
,
compression
)
zipdata1
=
[]
zipopen1
=
zipfp
.
open
(
TESTFN
)
while
1
:
read_data
=
zipopen1
.
read
(
randint
(
1
,
1024
))
if
not
read_data
:
break
zipdata1
.
append
(
read_data
)
self
.
assertEqual
(
''
.
join
(
zipdata1
),
self
.
data
)
zipfp
.
close
()
def
testRandomOpenStored
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
zipRandomOpenTest
(
f
,
zipfile
.
ZIP_STORED
)
def
zipReadlineTest
(
self
,
f
,
compression
):
self
.
makeTestArchive
(
f
,
compression
)
# Read the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
f
,
"r"
)
zipopen
=
zipfp
.
open
(
TESTFN
)
for
line
in
self
.
line_gen
:
linedata
=
zipopen
.
readline
()
self
.
assertEqual
(
linedata
,
line
+
'
\
n
'
)
zipfp
.
close
()
def
zipReadlinesTest
(
self
,
f
,
compression
):
self
.
makeTestArchive
(
f
,
compression
)
# Read the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
f
,
"r"
)
ziplines
=
zipfp
.
open
(
TESTFN
).
readlines
()
for
line
,
zipline
in
zip
(
self
.
line_gen
,
ziplines
):
self
.
assertEqual
(
zipline
,
line
+
'
\
n
'
)
zipfp
.
close
()
def
testStored
(
self
):
def
zipIterlinesTest
(
self
,
f
,
compression
):
self
.
makeTestArchive
(
f
,
compression
)
# Read the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
f
,
"r"
)
for
line
,
zipline
in
zip
(
self
.
line_gen
,
zipfp
.
open
(
TESTFN
)):
self
.
assertEqual
(
zipline
,
line
+
'
\
n
'
)
zipfp
.
close
()
def
testReadlineStored
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
zipTest
(
f
,
zipfile
.
ZIP_STORED
)
self
.
zipReadlineTest
(
f
,
zipfile
.
ZIP_STORED
)
def
testReadlinesStored
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
zipReadlinesTest
(
f
,
zipfile
.
ZIP_STORED
)
def
testIterlinesStored
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
zipIterlinesTest
(
f
,
zipfile
.
ZIP_STORED
)
if
zlib
:
def
testDeflated
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
zipTest
(
f
,
zipfile
.
ZIP_DEFLATED
)
def
testOpenDeflated
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
zipOpenTest
(
f
,
zipfile
.
ZIP_DEFLATED
)
def
testRandomOpenDeflated
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
zipRandomOpenTest
(
f
,
zipfile
.
ZIP_DEFLATED
)
def
testReadlineDeflated
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
zipReadlineTest
(
f
,
zipfile
.
ZIP_DEFLATED
)
def
testReadlinesDeflated
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
zipReadlinesTest
(
f
,
zipfile
.
ZIP_DEFLATED
)
def
testIterlinesDeflated
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
zipIterlinesTest
(
f
,
zipfile
.
ZIP_DEFLATED
)
def
testLowCompression
(
self
):
# Checks for cases where compressed data is larger than original
# Create the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
TESTFN2
,
"w"
,
zipfile
.
ZIP_DEFLATED
)
zipfp
.
writestr
(
"strfile"
,
'12'
)
zipfp
.
close
()
# Get an open object for strfile
zipfp
=
zipfile
.
ZipFile
(
TESTFN2
,
"r"
,
zipfile
.
ZIP_DEFLATED
)
openobj
=
zipfp
.
open
(
"strfile"
)
self
.
assertEqual
(
openobj
.
read
(
1
),
'1'
)
self
.
assertEqual
(
openobj
.
read
(
1
),
'2'
)
def
testAbsoluteArcnames
(
self
):
zipfp
=
zipfile
.
ZipFile
(
TESTFN2
,
"w"
,
zipfile
.
ZIP_STORED
)
zipfp
.
write
(
TESTFN
,
"/absolute"
)
...
...
@@ -110,7 +238,6 @@ class TestsWithSourceFile(unittest.TestCase):
self
.
assertEqual
(
zipfp
.
namelist
(),
[
"absolute"
])
zipfp
.
close
()
def
tearDown
(
self
):
os
.
remove
(
TESTFN
)
os
.
remove
(
TESTFN2
)
...
...
@@ -123,7 +250,7 @@ class TestZip64InSmallFiles(unittest.TestCase):
self
.
_limit
=
zipfile
.
ZIP64_LIMIT
zipfile
.
ZIP64_LIMIT
=
5
line_gen
=
(
"Test of zipfile line %d."
%
i
for
i
in
range
(
0
,
1000
))
line_gen
=
(
"Test of zipfile line %d."
%
i
for
i
in
range
(
0
,
FIXEDTEST_SIZE
))
self
.
data
=
'
\
n
'
.
join
(
line_gen
)
# Make a source file with some lines
...
...
@@ -344,6 +471,26 @@ class OtherTests(unittest.TestCase):
except
zipfile
.
BadZipfile
:
os
.
unlink
(
TESTFN
)
def
testIsZipErroneousFile
(
self
):
# This test checks that the is_zipfile function correctly identifies
# a file that is not a zip file
fp
=
open
(
TESTFN
,
"w"
)
fp
.
write
(
"this is not a legal zip file
\
n
"
)
fp
.
close
()
chk
=
zipfile
.
is_zipfile
(
TESTFN
)
os
.
unlink
(
TESTFN
)
self
.
assert_
(
chk
is
False
)
def
testIsZipValidFile
(
self
):
# This test checks that the is_zipfile function correctly identifies
# a file that is a zip file
zipf
=
zipfile
.
ZipFile
(
TESTFN
,
mode
=
"w"
)
zipf
.
writestr
(
"foo.txt"
,
"O, for a Muse of Fire!"
)
zipf
.
close
()
chk
=
zipfile
.
is_zipfile
(
TESTFN
)
os
.
unlink
(
TESTFN
)
self
.
assert_
(
chk
is
True
)
def
testNonExistentFileRaisesIOError
(
self
):
# make sure we don't raise an AttributeError when a partially-constructed
# ZipFile instance is finalized; this tests for regression on SF tracker
...
...
@@ -371,7 +518,6 @@ class OtherTests(unittest.TestCase):
# and report that the first file in the archive was corrupt.
self
.
assertRaises
(
RuntimeError
,
zipf
.
testzip
)
class
DecryptionTests
(
unittest
.
TestCase
):
# This test checks that ZIP decryption works. Since the library does not
# support encryption at the moment, we use a pre-generated encrypted
...
...
@@ -411,9 +557,255 @@ class DecryptionTests(unittest.TestCase):
self
.
zip
.
setpassword
(
"python"
)
self
.
assertEquals
(
self
.
zip
.
read
(
"test.txt"
),
self
.
plain
)
class
TestsWithRandomBinaryFiles
(
unittest
.
TestCase
):
def
setUp
(
self
):
datacount
=
randint
(
16
,
64
)
*
1024
+
randint
(
1
,
1024
)
self
.
data
=
''
.
join
((
struct
.
pack
(
'<f'
,
random
()
*
randint
(
-
1000
,
1000
))
for
i
in
xrange
(
datacount
)))
# Make a source file with some lines
fp
=
open
(
TESTFN
,
"wb"
)
fp
.
write
(
self
.
data
)
fp
.
close
()
def
makeTestArchive
(
self
,
f
,
compression
):
# Create the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
f
,
"w"
,
compression
)
zipfp
.
write
(
TESTFN
,
"another"
+
os
.
extsep
+
"name"
)
zipfp
.
write
(
TESTFN
,
TESTFN
)
zipfp
.
close
()
def
zipTest
(
self
,
f
,
compression
):
self
.
makeTestArchive
(
f
,
compression
)
# Read the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
f
,
"r"
,
compression
)
testdata
=
zipfp
.
read
(
TESTFN
)
self
.
assertEqual
(
len
(
testdata
),
len
(
self
.
data
))
self
.
assertEqual
(
testdata
,
self
.
data
)
self
.
assertEqual
(
zipfp
.
read
(
"another"
+
os
.
extsep
+
"name"
),
self
.
data
)
zipfp
.
close
()
def
testStored
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
zipTest
(
f
,
zipfile
.
ZIP_STORED
)
def
zipOpenTest
(
self
,
f
,
compression
):
self
.
makeTestArchive
(
f
,
compression
)
# Read the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
f
,
"r"
,
compression
)
zipdata1
=
[]
zipopen1
=
zipfp
.
open
(
TESTFN
)
while
1
:
read_data
=
zipopen1
.
read
(
256
)
if
not
read_data
:
break
zipdata1
.
append
(
read_data
)
zipdata2
=
[]
zipopen2
=
zipfp
.
open
(
"another"
+
os
.
extsep
+
"name"
)
while
1
:
read_data
=
zipopen2
.
read
(
256
)
if
not
read_data
:
break
zipdata2
.
append
(
read_data
)
testdata1
=
''
.
join
(
zipdata1
)
self
.
assertEqual
(
len
(
testdata1
),
len
(
self
.
data
))
self
.
assertEqual
(
testdata1
,
self
.
data
)
testdata2
=
''
.
join
(
zipdata2
)
self
.
assertEqual
(
len
(
testdata1
),
len
(
self
.
data
))
self
.
assertEqual
(
testdata1
,
self
.
data
)
zipfp
.
close
()
def
testOpenStored
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
zipOpenTest
(
f
,
zipfile
.
ZIP_STORED
)
def
zipRandomOpenTest
(
self
,
f
,
compression
):
self
.
makeTestArchive
(
f
,
compression
)
# Read the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
f
,
"r"
,
compression
)
zipdata1
=
[]
zipopen1
=
zipfp
.
open
(
TESTFN
)
while
1
:
read_data
=
zipopen1
.
read
(
randint
(
1
,
1024
))
if
not
read_data
:
break
zipdata1
.
append
(
read_data
)
testdata
=
''
.
join
(
zipdata1
)
self
.
assertEqual
(
len
(
testdata
),
len
(
self
.
data
))
self
.
assertEqual
(
testdata
,
self
.
data
)
zipfp
.
close
()
def
testRandomOpenStored
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
zipRandomOpenTest
(
f
,
zipfile
.
ZIP_STORED
)
class
TestsWithMultipleOpens
(
unittest
.
TestCase
):
def
setUp
(
self
):
# Create the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
TESTFN2
,
"w"
,
zipfile
.
ZIP_DEFLATED
)
zipfp
.
writestr
(
'ones'
,
'1'
*
FIXEDTEST_SIZE
)
zipfp
.
writestr
(
'twos'
,
'2'
*
FIXEDTEST_SIZE
)
zipfp
.
close
()
def
testSameFile
(
self
):
# Verify that (when the ZipFile is in control of creating file objects)
# multiple open() calls can be made without interfering with each other.
zipf
=
zipfile
.
ZipFile
(
TESTFN2
,
mode
=
"r"
)
zopen1
=
zipf
.
open
(
'ones'
)
zopen2
=
zipf
.
open
(
'ones'
)
data1
=
zopen1
.
read
(
500
)
data2
=
zopen2
.
read
(
500
)
data1
+=
zopen1
.
read
(
500
)
data2
+=
zopen2
.
read
(
500
)
self
.
assertEqual
(
data1
,
data2
)
zipf
.
close
()
def
testDifferentFile
(
self
):
# Verify that (when the ZipFile is in control of creating file objects)
# multiple open() calls can be made without interfering with each other.
zipf
=
zipfile
.
ZipFile
(
TESTFN2
,
mode
=
"r"
)
zopen1
=
zipf
.
open
(
'ones'
)
zopen2
=
zipf
.
open
(
'twos'
)
data1
=
zopen1
.
read
(
500
)
data2
=
zopen2
.
read
(
500
)
data1
+=
zopen1
.
read
(
500
)
data2
+=
zopen2
.
read
(
500
)
self
.
assertEqual
(
data1
,
'1'
*
FIXEDTEST_SIZE
)
self
.
assertEqual
(
data2
,
'2'
*
FIXEDTEST_SIZE
)
zipf
.
close
()
def
testInterleaved
(
self
):
# Verify that (when the ZipFile is in control of creating file objects)
# multiple open() calls can be made without interfering with each other.
zipf
=
zipfile
.
ZipFile
(
TESTFN2
,
mode
=
"r"
)
zopen1
=
zipf
.
open
(
'ones'
)
data1
=
zopen1
.
read
(
500
)
zopen2
=
zipf
.
open
(
'twos'
)
data2
=
zopen2
.
read
(
500
)
data1
+=
zopen1
.
read
(
500
)
data2
+=
zopen2
.
read
(
500
)
self
.
assertEqual
(
data1
,
'1'
*
FIXEDTEST_SIZE
)
self
.
assertEqual
(
data2
,
'2'
*
FIXEDTEST_SIZE
)
zipf
.
close
()
def
tearDown
(
self
):
os
.
remove
(
TESTFN2
)
class
UniversalNewlineTests
(
unittest
.
TestCase
):
def
setUp
(
self
):
self
.
line_gen
=
[
"Test of zipfile line %d."
%
i
for
i
in
xrange
(
FIXEDTEST_SIZE
)]
self
.
seps
=
(
'
\
r
'
,
'
\
r
\
n
'
,
'
\
n
'
)
self
.
arcdata
,
self
.
arcfiles
=
{},
{}
for
n
,
s
in
enumerate
(
self
.
seps
):
self
.
arcdata
[
s
]
=
s
.
join
(
self
.
line_gen
)
+
s
self
.
arcfiles
[
s
]
=
'%s-%d'
%
(
TESTFN
,
n
)
file
(
self
.
arcfiles
[
s
],
"wb"
).
write
(
self
.
arcdata
[
s
])
def
makeTestArchive
(
self
,
f
,
compression
):
# Create the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
f
,
"w"
,
compression
)
for
fn
in
self
.
arcfiles
.
values
():
zipfp
.
write
(
fn
,
fn
)
zipfp
.
close
()
def
readTest
(
self
,
f
,
compression
):
self
.
makeTestArchive
(
f
,
compression
)
# Read the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
f
,
"r"
)
for
sep
,
fn
in
self
.
arcfiles
.
items
():
zipdata
=
zipfp
.
open
(
fn
,
"rU"
).
read
()
self
.
assertEqual
(
self
.
arcdata
[
sep
],
zipdata
)
zipfp
.
close
()
def
readlineTest
(
self
,
f
,
compression
):
self
.
makeTestArchive
(
f
,
compression
)
# Read the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
f
,
"r"
)
for
sep
,
fn
in
self
.
arcfiles
.
items
():
zipopen
=
zipfp
.
open
(
fn
,
"rU"
)
for
line
in
self
.
line_gen
:
linedata
=
zipopen
.
readline
()
self
.
assertEqual
(
linedata
,
line
+
'
\
n
'
)
zipfp
.
close
()
def
readlinesTest
(
self
,
f
,
compression
):
self
.
makeTestArchive
(
f
,
compression
)
# Read the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
f
,
"r"
)
for
sep
,
fn
in
self
.
arcfiles
.
items
():
ziplines
=
zipfp
.
open
(
fn
,
"rU"
).
readlines
()
for
line
,
zipline
in
zip
(
self
.
line_gen
,
ziplines
):
self
.
assertEqual
(
zipline
,
line
+
'
\
n
'
)
zipfp
.
close
()
def
iterlinesTest
(
self
,
f
,
compression
):
self
.
makeTestArchive
(
f
,
compression
)
# Read the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
f
,
"r"
)
for
sep
,
fn
in
self
.
arcfiles
.
items
():
for
line
,
zipline
in
zip
(
self
.
line_gen
,
zipfp
.
open
(
fn
,
"rU"
)):
self
.
assertEqual
(
zipline
,
line
+
'
\
n
'
)
zipfp
.
close
()
def
testReadStored
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
readTest
(
f
,
zipfile
.
ZIP_STORED
)
def
testReadlineStored
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
readlineTest
(
f
,
zipfile
.
ZIP_STORED
)
def
testReadlinesStored
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
readlinesTest
(
f
,
zipfile
.
ZIP_STORED
)
def
testIterlinesStored
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
iterlinesTest
(
f
,
zipfile
.
ZIP_STORED
)
if
zlib
:
def
testReadDeflated
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
readTest
(
f
,
zipfile
.
ZIP_DEFLATED
)
def
testReadlineDeflated
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
readlineTest
(
f
,
zipfile
.
ZIP_DEFLATED
)
def
testReadlinesDeflated
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
readlinesTest
(
f
,
zipfile
.
ZIP_DEFLATED
)
def
testIterlinesDeflated
(
self
):
for
f
in
(
TESTFN2
,
TemporaryFile
(),
StringIO
()):
self
.
iterlinesTest
(
f
,
zipfile
.
ZIP_DEFLATED
)
def
tearDown
(
self
):
for
sep
,
fn
in
self
.
arcfiles
.
items
():
os
.
remove
(
fn
)
def
test_main
():
run_unittest
(
TestsWithSourceFile
,
TestZip64InSmallFiles
,
OtherTests
,
PyZipFileTests
,
DecryptionTests
)
PyZipFileTests
,
DecryptionTests
,
TestsWithMultipleOpens
,
UniversalNewlineTests
,
TestsWithRandomBinaryFiles
)
#run_unittest(TestZip64InSmallFiles)
if
__name__
==
"__main__"
:
...
...
Lib/zipfile.py
View file @
e6f4d808
...
...
@@ -355,6 +355,200 @@ class _ZipDecrypter:
self
.
_UpdateKeys
(
c
)
return
c
class
ZipExtFile
:
"""File-like object for reading an archive member.
Is returned by ZipFile.open().
"""
def
__init__
(
self
,
fileobj
,
zipinfo
,
decrypt
=
None
):
self
.
fileobj
=
fileobj
self
.
decrypter
=
decrypt
self
.
bytes_read
=
0L
self
.
rawbuffer
=
''
self
.
readbuffer
=
''
self
.
linebuffer
=
''
self
.
eof
=
False
self
.
univ_newlines
=
False
self
.
nlSeps
=
(
"
\
n
"
,
)
self
.
lastdiscard
=
''
self
.
compress_type
=
zipinfo
.
compress_type
self
.
compress_size
=
zipinfo
.
compress_size
self
.
closed
=
False
self
.
mode
=
"r"
self
.
name
=
zipinfo
.
filename
# read from compressed files in 64k blocks
self
.
compreadsize
=
64
*
1024
if
self
.
compress_type
==
ZIP_DEFLATED
:
self
.
dc
=
zlib
.
decompressobj
(
-
15
)
def
set_univ_newlines
(
self
,
univ_newlines
):
self
.
univ_newlines
=
univ_newlines
# pick line separator char(s) based on universal newlines flag
self
.
nlSeps
=
(
"
\
n
"
,
)
if
self
.
univ_newlines
:
self
.
nlSeps
=
(
"
\
r
\
n
"
,
"
\
r
"
,
"
\
n
"
)
def
__iter__
(
self
):
return
self
def
next
(
self
):
nextline
=
self
.
readline
()
if
not
nextline
:
raise
StopIteration
()
return
nextline
def
close
(
self
):
self
.
closed
=
True
def
_checkfornewline
(
self
):
nl
,
nllen
=
-
1
,
-
1
if
self
.
linebuffer
:
# ugly check for cases where half of an \r\n pair was
# read on the last pass, and the \r was discarded. In this
# case we just throw away the \n at the start of the buffer.
if
(
self
.
lastdiscard
,
self
.
linebuffer
[
0
])
==
(
'
\
r
'
,
'
\
n
'
):
self
.
linebuffer
=
self
.
linebuffer
[
1
:]
for
sep
in
self
.
nlSeps
:
nl
=
self
.
linebuffer
.
find
(
sep
)
if
nl
>=
0
:
nllen
=
len
(
sep
)
return
nl
,
nllen
return
nl
,
nllen
def
readline
(
self
,
size
=
-
1
):
"""Read a line with approx. size. If size is negative,
read a whole line.
"""
if
size
<
0
:
size
=
sys
.
maxint
elif
size
==
0
:
return
''
# check for a newline already in buffer
nl
,
nllen
=
self
.
_checkfornewline
()
if
nl
>=
0
:
# the next line was already in the buffer
nl
=
min
(
nl
,
size
)
else
:
# no line break in buffer - try to read more
size
-=
len
(
self
.
linebuffer
)
while
nl
<
0
and
size
>
0
:
buf
=
self
.
read
(
min
(
size
,
100
))
if
not
buf
:
break
self
.
linebuffer
+=
buf
size
-=
len
(
buf
)
# check for a newline in buffer
nl
,
nllen
=
self
.
_checkfornewline
()
# we either ran out of bytes in the file, or
# met the specified size limit without finding a newline,
# so return current buffer
if
nl
<
0
:
s
=
self
.
linebuffer
self
.
linebuffer
=
''
return
s
buf
=
self
.
linebuffer
[:
nl
]
self
.
lastdiscard
=
self
.
linebuffer
[
nl
:
nl
+
nllen
]
self
.
linebuffer
=
self
.
linebuffer
[
nl
+
nllen
:]
# line is always returned with \n as newline char (except possibly
# for a final incomplete line in the file, which is handled above).
return
buf
+
"
\
n
"
def
readlines
(
self
,
sizehint
=
-
1
):
"""Return a list with all (following) lines. The sizehint parameter
is ignored in this implementation.
"""
result
=
[]
while
True
:
line
=
self
.
readline
()
if
not
line
:
break
result
.
append
(
line
)
return
result
def
read
(
self
,
size
=
None
):
# act like file() obj and return empty string if size is 0
if
size
==
0
:
return
''
# determine read size
bytesToRead
=
self
.
compress_size
-
self
.
bytes_read
# adjust read size for encrypted files since the first 12 bytes
# are for the encryption/password information
if
self
.
decrypter
is
not
None
:
bytesToRead
-=
12
if
size
is
not
None
and
size
>=
0
:
if
self
.
compress_type
==
ZIP_STORED
:
lr
=
len
(
self
.
readbuffer
)
bytesToRead
=
min
(
bytesToRead
,
size
-
lr
)
elif
self
.
compress_type
==
ZIP_DEFLATED
:
if
len
(
self
.
readbuffer
)
>
size
:
# the user has requested fewer bytes than we've already
# pulled through the decompressor; don't read any more
bytesToRead
=
0
else
:
# user will use up the buffer, so read some more
lr
=
len
(
self
.
rawbuffer
)
bytesToRead
=
min
(
bytesToRead
,
self
.
compreadsize
-
lr
)
# avoid reading past end of file contents
if
bytesToRead
+
self
.
bytes_read
>
self
.
compress_size
:
bytesToRead
=
self
.
compress_size
-
self
.
bytes_read
# try to read from file (if necessary)
if
bytesToRead
>
0
:
bytes
=
self
.
fileobj
.
read
(
bytesToRead
)
self
.
bytes_read
+=
len
(
bytes
)
self
.
rawbuffer
+=
bytes
# handle contents of raw buffer
if
self
.
rawbuffer
:
newdata
=
self
.
rawbuffer
self
.
rawbuffer
=
''
# decrypt new data if we were given an object to handle that
if
newdata
and
self
.
decrypter
is
not
None
:
newdata
=
''
.
join
(
map
(
self
.
decrypter
,
newdata
))
# decompress newly read data if necessary
if
newdata
and
self
.
compress_type
==
ZIP_DEFLATED
:
newdata
=
self
.
dc
.
decompress
(
newdata
)
self
.
rawbuffer
=
self
.
dc
.
unconsumed_tail
if
self
.
eof
and
len
(
self
.
rawbuffer
)
==
0
:
# we're out of raw bytes (both from the file and
# the local buffer); flush just to make sure the
# decompressor is done
newdata
+=
self
.
dc
.
flush
()
# prevent decompressor from being used again
self
.
dc
=
None
self
.
readbuffer
+=
newdata
# return what the user asked for
if
size
is
None
or
len
(
self
.
readbuffer
)
<=
size
:
bytes
=
self
.
readbuffer
self
.
readbuffer
=
''
else
:
bytes
=
self
.
readbuffer
[:
size
]
self
.
readbuffer
=
self
.
readbuffer
[
size
:]
return
bytes
class
ZipFile
:
""" Class with methods to open, read, write, close, list zip files.
...
...
@@ -534,73 +728,75 @@ class ZipFile:
def
read
(
self
,
name
,
pwd
=
None
):
"""Return file bytes (as a string) for name."""
if
self
.
mode
not
in
(
"r"
,
"a"
):
raise
RuntimeError
,
'read() requires mode "r" or "a"'
return
self
.
open
(
name
,
"r"
,
pwd
).
read
()
def
open
(
self
,
name
,
mode
=
"r"
,
pwd
=
None
):
"""Return file-like object for 'name'."""
if
mode
not
in
(
"r"
,
"U"
,
"rU"
):
raise
RuntimeError
,
'open() requires mode "r", "U", or "rU"'
if
not
self
.
fp
:
raise
RuntimeError
,
\
"Attempt to read ZIP archive that was already closed"
# Only open a new file for instances where we were not
# given a file object in the constructor
if
self
.
_filePassed
:
zef_file
=
self
.
fp
else
:
zef_file
=
open
(
self
.
filename
,
'rb'
)
# Get info object for name
zinfo
=
self
.
getinfo
(
name
)
is_encrypted
=
zinfo
.
flag_bits
&
0x1
if
is_encrypted
:
if
not
pwd
:
pwd
=
self
.
pwd
if
not
pwd
:
raise
RuntimeError
,
"File %s is encrypted, "
\
"password required for extraction"
%
name
filepos
=
self
.
fp
.
tell
()
self
.
fp
.
seek
(
zinfo
.
header_offset
,
0
)
filepos
=
zef_file
.
tell
()
zef_file
.
seek
(
zinfo
.
header_offset
,
0
)
# Skip the file header:
fheader
=
self
.
fp
.
read
(
30
)
fheader
=
zef_file
.
read
(
30
)
if
fheader
[
0
:
4
]
!=
stringFileHeader
:
raise
BadZipfile
,
"Bad magic number for file header"
fheader
=
struct
.
unpack
(
structFileHeader
,
fheader
)
fname
=
self
.
fp
.
read
(
fheader
[
_FH_FILENAME_LENGTH
])
fname
=
zef_file
.
read
(
fheader
[
_FH_FILENAME_LENGTH
])
if
fheader
[
_FH_EXTRA_FIELD_LENGTH
]:
self
.
fp
.
read
(
fheader
[
_FH_EXTRA_FIELD_LENGTH
])
zef_file
.
read
(
fheader
[
_FH_EXTRA_FIELD_LENGTH
])
if
fname
!=
zinfo
.
orig_filename
:
raise
BadZipfile
,
\
'File name in directory "%s" and header "%s" differ.'
%
(
zinfo
.
orig_filename
,
fname
)
bytes
=
self
.
fp
.
read
(
zinfo
.
compress_size
)
# Go with decryption
# check for encrypted flag & handle password
is_encrypted
=
zinfo
.
flag_bits
&
0x1
zd
=
None
if
is_encrypted
:
if
not
pwd
:
pwd
=
self
.
pwd
if
not
pwd
:
raise
RuntimeError
,
"File %s is encrypted, "
\
"password required for extraction"
%
name
zd
=
_ZipDecrypter
(
pwd
)
# The first 12 bytes in the cypher stream is an encryption header
# used to strengthen the algorithm. The first 11 bytes are
# completely random, while the 12th contains the MSB of the CRC,
# and is used to check the correctness of the password.
bytes
=
zef_file
.
read
(
12
)
h
=
map
(
zd
,
bytes
[
0
:
12
])
if
ord
(
h
[
11
])
!=
((
zinfo
.
CRC
>>
24
)
&
255
):
raise
RuntimeError
,
"Bad password for file %s"
%
name
bytes
=
""
.
join
(
map
(
zd
,
bytes
[
12
:]))
# Go with decompression
self
.
fp
.
seek
(
filepos
,
0
)
if
zinfo
.
compress_type
==
ZIP_STORED
:
pass
elif
zinfo
.
compress_type
==
ZIP_DEFLATED
:
if
not
zlib
:
raise
RuntimeError
,
\
"De-compression requires the (missing) zlib module"
# zlib compress/decompress code by Jeremy Hylton of CNRI
dc
=
zlib
.
decompressobj
(
-
15
)
bytes
=
dc
.
decompress
(
bytes
)
# need to feed in unused pad byte so that zlib won't choke
ex
=
dc
.
decompress
(
'Z'
)
+
dc
.
flush
()
if
ex
:
bytes
=
bytes
+
ex
# build and return a ZipExtFile
if
zd
is
None
:
zef
=
ZipExtFile
(
zef_file
,
zinfo
)
else
:
raise
BadZipfile
,
\
"Unsupported compression method %d for file %s"
%
\
(
zinfo
.
compress_type
,
name
)
crc
=
binascii
.
crc32
(
bytes
)
if
crc
!=
zinfo
.
CRC
:
raise
BadZipfile
,
"Bad CRC-32 for file %s"
%
name
return
bytes
zef
=
ZipExtFile
(
zef_file
,
zinfo
,
zd
)
# set universal newlines on ZipExtFile if necessary
if
"U"
in
mode
:
zef
.
set_univ_newlines
(
True
)
return
zef
def
_writecheck
(
self
,
zinfo
):
"""Check for errors before writing a file to the archive."""
...
...
Misc/NEWS
View file @
e6f4d808
...
...
@@ -139,6 +139,8 @@ Core and builtins
Library
-------
- Patch #1121142: Implement ZipFile.open.
- Taught setup.py how to locate Berkeley DB on Macs using MacPorts.
- Added heapq.merge() for merging sorted input streams.
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment