Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
cpython
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
cpython
Commits
cac9e719
Commit
cac9e719
authored
Jul 31, 2014
by
Antoine Pitrou
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Issue #22111: Assorted cleanups in test_imaplib. Patch by Milan Oberkirch.
parent
a734af3f
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
53 additions
and
65 deletions
+53
-65
Lib/test/test_imaplib.py
Lib/test/test_imaplib.py
+51
-65
Misc/NEWS
Misc/NEWS
+2
-0
No files found.
Lib/test/test_imaplib.py
View file @
cac9e719
...
...
@@ -11,7 +11,8 @@ import socketserver
import
time
import
calendar
from
test.support
import
reap_threads
,
verbose
,
transient_internet
,
run_with_tz
,
run_with_locale
from
test.support
import
(
reap_threads
,
verbose
,
transient_internet
,
run_with_tz
,
run_with_locale
)
import
unittest
from
datetime
import
datetime
,
timezone
,
timedelta
try
:
...
...
@@ -22,8 +23,8 @@ except ImportError:
else
:
from
ssl
import
HAS_SNI
CERTFILE
=
None
CAFILE
=
None
CERTFILE
=
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
)
or
os
.
curdir
,
"keycert3.pem"
)
CAFILE
=
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
)
or
os
.
curdir
,
"pycacert.pem"
)
class
TestImaplib
(
unittest
.
TestCase
):
...
...
@@ -44,17 +45,15 @@ class TestImaplib(unittest.TestCase):
def
test_Internaldate2tuple_issue10941
(
self
):
self
.
assertNotEqual
(
imaplib
.
Internaldate2tuple
(
b'25 (INTERNALDATE "02-Apr-2000 02:30:00 +0000")'
),
imaplib
.
Internaldate2tuple
(
b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")'
))
imaplib
.
Internaldate2tuple
(
b'25 (INTERNALDATE "02-Apr-2000 03:30:00 +0000")'
))
def
timevalues
(
self
):
return
[
2000000000
,
2000000000.0
,
time
.
localtime
(
2000000000
),
(
2033
,
5
,
18
,
5
,
33
,
20
,
-
1
,
-
1
,
-
1
),
(
2033
,
5
,
18
,
5
,
33
,
20
,
-
1
,
-
1
,
1
),
datetime
.
fromtimestamp
(
2000000000
,
timezone
(
timedelta
(
0
,
2
*
60
*
60
))),
timezone
(
timedelta
(
0
,
2
*
60
*
60
))),
'"18-May-2033 05:33:20 +0200"'
]
@
run_with_locale
(
'LC_ALL'
,
'de_DE'
,
'fr_FR'
)
...
...
@@ -75,7 +74,6 @@ class TestImaplib(unittest.TestCase):
if
ssl
:
class
SecureTCPServer
(
socketserver
.
TCPServer
):
def
get_request
(
self
):
...
...
@@ -96,13 +94,13 @@ else:
class
SimpleIMAPHandler
(
socketserver
.
StreamRequestHandler
):
timeout
=
1
continuation
=
None
capabilities
=
''
def
_send
(
self
,
message
):
if
verbose
:
print
(
"SENT: %r"
%
message
.
strip
())
if
verbose
:
print
(
"SENT: %r"
%
message
.
strip
())
self
.
wfile
.
write
(
message
)
def
_send_line
(
self
,
message
):
...
...
@@ -135,7 +133,8 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler):
if
line
.
endswith
(
b'
\
r
\
n
'
):
break
if
verbose
:
print
(
'GOT: %r'
%
line
.
strip
())
if
verbose
:
print
(
'GOT: %r'
%
line
.
strip
())
if
self
.
continuation
:
try
:
self
.
continuation
.
send
(
line
)
...
...
@@ -147,8 +146,8 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler):
cmd
=
splitline
[
1
]
args
=
splitline
[
2
:]
if
hasattr
(
self
,
'cmd_'
+
cmd
):
continuation
=
getattr
(
self
,
'cmd_'
+
cmd
)(
tag
,
args
)
if
hasattr
(
self
,
'cmd_'
+
cmd
):
continuation
=
getattr
(
self
,
'cmd_'
+
cmd
)(
tag
,
args
)
if
continuation
:
self
.
continuation
=
continuation
next
(
continuation
)
...
...
@@ -156,7 +155,9 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler):
self
.
_send_tagged
(
tag
,
'BAD'
,
cmd
+
' unknown'
)
def
cmd_CAPABILITY
(
self
,
tag
,
args
):
caps
=
'IMAP4rev1 '
+
self
.
capabilities
if
self
.
capabilities
else
'IMAP4rev1'
caps
=
(
'IMAP4rev1 '
+
self
.
capabilities
if
self
.
capabilities
else
'IMAP4rev1'
)
self
.
_send_textline
(
'* CAPABILITY '
+
caps
)
self
.
_send_tagged
(
tag
,
'OK'
,
'CAPABILITY completed'
)
...
...
@@ -165,7 +166,9 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler):
self
.
_send_tagged
(
tag
,
'OK'
,
'LOGOUT completed'
)
class
BaseThreadedNetworkedTests
(
unittest
.
TestCase
):
class
ThreadedNetworkedTests
(
unittest
.
TestCase
):
server_class
=
socketserver
.
TCPServer
imap_class
=
imaplib
.
IMAP4
def
make_server
(
self
,
addr
,
hdlr
):
...
...
@@ -175,7 +178,8 @@ class BaseThreadedNetworkedTests(unittest.TestCase):
self
.
server_close
()
raise
if
verbose
:
print
(
"creating server"
)
if
verbose
:
print
(
"creating server"
)
server
=
MyServer
(
addr
,
hdlr
)
self
.
assertEqual
(
server
.
server_address
,
server
.
socket
.
getsockname
())
...
...
@@ -191,18 +195,21 @@ class BaseThreadedNetworkedTests(unittest.TestCase):
# Short poll interval to make the test finish quickly.
# Time between requests is short enough that we won't wake
# up spuriously too many times.
kwargs
=
{
'poll_interval'
:
0.01
})
kwargs
=
{
'poll_interval'
:
0.01
})
t
.
daemon
=
True
# In case this function raises.
t
.
start
()
if
verbose
:
print
(
"server running"
)
if
verbose
:
print
(
"server running"
)
return
server
,
t
def
reap_server
(
self
,
server
,
thread
):
if
verbose
:
print
(
"waiting for server"
)
if
verbose
:
print
(
"waiting for server"
)
server
.
shutdown
()
server
.
server_close
()
thread
.
join
()
if
verbose
:
print
(
"done"
)
if
verbose
:
print
(
"done"
)
@
contextmanager
def
reaped_server
(
self
,
hdlr
):
...
...
@@ -259,7 +266,7 @@ class BaseThreadedNetworkedTests(unittest.TestCase):
def
cmd_AUTHENTICATE
(
self
,
tag
,
args
):
self
.
_send_tagged
(
tag
,
'NO'
,
'unrecognized authentication '
'type {}'
.
format
(
args
[
0
]))
'type {}'
.
format
(
args
[
0
]))
with
self
.
reaped_pair
(
MyServer
)
as
(
server
,
client
):
with
self
.
assertRaises
(
imaplib
.
IMAP4
.
error
):
...
...
@@ -293,13 +300,13 @@ class BaseThreadedNetworkedTests(unittest.TestCase):
code
,
data
=
client
.
authenticate
(
'MYAUTH'
,
lambda
x
:
b'fake'
)
self
.
assertEqual
(
code
,
'OK'
)
self
.
assertEqual
(
server
.
response
,
b'ZmFrZQ==
\
r
\
n
'
)
#
b64 encoded 'fake'
b'ZmFrZQ==
\
r
\
n
'
)
#
b64 encoded 'fake'
with
self
.
reaped_pair
(
MyServer
)
as
(
server
,
client
):
code
,
data
=
client
.
authenticate
(
'MYAUTH'
,
lambda
x
:
'fake'
)
self
.
assertEqual
(
code
,
'OK'
)
self
.
assertEqual
(
server
.
response
,
b'ZmFrZQ==
\
r
\
n
'
)
#
b64 encoded 'fake'
b'ZmFrZQ==
\
r
\
n
'
)
#
b64 encoded 'fake'
@
reap_threads
def
test_login_cram_md5
(
self
):
...
...
@@ -310,9 +317,10 @@ class BaseThreadedNetworkedTests(unittest.TestCase):
def
cmd_AUTHENTICATE
(
self
,
tag
,
args
):
self
.
_send_textline
(
'+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
'VzdG9uLm1jaS5uZXQ='
)
'VzdG9uLm1jaS5uZXQ='
)
r
=
yield
if
r
==
b'dGltIGYxY2E2YmU0NjRiOWVmYTFjY2E2ZmZkNmNmMmQ5ZjMy
\
r
\
n
'
:
if
(
r
==
b'dGltIGYxY2E2YmU0NjRiOWVmYT'
b'FjY2E2ZmZkNmNmMmQ5ZjMy
\
r
\
n
'
):
self
.
_send_tagged
(
tag
,
'OK'
,
'CRAM-MD5 successful'
)
else
:
self
.
_send_tagged
(
tag
,
'NO'
,
'No access'
)
...
...
@@ -327,27 +335,19 @@ class BaseThreadedNetworkedTests(unittest.TestCase):
ret
,
data
=
client
.
login_cram_md5
(
"tim"
,
b"tanstaaftanstaaf"
)
self
.
assertEqual
(
ret
,
"OK"
)
def
test_linetoolong
(
self
):
class
TooLongHandler
(
SimpleIMAPHandler
):
def
handle
(
self
):
# Send a very long response line
self
.
wfile
.
write
(
b'* OK '
+
imaplib
.
_MAXLINE
*
b'x'
+
b'
\
r
\
n
'
)
self
.
wfile
.
write
(
b'* OK '
+
imaplib
.
_MAXLINE
*
b'x'
+
b'
\
r
\
n
'
)
with
self
.
reaped_server
(
TooLongHandler
)
as
server
:
self
.
assertRaises
(
imaplib
.
IMAP4
.
error
,
self
.
imap_class
,
*
server
.
server_address
)
class
ThreadedNetworkedTests
(
BaseThreadedNetworkedTests
):
server_class
=
socketserver
.
TCPServer
imap_class
=
imaplib
.
IMAP4
@
unittest
.
skipUnless
(
ssl
,
"SSL not available"
)
class
ThreadedNetworkedTestsSSL
(
BaseThreadedNetworkedTests
):
class
ThreadedNetworkedTestsSSL
(
ThreadedNetworkedTests
):
server_class
=
SecureTCPServer
imap_class
=
IMAP4_SSL
...
...
@@ -359,8 +359,9 @@ class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
ssl_context
.
check_hostname
=
True
ssl_context
.
load_verify_locations
(
CAFILE
)
with
self
.
assertRaisesRegex
(
ssl
.
CertificateError
,
"hostname '127.0.0.1' doesn't match 'localhost'"
):
with
self
.
assertRaisesRegex
(
ssl
.
CertificateError
,
"hostname '127.0.0.1' doesn't match 'localhost'"
):
with
self
.
reaped_server
(
SimpleIMAPHandler
)
as
server
:
client
=
self
.
imap_class
(
*
server
.
server_address
,
ssl_context
=
ssl_context
)
...
...
@@ -372,6 +373,8 @@ class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
client
.
shutdown
()
@
unittest
.
skipUnless
(
support
.
is_resource_enabled
(
'network'
),
'network resource disabled'
)
class
RemoteIMAPTest
(
unittest
.
TestCase
):
host
=
'cyrus.andrew.cmu.edu'
port
=
143
...
...
@@ -405,6 +408,8 @@ class RemoteIMAPTest(unittest.TestCase):
@
unittest
.
skipUnless
(
ssl
,
"SSL not available"
)
@
unittest
.
skipUnless
(
support
.
is_resource_enabled
(
'network'
),
'network resource disabled'
)
class
RemoteIMAP_STARTTLSTest
(
RemoteIMAPTest
):
def
setUp
(
self
):
...
...
@@ -458,7 +463,8 @@ class RemoteIMAP_SSLTest(RemoteIMAPTest):
def
test_logincapa_with_client_ssl_context
(
self
):
with
transient_internet
(
self
.
host
):
_server
=
self
.
imap_class
(
self
.
host
,
self
.
port
,
ssl_context
=
self
.
create_ssl_context
())
_server
=
self
.
imap_class
(
self
.
host
,
self
.
port
,
ssl_context
=
self
.
create_ssl_context
())
self
.
check_logincapa
(
_server
)
def
test_logout
(
self
):
...
...
@@ -469,35 +475,15 @@ class RemoteIMAP_SSLTest(RemoteIMAPTest):
def
test_ssl_context_certfile_exclusive
(
self
):
with
transient_internet
(
self
.
host
):
self
.
assertRaises
(
ValueError
,
self
.
imap_class
,
self
.
host
,
self
.
port
,
certfile
=
CERTFILE
,
ssl_context
=
self
.
create_ssl_context
())
self
.
assertRaises
(
ValueError
,
self
.
imap_class
,
self
.
host
,
self
.
port
,
certfile
=
CERTFILE
,
ssl_context
=
self
.
create_ssl_context
())
def
test_ssl_context_keyfile_exclusive
(
self
):
with
transient_internet
(
self
.
host
):
self
.
assertRaises
(
ValueError
,
self
.
imap_class
,
self
.
host
,
self
.
port
,
keyfile
=
CERTFILE
,
ssl_context
=
self
.
create_ssl_context
())
def
load_tests
(
*
args
):
tests
=
[
TestImaplib
]
if
support
.
is_resource_enabled
(
'network'
):
if
ssl
:
global
CERTFILE
,
CAFILE
CERTFILE
=
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
)
or
os
.
curdir
,
"keycert3.pem"
)
if
not
os
.
path
.
exists
(
CERTFILE
):
raise
support
.
TestFailed
(
"Can't read certificate files!"
)
CAFILE
=
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
)
or
os
.
curdir
,
"pycacert.pem"
)
if
not
os
.
path
.
exists
(
CAFILE
):
raise
support
.
TestFailed
(
"Can't read CA file!"
)
tests
.
extend
([
ThreadedNetworkedTests
,
ThreadedNetworkedTestsSSL
,
RemoteIMAPTest
,
RemoteIMAP_SSLTest
,
RemoteIMAP_STARTTLSTest
,
])
return
unittest
.
TestSuite
([
unittest
.
makeSuite
(
test
)
for
test
in
tests
])
self
.
assertRaises
(
ValueError
,
self
.
imap_class
,
self
.
host
,
self
.
port
,
keyfile
=
CERTFILE
,
ssl_context
=
self
.
create_ssl_context
())
if
__name__
==
"__main__"
:
...
...
Misc/NEWS
View file @
cac9e719
...
...
@@ -786,6 +786,8 @@ Documentation
Tests
-----
- Issue #22111: Assorted cleanups in test_imaplib. Patch by Milan Oberkirch.
- Issue #22002: Added ``load_package_tests`` function to test.support and used
it to implement/augment test discovery in test_asyncio, test_email,
test_importlib, test_json, and test_tools.
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment