Commit 8ae3f0ea by Vincent Pelletier

Automate startup when all storage nodes of UP_TO_DATE cells are available.

Inspired from "mdadm assemble --no-degraded" behaviour, which was brought
up during a discussion on linux-raid mailing list about split-brain
situation detection and avoidance.
1 parent 2d35ac93
......@@ -51,6 +51,15 @@ class AdministrationHandler(MasterHandler):
# change state
if state == ClusterStates.VERIFYING:
storage_list = self.app.nm.getStorageList(only_identified=True)
if not storage_list:
raise ProtocolError('Cannot exit recovery without any '
'storage node')
for node in storage_list:
assert node.isPending(), node
if node.getConnection().isPending():
raise ProtocolError('Cannot exit recovery now: node %r is '
'entering cluster' % (node, ))
self.app._startup_allowed = True
else:
self.app.changeClusterState(state)
......
......@@ -302,3 +302,24 @@ class PartitionTable(PartitionTable):
CellStates.OUT_OF_DATE))
return cell_list
def getUpToDateCellNodeSet(self):
"""
Return a set of all nodes which are part of at least one UP TO DATE
partition.
"""
return set(cell.getNode()
for row in self.partition_list
for cell in row
if cell.isUpToDate() or cell.isFeeding()
)
def getOutOfDateCellNodeSet(self):
"""
Return a set of all nodes which are part of at least one OUT OF DATE
partition.
"""
return set(cell.getNode()
for row in self.partition_list
for cell in row if cell.isOutOfDate()
)
......@@ -51,50 +51,59 @@ class RecoveryManager(MasterHandler):
if this is the first time.
"""
neo.lib.logging.info('begin the recovery of the status')
self.app.changeClusterState(ClusterStates.RECOVERING)
em = self.app.em
self.app.pt.setID(None)
app = self.app
pt = app.pt
app.changeClusterState(ClusterStates.RECOVERING)
pt.setID(None)
# collect the last partition table available
poll = app.em.poll
while 1:
em.poll(1)
if self.app._startup_allowed:
poll(1)
allowed_node_set = set()
for node in self.app.nm.getStorageList():
if node.isPending():
break # waiting for an answer
if node.isRunning():
allowed_node_set.add(node)
else:
if pt.filled():
# A partition table exists, we are starting an existing
# cluster.
partition_node_set = pt.getUpToDateCellNodeSet()
pending_node_set = set(x for x in partition_node_set
if x.isPending())
if app._startup_allowed or \
partition_node_set == pending_node_set:
allowed_node_set = pending_node_set
extra_node_set = pt.getOutOfDateCellNodeSet()
elif app._startup_allowed:
# No partition table and admin allowed startup, we are
# creating a new cluster out of all pending nodes.
allowed_node_set = set(app.nm.getStorageList(
only_identified=True))
extra_node_set = set()
if allowed_node_set:
break # no ready storage node
for node in allowed_node_set:
assert node.isPending(), node
if node.getConnection().isPending():
break
else:
allowed_node_set |= extra_node_set
break
neo.lib.logging.info('startup allowed')
if self.app.pt.getID() is None:
for node in allowed_node_set:
node.setRunning()
app.broadcastNodesInformation(allowed_node_set)
if pt.getID() is None:
neo.lib.logging.info('creating a new partition table')
# reset IDs generators & build new partition with running nodes
self.app.tm.setLastOID(ZERO_OID)
self.app.pt.make(allowed_node_set)
self._broadcastPartitionTable(self.app.pt.getID(),
self.app.pt.getRowList())
# collect node that are connected but not in the selected partition
# table and set them in pending state
refused_node_set = allowed_node_set.difference(
self.app.pt.getNodeList())
if refused_node_set:
for node in refused_node_set:
node.setPending()
self.app.broadcastNodesInformation(refused_node_set)
self.app.setLastTransaction(self.app.tm.getLastTID())
app.tm.setLastOID(ZERO_OID)
pt.make(allowed_node_set)
self._broadcastPartitionTable(pt.getID(), pt.getRowList())
app.setLastTransaction(app.tm.getLastTID())
neo.lib.logging.debug(
'cluster starts with loid=%s and this partition ' \
'table :', dump(self.app.tm.getLastOID()))
self.app.pt.log()
'table :', dump(app.tm.getLastOID()))
pt.log()
def connectionLost(self, conn, new_state):
node = self.app.nm.getByUUID(conn.getUUID())
......@@ -109,12 +118,6 @@ class RecoveryManager(MasterHandler):
# ask the last IDs to perform the recovery
conn.ask(Packets.AskLastIDs())
def _lastIDsCompleted(self, conn):
node = self.app.nm.getByUUID(conn.getUUID())
assert node.isPending()
node.setRunning()
self.app.broadcastNodesInformation([node])
def answerLastIDs(self, conn, loid, ltid, lptid):
# Get max values.
if loid is not None:
......@@ -125,8 +128,6 @@ class RecoveryManager(MasterHandler):
# something newer
self.target_ptid = lptid
conn.ask(Packets.AskPartitionTable())
else:
self._lastIDsCompleted(conn)
def answerPartitionTable(self, conn, ptid, row_list):
if ptid != self.target_ptid:
......@@ -135,7 +136,6 @@ class RecoveryManager(MasterHandler):
dump(self.target_ptid))
else:
self._broadcastPartitionTable(ptid, row_list)
self._lastIDsCompleted(conn)
def _broadcastPartitionTable(self, ptid, row_list):
try:
......
......@@ -340,16 +340,24 @@ class NEOCluster(object):
""" Do a complete start of a cluster """
self.run(except_storages=except_storages)
neoctl = self.neoctl
neoctl.startCluster()
target_count = len(self.db_list) - len(except_storages)
storage_node_list = []
def test():
storage_node_list[:] = neoctl.getNodeList(
node_type=NodeTypes.STORAGE)
storage_node_list[:] = [x
for x in neoctl.getNodeList(node_type=NodeTypes.STORAGE)
if x[3] == NodeStates.PENDING]
# wait at least number of started storages, admin node can know
# more nodes when the cluster restart with an existing partition
# table referencing non-running nodes
return len(storage_node_list) >= target_count
result = len(storage_node_list) >= target_count
if result:
try:
neoctl.startCluster()
except RuntimeError, exc:
result = False
else:
result = True
return result
if not pdb.wait(test, MAX_START_TIME):
raise AssertionError('Timeout when starting cluster')
if storage_node_list:
......
......@@ -32,6 +32,47 @@ class ClusterTests(NEOFunctionalTest):
self.neo.stop()
NEOFunctionalTest.tearDown(self)
def testClusterStartup(self):
neo = NEOCluster(['test_neo1', 'test_neo2'], replicas=1,
adapter='MySQL')
neoctl = neo.getNEOCTL()
neo.run()
# Runing a new cluster doesn't exit Recovery state.
s1, s2 = neo.getStorageProcessList()
neo.expectPending(s1)
neo.expectPending(s2)
neo.expectClusterRecovering()
# When allowing cluster to exit Recovery, it reaches Running state and
# all present storage nodes reach running state.
neoctl.startCluster()
neo.expectRunning(s1)
neo.expectRunning(s2)
neo.expectClusterRunning()
# Re-running cluster with a missing storage doesn't exit Recovery
# state.
neo.stop()
neo.run(except_storages=(s2, ))
neo.expectPending(s1)
neo.expectUnknown(s2)
neo.expectClusterRecovering()
# Starting missing storage allows cluster to exit Recovery without
# neoctl action.
s2.start()
neo.expectRunning(s1)
neo.expectRunning(s2)
neo.expectClusterRunning()
# Re-running cluster with a missing storage and allowing startup exits
# recovery.
neo.stop()
neo.run(except_storages=(s2, ))
neo.expectPending(s1)
neo.expectUnknown(s2)
neo.expectClusterRecovering()
neoctl.startCluster()
neo.expectRunning(s1)
neo.expectUnknown(s2)
neo.expectClusterRunning()
def testClusterBreaks(self):
self.neo = NEOCluster(['test_neo1'],
master_count=1, temp_dir=self.getTempDirectory())
......@@ -121,14 +162,14 @@ class ClusterTests(NEOFunctionalTest):
self.neo.expectStorageNotKnown(storages[0])
self.neo.expectStorageNotKnown(storages[1])
storages[0].start()
self.neo.expectRunning(storages[0])
self.neo.expectPending(storages[0])
self.neo.expectStorageNotKnown(storages[1])
storages[1].start()
self.neo.expectRunning(storages[0])
self.neo.expectRunning(storages[1])
self.neo.expectPending(storages[0])
self.neo.expectPending(storages[1])
storages[0].stop()
self.neo.expectUnavailable(storages[0])
self.neo.expectRunning(storages[1])
self.neo.expectPending(storages[1])
storages[1].stop()
self.neo.expectUnavailable(storages[0])
self.neo.expectUnavailable(storages[1])
......
......@@ -400,51 +400,6 @@ class StorageTests(NEOFunctionalTest):
self.__setup(storage_number=2, partitions=5000, master_count=1)
self.neo.expectClusterState(ClusterStates.RUNNING)
def testDropNodeThenRestartCluster(self):
""" Start a cluster with more than one storage, down one, shutdown the
cluster then restart it. The partition table recovered must not include
the dropped node """
# start with two storage / one replica
(started, stopped) = self.__setup(storage_number=2, replicas=1,
master_count=1, partitions=10)
self.neo.expectRunning(started[0])
self.neo.expectRunning(started[1])
self.neo.expectOudatedCells(number=0)
# drop one
self.neo.neoctl.dropNode(started[0].getUUID())
self.neo.expectStorageNotKnown(started[0])
self.neo.expectRunning(started[1])
# wait for running storage to store new partition table
self.__checkReplicateCount(self.neo.db_list[1], 1)
# restart all nodes except the dropped, it must not be known
self.neo.stop()
self.neo.start(except_storages=[started[0]])
self.neo.expectStorageNotKnown(started[0])
self.neo.expectRunning(started[1])
# then restart it, it must be in pending state
started[0].start()
self.neo.expectPending(started[0])
self.neo.expectRunning(started[1])
def testAcceptFirstEmptyStorageAfterStartupAllowed(self):
""" Create a new cluster with no storage node, allow it to starts
then run the first empty storage, it must be accepted """
(started, stopped) = self.__setup(storage_number=1, replicas=0,
pending_number=1, partitions=10)
# start without storage
self.neo.expectClusterRecovering()
self.neo.expectStorageNotKnown(stopped[0])
# start the empty storage, it must be accepted
stopped[0].start(with_uuid=False)
self.neo.expectClusterRunning()
self.assertEqual(len(self.neo.getStorageList()), 1)
self.neo.expectOudatedCells(number=0)
def testDropNodeWithOtherPending(self):
""" Ensure we can drop a node """
# start with one storage
......@@ -486,15 +441,16 @@ class StorageTests(NEOFunctionalTest):
# restart the cluster with the first storage killed
self.neo.run(except_storages=[started[1]])
self.neo.expectRunning(started[0])
self.neo.expectPending(started[0])
self.neo.expectUnknown(started[1])
self.neo.expectClusterRecovering()
# Cluster doesn't know there are outdated cells
self.neo.expectOudatedCells(number=0)
started[1].start()
self.neo.expectRunning(started[0])
self.neo.expectRunning(started[1])
self.neo.expectClusterRecovering()
self.neo.expectOudatedCells(number=10)
self.neo.expectClusterRunning()
self.neo.expectOudatedCells(number=0)
def testReplicationBlockedByUnfinished(self):
# start a cluster with 1 of 2 storages and a replica
......
......@@ -549,7 +549,7 @@ class NEOCluster(object):
self.client = ClientApplication(self)
self.neoctl = NeoCTL(weakref.proxy(self))
def start(self, storage_list=None, fast_startup=True):
def start(self, storage_list=None, fast_startup=False):
self._patch()
Serialized.init()
for node_type in 'master', 'admin':
......@@ -557,18 +557,29 @@ class NEOCluster(object):
node.start()
self.tic()
if fast_startup:
self.neoctl.startCluster()
self._startCluster()
if storage_list is None:
storage_list = self.storage_list
for node in storage_list:
node.start()
self.tic()
if not fast_startup:
self.neoctl.startCluster()
self._startCluster()
self.tic()
assert self.neoctl.getClusterState() == ClusterStates.RUNNING
self.enableStorageList(storage_list)
def _startCluster(self):
try:
self.neoctl.startCluster()
except RuntimeError:
self.tic()
if self.neoctl.getClusterState() not in (
ClusterStates.RUNNING,
ClusterStates.VERIFYING,
):
raise
def enableStorageList(self, storage_list):
self.neoctl.enableStorageList([x.uuid for x in storage_list])
self.tic()
......
......@@ -297,7 +297,7 @@ class Test(NEOThreadedTest):
finally:
cluster.stop()
def testRestartWithMissingStorage(self, fast_startup=False):
def testRestartWithMissingStorage(self):
# translated from neo.tests.functional.testStorage.StorageTest
cluster = NEOCluster(replicas=1, partitions=10)
s1, s2 = cluster.storage_list
......@@ -309,15 +309,12 @@ class Test(NEOThreadedTest):
# restart it with one storage only
cluster.reset()
try:
cluster.start(storage_list=(s1,), fast_startup=fast_startup)
cluster.start(storage_list=(s1,))
self.assertEqual(NodeStates.UNKNOWN, cluster.getNodeState(s2))
finally:
cluster.stop()
def testRestartWithMissingStorageFastStartup(self):
self.testRestartWithMissingStorage(True)
def testVerificationCommitUnfinishedTransactions(self, fast_startup=False):
def testVerificationCommitUnfinishedTransactions(self):
""" Verification step should commit unfinished transactions """
# translated from neo.tests.functional.testCluster.ClusterTests
cluster = NEOCluster()
......@@ -336,7 +333,7 @@ class Test(NEOThreadedTest):
self.assertEqual(dict.fromkeys(data_info, 1),
cluster.storage.getDataLockInfo())
try:
cluster.start(fast_startup=fast_startup)
cluster.start()
t, c = cluster.getTransaction()
# transaction should be verified and commited
self.assertEqual(c.root()[0], 'ok')
......@@ -344,9 +341,6 @@ class Test(NEOThreadedTest):
finally:
cluster.stop()
def testVerificationCommitUnfinishedTransactionsFastStartup(self):
self.testVerificationCommitUnfinishedTransactions(True)
def testStorageReconnectDuringStore(self):
cluster = NEOCluster(replicas=1)
try:
......@@ -386,5 +380,41 @@ class Test(NEOThreadedTest):
finally:
cluster.stop()
def testDropNodeThenRestartCluster(self):
""" Start a cluster with more than one storage, down one, shutdown the
cluster then restart it. The partition table recovered must not include
the dropped node """
def checkNodeState(state):
self.assertEqual(cluster.getNodeState(s1), state)
self.assertEqual(cluster.getNodeState(s2), NodeStates.RUNNING)
# start with two storage / one replica
cluster = NEOCluster(storage_count=2, replicas=1)
s1, s2 = cluster.storage_list
try:
cluster.start()
checkNodeState(NodeStates.RUNNING)
self.assertEqual([], cluster.getOudatedCells())
# drop one
cluster.neoctl.dropNode(s1.uuid)
checkNodeState(None)
cluster.tic() # Let node state update reach remaining storage
checkNodeState(None)
self.assertEqual([], cluster.getOudatedCells())
# restart with s2 only
finally:
cluster.stop()
cluster.reset()
try:
cluster.start(storage_list=[s2])
checkNodeState(None)
# then restart it, it must be in pending state
s1.start()
cluster.tic()
checkNodeState(NodeStates.PENDING)
finally:
cluster.stop()
if __name__ == "__main__":
unittest.main()
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!