Commit 4a328ade authored by Julien Muchembled's avatar Julien Muchembled

Fix 100% CPU usage when the closure of a connection is delayed

parent 4e739de4
......@@ -246,10 +246,7 @@ class BaseConnection(object):
def close(self):
"""Close the connection."""
if self.connector is not None:
em = self.em
em.removeReader(self)
em.removeWriter(self)
em.unregister(self)
self.em.unregister(self)
self.connector.close()
self.connector = None
self.aborted = False
......@@ -538,6 +535,10 @@ class Connection(BaseConnection):
global connect_limit
t = time()
if t < connect_limit:
# Fake _addPacket so that if does not
# try to reenable polling for writing.
self.write_buf[:] = '',
self.em.unregister(self, check_timeout=True)
self.checkTimeout = self.lockWrapper(lambda t:
t < connect_limit or self._delayed_closure())
self.readable = self.writable = lambda: None
......
......@@ -16,7 +16,7 @@
from time import time
from select import epoll, EPOLLIN, EPOLLOUT, EPOLLERR, EPOLLHUP
from errno import EINTR, EAGAIN
from errno import EAGAIN, EINTR, ENOENT
from . import logging
class EpollEventManager(object):
......@@ -61,12 +61,15 @@ class EpollEventManager(object):
append(conn)
return result
# epoll_wait always waits for EPOLLERR & EPOLLHUP so we're forced
# to unregister when we want to ignore all events for a connection.
def register(self, conn):
fd = conn.getConnector().getDescriptor()
self.connection_dict[fd] = conn
self.epoll.register(fd)
def unregister(self, conn):
def unregister(self, conn, check_timeout=False):
new_pending_processing = [x for x in self._pending_processing
if x is not conn]
# Check that we removed at most one entry from
......@@ -74,7 +77,15 @@ class EpollEventManager(object):
assert len(new_pending_processing) > len(self._pending_processing) - 2
self._pending_processing = new_pending_processing
fd = conn.getConnector().getDescriptor()
try:
self.epoll.unregister(fd)
except IOError, e:
if e.errno != ENOENT:
raise
else:
self.reader_set.discard(fd)
self.writer_set.discard(fd)
if not check_timeout:
del self.connection_dict[fd]
def isIdle(self):
......
......@@ -395,7 +395,6 @@ class ConnectionTests(NeoUnitTestBase):
self._checkSend(1, "testdatasecondthird")
# connection closed -> buffers flushed
self._checkWriteBuf(bc, '')
self._checkReaderRemoved(1)
self._checkConnectionClosed(1)
self._checkUnregistered(1)
finally:
......@@ -571,8 +570,6 @@ class ConnectionTests(NeoUnitTestBase):
self._checkUnregistered(1)
# nothing else pending, so writer has been removed
self.assertFalse(bc.pending())
self._checkWriterRemoved(1)
self._checkReaderRemoved(1)
self._checkClose(1)
finally:
del DoNothingConnector.send
......@@ -735,8 +732,6 @@ class ConnectionTests(NeoUnitTestBase):
self._checkConnectionFailed(0)
self._checkUnregistered(1)
self._checkReaderAdded(1)
self._checkWriterRemoved(1)
self._checkReaderRemoved(1)
def test_14_ServerConnection(self):
bc = self._makeServerConnection()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment