Commit 305dda86 authored by Julien Muchembled

client: fix AssertionError when trying to reconnect too quickly after an error

When ConnectionPool._initNodeConnection fails for the first time with:

  StorageError: protocol error: already connected

the following assertion failure happens when trying to reconnect before the
previous connection is actually closed (currently, only the node sending an
error message closes the connection, as commented in EventHandler):

  Traceback (most recent call last):
    File "neo/client/Storage.py", line 82, in load
      return self.app.load(oid)[:2]
    File "neo/client/app.py", line 367, in load
      data, tid, next_tid, _ = self._loadFromStorage(oid, tid, before_tid)
    File "neo/client/app.py", line 399, in _loadFromStorage
      askStorage)
    File "neo/client/app.py", line 293, in _askStorageForRead
      conn = cp.getConnForNode(node)
    File "neo/client/pool.py", line 98, in getConnForNode
      conn = self._initNodeConnection(node)
    File "neo/client/pool.py", line 48, in _initNodeConnection
      dispatcher=app.dispatcher)
    File "neo/lib/connection.py", line 704, in __init__
      super(MTClientConnection, self).__init__(*args, **kwargs)
    File "neo/lib/connection.py", line 602, in __init__
      node.setConnection(self)
    File "neo/lib/node.py", line 122, in setConnection
      attributeTracker.whoSet(self, '_connection'))
  AssertionError
parent 163858ed
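The assertion that fails is the one guarding the node's single connection slot: a node may reference at most one client connection, and MTClientConnection.__init__ registers itself on the node before the previous, still-open connection has been released. Here is a minimal, self-contained sketch of that race; the class names are simplified for illustration and are not NEO's actual classes.

  # Illustrative sketch only: a node keeps a single connection slot and
  # asserts that it is free before accepting a new one, which is roughly
  # what the assertion in neo/lib/node.py checks in the traceback above.

  class Node(object):
      def __init__(self):
          self._connection = None

      def setConnection(self, conn):
          # Trips if the previous connection was never closed on our side.
          assert self._connection is None
          self._connection = conn

  class ClientConnection(object):
      def __init__(self, node):
          node.setConnection(self)  # same pattern as MTClientConnection.__init__

  node = Node()
  ClientConnection(node)  # first attempt: the peer answers "already connected",
                          # but only the peer closes the connection, so the
                          # node still references it...
  try:
      ClientConnection(node)  # ...and an immediate retry hits the assertion
  except AssertionError:
      print("reconnected before the previous connection was closed")

Because only the peer closes the connection after sending the error, the client's quick retry races against that close and loses.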
@@ -30,20 +30,6 @@ from ..exception import NEOStorageReadRetry, NEOStorageDoesNotExistError
 
 class StorageEventHandler(MTEventHandler):
 
-    def connectionLost(self, conn, new_state):
-        # TODO: refactor with connectionFailed
-        node = self.app.nm.getByAddress(conn.getAddress())
-        assert node is not None
-        self.app.cp.removeConnection(node)
-        super(StorageEventHandler, self).connectionLost(conn, new_state)
-
-    def connectionFailed(self, conn):
-        # Connection to a storage node failed
-        node = self.app.nm.getByAddress(conn.getAddress())
-        assert node is not None
-        self.app.cp.removeConnection(node)
-        super(StorageEventHandler, self).connectionFailed(conn)
-
     def _acceptIdentification(*args):
         pass
@@ -29,10 +29,10 @@ MAX_FAILURE_AGE = 600
 
 class ConnectionPool(object):
     """This class manages a pool of connections to storage nodes."""
+    # XXX: This is not a pool anymore.
 
     def __init__(self, app):
         self.app = app
-        self.connection_dict = {}
         # Define a lock in order to create one connection to
         # a storage node at a time to avoid multiple connections
         # to the same node.
@@ -82,39 +82,18 @@ class ConnectionPool(object):
     def getConnForNode(self, node):
         """Return a locked connection object to a given node
        If no connection exists, create a new one"""
-        if node.isRunning():
-            uuid = node.getUUID()
-            try:
-                # Already connected to node
-                return self.connection_dict[uuid]
-            except KeyError:
-                with self._lock:
-                    # Second lookup, if another thread initiated connection
-                    # while we were waiting for connection lock.
-                    try:
-                        return self.connection_dict[uuid]
-                    except KeyError:
-                        # Create new connection to node
-                        conn = self._initNodeConnection(node)
-                        if conn is not None:
-                            self.connection_dict[uuid] = conn
-                            if not conn.isClosed():
-                                return conn
-                            # conn was closed just after the reception of
-                            # AnswerRequestIdentification (e.g. RST upon ACK),
-                            # and removeConnection may have been called (by the
-                            # poll thread) before we add it to connection_dict.
-                            self.connection_dict.pop(uuid, None)
-
-    def removeConnection(self, node):
-        self.connection_dict.pop(node.getUUID(), None)
+        conn = node._connection # XXX
+        if node.isRunning() if conn is None else not node._identified:
+            with self._lock:
+                conn = node._connection # XXX
+                if conn is None:
+                    return self._initNodeConnection(node)
+        return conn
 
     def closeAll(self):
         with self._lock:
-            while 1:
-                try:
-                    conn = self.connection_dict.popitem()[1]
-                except KeyError:
-                    break
-                conn.setReconnectionNoDelay()
-                conn.close()
+            for node in self.app.nm.getStorageList():
+                conn = node._connection # XXX
+                if conn is not None:
+                    conn.setReconnectionNoDelay()
+                    conn.close()
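With connection_dict removed, the node object is the only place that records whether a connection to it exists, so the pool can no longer hold a stale entry that disagrees with the node. Below is a rough, runnable sketch of the resulting double-checked lookup; FakeNode, PoolSketch and the stand-in _initNodeConnection are illustrative names, and the real condition additionally consults node.isRunning() and node._identified as shown in the hunk above.

  import threading

  # Simplified sketch: the node is the single source of truth for its
  # connection, and the pool only connects under a lock when the node
  # really has none (double-checked so that concurrent callers cannot
  # open two connections to the same node).

  class FakeNode(object):
      def __init__(self):
          self._connection = None
      def getConnection(self):
          return self._connection

  class PoolSketch(object):
      def __init__(self):
          self._lock = threading.Lock()
      def _initNodeConnection(self, node):
          node._connection = object()       # stand-in for a real connection
          return node._connection
      def getConnForNode(self, node):
          conn = node.getConnection()       # ask the node, not a pool-side dict
          if conn is None:
              with self._lock:
                  conn = node.getConnection()   # re-check under the lock
                  if conn is None:
                      return self._initNodeConnection(node)
          return conn

  node = FakeNode()
  pool = PoolSketch()
  assert pool.getConnForNode(node) is pool.getConnForNode(node)  # reused, not recreated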
@@ -1624,7 +1624,7 @@ class Test(NEOThreadedTest):
         with cluster.newClient(1) as db:
             t2, c2 = cluster.getTransaction(db)
             with self.noConnection(c1, s2), self.noConnection(c2, s1):
-                cluster.client.cp.connection_dict[s2.uuid].close()
+                cluster.client.nm.getByUUID(s2.uuid).getConnection().close()
                 self.tic()
                 for c1_aborts in 0, 1:
                     # 0: C1 finishes, C2 vote fails
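The test now reaches the storage node's live connection through the client's node manager rather than through the pool. A hypothetical helper built only from the calls visible in this diff, assuming getConnection() simply returns the connection the node currently holds (or None):

  # Hypothetical helper mirroring the test change above: look the node up by
  # UUID in the client's node manager and act on the connection it holds.
  def close_storage_conn(client, uuid):
      node = client.nm.getByUUID(uuid)
      conn = node.getConnection()
      if conn is not None:                # the node may currently be disconnected
          conn.setReconnectionNoDelay()   # allow an immediate reconnection attempt
          conn.close()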