Commit 6758e6e1 authored by Yury Selivanov's avatar Yury Selivanov Committed by GitHub

bpo-38242: Revert "bpo-36889: Merge asyncio streams (GH-13251)" (#16482)

See https://bugs.python.org/issue38242 for more details
parent 3667e1ee
...@@ -132,47 +132,23 @@ High-level APIs to work with network IO. ...@@ -132,47 +132,23 @@ High-level APIs to work with network IO.
:widths: 50 50 :widths: 50 50
:class: full-width-table :class: full-width-table
* - ``await`` :func:`connect`
- Establish a TCP connection to send and receive data.
* - ``await`` :func:`open_connection` * - ``await`` :func:`open_connection`
- Establish a TCP connection. (Deprecated in favor of :func:`connect`) - Establish a TCP connection.
* - ``await`` :func:`connect_unix`
- Establish a Unix socket connection to send and receive data.
* - ``await`` :func:`open_unix_connection` * - ``await`` :func:`open_unix_connection`
- Establish a Unix socket connection. (Deprecated in favor of :func:`connect_unix`) - Establish a Unix socket connection.
* - :class:`StreamServer`
- Start a TCP server.
* - ``await`` :func:`start_server` * - ``await`` :func:`start_server`
- Start a TCP server. (Deprecated in favor of :class:`StreamServer`) - Start a TCP server.
* - :class:`UnixStreamServer`
- Start a Unix socket server.
* - ``await`` :func:`start_unix_server` * - ``await`` :func:`start_unix_server`
- Start a Unix socket server. (Deprecated in favor of :class:`UnixStreamServer`) - Start a Unix socket server.
* - :func:`connect_read_pipe`
- Establish a connection to :term:`file-like object <file object>` *pipe*
to receive data.
* - :func:`connect_write_pipe`
- Establish a connection to :term:`file-like object <file object>` *pipe*
to send data.
* - :class:`Stream`
- Stream is a single object combining APIs of :class:`StreamReader` and
:class:`StreamWriter`.
* - :class:`StreamReader` * - :class:`StreamReader`
- High-level async/await object to receive network data. (Deprecated in favor of :class:`Stream`) - High-level async/await object to receive network data.
* - :class:`StreamWriter` * - :class:`StreamWriter`
- High-level async/await object to send network data. (Deprecated in favor of :class:`Stream`) - High-level async/await object to send network data.
.. rubric:: Examples .. rubric:: Examples
......
...@@ -1637,7 +1637,8 @@ Wait until a file descriptor received some data using the ...@@ -1637,7 +1637,8 @@ Wait until a file descriptor received some data using the
:meth:`loop.create_connection` method. :meth:`loop.create_connection` method.
* Another similar :ref:`example <asyncio_example_create_connection-streams>` * Another similar :ref:`example <asyncio_example_create_connection-streams>`
using the high-level :func:`asyncio.connect` function and streams. using the high-level :func:`asyncio.open_connection` function
and streams.
.. _asyncio_example_unix_signals: .. _asyncio_example_unix_signals:
......
...@@ -809,7 +809,7 @@ data, and waits until the connection is closed:: ...@@ -809,7 +809,7 @@ data, and waits until the connection is closed::
.. seealso:: .. seealso::
The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>` The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>`
example uses the high-level :func:`asyncio.connect` function. example uses the high-level :func:`asyncio.open_connection` function.
.. _asyncio-udp-echo-server-protocol: .. _asyncio-udp-echo-server-protocol:
...@@ -978,7 +978,7 @@ Wait until a socket receives data using the ...@@ -978,7 +978,7 @@ Wait until a socket receives data using the
The :ref:`register an open socket to wait for data using streams The :ref:`register an open socket to wait for data using streams
<asyncio_example_create_connection-streams>` example uses high-level streams <asyncio_example_create_connection-streams>` example uses high-level streams
created by the :func:`asyncio.connect` function in a coroutine. created by the :func:`open_connection` function in a coroutine.
.. _asyncio_example_subprocess_proto: .. _asyncio_example_subprocess_proto:
......
This diff is collapsed.
...@@ -3,7 +3,6 @@ ...@@ -3,7 +3,6 @@
# flake8: noqa # flake8: noqa
import sys import sys
import warnings
# This relies on each of the submodules having an __all__ variable. # This relies on each of the submodules having an __all__ variable.
from .base_events import * from .base_events import *
...@@ -44,40 +43,3 @@ if sys.platform == 'win32': # pragma: no cover ...@@ -44,40 +43,3 @@ if sys.platform == 'win32': # pragma: no cover
else: else:
from .unix_events import * # pragma: no cover from .unix_events import * # pragma: no cover
__all__ += unix_events.__all__ __all__ += unix_events.__all__
__all__ += ('StreamReader', 'StreamWriter', 'StreamReaderProtocol') # deprecated
def __getattr__(name):
global StreamReader, StreamWriter, StreamReaderProtocol
if name == 'StreamReader':
warnings.warn("StreamReader is deprecated since Python 3.8 "
"in favor of Stream, and scheduled for removal "
"in Python 3.10",
DeprecationWarning,
stacklevel=2)
from .streams import StreamReader as sr
StreamReader = sr
return StreamReader
if name == 'StreamWriter':
warnings.warn("StreamWriter is deprecated since Python 3.8 "
"in favor of Stream, and scheduled for removal "
"in Python 3.10",
DeprecationWarning,
stacklevel=2)
from .streams import StreamWriter as sw
StreamWriter = sw
return StreamWriter
if name == 'StreamReaderProtocol':
warnings.warn("Using asyncio internal class StreamReaderProtocol "
"is deprecated since Python 3.8 "
" and scheduled for removal "
"in Python 3.10",
DeprecationWarning,
stacklevel=2)
from .streams import StreamReaderProtocol as srp
StreamReaderProtocol = srp
return StreamReaderProtocol
raise AttributeError(f"module {__name__} has no attribute {name}")
This diff is collapsed.
...@@ -19,16 +19,14 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, ...@@ -19,16 +19,14 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
protocols.SubprocessProtocol): protocols.SubprocessProtocol):
"""Like StreamReaderProtocol, but for a subprocess.""" """Like StreamReaderProtocol, but for a subprocess."""
def __init__(self, limit, loop, *, _asyncio_internal=False): def __init__(self, limit, loop):
super().__init__(loop=loop, _asyncio_internal=_asyncio_internal) super().__init__(loop=loop)
self._limit = limit self._limit = limit
self.stdin = self.stdout = self.stderr = None self.stdin = self.stdout = self.stderr = None
self._transport = None self._transport = None
self._process_exited = False self._process_exited = False
self._pipe_fds = [] self._pipe_fds = []
self._stdin_closed = self._loop.create_future() self._stdin_closed = self._loop.create_future()
self._stdout_closed = self._loop.create_future()
self._stderr_closed = self._loop.create_future()
def __repr__(self): def __repr__(self):
info = [self.__class__.__name__] info = [self.__class__.__name__]
...@@ -42,35 +40,27 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, ...@@ -42,35 +40,27 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
def connection_made(self, transport): def connection_made(self, transport):
self._transport = transport self._transport = transport
stdout_transport = transport.get_pipe_transport(1) stdout_transport = transport.get_pipe_transport(1)
if stdout_transport is not None: if stdout_transport is not None:
self.stdout = streams.Stream(mode=streams.StreamMode.READ, self.stdout = streams.StreamReader(limit=self._limit,
transport=stdout_transport, loop=self._loop)
protocol=self, self.stdout.set_transport(stdout_transport)
limit=self._limit,
loop=self._loop,
_asyncio_internal=True)
self.stdout._set_transport(stdout_transport)
self._pipe_fds.append(1) self._pipe_fds.append(1)
stderr_transport = transport.get_pipe_transport(2) stderr_transport = transport.get_pipe_transport(2)
if stderr_transport is not None: if stderr_transport is not None:
self.stderr = streams.Stream(mode=streams.StreamMode.READ, self.stderr = streams.StreamReader(limit=self._limit,
transport=stderr_transport, loop=self._loop)
protocol=self, self.stderr.set_transport(stderr_transport)
limit=self._limit,
loop=self._loop,
_asyncio_internal=True)
self.stderr._set_transport(stderr_transport)
self._pipe_fds.append(2) self._pipe_fds.append(2)
stdin_transport = transport.get_pipe_transport(0) stdin_transport = transport.get_pipe_transport(0)
if stdin_transport is not None: if stdin_transport is not None:
self.stdin = streams.Stream(mode=streams.StreamMode.WRITE, self.stdin = streams.StreamWriter(stdin_transport,
transport=stdin_transport, protocol=self,
protocol=self, reader=None,
loop=self._loop, loop=self._loop)
_asyncio_internal=True)
def pipe_data_received(self, fd, data): def pipe_data_received(self, fd, data):
if fd == 1: if fd == 1:
...@@ -80,7 +70,7 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, ...@@ -80,7 +70,7 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
else: else:
reader = None reader = None
if reader is not None: if reader is not None:
reader._feed_data(data) reader.feed_data(data)
def pipe_connection_lost(self, fd, exc): def pipe_connection_lost(self, fd, exc):
if fd == 0: if fd == 0:
...@@ -101,9 +91,9 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, ...@@ -101,9 +91,9 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
reader = None reader = None
if reader is not None: if reader is not None:
if exc is None: if exc is None:
reader._feed_eof() reader.feed_eof()
else: else:
reader._set_exception(exc) reader.set_exception(exc)
if fd in self._pipe_fds: if fd in self._pipe_fds:
self._pipe_fds.remove(fd) self._pipe_fds.remove(fd)
...@@ -121,20 +111,10 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, ...@@ -121,20 +111,10 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
def _get_close_waiter(self, stream): def _get_close_waiter(self, stream):
if stream is self.stdin: if stream is self.stdin:
return self._stdin_closed return self._stdin_closed
elif stream is self.stdout:
return self._stdout_closed
elif stream is self.stderr:
return self._stderr_closed
class Process: class Process:
def __init__(self, transport, protocol, loop, *, _asyncio_internal=False): def __init__(self, transport, protocol, loop):
if not _asyncio_internal:
warnings.warn(f"{self.__class__} should be instantiated "
"by asyncio internals only, "
"please avoid its creation from user code",
DeprecationWarning)
self._transport = transport self._transport = transport
self._protocol = protocol self._protocol = protocol
self._loop = loop self._loop = loop
...@@ -232,13 +212,12 @@ async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None, ...@@ -232,13 +212,12 @@ async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
) )
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
loop=loop, loop=loop)
_asyncio_internal=True)
transport, protocol = await loop.subprocess_shell( transport, protocol = await loop.subprocess_shell(
protocol_factory, protocol_factory,
cmd, stdin=stdin, stdout=stdout, cmd, stdin=stdin, stdout=stdout,
stderr=stderr, **kwds) stderr=stderr, **kwds)
return Process(transport, protocol, loop, _asyncio_internal=True) return Process(transport, protocol, loop)
async def create_subprocess_exec(program, *args, stdin=None, stdout=None, async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
...@@ -253,11 +232,10 @@ async def create_subprocess_exec(program, *args, stdin=None, stdout=None, ...@@ -253,11 +232,10 @@ async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
stacklevel=2 stacklevel=2
) )
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
loop=loop, loop=loop)
_asyncio_internal=True)
transport, protocol = await loop.subprocess_exec( transport, protocol = await loop.subprocess_exec(
protocol_factory, protocol_factory,
program, *args, program, *args,
stdin=stdin, stdout=stdout, stdin=stdin, stdout=stdout,
stderr=stderr, **kwds) stderr=stderr, **kwds)
return Process(transport, protocol, loop, _asyncio_internal=True) return Process(transport, protocol, loop)
...@@ -58,10 +58,9 @@ class BaseTestBufferedProtocol(func_tests.FunctionalTestCaseMixin): ...@@ -58,10 +58,9 @@ class BaseTestBufferedProtocol(func_tests.FunctionalTestCaseMixin):
writer.close() writer.close()
await writer.wait_closed() await writer.wait_closed()
with self.assertWarns(DeprecationWarning): srv = self.loop.run_until_complete(
srv = self.loop.run_until_complete( asyncio.start_server(
asyncio.start_server( on_server_client, '127.0.0.1', 0))
on_server_client, '127.0.0.1', 0))
addr = srv.sockets[0].getsockname() addr = srv.sockets[0].getsockname()
self.loop.run_until_complete( self.loop.run_until_complete(
......
...@@ -95,11 +95,9 @@ class StreamReaderTests(BaseTest): ...@@ -95,11 +95,9 @@ class StreamReaderTests(BaseTest):
def test_readline(self): def test_readline(self):
DATA = b'line1\nline2\nline3' DATA = b'line1\nline2\nline3'
stream = asyncio.Stream(mode=asyncio.StreamMode.READ, stream = asyncio.StreamReader(loop=self.loop)
loop=self.loop, stream.feed_data(DATA)
_asyncio_internal=True) stream.feed_eof()
stream._feed_data(DATA)
stream._feed_eof()
async def reader(): async def reader():
data = [] data = []
......
This diff is collapsed.
...@@ -582,18 +582,6 @@ class SubprocessMixin: ...@@ -582,18 +582,6 @@ class SubprocessMixin:
self.loop.run_until_complete(execute()) self.loop.run_until_complete(execute())
def test_subprocess_protocol_create_warning(self):
with self.assertWarns(DeprecationWarning):
subprocess.SubprocessStreamProtocol(limit=10, loop=self.loop)
def test_process_create_warning(self):
proto = subprocess.SubprocessStreamProtocol(limit=10, loop=self.loop,
_asyncio_internal=True)
transp = mock.Mock()
with self.assertWarns(DeprecationWarning):
subprocess.Process(transp, proto, loop=self.loop)
def test_create_subprocess_exec_text_mode_fails(self): def test_create_subprocess_exec_text_mode_fails(self):
async def execute(): async def execute():
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
......
...@@ -15,7 +15,6 @@ import _winapi ...@@ -15,7 +15,6 @@ import _winapi
import asyncio import asyncio
from asyncio import windows_events from asyncio import windows_events
from asyncio.streams import _StreamProtocol
from test.test_asyncio import utils as test_utils from test.test_asyncio import utils as test_utils
...@@ -118,16 +117,14 @@ class ProactorTests(test_utils.TestCase): ...@@ -118,16 +117,14 @@ class ProactorTests(test_utils.TestCase):
clients = [] clients = []
for i in range(5): for i in range(5):
stream = asyncio.Stream(mode=asyncio.StreamMode.READ, stream_reader = asyncio.StreamReader(loop=self.loop)
loop=self.loop, _asyncio_internal=True) protocol = asyncio.StreamReaderProtocol(stream_reader,
protocol = _StreamProtocol(stream, loop=self.loop)
loop=self.loop,
_asyncio_internal=True)
trans, proto = await self.loop.create_pipe_connection( trans, proto = await self.loop.create_pipe_connection(
lambda: protocol, ADDRESS) lambda: protocol, ADDRESS)
self.assertIsInstance(trans, asyncio.Transport) self.assertIsInstance(trans, asyncio.Transport)
self.assertEqual(protocol, proto) self.assertEqual(protocol, proto)
clients.append((stream, trans)) clients.append((stream_reader, trans))
for i, (r, w) in enumerate(clients): for i, (r, w) in enumerate(clients):
w.write('lower-{}\n'.format(i).encode()) w.write('lower-{}\n'.format(i).encode())
...@@ -136,7 +133,6 @@ class ProactorTests(test_utils.TestCase): ...@@ -136,7 +133,6 @@ class ProactorTests(test_utils.TestCase):
response = await r.readline() response = await r.readline()
self.assertEqual(response, 'LOWER-{}\n'.format(i).encode()) self.assertEqual(response, 'LOWER-{}\n'.format(i).encode())
w.close() w.close()
await r.close()
server.close() server.close()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment