Commit f6eb02b4 authored by Julien Muchembled

Remove packet timeouts

Since it is no longer worth keeping track of the last connection activity
(which, by the way, ignored TCP ACKs, i.e. timeouts could theoretically be
triggered before all the data were actually sent), the semantics of
closeClient have also changed. Before this commit, the 1-minute timeout was
reset whenever there was activity (connection still used as server). Now, the
connection is closed exactly 100 seconds after it stops being used as a client.
parent 9b70f88f
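
The new behavior is simple enough to sketch. A minimal model of the semantics
described above, with hypothetical names (the real logic is in Connection in
neo/lib/connection.py):

    from time import time

    class IdleClientConnection(object):
        # Simplified stand-in for the real Connection class.
        _timeout = None  # absolute deadline, or None when disarmed

        def closeClient(self):
            # Armed once, when the connection stops being used as a client;
            # unlike before this commit, later server-side activity does not
            # push the deadline back.
            self._timeout = time() + 100

        def asClient(self):
            # Reusing the connection as a client disarms the deadline.
            self._timeout = None

        def getTimeout(self):
            return self._timeout

        def onTimeout(self):
            # Called by the event loop once the deadline has passed.
            assert self._timeout
            self.close()

        def close(self):
            pass  # placeholder for the real teardown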
@@ -23,15 +23,11 @@ from .locking import RLock
 from .protocol import uuid_str, Errors, PacketMalformedError, Packets
 from .util import dummy_read_buffer, ReadBuffer

-CRITICAL_TIMEOUT = 30

 class ConnectionClosed(Exception):
     pass

 class HandlerSwitcher(object):
     _is_handling = False
-    _next_timeout = None
-    _next_timeout_msg_id = None
     _pending = ({}, None),

     def __init__(self, handler):
@@ -53,7 +49,7 @@ class HandlerSwitcher(object):
             while request_dict:
                 msg_id, request = request_dict.popitem()
                 p.setId(msg_id)
-                handler.packetReceived(conn, p, request[2])
+                handler.packetReceived(conn, p, request[1])
             if len(self._pending) == 1:
                 break
             del self._pending[0]
@@ -65,7 +61,7 @@ class HandlerSwitcher(object):
         """ Return the last (may be unapplied) handler registered """
         return self._pending[-1][1]

-    def emit(self, request, timeout, kw={}):
+    def emit(self, request, kw={}):
         # register the request in the current handler
         _pending = self._pending
         if self._is_handling:
@@ -80,18 +76,7 @@ class HandlerSwitcher(object):
         answer_class = request.getAnswerClass()
         assert answer_class is not None, "Not a request"
         assert msg_id not in request_dict, "Packet id already expected"
-        next_timeout = self._next_timeout
-        if next_timeout is None or timeout < next_timeout:
-            self._next_timeout = timeout
-            self._next_timeout_msg_id = msg_id
-        request_dict[msg_id] = answer_class, timeout, kw
-
-    def getNextTimeout(self):
-        return self._next_timeout
-
-    def timeout(self, connection):
-        logging.info('timeout for #0x%08x with %r',
-            self._next_timeout_msg_id, connection)
+        request_dict[msg_id] = answer_class, kw

     def handle(self, connection, packet):
         assert not self._is_handling
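
After this change, HandlerSwitcher tracks only (answer_class, kw) per pending
message id, with no per-packet deadline. A toy model of that bookkeeping,
detached from the real class for illustration (all names here hypothetical):

    request_dict = {}

    def emit(msg_id, request, kw={}):
        # Register which answer class is expected for this message id.
        request_dict[msg_id] = request.getAnswerClass(), kw

    def handle(msg_id, answer):
        # Pop the expectation when the answer arrives; unknown ids yield no
        # expected class, as in the KeyError branch of the real code.
        klass, kw = request_dict.pop(msg_id, (None, {}))
        assert klass is None or isinstance(answer, klass)
        return kw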
@@ -118,7 +103,7 @@ class HandlerSwitcher(object):
         request_dict, handler = pending[0]
         # checkout the expected answer class
         try:
-            klass, _, kw = request_dict.pop(msg_id)
+            klass, kw = request_dict.pop(msg_id)
         except KeyError:
             klass = None
             kw = {}
@@ -137,18 +122,6 @@ class HandlerSwitcher(object):
             del pending[0]
             logging.debug('Apply handler %r on %r', pending[0][1],
                 connection)
-        if msg_id == self._next_timeout_msg_id:
-            self._updateNextTimeout()
-
-    def _updateNextTimeout(self):
-        # Find next timeout and its msg_id
-        next_timeout = None
-        for pending in self._pending:
-            for msg_id, (_, timeout, _) in pending[0].iteritems():
-                if not next_timeout or timeout < next_timeout[0]:
-                    next_timeout = timeout, msg_id
-        self._next_timeout, self._next_timeout_msg_id = \
-            next_timeout or (None, None)

     def setHandler(self, handler):
         can_apply = len(self._pending) == 1 and not self._pending[0][0]
@@ -166,20 +139,30 @@ class BaseConnection(object):
     About timeouts:

-    Timeout are mainly per-connection instead of per-packet.
-    The idea is that most of time, packets are received and processed
-    sequentially, so if it takes a long for a peer to process a packet,
-    following packets would just be enqueued.
-    What really matters is that the peer makes progress in its work.
-    As long as we receive an answer, we consider it's still alive and
-    it may just have started to process the following request. So we reset
-    timeouts.
-    There is anyway nothing more we could do, because processing of a packet
-    may be delayed in a very unpredictable way depending of previously
-    received packets on peer side.
-    Even ourself may be slow to receive a packet. We must not timeout for
-    an answer that is already in our incoming buffer (read_buf or _queue).
-    Timeouts in HandlerSwitcher are only there to prioritize some packets.
+    In the past, ask() took a timeout parameter as a way to close the
+    connection if the remote node was too long to reply, with the idea
+    that something went wrong. There was no known bug but this feature was
+    actually a bad idea.
+
+    It is impossible to test whether the remote node is in good state or
+    not. The experience shows that timeouts were always triggered because
+    the remote nodes were simply too slow. Waiting remains the best option
+    and anything else would only make things worse.
+
+    The only case where it could make sense to react on a slow request is
+    when there is redundancy, more exactly for read requests to storage
+    nodes when there are replicas. A client node could resend its request
+    to another node, _without_ breaking the first connection (then wait for
+    the first reply and ignore the other).
+
+    The previous timeout implementation (before May 2017) was not well
+    suited to support the above use case so most of the code has been
+    removed, but it may contain some interesting parts.
+
+    Currently, since applicative pings have been replaced by TCP
+    keepalives, timeouts are only used for 2 things:
+    - to avoid reconnecting too fast
+    - to close idle client connections
     """

     from .connector import SocketConnector as ConnectorClass
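
The docstring above says applicative pings were replaced by TCP keepalives.
For reference, a hedged sketch of what enabling keepalives on a Linux socket
looks like; the values below are placeholders, not necessarily what
SocketConnector actually uses:

    import socket

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # Linux-specific knobs: start probing after 60s of idleness,
    # probe every 10s, drop the connection after 3 failed probes.
    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60)
    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10)
    s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)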
@@ -326,10 +309,8 @@ class Connection(BaseConnection):
     client = False
     server = False
     peer_id = None
-    _next_timeout = None
     _parser_state = None
-    _idle_timeout = 0
-    _timeout = 0
+    _timeout = None

     def __init__(self, event_manager, *args, **kw):
         BaseConnection.__init__(self, event_manager, *args, **kw)
@@ -361,29 +342,32 @@ class Connection(BaseConnection):
     def asClient(self):
         try:
-            del self._idle_timeout
+            del self._timeout
         except AttributeError:
             self.client = True
         else:
             assert self.client
-            self.updateTimeout()

     def asServer(self):
         self.server = True

     def _closeClient(self):
         if self.server:
-            del self._idle_timeout
-            self.updateTimeout()
+            del self._timeout
             self.client = False
             self.send(Packets.CloseClient())
         else:
             self.close()

     def closeClient(self):
+        # Currently, the only usage that is really useful is between a backup
+        # storage node and an upstream one, to avoid:
+        # - maintaining many connections for nothing when there's no write
+        #   activity for a long time (and waste resources with keepalives)
+        # - reconnecting too often (i.e. be reactive) when there's moderate
+        #   activity (think of a timer with a period of 1 minute)
         if self.connector is not None and self.client:
-            self._idle_timeout = 60
-            self._checkSmallerTimeout()
+            self._timeout = time() + 100

     def isAborted(self):
         return self.aborted
@@ -406,33 +390,13 @@ class Connection(BaseConnection):
         self.cur_id = (next_id + 1) & 0xffffffff
         return next_id

-    def updateTimeout(self, t=None):
-        if not self._queue:
-            if not t:
-                t = self._next_timeout - self._timeout
-            self._timeout = self._handlers.getNextTimeout() or \
-                self._idle_timeout
-            self._next_timeout = t + self._timeout
-
-    _checkSmallerTimeout = updateTimeout
-
     def getTimeout(self):
-        if not self._queue and self._timeout:
-            return self._next_timeout
+        if not self._queue:
+            return self._timeout

     def onTimeout(self):
-        handlers = self._handlers
-        if handlers.isPending():
-            # It is possible that another thread used ask() while getting a
-            # timeout from epoll, so we must check again the value of
-            # _next_timeout (we know that _queue is still empty).
-            # Although this test is only useful for MTClientConnection,
-            # it's not worth complicating the code more.
-            if self._next_timeout <= time():
-                handlers.timeout(self)
-                self.close()
-        elif self._idle_timeout:
-            self._closeClient()
+        assert self._timeout
+        self._closeClient()

     def abort(self):
         """Abort dealing with this connection."""
@@ -501,7 +465,6 @@ class Connection(BaseConnection):
     def readable(self):
         """Called when self is readable."""
-        # last known remote activity
-        self._next_timeout = time() + self._timeout
         try:
             try:
                 if self.connector.receive(self.read_buf):
@@ -532,10 +495,7 @@ class Connection(BaseConnection):
         Process a pending packet.
         """
         # check out packet and process it with current handler
-        try:
-            self._handlers.handle(self, self._queue.pop(0))
-        finally:
-            self.updateTimeout()
+        self._handlers.handle(self, self._queue.pop(0))

     def pending(self):
         connector = self.connector
@@ -592,7 +552,7 @@ class Connection(BaseConnection):
         packet.setId(self._getNextId() if msg_id is None else msg_id)
         self._addPacket(packet)

-    def ask(self, packet, timeout=CRITICAL_TIMEOUT, **kw):
+    def ask(self, packet, **kw):
         """
         Send a packet with a new ID and register the expectation of an answer
         """
@@ -601,10 +561,7 @@ class Connection(BaseConnection):
         msg_id = self._getNextId()
         packet.setId(msg_id)
         self._addPacket(packet)
-        handlers = self._handlers
-        t = None if handlers.isPending() else time()
-        handlers.emit(packet, timeout, kw)
-        self._checkSmallerTimeout(t)
+        self._handlers.emit(packet, kw)
         return msg_id

     def answer(self, packet):
@@ -667,7 +624,6 @@ class ClientConnection(Connection):
     def _maybeConnected(self):
         self.writable = self.lockWrapper(super(ClientConnection, self).writable)
-        self.updateTimeout(time())
         if self._ssl:
             self.connector.ssl(self._ssl, self._connected)
         else:
@@ -682,7 +638,6 @@ class ServerConnection(Connection):
     def __init__(self, *args, **kw):
         Connection.__init__(self, *args, **kw)
         self.em.register(self)
-        self.updateTimeout(time())
 class MTConnectionType(type):

@@ -741,20 +696,36 @@ class MTClientConnection(ClientConnection):
     # Alias without lock (cheaper than super())
     _ask = ClientConnection.ask.__func__

-    def ask(self, packet, timeout=CRITICAL_TIMEOUT, queue=None, **kw):
+    def ask(self, packet, queue=None, **kw):
         with self.lock:
             if queue is None:
                 if type(packet) is Packets.Ping:
-                    return self._ask(packet, timeout, **kw)
+                    return self._ask(packet, **kw)
                 raise TypeError('Only Ping packet can be asked'
                     ' without a queue, got a %r.' % packet)
-            msg_id = self._ask(packet, timeout, **kw)
+            msg_id = self._ask(packet, **kw)
             self.dispatcher.register(self, msg_id, queue)
         return msg_id

-    def _checkSmallerTimeout(self, t=None):
-        if not self._queue:
-            next_timeout = self._timeout and self._next_timeout
-            self.updateTimeout(t)
-            if not next_timeout or self._next_timeout < next_timeout:
-                self.em.wakeup()
+    # Currently, on connected connections, we only use timeouts for
+    # closeClient, which is never used for MTClientConnection.
+    # So we disable the logic completely as a precaution, and for performance.
+    # What is specific to MTClientConnection is that the poll thread must be
+    # woken up whenever the timeout is changed to a smaller value.
+
+    def closeClient(self):
+        # For example here, in addition to what the super method does,
+        # we may have to call `self.em.wakeup()`
+        raise NotImplementedError
+
+    def getTimeout(self):
+        pass
+
+    def onTimeout(self):
+        # It is possible that another thread manipulated the connection while
+        # getting a timeout from epoll. Only the poll thread fills _queue
+        # so we know that it is empty, but we may have to check timeout values
+        # again (i.e. compare time() with the result of getTimeout()).
+        raise NotImplementedError
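
The wakeup concern in the comment above is worth illustrating: if a thread
other than the poll thread ever set a nearer deadline, the poll thread could
sleep past it. A common remedy, sketched here as a hypothetical helper
(NEO's event manager exposes wakeup() for this purpose), is the self-pipe
trick:

    import os

    class PollWakeup(object):
        # Self-pipe trick: the read end is registered in epoll, so writing
        # one byte interrupts a blocking poll immediately.

        def __init__(self):
            self._r, self._w = os.pipe()

        def fileno(self):
            return self._r  # register this descriptor in the epoll set

        def wakeup(self):
            os.write(self._w, b'\0')

        def drain(self):
            # Called when the read end becomes readable.
            os.read(self._r, 64)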
@@ -69,7 +69,7 @@ class ClientServiceHandler(MasterHandler):
         if tid:
             p = Packets.AskLockInformation(ttid, tid)
             for node in node_list:
-                node.ask(p, timeout=60)
+                node.ask(p)
         else:
             conn.answer(Errors.IncompleteTransaction())
             # It's simpler to abort automatically rather than asking the client
@@ -42,7 +42,6 @@ from neo.tests.benchmark import BenchmarkRunner
 # each of them have to import its TestCase classes
 UNIT_TEST_MODULES = [
     # generic parts
-    'neo.tests.testConnection',
     'neo.tests.testHandler',
     'neo.tests.testNodes',
     'neo.tests.testUtil',
@@ -46,7 +46,6 @@ class StorageOperationHandler(EventHandler):
     def connectionLost(self, conn, new_state):
         app = self.app
         if app.operational and conn.isClient():
-            # XXX: Connection and Node should merged.
             uuid = conn.getUUID()
             if uuid:
                 node = app.nm.getByUUID(uuid)
@@ -356,6 +356,7 @@ class Replicator(object):
         self.fetchTransactions()

     def fetchTransactions(self, min_tid=None):
+        assert self.current_node.getConnection().isClient(), self.current_node
         offset = self.current_partition
         p = self.partition_dict[offset]
         if min_tid:
Deleted file: neo/tests/testConnection.py

    # -*- coding: utf-8 -*-
    #
    # Copyright (C) 2009-2017  Nexedi SA
    #
    # This program is free software; you can redistribute it and/or
    # modify it under the terms of the GNU General Public License
    # as published by the Free Software Foundation; either version 2
    # of the License, or (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License
    # along with this program.  If not, see <http://www.gnu.org/licenses/>.

    import unittest
    from time import time
    from .mock import Mock
    from neo.lib import connection, logging
    from neo.lib.connection import BaseConnection, ClientConnection, \
        MTClientConnection, CRITICAL_TIMEOUT
    from neo.lib.handler import EventHandler
    from neo.lib.protocol import ENCODED_VERSION, Packets
    from . import NeoUnitTestBase, Patch

    connector_cpt = 0

    class DummyConnector(Mock):
        def __init__(self, addr, s=None):
            logging.info("initializing connector")
            global connector_cpt
            self.desc = connector_cpt
            connector_cpt += 1
            self.packet_cpt = 0
            self.addr = addr
            Mock.__init__(self)

        def getAddress(self):
            return self.addr

        def getDescriptor(self):
            return self.desc

        accept = getError = makeClientConnection = makeListeningConnection = \
            receive = send = lambda *args, **kw: None

    dummy_connector = Patch(BaseConnection,
        ConnectorClass=lambda orig, self, *args, **kw: DummyConnector(*args, **kw))

    class ConnectionTests(NeoUnitTestBase):

        def setUp(self):
            NeoUnitTestBase.setUp(self)
            self.app = Mock({'__repr__': 'Fake App'})
            self.app.ssl = None
            self.em = self.app.em = Mock({'__repr__': 'Fake Em'})
            self.handler = Mock({'__repr__': 'Fake Handler'})
            self.address = ("127.0.0.7", 93413)
            self.node = Mock({'getAddress': self.address})

        def _makeClientConnection(self):
            with dummy_connector:
                conn = ClientConnection(self.app, self.handler, self.node)
            self.connector = conn.connector
            return conn

        def testTimeout(self):
            # NOTE: This method uses ping/pong packets only because MT
            #       connections don't accept any other packet without
            #       specifying a queue.
            self.handler = EventHandler(self.app)
            conn = self._makeClientConnection()
            conn.read_buf.append(ENCODED_VERSION)

            use_case_list = (
                # (a) For a single packet sent at T,
                #     the limit time for the answer is
                #     T + (1 * CRITICAL_TIMEOUT)
                ((), (1., 0)),
                # (b) Same as (a), even if send another packet at (T + CT/2).
                #     But receiving a packet (at T + CT - ε) resets the timeout
                #     (which means the limit for the 2nd one is T + 2*CT)
                ((.5, None), (1., 0, 2., 1)),
                # (c) Same as (b) with a first answer at well before the limit
                #     (T' = T + CT/2). The limit for the second one is
                #     T' + CT.
                ((.1, None, .5, 1), (1.5, 0)),
            )

            def set_time(t):
                connection.time = lambda: int(CRITICAL_TIMEOUT * (1000 + t))
            closed = []
            conn.close = lambda: closed.append(connection.time())
            def answer(packet_id):
                p = Packets.Pong()
                p.setId(packet_id)
                conn.connector.receive = lambda read_buf: \
                    read_buf.append(''.join(p.encode()))
                conn.readable()
                checkTimeout()
                conn.process()
            def checkTimeout():
                timeout = conn.getTimeout()
                if timeout and timeout <= connection.time():
                    conn.onTimeout()
            try:
                for use_case, expected in use_case_list:
                    i = iter(use_case)
                    conn.cur_id = 0
                    set_time(0)
                    # No timeout when no pending request
                    self.assertEqual(conn._handlers.getNextTimeout(), None)
                    conn.ask(Packets.Ping())
                    for t in i:
                        set_time(t)
                        checkTimeout()
                        packet_id = i.next()
                        if packet_id is None:
                            conn.ask(Packets.Ping())
                        else:
                            answer(packet_id)
                    i = iter(expected)
                    for t in i:
                        set_time(t - .1)
                        checkTimeout()
                        set_time(t)
                        # this test method relies on the fact that only
                        # conn.close is called in case of a timeout
                        checkTimeout()
                        self.assertEqual(closed.pop(), connection.time())
                    answer(i.next())
                    self.assertFalse(conn.isPending())
                    self.assertFalse(closed)
            finally:
                connection.time = time

    class MTConnectionTests(ConnectionTests):
        # XXX: here we test non-client-connection-related things too, which
        # duplicates test suite work... Should be fragmented into
        # finer-grained test classes.

        def setUp(self):
            super(MTConnectionTests, self).setUp()
            self.dispatcher = Mock({'__repr__': 'Fake Dispatcher'})

        def _makeClientConnection(self):
            with dummy_connector:
                conn = MTClientConnection(self.app, self.handler, self.node,
                    dispatcher=self.dispatcher)
            self.connector = conn.connector
            return conn

        def test_MTClientConnectionQueueParameter(self):
            ask = self._makeClientConnection().ask
            packet = Packets.AskPrimary() # Any non-Ping simple "ask" packet
            # One cannot "ask" anything without a queue
            self.assertRaises(TypeError, ask, packet)
            ask(packet, queue=object())
            # ... except Ping
            ask(Packets.Ping())

    if __name__ == '__main__':
        unittest.main()
@@ -2047,7 +2047,7 @@ class Test(NEOThreadedTest):
                 if (isinstance(packet, Packets.AnswerStoreObject)
                         and packet.decode()[0]):
                     conn, = cluster.client.getConnectionList(app)
-                    kw = conn._handlers._pending[0][0][packet._id][2]
+                    kw = conn._handlers._pending[0][0][packet._id][1]
                     return 1 == u64(kw['oid']) and delay_conflict[app.uuid].pop()
         def writeA(orig, txn_context, oid, serial, data):
             if u64(oid) == 1:
@@ -27,7 +27,6 @@ from neo.storage.checker import CHECK_COUNT
 from neo.storage.replicator import Replicator
 from neo.lib.connector import SocketConnector
 from neo.lib.connection import ClientConnection
-from neo.lib.event import EventManager
 from neo.lib.protocol import CellStates, ClusterStates, Packets, \
     ZERO_OID, ZERO_TID, MAX_TID, uuid_str
 from neo.lib.util import p64, u64
@@ -225,33 +224,6 @@ class ReplicationTests(NEOThreadedTest):
         self.tic()
         self.assertEqual(np*3, self.checkBackup(backup))

-    @backup_test()
-    def testBackupUpstreamMasterDead(self, backup):
-        """Check proper behaviour when upstream master is unreachable
-
-        More generally, this checks that when a handler raises when a connection
-        is closed voluntarily, the connection is in a consistent state and can
-        be, for example, closed again after the exception is caught, without
-        assertion failure.
-        """
-        conn, = backup.master.getConnectionList(backup.upstream.master)
-        conn.ask(Packets.Ping())
-        self.assertTrue(conn.isPending())
-        # force ping to have expired
-        # connection will be closed before upstream master has time
-        # to answer
-        def _poll(orig, self, blocking):
-            if backup.master.em is self:
-                p.revert()
-                conn._next_timeout = 0
-                conn.onTimeout()
-            else:
-                orig(self, blocking)
-        with Patch(EventManager, _poll=_poll) as p:
-            self.tic()
-        new_conn, = backup.master.getConnectionList(backup.upstream.master)
-        self.assertIsNot(new_conn, conn)
-
     @backup_test()
     def testBackupUpstreamStorageDead(self, backup):
         upstream = backup.upstream