Commit 9ed6ed06 authored by Aurel's avatar Aurel

handle new messages type

clear pt when we get disconnected from PMN


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@534 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 36879f32
...@@ -23,7 +23,7 @@ from collections import deque ...@@ -23,7 +23,7 @@ from collections import deque
from neo.config import ConfigurationManager from neo.config import ConfigurationManager
from neo.protocol import TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \ from neo.protocol import TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE, \
INVALID_UUID, INVALID_PTID, partition_cell_states INVALID_UUID, INVALID_PTID, partition_cell_states, MASTER_NODE_TYPE
from neo.node import NodeManager, MasterNode, StorageNode, ClientNode, AdminNode from neo.node import NodeManager, MasterNode, StorageNode, ClientNode, AdminNode
from neo.event import EventManager from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection from neo.connection import ListeningConnection, ClientConnection
...@@ -85,6 +85,10 @@ class Application(object): ...@@ -85,6 +85,10 @@ class Application(object):
self.em.poll(1) self.em.poll(1)
except PrimaryFailure: except PrimaryFailure:
logging.error('primary master is down') logging.error('primary master is down')
# do not trust any longer our informations
self.pt.clear()
self.nm.clear(filter = lambda node: node.getNodeType() != MASTER_NODE_TYPE)
def connectToPrimaryMaster(self): def connectToPrimaryMaster(self):
"""Find a primary master node, and connect to it. """Find a primary master node, and connect to it.
...@@ -108,6 +112,7 @@ class Application(object): ...@@ -108,6 +112,7 @@ class Application(object):
index = 0 index = 0
self.trying_master_node = None self.trying_master_node = None
self.primary_master_node = None self.primary_master_node = None
self.master_conn = None
t = 0 t = 0
while 1: while 1:
em.poll(1) em.poll(1)
...@@ -121,6 +126,7 @@ class Application(object): ...@@ -121,6 +126,7 @@ class Application(object):
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
if node is self.primary_master_node: if node is self.primary_master_node:
logging.info("connected to primary master node %s:%d" % node.getServer()) logging.info("connected to primary master node %s:%d" % node.getServer())
self.master_conn = conn
# Yes, I have. # Yes, I have.
return return
......
...@@ -20,7 +20,7 @@ import logging ...@@ -20,7 +20,7 @@ import logging
from neo.handler import EventHandler from neo.handler import EventHandler
from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \ from neo.protocol import INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
ADMIN_NODE_TYPE ADMIN_NODE_TYPE, DISCARDED_STATE
from neo.node import MasterNode, StorageNode, ClientNode from neo.node import MasterNode, StorageNode, ClientNode
from neo.connection import ClientConnection from neo.connection import ClientConnection
from neo import protocol from neo import protocol
...@@ -30,6 +30,7 @@ from neo.exception import PrimaryFailure ...@@ -30,6 +30,7 @@ from neo.exception import PrimaryFailure
from neo.util import dump from neo.util import dump
from neo.handler import identification_required, restrict_node_types from neo.handler import identification_required, restrict_node_types
class BaseEventHandler(EventHandler): class BaseEventHandler(EventHandler):
""" Base handler for admin node """ """ Base handler for admin node """
...@@ -43,7 +44,67 @@ class AdminEventHandler(BaseEventHandler): ...@@ -43,7 +44,67 @@ class AdminEventHandler(BaseEventHandler):
def connectionAccepted(self, conn, s, addr): def connectionAccepted(self, conn, s, addr):
"""Called when a connection is accepted.""" """Called when a connection is accepted."""
# we only accept connection from command tool # we only accept connection from command tool
logging.info("accepted a connection from %s" %(addr,)) BaseEventHandler.connectionAccepted(self, conn, s, addr)
def handleAskPartitionList(self, conn, packet, min_offset, max_offset, uuid):
logging.info("ask partition list from %s to %s for %s" %(min_offset, max_offset, dump(uuid)))
app = self.app
app.pt.log()
row_list = []
if max_offset == 0:
max_offset = app.num_partitions
try:
for offset in xrange(min_offset, max_offset):
row = []
try:
for cell in app.pt.getCellList(offset):
if uuid != INVALID_UUID and cell.getUUID() != uuid:
continue
else:
row.append((cell.getUUID(), cell.getState()))
except TypeError:
pass
row_list.append((offset, row))
except IndexError:
p = protocot.protocolError('invalid partition table offset')
conn.notify(p)
return
print "sending packet", len(row_list)
p = protocol.answerPartitionList(app.ptid, row_list)
conn.notify(p)
def handleAskNodeList(self, conn, packet, node_type):
logging.info("ask node list for %s" %(node_type))
def node_filter(n):
return n.getNodeType() is node_type
node_list = self.app.nm.getNodeList(node_filter)
node_information_list = []
for node in node_list:
ip, port = node.getServer()
node_information_list.append((node.getNodeType(), ip, port, node.getUUID(), node.getState()))
p = protocol.answerNodeList(node_information_list)
conn.ask(p)
def handleSetNodeState(self, conn, packet, uuid, state):
logging.info("set node state for %s-%s" %(dump(uuid), state))
node = self.app.nm.getNodeByUUID(uuid)
if node is None:
p = protocol.protocolError('invalid uuid')
conn.notify(p)
# send information to master node
master_conn = self.app.master_conn
msg_id = master_conn.getNextId()
ip, port = node.getServer()
node_list = [(node.getNodeType(), ip, port, uuid, state),]
p = protocol.notifyNodeInformation(node_list)
master_conn.ask(p)
self.app.notified = False
while not self.app.notified:
self.app.em.poll(1)
node = self.app.nm.getNodeByUUID(uuid)
p = protocol.answerNodeState(node.getUUID(), node.getState())
conn.answer(p, packet)
class MonitoringEventHandler(BaseEventHandler): class MonitoringEventHandler(BaseEventHandler):
"""This class deals with events for monitoring cluster.""" """This class deals with events for monitoring cluster."""
...@@ -241,6 +302,7 @@ class MonitoringEventHandler(BaseEventHandler): ...@@ -241,6 +302,7 @@ class MonitoringEventHandler(BaseEventHandler):
nm.add(node) nm.add(node)
pt.setCell(offset, node, state) pt.setCell(offset, node, state)
pt.log()
@identification_required @identification_required
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
...@@ -268,8 +330,11 @@ class MonitoringEventHandler(BaseEventHandler): ...@@ -268,8 +330,11 @@ class MonitoringEventHandler(BaseEventHandler):
if uuid != app.uuid: if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE) node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node) nm.add(node)
# if state == DISCARDED_STATE:
# pt.dropNode(node)
# else:
pt.setCell(offset, node, state) pt.setCell(offset, node, state)
pt.log()
@identification_required @identification_required
...@@ -314,4 +379,5 @@ class MonitoringEventHandler(BaseEventHandler): ...@@ -314,4 +379,5 @@ class MonitoringEventHandler(BaseEventHandler):
continue continue
n.setState(state) n.setState(state)
self.app.notified = True
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment