pax_global_header 0000666 0000000 0000000 00000000064 14577262335 0014530 g ustar 00root root 0000000 0000000 52 comment=a0280bec9681c5168eef0d6a48d430b9c19a86b2
neoppod-master-neo-master-handlers/ 0000775 0000000 0000000 00000000000 14577262335 0017677 5 ustar 00root root 0000000 0000000 neoppod-master-neo-master-handlers/neo/ 0000775 0000000 0000000 00000000000 14577262335 0020460 5 ustar 00root root 0000000 0000000 neoppod-master-neo-master-handlers/neo/master/ 0000775 0000000 0000000 00000000000 14577262335 0021753 5 ustar 00root root 0000000 0000000 neoppod-master-neo-master-handlers/neo/master/handlers/ 0000775 0000000 0000000 00000000000 14577262335 0023553 5 ustar 00root root 0000000 0000000 neoppod-master-neo-master-handlers/neo/master/handlers/__init__.py 0000664 0000000 0000000 00000010174 14577262335 0025667 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from ..app import monotonic_time
from ..pack import RequestOld
from neo.lib import logging
from neo.lib.exception import StoppedOperation
from neo.lib.handler import EventHandler
from neo.lib.protocol import Packets, ZERO_TID
class MasterHandler(EventHandler):
    """Generic part of the event handlers, shared by master-side handlers."""

    def connectionLost(self, conn, new_state=None):
        """Delegate to ``_connectionLost`` while the master is running.

        Nothing happens once ``app.listening_conn`` is falsy.
        """
        if not self.app.listening_conn:
            return
        self._connectionLost(conn)

    def askClusterState(self, conn):
        """Answer with the current cluster state."""
        cluster_state = self.app.getClusterState()
        conn.answer(Packets.AnswerClusterState(cluster_state))

    def askRecovery(self, conn):
        """Answer with the PT id, the backup tid (if any) and truncate tid."""
        master = self.app
        ptid = master.pt.getID()
        # The partition table is only queried when app.backup_tid is truthy;
        # otherwise the falsy app.backup_tid itself is sent.
        backup_tid = master.backup_tid and master.pt.getBackupTid()
        conn.answer(Packets.AnswerRecovery(
            ptid, backup_tid, master.truncate_tid))

    def askLastIDs(self, conn):
        """Answer with the last TID and last OID of the transaction manager."""
        manager = self.app.tm
        last_tid = manager.getLastTID()
        last_oid = manager.getLastOID()
        conn.answer(Packets.AnswerLastIDs(last_tid, last_oid))

    def askLastTransaction(self, conn):
        """Answer with the value of ``app.getLastTransaction()``."""
        last_tid = self.app.getLastTransaction()
        conn.answer(Packets.AnswerLastTransaction(last_tid))

    def _askPackOrders(self, conn, pack_id, only_first_approved):
        """Answer a pack orders request, immediately or through RequestOld.

        The answer is handed over to ``RequestOld`` (with a delayed answer)
        only when both ``pack_id`` and ``app.pm.max_completed`` are set and
        ``max_completed >= pack_id``. In every other case, the dump of the
        pack manager is sent right away.
        """
        master = self.app
        if pack_id is not None:
            max_completed = master.pm.max_completed
            if max_completed is not None and max_completed >= pack_id:
                RequestOld(master, pack_id, only_first_approved,
                           conn.delayedAnswer(Packets.AnswerPackOrders))
                return
        orders = master.pm.dump(pack_id or ZERO_TID, only_first_approved)
        conn.answer(Packets.AnswerPackOrders(orders))

    def _notifyNodeInformation(self, conn):
        """Send the node list to the peer, its own entry coming first."""
        master = self.app
        peer = master.nm.getByUUID(conn.getUUID())
        others = master.nm.getList()
        others.remove(peer)
        head = [peer.asTuple()]  # for id_timestamp
        tail = master.getNodeInformationGetter(others)(peer)
        conn.send(Packets.NotifyNodeInformation(monotonic_time(), head + tail))

    def handlerSwitched(self, conn, new):
        """Send the full partition table to the peer."""
        # Except storages during recovery and secondary masters, all nodes
        # receive the full partition table as soon as they're identified.
        # It is also sent in 2 other cases:
        # - to admins during recovery, whenever a newer PT is loaded;
        # - to storage when switching from recovery to verification.
        # After that, non-master nodes only receive incremental updates.
        table = self.app.pt
        ptid = table.getID()
        replicas = table.getReplicas()
        rows = table.getRowList()
        conn.send(Packets.SendPartitionTable(ptid, replicas, rows))
class BaseServiceHandler(MasterHandler):
    """Common handler class for storage nodes."""

    def connectionLost(self, conn, new_state):
        """Update node state and partition table after losing a storage.

        Raises StoppedOperation when a truncation is pending
        (``app.truncate_tid``) or when the partition table is no longer
        operational once the node's cells are outdated.
        """
        master = self.app
        storage = master.nm.getByUUID(conn.getUUID())
        if storage is None:
            # for example, when a storage is removed by an admin
            return
        assert storage.isStorage(), storage
        logging.info('storage node lost')
        if storage.isPending():
            # It was in pending state: drop it from the node manager to
            # forget it, so that it is not set in running state when it
            # comes back.
            logging.info('drop a pending node from the node manager')
            storage.setUnknown()
        else:
            if storage.isDown():
                # Already put in DOWN state by
                # AdministrationHandler.setNodeState
                return
            storage.setDown()
        master.broadcastNodesInformation([storage])
        if master.truncate_tid:
            raise StoppedOperation
        outdated = master.pt.outdate(storage)
        master.broadcastPartitionChanges(outdated)
        if not master.pt.operational():
            raise StoppedOperation
neoppod-master-neo-master-handlers/neo/master/handlers/administration.py 0000664 0000000 0000000 00000027736 14577262335 0027171 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
import random
from functools import wraps
from . import MasterHandler
from ..app import monotonic_time, StateChangedException
from neo.lib import logging
from neo.lib.exception import StoppedOperation
from neo.lib.handler import AnswerDenied
from neo.lib.pt import PartitionTableException
from neo.lib.protocol import ClusterStates, Errors, \
NodeStates, NodeTypes, Packets, uuid_str
from neo.lib.util import add64, dump
# Cluster state transitions that setClusterState accepts: for each requested
# (destination) state, the tuple of current (source) states it may be
# requested from. States missing from this table are refused by
# setClusterState, except STOPPING.
CLUSTER_STATE_WORKFLOW = {
    # destination: sources
    ClusterStates.VERIFYING: (ClusterStates.RECOVERING,),
    ClusterStates.STARTING_BACKUP: (ClusterStates.RUNNING,
                                    ClusterStates.STOPPING_BACKUP),
    ClusterStates.STOPPING_BACKUP: (ClusterStates.BACKINGUP,
                                    ClusterStates.STARTING_BACKUP),
}
# Node states that setNodeState accepts, per node type. Node types missing
# from this table can not have their state changed by setNodeState.
NODE_STATE_WORKFLOW = {
    NodeTypes.MASTER: (NodeStates.DOWN,),
    NodeTypes.STORAGE: (NodeStates.DOWN, NodeStates.UNKNOWN),
}
def check_state(*states):
    """Build a decorator restricting a handler method to some cluster states.

    The decorated method is only executed when ``self.app.getClusterState()``
    returns one of ``states``; otherwise AnswerDenied is raised with a message
    naming the method and the current state.

    The result of the decorated method is returned to the caller (it used to
    be silently dropped).
    """
    def decorator(wrapped):
        @wraps(wrapped)
        def wrapper(self, *args):
            state = self.app.getClusterState()
            if state not in states:
                raise AnswerDenied('%s RPC can not be used in %s state'
                    % (wrapped.__name__, state))
            # Propagate the result so that the decorator is transparent.
            return wrapped(self, *args)
        return wrapper
    return decorator
class AdministrationHandler(MasterHandler):
    """This class deals with messages from the admin node only"""

    def handlerSwitched(self, conn, new):
        """Send the partition table and, when a backup application exists,
        notify it of the address of the first running admin node it knows.
        """
        assert new
        super(AdministrationHandler, self).handlerSwitched(conn, new)
        # From here, 'app' is the backup application, not the master one.
        app = self.app.backup_app
        if app is not None:
            for node in app.nm.getAdminList():
                if node.isRunning():
                    app.notifyUpstreamAdmin(node.getAddress())
                    break

    def connectionLost(self, conn, new_state):
        """Remove the lost admin node from the node manager, if known."""
        node = self.app.nm.getByUUID(conn.getUUID())
        if node is not None:
            self.app.nm.remove(node)

    def flushLog(self, conn):
        """Forward FlushLog to every connected node except the requester,
        then let the parent class handle the request.
        """
        p = Packets.FlushLog()
        for node in self.app.nm.getConnectedList():
            c = node.getConnection()
            # Short-circuit: send only when 'c' is not the requesting
            # connection.
            c is conn or c.send(p)
        super(AdministrationHandler, self).flushLog(conn)

    def setClusterState(self, conn, state):
        """Validate and apply a cluster state change requested by an admin.

        Raises AnswerDenied when the transition is not allowed. On success,
        the request is acknowledged and, if the resulting state differs from
        the current one, StateChangedException(state) is raised.
        """
        app = self.app
        # check request
        try:
            if app.cluster_state not in CLUSTER_STATE_WORKFLOW[state]:
                raise AnswerDenied('Can not switch to this state')
        except KeyError:
            # Not in the workflow table: only STOPPING is accepted.
            if state != ClusterStates.STOPPING:
                raise AnswerDenied('Invalid state requested')
        # change state
        if state == ClusterStates.VERIFYING:
            storage_list = app.nm.getStorageList(only_identified=True)
            if not storage_list:
                raise AnswerDenied(
                    'Cannot exit recovery without any storage node')
            for node in storage_list:
                assert node.isPending(), node
                if node.getConnection().isPending():
                    raise AnswerDenied(
                        'Cannot exit recovery now: node %r is entering cluster'
                        % node,)
            app._startup_allowed = True
            # Only the flag above is set: 'state' is reset to the current
            # state so that StateChangedException is not raised below.
            state = app.cluster_state
        elif state == ClusterStates.STARTING_BACKUP:
            if app.tm.hasPending() or app.nm.getClientList(True):
                raise AnswerDenied("Can not switch to %s state with pending"
                    " transactions or connected clients" % state)
            if app.backup_app is None:
                raise AnswerDenied(app.no_upstream_msg)
        conn.answer(Errors.Ack('Cluster state changed'))
        if state != app.cluster_state:
            raise StateChangedException(state)

    def setNodeState(self, conn, uuid, state):
        """Change the state of a node on request of an admin.

        Raises AnswerDenied for an unknown node, for a state not listed in
        NODE_STATE_WORKFLOW for the node type, for the primary master
        itself, for the removal of a running storage node, or when the
        partition table refuses to drop the node.
        """
        logging.info("set node state for %s: %s", uuid_str(uuid), state)
        app = self.app
        node = app.nm.getByUUID(uuid)
        if node is None:
            raise AnswerDenied('unknown node')
        if state not in NODE_STATE_WORKFLOW.get(node.getType(), ()):
            raise AnswerDenied('can not switch node to %s state' % state)
        if uuid == app.uuid:
            raise AnswerDenied('can not kill primary master node')
        state_changed = state != node.getState()
        message = ('state changed' if state_changed else
                   'node already in %s state' % state)
        if node.isStorage():
            # DOWN keeps the node; any other accepted state drops it.
            keep = state == NodeStates.DOWN
            if node.isRunning() and not keep:
                raise AnswerDenied(
                    "a running node must be stopped before removal")
            try:
                cell_list = app.pt.dropNodeList([node], keep)
            except PartitionTableException, e:
                raise AnswerDenied(str(e))
            node.setState(state)
            if node.isConnected():
                # notify itself so it can shutdown
                node.send(Packets.NotifyNodeInformation(
                    monotonic_time(), [node.asTuple()]))
                # close to avoid handling the closure as a connection lost
                node.getConnection().abort()
            if keep:
                cell_list = app.pt.outdate()
            elif cell_list:
                message = 'node permanently removed'
            app.broadcastPartitionChanges(cell_list)
        else:
            node.setState(state)
        # /!\ send the node information *after* the partition table change
        conn.answer(Errors.Ack(message))
        if state_changed:
            # notify node explicitly because broadcastNodesInformation()
            # ignores non-running nodes
            assert not node.isRunning()
            if node.isConnected():
                node.send(Packets.NotifyNodeInformation(
                    monotonic_time(), [node.asTuple()]))
            app.broadcastNodesInformation([node])

    # Decorator shared by the RPCs below that change the partition table;
    # it is deleted at the end of the class body.
    # XXX: Would it be safe to allow more states ?
    __change_pt_rpc = check_state(
        ClusterStates.RUNNING,
        ClusterStates.STARTING_BACKUP,
        ClusterStates.BACKINGUP)

    @__change_pt_rpc
    def addPendingNodes(self, conn, uuid_list):
        """Add the pending storage nodes listed in 'uuid_list' to the
        partition table, start them, and acknowledge with the added nodes.
        """
        uuids = ', '.join(map(uuid_str, uuid_list))
        logging.debug('Add nodes %s', uuids)
        app = self.app
        # take all pending nodes
        node_list = list(app.pt.addNodeList(node
            for node in app.nm.getStorageList()
            if node.isPending() and node.getUUID() in uuid_list))
        if node_list:
            for node in node_list:
                node.setRunning()
                app.startStorage(node)
            app.broadcastNodesInformation(node_list)
            conn.answer(Errors.Ack('Nodes added: %s' %
                ', '.join(uuid_str(x.getUUID()) for x in node_list)))
        else:
            logging.warning('No node added')
            conn.answer(Errors.Ack('No node added'))

    def repair(self, conn, uuid_list, *args):
        """Send NotifyRepair(*args) to the given storage nodes.

        All nodes are validated before anything is sent: AnswerDenied is
        raised if one of them is unknown, not a storage or not identified.
        """
        getByUUID = self.app.nm.getByUUID
        node_list = []
        for uuid in uuid_list:
            node = getByUUID(uuid)
            if node is None or not (node.isStorage() and node.isIdentified()):
                raise AnswerDenied("invalid storage node %s" % uuid_str(uuid))
            node_list.append(node)
        repair = Packets.NotifyRepair(*args)
        for node in node_list:
            node.send(repair)
        conn.answer(Errors.Ack(''))

    @__change_pt_rpc
    def setNumReplicas(self, conn, num_replicas):
        """Broadcast the new number of replicas (with no cell change)."""
        self.app.broadcastPartitionChanges((), num_replicas)
        conn.answer(Errors.Ack(''))

    @__change_pt_rpc
    def tweakPartitionTable(self, conn, dry_run, uuid_list):
        """Tweak the partition table, or a copy of it when 'dry_run' is true.

        Storage nodes listed in 'uuid_list' and pending ones are passed to
        pt.tweak() as nodes to drop. AnswerDenied is raised if another
        storage node is not running, or if tweaking fails.
        """
        app = self.app
        drop_list = []
        for node in app.nm.getStorageList():
            if node.getUUID() in uuid_list or node.isPending():
                drop_list.append(node)
            elif not node.isRunning():
                raise AnswerDenied(
                    'tweak: down nodes must be listed explicitly')
        if dry_run:
            # Work on a copy loaded from the current table, so that the
            # real one is left untouched.
            pt = object.__new__(app.pt.__class__)
            new_nodes = pt.load(app.pt.getID(), app.pt.getReplicas(),
                                app.pt.getRowList(), app.nm)
            assert not new_nodes
            # Also register in the copy the nodes whose count is zero in
            # the real table.
            pt.addNodeList(node
                for node, count in app.pt.count_dict.iteritems()
                if not count)
        else:
            pt = app.pt
        try:
            changed_list = pt.tweak(drop_list)
        except PartitionTableException, e:
            raise AnswerDenied(str(e))
        if not dry_run:
            app.broadcastPartitionChanges(changed_list)
        conn.answer(Packets.AnswerTweakPartitionTable(
            bool(changed_list), pt.getRowList()))

    @check_state(ClusterStates.RUNNING)
    def truncate(self, conn, tid):
        """Acknowledge a truncation request and raise StoppedOperation(tid).

        AnswerDenied is raised if 'tid' is not before the last transaction,
        or if the pack manager reports an approved pack from tid+1.
        """
        app = self.app
        if app.getLastTransaction() <= tid:
            raise AnswerDenied("Truncating after last transaction does nothing")
        if app.pm.getApprovedRejected(add64(tid, 1))[0]:
            # TODO: The protocol must be extended to support safe cases
            #       (e.g. no started pack whose id is after truncation tid).
            #       The user may also accept having a truncated DB with
            #       missing records (i.e. have an option to force that).
            raise AnswerDenied("Can not truncate before an approved pack")
        conn.answer(Errors.Ack(''))
        raise StoppedOperation(tid)

    def checkReplicas(self, conn, partition_dict, min_tid, max_tid):
        """Ask storage nodes to check partitions between min_tid and max_tid.

        'partition_dict' is iterated as (offset, source) pairs; a falsy
        source means that none was specified for this partition. A falsy
        'max_tid' is replaced by the check tid of the partition table when
        backing up, else by the last transaction.
        """
        app = self.app
        pt = app.pt
        backingup = bool(app.backup_tid)
        if not max_tid:
            max_tid = pt.getCheckTid(partition_dict) if backingup else \
                app.getLastTransaction()
        if min_tid > max_tid:
            logging.warning("nothing to check: min_tid=%s > max_tid=%s",
                            dump(min_tid), dump(max_tid))
        else:
            getByUUID = app.nm.getByUUID
            # Nodes that were already chosen to check a partition.
            node_set = set()
            for offset, source in partition_dict.iteritems():
                # XXX: For the moment, code checking replicas is unable to fix
                #      corrupted partitions (when a good cell is known)
                #      so only check readable ones.
                #      (see also Checker._nextPartition of storage)
                cell_list = pt.getCellList(offset, True)
                #cell_list = [cell for cell in pt.getCellList(offset)
                #                  if not cell.isOutOfDate()]
                # Skip partitions with nothing to compare against.
                if len(cell_list) + (backingup and not source) <= 1:
                    continue
                # Prefer a node that was already chosen; if there is none,
                # the loop ends with the node of the last cell, which is
                # then recorded.
                for cell in cell_list:
                    node = cell.getNode()
                    if node in node_set:
                        break
                else:
                    node_set.add(node)
                # 'source' becomes a (name, address) pair.
                if source:
                    source = '', getByUUID(source).getAddress()
                else:
                    readable = [cell for cell in cell_list if cell.isReadable()]
                    if 1 == len(readable) < len(cell_list):
                        source = '', readable[0].getAddress()
                    elif backingup:
                        source = app.backup_app.name, random.choice(
                            app.backup_app.pt.getCellList(offset, readable=True)
                            ).getAddress()
                    else:
                        source = '', None
                node.send(Packets.CheckPartition(
                    offset, source, min_tid, max_tid))
        conn.answer(Errors.Ack(''))

    del __change_pt_rpc
neoppod-master-neo-master-handlers/neo/master/handlers/backup.py 0000664 0000000 0000000 00000011462 14577262335 0025376 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2012-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from neo.lib.exception import PrimaryFailure
from neo.lib.handler import EventHandler
from neo.lib.protocol import NodeTypes, NodeStates, Packets
from neo.lib.pt import PartitionTable
class BackupHandler(EventHandler):
    """Handler dedicated to upstream master during BACKINGUP state"""

    def connectionLost(self, conn, new_state):
        """Raise PrimaryFailure, unless the master is not listening anymore."""
        still_running = self.app.app.listening_conn
        if still_running:
            raise PrimaryFailure('connection lost')

    def sendPartitionTable(self, conn, ptid, num_replicas, row_list):
        """Replace the backup app's partition table by the received one.

        Raises RuntimeError if the number of partitions differs from the one
        of the master application.
        """
        backup_app = self.app
        # Instantiate without calling PartitionTable.__init__: the table is
        # entirely filled by load().
        received = object.__new__(PartitionTable)
        backup_app.pt = received
        received.load(ptid, num_replicas, row_list, self.app.nm)
        if received.getPartitions() != backup_app.app.pt.getPartitions():
            raise RuntimeError("inconsistent number of partitions")

    def notifyPartitionChanges(self, conn, ptid, num_replicas, cell_list):
        """Apply incremental changes to the backup app's partition table."""
        backup_app = self.app
        backup_app.pt.update(ptid, num_replicas, cell_list, backup_app.nm)

    def notifyNodeInformation(self, conn, timestamp, node_list):
        """Process the node list, then report every running admin node."""
        super(BackupHandler, self).notifyNodeInformation(
            conn, timestamp, node_list)
        for node_type, addr, _, state, _ in node_list:
            if not (node_type == NodeTypes.ADMIN
                    and state == NodeStates.RUNNING):
                continue
            self.app.notifyUpstreamAdmin(addr)

    def answerLastTransaction(self, conn, tid):
        """Invalidate all partitions up to 'tid' and enable invalidations.

        Raises RuntimeError if our last transaction is after 'tid'.
        """
        backup_app = self.app
        prev_tid = backup_app.app.getLastTransaction()
        if not prev_tid <= tid:
            raise RuntimeError("upstream DB truncated")
        # Since we don't know which partitions were modified during our
        # absence, we must force replication on all storages. As long as
        # they haven't done this first check, our backup tid will remain
        # inferior to this 'tid'. We don't know the real prev_tid, which is:
        #   >= app.app.getLastTransaction()
        #   < tid
        # but passing 'tid' is good enough.
        # A special case is when prev_tid == tid: even in this case, we
        # must restore the state of the backup app so that any interrupted
        # replication (internal or not) is resumed, otherwise the global
        # backup_tid could remain stuck to an old tid if upstream is idle.
        every_partition = xrange(backup_app.pt.getPartitions())
        backup_app.invalidatePartitions(tid, tid, every_partition)
        backup_app.ignore_invalidations = False

    def invalidateObjects(self, conn, tid, oid_list):
        """Invalidate the partitions touched by a transaction.

        Ignored as long as ``ignore_invalidations`` is set on the backup app.
        """
        backup_app = self.app
        if backup_app.ignore_invalidations:
            return
        getPartition = backup_app.app.pt.getPartition
        touched = set(getPartition(oid) for oid in oid_list)
        touched.add(getPartition(tid))
        prev_tid = backup_app.app.getLastTransaction()
        backup_app.invalidatePartitions(tid, prev_tid, touched)

    # The following 2 methods:
    # - keep the PackManager up-to-date;
    # - replicate the status of pack orders when they're known after the
    #   storage nodes have fetched related transactions.

    def notifyPackSigned(self, conn, approved, rejected):
        """Record approved/rejected pack orders notified by upstream.

        Known orders are updated in place. If some are unknown, upstream is
        asked for the pack orders from the lowest unknown id; otherwise the
        new status is broadcast from the lowest updated id.
        """
        backup_app = self.app
        if backup_app.ignore_pack_notifications:
            return
        packs = backup_app.app.pm.packs
        ask_tid = min_tid = None
        for status, tid_list in ((True, approved), (False, rejected)):
            for tid in tid_list:
                try:
                    packs[tid].approved = status
                except KeyError:
                    # Unknown order: remember the lowest one to ask for.
                    if not ask_tid or tid < ask_tid:
                        ask_tid = tid
                else:
                    # Known order: remember the lowest updated one.
                    if not min_tid or tid < min_tid:
                        min_tid = tid
        if not ask_tid:
            if min_tid:
                backup_app.broadcastApprovedRejected(min_tid)
            return
        if min_tid is None:
            min_tid = ask_tid
        else:
            assert min_tid < ask_tid, (min_tid, ask_tid)
        conn.ask(Packets.AskPackOrders(ask_tid), min_tid=min_tid)

    def answerPackOrders(self, conn, pack_list, min_tid):
        """Register the received pack orders and broadcast their status."""
        backup_app = self.app
        register = backup_app.app.pm.add
        for order in pack_list:
            register(*order)
        backup_app.broadcastApprovedRejected(min_tid)
        backup_app.ignore_pack_notifications = False
###
neoppod-master-neo-master-handlers/neo/master/handlers/client.py 0000664 0000000 0000000 00000014330 14577262335 0025404 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2019 Nexedi SA
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from neo.lib.handler import DelayEvent
from neo.lib.exception import ProtocolError
from neo.lib.protocol import Packets, MAX_TID, ZERO_TID, Errors
from ..app import monotonic_time
from . import MasterHandler
class ClientServiceHandler(MasterHandler):
    """Handler dedicated to client during service state"""

    def handlerSwitched(self, conn, new):
        """Send the partition table to the newly identified client."""
        assert new
        super(ClientServiceHandler, self).handlerSwitched(conn, new)

    def _connectionLost(self, conn):
        """Cancel the transactions of the lost client and forget the node."""
        master = self.app
        client = master.nm.getByUUID(conn.getUUID())
        assert client is not None, conn
        master.pm.clientLost(conn)
        for aborted in master.tm.clientLost(client):
            master.notifyTransactionAborted(*aborted)
        client.setUnknown()
        master.broadcastNodesInformation([client])

    def askBeginTransaction(self, conn, tid):
        """
          A client requests a TID, nothing is kept about it until the finish.
        """
        master = self.app
        # Delay new transaction as long as we are waiting for NotifyReady
        # answers, otherwise we can't know if the client is expected to commit
        # the transaction in full to all these storage nodes.
        if master.storage_starting_set:
            raise DelayEvent
        client = master.nm.getByUUID(conn.getUUID())
        new_tid = master.tm.begin(client, master.storage_readiness, tid)
        conn.answer(Packets.AnswerBeginTransaction(new_tid))

    def askNewOIDs(self, conn, num_oids):
        """Answer with 'num_oids' new OIDs from the transaction manager."""
        oid_list = self.app.tm.getNextOIDList(num_oids)
        conn.answer(Packets.AnswerNewOIDs(oid_list))

    def getEventQueue(self):
        """Return the transaction manager, used as event queue."""
        # for askBeginTransaction & failedVote
        return self.app.tm

    def failedVote(self, conn, *args):
        """Answer Ack if the transaction manager accepts the vote, else
        IncompleteTransaction.
        """
        master = self.app
        if master.tm.vote(master, *args):
            error = Errors.Ack
        else:
            error = Errors.IncompleteTransaction
        conn.answer(error())

    def askFinishTransaction(self, conn, ttid, oid_list, checked_list, pack):
        """Prepare a transaction for commit and ask storages to lock it.

        Raises ProtocolError if 'pack' carries an invalid pack time. If the
        transaction can not be prepared, IncompleteTransaction is answered
        and the transaction is aborted.
        """
        master = self.app
        if pack:
            pack_tid = pack[1]
            if pack_tid is None or \
               not ZERO_TID < pack_tid <= master.getLastTransaction():
                raise ProtocolError("invalid pack time")
        tid, node_list = master.tm.prepare(
            master, ttid, oid_list, checked_list, conn.getPeerId())
        if not tid:
            conn.answer(Errors.IncompleteTransaction())
            # It's simpler to abort automatically rather than asking the
            # client to send a notification on tpc_abort, since it would
            # have kept the transaction longer in list of transactions.
            # This should happen so rarely that we don't try to minimize the
            # number of abort notifications by looking the modified
            # partitions.
            self.abortTransaction(conn, ttid, master.getStorageReadySet())
            return
        if pack:
            pack_info = master.pm.new(tid, *pack)
        else:
            pack_info = False
        lock_request = Packets.AskLockInformation(ttid, tid, pack_info)
        for storage in node_list:
            storage.ask(lock_request)

    def askFinalTID(self, conn, ttid):
        """Answer with the final TID of a transaction, possibly later.

        No answer is sent here while the transaction is being finished: the
        client is only registered for notification.
        """
        tm = self.app.tm
        if tm.getLastTID() < ttid:
            # Invalid ttid, or aborted transaction.
            final_tid = None
        else:
            if ttid in tm:
                # Transaction is being finished.
                # We'll answer when it is unlocked.
                tm[ttid].registerForNotification(conn.getUUID())
                return
            # Transaction committed ? Tell client to ask storages.
            final_tid = MAX_TID
        conn.answer(Packets.AnswerFinalTID(final_tid))

    def abortTransaction(self, conn, tid, uuid_list):
        """Abort a transaction and notify all involved nodes."""
        # Consider a failure when the connection between the storage and the
        # client breaks while the answer to the first write is sent back.
        # In other words, the client can not know the exact set of nodes that
        # know this transaction, and it sends us all nodes it considered for
        # writing.
        # We must also add those that are waiting for this transaction to be
        # finished (returned by tm.abort), because they may have joined the
        # cluster after the client started to abort.
        master = self.app
        involved = master.tm.abort(tid, conn.getUUID())
        involved.update(uuid_list)
        master.notifyTransactionAborted(tid, involved)

    def askPackOrders(self, conn, pack_id):
        """Answer with pack orders, not limited to the first approved one."""
        return self._askPackOrders(conn, pack_id, False)

    def waitForPack(self, conn, tid):
        """Answer WaitedForPack, immediately if the pack order is unknown,
        else through a delayed answer handed over to the pack order.
        """
        try:
            order = self.app.pm.packs[tid]
        except KeyError:
            conn.answer(Packets.WaitedForPack())
        else:
            order.waitForPack(conn.delayedAnswer(Packets.WaitedForPack))
# Same as ClientServiceHandler, but read-only & only for tid <= backup_tid.
class ClientReadOnlyServiceHandler(ClientServiceHandler):
    """Client handler that refuses every request that would write."""

    def _readOnly(self, conn, *args, **kw):
        """Answer ReadOnlyAccess, whatever the arguments of the request."""
        reason = 'read-only access because cluster is in backuping mode'
        conn.answer(Errors.ReadOnlyAccess(reason))

    # Requests that are refused in read-only mode.
    # NOTE(review): ClientServiceHandler, as visible in this file, defines
    # askPackOrders and waitForPack but no askPack - confirm that 'askPack'
    # still matches a request name.
    abortTransaction = _readOnly
    askBeginTransaction = _readOnly
    askFinalTID = _readOnly
    askFinishTransaction = _readOnly
    askNewOIDs = _readOnly
    askPack = _readOnly

    # XXX LastIDs is not used by client at all, and it requires work to
    # determine last_oid up to backup_tid, so just make it non-functional
    # for client.
    askLastIDs = _readOnly

    def askLastTransaction(self, conn):
        """Like in MasterHandler, but answer with the backup tid instead of
        the last tid.
        """
        master = self.app
        assert master.backup_tid is not None  # we are in BACKUPING mode
        answer = Packets.AnswerLastTransaction(master.pt.getBackupTid(min))
        conn.answer(answer)
neoppod-master-neo-master-handlers/neo/master/handlers/identification.py 0000664 0000000 0000000 00000015053 14577262335 0027122 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.exception import NotReadyError, PrimaryElected, ProtocolError
from neo.lib.handler import EventHandler
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, \
NodeTypes, Packets, uuid_str
from ..app import monotonic_time
class IdentificationHandler(EventHandler):
    """Handler identifying peers that connect to the primary master."""

    def requestIdentification(self, conn, node_type, uuid,
                              address, name, id_timestamp, extra):
        """Identify a connecting peer and switch it to its service handler.

        The method resolves conflicts between the announced uuid/address and
        the nodes already known, selects a handler and an initial node state
        from the node type and the cluster state, registers or updates the
        node, then answers AcceptIdentification.

        Raises ProtocolError (address conflict, already connected peer,
        unknown node type, invalid address-less storage), NotReadyError
        (the cluster state does not allow this peer yet) or PrimaryElected
        (a master peer won the election against us).
        """
        app = self.app
        self.checkClusterName(name)
        if address == app.server:
            raise ProtocolError('address conflict')
        node = app.nm.getByUUID(uuid)
        by_addr = address and app.nm.getByAddress(address)
        # This loop never iterates twice: each path either breaks out (with
        # 'node' being the node to reuse, or None to create one) or falls
        # through to the final raise.
        while 1:
            if by_addr:
                if not by_addr.isIdentified():
                    if node is by_addr:
                        break
                    if not node or uuid < 0:
                        # In case of address conflict for a peer with temporary
                        # ids, we'll generate a new id.
                        node = by_addr
                        break
            elif node:
                if node.isIdentified():
                    if uuid < 0:
                        # The peer wants a temporary id that's already assigned.
                        # Let's give it another one.
                        node = uuid = None
                        break
                else:
                    if node is app._node:
                        node = None
                    else:
                        node.setAddress(address)
                    break
                # Id conflict for a storage node.
            else:
                break
            # cloned/evil/buggy node connecting to us
            raise ProtocolError('already connected')
        new_nid = extra.pop('new_nid', None)
        state = NodeStates.RUNNING
        # 'human_readable_node_type' is appended to 'Accept a' in the log
        # message below, hence the leading 'n' for admin nodes.
        if node_type == NodeTypes.CLIENT:
            if app.cluster_state == ClusterStates.RUNNING:
                handler = app.client_service_handler
            elif app.cluster_state == ClusterStates.BACKINGUP:
                handler = app.client_ro_service_handler
            else:
                raise NotReadyError
            human_readable_node_type = ' client '
        elif node_type == NodeTypes.STORAGE:
            if app.cluster_state == ClusterStates.STOPPING_BACKUP:
                raise NotReadyError
            # Without a current manager, the application itself decides how
            # to identify the storage node.
            manager = app._current_manager
            if manager is None:
                manager = app
            state, handler = manager.identifyStorageNode(
                uuid is not None and node is not None)
            if not address:
                if app.cluster_state == ClusterStates.RECOVERING:
                    raise NotReadyError
                if uuid or not new_nid:
                    raise ProtocolError
                state = NodeStates.DOWN
                # We'll let the storage node close the connection. If we
                # aborted it at the end of the method, BootstrapManager
                # (which is used by storage nodes) could see the closure
                # and try to reconnect to a master.
            human_readable_node_type = ' storage (%s) ' % (state, )
        elif node_type == NodeTypes.MASTER:
            if app.election:
                # The peer wins the election if its (timestamp, address) is
                # lower than ours.
                if id_timestamp and \
                  (id_timestamp, address) < (app.election, app.server):
                    raise PrimaryElected(by_addr or
                        app.nm.createMaster(address=address))
                handler = app.election_handler
            else:
                handler = app.secondary_handler
            human_readable_node_type = ' master '
        elif node_type == NodeTypes.ADMIN:
            handler = app.administration_handler
            human_readable_node_type = 'n admin '
        else:
            raise ProtocolError
        uuid = app.getNewUUID(uuid, address, node_type)
        logging.info('Accept a' + human_readable_node_type + uuid_str(uuid))
        if node is None:
            node = app.nm.createFromNodeType(node_type,
                uuid=uuid, address=address)
        else:
            node.setUUID(uuid)
        node.extra = extra
        node.id_timestamp = monotonic_time()
        node.setState(state)
        app.broadcastNodesInformation([node])
        if new_nid:
            # Assign to the node an OUT_OF_DATE cell for each offset listed
            # in 'new_nid'.
            changed_list = []
            for offset in new_nid:
                changed_list.append((offset, uuid, CellStates.OUT_OF_DATE))
                app.pt._setCell(offset, node, CellStates.OUT_OF_DATE)
            app.broadcastPartitionChanges(changed_list)
        conn.setHandler(handler)
        node.setConnection(conn, not node.isIdentified())
        conn.answer(Packets.AcceptIdentification(
            NodeTypes.MASTER,
            app.uuid,
            uuid))
        handler._notifyNodeInformation(conn)
        handler.handlerSwitched(conn, True)
class SecondaryIdentificationHandler(EventHandler):
    """Handler refusing identification requests on a non-primary master."""

    def requestIdentification(self, conn, node_type, uuid,
                              address, name, id_timestamp, extra):
        """Answer NotPrimaryMaster and abort the connection.

        The index of the primary in the list of known masters is only given
        when the primary is identified and is not the peer itself. Raises
        ProtocolError on address conflict, and PrimaryElected (after closing
        the connection) when a master peer with an id timestamp connects
        while our primary is not identified.
        """
        app = self.app
        self.checkClusterName(name)
        if address == app.server:
            raise ProtocolError('address conflict')
        primary = app.primary_master.getAddress()
        if primary == address:
            primary = None
        elif not app.primary_master.isIdentified():
            if node_type == NodeTypes.MASTER:
                peer = app.nm.createMaster(address=address)
                if id_timestamp:
                    conn.close()
                    raise PrimaryElected(peer)
            primary = None
        # For some cases, we rely on the fact that the remote will not retry
        # immediately (see SocketConnector.CONNECT_LIMIT).
        known_master_list = []
        for master in app.nm.getMasterList():
            known_master_list.append(master.getAddress())
        primary_index = primary and known_master_list.index(primary)
        conn.send(Packets.NotPrimaryMaster(primary_index, known_master_list))
        conn.abort()
neoppod-master-neo-master-handlers/neo/master/handlers/master.py 0000664 0000000 0000000 00000007023 14577262335 0025422 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
import sys
from . import MasterHandler
from neo.lib.exception import PrimaryElected, PrimaryFailure
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
class SecondaryHandler(MasterHandler):
    """Handler used by primary to handle secondary masters"""

    def handlerSwitched(self, conn, new):
        """Do nothing: the inherited behaviour (sending the partition
        table) is disabled for secondary masters.
        """

    def _connectionLost(self, conn):
        """Mark the lost master as down and broadcast its new state."""
        master = self.app
        lost = master.nm.getByUUID(conn.getUUID())
        lost.setDown()
        master.broadcastNodesInformation([lost])
class ElectionHandler(SecondaryHandler):
    """Handler used by primary to handle secondary masters during election"""

    def connectionCompleted(self, conn):
        """Identify ourselves as a master node to the peer."""
        super(ElectionHandler, self).connectionCompleted(conn)
        master = self.app
        request = Packets.RequestIdentification(
            NodeTypes.MASTER, master.uuid, master.server, master.name,
            master.election, {})
        conn.ask(request)

    def connectionFailed(self, conn):
        """Process a failed connection like a lost one."""
        super(ElectionHandler, self).connectionFailed(conn)
        self.connectionLost(conn)

    def _acceptIdentification(self, node):
        """Raise PrimaryElected for the peer that accepted us."""
        raise PrimaryElected(node)

    def _connectionLost(self, *args):
        """Flag the current manager to try secondary masters again."""
        master = self.app
        if master.primary:  # not switching to secondary role
            master._current_manager.try_secondary = True

    def notPrimaryMaster(self, *args):
        """Process NotPrimaryMaster, then flag new masters to connect to."""
        try:
            super(ElectionHandler, self).notPrimaryMaster(*args)
        except PrimaryElected as e:
            # We keep playing the primary role when the peer does not
            # know yet that we won election against the returned node.
            elected = e.args[0]
            if not elected.isIdentified():
                raise
        # There may be new master nodes. Connect to them.
        self.app._current_manager.try_secondary = True
class PrimaryHandler(ElectionHandler):
    """Handler used by secondaries to handle primary master"""

    def _acceptIdentification(self, node):
        """Check that the accepting peer is the primary master we know."""
        primary = self.app.primary_master
        assert primary is node, (primary, node)

    def _connectionLost(self, conn):
        """Raise PrimaryFailure if the primary is not connected anymore."""
        primary = self.app.primary_master
        # primary is None when switching to primary role
        if not primary:
            return
        if primary.isConnected(True):
            return
        raise PrimaryFailure('primary master is dead')

    def notPrimaryMaster(self, *args):
        """Process NotPrimaryMaster, ignoring the election of the primary
        master we already know.
        """
        try:
            # The implementation above ElectionHandler is called on
            # purpose: ElectionHandler.notPrimaryMaster is skipped.
            super(ElectionHandler, self).notPrimaryMaster(*args)
        except PrimaryElected as e:
            elected = e.args[0]
            if elected is not self.app.primary_master:
                raise

    def notifyClusterInformation(self, conn, state):
        """Exit the process when the cluster is stopping."""
        stopping = state == ClusterStates.STOPPING
        if stopping:
            sys.exit()

    def notifyNodeInformation(self, conn, timestamp, node_list):
        """Process the node list, and exit if we are reported as down."""
        super(PrimaryHandler, self).notifyNodeInformation(
            conn, timestamp, node_list)
        for node_type, _, uuid, state, _ in node_list:
            assert node_type == NodeTypes.MASTER, node_type
            about_us = uuid == self.app.uuid
            if about_us and state == NodeStates.DOWN:
                sys.exit()
neoppod-master-neo-master-handlers/neo/master/handlers/storage.py 0000664 0000000 0000000 00000012115 14577262335 0025571 0 ustar 00root root 0000000 0000000 #
# Copyright (C) 2006-2019 Nexedi SA
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
from neo.lib import logging
from neo.lib.exception import ProtocolError, StoppedOperation
from neo.lib.protocol import CellStates, ClusterStates, Packets, uuid_str
from neo.lib.pt import PartitionTableException
from neo.lib.util import dump
from . import BaseServiceHandler
# Feature switch read by StorageServiceHandler.notifyPartitionCorrupted:
# while False, a corruption report is only logged; when True, the reported
# cells are marked CORRUPTED and the change is broadcast.
EXPERIMENTAL_CORRUPTED_STATE = False
class StorageServiceHandler(BaseServiceHandler):
""" Handler dedicated to storages during service state """
# Hook receiving the connection and a `new` flag (presumably invoked when
# this handler is installed on `conn` - confirm against EventHandler).
# The parent implementation is only called when `new` is true; the storage
# node behind the connection is started if it is in RUNNING state.
def handlerSwitched(self, conn, new):
app = self.app
if new:
super(StorageServiceHandler, self).handlerSwitched(conn, new)
# NOTE(review): indentation was lost in this copy of the file, so it cannot
# be told from here whether the lines below are nested under `if new:` or
# run unconditionally - confirm against the repository before reformatting.
node = app.nm.getByUUID(conn.getUUID())
if node.isRunning(): # node may be PENDING
app.startStorage(node)
def notifyReady(self, conn):
    """Flag the storage node behind `conn` as ready in the application."""
    app = self.app
    app.setStorageReady(conn.getUUID())
def connectionLost(self, conn, new_state):
    """Clean up master-side state after losing a storage connection.

    After delegating to the parent handler, the node is marked not ready,
    the transaction manager (`app.tm`) and `app.pm` are told about the
    loss and the completed pack id is refreshed.  While the cluster is
    BACKINGUP, the backup application is notified as well.
    """
    application = self.app
    lost_uuid = conn.getUUID()
    # Resolved before delegating to the parent, as in the original ordering.
    lost_node = application.nm.getByUUID(lost_uuid)
    super(StorageServiceHandler, self).connectionLost(conn, new_state)
    application.setStorageNotReady(lost_uuid)
    application.tm.storageLost(lost_uuid)
    application.pm.connectionLost(conn)
    application.updateCompletedPackId()
    if application.getClusterState() != ClusterStates.BACKINGUP:
        return
    # Also check if we're exiting, because backup_app is not usable
    # in this case. Maybe cluster state should be set to something
    # else, like STOPPING, during cleanup (__del__/close).
    if application.listening_conn:
        application.backup_app.nodeLost(lost_node)
def askUnfinishedTransactions(self, conn, offset_list):
    """Answer with the last TID and the pending transactions.

    In backup mode (`app.backup_tid` set) the TID comes from the partition
    table and the pending list is empty.  Otherwise the node is registered
    for notification with the transaction manager, whose return value is
    sent as the pending list.  Once the answer is sent, `offset_list` is
    passed to `app.pt.updatable` for this node.
    """
    application = self.app
    if not application.backup_tid:
        # This can't be app.tm.getLastTID() for imported transactions,
        # because outdated cells must at least wait that they're locked
        # at source side. For normal transactions, it would not matter.
        newest_tid = application.getLastTransaction()
        unfinished = application.tm.registerForNotification(conn.getUUID())
    else:
        newest_tid = application.pt.getBackupTid(min)
        unfinished = ()
    conn.answer(Packets.AnswerUnfinishedTransactions(newest_tid, unfinished))
    application.pt.updatable(conn.getUUID(), offset_list)
def answerInformationLocked(self, conn, ttid):
    """Record that the storage node behind `conn` locked `ttid`.

    The lock is only forwarded to the transaction manager while the
    cluster is RUNNING or STOPPING; in any other state the answer is
    dropped, and that state is asserted to be RECOVERING.
    """
    app = self.app
    state = app.cluster_state
    # XXX: see testAnswerInformationLockedDuringRecovery
    if ClusterStates.RUNNING != state and state != ClusterStates.STOPPING:
        assert state == ClusterStates.RECOVERING
        return
    app.tm.lock(ttid, conn.getUUID())
def notifyPartitionCorrupted(self, conn, partition, cell_list):
    """Handle a report that `partition` is corrupted on some nodes.

    `cell_list` holds the UUIDs of the affected nodes.  Unless
    EXPERIMENTAL_CORRUPTED_STATE is enabled, the report is only logged.
    Otherwise the matching cells are set to CORRUPTED, the changes are
    broadcast, and StoppedOperation is raised if the partition table is
    no longer operational.
    """
    if not EXPERIMENTAL_CORRUPTED_STATE:
        logging.error("Partition %s corrupted in: %s",
            partition, ', '.join(map(uuid_str, cell_list)))
        return
    app = self.app
    corrupted = []
    for cell in app.pt.getCellList(partition):
        cell_uuid = cell.getUUID()
        if cell_uuid not in cell_list:
            continue
        cell.setState(CellStates.CORRUPTED)
        corrupted.append((partition, cell_uuid, CellStates.CORRUPTED))
    app.broadcastPartitionChanges(corrupted)
    if not app.pt.operational():
        raise StoppedOperation
def notifyReplicationDone(self, conn, offset, tid):
    """Handle a storage node reporting the end of a partition replication.

    In backup mode (`app.backup_tid` set) the notification is delegated to
    the backup application.  Otherwise the node's cell for partition
    `offset` is marked up-to-date in the partition table.  In both cases,
    an empty result means there is nothing to broadcast; otherwise the
    resulting cell changes are broadcast.

    Raises ProtocolError if the partition table rejects the update.
    """
    app = self.app
    uuid = conn.getUUID()
    node = app.nm.getByUUID(uuid)
    if app.backup_tid:
        cell_list = app.backup_app.notifyReplicationDone(node, offset, tid)
        if not cell_list:
            return
    else:
        try:
            cell_list = app.pt.setUpToDate(node, offset)
        except PartitionTableException as e:
            # Report the partition table error to the peer as a protocol
            # violation, keeping the original message.
            raise ProtocolError(str(e))
        if not cell_list:
            logging.info("ignored late notification that"
                " %s has replicated partition %s up to %s",
                uuid_str(uuid), offset, dump(tid))
            return
    logging.debug("%s is up for partition %s (tid=%s)",
        uuid_str(uuid), offset, dump(tid))
    app.broadcastPartitionChanges(cell_list)
def notifyPackCompleted(self, conn, pack_id):
    """Store `pack_id` as the completed pack id of the node behind `conn`,
    then let the application refresh its own completed pack id."""
    application = self.app
    storage_node = application.nm.getByUUID(conn.getUUID())
    storage_node.completed_pack_id = pack_id
    application.updateCompletedPackId()
def askPackOrders(self, conn, pack_id):
    """Delegate to `_askPackOrders` with its third argument set to True.

    The meaning of that flag is defined by `_askPackOrders`, which is not
    part of this class body.
    """
    result = self._askPackOrders(conn, pack_id, True)
    return result
def answerPackOrders(self, conn, pack_list, process):
    """Hand the received `pack_list` over to the `process` callback.

    The callback's return value is discarded.
    """
    callback = process
    callback(pack_list)