Commit 2485f151 authored by Julien Muchembled's avatar Julien Muchembled

master: fix possible blockage during recovery after a storage disconnection

At some point, the master asks a storage node its partition table. If this node
is lost before getting an answer, another node (or the same one if it comes
back) must be asked.

Before this change, the master node had to be restarted.
parent dec81519
...@@ -28,6 +28,7 @@ class RecoveryManager(MasterHandler): ...@@ -28,6 +28,7 @@ class RecoveryManager(MasterHandler):
def __init__(self, app): def __init__(self, app):
# The target node's uuid to request next. # The target node's uuid to request next.
self.target_ptid = None self.target_ptid = None
self.ask_pt = []
self.backup_tid_dict = {} self.backup_tid_dict = {}
def getHandler(self): def getHandler(self):
...@@ -105,8 +106,22 @@ class RecoveryManager(MasterHandler): ...@@ -105,8 +106,22 @@ class RecoveryManager(MasterHandler):
pt.log() pt.log()
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
node = self.app.nm.getByUUID(conn.getUUID()) uuid = conn.getUUID()
assert node is not None node = self.app.nm.getByUUID(uuid)
try:
i = self.ask_pt.index(uuid)
except ValueError:
pass
else:
del self.ask_pt[i]
if not i:
if self.ask_pt:
self.app.nm.getByUUID(self.ask_pt[0]) \
.ask(Packets.AskPartitionTable())
else:
logging.warning("Waiting for %r to come back."
" No other node has version %s of the partition table.",
node, self.target_ptid)
if node.getState() == new_state: if node.getState() == new_state:
return return
node.setState(new_state) node.setState(new_state)
...@@ -121,24 +136,29 @@ class RecoveryManager(MasterHandler): ...@@ -121,24 +136,29 @@ class RecoveryManager(MasterHandler):
tm = self.app.tm tm = self.app.tm
tm.setLastOID(loid) tm.setLastOID(loid)
tm.setLastTID(ltid) tm.setLastTID(ltid)
if lptid > self.target_ptid: uuid = conn.getUUID()
# something newer if self.target_ptid <= lptid:
self.target_ptid = lptid # Maybe a newer partition table.
conn.ask(Packets.AskPartitionTable()) if self.target_ptid == lptid and self.ask_pt:
self.backup_tid_dict[conn.getUUID()] = backup_tid # Another node is already asked.
self.ask_pt.append(uuid)
elif self.target_ptid < lptid or self.ask_pt is not ():
# No node asked yet for the newest partition table.
self.target_ptid = lptid
self.ask_pt = [uuid]
conn.ask(Packets.AskPartitionTable())
self.backup_tid_dict[uuid] = backup_tid
def answerPartitionTable(self, conn, ptid, row_list): def answerPartitionTable(self, conn, ptid, row_list):
if ptid != self.target_ptid: # If this is not from a target node, ignore it.
# If this is not from a target node, ignore it. if ptid == self.target_ptid:
logging.warn('Got %s while waiting %s', dump(ptid),
dump(self.target_ptid))
else:
try: try:
new_nodes = self.app.pt.load(ptid, row_list, self.app.nm) new_nodes = self.app.pt.load(ptid, row_list, self.app.nm)
except IndexError: except IndexError:
raise ProtocolError('Invalid offset') raise ProtocolError('Invalid offset')
self._notifyAdmins(Packets.NotifyNodeInformation(new_nodes), self._notifyAdmins(Packets.NotifyNodeInformation(new_nodes),
Packets.SendPartitionTable(ptid, row_list)) Packets.SendPartitionTable(ptid, row_list))
self.ask_pt = ()
self.app.backup_tid = self.backup_tid_dict[conn.getUUID()] self.app.backup_tid = self.backup_tid_dict[conn.getUUID()]
def _notifyAdmins(self, *packets): def _notifyAdmins(self, *packets):
......
...@@ -34,6 +34,7 @@ from . import NEOCluster, NEOThreadedTest ...@@ -34,6 +34,7 @@ from . import NEOCluster, NEOThreadedTest
from neo.lib.util import add64, makeChecksum, p64, u64 from neo.lib.util import add64, makeChecksum, p64, u64
from neo.client.exception import NEOStorageError from neo.client.exception import NEOStorageError
from neo.client.pool import CELL_CONNECTED, CELL_GOOD from neo.client.pool import CELL_CONNECTED, CELL_GOOD
from neo.storage.handlers.verification import VerificationHandler
class PCounter(Persistent): class PCounter(Persistent):
value = 0 value = 0
...@@ -1033,5 +1034,30 @@ class Test(NEOThreadedTest): ...@@ -1033,5 +1034,30 @@ class Test(NEOThreadedTest):
finally: finally:
cluster.stop() cluster.stop()
def testStorageLostDuringRecovery(self):
# Initialize a cluster.
cluster = NEOCluster(storage_count=2, partitions=2)
try:
cluster.start()
finally:
cluster.stop()
cluster.reset()
# Restart with a connection failure for the first AskPartitionTable.
# The master must not be stuck in RECOVERING state
# or re-make the partition table.
def make(*args):
sys.exit()
def askPartitionTable(orig, self, conn):
p.revert()
conn.close()
try:
with Patch(cluster.master.pt, make=make), Patch(VerificationHandler,
askPartitionTable=askPartitionTable) as p:
cluster.start()
self.assertFalse(p.applied)
finally:
cluster.stop()
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment