Commit 0284a48f authored by Guido van Rossum's avatar Guido van Rossum

asyncio: Refactor ssl transport ready loop (Nikolay Kim).

parent 89a9fe91
......@@ -286,7 +286,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
# Jump to the except clause below.
raise OSError(err, 'Connect call failed')
raise OSError(err, 'Connect call failed %s' % (address,))
except (BlockingIOError, InterruptedError):
self.add_writer(fd, self._sock_connect, fut, True, sock, address)
except Exception as exc:
......@@ -413,7 +413,7 @@ class _SelectorTransport(transports.Transport):
try:
self._protocol.pause_writing()
except Exception:
tulip_log.exception('pause_writing() failed')
logger.exception('pause_writing() failed')
def _maybe_resume_protocol(self):
if (self._protocol_paused and
......@@ -422,7 +422,7 @@ class _SelectorTransport(transports.Transport):
try:
self._protocol.resume_writing()
except Exception:
tulip_log.exception('resume_writing() failed')
logger.exception('resume_writing() failed')
def set_write_buffer_limits(self, high=None, low=None):
if high is None:
......@@ -635,15 +635,16 @@ class _SelectorSslTransport(_SelectorTransport):
compression=self._sock.compression(),
)
self._loop.add_reader(self._sock_fd, self._on_ready)
self._loop.add_writer(self._sock_fd, self._on_ready)
self._read_wants_write = False
self._write_wants_read = False
self._loop.add_reader(self._sock_fd, self._read_ready)
self._loop.call_soon(self._protocol.connection_made, self)
if self._waiter is not None:
self._loop.call_soon(self._waiter.set_result, None)
def pause_reading(self):
# XXX This is a bit icky, given the comment at the top of
# _on_ready(). Is it possible to evoke a deadlock? I don't
# _read_ready(). Is it possible to evoke a deadlock? I don't
# know, although it doesn't look like it; write() will still
# accept more data for the buffer and eventually the app will
# call resume_reading() again, and things will flow again.
......@@ -658,41 +659,55 @@ class _SelectorSslTransport(_SelectorTransport):
self._paused = False
if self._closing:
return
self._loop.add_reader(self._sock_fd, self._on_ready)
self._loop.add_reader(self._sock_fd, self._read_ready)
def _on_ready(self):
# Because of renegotiations (?), there's no difference between
# readable and writable. We just try both. XXX This may be
# incorrect; we probably need to keep state about what we
# should do next.
def _read_ready(self):
if self._write_wants_read:
self._write_wants_read = False
self._write_ready()
# First try reading.
if not self._closing and not self._paused:
try:
data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError,
ssl.SSLWantReadError, ssl.SSLWantWriteError):
pass
except Exception as exc:
self._fatal_error(exc)
if self._buffer:
self._loop.add_writer(self._sock_fd, self._write_ready)
try:
data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):
pass
except ssl.SSLWantWriteError:
self._read_wants_write = True
self._loop.remove_reader(self._sock_fd)
self._loop.add_writer(self._sock_fd, self._write_ready)
except Exception as exc:
self._fatal_error(exc)
else:
if data:
self._protocol.data_received(data)
else:
if data:
self._protocol.data_received(data)
else:
try:
self._protocol.eof_received()
finally:
self.close()
try:
self._protocol.eof_received()
finally:
self.close()
def _write_ready(self):
if self._read_wants_write:
self._read_wants_write = False
self._read_ready()
if not (self._paused or self._closing):
self._loop.add_reader(self._sock_fd, self._read_ready)
# Now try writing, if there's anything to write.
if self._buffer:
data = b''.join(self._buffer)
self._buffer.clear()
try:
n = self._sock.send(data)
except (BlockingIOError, InterruptedError,
ssl.SSLWantReadError, ssl.SSLWantWriteError):
ssl.SSLWantWriteError):
n = 0
except ssl.SSLWantReadError:
n = 0
self._loop.remove_writer(self._sock_fd)
self._write_wants_read = True
except Exception as exc:
self._loop.remove_writer(self._sock_fd)
self._fatal_error(exc)
......@@ -701,11 +716,12 @@ class _SelectorSslTransport(_SelectorTransport):
if n < len(data):
self._buffer.append(data[n:])
self._maybe_resume_protocol() # May append to buffer.
self._maybe_resume_protocol() # May append to buffer.
if self._closing and not self._buffer:
if not self._buffer:
self._loop.remove_writer(self._sock_fd)
self._call_connection_lost(None)
if self._closing:
self._call_connection_lost(None)
def write(self, data):
assert isinstance(data, bytes), repr(type(data))
......@@ -718,20 +734,16 @@ class _SelectorSslTransport(_SelectorTransport):
self._conn_lost += 1
return
# We could optimize, but the callback can do this for now.
if not self._buffer:
self._loop.add_writer(self._sock_fd, self._write_ready)
# Add it to the buffer.
self._buffer.append(data)
self._maybe_pause_protocol()
def can_write_eof(self):
return False
def close(self):
if self._closing:
return
self._closing = True
self._conn_lost += 1
self._loop.remove_reader(self._sock_fd)
class _SelectorDatagramTransport(_SelectorTransport):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment