diff --git a/neo/admin/app.py b/neo/admin/app.py index a18566d8e87c22cf53b29c02ea33c050015eaba8..6987baef1b8e14ac1c418dc9ce4958c755ceed0e 100644 --- a/neo/admin/app.py +++ b/neo/admin/app.py @@ -23,7 +23,7 @@ from collections import deque from neo.config import ConfigurationManager from neo.protocol import TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \ - INVALID_UUID, INVALID_PTID, partition_cell_states + INVALID_UUID, INVALID_PTID, partition_cell_states, MASTER_NODE_TYPE from neo.node import NodeManager, MasterNode, StorageNode, ClientNode, AdminNode from neo.event import EventManager from neo.connection import ListeningConnection, ClientConnection @@ -85,6 +85,10 @@ class Application(object): self.em.poll(1) except PrimaryFailure: logging.error('primary master is down') + # do not trust any longer our informations + self.pt.clear() + self.nm.clear(filter = lambda node: node.getNodeType() != MASTER_NODE_TYPE) + def connectToPrimaryMaster(self): """Find a primary master node, and connect to it. @@ -108,6 +112,7 @@ class Application(object): index = 0 self.trying_master_node = None self.primary_master_node = None + self.master_conn = None t = 0 while 1: em.poll(1) @@ -121,6 +126,7 @@ class Application(object): node = nm.getNodeByUUID(uuid) if node is self.primary_master_node: logging.info("connected to primary master node %s:%d" % node.getServer()) + self.master_conn = conn # Yes, I have. return diff --git a/neo/admin/handler.py b/neo/admin/handler.py index 42e8d305238359eb76780cc8440d29cbd654ff7b..66bf14d22da40cceaa7e84637805f4572881317d 100644 --- a/neo/admin/handler.py +++ b/neo/admin/handler.py @@ -20,7 +20,7 @@ import logging from neo.handler import EventHandler from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \ MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ - ADMIN_NODE_TYPE + ADMIN_NODE_TYPE, DISCARDED_STATE from neo.node import MasterNode, StorageNode, ClientNode from neo.connection import ClientConnection from neo import protocol @@ -30,6 +30,7 @@ from neo.exception import PrimaryFailure from neo.util import dump from neo.handler import identification_required, restrict_node_types + class BaseEventHandler(EventHandler): """ Base handler for admin node """ @@ -43,7 +44,67 @@ class AdminEventHandler(BaseEventHandler): def connectionAccepted(self, conn, s, addr): """Called when a connection is accepted.""" # we only accept connection from command tool - logging.info("accepted a connection from %s" %(addr,)) + BaseEventHandler.connectionAccepted(self, conn, s, addr) + + def handleAskPartitionList(self, conn, packet, min_offset, max_offset, uuid): + logging.info("ask partition list from %s to %s for %s" %(min_offset, max_offset, dump(uuid))) + app = self.app + app.pt.log() + row_list = [] + if max_offset == 0: + max_offset = app.num_partitions + try: + for offset in xrange(min_offset, max_offset): + row = [] + try: + for cell in app.pt.getCellList(offset): + if uuid != INVALID_UUID and cell.getUUID() != uuid: + continue + else: + row.append((cell.getUUID(), cell.getState())) + except TypeError: + pass + row_list.append((offset, row)) + except IndexError: + p = protocot.protocolError('invalid partition table offset') + conn.notify(p) + return + print "sending packet", len(row_list) + p = protocol.answerPartitionList(app.ptid, row_list) + conn.notify(p) + + + def handleAskNodeList(self, conn, packet, node_type): + logging.info("ask node list for %s" %(node_type)) + def node_filter(n): + return n.getNodeType() is node_type + node_list = self.app.nm.getNodeList(node_filter) + node_information_list = [] + for node in node_list: + ip, port = node.getServer() + node_information_list.append((node.getNodeType(), ip, port, node.getUUID(), node.getState())) + p = protocol.answerNodeList(node_information_list) + conn.ask(p) + + def handleSetNodeState(self, conn, packet, uuid, state): + logging.info("set node state for %s-%s" %(dump(uuid), state)) + node = self.app.nm.getNodeByUUID(uuid) + if node is None: + p = protocol.protocolError('invalid uuid') + conn.notify(p) + # send information to master node + master_conn = self.app.master_conn + msg_id = master_conn.getNextId() + ip, port = node.getServer() + node_list = [(node.getNodeType(), ip, port, uuid, state),] + p = protocol.notifyNodeInformation(node_list) + master_conn.ask(p) + self.app.notified = False + while not self.app.notified: + self.app.em.poll(1) + node = self.app.nm.getNodeByUUID(uuid) + p = protocol.answerNodeState(node.getUUID(), node.getState()) + conn.answer(p, packet) class MonitoringEventHandler(BaseEventHandler): """This class deals with events for monitoring cluster.""" @@ -241,6 +302,7 @@ class MonitoringEventHandler(BaseEventHandler): nm.add(node) pt.setCell(offset, node, state) + pt.log() @identification_required def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): @@ -268,8 +330,11 @@ class MonitoringEventHandler(BaseEventHandler): if uuid != app.uuid: node.setState(TEMPORARILY_DOWN_STATE) nm.add(node) - +# if state == DISCARDED_STATE: +# pt.dropNode(node) +# else: pt.setCell(offset, node, state) + pt.log() @identification_required @@ -314,4 +379,5 @@ class MonitoringEventHandler(BaseEventHandler): continue n.setState(state) + self.app.notified = True