Commit e9fb5361 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Add a handler dedicated to admin node since the admin node does'nt care about

the current cluster state, so the primary master should not change the admin's 
connection handler when doing a state-switch.
Factorise a little the response to an identification request.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@682 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent f627727f
......@@ -83,7 +83,7 @@ class SelectEventManager(object):
self.event_list = []
self.prev_time = time()
def getConnectionList(self):
def getConnectionList(self, with_admin_nodes=False):
return self.connection_dict.values()
def register(self, conn):
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging
from neo import protocol
from neo.master.handler import MasterEventHandler
from neo.protocol import RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
STORAGE_NODE_TYPE, HIDDEN_STATE, PENDING_STATE
from neo.util import dump
class AdministrationEventHandler(MasterEventHandler):
"""This class deals with messages from the admin node only"""
def _discardNode(self, conn):
uuid = conn.getUUID()
node = self.app.nm.getNodeByUUID(uuid)
if node is not None:
self.app.nm.remove(node)
def handleAskPrimaryMaster(self, conn, packet):
app = self.app
# I'm the primary
conn.answer(protocol.answerPrimaryMaster(app.uuid, []), packet)
# TODO: Admin will ask itself for these data
app.sendNodesInformations(conn)
app.sendPartitionTable(conn)
def connectionClosed(self, conn):
self._discardNode(conn)
MasterEventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn):
self._discardNode(conn)
MasterEventHandler.timeoutExpired(self, conn)
def peerBroken(self, conn):
self._discardNode(conn)
MasterEventHandler.peerBroken(self, conn)
def handleSetClusterState(self, conn, packet, name, state):
self.checkClusterName(name)
if state == protocol.RUNNING:
self.app.cluster_state = state
conn.answer(protocol.answerClusterState(self.app.cluster_state), packet)
def handleSetNodeState(self, conn, packet, uuid, state, modify_partition_table):
logging.info("set node state for %s-%s : %s" % (dump(uuid), state, modify_partition_table))
app = self.app
if uuid == app.uuid:
# get message for self
if state == RUNNING_STATE:
# yes I know
p = protocol.answerNodeState(app.uuid, state)
conn.answer(p, packet)
return
else:
# I was asked to shutdown
node.setState(state)
ip, port = node.getServer()
node_list = [(node.getNodeType(), ip, port, node.getUUID(), node.getState()),]
conn.answer(protocol.notifyNodeInformation(node_list), packet)
app.shutdown()
node = app.nm.getNodeByUUID(uuid)
if node is None:
p = protocol.protocolError('invalid uuid')
conn.notify(p)
return
if node.getState() == state:
# no change, just notify admin node
node.setState(state)
ip, port = node.getServer()
node_list = [(node.getNodeType(), ip, port, node.getUUID(), node.getState()),]
conn.answer(protocol.notifyNodeInformation(node_list), packet)
# forward information to all nodes
if node.getState() != state:
node.setState(state)
ip, port = node.getServer()
node_list = [(node.getNodeType(), ip, port, node.getUUID(), node.getState()),]
conn.answer(protocol.notifyNodeInformation(node_list), packet)
app.broadcastNodeInformation(node)
# If this is a storage node, ask it to start.
if node.getNodeType() == STORAGE_NODE_TYPE and state == RUNNING_STATE:
for sn_conn in app.em.getConnectionList():
if sn_conn.getUUID() == node.getUUID():
logging.info("asking sn to start operation")
sn_conn.notify(protocol.startOperation())
# modify the partition table if required
if modify_partition_table and node.getNodeType() == STORAGE_NODE_TYPE:
if state in (DOWN_STATE, TEMPORARILY_DOWN_STATE, HIDDEN_STATE):
# remove from pt
cell_list = app.pt.dropNode(node)
else:
# add to pt
cell_list = app.pt.addNode(node)
if len(cell_list) != 0:
ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, cell_list)
else:
# outdate node in partition table
cell_list = app.pt.outdate()
if len(cell_list) != 0:
ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, cell_list)
def handleAddPendingNodes(self, conn, packet, uuid_list):
uuids = ', '.join([dump(uuid) for uuid in uuid_list])
logging.debug('Add nodes %s' % uuids)
app, nm, em, pt = self.app, self.app.nm, self.app.em, self.app.pt
cell_list = []
uuid_set = set()
# take all pending nodes
for node in nm.getStorageNodeList():
if node.getState() == PENDING_STATE:
uuid_set.add(node.getUUID())
# keep only selected nodes
if uuid_list:
uuid_set = uuid_set.intersection(set(uuid_list))
# nothing to do
if not uuid_set:
logging.warning('No nodes added')
conn.answer(protocol.answerNewNodes(()), packet)
return
uuids = ', '.join([dump(uuid) for uuid in uuid_set])
logging.info('Adding nodes %s' % uuids)
# switch nodes to running state
for uuid in uuid_set:
node = nm.getNodeByUUID(uuid)
new_cells = pt.addNode(node)
cell_list.extend(new_cells)
node.setState(RUNNING_STATE)
app.broadcastNodeInformation(node)
# start nodes
for s_conn in em.getConnectionList():
if s_conn.getUUID() in uuid_set:
s_conn.notify(protocol.startOperation())
# broadcast the new partition table
app.broadcastPartitionChanges(app.pt.setNextID(), cell_list)
conn.answer(protocol.answerNewNodes(list(uuid_set)), packet)
......@@ -306,6 +306,7 @@ class Application(object):
def broadcastPartitionChanges(self, ptid, cell_list):
"""Broadcast a Notify Partition Changes packet."""
logging.info('broadcastPartitionChanges')
self.pt.log()
for c in self.em.getConnectionList():
if c.getUUID() is not None:
......@@ -364,7 +365,11 @@ class Application(object):
# Make sure that every connection has the status recovery event handler.
for conn in em.getConnectionList():
conn.setHandler(handler)
node = nm.getNodeByUUID(conn.getUUID())
# admin should keep their own handlers
# FIXME: should be filtered at node manager level
if node is None or node.getNodeType() != ADMIN_NODE_TYPE:
conn.setHandler(handler)
self.loid = INVALID_OID
self.ltid = INVALID_TID
......@@ -539,7 +544,9 @@ class Application(object):
# Make sure that every connection has the data verification event handler.
for conn in em.getConnectionList():
conn.setHandler(handler)
node = nm.getNodeByUUID(conn.getUUID())
if node is None or node.getNodeType() != ADMIN_NODE_TYPE:
conn.setHandler(handler)
# FIXME this part has a potential problem that the write buffers can
# be very huge. Thus it would be better to flush the buffers from time
......@@ -641,16 +648,15 @@ class Application(object):
self.finishing_transaction_dict = {}
# Make sure that every connection has the service event handler.
# and storage nodes should know that the cluster is operational.
for conn in em.getConnectionList():
node = nm.getNodeByUUID(conn.getUUID())
if node is not None and node.getNodeType() == ADMIN_NODE_TYPE:
continue
conn.setHandler(handler)
# Now storage nodes should know that the cluster is operational.
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE:
conn.notify(protocol.startOperation())
if not conn.isListeningConnection() and node is None or \
node.getNodeType() == STORAGE_NODE_TYPE:
conn.notify(protocol.startOperation())
# Now everything is passive.
expiration = 10
......
......@@ -17,16 +17,42 @@
import logging
from neo import protocol
from neo.handler import EventHandler
from neo.protocol import INVALID_UUID, BROKEN_STATE, ADMIN_NODE_TYPE
from neo import protocol
from neo import util
class MasterEventHandler(EventHandler):
"""This class implements a generic part of the event handlers."""
def __init__(self, app):
self.app = app
EventHandler.__init__(self)
def acceptNodeIdentification(self, conn, packet, uuid):
""" Send a packet to accept the node identification """
app = self.app
args = (protocol.MASTER_NODE_TYPE, app.uuid,
app.server[0], app.server[1],
app.pt.getPartitions(), app.pt.getReplicas(),
uuid)
p = protocol.acceptNodeIdentification(*args)
conn.answer(p, packet)
def registerAdminNode(self, conn, packet, uuid, server):
""" Register the connection's peer as an admin node """
from neo.master.administration import AdministrationEventHandler
from neo.node import AdminNode
node = self.app.nm.getNodeByUUID(uuid)
if node is None:
uuid = self.app.getNewUUID(protocol.ADMIN_NODE_TYPE)
self.app.nm.add(AdminNode(uuid=uuid, server=server))
conn.setUUID(uuid)
conn.setHandler(AdministrationEventHandler(self.app))
logging.info('Register admin node %s' % util.dump(uuid))
self.acceptNodeIdentification(conn, packet, uuid)
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
raise NotImplementedError('this method must be overridden')
......
......@@ -68,7 +68,13 @@ class RecoveryEventHandler(MasterEventHandler):
ip_address, port, name):
self.checkClusterName(name)
app = self.app
if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
addr = (ip_address, port)
if node_type == ADMIN_NODE_TYPE:
self.registerAdminNode(conn, packet, uuid, addr)
return
if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE):
logging.info('reject a connection from a client')
raise protocol.NotReadyError
if node_type is STORAGE_NODE_TYPE and uuid is INVALID_UUID:
......@@ -85,7 +91,6 @@ class RecoveryEventHandler(MasterEventHandler):
# However, master nodes can be known only as the server addresses. And, a node
# may claim a server address used by another node.
addr = (ip_address, port)
node = app.nm.getNodeByUUID(uuid)
if not app.isValidUUID(uuid, addr):
# Here we have an UUID conflict, assume that's a new node
......@@ -104,8 +109,6 @@ class RecoveryEventHandler(MasterEventHandler):
# connected to me.
if node_type == MASTER_NODE_TYPE:
node = MasterNode(server = addr, uuid = uuid)
elif node_type == ADMIN_NODE_TYPE:
node = AdminNode(uuid = uuid)
else:
node = StorageNode(server = addr, uuid = uuid)
app.nm.add(node)
......@@ -160,10 +163,7 @@ class RecoveryEventHandler(MasterEventHandler):
conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.pt.getPartitions(), app.pt.getReplicas(), uuid)
conn.answer(p, packet)
self.acceptNodeIdentification(conn, packet, uuid)
if node_type is STORAGE_NODE_TYPE:
# ask the last IDs.
......
......@@ -89,9 +89,6 @@ class ServiceEventHandler(MasterEventHandler):
for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn:
del app.finishing_transaction_dict[tid]
elif node.getNodeType() == ADMIN_NODE_TYPE:
# If this node is an admin , just forget it.
app.nm.remove(node)
elif node.getNodeType() == STORAGE_NODE_TYPE:
if not app.pt.operational():
# Catastrophic.
......@@ -121,9 +118,6 @@ class ServiceEventHandler(MasterEventHandler):
for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn:
del app.finishing_transaction_dict[tid]
elif node.getNodeType() == ADMIN_NODE_TYPE:
# If this node is an admin , just forget it.
app.nm.remove(node)
elif node.getNodeType() == STORAGE_NODE_TYPE:
cell_list = app.pt.dropNode(node)
ptid = app.pt.setNextID()
......@@ -137,6 +131,12 @@ class ServiceEventHandler(MasterEventHandler):
uuid, ip_address, port, name):
self.checkClusterName(name)
app = self.app
addr = (ip_address, port)
if node_type == ADMIN_NODE_TYPE:
self.registerAdminNode(conn, packet, uuid, addr)
return
# Here are many situations. In principle, a node should be identified
# by an UUID, since an UUID never change when moving a storage node
# to a different server, and an UUID always changes for a master node
......@@ -145,7 +145,6 @@ class ServiceEventHandler(MasterEventHandler):
#
# However, master nodes can be known only as the server addresses.
# And, a node may claim a server address used by another node.
addr = (ip_address, port)
# First, get the node by the UUID.
node = app.nm.getNodeByUUID(uuid)
if node is not None and node.getServer() != addr:
......@@ -168,8 +167,6 @@ class ServiceEventHandler(MasterEventHandler):
node = MasterNode(server = addr, uuid = uuid)
elif node_type == CLIENT_NODE_TYPE:
node = ClientNode(uuid = uuid)
elif node_type == ADMIN_NODE_TYPE:
node = AdminNode(uuid = uuid)
else:
node = StorageNode(server = addr, uuid = uuid)
if ENABLE_PENDING_NODES:
......@@ -265,11 +262,7 @@ class ServiceEventHandler(MasterEventHandler):
ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, cell_list)
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.pt.getPartitions(), app.pt.getReplicas(), uuid)
# Next, the peer should ask a primary master node.
conn.answer(p, packet)
self.acceptNodeIdentification(conn, packet, uuid)
@decorators.identification_required
def handleAnnouncePrimaryMaster(self, conn, packet):
......@@ -532,105 +525,3 @@ class ServiceEventHandler(MasterEventHandler):
ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, new_cell_list)
@decorators.identification_required
@decorators.restrict_node_types(ADMIN_NODE_TYPE)
def handleSetNodeState(self, conn, packet, uuid, state, modify_partition_table):
logging.info("set node state for %s-%s : %s" %(dump(uuid), state, modify_partition_table))
app = self.app
if uuid == app.uuid:
# get message for self
if state == RUNNING_STATE:
# yes I know
p = protocol.answerNodeState(app.uuid, state)
conn.answer(p, packet)
return
else:
# I was asked to shutdown
node.setState(state)
ip, port = node.getServer()
node_list = [(node.getNodeType(), ip, port, node.getUUID(), node.getState()),]
conn.answer(protocol.notifyNodeInformation(node_list), packet)
app.shutdown()
node = app.nm.getNodeByUUID(uuid)
if node is None:
p = protocol.protocolError('invalid uuid')
conn.notify(p)
return
if node.getState() == state:
# no change, just notify admin node
node.setState(state)
ip, port = node.getServer()
node_list = [(node.getNodeType(), ip, port, node.getUUID(), node.getState()),]
conn.answer(protocol.notifyNodeInformation(node_list), packet)
# forward information to all nodes
if node.getState() != state:
node.setState(state)
ip, port = node.getServer()
node_list = [(node.getNodeType(), ip, port, node.getUUID(), node.getState()),]
conn.answer(protocol.notifyNodeInformation(node_list), packet)
app.broadcastNodeInformation(node)
# If this is a storage node, ask it to start.
if node.getNodeType() == STORAGE_NODE_TYPE and state == RUNNING_STATE:
for sn_conn in app.em.getConnectionList():
if sn_conn.getUUID() == node.getUUID():
logging.info("asking sn to start operation")
sn_conn.notify(protocol.startOperation())
# modify the partition table if required
if modify_partition_table and node.getNodeType() == STORAGE_NODE_TYPE:
if state in (DOWN_STATE, TEMPORARILY_DOWN_STATE, HIDDEN_STATE):
# remove from pt
cell_list = app.pt.dropNode(node)
else:
# add to pt
cell_list = app.pt.addNode(node)
if len(cell_list) != 0:
ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, cell_list)
else:
# outdate node in partition table
cell_list = app.pt.outdate()
if len(cell_list) != 0:
ptid = app.pt.setNextID()
app.broadcastPartitionChanges(ptid, cell_list)
@decorators.identification_required
@decorators.restrict_node_types(ADMIN_NODE_TYPE)
def handleAddPendingNodes(self, conn, packet, uuid_list):
uuids = ', '.join([dump(uuid) for uuid in uuid_list])
logging.debug('Add nodes %s' % uuids)
app, nm, em, pt = self.app, self.app.nm, self.app.em, self.app.pt
cell_list = []
uuid_set = set()
# take all pending nodes
for node in nm.getStorageNodeList():
if node.getState() == PENDING_STATE:
uuid_set.add(node.getUUID())
# keep only selected nodes
if uuid_list:
uuid_set = uuid_set.intersection(set(uuid_list))
# nothing to do
if not uuid_set:
logging.warning('No nodes added')
conn.answer(protocol.answerNewNodes(()), packet)
return
uuids = ', '.join([dump(uuid) for uuid in uuid_set])
logging.info('Adding nodes %s' % uuids)
# switch nodes to running state
for uuid in uuid_set:
node = nm.getNodeByUUID(uuid)
new_cells = pt.addNode(node)
cell_list.extend(new_cells)
node.setState(RUNNING_STATE)
app.broadcastNodeInformation(node)
# start nodes
for s_conn in em.getConnectionList():
if s_conn.getUUID() in uuid_set:
s_conn.notify(protocol.startOperation())
# broadcast the new partition table
app.broadcastPartitionChanges(app.pt.setNextID(), cell_list)
conn.answer(protocol.answerNewNodes(list(uuid_set)), packet)
......@@ -39,7 +39,7 @@ class VerificationEventHandler(MasterEventHandler):
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node)
if node.getNodeType() in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
if node.getNodeType() in (CLIENT_NODE_TYPE):
# If this node is a client, just forget it.
app.nm.remove(node)
elif node.getNodeType() == STORAGE_NODE_TYPE:
......@@ -56,7 +56,7 @@ class VerificationEventHandler(MasterEventHandler):
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node)
if node.getNodeType() in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
if node.getNodeType() in (CLIENT_NODE_TYPE):
# If this node is a client, just forget it.
app.nm.remove(node)
elif node.getNodeType() == STORAGE_NODE_TYPE:
......@@ -73,7 +73,7 @@ class VerificationEventHandler(MasterEventHandler):
if node.getState() != BROKEN_STATE:
node.setState(BROKEN_STATE)
app.broadcastNodeInformation(node)
if node.getNodeType() in (CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
if node.getNodeType() in (CLIENT_NODE_TYPE):
# If this node is a client, just forget it.
app.nm.remove(node)
elif node.getNodeType() == STORAGE_NODE_TYPE:
......@@ -92,9 +92,15 @@ class VerificationEventHandler(MasterEventHandler):
uuid, ip_address, port, name):
self.checkClusterName(name)
app = self.app
if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
addr = (ip_address, port)
if node_type == ADMIN_NODE_TYPE:
self.registerAdminNode(conn, packet, uuid, addr)
return
if node_type not in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE):
logging.info('reject a connection from a client')
raise protocol.NotReadyError
# Here are many situations. In principle, a node should be identified by
# an UUID, since an UUID never change when moving a storage node to a different
# server, and an UUID always changes for a master node and a client node whenever
......@@ -103,7 +109,6 @@ class VerificationEventHandler(MasterEventHandler):
# However, master nodes can be known only as the server addresses. And, a node
# may claim a server address used by another node.
addr = (ip_address, port)
if not app.isValidUUID(uuid, addr):
# Here we have an UUID conflict, assume that's a new node
node = None
......@@ -121,8 +126,6 @@ class VerificationEventHandler(MasterEventHandler):
# connected to me.
if node_type == MASTER_NODE_TYPE:
node = MasterNode(server = addr, uuid = uuid)
elif node_type == ADMIN_NODE_TYPE:
node = AdminNode(uuid = uuid)
else:
# empty storage nodes starts in PENDING state
node = StorageNode(server = addr, uuid = uuid)
......@@ -183,11 +186,7 @@ class VerificationEventHandler(MasterEventHandler):
conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.pt.getPartitions(), app.pt.getReplicas(), uuid)
# Next, the peer should ask a primary master node.
conn.answer(p, packet)
self.acceptNodeIdentification(conn, packet, uuid)
@decorators.identification_required
def handleAnnouncePrimaryMaster(self, conn, packet):
......
......@@ -155,7 +155,8 @@ class StorageEventHandler(EventHandler):
self.app.shutdown()
elif state == HIDDEN_STATE:
n = app.nm.getNodeByUUID(uuid)
n.setState(state)
if n is not None:
n.setState(state)
raise OperationFailure
if n is None:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment