Commit 7b03536b authored by Vincent Pelletier's avatar Vincent Pelletier

Replace dipatcher.pop by dispatcher.dispatch .

It is important that unregistering a response from dispatcher and putting
it in its queue are atomic at dispatcher level to support askStoreObject
pipelining.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1772 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 3b4d8542
...@@ -38,10 +38,8 @@ class BaseHandler(EventHandler): ...@@ -38,10 +38,8 @@ class BaseHandler(EventHandler):
def packetReceived(self, conn, packet): def packetReceived(self, conn, packet):
"""Redirect all received packet to dispatcher thread.""" """Redirect all received packet to dispatcher thread."""
if packet.isResponse(): if packet.isResponse():
queue = self.dispatcher.pop(conn, packet.getId(), None) if not self.dispatcher.dispatch(conn, packet.getId(), (conn, packet)):
if queue is None:
raise ProtocolError('Unexpected response packet') raise ProtocolError('Unexpected response packet')
queue.put((conn, packet))
else: else:
self.dispatch(conn, packet) self.dispatch(conn, packet)
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo.locking import Lock from neo.locking import Lock
MARKER = []
EMPTY = {} EMPTY = {}
def giant_lock(func): def giant_lock(func):
...@@ -38,12 +37,13 @@ class Dispatcher: ...@@ -38,12 +37,13 @@ class Dispatcher:
self.lock_release = lock.release self.lock_release = lock.release
@giant_lock @giant_lock
def pop(self, conn, msg_id, default=MARKER): def dispatch(self, conn, msg_id, data):
"""Retrieve register-time provided payload.""" """Retrieve register-time provided queue, and put data in it."""
result = self.message_table.get(id(conn), EMPTY).pop(msg_id, default) queue = self.message_table.get(id(conn), EMPTY).pop(msg_id, None)
if result is MARKER: if queue is None:
raise KeyError, (id(conn), msg_id) return False
return result queue.put(data)
return True
@giant_lock @giant_lock
def register(self, conn, msg_id, queue): def register(self, conn, msg_id, queue):
......
...@@ -19,6 +19,7 @@ import unittest ...@@ -19,6 +19,7 @@ import unittest
from mock import Mock from mock import Mock
from neo.dispatcher import Dispatcher from neo.dispatcher import Dispatcher
from Queue import Queue
class DispatcherTests(unittest.TestCase): class DispatcherTests(unittest.TestCase):
...@@ -27,9 +28,15 @@ class DispatcherTests(unittest.TestCase): ...@@ -27,9 +28,15 @@ class DispatcherTests(unittest.TestCase):
def testRegister(self): def testRegister(self):
conn = object() conn = object()
self.dispatcher.register(conn, 1, 0) queue = Queue()
self.assertEqual(self.dispatcher.pop(conn, 1, None), 0) MARKER = object()
self.assertEqual(self.dispatcher.pop(conn, 2, 3), 3) self.dispatcher.register(conn, 1, queue)
self.assertTrue(queue.empty())
self.assertTrue(self.dispatcher.dispatch(conn, 1, MARKER))
self.assertFalse(queue.empty())
self.assertTrue(queue.get(block=False) is MARKER)
self.assertTrue(queue.empty())
self.assertFalse(self.dispatcher.dispatch(conn, 2, None))
def testUnregister(self): def testUnregister(self):
conn = object() conn = object()
...@@ -37,7 +44,7 @@ class DispatcherTests(unittest.TestCase): ...@@ -37,7 +44,7 @@ class DispatcherTests(unittest.TestCase):
self.dispatcher.register(conn, 2, queue) self.dispatcher.register(conn, 2, queue)
self.dispatcher.unregister(conn) self.dispatcher.unregister(conn)
self.assertEqual(len(queue.mockGetNamedCalls('put')), 1) self.assertEqual(len(queue.mockGetNamedCalls('put')), 1)
self.assertEqual(self.dispatcher.pop(conn, 2, 3), 3) self.assertFalse(self.dispatcher.dispatch(conn, 2, None))
def testRegistered(self): def testRegistered(self):
conn1 = object() conn1 = object()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment