Commit 2a9f2764 authored by Aurel's avatar Aurel

split handler for master node messages :

- one for bootstrap
- one for monitoring
- one for answer to request
add a dispatcher class to manage messages


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@690 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent c4923901
...@@ -28,8 +28,28 @@ from neo.node import NodeManager, MasterNode, StorageNode, ClientNode, AdminNode ...@@ -28,8 +28,28 @@ from neo.node import NodeManager, MasterNode, StorageNode, ClientNode, AdminNode
from neo.event import EventManager from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection from neo.connection import ListeningConnection, ClientConnection
from neo.exception import OperationFailure, PrimaryFailure from neo.exception import OperationFailure, PrimaryFailure
from neo.admin.handler import MonitoringEventHandler, AdminEventHandler from neo.admin.handler import MasterMonitoringEventHandler, AdminEventHandler, \
MasterBootstrapEventHandler, MasterRequestEventHandler, MasterEventHandler
from neo.connector import getConnectorHandler from neo.connector import getConnectorHandler
from neo import protocol
class Dispatcher:
"""Dispatcher use to redirect master request to handler"""
def __init__(self):
# associate conn/message_id to dispatch
# message to connection
self.message_table = {}
def register(self, msg_id, conn):
self.message_table[msg_id] = conn
def retrieve(self, msg_id):
return self.message_table.pop(msg_id, None)
def registered(self, msg_id):
return self.message_table.has_key(msg_id)
class Application(object): class Application(object):
"""The storage node application.""" """The storage node application."""
...@@ -58,8 +78,10 @@ class Application(object): ...@@ -58,8 +78,10 @@ class Application(object):
self.uuid = INVALID_UUID self.uuid = INVALID_UUID
self.primary_master_node = None self.primary_master_node = None
self.ptid = INVALID_PTID self.ptid = INVALID_PTID
self.monitoring_handler = MasterMonitoringEventHandler(self)
self.request_handler = MasterRequestEventHandler(self)
self.dispatcher = Dispatcher()
def run(self): def run(self):
"""Make sure that the status is sane and start a loop.""" """Make sure that the status is sane and start a loop."""
if self.num_partitions is not None and self.num_partitions <= 0: if self.num_partitions is not None and self.num_partitions <= 0:
...@@ -100,7 +122,7 @@ class Application(object): ...@@ -100,7 +122,7 @@ class Application(object):
at this stage.""" at this stage."""
logging.info('connecting to a primary master node') logging.info('connecting to a primary master node')
handler = MonitoringEventHandler(self) handler = MasterBootstrapEventHandler(self)
em = self.em em = self.em
nm = self.nm nm = self.nm
...@@ -150,3 +172,28 @@ class Application(object): ...@@ -150,3 +172,28 @@ class Application(object):
connector_handler = self.connector_handler) connector_handler = self.connector_handler)
t = time() t = time()
def sendPartitionTable(self, conn, min_offset, max_offset, uuid):
# we have a pt
self.pt.log()
row_list = []
if max_offset == 0:
max_offset = self.num_partitions
try:
for offset in xrange(min_offset, max_offset):
row = []
try:
for cell in self.pt.getCellList(offset):
if uuid != INVALID_UUID and cell.getUUID() != uuid:
continue
else:
row.append((cell.getUUID(), cell.getState()))
except TypeError:
pass
row_list.append((offset, row))
except IndexError:
p = protocot.protocolError('invalid partition table offset')
conn.notify(p)
return
print "sending packet", len(row_list)
p = protocol.answerPartitionList(self.ptid, row_list)
conn.notify(p)
...@@ -28,7 +28,7 @@ from neo.protocol import Packet, UnexpectedPacketError ...@@ -28,7 +28,7 @@ from neo.protocol import Packet, UnexpectedPacketError
from neo.pt import PartitionTable from neo.pt import PartitionTable
from neo.exception import PrimaryFailure from neo.exception import PrimaryFailure
from neo.util import dump from neo.util import dump
from neo import decorators from neo import decorators, handler
class BaseEventHandler(EventHandler): class BaseEventHandler(EventHandler):
...@@ -53,34 +53,12 @@ class AdminEventHandler(BaseEventHandler): ...@@ -53,34 +53,12 @@ class AdminEventHandler(BaseEventHandler):
if len(app.pt.getNodeList()) == 0: if len(app.pt.getNodeList()) == 0:
master_conn = self.app.master_conn master_conn = self.app.master_conn
p = protocol.askPartitionTable([x for x in xrange(app.num_partitions)]) p = protocol.askPartitionTable([x for x in xrange(app.num_partitions)])
master_conn.ask(p) msg_id = master_conn.ask(p)
self.app.pt = None app.dispatcher.register(msg_id, conn, {'min_offset' : min_offset,
while self.app.pt is None: 'max_offset' : max_offset,
self.app.em.poll(1) 'uuid' : uuid})
# we have a pt else:
app.pt.log() app.sendPartitionTable(conn, min_offset, max_offset, uuid)
row_list = []
if max_offset == 0:
max_offset = app.num_partitions
try:
for offset in xrange(min_offset, max_offset):
row = []
try:
for cell in app.pt.getCellList(offset):
if uuid != INVALID_UUID and cell.getUUID() != uuid:
continue
else:
row.append((cell.getUUID(), cell.getState()))
except TypeError:
pass
row_list.append((offset, row))
except IndexError:
p = protocot.protocolError('invalid partition table offset')
conn.notify(p)
return
print "sending packet", len(row_list)
p = protocol.answerPartitionList(app.ptid, row_list)
conn.notify(p)
def handleAskNodeList(self, conn, packet, node_type): def handleAskNodeList(self, conn, packet, node_type):
...@@ -97,7 +75,7 @@ class AdminEventHandler(BaseEventHandler): ...@@ -97,7 +75,7 @@ class AdminEventHandler(BaseEventHandler):
port = 0 port = 0
node_information_list.append((node.getNodeType(), ip, port, node.getUUID(), node.getState())) node_information_list.append((node.getNodeType(), ip, port, node.getUUID(), node.getState()))
p = protocol.answerNodeList(node_information_list) p = protocol.answerNodeList(node_information_list)
conn.ask(p) conn.answer(p, packet)
def handleSetNodeState(self, conn, packet, uuid, state, modify_partition_table): def handleSetNodeState(self, conn, packet, uuid, state, modify_partition_table):
logging.info("set node state for %s-%s" %(dump(uuid), state)) logging.info("set node state for %s-%s" %(dump(uuid), state))
...@@ -127,12 +105,9 @@ class AdminEventHandler(BaseEventHandler): ...@@ -127,12 +105,9 @@ class AdminEventHandler(BaseEventHandler):
# forward to primary # forward to primary
master_conn = self.app.master_conn master_conn = self.app.master_conn
p = protocol.setClusterState(name, state) p = protocol.setClusterState(name, state)
master_conn.ask(p) msg_id = master_conn.ask(p)
self.app.cluster_state = None self.app.dispatcher.register(msg_id, conn)
while self.app.cluster_state is None:
self.app.em.poll(1)
conn.answer(protocol.answerClusterState(self.app.cluster_state), packet)
def handleAddPendingNodes(self, conn, packet, uuid_list): def handleAddPendingNodes(self, conn, packet, uuid_list):
uuids = ', '.join([dump(uuid) for uuid in uuid_list]) uuids = ', '.join([dump(uuid) for uuid in uuid_list])
logging.info('Add nodes %s' % uuids) logging.info('Add nodes %s' % uuids)
...@@ -146,16 +121,28 @@ class AdminEventHandler(BaseEventHandler): ...@@ -146,16 +121,28 @@ class AdminEventHandler(BaseEventHandler):
self.app.em.poll(1) self.app.em.poll(1)
# forward the answer to neoctl # forward the answer to neoctl
uuid_list = self.app.uuid_list uuid_list = self.app.uuid_list
conn.answer(protocol.answerNewNodes(uuid_list), packet)
class MonitoringEventHandler(BaseEventHandler):
"""This class deals with events for monitoring cluster."""
class MasterEventHandler(BaseEventHandler):
""" This class is just used to dispacth message to right handler"""
def dispatch(self, conn, packet):
if self.app.dispatcher.registered(packet.getId()):
# answer to a request
self.app.request_handler.dispatch(conn, packet)
else:
# monitoring phase
self.app.monitoring_handler.dispatch(conn, packet)
class MasterBaseEventHandler(BaseEventHandler):
""" This is the base class for connection to primary master node"""
def connectionAccepted(self, conn, s, addr): def connectionAccepted(self, conn, s, addr):
"""Called when a connection is accepted.""" """Called when a connection is accepted."""
raise UnexpectedPacketError raise UnexpectedPacketError
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
app = self.app app = self.app
if app.trying_master_node is None: if app.trying_master_node is None:
...@@ -166,6 +153,7 @@ class MonitoringEventHandler(BaseEventHandler): ...@@ -166,6 +153,7 @@ class MonitoringEventHandler(BaseEventHandler):
conn.ask(protocol.askPrimaryMaster()) conn.ask(protocol.askPrimaryMaster())
EventHandler.connectionCompleted(self, conn) EventHandler.connectionCompleted(self, conn)
def connectionFailed(self, conn): def connectionFailed(self, conn):
app = self.app app = self.app
...@@ -230,13 +218,104 @@ class MonitoringEventHandler(BaseEventHandler): ...@@ -230,13 +218,104 @@ class MonitoringEventHandler(BaseEventHandler):
EventHandler.peerBroken(self, conn) EventHandler.peerBroken(self, conn)
@decorators.identification_required
def handleNotifyNodeInformation(self, conn, packet, node_list):
logging.info("handleNotifyNodeInformation")
uuid = conn.getUUID()
app = self.app
nm = app.nm
node = nm.getNodeByUUID(uuid)
# This must be sent only by a primary master node.
# Note that this may be sent before I know that it is
# a primary master node.
if node.getNodeType() != MASTER_NODE_TYPE:
logging.warn('ignoring notify node information from %s',
dump(uuid))
return
for node_type, ip_address, port, uuid, state in node_list:
# Register/update nodes.
addr = (ip_address, port)
# Try to retrieve it from nm
n = None
if uuid != INVALID_UUID:
n = nm.getNodeByUUID(uuid)
if n is None:
n = nm.getNodeByServer(addr)
if n is not None and uuid != INVALID_UUID:
# node only exists by address, remove it
nm.remove(n)
n = None
elif n.getServer() != addr:
# same uuid but different address, remove it
nm.remove(n)
n = None
if node_type == MASTER_NODE_TYPE:
if n is None:
n = MasterNode(server = addr)
nm.add(n)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
else:
n.setUUID(INVALID_UUID)
elif node_type in (STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
if uuid == INVALID_UUID:
# No interest.
continue
if n is None:
if node_type == STORAGE_NODE_TYPE:
n = StorageNode(server = addr, uuid = uuid)
elif node_type == CLIENT_NODE_TYPE:
n = ClientNode(server = addr, uuid = uuid)
elif node_type == ADMIN_NODE_TYPE:
n = AdminNode(server = addr, uuid = uuid)
nm.add(n)
else:
logging.warning("unknown node type %s" %(node_type))
continue
n.setState(state)
self.app.notified = True
class MasterRequestEventHandler(MasterBaseEventHandler):
""" This class handle all answer from primary master node"""
def handleAnswerClusterState(self, conn, packet, state):
logging.info("handleAnswerClusterState for a conn")
client_conn, kw = self.app.retrieve(packet.getId())
conn.answer(protocol.answerClusterState(state), packet)
def handleAnswerNewNodes(self, conn, packet, uuid_list):
logging.info("handleAnswerNewNodes for a conn")
client_conn, kw = self.app.retrieve(packet.getId())
conn.answer(protocol.answerNewNodes(uuid_list), packet)
@decorators.identification_required
def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
logging.info("handleAnswerPartitionTable for a conn")
client_conn, kw = self.app.retrieve(packet.getId())
# sent client the partition table
self.app.sendPartitionTable(client_conn, **kw)
class MasterBootstrapEventHandler(MasterBaseEventHandler):
"""This class manage the bootstrap part to the primary master node"""
def handleNotReady(self, conn, packet, message): def handleNotReady(self, conn, packet, message):
app = self.app app = self.app
if app.trying_master_node is not None: if app.trying_master_node is not None:
app.trying_master_node = None app.trying_master_node = None
conn.close() conn.close()
def handleAcceptNodeIdentification(self, conn, packet, node_type, def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, uuid, ip_address, port,
num_partitions, num_replicas, your_uuid): num_partitions, num_replicas, your_uuid):
...@@ -276,6 +355,8 @@ class MonitoringEventHandler(BaseEventHandler): ...@@ -276,6 +355,8 @@ class MonitoringEventHandler(BaseEventHandler):
conn.ask(protocol.askNodeInformation()) conn.ask(protocol.askNodeInformation())
conn.ask(protocol.askPartitionTable([])) conn.ask(protocol.askPartitionTable([]))
logging.info("changing handler for master conn")
conn.setHandler(MasterEventHandler(self.app))
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list): known_master_list):
...@@ -324,36 +405,18 @@ class MonitoringEventHandler(BaseEventHandler): ...@@ -324,36 +405,18 @@ class MonitoringEventHandler(BaseEventHandler):
p = protocol.requestNodeIdentification(ADMIN_NODE_TYPE, p = protocol.requestNodeIdentification(ADMIN_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.name) app.uuid, app.server[0], app.server[1], app.name)
conn.ask(p) conn.ask(p)
@decorators.identification_required
def handleSendPartitionTable(self, conn, packet, ptid, row_list):
logging.warning("handleSendPartitionTable")
uuid = conn.getUUID()
app = self.app
nm = app.nm
pt = app.pt
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if node.getNodeType() != MASTER_NODE_TYPE:
return
if app.ptid != ptid:
app.ptid = ptid
pt.clear()
for offset, row in row_list:
for uuid, state in row:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
pt.log() class MasterMonitoringEventHandler(MasterBaseEventHandler):
"""This class deals with events for monitoring cluster."""
@decorators.identification_required
def handleAnswerNodeInformation(self, conn, packet, node_list):
logging.info("handleAnswerNodeInformation")
@decorators.identification_required @decorators.identification_required
def handleAnswerPartitionTable(self, conn, packet, ptid, row_list): def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
logging.warning("handleAnswerPartitionTable") logging.info("handleAnswerPartitionTable")
@decorators.identification_required @decorators.identification_required
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
...@@ -384,79 +447,31 @@ class MonitoringEventHandler(BaseEventHandler): ...@@ -384,79 +447,31 @@ class MonitoringEventHandler(BaseEventHandler):
pt.setCell(offset, node, state) pt.setCell(offset, node, state)
pt.log() pt.log()
@decorators.identification_required @decorators.identification_required
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleSendPartitionTable(self, conn, packet, ptid, row_list):
logging.info("handleNotifyNodeInformation") logging.warning("handleSendPartitionTable")
uuid = conn.getUUID() uuid = conn.getUUID()
app = self.app app = self.app
nm = app.nm nm = app.nm
node = nm.getNodeByUUID(uuid) pt = app.pt
# This must be sent only by a primary master node. node = app.nm.getNodeByUUID(uuid)
# Note that this may be sent before I know that it is # This must be sent only by primary master node
# a primary master node.
if node.getNodeType() != MASTER_NODE_TYPE: if node.getNodeType() != MASTER_NODE_TYPE:
logging.warn('ignoring notify node information from %s',
dump(uuid))
return return
for node_type, ip_address, port, uuid, state in node_list:
# Register/update nodes.
addr = (ip_address, port)
# Try to retrieve it from nm
n = None
if uuid != INVALID_UUID:
n = nm.getNodeByUUID(uuid)
if n is None:
n = nm.getNodeByServer(addr)
if n is not None and uuid != INVALID_UUID:
# node only exists by address, remove it
nm.remove(n)
n = None
elif n.getServer() != addr:
# same uuid but different address, remove it
nm.remove(n)
n = None
if node_type == MASTER_NODE_TYPE:
if n is None:
n = MasterNode(server = addr)
nm.add(n)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
else:
n.setUUID(INVALID_UUID)
elif node_type in (STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
if uuid == INVALID_UUID:
# No interest.
continue
if n is None:
if node_type == STORAGE_NODE_TYPE:
n = StorageNode(server = addr, uuid = uuid)
elif node_type == CLIENT_NODE_TYPE:
n = ClientNode(server = addr, uuid = uuid)
elif node_type == ADMIN_NODE_TYPE:
n = AdminNode(server = addr, uuid = uuid)
nm.add(n)
else:
logging.warning("unknown node type %s" %(node_type))
continue
n.setState(state)
self.app.notified = True
@decorators.identification_required if app.ptid != ptid:
def handleAnswerNodeInformation(self, conn, packet, node_list): app.ptid = ptid
logging.info("handleAnswerNodeInformation") pt.clear()
for offset, row in row_list:
for uuid, state in row:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
def handleAnswerClusterState(self, conn, packet, state): pt.log()
self.app.cluster_state = state
def handleAnswerNewNodes(self, conn, packet, uuid_list):
self.app.uuid_list = uuid_list
self.app.nn_notified = True
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment