Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
G
gevent
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
gevent
Commits
897d20ff
Commit
897d20ff
authored
Dec 12, 2017
by
Jason Madden
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add and fix test_ftplib.py and test_asyncore.py for 3.6. See #1050 for a description of failures.
parent
4f9bb1d9
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
1965 additions
and
9 deletions
+1965
-9
CHANGES.rst
CHANGES.rst
+4
-0
src/gevent/_ssl3.py
src/gevent/_ssl3.py
+19
-8
src/gevent/_sslgte279.py
src/gevent/_sslgte279.py
+4
-1
src/greentest/3.6/test_asyncore.py
src/greentest/3.6/test_asyncore.py
+847
-0
src/greentest/3.6/test_ftplib.py
src/greentest/3.6/test_ftplib.py
+1091
-0
No files found.
CHANGES.rst
View file @
897d20ff
...
...
@@ -156,6 +156,10 @@
``filename`` attribute set.
- The :class:`threading.Timer` class is now monkey-patched and can
be joined.
- :meth:`gevent.ssl.SSLSocket.unwrap` behaves more like the standard
library, including returning a SSLSocket and allowing certain
timeout-related SSL errors to propagate. The added standard
library tests ``test_ftplib.py`` now passes.
1.2.2 (2017-06-05)
==================
...
...
src/gevent/_ssl3.py
View file @
897d20ff
...
...
@@ -142,6 +142,7 @@ class SSLSocket(socket):
server_hostname
=
None
,
_session
=
None
,
# 3.6
_context
=
None
):
# pylint:disable=too-many-locals,too-many-statements,too-many-branches
if
_context
:
self
.
_context
=
_context
...
...
@@ -513,22 +514,32 @@ class SSLSocket(socket):
s
=
self
.
_sslobj
.
shutdown
()
break
except
SSLWantReadError
:
# Callers of this method expect to get a socket
# back, so we can't simply return 0, we have
# to let these be raised
if
self
.
timeout
==
0.0
:
r
eturn
0
r
aise
self
.
_wait
(
self
.
_read_event
)
except
SSLWantWriteError
:
if
self
.
timeout
==
0.0
:
r
eturn
0
r
aise
self
.
_wait
(
self
.
_write_event
)
self
.
_sslobj
=
None
# The return value of shutting down the SSLObject is the
# original wrapped socket, i.e., _contextawaresock. But that
# object doesn't have the gevent wrapper around it so it can't
# be used. We have to wrap it back up with a gevent wrapper.
sock
=
socket
(
family
=
s
.
family
,
type
=
s
.
type
,
proto
=
s
.
proto
,
fileno
=
s
.
fileno
())
s
.
detach
()
return
sock
# original wrapped socket passed to _wrap_socket, i.e.,
# _contextawaresock. But that object doesn't have the
# gevent wrapper around it so it can't be used. We have to
# wrap it back up with a gevent wrapper.
assert
s
is
self
.
_sock
# In the stdlib, SSLSocket subclasses socket.socket and passes itself
# to _wrap_socket, so it gets itself back. We can't do that, we have to
# pass our subclass of _socket.socket, _contextawaresock.
# So ultimately we should return ourself.
# See test_ftplib.py:TestTLS_FTPClass.test_ccc
return
self
else
:
raise
ValueError
(
"No SSL wrapper around "
+
str
(
self
))
...
...
src/gevent/_sslgte279.py
View file @
897d20ff
...
...
@@ -553,7 +553,10 @@ class SSLSocket(socket):
if
self
.
_sslobj
:
s
=
self
.
_sslobj_shutdown
()
self
.
_sslobj
=
None
return
socket
(
_sock
=
s
)
# match _ssl2; critical to drop/reuse here on PyPy
# match _ssl2; critical to drop/reuse here on PyPy
# XXX: _ssl3 returns an SSLSocket. Is that what the standard lib does on
# Python 2? Should we do that?
return
socket
(
_sock
=
s
)
else
:
raise
ValueError
(
"No SSL wrapper around "
+
str
(
self
))
...
...
src/greentest/3.6/test_asyncore.py
0 → 100644
View file @
897d20ff
import
asyncore
import
unittest
import
select
import
os
import
socket
import
sys
import
time
import
errno
import
struct
from
test
import
support
from
io
import
BytesIO
if
support
.
PGO
:
raise
unittest
.
SkipTest
(
"test is not helpful for PGO"
)
try
:
import
threading
except
ImportError
:
threading
=
None
TIMEOUT
=
3
HAS_UNIX_SOCKETS
=
hasattr
(
socket
,
'AF_UNIX'
)
class
dummysocket
:
def
__init__
(
self
):
self
.
closed
=
False
def
close
(
self
):
self
.
closed
=
True
def
fileno
(
self
):
return
42
class
dummychannel
:
def
__init__
(
self
):
self
.
socket
=
dummysocket
()
def
close
(
self
):
self
.
socket
.
close
()
class
exitingdummy
:
def
__init__
(
self
):
pass
def
handle_read_event
(
self
):
raise
asyncore
.
ExitNow
()
handle_write_event
=
handle_read_event
handle_close
=
handle_read_event
handle_expt_event
=
handle_read_event
class
crashingdummy
:
def
__init__
(
self
):
self
.
error_handled
=
False
def
handle_read_event
(
self
):
raise
Exception
()
handle_write_event
=
handle_read_event
handle_close
=
handle_read_event
handle_expt_event
=
handle_read_event
def
handle_error
(
self
):
self
.
error_handled
=
True
# used when testing senders; just collects what it gets until newline is sent
def
capture_server
(
evt
,
buf
,
serv
):
try
:
serv
.
listen
()
conn
,
addr
=
serv
.
accept
()
except
socket
.
timeout
:
pass
else
:
n
=
200
start
=
time
.
time
()
while
n
>
0
and
time
.
time
()
-
start
<
3.0
:
r
,
w
,
e
=
select
.
select
([
conn
],
[],
[],
0.1
)
if
r
:
n
-=
1
data
=
conn
.
recv
(
10
)
# keep everything except for the newline terminator
buf
.
write
(
data
.
replace
(
b'
\
n
'
,
b''
))
if
b'
\
n
'
in
data
:
break
time
.
sleep
(
0.01
)
conn
.
close
()
finally
:
serv
.
close
()
evt
.
set
()
def
bind_af_aware
(
sock
,
addr
):
"""Helper function to bind a socket according to its family."""
if
HAS_UNIX_SOCKETS
and
sock
.
family
==
socket
.
AF_UNIX
:
# Make sure the path doesn't exist.
support
.
unlink
(
addr
)
support
.
bind_unix_socket
(
sock
,
addr
)
else
:
sock
.
bind
(
addr
)
class
HelperFunctionTests
(
unittest
.
TestCase
):
def
test_readwriteexc
(
self
):
# Check exception handling behavior of read, write and _exception
# check that ExitNow exceptions in the object handler method
# bubbles all the way up through asyncore read/write/_exception calls
tr1
=
exitingdummy
()
self
.
assertRaises
(
asyncore
.
ExitNow
,
asyncore
.
read
,
tr1
)
self
.
assertRaises
(
asyncore
.
ExitNow
,
asyncore
.
write
,
tr1
)
self
.
assertRaises
(
asyncore
.
ExitNow
,
asyncore
.
_exception
,
tr1
)
# check that an exception other than ExitNow in the object handler
# method causes the handle_error method to get called
tr2
=
crashingdummy
()
asyncore
.
read
(
tr2
)
self
.
assertEqual
(
tr2
.
error_handled
,
True
)
tr2
=
crashingdummy
()
asyncore
.
write
(
tr2
)
self
.
assertEqual
(
tr2
.
error_handled
,
True
)
tr2
=
crashingdummy
()
asyncore
.
_exception
(
tr2
)
self
.
assertEqual
(
tr2
.
error_handled
,
True
)
# asyncore.readwrite uses constants in the select module that
# are not present in Windows systems (see this thread:
# http://mail.python.org/pipermail/python-list/2001-October/109973.html)
# These constants should be present as long as poll is available
@
unittest
.
skipUnless
(
hasattr
(
select
,
'poll'
),
'select.poll required'
)
def
test_readwrite
(
self
):
# Check that correct methods are called by readwrite()
attributes
=
(
'read'
,
'expt'
,
'write'
,
'closed'
,
'error_handled'
)
expected
=
(
(
select
.
POLLIN
,
'read'
),
(
select
.
POLLPRI
,
'expt'
),
(
select
.
POLLOUT
,
'write'
),
(
select
.
POLLERR
,
'closed'
),
(
select
.
POLLHUP
,
'closed'
),
(
select
.
POLLNVAL
,
'closed'
),
)
class
testobj
:
def
__init__
(
self
):
self
.
read
=
False
self
.
write
=
False
self
.
closed
=
False
self
.
expt
=
False
self
.
error_handled
=
False
def
handle_read_event
(
self
):
self
.
read
=
True
def
handle_write_event
(
self
):
self
.
write
=
True
def
handle_close
(
self
):
self
.
closed
=
True
def
handle_expt_event
(
self
):
self
.
expt
=
True
def
handle_error
(
self
):
self
.
error_handled
=
True
for
flag
,
expectedattr
in
expected
:
tobj
=
testobj
()
self
.
assertEqual
(
getattr
(
tobj
,
expectedattr
),
False
)
asyncore
.
readwrite
(
tobj
,
flag
)
# Only the attribute modified by the routine we expect to be
# called should be True.
for
attr
in
attributes
:
self
.
assertEqual
(
getattr
(
tobj
,
attr
),
attr
==
expectedattr
)
# check that ExitNow exceptions in the object handler method
# bubbles all the way up through asyncore readwrite call
tr1
=
exitingdummy
()
self
.
assertRaises
(
asyncore
.
ExitNow
,
asyncore
.
readwrite
,
tr1
,
flag
)
# check that an exception other than ExitNow in the object handler
# method causes the handle_error method to get called
tr2
=
crashingdummy
()
self
.
assertEqual
(
tr2
.
error_handled
,
False
)
asyncore
.
readwrite
(
tr2
,
flag
)
self
.
assertEqual
(
tr2
.
error_handled
,
True
)
def
test_closeall
(
self
):
self
.
closeall_check
(
False
)
def
test_closeall_default
(
self
):
self
.
closeall_check
(
True
)
def
closeall_check
(
self
,
usedefault
):
# Check that close_all() closes everything in a given map
l
=
[]
testmap
=
{}
for
i
in
range
(
10
):
c
=
dummychannel
()
l
.
append
(
c
)
self
.
assertEqual
(
c
.
socket
.
closed
,
False
)
testmap
[
i
]
=
c
if
usedefault
:
socketmap
=
asyncore
.
socket_map
try
:
asyncore
.
socket_map
=
testmap
asyncore
.
close_all
()
finally
:
testmap
,
asyncore
.
socket_map
=
asyncore
.
socket_map
,
socketmap
else
:
asyncore
.
close_all
(
testmap
)
self
.
assertEqual
(
len
(
testmap
),
0
)
for
c
in
l
:
self
.
assertEqual
(
c
.
socket
.
closed
,
True
)
def
test_compact_traceback
(
self
):
try
:
raise
Exception
(
"I don't like spam!"
)
except
:
real_t
,
real_v
,
real_tb
=
sys
.
exc_info
()
r
=
asyncore
.
compact_traceback
()
else
:
self
.
fail
(
"Expected exception"
)
(
f
,
function
,
line
),
t
,
v
,
info
=
r
self
.
assertEqual
(
os
.
path
.
split
(
f
)[
-
1
],
'test_asyncore.py'
)
self
.
assertEqual
(
function
,
'test_compact_traceback'
)
self
.
assertEqual
(
t
,
real_t
)
self
.
assertEqual
(
v
,
real_v
)
self
.
assertEqual
(
info
,
'[%s|%s|%s]'
%
(
f
,
function
,
line
))
class
DispatcherTests
(
unittest
.
TestCase
):
def
setUp
(
self
):
pass
def
tearDown
(
self
):
asyncore
.
close_all
()
def
test_basic
(
self
):
d
=
asyncore
.
dispatcher
()
self
.
assertEqual
(
d
.
readable
(),
True
)
self
.
assertEqual
(
d
.
writable
(),
True
)
def
test_repr
(
self
):
d
=
asyncore
.
dispatcher
()
self
.
assertEqual
(
repr
(
d
),
'<asyncore.dispatcher at %#x>'
%
id
(
d
))
def
test_log
(
self
):
d
=
asyncore
.
dispatcher
()
# capture output of dispatcher.log() (to stderr)
l1
=
"Lovely spam! Wonderful spam!"
l2
=
"I don't like spam!"
with
support
.
captured_stderr
()
as
stderr
:
d
.
log
(
l1
)
d
.
log
(
l2
)
lines
=
stderr
.
getvalue
().
splitlines
()
self
.
assertEqual
(
lines
,
[
'log: %s'
%
l1
,
'log: %s'
%
l2
])
def
test_log_info
(
self
):
d
=
asyncore
.
dispatcher
()
# capture output of dispatcher.log_info() (to stdout via print)
l1
=
"Have you got anything without spam?"
l2
=
"Why can't she have egg bacon spam and sausage?"
l3
=
"THAT'S got spam in it!"
with
support
.
captured_stdout
()
as
stdout
:
d
.
log_info
(
l1
,
'EGGS'
)
d
.
log_info
(
l2
)
d
.
log_info
(
l3
,
'SPAM'
)
lines
=
stdout
.
getvalue
().
splitlines
()
expected
=
[
'EGGS: %s'
%
l1
,
'info: %s'
%
l2
,
'SPAM: %s'
%
l3
]
self
.
assertEqual
(
lines
,
expected
)
def
test_unhandled
(
self
):
d
=
asyncore
.
dispatcher
()
d
.
ignore_log_types
=
()
# capture output of dispatcher.log_info() (to stdout via print)
with
support
.
captured_stdout
()
as
stdout
:
d
.
handle_expt
()
d
.
handle_read
()
d
.
handle_write
()
d
.
handle_connect
()
lines
=
stdout
.
getvalue
().
splitlines
()
expected
=
[
'warning: unhandled incoming priority event'
,
'warning: unhandled read event'
,
'warning: unhandled write event'
,
'warning: unhandled connect event'
]
self
.
assertEqual
(
lines
,
expected
)
def
test_strerror
(
self
):
# refers to bug #8573
err
=
asyncore
.
_strerror
(
errno
.
EPERM
)
if
hasattr
(
os
,
'strerror'
):
self
.
assertEqual
(
err
,
os
.
strerror
(
errno
.
EPERM
))
err
=
asyncore
.
_strerror
(
-
1
)
self
.
assertTrue
(
err
!=
""
)
class
dispatcherwithsend_noread
(
asyncore
.
dispatcher_with_send
):
def
readable
(
self
):
return
False
def
handle_connect
(
self
):
pass
class
DispatcherWithSendTests
(
unittest
.
TestCase
):
def
setUp
(
self
):
pass
def
tearDown
(
self
):
asyncore
.
close_all
()
@
unittest
.
skipUnless
(
threading
,
'Threading required for this test.'
)
@
support
.
reap_threads
def
test_send
(
self
):
evt
=
threading
.
Event
()
sock
=
socket
.
socket
()
sock
.
settimeout
(
3
)
port
=
support
.
bind_port
(
sock
)
cap
=
BytesIO
()
args
=
(
evt
,
cap
,
sock
)
t
=
threading
.
Thread
(
target
=
capture_server
,
args
=
args
)
t
.
start
()
try
:
# wait a little longer for the server to initialize (it sometimes
# refuses connections on slow machines without this wait)
time
.
sleep
(
0.2
)
data
=
b"Suppose there isn't a 16-ton weight?"
d
=
dispatcherwithsend_noread
()
d
.
create_socket
()
d
.
connect
((
support
.
HOST
,
port
))
# give time for socket to connect
time
.
sleep
(
0.1
)
d
.
send
(
data
)
d
.
send
(
data
)
d
.
send
(
b'
\
n
'
)
n
=
1000
while
d
.
out_buffer
and
n
>
0
:
asyncore
.
poll
()
n
-=
1
evt
.
wait
()
self
.
assertEqual
(
cap
.
getvalue
(),
data
*
2
)
finally
:
t
.
join
(
timeout
=
TIMEOUT
)
if
t
.
is_alive
():
self
.
fail
(
"join() timed out"
)
@
unittest
.
skipUnless
(
hasattr
(
asyncore
,
'file_wrapper'
),
'asyncore.file_wrapper required'
)
class
FileWrapperTest
(
unittest
.
TestCase
):
def
setUp
(
self
):
self
.
d
=
b"It's not dead, it's sleeping!"
with
open
(
support
.
TESTFN
,
'wb'
)
as
file
:
file
.
write
(
self
.
d
)
def
tearDown
(
self
):
support
.
unlink
(
support
.
TESTFN
)
def
test_recv
(
self
):
fd
=
os
.
open
(
support
.
TESTFN
,
os
.
O_RDONLY
)
w
=
asyncore
.
file_wrapper
(
fd
)
os
.
close
(
fd
)
self
.
assertNotEqual
(
w
.
fd
,
fd
)
self
.
assertNotEqual
(
w
.
fileno
(),
fd
)
self
.
assertEqual
(
w
.
recv
(
13
),
b"It's not dead"
)
self
.
assertEqual
(
w
.
read
(
6
),
b", it's"
)
w
.
close
()
self
.
assertRaises
(
OSError
,
w
.
read
,
1
)
def
test_send
(
self
):
d1
=
b"Come again?"
d2
=
b"I want to buy some cheese."
fd
=
os
.
open
(
support
.
TESTFN
,
os
.
O_WRONLY
|
os
.
O_APPEND
)
w
=
asyncore
.
file_wrapper
(
fd
)
os
.
close
(
fd
)
w
.
write
(
d1
)
w
.
send
(
d2
)
w
.
close
()
with
open
(
support
.
TESTFN
,
'rb'
)
as
file
:
self
.
assertEqual
(
file
.
read
(),
self
.
d
+
d1
+
d2
)
@
unittest
.
skipUnless
(
hasattr
(
asyncore
,
'file_dispatcher'
),
'asyncore.file_dispatcher required'
)
def
test_dispatcher
(
self
):
fd
=
os
.
open
(
support
.
TESTFN
,
os
.
O_RDONLY
)
data
=
[]
class
FileDispatcher
(
asyncore
.
file_dispatcher
):
def
handle_read
(
self
):
data
.
append
(
self
.
recv
(
29
))
s
=
FileDispatcher
(
fd
)
os
.
close
(
fd
)
asyncore
.
loop
(
timeout
=
0.01
,
use_poll
=
True
,
count
=
2
)
self
.
assertEqual
(
b""
.
join
(
data
),
self
.
d
)
def
test_resource_warning
(
self
):
# Issue #11453
fd
=
os
.
open
(
support
.
TESTFN
,
os
.
O_RDONLY
)
f
=
asyncore
.
file_wrapper
(
fd
)
os
.
close
(
fd
)
with
support
.
check_warnings
((
''
,
ResourceWarning
)):
f
=
None
support
.
gc_collect
()
def
test_close_twice
(
self
):
fd
=
os
.
open
(
support
.
TESTFN
,
os
.
O_RDONLY
)
f
=
asyncore
.
file_wrapper
(
fd
)
os
.
close
(
fd
)
os
.
close
(
f
.
fd
)
# file_wrapper dupped fd
with
self
.
assertRaises
(
OSError
):
f
.
close
()
self
.
assertEqual
(
f
.
fd
,
-
1
)
# calling close twice should not fail
f
.
close
()
class
BaseTestHandler
(
asyncore
.
dispatcher
):
def
__init__
(
self
,
sock
=
None
):
asyncore
.
dispatcher
.
__init__
(
self
,
sock
)
self
.
flag
=
False
def
handle_accept
(
self
):
raise
Exception
(
"handle_accept not supposed to be called"
)
def
handle_accepted
(
self
):
raise
Exception
(
"handle_accepted not supposed to be called"
)
def
handle_connect
(
self
):
raise
Exception
(
"handle_connect not supposed to be called"
)
def
handle_expt
(
self
):
raise
Exception
(
"handle_expt not supposed to be called"
)
def
handle_close
(
self
):
raise
Exception
(
"handle_close not supposed to be called"
)
def
handle_error
(
self
):
raise
class
BaseServer
(
asyncore
.
dispatcher
):
"""A server which listens on an address and dispatches the
connection to a handler.
"""
def
__init__
(
self
,
family
,
addr
,
handler
=
BaseTestHandler
):
asyncore
.
dispatcher
.
__init__
(
self
)
self
.
create_socket
(
family
)
self
.
set_reuse_addr
()
bind_af_aware
(
self
.
socket
,
addr
)
self
.
listen
(
5
)
self
.
handler
=
handler
@
property
def
address
(
self
):
return
self
.
socket
.
getsockname
()
def
handle_accepted
(
self
,
sock
,
addr
):
self
.
handler
(
sock
)
def
handle_error
(
self
):
raise
class
BaseClient
(
BaseTestHandler
):
def
__init__
(
self
,
family
,
address
):
BaseTestHandler
.
__init__
(
self
)
self
.
create_socket
(
family
)
self
.
connect
(
address
)
def
handle_connect
(
self
):
pass
class
BaseTestAPI
:
def
tearDown
(
self
):
asyncore
.
close_all
(
ignore_all
=
True
)
def
loop_waiting_for_flag
(
self
,
instance
,
timeout
=
5
):
timeout
=
float
(
timeout
)
/
100
count
=
100
while
asyncore
.
socket_map
and
count
>
0
:
asyncore
.
loop
(
timeout
=
0.01
,
count
=
1
,
use_poll
=
self
.
use_poll
)
if
instance
.
flag
:
return
count
-=
1
time
.
sleep
(
timeout
)
self
.
fail
(
"flag not set"
)
def
test_handle_connect
(
self
):
# make sure handle_connect is called on connect()
class
TestClient
(
BaseClient
):
def
handle_connect
(
self
):
self
.
flag
=
True
server
=
BaseServer
(
self
.
family
,
self
.
addr
)
client
=
TestClient
(
self
.
family
,
server
.
address
)
self
.
loop_waiting_for_flag
(
client
)
def
test_handle_accept
(
self
):
# make sure handle_accept() is called when a client connects
class
TestListener
(
BaseTestHandler
):
def
__init__
(
self
,
family
,
addr
):
BaseTestHandler
.
__init__
(
self
)
self
.
create_socket
(
family
)
bind_af_aware
(
self
.
socket
,
addr
)
self
.
listen
(
5
)
self
.
address
=
self
.
socket
.
getsockname
()
def
handle_accept
(
self
):
self
.
flag
=
True
server
=
TestListener
(
self
.
family
,
self
.
addr
)
client
=
BaseClient
(
self
.
family
,
server
.
address
)
self
.
loop_waiting_for_flag
(
server
)
def
test_handle_accepted
(
self
):
# make sure handle_accepted() is called when a client connects
class
TestListener
(
BaseTestHandler
):
def
__init__
(
self
,
family
,
addr
):
BaseTestHandler
.
__init__
(
self
)
self
.
create_socket
(
family
)
bind_af_aware
(
self
.
socket
,
addr
)
self
.
listen
(
5
)
self
.
address
=
self
.
socket
.
getsockname
()
def
handle_accept
(
self
):
asyncore
.
dispatcher
.
handle_accept
(
self
)
def
handle_accepted
(
self
,
sock
,
addr
):
sock
.
close
()
self
.
flag
=
True
server
=
TestListener
(
self
.
family
,
self
.
addr
)
client
=
BaseClient
(
self
.
family
,
server
.
address
)
self
.
loop_waiting_for_flag
(
server
)
def
test_handle_read
(
self
):
# make sure handle_read is called on data received
class
TestClient
(
BaseClient
):
def
handle_read
(
self
):
self
.
flag
=
True
class
TestHandler
(
BaseTestHandler
):
def
__init__
(
self
,
conn
):
BaseTestHandler
.
__init__
(
self
,
conn
)
self
.
send
(
b'x'
*
1024
)
server
=
BaseServer
(
self
.
family
,
self
.
addr
,
TestHandler
)
client
=
TestClient
(
self
.
family
,
server
.
address
)
self
.
loop_waiting_for_flag
(
client
)
def
test_handle_write
(
self
):
# make sure handle_write is called
class
TestClient
(
BaseClient
):
def
handle_write
(
self
):
self
.
flag
=
True
server
=
BaseServer
(
self
.
family
,
self
.
addr
)
client
=
TestClient
(
self
.
family
,
server
.
address
)
self
.
loop_waiting_for_flag
(
client
)
def
test_handle_close
(
self
):
# make sure handle_close is called when the other end closes
# the connection
class
TestClient
(
BaseClient
):
def
handle_read
(
self
):
# in order to make handle_close be called we are supposed
# to make at least one recv() call
self
.
recv
(
1024
)
def
handle_close
(
self
):
self
.
flag
=
True
self
.
close
()
class
TestHandler
(
BaseTestHandler
):
def
__init__
(
self
,
conn
):
BaseTestHandler
.
__init__
(
self
,
conn
)
self
.
close
()
server
=
BaseServer
(
self
.
family
,
self
.
addr
,
TestHandler
)
client
=
TestClient
(
self
.
family
,
server
.
address
)
self
.
loop_waiting_for_flag
(
client
)
def
test_handle_close_after_conn_broken
(
self
):
# Check that ECONNRESET/EPIPE is correctly handled (issues #5661 and
# #11265).
data
=
b'
\
0
'
*
128
class
TestClient
(
BaseClient
):
def
handle_write
(
self
):
self
.
send
(
data
)
def
handle_close
(
self
):
self
.
flag
=
True
self
.
close
()
def
handle_expt
(
self
):
self
.
flag
=
True
self
.
close
()
class
TestHandler
(
BaseTestHandler
):
def
handle_read
(
self
):
self
.
recv
(
len
(
data
))
self
.
close
()
def
writable
(
self
):
return
False
server
=
BaseServer
(
self
.
family
,
self
.
addr
,
TestHandler
)
client
=
TestClient
(
self
.
family
,
server
.
address
)
self
.
loop_waiting_for_flag
(
client
)
@
unittest
.
skipIf
(
sys
.
platform
.
startswith
(
"sunos"
),
"OOB support is broken on Solaris"
)
def
test_handle_expt
(
self
):
# Make sure handle_expt is called on OOB data received.
# Note: this might fail on some platforms as OOB data is
# tenuously supported and rarely used.
if
HAS_UNIX_SOCKETS
and
self
.
family
==
socket
.
AF_UNIX
:
self
.
skipTest
(
"Not applicable to AF_UNIX sockets."
)
if
sys
.
platform
==
"darwin"
and
self
.
use_poll
:
self
.
skipTest
(
"poll may fail on macOS; see issue #28087"
)
class
TestClient
(
BaseClient
):
def
handle_expt
(
self
):
self
.
socket
.
recv
(
1024
,
socket
.
MSG_OOB
)
self
.
flag
=
True
class
TestHandler
(
BaseTestHandler
):
def
__init__
(
self
,
conn
):
BaseTestHandler
.
__init__
(
self
,
conn
)
self
.
socket
.
send
(
bytes
(
chr
(
244
),
'latin-1'
),
socket
.
MSG_OOB
)
server
=
BaseServer
(
self
.
family
,
self
.
addr
,
TestHandler
)
client
=
TestClient
(
self
.
family
,
server
.
address
)
self
.
loop_waiting_for_flag
(
client
)
def
test_handle_error
(
self
):
class
TestClient
(
BaseClient
):
def
handle_write
(
self
):
1.0
/
0
def
handle_error
(
self
):
self
.
flag
=
True
try
:
raise
except
ZeroDivisionError
:
pass
else
:
raise
Exception
(
"exception not raised"
)
server
=
BaseServer
(
self
.
family
,
self
.
addr
)
client
=
TestClient
(
self
.
family
,
server
.
address
)
self
.
loop_waiting_for_flag
(
client
)
def
test_connection_attributes
(
self
):
server
=
BaseServer
(
self
.
family
,
self
.
addr
)
client
=
BaseClient
(
self
.
family
,
server
.
address
)
# we start disconnected
self
.
assertFalse
(
server
.
connected
)
self
.
assertTrue
(
server
.
accepting
)
# this can't be taken for granted across all platforms
#self.assertFalse(client.connected)
self
.
assertFalse
(
client
.
accepting
)
# execute some loops so that client connects to server
asyncore
.
loop
(
timeout
=
0.01
,
use_poll
=
self
.
use_poll
,
count
=
100
)
self
.
assertFalse
(
server
.
connected
)
self
.
assertTrue
(
server
.
accepting
)
self
.
assertTrue
(
client
.
connected
)
self
.
assertFalse
(
client
.
accepting
)
# disconnect the client
client
.
close
()
self
.
assertFalse
(
server
.
connected
)
self
.
assertTrue
(
server
.
accepting
)
self
.
assertFalse
(
client
.
connected
)
self
.
assertFalse
(
client
.
accepting
)
# stop serving
server
.
close
()
self
.
assertFalse
(
server
.
connected
)
self
.
assertFalse
(
server
.
accepting
)
def
test_create_socket
(
self
):
s
=
asyncore
.
dispatcher
()
s
.
create_socket
(
self
.
family
)
self
.
assertEqual
(
s
.
socket
.
family
,
self
.
family
)
SOCK_NONBLOCK
=
getattr
(
socket
,
'SOCK_NONBLOCK'
,
0
)
sock_type
=
socket
.
SOCK_STREAM
|
SOCK_NONBLOCK
if
hasattr
(
socket
,
'SOCK_CLOEXEC'
):
self
.
assertIn
(
s
.
socket
.
type
,
(
sock_type
|
socket
.
SOCK_CLOEXEC
,
sock_type
))
else
:
self
.
assertEqual
(
s
.
socket
.
type
,
sock_type
)
def
test_bind
(
self
):
if
HAS_UNIX_SOCKETS
and
self
.
family
==
socket
.
AF_UNIX
:
self
.
skipTest
(
"Not applicable to AF_UNIX sockets."
)
s1
=
asyncore
.
dispatcher
()
s1
.
create_socket
(
self
.
family
)
s1
.
bind
(
self
.
addr
)
s1
.
listen
(
5
)
port
=
s1
.
socket
.
getsockname
()[
1
]
s2
=
asyncore
.
dispatcher
()
s2
.
create_socket
(
self
.
family
)
# EADDRINUSE indicates the socket was correctly bound
self
.
assertRaises
(
OSError
,
s2
.
bind
,
(
self
.
addr
[
0
],
port
))
def
test_set_reuse_addr
(
self
):
if
HAS_UNIX_SOCKETS
and
self
.
family
==
socket
.
AF_UNIX
:
self
.
skipTest
(
"Not applicable to AF_UNIX sockets."
)
with
socket
.
socket
(
self
.
family
)
as
sock
:
try
:
sock
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
,
1
)
except
OSError
:
unittest
.
skip
(
"SO_REUSEADDR not supported on this platform"
)
else
:
# if SO_REUSEADDR succeeded for sock we expect asyncore
# to do the same
s
=
asyncore
.
dispatcher
(
socket
.
socket
(
self
.
family
))
self
.
assertFalse
(
s
.
socket
.
getsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
))
s
.
socket
.
close
()
s
.
create_socket
(
self
.
family
)
s
.
set_reuse_addr
()
self
.
assertTrue
(
s
.
socket
.
getsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
))
@
unittest
.
skipUnless
(
threading
,
'Threading required for this test.'
)
@
support
.
reap_threads
def
test_quick_connect
(
self
):
# see: http://bugs.python.org/issue10340
if
self
.
family
not
in
(
socket
.
AF_INET
,
getattr
(
socket
,
"AF_INET6"
,
object
())):
self
.
skipTest
(
"test specific to AF_INET and AF_INET6"
)
server
=
BaseServer
(
self
.
family
,
self
.
addr
)
# run the thread 500 ms: the socket should be connected in 200 ms
t
=
threading
.
Thread
(
target
=
lambda
:
asyncore
.
loop
(
timeout
=
0.1
,
count
=
5
))
t
.
start
()
try
:
with
socket
.
socket
(
self
.
family
,
socket
.
SOCK_STREAM
)
as
s
:
s
.
settimeout
(.
2
)
s
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_LINGER
,
struct
.
pack
(
'ii'
,
1
,
0
))
try
:
s
.
connect
(
server
.
address
)
except
OSError
:
pass
finally
:
t
.
join
(
timeout
=
TIMEOUT
)
if
t
.
is_alive
():
self
.
fail
(
"join() timed out"
)
class
TestAPI_UseIPv4Sockets
(
BaseTestAPI
):
family
=
socket
.
AF_INET
addr
=
(
support
.
HOST
,
0
)
@
unittest
.
skipUnless
(
support
.
IPV6_ENABLED
,
'IPv6 support required'
)
class
TestAPI_UseIPv6Sockets
(
BaseTestAPI
):
family
=
socket
.
AF_INET6
addr
=
(
support
.
HOSTv6
,
0
)
@
unittest
.
skipUnless
(
HAS_UNIX_SOCKETS
,
'Unix sockets required'
)
class
TestAPI_UseUnixSockets
(
BaseTestAPI
):
if
HAS_UNIX_SOCKETS
:
family
=
socket
.
AF_UNIX
addr
=
support
.
TESTFN
def
tearDown
(
self
):
support
.
unlink
(
self
.
addr
)
BaseTestAPI
.
tearDown
(
self
)
class
TestAPI_UseIPv4Select
(
TestAPI_UseIPv4Sockets
,
unittest
.
TestCase
):
use_poll
=
False
@
unittest
.
skipUnless
(
hasattr
(
select
,
'poll'
),
'select.poll required'
)
class
TestAPI_UseIPv4Poll
(
TestAPI_UseIPv4Sockets
,
unittest
.
TestCase
):
use_poll
=
True
class
TestAPI_UseIPv6Select
(
TestAPI_UseIPv6Sockets
,
unittest
.
TestCase
):
use_poll
=
False
@
unittest
.
skipUnless
(
hasattr
(
select
,
'poll'
),
'select.poll required'
)
class
TestAPI_UseIPv6Poll
(
TestAPI_UseIPv6Sockets
,
unittest
.
TestCase
):
use_poll
=
True
class
TestAPI_UseUnixSocketsSelect
(
TestAPI_UseUnixSockets
,
unittest
.
TestCase
):
use_poll
=
False
@
unittest
.
skipUnless
(
hasattr
(
select
,
'poll'
),
'select.poll required'
)
class
TestAPI_UseUnixSocketsPoll
(
TestAPI_UseUnixSockets
,
unittest
.
TestCase
):
use_poll
=
True
if
__name__
==
"__main__"
:
unittest
.
main
()
src/greentest/3.6/test_ftplib.py
0 → 100644
View file @
897d20ff
"""Test script for ftplib module."""
# Modified by Giampaolo Rodola' to test FTP class, IPv6 and TLS
# environment
import
ftplib
import
asyncore
import
asynchat
import
socket
import
io
import
errno
import
os
import
time
try
:
import
ssl
except
ImportError
:
ssl
=
None
from
unittest
import
TestCase
,
skipUnless
from
test
import
support
from
test.support
import
HOST
,
HOSTv6
threading
=
support
.
import_module
(
'threading'
)
TIMEOUT
=
3
# the dummy data returned by server over the data channel when
# RETR, LIST, NLST, MLSD commands are issued
RETR_DATA
=
'abcde12345
\
r
\
n
'
*
1000
LIST_DATA
=
'foo
\
r
\
n
bar
\
r
\
n
'
NLST_DATA
=
'foo
\
r
\
n
bar
\
r
\
n
'
MLSD_DATA
=
(
"type=cdir;perm=el;unique==keVO1+ZF4; test
\
r
\
n
"
"type=pdir;perm=e;unique==keVO1+d?3; ..
\
r
\
n
"
"type=OS.unix=slink:/foobar;perm=;unique==keVO1+4G4; foobar
\
r
\
n
"
"type=OS.unix=chr-13/29;perm=;unique==keVO1+5G4; device
\
r
\
n
"
"type=OS.unix=blk-11/108;perm=;unique==keVO1+6G4; block
\
r
\
n
"
"type=file;perm=awr;unique==keVO1+8G4; writable
\
r
\
n
"
"type=dir;perm=cpmel;unique==keVO1+7G4; promiscuous
\
r
\
n
"
"type=dir;perm=;unique==keVO1+1t2; no-exec
\
r
\
n
"
"type=file;perm=r;unique==keVO1+EG4; two words
\
r
\
n
"
"type=file;perm=r;unique==keVO1+IH4; leading space
\
r
\
n
"
"type=file;perm=r;unique==keVO1+1G4; file1
\
r
\
n
"
"type=dir;perm=cpmel;unique==keVO1+7G4; incoming
\
r
\
n
"
"type=file;perm=r;unique==keVO1+1G4; file2
\
r
\
n
"
"type=file;perm=r;unique==keVO1+1G4; file3
\
r
\
n
"
"type=file;perm=r;unique==keVO1+1G4; file4
\
r
\
n
"
)
class
DummyDTPHandler
(
asynchat
.
async_chat
):
dtp_conn_closed
=
False
def
__init__
(
self
,
conn
,
baseclass
):
asynchat
.
async_chat
.
__init__
(
self
,
conn
)
self
.
baseclass
=
baseclass
self
.
baseclass
.
last_received_data
=
''
def
handle_read
(
self
):
self
.
baseclass
.
last_received_data
+=
self
.
recv
(
1024
).
decode
(
'ascii'
)
def
handle_close
(
self
):
# XXX: this method can be called many times in a row for a single
# connection, including in clear-text (non-TLS) mode.
# (behaviour witnessed with test_data_connection)
if
not
self
.
dtp_conn_closed
:
self
.
baseclass
.
push
(
'226 transfer complete'
)
self
.
close
()
self
.
dtp_conn_closed
=
True
def
push
(
self
,
what
):
if
self
.
baseclass
.
next_data
is
not
None
:
what
=
self
.
baseclass
.
next_data
self
.
baseclass
.
next_data
=
None
if
not
what
:
return
self
.
close_when_done
()
super
(
DummyDTPHandler
,
self
).
push
(
what
.
encode
(
'ascii'
))
def
handle_error
(
self
):
raise
Exception
class
DummyFTPHandler
(
asynchat
.
async_chat
):
dtp_handler
=
DummyDTPHandler
def
__init__
(
self
,
conn
):
asynchat
.
async_chat
.
__init__
(
self
,
conn
)
# tells the socket to handle urgent data inline (ABOR command)
self
.
socket
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_OOBINLINE
,
1
)
self
.
set_terminator
(
b"
\
r
\
n
"
)
self
.
in_buffer
=
[]
self
.
dtp
=
None
self
.
last_received_cmd
=
None
self
.
last_received_data
=
''
self
.
next_response
=
''
self
.
next_data
=
None
self
.
rest
=
None
self
.
next_retr_data
=
RETR_DATA
self
.
push
(
'220 welcome'
)
def
collect_incoming_data
(
self
,
data
):
self
.
in_buffer
.
append
(
data
)
def
found_terminator
(
self
):
line
=
b''
.
join
(
self
.
in_buffer
).
decode
(
'ascii'
)
self
.
in_buffer
=
[]
if
self
.
next_response
:
self
.
push
(
self
.
next_response
)
self
.
next_response
=
''
cmd
=
line
.
split
(
' '
)[
0
].
lower
()
self
.
last_received_cmd
=
cmd
space
=
line
.
find
(
' '
)
if
space
!=
-
1
:
arg
=
line
[
space
+
1
:]
else
:
arg
=
""
if
hasattr
(
self
,
'cmd_'
+
cmd
):
method
=
getattr
(
self
,
'cmd_'
+
cmd
)
method
(
arg
)
else
:
self
.
push
(
'550 command "%s" not understood.'
%
cmd
)
def
handle_error
(
self
):
raise
Exception
def
push
(
self
,
data
):
asynchat
.
async_chat
.
push
(
self
,
data
.
encode
(
'ascii'
)
+
b'
\
r
\
n
'
)
def
cmd_port
(
self
,
arg
):
addr
=
list
(
map
(
int
,
arg
.
split
(
','
)))
ip
=
'%d.%d.%d.%d'
%
tuple
(
addr
[:
4
])
port
=
(
addr
[
4
]
*
256
)
+
addr
[
5
]
s
=
socket
.
create_connection
((
ip
,
port
),
timeout
=
TIMEOUT
)
self
.
dtp
=
self
.
dtp_handler
(
s
,
baseclass
=
self
)
self
.
push
(
'200 active data connection established'
)
def
cmd_pasv
(
self
,
arg
):
with
socket
.
socket
()
as
sock
:
sock
.
bind
((
self
.
socket
.
getsockname
()[
0
],
0
))
sock
.
listen
()
sock
.
settimeout
(
TIMEOUT
)
ip
,
port
=
sock
.
getsockname
()[:
2
]
ip
=
ip
.
replace
(
'.'
,
','
);
p1
=
port
/
256
;
p2
=
port
%
256
self
.
push
(
'227 entering passive mode (%s,%d,%d)'
%
(
ip
,
p1
,
p2
))
conn
,
addr
=
sock
.
accept
()
self
.
dtp
=
self
.
dtp_handler
(
conn
,
baseclass
=
self
)
def
cmd_eprt
(
self
,
arg
):
af
,
ip
,
port
=
arg
.
split
(
arg
[
0
])[
1
:
-
1
]
port
=
int
(
port
)
s
=
socket
.
create_connection
((
ip
,
port
),
timeout
=
TIMEOUT
)
self
.
dtp
=
self
.
dtp_handler
(
s
,
baseclass
=
self
)
self
.
push
(
'200 active data connection established'
)
def
cmd_epsv
(
self
,
arg
):
with
socket
.
socket
(
socket
.
AF_INET6
)
as
sock
:
sock
.
bind
((
self
.
socket
.
getsockname
()[
0
],
0
))
sock
.
listen
()
sock
.
settimeout
(
TIMEOUT
)
port
=
sock
.
getsockname
()[
1
]
self
.
push
(
'229 entering extended passive mode (|||%d|)'
%
port
)
conn
,
addr
=
sock
.
accept
()
self
.
dtp
=
self
.
dtp_handler
(
conn
,
baseclass
=
self
)
def
cmd_echo
(
self
,
arg
):
# sends back the received string (used by the test suite)
self
.
push
(
arg
)
def
cmd_noop
(
self
,
arg
):
self
.
push
(
'200 noop ok'
)
def
cmd_user
(
self
,
arg
):
self
.
push
(
'331 username ok'
)
def
cmd_pass
(
self
,
arg
):
self
.
push
(
'230 password ok'
)
def
cmd_acct
(
self
,
arg
):
self
.
push
(
'230 acct ok'
)
def
cmd_rnfr
(
self
,
arg
):
self
.
push
(
'350 rnfr ok'
)
def
cmd_rnto
(
self
,
arg
):
self
.
push
(
'250 rnto ok'
)
def
cmd_dele
(
self
,
arg
):
self
.
push
(
'250 dele ok'
)
def
cmd_cwd
(
self
,
arg
):
self
.
push
(
'250 cwd ok'
)
def
cmd_size
(
self
,
arg
):
self
.
push
(
'250 1000'
)
def
cmd_mkd
(
self
,
arg
):
self
.
push
(
'257 "%s"'
%
arg
)
def
cmd_rmd
(
self
,
arg
):
self
.
push
(
'250 rmd ok'
)
def
cmd_pwd
(
self
,
arg
):
self
.
push
(
'257 "pwd ok"'
)
def
cmd_type
(
self
,
arg
):
self
.
push
(
'200 type ok'
)
def
cmd_quit
(
self
,
arg
):
self
.
push
(
'221 quit ok'
)
self
.
close
()
def
cmd_abor
(
self
,
arg
):
self
.
push
(
'226 abor ok'
)
def
cmd_stor
(
self
,
arg
):
self
.
push
(
'125 stor ok'
)
def
cmd_rest
(
self
,
arg
):
self
.
rest
=
arg
self
.
push
(
'350 rest ok'
)
def
cmd_retr
(
self
,
arg
):
self
.
push
(
'125 retr ok'
)
if
self
.
rest
is
not
None
:
offset
=
int
(
self
.
rest
)
else
:
offset
=
0
self
.
dtp
.
push
(
self
.
next_retr_data
[
offset
:])
self
.
dtp
.
close_when_done
()
self
.
rest
=
None
def
cmd_list
(
self
,
arg
):
self
.
push
(
'125 list ok'
)
self
.
dtp
.
push
(
LIST_DATA
)
self
.
dtp
.
close_when_done
()
def
cmd_nlst
(
self
,
arg
):
self
.
push
(
'125 nlst ok'
)
self
.
dtp
.
push
(
NLST_DATA
)
self
.
dtp
.
close_when_done
()
def
cmd_opts
(
self
,
arg
):
self
.
push
(
'200 opts ok'
)
def
cmd_mlsd
(
self
,
arg
):
self
.
push
(
'125 mlsd ok'
)
self
.
dtp
.
push
(
MLSD_DATA
)
self
.
dtp
.
close_when_done
()
def
cmd_setlongretr
(
self
,
arg
):
# For testing. Next RETR will return long line.
self
.
next_retr_data
=
'x'
*
int
(
arg
)
self
.
push
(
'125 setlongretr ok'
)
class
DummyFTPServer
(
asyncore
.
dispatcher
,
threading
.
Thread
):
handler
=
DummyFTPHandler
def
__init__
(
self
,
address
,
af
=
socket
.
AF_INET
):
threading
.
Thread
.
__init__
(
self
)
asyncore
.
dispatcher
.
__init__
(
self
)
self
.
create_socket
(
af
,
socket
.
SOCK_STREAM
)
self
.
bind
(
address
)
self
.
listen
(
5
)
self
.
active
=
False
self
.
active_lock
=
threading
.
Lock
()
self
.
host
,
self
.
port
=
self
.
socket
.
getsockname
()[:
2
]
self
.
handler_instance
=
None
def
start
(
self
):
assert
not
self
.
active
self
.
__flag
=
threading
.
Event
()
threading
.
Thread
.
start
(
self
)
self
.
__flag
.
wait
()
def
run
(
self
):
self
.
active
=
True
self
.
__flag
.
set
()
while
self
.
active
and
asyncore
.
socket_map
:
self
.
active_lock
.
acquire
()
asyncore
.
loop
(
timeout
=
0.1
,
count
=
1
)
self
.
active_lock
.
release
()
asyncore
.
close_all
(
ignore_all
=
True
)
def
stop
(
self
):
assert
self
.
active
self
.
active
=
False
self
.
join
()
def
handle_accepted
(
self
,
conn
,
addr
):
self
.
handler_instance
=
self
.
handler
(
conn
)
def
handle_connect
(
self
):
self
.
close
()
handle_read
=
handle_connect
def
writable
(
self
):
return
0
def
handle_error
(
self
):
raise
Exception
if
ssl
is
not
None
:
CERTFILE
=
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
"keycert3.pem"
)
CAFILE
=
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
"pycacert.pem"
)
class
SSLConnection
(
asyncore
.
dispatcher
):
"""An asyncore.dispatcher subclass supporting TLS/SSL."""
_ssl_accepting
=
False
_ssl_closing
=
False
def
secure_connection
(
self
):
context
=
ssl
.
SSLContext
()
context
.
load_cert_chain
(
CERTFILE
)
socket
=
context
.
wrap_socket
(
self
.
socket
,
suppress_ragged_eofs
=
False
,
server_side
=
True
,
do_handshake_on_connect
=
False
)
self
.
del_channel
()
self
.
set_socket
(
socket
)
self
.
_ssl_accepting
=
True
def
_do_ssl_handshake
(
self
):
try
:
self
.
socket
.
do_handshake
()
except
ssl
.
SSLError
as
err
:
if
err
.
args
[
0
]
in
(
ssl
.
SSL_ERROR_WANT_READ
,
ssl
.
SSL_ERROR_WANT_WRITE
):
return
elif
err
.
args
[
0
]
==
ssl
.
SSL_ERROR_EOF
:
return
self
.
handle_close
()
raise
except
OSError
as
err
:
if
err
.
args
[
0
]
==
errno
.
ECONNABORTED
:
return
self
.
handle_close
()
else
:
self
.
_ssl_accepting
=
False
def
_do_ssl_shutdown
(
self
):
self
.
_ssl_closing
=
True
try
:
self
.
socket
=
self
.
socket
.
unwrap
()
except
ssl
.
SSLError
as
err
:
if
err
.
args
[
0
]
in
(
ssl
.
SSL_ERROR_WANT_READ
,
ssl
.
SSL_ERROR_WANT_WRITE
):
return
except
OSError
as
err
:
# Any "socket error" corresponds to a SSL_ERROR_SYSCALL return
# from OpenSSL's SSL_shutdown(), corresponding to a
# closed socket condition. See also:
# http://www.mail-archive.com/openssl-users@openssl.org/msg60710.html
pass
self
.
_ssl_closing
=
False
if
getattr
(
self
,
'_ccc'
,
False
)
is
False
:
super
(
SSLConnection
,
self
).
close
()
else
:
pass
def
handle_read_event
(
self
):
if
self
.
_ssl_accepting
:
self
.
_do_ssl_handshake
()
elif
self
.
_ssl_closing
:
self
.
_do_ssl_shutdown
()
else
:
super
(
SSLConnection
,
self
).
handle_read_event
()
def
handle_write_event
(
self
):
if
self
.
_ssl_accepting
:
self
.
_do_ssl_handshake
()
elif
self
.
_ssl_closing
:
self
.
_do_ssl_shutdown
()
else
:
super
(
SSLConnection
,
self
).
handle_write_event
()
def
send
(
self
,
data
):
try
:
return
super
(
SSLConnection
,
self
).
send
(
data
)
except
ssl
.
SSLError
as
err
:
if
err
.
args
[
0
]
in
(
ssl
.
SSL_ERROR_EOF
,
ssl
.
SSL_ERROR_ZERO_RETURN
,
ssl
.
SSL_ERROR_WANT_READ
,
ssl
.
SSL_ERROR_WANT_WRITE
):
return
0
raise
def
recv
(
self
,
buffer_size
):
try
:
return
super
(
SSLConnection
,
self
).
recv
(
buffer_size
)
except
ssl
.
SSLError
as
err
:
if
err
.
args
[
0
]
in
(
ssl
.
SSL_ERROR_WANT_READ
,
ssl
.
SSL_ERROR_WANT_WRITE
):
return
b''
if
err
.
args
[
0
]
in
(
ssl
.
SSL_ERROR_EOF
,
ssl
.
SSL_ERROR_ZERO_RETURN
):
self
.
handle_close
()
return
b''
raise
def
handle_error
(
self
):
raise
Exception
def
close
(
self
):
if
(
isinstance
(
self
.
socket
,
ssl
.
SSLSocket
)
and
self
.
socket
.
_sslobj
is
not
None
):
self
.
_do_ssl_shutdown
()
else
:
super
(
SSLConnection
,
self
).
close
()
class
DummyTLS_DTPHandler
(
SSLConnection
,
DummyDTPHandler
):
"""A DummyDTPHandler subclass supporting TLS/SSL."""
def
__init__
(
self
,
conn
,
baseclass
):
DummyDTPHandler
.
__init__
(
self
,
conn
,
baseclass
)
if
self
.
baseclass
.
secure_data_channel
:
self
.
secure_connection
()
class
DummyTLS_FTPHandler
(
SSLConnection
,
DummyFTPHandler
):
"""A DummyFTPHandler subclass supporting TLS/SSL."""
dtp_handler
=
DummyTLS_DTPHandler
def
__init__
(
self
,
conn
):
DummyFTPHandler
.
__init__
(
self
,
conn
)
self
.
secure_data_channel
=
False
self
.
_ccc
=
False
def
cmd_auth
(
self
,
line
):
"""Set up secure control channel."""
self
.
push
(
'234 AUTH TLS successful'
)
self
.
secure_connection
()
def
cmd_ccc
(
self
,
line
):
self
.
push
(
'220 Reverting back to clear-text'
)
self
.
_ccc
=
True
self
.
_do_ssl_shutdown
()
def
cmd_pbsz
(
self
,
line
):
"""Negotiate size of buffer for secure data transfer.
For TLS/SSL the only valid value for the parameter is '0'.
Any other value is accepted but ignored.
"""
self
.
push
(
'200 PBSZ=0 successful.'
)
def
cmd_prot
(
self
,
line
):
"""Setup un/secure data channel."""
arg
=
line
.
upper
()
if
arg
==
'C'
:
self
.
push
(
'200 Protection set to Clear'
)
self
.
secure_data_channel
=
False
elif
arg
==
'P'
:
self
.
push
(
'200 Protection set to Private'
)
self
.
secure_data_channel
=
True
else
:
self
.
push
(
"502 Unrecognized PROT type (use C or P)."
)
class
DummyTLS_FTPServer
(
DummyFTPServer
):
handler
=
DummyTLS_FTPHandler
class
TestFTPClass
(
TestCase
):
def
setUp
(
self
):
self
.
server
=
DummyFTPServer
((
HOST
,
0
))
self
.
server
.
start
()
self
.
client
=
ftplib
.
FTP
(
timeout
=
TIMEOUT
)
self
.
client
.
connect
(
self
.
server
.
host
,
self
.
server
.
port
)
def
tearDown
(
self
):
self
.
client
.
close
()
self
.
server
.
stop
()
# Explicitly clear the attribute to prevent dangling thread
self
.
server
=
None
asyncore
.
close_all
(
ignore_all
=
True
)
def
check_data
(
self
,
received
,
expected
):
self
.
assertEqual
(
len
(
received
),
len
(
expected
))
self
.
assertEqual
(
received
,
expected
)
def
test_getwelcome
(
self
):
self
.
assertEqual
(
self
.
client
.
getwelcome
(),
'220 welcome'
)
def
test_sanitize
(
self
):
self
.
assertEqual
(
self
.
client
.
sanitize
(
'foo'
),
repr
(
'foo'
))
self
.
assertEqual
(
self
.
client
.
sanitize
(
'pass 12345'
),
repr
(
'pass *****'
))
self
.
assertEqual
(
self
.
client
.
sanitize
(
'PASS 12345'
),
repr
(
'PASS *****'
))
def
test_exceptions
(
self
):
self
.
assertRaises
(
ValueError
,
self
.
client
.
sendcmd
,
'echo 40
\
r
\
n
0'
)
self
.
assertRaises
(
ValueError
,
self
.
client
.
sendcmd
,
'echo 40
\
n
0'
)
self
.
assertRaises
(
ValueError
,
self
.
client
.
sendcmd
,
'echo 40
\
r
0'
)
self
.
assertRaises
(
ftplib
.
error_temp
,
self
.
client
.
sendcmd
,
'echo 400'
)
self
.
assertRaises
(
ftplib
.
error_temp
,
self
.
client
.
sendcmd
,
'echo 499'
)
self
.
assertRaises
(
ftplib
.
error_perm
,
self
.
client
.
sendcmd
,
'echo 500'
)
self
.
assertRaises
(
ftplib
.
error_perm
,
self
.
client
.
sendcmd
,
'echo 599'
)
self
.
assertRaises
(
ftplib
.
error_proto
,
self
.
client
.
sendcmd
,
'echo 999'
)
def
test_all_errors
(
self
):
exceptions
=
(
ftplib
.
error_reply
,
ftplib
.
error_temp
,
ftplib
.
error_perm
,
ftplib
.
error_proto
,
ftplib
.
Error
,
OSError
,
EOFError
)
for
x
in
exceptions
:
try
:
raise
x
(
'exception not included in all_errors set'
)
except
ftplib
.
all_errors
:
pass
def
test_set_pasv
(
self
):
# passive mode is supposed to be enabled by default
self
.
assertTrue
(
self
.
client
.
passiveserver
)
self
.
client
.
set_pasv
(
True
)
self
.
assertTrue
(
self
.
client
.
passiveserver
)
self
.
client
.
set_pasv
(
False
)
self
.
assertFalse
(
self
.
client
.
passiveserver
)
def
test_voidcmd
(
self
):
self
.
client
.
voidcmd
(
'echo 200'
)
self
.
client
.
voidcmd
(
'echo 299'
)
self
.
assertRaises
(
ftplib
.
error_reply
,
self
.
client
.
voidcmd
,
'echo 199'
)
self
.
assertRaises
(
ftplib
.
error_reply
,
self
.
client
.
voidcmd
,
'echo 300'
)
def
test_login
(
self
):
self
.
client
.
login
()
def
test_acct
(
self
):
self
.
client
.
acct
(
'passwd'
)
def
test_rename
(
self
):
self
.
client
.
rename
(
'a'
,
'b'
)
self
.
server
.
handler_instance
.
next_response
=
'200'
self
.
assertRaises
(
ftplib
.
error_reply
,
self
.
client
.
rename
,
'a'
,
'b'
)
def
test_delete
(
self
):
self
.
client
.
delete
(
'foo'
)
self
.
server
.
handler_instance
.
next_response
=
'199'
self
.
assertRaises
(
ftplib
.
error_reply
,
self
.
client
.
delete
,
'foo'
)
def
test_size
(
self
):
self
.
client
.
size
(
'foo'
)
def
test_mkd
(
self
):
dir
=
self
.
client
.
mkd
(
'/foo'
)
self
.
assertEqual
(
dir
,
'/foo'
)
def
test_rmd
(
self
):
self
.
client
.
rmd
(
'foo'
)
def
test_cwd
(
self
):
dir
=
self
.
client
.
cwd
(
'/foo'
)
self
.
assertEqual
(
dir
,
'250 cwd ok'
)
def
test_pwd
(
self
):
dir
=
self
.
client
.
pwd
()
self
.
assertEqual
(
dir
,
'pwd ok'
)
def
test_quit
(
self
):
self
.
assertEqual
(
self
.
client
.
quit
(),
'221 quit ok'
)
# Ensure the connection gets closed; sock attribute should be None
self
.
assertEqual
(
self
.
client
.
sock
,
None
)
def
test_abort
(
self
):
self
.
client
.
abort
()
def
test_retrbinary
(
self
):
def
callback
(
data
):
received
.
append
(
data
.
decode
(
'ascii'
))
received
=
[]
self
.
client
.
retrbinary
(
'retr'
,
callback
)
self
.
check_data
(
''
.
join
(
received
),
RETR_DATA
)
def
test_retrbinary_rest
(
self
):
def
callback
(
data
):
received
.
append
(
data
.
decode
(
'ascii'
))
for
rest
in
(
0
,
10
,
20
):
received
=
[]
self
.
client
.
retrbinary
(
'retr'
,
callback
,
rest
=
rest
)
self
.
check_data
(
''
.
join
(
received
),
RETR_DATA
[
rest
:])
def
test_retrlines
(
self
):
received
=
[]
self
.
client
.
retrlines
(
'retr'
,
received
.
append
)
self
.
check_data
(
''
.
join
(
received
),
RETR_DATA
.
replace
(
'
\
r
\
n
'
,
''
))
def
test_storbinary
(
self
):
f
=
io
.
BytesIO
(
RETR_DATA
.
encode
(
'ascii'
))
self
.
client
.
storbinary
(
'stor'
,
f
)
self
.
check_data
(
self
.
server
.
handler_instance
.
last_received_data
,
RETR_DATA
)
# test new callback arg
flag
=
[]
f
.
seek
(
0
)
self
.
client
.
storbinary
(
'stor'
,
f
,
callback
=
lambda
x
:
flag
.
append
(
None
))
self
.
assertTrue
(
flag
)
def
test_storbinary_rest
(
self
):
f
=
io
.
BytesIO
(
RETR_DATA
.
replace
(
'
\
r
\
n
'
,
'
\
n
'
).
encode
(
'ascii'
))
for
r
in
(
30
,
'30'
):
f
.
seek
(
0
)
self
.
client
.
storbinary
(
'stor'
,
f
,
rest
=
r
)
self
.
assertEqual
(
self
.
server
.
handler_instance
.
rest
,
str
(
r
))
def
test_storlines
(
self
):
f
=
io
.
BytesIO
(
RETR_DATA
.
replace
(
'
\
r
\
n
'
,
'
\
n
'
).
encode
(
'ascii'
))
self
.
client
.
storlines
(
'stor'
,
f
)
self
.
check_data
(
self
.
server
.
handler_instance
.
last_received_data
,
RETR_DATA
)
# test new callback arg
flag
=
[]
f
.
seek
(
0
)
self
.
client
.
storlines
(
'stor foo'
,
f
,
callback
=
lambda
x
:
flag
.
append
(
None
))
self
.
assertTrue
(
flag
)
f
=
io
.
StringIO
(
RETR_DATA
.
replace
(
'
\
r
\
n
'
,
'
\
n
'
))
# storlines() expects a binary file, not a text file
with
support
.
check_warnings
((
''
,
BytesWarning
),
quiet
=
True
):
self
.
assertRaises
(
TypeError
,
self
.
client
.
storlines
,
'stor foo'
,
f
)
def
test_nlst
(
self
):
self
.
client
.
nlst
()
self
.
assertEqual
(
self
.
client
.
nlst
(),
NLST_DATA
.
split
(
'
\
r
\
n
'
)[:
-
1
])
def
test_dir
(
self
):
l
=
[]
self
.
client
.
dir
(
lambda
x
:
l
.
append
(
x
))
self
.
assertEqual
(
''
.
join
(
l
),
LIST_DATA
.
replace
(
'
\
r
\
n
'
,
''
))
def
test_mlsd
(
self
):
list
(
self
.
client
.
mlsd
())
list
(
self
.
client
.
mlsd
(
path
=
'/'
))
list
(
self
.
client
.
mlsd
(
path
=
'/'
,
facts
=
[
'size'
,
'type'
]))
ls
=
list
(
self
.
client
.
mlsd
())
for
name
,
facts
in
ls
:
self
.
assertIsInstance
(
name
,
str
)
self
.
assertIsInstance
(
facts
,
dict
)
self
.
assertTrue
(
name
)
self
.
assertIn
(
'type'
,
facts
)
self
.
assertIn
(
'perm'
,
facts
)
self
.
assertIn
(
'unique'
,
facts
)
def
set_data
(
data
):
self
.
server
.
handler_instance
.
next_data
=
data
def
test_entry
(
line
,
type
=
None
,
perm
=
None
,
unique
=
None
,
name
=
None
):
type
=
'type'
if
type
is
None
else
type
perm
=
'perm'
if
perm
is
None
else
perm
unique
=
'unique'
if
unique
is
None
else
unique
name
=
'name'
if
name
is
None
else
name
set_data
(
line
)
_name
,
facts
=
next
(
self
.
client
.
mlsd
())
self
.
assertEqual
(
_name
,
name
)
self
.
assertEqual
(
facts
[
'type'
],
type
)
self
.
assertEqual
(
facts
[
'perm'
],
perm
)
self
.
assertEqual
(
facts
[
'unique'
],
unique
)
# plain
test_entry
(
'type=type;perm=perm;unique=unique; name
\
r
\
n
'
)
# "=" in fact value
test_entry
(
'type=ty=pe;perm=perm;unique=unique; name
\
r
\
n
'
,
type
=
"ty=pe"
)
test_entry
(
'type==type;perm=perm;unique=unique; name
\
r
\
n
'
,
type
=
"=type"
)
test_entry
(
'type=t=y=pe;perm=perm;unique=unique; name
\
r
\
n
'
,
type
=
"t=y=pe"
)
test_entry
(
'type=====;perm=perm;unique=unique; name
\
r
\
n
'
,
type
=
"===="
)
# spaces in name
test_entry
(
'type=type;perm=perm;unique=unique; na me
\
r
\
n
'
,
name
=
"na me"
)
test_entry
(
'type=type;perm=perm;unique=unique; name
\
r
\
n
'
,
name
=
"name "
)
test_entry
(
'type=type;perm=perm;unique=unique; name
\
r
\
n
'
,
name
=
" name"
)
test_entry
(
'type=type;perm=perm;unique=unique; n am e
\
r
\
n
'
,
name
=
"n am e"
)
# ";" in name
test_entry
(
'type=type;perm=perm;unique=unique; na;me
\
r
\
n
'
,
name
=
"na;me"
)
test_entry
(
'type=type;perm=perm;unique=unique; ;name
\
r
\
n
'
,
name
=
";name"
)
test_entry
(
'type=type;perm=perm;unique=unique; ;name;
\
r
\
n
'
,
name
=
";name;"
)
test_entry
(
'type=type;perm=perm;unique=unique; ;;;;
\
r
\
n
'
,
name
=
";;;;"
)
# case sensitiveness
set_data
(
'Type=type;TyPe=perm;UNIQUE=unique; name
\
r
\
n
'
)
_name
,
facts
=
next
(
self
.
client
.
mlsd
())
for
x
in
facts
:
self
.
assertTrue
(
x
.
islower
())
# no data (directory empty)
set_data
(
''
)
self
.
assertRaises
(
StopIteration
,
next
,
self
.
client
.
mlsd
())
set_data
(
''
)
for
x
in
self
.
client
.
mlsd
():
self
.
fail
(
"unexpected data %s"
%
x
)
def
test_makeport
(
self
):
with
self
.
client
.
makeport
():
# IPv4 is in use, just make sure send_eprt has not been used
self
.
assertEqual
(
self
.
server
.
handler_instance
.
last_received_cmd
,
'port'
)
def
test_makepasv
(
self
):
host
,
port
=
self
.
client
.
makepasv
()
conn
=
socket
.
create_connection
((
host
,
port
),
timeout
=
TIMEOUT
)
conn
.
close
()
# IPv4 is in use, just make sure send_epsv has not been used
self
.
assertEqual
(
self
.
server
.
handler_instance
.
last_received_cmd
,
'pasv'
)
def
test_with_statement
(
self
):
self
.
client
.
quit
()
def
is_client_connected
():
if
self
.
client
.
sock
is
None
:
return
False
try
:
self
.
client
.
sendcmd
(
'noop'
)
except
(
OSError
,
EOFError
):
return
False
return
True
# base test
with
ftplib
.
FTP
(
timeout
=
TIMEOUT
)
as
self
.
client
:
self
.
client
.
connect
(
self
.
server
.
host
,
self
.
server
.
port
)
self
.
client
.
sendcmd
(
'noop'
)
self
.
assertTrue
(
is_client_connected
())
self
.
assertEqual
(
self
.
server
.
handler_instance
.
last_received_cmd
,
'quit'
)
self
.
assertFalse
(
is_client_connected
())
# QUIT sent inside the with block
with
ftplib
.
FTP
(
timeout
=
TIMEOUT
)
as
self
.
client
:
self
.
client
.
connect
(
self
.
server
.
host
,
self
.
server
.
port
)
self
.
client
.
sendcmd
(
'noop'
)
self
.
client
.
quit
()
self
.
assertEqual
(
self
.
server
.
handler_instance
.
last_received_cmd
,
'quit'
)
self
.
assertFalse
(
is_client_connected
())
# force a wrong response code to be sent on QUIT: error_perm
# is expected and the connection is supposed to be closed
try
:
with
ftplib
.
FTP
(
timeout
=
TIMEOUT
)
as
self
.
client
:
self
.
client
.
connect
(
self
.
server
.
host
,
self
.
server
.
port
)
self
.
client
.
sendcmd
(
'noop'
)
self
.
server
.
handler_instance
.
next_response
=
'550 error on quit'
except
ftplib
.
error_perm
as
err
:
self
.
assertEqual
(
str
(
err
),
'550 error on quit'
)
else
:
self
.
fail
(
'Exception not raised'
)
# needed to give the threaded server some time to set the attribute
# which otherwise would still be == 'noop'
time
.
sleep
(
0.1
)
self
.
assertEqual
(
self
.
server
.
handler_instance
.
last_received_cmd
,
'quit'
)
self
.
assertFalse
(
is_client_connected
())
def
test_source_address
(
self
):
self
.
client
.
quit
()
port
=
support
.
find_unused_port
()
try
:
self
.
client
.
connect
(
self
.
server
.
host
,
self
.
server
.
port
,
source_address
=
(
HOST
,
port
))
self
.
assertEqual
(
self
.
client
.
sock
.
getsockname
()[
1
],
port
)
self
.
client
.
quit
()
except
OSError
as
e
:
if
e
.
errno
==
errno
.
EADDRINUSE
:
self
.
skipTest
(
"couldn't bind to port %d"
%
port
)
raise
def
test_source_address_passive_connection
(
self
):
port
=
support
.
find_unused_port
()
self
.
client
.
source_address
=
(
HOST
,
port
)
try
:
with
self
.
client
.
transfercmd
(
'list'
)
as
sock
:
self
.
assertEqual
(
sock
.
getsockname
()[
1
],
port
)
except
OSError
as
e
:
if
e
.
errno
==
errno
.
EADDRINUSE
:
self
.
skipTest
(
"couldn't bind to port %d"
%
port
)
raise
def
test_parse257
(
self
):
self
.
assertEqual
(
ftplib
.
parse257
(
'257 "/foo/bar"'
),
'/foo/bar'
)
self
.
assertEqual
(
ftplib
.
parse257
(
'257 "/foo/bar" created'
),
'/foo/bar'
)
self
.
assertEqual
(
ftplib
.
parse257
(
'257 ""'
),
''
)
self
.
assertEqual
(
ftplib
.
parse257
(
'257 "" created'
),
''
)
self
.
assertRaises
(
ftplib
.
error_reply
,
ftplib
.
parse257
,
'250 "/foo/bar"'
)
# The 257 response is supposed to include the directory
# name and in case it contains embedded double-quotes
# they must be doubled (see RFC-959, chapter 7, appendix 2).
self
.
assertEqual
(
ftplib
.
parse257
(
'257 "/foo/b""ar"'
),
'/foo/b"ar'
)
self
.
assertEqual
(
ftplib
.
parse257
(
'257 "/foo/b""ar" created'
),
'/foo/b"ar'
)
def
test_line_too_long
(
self
):
self
.
assertRaises
(
ftplib
.
Error
,
self
.
client
.
sendcmd
,
'x'
*
self
.
client
.
maxline
*
2
)
def
test_retrlines_too_long
(
self
):
self
.
client
.
sendcmd
(
'SETLONGRETR %d'
%
(
self
.
client
.
maxline
*
2
))
received
=
[]
self
.
assertRaises
(
ftplib
.
Error
,
self
.
client
.
retrlines
,
'retr'
,
received
.
append
)
def
test_storlines_too_long
(
self
):
f
=
io
.
BytesIO
(
b'x'
*
self
.
client
.
maxline
*
2
)
self
.
assertRaises
(
ftplib
.
Error
,
self
.
client
.
storlines
,
'stor'
,
f
)
@
skipUnless
(
support
.
IPV6_ENABLED
,
"IPv6 not enabled"
)
class
TestIPv6Environment
(
TestCase
):
def
setUp
(
self
):
self
.
server
=
DummyFTPServer
((
HOSTv6
,
0
),
af
=
socket
.
AF_INET6
)
self
.
server
.
start
()
self
.
client
=
ftplib
.
FTP
(
timeout
=
TIMEOUT
)
self
.
client
.
connect
(
self
.
server
.
host
,
self
.
server
.
port
)
def
tearDown
(
self
):
self
.
client
.
close
()
self
.
server
.
stop
()
# Explicitly clear the attribute to prevent dangling thread
self
.
server
=
None
asyncore
.
close_all
(
ignore_all
=
True
)
def
test_af
(
self
):
self
.
assertEqual
(
self
.
client
.
af
,
socket
.
AF_INET6
)
def
test_makeport
(
self
):
with
self
.
client
.
makeport
():
self
.
assertEqual
(
self
.
server
.
handler_instance
.
last_received_cmd
,
'eprt'
)
def
test_makepasv
(
self
):
host
,
port
=
self
.
client
.
makepasv
()
conn
=
socket
.
create_connection
((
host
,
port
),
timeout
=
TIMEOUT
)
conn
.
close
()
self
.
assertEqual
(
self
.
server
.
handler_instance
.
last_received_cmd
,
'epsv'
)
def
test_transfer
(
self
):
def
retr
():
def
callback
(
data
):
received
.
append
(
data
.
decode
(
'ascii'
))
received
=
[]
self
.
client
.
retrbinary
(
'retr'
,
callback
)
self
.
assertEqual
(
len
(
''
.
join
(
received
)),
len
(
RETR_DATA
))
self
.
assertEqual
(
''
.
join
(
received
),
RETR_DATA
)
self
.
client
.
set_pasv
(
True
)
retr
()
self
.
client
.
set_pasv
(
False
)
retr
()
@
skipUnless
(
ssl
,
"SSL not available"
)
class
TestTLS_FTPClassMixin
(
TestFTPClass
):
"""Repeat TestFTPClass tests starting the TLS layer for both control
and data connections first.
"""
def
setUp
(
self
):
self
.
server
=
DummyTLS_FTPServer
((
HOST
,
0
))
self
.
server
.
start
()
self
.
client
=
ftplib
.
FTP_TLS
(
timeout
=
TIMEOUT
)
self
.
client
.
connect
(
self
.
server
.
host
,
self
.
server
.
port
)
# enable TLS
self
.
client
.
auth
()
self
.
client
.
prot_p
()
@
skipUnless
(
ssl
,
"SSL not available"
)
class
TestTLS_FTPClass
(
TestCase
):
"""Specific TLS_FTP class tests."""
def
setUp
(
self
):
self
.
server
=
DummyTLS_FTPServer
((
HOST
,
0
))
self
.
server
.
start
()
self
.
client
=
ftplib
.
FTP_TLS
(
timeout
=
TIMEOUT
)
self
.
client
.
connect
(
self
.
server
.
host
,
self
.
server
.
port
)
def
tearDown
(
self
):
self
.
client
.
close
()
self
.
server
.
stop
()
# Explicitly clear the attribute to prevent dangling thread
self
.
server
=
None
asyncore
.
close_all
(
ignore_all
=
True
)
def
test_control_connection
(
self
):
self
.
assertNotIsInstance
(
self
.
client
.
sock
,
ssl
.
SSLSocket
)
self
.
client
.
auth
()
self
.
assertIsInstance
(
self
.
client
.
sock
,
ssl
.
SSLSocket
)
def
test_data_connection
(
self
):
# clear text
with
self
.
client
.
transfercmd
(
'list'
)
as
sock
:
self
.
assertNotIsInstance
(
sock
,
ssl
.
SSLSocket
)
self
.
assertEqual
(
self
.
client
.
voidresp
(),
"226 transfer complete"
)
# secured, after PROT P
self
.
client
.
prot_p
()
with
self
.
client
.
transfercmd
(
'list'
)
as
sock
:
self
.
assertIsInstance
(
sock
,
ssl
.
SSLSocket
)
self
.
assertEqual
(
self
.
client
.
voidresp
(),
"226 transfer complete"
)
# PROT C is issued, the connection must be in cleartext again
self
.
client
.
prot_c
()
with
self
.
client
.
transfercmd
(
'list'
)
as
sock
:
self
.
assertNotIsInstance
(
sock
,
ssl
.
SSLSocket
)
self
.
assertEqual
(
self
.
client
.
voidresp
(),
"226 transfer complete"
)
def
test_login
(
self
):
# login() is supposed to implicitly secure the control connection
self
.
assertNotIsInstance
(
self
.
client
.
sock
,
ssl
.
SSLSocket
)
self
.
client
.
login
()
self
.
assertIsInstance
(
self
.
client
.
sock
,
ssl
.
SSLSocket
)
# make sure that AUTH TLS doesn't get issued again
self
.
client
.
login
()
def
test_auth_issued_twice
(
self
):
self
.
client
.
auth
()
self
.
assertRaises
(
ValueError
,
self
.
client
.
auth
)
def
test_auth_ssl
(
self
):
try
:
self
.
client
.
ssl_version
=
ssl
.
PROTOCOL_SSLv23
self
.
client
.
auth
()
self
.
assertRaises
(
ValueError
,
self
.
client
.
auth
)
finally
:
self
.
client
.
ssl_version
=
ssl
.
PROTOCOL_TLSv1
def
test_context
(
self
):
self
.
client
.
quit
()
ctx
=
ssl
.
SSLContext
(
ssl
.
PROTOCOL_TLSv1
)
self
.
assertRaises
(
ValueError
,
ftplib
.
FTP_TLS
,
keyfile
=
CERTFILE
,
context
=
ctx
)
self
.
assertRaises
(
ValueError
,
ftplib
.
FTP_TLS
,
certfile
=
CERTFILE
,
context
=
ctx
)
self
.
assertRaises
(
ValueError
,
ftplib
.
FTP_TLS
,
certfile
=
CERTFILE
,
keyfile
=
CERTFILE
,
context
=
ctx
)
self
.
client
=
ftplib
.
FTP_TLS
(
context
=
ctx
,
timeout
=
TIMEOUT
)
self
.
client
.
connect
(
self
.
server
.
host
,
self
.
server
.
port
)
self
.
assertNotIsInstance
(
self
.
client
.
sock
,
ssl
.
SSLSocket
)
self
.
client
.
auth
()
self
.
assertIs
(
self
.
client
.
sock
.
context
,
ctx
)
self
.
assertIsInstance
(
self
.
client
.
sock
,
ssl
.
SSLSocket
)
self
.
client
.
prot_p
()
with
self
.
client
.
transfercmd
(
'list'
)
as
sock
:
self
.
assertIs
(
sock
.
context
,
ctx
)
self
.
assertIsInstance
(
sock
,
ssl
.
SSLSocket
)
def
test_ccc
(
self
):
self
.
assertRaises
(
ValueError
,
self
.
client
.
ccc
)
self
.
client
.
login
(
secure
=
True
)
self
.
assertIsInstance
(
self
.
client
.
sock
,
ssl
.
SSLSocket
)
self
.
client
.
ccc
()
self
.
assertRaises
(
ValueError
,
self
.
client
.
sock
.
unwrap
)
def
test_check_hostname
(
self
):
self
.
client
.
quit
()
ctx
=
ssl
.
SSLContext
(
ssl
.
PROTOCOL_TLSv1
)
ctx
.
verify_mode
=
ssl
.
CERT_REQUIRED
ctx
.
check_hostname
=
True
ctx
.
load_verify_locations
(
CAFILE
)
self
.
client
=
ftplib
.
FTP_TLS
(
context
=
ctx
,
timeout
=
TIMEOUT
)
# 127.0.0.1 doesn't match SAN
self
.
client
.
connect
(
self
.
server
.
host
,
self
.
server
.
port
)
with
self
.
assertRaises
(
ssl
.
CertificateError
):
self
.
client
.
auth
()
# exception quits connection
self
.
client
.
connect
(
self
.
server
.
host
,
self
.
server
.
port
)
self
.
client
.
prot_p
()
with
self
.
assertRaises
(
ssl
.
CertificateError
):
with
self
.
client
.
transfercmd
(
"list"
)
as
sock
:
pass
self
.
client
.
quit
()
self
.
client
.
connect
(
"localhost"
,
self
.
server
.
port
)
self
.
client
.
auth
()
self
.
client
.
quit
()
self
.
client
.
connect
(
"localhost"
,
self
.
server
.
port
)
self
.
client
.
prot_p
()
with
self
.
client
.
transfercmd
(
"list"
)
as
sock
:
pass
class
TestTimeouts
(
TestCase
):
def
setUp
(
self
):
self
.
evt
=
threading
.
Event
()
self
.
sock
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
self
.
sock
.
settimeout
(
20
)
self
.
port
=
support
.
bind_port
(
self
.
sock
)
self
.
server_thread
=
threading
.
Thread
(
target
=
self
.
server
)
self
.
server_thread
.
start
()
# Wait for the server to be ready.
self
.
evt
.
wait
()
self
.
evt
.
clear
()
self
.
old_port
=
ftplib
.
FTP
.
port
ftplib
.
FTP
.
port
=
self
.
port
def
tearDown
(
self
):
ftplib
.
FTP
.
port
=
self
.
old_port
self
.
server_thread
.
join
()
# Explicitly clear the attribute to prevent dangling thread
self
.
server_thread
=
None
def
server
(
self
):
# This method sets the evt 3 times:
# 1) when the connection is ready to be accepted.
# 2) when it is safe for the caller to close the connection
# 3) when we have closed the socket
self
.
sock
.
listen
()
# (1) Signal the caller that we are ready to accept the connection.
self
.
evt
.
set
()
try
:
conn
,
addr
=
self
.
sock
.
accept
()
except
socket
.
timeout
:
pass
else
:
conn
.
sendall
(
b"1 Hola mundo
\
n
"
)
conn
.
shutdown
(
socket
.
SHUT_WR
)
# (2) Signal the caller that it is safe to close the socket.
self
.
evt
.
set
()
conn
.
close
()
finally
:
self
.
sock
.
close
()
def
testTimeoutDefault
(
self
):
# default -- use global socket timeout
self
.
assertIsNone
(
socket
.
getdefaulttimeout
())
socket
.
setdefaulttimeout
(
30
)
try
:
ftp
=
ftplib
.
FTP
(
HOST
)
finally
:
socket
.
setdefaulttimeout
(
None
)
self
.
assertEqual
(
ftp
.
sock
.
gettimeout
(),
30
)
self
.
evt
.
wait
()
ftp
.
close
()
def
testTimeoutNone
(
self
):
# no timeout -- do not use global socket timeout
self
.
assertIsNone
(
socket
.
getdefaulttimeout
())
socket
.
setdefaulttimeout
(
30
)
try
:
ftp
=
ftplib
.
FTP
(
HOST
,
timeout
=
None
)
finally
:
socket
.
setdefaulttimeout
(
None
)
self
.
assertIsNone
(
ftp
.
sock
.
gettimeout
())
self
.
evt
.
wait
()
ftp
.
close
()
def
testTimeoutValue
(
self
):
# a value
ftp
=
ftplib
.
FTP
(
HOST
,
timeout
=
30
)
self
.
assertEqual
(
ftp
.
sock
.
gettimeout
(),
30
)
self
.
evt
.
wait
()
ftp
.
close
()
def
testTimeoutConnect
(
self
):
ftp
=
ftplib
.
FTP
()
ftp
.
connect
(
HOST
,
timeout
=
30
)
self
.
assertEqual
(
ftp
.
sock
.
gettimeout
(),
30
)
self
.
evt
.
wait
()
ftp
.
close
()
def
testTimeoutDifferentOrder
(
self
):
ftp
=
ftplib
.
FTP
(
timeout
=
30
)
ftp
.
connect
(
HOST
)
self
.
assertEqual
(
ftp
.
sock
.
gettimeout
(),
30
)
self
.
evt
.
wait
()
ftp
.
close
()
def
testTimeoutDirectAccess
(
self
):
ftp
=
ftplib
.
FTP
()
ftp
.
timeout
=
30
ftp
.
connect
(
HOST
)
self
.
assertEqual
(
ftp
.
sock
.
gettimeout
(),
30
)
self
.
evt
.
wait
()
ftp
.
close
()
class
MiscTestCase
(
TestCase
):
def
test__all__
(
self
):
blacklist
=
{
'MSG_OOB'
,
'FTP_PORT'
,
'MAXLINE'
,
'CRLF'
,
'B_CRLF'
,
'Error'
,
'parse150'
,
'parse227'
,
'parse229'
,
'parse257'
,
'print_line'
,
'ftpcp'
,
'test'
}
support
.
check__all__
(
self
,
ftplib
,
blacklist
=
blacklist
)
def
test_main
():
tests
=
[
TestFTPClass
,
TestTimeouts
,
TestIPv6Environment
,
TestTLS_FTPClassMixin
,
TestTLS_FTPClass
,
MiscTestCase
]
thread_info
=
support
.
threading_setup
()
try
:
support
.
run_unittest
(
*
tests
)
finally
:
support
.
threading_cleanup
(
*
thread_info
)
if
__name__
==
'__main__'
:
test_main
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment