Commit 3a8db361 authored by Aurel's avatar Aurel

add more messages taken into account in the handler

add missing variable and defined the handler for the listening
connection



git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@436 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 3d18da7f
...@@ -57,6 +57,7 @@ class Application(object): ...@@ -57,6 +57,7 @@ class Application(object):
self.pt = None self.pt = None
self.uuid = INVALID_UUID self.uuid = INVALID_UUID
self.primary_master_node = None self.primary_master_node = None
self.ptid = INVALID_PTID
def run(self): def run(self):
...@@ -70,7 +71,8 @@ class Application(object): ...@@ -70,7 +71,8 @@ class Application(object):
self.nm.add(MasterNode(server = server)) self.nm.add(MasterNode(server = server))
# Make a listening port. # Make a listening port.
ListeningConnection(self.em, None, addr = self.server, handler = AdminEventHandler(self)
ListeningConnection(self.em, handler, addr = self.server,
connector_handler = self.connector_handler) connector_handler = self.connector_handler)
# Connect to a primary master node, verify data, and # Connect to a primary master node, verify data, and
......
...@@ -31,28 +31,24 @@ from neo.util import dump ...@@ -31,28 +31,24 @@ from neo.util import dump
class BaseEventHandler(EventHandler): class BaseEventHandler(EventHandler):
""" Base handler for admin node """ """ Base handler for admin node """
def connectionAccepted(self, conn, s, addr): def __init__(self, app):
"""Called when a connection is accepted.""" self.app = app
# we only accept connection from command tool EventHandler.__init__(self)
logging.info("accepted a connection from %s:%d" %(conn.getAddress(),))
if conn.isServerConnection():
conn.setHandler(AdminEventHandler)
else:
# XXX why do we get there ?
self.handleUnexpectedPacket(conn, packet)
class AdminEventHandler(EventHandler): class AdminEventHandler(BaseEventHandler):
"""This class deals with events for administrating cluster.""" """This class deals with events for administrating cluster."""
pass
def connectionAccepted(self, conn, s, addr):
"""Called when a connection is accepted."""
# we only accept connection from command tool
logging.info("accepted a connection from %s" %(addr,))
class MonitoringEventHandler(EventHandler): class MonitoringEventHandler(BaseEventHandler):
"""This class deals with events for monitoring cluster.""" """This class deals with events for monitoring cluster."""
def __init__(self, app): def connectionAccepted(self, conn, s, addr):
self.app = app """Called when a connection is accepted."""
EventHandler.__init__(self) self.handleUnexpectedPacket(conn, packet)
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
app = self.app app = self.app
...@@ -226,12 +222,108 @@ class MonitoringEventHandler(EventHandler): ...@@ -226,12 +222,108 @@ class MonitoringEventHandler(EventHandler):
def handleSendPartitionTable(self, conn, packet, ptid, row_list): def handleSendPartitionTable(self, conn, packet, ptid, row_list):
logging.warning("handleSendPartitionTable") logging.warning("handleSendPartitionTable")
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
nm = app.nm
pt = app.pt
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if node.getNodeType() != MASTER_NODE_TYPE:
return
if app.ptid != ptid:
app.ptid = ptid
pt.clear()
for offset, row in row_list:
for uuid, state in row:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
logging.warning("handleNotifyPartitionChanges") logging.warning("handleNotifyPartitionChanges")
app = self.app
nm = app.nm
pt = app.pt
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if node.getNodeType() != MASTER_NODE_TYPE \
or app.primary_master_node is None \
or app.primary_master_node.getUUID() != uuid:
return
if app.ptid >= ptid:
# Ignore this packet.
return
app.ptid = ptid
for offset, uuid, state in cell_list:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleNotifyNodeInformation(self, conn, packet, node_list):
logging.warning("handleNotifyNodeInformation") logging.warning("handleNotifyNodeInformation")
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
nm = app.nm
node = nm.getNodeByUUID(uuid)
# This must be sent only by a primary master node.
# Note that this may be sent before I know that it is
# a primary master node.
if node.getNodeType() != MASTER_NODE_TYPE:
logging.warn('ignoring notify node information from %s',
dump(uuid))
return
for node_type, ip_address, port, uuid, state in node_list:
# Register new nodes.
addr = (ip_address, port)
if node_type == MASTER_NODE_TYPE:
n = nm.getNodeByServer(addr)
if n is None:
n = MasterNode(server = addr)
nm.add(n)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
elif node_type == STORAGE_NODE_TYPE:
if uuid == INVALID_UUID:
# No interest.
continue
n = nm.getNodeByUUID(uuid)
if n is None:
n = StorageNode(server = addr, uuid = uuid)
nm.add(n)
else:
n.setServer(addr)
elif node_type == CLIENT_NODE_TYPE:
continue
n.setState(state)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment