Commit e1f9a7da authored by Julien Muchembled's avatar Julien Muchembled

Go back/stay in RECOVERING state when the partition table can't be operational

This fixes several cases where the partition table could become corrupt and
the whole cluster being stuck in VERIFYING state.

This also reduces the probability the have cells out of date when restarting
several storage nodes simultaneously.

At last, if a master node becomes primary again, a cluster must not be started
automatically if nodes with readable cells are missing, in order to avoid
a split of the database. This could happen if this master node was previously
forced to start it.
parent 7eb7cf1b
...@@ -775,6 +775,8 @@ class StartOperation(Packet): ...@@ -775,6 +775,8 @@ class StartOperation(Packet):
this message, it must not serve client nodes. PM -> S. this message, it must not serve client nodes. PM -> S.
""" """
_fmt = PStruct('start_operation', _fmt = PStruct('start_operation',
# XXX: Is this boolean needed ? Maybe this
# can be deduced from cluster state.
PBoolean('backup'), PBoolean('backup'),
) )
......
...@@ -77,7 +77,6 @@ class Application(BaseApplication): ...@@ -77,7 +77,6 @@ class Application(BaseApplication):
self.primary = None self.primary = None
self.primary_master_node = None self.primary_master_node = None
self.cluster_state = None self.cluster_state = None
self._startup_allowed = False
uuid = config.getUUID() uuid = config.getUUID()
if uuid: if uuid:
...@@ -254,7 +253,6 @@ class Application(BaseApplication): ...@@ -254,7 +253,6 @@ class Application(BaseApplication):
ptid = self.pt.setNextID() ptid = self.pt.setNextID()
packet = Packets.NotifyPartitionChanges(ptid, cell_list) packet = Packets.NotifyPartitionChanges(ptid, cell_list)
for node in self.nm.getIdentifiedList(): for node in self.nm.getIdentifiedList():
# TODO: notify masters
if node.isRunning() and not node.isMaster(): if node.isRunning() and not node.isMaster():
node.notify(packet) node.notify(packet)
...@@ -278,8 +276,13 @@ class Application(BaseApplication): ...@@ -278,8 +276,13 @@ class Application(BaseApplication):
if e.args[0] != ClusterStates.STARTING_BACKUP: if e.args[0] != ClusterStates.STARTING_BACKUP:
raise raise
self.backup_tid = tid = self.getLastTransaction() self.backup_tid = tid = self.getLastTransaction()
self.pt.setBackupTidDict({node.getUUID(): tid packet = Packets.StartOperation(True)
for node in self.nm.getStorageList(only_identified=True)}) tid_dict = {}
for node in self.nm.getStorageList(only_identified=True):
tid_dict[node.getUUID()] = tid
if node.isRunning():
node.notify(packet)
self.pt.setBackupTidDict(tid_dict)
def playPrimaryRole(self): def playPrimaryRole(self):
logging.info('play the primary role with %r', self.listening_conn) logging.info('play the primary role with %r', self.listening_conn)
...@@ -323,30 +326,44 @@ class Application(BaseApplication): ...@@ -323,30 +326,44 @@ class Application(BaseApplication):
in_conflict) in_conflict)
in_conflict.setUUID(None) in_conflict.setUUID(None)
# recover the cluster status at startup # Do not restart automatically if ElectionFailure is raised, in order
# to avoid a split of the database. For example, with 2 machines with
# a master and a storage on each one and replicas=1, the secondary
# master becomes primary in case of network failure between the 2
# machines but must not start automatically: otherwise, each storage
# node would diverge.
self._startup_allowed = False
try: try:
self.runManager(RecoveryManager)
while True: while True:
self.runManager(VerificationManager) self.runManager(RecoveryManager)
# Automatic restart if we become non-operational.
self._startup_allowed = True
try: try:
if self.backup_tid: self.runManager(VerificationManager)
if self.backup_app is None: if not self.backup_tid:
raise RuntimeError("No upstream cluster to backup" self.provideService()
" defined in configuration") # self.provideService only returns without raising
self.backup_app.provideService() # when switching to backup mode.
# Reset connection with storages (and go through a if self.backup_app is None:
# recovery phase) when leaving backup mode in order raise RuntimeError("No upstream cluster to backup"
# to get correct last oid/tid. " defined in configuration")
self.runManager(RecoveryManager) self.backup_app.provideService()
continue # All connections to storages are aborted when leaving
self.provideService() # backup mode so restart loop completely (recovery).
continue
except OperationFailure: except OperationFailure:
logging.critical('No longer operational') logging.critical('No longer operational')
node_list = []
for node in self.nm.getIdentifiedList(): for node in self.nm.getIdentifiedList():
if node.isStorage() or node.isClient(): if node.isStorage() or node.isClient():
node.notify(Packets.StopOperation()) conn = node.getConnection()
conn.notify(Packets.StopOperation())
if node.isClient(): if node.isClient():
node.getConnection().abort() conn.abort()
elif node.isRunning():
node.setPending()
node_list.append(node)
self.broadcastNodesInformation(node_list)
except StateChangedException, e: except StateChangedException, e:
assert e.args[0] == ClusterStates.STOPPING assert e.args[0] == ClusterStates.STOPPING
self.shutdown() self.shutdown()
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging from neo.lib import logging
from neo.lib.exception import OperationFailure
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.protocol import (uuid_str, NodeTypes, NodeStates, Packets, from neo.lib.protocol import (uuid_str, NodeTypes, NodeStates, Packets,
BrokenNodeDisallowedError, BrokenNodeDisallowedError,
...@@ -94,15 +95,13 @@ DISCONNECTED_STATE_DICT = { ...@@ -94,15 +95,13 @@ DISCONNECTED_STATE_DICT = {
class BaseServiceHandler(MasterHandler): class BaseServiceHandler(MasterHandler):
"""This class deals with events for a service phase.""" """This class deals with events for a service phase."""
def nodeLost(self, conn, node):
# This method provides a hook point overridable by service classes.
# It is triggered when a connection to a node gets lost.
pass
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
node = self.app.nm.getByUUID(conn.getUUID()) app = self.app
node = app.nm.getByUUID(conn.getUUID())
if node is None: if node is None:
return # for example, when a storage is removed by an admin return # for example, when a storage is removed by an admin
assert node.isStorage(), node
logging.info('storage node lost')
if new_state != NodeStates.BROKEN: if new_state != NodeStates.BROKEN:
new_state = DISCONNECTED_STATE_DICT.get(node.getType(), new_state = DISCONNECTED_STATE_DICT.get(node.getType(),
NodeStates.DOWN) NodeStates.DOWN)
...@@ -117,10 +116,11 @@ class BaseServiceHandler(MasterHandler): ...@@ -117,10 +116,11 @@ class BaseServiceHandler(MasterHandler):
# was in pending state, so drop it from the node manager to forget # was in pending state, so drop it from the node manager to forget
# it and do not set in running state when it comes back # it and do not set in running state when it comes back
logging.info('drop a pending node from the node manager') logging.info('drop a pending node from the node manager')
self.app.nm.remove(node) app.nm.remove(node)
self.app.broadcastNodesInformation([node]) app.broadcastNodesInformation([node])
# clean node related data in specialized handlers app.broadcastPartitionChanges(app.pt.outdate(node))
self.nodeLost(conn, node) if not app.pt.operational():
raise OperationFailure("cannot continue operation")
def notifyReady(self, conn): def notifyReady(self, conn):
self.app.setStorageReady(conn.getUUID()) self.app.setStorageReady(conn.getUUID())
......
...@@ -34,15 +34,16 @@ class StorageServiceHandler(BaseServiceHandler): ...@@ -34,15 +34,16 @@ class StorageServiceHandler(BaseServiceHandler):
if node.isRunning(): if node.isRunning():
conn.notify(Packets.StartOperation(bool(app.backup_tid))) conn.notify(Packets.StartOperation(bool(app.backup_tid)))
def nodeLost(self, conn, node): def connectionLost(self, conn, new_state):
logging.info('storage node lost')
assert not node.isRunning(), node.getState()
app = self.app app = self.app
app.broadcastPartitionChanges(app.pt.outdate(node)) node = app.nm.getByUUID(conn.getUUID())
if not app.pt.operational(): super(StorageServiceHandler, self).connectionLost(conn, new_state)
raise OperationFailure, 'cannot continue operation'
app.tm.forget(conn.getUUID()) app.tm.forget(conn.getUUID())
if app.getClusterState() == ClusterStates.BACKINGUP: if (app.getClusterState() == ClusterStates.BACKINGUP
# Also check if we're exiting, because backup_app is not usable
# in this case. Maybe cluster state should be set to something
# else, like STOPPING, during cleanup (__del__/close).
and app.listening_conn):
app.backup_app.nodeLost(node) app.backup_app.nodeLost(node)
if app.packing is not None: if app.packing is not None:
self.answerPack(conn, False) self.answerPack(conn, False)
......
...@@ -299,15 +299,19 @@ class PartitionTable(neo.lib.pt.PartitionTable): ...@@ -299,15 +299,19 @@ class PartitionTable(neo.lib.pt.PartitionTable):
yield offset, cell yield offset, cell
break break
def getReadableCellNodeSet(self): def getOperationalNodeSet(self):
""" """
Return a set of all nodes which are part of at least one UP TO DATE Return a set of all nodes which are part of at least one UP TO DATE
partition. partition. An empty list is returned if these nodes aren't enough to
become operational.
""" """
return {cell.getNode() node_set = set()
for row in self.partition_list for row in self.partition_list:
for cell in row if not any(cell.isReadable() and cell.getNode().isPending()
if cell.isReadable()} for cell in row):
return () # not operational
node_set.update(cell.getNode() for cell in row if cell.isReadable())
return node_set
def clearReplicating(self): def clearReplicating(self):
for row in self.partition_list: for row in self.partition_list:
......
...@@ -51,7 +51,7 @@ class RecoveryManager(MasterHandler): ...@@ -51,7 +51,7 @@ class RecoveryManager(MasterHandler):
app = self.app app = self.app
pt = app.pt pt = app.pt
app.changeClusterState(ClusterStates.RECOVERING) app.changeClusterState(ClusterStates.RECOVERING)
pt.setID(None) pt.clear()
# collect the last partition table available # collect the last partition table available
poll = app.em.poll poll = app.em.poll
...@@ -60,7 +60,7 @@ class RecoveryManager(MasterHandler): ...@@ -60,7 +60,7 @@ class RecoveryManager(MasterHandler):
if pt.filled(): if pt.filled():
# A partition table exists, we are starting an existing # A partition table exists, we are starting an existing
# cluster. # cluster.
node_list = pt.getReadableCellNodeSet() node_list = pt.getOperationalNodeSet()
if app._startup_allowed: if app._startup_allowed:
node_list = [node for node in node_list if node.isPending()] node_list = [node for node in node_list if node.isPending()]
elif not all(node.isPending() for node in node_list): elif not all(node.isPending() for node in node_list):
...@@ -91,10 +91,16 @@ class RecoveryManager(MasterHandler): ...@@ -91,10 +91,16 @@ class RecoveryManager(MasterHandler):
# reset IDs generators & build new partition with running nodes # reset IDs generators & build new partition with running nodes
app.tm.setLastOID(ZERO_OID) app.tm.setLastOID(ZERO_OID)
pt.make(node_list) pt.make(node_list)
self._broadcastPartitionTable(pt.getID(), pt.getRowList()) self._notifyAdmins(Packets.SendPartitionTable(
elif app.backup_tid: pt.getID(), pt.getRowList()))
pt.setBackupTidDict(self.backup_tid_dict) else:
app.backup_tid = pt.getBackupTid() cell_list = pt.outdate()
if cell_list:
self._notifyAdmins(Packets.NotifyPartitionChanges(
pt.setNextID(), cell_list))
if app.backup_tid:
pt.setBackupTidDict(self.backup_tid_dict)
app.backup_tid = pt.getBackupTid()
app.setLastTransaction(app.tm.getLastTID()) app.setLastTransaction(app.tm.getLastTID())
logging.debug('cluster starts with loid=%s and this partition table :', logging.debug('cluster starts with loid=%s and this partition table :',
...@@ -132,20 +138,15 @@ class RecoveryManager(MasterHandler): ...@@ -132,20 +138,15 @@ class RecoveryManager(MasterHandler):
logging.warn('Got %s while waiting %s', dump(ptid), logging.warn('Got %s while waiting %s', dump(ptid),
dump(self.target_ptid)) dump(self.target_ptid))
else: else:
self._broadcastPartitionTable(ptid, row_list) try:
new_nodes = self.app.pt.load(ptid, row_list, self.app.nm)
except IndexError:
raise ProtocolError('Invalid offset')
self._notifyAdmins(Packets.NotifyNodeInformation(new_nodes),
Packets.SendPartitionTable(ptid, row_list))
self.app.backup_tid = self.backup_tid_dict[conn.getUUID()] self.app.backup_tid = self.backup_tid_dict[conn.getUUID()]
def _broadcastPartitionTable(self, ptid, row_list): def _notifyAdmins(self, *packets):
try: for node in self.app.nm.getAdminList(only_identified=True):
new_nodes = self.app.pt.load(ptid, row_list, self.app.nm) for packet in packets:
except IndexError: node.notify(packet)
raise ProtocolError('Invalid offset')
else:
notification = Packets.NotifyNodeInformation(new_nodes)
ptid = self.app.pt.getID()
row_list = self.app.pt.getRowList()
partition_table = Packets.SendPartitionTable(ptid, row_list)
# notify the admin nodes
for node in self.app.nm.getAdminList(only_identified=True):
node.notify(notification)
node.notify(partition_table)
...@@ -21,21 +21,7 @@ from neo.lib.protocol import ClusterStates, Packets, NodeStates ...@@ -21,21 +21,7 @@ from neo.lib.protocol import ClusterStates, Packets, NodeStates
from .handlers import BaseServiceHandler from .handlers import BaseServiceHandler
class VerificationFailure(Exception):
"""
Exception raised each time the cluster integrity failed.
- An required storage node is missing
- A transaction or an object is missing on a node
"""
pass
class VerificationManager(BaseServiceHandler): class VerificationManager(BaseServiceHandler):
"""
Manager for verification step of a NEO cluster:
- Wait for at least one available storage per partition
- Check if all expected content is present
"""
def __init__(self, app): def __init__(self, app):
self._locked_dict = {} self._locked_dict = {}
...@@ -44,18 +30,13 @@ class VerificationManager(BaseServiceHandler): ...@@ -44,18 +30,13 @@ class VerificationManager(BaseServiceHandler):
def _askStorageNodesAndWait(self, packet, node_list): def _askStorageNodesAndWait(self, packet, node_list):
poll = self.app.em.poll poll = self.app.em.poll
operational = self.app.pt.operational
uuid_set = self._uuid_set uuid_set = self._uuid_set
uuid_set.clear() uuid_set.clear()
for node in node_list: for node in node_list:
uuid_set.add(node.getUUID()) uuid_set.add(node.getUUID())
node.ask(packet) node.ask(packet)
while True: while uuid_set:
poll(1) poll(1)
if not operational():
raise VerificationFailure
if not uuid_set:
break
def getHandler(self): def getHandler(self):
return self return self
...@@ -76,26 +57,13 @@ class VerificationManager(BaseServiceHandler): ...@@ -76,26 +57,13 @@ class VerificationManager(BaseServiceHandler):
return state, self return state, self
def run(self): def run(self):
self.app.changeClusterState(ClusterStates.VERIFYING) app = self.app
while True: app.changeClusterState(ClusterStates.VERIFYING)
try: if not app.backup_tid:
self.verifyData() self.verifyData()
except VerificationFailure:
continue
break
# At this stage, all non-working nodes are out-of-date.
self.app.broadcastPartitionChanges(self.app.pt.outdate())
def verifyData(self): def verifyData(self):
app = self.app app = self.app
# wait for any missing node
logging.debug('waiting for the cluster to be operational')
while not app.pt.operational():
app.em.poll(1)
if app.backup_tid:
return
logging.info('start to verify data') logging.info('start to verify data')
getIdentifiedList = app.nm.getIdentifiedList getIdentifiedList = app.nm.getIdentifiedList
...@@ -156,7 +124,6 @@ class VerificationManager(BaseServiceHandler): ...@@ -156,7 +124,6 @@ class VerificationManager(BaseServiceHandler):
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
pass pass
def nodeLost(self, conn, node): def connectionLost(self, conn, new_state):
if not self.app.pt.operational(): self._uuid_set.discard(conn.getUUID())
raise VerificationFailure, 'cannot continue verification' super(VerificationManager, self).connectionLost(conn, new_state)
...@@ -16,13 +16,20 @@ ...@@ -16,13 +16,20 @@
from neo.lib import logging from neo.lib import logging
from neo.lib.util import dump from neo.lib.util import dump
from neo.lib.protocol import Packets, ProtocolError from neo.lib.protocol import Packets, ProtocolError, ZERO_TID
from . import BaseMasterHandler from . import BaseMasterHandler
class MasterOperationHandler(BaseMasterHandler): class MasterOperationHandler(BaseMasterHandler):
""" This handler is used for the primary master """ """ This handler is used for the primary master """
def startOperation(self, conn, backup):
# XXX: see comment in protocol
assert self.app.operational and backup
dm = self.app.dm
if not dm.getBackupTID():
dm.setBackupTID(dm.getLastIDs()[0] or ZERO_TID)
def notifyTransactionFinished(self, conn, *args, **kw): def notifyTransactionFinished(self, conn, *args, **kw):
self.app.replicator.transactionFinished(*args, **kw) self.app.replicator.transactionFinished(*args, **kw)
......
...@@ -49,6 +49,7 @@ class VerificationHandler(BaseMasterHandler): ...@@ -49,6 +49,7 @@ class VerificationHandler(BaseMasterHandler):
def startOperation(self, conn, backup): def startOperation(self, conn, backup):
self.app.operational = True self.app.operational = True
# XXX: see comment in protocol
dm = self.app.dm dm = self.app.dm
if backup: if backup:
if dm.getBackupTID(): if dm.getBackupTID():
......
...@@ -77,7 +77,7 @@ class ClusterTests(NEOFunctionalTest): ...@@ -77,7 +77,7 @@ class ClusterTests(NEOFunctionalTest):
self.neo.expectClusterRunning() self.neo.expectClusterRunning()
self.neo.expectOudatedCells(number=0) self.neo.expectOudatedCells(number=0)
self.neo.killStorage() self.neo.killStorage()
self.neo.expectClusterVerifying() self.neo.expectClusterRecovering()
def testClusterBreaksWithTwoNodes(self): def testClusterBreaksWithTwoNodes(self):
self.neo = NEOCluster(['test_neo1', 'test_neo2'], self.neo = NEOCluster(['test_neo1', 'test_neo2'],
...@@ -88,7 +88,7 @@ class ClusterTests(NEOFunctionalTest): ...@@ -88,7 +88,7 @@ class ClusterTests(NEOFunctionalTest):
self.neo.expectClusterRunning() self.neo.expectClusterRunning()
self.neo.expectOudatedCells(number=0) self.neo.expectOudatedCells(number=0)
self.neo.killStorage() self.neo.killStorage()
self.neo.expectClusterVerifying() self.neo.expectClusterRecovering()
def testClusterDoesntBreakWithTwoNodesOneReplica(self): def testClusterDoesntBreakWithTwoNodesOneReplica(self):
self.neo = NEOCluster(['test_neo1', 'test_neo2'], self.neo = NEOCluster(['test_neo1', 'test_neo2'],
...@@ -127,7 +127,7 @@ class ClusterTests(NEOFunctionalTest): ...@@ -127,7 +127,7 @@ class ClusterTests(NEOFunctionalTest):
self.assertEqual(len(self.neo.getClientlist()), 1) self.assertEqual(len(self.neo.getClientlist()), 1)
# drop the storage, the cluster is no more operational... # drop the storage, the cluster is no more operational...
self.neo.getStorageProcessList()[0].stop() self.neo.getStorageProcessList()[0].stop()
self.neo.expectClusterVerifying() self.neo.expectClusterRecovering()
# ...and the client gets disconnected # ...and the client gets disconnected
self.assertEqual(len(self.neo.getClientlist()), 0) self.assertEqual(len(self.neo.getClientlist()), 0)
# restart storage so that the cluster is operational again # restart storage so that the cluster is operational again
......
...@@ -179,7 +179,7 @@ class StorageTests(NEOFunctionalTest): ...@@ -179,7 +179,7 @@ class StorageTests(NEOFunctionalTest):
# Cluster not operational anymore. Only cells of second storage that # Cluster not operational anymore. Only cells of second storage that
# were shared with the third one should become outdated. # were shared with the third one should become outdated.
self.neo.expectUnavailable(started[1]) self.neo.expectUnavailable(started[1])
self.neo.expectClusterVerifying() self.neo.expectClusterRecovering()
self.neo.expectOudatedCells(3) self.neo.expectOudatedCells(3)
def testVerificationTriggered(self): def testVerificationTriggered(self):
...@@ -200,7 +200,7 @@ class StorageTests(NEOFunctionalTest): ...@@ -200,7 +200,7 @@ class StorageTests(NEOFunctionalTest):
# stop it, the cluster must switch to verification # stop it, the cluster must switch to verification
started[0].stop() started[0].stop()
self.neo.expectUnavailable(started[0]) self.neo.expectUnavailable(started[0])
self.neo.expectClusterVerifying() self.neo.expectClusterRecovering()
# client must have been disconnected # client must have been disconnected
self.assertEqual(len(self.neo.getClientlist()), 0) self.assertEqual(len(self.neo.getClientlist()), 0)
conn.close() conn.close()
...@@ -245,7 +245,7 @@ class StorageTests(NEOFunctionalTest): ...@@ -245,7 +245,7 @@ class StorageTests(NEOFunctionalTest):
self.neo.expectUnavailable(started[1]) self.neo.expectUnavailable(started[1])
self.neo.expectUnavailable(started[2]) self.neo.expectUnavailable(started[2])
self.neo.expectOudatedCells(number=20) self.neo.expectOudatedCells(number=20)
self.neo.expectClusterVerifying() self.neo.expectClusterRecovering()
def testConflictingStorageRejected(self): def testConflictingStorageRejected(self):
""" Check that a storage coming after the recovery process with the same """ Check that a storage coming after the recovery process with the same
...@@ -403,7 +403,7 @@ class StorageTests(NEOFunctionalTest): ...@@ -403,7 +403,7 @@ class StorageTests(NEOFunctionalTest):
self.neo.expectUnavailable(started[0]) self.neo.expectUnavailable(started[0])
self.neo.expectUnavailable(started[1]) self.neo.expectUnavailable(started[1])
self.neo.expectOudatedCells(number=10) self.neo.expectOudatedCells(number=10)
self.neo.expectClusterVerifying() self.neo.expectClusterRecovering()
# XXX: need to sync with storages first # XXX: need to sync with storages first
self.neo.stop() self.neo.stop()
......
...@@ -177,60 +177,6 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -177,60 +177,6 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
self.assertEqual(node2.getState(), state) self.assertEqual(node2.getState(), state)
self.assertEqual(lptid, self.app.pt.getID()) self.assertEqual(lptid, self.app.pt.getID())
def test_nodeLostAfterAskLockInformation(self):
# 2 storage nodes, one will die
node1, conn1 = self._getStorage()
node2, conn2 = self._getStorage()
# client nodes, to distinguish answers for the sample transactions
client1, cconn1 = self._getClient()
client2, cconn2 = self._getClient()
client3, cconn3 = self._getClient()
oid_list = [self.getOID(), ]
# Some shortcuts to simplify test code
self.app.pt = Mock({'operational': True})
# Register some transactions
tm = self.app.tm
# Transaction 1: 2 storage nodes involved, one will die and the other
# already answered node lock
msg_id_1 = 1
ttid1 = tm.begin(client1)
tid1 = tm.prepare(ttid1, 1, oid_list,
[node1.getUUID(), node2.getUUID()], msg_id_1)
tm.lock(ttid1, node2.getUUID())
# storage 1 request a notification at commit
tm. registerForNotification(node1.getUUID())
self.checkNoPacketSent(cconn1)
# Storage 1 dies
node1.setTemporarilyDown()
self.service.nodeLost(conn1, node1)
# T1: last locking node lost, client receives AnswerTransactionFinished
self.checkAnswerTransactionFinished(cconn1)
self.checkNotifyTransactionFinished(conn1)
self.checkNotifyUnlockInformation(conn2)
# ...and notifications are sent to other clients
self.checkInvalidateObjects(cconn2)
self.checkInvalidateObjects(cconn3)
# Transaction 2: 2 storage nodes involved, one will die
msg_id_2 = 2
ttid2 = tm.begin(node1)
tid2 = tm.prepare(ttid2, 1, oid_list,
[node1.getUUID(), node2.getUUID()], msg_id_2)
# T2: pending locking answer, client keeps waiting
self.checkNoPacketSent(cconn2, check_notify=False)
tm.remove(node1.getUUID(), ttid2)
# Transaction 3: 1 storage node involved, which won't die
msg_id_3 = 3
ttid3 = tm.begin(node1)
tid3 = tm.prepare(ttid3, 1, oid_list,
[node2.getUUID(), ], msg_id_3)
# T3: action not significant to this transacion, so no response
self.checkNoPacketSent(cconn3, check_notify=False)
tm.remove(node1.getUUID(), ttid3)
def test_answerPack(self): def test_answerPack(self):
# Note: incomming status has no meaning here, so it's left to False. # Note: incomming status has no meaning here, so it's left to False.
node1, conn1 = self._getStorage() node1, conn1 = self._getStorage()
......
...@@ -33,7 +33,7 @@ from neo.lib import logging ...@@ -33,7 +33,7 @@ from neo.lib import logging
from neo.lib.connection import BaseConnection, Connection from neo.lib.connection import BaseConnection, Connection
from neo.lib.connector import SocketConnector, ConnectorException from neo.lib.connector import SocketConnector, ConnectorException
from neo.lib.locking import SimpleQueue from neo.lib.locking import SimpleQueue
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, NodeTypes from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes
from neo.lib.util import cached_property, parseMasterList, p64 from neo.lib.util import cached_property, parseMasterList, p64
from .. import NeoTestBase, Patch, getTempDirectory, setupMySQLdb, \ from .. import NeoTestBase, Patch, getTempDirectory, setupMySQLdb, \
ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, DB_PREFIX, DB_USER ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, DB_PREFIX, DB_USER
...@@ -478,6 +478,8 @@ class ConnectionFilter(object): ...@@ -478,6 +478,8 @@ class ConnectionFilter(object):
queue.appendleft(packet) queue.appendleft(packet)
break break
else: else:
if conn.isClosed():
return
cls._addPacket(conn, packet) cls._addPacket(conn, packet)
continue continue
break break
...@@ -731,9 +733,12 @@ class NEOCluster(object): ...@@ -731,9 +733,12 @@ class NEOCluster(object):
return node[3] return node[3]
def getOutdatedCells(self): def getOutdatedCells(self):
return [cell for row in self.neoctl.getPartitionRowList()[1] # Ask the admin instead of the primary master to check that it is
for cell in row[1] # notified of every change.
if cell[1] == CellStates.OUT_OF_DATE] return [(i, cell.getUUID())
for i, row in enumerate(self.admin.pt.partition_list)
for cell in row
if not cell.isReadable()]
def getZODBStorage(self, **kw): def getZODBStorage(self, **kw):
kw['_app'] = kw.pop('client', self.client) kw['_app'] = kw.pop('client', self.client)
......
...@@ -394,6 +394,62 @@ class Test(NEOThreadedTest): ...@@ -394,6 +394,62 @@ class Test(NEOThreadedTest):
finally: finally:
cluster.stop() cluster.stop()
def testRestartStoragesWithReplicas(self):
"""
Check that the master must discard its partition table when the
cluster is not operational anymore. Which means that it must go back
to RECOVERING state and remain there as long as the partition table
can't be operational.
This also checks that if the master remains the primary one after going
back to recovery, it automatically starts the cluster if possible
(i.e. without manual intervention).
"""
outdated = []
def doOperation(orig):
outdated.append(cluster.getOutdatedCells())
orig()
def stop():
with cluster.master.filterConnection(s0) as m2s0:
m2s0.add(lambda conn, packet:
isinstance(packet, Packets.NotifyPartitionChanges))
s1.stop()
cluster.join((s1,))
self.assertEqual(getClusterState(), ClusterStates.RUNNING)
self.assertEqual(cluster.getOutdatedCells(),
[(0, s1.uuid), (1, s1.uuid)])
s0.stop()
cluster.join((s0,))
self.assertNotEqual(getClusterState(), ClusterStates.RUNNING)
s0.resetNode()
s1.resetNode()
cluster = NEOCluster(storage_count=2, partitions=2, replicas=1)
try:
cluster.start()
s0, s1 = cluster.storage_list
getClusterState = cluster.neoctl.getClusterState
if 1:
# Scenario 1: When all storage nodes are restarting,
# we want a chance to not restart with outdated cells.
stop()
with Patch(s1, doOperation=doOperation):
s0.start()
s1.start()
self.tic()
self.assertEqual(getClusterState(), ClusterStates.RUNNING)
self.assertEqual(outdated, [[]])
if 1:
# Scenario 2: When only the first storage node to be stopped
# is started, the cluster must be able to restart.
stop()
s1.start()
self.tic()
# The master doesn't wait for s0 to come back.
self.assertEqual(getClusterState(), ClusterStates.RUNNING)
self.assertEqual(cluster.getOutdatedCells(),
[(0, s0.uuid), (1, s0.uuid)])
finally:
cluster.stop()
def testVerificationCommitUnfinishedTransactions(self): def testVerificationCommitUnfinishedTransactions(self):
""" Verification step should commit locked transactions """ """ Verification step should commit locked transactions """
def delayUnlockInformation(conn, packet): def delayUnlockInformation(conn, packet):
...@@ -588,12 +644,17 @@ class Test(NEOThreadedTest): ...@@ -588,12 +644,17 @@ class Test(NEOThreadedTest):
cluster.start() cluster.start()
# prevent storage to reconnect, in order to easily test # prevent storage to reconnect, in order to easily test
# that cluster becomes non-operational # that cluster becomes non-operational
storage.connectToPrimary = sys.exit with Patch(storage, connectToPrimary=sys.exit):
# send an unexpected to master so it aborts connection to storage # send an unexpected to master so it aborts connection to storage
storage.master_conn.answer(Packets.Pong()) storage.master_conn.answer(Packets.Pong())
self.tic()
self.assertEqual(cluster.neoctl.getClusterState(),
ClusterStates.RECOVERING)
storage.resetNode()
storage.start()
self.tic() self.tic()
self.assertEqual(cluster.neoctl.getClusterState(), self.assertEqual(cluster.neoctl.getClusterState(),
ClusterStates.VERIFYING) ClusterStates.RUNNING)
finally: finally:
cluster.stop() cluster.stop()
......
...@@ -424,7 +424,7 @@ class ReplicationTests(NEOThreadedTest): ...@@ -424,7 +424,7 @@ class ReplicationTests(NEOThreadedTest):
check(ClusterStates.RUNNING, 1) check(ClusterStates.RUNNING, 1)
cluster.neoctl.checkReplicas(check_dict, ZERO_TID, None) cluster.neoctl.checkReplicas(check_dict, ZERO_TID, None)
self.tic() self.tic()
check(ClusterStates.VERIFYING, 4) check(ClusterStates.RECOVERING, 4)
finally: finally:
checker.CHECK_COUNT = CHECK_COUNT checker.CHECK_COUNT = CHECK_COUNT
cluster.stop() cluster.stop()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment