Commit 737e227a authored by Julien Muchembled's avatar Julien Muchembled

connection: reimplement timeout logic and redefine pings as a keep-alive feature

- Previous implementation was not able to import transactions with many small
  objects, the client for faster to send a store request than to process its
  answer. If X is the difference of time for these 2 operations, the maximum
  number of objects a transaction could contain was CRITICAL_TIMEOUT / X.
  And HasLock feature can't act as a workaround because it is not working yet.
- Change API of 'on_timeout', which currently only used by HasLock.
- Stop pinging when we wait for an answer. This wastes resources and would
  never recover any bad state.
- Make client connections send pings when they are idle instead.
  This implements keep-alive feature for high availability.
  Start with an non-configurable period of 60 seconds.
- Move processing of ping/pong to handlers.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2762 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent f7378a70
......@@ -20,7 +20,7 @@ RC - Clarify cell state signification
RC - Review XXX in the code (CODE)
RC - Review TODO in the code (CODE)
RC - Review output of pylint (CODE)
- Keep-alive (HIGH AVAILABILITY)
- Keep-alive (HIGH AVAILABILITY) (implemented, to be reviewed and tested)
Consider the need to implement a keep-alive system (packets sent
automatically when there is no activity on the connection for a period
of time).
......
......@@ -16,7 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo.lib.handler import EventHandler
from neo.lib.protocol import ProtocolError
from neo.lib.protocol import ProtocolError, Packets
class BaseHandler(EventHandler):
"""Base class for client-side EventHandler implementations."""
......@@ -37,10 +37,10 @@ class BaseHandler(EventHandler):
def packetReceived(self, conn, packet):
"""Redirect all received packet to dispatcher thread."""
if packet.isResponse():
if packet.isResponse() and type(packet) is not Packets.Pong:
if not self.dispatcher.dispatch(conn, packet.getId(), packet):
raise ProtocolError('Unexpected response packet from %r: %r',
conn, packet)
raise ProtocolError('Unexpected response packet from %r: %r'
% (conn, packet))
else:
self.dispatch(conn, packet)
......
......@@ -32,9 +32,7 @@ from neo.lib import attributeTracker
from neo.lib.util import ReadBuffer
from neo.lib.profiling import profiler_decorator
PING_DELAY = 6
PING_TIMEOUT = 5
INCOMING_TIMEOUT = 10
KEEP_ALIVE = 60
CRITICAL_TIMEOUT = 30
class ConnectionClosed(Exception):
......@@ -130,26 +128,19 @@ class HandlerSwitcher(object):
self._next_on_timeout = on_timeout
request_dict[msg_id] = (answer_class, timeout, on_timeout)
def checkTimeout(self, connection, t):
next_timeout = self._next_timeout
if next_timeout is not None and next_timeout < t:
def getNextTimeout(self):
return self._next_timeout
def timeout(self, connection):
msg_id = self._next_timeout_msg_id
if self._next_on_timeout is None:
result = msg_id
else:
if self._next_on_timeout(connection, msg_id):
# Don't notify that a timeout occured, and forget about
# this answer.
for (request_dict, _) in self._pending:
request_dict.pop(msg_id, None)
self._updateNextTimeout()
result = None
else:
if self._next_on_timeout is not None:
self._next_on_timeout(connection, msg_id)
if self._next_timeout_msg_id != msg_id:
# on_timeout sent a packet with a smaller timeout
# so keep the connection open
return
# Notify that a timeout occured
result = msg_id
else:
result = None
return result
return msg_id
def handle(self, connection, packet):
assert not self._is_handling
......@@ -191,24 +182,18 @@ class HandlerSwitcher(object):
neo.lib.logging.debug(
'Apply handler %r on %r', self._pending[0][1],
connection)
if timeout == self._next_timeout:
if msg_id == self._next_timeout_msg_id:
self._updateNextTimeout()
def _updateNextTimeout(self):
# Find next timeout and its msg_id
timeout_list = []
extend = timeout_list.extend
for (request_dict, handler) in self._pending:
extend(((timeout, msg_id, on_timeout) \
for msg_id, (_, timeout, on_timeout) in \
request_dict.iteritems()))
if timeout_list:
timeout_list.sort(key=lambda x: x[0])
self._next_timeout, self._next_timeout_msg_id, \
self._next_on_timeout = timeout_list[0]
else:
self._next_timeout, self._next_timeout_msg_id, \
self._next_on_timeout = None, None, None
next_timeout = None
for pending in self._pending:
for msg_id, (_, timeout, on_timeout) in pending[0].iteritems():
if not next_timeout or timeout < next_timeout[0]:
next_timeout = timeout, msg_id, on_timeout
self._next_timeout, self._next_timeout_msg_id, self._next_on_timeout = \
next_timeout or (None, None, None)
@profiler_decorator
def setHandler(self, handler):
......@@ -222,53 +207,28 @@ class HandlerSwitcher(object):
return can_apply
class Timeout(object):
""" Keep track of connection-level timeouts """
def __init__(self):
self._ping_time = None
self._critical_time = None
def update(self, t, force=False):
"""
Send occurred:
- set ping time if earlier than existing one
"""
ping_time = self._ping_time
t += PING_DELAY
if force or ping_time is None or t < ping_time:
self._ping_time = t
def refresh(self, t):
"""
Recv occured:
- reschedule next ping time
- as this is an evidence that node is alive, remove pong expectation
"""
self._ping_time = t + PING_DELAY
self._critical_time = None
def ping(self, t):
"""
Ping send occured:
- reschedule next ping time
- set pong expectation
class BaseConnection(object):
"""A base connection
About timeouts:
Timeout are mainly per-connection instead of per-packet.
The idea is that most of time, packets are received and processed
sequentially, so if it takes a long for a peer to process a packet,
following packets would just be enqueued.
What really matters is that the peer makes progress in its work.
As long as we receive an answer, we consider it's still alive and
it may just have started to process the following request. So we reset
timeouts.
There is anyway nothing more we could do, because processing of a packet
may be delayed in a very unpredictable way depending of previously
received packets on peer side.
Even ourself may be slow to receive a packet. We must not timeout for
an answer that is already in our incoming buffer (read_buf or _queue).
Timeouts in HandlerSwitcher are only there to prioritize some packets.
"""
self._ping_time = t + PING_DELAY
self._critical_time = t + PING_TIMEOUT
def softExpired(self, t):
""" Do we need to ping ? """
return self._ping_time < t
def hardExpired(self, t):
""" Have we reached pong latest arrival time, if set ? """
critical_time = self._critical_time
return critical_time is not None and critical_time < t
class BaseConnection(object):
"""A base connection."""
_base_timeout = None
def __init__(self, event_manager, handler, connector, addr=None):
assert connector is not None, "Need a low-level connector"
......@@ -276,28 +236,33 @@ class BaseConnection(object):
self.connector = connector
self.addr = addr
self._handlers = HandlerSwitcher(handler)
self._timeout = Timeout()
event_manager.register(self)
def isPending(self):
return self._handlers.isPending()
def updateTimeout(self, t=None):
if not self._queue:
if t:
self._base_timeout = t
self._timeout = self._handlers.getNextTimeout() or KEEP_ALIVE
def checkTimeout(self, t):
# first make sure we don't timeout on answers we already received
if self._base_timeout and not self._queue:
timeout = t - self._base_timeout
if self._timeout <= timeout:
handlers = self._handlers
if handlers.isPending():
msg_id = handlers.checkTimeout(self, t)
if msg_id is not None:
neo.lib.logging.info(
'timeout for #0x%08x with %r', msg_id, self)
msg_id = handlers.timeout(self)
if msg_id is None:
self._base_timeout = t
else:
neo.lib.logging.info('timeout for #0x%08x with %r',
msg_id, self)
self.close()
elif self._timeout.hardExpired(t):
# critical time reach or pong not received, abort
neo.lib.logging.info('timeout with %r', self)
self.notify(Packets.Notify('Timeout'))
self.abort()
elif self._timeout.softExpired(t):
self._timeout.ping(t)
self.ping()
else:
self.idle()
def lock(self):
return 1
......@@ -381,6 +346,10 @@ class BaseConnection(object):
"""
return attributeTracker.whoSet(self, 'connector')
def idle(self):
pass
attributeTracker.track(BaseConnection)
class ListeningConnection(BaseConnection):
......@@ -400,8 +369,6 @@ class ListeningConnection(BaseConnection):
handler = self.getHandler()
new_conn = ServerConnection(self.getEventManager(), handler,
connector=new_s, addr=addr)
# A request for a node identification should arrive.
self._timeout.update(time())
handler.connectionAccepted(new_conn)
except ConnectorTryAgainException:
pass
......@@ -421,9 +388,8 @@ class Connection(BaseConnection):
connecting = False
def __init__(self, event_manager, handler, connector, addr=None):
BaseConnection.__init__(self, event_manager, handler,
connector=connector, addr=addr)
def __init__(self, event_manager, *args, **kw):
BaseConnection.__init__(self, event_manager, *args, **kw)
self.read_buf = ReadBuffer()
self.write_buf = []
self.cur_id = 0
......@@ -493,18 +459,6 @@ class Connection(BaseConnection):
except PacketMalformedError, msg:
self.getHandler()._packetMalformed(self, msg)
return
self._timeout.refresh(time())
packet_type = type(packet)
if packet_type is Packets.Ping:
# Send a pong notification
PACKET_LOGGER.dispatch(self, packet, False)
if not self.aborted:
self.answer(Packets.Pong(), packet.getId())
elif packet_type is Packets.Pong:
# Skip PONG packets, its only purpose is refresh the timeout
# generated upong ping. But still log them.
PACKET_LOGGER.dispatch(self, packet, False)
else:
self._queue.append(packet)
def hasPendingMessages(self):
......@@ -520,6 +474,7 @@ class Connection(BaseConnection):
# check out packet and process it with current handler
packet = self._queue.pop(0)
self._handlers.handle(self, packet)
self.updateTimeout()
def pending(self):
return self.connector is not None and self.write_buf
......@@ -580,6 +535,7 @@ class Connection(BaseConnection):
'Connection %r closed in recv', self.connector)
self._closure()
return
self._base_timeout = time() # last known remote activity
self.read_buf.append(data)
@profiler_decorator
......@@ -646,11 +602,10 @@ class Connection(BaseConnection):
msg_id = self._getNextId()
packet.setId(msg_id)
self._addPacket(packet)
t = time()
# If there is no pending request, initialise timeout values.
if not self._handlers.isPending():
self._timeout.update(t, force=True)
self._handlers.emit(packet, t + timeout, on_timeout)
handlers = self._handlers
t = not handlers.isPending() and time() or None
handlers.emit(packet, timeout, on_timeout)
self.updateTimeout(t)
return msg_id
@not_closed
......@@ -662,21 +617,14 @@ class Connection(BaseConnection):
assert packet.isResponse(), packet
self._addPacket(packet)
@not_closed
def ping(self):
packet = Packets.Ping()
packet.setId(self._getNextId())
self._addPacket(packet)
class ClientConnection(Connection):
"""A connection from this node to a remote node."""
connecting = True
def __init__(self, event_manager, handler, addr, connector, **kw):
Connection.__init__(self, event_manager, handler, addr=addr,
connector=connector)
def __init__(self, event_manager, handler, addr, connector):
Connection.__init__(self, event_manager, handler, connector, addr)
handler.connectionStarted(self)
try:
try:
......@@ -685,6 +633,7 @@ class ClientConnection(Connection):
event_manager.addWriter(self)
else:
self.connecting = False
self.updateTimeout(time())
self.getHandler().connectionCompleted(self)
except ConnectorConnectionRefusedException:
self._closure()
......@@ -702,6 +651,7 @@ class ClientConnection(Connection):
return
else:
self.connecting = False
self.updateTimeout(time())
self.getHandler().connectionCompleted(self)
self.em.addReader(self)
else:
......@@ -710,10 +660,17 @@ class ClientConnection(Connection):
def isClient(self):
return True
def idle(self):
self.ask(Packets.Ping())
class ServerConnection(Connection):
"""A connection from a remote node to this node."""
def __init__(self, *args, **kw):
Connection.__init__(self, *args, **kw)
self.updateTimeout(time())
def isServer(self):
return True
......@@ -778,11 +735,10 @@ class MTClientConnection(ClientConnection):
else:
self.dispatcher.register(self, msg_id, queue)
self._addPacket(packet)
t = time()
# If there is no pending request, initialise timeout values.
if not self._handlers.isPending():
self._timeout.update(t)
self._handlers.emit(packet, t + timeout, on_timeout)
handlers = self._handlers
t = not handlers.isPending() and time() or None
handlers.emit(packet, timeout, on_timeout)
self.updateTimeout(t)
return msg_id
finally:
self.unlock()
......
......@@ -121,6 +121,15 @@ class EventHandler(object):
# Packet handlers.
def ping(self, conn):
if not conn.isAborted():
conn.answer(Packets.Pong())
def pong(self, conn):
# Ignore PONG packets. The only purpose of ping/pong packets is
# to test/maintain underlying connection.
pass
def notify(self, conn, message):
neo.lib.logging.info('notification from %r: %s', conn, message)
......
# -*- coding: utf-8 -*-
#
# Copyright (C) 2009-2010 Nexedi SA
#
......@@ -19,11 +20,12 @@ from time import time
from mock import Mock
from neo.lib.connection import ListeningConnection, Connection, \
ClientConnection, ServerConnection, MTClientConnection, \
HandlerSwitcher, Timeout, PING_DELAY, PING_TIMEOUT, OnTimeout
HandlerSwitcher, CRITICAL_TIMEOUT
from neo.lib.connector import getConnectorHandler, registerConnectorHandler
from neo.tests import DoNothingConnector
from neo.lib.connector import ConnectorException, ConnectorTryAgainException, \
ConnectorInProgressException, ConnectorConnectionRefusedException
from neo.lib.handler import EventHandler
from neo.lib.protocol import Packets, ParserState
from neo.tests import NeoUnitTestBase
from neo.lib.util import ReadBuffer
......@@ -502,40 +504,6 @@ class ConnectionTests(NeoUnitTestBase):
self.assertEqual(data.decode(), p.decode())
self._checkReadBuf(bc, '')
def test_Connection_analyse5(self):
# test ping handling
bc = self._makeConnection()
bc._queue = Mock()
p = Packets.Ping()
p.setId(1)
self._appendPacketToReadBuf(bc, p)
bc.analyse()
# check no packet was queued
self.assertEqual(len(bc._queue.mockGetNamedCalls("append")), 0)
# check pong answered
parser_state = ParserState()
buffer = ReadBuffer()
for chunk in bc.write_buf:
buffer.append(chunk)
answer = Packets.parse(buffer, parser_state)
self.assertTrue(answer is not None)
self.assertTrue(type(answer) == Packets.Pong)
self.assertEqual(answer.getId(), p.getId())
def test_Connection_analyse6(self):
# test pong handling
bc = self._makeConnection()
bc._timeout = Mock()
bc._queue = Mock()
p = Packets.Pong()
p.setId(1)
self._appendPacketToReadBuf(bc, p)
bc.analyse()
# check no packet was queued
self.assertEqual(len(bc._queue.mockGetNamedCalls("append")), 0)
# check timeout has been refreshed
self.assertEqual(len(bc._timeout.mockGetNamedCalls("refresh")), 1)
def test_Connection_writable1(self):
# with pending operation after send
def send(self, data):
......@@ -800,6 +768,68 @@ class ConnectionTests(NeoUnitTestBase):
next_id = bc._getNextId()
self.assertEqual(next_id, 0)
def test_15_Timeout(self):
# NOTE: This method uses ping/pong packets only because MT connection
# don't accept any other packet without specifying a queue.
self.handler = EventHandler(self.app)
conn = self._makeClientConnection()
use_case_list = (
# (a) For a single packet sent at T,
# the limit time for the answer is T + (1 * CRITICAL_TIMEOUT)
((), (1., 0)),
# (b) Same as (a), even if send another packet at (T + CT/2).
# But receiving a packet (at T + CT - ε) resets the timeout
# (which means the limit for the 2nd one is T + 2*CT)
((.5, None), (1., 0, 2., 1)),
# (c) Same as (b) with a first answer at well before the limit
# (T' = T + CT/2). The limit for the second one is T' + CT.
((.1, None, .5, 1), (1.5, 0)),
)
from neo.lib import connection
def set_time(t):
connection.time = lambda: int(CRITICAL_TIMEOUT * (1000 + t))
closed = []
conn.close = lambda: closed.append(connection.time())
def answer(packet_id):
p = Packets.Pong()
p.setId(packet_id)
conn.connector.receive = [''.join(p.encode())].pop
conn.readable()
conn.checkTimeout(connection.time())
conn.process()
try:
for use_case, expected in use_case_list:
i = iter(use_case)
conn.cur_id = 0
set_time(0)
# No timeout when no pending request
self.assertEqual(conn._handlers.getNextTimeout(), None)
conn.ask(Packets.Ping())
for t in i:
set_time(t)
conn.checkTimeout(connection.time())
packet_id = i.next()
if packet_id is None:
conn.ask(Packets.Ping())
else:
answer(packet_id)
i = iter(expected)
for t in i:
set_time(t - .1)
conn.checkTimeout(connection.time())
set_time(t)
# this test method relies on the fact that only
# conn.close is called in case of a timeout
conn.checkTimeout(connection.time())
self.assertEqual(closed.pop(), connection.time())
answer(i.next())
self.assertFalse(conn.isPending())
self.assertFalse(closed)
finally:
connection.time = time
class MTConnectionTests(ConnectionTests):
# XXX: here we test non-client-connection-related things too, which
# duplicates test suite work... Should be fragmented into finer-grained
......@@ -1005,142 +1035,6 @@ class HandlerSwitcherTests(NeoUnitTestBase):
self._handlers.handle(self._connection, a2)
self.checkAborted(self._connection)
def testTimeout(self):
"""
This timeout happens when a request has not been answered for longer
than a duration defined at emit() time.
"""
now = time()
# No timeout when no pending request
self.assertEqual(self._handlers.checkTimeout(self._connection, now),
None)
# Prepare some requests
msg_id_1 = 1
msg_id_2 = 2
msg_id_3 = 3
msg_id_4 = 4
r1 = self._makeRequest(msg_id_1)
a1 = self._makeAnswer(msg_id_1)
r2 = self._makeRequest(msg_id_2)
a2 = self._makeAnswer(msg_id_2)
r3 = self._makeRequest(msg_id_3)
r4 = self._makeRequest(msg_id_4)
msg_1_time = now + 5
msg_2_time = msg_1_time + 5
msg_3_time = msg_2_time + 5
msg_4_time = msg_3_time + 5
markers = []
def msg_3_on_timeout(conn, msg_id):
markers.append((3, conn, msg_id))
return True
def msg_4_on_timeout(conn, msg_id):
markers.append((4, conn, msg_id))
return False
# Emit r3 before all other, to test that it's time parameter value
# which is used, not the registration order.
self._handlers.emit(r3, msg_3_time, OnTimeout(msg_3_on_timeout))
self._handlers.emit(r1, msg_1_time, None)
self._handlers.emit(r2, msg_2_time, None)
# No timeout before msg_1_time
self.assertEqual(self._handlers.checkTimeout(self._connection, now),
None)
# Timeout for msg_1 after msg_1_time
self.assertEqual(self._handlers.checkTimeout(self._connection,
msg_1_time + 0.5), msg_id_1)
# If msg_1 met its answer, no timeout after msg_1_time
self._handlers.handle(self._connection, a1)
self.assertEqual(self._handlers.checkTimeout(self._connection,
msg_1_time + 0.5), None)
# Next timeout is after msg_2_time
self.assertEqual(self._handlers.checkTimeout(self._connection,
msg_2_time + 0.5), msg_id_2)
self._handlers.handle(self._connection, a2)
# Sanity check
self.assertEqual(self._handlers.checkTimeout(self._connection,
msg_2_time + 0.5), None)
# msg_3 timeout will fire msg_3_on_timeout callback, which causes the
# timeout to be ignored (it returns True)
self.assertEqual(self._handlers.checkTimeout(self._connection,
msg_3_time + 0.5), None)
# ...check that callback actually fired
self.assertEqual(len(markers), 1)
# ...with expected parameters
self.assertEqual(markers[0], (3, self._connection, msg_id_3))
# answer to msg_3 must not be expected anymore (and it was the last
# expected message)
self.assertFalse(self._handlers.isPending())
del markers[:]
self._handlers.emit(r4, msg_4_time, OnTimeout(msg_4_on_timeout))
# msg_4 timeout will fire msg_4_on_timeout callback, which lets the
# timeout be detected (it returns False)
self.assertEqual(self._handlers.checkTimeout(self._connection,
msg_4_time + 0.5), msg_id_4)
# ...check that callback actually fired
self.assertEqual(len(markers), 1)
# ...with expected parameters
self.assertEqual(markers[0], (4, self._connection, msg_id_4))
class TestTimeout(NeoUnitTestBase):
def setUp(self):
NeoUnitTestBase.setUp(self)
self.current = time()
self.timeout = Timeout()
self._updateAt(0)
self.assertTrue(PING_DELAY > PING_TIMEOUT) # Sanity check
def _checkAt(self, n, soft, hard):
at = self.current + n
self.assertEqual(soft, self.timeout.softExpired(at))
self.assertEqual(hard, self.timeout.hardExpired(at))
def _updateAt(self, n, force=False):
self.timeout.update(self.current + n, force=force)
def _refreshAt(self, n):
self.timeout.refresh(self.current + n)
def _pingAt(self, n):
self.timeout.ping(self.current + n)
def testSoftTimeout(self):
"""
Soft timeout is when a ping should be sent to peer to see if it's
still responsive, after seing no life sign for PING_DELAY.
"""
# Before PING_DELAY, no timeout.
self._checkAt(PING_DELAY - 0.5, False, False)
# If nothing came to refresh the timeout, soft timeout will be asserted
# after PING_DELAY.
self._checkAt(PING_DELAY + 0.5, True, False)
# If something refreshes the timeout, soft timeout will not be asserted
# after PING_DELAY.
answer_time = PING_DELAY - 0.5
self._refreshAt(answer_time)
self._checkAt(PING_DELAY + 0.5, False, False)
# ...but it will happen again after PING_DELAY after that answer
self._checkAt(answer_time + PING_DELAY + 0.5, True, False)
# if there is no more pending requests, a clear will happen so next
# send doesn't immediately trigger a ping
new_request_time = answer_time + PING_DELAY * 2
self._updateAt(new_request_time, force=True)
self._checkAt(new_request_time + PING_DELAY - 0.5, False, False)
self._checkAt(new_request_time + PING_DELAY + 0.5, True, False)
def testHardTimeout(self):
"""
Hard timeout is when a ping was sent, and any life sign must come
back to us before PING_TIMEOUT.
"""
# A timeout triggered at PING_DELAY, so a ping was sent.
self._pingAt(PING_DELAY)
# Before PING_DELAY + PING_TIMEOUT, no timeout occurs.
self._checkAt(PING_DELAY + PING_TIMEOUT - 0.5, False, False)
# After PING_DELAY + PING_TIMEOUT, hard timeout occurs.
self._checkAt(PING_DELAY + PING_TIMEOUT + 0.5, False, True)
# If anything happened on the connection, there is no hard timeout
# anymore after PING_DELAY + PING_TIMEOUT.
self._refreshAt(PING_DELAY + PING_TIMEOUT - 0.5)
self._checkAt(PING_DELAY + PING_TIMEOUT + 0.5, False, False)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment