Commit e692a05d authored by Yury Selivanov's avatar Yury Selivanov

Issue #27906: Fix socket accept exhaustion during high TCP traffic.

Patch by Kevin Conway.
parent 5b6cf804
......@@ -1034,7 +1034,7 @@ class BaseEventLoop(events.AbstractEventLoop):
for sock in sockets:
sock.listen(backlog)
sock.setblocking(False)
self._start_serving(protocol_factory, sock, ssl, server)
self._start_serving(protocol_factory, sock, ssl, server, backlog)
if self._debug:
logger.info("%r is serving", server)
return server
......
......@@ -494,7 +494,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
self._csock.send(b'\0')
def _start_serving(self, protocol_factory, sock,
sslcontext=None, server=None):
sslcontext=None, server=None, backlog=100):
def loop(f=None):
try:
......
......@@ -162,43 +162,50 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
exc_info=True)
def _start_serving(self, protocol_factory, sock,
sslcontext=None, server=None):
sslcontext=None, server=None, backlog=100):
self.add_reader(sock.fileno(), self._accept_connection,
protocol_factory, sock, sslcontext, server)
protocol_factory, sock, sslcontext, server, backlog)
def _accept_connection(self, protocol_factory, sock,
sslcontext=None, server=None):
try:
conn, addr = sock.accept()
if self._debug:
logger.debug("%r got a new connection from %r: %r",
server, addr, conn)
conn.setblocking(False)
except (BlockingIOError, InterruptedError, ConnectionAbortedError):
pass # False alarm.
except OSError as exc:
# There's nowhere to send the error, so just log it.
if exc.errno in (errno.EMFILE, errno.ENFILE,
errno.ENOBUFS, errno.ENOMEM):
# Some platforms (e.g. Linux keep reporting the FD as
# ready, so we remove the read handler temporarily.
# We'll try again in a while.
self.call_exception_handler({
'message': 'socket.accept() out of system resource',
'exception': exc,
'socket': sock,
})
self.remove_reader(sock.fileno())
self.call_later(constants.ACCEPT_RETRY_DELAY,
self._start_serving,
protocol_factory, sock, sslcontext, server)
sslcontext=None, server=None, backlog=100):
# This method is only called once for each event loop tick where the
# listening socket has triggered an EVENT_READ. There may be multiple
# connections waiting for an .accept() so it is called in a loop.
# See https://bugs.python.org/issue27906 for more details.
for _ in range(backlog):
try:
conn, addr = sock.accept()
if self._debug:
logger.debug("%r got a new connection from %r: %r",
server, addr, conn)
conn.setblocking(False)
except (BlockingIOError, InterruptedError, ConnectionAbortedError):
# Early exit because the socket accept buffer is empty.
return None
except OSError as exc:
# There's nowhere to send the error, so just log it.
if exc.errno in (errno.EMFILE, errno.ENFILE,
errno.ENOBUFS, errno.ENOMEM):
# Some platforms (e.g. Linux keep reporting the FD as
# ready, so we remove the read handler temporarily.
# We'll try again in a while.
self.call_exception_handler({
'message': 'socket.accept() out of system resource',
'exception': exc,
'socket': sock,
})
self.remove_reader(sock.fileno())
self.call_later(constants.ACCEPT_RETRY_DELAY,
self._start_serving,
protocol_factory, sock, sslcontext, server,
backlog)
else:
raise # The event loop will catch, log and ignore it.
else:
raise # The event loop will catch, log and ignore it.
else:
extra = {'peername': addr}
accept = self._accept_connection2(protocol_factory, conn, extra,
sslcontext, server)
self.create_task(accept)
extra = {'peername': addr}
accept = self._accept_connection2(protocol_factory, conn, extra,
sslcontext, server)
self.create_task(accept)
@coroutine
def _accept_connection2(self, protocol_factory, conn, extra,
......
......@@ -1634,7 +1634,7 @@ class BaseEventLoopWithSelectorTests(test_utils.TestCase):
self.loop.call_later.assert_called_with(constants.ACCEPT_RETRY_DELAY,
# self.loop._start_serving
mock.ANY,
MyProto, sock, None, None)
MyProto, sock, None, None, mock.ANY)
def test_call_coroutine(self):
@asyncio.coroutine
......
......@@ -687,6 +687,20 @@ class BaseSelectorEventLoopTests(test_utils.TestCase):
selectors.EVENT_WRITE)])
self.loop.remove_writer.assert_called_with(1)
def test_accept_connection_multiple(self):
sock = mock.Mock()
sock.accept.return_value = (mock.Mock(), mock.Mock())
backlog = 100
# Mock the coroutine generation for a connection to prevent
# warnings related to un-awaited coroutines.
mock_obj = mock.patch.object
with mock_obj(self.loop, '_accept_connection2') as accept2_mock:
accept2_mock.return_value = None
with mock_obj(self.loop, 'create_task') as task_mock:
task_mock.return_value = None
self.loop._accept_connection(mock.Mock(), sock, backlog=backlog)
self.assertEqual(sock.accept.call_count, backlog)
class SelectorTransportTests(test_utils.TestCase):
......
......@@ -263,6 +263,9 @@ Library
- Issue #27456: asyncio: Set TCP_NODELAY by default.
- Issue #27906: Fix socket accept exhaustion during high TCP traffic.
Patch by Kevin Conway.
IDLE
----
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment