Commit d16b01f2 authored by Vincent Pelletier's avatar Vincent Pelletier

Remove the need to use a global queue for unexpected messages. They will be...

Remove the need to use a global queue for unexpected messages. They will be handled in the Dispatcher thread directly, to prevent them from piling up until a client enters app._waitMessage .


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@274 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 6e45cf77
...@@ -40,18 +40,14 @@ class Storage(BaseStorage.BaseStorage, ...@@ -40,18 +40,14 @@ class Storage(BaseStorage.BaseStorage,
l = Lock() l = Lock()
self._txn_lock_acquire = l.acquire self._txn_lock_acquire = l.acquire
self._txn_lock_release = l.release self._txn_lock_release = l.release
# Create a queue for message between thread and dispatcher
# - request queue is for message receive from other node which have to
# be processed
request_queue = Queue()
# Create the event manager # Create the event manager
em = EventManager() em = EventManager()
# Create dispatcher thread # Create dispatcher thread
dispatcher = Dispatcher(em, request_queue) dispatcher = Dispatcher(em)
dispatcher.setDaemon(True) dispatcher.setDaemon(True)
connector_handler = getattr(Connector, connector) connector_handler = getattr(Connector, connector)
self.app = Application(master_nodes, name, em, dispatcher, self.app = Application(master_nodes, name, em, dispatcher,
request_queue, connector_handler) connector_handler)
# Connect to primary master node # Connect to primary master node
dispatcher.connectToPrimaryMasterNode(self.app, self.app.connector_handler) dispatcher.connectToPrimaryMasterNode(self.app, self.app.connector_handler)
# Start dispatcher # Start dispatcher
......
...@@ -173,7 +173,7 @@ class ConnectionPool(object): ...@@ -173,7 +173,7 @@ class ConnectionPool(object):
class Application(object): class Application(object):
"""The client node application.""" """The client node application."""
def __init__(self, master_nodes, name, em, dispatcher, request_queue, connector, **kw): def __init__(self, master_nodes, name, em, dispatcher, connector, **kw):
logging.basicConfig(level = logging.DEBUG) logging.basicConfig(level = logging.DEBUG)
logging.debug('master node address are %s' %(master_nodes,)) logging.debug('master node address are %s' %(master_nodes,))
# Internal Attributes common to all thread # Internal Attributes common to all thread
...@@ -184,7 +184,6 @@ class Application(object): ...@@ -184,7 +184,6 @@ class Application(object):
self.nm = NodeManager() self.nm = NodeManager()
self.cp = ConnectionPool(self) self.cp = ConnectionPool(self)
self.pt = None self.pt = None
self.request_queue = request_queue
self.primary_master_node = None self.primary_master_node = None
self.master_node_list = master_nodes.split(' ') self.master_node_list = master_nodes.split(' ')
self.master_conn = None self.master_conn = None
...@@ -235,20 +234,16 @@ class Application(object): ...@@ -235,20 +234,16 @@ class Application(object):
def _waitMessage(self, target_conn = None, msg_id = None): def _waitMessage(self, target_conn = None, msg_id = None):
"""Wait for a message returned by the dispatcher in queues.""" """Wait for a message returned by the dispatcher in queues."""
global_queue = self.request_queue
local_queue = self.getQueue() local_queue = self.getQueue()
while 1: while 1:
try: if msg_id is None:
conn, packet = global_queue.get_nowait() try:
except Empty: conn, packet = local_queue.get_nowait()
if msg_id is None: except Empty:
try: break
conn, packet = local_queue.get_nowait() else:
except Empty: conn, packet = local_queue.get()
break
else:
conn, packet = local_queue.get()
if packet is None: if packet is None:
if conn is target_conn: if conn is target_conn:
......
...@@ -28,9 +28,8 @@ import logging ...@@ -28,9 +28,8 @@ import logging
class Dispatcher(Thread): class Dispatcher(Thread):
"""Dispatcher class use to redirect request to thread.""" """Dispatcher class use to redirect request to thread."""
def __init__(self, em, request_queue, **kw): def __init__(self, em, **kw):
Thread.__init__(self, **kw) Thread.__init__(self, **kw)
self._request_queue = request_queue
self.em = em self.em = em
# This dict is used to associate conn/message id to client thread queue # This dict is used to associate conn/message id to client thread queue
# and thus redispatch answer to the original thread # and thus redispatch answer to the original thread
...@@ -47,23 +46,9 @@ class Dispatcher(Thread): ...@@ -47,23 +46,9 @@ class Dispatcher(Thread):
# This happen when there is no connection # This happen when there is no connection
logging.error('Dispatcher, run, poll returned a KeyError') logging.error('Dispatcher, run, poll returned a KeyError')
def dispatch(self, conn, packet): def getQueue(self, conn, packet):
key = (id(conn), packet.getId()) key = (id(conn), packet.getId())
queue = self.message_table.pop(key, None) return self.message_table.pop(key, None)
if queue is None:
method_type = packet.getType()
if method_type == PING:
# must answer with no delay
conn.lock()
try:
conn.addPacket(Packet().pong(packet.getId()))
finally:
conn.unlock()
else:
# put message in request queue
self._request_queue.put((conn, packet))
else:
queue.put((conn, packet))
def register(self, conn, msg_id, queue): def register(self, conn, msg_id, queue):
"""Register an expectation for a reply. Thanks to GIL, it is """Register an expectation for a reply. Thanks to GIL, it is
......
...@@ -45,7 +45,11 @@ class ClientEventHandler(EventHandler): ...@@ -45,7 +45,11 @@ class ClientEventHandler(EventHandler):
"""Redirect all received packet to dispatcher thread.""" """Redirect all received packet to dispatcher thread."""
logging.debug('packet %d:%x received from %s:%d', logging.debug('packet %d:%x received from %s:%d',
packet.getId(), packet.getType(), *(conn.getAddress())) packet.getId(), packet.getType(), *(conn.getAddress()))
self.dispatcher.dispatch(conn, packet) queue = self.dispatcher.getQueue(conn, packet)
if queue is None:
self.dispatch(conn, packet)
else:
queue.put((conn, packet))
def _dealWithStorageFailure(self, conn, node, state): def _dealWithStorageFailure(self, conn, node, state):
app = self.app app = self.app
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment