Commit 11194c87 authored by Andrew Svetlov's avatar Andrew Svetlov Committed by GitHub

bpo-34666: Implement stream.awrite() and stream.aclose() (GH-9274)

parent 413118eb
...@@ -20,13 +20,13 @@ streams:: ...@@ -20,13 +20,13 @@ streams::
'127.0.0.1', 8888) '127.0.0.1', 8888)
print(f'Send: {message!r}') print(f'Send: {message!r}')
writer.write(message.encode()) await writer.awrite(message.encode())
data = await reader.read(100) data = await reader.read(100)
print(f'Received: {data.decode()!r}') print(f'Received: {data.decode()!r}')
print('Close the connection') print('Close the connection')
writer.close() await writer.aclose()
asyncio.run(tcp_echo_client('Hello World!')) asyncio.run(tcp_echo_client('Hello World!'))
...@@ -229,14 +229,57 @@ StreamWriter ...@@ -229,14 +229,57 @@ StreamWriter
directly; use :func:`open_connection` and :func:`start_server` directly; use :func:`open_connection` and :func:`start_server`
instead. instead.
.. coroutinemethod:: awrite(data)
Write *data* to the stream.
The method respects control-flow, execution is paused if write
buffer reaches high-water limit.
.. versionadded:: 3.8
.. coroutinemethod:: aclose()
Close the stream.
Wait for finishing all closing actions, e.g. SSL shutdown for
secure sockets.
.. versionadded:: 3.8
.. method:: can_write_eof()
Return *True* if the underlying transport supports
the :meth:`write_eof` method, *False* otherwise.
.. method:: write_eof()
Close the write end of the stream after the buffered write
data is flushed.
.. attribute:: transport
Return the underlying asyncio transport.
.. method:: get_extra_info(name, default=None)
Access optional transport information; see
:meth:`BaseTransport.get_extra_info` for details.
.. method:: write(data) .. method:: write(data)
Write *data* to the stream. Write *data* to the stream.
This method doesn't apply control-flow. The call should be
followed by :meth:`drain`.
.. method:: writelines(data) .. method:: writelines(data)
Write a list (or any iterable) of bytes to the stream. Write a list (or any iterable) of bytes to the stream.
This method doesn't apply control-flow. The call should be
followed by :meth:`drain`.
.. coroutinemethod:: drain() .. coroutinemethod:: drain()
Wait until it is appropriate to resume writing to the stream. Wait until it is appropriate to resume writing to the stream.
...@@ -272,25 +315,6 @@ StreamWriter ...@@ -272,25 +315,6 @@ StreamWriter
.. versionadded:: 3.7 .. versionadded:: 3.7
.. method:: can_write_eof()
Return *True* if the underlying transport supports
the :meth:`write_eof` method, *False* otherwise.
.. method:: write_eof()
Close the write end of the stream after the buffered write
data is flushed.
.. attribute:: transport
Return the underlying asyncio transport.
.. method:: get_extra_info(name, default=None)
Access optional transport information; see
:meth:`BaseTransport.get_extra_info` for details.
Examples Examples
======== ========
......
...@@ -348,7 +348,7 @@ class StreamWriter: ...@@ -348,7 +348,7 @@ class StreamWriter:
# a reader can be garbage collected # a reader can be garbage collected
# after connection closing # after connection closing
self._protocol._untrack_reader() self._protocol._untrack_reader()
return self._transport.close() self._transport.close()
def is_closing(self): def is_closing(self):
return self._transport.is_closing() return self._transport.is_closing()
...@@ -381,6 +381,14 @@ class StreamWriter: ...@@ -381,6 +381,14 @@ class StreamWriter:
await sleep(0, loop=self._loop) await sleep(0, loop=self._loop)
await self._protocol._drain_helper() await self._protocol._drain_helper()
async def aclose(self):
self.close()
await self.wait_closed()
async def awrite(self, data):
self.write(data)
await self.drain()
class StreamReader: class StreamReader:
......
...@@ -964,6 +964,28 @@ os.close(fd) ...@@ -964,6 +964,28 @@ os.close(fd)
'call "stream.close()" explicitly.', 'call "stream.close()" explicitly.',
messages[0]['message']) messages[0]['message'])
def test_async_writer_api(self):
messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
with test_utils.run_test_server() as httpd:
rd, wr = self.loop.run_until_complete(
asyncio.open_connection(*httpd.address,
loop=self.loop))
f = wr.awrite(b'GET / HTTP/1.0\r\n\r\n')
self.loop.run_until_complete(f)
f = rd.readline()
data = self.loop.run_until_complete(f)
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
f = rd.read()
data = self.loop.run_until_complete(f)
self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
f = wr.aclose()
self.loop.run_until_complete(f)
self.assertEqual(messages, [])
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
Implement ``asyncio.StreamWriter.awrite`` and
``asyncio.StreamWriter.aclose()`` coroutines. Methods are needed for
providing a consistent stream API with control flow switched on by default.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment