Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
cpython
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
cpython
Commits
59d77e8a
Commit
59d77e8a
authored
Jul 12, 2014
by
Victor Stinner
Browse files
Options
Browse Files
Download
Plain Diff
Merge with 3.4
parents
4fee7aab
e912e652
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
219 additions
and
29 deletions
+219
-29
Lib/asyncio/base_events.py
Lib/asyncio/base_events.py
+52
-4
Lib/asyncio/proactor_events.py
Lib/asyncio/proactor_events.py
+33
-3
Lib/asyncio/selector_events.py
Lib/asyncio/selector_events.py
+76
-13
Lib/asyncio/unix_events.py
Lib/asyncio/unix_events.py
+36
-0
Lib/asyncio/windows_events.py
Lib/asyncio/windows_events.py
+12
-0
Lib/test/test_asyncio/test_proactor_events.py
Lib/test/test_asyncio/test_proactor_events.py
+4
-3
Lib/test/test_asyncio/test_selector_events.py
Lib/test/test_asyncio/test_selector_events.py
+6
-6
No files found.
Lib/asyncio/base_events.py
View file @
59d77e8a
...
@@ -94,6 +94,9 @@ class Server(events.AbstractServer):
...
@@ -94,6 +94,9 @@ class Server(events.AbstractServer):
self
.
_active_count
=
0
self
.
_active_count
=
0
self
.
_waiters
=
[]
self
.
_waiters
=
[]
def
__repr__
(
self
):
return
'<%s sockets=%r>'
%
(
self
.
__class__
.
__name__
,
self
.
sockets
)
def
_attach
(
self
):
def
_attach
(
self
):
assert
self
.
sockets
is
not
None
assert
self
.
sockets
is
not
None
self
.
_active_count
+=
1
self
.
_active_count
+=
1
...
@@ -110,8 +113,6 @@ class Server(events.AbstractServer):
...
@@ -110,8 +113,6 @@ class Server(events.AbstractServer):
return
return
self
.
sockets
=
None
self
.
sockets
=
None
for
sock
in
sockets
:
for
sock
in
sockets
:
# closing sockets will call asynchronously the _detach() method
# which calls _wakeup() for the last socket
self
.
_loop
.
_stop_serving
(
sock
)
self
.
_loop
.
_stop_serving
(
sock
)
if
self
.
_active_count
==
0
:
if
self
.
_active_count
==
0
:
self
.
_wakeup
()
self
.
_wakeup
()
...
@@ -276,6 +277,8 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -276,6 +277,8 @@ class BaseEventLoop(events.AbstractEventLoop):
raise
RuntimeError
(
"cannot close a running event loop"
)
raise
RuntimeError
(
"cannot close a running event loop"
)
if
self
.
_closed
:
if
self
.
_closed
:
return
return
if
self
.
_debug
:
logger
.
debug
(
"Close %r"
,
self
)
self
.
_closed
=
True
self
.
_closed
=
True
self
.
_ready
.
clear
()
self
.
_ready
.
clear
()
self
.
_scheduled
.
clear
()
self
.
_scheduled
.
clear
()
...
@@ -402,10 +405,39 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -402,10 +405,39 @@ class BaseEventLoop(events.AbstractEventLoop):
def
set_default_executor
(
self
,
executor
):
def
set_default_executor
(
self
,
executor
):
self
.
_default_executor
=
executor
self
.
_default_executor
=
executor
def
_getaddrinfo_debug
(
self
,
host
,
port
,
family
,
type
,
proto
,
flags
):
msg
=
[
"%s:%r"
%
(
host
,
port
)]
if
family
:
msg
.
append
(
'family=%r'
%
family
)
if
type
:
msg
.
append
(
'type=%r'
%
type
)
if
proto
:
msg
.
append
(
'proto=%r'
%
proto
)
if
flags
:
msg
.
append
(
'flags=%r'
%
flags
)
msg
=
', '
.
join
(
msg
)
logger
.
debug
(
'Get addresss info %s'
,
msg
)
t0
=
self
.
time
()
addrinfo
=
socket
.
getaddrinfo
(
host
,
port
,
family
,
type
,
proto
,
flags
)
dt
=
self
.
time
()
-
t0
msg
=
(
'Getting addresss info %s took %.3f ms: %r'
%
(
msg
,
dt
*
1e3
,
addrinfo
))
if
dt
>=
self
.
slow_callback_duration
:
logger
.
info
(
msg
)
else
:
logger
.
debug
(
msg
)
return
addrinfo
def
getaddrinfo
(
self
,
host
,
port
,
*
,
def
getaddrinfo
(
self
,
host
,
port
,
*
,
family
=
0
,
type
=
0
,
proto
=
0
,
flags
=
0
):
family
=
0
,
type
=
0
,
proto
=
0
,
flags
=
0
):
return
self
.
run_in_executor
(
None
,
socket
.
getaddrinfo
,
if
self
.
_debug
:
host
,
port
,
family
,
type
,
proto
,
flags
)
return
self
.
run_in_executor
(
None
,
self
.
_getaddrinfo_debug
,
host
,
port
,
family
,
type
,
proto
,
flags
)
else
:
return
self
.
run_in_executor
(
None
,
socket
.
getaddrinfo
,
host
,
port
,
family
,
type
,
proto
,
flags
)
def
getnameinfo
(
self
,
sockaddr
,
flags
=
0
):
def
getnameinfo
(
self
,
sockaddr
,
flags
=
0
):
return
self
.
run_in_executor
(
None
,
socket
.
getnameinfo
,
sockaddr
,
flags
)
return
self
.
run_in_executor
(
None
,
socket
.
getnameinfo
,
sockaddr
,
flags
)
...
@@ -492,6 +524,8 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -492,6 +524,8 @@ class BaseEventLoop(events.AbstractEventLoop):
sock
.
close
()
sock
.
close
()
sock
=
None
sock
=
None
continue
continue
if
self
.
_debug
:
logger
.
debug
(
"connect %r to %r"
,
sock
,
address
)
yield
from
self
.
sock_connect
(
sock
,
address
)
yield
from
self
.
sock_connect
(
sock
,
address
)
except
OSError
as
exc
:
except
OSError
as
exc
:
if
sock
is
not
None
:
if
sock
is
not
None
:
...
@@ -524,6 +558,9 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -524,6 +558,9 @@ class BaseEventLoop(events.AbstractEventLoop):
transport
,
protocol
=
yield
from
self
.
_create_connection_transport
(
transport
,
protocol
=
yield
from
self
.
_create_connection_transport
(
sock
,
protocol_factory
,
ssl
,
server_hostname
)
sock
,
protocol_factory
,
ssl
,
server_hostname
)
if
self
.
_debug
:
logger
.
debug
(
"connected to %s:%r: (%r, %r)"
,
host
,
port
,
transport
,
protocol
)
return
transport
,
protocol
return
transport
,
protocol
@
coroutine
@
coroutine
...
@@ -614,6 +651,15 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -614,6 +651,15 @@ class BaseEventLoop(events.AbstractEventLoop):
waiter
=
futures
.
Future
(
loop
=
self
)
waiter
=
futures
.
Future
(
loop
=
self
)
transport
=
self
.
_make_datagram_transport
(
sock
,
protocol
,
r_addr
,
transport
=
self
.
_make_datagram_transport
(
sock
,
protocol
,
r_addr
,
waiter
)
waiter
)
if
self
.
_debug
:
if
local_addr
:
logger
.
info
(
"Datagram endpoint local_addr=%r remote_addr=%r "
"created: (%r, %r)"
,
local_addr
,
remote_addr
,
transport
,
protocol
)
else
:
logger
.
debug
(
"Datagram endpoint remote_addr=%r created: "
"(%r, %r)"
,
remote_addr
,
transport
,
protocol
)
yield
from
waiter
yield
from
waiter
return
transport
,
protocol
return
transport
,
protocol
...
@@ -694,6 +740,8 @@ class BaseEventLoop(events.AbstractEventLoop):
...
@@ -694,6 +740,8 @@ class BaseEventLoop(events.AbstractEventLoop):
sock
.
listen
(
backlog
)
sock
.
listen
(
backlog
)
sock
.
setblocking
(
False
)
sock
.
setblocking
(
False
)
self
.
_start_serving
(
protocol_factory
,
sock
,
ssl
,
server
)
self
.
_start_serving
(
protocol_factory
,
sock
,
ssl
,
server
)
if
self
.
_debug
:
logger
.
info
(
"%r is serving"
,
server
)
return
server
return
server
@
coroutine
@
coroutine
...
...
Lib/asyncio/proactor_events.py
View file @
59d77e8a
...
@@ -41,6 +41,23 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
...
@@ -41,6 +41,23 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
# wait until protocol.connection_made() has been called
# wait until protocol.connection_made() has been called
self
.
_loop
.
call_soon
(
waiter
.
_set_result_unless_cancelled
,
None
)
self
.
_loop
.
call_soon
(
waiter
.
_set_result_unless_cancelled
,
None
)
def
__repr__
(
self
):
info
=
[
self
.
__class__
.
__name__
,
'fd=%s'
%
self
.
_sock
.
fileno
()]
if
self
.
_read_fut
is
not
None
:
ov
=
"pending"
if
self
.
_read_fut
.
ov
.
pending
else
"completed"
info
.
append
(
'read=%s'
%
ov
)
if
self
.
_write_fut
is
not
None
:
if
self
.
_write_fut
.
ov
.
pending
:
info
.
append
(
"write=pending=%s"
%
self
.
_pending_write
)
else
:
info
.
append
(
"write=completed"
)
if
self
.
_buffer
:
bufsize
=
len
(
self
.
_buffer
)
info
.
append
(
'write_bufsize=%s'
%
bufsize
)
if
self
.
_eof_written
:
info
.
append
(
'EOF written'
)
return
'<%s>'
%
' '
.
join
(
info
)
def
_set_extra
(
self
,
sock
):
def
_set_extra
(
self
,
sock
):
self
.
_extra
[
'pipe'
]
=
sock
self
.
_extra
[
'pipe'
]
=
sock
...
@@ -55,7 +72,10 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
...
@@ -55,7 +72,10 @@ class _ProactorBasePipeTransport(transports._FlowControlMixin,
self
.
_read_fut
.
cancel
()
self
.
_read_fut
.
cancel
()
def
_fatal_error
(
self
,
exc
,
message
=
'Fatal error on pipe transport'
):
def
_fatal_error
(
self
,
exc
,
message
=
'Fatal error on pipe transport'
):
if
not
isinstance
(
exc
,
(
BrokenPipeError
,
ConnectionResetError
)):
if
isinstance
(
exc
,
(
BrokenPipeError
,
ConnectionResetError
)):
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
"%r: %s"
,
self
,
message
,
exc_info
=
True
)
else
:
self
.
_loop
.
call_exception_handler
({
self
.
_loop
.
call_exception_handler
({
'message'
:
message
,
'message'
:
message
,
'exception'
:
exc
,
'exception'
:
exc
,
...
@@ -108,7 +128,6 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
...
@@ -108,7 +128,6 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
def
__init__
(
self
,
loop
,
sock
,
protocol
,
waiter
=
None
,
def
__init__
(
self
,
loop
,
sock
,
protocol
,
waiter
=
None
,
extra
=
None
,
server
=
None
):
extra
=
None
,
server
=
None
):
super
().
__init__
(
loop
,
sock
,
protocol
,
waiter
,
extra
,
server
)
super
().
__init__
(
loop
,
sock
,
protocol
,
waiter
,
extra
,
server
)
self
.
_read_fut
=
None
self
.
_paused
=
False
self
.
_paused
=
False
self
.
_loop
.
call_soon
(
self
.
_loop_reading
)
self
.
_loop
.
call_soon
(
self
.
_loop_reading
)
...
@@ -118,6 +137,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
...
@@ -118,6 +137,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if
self
.
_paused
:
if
self
.
_paused
:
raise
RuntimeError
(
'Already paused'
)
raise
RuntimeError
(
'Already paused'
)
self
.
_paused
=
True
self
.
_paused
=
True
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
"%r pauses reading"
,
self
)
def
resume_reading
(
self
):
def
resume_reading
(
self
):
if
not
self
.
_paused
:
if
not
self
.
_paused
:
...
@@ -126,6 +147,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
...
@@ -126,6 +147,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if
self
.
_closing
:
if
self
.
_closing
:
return
return
self
.
_loop
.
call_soon
(
self
.
_loop_reading
,
self
.
_read_fut
)
self
.
_loop
.
call_soon
(
self
.
_loop_reading
,
self
.
_read_fut
)
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
"%r resumes reading"
,
self
)
def
_loop_reading
(
self
,
fut
=
None
):
def
_loop_reading
(
self
,
fut
=
None
):
if
self
.
_paused
:
if
self
.
_paused
:
...
@@ -166,6 +189,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
...
@@ -166,6 +189,8 @@ class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
if
data
:
if
data
:
self
.
_protocol
.
data_received
(
data
)
self
.
_protocol
.
data_received
(
data
)
elif
data
is
not
None
:
elif
data
is
not
None
:
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
"%r received EOF"
,
self
)
keep_open
=
self
.
_protocol
.
eof_received
()
keep_open
=
self
.
_protocol
.
eof_received
()
if
not
keep_open
:
if
not
keep_open
:
self
.
close
()
self
.
close
()
...
@@ -401,7 +426,9 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
...
@@ -401,7 +426,9 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
self
.
_ssock
.
setblocking
(
False
)
self
.
_ssock
.
setblocking
(
False
)
self
.
_csock
.
setblocking
(
False
)
self
.
_csock
.
setblocking
(
False
)
self
.
_internal_fds
+=
1
self
.
_internal_fds
+=
1
self
.
call_soon
(
self
.
_loop_self_reading
)
# don't check the current loop because _make_self_pipe() is called
# from the event loop constructor
self
.
_call_soon
(
self
.
_loop_self_reading
,
(),
check_loop
=
False
)
def
_loop_self_reading
(
self
,
f
=
None
):
def
_loop_self_reading
(
self
,
f
=
None
):
try
:
try
:
...
@@ -426,6 +453,9 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
...
@@ -426,6 +453,9 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
try
:
try
:
if
f
is
not
None
:
if
f
is
not
None
:
conn
,
addr
=
f
.
result
()
conn
,
addr
=
f
.
result
()
if
self
.
_debug
:
logger
.
debug
(
"%r got a new connection from %r: %r"
,
server
,
addr
,
conn
)
protocol
=
protocol_factory
()
protocol
=
protocol_factory
()
self
.
_make_socket_transport
(
self
.
_make_socket_transport
(
conn
,
protocol
,
conn
,
protocol
,
...
...
Lib/asyncio/selector_events.py
View file @
59d77e8a
...
@@ -23,6 +23,17 @@ from . import transports
...
@@ -23,6 +23,17 @@ from . import transports
from
.log
import
logger
from
.log
import
logger
def
_test_selector_event
(
selector
,
fd
,
event
):
# Test if the selector is monitoring 'event' events
# for the file descriptor 'fd'.
try
:
key
=
selector
.
get_key
(
fd
)
except
KeyError
:
return
False
else
:
return
bool
(
key
.
events
&
event
)
class
BaseSelectorEventLoop
(
base_events
.
BaseEventLoop
):
class
BaseSelectorEventLoop
(
base_events
.
BaseEventLoop
):
"""Selector event loop.
"""Selector event loop.
...
@@ -116,6 +127,9 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
...
@@ -116,6 +127,9 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
sslcontext
=
None
,
server
=
None
):
sslcontext
=
None
,
server
=
None
):
try
:
try
:
conn
,
addr
=
sock
.
accept
()
conn
,
addr
=
sock
.
accept
()
if
self
.
_debug
:
logger
.
debug
(
"%r got a new connection from %r: %r"
,
server
,
addr
,
conn
)
conn
.
setblocking
(
False
)
conn
.
setblocking
(
False
)
except
(
BlockingIOError
,
InterruptedError
,
ConnectionAbortedError
):
except
(
BlockingIOError
,
InterruptedError
,
ConnectionAbortedError
):
pass
# False alarm.
pass
# False alarm.
...
@@ -419,6 +433,26 @@ class _SelectorTransport(transports._FlowControlMixin,
...
@@ -419,6 +433,26 @@ class _SelectorTransport(transports._FlowControlMixin,
if
self
.
_server
is
not
None
:
if
self
.
_server
is
not
None
:
self
.
_server
.
_attach
()
self
.
_server
.
_attach
()
def
__repr__
(
self
):
info
=
[
self
.
__class__
.
__name__
,
'fd=%s'
%
self
.
_sock_fd
]
polling
=
_test_selector_event
(
self
.
_loop
.
_selector
,
self
.
_sock_fd
,
selectors
.
EVENT_READ
)
if
polling
:
info
.
append
(
'read=polling'
)
else
:
info
.
append
(
'read=idle'
)
polling
=
_test_selector_event
(
self
.
_loop
.
_selector
,
self
.
_sock_fd
,
selectors
.
EVENT_WRITE
)
if
polling
:
state
=
'polling'
else
:
state
=
'idle'
bufsize
=
self
.
get_write_buffer_size
()
info
.
append
(
'write=<%s, bufsize=%s>'
%
(
state
,
bufsize
))
return
'<%s>'
%
' '
.
join
(
info
)
def
abort
(
self
):
def
abort
(
self
):
self
.
_force_close
(
None
)
self
.
_force_close
(
None
)
...
@@ -433,7 +467,10 @@ class _SelectorTransport(transports._FlowControlMixin,
...
@@ -433,7 +467,10 @@ class _SelectorTransport(transports._FlowControlMixin,
def
_fatal_error
(
self
,
exc
,
message
=
'Fatal error on transport'
):
def
_fatal_error
(
self
,
exc
,
message
=
'Fatal error on transport'
):
# Should be called from exception handler only.
# Should be called from exception handler only.
if
not
isinstance
(
exc
,
(
BrokenPipeError
,
ConnectionResetError
)):
if
isinstance
(
exc
,
(
BrokenPipeError
,
ConnectionResetError
)):
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
"%r: %s"
,
self
,
message
,
exc_info
=
True
)
else
:
self
.
_loop
.
call_exception_handler
({
self
.
_loop
.
call_exception_handler
({
'message'
:
message
,
'message'
:
message
,
'exception'
:
exc
,
'exception'
:
exc
,
...
@@ -492,6 +529,8 @@ class _SelectorSocketTransport(_SelectorTransport):
...
@@ -492,6 +529,8 @@ class _SelectorSocketTransport(_SelectorTransport):
raise
RuntimeError
(
'Already paused'
)
raise
RuntimeError
(
'Already paused'
)
self
.
_paused
=
True
self
.
_paused
=
True
self
.
_loop
.
remove_reader
(
self
.
_sock_fd
)
self
.
_loop
.
remove_reader
(
self
.
_sock_fd
)
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
"%r pauses reading"
,
self
)
def
resume_reading
(
self
):
def
resume_reading
(
self
):
if
not
self
.
_paused
:
if
not
self
.
_paused
:
...
@@ -500,6 +539,8 @@ class _SelectorSocketTransport(_SelectorTransport):
...
@@ -500,6 +539,8 @@ class _SelectorSocketTransport(_SelectorTransport):
if
self
.
_closing
:
if
self
.
_closing
:
return
return
self
.
_loop
.
add_reader
(
self
.
_sock_fd
,
self
.
_read_ready
)
self
.
_loop
.
add_reader
(
self
.
_sock_fd
,
self
.
_read_ready
)
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
"%r resumes reading"
,
self
)
def
_read_ready
(
self
):
def
_read_ready
(
self
):
try
:
try
:
...
@@ -512,6 +553,8 @@ class _SelectorSocketTransport(_SelectorTransport):
...
@@ -512,6 +553,8 @@ class _SelectorSocketTransport(_SelectorTransport):
if
data
:
if
data
:
self
.
_protocol
.
data_received
(
data
)
self
.
_protocol
.
data_received
(
data
)
else
:
else
:
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
"%r received EOF"
,
self
)
keep_open
=
self
.
_protocol
.
eof_received
()
keep_open
=
self
.
_protocol
.
eof_received
()
if
keep_open
:
if
keep_open
:
# We're keeping the connection open so the
# We're keeping the connection open so the
...
@@ -638,31 +681,37 @@ class _SelectorSslTransport(_SelectorTransport):
...
@@ -638,31 +681,37 @@ class _SelectorSslTransport(_SelectorTransport):
# SSL-specific extra info. (peercert is set later)
# SSL-specific extra info. (peercert is set later)
self
.
_extra
.
update
(
sslcontext
=
sslcontext
)
self
.
_extra
.
update
(
sslcontext
=
sslcontext
)
self
.
_on_handshake
()
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
"%r starts SSL handshake"
,
self
)
start_time
=
self
.
_loop
.
time
()
else
:
start_time
=
None
self
.
_on_handshake
(
start_time
)
def
_on_handshake
(
self
):
def
_on_handshake
(
self
,
start_time
):
try
:
try
:
self
.
_sock
.
do_handshake
()
self
.
_sock
.
do_handshake
()
except
ssl
.
SSLWantReadError
:
except
ssl
.
SSLWantReadError
:
self
.
_loop
.
add_reader
(
self
.
_sock_fd
,
self
.
_on_handshake
)
self
.
_loop
.
add_reader
(
self
.
_sock_fd
,
self
.
_on_handshake
,
start_time
)
return
return
except
ssl
.
SSLWantWriteError
:
except
ssl
.
SSLWantWriteError
:
self
.
_loop
.
add_writer
(
self
.
_sock_fd
,
self
.
_on_handshake
)
self
.
_loop
.
add_writer
(
self
.
_sock_fd
,
return
self
.
_on_handshake
,
start_time
)
except
Exception
as
exc
:
self
.
_loop
.
remove_reader
(
self
.
_sock_fd
)
self
.
_loop
.
remove_writer
(
self
.
_sock_fd
)
self
.
_sock
.
close
()
if
self
.
_waiter
is
not
None
:
self
.
_waiter
.
set_exception
(
exc
)
return
return
except
BaseException
as
exc
:
except
BaseException
as
exc
:
if
self
.
_loop
.
get_debug
():
logger
.
warning
(
"%r: SSL handshake failed"
,
self
,
exc_info
=
True
)
self
.
_loop
.
remove_reader
(
self
.
_sock_fd
)
self
.
_loop
.
remove_reader
(
self
.
_sock_fd
)
self
.
_loop
.
remove_writer
(
self
.
_sock_fd
)
self
.
_loop
.
remove_writer
(
self
.
_sock_fd
)
self
.
_sock
.
close
()
self
.
_sock
.
close
()
if
self
.
_waiter
is
not
None
:
if
self
.
_waiter
is
not
None
:
self
.
_waiter
.
set_exception
(
exc
)
self
.
_waiter
.
set_exception
(
exc
)
raise
if
isinstance
(
exc
,
Exception
):
return
else
:
raise
self
.
_loop
.
remove_reader
(
self
.
_sock_fd
)
self
.
_loop
.
remove_reader
(
self
.
_sock_fd
)
self
.
_loop
.
remove_writer
(
self
.
_sock_fd
)
self
.
_loop
.
remove_writer
(
self
.
_sock_fd
)
...
@@ -676,6 +725,10 @@ class _SelectorSslTransport(_SelectorTransport):
...
@@ -676,6 +725,10 @@ class _SelectorSslTransport(_SelectorTransport):
try
:
try
:
ssl
.
match_hostname
(
peercert
,
self
.
_server_hostname
)
ssl
.
match_hostname
(
peercert
,
self
.
_server_hostname
)
except
Exception
as
exc
:
except
Exception
as
exc
:
if
self
.
_loop
.
get_debug
():
logger
.
warning
(
"%r: SSL handshake failed "
"on matching the hostname"
,
self
,
exc_info
=
True
)
self
.
_sock
.
close
()
self
.
_sock
.
close
()
if
self
.
_waiter
is
not
None
:
if
self
.
_waiter
is
not
None
:
self
.
_waiter
.
set_exception
(
exc
)
self
.
_waiter
.
set_exception
(
exc
)
...
@@ -696,6 +749,10 @@ class _SelectorSslTransport(_SelectorTransport):
...
@@ -696,6 +749,10 @@ class _SelectorSslTransport(_SelectorTransport):
self
.
_loop
.
call_soon
(
self
.
_waiter
.
_set_result_unless_cancelled
,
self
.
_loop
.
call_soon
(
self
.
_waiter
.
_set_result_unless_cancelled
,
None
)
None
)
if
self
.
_loop
.
get_debug
():
dt
=
self
.
_loop
.
time
()
-
start_time
logger
.
debug
(
"%r: SSL handshake took %.1f ms"
,
self
,
dt
*
1e3
)
def
pause_reading
(
self
):
def
pause_reading
(
self
):
# XXX This is a bit icky, given the comment at the top of
# XXX This is a bit icky, given the comment at the top of
# _read_ready(). Is it possible to evoke a deadlock? I don't
# _read_ready(). Is it possible to evoke a deadlock? I don't
...
@@ -709,6 +766,8 @@ class _SelectorSslTransport(_SelectorTransport):
...
@@ -709,6 +766,8 @@ class _SelectorSslTransport(_SelectorTransport):
raise
RuntimeError
(
'Already paused'
)
raise
RuntimeError
(
'Already paused'
)
self
.
_paused
=
True
self
.
_paused
=
True
self
.
_loop
.
remove_reader
(
self
.
_sock_fd
)
self
.
_loop
.
remove_reader
(
self
.
_sock_fd
)
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
"%r pauses reading"
,
self
)
def
resume_reading
(
self
):
def
resume_reading
(
self
):
if
not
self
.
_paused
:
if
not
self
.
_paused
:
...
@@ -717,6 +776,8 @@ class _SelectorSslTransport(_SelectorTransport):
...
@@ -717,6 +776,8 @@ class _SelectorSslTransport(_SelectorTransport):
if
self
.
_closing
:
if
self
.
_closing
:
return
return
self
.
_loop
.
add_reader
(
self
.
_sock_fd
,
self
.
_read_ready
)
self
.
_loop
.
add_reader
(
self
.
_sock_fd
,
self
.
_read_ready
)
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
"%r resumes reading"
,
self
)
def
_read_ready
(
self
):
def
_read_ready
(
self
):
if
self
.
_write_wants_read
:
if
self
.
_write_wants_read
:
...
@@ -741,6 +802,8 @@ class _SelectorSslTransport(_SelectorTransport):
...
@@ -741,6 +802,8 @@ class _SelectorSslTransport(_SelectorTransport):
self
.
_protocol
.
data_received
(
data
)
self
.
_protocol
.
data_received
(
data
)
else
:
else
:
try
:
try
:
if
self
.
_loop
.
get_debug
():
logger
.
debug
(
"%r received EOF"
,
self
)
keep_open
=
self
.
_protocol
.
eof_received
()
keep_open
=
self
.
_protocol
.
eof_received
()
if
keep_open
:
if
keep_open
:
logger
.
warning
(
'returning true from eof_received() '
logger
.
warning
(
'returning true from eof_received() '
...
...
Lib/asyncio/unix_events.py
View file @
59d77e8a
...
@@ -16,6 +16,7 @@ from . import base_subprocess
...
@@ -16,6 +16,7 @@ from . import base_subprocess
from
.
import
constants
from
.
import
constants
from
.
import
events
from
.
import
events
from
.
import
selector_events
from
.
import
selector_events
from
.
import
selectors
from
.
import
transports
from
.
import
transports
from
.coroutines
import
coroutine
from
.coroutines
import
coroutine
from
.log
import
logger
from
.log
import
logger
...
@@ -272,6 +273,20 @@ class _UnixReadPipeTransport(transports.ReadTransport):
...
@@ -272,6 +273,20 @@ class _UnixReadPipeTransport(transports.ReadTransport):
# wait until protocol.connection_made() has been called
# wait until protocol.connection_made() has been called
self
.
_loop
.
call_soon
(
waiter
.
_set_result_unless_cancelled
,
None
)
self
.
_loop
.
call_soon
(
waiter
.
_set_result_unless_cancelled
,
None
)
def
__repr__
(
self
):
info
=
[
self
.
__class__
.
__name__
,
'fd=%s'
%
self
.
_fileno
]
if
self
.
_pipe
is
not
None
:
polling
=
selector_events
.
_test_selector_event
(
self
.
_loop
.
_selector
,
self
.
_fileno
,
selectors
.
EVENT_READ
)
if
polling
:
info
.
append
(
'polling'
)
else
:
info
.
append
(
'idle'
)
else
:
info
.
append
(
'closed'
)
return
'<%s>'
%
' '
.
join
(
info
)
def
_read_ready
(
self
):
def
_read_ready
(
self
):
try
:
try
:
data
=
os
.
read
(
self
.
_fileno
,
self
.
max_size
)
data
=
os
.
read
(
self
.
_fileno
,
self
.
max_size
)
...
@@ -283,6 +298,8 @@ class _UnixReadPipeTransport(transports.ReadTransport):
...
@@ -283,6 +298,8 @@ class _UnixReadPipeTransport(transports.ReadTransport):
if
data
:
if
data
:
self
.
_protocol
.
data_received
(
data
)
self
.
_protocol
.
data_received
(
data
)
else
:
else
:
if
self
.
_loop
.
get_debug
():
logger
.
info
(
"%r was closed by peer"
,
self
)
self
.
_closing
=
True
self
.
_closing
=
True
self
.
_loop
.
remove_reader
(
self
.
_fileno
)
self
.
_loop
.
remove_reader
(
self
.
_fileno
)
self
.
_loop
.
call_soon
(
self
.
_protocol
.
eof_received
)
self
.
_loop
.
call_soon
(
self
.
_protocol
.
eof_received
)
...
@@ -357,11 +374,30 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
...
@@ -357,11 +374,30 @@ class _UnixWritePipeTransport(transports._FlowControlMixin,
# wait until protocol.connection_made() has been called
# wait until protocol.connection_made() has been called
self
.
_loop
.
call_soon
(
waiter
.
_set_result_unless_cancelled
,
None
)
self
.
_loop
.
call_soon
(
waiter
.
_set_result_unless_cancelled
,
None
)
def
__repr__
(
self
):
info
=
[
self
.
__class__
.
__name__
,
'fd=%s'
%
self
.
_fileno
]
if
self
.
_pipe
is
not
None
:
polling
=
selector_events
.
_test_selector_event
(
self
.
_loop
.
_selector
,
self
.
_fileno
,
selectors
.
EVENT_WRITE
)
if
polling
:
info
.
append
(
'polling'
)
else
:
info
.
append
(
'idle'
)
bufsize
=
self
.
get_write_buffer_size
()
info
.
append
(
'bufsize=%s'
%
bufsize
)
else
:
info
.
append
(
'closed'
)
return
'<%s>'
%
' '
.
join
(
info
)
def
get_write_buffer_size
(
self
):
def
get_write_buffer_size
(
self
):
return
sum
(
len
(
data
)
for
data
in
self
.
_buffer
)
return
sum
(
len
(
data
)
for
data
in
self
.
_buffer
)
def
_read_ready
(
self
):
def
_read_ready
(
self
):
# Pipe was closed by peer.
# Pipe was closed by peer.
if
self
.
_loop
.
get_debug
():
logger
.
info
(
"%r was closed by peer"
,
self
)
if
self
.
_buffer
:
if
self
.
_buffer
:
self
.
_close
(
BrokenPipeError
())
self
.
_close
(
BrokenPipeError
())
else
:
else
:
...
...
Lib/asyncio/windows_events.py
View file @
59d77e8a
...
@@ -40,6 +40,18 @@ class _OverlappedFuture(futures.Future):
...
@@ -40,6 +40,18 @@ class _OverlappedFuture(futures.Future):
super
().
__init__
(
loop
=
loop
)
super
().
__init__
(
loop
=
loop
)
self
.
ov
=
ov
self
.
ov
=
ov
def
__repr__
(
self
):
info
=
[
self
.
_state
.
lower
()]
if
self
.
ov
.
pending
:
info
.
append
(
'overlapped=pending'
)
else
:
info
.
append
(
'overlapped=completed'
)
if
self
.
_state
==
futures
.
_FINISHED
:
info
.
append
(
self
.
_format_result
())
if
self
.
_callbacks
:
info
.
append
(
self
.
_format_callbacks
())
return
'<%s %s>'
%
(
self
.
__class__
.
__name__
,
' '
.
join
(
info
))
def
cancel
(
self
):
def
cancel
(
self
):
try
:
try
:
self
.
ov
.
cancel
()
self
.
ov
.
cancel
()
...
...
Lib/test/test_asyncio/test_proactor_events.py
View file @
59d77e8a
...
@@ -358,16 +358,17 @@ class BaseProactorEventLoopTests(test_utils.TestCase):
...
@@ -358,16 +358,17 @@ class BaseProactorEventLoopTests(test_utils.TestCase):
self
.
loop
=
EventLoop
(
self
.
proactor
)
self
.
loop
=
EventLoop
(
self
.
proactor
)
self
.
set_event_loop
(
self
.
loop
,
cleanup
=
False
)
self
.
set_event_loop
(
self
.
loop
,
cleanup
=
False
)
@
mock
.
patch
.
object
(
BaseProactorEventLoop
,
'call_soon'
)
@
mock
.
patch
.
object
(
BaseProactorEventLoop
,
'
_
call_soon'
)
@
mock
.
patch
.
object
(
BaseProactorEventLoop
,
'_socketpair'
)
@
mock
.
patch
.
object
(
BaseProactorEventLoop
,
'_socketpair'
)
def
test_ctor
(
self
,
socketpair
,
call_soon
):
def
test_ctor
(
self
,
socketpair
,
_
call_soon
):
ssock
,
csock
=
socketpair
.
return_value
=
(
ssock
,
csock
=
socketpair
.
return_value
=
(
mock
.
Mock
(),
mock
.
Mock
())
mock
.
Mock
(),
mock
.
Mock
())
loop
=
BaseProactorEventLoop
(
self
.
proactor
)
loop
=
BaseProactorEventLoop
(
self
.
proactor
)
self
.
assertIs
(
loop
.
_ssock
,
ssock
)
self
.
assertIs
(
loop
.
_ssock
,
ssock
)
self
.
assertIs
(
loop
.
_csock
,
csock
)
self
.
assertIs
(
loop
.
_csock
,
csock
)
self
.
assertEqual
(
loop
.
_internal_fds
,
1
)
self
.
assertEqual
(
loop
.
_internal_fds
,
1
)
call_soon
.
assert_called_with
(
loop
.
_loop_self_reading
)
_call_soon
.
assert_called_with
(
loop
.
_loop_self_reading
,
(),
check_loop
=
False
)
def
test_close_self_pipe
(
self
):
def
test_close_self_pipe
(
self
):
self
.
loop
.
_close_self_pipe
()
self
.
loop
.
_close_self_pipe
()
...
...
Lib/test/test_asyncio/test_selector_events.py
View file @
59d77e8a
...
@@ -1092,15 +1092,15 @@ class SelectorSslTransportTests(test_utils.TestCase):
...
@@ -1092,15 +1092,15 @@ class SelectorSslTransportTests(test_utils.TestCase):
self
.
sslsock
.
do_handshake
.
side_effect
=
ssl
.
SSLWantReadError
self
.
sslsock
.
do_handshake
.
side_effect
=
ssl
.
SSLWantReadError
transport
=
_SelectorSslTransport
(
transport
=
_SelectorSslTransport
(
self
.
loop
,
self
.
sock
,
self
.
protocol
,
self
.
sslcontext
)
self
.
loop
,
self
.
sock
,
self
.
protocol
,
self
.
sslcontext
)
transport
.
_on_handshake
()
transport
.
_on_handshake
(
None
)
self
.
loop
.
assert_reader
(
1
,
transport
.
_on_handshake
)
self
.
loop
.
assert_reader
(
1
,
transport
.
_on_handshake
,
None
)
def
test_on_handshake_writer_retry
(
self
):
def
test_on_handshake_writer_retry
(
self
):
self
.
sslsock
.
do_handshake
.
side_effect
=
ssl
.
SSLWantWriteError
self
.
sslsock
.
do_handshake
.
side_effect
=
ssl
.
SSLWantWriteError
transport
=
_SelectorSslTransport
(
transport
=
_SelectorSslTransport
(
self
.
loop
,
self
.
sock
,
self
.
protocol
,
self
.
sslcontext
)
self
.
loop
,
self
.
sock
,
self
.
protocol
,
self
.
sslcontext
)
transport
.
_on_handshake
()
transport
.
_on_handshake
(
None
)
self
.
loop
.
assert_writer
(
1
,
transport
.
_on_handshake
)
self
.
loop
.
assert_writer
(
1
,
transport
.
_on_handshake
,
None
)
def
test_on_handshake_exc
(
self
):
def
test_on_handshake_exc
(
self
):
exc
=
ValueError
()
exc
=
ValueError
()
...
@@ -1108,7 +1108,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
...
@@ -1108,7 +1108,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
transport
=
_SelectorSslTransport
(
transport
=
_SelectorSslTransport
(
self
.
loop
,
self
.
sock
,
self
.
protocol
,
self
.
sslcontext
)
self
.
loop
,
self
.
sock
,
self
.
protocol
,
self
.
sslcontext
)
transport
.
_waiter
=
asyncio
.
Future
(
loop
=
self
.
loop
)
transport
.
_waiter
=
asyncio
.
Future
(
loop
=
self
.
loop
)
transport
.
_on_handshake
()
transport
.
_on_handshake
(
None
)
self
.
assertTrue
(
self
.
sslsock
.
close
.
called
)
self
.
assertTrue
(
self
.
sslsock
.
close
.
called
)
self
.
assertTrue
(
transport
.
_waiter
.
done
())
self
.
assertTrue
(
transport
.
_waiter
.
done
())
self
.
assertIs
(
exc
,
transport
.
_waiter
.
exception
())
self
.
assertIs
(
exc
,
transport
.
_waiter
.
exception
())
...
@@ -1119,7 +1119,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
...
@@ -1119,7 +1119,7 @@ class SelectorSslTransportTests(test_utils.TestCase):
transport
.
_waiter
=
asyncio
.
Future
(
loop
=
self
.
loop
)
transport
.
_waiter
=
asyncio
.
Future
(
loop
=
self
.
loop
)
exc
=
BaseException
()
exc
=
BaseException
()
self
.
sslsock
.
do_handshake
.
side_effect
=
exc
self
.
sslsock
.
do_handshake
.
side_effect
=
exc
self
.
assertRaises
(
BaseException
,
transport
.
_on_handshake
)
self
.
assertRaises
(
BaseException
,
transport
.
_on_handshake
,
None
)
self
.
assertTrue
(
self
.
sslsock
.
close
.
called
)
self
.
assertTrue
(
self
.
sslsock
.
close
.
called
)
self
.
assertTrue
(
transport
.
_waiter
.
done
())
self
.
assertTrue
(
transport
.
_waiter
.
done
())
self
.
assertIs
(
exc
,
transport
.
_waiter
.
exception
())
self
.
assertIs
(
exc
,
transport
.
_waiter
.
exception
())
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment