Commit 2546a177 authored by Guido van Rossum's avatar Guido van Rossum

Important race condition fix for Tulip.

parent 559ae0fb
...@@ -344,7 +344,7 @@ class _SelectorTransport(transports.Transport): ...@@ -344,7 +344,7 @@ class _SelectorTransport(transports.Transport):
self._protocol = protocol self._protocol = protocol
self._server = server self._server = server
self._buffer = collections.deque() self._buffer = collections.deque()
self._conn_lost = 0 self._conn_lost = 0 # Set when call to connection_lost scheduled.
self._closing = False # Set when close() called. self._closing = False # Set when close() called.
if server is not None: if server is not None:
server.attach(self) server.attach(self)
...@@ -356,27 +356,27 @@ class _SelectorTransport(transports.Transport): ...@@ -356,27 +356,27 @@ class _SelectorTransport(transports.Transport):
if self._closing: if self._closing:
return return
self._closing = True self._closing = True
self._conn_lost += 1
self._loop.remove_reader(self._sock_fd) self._loop.remove_reader(self._sock_fd)
if not self._buffer: if not self._buffer:
self._conn_lost += 1
self._loop.call_soon(self._call_connection_lost, None) self._loop.call_soon(self._call_connection_lost, None)
def _fatal_error(self, exc): def _fatal_error(self, exc):
# should be called from exception handler only # Should be called from exception handler only.
if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
logger.exception('Fatal error for %s', self) logger.exception('Fatal error for %s', self)
self._force_close(exc) self._force_close(exc)
def _force_close(self, exc): def _force_close(self, exc):
if self._conn_lost:
return
if self._buffer: if self._buffer:
self._buffer.clear() self._buffer.clear()
self._loop.remove_writer(self._sock_fd) self._loop.remove_writer(self._sock_fd)
if not self._closing:
if self._closing:
return
self._closing = True self._closing = True
self._conn_lost += 1
self._loop.remove_reader(self._sock_fd) self._loop.remove_reader(self._sock_fd)
self._conn_lost += 1
self._loop.call_soon(self._call_connection_lost, exc) self._loop.call_soon(self._call_connection_lost, exc)
def _call_connection_lost(self, exc): def _call_connection_lost(self, exc):
...@@ -424,8 +424,6 @@ class _SelectorSocketTransport(_SelectorTransport): ...@@ -424,8 +424,6 @@ class _SelectorSocketTransport(_SelectorTransport):
data = self._sock.recv(self.max_size) data = self._sock.recv(self.max_size)
except (BlockingIOError, InterruptedError): except (BlockingIOError, InterruptedError):
pass pass
except ConnectionResetError as exc:
self._force_close(exc)
except Exception as exc: except Exception as exc:
self._fatal_error(exc) self._fatal_error(exc)
else: else:
...@@ -453,17 +451,15 @@ class _SelectorSocketTransport(_SelectorTransport): ...@@ -453,17 +451,15 @@ class _SelectorSocketTransport(_SelectorTransport):
try: try:
n = self._sock.send(data) n = self._sock.send(data)
except (BlockingIOError, InterruptedError): except (BlockingIOError, InterruptedError):
n = 0 pass
except (BrokenPipeError, ConnectionResetError) as exc: except Exception as exc:
self._force_close(exc)
return
except OSError as exc:
self._fatal_error(exc) self._fatal_error(exc)
return return
else: else:
data = data[n:] data = data[n:]
if not data: if not data:
return return
# Start async I/O. # Start async I/O.
self._loop.add_writer(self._sock_fd, self._write_ready) self._loop.add_writer(self._sock_fd, self._write_ready)
...@@ -478,9 +474,6 @@ class _SelectorSocketTransport(_SelectorTransport): ...@@ -478,9 +474,6 @@ class _SelectorSocketTransport(_SelectorTransport):
n = self._sock.send(data) n = self._sock.send(data)
except (BlockingIOError, InterruptedError): except (BlockingIOError, InterruptedError):
self._buffer.append(data) self._buffer.append(data)
except (BrokenPipeError, ConnectionResetError) as exc:
self._loop.remove_writer(self._sock_fd)
self._force_close(exc)
except Exception as exc: except Exception as exc:
self._loop.remove_writer(self._sock_fd) self._loop.remove_writer(self._sock_fd)
self._fatal_error(exc) self._fatal_error(exc)
...@@ -493,7 +486,6 @@ class _SelectorSocketTransport(_SelectorTransport): ...@@ -493,7 +486,6 @@ class _SelectorSocketTransport(_SelectorTransport):
elif self._eof: elif self._eof:
self._sock.shutdown(socket.SHUT_WR) self._sock.shutdown(socket.SHUT_WR)
return return
self._buffer.append(data) # Try again later. self._buffer.append(data) # Try again later.
def write_eof(self): def write_eof(self):
...@@ -622,8 +614,6 @@ class _SelectorSslTransport(_SelectorTransport): ...@@ -622,8 +614,6 @@ class _SelectorSslTransport(_SelectorTransport):
except (BlockingIOError, InterruptedError, except (BlockingIOError, InterruptedError,
ssl.SSLWantReadError, ssl.SSLWantWriteError): ssl.SSLWantReadError, ssl.SSLWantWriteError):
pass pass
except ConnectionResetError as exc:
self._force_close(exc)
except Exception as exc: except Exception as exc:
self._fatal_error(exc) self._fatal_error(exc)
else: else:
...@@ -644,10 +634,6 @@ class _SelectorSslTransport(_SelectorTransport): ...@@ -644,10 +634,6 @@ class _SelectorSslTransport(_SelectorTransport):
except (BlockingIOError, InterruptedError, except (BlockingIOError, InterruptedError,
ssl.SSLWantReadError, ssl.SSLWantWriteError): ssl.SSLWantReadError, ssl.SSLWantWriteError):
n = 0 n = 0
except (BrokenPipeError, ConnectionResetError) as exc:
self._loop.remove_writer(self._sock_fd)
self._force_close(exc)
return
except Exception as exc: except Exception as exc:
self._loop.remove_writer(self._sock_fd) self._loop.remove_writer(self._sock_fd)
self._fatal_error(exc) self._fatal_error(exc)
...@@ -726,12 +712,12 @@ class _SelectorDatagramTransport(_SelectorTransport): ...@@ -726,12 +712,12 @@ class _SelectorDatagramTransport(_SelectorTransport):
else: else:
self._sock.sendto(data, addr) self._sock.sendto(data, addr)
return return
except (BlockingIOError, InterruptedError):
self._loop.add_writer(self._sock_fd, self._sendto_ready)
except ConnectionRefusedError as exc: except ConnectionRefusedError as exc:
if self._address: if self._address:
self._fatal_error(exc) self._fatal_error(exc)
return return
except (BlockingIOError, InterruptedError):
self._loop.add_writer(self._sock_fd, self._sendto_ready)
except Exception as exc: except Exception as exc:
self._fatal_error(exc) self._fatal_error(exc)
return return
...@@ -746,13 +732,13 @@ class _SelectorDatagramTransport(_SelectorTransport): ...@@ -746,13 +732,13 @@ class _SelectorDatagramTransport(_SelectorTransport):
self._sock.send(data) self._sock.send(data)
else: else:
self._sock.sendto(data, addr) self._sock.sendto(data, addr)
except (BlockingIOError, InterruptedError):
self._buffer.appendleft((data, addr)) # Try again later.
break
except ConnectionRefusedError as exc: except ConnectionRefusedError as exc:
if self._address: if self._address:
self._fatal_error(exc) self._fatal_error(exc)
return return
except (BlockingIOError, InterruptedError):
self._buffer.appendleft((data, addr)) # Try again later.
break
except Exception as exc: except Exception as exc:
self._fatal_error(exc) self._fatal_error(exc)
return return
...@@ -765,5 +751,4 @@ class _SelectorDatagramTransport(_SelectorTransport): ...@@ -765,5 +751,4 @@ class _SelectorDatagramTransport(_SelectorTransport):
def _force_close(self, exc): def _force_close(self, exc):
if self._address and isinstance(exc, ConnectionRefusedError): if self._address and isinstance(exc, ConnectionRefusedError):
self._protocol.connection_refused(exc) self._protocol.connection_refused(exc)
super()._force_close(exc) super()._force_close(exc)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment