Commit 7b0e997f authored by Grégory Wisniewski's avatar Grégory Wisniewski

Splity master node handlers:

- All incoming connections start with the identification handler which will
apply the right handler depending on the current cluster state and the node
type. So we are now sure that a node is well identified, got an UUID and is
registered into the node manager when it leaves the identification handler.
- During service, clietnt and storage nodes have their own handler


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@686 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 5088acb2
......@@ -24,18 +24,20 @@ from neo.config import ConfigurationManager
from neo import protocol
from neo.protocol import Packet, \
RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \
INVALID_UUID, INVALID_OID, INVALID_TID, INVALID_PTID, \
PENDING_STATE, INVALID_UUID, INVALID_OID, INVALID_TID, INVALID_PTID, \
CLIENT_NODE_TYPE, MASTER_NODE_TYPE, STORAGE_NODE_TYPE, \
UUID_NAMESPACES, ADMIN_NODE_TYPE, BOOTING
from neo.node import NodeManager, MasterNode, StorageNode, ClientNode
from neo.node import NodeManager, MasterNode, StorageNode, ClientNode, AdminNode
from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection, ServerConnection
from neo.exception import ElectionFailure, PrimaryFailure, VerificationFailure, \
OperationFailure
from neo.master.identification import IdentificationEventHandler
from neo.master.administration import AdministrationEventHandler
from neo.master.election import ElectionEventHandler
from neo.master.recovery import RecoveryEventHandler
from neo.master.verification import VerificationEventHandler
from neo.master.service import ServiceEventHandler
from neo.master.service import ClientServiceEventHandler, StorageServiceEventHandler
from neo.master.secondary import SecondaryEventHandler
from neo.master.pt import PartitionTable
from neo.util import dump
......@@ -87,7 +89,7 @@ class Application(object):
self.ltid = INVALID_TID
# The target node's uuid to request next.
self.target_uuid = None
self.cluster_state = BOOTING
def run(self):
"""Make sure that the status is sane and start a loop."""
......@@ -95,8 +97,10 @@ class Application(object):
self.nm.add(MasterNode(server = server))
# Make a listening port.
ListeningConnection(self.em, None, addr = self.server,
connector_handler = self.connector_handler)
self.listening_conn = ListeningConnection(self.em, None,
addr = self.server, connector_handler = self.connector_handler)
self.cluster_state = BOOTING
# Start the election of a primary master node.
self.electPrimary()
......@@ -272,6 +276,7 @@ class Application(object):
def broadcastNodeInformation(self, node):
"""Broadcast a Notify Node Information packet."""
logging.debug('broadcasting node information')
node_type = node.getNodeType()
state = node.getState()
uuid = node.getUUID()
......@@ -359,18 +364,11 @@ class Application(object):
partition table or make a new table from scratch, if this is the first time."""
logging.info('begin the recovery of the status')
handler = RecoveryEventHandler(self)
self.changeClusterState(protocol.RECOVERING)
em = self.em
nm = self.nm
# Make sure that every connection has the status recovery event handler.
for conn in em.getConnectionList():
node = nm.getNodeByUUID(conn.getUUID())
# admin should keep their own handlers
# FIXME: should be filtered at node manager level
if node is None or node.getNodeType() != ADMIN_NODE_TYPE:
conn.setHandler(handler)
self.loid = INVALID_OID
self.ltid = INVALID_TID
self.pt.setID(INVALID_PTID)
......@@ -532,7 +530,6 @@ class Application(object):
"""Verify the data in storage nodes and clean them up, if necessary."""
logging.info('start to verify data')
handler = VerificationEventHandler(self)
em = self.em
nm = self.nm
......@@ -542,11 +539,7 @@ class Application(object):
while time() < t + 1:
em.poll(1)
# Make sure that every connection has the data verification event handler.
for conn in em.getConnectionList():
node = nm.getNodeByUUID(conn.getUUID())
if node is None or node.getNodeType() != ADMIN_NODE_TYPE:
conn.setHandler(handler)
self.changeClusterState(protocol.VERIFYING)
# FIXME this part has a potential problem that the write buffers can
# be very huge. Thus it would be better to flush the buffers from time
......@@ -623,13 +616,6 @@ class Application(object):
# is not uniform.
cell_list.extend(self.pt.tweak())
# And, add unused nodes.
if not ENABLE_PENDING_NODES:
node_list = self.pt.getNodeList()
for node in nm.getStorageNodeList():
if node.getState() == RUNNING_STATE and node not in node_list:
cell_list.extend(self.pt.addNode(node))
# If anything changed, send the changes.
if cell_list:
self.broadcastPartitionChanges(self.pt.setNextID(), cell_list)
......@@ -639,25 +625,14 @@ class Application(object):
and stop the service only if a catastrophy happens or the user commits
a shutdown."""
logging.info('provide service')
handler = ServiceEventHandler(self)
em = self.em
nm = self.nm
self.changeClusterState(protocol.RUNNING)
# This dictionary is used to hold information on transactions being finished.
self.finishing_transaction_dict = {}
# Make sure that every connection has the service event handler.
# and storage nodes should know that the cluster is operational.
for conn in em.getConnectionList():
node = nm.getNodeByUUID(conn.getUUID())
if node is not None and node.getNodeType() == ADMIN_NODE_TYPE:
continue
conn.setHandler(handler)
if not conn.isListeningConnection() and (node is None or \
node.getNodeType() == STORAGE_NODE_TYPE):
conn.notify(protocol.startOperation())
# Now everything is passive.
expiration = 10
while 1:
......@@ -704,6 +679,9 @@ class Application(object):
logging.info('play the primary role with %s (%s:%d)',
dump(self.uuid), *(self.server))
# all incoming connections identify through this handler
self.listening_conn.setHandler(IdentificationEventHandler(self))
# If I know any storage node, make sure that they are not in the running state,
# because they are not connected at this stage.
for node in self.nm.getStorageNodeList():
......@@ -719,7 +697,6 @@ class Application(object):
self.verifyData()
except VerificationFailure:
recovering = True
self.provideService()
def playSecondaryRole(self):
......@@ -738,6 +715,47 @@ class Application(object):
while 1:
em.poll(1)
def changeClusterState(self, state):
""" Change the cluster state and apply right handler on each connections """
if self.cluster_state == state:
return
nm, em = self.nm, self.em
# select the storage handler
if state == protocol.BOOTING:
storage_handler = RecoveryEventHandler
elif state == protocol.RECOVERING:
storage_handler = RecoveryEventHandler
elif state == protocol.VERIFYING:
storage_handler = VerificationEventHandler
elif state == protocol.RUNNING:
storage_handler = StorageServiceEventHandler
else:
RuntimeError('Unexpected node type')
# change handlers
for conn in em.getConnectionList():
node = nm.getNodeByUUID(conn.getUUID())
if conn.isListeningConnection() or node is None:
# not identified or listening, keep the identification handler
continue
node_type = node.getNodeType()
if node_type in (ADMIN_NODE_TYPE, MASTER_NODE_TYPE):
# those node types keep their own handler
continue
if node_type == CLIENT_NODE_TYPE:
if state != RUNNING:
# FIXME: cut the connection ?
pass
handler = ClientServiceEventHandler
elif node_type == STORAGE_NODE_TYPE:
handler = storage_handler
handler = handler(self)
conn.setHandler(handler)
handler.connectionCompleted(conn)
self.cluster_state = state
def getNextOID(self):
if self.loid is None:
raise RuntimeError, 'I do not know the last OID'
......@@ -826,6 +844,68 @@ class Application(object):
c.notify(protocol.notifyNodeInformation(node_list))
# then shutdown
sys.exit("Cluster has been asked to shut down")
def identifyStorageNode(self, uuid, node):
# TODO: check all cases here, when server address change...
# in verification and running states, if the node is unknown but the
# uuid != INVALID_UUID, we have to give it a new uuid, but in recovery
# the node must keep it's UUID
state = protocol.RUNNING_STATE
handler = None
if self.cluster_state == protocol.RECOVERING:
# TODO: Enable empty node rejection when manual startup is ok :
if False and uuid == protocol.INVALID_UUID:
logging.info('reject empty storage node')
raise protocol.NotReadyError
handler = RecoveryEventHandler
elif self.cluster_state == protocol.VERIFYING:
if uuid == INVALID_UUID or node is None:
# if node is unknown, it has been forget when the current
# partition was validated by the admin
uuid = INVALID_UUID
state = protocol.PENDING_STATE
handler = VerificationEventHandler
elif self.cluster_state == protocol.RUNNING:
if uuid == INVALID_UUID or node is None:
# same as for verification
uuid = INVALID_UUID
state = protocol.PENDING_STATE
handler = StorageServiceEventHandler
elif self.cluster_state == protocol.STOPPING:
# FIXME: raise a ShutdowningError ?
raise protocol.NotReadyError
else:
raise RuntimeError('unhandled cluster state')
return (uuid, state, handler)
def identifyNode(self, node_type, uuid, node):
state = protocol.RUNNING_STATE
handler = self.__class__
if node_type == protocol.ADMIN_NODE_TYPE:
# always accept admin nodes
klass = AdminNode
handler = AdministrationEventHandler
logging.info('Accept an admin %s' % dump(uuid))
elif node_type == protocol.MASTER_NODE_TYPE:
# always put other master in waiting state
klass = MasterNode
# FIXME: Apply a dedicated handler
logging.info('Accept a master %s' % dump(uuid))
elif node_type == protocol.CLIENT_NODE_TYPE:
# refuse any client before running
if self.cluster_state != protocol.RUNNING:
logging.info('reject a connection from a client')
raise protocol.NotReadyError
klass = ClientNode
# FIXME: Apply an handler dedicated to client nodes
handler = ClientServiceEventHandler
logging.info('Accept a client %s' % dump(uuid))
elif node_type == protocol.STORAGE_NODE_TYPE:
klass = StorageNode
(uuid, state, handler) = self.identifyStorageNode(uuid, node)
return (uuid, node, state, handler, klass)
......@@ -30,29 +30,6 @@ class MasterEventHandler(EventHandler):
self.app = app
EventHandler.__init__(self)
def acceptNodeIdentification(self, conn, packet, uuid):
""" Send a packet to accept the node identification """
app = self.app
args = (protocol.MASTER_NODE_TYPE, app.uuid,
app.server[0], app.server[1],
app.pt.getPartitions(), app.pt.getReplicas(),
uuid)
p = protocol.acceptNodeIdentification(*args)
conn.answer(p, packet)
def registerAdminNode(self, conn, packet, uuid, server):
""" Register the connection's peer as an admin node """
from neo.master.administration import AdministrationEventHandler
from neo.node import AdminNode
node = self.app.nm.getNodeByUUID(uuid)
if node is None:
uuid = self.app.getNewUUID(protocol.ADMIN_NODE_TYPE)
self.app.nm.add(AdminNode(uuid=uuid, server=server))
conn.setUUID(uuid)
conn.setHandler(AdministrationEventHandler(self.app))
logging.info('Register admin node %s' % util.dump(uuid))
self.acceptNodeIdentification(conn, packet, uuid)
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
raise NotImplementedError('this method must be overridden')
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging
from neo import protocol
from neo.node import AdminNode, MasterNode, ClientNode, StorageNode
from neo.master.handler import MasterEventHandler
from neo import decorators
class IdentificationEventHandler(MasterEventHandler):
"""This class deals with messages from the admin node only"""
def connectionClosed(self, conn):
logging.warning('lost a node in IdentificationEventHandler')
def timeoutExpired(self, conn):
logging.warning('lost a node in IdentificationEventHandler')
def peerBroken(self, conn):
logging.warning('lost a node in IdentificationEventHandler')
# TODO: move this into a new handler
@decorators.identification_required
def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID()
# I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises'
def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested'
def handleNotifyNodeInformation(self, conn, packet, node_list):
# XXX: Secondary master can send this packet
logging.error('ignoring NotifyNodeInformation packet')
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
# TODO: handle broken nodes
self.checkClusterName(name)
app, nm = self.app, self.app.nm
server = (ip_address, port)
node_by_uuid = nm.getNodeByUUID(uuid)
node_by_addr = nm.getNodeByServer(server)
if node_by_uuid is not None and node_by_addr is not None and \
node_by_uuid is not node_by_addr:
# got a conflict, but UUIDs should be more reliable
# TODO: delete the old node...
raise RuntimeError('node conflict not implemented yet')
pass
node = node_by_uuid or node_by_addr
if node is not None and node.getServer() != server:
# address changed
# TODO: delete or update the old node ?
node.setServer(server)
raise RuntimeError('node address changement not implemented yet')
# ask the app the node identification, if refused, an exception is raised
result = self.app.identifyNode(node_type, uuid, node)
(uuid, node, state, handler, klass) = result
if uuid == protocol.INVALID_UUID:
# no valid uuid, give it one
uuid = app.getNewUUID(node_type)
if node is None:
# new node
node = klass(uuid=uuid, server=(ip_address, port))
app.nm.add(node)
handler = handler(self.app)
# set up the node
node.setUUID(uuid)
node.setState(state)
# set up the connection
conn.setUUID(uuid)
conn.setHandler(handler)
# XXX: Here we could bin conn and node together
# answer
args = (protocol.MASTER_NODE_TYPE, app.uuid, app.server[0], app.server[1],
app.pt.getPartitions(), app.pt.getReplicas(), uuid)
conn.answer(protocol.acceptNodeIdentification(*args), packet)
# trigger the event
handler.connectionCompleted(conn)
app.broadcastNodeInformation(node)
......@@ -26,161 +26,42 @@ from neo.exception import ElectionFailure
from neo.protocol import Packet, UnexpectedPacketError, INVALID_UUID, INVALID_PTID
from neo.node import ClientNode, StorageNode, MasterNode, AdminNode
from neo.util import dump
from neo import decorators
class RecoveryEventHandler(MasterEventHandler):
"""This class deals with events for a recovery phase."""
def connectionCompleted(self, conn):
# ask the last IDs to perform the the recovery
conn.ask(protocol.askLastIDs())
def connectionClosed(self, conn):
app = self.app
uuid = conn.getUUID()
if uuid is not None:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node)
node = app.nm.getNodeByUUID(uuid)
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node)
MasterEventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn):
app = self.app
uuid = conn.getUUID()
if uuid is not None:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node)
node = app.nm.getNodeByUUID(uuid)
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node)
MasterEventHandler.timeoutExpired(self, conn)
def peerBroken(self, conn):
uuid = conn.getUUID()
if uuid is not None:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node.getState() != BROKEN_STATE:
node.setState(BROKEN_STATE)
app.broadcastNodeInformation(node)
MasterEventHandler.peerBroken(self, conn)
def packetReceived(self, conn, packet):
MasterEventHandler.packetReceived(self, conn, packet)
def handleRequestNodeIdentification(self, conn, packet, node_type, uuid,
ip_address, port, name):
self.checkClusterName(name)
app = self.app
addr = (ip_address, port)
if node_type == ADMIN_NODE_TYPE:
self.registerAdminNode(conn, packet, uuid, addr)
return
if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE):
logging.info('reject a connection from a client')
raise protocol.NotReadyError
if node_type is STORAGE_NODE_TYPE and uuid is INVALID_UUID:
# refuse an empty storage node (with no UUID) to avoid potential
# UUID conflict
logging.info('reject empty storage node')
raise protocol.NotReadyError
# Here are many situations. In principle, a node should be identified by
# an UUID, since an UUID never change when moving a storage node to a different
# server, and an UUID always changes for a master node and a client node whenever
# it restarts, so more reliable than a server address.
#
# However, master nodes can be known only as the server addresses. And, a node
# may claim a server address used by another node.
node = app.nm.getNodeByUUID(uuid)
if not app.isValidUUID(uuid, addr):
# Here we have an UUID conflict, assume that's a new node
node = None
else:
# First, get the node by the UUID.
node = app.nm.getNodeByUUID(uuid)
if node is None:
# generate an uuid for this node
while not app.isValidUUID(uuid, addr):
uuid = app.getNewUUID(node_type)
# If nothing is present, try with the server address.
node = app.nm.getNodeByServer(addr)
if node is None:
# Nothing is found. So this must be the first time that this node
# connected to me.
if node_type == MASTER_NODE_TYPE:
node = MasterNode(server = addr, uuid = uuid)
else:
node = StorageNode(server = addr, uuid = uuid)
app.nm.add(node)
app.broadcastNodeInformation(node)
else:
# Otherwise, I know it only by the server address or the same server
# address but with a different UUID.
if node.getUUID() is None:
# This must be a master node.
if node.getNodeType() != MASTER_NODE_TYPE or node_type != MASTER_NODE_TYPE:
# Error. This node uses the same server address as a master
# node.
raise protocol.ProtocolError('invalid server address')
node.setUUID(uuid)
if node.getState() != RUNNING_STATE:
node.setState(RUNNING_STATE)
app.broadcastNodeInformation(node)
else:
# This node has a different UUID.
if node.getState() == RUNNING_STATE:
# If it is still running, reject this node.
raise protocol.ProtocolError('invalid server address')
# Otherwise, forget the old one.
node.setState(BROKEN_STATE)
app.broadcastNodeInformation(node)
# And insert a new one.
node.setUUID(uuid)
node.setState(RUNNING_STATE)
app.broadcastNodeInformation(node)
else:
# I know this node by the UUID.
if node.getServer() != addr:
# This node has a different server address.
if node.getState() == RUNNING_STATE:
# If it is still running, reject this node.
raise protocol.ProtocolError('invalid server address')
# Otherwise, forget the old one.
node.setState(BROKEN_STATE)
app.broadcastNodeInformation(node)
# And insert a new one.
node.setServer(addr)
node.setState(RUNNING_STATE)
app.broadcastNodeInformation(node)
else:
# If this node is broken, reject it. Otherwise, assume that it is
# working again.
if node.getState() == BROKEN_STATE:
raise protocol.BrokenNodeDisallowedError
node.setUUID(uuid)
node.setState(RUNNING_STATE)
app.broadcastNodeInformation(node)
conn.setUUID(uuid)
self.acceptNodeIdentification(conn, packet, uuid)
if node_type is STORAGE_NODE_TYPE:
# ask the last IDs.
conn.ask(protocol.askLastIDs())
@decorators.identification_required
def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID()
# I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises'
def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested'
node = app.nm.getNodeByUUID(uuid)
if node.getState() != BROKEN_STATE:
node.setState(BROKEN_STATE)
app.broadcastNodeInformation(node)
MasterEventHandler.peerBroken(self, conn)
@decorators.identification_required
def handleNotifyNodeInformation(self, conn, packet, node_list):
uuid = conn.getUUID()
app = self.app
for node_type, ip_address, port, uuid, state in node_list:
if node_type in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
......@@ -223,14 +104,12 @@ class RecoveryEventHandler(MasterEventHandler):
# Something wrong happened possibly. Cut the connection to this node,
# if any, and notify the information to others.
# XXX this can be very slow.
for c in app.em.getConnectionList():
if c.getUUID() == uuid:
c.close()
c = app.em.getConnectionByUUID(uuid)
if c is not None:
c.close()
node.setState(state)
app.broadcastNodeInformation(node)
@decorators.identification_required
@decorators.restrict_node_types(STORAGE_NODE_TYPE)
def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
uuid = conn.getUUID()
app = self.app
......@@ -251,8 +130,6 @@ class RecoveryEventHandler(MasterEventHandler):
elif app.pt.getID() == lptid and app.target_uuid is None:
app.target_uuid = uuid
@decorators.identification_required
@decorators.restrict_node_types(STORAGE_NODE_TYPE)
def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
uuid = conn.getUUID()
app = self.app
......
......@@ -19,7 +19,7 @@ import logging
from copy import copy
from neo import protocol
from neo.protocol import MASTER_NODE_TYPE, CLIENT_NODE_TYPE, \
from neo.protocol import CLIENT_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
UP_TO_DATE_STATE, FEEDING_STATE, DISCARDED_STATE, \
STORAGE_NODE_TYPE, ADMIN_NODE_TYPE, OUT_OF_DATE_STATE, \
......@@ -30,53 +30,12 @@ from neo.exception import OperationFailure, ElectionFailure
from neo.node import ClientNode, StorageNode, MasterNode, AdminNode
from neo.util import dump
from neo.master import ENABLE_PENDING_NODES
from neo import decorators
class FinishingTransaction(object):
"""This class describes a finishing transaction."""
def __init__(self, conn):
self._conn = conn
self._msg_id = None
self._oid_list = None
self._uuid_set = None
self._locked_uuid_set = set()
def getConnection(self):
return self._conn
def setMessageId(self, msg_id):
self._msg_id = msg_id
def getMessageId(self):
return self._msg_id
def setOIDList(self, oid_list):
self._oid_list = oid_list
def getOIDList(self):
return self._oid_list
def setUUIDSet(self, uuid_set):
self._uuid_set = uuid_set
def getUUIDSet(self):
return self._uuid_set
def addLockedUUID(self, uuid):
if uuid in self._uuid_set:
self._locked_uuid_set.add(uuid)
def allLocked(self):
return self._uuid_set == self._locked_uuid_set
class ServiceEventHandler(MasterEventHandler):
"""This class deals with events for a service phase."""
def _dealWithNodeFailure(self, conn, new_state):
uuid = conn.getUUID()
if uuid is None:
return
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node is not None and node.getState() == RUNNING_STATE:
......@@ -105,181 +64,29 @@ class ServiceEventHandler(MasterEventHandler):
def peerBroken(self, conn):
uuid = conn.getUUID()
if uuid is not None:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node is not None and node.getState() != BROKEN_STATE:
node.setState(BROKEN_STATE)
logging.debug('broadcasting node information')
app.broadcastNodeInformation(node)
if node.getNodeType() == CLIENT_NODE_TYPE:
# If this node is a client, just forget it.
app.nm.remove(node)
for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn:
del app.finishing_transaction_dict[tid]
elif node.getNodeType() == STORAGE_NODE_TYPE:
cell_list = app.pt.dropNode(node)
ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, cell_list)
if not app.pt.operational():
# Catastrophic.
raise OperationFailure, 'cannot continue operation'
MasterEventHandler.peerBroken(self, conn)
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
self.checkClusterName(name)
app = self.app
addr = (ip_address, port)
if node_type == ADMIN_NODE_TYPE:
self.registerAdminNode(conn, packet, uuid, addr)
return
# Here are many situations. In principle, a node should be identified
# by an UUID, since an UUID never change when moving a storage node
# to a different server, and an UUID always changes for a master node
# and a client node whenever it restarts, so more reliable than a
# server address.
#
# However, master nodes can be known only as the server addresses.
# And, a node may claim a server address used by another node.
# First, get the node by the UUID.
node = app.nm.getNodeByUUID(uuid)
if node is not None and node.getServer() != addr:
# Here we have an UUID conflict, assume that's a new node
# XXX what about a storage node wich has changed of address ?
# it still must be used with its old data if marked out of date
# into the partition table
node = None
old_node = None
if node is None:
# generate a new uuid for this node
while not app.isValidUUID(uuid, addr):
uuid = app.getNewUUID(node_type)
# If nothing is present, try with the server address.
node = app.nm.getNodeByServer(addr)
if node is None:
# Nothing is found. So this must be the first time that
# this node connected to me.
if node_type == MASTER_NODE_TYPE:
node = MasterNode(server = addr, uuid = uuid)
elif node_type == CLIENT_NODE_TYPE:
node = ClientNode(uuid = uuid)
else:
node = StorageNode(server = addr, uuid = uuid)
if ENABLE_PENDING_NODES:
node.setState(PENDING_STATE)
app.nm.add(node)
logging.debug('broadcasting node information')
app.broadcastNodeInformation(node)
else:
# Otherwise, I know it only by the server address or the same
# server address but with a different UUID.
if node.getUUID() is None:
# This must be a master node loaded from configuration
if node.getNodeType() != MASTER_NODE_TYPE \
or node_type != MASTER_NODE_TYPE:
# Error. This node uses the same server address as
# a master node.
raise protocol.ProtocolError('invalid server address')
node.setUUID(uuid)
if node.getState() != RUNNING_STATE:
node.setState(RUNNING_STATE)
logging.debug('broadcasting node information')
app.broadcastNodeInformation(node)
else:
# This node has a different UUID.
if node.getState() == RUNNING_STATE:
# If it is still running, reject this node.
raise protocol.ProtocolError('invalid uuid')
else:
# Otherwise, forget the old one.
node.setState(DOWN_STATE)
logging.debug('broadcasting node information')
app.broadcastNodeInformation(node)
app.nm.remove(node)
old_node = node
node = copy(node)
# And insert a new one.
node.setUUID(uuid)
node.setState(RUNNING_STATE)
logging.debug('broadcasting node information')
app.broadcastNodeInformation(node)
app.nm.add(node)
else:
# I know this node by the UUID.
try:
ip_address, port = node.getServer()
except TypeError:
ip_address, port = '0.0.0.0', 0
if (ip_address, port) != addr:
# This node has a different server address.
if node.getState() == RUNNING_STATE:
# If it is still running, reject this node.
raise protocol.ProtocolError('invalid server address')
# Otherwise, forget the old one.
node.setState(DOWN_STATE)
logging.debug('broadcasting node information')
app.broadcastNodeInformation(node)
if node is not None and node.getState() != BROKEN_STATE:
node.setState(BROKEN_STATE)
logging.debug('broadcasting node information')
app.broadcastNodeInformation(node)
if node.getNodeType() == CLIENT_NODE_TYPE:
# If this node is a client, just forget it.
app.nm.remove(node)
old_node = node
node = copy(node)
# And insert a new one.
node.setServer(addr)
node.setState(RUNNING_STATE)
logging.debug('broadcasting node information')
app.broadcastNodeInformation(node)
app.nm.add(node)
else:
# If this node is broken, reject it. Otherwise, assume that
# it is working again.
if node.getState() == BROKEN_STATE:
raise protocol.BrokenNodeDisallowedError
else:
node.setUUID(uuid)
node.setState(RUNNING_STATE)
logging.info('broadcasting node information as running %s' %(node.getState(),))
app.broadcastNodeInformation(node)
conn.setUUID(uuid)
if not ENABLE_PENDING_NODES and node.getNodeType() == STORAGE_NODE_TYPE:
# If this is a storage node, add it into the partition table.
# Note that this does no harm, even if the node is not new.
if old_node is not None:
logging.info('dropping %s from a partition table',
dump(old_node.getUUID()))
cell_list = app.pt.dropNode(old_node)
else:
cell_list = []
cell_list.extend(app.pt.addNode(node))
logging.info('added %s into a partition table (%d modifications)',
dump(node.getUUID()), len(cell_list))
if len(cell_list) != 0:
for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn:
del app.finishing_transaction_dict[tid]
elif node.getNodeType() == STORAGE_NODE_TYPE:
cell_list = app.pt.dropNode(node)
ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, cell_list)
if not app.pt.operational():
# Catastrophic.
raise OperationFailure, 'cannot continue operation'
MasterEventHandler.peerBroken(self, conn)
self.acceptNodeIdentification(conn, packet, uuid)
@decorators.identification_required
def handleAnnouncePrimaryMaster(self, conn, packet):
# I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises'
def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested'
@decorators.identification_required
def handleNotifyNodeInformation(self, conn, packet, node_list):
app = self.app
uuid = conn.getUUID()
conn_node = app.nm.getNodeByUUID(uuid)
if conn_node is None:
raise RuntimeError('I do not know the uuid %r' % dump(uuid))
for node_type, ip_address, port, uuid, state in node_list:
if node_type in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
# No interest.
......@@ -324,11 +131,10 @@ class ServiceEventHandler(MasterEventHandler):
# this node, if any, and notify the information to others.
# XXX this can be very slow.
# XXX does this need to be closed in all cases ?
for c in app.em.getConnectionList():
if c.getUUID() == uuid:
c.close()
c = app.em.getConnectionByUUID(uuid)
if c is not None:
c.close()
logging.debug('broadcasting node information')
app.broadcastNodeInformation(node)
if node.getNodeType() == STORAGE_NODE_TYPE:
if state in (DOWN_STATE, BROKEN_STATE):
......@@ -344,19 +150,70 @@ class ServiceEventHandler(MasterEventHandler):
ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, cell_list)
@decorators.identification_required
@decorators.restrict_node_types(STORAGE_NODE_TYPE)
def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
uuid = conn.getUUID()
def handleAskLastIDs(self, conn, packet):
app = self.app
node = app.nm.getNodeByUUID(uuid)
# If I get a bigger value here, it is dangerous.
if app.loid < loid or app.ltid < ltid or app.pt.getID() < lptid:
logging.critical('got later information in service')
raise OperationFailure
conn.answer(protocol.answerLastIDs(app.loid, app.ltid, app.pt.getID()), packet)
def handleAskUnfinishedTransactions(self, conn, packet):
app = self.app
p = protocol.answerUnfinishedTransactions(app.finishing_transaction_dict.keys())
conn.answer(p, packet)
class FinishingTransaction(object):
"""This class describes a finishing transaction."""
def __init__(self, conn):
self._conn = conn
self._msg_id = None
self._oid_list = None
self._uuid_set = None
self._locked_uuid_set = set()
def getConnection(self):
return self._conn
def setMessageId(self, msg_id):
self._msg_id = msg_id
def getMessageId(self):
return self._msg_id
def setOIDList(self, oid_list):
self._oid_list = oid_list
def getOIDList(self):
return self._oid_list
def setUUIDSet(self, uuid_set):
self._uuid_set = uuid_set
def getUUIDSet(self):
return self._uuid_set
def addLockedUUID(self, uuid):
if uuid in self._uuid_set:
self._locked_uuid_set.add(uuid)
def allLocked(self):
return self._uuid_set == self._locked_uuid_set
class ClientServiceEventHandler(ServiceEventHandler):
def connectionCompleted(self, conn):
pass
def handleAbortTransaction(self, conn, packet, tid):
uuid = conn.getUUID()
node = self.app.nm.getNodeByUUID(uuid)
try:
del self.app.finishing_transaction_dict[tid]
except KeyError:
logging.warn('aborting transaction %s does not exist', dump(tid))
pass
@decorators.identification_required
@decorators.restrict_node_types(CLIENT_NODE_TYPE)
def handleAskNewTID(self, conn, packet):
uuid = conn.getUUID()
app = self.app
......@@ -365,8 +222,6 @@ class ServiceEventHandler(MasterEventHandler):
app.finishing_transaction_dict[tid] = FinishingTransaction(conn)
conn.answer(protocol.answerNewTID(tid), packet)
@decorators.identification_required
@decorators.restrict_node_types(CLIENT_NODE_TYPE)
def handleAskNewOIDs(self, conn, packet, num_oids):
uuid = conn.getUUID()
app = self.app
......@@ -374,8 +229,6 @@ class ServiceEventHandler(MasterEventHandler):
oid_list = app.getNewOIDList(num_oids)
conn.answer(protocol.answerNewOIDs(oid_list), packet)
@decorators.identification_required
@decorators.restrict_node_types(CLIENT_NODE_TYPE)
def handleFinishTransaction(self, conn, packet, oid_list, tid):
uuid = conn.getUUID()
app = self.app
......@@ -415,8 +268,12 @@ class ServiceEventHandler(MasterEventHandler):
logging.warn('finishing transaction %s does not exist', dump(tid))
pass
@decorators.identification_required
@decorators.restrict_node_types(STORAGE_NODE_TYPE)
class StorageServiceEventHandler(ServiceEventHandler):
def connectionCompleted(self, conn):
conn.notify(protocol.startOperation())
def handleNotifyInformationLocked(self, conn, packet, tid):
uuid = conn.getUUID()
app = self.app
......@@ -455,30 +312,15 @@ class ServiceEventHandler(MasterEventHandler):
# What is this?
pass
@decorators.identification_required
@decorators.restrict_node_types(CLIENT_NODE_TYPE)
def handleAbortTransaction(self, conn, packet, tid):
def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
uuid = conn.getUUID()
node = self.app.nm.getNodeByUUID(uuid)
try:
del self.app.finishing_transaction_dict[tid]
except KeyError:
logging.warn('aborting transaction %s does not exist', dump(tid))
pass
@decorators.identification_required
def handleAskLastIDs(self, conn, packet):
app = self.app
conn.answer(protocol.answerLastIDs(app.loid, app.ltid, app.pt.getID()), packet)
@decorators.identification_required
def handleAskUnfinishedTransactions(self, conn, packet):
app = self.app
p = protocol.answerUnfinishedTransactions(app.finishing_transaction_dict.keys())
conn.answer(p, packet)
node = app.nm.getNodeByUUID(uuid)
# If I get a bigger value here, it is dangerous.
if app.loid < loid or app.ltid < ltid or app.pt.getID() < lptid:
logging.critical('got later information in service')
raise OperationFailure
@decorators.identification_required
@decorators.restrict_node_types(STORAGE_NODE_TYPE)
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
# This should be sent when a cell becomes up-to-date because
# a replication has finished.
......
......@@ -26,180 +26,53 @@ from neo.exception import VerificationFailure, ElectionFailure
from neo.protocol import Packet, UnexpectedPacketError, INVALID_UUID
from neo.util import dump
from neo.node import ClientNode, StorageNode, MasterNode, AdminNode
from neo import decorators
class VerificationEventHandler(MasterEventHandler):
"""This class deals with events for a verification phase."""
def connectionCompleted(self, conn):
pass
def connectionClosed(self, conn):
app = self.app
uuid = conn.getUUID()
if uuid is not None:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node)
if node.getNodeType() in (CLIENT_NODE_TYPE):
# If this node is a client, just forget it.
app.nm.remove(node)
elif node.getNodeType() == STORAGE_NODE_TYPE:
if not app.pt.operational():
# Catastrophic.
raise VerificationFailure, 'cannot continue verification'
node = app.nm.getNodeByUUID(uuid)
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node)
if not app.pt.operational():
# Catastrophic.
raise VerificationFailure, 'cannot continue verification'
MasterEventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn):
app = self.app
uuid = conn.getUUID()
if uuid is not None:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node)
if node.getNodeType() in (CLIENT_NODE_TYPE):
# If this node is a client, just forget it.
app.nm.remove(node)
elif node.getNodeType() == STORAGE_NODE_TYPE:
if not app.pt.operational():
# Catastrophic.
raise VerificationFailure, 'cannot continue verification'
node = app.nm.getNodeByUUID(uuid)
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node)
if not app.pt.operational():
# Catastrophic.
raise VerificationFailure, 'cannot continue verification'
MasterEventHandler.timeoutExpired(self, conn)
def peerBroken(self, conn):
uuid = conn.getUUID()
if uuid is not None:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if node.getState() != BROKEN_STATE:
node.setState(BROKEN_STATE)
app.broadcastNodeInformation(node)
if node.getNodeType() in (CLIENT_NODE_TYPE):
# If this node is a client, just forget it.
app.nm.remove(node)
elif node.getNodeType() == STORAGE_NODE_TYPE:
cell_list = app.pt.dropNode(node)
ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, cell_list)
if not app.pt.operational():
# Catastrophic.
raise VerificationFailure, 'cannot continue verification'
MasterEventHandler.peerBroken(self, conn)
def packetReceived(self, conn, packet):
MasterEventHandler.packetReceived(self, conn, packet)
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
self.checkClusterName(name)
app = self.app
addr = (ip_address, port)
if node_type == ADMIN_NODE_TYPE:
self.registerAdminNode(conn, packet, uuid, addr)
return
if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE):
logging.info('reject a connection from a client')
raise protocol.NotReadyError
# Here are many situations. In principle, a node should be identified by
# an UUID, since an UUID never change when moving a storage node to a different
# server, and an UUID always changes for a master node and a client node whenever
# it restarts, so more reliable than a server address.
#
# However, master nodes can be known only as the server addresses. And, a node
# may claim a server address used by another node.
if not app.isValidUUID(uuid, addr):
# Here we have an UUID conflict, assume that's a new node
node = None
else:
# First, get the node by the UUID.
node = app.nm.getNodeByUUID(uuid)
if node is None:
# generate a new uuid for this node
while not app.isValidUUID(uuid, addr):
uuid = app.getNewUUID(node_type)
# If nothing is present, try with the server address.
node = app.nm.getNodeByServer(addr)
if node is None:
# Nothing is found. So this must be the first time that this node
# connected to me.
if node_type == MASTER_NODE_TYPE:
node = MasterNode(server = addr, uuid = uuid)
else:
# empty storage nodes starts in PENDING state
node = StorageNode(server = addr, uuid = uuid)
node.setState(PENDING_STATE)
app.nm.add(node)
app.broadcastNodeInformation(node)
else:
# Otherwise, I know it only by the server address or the same server
# address but with a different UUID.
if node.getUUID() is None:
# This must be a master node.
if node.getNodeType() != MASTER_NODE_TYPE or node_type != MASTER_NODE_TYPE:
# Error. This node uses the same server address as a master
# node.
raise protocol.ProtocolError('invalid server address')
node.setUUID(uuid)
if node.getState() != RUNNING_STATE:
node.setState(RUNNING_STATE)
app.broadcastNodeInformation(node)
else:
# This node has a different UUID.
if node.getState() == RUNNING_STATE:
# If it is still running, reject this node.
raise protocol.ProtocolError('invalid server address')
# Otherwise, forget the old one.
node.setState(BROKEN_STATE)
app.broadcastNodeInformation(node)
# And insert a new one.
node.setUUID(uuid)
if node_type is STORAGE_NODE_TYPE:
# empty node
node.setState(PENDING_STATE)
else:
node.setState(RUNNING_STATE)
app.broadcastNodeInformation(node)
else:
# I know this node by the UUID.
if node.getServer() != addr:
# This node has a different server address.
if node.getState() == RUNNING_STATE:
# If it is still running, reject this node.
raise protocol.ProtocolError('invalid server address')
# Otherwise, forget the old one.
node.setState(BROKEN_STATE)
app.broadcastNodeInformation(node)
# And insert a new one.
node.setServer(addr)
node.setState(RUNNING_STATE)
app.broadcastNodeInformation(node)
else:
# If this node is broken, reject it. Otherwise, assume that it is
# working again.
if node.getState() == BROKEN_STATE:
raise protocol.BrokenNodeDisallowedError
node.setUUID(uuid)
node.setState(RUNNING_STATE)
app.broadcastNodeInformation(node)
conn.setUUID(uuid)
self.acceptNodeIdentification(conn, packet, uuid)
@decorators.identification_required
def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID()
# I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises'
def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested'
node = app.nm.getNodeByUUID(uuid)
if node.getState() != BROKEN_STATE:
node.setState(BROKEN_STATE)
app.broadcastNodeInformation(node)
cell_list = app.pt.dropNode(node)
ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, cell_list)
if not app.pt.operational():
# Catastrophic.
raise VerificationFailure, 'cannot continue verification'
MasterEventHandler.peerBroken(self, conn)
@decorators.identification_required
def handleNotifyNodeInformation(self, conn, packet, node_list):
uuid = conn.getUUID()
app = self.app
for node_type, ip_address, port, uuid, state in node_list:
if node_type in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
......@@ -242,14 +115,12 @@ class VerificationEventHandler(MasterEventHandler):
# Something wrong happened possibly. Cut the connection to this node,
# if any, and notify the information to others.
# XXX this can be very slow.
for c in app.em.getConnectionList():
if c.getUUID() == uuid:
c.close()
c = app.em.getConnectionByUUID(uuid)
if c is not None:
c.close()
node.setState(state)
app.broadcastNodeInformation(node)
@decorators.identification_required
@decorators.restrict_node_types(STORAGE_NODE_TYPE)
def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
uuid = conn.getUUID()
app = self.app
......@@ -263,8 +134,6 @@ class VerificationEventHandler(MasterEventHandler):
# Ignore this packet.
pass
@decorators.identification_required
@decorators.restrict_node_types(STORAGE_NODE_TYPE)
def handleAnswerUnfinishedTransactions(self, conn, packet, tid_list):
uuid = conn.getUUID()
logging.info('got unfinished transactions %s from %s:%d',
......@@ -277,8 +146,6 @@ class VerificationEventHandler(MasterEventHandler):
app.unfinished_tid_set.update(tid_list)
app.asking_uuid_dict[uuid] = True
@decorators.identification_required
@decorators.restrict_node_types(STORAGE_NODE_TYPE)
def handleAnswerTransactionInformation(self, conn, packet, tid,
user, desc, ext, oid_list):
uuid = conn.getUUID()
......@@ -300,8 +167,6 @@ class VerificationEventHandler(MasterEventHandler):
app.unfinished_oid_set = None
app.asking_uuid_dict[uuid] = True
@decorators.identification_required
@decorators.restrict_node_types(STORAGE_NODE_TYPE)
def handleTidNotFound(self, conn, packet, message):
uuid = conn.getUUID()
logging.info('TID not found: %s', message)
......@@ -313,8 +178,6 @@ class VerificationEventHandler(MasterEventHandler):
app.unfinished_oid_set = None
app.asking_uuid_dict[uuid] = True
@decorators.identification_required
@decorators.restrict_node_types(STORAGE_NODE_TYPE)
def handleAnswerObjectPresent(self, conn, packet, oid, tid):
uuid = conn.getUUID()
logging.info('object %s:%s found', dump(oid), dump(tid))
......@@ -325,8 +188,6 @@ class VerificationEventHandler(MasterEventHandler):
return
app.asking_uuid_dict[uuid] = True
@decorators.identification_required
@decorators.restrict_node_types(STORAGE_NODE_TYPE)
def handleOidNotFound(self, conn, packet, message):
uuid = conn.getUUID()
logging.info('OID not found: %s', message)
......
......@@ -314,8 +314,10 @@ INTERNAL_ERROR_CODE = 8
# Cluster states
cluster_states = Enum({
'BOOTING': 1,
'RUNNING': 2,
'STOPPING': 3,
'RECOVERING': 2,
'VERIFYING': 3,
'RUNNING': 4,
'STOPPING': 5,
})
VALID_CLUSTER_STATE_LIST = (BOOTING, RUNNING, STOPPING)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment