Commit 1cc0ee7d authored by Andrew Svetlov's avatar Andrew Svetlov Committed by Miss Islington (bot)

bpo-36801: Fix waiting in StreamWriter.drain for closing SSL transport (GH-13098)



https://bugs.python.org/issue36801
parent e19a91e4
...@@ -199,6 +199,9 @@ class FlowControlMixin(protocols.Protocol): ...@@ -199,6 +199,9 @@ class FlowControlMixin(protocols.Protocol):
self._drain_waiter = waiter self._drain_waiter = waiter
await waiter await waiter
def _get_close_waiter(self, stream):
raise NotImplementedError
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
"""Helper class to adapt between Protocol and StreamReader. """Helper class to adapt between Protocol and StreamReader.
...@@ -315,6 +318,9 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol): ...@@ -315,6 +318,9 @@ class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
return False return False
return True return True
def _get_close_waiter(self, stream):
return self._closed
def __del__(self): def __del__(self):
# Prevent reports about unhandled exceptions. # Prevent reports about unhandled exceptions.
# Better than self._closed._log_traceback = False hack # Better than self._closed._log_traceback = False hack
...@@ -376,7 +382,7 @@ class StreamWriter: ...@@ -376,7 +382,7 @@ class StreamWriter:
return self._transport.is_closing() return self._transport.is_closing()
async def wait_closed(self): async def wait_closed(self):
await self._protocol._closed await self._protocol._get_close_waiter(self)
def get_extra_info(self, name, default=None): def get_extra_info(self, name, default=None):
return self._transport.get_extra_info(name, default) return self._transport.get_extra_info(name, default)
...@@ -394,13 +400,12 @@ class StreamWriter: ...@@ -394,13 +400,12 @@ class StreamWriter:
if exc is not None: if exc is not None:
raise exc raise exc
if self._transport.is_closing(): if self._transport.is_closing():
# Yield to the event loop so connection_lost() may be # Wait for protocol.connection_lost() call
# called. Without this, _drain_helper() would return # Raise connection closing error if any,
# immediately, and code that calls # ConnectionResetError otherwise
# write(...); await drain() fut = self._protocol._get_close_waiter(self)
# in a loop would never call connection_lost(), so it await fut
# would not see an error when the socket is closed. raise ConnectionResetError('Connection lost')
await sleep(0, loop=self._loop)
await self._protocol._drain_helper() await self._protocol._drain_helper()
async def aclose(self): async def aclose(self):
......
...@@ -26,6 +26,7 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, ...@@ -26,6 +26,7 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
self._transport = None self._transport = None
self._process_exited = False self._process_exited = False
self._pipe_fds = [] self._pipe_fds = []
self._stdin_closed = self._loop.create_future()
def __repr__(self): def __repr__(self):
info = [self.__class__.__name__] info = [self.__class__.__name__]
...@@ -80,6 +81,10 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, ...@@ -80,6 +81,10 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
if pipe is not None: if pipe is not None:
pipe.close() pipe.close()
self.connection_lost(exc) self.connection_lost(exc)
if exc is None:
self._stdin_closed.set_result(None)
else:
self._stdin_closed.set_exception(exc)
return return
if fd == 1: if fd == 1:
reader = self.stdout reader = self.stdout
...@@ -106,6 +111,10 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, ...@@ -106,6 +111,10 @@ class SubprocessStreamProtocol(streams.FlowControlMixin,
self._transport.close() self._transport.close()
self._transport = None self._transport = None
def _get_close_waiter(self, stream):
if stream is self.stdin:
return self._stdin_closed
class Process: class Process:
def __init__(self, transport, protocol, loop, *, _asyncio_internal=False): def __init__(self, transport, protocol, loop, *, _asyncio_internal=False):
......
...@@ -109,6 +109,29 @@ class StreamTests(test_utils.TestCase): ...@@ -109,6 +109,29 @@ class StreamTests(test_utils.TestCase):
self._basetest_open_connection_no_loop_ssl(conn_fut) self._basetest_open_connection_no_loop_ssl(conn_fut)
@unittest.skipIf(ssl is None, 'No ssl module')
def test_drain_on_closed_writer_ssl(self):
async def inner(httpd):
reader, writer = await asyncio.open_connection(
*httpd.address,
ssl=test_utils.dummy_ssl_context())
messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
writer.write(b'GET / HTTP/1.0\r\n\r\n')
data = await reader.read()
self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
writer.close()
with self.assertRaises(ConnectionResetError):
await writer.drain()
self.assertEqual(messages, [])
with test_utils.run_test_server(use_ssl=True) as httpd:
self.loop.run_until_complete(inner(httpd))
def _basetest_open_connection_error(self, open_connection_fut): def _basetest_open_connection_error(self, open_connection_fut):
messages = [] messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx)) self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
......
Properly handle SSL connection closing in asyncio StreamWriter.drain() call.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment