From f26983bba78f1a7176763172717047f72e99167a Mon Sep 17 00:00:00 2001 From: Vincent Pelletier <vincent@nexedi.com> Date: Tue, 27 Apr 2010 15:11:35 +0000 Subject: [PATCH] Add tests for ping/pong handling in Connection.analyse . git-svn-id: https://svn.erp5.org/repos/neo/trunk@2032 71dcc9de-d417-0410-9af5-da40c76e7ee4 --- neo/tests/testConnection.py | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/neo/tests/testConnection.py b/neo/tests/testConnection.py index 7fb5796b..151d7c78 100644 --- a/neo/tests/testConnection.py +++ b/neo/tests/testConnection.py @@ -24,8 +24,9 @@ from neo.connector import getConnectorHandler, registerConnectorHandler from neo.tests import DoNothingConnector from neo.connector import ConnectorException, ConnectorTryAgainException, \ ConnectorInProgressException, ConnectorConnectionRefusedException -from neo.protocol import Packets +from neo.protocol import Packets, ParserState from neo.tests import NeoTestBase +from neo.util import ReadBuffer class ConnectionTests(NeoTestBase): @@ -505,6 +506,40 @@ class ConnectionTests(NeoTestBase): self.assertEqual(data.decode(), p.decode()) self._checkReadBuf(bc, '') + def test_Connection_analyse5(self): + # test ping handling + bc = self._makeConnection() + bc._queue = Mock() + p = Packets.Ping() + p.setId(1) + self._appendPacketToReadBuf(bc, p) + bc.analyse() + # check no packet was queued + self.assertEquals(len(bc._queue.mockGetNamedCalls("append")), 0) + # check pong answered + parser_state = ParserState() + buffer = ReadBuffer() + for chunk in bc.write_buf: + buffer.append(chunk) + answer = Packets.parse(buffer, parser_state) + self.assertTrue(answer is not None) + self.assertTrue(answer.getType() == Packets.Pong) + self.assertEqual(answer.getId(), p.getId()) + + def test_Connection_analyse6(self): + # test pong handling + bc = self._makeConnection() + bc._timeout = Mock() + bc._queue = Mock() + p = Packets.Pong() + p.setId(1) + self._appendPacketToReadBuf(bc, p) + bc.analyse() + # check no packet was queued + self.assertEquals(len(bc._queue.mockGetNamedCalls("append")), 0) + # check timeout has been refreshed + self.assertEquals(len(bc._timeout.mockGetNamedCalls("refresh")), 1) + def test_Connection_writable1(self): # with pending operation after send def send(self, data): -- 2.30.9