Commit 9e54a8e0 authored by Julien Muchembled's avatar Julien Muchembled

When processing an answer, also update timeout and handler switcher on exception

This keeps the connection fully functional when a handler raises an exception.
parent 4a82657b
...@@ -112,7 +112,8 @@ class HandlerSwitcher(object): ...@@ -112,7 +112,8 @@ class HandlerSwitcher(object):
self._is_handling = False self._is_handling = False
def _handle(self, connection, packet): def _handle(self, connection, packet):
assert len(self._pending) == 1 or self._pending[0][0] pending = self._pending
assert len(pending) == 1 or pending[0][0], pending
logging.packet(connection, packet, False) logging.packet(connection, packet, False)
if connection.isClosed() and (connection.isAborted() or if connection.isClosed() and (connection.isAborted() or
packet.ignoreOnClosedConnection()): packet.ignoreOnClosedConnection()):
...@@ -121,29 +122,31 @@ class HandlerSwitcher(object): ...@@ -121,29 +122,31 @@ class HandlerSwitcher(object):
return return
if not packet.isResponse(): # notification if not packet.isResponse(): # notification
# XXX: If there are several handlers, which one to use ? # XXX: If there are several handlers, which one to use ?
self._pending[0][1].packetReceived(connection, packet) pending[0][1].packetReceived(connection, packet)
return return
msg_id = packet.getId() msg_id = packet.getId()
request_dict, handler = self._pending[0] request_dict, handler = pending[0]
# checkout the expected answer class # checkout the expected answer class
try: try:
klass, _, _, kw = request_dict.pop(msg_id) klass, _, _, kw = request_dict.pop(msg_id)
except KeyError: except KeyError:
klass = None klass = None
kw = {} kw = {}
try:
if klass and isinstance(packet, klass) or packet.isError(): if klass and isinstance(packet, klass) or packet.isError():
handler.packetReceived(connection, packet, kw) handler.packetReceived(connection, packet, kw)
else: else:
logging.error('Unexpected answer %r in %r', packet, connection) logging.error('Unexpected answer %r in %r', packet, connection)
if not connection.isClosed(): if not connection.isClosed():
notification = Packets.Notify('Unexpected answer: %r' % packet) connection.send(Packets.Notify(
connection.send(notification) 'Unexpected answer: %r' % packet))
connection.abort() connection.abort()
# handler.peerBroken(connection) # handler.peerBroken(connection)
finally:
# apply a pending handler if no more answers are pending # apply a pending handler if no more answers are pending
while len(self._pending) > 1 and not self._pending[0][0]: while len(pending) > 1 and not pending[0][0]:
del self._pending[0] del pending[0]
logging.debug('Apply handler %r on %r', self._pending[0][1], logging.debug('Apply handler %r on %r', pending[0][1],
connection) connection)
if msg_id == self._next_timeout_msg_id: if msg_id == self._next_timeout_msg_id:
self._updateNextTimeout() self._updateNextTimeout()
...@@ -497,7 +500,9 @@ class Connection(BaseConnection): ...@@ -497,7 +500,9 @@ class Connection(BaseConnection):
Process a pending packet. Process a pending packet.
""" """
# check out packet and process it with current handler # check out packet and process it with current handler
try:
self._handlers.handle(self, self._queue.pop(0)) self._handlers.handle(self, self._queue.pop(0))
finally:
self.updateTimeout() self.updateTimeout()
def pending(self): def pending(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment