Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
cpython
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
cpython
Commits
9e4861f5
Commit
9e4861f5
authored
Mar 05, 2019
by
Serhiy Storchaka
Committed by
GitHub
Mar 05, 2019
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
bpo-22831: Use "with" to avoid possible fd leaks in tests (part 1). (GH-10928)
parent
b7272395
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
254 additions
and
298 deletions
+254
-298
Lib/test/test_dbm_dumb.py
Lib/test/test_dbm_dumb.py
+69
-80
Lib/test/test_mmap.py
Lib/test/test_mmap.py
+36
-48
Lib/test/test_socket.py
Lib/test/test_socket.py
+19
-24
Lib/test/test_tarfile.py
Lib/test/test_tarfile.py
+60
-67
Lib/test/test_zipfile64.py
Lib/test/test_zipfile64.py
+70
-79
No files found.
Lib/test/test_dbm_dumb.py
View file @
9e4861f5
...
...
@@ -2,6 +2,7 @@
Original by Roger E. Masse
"""
import
contextlib
import
io
import
operator
import
os
...
...
@@ -32,12 +33,11 @@ class DumbDBMTestCase(unittest.TestCase):
}
def
test_dumbdbm_creation
(
self
):
f
=
dumbdbm
.
open
(
_fname
,
'c'
)
self
.
assertEqual
(
list
(
f
.
keys
()),
[])
for
key
in
self
.
_dict
:
f
[
key
]
=
self
.
_dict
[
key
]
self
.
read_helper
(
f
)
f
.
close
()
with
contextlib
.
closing
(
dumbdbm
.
open
(
_fname
,
'c'
))
as
f
:
self
.
assertEqual
(
list
(
f
.
keys
()),
[])
for
key
in
self
.
_dict
:
f
[
key
]
=
self
.
_dict
[
key
]
self
.
read_helper
(
f
)
@
unittest
.
skipUnless
(
hasattr
(
os
,
'umask'
),
'test needs os.umask()'
)
def
test_dumbdbm_creation_mode
(
self
):
...
...
@@ -69,78 +69,70 @@ class DumbDBMTestCase(unittest.TestCase):
def
test_dumbdbm_modification
(
self
):
self
.
init_db
()
f
=
dumbdbm
.
open
(
_fname
,
'w'
)
self
.
_dict
[
b'g'
]
=
f
[
b'g'
]
=
b"indented"
self
.
read_helper
(
f
)
# setdefault() works as in the dict interface
self
.
assertEqual
(
f
.
setdefault
(
b'xxx'
,
b'foo'
),
b'foo'
)
self
.
assertEqual
(
f
[
b'xxx'
],
b'foo'
)
f
.
close
()
with
contextlib
.
closing
(
dumbdbm
.
open
(
_fname
,
'w'
))
as
f
:
self
.
_dict
[
b'g'
]
=
f
[
b'g'
]
=
b"indented"
self
.
read_helper
(
f
)
# setdefault() works as in the dict interface
self
.
assertEqual
(
f
.
setdefault
(
b'xxx'
,
b'foo'
),
b'foo'
)
self
.
assertEqual
(
f
[
b'xxx'
],
b'foo'
)
def
test_dumbdbm_read
(
self
):
self
.
init_db
()
f
=
dumbdbm
.
open
(
_fname
,
'r'
)
self
.
read_helper
(
f
)
with
self
.
assertRaisesRegex
(
dumbdbm
.
error
,
'The database is opened for reading only'
):
f
[
b'g'
]
=
b'x'
with
self
.
assertRaisesRegex
(
dumbdbm
.
error
,
'The database is opened for reading only'
):
del
f
[
b'a'
]
# get() works as in the dict interface
self
.
assertEqual
(
f
.
get
(
b'a'
),
self
.
_dict
[
b'a'
])
self
.
assertEqual
(
f
.
get
(
b'xxx'
,
b'foo'
),
b'foo'
)
self
.
assertIsNone
(
f
.
get
(
b'xxx'
))
with
self
.
assertRaises
(
KeyError
):
f
[
b'xxx'
]
f
.
close
()
with
contextlib
.
closing
(
dumbdbm
.
open
(
_fname
,
'r'
))
as
f
:
self
.
read_helper
(
f
)
with
self
.
assertRaisesRegex
(
dumbdbm
.
error
,
'The database is opened for reading only'
):
f
[
b'g'
]
=
b'x'
with
self
.
assertRaisesRegex
(
dumbdbm
.
error
,
'The database is opened for reading only'
):
del
f
[
b'a'
]
# get() works as in the dict interface
self
.
assertEqual
(
f
.
get
(
b'a'
),
self
.
_dict
[
b'a'
])
self
.
assertEqual
(
f
.
get
(
b'xxx'
,
b'foo'
),
b'foo'
)
self
.
assertIsNone
(
f
.
get
(
b'xxx'
))
with
self
.
assertRaises
(
KeyError
):
f
[
b'xxx'
]
def
test_dumbdbm_keys
(
self
):
self
.
init_db
()
f
=
dumbdbm
.
open
(
_fname
)
keys
=
self
.
keys_helper
(
f
)
f
.
close
()
with
contextlib
.
closing
(
dumbdbm
.
open
(
_fname
))
as
f
:
keys
=
self
.
keys_helper
(
f
)
def
test_write_contains
(
self
):
f
=
dumbdbm
.
open
(
_fname
)
f
[
b'1'
]
=
b'hello'
self
.
assertIn
(
b'1'
,
f
)
f
.
close
()
with
contextlib
.
closing
(
dumbdbm
.
open
(
_fname
))
as
f
:
f
[
b'1'
]
=
b'hello'
self
.
assertIn
(
b'1'
,
f
)
def
test_write_write_read
(
self
):
# test for bug #482460
f
=
dumbdbm
.
open
(
_fname
)
f
[
b'1'
]
=
b'hello'
f
[
b'1'
]
=
b'hello2'
f
.
close
()
f
=
dumbdbm
.
open
(
_fname
)
self
.
assertEqual
(
f
[
b'1'
],
b'hello2'
)
f
.
close
()
with
contextlib
.
closing
(
dumbdbm
.
open
(
_fname
))
as
f
:
f
[
b'1'
]
=
b'hello'
f
[
b'1'
]
=
b'hello2'
with
contextlib
.
closing
(
dumbdbm
.
open
(
_fname
))
as
f
:
self
.
assertEqual
(
f
[
b'1'
],
b'hello2'
)
def
test_str_read
(
self
):
self
.
init_db
()
f
=
dumbdbm
.
open
(
_fname
,
'r'
)
self
.
assertEqual
(
f
[
'
\
u00fc
'
],
self
.
_dict
[
'
\
u00fc
'
.
encode
(
'utf-8'
)])
with
contextlib
.
closing
(
dumbdbm
.
open
(
_fname
,
'r'
))
as
f
:
self
.
assertEqual
(
f
[
'
\
u00fc
'
],
self
.
_dict
[
'
\
u00fc
'
.
encode
(
'utf-8'
)])
def
test_str_write_contains
(
self
):
self
.
init_db
()
f
=
dumbdbm
.
open
(
_fname
)
f
[
'
\
u00fc
'
]
=
b'!'
f
[
'1'
]
=
'a'
f
.
close
()
f
=
dumbdbm
.
open
(
_fname
,
'r'
)
self
.
assertIn
(
'
\
u00fc
'
,
f
)
self
.
assertEqual
(
f
[
'
\
u00fc
'
.
encode
(
'utf-8'
)],
self
.
_dict
[
'
\
u00fc
'
.
encode
(
'utf-8'
)])
self
.
assertEqual
(
f
[
b'1'
],
b'a'
)
with
contextlib
.
closing
(
dumbdbm
.
open
(
_fname
))
as
f
:
f
[
'
\
u00fc
'
]
=
b'!'
f
[
'1'
]
=
'a'
with
contextlib
.
closing
(
dumbdbm
.
open
(
_fname
,
'r'
))
as
f
:
self
.
assertIn
(
'
\
u00fc
'
,
f
)
self
.
assertEqual
(
f
[
'
\
u00fc
'
.
encode
(
'utf-8'
)],
self
.
_dict
[
'
\
u00fc
'
.
encode
(
'utf-8'
)])
self
.
assertEqual
(
f
[
b'1'
],
b'a'
)
def
test_line_endings
(
self
):
# test for bug #1172763: dumbdbm would die if the line endings
# weren't what was expected.
f
=
dumbdbm
.
open
(
_fname
)
f
[
b'1'
]
=
b'hello'
f
[
b'2'
]
=
b'hello2'
f
.
close
()
with
contextlib
.
closing
(
dumbdbm
.
open
(
_fname
))
as
f
:
f
[
b'1'
]
=
b'hello'
f
[
b'2'
]
=
b'hello2'
# Mangle the file by changing the line separator to Windows or Unix
with
io
.
open
(
_fname
+
'.dir'
,
'rb'
)
as
file
:
...
...
@@ -163,10 +155,9 @@ class DumbDBMTestCase(unittest.TestCase):
self
.
assertEqual
(
self
.
_dict
[
key
],
f
[
key
])
def
init_db
(
self
):
f
=
dumbdbm
.
open
(
_fname
,
'n'
)
for
k
in
self
.
_dict
:
f
[
k
]
=
self
.
_dict
[
k
]
f
.
close
()
with
contextlib
.
closing
(
dumbdbm
.
open
(
_fname
,
'n'
))
as
f
:
for
k
in
self
.
_dict
:
f
[
k
]
=
self
.
_dict
[
k
]
def
keys_helper
(
self
,
f
):
keys
=
sorted
(
f
.
keys
())
...
...
@@ -180,25 +171,23 @@ class DumbDBMTestCase(unittest.TestCase):
import
random
d
=
{}
# mirror the database
for
dummy
in
range
(
5
):
f
=
dumbdbm
.
open
(
_fname
)
for
dummy
in
range
(
100
):
k
=
random
.
choice
(
'abcdefghijklm'
)
if
random
.
random
()
<
0.2
:
if
k
in
d
:
del
d
[
k
]
del
f
[
k
]
else
:
v
=
random
.
choice
((
b'a'
,
b'b'
,
b'c'
))
*
random
.
randrange
(
10000
)
d
[
k
]
=
v
f
[
k
]
=
v
self
.
assertEqual
(
f
[
k
],
v
)
f
.
close
()
f
=
dumbdbm
.
open
(
_fname
)
expected
=
sorted
((
k
.
encode
(
"latin-1"
),
v
)
for
k
,
v
in
d
.
items
())
got
=
sorted
(
f
.
items
())
self
.
assertEqual
(
expected
,
got
)
f
.
close
()
with
contextlib
.
closing
(
dumbdbm
.
open
(
_fname
))
as
f
:
for
dummy
in
range
(
100
):
k
=
random
.
choice
(
'abcdefghijklm'
)
if
random
.
random
()
<
0.2
:
if
k
in
d
:
del
d
[
k
]
del
f
[
k
]
else
:
v
=
random
.
choice
((
b'a'
,
b'b'
,
b'c'
))
*
random
.
randrange
(
10000
)
d
[
k
]
=
v
f
[
k
]
=
v
self
.
assertEqual
(
f
[
k
],
v
)
with
contextlib
.
closing
(
dumbdbm
.
open
(
_fname
))
as
f
:
expected
=
sorted
((
k
.
encode
(
"latin-1"
),
v
)
for
k
,
v
in
d
.
items
())
got
=
sorted
(
f
.
items
())
self
.
assertEqual
(
expected
,
got
)
def
test_context_manager
(
self
):
with
dumbdbm
.
open
(
_fname
,
'c'
)
as
db
:
...
...
Lib/test/test_mmap.py
View file @
9e4861f5
...
...
@@ -268,13 +268,12 @@ class MmapTests(unittest.TestCase):
def
test_find_end
(
self
):
# test the new 'end' parameter works as expected
f
=
open
(
TESTFN
,
'wb+'
)
data
=
b'one two ones'
n
=
len
(
data
)
f
.
write
(
data
)
f
.
flush
()
m
=
mmap
.
mmap
(
f
.
fileno
(),
n
)
f
.
close
()
with
open
(
TESTFN
,
'wb+'
)
as
f
:
data
=
b'one two ones'
n
=
len
(
data
)
f
.
write
(
data
)
f
.
flush
()
m
=
mmap
.
mmap
(
f
.
fileno
(),
n
)
self
.
assertEqual
(
m
.
find
(
b'one'
),
0
)
self
.
assertEqual
(
m
.
find
(
b'ones'
),
8
)
...
...
@@ -287,13 +286,12 @@ class MmapTests(unittest.TestCase):
def
test_rfind
(
self
):
# test the new 'end' parameter works as expected
f
=
open
(
TESTFN
,
'wb+'
)
data
=
b'one two ones'
n
=
len
(
data
)
f
.
write
(
data
)
f
.
flush
()
m
=
mmap
.
mmap
(
f
.
fileno
(),
n
)
f
.
close
()
with
open
(
TESTFN
,
'wb+'
)
as
f
:
data
=
b'one two ones'
n
=
len
(
data
)
f
.
write
(
data
)
f
.
flush
()
m
=
mmap
.
mmap
(
f
.
fileno
(),
n
)
self
.
assertEqual
(
m
.
rfind
(
b'one'
),
8
)
self
.
assertEqual
(
m
.
rfind
(
b'one '
),
0
)
...
...
@@ -306,30 +304,23 @@ class MmapTests(unittest.TestCase):
def
test_double_close
(
self
):
# make sure a double close doesn't crash on Solaris (Bug# 665913)
f
=
open
(
TESTFN
,
'wb+'
)
f
.
write
(
2
**
16
*
b'a'
)
# Arbitrary character
f
.
close
()
with
open
(
TESTFN
,
'wb+'
)
as
f
:
f
.
write
(
2
**
16
*
b'a'
)
# Arbitrary character
f
=
open
(
TESTFN
,
'rb'
)
mf
=
mmap
.
mmap
(
f
.
fileno
(),
2
**
16
,
access
=
mmap
.
ACCESS_READ
)
mf
.
close
()
mf
.
close
()
f
.
close
()
with
open
(
TESTFN
,
'rb'
)
as
f
:
mf
=
mmap
.
mmap
(
f
.
fileno
(),
2
**
16
,
access
=
mmap
.
ACCESS_READ
)
mf
.
close
()
mf
.
close
()
def
test_entire_file
(
self
):
# test mapping of entire file by passing 0 for map length
f
=
open
(
TESTFN
,
"wb+"
)
with
open
(
TESTFN
,
"wb+"
)
as
f
:
f
.
write
(
2
**
16
*
b'm'
)
# Arbitrary character
f
.
write
(
2
**
16
*
b'm'
)
# Arbitrary character
f
.
close
()
f
=
open
(
TESTFN
,
"rb+"
)
mf
=
mmap
.
mmap
(
f
.
fileno
(),
0
)
self
.
assertEqual
(
len
(
mf
),
2
**
16
,
"Map size should equal file size."
)
self
.
assertEqual
(
mf
.
read
(
2
**
16
),
2
**
16
*
b"m"
)
mf
.
close
()
f
.
close
()
with
open
(
TESTFN
,
"rb+"
)
as
f
,
\
mmap
.
mmap
(
f
.
fileno
(),
0
)
as
mf
:
self
.
assertEqual
(
len
(
mf
),
2
**
16
,
"Map size should equal file size."
)
self
.
assertEqual
(
mf
.
read
(
2
**
16
),
2
**
16
*
b"m"
)
def
test_length_0_offset
(
self
):
# Issue #10916: test mapping of remainder of file by passing 0 for
...
...
@@ -355,16 +346,15 @@ class MmapTests(unittest.TestCase):
def
test_move
(
self
):
# make move works everywhere (64-bit format problem earlier)
f
=
open
(
TESTFN
,
'wb+'
)
with
open
(
TESTFN
,
'wb+'
)
as
f
:
f
.
write
(
b"ABCDEabcde"
)
# Arbitrary character
f
.
flush
()
f
.
write
(
b"ABCDEabcde"
)
# Arbitrary character
f
.
flush
()
mf
=
mmap
.
mmap
(
f
.
fileno
(),
10
)
mf
.
move
(
5
,
0
,
5
)
self
.
assertEqual
(
mf
[:],
b"ABCDEABCDE"
,
"Map move should have duplicated front 5"
)
mf
.
close
()
f
.
close
()
mf
=
mmap
.
mmap
(
f
.
fileno
(),
10
)
mf
.
move
(
5
,
0
,
5
)
self
.
assertEqual
(
mf
[:],
b"ABCDEABCDE"
,
"Map move should have duplicated front 5"
)
mf
.
close
()
# more excessive test
data
=
b"0123456789"
...
...
@@ -562,10 +552,9 @@ class MmapTests(unittest.TestCase):
mapsize
=
10
with
open
(
TESTFN
,
"wb"
)
as
fp
:
fp
.
write
(
b"a"
*
mapsize
)
f
=
open
(
TESTFN
,
"rb"
)
m
=
mmap
.
mmap
(
f
.
fileno
(),
mapsize
,
prot
=
mmap
.
PROT_READ
)
self
.
assertRaises
(
TypeError
,
m
.
write
,
"foo"
)
f
.
close
()
with
open
(
TESTFN
,
"rb"
)
as
f
:
m
=
mmap
.
mmap
(
f
.
fileno
(),
mapsize
,
prot
=
mmap
.
PROT_READ
)
self
.
assertRaises
(
TypeError
,
m
.
write
,
"foo"
)
def
test_error
(
self
):
self
.
assertIs
(
mmap
.
error
,
OSError
)
...
...
@@ -574,9 +563,8 @@ class MmapTests(unittest.TestCase):
data
=
b"0123456789"
with
open
(
TESTFN
,
"wb"
)
as
fp
:
fp
.
write
(
b"x"
*
len
(
data
))
f
=
open
(
TESTFN
,
"r+b"
)
m
=
mmap
.
mmap
(
f
.
fileno
(),
len
(
data
))
f
.
close
()
with
open
(
TESTFN
,
"r+b"
)
as
f
:
m
=
mmap
.
mmap
(
f
.
fileno
(),
len
(
data
))
# Test write_byte()
for
i
in
range
(
len
(
data
)):
self
.
assertEqual
(
m
.
tell
(),
i
)
...
...
Lib/test/test_socket.py
View file @
9e4861f5
...
...
@@ -799,10 +799,9 @@ class GeneralModuleTests(unittest.TestCase):
self
.
assertEqual
(
repr
(
s
),
expected
)
def
test_weakref
(
self
):
s
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
p
=
proxy
(
s
)
self
.
assertEqual
(
p
.
fileno
(),
s
.
fileno
())
s
.
close
()
with
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
as
s
:
p
=
proxy
(
s
)
self
.
assertEqual
(
p
.
fileno
(),
s
.
fileno
())
s
=
None
try
:
p
.
fileno
()
...
...
@@ -1072,9 +1071,8 @@ class GeneralModuleTests(unittest.TestCase):
# Testing default timeout
# The default timeout should initially be None
self
.
assertEqual
(
socket
.
getdefaulttimeout
(),
None
)
s
=
socket
.
socket
()
self
.
assertEqual
(
s
.
gettimeout
(),
None
)
s
.
close
()
with
socket
.
socket
()
as
s
:
self
.
assertEqual
(
s
.
gettimeout
(),
None
)
# Set the default timeout to 10, and see if it propagates
with
socket_setdefaulttimeout
(
10
):
...
...
@@ -1297,9 +1295,8 @@ class GeneralModuleTests(unittest.TestCase):
def
testSendAfterClose
(
self
):
# testing send() after close() with timeout
sock
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
sock
.
settimeout
(
1
)
sock
.
close
()
with
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
as
sock
:
sock
.
settimeout
(
1
)
self
.
assertRaises
(
OSError
,
sock
.
send
,
b"spam"
)
def
testCloseException
(
self
):
...
...
@@ -1317,16 +1314,15 @@ class GeneralModuleTests(unittest.TestCase):
def
testNewAttributes
(
self
):
# testing .family, .type and .protocol
sock
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
self
.
assertEqual
(
sock
.
family
,
socket
.
AF_INET
)
if
hasattr
(
socket
,
'SOCK_CLOEXEC'
):
self
.
assertIn
(
sock
.
type
,
(
socket
.
SOCK_STREAM
|
socket
.
SOCK_CLOEXEC
,
socket
.
SOCK_STREAM
))
else
:
self
.
assertEqual
(
sock
.
type
,
socket
.
SOCK_STREAM
)
self
.
assertEqual
(
sock
.
proto
,
0
)
sock
.
close
()
with
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
as
sock
:
self
.
assertEqual
(
sock
.
family
,
socket
.
AF_INET
)
if
hasattr
(
socket
,
'SOCK_CLOEXEC'
):
self
.
assertIn
(
sock
.
type
,
(
socket
.
SOCK_STREAM
|
socket
.
SOCK_CLOEXEC
,
socket
.
SOCK_STREAM
))
else
:
self
.
assertEqual
(
sock
.
type
,
socket
.
SOCK_STREAM
)
self
.
assertEqual
(
sock
.
proto
,
0
)
def
test_getsockaddrarg
(
self
):
sock
=
socket
.
socket
()
...
...
@@ -1601,10 +1597,9 @@ class GeneralModuleTests(unittest.TestCase):
def
test_listen_backlog_overflow
(
self
):
# Issue 15989
import
_testcapi
srv
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
srv
.
bind
((
HOST
,
0
))
self
.
assertRaises
(
OverflowError
,
srv
.
listen
,
_testcapi
.
INT_MAX
+
1
)
srv
.
close
()
with
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
as
srv
:
srv
.
bind
((
HOST
,
0
))
self
.
assertRaises
(
OverflowError
,
srv
.
listen
,
_testcapi
.
INT_MAX
+
1
)
@
unittest
.
skipUnless
(
support
.
IPV6_ENABLED
,
'IPv6 required for this test.'
)
def
test_flowinfo
(
self
):
...
...
Lib/test/test_tarfile.py
View file @
9e4861f5
...
...
@@ -132,49 +132,47 @@ class UstarReadTest(ReadTest, unittest.TestCase):
data
=
fobj
.
read
()
tarinfo
=
self
.
tar
.
getmember
(
"ustar/regtype"
)
fobj
=
self
.
tar
.
extractfile
(
tarinfo
)
text
=
fobj
.
read
()
fobj
.
seek
(
0
)
self
.
assertEqual
(
0
,
fobj
.
tell
(),
"seek() to file's start failed"
)
fobj
.
seek
(
2048
,
0
)
self
.
assertEqual
(
2048
,
fobj
.
tell
(),
"seek() to absolute position failed"
)
fobj
.
seek
(
-
1024
,
1
)
self
.
assertEqual
(
1024
,
fobj
.
tell
(),
"seek() to negative relative position failed"
)
fobj
.
seek
(
1024
,
1
)
self
.
assertEqual
(
2048
,
fobj
.
tell
(),
"seek() to positive relative position failed"
)
s
=
fobj
.
read
(
10
)
self
.
assertEqual
(
s
,
data
[
2048
:
2058
],
"read() after seek failed"
)
fobj
.
seek
(
0
,
2
)
self
.
assertEqual
(
tarinfo
.
size
,
fobj
.
tell
(),
"seek() to file's end failed"
)
self
.
assertEqual
(
fobj
.
read
(),
b""
,
"read() at file's end did not return empty string"
)
fobj
.
seek
(
-
tarinfo
.
size
,
2
)
self
.
assertEqual
(
0
,
fobj
.
tell
(),
"relative seek() to file's end failed"
)
fobj
.
seek
(
512
)
s1
=
fobj
.
readlines
()
fobj
.
seek
(
512
)
s2
=
fobj
.
readlines
()
self
.
assertEqual
(
s1
,
s2
,
"readlines() after seek failed"
)
fobj
.
seek
(
0
)
self
.
assertEqual
(
len
(
fobj
.
readline
()),
fobj
.
tell
(),
"tell() after readline() failed"
)
fobj
.
seek
(
512
)
self
.
assertEqual
(
len
(
fobj
.
readline
())
+
512
,
fobj
.
tell
(),
"tell() after seek() and readline() failed"
)
fobj
.
seek
(
0
)
line
=
fobj
.
readline
()
self
.
assertEqual
(
fobj
.
read
(),
data
[
len
(
line
):],
"read() after readline() failed"
)
fobj
.
close
()
with
self
.
tar
.
extractfile
(
tarinfo
)
as
fobj
:
text
=
fobj
.
read
()
fobj
.
seek
(
0
)
self
.
assertEqual
(
0
,
fobj
.
tell
(),
"seek() to file's start failed"
)
fobj
.
seek
(
2048
,
0
)
self
.
assertEqual
(
2048
,
fobj
.
tell
(),
"seek() to absolute position failed"
)
fobj
.
seek
(
-
1024
,
1
)
self
.
assertEqual
(
1024
,
fobj
.
tell
(),
"seek() to negative relative position failed"
)
fobj
.
seek
(
1024
,
1
)
self
.
assertEqual
(
2048
,
fobj
.
tell
(),
"seek() to positive relative position failed"
)
s
=
fobj
.
read
(
10
)
self
.
assertEqual
(
s
,
data
[
2048
:
2058
],
"read() after seek failed"
)
fobj
.
seek
(
0
,
2
)
self
.
assertEqual
(
tarinfo
.
size
,
fobj
.
tell
(),
"seek() to file's end failed"
)
self
.
assertEqual
(
fobj
.
read
(),
b""
,
"read() at file's end did not return empty string"
)
fobj
.
seek
(
-
tarinfo
.
size
,
2
)
self
.
assertEqual
(
0
,
fobj
.
tell
(),
"relative seek() to file's end failed"
)
fobj
.
seek
(
512
)
s1
=
fobj
.
readlines
()
fobj
.
seek
(
512
)
s2
=
fobj
.
readlines
()
self
.
assertEqual
(
s1
,
s2
,
"readlines() after seek failed"
)
fobj
.
seek
(
0
)
self
.
assertEqual
(
len
(
fobj
.
readline
()),
fobj
.
tell
(),
"tell() after readline() failed"
)
fobj
.
seek
(
512
)
self
.
assertEqual
(
len
(
fobj
.
readline
())
+
512
,
fobj
.
tell
(),
"tell() after seek() and readline() failed"
)
fobj
.
seek
(
0
)
line
=
fobj
.
readline
()
self
.
assertEqual
(
fobj
.
read
(),
data
[
len
(
line
):],
"read() after readline() failed"
)
def
test_fileobj_text
(
self
):
with
self
.
tar
.
extractfile
(
"ustar/regtype"
)
as
fobj
:
...
...
@@ -486,15 +484,14 @@ class MiscReadTestBase(CommonReadTest):
fobj
.
seek
(
offset
)
# Test if the tarfile starts with the second member.
tar
=
tar
.
open
(
self
.
tarname
,
mode
=
"r:"
,
fileobj
=
fobj
)
t
=
tar
.
next
()
self
.
assertEqual
(
t
.
name
,
name
)
# Read to the end of fileobj and test if seeking back to the
# beginning works.
tar
.
getmembers
()
self
.
assertEqual
(
tar
.
extractfile
(
t
).
read
(),
data
,
"seek back did not work"
)
tar
.
close
()
with
tar
.
open
(
self
.
tarname
,
mode
=
"r:"
,
fileobj
=
fobj
)
as
tar
:
t
=
tar
.
next
()
self
.
assertEqual
(
t
.
name
,
name
)
# Read to the end of fileobj and test if seeking back to the
# beginning works.
tar
.
getmembers
()
self
.
assertEqual
(
tar
.
extractfile
(
t
).
read
(),
data
,
"seek back did not work"
)
def
test_fail_comp
(
self
):
# For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
...
...
@@ -1042,9 +1039,8 @@ class WriteTestBase(TarTest):
def
test_fileobj_no_close
(
self
):
fobj
=
io
.
BytesIO
()
tar
=
tarfile
.
open
(
fileobj
=
fobj
,
mode
=
self
.
mode
)
tar
.
addfile
(
tarfile
.
TarInfo
(
"foo"
))
tar
.
close
()
with
tarfile
.
open
(
fileobj
=
fobj
,
mode
=
self
.
mode
)
as
tar
:
tar
.
addfile
(
tarfile
.
TarInfo
(
"foo"
))
self
.
assertFalse
(
fobj
.
closed
,
"external fileobjs must never closed"
)
# Issue #20238: Incomplete gzip output with mode="w:gz"
data
=
fobj
.
getvalue
()
...
...
@@ -1306,19 +1302,16 @@ class WriteTest(WriteTestBase, unittest.TestCase):
with
open
(
source_file
,
'w'
)
as
f
:
f
.
write
(
'something
\
n
'
)
os
.
symlink
(
source_file
,
target_file
)
tar
=
tarfile
.
open
(
temparchive
,
'w'
)
tar
.
add
(
source_file
)
tar
.
add
(
target_file
)
tar
.
close
()
with
tarfile
.
open
(
temparchive
,
'w'
)
as
tar
:
tar
.
add
(
source_file
)
tar
.
add
(
target_file
)
# Let's extract it to the location which contains the symlink
tar
=
tarfile
.
open
(
temparchive
,
'r'
)
# this should not raise OSError: [Errno 17] File exists
try
:
tar
.
extractall
(
path
=
tempdir
)
except
OSError
:
self
.
fail
(
"extractall failed with symlinked files"
)
finally
:
tar
.
close
()
with
tarfile
.
open
(
temparchive
)
as
tar
:
# this should not raise OSError: [Errno 17] File exists
try
:
tar
.
extractall
(
path
=
tempdir
)
except
OSError
:
self
.
fail
(
"extractall failed with symlinked files"
)
finally
:
support
.
unlink
(
temparchive
)
support
.
rmtree
(
tempdir
)
...
...
Lib/test/test_zipfile64.py
View file @
9e4861f5
...
...
@@ -31,42 +31,39 @@ class TestsWithSourceFile(unittest.TestCase):
self
.
data
=
'
\
n
'
.
join
(
line_gen
).
encode
(
'ascii'
)
# And write it to a file.
fp
=
open
(
TESTFN
,
"wb"
)
fp
.
write
(
self
.
data
)
fp
.
close
()
with
open
(
TESTFN
,
"wb"
)
as
fp
:
fp
.
write
(
self
.
data
)
def
zipTest
(
self
,
f
,
compression
):
# Create the ZIP archive.
zipfp
=
zipfile
.
ZipFile
(
f
,
"w"
,
compression
)
# It will contain enough copies of self.data to reach about 6 GiB of
# raw data to store.
filecount
=
6
*
1024
**
3
//
len
(
self
.
data
)
next_time
=
time
.
monotonic
()
+
_PRINT_WORKING_MSG_INTERVAL
for
num
in
range
(
filecount
):
zipfp
.
writestr
(
"testfn%d"
%
num
,
self
.
data
)
# Print still working message since this test can be really slow
if
next_time
<=
time
.
monotonic
():
next_time
=
time
.
monotonic
()
+
_PRINT_WORKING_MSG_INTERVAL
print
((
' zipTest still writing %d of %d, be patient...'
%
(
num
,
filecount
)),
file
=
sys
.
__stdout__
)
sys
.
__stdout__
.
flush
()
zipfp
.
close
()
with
zipfile
.
ZipFile
(
f
,
"w"
,
compression
)
as
zipfp
:
# It will contain enough copies of self.data to reach about 6 GiB of
# raw data to store.
filecount
=
6
*
1024
**
3
//
len
(
self
.
data
)
next_time
=
time
.
monotonic
()
+
_PRINT_WORKING_MSG_INTERVAL
for
num
in
range
(
filecount
):
zipfp
.
writestr
(
"testfn%d"
%
num
,
self
.
data
)
# Print still working message since this test can be really slow
if
next_time
<=
time
.
monotonic
():
next_time
=
time
.
monotonic
()
+
_PRINT_WORKING_MSG_INTERVAL
print
((
' zipTest still writing %d of %d, be patient...'
%
(
num
,
filecount
)),
file
=
sys
.
__stdout__
)
sys
.
__stdout__
.
flush
()
# Read the ZIP archive
zipfp
=
zipfile
.
ZipFile
(
f
,
"r"
,
compression
)
for
num
in
range
(
filecount
):
self
.
assertEqual
(
zipfp
.
read
(
"testfn%d"
%
num
),
self
.
data
)
# Print still working message since this test can be really slow
if
next_time
<=
time
.
monotonic
():
next_time
=
time
.
monotonic
()
+
_PRINT_WORKING_MSG_INTERVAL
print
((
' zipTest still reading %d of %d, be patient...'
%
(
num
,
filecount
)),
file
=
sys
.
__stdout__
)
sys
.
__stdout__
.
flush
()
zipfp
.
close
()
with
zipfile
.
ZipFile
(
f
,
"r"
,
compression
)
as
zipfp
:
for
num
in
range
(
filecount
):
self
.
assertEqual
(
zipfp
.
read
(
"testfn%d"
%
num
),
self
.
data
)
# Print still working message since this test can be really slow
if
next_time
<=
time
.
monotonic
():
next_time
=
time
.
monotonic
()
+
_PRINT_WORKING_MSG_INTERVAL
print
((
' zipTest still reading %d of %d, be patient...'
%
(
num
,
filecount
)),
file
=
sys
.
__stdout__
)
sys
.
__stdout__
.
flush
()
def
testStored
(
self
):
# Try the temp file first. If we do TESTFN2 first, then it hogs
...
...
@@ -95,56 +92,50 @@ class OtherTests(unittest.TestCase):
def
testMoreThan64kFiles
(
self
):
# This test checks that more than 64k files can be added to an archive,
# and that the resulting archive can be read properly by ZipFile
zipf
=
zipfile
.
ZipFile
(
TESTFN
,
mode
=
"w"
,
allowZip64
=
True
)
zipf
.
debug
=
100
numfiles
=
(
1
<<
16
)
*
3
//
2
for
i
in
range
(
numfiles
):
zipf
.
writestr
(
"foo%08d"
%
i
,
"%d"
%
(
i
**
3
%
57
))
self
.
assertEqual
(
len
(
zipf
.
namelist
()),
numfiles
)
zipf
.
close
()
zipf2
=
zipfile
.
ZipFile
(
TESTFN
,
mode
=
"r"
)
self
.
assertEqual
(
len
(
zipf2
.
namelist
()),
numfiles
)
for
i
in
range
(
numfiles
):
content
=
zipf2
.
read
(
"foo%08d"
%
i
).
decode
(
'ascii'
)
self
.
assertEqual
(
content
,
"%d"
%
(
i
**
3
%
57
))
zipf2
.
close
()
with
zipfile
.
ZipFile
(
TESTFN
,
mode
=
"w"
,
allowZip64
=
True
)
as
zipf
:
zipf
.
debug
=
100
numfiles
=
(
1
<<
16
)
*
3
//
2
for
i
in
range
(
numfiles
):
zipf
.
writestr
(
"foo%08d"
%
i
,
"%d"
%
(
i
**
3
%
57
))
self
.
assertEqual
(
len
(
zipf
.
namelist
()),
numfiles
)
with
zipfile
.
ZipFile
(
TESTFN
,
mode
=
"r"
)
as
zipf2
:
self
.
assertEqual
(
len
(
zipf2
.
namelist
()),
numfiles
)
for
i
in
range
(
numfiles
):
content
=
zipf2
.
read
(
"foo%08d"
%
i
).
decode
(
'ascii'
)
self
.
assertEqual
(
content
,
"%d"
%
(
i
**
3
%
57
))
def
testMoreThan64kFilesAppend
(
self
):
zipf
=
zipfile
.
ZipFile
(
TESTFN
,
mode
=
"w"
,
allowZip64
=
False
)
zipf
.
debug
=
100
numfiles
=
(
1
<<
16
)
-
1
for
i
in
range
(
numfiles
):
zipf
.
writestr
(
"foo%08d"
%
i
,
"%d"
%
(
i
**
3
%
57
))
self
.
assertEqual
(
len
(
zipf
.
namelist
()),
numfiles
)
with
self
.
assertRaises
(
zipfile
.
LargeZipFile
):
zipf
.
writestr
(
"foo%08d"
%
numfiles
,
b''
)
self
.
assertEqual
(
len
(
zipf
.
namelist
()),
numfiles
)
zipf
.
close
()
zipf
=
zipfile
.
ZipFile
(
TESTFN
,
mode
=
"a"
,
allowZip64
=
False
)
zipf
.
debug
=
100
self
.
assertEqual
(
len
(
zipf
.
namelist
()),
numfiles
)
with
self
.
assertRaises
(
zipfile
.
LargeZipFile
):
zipf
.
writestr
(
"foo%08d"
%
numfiles
,
b''
)
self
.
assertEqual
(
len
(
zipf
.
namelist
()),
numfiles
)
zipf
.
close
()
zipf
=
zipfile
.
ZipFile
(
TESTFN
,
mode
=
"a"
,
allowZip64
=
True
)
zipf
.
debug
=
100
self
.
assertEqual
(
len
(
zipf
.
namelist
()),
numfiles
)
numfiles2
=
(
1
<<
16
)
*
3
//
2
for
i
in
range
(
numfiles
,
numfiles2
):
zipf
.
writestr
(
"foo%08d"
%
i
,
"%d"
%
(
i
**
3
%
57
))
self
.
assertEqual
(
len
(
zipf
.
namelist
()),
numfiles2
)
zipf
.
close
()
zipf2
=
zipfile
.
ZipFile
(
TESTFN
,
mode
=
"r"
)
self
.
assertEqual
(
len
(
zipf2
.
namelist
()),
numfiles2
)
for
i
in
range
(
numfiles2
):
content
=
zipf2
.
read
(
"foo%08d"
%
i
).
decode
(
'ascii'
)
self
.
assertEqual
(
content
,
"%d"
%
(
i
**
3
%
57
))
zipf2
.
close
()
with
zipfile
.
ZipFile
(
TESTFN
,
mode
=
"w"
,
allowZip64
=
False
)
as
zipf
:
zipf
.
debug
=
100
numfiles
=
(
1
<<
16
)
-
1
for
i
in
range
(
numfiles
):
zipf
.
writestr
(
"foo%08d"
%
i
,
"%d"
%
(
i
**
3
%
57
))
self
.
assertEqual
(
len
(
zipf
.
namelist
()),
numfiles
)
with
self
.
assertRaises
(
zipfile
.
LargeZipFile
):
zipf
.
writestr
(
"foo%08d"
%
numfiles
,
b''
)
self
.
assertEqual
(
len
(
zipf
.
namelist
()),
numfiles
)
with
zipfile
.
ZipFile
(
TESTFN
,
mode
=
"a"
,
allowZip64
=
False
)
as
zipf
:
zipf
.
debug
=
100
self
.
assertEqual
(
len
(
zipf
.
namelist
()),
numfiles
)
with
self
.
assertRaises
(
zipfile
.
LargeZipFile
):
zipf
.
writestr
(
"foo%08d"
%
numfiles
,
b''
)
self
.
assertEqual
(
len
(
zipf
.
namelist
()),
numfiles
)
with
zipfile
.
ZipFile
(
TESTFN
,
mode
=
"a"
,
allowZip64
=
True
)
as
zipf
:
zipf
.
debug
=
100
self
.
assertEqual
(
len
(
zipf
.
namelist
()),
numfiles
)
numfiles2
=
(
1
<<
16
)
*
3
//
2
for
i
in
range
(
numfiles
,
numfiles2
):
zipf
.
writestr
(
"foo%08d"
%
i
,
"%d"
%
(
i
**
3
%
57
))
self
.
assertEqual
(
len
(
zipf
.
namelist
()),
numfiles2
)
with
zipfile
.
ZipFile
(
TESTFN
,
mode
=
"r"
)
as
zipf2
:
self
.
assertEqual
(
len
(
zipf2
.
namelist
()),
numfiles2
)
for
i
in
range
(
numfiles2
):
content
=
zipf2
.
read
(
"foo%08d"
%
i
).
decode
(
'ascii'
)
self
.
assertEqual
(
content
,
"%d"
%
(
i
**
3
%
57
))
def
tearDown
(
self
):
support
.
unlink
(
TESTFN
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment