Commit 75499dbc authored by Vincent Pelletier's avatar Vincent Pelletier

Make ClientEventHandler.dispatch call a method on Dispatcher class to avoid...

Make ClientEventHandler.dispatch call a method on Dispatcher class to avoid accessing instance properties from a foreign class.
Make dispatch thread-safe by accessing message_table atomicaly.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@269 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 6760a4a0
......@@ -47,6 +47,24 @@ class Dispatcher(Thread):
# This happen when there is no connection
logging.error('Dispatcher, run, poll returned a KeyError')
def dispatch(self, conn, packet):
key = (id(conn), packet.getId())
queue = self.message_table.pop(key, None)
if queue is None:
method_type = packet.getType()
if method_type == PING:
# must answer with no delay
conn.lock()
try:
conn.addPacket(Packet().pong(packet.getId()))
finally:
conn.unlock()
else:
# put message in request queue
self._request_queue.put((conn, packet))
else:
queue.put((conn, packet))
def register(self, conn, msg_id, queue):
"""Register an expectation for a reply. Thanks to GIL, it is
safe not to use a lock here."""
......
......@@ -22,7 +22,7 @@ from neo.connection import MTClientConnection
from neo.protocol import Packet, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
INVALID_UUID, RUNNING_STATE, TEMPORARILY_DOWN_STATE, \
BROKEN_STATE, PING
BROKEN_STATE
from neo.node import MasterNode, StorageNode, ClientNode
from neo.pt import PartitionTable
from neo.client.exception import NEOStorageError
......@@ -43,24 +43,7 @@ class ClientEventHandler(EventHandler):
def dispatch(self, conn, packet):
"""Redirect all received packet to dispatcher thread."""
dispatcher = self.dispatcher
# Send message to waiting thread
key = (id(conn), packet.getId())
if key in dispatcher.message_table:
queue = dispatcher.message_table.pop(key)
queue.put((conn, packet))
else:
method_type = packet.getType()
if method_type == PING:
# must answer with no delay
conn.lock()
try:
conn.addPacket(Packet().pong(packet.getId()))
finally:
conn.unlock()
else:
# put message in request queue
dispatcher._request_queue.put((conn, packet))
self.dispatcher.dispatch(conn, packet)
def _dealWithStorageFailure(self, conn, node, state):
app = self.app
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment