Commit e45dfae8 authored by Julien Muchembled's avatar Julien Muchembled

Fix handling of incoming non-empty storage nodes just after startup is allowed

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2835 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 0f8f681c
......@@ -157,11 +157,6 @@ class MasterEventHandler(EventHandler):
def notifyNodeInformation(self, conn, node_list):
app = self.app
app.nm.update(node_list)
if not app.pt.filled():
# Re-ask partition table, in case node change filled it.
# XXX: we should only ask it if received states indicates it is
# possible (ignore TEMPORARILY_DOWN for example)
conn.ask(Packets.AskPartitionTable())
class MasterRequestEventHandler(EventHandler):
""" This class handle all answer from primary master node"""
......
......@@ -23,7 +23,6 @@ from neo.lib.protocol import Packets, ProtocolError, ClusterStates, NodeStates
from neo.lib.protocol import NotReadyError, ZERO_OID, ZERO_TID
from neo.master.handlers import MasterHandler
REQUIRED_NODE_NUMBER = 1
class RecoveryManager(MasterHandler):
"""
......@@ -42,14 +41,7 @@ class RecoveryManager(MasterHandler):
"""
Returns the handler for storage nodes
"""
# XXX: Looking at 'uuid' is not a good criteria to know if the storage
# is empty. Empty node should be accepted here.
# This is also the first step to fix handling of incoming
# non-empty storage nodes, whereas startup was already allowed.
if uuid is None and not self.app._startup_allowed:
neo.lib.logging.info('reject empty storage node')
raise NotReadyError
return (uuid, NodeStates.RUNNING, self)
return uuid, NodeStates.PENDING, self
def run(self):
"""
......@@ -67,22 +59,37 @@ class RecoveryManager(MasterHandler):
self.app.pt.setID(None)
# collect the last partition table available
while not self.app._startup_allowed:
while 1:
em.poll(1)
if self.app._startup_allowed:
allowed_node_set = set()
for node in self.app.nm.getStorageList():
if node.isPending():
break # waiting for an answer
if node.isRunning():
allowed_node_set.add(node)
else:
if allowed_node_set:
break # no ready storage node
neo.lib.logging.info('startup allowed')
# build a new partition table
if self.app.pt.getID() is None:
self.buildFromScratch()
neo.lib.logging.info('creating a new partition table')
# reset IDs generators & build new partition with running nodes
self.app.tm.setLastOID(ZERO_OID)
self.app.pt.make(allowed_node_set)
self._broadcastPartitionTable(self.app.pt.getID(),
self.app.pt.getRowList())
# collect node that are connected but not in the selected partition
# table and set them in pending state
allowed_node_set = set(self.app.pt.getNodeList())
refused_node_set = set(self.app.nm.getStorageList()) - allowed_node_set
for node in refused_node_set:
node.setPending()
self.app.broadcastNodesInformation(refused_node_set)
refused_node_set = allowed_node_set.difference(
self.app.pt.getNodeList())
if refused_node_set:
for node in refused_node_set:
node.setPending()
self.app.broadcastNodesInformation(refused_node_set)
self.app.setLastTransaction(self.app.tm.getLastTID())
neo.lib.logging.debug(
......@@ -90,23 +97,6 @@ class RecoveryManager(MasterHandler):
'table :', dump(self.app.tm.getLastOID()))
self.app.pt.log()
def buildFromScratch(self):
nm, em, pt = self.app.nm, self.app.em, self.app.pt
neo.lib.logging.debug('creating a new partition table, wait for a ' \
'storage node')
# wait for some empty storage nodes, their are accepted
while len(nm.getStorageList()) < REQUIRED_NODE_NUMBER:
em.poll(1)
# take the first node available
node_list = nm.getStorageList()[:REQUIRED_NODE_NUMBER]
for node in node_list:
node.setRunning()
self.app.broadcastNodesInformation(node_list)
# resert IDs generators
self.app.tm.setLastOID(ZERO_OID)
# build the partition with this node
pt.make(node_list)
def connectionLost(self, conn, new_state):
node = self.app.nm.getByUUID(conn.getUUID())
assert node is not None
......@@ -117,10 +107,8 @@ class RecoveryManager(MasterHandler):
self.app.broadcastNodesInformation([node])
def connectionCompleted(self, conn):
# XXX: handler split review needed to remove this hack
if not self.app._startup_allowed:
# ask the last IDs to perform the recovery
conn.ask(Packets.AskLastIDs())
# ask the last IDs to perform the recovery
conn.ask(Packets.AskLastIDs())
def answerLastIDs(self, conn, loid, ltid, lptid):
# Get max values.
......@@ -132,13 +120,25 @@ class RecoveryManager(MasterHandler):
# something newer
self.target_ptid = lptid
conn.ask(Packets.AskPartitionTable())
else:
node = self.app.nm.getByUUID(conn.getUUID())
assert node.isPending()
node.setRunning()
self.app.broadcastNodesInformation([node])
def answerPartitionTable(self, conn, ptid, row_list):
node = self.app.nm.getByUUID(conn.getUUID())
assert node.isPending()
node.setRunning()
if ptid != self.target_ptid:
# If this is not from a target node, ignore it.
neo.lib.logging.warn('Got %s while waiting %s', dump(ptid),
dump(self.target_ptid))
return
else:
self._broadcastPartitionTable(ptid, row_list)
self.app.broadcastNodesInformation([node])
def _broadcastPartitionTable(self, ptid, row_list):
try:
new_nodes = self.app.pt.load(ptid, row_list, self.app.nm)
except IndexError:
......@@ -152,4 +152,3 @@ class RecoveryManager(MasterHandler):
for node in self.app.nm.getAdminList(only_identified=True):
node.notify(notification)
node.notify(partition_table)
......@@ -97,12 +97,14 @@ class MasterRecoveryTests(NeoUnitTestBase):
# not from target node, ignore
uuid = self.identifyToMasterNode(NodeTypes.STORAGE, port=self.storage_port)
conn = self.getFakeConnection(uuid, self.storage_port)
node = self.app.nm.getByUUID(conn.getUUID())
offset = 1
cell_list = [(offset, uuid, CellStates.UP_TO_DATE)]
cells = self.app.pt.getRow(offset)
for cell, state in cells:
self.assertEqual(state, CellStates.OUT_OF_DATE)
recovery.target_ptid = 2
node.setPending()
recovery.answerPartitionTable(conn, 1, cell_list)
cells = self.app.pt.getRow(offset)
for cell, state in cells:
......@@ -114,6 +116,7 @@ class MasterRecoveryTests(NeoUnitTestBase):
cells = self.app.pt.getRow(offset)
for cell, state in cells:
self.assertEqual(state, CellStates.OUT_OF_DATE)
node.setPending()
recovery.answerPartitionTable(conn, None, cell_list)
cells = self.app.pt.getRow(offset)
for cell, state in cells:
......@@ -124,6 +127,7 @@ class MasterRecoveryTests(NeoUnitTestBase):
offset = 1000000
self.assertFalse(self.app.pt.hasOffset(offset))
cell_list = [(offset, ((uuid, NodeStates.DOWN,),),)]
node.setPending()
self.checkProtocolErrorRaised(recovery.answerPartitionTable, conn,
2, cell_list)
......
......@@ -109,8 +109,7 @@ class Test(NEOThreadedTest):
cluster.reset()
try:
cluster.start(storage_list=(s1,), fast_startup=fast_startup)
self.assertEqual((NodeStates.UNKNOWN, None)[fast_startup],
cluster.getNodeState(s2))
self.assertEqual(NodeStates.UNKNOWN, cluster.getNodeState(s2))
finally:
cluster.stop()
......@@ -140,10 +139,4 @@ class Test(NEOThreadedTest):
cluster.stop()
def testVerificationCommitUnfinishedTransactionsFastStartup(self):
# XXX: This test fails because if the admin starts the cluster without
# any storage node, the master (which is still in recovery stage)
# does not handle properly incoming non-empty storage nodes.
# In particular, it does not ask the last ids to the storage,
# and the client will ask objects at tid 0.
# See also RecoveryManager.identifyStorageNode
self.testVerificationCommitUnfinishedTransactions(True)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment