Commit 3fd4afc0 authored by Vincent Pelletier's avatar Vincent Pelletier

Simplify IdleEvent.

- Return False when a ping has been sent, so current instance isn't trashed
  and replaced by ping's internal expectPacket.
- Make current idleEvent instance handle ping's timeout, to avoid creating
  a new event for each ping sent in relation to an existing event.
- Don't call expectPacket from ping when given a msg_id, meaning it is
  related to an existing event.
- Add a minimal delay of 5 seconds between 2 pings.
- Update event tests

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1713 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent e470d940
...@@ -290,13 +290,17 @@ class Connection(BaseConnection): ...@@ -290,13 +290,17 @@ class Connection(BaseConnection):
return return
self.read_buf = self.read_buf[len(packet):] self.read_buf = self.read_buf[len(packet):]
packet_type = packet.getType()
# Remove idle events, if appropriate packets were received. # Remove idle events, if appropriate packets were received.
for msg_id in (None, packet.getId()): for msg_id in (None, packet.getId()):
event = self.event_dict.pop(msg_id, None) event = self.event_dict.pop(msg_id, None)
if event is not None: if event is not None:
if packet_type == Packets.Pong:
self.em.refreshIdleEvent(event)
self.event_dict[msg_id] = event
else:
self.em.removeIdleEvent(event) self.em.removeIdleEvent(event)
packet_type = packet.getType()
if packet_type == Packets.Ping: if packet_type == Packets.Ping:
# Send a pong notification # Send a pong notification
self.answer(Packets.Pong(), packet.getId()) self.answer(Packets.Pong(), packet.getId())
...@@ -474,12 +478,13 @@ class Connection(BaseConnection): ...@@ -474,12 +478,13 @@ class Connection(BaseConnection):
assert packet.isResponse(), packet assert packet.isResponse(), packet
self._addPacket(packet) self._addPacket(packet)
def ping(self, timeout=5): def ping(self, timeout=5, msg_id=None):
""" Send a ping and expect to receive a pong notification """ """ Send a ping and expect to receive a pong notification """
packet = Packets.Ping() packet = Packets.Ping()
if msg_id is None:
msg_id = self._getNextId() msg_id = self._getNextId()
packet.setId(msg_id)
self.expectMessage(msg_id, timeout, 0) self.expectMessage(msg_id, timeout, 0)
packet.setId(msg_id)
self._addPacket(packet) self._addPacket(packet)
......
...@@ -21,6 +21,9 @@ from time import time ...@@ -21,6 +21,9 @@ from time import time
from neo.epoll import Epoll from neo.epoll import Epoll
PING_DELAY = 5
PING_TIMEOUT = 5
class IdleEvent(object): class IdleEvent(object):
""" """
This class represents an event called when a connection is waiting for This class represents an event called when a connection is waiting for
...@@ -33,7 +36,7 @@ class IdleEvent(object): ...@@ -33,7 +36,7 @@ class IdleEvent(object):
t = time() t = time()
self._time = t + timeout self._time = t + timeout
self._critical_time = t + timeout + additional_timeout self._critical_time = t + timeout + additional_timeout
self._additional_timeout = additional_timeout self.refresh()
def getId(self): def getId(self):
return self._id return self._id
...@@ -44,9 +47,12 @@ class IdleEvent(object): ...@@ -44,9 +47,12 @@ class IdleEvent(object):
def getCriticalTime(self): def getCriticalTime(self):
return self._critical_time return self._critical_time
def refresh(self):
self._next_critical_time = self._critical_time
def __call__(self, t): def __call__(self, t):
conn = self._conn conn = self._conn
if t > self._critical_time: if t > self._next_critical_time:
# No answer after _critical_time, close connection. # No answer after _critical_time, close connection.
# This means that remote peer is processing the request for too # This means that remote peer is processing the request for too
# long, although being responsive at network level. # long, although being responsive at network level.
...@@ -62,31 +68,20 @@ class IdleEvent(object): ...@@ -62,31 +68,20 @@ class IdleEvent(object):
elif t > self._time: elif t > self._time:
# Still no answer after _time, send a ping to see if connection is # Still no answer after _time, send a ping to see if connection is
# broken. # broken.
# Sending a ping triggers a new IdleEvent for the ping (hard timeout
# after 5 seconds, see part on additional_timeout above).
# XXX: Here, we return True, which causes the current IdleEvent
# instance to be discarded, and a new instance is created with
# reduced additional_timeout. It must be possible to avoid
# recreating a new instance just to keep waiting for the same
# response.
# XXX: This code has no meaning if the remote peer is single- # XXX: This code has no meaning if the remote peer is single-
# threaded. Nevertheless, it should be kept in case it gets # threaded. Nevertheless, it should be kept in case it gets
# multithreaded, someday (master & storage are the only candidates # multithreaded, someday (master & storage are the only candidates
# for using this code, as other don't receive requests). # for using this code, as other don't receive requests).
conn.lock() conn.lock()
try: try:
if self._additional_timeout > 5:
# XXX this line is misleading: we modify self, but this
# instance is doomed anyway: we will return True, causing
# it to be discarded.
self._additional_timeout -= 5
conn.expectMessage(self._id, 5, self._additional_timeout)
conn.ping() conn.ping()
else:
conn.expectMessage(self._id, self._additional_timeout, 0)
return True
finally: finally:
conn.unlock() conn.unlock()
# Don't retry pinging after at least PING_DELAY seconds have
# passed.
self._time = t + PING_DELAY
self._next_critical_time = min(self._critical_time,
t + PING_TIMEOUT)
return False return False
class EpollEventManager(object): class EpollEventManager(object):
...@@ -231,6 +226,9 @@ class EpollEventManager(object): ...@@ -231,6 +226,9 @@ class EpollEventManager(object):
except ValueError: except ValueError:
pass pass
def refreshIdleEvent(self, event):
event.refresh()
def addReader(self, conn): def addReader(self, conn):
connector = conn.getConnector() connector = conn.getConnector()
assert connector is not None, conn.whoSetConnector() assert connector is not None, conn.whoSetConnector()
......
...@@ -157,7 +157,6 @@ class EventTests(NeoTestBase): ...@@ -157,7 +157,6 @@ class EventTests(NeoTestBase):
self.assertEquals(len(conn.mockGetNamedCalls("getHandler")), 0) self.assertEquals(len(conn.mockGetNamedCalls("getHandler")), 0)
self.assertEquals(len(conn.mockGetNamedCalls("close")), 0) self.assertEquals(len(conn.mockGetNamedCalls("close")), 0)
self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 0) self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 0)
self.assertEquals(len(conn.mockGetNamedCalls("expectMessage")), 0)
self.checkNoPacketSent(conn) self.checkNoPacketSent(conn)
self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 0) self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 0)
...@@ -165,13 +164,12 @@ class EventTests(NeoTestBase): ...@@ -165,13 +164,12 @@ class EventTests(NeoTestBase):
t = time + 5 t = time + 5
self.assertTrue(t < critical_time) self.assertTrue(t < critical_time)
r = event(t) r = event(t)
self.assertTrue(r) self.assertFalse(r)
self.assertEquals(len(conn.mockGetNamedCalls("lock")), 1) self.assertEquals(len(conn.mockGetNamedCalls("lock")), 1)
self.assertEquals(len(conn.mockGetNamedCalls("getHandler")), 0) self.assertEquals(len(conn.mockGetNamedCalls("getHandler")), 0)
self.assertEquals(len(conn.mockGetNamedCalls("close")), 0) self.assertEquals(len(conn.mockGetNamedCalls("close")), 0)
self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 1) self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 1)
self.assertEquals(len(conn.mockGetNamedCalls("ping")), 1) self.assertEquals(len(conn.mockGetNamedCalls("ping")), 1)
self.assertEquals(len(conn.mockGetNamedCalls("expectMessage")), 1)
self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 0) self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 0)
# call with time < critical_time < t # call with time < critical_time < t
...@@ -184,7 +182,6 @@ class EventTests(NeoTestBase): ...@@ -184,7 +182,6 @@ class EventTests(NeoTestBase):
self.assertEquals(len(conn.mockGetNamedCalls("close")), 1) self.assertEquals(len(conn.mockGetNamedCalls("close")), 1)
self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 2) self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 2)
self.assertEquals(len(conn.mockGetNamedCalls("ping")), 1) self.assertEquals(len(conn.mockGetNamedCalls("ping")), 1)
self.assertEquals(len(conn.mockGetNamedCalls("expectMessage")), 1)
self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 1) self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 1)
# same test with additional time < 5 # same test with additional time < 5
...@@ -208,7 +205,6 @@ class EventTests(NeoTestBase): ...@@ -208,7 +205,6 @@ class EventTests(NeoTestBase):
self.assertEquals(len(conn.mockGetNamedCalls("getHandler")), 0) self.assertEquals(len(conn.mockGetNamedCalls("getHandler")), 0)
self.assertEquals(len(conn.mockGetNamedCalls("close")), 0) self.assertEquals(len(conn.mockGetNamedCalls("close")), 0)
self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 0) self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 0)
self.assertEquals(len(conn.mockGetNamedCalls("expectMessage")), 0)
self.checkNoPacketSent(conn) self.checkNoPacketSent(conn)
self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 0) self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 0)
...@@ -216,12 +212,11 @@ class EventTests(NeoTestBase): ...@@ -216,12 +212,11 @@ class EventTests(NeoTestBase):
t = time + 1 t = time + 1
self.assertTrue(t < critical_time) self.assertTrue(t < critical_time)
r = event(t) r = event(t)
self.assertTrue(r) self.assertFalse(r)
self.assertEquals(len(conn.mockGetNamedCalls("lock")), 1) self.assertEquals(len(conn.mockGetNamedCalls("lock")), 1)
self.assertEquals(len(conn.mockGetNamedCalls("getHandler")), 0) self.assertEquals(len(conn.mockGetNamedCalls("getHandler")), 0)
self.assertEquals(len(conn.mockGetNamedCalls("close")), 0) self.assertEquals(len(conn.mockGetNamedCalls("close")), 0)
self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 1) self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 1)
self.assertEquals(len(conn.mockGetNamedCalls("expectMessage")), 1)
self.checkNoPacketSent(conn) self.checkNoPacketSent(conn)
self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 0) self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 0)
...@@ -234,7 +229,6 @@ class EventTests(NeoTestBase): ...@@ -234,7 +229,6 @@ class EventTests(NeoTestBase):
self.assertEquals(len(conn.mockGetNamedCalls("getHandler")), 1) self.assertEquals(len(conn.mockGetNamedCalls("getHandler")), 1)
self.assertEquals(len(conn.mockGetNamedCalls("close")), 1) self.assertEquals(len(conn.mockGetNamedCalls("close")), 1)
self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 2) self.assertEquals(len(conn.mockGetNamedCalls("unlock")), 2)
self.assertEquals(len(conn.mockGetNamedCalls("expectMessage")), 1)
self.checkNoPacketSent(conn) self.checkNoPacketSent(conn)
self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 1) self.assertEquals(len(handler.mockGetNamedCalls("timeoutExpired")), 1)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment