Commit a4731a0c by Julien Muchembled

Fix invalid processing of unregistered connections

This could happen if a file descriptor was reallocated by the kernel.
1 parent ed50edca
......@@ -28,6 +28,10 @@ class EpollEventManager(object):
def __init__(self):
self.connection_dict = {}
# Initialize a dummy 'unregistered' for the very rare case a registered
# connection is closed before the first call to poll. We don't care
# leaking a few integers for connections closed between 2 polls.
self.unregistered = []
self.reader_set = set()
self.writer_set = set()
self.epoll = epoll()
......@@ -95,6 +99,7 @@ class EpollEventManager(object):
self.writer_set.discard(fd)
if not check_timeout:
del self.connection_dict[fd]
self.unregistered.append(fd)
def isIdle(self):
return not (self._pending_processing or self.writer_set)
......@@ -143,45 +148,40 @@ class EpollEventManager(object):
elif exc.errno != EINTR:
raise
return
if not event_list:
if blocking > 0:
timeout_conn.onTimeout()
return
wlist = []
elist = []
for fd, event in event_list:
if event & EPOLLIN:
conn = self.connection_dict[fd]
if conn.readable():
self._addPendingConnection(conn)
if event & EPOLLOUT:
wlist.append(fd)
if event & (EPOLLERR | EPOLLHUP):
elist.append(fd)
for fd in wlist:
# This can fail, if a connection is closed in readable().
try:
conn = self.connection_dict[fd]
except KeyError:
continue
conn.writable()
for fd in elist:
# This can fail, if a connection is closed in previous calls to
# readable() or writable().
try:
conn = self.connection_dict[fd]
except KeyError:
if fd == self._trigger_fd:
if event_list:
self.unregistered = unregistered = []
wlist = []
elist = []
for fd, event in event_list:
if event & EPOLLIN:
conn = self.connection_dict[fd]
if conn.readable():
self._addPendingConnection(conn)
if event & EPOLLOUT:
wlist.append(fd)
if event & (EPOLLERR | EPOLLHUP):
elist.append(fd)
for fd in wlist:
if fd not in unregistered:
self.connection_dict[fd].writable()
for fd in elist:
if fd in unregistered:
continue
try:
conn = self.connection_dict[fd]
except KeyError:
assert fd == self._trigger_fd, fd
with self._trigger_lock:
self.epoll.unregister(fd)
if self._trigger_exit:
del self._trigger_exit
thread.exit()
continue
if conn.readable():
self._addPendingConnection(conn)
continue
if conn.readable():
self._addPendingConnection(conn)
elif blocking > 0:
logging.debug('timeout triggered for %r', timeout_conn)
timeout_conn.onTimeout()
def wakeup(self, exit=False):
with self._trigger_lock:
......
......@@ -287,6 +287,7 @@ class ReplicationTests(NEOThreadedTest):
# XXX: review API for checking timeouts
backup.storage.em._blocking = 1
Serialized.tic(); self.assertEqual(count[0], 2)
Serialized.tic(); self.assertEqual(count[0], 2)
Serialized.tic(); self.assertEqual(count[0], 3)
self.assertTrue(t + 1 <= time.time())
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!