Commit 37decce5 authored by Vincent Pelletier's avatar Vincent Pelletier

Don't store connection on instances.

This causes garbage collection problems, as BaseConnection instances also
hold a reference to HandlerSwitcher instance. It is unclear why it fails,
as microbenchs show this should not be a problem for gc.
Also, move loggers out of HandlerSwitcher.setHandler to avoid passing a
connection parameter just for them.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2222 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent d8a7a177
...@@ -84,8 +84,7 @@ class HandlerSwitcher(object): ...@@ -84,8 +84,7 @@ class HandlerSwitcher(object):
_next_timeout_msg_id = None _next_timeout_msg_id = None
_next_on_timeout = None _next_on_timeout = None
def __init__(self, connection, handler): def __init__(self, handler):
self._connection = connection
# pending handlers and related requests # pending handlers and related requests
self._pending = [[{}, handler]] self._pending = [[{}, handler]]
self._is_handling = False self._is_handling = False
...@@ -127,14 +126,14 @@ class HandlerSwitcher(object): ...@@ -127,14 +126,14 @@ class HandlerSwitcher(object):
self._next_on_timeout = on_timeout self._next_on_timeout = on_timeout
request_dict[msg_id] = (answer_class, timeout, on_timeout) request_dict[msg_id] = (answer_class, timeout, on_timeout)
def checkTimeout(self, t): def checkTimeout(self, connection, t):
next_timeout = self._next_timeout next_timeout = self._next_timeout
if next_timeout is not None and next_timeout < t: if next_timeout is not None and next_timeout < t:
msg_id = self._next_timeout_msg_id msg_id = self._next_timeout_msg_id
if self._next_on_timeout is None: if self._next_on_timeout is None:
result = msg_id result = msg_id
else: else:
if self._next_on_timeout(self._connection, msg_id): if self._next_on_timeout(connection, msg_id):
# Don't notify that a timeout occured, and forget about # Don't notify that a timeout occured, and forget about
# this answer. # this answer.
for (request_dict, _) in self._pending: for (request_dict, _) in self._pending:
...@@ -148,39 +147,39 @@ class HandlerSwitcher(object): ...@@ -148,39 +147,39 @@ class HandlerSwitcher(object):
result = None result = None
return result return result
def handle(self, packet): def handle(self, connection, packet):
assert not self._is_handling assert not self._is_handling
self._is_handling = True self._is_handling = True
try: try:
self._handle(packet) self._handle(connection, packet)
finally: finally:
self._is_handling = False self._is_handling = False
@profiler_decorator @profiler_decorator
def _handle(self, packet): def _handle(self, connection, packet):
assert len(self._pending) == 1 or self._pending[0][0] assert len(self._pending) == 1 or self._pending[0][0]
PACKET_LOGGER.dispatch(self._connection, packet, 'from') PACKET_LOGGER.dispatch(connection, packet, 'from')
msg_id = packet.getId() msg_id = packet.getId()
(request_dict, handler) = self._pending[0] (request_dict, handler) = self._pending[0]
# notifications are not expected # notifications are not expected
if not packet.isResponse(): if not packet.isResponse():
handler.packetReceived(self._connection, packet) handler.packetReceived(connection, packet)
return return
# checkout the expected answer class # checkout the expected answer class
(klass, timeout, _) = request_dict.pop(msg_id, (None, None, None)) (klass, timeout, _) = request_dict.pop(msg_id, (None, None, None))
if klass and isinstance(packet, klass) or packet.isError(): if klass and isinstance(packet, klass) or packet.isError():
handler.packetReceived(self._connection, packet) handler.packetReceived(connection, packet)
else: else:
logging.error('Unexpected answer %r in %r', packet, self._connection) logging.error('Unexpected answer %r in %r', packet, connection)
notification = Packets.Notify('Unexpected answer: %r' % packet) notification = Packets.Notify('Unexpected answer: %r' % packet)
self._connection.notify(notification) connection.notify(notification)
self._connection.abort() connection.abort()
handler.peerBroken(self._connection) handler.peerBroken(connection)
# apply a pending handler if no more answers are pending # apply a pending handler if no more answers are pending
while len(self._pending) > 1 and not self._pending[0][0]: while len(self._pending) > 1 and not self._pending[0][0]:
del self._pending[0] del self._pending[0]
logging.debug('Apply handler %r on %r', self._pending[0][1], logging.debug('Apply handler %r on %r', self._pending[0][1],
self._connection) connection)
if timeout == self._next_timeout: if timeout == self._next_timeout:
self._updateNextTimeout() self._updateNextTimeout()
...@@ -202,14 +201,14 @@ class HandlerSwitcher(object): ...@@ -202,14 +201,14 @@ class HandlerSwitcher(object):
@profiler_decorator @profiler_decorator
def setHandler(self, handler): def setHandler(self, handler):
if len(self._pending) == 1 and not self._pending[0][0]: can_apply = len(self._pending) == 1 and not self._pending[0][0]
if can_apply:
# nothing is pending, change immediately # nothing is pending, change immediately
logging.debug('Set handler %r on %r', handler, self._connection)
self._pending[0][1] = handler self._pending[0][1] = handler
else: else:
# put the next handler in queue # put the next handler in queue
logging.debug('Delay handler %r on %r', handler, self._connection)
self._pending.append([{}, handler]) self._pending.append([{}, handler])
return can_apply
class Timeout(object): class Timeout(object):
...@@ -265,14 +264,14 @@ class BaseConnection(object): ...@@ -265,14 +264,14 @@ class BaseConnection(object):
self.em = event_manager self.em = event_manager
self.connector = connector self.connector = connector
self.addr = addr self.addr = addr
self._handlers = HandlerSwitcher(self, handler) self._handlers = HandlerSwitcher(handler)
self._timeout = Timeout() self._timeout = Timeout()
event_manager.register(self) event_manager.register(self)
def checkTimeout(self, t): def checkTimeout(self, t):
handlers = self._handlers handlers = self._handlers
if handlers.isPending(): if handlers.isPending():
msg_id = handlers.checkTimeout(t) msg_id = handlers.checkTimeout(self, t)
if msg_id is not None: if msg_id is not None:
logging.info('timeout for %r with %r', msg_id, self) logging.info('timeout for %r with %r', msg_id, self)
self.close() self.close()
...@@ -332,7 +331,10 @@ class BaseConnection(object): ...@@ -332,7 +331,10 @@ class BaseConnection(object):
return self._handlers.getHandler() return self._handlers.getHandler()
def setHandler(self, handler): def setHandler(self, handler):
self._handlers.setHandler(handler) if self._handlers.setHandler(handler):
logging.debug('Set handler %r on %r', handler, self)
else:
logging.debug('Delay handler %r on %r', handler, self)
def getEventManager(self): def getEventManager(self):
return self.em return self.em
...@@ -504,7 +506,7 @@ class Connection(BaseConnection): ...@@ -504,7 +506,7 @@ class Connection(BaseConnection):
""" """
# check out packet and process it with current handler # check out packet and process it with current handler
packet = self._queue.pop(0) packet = self._queue.pop(0)
self._handlers.handle(packet) self._handlers.handle(self, packet)
def pending(self): def pending(self):
return self.connector is not None and self.write_buf return self.connector is not None and self.write_buf
......
...@@ -840,11 +840,11 @@ class HandlerSwitcherTests(NeoTestBase): ...@@ -840,11 +840,11 @@ class HandlerSwitcherTests(NeoTestBase):
self._handler = handler = Mock({ self._handler = handler = Mock({
'__repr__': 'initial handler', '__repr__': 'initial handler',
}) })
self._connection = connection = Mock({ self._connection = Mock({
'__repr__': 'connection', '__repr__': 'connection',
'getAddress': ('127.0.0.1', 10000), 'getAddress': ('127.0.0.1', 10000),
}) })
self._handlers = HandlerSwitcher(connection, handler) self._handlers = HandlerSwitcher(handler)
def _makeNotification(self, msg_id): def _makeNotification(self, msg_id):
packet = Packets.StartOperation() packet = Packets.StartOperation()
...@@ -884,42 +884,44 @@ class HandlerSwitcherTests(NeoTestBase): ...@@ -884,42 +884,44 @@ class HandlerSwitcherTests(NeoTestBase):
# Second case, emit is called from inside a handler with a pending # Second case, emit is called from inside a handler with a pending
# handler change. # handler change.
new_handler = self._makeHandler() new_handler = self._makeHandler()
self._handlers.setHandler(new_handler) applied = self._handlers.setHandler(new_handler)
self.assertFalse(applied)
self._checkCurrentHandler(self._handler) self._checkCurrentHandler(self._handler)
call_tracker = [] call_tracker = []
def packetReceived(conn, packet): def packetReceived(conn, packet):
self._handlers.emit(self._makeRequest(2), 0, None) self._handlers.emit(self._makeRequest(2), 0, None)
call_tracker.append(True) call_tracker.append(True)
self._handler.packetReceived = packetReceived self._handler.packetReceived = packetReceived
self._handlers.handle(self._makeAnswer(1)) self._handlers.handle(self._connection, self._makeAnswer(1))
self.assertEqual(call_tracker, [True]) self.assertEqual(call_tracker, [True])
# Effective handler must not have changed (new request is blocking # Effective handler must not have changed (new request is blocking
# it) # it)
self._checkCurrentHandler(self._handler) self._checkCurrentHandler(self._handler)
# Handling the next response will cause the handler to change # Handling the next response will cause the handler to change
delattr(self._handler, 'packetReceived') delattr(self._handler, 'packetReceived')
self._handlers.handle(self._makeAnswer(2)) self._handlers.handle(self._connection, self._makeAnswer(2))
self._checkCurrentHandler(new_handler) self._checkCurrentHandler(new_handler)
def testHandleNotification(self): def testHandleNotification(self):
# handle with current handler # handle with current handler
notif1 = self._makeNotification(1) notif1 = self._makeNotification(1)
self._handlers.handle(notif1) self._handlers.handle(self._connection, notif1)
self._checkPacketReceived(self._handler, notif1) self._checkPacketReceived(self._handler, notif1)
# emit a request and delay an handler # emit a request and delay an handler
request = self._makeRequest(2) request = self._makeRequest(2)
self._handlers.emit(request, 0, None) self._handlers.emit(request, 0, None)
handler = self._makeHandler() handler = self._makeHandler()
self._handlers.setHandler(handler) applied = self._handlers.setHandler(handler)
self.assertFalse(applied)
# next notification fall into the current handler # next notification fall into the current handler
notif2 = self._makeNotification(3) notif2 = self._makeNotification(3)
self._handlers.handle(notif2) self._handlers.handle(self._connection, notif2)
self._checkPacketReceived(self._handler, notif2, index=1) self._checkPacketReceived(self._handler, notif2, index=1)
# handle with new handler # handle with new handler
answer = self._makeAnswer(2) answer = self._makeAnswer(2)
self._handlers.handle(answer) self._handlers.handle(self._connection, answer)
notif3 = self._makeNotification(4) notif3 = self._makeNotification(4)
self._handlers.handle(notif3) self._handlers.handle(self._connection, notif3)
self._checkPacketReceived(handler, notif2) self._checkPacketReceived(handler, notif2)
def testHandleAnswer1(self): def testHandleAnswer1(self):
...@@ -927,7 +929,7 @@ class HandlerSwitcherTests(NeoTestBase): ...@@ -927,7 +929,7 @@ class HandlerSwitcherTests(NeoTestBase):
request = self._makeRequest(1) request = self._makeRequest(1)
self._handlers.emit(request, 0, None) self._handlers.emit(request, 0, None)
answer = self._makeAnswer(1) answer = self._makeAnswer(1)
self._handlers.handle(answer) self._handlers.handle(self._connection, answer)
self._checkPacketReceived(self._handler, answer) self._checkPacketReceived(self._handler, answer)
def testHandleAnswer2(self): def testHandleAnswer2(self):
...@@ -935,9 +937,10 @@ class HandlerSwitcherTests(NeoTestBase): ...@@ -935,9 +937,10 @@ class HandlerSwitcherTests(NeoTestBase):
request = self._makeRequest(1) request = self._makeRequest(1)
self._handlers.emit(request, 0, None) self._handlers.emit(request, 0, None)
handler = self._makeHandler() handler = self._makeHandler()
self._handlers.setHandler(handler) applied = self._handlers.setHandler(handler)
self.assertFalse(applied)
answer = self._makeAnswer(1) answer = self._makeAnswer(1)
self._handlers.handle(answer) self._handlers.handle(self._connection, answer)
self._checkPacketReceived(self._handler, answer) self._checkPacketReceived(self._handler, answer)
self._checkCurrentHandler(handler) self._checkCurrentHandler(handler)
...@@ -954,19 +957,22 @@ class HandlerSwitcherTests(NeoTestBase): ...@@ -954,19 +957,22 @@ class HandlerSwitcherTests(NeoTestBase):
h3 = self._makeHandler() h3 = self._makeHandler()
# emit all requests and setHandleres # emit all requests and setHandleres
self._handlers.emit(r1, 0, None) self._handlers.emit(r1, 0, None)
self._handlers.setHandler(h1) applied = self._handlers.setHandler(h1)
self.assertFalse(applied)
self._handlers.emit(r2, 0, None) self._handlers.emit(r2, 0, None)
self._handlers.setHandler(h2) applied = self._handlers.setHandler(h2)
self.assertFalse(applied)
self._handlers.emit(r3, 0, None) self._handlers.emit(r3, 0, None)
self._handlers.setHandler(h3) applied = self._handlers.setHandler(h3)
self.assertFalse(applied)
self._checkCurrentHandler(self._handler) self._checkCurrentHandler(self._handler)
self.assertTrue(self._handlers.isPending()) self.assertTrue(self._handlers.isPending())
# process answers # process answers
self._handlers.handle(a1) self._handlers.handle(self._connection, a1)
self._checkCurrentHandler(h1) self._checkCurrentHandler(h1)
self._handlers.handle(a2) self._handlers.handle(self._connection, a2)
self._checkCurrentHandler(h2) self._checkCurrentHandler(h2)
self._handlers.handle(a3) self._handlers.handle(self._connection, a3)
self._checkCurrentHandler(h3) self._checkCurrentHandler(h3)
def testHandleAnswer4(self): def testHandleAnswer4(self):
...@@ -982,13 +988,14 @@ class HandlerSwitcherTests(NeoTestBase): ...@@ -982,13 +988,14 @@ class HandlerSwitcherTests(NeoTestBase):
self._handlers.emit(r1, 0, None) self._handlers.emit(r1, 0, None)
self._handlers.emit(r2, 0, None) self._handlers.emit(r2, 0, None)
self._handlers.emit(r3, 0, None) self._handlers.emit(r3, 0, None)
self._handlers.setHandler(h) applied = self._handlers.setHandler(h)
self.assertFalse(applied)
# process answers # process answers
self._handlers.handle(a1) self._handlers.handle(self._connection, a1)
self._checkCurrentHandler(self._handler) self._checkCurrentHandler(self._handler)
self._handlers.handle(a2) self._handlers.handle(self._connection, a2)
self._checkCurrentHandler(self._handler) self._checkCurrentHandler(self._handler)
self._handlers.handle(a3) self._handlers.handle(self._connection, a3)
self._checkCurrentHandler(h) self._checkCurrentHandler(h)
def testHandleUnexpected(self): def testHandleUnexpected(self):
...@@ -999,10 +1006,11 @@ class HandlerSwitcherTests(NeoTestBase): ...@@ -999,10 +1006,11 @@ class HandlerSwitcherTests(NeoTestBase):
h = self._makeHandler() h = self._makeHandler()
# emit requests aroung state setHandler # emit requests aroung state setHandler
self._handlers.emit(r1, 0, None) self._handlers.emit(r1, 0, None)
self._handlers.setHandler(h) applied = self._handlers.setHandler(h)
self.assertFalse(applied)
self._handlers.emit(r2, 0, None) self._handlers.emit(r2, 0, None)
# process answer for next state # process answer for next state
self._handlers.handle(a2) self._handlers.handle(self._connection, a2)
self.checkAborted(self._connection) self.checkAborted(self._connection)
def testTimeout(self): def testTimeout(self):
...@@ -1012,7 +1020,8 @@ class HandlerSwitcherTests(NeoTestBase): ...@@ -1012,7 +1020,8 @@ class HandlerSwitcherTests(NeoTestBase):
""" """
now = time() now = time()
# No timeout when no pending request # No timeout when no pending request
self.assertEqual(self._handlers.checkTimeout(now), None) self.assertEqual(self._handlers.checkTimeout(self._connection, now),
None)
# Prepare some requests # Prepare some requests
msg_id_1 = 1 msg_id_1 = 1
msg_id_2 = 2 msg_id_2 = 2
...@@ -1041,21 +1050,26 @@ class HandlerSwitcherTests(NeoTestBase): ...@@ -1041,21 +1050,26 @@ class HandlerSwitcherTests(NeoTestBase):
self._handlers.emit(r1, msg_1_time, None) self._handlers.emit(r1, msg_1_time, None)
self._handlers.emit(r2, msg_2_time, None) self._handlers.emit(r2, msg_2_time, None)
# No timeout before msg_1_time # No timeout before msg_1_time
self.assertEqual(self._handlers.checkTimeout(now), None) self.assertEqual(self._handlers.checkTimeout(self._connection, now),
None)
# Timeout for msg_1 after msg_1_time # Timeout for msg_1 after msg_1_time
self.assertEqual(self._handlers.checkTimeout(msg_1_time + 0.5), self.assertEqual(self._handlers.checkTimeout(self._connection,
msg_id_1) msg_1_time + 0.5), msg_id_1)
# If msg_1 met its answer, no timeout after msg_1_time # If msg_1 met its answer, no timeout after msg_1_time
self._handlers.handle(a1) self._handlers.handle(self._connection, a1)
self.assertEqual(self._handlers.checkTimeout(msg_1_time + 0.5), None) self.assertEqual(self._handlers.checkTimeout(self._connection,
msg_1_time + 0.5), None)
# Next timeout is after msg_2_time # Next timeout is after msg_2_time
self.assertEqual(self._handlers.checkTimeout(msg_2_time + 0.5), msg_id_2) self.assertEqual(self._handlers.checkTimeout(self._connection,
self._handlers.handle(a2) msg_2_time + 0.5), msg_id_2)
self._handlers.handle(self._connection, a2)
# Sanity check # Sanity check
self.assertEqual(self._handlers.checkTimeout(msg_2_time + 0.5), None) self.assertEqual(self._handlers.checkTimeout(self._connection,
msg_2_time + 0.5), None)
# msg_3 timeout will fire msg_3_on_timeout callback, which causes the # msg_3 timeout will fire msg_3_on_timeout callback, which causes the
# timeout to be ignored (it returns True) # timeout to be ignored (it returns True)
self.assertEqual(self._handlers.checkTimeout(msg_3_time + 0.5), None) self.assertEqual(self._handlers.checkTimeout(self._connection,
msg_3_time + 0.5), None)
# ...check that callback actually fired # ...check that callback actually fired
self.assertEqual(len(markers), 1) self.assertEqual(len(markers), 1)
# ...with expected parameters # ...with expected parameters
...@@ -1067,7 +1081,8 @@ class HandlerSwitcherTests(NeoTestBase): ...@@ -1067,7 +1081,8 @@ class HandlerSwitcherTests(NeoTestBase):
self._handlers.emit(r4, msg_4_time, OnTimeout(msg_4_on_timeout)) self._handlers.emit(r4, msg_4_time, OnTimeout(msg_4_on_timeout))
# msg_4 timeout will fire msg_4_on_timeout callback, which lets the # msg_4 timeout will fire msg_4_on_timeout callback, which lets the
# timeout be detected (it returns False) # timeout be detected (it returns False)
self.assertEqual(self._handlers.checkTimeout(msg_4_time + 0.5), msg_id_4) self.assertEqual(self._handlers.checkTimeout(self._connection,
msg_4_time + 0.5), msg_id_4)
# ...check that callback actually fired # ...check that callback actually fired
self.assertEqual(len(markers), 1) self.assertEqual(len(markers), 1)
# ...with expected parameters # ...with expected parameters
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment