Commit c05e65ac authored by Julien Muchembled's avatar Julien Muchembled

Prevent nodes from reconnecting too fast

This happened between storage nodes of different clusters because they're not
informed about their state, e.g. a dead upstream storage node.

In any case, logs were flooded at 100% cpu usage.
parent c321971f
...@@ -28,6 +28,8 @@ from .util import ReadBuffer ...@@ -28,6 +28,8 @@ from .util import ReadBuffer
CRITICAL_TIMEOUT = 30 CRITICAL_TIMEOUT = 30
connect_limit = 0
class ConnectionClosed(Exception): class ConnectionClosed(Exception):
pass pass
...@@ -548,6 +550,18 @@ class Connection(BaseConnection): ...@@ -548,6 +550,18 @@ class Connection(BaseConnection):
self._handlers.handle(self, self._queue.pop(0)) self._handlers.handle(self, self._queue.pop(0))
self.close() self.close()
def _delayed_closure(self):
# Wait at least 1 second between connection failures.
global connect_limit
t = time()
if t < connect_limit:
self.checkTimeout = lambda t: t < connect_limit or \
self._delayed_closure()
self.readable = self.writable = lambda: None
else:
connect_limit = t + 1
self._closure()
def _recv(self): def _recv(self):
"""Receive data from a connector.""" """Receive data from a connector."""
try: try:
...@@ -556,7 +570,7 @@ class Connection(BaseConnection): ...@@ -556,7 +570,7 @@ class Connection(BaseConnection):
pass pass
except ConnectorConnectionRefusedException: except ConnectorConnectionRefusedException:
assert self.connecting assert self.connecting
self._closure() self._delayed_closure()
except ConnectorConnectionClosedException: except ConnectorConnectionClosedException:
# connection resetted by peer, according to the man, this error # connection resetted by peer, according to the man, this error
# should not occurs but it seems it's false # should not occurs but it seems it's false
...@@ -671,7 +685,7 @@ class ClientConnection(Connection): ...@@ -671,7 +685,7 @@ class ClientConnection(Connection):
else: else:
self._connectionCompleted() self._connectionCompleted()
except ConnectorConnectionRefusedException: except ConnectorConnectionRefusedException:
self._closure() self._delayed_closure()
except ConnectorException: except ConnectorException:
# unhandled connector exception # unhandled connector exception
self._closure() self._closure()
...@@ -680,7 +694,7 @@ class ClientConnection(Connection): ...@@ -680,7 +694,7 @@ class ClientConnection(Connection):
def writable(self): def writable(self):
"""Called when self is writable.""" """Called when self is writable."""
if self.connector.getError(): if self.connector.getError():
self._closure() self._delayed_closure()
else: else:
self._connectionCompleted() self._connectionCompleted()
self.writable() self.writable()
......
...@@ -190,12 +190,12 @@ class Application(object): ...@@ -190,12 +190,12 @@ class Application(object):
while (self.unconnected_master_node_set or while (self.unconnected_master_node_set or
self.negotiating_master_node_set): self.negotiating_master_node_set):
for addr in self.unconnected_master_node_set: for addr in self.unconnected_master_node_set:
self.negotiating_master_node_set.add(addr)
ClientConnection(self.em, client_handler, ClientConnection(self.em, client_handler,
# XXX: Ugly, but the whole election code will be # XXX: Ugly, but the whole election code will be
# replaced soon # replaced soon
node=getByAddress(addr), node=getByAddress(addr),
connector=self.connector_handler()) connector=self.connector_handler())
self.negotiating_master_node_set.add(addr)
self.unconnected_master_node_set.clear() self.unconnected_master_node_set.clear()
self.em.poll(1) self.em.poll(1)
except ElectionFailure, m: except ElectionFailure, m:
......
...@@ -57,9 +57,7 @@ class ClientElectionHandler(BaseElectionHandler): ...@@ -57,9 +57,7 @@ class ClientElectionHandler(BaseElectionHandler):
addr = conn.getAddress() addr = conn.getAddress()
node = self.app.nm.getByAddress(addr) node = self.app.nm.getByAddress(addr)
assert node is not None, (uuid_str(self.app.uuid), addr) assert node is not None, (uuid_str(self.app.uuid), addr)
assert node.isUnknown(), (uuid_str(self.app.uuid), node.whoSetState(), # node may still be in unknown state
node)
# connection never success, node is still in unknown state
self.app.negotiating_master_node_set.discard(addr) self.app.negotiating_master_node_set.discard(addr)
super(ClientElectionHandler, self).connectionFailed(conn) super(ClientElectionHandler, self).connectionFailed(conn)
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
import unittest import unittest
from time import time from time import time
from mock import Mock from mock import Mock
from neo.lib import connection
from neo.lib.connection import ListeningConnection, Connection, \ from neo.lib.connection import ListeningConnection, Connection, \
ClientConnection, ServerConnection, MTClientConnection, \ ClientConnection, ServerConnection, MTClientConnection, \
HandlerSwitcher, CRITICAL_TIMEOUT HandlerSwitcher, CRITICAL_TIMEOUT
...@@ -37,6 +38,7 @@ class ConnectionTests(NeoUnitTestBase): ...@@ -37,6 +38,7 @@ class ConnectionTests(NeoUnitTestBase):
self.handler = Mock({'__repr__': 'Fake Handler'}) self.handler = Mock({'__repr__': 'Fake Handler'})
self.address = ("127.0.0.7", 93413) self.address = ("127.0.0.7", 93413)
self.node = Mock({'getAddress': self.address}) self.node = Mock({'getAddress': self.address})
connection.connect_limit = 0
def _makeListeningConnection(self, addr): def _makeListeningConnection(self, addr):
# create instance after monkey patches # create instance after monkey patches
......
...@@ -21,11 +21,12 @@ from collections import defaultdict ...@@ -21,11 +21,12 @@ from collections import defaultdict
from functools import wraps from functools import wraps
from neo.lib import logging from neo.lib import logging
from neo.storage.checker import CHECK_COUNT from neo.storage.checker import CHECK_COUNT
from neo.lib.connection import ClientConnection
from neo.lib.protocol import CellStates, ClusterStates, Packets, \ from neo.lib.protocol import CellStates, ClusterStates, Packets, \
ZERO_OID, ZERO_TID, MAX_TID, uuid_str ZERO_OID, ZERO_TID, MAX_TID, uuid_str
from neo.lib.util import p64 from neo.lib.util import p64
from . import ConnectionFilter, NEOCluster, NEOThreadedTest, Patch, \ from . import ConnectionFilter, NEOCluster, NEOThreadedTest, Patch, \
predictable_random predictable_random, Serialized
def backup_test(partitions=1, upstream_kw={}, backup_kw={}): def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
...@@ -261,6 +262,31 @@ class ReplicationTests(NEOThreadedTest): ...@@ -261,6 +262,31 @@ class ReplicationTests(NEOThreadedTest):
new_conn, = backup.master.getConnectionList(backup.upstream.master) new_conn, = backup.master.getConnectionList(backup.upstream.master)
self.assertFalse(new_conn is conn) self.assertFalse(new_conn is conn)
@backup_test()
def testBackupUpstreamStorageDead(self, backup):
upstream = backup.upstream
with ConnectionFilter() as f:
f.add(lambda conn, packet:
isinstance(packet, Packets.InvalidateObjects))
upstream.importZODB()(1)
upstream.client.setPoll(0)
count = [0]
def __init__(orig, *args, **kw):
count[0] += 1
orig(*args, **kw)
p = Patch(ClientConnection, __init__=__init__)
try:
upstream.storage.listening_conn.close()
Serialized.tic(); self.assertEqual(count[0], 0)
Serialized.tic(); count[0] or Serialized.tic()
Serialized.tic(); self.assertEqual(count[0], 2)
Serialized.tic(); self.assertEqual(count[0], 2)
time.sleep(1.1)
Serialized.tic(); self.assertEqual(count[0], 3)
Serialized.tic(); self.assertEqual(count[0], 3)
finally:
del p
@backup_test() @backup_test()
def testBackupDelayedUnlockTransaction(self, backup): def testBackupDelayedUnlockTransaction(self, backup):
""" """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment