Commit 7c7abfc0 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Track requests packets and check if the answers match.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1705 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent f24034f7
......@@ -15,6 +15,8 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from Queue import deque
from neo import logging
from neo.locking import RLock
......@@ -139,6 +141,9 @@ class BaseConnection(object):
def hasPendingMessages(self):
return False
def hasPendingRequests(self):
return False
def whoSetConnector(self):
"""
Debugging method: call this method to know who set the current
......@@ -193,6 +198,7 @@ class Connection(BaseConnection):
self.aborted = False
self.uuid = None
self._queue = []
self._expected = deque()
BaseConnection.__init__(self, event_manager, handler,
connector = connector, addr = addr,
connector_handler = connector_handler)
......@@ -232,6 +238,7 @@ class Connection(BaseConnection):
self.event_dict.clear()
self.write_buf = ""
self.read_buf = ""
self._expected.clear()
def abort(self):
"""Abort dealing with this connection."""
......@@ -290,11 +297,28 @@ class Connection(BaseConnection):
"""
return len(self._queue) != 0
def hasPendingRequests(self):
"""
Returns True if there are pending expected answer packets
"""
return bool(self._expected)
def process(self):
"""
Process a pending packet.
"""
packet = self._queue.pop(0)
if packet.isResponse():
request = None
if self._expected:
request = self._expected.popleft()
if not request or not request.answerMatch(packet):
req_info = ('', '')
if request is not None:
req_info = (request.getId(), request.__class__)
rep_info = (packet.getId(), packet.__class__)
logging.warning('Unexpected answer: %s:%s %s:%s' %
(rep_info + req_info))
PACKET_LOGGER.dispatch(self, packet, 'from')
self.handler.packetReceived(self, packet)
......@@ -414,6 +438,8 @@ class Connection(BaseConnection):
self.expectMessage(msg_id, timeout=timeout,
additional_timeout=additional_timeout)
self._addPacket(packet)
assert packet.getAnswer() is not None, packet
self._expected.append(packet)
return msg_id
@not_closed
......@@ -422,6 +448,7 @@ class Connection(BaseConnection):
if msg_id is None:
msg_id = self.getPeerId()
packet.setId(msg_id)
assert packet.isResponse(), packet
self._addPacket(packet)
def ping(self, timeout=5):
......@@ -535,6 +562,8 @@ class MTClientConnection(ClientConnection):
self.dispatcher.register(self, msg_id, queue)
self.expectMessage(msg_id)
self._addPacket(packet)
assert packet.getAnswer() is not None, packet
self._expected.append(packet)
return msg_id
@lockCheckWrapper
......
......@@ -283,6 +283,12 @@ class Packet(object):
def isResponse(self):
return self._code & RESPONSE_MASK == RESPONSE_MASK
def answerMatch(self, answer):
id_match = self._id == answer._id
is_error = answer.__class__ == Error
assert self._answer is not None
return id_match and (is_error or isinstance(answer, self._answer))
def getAnswer(self):
return self._answer
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment