Commit c9070d03 authored by Yury Selivanov's avatar Yury Selivanov Committed by GitHub

bpo-32662: Implement Server.start_serving() and Server.serve_forever() (#5312)

* bpo-32662: Implement Server.start_serving() and Server.serve_forever()

New methods:

* Server.start_serving(),
* Server.serve_forever(), and
* Server.is_serving().

Add 'start_serving' keyword parameter to loop.create_server() and
loop.create_unix_server().
parent 1aa094f7
...@@ -424,7 +424,7 @@ Creating connections ...@@ -424,7 +424,7 @@ Creating connections
Creating listening connections Creating listening connections
------------------------------ ------------------------------
.. coroutinemethod:: AbstractEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None) .. coroutinemethod:: AbstractEventLoop.create_server(protocol_factory, host=None, port=None, \*, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)
Create a TCP server (socket type :data:`~socket.SOCK_STREAM`) bound to Create a TCP server (socket type :data:`~socket.SOCK_STREAM`) bound to
*host* and *port*. *host* and *port*.
...@@ -472,9 +472,15 @@ Creating listening connections ...@@ -472,9 +472,15 @@ Creating listening connections
for the SSL handshake to complete before aborting the connection. for the SSL handshake to complete before aborting the connection.
``10.0`` seconds if ``None`` (default). ``10.0`` seconds if ``None`` (default).
* *start_serving* set to ``True`` (the default) causes the created server
to start accepting connections immediately. When set to ``False``,
the user should await on :meth:`Server.start_serving` or
:meth:`Server.serve_forever` to make the server to start accepting
connections.
.. versionadded:: 3.7 .. versionadded:: 3.7
The *ssl_handshake_timeout* parameter. *ssl_handshake_timeout* and *start_serving* parameters.
.. versionchanged:: 3.5 .. versionchanged:: 3.5
...@@ -490,7 +496,7 @@ Creating listening connections ...@@ -490,7 +496,7 @@ Creating listening connections
The *host* parameter can now be a sequence of strings. The *host* parameter can now be a sequence of strings.
.. coroutinemethod:: AbstractEventLoop.create_unix_server(protocol_factory, path=None, \*, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None) .. coroutinemethod:: AbstractEventLoop.create_unix_server(protocol_factory, path=None, \*, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)
Similar to :meth:`AbstractEventLoop.create_server`, but specific to the Similar to :meth:`AbstractEventLoop.create_server`, but specific to the
socket family :py:data:`~socket.AF_UNIX`. socket family :py:data:`~socket.AF_UNIX`.
...@@ -929,8 +935,26 @@ Server ...@@ -929,8 +935,26 @@ Server
Server listening on sockets. Server listening on sockets.
Object created by the :meth:`AbstractEventLoop.create_server` method and the Object created by :meth:`AbstractEventLoop.create_server`,
:func:`start_server` function. Don't instantiate the class directly. :meth:`AbstractEventLoop.create_unix_server`, :func:`start_server`,
and :func:`start_unix_server` functions. Don't instantiate the class
directly.
*Server* objects are asynchronous context managers. When used in an
``async with`` statement, it's guaranteed that the Server object is
closed and not accepting new connections when the ``async with``
statement is completed::
srv = await loop.create_server(...)
async with srv:
# some code
# At this point, srv is closed and no longer accepts new connections.
.. versionchanged:: 3.7
Server object is an asynchronous context manager since Python 3.7.
.. method:: close() .. method:: close()
...@@ -949,6 +973,54 @@ Server ...@@ -949,6 +973,54 @@ Server
.. versionadded:: 3.7 .. versionadded:: 3.7
.. coroutinemethod:: start_serving()
Start accepting connections.
This method is idempotent, so it can be called when
the server is already being serving.
The new *start_serving* keyword-only parameter to
:meth:`AbstractEventLoop.create_server` and
:meth:`asyncio.start_server` allows to create a Server object
that is not accepting connections right away. In which case
this method, or :meth:`Server.serve_forever` can be used
to make the Server object to start accepting connections.
.. versionadded:: 3.7
.. coroutinemethod:: serve_forever()
Start accepting connections until the coroutine is cancelled.
Cancellation of ``serve_forever`` task causes the server
to be closed.
This method can be called if the server is already accepting
connections. Only one ``serve_forever`` task can exist per
one *Server* object.
Example::
async def client_connected(reader, writer):
# Communicate with the client with
# reader/writer streams. For example:
await reader.readline()
async def main(host, port):
srv = await asyncio.start_server(
client_connected, host, port)
await loop.serve_forever()
asyncio.run(main('127.0.0.1', 0))
.. versionadded:: 3.7
.. method:: is_serving()
Return ``True`` if the server is accepting new connections.
.. versionadded:: 3.7
.. coroutinemethod:: wait_closed() .. coroutinemethod:: wait_closed()
Wait until the :meth:`close` method completes. Wait until the :meth:`close` method completes.
...@@ -958,6 +1030,11 @@ Server ...@@ -958,6 +1030,11 @@ Server
List of :class:`socket.socket` objects the server is listening to, or List of :class:`socket.socket` objects the server is listening to, or
``None`` if the server is closed. ``None`` if the server is closed.
.. versionchanged:: 3.7
Prior to Python 3.7 ``Server.sockets`` used to return the
internal list of server's sockets directly. In 3.7 a copy
of that list is returned.
Handle Handle
------ ------
......
...@@ -157,47 +157,106 @@ def _run_until_complete_cb(fut): ...@@ -157,47 +157,106 @@ def _run_until_complete_cb(fut):
class Server(events.AbstractServer): class Server(events.AbstractServer):
def __init__(self, loop, sockets): def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
ssl_handshake_timeout):
self._loop = loop self._loop = loop
self.sockets = sockets self._sockets = sockets
self._active_count = 0 self._active_count = 0
self._waiters = [] self._waiters = []
self._protocol_factory = protocol_factory
self._backlog = backlog
self._ssl_context = ssl_context
self._ssl_handshake_timeout = ssl_handshake_timeout
self._serving = False
self._serving_forever_fut = None
def __repr__(self): def __repr__(self):
return f'<{self.__class__.__name__} sockets={self.sockets!r}>' return f'<{self.__class__.__name__} sockets={self.sockets!r}>'
def _attach(self): def _attach(self):
assert self.sockets is not None assert self._sockets is not None
self._active_count += 1 self._active_count += 1
def _detach(self): def _detach(self):
assert self._active_count > 0 assert self._active_count > 0
self._active_count -= 1 self._active_count -= 1
if self._active_count == 0 and self.sockets is None: if self._active_count == 0 and self._sockets is None:
self._wakeup() self._wakeup()
def _wakeup(self):
waiters = self._waiters
self._waiters = None
for waiter in waiters:
if not waiter.done():
waiter.set_result(waiter)
def _start_serving(self):
if self._serving:
return
self._serving = True
for sock in self._sockets:
sock.listen(self._backlog)
self._loop._start_serving(
self._protocol_factory, sock, self._ssl_context,
self, self._backlog, self._ssl_handshake_timeout)
def get_loop(self):
return self._loop
def is_serving(self):
return self._serving
@property
def sockets(self):
if self._sockets is None:
return []
return list(self._sockets)
def close(self): def close(self):
sockets = self.sockets sockets = self._sockets
if sockets is None: if sockets is None:
return return
self.sockets = None self._sockets = None
for sock in sockets: for sock in sockets:
self._loop._stop_serving(sock) self._loop._stop_serving(sock)
self._serving = False
if (self._serving_forever_fut is not None and
not self._serving_forever_fut.done()):
self._serving_forever_fut.cancel()
self._serving_forever_fut = None
if self._active_count == 0: if self._active_count == 0:
self._wakeup() self._wakeup()
def get_loop(self): async def start_serving(self):
return self._loop self._start_serving()
def _wakeup(self): async def serve_forever(self):
waiters = self._waiters if self._serving_forever_fut is not None:
self._waiters = None raise RuntimeError(
for waiter in waiters: f'server {self!r} is already being awaited on serve_forever()')
if not waiter.done(): if self._sockets is None:
waiter.set_result(waiter) raise RuntimeError(f'server {self!r} is closed')
self._start_serving()
self._serving_forever_fut = self._loop.create_future()
try:
await self._serving_forever_fut
except futures.CancelledError:
try:
self.close()
await self.wait_closed()
finally:
raise
finally:
self._serving_forever_fut = None
async def wait_closed(self): async def wait_closed(self):
if self.sockets is None or self._waiters is None: if self._sockets is None or self._waiters is None:
return return
waiter = self._loop.create_future() waiter = self._loop.create_future()
self._waiters.append(waiter) self._waiters.append(waiter)
...@@ -1059,7 +1118,8 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -1059,7 +1118,8 @@ class BaseEventLoop(events.AbstractEventLoop):
ssl=None, ssl=None,
reuse_address=None, reuse_address=None,
reuse_port=None, reuse_port=None,
ssl_handshake_timeout=None): ssl_handshake_timeout=None,
start_serving=True):
"""Create a TCP server. """Create a TCP server.
The host parameter can be a string, in that case the TCP server is The host parameter can be a string, in that case the TCP server is
...@@ -1149,12 +1209,14 @@ class BaseEventLoop(events.AbstractEventLoop): ...@@ -1149,12 +1209,14 @@ class BaseEventLoop(events.AbstractEventLoop):
raise ValueError(f'A Stream Socket was expected, got {sock!r}') raise ValueError(f'A Stream Socket was expected, got {sock!r}')
sockets = [sock] sockets = [sock]
server = Server(self, sockets)
for sock in sockets: for sock in sockets:
sock.listen(backlog)
sock.setblocking(False) sock.setblocking(False)
self._start_serving(protocol_factory, sock, ssl, server, backlog,
ssl_handshake_timeout) server = Server(self, sockets, protocol_factory,
ssl, backlog, ssl_handshake_timeout)
if start_serving:
server._start_serving()
if self._debug: if self._debug:
logger.info("%r is serving", server) logger.info("%r is serving", server)
return server return server
......
...@@ -164,13 +164,39 @@ class AbstractServer: ...@@ -164,13 +164,39 @@ class AbstractServer:
"""Stop serving. This leaves existing connections open.""" """Stop serving. This leaves existing connections open."""
raise NotImplementedError raise NotImplementedError
def get_loop(self):
"""Get the event loop the Server object is attached to."""
raise NotImplementedError
def is_serving(self):
"""Return True if the server is accepting connections."""
raise NotImplementedError
async def start_serving(self):
"""Start accepting connections.
This method is idempotent, so it can be called when
the server is already being serving.
"""
raise NotImplementedError
async def serve_forever(self):
"""Start accepting connections until the coroutine is cancelled.
The server is closed when the coroutine is cancelled.
"""
raise NotImplementedError
async def wait_closed(self): async def wait_closed(self):
"""Coroutine to wait until service is closed.""" """Coroutine to wait until service is closed."""
raise NotImplementedError raise NotImplementedError
def get_loop(self): async def __aenter__(self):
""" Get the event loop the Server object is attached to.""" return self
raise NotImplementedError
async def __aexit__(self, *exc):
self.close()
await self.wait_closed()
class AbstractEventLoop: class AbstractEventLoop:
...@@ -279,7 +305,8 @@ class AbstractEventLoop: ...@@ -279,7 +305,8 @@ class AbstractEventLoop:
*, family=socket.AF_UNSPEC, *, family=socket.AF_UNSPEC,
flags=socket.AI_PASSIVE, sock=None, backlog=100, flags=socket.AI_PASSIVE, sock=None, backlog=100,
ssl=None, reuse_address=None, reuse_port=None, ssl=None, reuse_address=None, reuse_port=None,
ssl_handshake_timeout=None): ssl_handshake_timeout=None,
start_serving=True):
"""A coroutine which creates a TCP server bound to host and port. """A coroutine which creates a TCP server bound to host and port.
The return value is a Server object which can be used to stop The return value is a Server object which can be used to stop
...@@ -319,6 +346,11 @@ class AbstractEventLoop: ...@@ -319,6 +346,11 @@ class AbstractEventLoop:
will wait for completion of the SSL handshake before aborting the will wait for completion of the SSL handshake before aborting the
connection. Default is 10s, longer timeouts may increase vulnerability connection. Default is 10s, longer timeouts may increase vulnerability
to DoS attacks (see https://support.f5.com/csp/article/K13834) to DoS attacks (see https://support.f5.com/csp/article/K13834)
start_serving set to True (default) causes the created server
to start accepting connections immediately. When set to False,
the user should await Server.start_serving() or Server.serve_forever()
to make the server to start accepting connections.
""" """
raise NotImplementedError raise NotImplementedError
...@@ -343,7 +375,8 @@ class AbstractEventLoop: ...@@ -343,7 +375,8 @@ class AbstractEventLoop:
async def create_unix_server( async def create_unix_server(
self, protocol_factory, path=None, *, self, protocol_factory, path=None, *,
sock=None, backlog=100, ssl=None, sock=None, backlog=100, ssl=None,
ssl_handshake_timeout=None): ssl_handshake_timeout=None,
start_serving=True):
"""A coroutine which creates a UNIX Domain Socket server. """A coroutine which creates a UNIX Domain Socket server.
The return value is a Server object, which can be used to stop The return value is a Server object, which can be used to stop
...@@ -363,6 +396,11 @@ class AbstractEventLoop: ...@@ -363,6 +396,11 @@ class AbstractEventLoop:
ssl_handshake_timeout is the time in seconds that an SSL server ssl_handshake_timeout is the time in seconds that an SSL server
will wait for the SSL handshake to complete (defaults to 10s). will wait for the SSL handshake to complete (defaults to 10s).
start_serving set to True (default) causes the created server
to start accepting connections immediately. When set to False,
the user should await Server.start_serving() or Server.serve_forever()
to make the server to start accepting connections.
""" """
raise NotImplementedError raise NotImplementedError
......
...@@ -250,7 +250,8 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): ...@@ -250,7 +250,8 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
async def create_unix_server( async def create_unix_server(
self, protocol_factory, path=None, *, self, protocol_factory, path=None, *,
sock=None, backlog=100, ssl=None, sock=None, backlog=100, ssl=None,
ssl_handshake_timeout=None): ssl_handshake_timeout=None,
start_serving=True):
if isinstance(ssl, bool): if isinstance(ssl, bool):
raise TypeError('ssl argument must be an SSLContext or None') raise TypeError('ssl argument must be an SSLContext or None')
...@@ -302,11 +303,12 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop): ...@@ -302,11 +303,12 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
raise ValueError( raise ValueError(
f'A UNIX Domain Stream Socket was expected, got {sock!r}') f'A UNIX Domain Stream Socket was expected, got {sock!r}')
server = base_events.Server(self, [sock])
sock.listen(backlog)
sock.setblocking(False) sock.setblocking(False)
self._start_serving(protocol_factory, sock, ssl, server, server = base_events.Server(self, [sock], protocol_factory,
ssl_handshake_timeout=ssl_handshake_timeout) ssl, backlog, ssl_handshake_timeout)
if start_serving:
server._start_serving()
return server return server
async def _sock_sendfile_native(self, sock, file, offset, count): async def _sock_sendfile_native(self, sock, file, offset, count):
......
import asyncio
import socket
import threading
import unittest
from test.test_asyncio import utils as test_utils
from test.test_asyncio import functional as func_tests
class BaseStartServer(func_tests.FunctionalTestCaseMixin):
def new_loop(self):
raise NotImplementedError
def test_start_server_1(self):
HELLO_MSG = b'1' * 1024 * 5 + b'\n'
def client(sock, addr):
sock.connect(addr)
sock.send(HELLO_MSG)
sock.recv_all(1)
sock.close()
async def serve(reader, writer):
await reader.readline()
main_task.cancel()
writer.write(b'1')
writer.close()
await writer.wait_closed()
async def main(srv):
async with srv:
await srv.serve_forever()
srv = self.loop.run_until_complete(asyncio.start_server(
serve, '127.0.0.1', 0, loop=self.loop, start_serving=False))
self.assertFalse(srv.is_serving())
main_task = self.loop.create_task(main(srv))
addr = srv.sockets[0].getsockname()
with self.assertRaises(asyncio.CancelledError):
with self.tcp_client(lambda sock: client(sock, addr)):
self.loop.run_until_complete(main_task)
self.assertEqual(srv.sockets, [])
self.assertIsNone(srv._sockets)
self.assertIsNone(srv._waiters)
self.assertFalse(srv.is_serving())
with self.assertRaisesRegex(RuntimeError, r'is closed'):
self.loop.run_until_complete(srv.serve_forever())
class SelectorStartServerTests(BaseStartServer, unittest.TestCase):
def new_loop(self):
return asyncio.SelectorEventLoop()
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'no Unix sockets')
def test_start_unix_server_1(self):
HELLO_MSG = b'1' * 1024 * 5 + b'\n'
started = threading.Event()
def client(sock, addr):
started.wait(5)
sock.connect(addr)
sock.send(HELLO_MSG)
sock.recv_all(1)
sock.close()
async def serve(reader, writer):
await reader.readline()
main_task.cancel()
writer.write(b'1')
writer.close()
await writer.wait_closed()
async def main(srv):
async with srv:
self.assertFalse(srv.is_serving())
await srv.start_serving()
self.assertTrue(srv.is_serving())
started.set()
await srv.serve_forever()
with test_utils.unix_socket_path() as addr:
srv = self.loop.run_until_complete(asyncio.start_unix_server(
serve, addr, loop=self.loop, start_serving=False))
main_task = self.loop.create_task(main(srv))
with self.assertRaises(asyncio.CancelledError):
with self.unix_client(lambda sock: client(sock, addr)):
self.loop.run_until_complete(main_task)
self.assertEqual(srv.sockets, [])
self.assertIsNone(srv._sockets)
self.assertIsNone(srv._waiters)
self.assertFalse(srv.is_serving())
with self.assertRaisesRegex(RuntimeError, r'is closed'):
self.loop.run_until_complete(srv.serve_forever())
@unittest.skipUnless(hasattr(asyncio, 'ProactorEventLoop'), 'Windows only')
class ProactorStartServerTests(BaseStartServer, unittest.TestCase):
def new_loop(self):
return asyncio.ProactorEventLoop()
if __name__ == '__main__':
unittest.main()
Implement Server.start_serving(), Server.serve_forever(), and
Server.is_serving() methods. Add 'start_serving' keyword parameter to
loop.create_server() and loop.create_unix_server().
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment