Commit ca2caf87 by Julien Muchembled

Bump protocol version and upgrade storages automatically

2 parents cff279af d3c8b76d
Showing 50 changed files with 852 additions and 1221 deletions
......@@ -58,8 +58,6 @@
committed by future transactions.
- Add a 'devid' storage configuration so that the master does not distribute
replicated partitions on storages with the same 'devid'.
- Make tpc_finish safer as described in its __doc__: moving work to
tpc_vote and recovering from master failure when possible.
Storage
- Use libmysqld instead of a stand-alone MySQL server.
......@@ -143,9 +141,7 @@
Admin
- Make admin node able to monitor multiple clusters simultaneously
- Send notifications (ie: mail) when a storage or master node is lost
- Add ctl command to truncate DB at arbitrary TID. 'Truncate' message
can be reused. There should also be a way to list last transactions,
like fstail for FileStorage.
- Add ctl command to list last transactions, like fstail for FileStorage.
Tests
- Use another mock library: Python 3.3+ has unittest.mock, which is
......
......@@ -65,10 +65,12 @@ class AdminEventHandler(EventHandler):
askLastIDs = forward_ask(Packets.AskLastIDs)
askLastTransaction = forward_ask(Packets.AskLastTransaction)
addPendingNodes = forward_ask(Packets.AddPendingNodes)
askRecovery = forward_ask(Packets.AskRecovery)
tweakPartitionTable = forward_ask(Packets.TweakPartitionTable)
setClusterState = forward_ask(Packets.SetClusterState)
setNodeState = forward_ask(Packets.SetNodeState)
checkReplicas = forward_ask(Packets.CheckReplicas)
truncate = forward_ask(Packets.Truncate)
class MasterEventHandler(EventHandler):
......
......@@ -612,18 +612,29 @@ class Application(ThreadedApplication):
packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
str(transaction.description), dumps(transaction._extension),
txn_context['cache_dict'])
add_involved_nodes = txn_context['involved_nodes'].add
queue = txn_context['queue']
trans_nodes = []
for node, conn in self.cp.iterateForObject(ttid):
logging.debug("voting transaction %s on %s", dump(ttid),
dump(conn.getUUID()))
try:
self._askStorage(conn, packet)
conn.ask(packet, queue=queue)
except ConnectionClosed:
continue
add_involved_nodes(node)
trans_nodes.append(node)
# check that at least one storage node accepted
if txn_context['involved_nodes']:
if trans_nodes:
involved_nodes = txn_context['involved_nodes']
packet = Packets.AskVoteTransaction(ttid)
for node in involved_nodes.difference(trans_nodes):
conn = self.cp.getConnForNode(node)
if conn is not None:
try:
conn.ask(packet, queue=queue)
except ConnectionClosed:
pass
involved_nodes.update(trans_nodes)
self.waitResponses(queue)
txn_context['voted'] = None
# We must not go further if connection to master was lost since
# tpc_begin, to lower the probability of failing during tpc_finish.
......@@ -667,27 +678,14 @@ class Application(ThreadedApplication):
fail in tpc_finish. In particular, making a transaction permanent
should ideally be as simple as switching a bit permanently.
In NEO, tpc_finish breaks this promise by not ensuring earlier that all
data and metadata are written, and it is for example vulnerable to
ENOSPC errors. In other words, some work should be moved to tpc_vote.
TODO: - In tpc_vote, all involved storage nodes must be asked to write
all metadata to ttrans/tobj and _commit_. AskStoreTransaction
can be extended for this: for nodes that don't store anything
in ttrans, it can just contain the ttid. The final tid is not
known yet, so ttrans/tobj would contain the ttid.
- In tpc_finish, AskLockInformation is still required for read
locking, ttrans.tid must be updated with the final value and
ttrans _committed_.
- The Verification phase would need some change because
ttrans/tobj may contain data for which tpc_finish was not
called. The ttid is also in trans so a mapping ttid<->tid is
always possible and can be forwarded via the master so that all
storage are still able to update the tid column with the final
value when moving rows from tobj to obj.
The resulting cost is:
- additional RPCs in tpc_vote
- 1 updated row in ttrans + commit
In NEO, all the data (with the exception of the tid, simply because
it is not known yet) is already flushed to disk at the end of the vote.
During tpc_finish, all nodes storing the transaction metadata are asked
to commit by saving the new tid and flushing again: for SQL backends,
it's just an UPDATE of 1 cell. Finally, the metadata is moved to
its final place so that the new transaction is readable, but this is
something that can always be replayed (during the verification phase)
if any failure happens.
TODO: We should recover from master failures when the transaction got
successfully committed. More precisely, we should not raise:
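To make the scheme above concrete, here is a minimal sketch of the vote/finish split; all names (backend, write_temporary, move_to_final_place) are hypothetical stand-ins, not NEO's actual storage API:

def tpc_vote(backend, ttid, metadata, data):
    # Everything except the final tid is written and flushed here, so
    # an ENOSPC-like failure aborts the vote instead of tpc_finish.
    backend.write_temporary(ttid, metadata, data)  # ttrans/tobj
    backend.commit()                               # durable flush

def tpc_finish(backend, ttid, tid):
    # For SQL backends, this is just an UPDATE of 1 cell plus a commit.
    backend.update_tid(ttid, tid)
    backend.commit()
    # Making the transaction readable is replayable: if we crash here,
    # the verification phase redoes it from ttrans/tobj.
    backend.move_to_final_place(ttid, tid)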
......
......@@ -102,11 +102,17 @@ class PrimaryNotificationsHandler(MTEventHandler):
if app.master_conn is None:
app._cache_lock_acquire()
try:
oid_list = app._cache.clear_current()
db = app.getDB()
if db is not None:
db.invalidate(app.last_tid and
add64(app.last_tid, 1), oid_list)
if app.last_tid < ltid:
oid_list = app._cache.clear_current()
db is None or db.invalidate(
app.last_tid and add64(app.last_tid, 1),
oid_list)
else:
# The DB was truncated. It happens so
# rarely that we don't need to optimize.
app._cache.clear()
db is None or db.invalidateCache()
finally:
app._cache_lock_release()
app.last_tid = ltid
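For reference, a condensed sketch of the logic above; add64 is reimplemented here for illustration (NEO tids are assumed to be 8-byte big-endian strings), and the cache/db objects are stand-ins:

from struct import pack, unpack

def add64(tid, n):
    # Assumed equivalent of neo.lib.util.add64 for 8-byte tids.
    return pack('>Q', unpack('>Q', tid)[0] + n)

def resume_after_reconnect(cache, db, last_tid, ltid):
    if last_tid < ltid:
        # Transactions were committed while we were disconnected:
        # invalidate only the entries that may be outdated.
        oid_list = cache.clear_current()
        if db is not None:
            db.invalidate(last_tid and add64(last_tid, 1), oid_list)
    else:
        # The DB was truncated; rare enough that dropping the whole
        # cache is acceptable.
        cache.clear()
        if db is not None:
            db.invalidateCache()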
......
......@@ -112,9 +112,11 @@ class StorageAnswersHandler(AnswerBaseHandler):
answerCheckCurrentSerial = answerStoreObject
def answerStoreTransaction(self, conn, _):
def answerStoreTransaction(self, conn):
pass
answerVoteTransaction = answerStoreTransaction
def answerTIDsFrom(self, conn, tid_list):
logging.debug('Get %u TIDs from %r', len(tid_list), conn)
self.app.setHandlerData(tid_list)
......
......@@ -41,9 +41,6 @@ class BootstrapManager(EventHandler):
self.num_partitions = None
self.current = None
def notifyNodeInformation(self, conn, node_list):
pass
def announcePrimary(self, conn):
# We found the primary master early enough to be notified of election
# end. Lucky. Anyway, we must carry on with identification request, so
......
......@@ -23,7 +23,7 @@ class ElectionFailure(NeoException):
class PrimaryFailure(NeoException):
pass
class OperationFailure(NeoException):
class StoppedOperation(NeoException):
pass
class DatabaseFailure(NeoException):
......
......@@ -20,7 +20,7 @@ import traceback
from cStringIO import StringIO
from struct import Struct
PROTOCOL_VERSION = 4
PROTOCOL_VERSION = 5
# Size restrictions.
MIN_PACKET_SIZE = 10
......@@ -722,16 +722,24 @@ class ReelectPrimary(Packet):
Force a re-election of a primary master node. M -> M.
"""
class Recovery(Packet):
"""
Ask all data needed by the master to recover. PM -> S, S -> PM.
"""
_answer = PStruct('answer_recovery',
PPTID('ptid'),
PTID('backup_tid'),
PTID('truncate_tid'),
)
class LastIDs(Packet):
"""
Ask the last OID, the last TID and the last Partition Table ID so that
a master recover. PM -> S, S -> PM.
Ask the last OID/TID so that a master can initialize its TransactionManager.
PM -> S, S -> PM.
"""
_answer = PStruct('answer_last_ids',
POID('last_oid'),
PTID('last_tid'),
PPTID('last_ptid'),
PTID('backup_tid'),
)
class PartitionTable(Packet):
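As a toy illustration of how such field lists map to bytes (this is not NEO's actual serializer, and encoding None as all zeros is an assumption), a fixed structure like answer_recovery could be handled as three 8-byte fields:

ZERO = '\0' * 8  # 8-byte tid/ptid; None encoded as a sentinel

def encode(*fields):
    # e.g. encode(ptid, backup_tid, truncate_tid) for 'answer_recovery'
    return ''.join(ZERO if f is None else f for f in fields)

def decode(data):
    fields = [data[i:i + 8] for i in xrange(0, len(data), 8)]
    return [None if f == ZERO else f for f in fields]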
......@@ -775,6 +783,8 @@ class StartOperation(Packet):
this message, it must not serve client nodes. PM -> S.
"""
_fmt = PStruct('start_operation',
# XXX: Is this boolean needed ? Maybe this
# can be deduced from cluster state.
PBoolean('backup'),
)
......@@ -786,8 +796,8 @@ class StopOperation(Packet):
class UnfinishedTransactions(Packet):
"""
Ask unfinished transactions PM -> S.
Answer unfinished transactions S -> PM.
Ask unfinished transactions S -> PM.
Answer unfinished transactions PM -> S.
"""
_answer = PStruct('answer_unfinished_transactions',
PTID('max_tid'),
......@@ -796,36 +806,36 @@ class UnfinishedTransactions(Packet):
),
)
class ObjectPresent(Packet):
class LockedTransactions(Packet):
"""
Ask if an object is present. If not present, OID_NOT_FOUND should be
returned. PM -> S.
Answer that an object is present. PM -> S.
Ask locked transactions PM -> S.
Answer locked transactions S -> PM.
"""
_fmt = PStruct('object_present',
POID('oid'),
PTID('tid'),
)
_answer = PStruct('object_present',
POID('oid'),
PTID('tid'),
_answer = PStruct('answer_locked_transactions',
PDict('tid_dict',
PTID('ttid'),
PTID('tid'),
),
)
class DeleteTransaction(Packet):
class FinalTID(Packet):
"""
Delete a transaction. PM -> S.
Return final tid if ttid has been committed. * -> S.
"""
_fmt = PStruct('delete_transaction',
_fmt = PStruct('final_tid',
PTID('ttid'),
)
_answer = PStruct('final_tid',
PTID('tid'),
PFOidList,
)
class CommitTransaction(Packet):
class ValidateTransaction(Packet):
"""
Commit a transaction. PM -> S.
"""
_fmt = PStruct('commit_transaction',
_fmt = PStruct('validate_transaction',
PTID('ttid'),
PTID('tid'),
)
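Together, LockedTransactions, FinalTID and ValidateTransaction replace the old ObjectPresent/DeleteTransaction/CommitTransaction verification. A rough sketch of how the new phase can be driven (hypothetical helpers, not the actual VerificationManager):

from neo.lib.protocol import Packets

def verify(storage_conns):
    locked = {}  # ttid -> final tid, or None if unknown everywhere
    for conn in storage_conns:
        # Each storage reports the transactions still in ttrans,
        # with the final tid when it knows it.
        answer = conn.ask(Packets.AskLockedTransactions())
        for ttid, tid in answer.iteritems():
            locked[ttid] = locked.get(ttid) or tid
    for ttid, tid in locked.iteritems():
        if tid:
            # tpc_finish reached at least one node: the transaction is
            # committed, so replay the move to its final place.
            for conn in storage_conns:
                conn.notify(Packets.ValidateTransaction(ttid, tid))
        # else: the transaction never finished and is discarded.

AskFinalTID serves clients too: after losing the master during tpc_finish, a client can ask a storage whether its ttid was committed and with which final tid.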
......@@ -878,11 +888,10 @@ class LockInformation(Packet):
_fmt = PStruct('ask_lock_informations',
PTID('ttid'),
PTID('tid'),
PFOidList,
)
_answer = PStruct('answer_information_locked',
PTID('tid'),
PTID('ttid'),
)
class InvalidateObjects(Packet):
......@@ -899,7 +908,7 @@ class UnlockInformation(Packet):
Unlock information on a transaction. PM -> S.
"""
_fmt = PStruct('notify_unlock_information',
PTID('tid'),
PTID('ttid'),
)
class GenerateOIDs(Packet):
......@@ -961,10 +970,17 @@ class StoreTransaction(Packet):
PString('extension'),
PFOidList,
)
_answer = PFEmpty
_answer = PStruct('answer_store_transaction',
class VoteTransaction(Packet):
"""
Ask to vote a transaction. C -> S.
Answer if the transaction has been voted. S -> C.
"""
_fmt = PStruct('ask_vote_transaction',
PTID('tid'),
)
_answer = PFEmpty
class GetObject(Packet):
"""
......@@ -1462,13 +1478,14 @@ class ReplicationDone(Packet):
class Truncate(Packet):
"""
XXX: Used for both make storage consistent and leave backup mode
M -> S
Request DB to be truncated. Also used to leave backup mode.
"""
_fmt = PStruct('truncate',
PTID('tid'),
)
_answer = Error
StaticRegistry = {}
def register(request, ignore_when_closed=None):
......@@ -1586,6 +1603,8 @@ class Packets(dict):
ReelectPrimary)
NotifyNodeInformation = register(
NotifyNodeInformation)
AskRecovery, AnswerRecovery = register(
Recovery)
AskLastIDs, AnswerLastIDs = register(
LastIDs)
AskPartitionTable, AnswerPartitionTable = register(
......@@ -1600,12 +1619,12 @@ class Packets(dict):
StopOperation)
AskUnfinishedTransactions, AnswerUnfinishedTransactions = register(
UnfinishedTransactions)
AskObjectPresent, AnswerObjectPresent = register(
ObjectPresent)
DeleteTransaction = register(
DeleteTransaction)
CommitTransaction = register(
CommitTransaction)
AskLockedTransactions, AnswerLockedTransactions = register(
LockedTransactions)
AskFinalTID, AnswerFinalTID = register(
FinalTID)
ValidateTransaction = register(
ValidateTransaction)
AskBeginTransaction, AnswerBeginTransaction = register(
BeginTransaction)
AskFinishTransaction, AnswerTransactionFinished = register(
......@@ -1624,6 +1643,8 @@ class Packets(dict):
AbortTransaction)
AskStoreTransaction, AnswerStoreTransaction = register(
StoreTransaction)
AskVoteTransaction, AnswerVoteTransaction = register(
VoteTransaction)
AskObject, AnswerObject = register(
GetObject)
AskTIDs, AnswerTIDs = register(
......
......@@ -24,7 +24,7 @@ from neo.lib.protocol import uuid_str, UUID_NAMESPACES, ZERO_TID
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.lib.handler import EventHandler
from neo.lib.connection import ListeningConnection, ClientConnection
from neo.lib.exception import ElectionFailure, PrimaryFailure, OperationFailure
from neo.lib.exception import ElectionFailure, PrimaryFailure, StoppedOperation
class StateChangedException(Exception): pass
......@@ -45,6 +45,7 @@ class Application(BaseApplication):
backup_tid = None
backup_app = None
uuid = None
truncate_tid = None
def __init__(self, config):
super(Application, self).__init__(
......@@ -77,7 +78,6 @@ class Application(BaseApplication):
self.primary = None
self.primary_master_node = None
self.cluster_state = None
self._startup_allowed = False
uuid = config.getUUID()
if uuid:
......@@ -221,7 +221,7 @@ class Application(BaseApplication):
self.primary = self.primary is None
break
def broadcastNodesInformation(self, node_list):
def broadcastNodesInformation(self, node_list, exclude=None):
"""
Broadcast changes for a set of nodes
Send only one packet per connection to reduce bandwidth
......@@ -243,7 +243,7 @@ class Application(BaseApplication):
# send at most one non-empty notification packet per node
for node in self.nm.getIdentifiedList():
node_list = node_dict.get(node.getType())
if node_list and node.isRunning():
if node_list and node.isRunning() and node is not exclude:
node.notify(Packets.NotifyNodeInformation(node_list))
def broadcastPartitionChanges(self, cell_list):
......@@ -254,7 +254,6 @@ class Application(BaseApplication):
ptid = self.pt.setNextID()
packet = Packets.NotifyPartitionChanges(ptid, cell_list)
for node in self.nm.getIdentifiedList():
# TODO: notify masters
if node.isRunning() and not node.isMaster():
node.notify(packet)
......@@ -266,8 +265,6 @@ class Application(BaseApplication):
"""
logging.info('provide service')
poll = self.em.poll
self.tm.reset()
self.changeClusterState(ClusterStates.RUNNING)
# Now everything is passive.
......@@ -278,8 +275,13 @@ class Application(BaseApplication):
if e.args[0] != ClusterStates.STARTING_BACKUP:
raise
self.backup_tid = tid = self.getLastTransaction()
self.pt.setBackupTidDict({node.getUUID(): tid
for node in self.nm.getStorageList(only_identified=True)})
packet = Packets.StartOperation(True)
tid_dict = {}
for node in self.nm.getStorageList(only_identified=True):
tid_dict[node.getUUID()] = tid
if node.isRunning():
node.notify(packet)
self.pt.setBackupTidDict(tid_dict)
def playPrimaryRole(self):
logging.info('play the primary role with %r', self.listening_conn)
......@@ -323,30 +325,46 @@ class Application(BaseApplication):
in_conflict)
in_conflict.setUUID(None)
# recover the cluster status at startup
# Do not restart automatically if ElectionFailure is raised, in order
# to avoid a split of the database. For example, with 2 machines with
# a master and a storage on each one and replicas=1, the secondary
# master becomes primary in case of network failure between the 2
# machines but must not start automatically: otherwise, each storage
# node would diverge.
self._startup_allowed = False
try:
self.runManager(RecoveryManager)
while True:
self.runManager(VerificationManager)
self.runManager(RecoveryManager)
try:
if self.backup_tid:
if self.backup_app is None:
raise RuntimeError("No upstream cluster to backup"
" defined in configuration")
self.backup_app.provideService()
# Reset connection with storages (and go through a
# recovery phase) when leaving backup mode in order
# to get correct last oid/tid.
self.runManager(RecoveryManager)
continue
self.provideService()
except OperationFailure:
self.runManager(VerificationManager)
if not self.backup_tid:
self.provideService()
# self.provideService only returns without raising
# when switching to backup mode.
if self.backup_app is None:
raise RuntimeError("No upstream cluster to backup"
" defined in configuration")
truncate = Packets.Truncate(
self.backup_app.provideService())
except StoppedOperation, e:
logging.critical('No longer operational')
truncate = Packets.Truncate(*e.args) if e.args else None
# Automatic restart, except if we truncate or are retrying to.
self._startup_allowed = not (self.truncate_tid or truncate)
node_list = []
for node in self.nm.getIdentifiedList():
if node.isStorage() or node.isClient():
node.notify(Packets.StopOperation())
conn = node.getConnection()
conn.notify(Packets.StopOperation())
if node.isClient():
node.getConnection().abort()
conn.abort()
continue
if truncate:
conn.notify(truncate)
if node.isRunning():
node.setPending()
node_list.append(node)
self.broadcastNodesInformation(node_list)
except StateChangedException, e:
assert e.args[0] == ClusterStates.STOPPING
self.shutdown()
......@@ -427,7 +445,7 @@ class Application(BaseApplication):
continue # keep handler
if type(handler) is not type(conn.getLastHandler()):
conn.setHandler(handler)
handler.connectionCompleted(conn)
handler.connectionCompleted(conn, new=False)
self.cluster_state = state
def getNewUUID(self, uuid, address, node_type):
......@@ -461,7 +479,7 @@ class Application(BaseApplication):
# wait for all transactions to be finished
while self.tm.hasPending():
self.em.poll(1)
except OperationFailure:
except StoppedOperation:
logging.critical('No longer operational')
logging.info("asking remaining nodes to shutdown")
......
......@@ -152,24 +152,20 @@ class BackupApplication(object):
assert tid != ZERO_TID
logging.warning("Truncating at %s (last_tid was %s)",
dump(app.backup_tid), dump(last_tid))
# XXX: We want to go through a recovery phase in order to
# initialize the transaction manager, but this is only
# possible if storages already know that we left backup
# mode. To that purpose, we always send a Truncate packet,
# even if there's nothing to truncate.
p = Packets.Truncate(tid)
for node in app.nm.getStorageList(only_identified=True):
conn = node.getConnection()
conn.setHandler(handler)
node.setState(NodeStates.TEMPORARILY_DOWN)
# Packets will be sent at the beginning of the recovery
# phase.
conn.notify(p)
conn.abort()
else:
# We will do a dummy truncation, just to leave backup mode,
# so it's fine to start automatically if there's any
# missing storage.
# XXX: Consider using another method to leave backup mode,
# at least when there's nothing to truncate. Because
# in case of StoppedOperation during VERIFYING state,
# this flag will be wrongly set to False.
app._startup_allowed = True
# If any error happened before reaching this line, we'd go back
# to backup mode, which is the right mode to recover.
del app.backup_tid
break
# Now back to RECOVERY...
return tid
finally:
del self.primary_partition_dict, self.tid_list
pt.clearReplicating()
......
......@@ -15,6 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.exception import StoppedOperation
from neo.lib.handler import EventHandler
from neo.lib.protocol import (uuid_str, NodeTypes, NodeStates, Packets,
BrokenNodeDisallowedError,
......@@ -23,6 +24,10 @@ from neo.lib.protocol import (uuid_str, NodeTypes, NodeStates, Packets,
class MasterHandler(EventHandler):
"""This class implements a generic part of the event handlers."""
def connectionCompleted(self, conn, new=None):
if new is None:
super(MasterHandler, self).connectionCompleted(conn)
def requestIdentification(self, conn, node_type, uuid, address, name):
self.checkClusterName(name)
app = self.app
......@@ -61,25 +66,31 @@ class MasterHandler(EventHandler):
state = self.app.getClusterState()
conn.answer(Packets.AnswerClusterState(state))
def askLastIDs(self, conn):
def askRecovery(self, conn):
app = self.app
conn.answer(Packets.AnswerLastIDs(
app.tm.getLastOID(),
app.tm.getLastTID(),
conn.answer(Packets.AnswerRecovery(
app.pt.getID(),
app.backup_tid))
app.backup_tid and app.pt.getBackupTid(),
app.truncate_tid))
def askLastIDs(self, conn):
tm = self.app.tm
conn.answer(Packets.AnswerLastIDs(tm.getLastOID(), tm.getLastTID()))
def askLastTransaction(self, conn):
conn.answer(Packets.AnswerLastTransaction(
self.app.getLastTransaction()))
def askNodeInformation(self, conn):
def _notifyNodeInformation(self, conn):
nm = self.app.nm
node_list = []
node_list.extend(n.asTuple() for n in nm.getMasterList())
node_list.extend(n.asTuple() for n in nm.getClientList())
node_list.extend(n.asTuple() for n in nm.getStorageList())
conn.notify(Packets.NotifyNodeInformation(node_list))
def askNodeInformation(self, conn):
self._notifyNodeInformation(conn)
conn.answer(Packets.AnswerNodeInformation())
def askPartitionTable(self, conn):
......@@ -94,15 +105,18 @@ DISCONNECTED_STATE_DICT = {
class BaseServiceHandler(MasterHandler):
"""This class deals with events for a service phase."""
def nodeLost(self, conn, node):
# This method provides a hook point overridable by service classes.
# It is triggered when a connection to a node gets lost.
pass
def connectionCompleted(self, conn, new):
self._notifyNodeInformation(conn)
pt = self.app.pt
conn.notify(Packets.SendPartitionTable(pt.getID(), pt.getRowList()))
def connectionLost(self, conn, new_state):
node = self.app.nm.getByUUID(conn.getUUID())
app = self.app
node = app.nm.getByUUID(conn.getUUID())
if node is None:
return # for example, when a storage is removed by an admin
assert node.isStorage(), node
logging.info('storage node lost')
if new_state != NodeStates.BROKEN:
new_state = DISCONNECTED_STATE_DICT.get(node.getType(),
NodeStates.DOWN)
......@@ -117,10 +131,13 @@ class BaseServiceHandler(MasterHandler):
# was in pending state, so drop it from the node manager to forget
# it and do not set in running state when it comes back
logging.info('drop a pending node from the node manager')
self.app.nm.remove(node)
self.app.broadcastNodesInformation([node])
# clean node related data in specialized handlers
self.nodeLost(conn, node)
app.nm.remove(node)
app.broadcastNodesInformation([node])
if app.truncate_tid:
raise StoppedOperation
app.broadcastPartitionChanges(app.pt.outdate(node))
if not app.pt.operational():
raise StoppedOperation
def notifyReady(self, conn):
self.app.setStorageReady(conn.getUUID())
......
......@@ -19,6 +19,7 @@ import random
from . import MasterHandler
from ..app import StateChangedException
from neo.lib import logging
from neo.lib.exception import StoppedOperation
from neo.lib.pt import PartitionTableException
from neo.lib.protocol import ClusterStates, Errors, \
NodeStates, NodeTypes, Packets, ProtocolError, uuid_str
......@@ -159,6 +160,13 @@ class AdministrationHandler(MasterHandler):
map(app.nm.getByUUID, uuid_list)))
conn.answer(Errors.Ack(''))
def truncate(self, conn, tid):
app = self.app
if app.cluster_state != ClusterStates.RUNNING:
raise ProtocolError('Can not truncate in this state')
conn.answer(Errors.Ack(''))
raise StoppedOperation(tid)
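End to end, a truncation request travels neoctl -> admin -> master; the admin side is just the forward_ask(Packets.Truncate) entry added to AdminEventHandler earlier in this commit. A schematic of the chain, with an assumed connection helper:

from neo.lib.protocol import Packets

def ctl_truncate(admin_conn, tid):
    # neoctl -> admin; the admin forwards the packet to the primary
    # master, whose AdministrationHandler.truncate answers Ack first,
    # because raising StoppedOperation(tid) then unwinds the service
    # loop: the master broadcasts Truncate and re-enters recovery
    # until all storages have truncated.
    return admin_conn.ask(Packets.Truncate(tid))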
def checkReplicas(self, conn, partition_dict, min_tid, max_tid):
app = self.app
pt = app.pt
......
......@@ -20,9 +20,6 @@ from . import MasterHandler
class ClientServiceHandler(MasterHandler):
""" Handler dedicated to client during service state """
def connectionCompleted(self, conn):
pass
def connectionLost(self, conn, new_state):
# cancel its transactions and forget the node
app = self.app
......@@ -59,9 +56,10 @@ class ClientServiceHandler(MasterHandler):
pt = app.pt
# Collect partitions related to this transaction.
lock_oid_list = oid_list + checked_list
partition_set = set(map(pt.getPartition, lock_oid_list))
partition_set.add(pt.getPartition(ttid))
getPartition = pt.getPartition
partition_set = set(map(getPartition, oid_list))
partition_set.update(map(getPartition, checked_list))
partition_set.add(getPartition(ttid))
# Collect the UUIDs of nodes related to this transaction.
uuid_list = filter(app.isStorageReady, {cell.getUUID()
......@@ -85,7 +83,6 @@ class ClientServiceHandler(MasterHandler):
{x.getUUID() for x in identified_node_list},
conn.getPeerId(),
),
lock_oid_list,
)
for node in identified_node_list:
node.ask(p, timeout=60)
......
......@@ -26,7 +26,7 @@ class IdentificationHandler(MasterHandler):
**kw)
handler = conn.getHandler()
assert not isinstance(handler, IdentificationHandler), handler
handler.connectionCompleted(conn)
handler.connectionCompleted(conn, True)
def _setupNode(self, conn, node_type, uuid, address, node):
app = self.app
......@@ -72,7 +72,7 @@ class IdentificationHandler(MasterHandler):
node.setState(state)
node.setConnection(conn)
conn.setHandler(handler)
app.broadcastNodesInformation([node])
app.broadcastNodesInformation([node], node)
return uuid
class SecondaryIdentificationHandler(MasterHandler):
......
......@@ -16,7 +16,7 @@
from neo.lib import logging
from neo.lib.protocol import CellStates, ClusterStates, Packets, ProtocolError
from neo.lib.exception import OperationFailure
from neo.lib.exception import StoppedOperation
from neo.lib.pt import PartitionTableException
from . import BaseServiceHandler
......@@ -24,25 +24,27 @@ from . import BaseServiceHandler
class StorageServiceHandler(BaseServiceHandler):
""" Handler dedicated to storages during service state """
def connectionCompleted(self, conn):
# TODO: unit test
def connectionCompleted(self, conn, new):
app = self.app
uuid = conn.getUUID()
node = app.nm.getByUUID(uuid)
app.setStorageNotReady(uuid)
if new:
super(StorageServiceHandler, self).connectionCompleted(conn, new)
# XXX: what other values could happen ?
if node.isRunning():
conn.notify(Packets.StartOperation(bool(app.backup_tid)))
def nodeLost(self, conn, node):
logging.info('storage node lost')
assert not node.isRunning(), node.getState()
def connectionLost(self, conn, new_state):
app = self.app
app.broadcastPartitionChanges(app.pt.outdate(node))
if not app.pt.operational():
raise OperationFailure, 'cannot continue operation'
node = app.nm.getByUUID(conn.getUUID())
super(StorageServiceHandler, self).connectionLost(conn, new_state)
app.tm.forget(conn.getUUID())
if app.getClusterState() == ClusterStates.BACKINGUP:
if (app.getClusterState() == ClusterStates.BACKINGUP
# Also check if we're exiting, because backup_app is not usable
# in this case. Maybe cluster state should be set to something
# else, like STOPPING, during cleanup (__del__/close).
and app.listening_conn):
app.backup_app.nodeLost(node)
if app.packing is not None:
self.answerPack(conn, False)
......@@ -74,7 +76,7 @@ class StorageServiceHandler(BaseServiceHandler):
CellStates.CORRUPTED))
self.app.broadcastPartitionChanges(change_list)
if not self.app.pt.operational():
raise OperationFailure('cannot continue operation')
raise StoppedOperation
def notifyReplicationDone(self, conn, offset, tid):
app = self.app
......
......@@ -299,15 +299,19 @@ class PartitionTable(neo.lib.pt.PartitionTable):
yield offset, cell
break
def getReadableCellNodeSet(self):
def getOperationalNodeSet(self):
"""
Return a set of all nodes which are part of at least one UP TO DATE
partition.
partition. An empty sequence is returned if these nodes aren't enough to
become operational.
"""
return {cell.getNode()
for row in self.partition_list
for cell in row
if cell.isReadable()}
node_set = set()
for row in self.partition_list:
if not any(cell.isReadable() and cell.getNode().isPending()
for cell in row):
return () # not operational
node_set.update(cell.getNode() for cell in row if cell.isReadable())
return node_set
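A toy model of this rule, with plain triples standing in for cells: a row with no readable cell on a pending node can never be served, so nothing is returned and the cluster must not start.

def operational_node_set(partition_list):
    # Each row is a list of (node, readable, pending) triples.
    node_set = set()
    for row in partition_list:
        if not any(readable and pending
                   for _, readable, pending in row):
            return ()  # this partition could not be served
        node_set.update(node for node, readable, _ in row if readable)
    return node_set

# Partition 1 is only readable on a node that is not back yet, so
# nothing is returned even though 'a' covers partition 0.
assert operational_node_set([[('a', True, True)],
                             [('b', True, False)]]) == ()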
def clearReplicating(self):
for row in self.partition_list:
......
......@@ -15,9 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.util import dump
from neo.lib.protocol import Packets, ProtocolError, ClusterStates, NodeStates
from neo.lib.protocol import ZERO_OID
from .handlers import MasterHandler
......@@ -29,7 +27,9 @@ class RecoveryManager(MasterHandler):
def __init__(self, app):
# The target node's uuid to request next.
self.target_ptid = None
self.ask_pt = []
self.backup_tid_dict = {}
self.truncate_dict = {}
def getHandler(self):
return self
......@@ -51,7 +51,7 @@ class RecoveryManager(MasterHandler):
app = self.app
pt = app.pt
app.changeClusterState(ClusterStates.RECOVERING)
pt.setID(None)
pt.clear()
# collect the last partition table available
poll = app.em.poll
......@@ -60,11 +60,15 @@ class RecoveryManager(MasterHandler):
if pt.filled():
# A partition table exists, we are starting an existing
# cluster.
node_list = pt.getReadableCellNodeSet()
node_list = pt.getOperationalNodeSet()
if app._startup_allowed:
node_list = [node for node in node_list if node.isPending()]
elif not all(node.isPending() for node in node_list):
continue
elif node_list:
# we want all nodes to be there if we're going to truncate
if app.truncate_tid:
node_list = pt.getNodeSet()
if not all(node.isPending() for node in node_list):
continue
elif app._startup_allowed or app.autostart:
# No partition table and admin allowed startup, we are
# creating a new cluster out of all pending nodes.
......@@ -76,6 +80,17 @@ class RecoveryManager(MasterHandler):
if node_list and not any(node.getConnection().isPending()
for node in node_list):
if pt.filled():
if app.truncate_tid:
node_list = app.nm.getIdentifiedList(pool_set={uuid
for uuid, tid in self.truncate_dict.iteritems()
if not tid or app.truncate_tid < tid})
if node_list:
truncate = Packets.Truncate(app.truncate_tid)
for node in node_list:
conn = node.getConnection()
conn.notify(truncate)
self.connectionCompleted(conn, False)
continue
node_list = pt.getConnectedNodeList()
break
......@@ -88,64 +103,81 @@ class RecoveryManager(MasterHandler):