Commit 91e6c9ea authored by Grégory Wisniewski's avatar Grégory Wisniewski

Initial implementation of HandlerSwitcher for connections.

The main purpose is to apply and handler after all pending requests are
satisfied and allow process answer packets of current state out of order.
This fix a bug that appears when a storage tries to get a lock on an object that
is already held by a previous transaction. Is this case the store is delayed
causing the answer packet be sent out of order (packet sequence breakage), so
unexpected from client's point of view.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1874 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 5c890a73
...@@ -15,8 +15,6 @@ ...@@ -15,8 +15,6 @@
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from Queue import deque
from neo import logging from neo import logging
from neo.locking import RLock from neo.locking import RLock
...@@ -63,6 +61,65 @@ def lockCheckWrapper(func): ...@@ -63,6 +61,65 @@ def lockCheckWrapper(func):
return wrapper return wrapper
class HandlerSwitcher(object):
def __init__(self, connection, handler):
self._connection = connection
# pending handlers and related requests
self._pending = [[{}, handler]]
def clear(self):
handler = self._pending[0][1]
self._pending = [[{}, handler]]
def isPending(self):
return self._pending[0][0]
def getHandler(self):
return self._pending[0][1]
def emit(self, request):
# register the request in the current handler
assert len(self._pending) == 1 or self._pending[0][0]
(request_dict, _) = self._pending[-1]
msg_id = request.getId()
assert request.getAnswerClass() is not None, "Not a request"
assert msg_id not in request_dict, "Packet id already expected"
request_dict[msg_id] = request.getAnswerClass()
def handle(self, packet):
assert len(self._pending) == 1 or self._pending[0][0]
PACKET_LOGGER.dispatch(self._connection, packet, 'from')
msg_id = packet.getId()
(request_dict, handler) = self._pending[0]
# notifications are not expected
if not packet.isResponse():
handler.packetReceived(self._connection, packet)
return
# checkout the expected answer class
klass = request_dict.pop(msg_id, None)
if klass and isinstance(packet, klass) or packet.isError():
handler.packetReceived(self._connection, packet)
# apply a pending handler if no more answers are pending
if len(self._pending) > 1 and not request_dict:
del self._pending[0]
logging.debug('Apply handler %r', self._pending[0][1])
else:
logging.error('Unexpected answer: %r', packet)
self._connection.abort()
handler.peerBroken()
def setHandler(self, handler):
if len(self._pending) == 1 and not self._pending[0][0]:
# nothing is pending, change immediately
logging.debug('Set handler %r', handler)
self._pending[0][1] = handler
else:
# put the next handler in queue
logging.debug('Delay handler %r', handler)
self._pending.append([{}, handler])
class BaseConnection(object): class BaseConnection(object):
"""A base connection.""" """A base connection."""
...@@ -71,7 +128,7 @@ class BaseConnection(object): ...@@ -71,7 +128,7 @@ class BaseConnection(object):
self.em = event_manager self.em = event_manager
self.connector = connector self.connector = connector
self.addr = addr self.addr = addr
self.handler = handler self._handlers = HandlerSwitcher(self, handler)
if connector is not None: if connector is not None:
self.connector_handler = connector.__class__ self.connector_handler = connector.__class__
event_manager.register(self) event_manager.register(self)
...@@ -117,10 +174,10 @@ class BaseConnection(object): ...@@ -117,10 +174,10 @@ class BaseConnection(object):
__del__ = close __del__ = close
def getHandler(self): def getHandler(self):
return self.handler return self._handlers.getHandler()
def setHandler(self, handler): def setHandler(self, handler):
self.handler = handler self._handlers.setHandler(handler)
def getEventManager(self): def getEventManager(self):
return self.em return self.em
...@@ -143,9 +200,6 @@ class BaseConnection(object): ...@@ -143,9 +200,6 @@ class BaseConnection(object):
def hasPendingMessages(self): def hasPendingMessages(self):
return False return False
def hasPendingRequests(self):
return False
def whoSetConnector(self): def whoSetConnector(self):
""" """
Debugging method: call this method to know who set the current Debugging method: call this method to know who set the current
...@@ -200,8 +254,6 @@ class Connection(BaseConnection): ...@@ -200,8 +254,6 @@ class Connection(BaseConnection):
self.aborted = False self.aborted = False
self.uuid = None self.uuid = None
self._queue = [] self._queue = []
self._expected = deque()
self._next_handler = None
self._on_close = None self._on_close = None
BaseConnection.__init__(self, event_manager, handler, BaseConnection.__init__(self, event_manager, handler,
connector = connector, addr = addr, connector = connector, addr = addr,
...@@ -209,17 +261,6 @@ class Connection(BaseConnection): ...@@ -209,17 +261,6 @@ class Connection(BaseConnection):
if connector is not None: if connector is not None:
event_manager.addReader(self) event_manager.addReader(self)
def setHandler(self, handler):
assert self._next_handler is None
if self.hasPendingRequests():
logging.debug('Delaying: %s -> %s after %s',
self.handler.__class__.__name__, handler.__class__.__name__,
list(self._expected))
self._expected.append(APPLY_HANDLER)
self._next_handler = handler
else:
self.handler = handler
def setOnClose(self, callback): def setOnClose(self, callback):
assert self._on_close is None assert self._on_close is None
self._on_close = callback self._on_close = callback
...@@ -256,7 +297,7 @@ class Connection(BaseConnection): ...@@ -256,7 +297,7 @@ class Connection(BaseConnection):
self.event_dict.clear() self.event_dict.clear()
self.write_buf = "" self.write_buf = ""
self.read_buf = "" self.read_buf = ""
self._expected.clear() self._handlers.clear()
def abort(self): def abort(self):
"""Abort dealing with this connection.""" """Abort dealing with this connection."""
...@@ -290,7 +331,7 @@ class Connection(BaseConnection): ...@@ -290,7 +331,7 @@ class Connection(BaseConnection):
if packet is None: if packet is None:
break break
except PacketMalformedError, msg: except PacketMalformedError, msg:
self.handler._packetMalformed(self, msg) self.getHandler()._packetMalformed(self, msg)
return return
self.read_buf = self.read_buf[len(packet):] self.read_buf = self.read_buf[len(packet):]
...@@ -319,39 +360,13 @@ class Connection(BaseConnection): ...@@ -319,39 +360,13 @@ class Connection(BaseConnection):
""" """
return len(self._queue) != 0 return len(self._queue) != 0
def hasPendingRequests(self):
"""
Returns True if there are pending expected answer packets
"""
return bool(self._expected)
def process(self): def process(self):
""" """
Process a pending packet. Process a pending packet.
""" """
# check out packet and check if it's and expected answer # check out packet and process it with current handler
packet = self._queue.pop(0) packet = self._queue.pop(0)
if packet.isResponse(): self._handlers.handle(packet)
request = None
if self._expected:
request = self._expected.popleft()
if request is APPLY_HANDLER:
logging.debug('Apply handler %s',
self._next_handler.__class__.__name__)
self.handler = self._next_handler
self._next_handler = None
return
if not request or not request.answerMatch(packet):
req_info = ('', '')
if request is not None:
req_info = (request.getId(), request.__class__)
rep_info = (packet.getId(), packet.__class__)
logging.warning('Unexpected answer: %s:%s %s:%s' %
(rep_info + req_info))
# process packet
PACKET_LOGGER.dispatch(self, packet, 'from')
self.handler.packetReceived(self, packet)
def pending(self): def pending(self):
return self.connector is not None and self.write_buf return self.connector is not None and self.write_buf
...@@ -359,7 +374,7 @@ class Connection(BaseConnection): ...@@ -359,7 +374,7 @@ class Connection(BaseConnection):
def _closure(self): def _closure(self):
assert self.connector is not None, self.whoSetConnector() assert self.connector is not None, self.whoSetConnector()
self.close() self.close()
self.handler.connectionClosed(self) self.getHandler().connectionClosed(self)
def _recv(self): def _recv(self):
"""Receive data from a connector.""" """Receive data from a connector."""
...@@ -375,7 +390,7 @@ class Connection(BaseConnection): ...@@ -375,7 +390,7 @@ class Connection(BaseConnection):
except ConnectorConnectionRefusedException: except ConnectorConnectionRefusedException:
# should only occur while connecting # should only occur while connecting
self.close() self.close()
self.handler.connectionFailed(self) self.getHandler().connectionFailed(self)
except ConnectorConnectionClosedException: except ConnectorConnectionClosedException:
# connection resetted by peer, according to the man, this error # connection resetted by peer, according to the man, this error
# should not occurs but it seems it's false # should not occurs but it seems it's false
...@@ -469,8 +484,7 @@ class Connection(BaseConnection): ...@@ -469,8 +484,7 @@ class Connection(BaseConnection):
self.expectMessage(msg_id, timeout=timeout, self.expectMessage(msg_id, timeout=timeout,
additional_timeout=additional_timeout) additional_timeout=additional_timeout)
self._addPacket(packet) self._addPacket(packet)
assert packet.getAnswer() is not None, packet self._handlers.emit(packet)
self._expected.append(packet)
return msg_id return msg_id
@not_closed @not_closed
...@@ -509,7 +523,7 @@ class ClientConnection(Connection): ...@@ -509,7 +523,7 @@ class ClientConnection(Connection):
event_manager.addWriter(self) event_manager.addWriter(self)
else: else:
self.connecting = False self.connecting = False
self.handler.connectionCompleted(self) self.getHandler().connectionCompleted(self)
event_manager.addReader(self) event_manager.addReader(self)
except ConnectorConnectionRefusedException: except ConnectorConnectionRefusedException:
handler.connectionFailed(self) handler.connectionFailed(self)
...@@ -525,12 +539,12 @@ class ClientConnection(Connection): ...@@ -525,12 +539,12 @@ class ClientConnection(Connection):
if self.connecting: if self.connecting:
err = self.connector.getError() err = self.connector.getError()
if err: if err:
self.handler.connectionFailed(self) self.getHandler().connectionFailed(self)
self.close() self.close()
return return
else: else:
self.connecting = False self.connecting = False
self.handler.connectionCompleted(self) self.getHandler().connectionCompleted(self)
self.em.addReader(self) self.em.addReader(self)
else: else:
Connection.writable(self) Connection.writable(self)
...@@ -594,8 +608,7 @@ class MTClientConnection(ClientConnection): ...@@ -594,8 +608,7 @@ class MTClientConnection(ClientConnection):
self.dispatcher.register(self, msg_id, queue) self.dispatcher.register(self, msg_id, queue)
self.expectMessage(msg_id) self.expectMessage(msg_id)
self._addPacket(packet) self._addPacket(packet)
assert packet.getAnswer() is not None, packet self._handlers.emit(packet)
self._expected.append(packet)
return msg_id return msg_id
@lockCheckWrapper @lockCheckWrapper
......
...@@ -294,16 +294,13 @@ class Packet(object): ...@@ -294,16 +294,13 @@ class Packet(object):
assert body == '', "Non-empty packet decoding not implemented """ assert body == '', "Non-empty packet decoding not implemented """
return () return ()
def isError(self):
return isinstance(self, Error)
def isResponse(self): def isResponse(self):
return self._code & RESPONSE_MASK == RESPONSE_MASK return self._code & RESPONSE_MASK == RESPONSE_MASK
def answerMatch(self, answer): def getAnswerClass(self):
id_match = self._id == answer._id
is_error = answer.__class__ == Error
assert self._answer is not None
return id_match and (is_error or isinstance(answer, self._answer))
def getAnswer(self):
return self._answer return self._answer
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment