Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
cpython
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
cpython
Commits
ff46d6e8
Commit
ff46d6e8
authored
May 10, 2010
by
Giampaolo Rodolà
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Issue #8490: adds a more solid test suite for asyncore
parent
264552a8
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
278 additions
and
1 deletion
+278
-1
Lib/test/test_asyncore.py
Lib/test/test_asyncore.py
+275
-1
Misc/NEWS
Misc/NEWS
+3
-0
No files found.
Lib/test/test_asyncore.py
View file @
ff46d6e8
...
...
@@ -418,11 +418,285 @@ if hasattr(asyncore, 'file_wrapper'):
self
.
assertEqual
(
file
(
TESTFN
).
read
(),
self
.
d
+
d1
+
d2
)
class
BaseTestHandler
(
asyncore
.
dispatcher
):
def
__init__
(
self
,
sock
=
None
):
asyncore
.
dispatcher
.
__init__
(
self
,
sock
)
self
.
flag
=
False
def
handle_accept
(
self
):
raise
Exception
(
"handle_accept not supposed to be called"
)
def
handle_connect
(
self
):
raise
Exception
(
"handle_connect not supposed to be called"
)
def
handle_expt
(
self
):
raise
Exception
(
"handle_expt not supposed to be called"
)
def
handle_close
(
self
):
raise
Exception
(
"handle_close not supposed to be called"
)
def
handle_error
(
self
):
raise
class
TCPServer
(
asyncore
.
dispatcher
):
"""A server which listens on an address and dispatches the
connection to a handler.
"""
def
__init__
(
self
,
handler
=
BaseTestHandler
,
host
=
HOST
,
port
=
0
):
asyncore
.
dispatcher
.
__init__
(
self
)
self
.
create_socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
self
.
set_reuse_addr
()
self
.
bind
((
host
,
port
))
self
.
listen
(
5
)
self
.
handler
=
handler
@
property
def
address
(
self
):
return
self
.
socket
.
getsockname
()[:
2
]
def
handle_accept
(
self
):
sock
,
addr
=
self
.
accept
()
self
.
handler
(
sock
)
def
handle_error
(
self
):
raise
class
BaseClient
(
BaseTestHandler
):
def
__init__
(
self
,
address
):
BaseTestHandler
.
__init__
(
self
)
self
.
create_socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
self
.
connect
(
address
)
def
handle_connect
(
self
):
pass
class
BaseTestAPI
(
unittest
.
TestCase
):
def
tearDown
(
self
):
asyncore
.
close_all
()
def
loop_waiting_for_flag
(
self
,
instance
,
timeout
=
5
):
timeout
=
float
(
timeout
)
/
100
count
=
100
while
asyncore
.
socket_map
and
count
>
0
:
asyncore
.
loop
(
timeout
=
0.01
,
count
=
1
,
use_poll
=
self
.
use_poll
)
if
instance
.
flag
:
return
count
-=
1
time
.
sleep
(
timeout
)
self
.
fail
(
"flag not set"
)
def
test_handle_connect
(
self
):
# make sure handle_connect is called on connect()
class
TestClient
(
BaseClient
):
def
handle_connect
(
self
):
self
.
flag
=
True
server
=
TCPServer
()
client
=
TestClient
(
server
.
address
)
self
.
loop_waiting_for_flag
(
client
)
def
test_handle_accept
(
self
):
# make sure handle_accept() is called when a client connects
class
TestListener
(
BaseTestHandler
):
def
__init__
(
self
):
BaseTestHandler
.
__init__
(
self
)
self
.
create_socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
self
.
bind
((
HOST
,
0
))
self
.
listen
(
5
)
self
.
address
=
self
.
socket
.
getsockname
()[:
2
]
def
handle_accept
(
self
):
self
.
flag
=
True
server
=
TestListener
()
client
=
BaseClient
(
server
.
address
)
self
.
loop_waiting_for_flag
(
server
)
def
test_handle_read
(
self
):
# make sure handle_read is called on data received
class
TestClient
(
BaseClient
):
def
handle_read
(
self
):
self
.
flag
=
True
class
TestHandler
(
BaseTestHandler
):
def
__init__
(
self
,
conn
):
BaseTestHandler
.
__init__
(
self
,
conn
)
self
.
send
(
'x'
*
1024
)
server
=
TCPServer
(
TestHandler
)
client
=
TestClient
(
server
.
address
)
self
.
loop_waiting_for_flag
(
client
)
def
test_handle_write
(
self
):
# make sure handle_write is called
class
TestClient
(
BaseClient
):
def
handle_write
(
self
):
self
.
flag
=
True
server
=
TCPServer
()
client
=
TestClient
(
server
.
address
)
self
.
loop_waiting_for_flag
(
client
)
def
test_handle_close
(
self
):
# make sure handle_close is called when the other end closes
# the connection
class
TestClient
(
BaseClient
):
def
handle_read
(
self
):
# in order to make handle_close be called we are supposed
# to make at least one recv() call
self
.
recv
(
1024
)
def
handle_close
(
self
):
self
.
flag
=
True
self
.
close
()
class
TestHandler
(
BaseTestHandler
):
def
__init__
(
self
,
conn
):
BaseTestHandler
.
__init__
(
self
,
conn
)
self
.
close
()
server
=
TCPServer
(
TestHandler
)
client
=
TestClient
(
server
.
address
)
self
.
loop_waiting_for_flag
(
client
)
@
unittest
.
skipIf
(
sys
.
platform
.
startswith
(
"sunos"
),
"OOB support is broken on Solaris"
)
def
test_handle_expt
(
self
):
# Make sure handle_expt is called on OOB data received.
# Note: this might fail on some platforms as OOB data is
# tenuously supported and rarely used.
class
TestClient
(
BaseClient
):
def
handle_expt
(
self
):
self
.
flag
=
True
class
TestHandler
(
BaseTestHandler
):
def
__init__
(
self
,
conn
):
BaseTestHandler
.
__init__
(
self
,
conn
)
self
.
socket
.
send
(
chr
(
244
),
socket
.
MSG_OOB
)
server
=
TCPServer
(
TestHandler
)
client
=
TestClient
(
server
.
address
)
self
.
loop_waiting_for_flag
(
client
)
def
test_handle_error
(
self
):
class
TestClient
(
BaseClient
):
def
handle_write
(
self
):
1.0
/
0
def
handle_error
(
self
):
self
.
flag
=
True
try
:
raise
except
ZeroDivisionError
:
pass
else
:
raise
Exception
(
"exception not raised"
)
server
=
TCPServer
()
client
=
TestClient
(
server
.
address
)
self
.
loop_waiting_for_flag
(
client
)
def
test_connection_attributes
(
self
):
server
=
TCPServer
()
client
=
BaseClient
(
server
.
address
)
# we start disconnected
self
.
assertFalse
(
server
.
connected
)
self
.
assertTrue
(
server
.
accepting
)
# XXX - Solaris seems to connect() immediately even without
# starting the poller. This is something which should be
# fixed as handle_connect() gets called immediately even if
# no connection actually took place (see issue #8490).
if
not
sys
.
platform
.
startswith
(
"sunos"
):
self
.
assertFalse
(
client
.
connected
)
self
.
assertFalse
(
client
.
accepting
)
# execute some loops so that client connects to server
asyncore
.
loop
(
timeout
=
0.01
,
use_poll
=
self
.
use_poll
,
count
=
100
)
self
.
assertFalse
(
server
.
connected
)
self
.
assertTrue
(
server
.
accepting
)
self
.
assertTrue
(
client
.
connected
)
self
.
assertFalse
(
client
.
accepting
)
# disconnect the client
client
.
close
()
self
.
assertFalse
(
server
.
connected
)
self
.
assertTrue
(
server
.
accepting
)
self
.
assertFalse
(
client
.
connected
)
self
.
assertFalse
(
client
.
accepting
)
# stop serving
server
.
close
()
self
.
assertFalse
(
server
.
connected
)
self
.
assertFalse
(
server
.
accepting
)
def
test_create_socket
(
self
):
s
=
asyncore
.
dispatcher
()
s
.
create_socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
self
.
assertEqual
(
s
.
socket
.
family
,
socket
.
AF_INET
)
self
.
assertEqual
(
s
.
socket
.
type
,
socket
.
SOCK_STREAM
)
def
test_bind
(
self
):
s1
=
asyncore
.
dispatcher
()
s1
.
create_socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
s1
.
bind
((
HOST
,
0
))
s1
.
listen
(
5
)
port
=
s1
.
socket
.
getsockname
()[
1
]
s2
=
asyncore
.
dispatcher
()
s2
.
create_socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
# EADDRINUSE indicates the socket was correctly bound
self
.
assertRaises
(
socket
.
error
,
s2
.
bind
,
(
HOST
,
port
))
def
test_set_reuse_addr
(
self
):
sock
=
socket
.
socket
()
try
:
sock
.
setsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
,
1
)
except
socket
.
error
:
unittest
.
skip
(
"SO_REUSEADDR not supported on this platform"
)
else
:
# if SO_REUSEADDR succeeded for sock we expect asyncore
# to do the same
s
=
asyncore
.
dispatcher
(
socket
.
socket
())
self
.
assertFalse
(
s
.
socket
.
getsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
))
s
.
create_socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
s
.
set_reuse_addr
()
self
.
assertTrue
(
s
.
socket
.
getsockopt
(
socket
.
SOL_SOCKET
,
socket
.
SO_REUSEADDR
))
finally
:
sock
.
close
()
class
TestAPI_UseSelect
(
BaseTestAPI
):
use_poll
=
False
class
TestAPI_UsePoll
(
BaseTestAPI
):
use_poll
=
True
def
test_main
():
tests
=
[
HelperFunctionTests
,
DispatcherTests
,
DispatcherWithSendTests
,
DispatcherWithSendTests_UsePoll
]
DispatcherWithSendTests_UsePoll
,
TestAPI_UseSelect
]
if
hasattr
(
asyncore
,
'file_wrapper'
):
tests
.
append
(
FileWrapperTest
)
if
hasattr
(
select
,
'poll'
):
tests
.
append
(
TestAPI_UsePoll
)
run_unittest
(
*
tests
)
...
...
Misc/NEWS
View file @
ff46d6e8
...
...
@@ -211,6 +211,9 @@ Extension Modules
Tests
-----
- Issue #8490: asyncore now has a more solid test suite which actually tests
its API.
- Issue #8576: Remove use of find_unused_port() in test_smtplib and
test_multiprocessing. Patch by Paul Moore.
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment