Commit 1c9232ca authored by Aurel's avatar Aurel

use a queue to manage received message in order not to lose them


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@83 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent d23a31a3
from threading import Thread from threading import Thread
from Queue import Empty from Queue import Empty, Queue
from neo.protocol import PING, Packet from neo.protocol import PING, Packet
import logging
class Dispatcher(Thread): class Dispatcher(Thread):
"""Dispatcher class use to redirect request to thread.""" """Dispatcher class use to redirect request to thread."""
...@@ -11,6 +13,8 @@ class Dispatcher(Thread): ...@@ -11,6 +13,8 @@ class Dispatcher(Thread):
self._message_queue = message_queue self._message_queue = message_queue
self._request_queue = request_queue self._request_queue = request_queue
self.em = em self.em = em
# Queue of received packet that have to be processed
self.message = Queue()
# This dict is used to associate conn/message id to client thread queue # This dict is used to associate conn/message id to client thread queue
# and thus redispatch answer to the original thread # and thus redispatch answer to the original thread
self.message_table = {} self.message_table = {}
...@@ -18,25 +22,31 @@ class Dispatcher(Thread): ...@@ -18,25 +22,31 @@ class Dispatcher(Thread):
def run(self): def run(self):
while 1: while 1:
# First check if we receive any new message from other node # First check if we receive any new message from other node
self.message = None
m = None m = None
self.em.poll(1) try:
if self.message is not None: self.em.poll(1)
conn, packet = self.message except KeyError:
# now send message to waiting thread # This happen when there is no connection
logging.error('Dispatcher, run, poll returned a KeyError')
while 1:
try:
conn, packet = self.message.get_nowait()
except Empty:
break
# Send message to waiting thread
key = "%s-%s" %(conn.getUUID(),packet.getId()) key = "%s-%s" %(conn.getUUID(),packet.getId())
if self.message_table.has_key(key): if self.message_table.has_key(key):
tmp_q = self.message_table.pop(key) tmp_q = self.message_table.pop(key)
tmp_q.put(self.message, True) tmp_q.put((conn, packet), True)
else: else:
conn, packet = self.message #conn, packet = self.message
method_type = packet.getType() method_type = packet.getType()
if method_type == PING: if method_type == PING:
# must answer with no delay # must answer with no delay
conn.addPacket(Packet().pong(packet.getId())) conn.addPacket(Packet().pong(packet.getId()))
else: else:
# put message in request queue # put message in request queue
self._request_queue.put(self.message, True) self._request_queue.put((conn, packet), True)
# Then check if a client ask me to send a message # Then check if a client ask me to send a message
try: try:
...@@ -44,10 +54,11 @@ class Dispatcher(Thread): ...@@ -44,10 +54,11 @@ class Dispatcher(Thread):
if m is not None: if m is not None:
tmp_q, msg_id, conn, p = m tmp_q, msg_id, conn, p = m
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id)
if tmp_q is not None: if tmp_q is not None:
# We expect an answer
key = "%s-%s" %(conn.getUUID(),msg_id) key = "%s-%s" %(conn.getUUID(),msg_id)
self.message_table[key] = tmp_q self.message_table[key] = tmp_q
conn.expectMessage(msg_id)
except Empty: except Empty:
continue continue
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment