Commit a076e4f5 authored by Andrew Svetlov's avatar Andrew Svetlov Committed by GitHub

bpo-36802: Drop awrite()/aclose(), support await write() and await close() instead (#13099)

parent 3b2f9ab3
...@@ -22,13 +22,13 @@ streams:: ...@@ -22,13 +22,13 @@ streams::
'127.0.0.1', 8888) '127.0.0.1', 8888)
print(f'Send: {message!r}') print(f'Send: {message!r}')
await writer.awrite(message.encode()) await writer.write(message.encode())
data = await reader.read(100) data = await reader.read(100)
print(f'Received: {data.decode()!r}') print(f'Received: {data.decode()!r}')
print('Close the connection') print('Close the connection')
await writer.aclose() await writer.close()
asyncio.run(tcp_echo_client('Hello World!')) asyncio.run(tcp_echo_client('Hello World!'))
...@@ -226,23 +226,70 @@ StreamWriter ...@@ -226,23 +226,70 @@ StreamWriter
directly; use :func:`open_connection` and :func:`start_server` directly; use :func:`open_connection` and :func:`start_server`
instead. instead.
.. coroutinemethod:: awrite(data) .. method:: write(data)
The method attempts to write the *data* to the underlying socket immediately.
If that fails, the data is queued in an internal write buffer until it can be
sent.
Starting with Python 3.8, it is possible to directly await on the `write()`
method::
await stream.write(data)
The ``await`` pauses the current coroutine until the data is written to the
socket.
Below is an equivalent code that works with Python <= 3.7::
stream.write(data)
await stream.drain()
.. versionchanged:: 3.8
Support ``await stream.write(...)`` syntax.
.. method:: writelines(data)
The method writes a list (or any iterable) of bytes to the underlying socket
immediately.
If that fails, the data is queued in an internal write buffer until it can be
sent.
Starting with Python 3.8, it is possible to directly await on the `write()`
method::
await stream.writelines(lines)
The ``await`` pauses the current coroutine until the data is written to the
socket.
Below is an equivalent code that works with Python <= 3.7::
Write *data* to the stream. stream.writelines(lines)
await stream.drain()
The method respects flow control, execution is paused if the write .. versionchanged:: 3.8
buffer reaches the high watermark. Support ``await stream.writelines()`` syntax.
.. versionadded:: 3.8 .. method:: close()
The method closes the stream and the underlying socket.
Starting with Python 3.8, it is possible to directly await on the `close()`
method::
await stream.close()
.. coroutinemethod:: aclose() The ``await`` pauses the current coroutine until the stream and the underlying
socket are closed (and SSL shutdown is performed for a secure connection).
Close the stream. Below is an equivalent code that works with Python <= 3.7::
Wait until all closing actions are complete, e.g. SSL shutdown for stream.close()
secure sockets. await stream.wait_closed()
.. versionadded:: 3.8 .. versionchanged:: 3.8
Support ``await stream.close()`` syntax.
.. method:: can_write_eof() .. method:: can_write_eof()
...@@ -263,21 +310,6 @@ StreamWriter ...@@ -263,21 +310,6 @@ StreamWriter
Access optional transport information; see Access optional transport information; see
:meth:`BaseTransport.get_extra_info` for details. :meth:`BaseTransport.get_extra_info` for details.
.. method:: write(data)
Write *data* to the stream.
This method is not subject to flow control. Calls to ``write()`` should
be followed by :meth:`drain`. The :meth:`awrite` method is a
recommended alternative the applies flow control automatically.
.. method:: writelines(data)
Write a list (or any iterable) of bytes to the stream.
This method is not subject to flow control. Calls to ``writelines()``
should be followed by :meth:`drain`.
.. coroutinemethod:: drain() .. coroutinemethod:: drain()
Wait until it is appropriate to resume writing to the stream. Wait until it is appropriate to resume writing to the stream.
...@@ -293,10 +325,6 @@ StreamWriter ...@@ -293,10 +325,6 @@ StreamWriter
be resumed. When there is nothing to wait for, the :meth:`drain` be resumed. When there is nothing to wait for, the :meth:`drain`
returns immediately. returns immediately.
.. method:: close()
Close the stream.
.. method:: is_closing() .. method:: is_closing()
Return ``True`` if the stream is closed or in the process of Return ``True`` if the stream is closed or in the process of
......
...@@ -352,6 +352,8 @@ class StreamWriter: ...@@ -352,6 +352,8 @@ class StreamWriter:
assert reader is None or isinstance(reader, StreamReader) assert reader is None or isinstance(reader, StreamReader)
self._reader = reader self._reader = reader
self._loop = loop self._loop = loop
self._complete_fut = self._loop.create_future()
self._complete_fut.set_result(None)
def __repr__(self): def __repr__(self):
info = [self.__class__.__name__, f'transport={self._transport!r}'] info = [self.__class__.__name__, f'transport={self._transport!r}']
...@@ -365,9 +367,33 @@ class StreamWriter: ...@@ -365,9 +367,33 @@ class StreamWriter:
def write(self, data): def write(self, data):
self._transport.write(data) self._transport.write(data)
return self._fast_drain()
def writelines(self, data): def writelines(self, data):
self._transport.writelines(data) self._transport.writelines(data)
return self._fast_drain()
def _fast_drain(self):
# The helper tries to use fast-path to return already existing complete future
# object if underlying transport is not paused and actual waiting for writing
# resume is not needed
if self._reader is not None:
# this branch will be simplified after merging reader with writer
exc = self._reader.exception()
if exc is not None:
fut = self._loop.create_future()
fut.set_exception(exc)
return fut
if not self._transport.is_closing():
if self._protocol._connection_lost:
fut = self._loop.create_future()
fut.set_exception(ConnectionResetError('Connection lost'))
return fut
if not self._protocol._paused:
# fast path, the stream is not paused
# no need to wait for resume signal
return self._complete_fut
return self._loop.create_task(self.drain())
def write_eof(self): def write_eof(self):
return self._transport.write_eof() return self._transport.write_eof()
...@@ -377,6 +403,7 @@ class StreamWriter: ...@@ -377,6 +403,7 @@ class StreamWriter:
def close(self): def close(self):
self._transport.close() self._transport.close()
return self._protocol._get_close_waiter(self)
def is_closing(self): def is_closing(self):
return self._transport.is_closing() return self._transport.is_closing()
...@@ -408,14 +435,6 @@ class StreamWriter: ...@@ -408,14 +435,6 @@ class StreamWriter:
raise ConnectionResetError('Connection lost') raise ConnectionResetError('Connection lost')
await self._protocol._drain_helper() await self._protocol._drain_helper()
async def aclose(self):
self.close()
await self.wait_closed()
async def awrite(self, data):
self.write(data)
await self.drain()
class StreamReader: class StreamReader:
......
...@@ -1035,24 +1035,42 @@ os.close(fd) ...@@ -1035,24 +1035,42 @@ os.close(fd)
messages[0]['message']) messages[0]['message'])
def test_async_writer_api(self): def test_async_writer_api(self):
async def inner(httpd):
rd, wr = await asyncio.open_connection(*httpd.address)
await wr.write(b'GET / HTTP/1.0\r\n\r\n')
data = await rd.readline()
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
data = await rd.read()
self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
await wr.close()
messages = [] messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
with test_utils.run_test_server() as httpd: with test_utils.run_test_server() as httpd:
rd, wr = self.loop.run_until_complete( self.loop.run_until_complete(inner(httpd))
asyncio.open_connection(*httpd.address,
loop=self.loop))
f = wr.awrite(b'GET / HTTP/1.0\r\n\r\n') self.assertEqual(messages, [])
self.loop.run_until_complete(f)
f = rd.readline() def test_async_writer_api(self):
data = self.loop.run_until_complete(f) async def inner(httpd):
rd, wr = await asyncio.open_connection(*httpd.address)
await wr.write(b'GET / HTTP/1.0\r\n\r\n')
data = await rd.readline()
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n') self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
f = rd.read() data = await rd.read()
data = self.loop.run_until_complete(f)
self.assertTrue(data.endswith(b'\r\n\r\nTest message')) self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
f = wr.aclose() wr.close()
self.loop.run_until_complete(f) with self.assertRaises(ConnectionResetError):
await wr.write(b'data')
messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
with test_utils.run_test_server() as httpd:
self.loop.run_until_complete(inner(httpd))
self.assertEqual(messages, []) self.assertEqual(messages, [])
...@@ -1066,7 +1084,7 @@ os.close(fd) ...@@ -1066,7 +1084,7 @@ os.close(fd)
asyncio.open_connection(*httpd.address, asyncio.open_connection(*httpd.address,
loop=self.loop)) loop=self.loop))
f = wr.aclose() f = wr.close()
self.loop.run_until_complete(f) self.loop.run_until_complete(f)
assert rd.at_eof() assert rd.at_eof()
f = rd.read() f = rd.read()
......
Provide both sync and async calls for StreamWriter.write() and
StreamWriter.close()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment