Commit 3e3eab5b authored by Julien Muchembled

Perform DB truncation during recovery, send PT to storages before verification

Currently, the database can only be truncated when leaving backup mode, but
the issue will be the same once neoctl gains a new command to truncate at an
arbitrary tid: we want to be sure that all nodes are truncated before
anything else happens.

Therefore, we no longer send Truncate orders before stopping operation,
because nodes could fail or exit before actually processing them. Truncation
must also happen before asking nodes for their last IDs.

With this commit, if a truncation is requested (see the sketch after this list):
- this is always the first thing done when a storage node connects to the
  primary master during the RECOVERING phase,
- and the cluster does not start automatically if there are missing nodes,
  unless an admin forces it.
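
A minimal sketch of the behaviour described above, assuming hypothetical
stand-ins (RecordingConn, MasterSketch, tuples instead of real NEO packets):

    class RecordingConn(object):
        """Toy connection that records every packet pushed to it."""
        def __init__(self):
            self.sent = []
        def notify(self, packet):
            self.sent.append(packet)
        ask = notify  # answers are irrelevant for this sketch

    class MasterSketch(object):
        def __init__(self, truncate_tid=None):
            self.truncate_tid = truncate_tid
            # A pending truncation disables automatic startup: all storage
            # nodes must be present and truncated before operation resumes.
            self._startup_allowed = truncate_tid is None

        def storageConnected(self, conn):
            # RECOVERING phase: a Truncate order is always the first packet.
            if self.truncate_tid:
                conn.notify(('Truncate', self.truncate_tid))
            conn.ask(('AskLastIDs',))

    conn = RecordingConn()
    MasterSketch(truncate_tid=0x0123456789abcdef).storageConnected(conn)
    assert conn.sent[0][0] == 'Truncate'  # truncation precedes AskLastIDs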

Other changes:
- Connections to storage nodes don't need to be aborted anymore when leaving
  backup mode.
- The master always initiates communication when a storage node identifies
  itself, which simplifies the code and reduces the number of exchanged
  packets (see the sketch below).
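
The sketch below illustrates that last point; ServiceHandlerSketch and the
tuple "packets" are invented for the example and are not NEO's real API:

    class RecordingConn(object):
        def __init__(self):
            self.sent = []
        def notify(self, packet):
            self.sent.append(packet)

    class ServiceHandlerSketch(object):
        """On identification, the master pushes what the storage needs."""
        def __init__(self, node_list, ptid, row_list):
            self.node_list = node_list
            self.ptid = ptid
            self.row_list = row_list

        def connectionCompleted(self, conn, new):
            if new:
                # Instead of answering AskNodeInformation and
                # AskPartitionTable requests from the storage, send both
                # unsolicited: two fewer request/answer round trips.
                conn.notify(('NotifyNodeInformation', self.node_list))
                conn.notify(('SendPartitionTable', self.ptid, self.row_list))

    conn = RecordingConn()
    ServiceHandlerSketch(['S1'], 3, [(0, ())]).connectionCompleted(conn, True)
    assert [p[0] for p in conn.sent] == ['NotifyNodeInformation',
                                         'SendPartitionTable']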
parent 2485f151
@@ -331,6 +331,7 @@ class Application(BaseApplication):
         # machines but must not start automatically: otherwise, each storage
         # node would diverge.
         self._startup_allowed = False
+        self.truncate_tid = None
         try:
             while True:
                 self.runManager(RecoveryManager)
@@ -345,12 +346,10 @@ class Application(BaseApplication):
                 if self.backup_app is None:
                     raise RuntimeError("No upstream cluster to backup"
                                        " defined in configuration")
-                self.backup_app.provideService()
-                # All connections to storages are aborted when leaving
-                # backup mode so restart loop completely (recovery).
-                continue
+                self.truncate_tid = self.backup_app.provideService()
         except OperationFailure:
             logging.critical('No longer operational')
+            self.truncate_tid = None
         node_list = []
         for node in self.nm.getIdentifiedList():
             if node.isStorage() or node.isClient():
@@ -442,7 +441,7 @@ class Application(BaseApplication):
                 continue # keep handler
             if type(handler) is not type(conn.getLastHandler()):
                 conn.setHandler(handler)
-                handler.connectionCompleted(conn)
+                handler.connectionCompleted(conn, new=False)
         self.cluster_state = state

     def getNewUUID(self, uuid, address, node_type):
...
@@ -152,24 +152,18 @@ class BackupApplication(object):
                     assert tid != ZERO_TID
                     logging.warning("Truncating at %s (last_tid was %s)",
                         dump(app.backup_tid), dump(last_tid))
-                    # XXX: We want to go through a recovery phase in order to
-                    #      initialize the transaction manager, but this is only
-                    #      possible if storages already know that we left backup
-                    #      mode. To that purpose, we always send a Truncate packet,
-                    #      even if there's nothing to truncate.
-                    p = Packets.Truncate(tid)
-                    for node in app.nm.getStorageList(only_identified=True):
-                        conn = node.getConnection()
-                        conn.setHandler(handler)
-                        node.setState(NodeStates.TEMPORARILY_DOWN)
-                        # Packets will be sent at the beginning of the recovery
-                        # phase.
-                        conn.notify(p)
-                        conn.abort()
+                    # We will really truncate so do not start automatically
+                    # if there's any missing storage.
+                    app._startup_allowed = False
                 # If any error happened before reaching this line, we'd go back
                 # to backup mode, which is the right mode to recover.
                 del app.backup_tid
-                break
+                # We will go through a recovery phase in order to reset the
+                # transaction manager and this is only possible if storages
+                # already know that we left backup mode. To that purpose, we
+                # always stop operation with a tid, even if there's nothing to
+                # truncate.
+                return tid
         finally:
             del self.primary_partition_dict, self.tid_list
             pt.clearReplicating()
...
@@ -24,6 +24,10 @@ from neo.lib.protocol import (uuid_str, NodeTypes, NodeStates, Packets,
 class MasterHandler(EventHandler):
     """This class implements a generic part of the event handlers."""

+    def connectionCompleted(self, conn, new=None):
+        if new is None:
+            super(MasterHandler, self).connectionCompleted(conn)
+
     def requestIdentification(self, conn, node_type, uuid, address, name):
         self.checkClusterName(name)
         app = self.app

@@ -74,13 +78,16 @@ class MasterHandler(EventHandler):
         conn.answer(Packets.AnswerLastTransaction(
             self.app.getLastTransaction()))

-    def askNodeInformation(self, conn):
+    def _notifyNodeInformation(self, conn):
         nm = self.app.nm
         node_list = []
         node_list.extend(n.asTuple() for n in nm.getMasterList())
         node_list.extend(n.asTuple() for n in nm.getClientList())
         node_list.extend(n.asTuple() for n in nm.getStorageList())
         conn.notify(Packets.NotifyNodeInformation(node_list))
+
+    def askNodeInformation(self, conn):
+        self._notifyNodeInformation(conn)
         conn.answer(Packets.AnswerNodeInformation())

     def askPartitionTable(self, conn):

@@ -95,6 +102,11 @@ DISCONNECTED_STATE_DICT = {
 class BaseServiceHandler(MasterHandler):
     """This class deals with events for a service phase."""

+    def connectionCompleted(self, conn, new):
+        self._notifyNodeInformation(conn)
+        pt = self.app.pt
+        conn.notify(Packets.SendPartitionTable(pt.getID(), pt.getRowList()))
+
     def connectionLost(self, conn, new_state):
         app = self.app
         node = app.nm.getByUUID(conn.getUUID())
...
@@ -20,9 +20,6 @@ from . import MasterHandler
 class ClientServiceHandler(MasterHandler):
     """ Handler dedicated to client during service state """

-    def connectionCompleted(self, conn):
-        pass
-
     def connectionLost(self, conn, new_state):
         # cancel its transactions and forgot the node
         app = self.app
...
@@ -26,7 +26,7 @@ class IdentificationHandler(MasterHandler):
             **kw)
         handler = conn.getHandler()
         assert not isinstance(handler, IdentificationHandler), handler
-        handler.connectionCompleted(conn)
+        handler.connectionCompleted(conn, True)

     def _setupNode(self, conn, node_type, uuid, address, node):
         app = self.app
...
@@ -24,12 +24,13 @@ from . import BaseServiceHandler
 class StorageServiceHandler(BaseServiceHandler):
     """ Handler dedicated to storages during service state """

-    def connectionCompleted(self, conn):
-        # TODO: unit test
+    def connectionCompleted(self, conn, new):
         app = self.app
         uuid = conn.getUUID()
         node = app.nm.getByUUID(uuid)
         app.setStorageNotReady(uuid)
+        if new:
+            super(StorageServiceHandler, self).connectionCompleted(conn, new)
         # XXX: what other values could happen ?
         if node.isRunning():
             conn.notify(Packets.StartOperation(bool(app.backup_tid)))
...
@@ -128,7 +128,10 @@ class RecoveryManager(MasterHandler):
             # broadcast to all so that admin nodes gets informed
             self.app.broadcastNodesInformation([node])

-    def connectionCompleted(self, conn):
+    def connectionCompleted(self, conn, new):
+        tid = self.app.truncate_tid
+        if tid:
+            conn.notify(Packets.Truncate(tid))
         # ask the last IDs to perform the recovery
         conn.ask(Packets.AskLastIDs())
...
@@ -137,9 +137,6 @@ class VerificationManager(BaseServiceHandler):
         self._uuid_set.remove(conn.getUUID())
         self._tid = tid

-    def connectionCompleted(self, conn):
-        pass
-
     def connectionLost(self, conn, new_state):
         self._uuid_set.discard(conn.getUUID())
         super(VerificationManager, self).connectionLost(conn, new_state)
...
@@ -30,7 +30,7 @@ from neo.lib.bootstrap import BootstrapManager
 from .checker import Checker
 from .database import buildDatabaseManager
 from .exception import AlreadyPendingError
-from .handlers import identification, verification, initialization
+from .handlers import identification, initialization
 from .handlers import master, hidden
 from .replicator import Replicator
 from .transactions import TransactionManager
@@ -193,14 +193,11 @@ class Application(BaseApplication):
             self.event_queue = deque()
             self.event_queue_dict = {}
             try:
-                self.verifyData()
                 self.initialize()
                 self.doOperation()
                 raise RuntimeError, 'should not reach here'
             except OperationFailure, msg:
                 logging.error('operation stopped: %s', msg)
-                if self.cluster_state == ClusterStates.STOPPING_BACKUP:
-                    self.dm.setBackupTID(None)
             except PrimaryFailure, msg:
                 logging.error('primary master is down: %s', msg)
             finally:
@@ -247,30 +244,11 @@ class Application(BaseApplication):
         self.pt = PartitionTable(num_partitions, num_replicas)
         self.loadPartitionTable()

-    def verifyData(self):
-        """Verify data under the control by a primary master node.
-        Connections from client nodes may not be accepted at this stage."""
-        logging.info('verifying data')
-        handler = verification.VerificationHandler(self)
-        self.master_conn.setHandler(handler)
-        _poll = self._poll
-        while not self.operational:
-            _poll()
-
     def initialize(self):
-        """ Retreive partition table and node informations from the primary """
         logging.debug('initializing...')
         _poll = self._poll
-        handler = initialization.InitializationHandler(self)
-        self.master_conn.setHandler(handler)
-        # ask node list and partition table
-        self.pt.clear()
-        self.master_conn.ask(Packets.AskNodeInformation())
-        self.master_conn.ask(Packets.AskPartitionTable())
-        while self.master_conn.isPending():
+        self.master_conn.setHandler(initialization.InitializationHandler(self))
+        while not self.operational:
             _poll()
         self.ready = True
         self.replicator.populate()
...
@@ -504,11 +504,9 @@ class DatabaseManager(object):

     def truncate(self, tid):
         assert tid not in (None, ZERO_TID), tid
-        assert self.getBackupTID()
-        self.setBackupTID(None) # XXX
         for partition in xrange(self.getNumPartitions()):
             self._deleteRange(partition, tid)
-        self.commit()
+        self.setBackupTID(None) # this also commits

     def getTransaction(self, tid, all = False):
         """Return a tuple of the list of OIDs, user information,
...
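
The hunk above makes clearing the backup TID the commit point of a
truncation: the per-partition deletions and the marker update become durable
together. A toy SQLite backend illustrating that contract (schema and
semantics invented for the example, not NEO's actual backend):

    import sqlite3

    class ToyBackend(object):
        def __init__(self, num_partitions=4):
            self.conn = sqlite3.connect(':memory:')
            self.conn.execute(
                'CREATE TABLE obj (partition INTEGER, tid INTEGER)')
            self.conn.execute(
                'CREATE TABLE config (name TEXT PRIMARY KEY, value)')
            self.num_partitions = num_partitions

        def getNumPartitions(self):
            return self.num_partitions

        def _deleteRange(self, partition, tid):
            # Drop everything in this partition strictly after `tid`.
            self.conn.execute(
                'DELETE FROM obj WHERE partition=? AND tid>?',
                (partition, tid))

        def setBackupTID(self, tid):
            self.conn.execute(
                'INSERT OR REPLACE INTO config VALUES (?,?)',
                ('backup_tid', tid))
            # "this also commits": deletions issued by truncate() become
            # durable in the same transaction as the marker change.
            self.conn.commit()

        def truncate(self, tid):
            assert tid, tid
            for partition in range(self.getNumPartitions()):
                self._deleteRange(partition, tid)
            self.setBackupTID(None)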
@@ -17,7 +17,7 @@

 from neo.lib import logging
 from neo.lib.handler import EventHandler
 from neo.lib.exception import PrimaryFailure, OperationFailure
-from neo.lib.protocol import uuid_str, NodeStates, NodeTypes
+from neo.lib.protocol import uuid_str, NodeStates, NodeTypes, Packets

 class BaseMasterHandler(EventHandler):
...
@@ -15,24 +15,23 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.

 from . import BaseMasterHandler
-from neo.lib import logging, protocol
+from neo.lib import logging
+from neo.lib.protocol import Packets, ProtocolError, ZERO_TID

 class InitializationHandler(BaseMasterHandler):

     def answerNodeInformation(self, conn):
         pass

-    def answerPartitionTable(self, conn, ptid, row_list):
+    def sendPartitionTable(self, conn, ptid, row_list):
         app = self.app
         pt = app.pt
         pt.load(ptid, row_list, self.app.nm)
         if not pt.filled():
-            raise protocol.ProtocolError('Partial partition table received')
-        logging.debug('Got the partition table:')
-        self.app.pt.log()
+            raise ProtocolError('Partial partition table received')
         # Install the partition table into the database for persistency.
         cell_list = []
-        num_partitions = app.pt.getPartitions()
+        num_partitions = pt.getPartitions()
         unassigned_set = set(xrange(num_partitions))
         for offset in xrange(num_partitions):
             for cell in pt.getCellList(offset):

@@ -46,12 +45,39 @@ class InitializationHandler(BaseMasterHandler):
         app.dm.changePartitionTable(ptid, cell_list, reset=True)

-    def notifyPartitionChanges(self, conn, ptid, cell_list):
-        # XXX: This is safe to ignore those notifications because all of the
-        #      following applies:
-        #      - we first ask for node information, and *then* partition
-        #        table content, so it is possible to get notifyPartitionChanges
-        #        packets in between (or even before asking for node information).
-        #      - this handler will be changed after receiving answerPartitionTable
-        #        and before handling the next packet
-        logging.debug('ignoring notifyPartitionChanges during initialization')
+    def truncate(self, conn, tid):
+        self.app.dm.truncate(tid)
+
+    def askLastIDs(self, conn):
+        app = self.app
+        ltid, _, _, loid = app.dm.getLastIDs()
+        conn.answer(Packets.AnswerLastIDs(
+            loid,
+            ltid,
+            app.pt.getID(),
+            app.dm.getBackupTID()))
+
+    def askPartitionTable(self, conn):
+        pt = self.app.pt
+        conn.answer(Packets.AnswerPartitionTable(pt.getID(), pt.getRowList()))
+
+    def askLockedTransactions(self, conn):
+        conn.answer(Packets.AnswerLockedTransactions(
+            self.app.dm.getUnfinishedTIDDict()))
+
+    def validateTransaction(self, conn, ttid, tid):
+        dm = self.app.dm
+        dm.lockTransaction(tid, ttid)
+        dm.unlockTransaction(tid, ttid)
+
+    def startOperation(self, conn, backup):
+        self.app.operational = True
+        # XXX: see comment in protocol
+        dm = self.app.dm
+        if backup:
+            if dm.getBackupTID():
+                return
+            tid = dm.getLastIDs()[0] or ZERO_TID
+        else:
+            tid = None
+        dm.setBackupTID(tid)
@@ -67,10 +67,5 @@ class MasterOperationHandler(BaseMasterHandler):
             self.app.replicator.backup(tid, {p: a and (a, upstream_name)
                                              for p, a in source_dict.iteritems()})

-    def truncate(self, conn, tid):
-        self.app.replicator.cancel()
-        self.app.dm.truncate(tid)
-        conn.close()
-
     def checkPartition(self, conn, *args):
         self.app.checker(*args)
deleted file: neo/storage/handlers/verification.py
-#
-# Copyright (C) 2006-2015 Nexedi SA
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-from . import BaseMasterHandler
-from neo.lib import logging
-from neo.lib.protocol import Packets, ZERO_TID
-from neo.lib.exception import OperationFailure
-
-class VerificationHandler(BaseMasterHandler):
-    """This class deals with events for a verification phase."""
-
-    def askLastIDs(self, conn):
-        app = self.app
-        ltid, _, _, loid = app.dm.getLastIDs()
-        conn.answer(Packets.AnswerLastIDs(
-            loid,
-            ltid,
-            app.pt.getID(),
-            app.dm.getBackupTID()))
-
-    def askPartitionTable(self, conn):
-        pt = self.app.pt
-        conn.answer(Packets.AnswerPartitionTable(pt.getID(), pt.getRowList()))
-
-    def notifyPartitionChanges(self, conn, ptid, cell_list):
-        """This is very similar to Send Partition Table, except that
-        the information is only about changes from the previous."""
-        app = self.app
-        if ptid <= app.pt.getID():
-            # Ignore this packet.
-            logging.debug('ignoring older partition changes')
-            return
-        # update partition table in memory and the database
-        app.pt.update(ptid, cell_list, app.nm)
-        app.dm.changePartitionTable(ptid, cell_list)
-
-    def startOperation(self, conn, backup):
-        self.app.operational = True
-        # XXX: see comment in protocol
-        dm = self.app.dm
-        if backup:
-            if dm.getBackupTID():
-                return
-            tid = dm.getLastIDs()[0] or ZERO_TID
-        else:
-            tid = None
-        dm.setBackupTID(tid)
-
-    def stopOperation(self, conn):
-        raise OperationFailure('operation stopped')
-
-    def askLockedTransactions(self, conn):
-        conn.answer(Packets.AnswerLockedTransactions(
-            self.app.dm.getUnfinishedTIDDict()))
-
-    def askFinalTID(self, conn, ttid):
-        conn.answer(Packets.AnswerFinalTID(self.app.dm.getFinalTID(ttid)))
-
-    def validateTransaction(self, conn, ttid, tid):
-        dm = self.app.dm
-        dm.lockTransaction(tid, ttid)
-        dm.unlockTransaction(tid, ttid)
@@ -76,7 +76,7 @@ class StorageInitializationHandlerTests(NeoUnitTestBase):
                     (2, ((node_2, CellStates.UP_TO_DATE), (node_3, CellStates.UP_TO_DATE)))]
         self.assertFalse(self.app.pt.filled())
         # send a complete new table and ack
-        self.verification.answerPartitionTable(conn, 2, row_list)
+        self.verification.sendPartitionTable(conn, 2, row_list)
         self.assertTrue(self.app.pt.filled())
         self.assertEqual(self.app.pt.getID(), 2)
         self.assertTrue(list(self.app.dm.getPartitionTable()))
...
@@ -34,7 +34,7 @@ from . import NEOCluster, NEOThreadedTest
 from neo.lib.util import add64, makeChecksum, p64, u64
 from neo.client.exception import NEOStorageError
 from neo.client.pool import CELL_CONNECTED, CELL_GOOD
-from neo.storage.handlers.verification import VerificationHandler
+from neo.storage.handlers.initialization import InitializationHandler

 class PCounter(Persistent):
     value = 0

@@ -1051,8 +1051,9 @@ class Test(NEOThreadedTest):
             p.revert()
             conn.close()
         try:
-            with Patch(cluster.master.pt, make=make), Patch(VerificationHandler,
-                    askPartitionTable=askPartitionTable) as p:
+            with Patch(cluster.master.pt, make=make), \
+                 Patch(InitializationHandler,
+                       askPartitionTable=askPartitionTable) as p:
                 cluster.start()
                 self.assertFalse(p.applied)
         finally:
...