Commit eef52c27 by Julien Muchembled

Tickless poll loop, for lowest latency and cpu usage

With this patch, the epoll object is no longer woken up every second to check
whether a timeout has expired. The Connection API is changed so that the event
manager can query the smallest pending timeout.
parent fd0b9c98
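In outline: instead of waking on a fixed 1-second tick, the event manager asks every connection for its next absolute deadline, sleeps in epoll exactly until the nearest one (or forever when none is pending), and dispatches a timeout only when epoll returns no events. A minimal sketch of that pattern, using the getTimeout/onTimeout API this commit introduces but a hypothetical EventLoop class (illustrative, not NEO's actual code):

import select
from time import time

class EventLoop(object):
    def __init__(self):
        self.epoll = select.epoll()
        self.connections = {}  # fd -> connection object

    def poll_once(self):
        # Ask every connection for its next absolute deadline and keep
        # the nearest one, instead of waking up on a fixed 1-second tick.
        deadline = deadline_conn = None
        for conn in self.connections.values():
            t = conn.getTimeout()  # absolute deadline, or None
            if t is not None and (deadline is None or t < deadline):
                deadline, deadline_conn = t, conn
        # Sleep exactly until the nearest deadline, or forever (-1)
        # when no connection has a pending timeout.
        timeout = max(0, deadline - time()) if deadline is not None else -1
        event_list = self.epoll.poll(timeout)
        if not event_list and deadline_conn is not None:
            # epoll timed out: the nearest deadline has expired.
            deadline_conn.onTimeout()
        return event_list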
@@ -42,8 +42,6 @@ class _ThreadedPoll(Thread):
         try:
             while 1:
                 try:
-                    # XXX: Delay can't be infinite here, because we need
-                    #      to check connection timeouts.
                     self.em.poll(1)
                 except Exception:
                     log(ERROR, 'poll raised, retrying', exc_info=1)
@@ -225,7 +225,7 @@ class BaseConnection(object):
     def cancelRequests(self, *args, **kw):
         return self._handlers.cancelRequests(self, *args, **kw)
 
-    def checkTimeout(self, t):
+    def getTimeout(self):
         pass
 
     def lockWrapper(self, func):
@@ -351,7 +351,8 @@ class Connection(BaseConnection):
     client = False
     server = False
     peer_id = None
-    _base_timeout = None
+    _next_timeout = None
+    _timeout = 0
 
     def __init__(self, event_manager, *args, **kw):
         BaseConnection.__init__(self, event_manager, *args, **kw)
@@ -428,25 +429,26 @@ class Connection(BaseConnection):
     def updateTimeout(self, t=None):
         if not self._queue:
-            if t:
-                self._base_timeout = t
+            if not t:
+                t = self._next_timeout - self._timeout
             self._timeout = self._handlers.getNextTimeout() or self.KEEP_ALIVE
+            self._next_timeout = t + self._timeout
 
-    def checkTimeout(self, t):
-        # first make sure we don't timeout on answers we already received
-        if self._base_timeout and not self._queue:
-            if self._timeout <= t - self._base_timeout:
-                handlers = self._handlers
-                if handlers.isPending():
-                    msg_id = handlers.timeout(self)
-                    if msg_id is None:
-                        self._base_timeout = t
-                    else:
-                        logging.info('timeout for #0x%08x with %r',
-                            msg_id, self)
-                        self.close()
-                else:
-                    self.idle()
+    def getTimeout(self):
+        if not self._queue:
+            return self._next_timeout
+
+    def onTimeout(self):
+        handlers = self._handlers
+        if handlers.isPending():
+            msg_id = handlers.timeout(self)
+            if msg_id is None:
+                self._next_timeout = time() + self._timeout
+            else:
+                logging.info('timeout for #0x%08x with %r', msg_id, self)
+                self.close()
+        else:
+            self.idle()
 
     def abort(self):
         """Abort dealing with this connection."""
@@ -544,8 +546,8 @@ class Connection(BaseConnection):
             # try to reenable polling for writing.
             self.write_buf[:] = '',
             self.em.unregister(self, check_timeout=True)
-            self.checkTimeout = self.lockWrapper(lambda t:
-                t < connect_limit or self._delayed_closure())
+            self.getTimeout = lambda: connect_limit
+            self.onTimeout = self.lockWrapper(self._delayed_closure)
             self.readable = self.writable = lambda: None
         else:
             connect_limit = t + 1
@@ -575,7 +577,8 @@ class Connection(BaseConnection):
             logging.debug('Connection %r closed in recv', self.connector)
             self._closure()
             return
-        self._base_timeout = time() # last known remote activity
+        # last known remote activity
+        self._next_timeout = time() + self._timeout
         self.read_buf.append(data)
 
     def _send(self):
@@ -639,7 +642,11 @@ class Connection(BaseConnection):
         handlers = self._handlers
         t = None if handlers.isPending() else time()
         handlers.emit(packet, timeout, on_timeout, kw)
-        self.updateTimeout(t)
+        if not self._queue:
+            next_timeout = self._next_timeout
+            self.updateTimeout(t)
+            if self._next_timeout < next_timeout:
+                self.em.wakeup()
         return msg_id
 
     @not_closed
@@ -717,7 +724,7 @@ class MTConnectionType(type):
         if __debug__:
             for name in 'analyse', 'answer':
                 setattr(cls, name, cls.lockCheckWrapper(name))
-        for name in ('close', 'checkTimeout', 'notify',
+        for name in ('close', 'notify', 'onTimeout',
                      'process', 'readable', 'writable'):
             setattr(cls, name, cls.__class__.lockWrapper(cls, name))
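For the multi-threaded client, onTimeout joins the set of methods that must hold the connection lock, since the poll thread can now invoke it directly. A minimal sketch of such a lock wrapper (hypothetical helper, not NEO's lockWrapper):

from functools import wraps
from threading import RLock

def lock_wrapper(lock, func):
    """Return a copy of func that runs under lock."""
    @wraps(func)
    def wrapper(*args, **kw):
        with lock:  # serialize with application threads using the connection
            return func(*args, **kw)
    return wrapper

connection_lock = RLock()
# e.g.: conn.onTimeout = lock_wrapper(connection_lock, conn.onTimeout)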
@@ -775,5 +782,9 @@ class MTClientConnection(ClientConnection):
             handlers = self._handlers
             t = None if handlers.isPending() else time()
             handlers.emit(packet, timeout, on_timeout, kw)
-            self.updateTimeout(t)
+            if not self._queue:
+                next_timeout = self._next_timeout
+                self.updateTimeout(t)
+                if self._next_timeout < next_timeout:
+                    self.em.wakeup()
             return msg_id
@@ -123,6 +123,17 @@ class EpollEventManager(object):
         self._poll(timeout=0)
 
     def _poll(self, timeout=1):
+        if timeout:
+            timeout = None
+            for conn in self.connection_dict.itervalues():
+                t = conn.getTimeout()
+                if t and (timeout is None or t < timeout):
+                    timeout = t
+                    timeout_conn = conn
+            # Make sure epoll_wait does not return too early, because it has a
+            # granularity of 1ms and Python 2.7 rounds the timeout towards zero.
+            # See also https://bugs.python.org/issue20452 (fixed in Python 3).
+            timeout = .001 + max(0, timeout - time()) if timeout else -1
         try:
             event_list = self.epoll.poll(timeout)
         except IOError, exc:
@@ -131,7 +142,11 @@ class EpollEventManager(object):
                     exc.errno)
             elif exc.errno != EINTR:
                 raise
-            event_list = ()
+            return
+        if not event_list:
+            if timeout > 0:
+                timeout_conn.onTimeout()
+            return
         wlist = []
         elist = []
         for fd, event in event_list:
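The ".001" pad above compensates for epoll's millisecond granularity: Python 2.7 truncates the float timeout toward zero (https://bugs.python.org/issue20452, fixed in Python 3), so without the pad epoll_wait could return just before the deadline and force a useless extra loop iteration. A sketch of the conversion from absolute deadline to epoll timeout, under those assumptions:

from time import time

def epoll_timeout(deadline):
    # deadline: absolute time of the nearest connection timeout, or None
    if deadline is None:
        return -1  # block until I/O or an explicit wakeup()
    # pad by one epoll granule so the wait never ends too early
    return .001 + max(0, deadline - time())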
@@ -168,10 +183,6 @@ class EpollEventManager(object):
                 if conn.readable():
                     self._addPendingConnection(conn)
 
-        t = time()
-        for conn in self.connection_dict.values():
-            conn.checkTimeout(t)
-
     def wakeup(self, exit=False):
         with self._trigger_lock:
             self._trigger_exit |= exit
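One consequence of an infinite epoll wait: when ask() schedules a deadline earlier than the one the poller went to sleep with, the poller must be interrupted, hence the em.wakeup() calls added in the Connection changes above. The classic mechanism for this is the self-pipe trick, sketched here (illustrative; NEO's EventManager wakes epoll through its own trigger descriptor):

import os, select

class Waker(object):
    def __init__(self, epoll):
        # register the read end with epoll; a write makes poll() return
        self._r, self._w = os.pipe()
        epoll.register(self._r, select.EPOLLIN)

    def wakeup(self):
        os.write(self._w, b'\0')  # one byte suffices to wake the poller

    def drain(self):
        os.read(self._r, 64)  # poller empties the pipe when _r is readable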
@@ -789,8 +789,12 @@ class ConnectionTests(NeoUnitTestBase):
             p.setId(packet_id)
             conn.connector.receive = [''.join(p.encode())].pop
             conn.readable()
-            conn.checkTimeout(connection.time())
+            checkTimeout()
             conn.process()
 
+        def checkTimeout():
+            timeout = conn.getTimeout()
+            if timeout and timeout <= connection.time():
+                conn.onTimeout()
+
         try:
             for use_case, expected in use_case_list:
                 i = iter(use_case)
@@ -801,7 +805,7 @@ class ConnectionTests(NeoUnitTestBase):
                 conn.ask(Packets.Ping())
                 for t in i:
                     set_time(t)
-                    conn.checkTimeout(connection.time())
+                    checkTimeout()
                 packet_id = i.next()
                 if packet_id is None:
                     conn.ask(Packets.Ping())
@@ -810,11 +814,11 @@ class ConnectionTests(NeoUnitTestBase):
                 i = iter(expected)
                 for t in i:
                     set_time(t - .1)
-                    conn.checkTimeout(connection.time())
+                    checkTimeout()
                     set_time(t)
                     # this test method relies on the fact that only
                     # conn.close is called in case of a timeout
-                    conn.checkTimeout(connection.time())
+                    checkTimeout()
                     self.assertEqual(closed.pop(), connection.time())
                     answer(i.next())
                 self.assertFalse(conn.isPending())
@@ -96,12 +96,12 @@ class EventTests(NeoUnitTestBase):
             (r_connector.getDescriptor(), EPOLLIN),
             (w_connector.getDescriptor(), EPOLLOUT),
         )})
-        em.poll(timeout=10)
+        em.poll(timeout=1)
        # check it called poll on epoll
        self.assertEqual(len(em.epoll.mockGetNamedCalls("poll")), 1)
        call = em.epoll.mockGetNamedCalls("poll")[0]
        data = call.getParam(0)
-        self.assertEqual(data, 10)
+        self.assertEqual(data, -1)
        # need to rebuild completely this test and the the packet queue
        # check readable conn
        #self.assertEqual(len(r_conn.mockGetNamedCalls("readable")), 1)
@@ -137,7 +137,7 @@ class SerializedEventManager(EventManager):
     def _poll(self, timeout=1):
         if self._pending_processing:
-            assert timeout <= 0
+            assert timeout == 0, timeout
         elif 0 == self._timeout == timeout == Serialized.pending == len(
                 self.writer_set):
             return
@@ -365,7 +365,7 @@ class NeoCTL(neo.neoctl.app.NeoCTL):
     @SerializedEventManager.decorate
     def __init__(self, *args, **kw):
         super(NeoCTL, self).__init__(*args, **kw)
-        self.em._timeout = -1
+        self.em._timeout = 1
 
 
 class LoggerThreadName(str):
@@ -466,7 +466,7 @@ class ConnectionFilter(object):
 
 class NEOCluster(object):
 
-    BaseConnection_checkTimeout = staticmethod(BaseConnection.checkTimeout)
+    BaseConnection_getTimeout = staticmethod(BaseConnection.getTimeout)
     SocketConnector_makeClientConnection = staticmethod(
         SocketConnector.makeClientConnection)
     SocketConnector_makeListeningConnection = staticmethod(
@@ -517,7 +517,7 @@ class NEOCluster(object):
         # TODO: 'sleep' should 'tic' in a smart way, so that storages can be
         # safely started even if the cluster isn't.
         bootstrap.sleep = lambda seconds: None
-        BaseConnection.checkTimeout = lambda self, t: None
+        BaseConnection.getTimeout = lambda self: None
         SocketConnector.makeClientConnection = makeClientConnection
         SocketConnector.makeListeningConnection = lambda self, addr: \
             cls.SocketConnector_makeListeningConnection(self, BIND)
@@ -533,7 +533,7 @@ class NEOCluster(object):
         if cls._patch_count:
             return
         bootstrap.sleep = time.sleep
-        BaseConnection.checkTimeout = cls.BaseConnection_checkTimeout
+        BaseConnection.getTimeout = cls.BaseConnection_getTimeout
         SocketConnector.makeClientConnection = \
             cls.SocketConnector_makeClientConnection
         SocketConnector.makeListeningConnection = \
@@ -22,6 +22,7 @@ from functools import wraps
 from neo.lib import logging
 from neo.storage.checker import CHECK_COUNT
 from neo.lib.connection import ClientConnection
+from neo.lib.event import EventManager
 from neo.lib.protocol import CellStates, ClusterStates, Packets, \
     ZERO_OID, ZERO_TID, MAX_TID, uuid_str
 from neo.lib.util import p64
@@ -249,17 +250,22 @@ class ReplicationTests(NEOThreadedTest):
         """
         conn, = backup.master.getConnectionList(backup.upstream.master)
         # trigger ping
-        conn.updateTimeout(1)
         self.assertFalse(conn.isPending())
-        conn.checkTimeout(time.time())
+        conn.onTimeout()
         self.assertTrue(conn.isPending())
         # force ping to have expired
-        conn.updateTimeout(1)
         # connection will be closed before upstream master has time
         # to answer
-        backup.tic(force=1)
+        def _poll(orig, self, timeout):
+            if backup.master.em is self:
+                p.revert()
+                conn.onTimeout()
+            else:
+                orig(self, timeout)
+        with Patch(EventManager, _poll=_poll) as p:
+            backup.tic(force=1)
         new_conn, = backup.master.getConnectionList(backup.upstream.master)
-        self.assertFalse(new_conn is conn)
+        self.assertIsNot(new_conn, conn)
 
     @backup_test()
     def testBackupUpstreamStorageDead(self, backup):
@@ -277,11 +283,12 @@ class ReplicationTests(NEOThreadedTest):
         upstream.storage.listening_conn.close()
         Serialized.tic(); self.assertEqual(count[0], 0)
         Serialized.tic(); count[0] or Serialized.tic()
-        # XXX: review API for checking timeouts
-        backup.storage.em._timeout = 1
+        t = time.time()
         Serialized.tic(); self.assertEqual(count[0], 2)
         Serialized.tic(); self.assertEqual(count[0], 2)
-        time.sleep(1.1)
         Serialized.tic(); self.assertEqual(count[0], 3)
+        Serialized.tic(); self.assertEqual(count[0], 3)
+        self.assertTrue(t + 1 <= time.time())
 
     @backup_test()
     def testBackupDelayedUnlockTransaction(self, backup):