Commit 6793cce1 authored by Xtreak's avatar Xtreak Committed by Andrew Svetlov

bpo-36889: Document asyncio Stream and StreamServer (GH-14203)

parent 6ffd9b05
...@@ -1625,8 +1625,7 @@ Wait until a file descriptor received some data using the ...@@ -1625,8 +1625,7 @@ Wait until a file descriptor received some data using the
:meth:`loop.create_connection` method. :meth:`loop.create_connection` method.
* Another similar :ref:`example <asyncio_example_create_connection-streams>` * Another similar :ref:`example <asyncio_example_create_connection-streams>`
using the high-level :func:`asyncio.open_connection` function using the high-level :func:`asyncio.connect` function and streams.
and streams.
.. _asyncio_example_unix_signals: .. _asyncio_example_unix_signals:
......
...@@ -810,7 +810,7 @@ data, and waits until the connection is closed:: ...@@ -810,7 +810,7 @@ data, and waits until the connection is closed::
.. seealso:: .. seealso::
The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>` The :ref:`TCP echo client using streams <asyncio-tcp-echo-client-streams>`
example uses the high-level :func:`asyncio.open_connection` function. example uses the high-level :func:`asyncio.connect` function.
.. _asyncio-udp-echo-server-protocol: .. _asyncio-udp-echo-server-protocol:
...@@ -977,7 +977,7 @@ Wait until a socket receives data using the ...@@ -977,7 +977,7 @@ Wait until a socket receives data using the
The :ref:`register an open socket to wait for data using streams The :ref:`register an open socket to wait for data using streams
<asyncio_example_create_connection-streams>` example uses high-level streams <asyncio_example_create_connection-streams>` example uses high-level streams
created by the :func:`open_connection` function in a coroutine. created by the :func:`asyncio.connect` function in a coroutine.
.. _asyncio_example_subprocess_proto: .. _asyncio_example_subprocess_proto:
......
...@@ -18,17 +18,12 @@ streams:: ...@@ -18,17 +18,12 @@ streams::
import asyncio import asyncio
async def tcp_echo_client(message): async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection( async with asyncio.connect('127.0.0.1', 8888) as stream:
'127.0.0.1', 8888) print(f'Send: {message!r}')
await stream.write(message.encode())
print(f'Send: {message!r}') data = await stream.read(100)
await writer.write(message.encode()) print(f'Received: {data.decode()!r}')
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
await writer.close()
asyncio.run(tcp_echo_client('Hello World!')) asyncio.run(tcp_echo_client('Hello World!'))
...@@ -42,6 +37,32 @@ The following top-level asyncio functions can be used to create ...@@ -42,6 +37,32 @@ The following top-level asyncio functions can be used to create
and work with streams: and work with streams:
.. coroutinefunction:: connect(host=None, port=None, \*, \
limit=2**16, ssl=None, family=0, \
proto=0, flags=0, sock=None, local_addr=None, \
server_hostname=None, ssl_handshake_timeout=None, \
happy_eyeballs_delay=None, interleave=None)
Connect to TCP socket on *host* : *port* address and return a :class:`Stream`
object of mode :attr:`StreamMode.READWRITE`.
*limit* determines the buffer size limit used by the returned :class:`Stream`
instance. By default the *limit* is set to 64 KiB.
The rest of the arguments are passed directly to :meth:`loop.create_connection`.
The function can be used with ``await`` to get a connected stream::
stream = await asyncio.connect('127.0.0.1', 8888)
The function can also be used as an async context manager::
async with asyncio.connect('127.0.0.1', 8888) as stream:
...
.. versionadded:: 3.8
.. coroutinefunction:: open_connection(host=None, port=None, \*, \ .. coroutinefunction:: open_connection(host=None, port=None, \*, \
loop=None, limit=None, ssl=None, family=0, \ loop=None, limit=None, ssl=None, family=0, \
proto=0, flags=0, sock=None, local_addr=None, \ proto=0, flags=0, sock=None, local_addr=None, \
...@@ -69,10 +90,10 @@ and work with streams: ...@@ -69,10 +90,10 @@ and work with streams:
.. deprecated-removed:: 3.8 3.10 .. deprecated-removed:: 3.8 3.10
`open_connection()` is deprecated in favor of `connect()`. `open_connection()` is deprecated in favor of :func:`connect`.
.. coroutinefunction:: start_server(client_connected_cb, host=None, \ .. coroutinefunction:: start_server(client_connected_cb, host=None, \
port=None, \*, loop=None, limit=None, \ port=None, \*, loop=None, limit=2**16, \
family=socket.AF_UNSPEC, \ family=socket.AF_UNSPEC, \
flags=socket.AI_PASSIVE, sock=None, \ flags=socket.AI_PASSIVE, sock=None, \
backlog=100, ssl=None, reuse_address=None, \ backlog=100, ssl=None, reuse_address=None, \
...@@ -106,11 +127,58 @@ and work with streams: ...@@ -106,11 +127,58 @@ and work with streams:
.. deprecated-removed:: 3.8 3.10 .. deprecated-removed:: 3.8 3.10
`start_server()` is deprecated if favor of `StreamServer()` `start_server()` is deprecated if favor of :class:`StreamServer`
.. coroutinefunction:: connect_read_pipe(pipe, *, limit=2**16)
Takes a :term:`file-like object <file object>` *pipe* to return a
:class:`Stream` object of the mode :attr:`StreamMode.READ` that has
similar API of :class:`StreamReader`. It can also be used as an async context manager.
*limit* determines the buffer size limit used by the returned :class:`Stream`
instance. By default the limit is set to 64 KiB.
.. versionadded:: 3.8
.. coroutinefunction:: connect_write_pipe(pipe, *, limit=2**16)
Takes a :term:`file-like object <file object>` *pipe* to return a
:class:`Stream` object of the mode :attr:`StreamMode.WRITE` that has
similar API of :class:`StreamWriter`. It can also be used as an async context manager.
*limit* determines the buffer size limit used by the returned :class:`Stream`
instance. By default the limit is set to 64 KiB.
.. versionadded:: 3.8
.. rubric:: Unix Sockets .. rubric:: Unix Sockets
.. function:: connect_unix(path=None, *, limit=2**16, ssl=None, \
sock=None, server_hostname=None, \
ssl_handshake_timeout=None)
Establish a Unix socket connection to socket with *path* address and
return an awaitable :class:`Stream` object of the mode :attr:`StreamMode.READWRITE`
that can be used as a reader and a writer.
*limit* determines the buffer size limit used by the returned :class:`Stream`
instance. By default the *limit* is set to 64 KiB.
The rest of the arguments are passed directly to :meth:`loop.create_unix_connection`.
The function can be used with ``await`` to get a connected stream::
stream = await asyncio.connect_unix('/tmp/example.sock')
The function can also be used as an async context manager::
async with asyncio.connect_unix('/tmp/example.sock') as stream:
...
.. availability:: Unix.
.. versionadded:: 3.8
.. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \ .. coroutinefunction:: open_unix_connection(path=None, \*, loop=None, \
limit=None, ssl=None, sock=None, \ limit=None, ssl=None, sock=None, \
server_hostname=None, ssl_handshake_timeout=None) server_hostname=None, ssl_handshake_timeout=None)
...@@ -134,7 +202,7 @@ and work with streams: ...@@ -134,7 +202,7 @@ and work with streams:
.. deprecated-removed:: 3.8 3.10 .. deprecated-removed:: 3.8 3.10
`open_unix_connection()` is deprecated if favor of `connect_unix()`. ``open_unix_connection()`` is deprecated if favor of :func:`connect_unix`.
.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \ .. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \
...@@ -160,11 +228,176 @@ and work with streams: ...@@ -160,11 +228,176 @@ and work with streams:
.. deprecated-removed:: 3.8 3.10 .. deprecated-removed:: 3.8 3.10
`start_unix_server()` is deprecated in favor of `UnixStreamServer()`. ``start_unix_server()`` is deprecated in favor of :class:`UnixStreamServer`.
--------- ---------
StreamServer
============
.. class:: StreamServer(client_connected_cb, /, host=None, port=None, *, \
limit=2**16, family=socket.AF_UNSPEC, \
flags=socket.AI_PASSIVE, sock=None, backlog=100, \
ssl=None, reuse_address=None, reuse_port=None, \
ssl_handshake_timeout=None, shutdown_timeout=60)
The *client_connected_cb* callback is called whenever a new client
connection is established. It receives a :class:`Stream` object of the
mode :attr:`StreamMode.READWRITE`.
*client_connected_cb* can be a plain callable or a
:ref:`coroutine function <coroutine>`; if it is a coroutine function,
it will be automatically scheduled as a :class:`Task`.
*limit* determines the buffer size limit used by the
returned :class:`Stream` instance. By default the *limit*
is set to 64 KiB.
The rest of the arguments are passed directly to
:meth:`loop.create_server`.
.. coroutinemethod:: start_serving()
Binds to the given host and port to start the server.
.. coroutinemethod:: serve_forever()
Start accepting connections until the coroutine is cancelled.
Cancellation of ``serve_forever`` task causes the server
to be closed.
This method can be called if the server is already accepting
connections. Only one ``serve_forever`` task can exist per
one *Server* object.
.. method:: is_serving()
Returns ``True`` if the server is bound and currently serving.
.. method:: bind()
Bind the server to the given *host* and *port*. This method is
automatically called during ``__aenter__`` when :class:`StreamServer` is
used as an async context manager.
.. method:: is_bound()
Return ``True`` if the server is bound.
.. coroutinemethod:: abort()
Closes the connection and cancels all pending tasks.
.. coroutinemethod:: close()
Closes the connection. This method is automatically called during
``__aexit__`` when :class:`StreamServer` is used as an async context
manager.
.. attribute:: sockets
Returns a tuple of socket objects the server is bound to.
.. versionadded:: 3.8
UnixStreamServer
================
.. class:: UnixStreamServer(client_connected_cb, /, path=None, *, \
limit=2**16, sock=None, backlog=100, \
ssl=None, ssl_handshake_timeout=None, shutdown_timeout=60)
The *client_connected_cb* callback is called whenever a new client
connection is established. It receives a :class:`Stream` object of the
mode :attr:`StreamMode.READWRITE`.
*client_connected_cb* can be a plain callable or a
:ref:`coroutine function <coroutine>`; if it is a coroutine function,
it will be automatically scheduled as a :class:`Task`.
*limit* determines the buffer size limit used by the
returned :class:`Stream` instance. By default the *limit*
is set to 64 KiB.
The rest of the arguments are passed directly to
:meth:`loop.create_unix_server`.
.. coroutinemethod:: start_serving()
Binds to the given host and port to start the server.
.. method:: is_serving()
Returns ``True`` if the server is bound and currently serving.
.. method:: bind()
Bind the server to the given *host* and *port*. This method is
automatically called during ``__aenter__`` when :class:`UnixStreamServer` is
used as an async context manager.
.. method:: is_bound()
Return ``True`` if the server is bound.
.. coroutinemethod:: abort()
Closes the connection and cancels all pending tasks.
.. coroutinemethod:: close()
Closes the connection. This method is automatically called during
``__aexit__`` when :class:`UnixStreamServer` is used as an async context
manager.
.. attribute:: sockets
Returns a tuple of socket objects the server is bound to.
.. availability:: Unix.
.. versionadded:: 3.8
Stream
======
.. class:: Stream
Represents a Stream object that provides APIs to read and write data
to the IO stream . It includes the API provided by :class:`StreamReader`
and :class:`StreamWriter`.
Do not instantiate *Stream* objects directly; use API like :func:`connect`
and :class:`StreamServer` instead.
.. versionadded:: 3.8
StreamMode
==========
.. class:: StreamMode
A subclass of :class:`enum.Flag` that defines a set of values that can be
used to determine the ``mode`` of :class:`Stream` objects.
.. data:: READ
The stream object is readable and provides the API of :class:`StreamReader`.
.. data:: WRITE
The stream object is writeable and provides the API of :class:`StreamWriter`.
.. data:: READWRITE
The stream object is readable and writeable and provides the API of both
:class:`StreamReader` and :class:`StreamWriter`.
.. versionadded:: 3.8
StreamReader StreamReader
============ ============
...@@ -366,22 +599,17 @@ Examples ...@@ -366,22 +599,17 @@ Examples
TCP echo client using streams TCP echo client using streams
----------------------------- -----------------------------
TCP echo client using the :func:`asyncio.open_connection` function:: TCP echo client using the :func:`asyncio.connect` function::
import asyncio import asyncio
async def tcp_echo_client(message): async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection( async with asyncio.connect('127.0.0.1', 8888) as stream:
'127.0.0.1', 8888) print(f'Send: {message!r}')
await stream.write(message.encode())
print(f'Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100) data = await stream.read(100)
print(f'Received: {data.decode()!r}') print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
asyncio.run(tcp_echo_client('Hello World!')) asyncio.run(tcp_echo_client('Hello World!'))
...@@ -397,32 +625,28 @@ TCP echo client using the :func:`asyncio.open_connection` function:: ...@@ -397,32 +625,28 @@ TCP echo client using the :func:`asyncio.open_connection` function::
TCP echo server using streams TCP echo server using streams
----------------------------- -----------------------------
TCP echo server using the :func:`asyncio.start_server` function:: TCP echo server using the :class:`asyncio.StreamServer` class::
import asyncio import asyncio
async def handle_echo(reader, writer): async def handle_echo(stream):
data = await reader.read(100) data = await stream.read(100)
message = data.decode() message = data.decode()
addr = writer.get_extra_info('peername') addr = stream.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}") print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}") print(f"Send: {message!r}")
writer.write(data) await stream.write(data)
await writer.drain()
print("Close the connection") print("Close the connection")
writer.close() await stream.close()
async def main(): async def main():
server = await asyncio.start_server( async with asyncio.StreamServer(
handle_echo, '127.0.0.1', 8888) handle_echo, '127.0.0.1', 8888) as server:
addr = server.sockets[0].getsockname()
addr = server.sockets[0].getsockname() print(f'Serving on {addr}')
print(f'Serving on {addr}')
async with server:
await server.serve_forever() await server.serve_forever()
asyncio.run(main()) asyncio.run(main())
...@@ -446,11 +670,9 @@ Simple example querying HTTP headers of the URL passed on the command line:: ...@@ -446,11 +670,9 @@ Simple example querying HTTP headers of the URL passed on the command line::
async def print_http_headers(url): async def print_http_headers(url):
url = urllib.parse.urlsplit(url) url = urllib.parse.urlsplit(url)
if url.scheme == 'https': if url.scheme == 'https':
reader, writer = await asyncio.open_connection( stream = await asyncio.connect(url.hostname, 443, ssl=True)
url.hostname, 443, ssl=True)
else: else:
reader, writer = await asyncio.open_connection( stream = await asyncio.connect(url.hostname, 80)
url.hostname, 80)
query = ( query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n" f"HEAD {url.path or '/'} HTTP/1.0\r\n"
...@@ -458,18 +680,14 @@ Simple example querying HTTP headers of the URL passed on the command line:: ...@@ -458,18 +680,14 @@ Simple example querying HTTP headers of the URL passed on the command line::
f"\r\n" f"\r\n"
) )
writer.write(query.encode('latin-1')) stream.write(query.encode('latin-1'))
while True: while (line := await stream.readline()):
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip() line = line.decode('latin1').rstrip()
if line: if line:
print(f'HTTP header> {line}') print(f'HTTP header> {line}')
# Ignore the body, close the socket # Ignore the body, close the socket
writer.close() await stream.close()
url = sys.argv[1] url = sys.argv[1]
asyncio.run(print_http_headers(url)) asyncio.run(print_http_headers(url))
...@@ -490,7 +708,7 @@ Register an open socket to wait for data using streams ...@@ -490,7 +708,7 @@ Register an open socket to wait for data using streams
------------------------------------------------------ ------------------------------------------------------
Coroutine waiting until a socket receives data using the Coroutine waiting until a socket receives data using the
:func:`open_connection` function:: :func:`asyncio.connect` function::
import asyncio import asyncio
import socket import socket
...@@ -504,17 +722,15 @@ Coroutine waiting until a socket receives data using the ...@@ -504,17 +722,15 @@ Coroutine waiting until a socket receives data using the
rsock, wsock = socket.socketpair() rsock, wsock = socket.socketpair()
# Register the open socket to wait for data. # Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock) async with asyncio.connect(sock=rsock) as stream:
# Simulate the reception of data from the network
# Simulate the reception of data from the network loop.call_soon(wsock.send, 'abc'.encode())
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data # Wait for data
data = await reader.read(100) data = await stream.read(100)
# Got data, we are done: close the socket # Got data, we are done: close the socket
print("Received:", data.decode()) print("Received:", data.decode())
writer.close()
# Close the second socket # Close the second socket
wsock.close() wsock.close()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment