Commit 4f2ef12a authored by Aurel's avatar Aurel

remove all polling from handler

pass msg_id to register so that we can answer to neoctl request with
right id


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@692 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 6ba61bca
...@@ -41,8 +41,8 @@ class Dispatcher: ...@@ -41,8 +41,8 @@ class Dispatcher:
# message to connection # message to connection
self.message_table = {} self.message_table = {}
def register(self, msg_id, conn): def register(self, msg_id, conn, kw=None):
self.message_table[msg_id] = conn self.message_table[msg_id] = conn, kw
def retrieve(self, msg_id): def retrieve(self, msg_id):
return self.message_table.pop(msg_id, None) return self.message_table.pop(msg_id, None)
...@@ -172,7 +172,7 @@ class Application(object): ...@@ -172,7 +172,7 @@ class Application(object):
connector_handler = self.connector_handler) connector_handler = self.connector_handler)
t = time() t = time()
def sendPartitionTable(self, conn, min_offset, max_offset, uuid): def sendPartitionTable(self, conn, min_offset, max_offset, uuid, msg_id):
# we have a pt # we have a pt
self.pt.log() self.pt.log()
row_list = [] row_list = []
...@@ -196,4 +196,4 @@ class Application(object): ...@@ -196,4 +196,4 @@ class Application(object):
return return
print "sending packet", len(row_list) print "sending packet", len(row_list)
p = protocol.answerPartitionList(self.ptid, row_list) p = protocol.answerPartitionList(self.ptid, row_list)
conn.notify(p) conn.notify(p, msg_id)
...@@ -52,13 +52,14 @@ class AdminEventHandler(BaseEventHandler): ...@@ -52,13 +52,14 @@ class AdminEventHandler(BaseEventHandler):
# check we have one pt otherwise ask it to PMN # check we have one pt otherwise ask it to PMN
if len(app.pt.getNodeList()) == 0: if len(app.pt.getNodeList()) == 0:
master_conn = self.app.master_conn master_conn = self.app.master_conn
p = protocol.askPartitionTable([x for x in xrange(app.num_partitions)]) p = protocol.askPartitionTable([])
msg_id = master_conn.ask(p) msg_id = master_conn.ask(p)
app.dispatcher.register(msg_id, conn, {'min_offset' : min_offset, app.dispatcher.register(msg_id, conn, {'min_offset' : min_offset,
'max_offset' : max_offset, 'max_offset' : max_offset,
'uuid' : uuid}) 'uuid' : uuid,
'msg_id' : packet.getId()})
else: else:
app.sendPartitionTable(conn, min_offset, max_offset, uuid) app.sendPartitionTable(conn, min_offset, max_offset, uuid, packet.getId())
def handleAskNodeList(self, conn, packet, node_type): def handleAskNodeList(self, conn, packet, node_type):
...@@ -92,13 +93,8 @@ class AdminEventHandler(BaseEventHandler): ...@@ -92,13 +93,8 @@ class AdminEventHandler(BaseEventHandler):
# forward to primary master node # forward to primary master node
master_conn = self.app.master_conn master_conn = self.app.master_conn
p = protocol.setNodeState(uuid, state, modify_partition_table) p = protocol.setNodeState(uuid, state, modify_partition_table)
master_conn.ask(p) msg_id = master_conn.ask(p)
self.app.notified = False self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()})
while not self.app.notified:
self.app.em.poll(1)
node = self.app.nm.getNodeByUUID(uuid)
p = protocol.answerNodeState(node.getUUID(), node.getState())
conn.answer(p, packet)
def handleSetClusterState(self, conn, packet, name, state): def handleSetClusterState(self, conn, packet, name, state):
self.checkClusterName(name) self.checkClusterName(name)
...@@ -106,7 +102,7 @@ class AdminEventHandler(BaseEventHandler): ...@@ -106,7 +102,7 @@ class AdminEventHandler(BaseEventHandler):
master_conn = self.app.master_conn master_conn = self.app.master_conn
p = protocol.setClusterState(name, state) p = protocol.setClusterState(name, state)
msg_id = master_conn.ask(p) msg_id = master_conn.ask(p)
self.app.dispatcher.register(msg_id, conn) self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()})
def handleAddPendingNodes(self, conn, packet, uuid_list): def handleAddPendingNodes(self, conn, packet, uuid_list):
uuids = ', '.join([dump(uuid) for uuid in uuid_list]) uuids = ', '.join([dump(uuid) for uuid in uuid_list])
...@@ -115,14 +111,8 @@ class AdminEventHandler(BaseEventHandler): ...@@ -115,14 +111,8 @@ class AdminEventHandler(BaseEventHandler):
node = self.app.nm.getNodeByUUID(uuid) node = self.app.nm.getNodeByUUID(uuid)
# forward the request to primary # forward the request to primary
master_conn = self.app.master_conn master_conn = self.app.master_conn
master_conn.ask(protocol.addPendingNodes(uuid_list)) msg_id = master_conn.ask(protocol.addPendingNodes(uuid_list))
self.app.nn_notified = False self.app.dispatcher.register(msg_id, conn, {'msg_id' : packet.getId()})
while not self.app.nn_notified:
self.app.em.poll(1)
# forward the answer to neoctl
uuid_list = self.app.uuid_list
class MasterEventHandler(BaseEventHandler): class MasterEventHandler(BaseEventHandler):
""" This class is just used to dispacth message to right handler""" """ This class is just used to dispacth message to right handler"""
...@@ -283,27 +273,31 @@ class MasterBaseEventHandler(BaseEventHandler): ...@@ -283,27 +273,31 @@ class MasterBaseEventHandler(BaseEventHandler):
self.app.notified = True self.app.notified = True
class MasterRequestEventHandler(MasterBaseEventHandler): class MasterRequestEventHandler(MasterBaseEventHandler):
""" This class handle all answer from primary master node""" """ This class handle all answer from primary master node"""
def handleAnswerClusterState(self, conn, packet, state): def handleAnswerClusterState(self, conn, packet, state):
logging.info("handleAnswerClusterState for a conn") logging.info("handleAnswerClusterState for a conn")
client_conn, kw = self.app.retrieve(packet.getId()) client_conn, kw = self.app.dispatcher.retrieve(packet.getId())
conn.answer(protocol.answerClusterState(state), packet) client_conn.notify(protocol.answerClusterState(state), kw['msg_id'])
def handleAnswerNewNodes(self, conn, packet, uuid_list): def handleAnswerNewNodes(self, conn, packet, uuid_list):
logging.info("handleAnswerNewNodes for a conn") logging.info("handleAnswerNewNodes for a conn")
client_conn, kw = self.app.retrieve(packet.getId()) client_conn, kw = self.app.dispatcher.retrieve(packet.getId())
conn.answer(protocol.answerNewNodes(uuid_list), packet) client_conn.notify(protocol.answerNewNodes(uuid_list), kw['msg_id'])
@decorators.identification_required @decorators.identification_required
def handleAnswerPartitionTable(self, conn, packet, ptid, row_list): def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
logging.info("handleAnswerPartitionTable for a conn") logging.info("handleAnswerPartitionTable for a conn")
client_conn, kw = self.app.retrieve(packet.getId()) client_conn, kw = self.app.dispatcher.retrieve(packet.getId())
# sent client the partition table # sent client the partition table
self.app.sendPartitionTable(client_conn, **kw) self.app.sendPartitionTable(client_conn, **kw)
def handleAnswerNodeState(self, conn, packet, uuid, state):
client_conn, kw = self.app.dispatcher.retrieve(packet.getId())
p = protocol.answerNodeState(uuid, state)
client_conn.notify(p, kw['msg_id'])
class MasterBootstrapEventHandler(MasterBaseEventHandler): class MasterBootstrapEventHandler(MasterBaseEventHandler):
"""This class manage the bootstrap part to the primary master node""" """This class manage the bootstrap part to the primary master node"""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment