Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
cpython
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
cpython
Commits
ad4ed872
Commit
ad4ed872
authored
May 06, 2019
by
Andrew Svetlov
Committed by
GitHub
May 06, 2019
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Forbid creating of stream objects outside of asyncio (#13101)
parent
2cc0223f
Changes
5
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
203 additions
and
76 deletions
+203
-76
Lib/asyncio/streams.py
Lib/asyncio/streams.py
+51
-16
Lib/asyncio/subprocess.py
Lib/asyncio/subprocess.py
+22
-10
Lib/test/test_asyncio/test_streams.py
Lib/test/test_asyncio/test_streams.py
+116
-50
Lib/test/test_asyncio/test_subprocess.py
Lib/test/test_asyncio/test_subprocess.py
+12
-0
Misc/NEWS.d/next/Library/2019-05-05-16-14-38.bpo-36806.rAzF-x.rst
...S.d/next/Library/2019-05-05-16-14-38.bpo-36806.rAzF-x.rst
+2
-0
No files found.
Lib/asyncio/streams.py
View file @
ad4ed872
...
@@ -4,6 +4,7 @@ __all__ = (
...
@@ -4,6 +4,7 @@ __all__ = (
import
socket
import
socket
import
sys
import
sys
import
warnings
import
weakref
import
weakref
if
hasattr
(
socket
,
'AF_UNIX'
):
if
hasattr
(
socket
,
'AF_UNIX'
):
...
@@ -42,11 +43,14 @@ async def open_connection(host=None, port=None, *,
...
@@ -42,11 +43,14 @@ async def open_connection(host=None, port=None, *,
"""
"""
if
loop
is
None
:
if
loop
is
None
:
loop
=
events
.
get_event_loop
()
loop
=
events
.
get_event_loop
()
reader
=
StreamReader
(
limit
=
limit
,
loop
=
loop
)
reader
=
StreamReader
(
limit
=
limit
,
loop
=
loop
,
protocol
=
StreamReaderProtocol
(
reader
,
loop
=
loop
)
_asyncio_internal
=
True
)
protocol
=
StreamReaderProtocol
(
reader
,
loop
=
loop
,
_asyncio_internal
=
True
)
transport
,
_
=
await
loop
.
create_connection
(
transport
,
_
=
await
loop
.
create_connection
(
lambda
:
protocol
,
host
,
port
,
**
kwds
)
lambda
:
protocol
,
host
,
port
,
**
kwds
)
writer
=
StreamWriter
(
transport
,
protocol
,
reader
,
loop
)
writer
=
StreamWriter
(
transport
,
protocol
,
reader
,
loop
,
_asyncio_internal
=
True
)
return
reader
,
writer
return
reader
,
writer
...
@@ -77,9 +81,11 @@ async def start_server(client_connected_cb, host=None, port=None, *,
...
@@ -77,9 +81,11 @@ async def start_server(client_connected_cb, host=None, port=None, *,
loop
=
events
.
get_event_loop
()
loop
=
events
.
get_event_loop
()
def
factory
():
def
factory
():
reader
=
StreamReader
(
limit
=
limit
,
loop
=
loop
)
reader
=
StreamReader
(
limit
=
limit
,
loop
=
loop
,
_asyncio_internal
=
True
)
protocol
=
StreamReaderProtocol
(
reader
,
client_connected_cb
,
protocol
=
StreamReaderProtocol
(
reader
,
client_connected_cb
,
loop
=
loop
)
loop
=
loop
,
_asyncio_internal
=
True
)
return
protocol
return
protocol
return
await
loop
.
create_server
(
factory
,
host
,
port
,
**
kwds
)
return
await
loop
.
create_server
(
factory
,
host
,
port
,
**
kwds
)
...
@@ -93,11 +99,14 @@ if hasattr(socket, 'AF_UNIX'):
...
@@ -93,11 +99,14 @@ if hasattr(socket, 'AF_UNIX'):
"""Similar to `open_connection` but works with UNIX Domain Sockets."""
"""Similar to `open_connection` but works with UNIX Domain Sockets."""
if
loop
is
None
:
if
loop
is
None
:
loop
=
events
.
get_event_loop
()
loop
=
events
.
get_event_loop
()
reader
=
StreamReader
(
limit
=
limit
,
loop
=
loop
)
reader
=
StreamReader
(
limit
=
limit
,
loop
=
loop
,
protocol
=
StreamReaderProtocol
(
reader
,
loop
=
loop
)
_asyncio_internal
=
True
)
protocol
=
StreamReaderProtocol
(
reader
,
loop
=
loop
,
_asyncio_internal
=
True
)
transport
,
_
=
await
loop
.
create_unix_connection
(
transport
,
_
=
await
loop
.
create_unix_connection
(
lambda
:
protocol
,
path
,
**
kwds
)
lambda
:
protocol
,
path
,
**
kwds
)
writer
=
StreamWriter
(
transport
,
protocol
,
reader
,
loop
)
writer
=
StreamWriter
(
transport
,
protocol
,
reader
,
loop
,
_asyncio_internal
=
True
)
return
reader
,
writer
return
reader
,
writer
async
def
start_unix_server
(
client_connected_cb
,
path
=
None
,
*
,
async
def
start_unix_server
(
client_connected_cb
,
path
=
None
,
*
,
...
@@ -107,9 +116,11 @@ if hasattr(socket, 'AF_UNIX'):
...
@@ -107,9 +116,11 @@ if hasattr(socket, 'AF_UNIX'):
loop
=
events
.
get_event_loop
()
loop
=
events
.
get_event_loop
()
def
factory
():
def
factory
():
reader
=
StreamReader
(
limit
=
limit
,
loop
=
loop
)
reader
=
StreamReader
(
limit
=
limit
,
loop
=
loop
,
_asyncio_internal
=
True
)
protocol
=
StreamReaderProtocol
(
reader
,
client_connected_cb
,
protocol
=
StreamReaderProtocol
(
reader
,
client_connected_cb
,
loop
=
loop
)
loop
=
loop
,
_asyncio_internal
=
True
)
return
protocol
return
protocol
return
await
loop
.
create_unix_server
(
factory
,
path
,
**
kwds
)
return
await
loop
.
create_unix_server
(
factory
,
path
,
**
kwds
)
...
@@ -125,11 +136,20 @@ class FlowControlMixin(protocols.Protocol):
...
@@ -125,11 +136,20 @@ class FlowControlMixin(protocols.Protocol):
StreamWriter.drain() must wait for _drain_helper() coroutine.
StreamWriter.drain() must wait for _drain_helper() coroutine.
"""
"""
def
__init__
(
self
,
loop
=
None
):
def
__init__
(
self
,
loop
=
None
,
*
,
_asyncio_internal
=
False
):
if
loop
is
None
:
if
loop
is
None
:
self
.
_loop
=
events
.
get_event_loop
()
self
.
_loop
=
events
.
get_event_loop
()
else
:
else
:
self
.
_loop
=
loop
self
.
_loop
=
loop
if
not
_asyncio_internal
:
# NOTE:
# Avoid inheritance from FlowControlMixin
# Copy-paste the code to your project
# if you need flow control helpers
warnings
.
warn
(
f"
{
self
.
__class__
}
should be instaniated "
"by asyncio internals only, "
"please avoid its creation from user code"
,
DeprecationWarning
)
self
.
_paused
=
False
self
.
_paused
=
False
self
.
_drain_waiter
=
None
self
.
_drain_waiter
=
None
self
.
_connection_lost
=
False
self
.
_connection_lost
=
False
...
@@ -191,8 +211,9 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
...
@@ -191,8 +211,9 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
_source_traceback
=
None
_source_traceback
=
None
def
__init__
(
self
,
stream_reader
,
client_connected_cb
=
None
,
loop
=
None
):
def
__init__
(
self
,
stream_reader
,
client_connected_cb
=
None
,
loop
=
None
,
super
().
__init__
(
loop
=
loop
)
*
,
_asyncio_internal
=
False
):
super
().
__init__
(
loop
=
loop
,
_asyncio_internal
=
_asyncio_internal
)
if
stream_reader
is
not
None
:
if
stream_reader
is
not
None
:
self
.
_stream_reader_wr
=
weakref
.
ref
(
stream_reader
,
self
.
_stream_reader_wr
=
weakref
.
ref
(
stream_reader
,
self
.
_on_reader_gc
)
self
.
_on_reader_gc
)
...
@@ -253,7 +274,8 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
...
@@ -253,7 +274,8 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
if
self
.
_client_connected_cb
is
not
None
:
if
self
.
_client_connected_cb
is
not
None
:
self
.
_stream_writer
=
StreamWriter
(
transport
,
self
,
self
.
_stream_writer
=
StreamWriter
(
transport
,
self
,
reader
,
reader
,
self
.
_loop
)
self
.
_loop
,
_asyncio_internal
=
True
)
res
=
self
.
_client_connected_cb
(
reader
,
res
=
self
.
_client_connected_cb
(
reader
,
self
.
_stream_writer
)
self
.
_stream_writer
)
if
coroutines
.
iscoroutine
(
res
):
if
coroutines
.
iscoroutine
(
res
):
...
@@ -311,7 +333,13 @@ class StreamWriter:
...
@@ -311,7 +333,13 @@ class StreamWriter:
directly.
directly.
"""
"""
def
__init__
(
self
,
transport
,
protocol
,
reader
,
loop
):
def
__init__
(
self
,
transport
,
protocol
,
reader
,
loop
,
*
,
_asyncio_internal
=
False
):
if
not
_asyncio_internal
:
warnings
.
warn
(
f"
{
self
.
__class__
}
should be instaniated "
"by asyncio internals only, "
"please avoid its creation from user code"
,
DeprecationWarning
)
self
.
_transport
=
transport
self
.
_transport
=
transport
self
.
_protocol
=
protocol
self
.
_protocol
=
protocol
# drain() expects that the reader has an exception() method
# drain() expects that the reader has an exception() method
...
@@ -388,7 +416,14 @@ class StreamReader:
...
@@ -388,7 +416,14 @@ class StreamReader:
_source_traceback
=
None
_source_traceback
=
None
def
__init__
(
self
,
limit
=
_DEFAULT_LIMIT
,
loop
=
None
):
def
__init__
(
self
,
limit
=
_DEFAULT_LIMIT
,
loop
=
None
,
*
,
_asyncio_internal
=
False
):
if
not
_asyncio_internal
:
warnings
.
warn
(
f"
{
self
.
__class__
}
should be instaniated "
"by asyncio internals only, "
"please avoid its creation from user code"
,
DeprecationWarning
)
# The line length limit is a security feature;
# The line length limit is a security feature;
# it also doubles as half the buffer limit.
# it also doubles as half the buffer limit.
...
...
Lib/asyncio/subprocess.py
View file @
ad4ed872
__all__
=
'create_subprocess_exec'
,
'create_subprocess_shell'
__all__
=
'create_subprocess_exec'
,
'create_subprocess_shell'
import
subprocess
import
subprocess
import
warnings
from
.
import
events
from
.
import
events
from
.
import
protocols
from
.
import
protocols
...
@@ -18,8 +19,8 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
...
@@ -18,8 +19,8 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
protocols
.
SubprocessProtocol
):
protocols
.
SubprocessProtocol
):
"""Like StreamReaderProtocol, but for a subprocess."""
"""Like StreamReaderProtocol, but for a subprocess."""
def
__init__
(
self
,
limit
,
loop
):
def
__init__
(
self
,
limit
,
loop
,
*
,
_asyncio_internal
=
False
):
super
().
__init__
(
loop
=
loop
)
super
().
__init__
(
loop
=
loop
,
_asyncio_internal
=
_asyncio_internal
)
self
.
_limit
=
limit
self
.
_limit
=
limit
self
.
stdin
=
self
.
stdout
=
self
.
stderr
=
None
self
.
stdin
=
self
.
stdout
=
self
.
stderr
=
None
self
.
_transport
=
None
self
.
_transport
=
None
...
@@ -42,14 +43,16 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
...
@@ -42,14 +43,16 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
stdout_transport
=
transport
.
get_pipe_transport
(
1
)
stdout_transport
=
transport
.
get_pipe_transport
(
1
)
if
stdout_transport
is
not
None
:
if
stdout_transport
is
not
None
:
self
.
stdout
=
streams
.
StreamReader
(
limit
=
self
.
_limit
,
self
.
stdout
=
streams
.
StreamReader
(
limit
=
self
.
_limit
,
loop
=
self
.
_loop
)
loop
=
self
.
_loop
,
_asyncio_internal
=
True
)
self
.
stdout
.
set_transport
(
stdout_transport
)
self
.
stdout
.
set_transport
(
stdout_transport
)
self
.
_pipe_fds
.
append
(
1
)
self
.
_pipe_fds
.
append
(
1
)
stderr_transport
=
transport
.
get_pipe_transport
(
2
)
stderr_transport
=
transport
.
get_pipe_transport
(
2
)
if
stderr_transport
is
not
None
:
if
stderr_transport
is
not
None
:
self
.
stderr
=
streams
.
StreamReader
(
limit
=
self
.
_limit
,
self
.
stderr
=
streams
.
StreamReader
(
limit
=
self
.
_limit
,
loop
=
self
.
_loop
)
loop
=
self
.
_loop
,
_asyncio_internal
=
True
)
self
.
stderr
.
set_transport
(
stderr_transport
)
self
.
stderr
.
set_transport
(
stderr_transport
)
self
.
_pipe_fds
.
append
(
2
)
self
.
_pipe_fds
.
append
(
2
)
...
@@ -58,7 +61,8 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
...
@@ -58,7 +61,8 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
self
.
stdin
=
streams
.
StreamWriter
(
stdin_transport
,
self
.
stdin
=
streams
.
StreamWriter
(
stdin_transport
,
protocol
=
self
,
protocol
=
self
,
reader
=
None
,
reader
=
None
,
loop
=
self
.
_loop
)
loop
=
self
.
_loop
,
_asyncio_internal
=
True
)
def
pipe_data_received
(
self
,
fd
,
data
):
def
pipe_data_received
(
self
,
fd
,
data
):
if
fd
==
1
:
if
fd
==
1
:
...
@@ -104,7 +108,13 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
...
@@ -104,7 +108,13 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
class
Process
:
class
Process
:
def
__init__
(
self
,
transport
,
protocol
,
loop
):
def
__init__
(
self
,
transport
,
protocol
,
loop
,
*
,
_asyncio_internal
=
False
):
if
not
_asyncio_internal
:
warnings
.
warn
(
f"
{
self
.
__class__
}
should be instaniated "
"by asyncio internals only, "
"please avoid its creation from user code"
,
DeprecationWarning
)
self
.
_transport
=
transport
self
.
_transport
=
transport
self
.
_protocol
=
protocol
self
.
_protocol
=
protocol
self
.
_loop
=
loop
self
.
_loop
=
loop
...
@@ -195,12 +205,13 @@ async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
...
@@ -195,12 +205,13 @@ async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
if
loop
is
None
:
if
loop
is
None
:
loop
=
events
.
get_event_loop
()
loop
=
events
.
get_event_loop
()
protocol_factory
=
lambda
:
SubprocessStreamProtocol
(
limit
=
limit
,
protocol_factory
=
lambda
:
SubprocessStreamProtocol
(
limit
=
limit
,
loop
=
loop
)
loop
=
loop
,
_asyncio_internal
=
True
)
transport
,
protocol
=
await
loop
.
subprocess_shell
(
transport
,
protocol
=
await
loop
.
subprocess_shell
(
protocol_factory
,
protocol_factory
,
cmd
,
stdin
=
stdin
,
stdout
=
stdout
,
cmd
,
stdin
=
stdin
,
stdout
=
stdout
,
stderr
=
stderr
,
**
kwds
)
stderr
=
stderr
,
**
kwds
)
return
Process
(
transport
,
protocol
,
loop
)
return
Process
(
transport
,
protocol
,
loop
,
_asyncio_internal
=
True
)
async
def
create_subprocess_exec
(
program
,
*
args
,
stdin
=
None
,
stdout
=
None
,
async
def
create_subprocess_exec
(
program
,
*
args
,
stdin
=
None
,
stdout
=
None
,
...
@@ -209,10 +220,11 @@ async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
...
@@ -209,10 +220,11 @@ async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
if
loop
is
None
:
if
loop
is
None
:
loop
=
events
.
get_event_loop
()
loop
=
events
.
get_event_loop
()
protocol_factory
=
lambda
:
SubprocessStreamProtocol
(
limit
=
limit
,
protocol_factory
=
lambda
:
SubprocessStreamProtocol
(
limit
=
limit
,
loop
=
loop
)
loop
=
loop
,
_asyncio_internal
=
True
)
transport
,
protocol
=
await
loop
.
subprocess_exec
(
transport
,
protocol
=
await
loop
.
subprocess_exec
(
protocol_factory
,
protocol_factory
,
program
,
*
args
,
program
,
*
args
,
stdin
=
stdin
,
stdout
=
stdout
,
stdin
=
stdin
,
stdout
=
stdout
,
stderr
=
stderr
,
**
kwds
)
stderr
=
stderr
,
**
kwds
)
return
Process
(
transport
,
protocol
,
loop
)
return
Process
(
transport
,
protocol
,
loop
,
_asyncio_internal
=
True
)
Lib/test/test_asyncio/test_streams.py
View file @
ad4ed872
...
@@ -42,7 +42,7 @@ class StreamTests(test_utils.TestCase):
...
@@ -42,7 +42,7 @@ class StreamTests(test_utils.TestCase):
@
mock
.
patch
(
'asyncio.streams.events'
)
@
mock
.
patch
(
'asyncio.streams.events'
)
def
test_ctor_global_loop
(
self
,
m_events
):
def
test_ctor_global_loop
(
self
,
m_events
):
stream
=
asyncio
.
StreamReader
()
stream
=
asyncio
.
StreamReader
(
_asyncio_internal
=
True
)
self
.
assertIs
(
stream
.
_loop
,
m_events
.
get_event_loop
.
return_value
)
self
.
assertIs
(
stream
.
_loop
,
m_events
.
get_event_loop
.
return_value
)
def
_basetest_open_connection
(
self
,
open_connection_fut
):
def
_basetest_open_connection
(
self
,
open_connection_fut
):
...
@@ -135,20 +135,23 @@ class StreamTests(test_utils.TestCase):
...
@@ -135,20 +135,23 @@ class StreamTests(test_utils.TestCase):
self
.
_basetest_open_connection_error
(
conn_fut
)
self
.
_basetest_open_connection_error
(
conn_fut
)
def
test_feed_empty_data
(
self
):
def
test_feed_empty_data
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
b''
)
stream
.
feed_data
(
b''
)
self
.
assertEqual
(
b''
,
stream
.
_buffer
)
self
.
assertEqual
(
b''
,
stream
.
_buffer
)
def
test_feed_nonempty_data
(
self
):
def
test_feed_nonempty_data
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
self
.
DATA
)
stream
.
feed_data
(
self
.
DATA
)
self
.
assertEqual
(
self
.
DATA
,
stream
.
_buffer
)
self
.
assertEqual
(
self
.
DATA
,
stream
.
_buffer
)
def
test_read_zero
(
self
):
def
test_read_zero
(
self
):
# Read zero bytes.
# Read zero bytes.
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
self
.
DATA
)
stream
.
feed_data
(
self
.
DATA
)
data
=
self
.
loop
.
run_until_complete
(
stream
.
read
(
0
))
data
=
self
.
loop
.
run_until_complete
(
stream
.
read
(
0
))
...
@@ -157,7 +160,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -157,7 +160,8 @@ class StreamTests(test_utils.TestCase):
def
test_read
(
self
):
def
test_read
(
self
):
# Read bytes.
# Read bytes.
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
read_task
=
asyncio
.
Task
(
stream
.
read
(
30
),
loop
=
self
.
loop
)
read_task
=
asyncio
.
Task
(
stream
.
read
(
30
),
loop
=
self
.
loop
)
def
cb
():
def
cb
():
...
@@ -170,7 +174,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -170,7 +174,8 @@ class StreamTests(test_utils.TestCase):
def
test_read_line_breaks
(
self
):
def
test_read_line_breaks
(
self
):
# Read bytes without line breaks.
# Read bytes without line breaks.
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
b'line1'
)
stream
.
feed_data
(
b'line1'
)
stream
.
feed_data
(
b'line2'
)
stream
.
feed_data
(
b'line2'
)
...
@@ -181,7 +186,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -181,7 +186,8 @@ class StreamTests(test_utils.TestCase):
def
test_read_eof
(
self
):
def
test_read_eof
(
self
):
# Read bytes, stop at eof.
# Read bytes, stop at eof.
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
read_task
=
asyncio
.
Task
(
stream
.
read
(
1024
),
loop
=
self
.
loop
)
read_task
=
asyncio
.
Task
(
stream
.
read
(
1024
),
loop
=
self
.
loop
)
def
cb
():
def
cb
():
...
@@ -194,7 +200,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -194,7 +200,8 @@ class StreamTests(test_utils.TestCase):
def
test_read_until_eof
(
self
):
def
test_read_until_eof
(
self
):
# Read all bytes until eof.
# Read all bytes until eof.
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
read_task
=
asyncio
.
Task
(
stream
.
read
(
-
1
),
loop
=
self
.
loop
)
read_task
=
asyncio
.
Task
(
stream
.
read
(
-
1
),
loop
=
self
.
loop
)
def
cb
():
def
cb
():
...
@@ -209,7 +216,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -209,7 +216,8 @@ class StreamTests(test_utils.TestCase):
self
.
assertEqual
(
b''
,
stream
.
_buffer
)
self
.
assertEqual
(
b''
,
stream
.
_buffer
)
def
test_read_exception
(
self
):
def
test_read_exception
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
b'line
\
n
'
)
stream
.
feed_data
(
b'line
\
n
'
)
data
=
self
.
loop
.
run_until_complete
(
stream
.
read
(
2
))
data
=
self
.
loop
.
run_until_complete
(
stream
.
read
(
2
))
...
@@ -221,13 +229,16 @@ class StreamTests(test_utils.TestCase):
...
@@ -221,13 +229,16 @@ class StreamTests(test_utils.TestCase):
def
test_invalid_limit
(
self
):
def
test_invalid_limit
(
self
):
with
self
.
assertRaisesRegex
(
ValueError
,
'imit'
):
with
self
.
assertRaisesRegex
(
ValueError
,
'imit'
):
asyncio
.
StreamReader
(
limit
=
0
,
loop
=
self
.
loop
)
asyncio
.
StreamReader
(
limit
=
0
,
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
with
self
.
assertRaisesRegex
(
ValueError
,
'imit'
):
with
self
.
assertRaisesRegex
(
ValueError
,
'imit'
):
asyncio
.
StreamReader
(
limit
=-
1
,
loop
=
self
.
loop
)
asyncio
.
StreamReader
(
limit
=-
1
,
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
def
test_read_limit
(
self
):
def
test_read_limit
(
self
):
stream
=
asyncio
.
StreamReader
(
limit
=
3
,
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
limit
=
3
,
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
b'chunk'
)
stream
.
feed_data
(
b'chunk'
)
data
=
self
.
loop
.
run_until_complete
(
stream
.
read
(
5
))
data
=
self
.
loop
.
run_until_complete
(
stream
.
read
(
5
))
self
.
assertEqual
(
b'chunk'
,
data
)
self
.
assertEqual
(
b'chunk'
,
data
)
...
@@ -236,7 +247,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -236,7 +247,8 @@ class StreamTests(test_utils.TestCase):
def
test_readline
(
self
):
def
test_readline
(
self
):
# Read one line. 'readline' will need to wait for the data
# Read one line. 'readline' will need to wait for the data
# to come from 'cb'
# to come from 'cb'
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
b'chunk1 '
)
stream
.
feed_data
(
b'chunk1 '
)
read_task
=
asyncio
.
Task
(
stream
.
readline
(),
loop
=
self
.
loop
)
read_task
=
asyncio
.
Task
(
stream
.
readline
(),
loop
=
self
.
loop
)
...
@@ -254,7 +266,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -254,7 +266,8 @@ class StreamTests(test_utils.TestCase):
# Read one line. The data is in StreamReader's buffer
# Read one line. The data is in StreamReader's buffer
# before the event loop is run.
# before the event loop is run.
stream
=
asyncio
.
StreamReader
(
limit
=
3
,
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
limit
=
3
,
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
b'li'
)
stream
.
feed_data
(
b'li'
)
stream
.
feed_data
(
b'ne1
\
n
line2
\
n
'
)
stream
.
feed_data
(
b'ne1
\
n
line2
\
n
'
)
...
@@ -263,7 +276,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -263,7 +276,8 @@ class StreamTests(test_utils.TestCase):
# The buffer should contain the remaining data after exception
# The buffer should contain the remaining data after exception
self
.
assertEqual
(
b'line2
\
n
'
,
stream
.
_buffer
)
self
.
assertEqual
(
b'line2
\
n
'
,
stream
.
_buffer
)
stream
=
asyncio
.
StreamReader
(
limit
=
3
,
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
limit
=
3
,
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
b'li'
)
stream
.
feed_data
(
b'li'
)
stream
.
feed_data
(
b'ne1'
)
stream
.
feed_data
(
b'ne1'
)
stream
.
feed_data
(
b'li'
)
stream
.
feed_data
(
b'li'
)
...
@@ -278,7 +292,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -278,7 +292,8 @@ class StreamTests(test_utils.TestCase):
self
.
assertEqual
(
b''
,
stream
.
_buffer
)
self
.
assertEqual
(
b''
,
stream
.
_buffer
)
def
test_at_eof
(
self
):
def
test_at_eof
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
self
.
assertFalse
(
stream
.
at_eof
())
self
.
assertFalse
(
stream
.
at_eof
())
stream
.
feed_data
(
b'some data
\
n
'
)
stream
.
feed_data
(
b'some data
\
n
'
)
...
@@ -296,7 +311,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -296,7 +311,8 @@ class StreamTests(test_utils.TestCase):
# Read one line. StreamReaders are fed with data after
# Read one line. StreamReaders are fed with data after
# their 'readline' methods are called.
# their 'readline' methods are called.
stream
=
asyncio
.
StreamReader
(
limit
=
7
,
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
limit
=
7
,
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
def
cb
():
def
cb
():
stream
.
feed_data
(
b'chunk1'
)
stream
.
feed_data
(
b'chunk1'
)
stream
.
feed_data
(
b'chunk2'
)
stream
.
feed_data
(
b'chunk2'
)
...
@@ -310,7 +326,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -310,7 +326,8 @@ class StreamTests(test_utils.TestCase):
# a ValueError it should be empty.
# a ValueError it should be empty.
self
.
assertEqual
(
b''
,
stream
.
_buffer
)
self
.
assertEqual
(
b''
,
stream
.
_buffer
)
stream
=
asyncio
.
StreamReader
(
limit
=
7
,
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
limit
=
7
,
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
def
cb
():
def
cb
():
stream
.
feed_data
(
b'chunk1'
)
stream
.
feed_data
(
b'chunk1'
)
stream
.
feed_data
(
b'chunk2
\
n
'
)
stream
.
feed_data
(
b'chunk2
\
n
'
)
...
@@ -323,7 +340,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -323,7 +340,8 @@ class StreamTests(test_utils.TestCase):
self
.
assertEqual
(
b'chunk3
\
n
'
,
stream
.
_buffer
)
self
.
assertEqual
(
b'chunk3
\
n
'
,
stream
.
_buffer
)
# check strictness of the limit
# check strictness of the limit
stream
=
asyncio
.
StreamReader
(
limit
=
7
,
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
limit
=
7
,
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
b'1234567
\
n
'
)
stream
.
feed_data
(
b'1234567
\
n
'
)
line
=
self
.
loop
.
run_until_complete
(
stream
.
readline
())
line
=
self
.
loop
.
run_until_complete
(
stream
.
readline
())
self
.
assertEqual
(
b'1234567
\
n
'
,
line
)
self
.
assertEqual
(
b'1234567
\
n
'
,
line
)
...
@@ -342,7 +360,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -342,7 +360,8 @@ class StreamTests(test_utils.TestCase):
def
test_readline_nolimit_nowait
(
self
):
def
test_readline_nolimit_nowait
(
self
):
# All needed data for the first 'readline' call will be
# All needed data for the first 'readline' call will be
# in the buffer.
# in the buffer.
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
self
.
DATA
[:
6
])
stream
.
feed_data
(
self
.
DATA
[:
6
])
stream
.
feed_data
(
self
.
DATA
[
6
:])
stream
.
feed_data
(
self
.
DATA
[
6
:])
...
@@ -352,7 +371,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -352,7 +371,8 @@ class StreamTests(test_utils.TestCase):
self
.
assertEqual
(
b'line2
\
n
line3
\
n
'
,
stream
.
_buffer
)
self
.
assertEqual
(
b'line2
\
n
line3
\
n
'
,
stream
.
_buffer
)
def
test_readline_eof
(
self
):
def
test_readline_eof
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
b'some data'
)
stream
.
feed_data
(
b'some data'
)
stream
.
feed_eof
()
stream
.
feed_eof
()
...
@@ -360,14 +380,16 @@ class StreamTests(test_utils.TestCase):
...
@@ -360,14 +380,16 @@ class StreamTests(test_utils.TestCase):
self
.
assertEqual
(
b'some data'
,
line
)
self
.
assertEqual
(
b'some data'
,
line
)
def
test_readline_empty_eof
(
self
):
def
test_readline_empty_eof
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_eof
()
stream
.
feed_eof
()
line
=
self
.
loop
.
run_until_complete
(
stream
.
readline
())
line
=
self
.
loop
.
run_until_complete
(
stream
.
readline
())
self
.
assertEqual
(
b''
,
line
)
self
.
assertEqual
(
b''
,
line
)
def
test_readline_read_byte_count
(
self
):
def
test_readline_read_byte_count
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
self
.
DATA
)
stream
.
feed_data
(
self
.
DATA
)
self
.
loop
.
run_until_complete
(
stream
.
readline
())
self
.
loop
.
run_until_complete
(
stream
.
readline
())
...
@@ -378,7 +400,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -378,7 +400,8 @@ class StreamTests(test_utils.TestCase):
self
.
assertEqual
(
b'ine3
\
n
'
,
stream
.
_buffer
)
self
.
assertEqual
(
b'ine3
\
n
'
,
stream
.
_buffer
)
def
test_readline_exception
(
self
):
def
test_readline_exception
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
b'line
\
n
'
)
stream
.
feed_data
(
b'line
\
n
'
)
data
=
self
.
loop
.
run_until_complete
(
stream
.
readline
())
data
=
self
.
loop
.
run_until_complete
(
stream
.
readline
())
...
@@ -390,12 +413,14 @@ class StreamTests(test_utils.TestCase):
...
@@ -390,12 +413,14 @@ class StreamTests(test_utils.TestCase):
self
.
assertEqual
(
b''
,
stream
.
_buffer
)
self
.
assertEqual
(
b''
,
stream
.
_buffer
)
def
test_readuntil_separator
(
self
):
def
test_readuntil_separator
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
with
self
.
assertRaisesRegex
(
ValueError
,
'Separator should be'
):
with
self
.
assertRaisesRegex
(
ValueError
,
'Separator should be'
):
self
.
loop
.
run_until_complete
(
stream
.
readuntil
(
separator
=
b''
))
self
.
loop
.
run_until_complete
(
stream
.
readuntil
(
separator
=
b''
))
def
test_readuntil_multi_chunks
(
self
):
def
test_readuntil_multi_chunks
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
b'lineAAA'
)
stream
.
feed_data
(
b'lineAAA'
)
data
=
self
.
loop
.
run_until_complete
(
stream
.
readuntil
(
separator
=
b'AAA'
))
data
=
self
.
loop
.
run_until_complete
(
stream
.
readuntil
(
separator
=
b'AAA'
))
...
@@ -413,7 +438,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -413,7 +438,8 @@ class StreamTests(test_utils.TestCase):
self
.
assertEqual
(
b'xxx'
,
stream
.
_buffer
)
self
.
assertEqual
(
b'xxx'
,
stream
.
_buffer
)
def
test_readuntil_multi_chunks_1
(
self
):
def
test_readuntil_multi_chunks_1
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
b'QWEaa'
)
stream
.
feed_data
(
b'QWEaa'
)
stream
.
feed_data
(
b'XYaa'
)
stream
.
feed_data
(
b'XYaa'
)
...
@@ -448,7 +474,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -448,7 +474,8 @@ class StreamTests(test_utils.TestCase):
self
.
assertEqual
(
b''
,
stream
.
_buffer
)
self
.
assertEqual
(
b''
,
stream
.
_buffer
)
def
test_readuntil_eof
(
self
):
def
test_readuntil_eof
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
b'some dataAA'
)
stream
.
feed_data
(
b'some dataAA'
)
stream
.
feed_eof
()
stream
.
feed_eof
()
...
@@ -459,7 +486,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -459,7 +486,8 @@ class StreamTests(test_utils.TestCase):
self
.
assertEqual
(
b''
,
stream
.
_buffer
)
self
.
assertEqual
(
b''
,
stream
.
_buffer
)
def
test_readuntil_limit_found_sep
(
self
):
def
test_readuntil_limit_found_sep
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
limit
=
3
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
limit
=
3
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
b'some dataAA'
)
stream
.
feed_data
(
b'some dataAA'
)
with
self
.
assertRaisesRegex
(
asyncio
.
LimitOverrunError
,
with
self
.
assertRaisesRegex
(
asyncio
.
LimitOverrunError
,
...
@@ -477,7 +505,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -477,7 +505,8 @@ class StreamTests(test_utils.TestCase):
def
test_readexactly_zero_or_less
(
self
):
def
test_readexactly_zero_or_less
(
self
):
# Read exact number of bytes (zero or less).
# Read exact number of bytes (zero or less).
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
self
.
DATA
)
stream
.
feed_data
(
self
.
DATA
)
data
=
self
.
loop
.
run_until_complete
(
stream
.
readexactly
(
0
))
data
=
self
.
loop
.
run_until_complete
(
stream
.
readexactly
(
0
))
...
@@ -490,7 +519,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -490,7 +519,8 @@ class StreamTests(test_utils.TestCase):
def
test_readexactly
(
self
):
def
test_readexactly
(
self
):
# Read exact number of bytes.
# Read exact number of bytes.
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
n
=
2
*
len
(
self
.
DATA
)
n
=
2
*
len
(
self
.
DATA
)
read_task
=
asyncio
.
Task
(
stream
.
readexactly
(
n
),
loop
=
self
.
loop
)
read_task
=
asyncio
.
Task
(
stream
.
readexactly
(
n
),
loop
=
self
.
loop
)
...
@@ -506,7 +536,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -506,7 +536,8 @@ class StreamTests(test_utils.TestCase):
self
.
assertEqual
(
self
.
DATA
,
stream
.
_buffer
)
self
.
assertEqual
(
self
.
DATA
,
stream
.
_buffer
)
def
test_readexactly_limit
(
self
):
def
test_readexactly_limit
(
self
):
stream
=
asyncio
.
StreamReader
(
limit
=
3
,
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
limit
=
3
,
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
b'chunk'
)
stream
.
feed_data
(
b'chunk'
)
data
=
self
.
loop
.
run_until_complete
(
stream
.
readexactly
(
5
))
data
=
self
.
loop
.
run_until_complete
(
stream
.
readexactly
(
5
))
self
.
assertEqual
(
b'chunk'
,
data
)
self
.
assertEqual
(
b'chunk'
,
data
)
...
@@ -514,7 +545,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -514,7 +545,8 @@ class StreamTests(test_utils.TestCase):
def
test_readexactly_eof
(
self
):
def
test_readexactly_eof
(
self
):
# Read exact number of bytes (eof).
# Read exact number of bytes (eof).
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
n
=
2
*
len
(
self
.
DATA
)
n
=
2
*
len
(
self
.
DATA
)
read_task
=
asyncio
.
Task
(
stream
.
readexactly
(
n
),
loop
=
self
.
loop
)
read_task
=
asyncio
.
Task
(
stream
.
readexactly
(
n
),
loop
=
self
.
loop
)
...
@@ -532,7 +564,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -532,7 +564,8 @@ class StreamTests(test_utils.TestCase):
self
.
assertEqual
(
b''
,
stream
.
_buffer
)
self
.
assertEqual
(
b''
,
stream
.
_buffer
)
def
test_readexactly_exception
(
self
):
def
test_readexactly_exception
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
b'line
\
n
'
)
stream
.
feed_data
(
b'line
\
n
'
)
data
=
self
.
loop
.
run_until_complete
(
stream
.
readexactly
(
2
))
data
=
self
.
loop
.
run_until_complete
(
stream
.
readexactly
(
2
))
...
@@ -543,7 +576,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -543,7 +576,8 @@ class StreamTests(test_utils.TestCase):
ValueError
,
self
.
loop
.
run_until_complete
,
stream
.
readexactly
(
2
))
ValueError
,
self
.
loop
.
run_until_complete
,
stream
.
readexactly
(
2
))
def
test_exception
(
self
):
def
test_exception
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
self
.
assertIsNone
(
stream
.
exception
())
self
.
assertIsNone
(
stream
.
exception
())
exc
=
ValueError
()
exc
=
ValueError
()
...
@@ -551,7 +585,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -551,7 +585,8 @@ class StreamTests(test_utils.TestCase):
self
.
assertIs
(
stream
.
exception
(),
exc
)
self
.
assertIs
(
stream
.
exception
(),
exc
)
def
test_exception_waiter
(
self
):
def
test_exception_waiter
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
@
asyncio
.
coroutine
@
asyncio
.
coroutine
def
set_err
():
def
set_err
():
...
@@ -565,7 +600,8 @@ class StreamTests(test_utils.TestCase):
...
@@ -565,7 +600,8 @@ class StreamTests(test_utils.TestCase):
self
.
assertRaises
(
ValueError
,
t1
.
result
)
self
.
assertRaises
(
ValueError
,
t1
.
result
)
def
test_exception_cancel
(
self
):
def
test_exception_cancel
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
t
=
asyncio
.
Task
(
stream
.
readline
(),
loop
=
self
.
loop
)
t
=
asyncio
.
Task
(
stream
.
readline
(),
loop
=
self
.
loop
)
test_utils
.
run_briefly
(
self
.
loop
)
test_utils
.
run_briefly
(
self
.
loop
)
...
@@ -742,8 +778,10 @@ os.close(fd)
...
@@ -742,8 +778,10 @@ os.close(fd)
args
=
[
sys
.
executable
,
'-c'
,
code
,
str
(
wfd
)]
args
=
[
sys
.
executable
,
'-c'
,
code
,
str
(
wfd
)]
pipe
=
open
(
rfd
,
'rb'
,
0
)
pipe
=
open
(
rfd
,
'rb'
,
0
)
reader
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
limit
=
1
)
reader
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
limit
=
1
,
protocol
=
asyncio
.
StreamReaderProtocol
(
reader
,
loop
=
self
.
loop
)
_asyncio_internal
=
True
)
protocol
=
asyncio
.
StreamReaderProtocol
(
reader
,
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
transport
,
_
=
self
.
loop
.
run_until_complete
(
transport
,
_
=
self
.
loop
.
run_until_complete
(
self
.
loop
.
connect_read_pipe
(
lambda
:
protocol
,
pipe
))
self
.
loop
.
connect_read_pipe
(
lambda
:
protocol
,
pipe
))
...
@@ -769,7 +807,7 @@ os.close(fd)
...
@@ -769,7 +807,7 @@ os.close(fd)
# asyncio issue #184: Ensure that StreamReaderProtocol constructor
# asyncio issue #184: Ensure that StreamReaderProtocol constructor
# retrieves the current loop if the loop parameter is not set
# retrieves the current loop if the loop parameter is not set
reader
=
asyncio
.
StreamReader
()
reader
=
asyncio
.
StreamReader
(
_asyncio_internal
=
True
)
self
.
assertIs
(
reader
.
_loop
,
self
.
loop
)
self
.
assertIs
(
reader
.
_loop
,
self
.
loop
)
def
test_streamreaderprotocol_constructor
(
self
):
def
test_streamreaderprotocol_constructor
(
self
):
...
@@ -779,7 +817,7 @@ os.close(fd)
...
@@ -779,7 +817,7 @@ os.close(fd)
# asyncio issue #184: Ensure that StreamReaderProtocol constructor
# asyncio issue #184: Ensure that StreamReaderProtocol constructor
# retrieves the current loop if the loop parameter is not set
# retrieves the current loop if the loop parameter is not set
reader
=
mock
.
Mock
()
reader
=
mock
.
Mock
()
protocol
=
asyncio
.
StreamReaderProtocol
(
reader
)
protocol
=
asyncio
.
StreamReaderProtocol
(
reader
,
_asyncio_internal
=
True
)
self
.
assertIs
(
protocol
.
_loop
,
self
.
loop
)
self
.
assertIs
(
protocol
.
_loop
,
self
.
loop
)
def
test_drain_raises
(
self
):
def
test_drain_raises
(
self
):
...
@@ -824,32 +862,38 @@ os.close(fd)
...
@@ -824,32 +862,38 @@ os.close(fd)
thread
.
join
()
thread
.
join
()
def
test___repr__
(
self
):
def
test___repr__
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
self
.
assertEqual
(
"<StreamReader>"
,
repr
(
stream
))
self
.
assertEqual
(
"<StreamReader>"
,
repr
(
stream
))
def
test___repr__nondefault_limit
(
self
):
def
test___repr__nondefault_limit
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
limit
=
123
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
limit
=
123
,
_asyncio_internal
=
True
)
self
.
assertEqual
(
"<StreamReader limit=123>"
,
repr
(
stream
))
self
.
assertEqual
(
"<StreamReader limit=123>"
,
repr
(
stream
))
def
test___repr__eof
(
self
):
def
test___repr__eof
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_eof
()
stream
.
feed_eof
()
self
.
assertEqual
(
"<StreamReader eof>"
,
repr
(
stream
))
self
.
assertEqual
(
"<StreamReader eof>"
,
repr
(
stream
))
def
test___repr__data
(
self
):
def
test___repr__data
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
feed_data
(
b'data'
)
stream
.
feed_data
(
b'data'
)
self
.
assertEqual
(
"<StreamReader 4 bytes>"
,
repr
(
stream
))
self
.
assertEqual
(
"<StreamReader 4 bytes>"
,
repr
(
stream
))
def
test___repr__exception
(
self
):
def
test___repr__exception
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
exc
=
RuntimeError
()
exc
=
RuntimeError
()
stream
.
set_exception
(
exc
)
stream
.
set_exception
(
exc
)
self
.
assertEqual
(
"<StreamReader exception=RuntimeError()>"
,
self
.
assertEqual
(
"<StreamReader exception=RuntimeError()>"
,
repr
(
stream
))
repr
(
stream
))
def
test___repr__waiter
(
self
):
def
test___repr__waiter
(
self
):
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
)
stream
=
asyncio
.
StreamReader
(
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
stream
.
_waiter
=
asyncio
.
Future
(
loop
=
self
.
loop
)
stream
.
_waiter
=
asyncio
.
Future
(
loop
=
self
.
loop
)
self
.
assertRegex
(
self
.
assertRegex
(
repr
(
stream
),
repr
(
stream
),
...
@@ -860,7 +904,8 @@ os.close(fd)
...
@@ -860,7 +904,8 @@ os.close(fd)
self.assertEqual("
<
StreamReader
>
", repr(stream))
self.assertEqual("
<
StreamReader
>
", repr(stream))
def test___repr__transport(self):
def test___repr__transport(self):
stream = asyncio.StreamReader(loop=self.loop)
stream = asyncio.StreamReader(loop=self.loop,
_asyncio_internal=True)
stream._transport = mock.Mock()
stream._transport = mock.Mock()
stream._transport.__repr__ = mock.Mock()
stream._transport.__repr__ = mock.Mock()
stream._transport.__repr__.return_value = "
<
Transport
>
"
stream._transport.__repr__.return_value = "
<
Transport
>
"
...
@@ -947,8 +992,10 @@ os.close(fd)
...
@@ -947,8 +992,10 @@ os.close(fd)
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
with test_utils.run_test_server() as httpd:
with test_utils.run_test_server() as httpd:
rd = asyncio.StreamReader(loop=self.loop)
rd = asyncio.StreamReader(loop=self.loop,
pr = asyncio.StreamReaderProtocol(rd, loop=self.loop)
_asyncio_internal=True)
pr = asyncio.StreamReaderProtocol(rd, loop=self.loop,
_asyncio_internal=True)
del rd
del rd
gc.collect()
gc.collect()
tr, _ = self.loop.run_until_complete(
tr, _ = self.loop.run_until_complete(
...
@@ -1005,6 +1052,25 @@ os.close(fd)
...
@@ -1005,6 +1052,25 @@ os.close(fd)
self.assertEqual(messages, [])
self.assertEqual(messages, [])
def test_stream_reader_create_warning(self):
with self.assertWarns(DeprecationWarning):
asyncio.StreamReader(loop=self.loop)
def test_stream_reader_protocol_create_warning(self):
reader = asyncio.StreamReader(loop=self.loop,
_asyncio_internal=True)
with self.assertWarns(DeprecationWarning):
asyncio.StreamReaderProtocol(reader, loop=self.loop)
def test_stream_writer_create_warning(self):
reader = asyncio.StreamReader(loop=self.loop,
_asyncio_internal=True)
proto = asyncio.StreamReaderProtocol(reader, loop=self.loop,
_asyncio_internal=True)
with self.assertWarns(DeprecationWarning):
asyncio.StreamWriter('transport', proto, reader, self.loop)
if __name__ == '__main__':
if __name__ == '__main__':
unittest.main()
unittest.main()
Lib/test/test_asyncio/test_subprocess.py
View file @
ad4ed872
...
@@ -510,6 +510,18 @@ class SubprocessMixin:
...
@@ -510,6 +510,18 @@ class SubprocessMixin:
self
.
loop
.
run_until_complete
(
execute
())
self
.
loop
.
run_until_complete
(
execute
())
def
test_subprocess_protocol_create_warning
(
self
):
with
self
.
assertWarns
(
DeprecationWarning
):
subprocess
.
SubprocessStreamProtocol
(
limit
=
10
,
loop
=
self
.
loop
)
def
test_process_create_warning
(
self
):
proto
=
subprocess
.
SubprocessStreamProtocol
(
limit
=
10
,
loop
=
self
.
loop
,
_asyncio_internal
=
True
)
transp
=
mock
.
Mock
()
with
self
.
assertWarns
(
DeprecationWarning
):
subprocess
.
Process
(
transp
,
proto
,
loop
=
self
.
loop
)
if
sys
.
platform
!=
'win32'
:
if
sys
.
platform
!=
'win32'
:
# Unix
# Unix
...
...
Misc/NEWS.d/next/Library/2019-05-05-16-14-38.bpo-36806.rAzF-x.rst
0 → 100644
View file @
ad4ed872
Forbid creation of asyncio stream objects like StreamReader, StreamWriter,
Process, and their protocols outside of asyncio package.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment