Commit 3d93441b authored by Grégory Wisniewski's avatar Grégory Wisniewski

All-in-one commit :

- Some private methods of Connection prefixed with '_'
- Thus those methods are no more overriden in MT* classes 
- Add description of notify/ask/answer methods and lockCheckWrapper
- Remove a forgotten addPacket from storage handler and another from master
service handler that requires to add an optional msg_id parameter to notify
- Update tests


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@509 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent ddb2d2bd
......@@ -153,7 +153,7 @@ class Connection(BaseConnection):
def setUUID(self, uuid):
self.uuid = uuid
def getNextId(self):
def _getNextId(self):
next_id = self.cur_id
# Deal with an overflow.
if self.cur_id == 0xffffffff:
......@@ -189,7 +189,7 @@ class Connection(BaseConnection):
def writable(self):
"""Called when self is writable."""
self.send()
self._send()
if not self.pending():
if self.aborted:
self.close()
......@@ -198,7 +198,7 @@ class Connection(BaseConnection):
def readable(self):
"""Called when self is readable."""
self.recv()
self._recv()
self.analyse()
if self.aborted:
......@@ -237,7 +237,7 @@ class Connection(BaseConnection):
def pending(self):
return self.connector is not None and self.write_buf
def recv(self):
def _recv(self):
"""Receive data from a connector."""
try:
data = self.connector.receive()
......@@ -259,7 +259,7 @@ class Connection(BaseConnection):
# unhandled connector exception
raise
def send(self):
def _send(self):
"""Send data to a connector."""
if not self.write_buf:
return
......@@ -278,7 +278,7 @@ class Connection(BaseConnection):
# unhandled connector exception
raise
def addPacket(self, packet):
def _addPacket(self, packet):
"""Add a packet into the write buffer."""
if self.connector is None:
return
......@@ -324,23 +324,27 @@ class Connection(BaseConnection):
self.event_dict[msg_id] = event
self.em.addIdleEvent(event)
def notify(self, packet):
msg_id = self.getNextId()
def notify(self, packet, msg_id=None):
""" Then a packet with a new ID """
if msg_id is None:
msg_id = self._getNextId()
packet.setId(msg_id)
self.addPacket(packet)
self._addPacket(packet)
return msg_id
def ask(self, packet, timeout=5, additional_timeout=30):
msg_id = self.getNextId()
""" Send a packet with a new ID and register the expectation of an answer """
msg_id = self._getNextId()
packet.setId(msg_id)
self.expectMessage(msg_id)
self.addPacket(packet)
self._addPacket(packet)
return msg_id
def answer(self, packet, answer_to):
msg_id = answer_to.getId()
def answer(self, packet, answered_packet):
""" Answer to a packet by re-using its ID for the packet answer """
msg_id = answered_packet.getId()
packet.setId(msg_id)
self.addPacket(packet)
self._addPacket(packet)
def isServerConnection(self):
raise NotImplementedError
......@@ -418,14 +422,6 @@ class MTClientConnection(ClientConnection):
def unlock(self):
self.release()
@lockCheckWrapper
def recv(self, *args, **kw):
return super(MTClientConnection, self).recv(*args, **kw)
@lockCheckWrapper
def send(self, *args, **kw):
return super(MTClientConnection, self).send(*args, **kw)
@lockCheckWrapper
def writable(self, *args, **kw):
return super(MTClientConnection, self).writable(*args, **kw)
......@@ -439,16 +435,20 @@ class MTClientConnection(ClientConnection):
return super(MTClientConnection, self).analyse(*args, **kw)
@lockCheckWrapper
def addPacket(self, *args, **kw):
return super(MTClientConnection, self).addPacket(*args, **kw)
def expectMessage(self, *args, **kw):
return super(MTClientConnection, self).expectMessage(*args, **kw)
@lockCheckWrapper
def notify(self, *args, **kw):
return super(MTClientConnection, self).notify(*args, **kw)
@lockCheckWrapper
def getNextId(self, *args, **kw):
return super(MTClientConnection, self).getNextId(*args, **kw)
def ask(self, *args, **kw):
return super(MTClientConnection, self).ask(*args, **kw)
@lockCheckWrapper
def expectMessage(self, *args, **kw):
return super(MTClientConnection, self).expectMessage(*args, **kw)
def answer(self, *args, **kw):
return super(MTClientConnection, self).answer(*args, **kw)
class MTServerConnection(ServerConnection):
"""A Multithread-safe version of ServerConnection."""
......@@ -469,14 +469,6 @@ class MTServerConnection(ServerConnection):
def unlock(self):
self.release()
@lockCheckWrapper
def recv(self, *args, **kw):
return super(MTServerConnection, self).recv(*args, **kw)
@lockCheckWrapper
def send(self, *args, **kw):
return super(MTServerConnection, self).send(*args, **kw)
@lockCheckWrapper
def writable(self, *args, **kw):
return super(MTServerConnection, self).writable(*args, **kw)
......@@ -490,14 +482,18 @@ class MTServerConnection(ServerConnection):
return super(MTServerConnection, self).analyse(*args, **kw)
@lockCheckWrapper
def addPacket(self, *args, **kw):
return super(MTServerConnection, self).addPacket(*args, **kw)
def expectMessage(self, *args, **kw):
return super(MTServerConnection, self).expectMessage(*args, **kw)
@lockCheckWrapper
def notify(self, *args, **kw):
return super(MTClientConnection, self).notify(*args, **kw)
@lockCheckWrapper
def getNextId(self, *args, **kw):
return super(MTServerConnection, self).getNextId(*args, **kw)
def ask(self, *args, **kw):
return super(MTClientConnection, self).ask(*args, **kw)
@lockCheckWrapper
def expectMessage(self, *args, **kw):
return super(MTServerConnection, self).expectMessage(*args, **kw)
def answer(self, *args, **kw):
return super(MTClientConnection, self).answer(*args, **kw)
......@@ -467,10 +467,8 @@ class ServiceEventHandler(MasterEventHandler):
node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() == CLIENT_NODE_TYPE:
if c is t.getConnection():
# TODO: use connection.notify if possible
p = protocol.notifyTransactionFinished(tid)
p.setId(t.getMessageId())
c.addPacket(p)
c.notify(p, t.getMessageId())
else:
p = protocol.invalidateObjects(t.getOIDList(), tid)
c.notify(p)
......
......@@ -200,7 +200,7 @@ server: 127.0.0.1:10023
def checkCalledNotifyTransactionFinished(self, conn, packet_number=0):
""" Check notifyTransactionFinished message has been send"""
call = conn.mockGetNamedCalls("addPacket")[packet_number]
call = conn.mockGetNamedCalls("notify")[packet_number]
packet = call.getParam(0)
self.assertTrue(isinstance(packet, Packet))
self.assertEquals(packet.getType(), NOTIFY_TRANSACTION_FINISHED)
......@@ -240,7 +240,7 @@ server: 127.0.0.1:10023
args = (node_type, uuid, ip, port, self.app.name)
packet = protocol.requestNodeIdentification(*args)
# test alien cluster
conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None})
conn = Mock({"_addPacket" : None, "abort" : None, "expectMessage" : None})
self.service.handleRequestNodeIdentification(conn, packet, *args)
self.checkCalledAcceptNodeIdentification(conn)
return uuid
......@@ -252,14 +252,14 @@ server: 127.0.0.1:10023
args = (STORAGE_NODE_TYPE, uuid, '127.0.0.1', self.storage_port, 'INVALID_NAME')
packet = protocol.requestNodeIdentification(*args)
# test alien cluster
conn = Mock({"addPacket" : None, "abort" : None})
conn = Mock({"_addPacket" : None, "abort" : None})
ptid = self.app.lptid
self.checkProtocolErrorRaised(service.handleRequestNodeIdentification, conn, packet, *args)
self.assertEquals(len(self.app.nm.getStorageNodeList()), 0)
self.assertEquals(self.app.lptid, ptid)
# test connection of a storage node
conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None})
conn = Mock({"_addPacket" : None, "abort" : None, "expectMessage" : None})
ptid = self.app.lptid
service.handleRequestNodeIdentification(conn,
packet=packet,
......@@ -277,7 +277,7 @@ server: 127.0.0.1:10023
self.failUnless(self.app.lptid > ptid)
# send message again for the same storage node, MN must recognize it
conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None})
conn = Mock({"_addPacket" : None, "abort" : None, "expectMessage" : None})
ptid = self.app.lptid
service.handleRequestNodeIdentification(conn,
packet=packet,
......@@ -296,7 +296,7 @@ server: 127.0.0.1:10023
# send message again for the same storage node but different uuid
# must be rejected as SN is considered as running
conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None})
conn = Mock({"_addPacket" : None, "abort" : None, "expectMessage" : None})
ptid = self.app.lptid
new_uuid = self.getNewUUID()
......@@ -318,7 +318,7 @@ server: 127.0.0.1:10023
# same test, but set SN as not running before
# this new node must replaced the old one
conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None})
conn = Mock({"_addPacket" : None, "abort" : None, "expectMessage" : None})
ptid = self.app.lptid
sn = self.app.nm.getStorageNodeList()[0]
self.assertEquals(sn.getState(), RUNNING_STATE)
......@@ -344,7 +344,7 @@ server: 127.0.0.1:10023
# send message again for the same storage node but different address
# A new UUID should be send and the node is added to the storage node list
conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None})
conn = Mock({"_addPacket" : None, "abort" : None, "expectMessage" : None})
ptid = self.app.lptid
service.handleRequestNodeIdentification(conn,
packet=packet,
......@@ -370,7 +370,7 @@ server: 127.0.0.1:10023
self.failUnless(self.app.lptid > ptid)
# mark the node as broken and request identification, this must be forbidden
conn = Mock({"addPacket" : None, "abort" : None, "expectMessage" : None})
conn = Mock({"_addPacket" : None, "abort" : None, "expectMessage" : None})
ptid = self.app.lptid
sn = self.app.nm.getStorageNodeList()[0]
self.assertEquals(sn.getState(), RUNNING_STATE)
......@@ -400,7 +400,7 @@ server: 127.0.0.1:10023
uuid = self.identifyToMasterNode()
packet = protocol.askPrimaryMaster()
# test answer to a storage node
conn = Mock({"addPacket" : None,
conn = Mock({"_addPacket" : None,
"answerPrimaryMaster" : None,
"notifyNodeInformation" : None,
"sendPartitionTable" : None,
......@@ -417,7 +417,7 @@ server: 127.0.0.1:10023
# Same but identify as a client node, must not get start operation message
uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=11021)
packet = protocol.askPrimaryMaster()
conn = Mock({"addPacket" : None, "abort" : None, "answerPrimaryMaster" : None,
conn = Mock({"_addPacket" : None, "abort" : None, "answerPrimaryMaster" : None,
"notifyNodeInformation" : None, "sendPartitionTable" : None,
"getUUID" : uuid, "getAddress" : ("127.0.0.1", 11021)})
service.handleAskPrimaryMaster(conn, packet)
......@@ -736,7 +736,6 @@ server: 127.0.0.1:10023
self.assertEquals(len(storage_conn_1.mockGetNamedCalls("notify")), 1)
self.assertEquals(len(storage_conn_2.mockGetNamedCalls("ask")), 1)
self.assertEquals(len(storage_conn_2.mockGetNamedCalls("notify")), 1)
self.assertEquals(len(conn.mockGetNamedCalls("addPacket")), 1)
self.checkCalledLockInformation(storage_conn_1)
self.checkCalledLockInformation(storage_conn_2)
......
......@@ -63,8 +63,8 @@ class StorageEventHandler(EventHandler):
info = n.getServer() + (n.getUUID() or INVALID_UUID,)
known_master_list.append(info)
p = protocol.answerPrimaryMaster(packet.getId(), primary_uuid, known_master_list)
conn.addPacket(p)
p = protocol.answerPrimaryMaster(primary_uuid, known_master_list)
conn.answer(p, packet)
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list):
......
......@@ -172,15 +172,15 @@ class testConnection(unittest.TestCase):
self.assertEqual(bc.getUUID(), uuid)
# test next id
cur_id = bc.cur_id
next_id = bc.getNextId()
next_id = bc._getNextId()
self.assertEqual(next_id, cur_id)
next_id = bc.getNextId()
next_id = bc._getNextId()
self.failUnless(next_id > cur_id)
# test overflow of next id
bc.cur_id = 0xffffffff
next_id = bc.getNextId()
next_id = bc._getNextId()
self.assertEqual(next_id, 0xffffffff)
next_id = bc.getNextId()
next_id = bc._getNextId()
self.assertEqual(next_id, 0)
# test abort
bc.abort()
......@@ -224,7 +224,7 @@ class testConnection(unittest.TestCase):
self.assertEqual(bc.read_buf, '')
self.assertNotEqual(bc.getConnector(), None)
bc.recv()
bc._recv()
self.assertEqual(bc.read_buf, "testdata")
# patch receive method to raise try again
......@@ -236,7 +236,7 @@ class testConnection(unittest.TestCase):
connector=connector, addr=("127.0.0.7", 93413))
self.assertEqual(bc.read_buf, '')
self.assertNotEqual(bc.getConnector(), None)
bc.recv()
bc._recv()
self.assertEqual(bc.read_buf, '')
self.assertEquals(len(handler.mockGetNamedCalls("connectionClosed")), 0)
self.assertEquals(len(em.mockGetNamedCalls("unregister")), 0)
......@@ -251,7 +251,7 @@ class testConnection(unittest.TestCase):
self.assertNotEqual(bc.getConnector(), None)
# fake client connection instance with connecting attribute
bc.connecting = True
bc.recv()
bc._recv()
self.assertEqual(bc.read_buf, '')
self.assertEquals(len(handler.mockGetNamedCalls("connectionFailed")), 1)
self.assertEquals(len(em.mockGetNamedCalls("unregister")), 1)
......@@ -264,7 +264,7 @@ class testConnection(unittest.TestCase):
connector=connector, addr=("127.0.0.7", 93413))
self.assertEqual(bc.read_buf, '')
self.assertNotEqual(bc.getConnector(), None)
self.assertRaises(ConnectorException, bc.recv)
self.assertRaises(ConnectorException, bc._recv)
self.assertEqual(bc.read_buf, '')
self.assertEquals(len(handler.mockGetNamedCalls("connectionClosed")), 1)
self.assertEquals(len(em.mockGetNamedCalls("unregister")), 2)
......@@ -280,7 +280,7 @@ class testConnection(unittest.TestCase):
connector=connector, addr=("127.0.0.7", 93413))
self.assertEqual(bc.write_buf, '')
self.assertNotEqual(bc.getConnector(), None)
bc.send()
bc._send()
self.assertEquals(len(connector.mockGetNamedCalls("send")), 0)
self.assertEquals(len(handler.mockGetNamedCalls("connectionClosed")), 0)
self.assertEquals(len(em.mockGetNamedCalls("unregister")), 0)
......@@ -295,7 +295,7 @@ class testConnection(unittest.TestCase):
self.assertEqual(bc.write_buf, '')
bc.write_buf = "testdata"
self.assertNotEqual(bc.getConnector(), None)
bc.send()
bc._send()
self.assertEquals(len(connector.mockGetNamedCalls("send")), 1)
call = connector.mockGetNamedCalls("send")[0]
data = call.getParam(0)
......@@ -314,7 +314,7 @@ class testConnection(unittest.TestCase):
self.assertEqual(bc.write_buf, '')
bc.write_buf = "testdata"
self.assertNotEqual(bc.getConnector(), None)
bc.send()
bc._send()
self.assertEquals(len(connector.mockGetNamedCalls("send")), 1)
call = connector.mockGetNamedCalls("send")[0]
data = call.getParam(0)
......@@ -333,7 +333,7 @@ class testConnection(unittest.TestCase):
self.assertEqual(bc.write_buf, '')
bc.write_buf = "testdata" + "second" + "third"
self.assertNotEqual(bc.getConnector(), None)
bc.send()
bc._send()
self.assertEquals(len(connector.mockGetNamedCalls("send")), 1)
call = connector.mockGetNamedCalls("send")[0]
data = call.getParam(0)
......@@ -352,7 +352,7 @@ class testConnection(unittest.TestCase):
self.assertEqual(bc.write_buf, '')
bc.write_buf = "testdata" + "second" + "third"
self.assertNotEqual(bc.getConnector(), None)
bc.send()
bc._send()
self.assertEquals(len(connector.mockGetNamedCalls("send")), 1)
call = connector.mockGetNamedCalls("send")[0]
data = call.getParam(0)
......@@ -371,7 +371,7 @@ class testConnection(unittest.TestCase):
self.assertEqual(bc.write_buf, '')
bc.write_buf = "testdata" + "second" + "third"
self.assertNotEqual(bc.getConnector(), None)
bc.send()
bc._send()
self.assertEquals(len(connector.mockGetNamedCalls("send")), 1)
call = connector.mockGetNamedCalls("send")[0]
data = call.getParam(0)
......@@ -390,7 +390,7 @@ class testConnection(unittest.TestCase):
self.assertEqual(bc.write_buf, '')
bc.write_buf = "testdata" + "second" + "third"
self.assertNotEqual(bc.getConnector(), None)
self.assertRaises(ConnectorException, bc.send)
self.assertRaises(ConnectorException, bc._send)
self.assertEquals(len(connector.mockGetNamedCalls("send")), 1)
call = connector.mockGetNamedCalls("send")[0]
data = call.getParam(0)
......@@ -410,7 +410,7 @@ class testConnection(unittest.TestCase):
connector=None, addr=("127.0.0.7", 93413))
self.assertEqual(bc.getConnector(), None)
self.assertEqual(bc.write_buf, '')
bc.addPacket(p)
bc._addPacket(p)
self.assertEqual(bc.write_buf, '')
self.assertEquals(len(em.mockGetNamedCalls("addWriter")), 0)
......@@ -420,7 +420,7 @@ class testConnection(unittest.TestCase):
connector=connector, addr=("127.0.0.7", 93413))
self.assertEqual(bc.write_buf, '')
self.assertNotEqual(bc.getConnector(), None)
bc.addPacket(p)
bc._addPacket(p)
self.assertEqual(bc.write_buf, "testdata")
self.assertEquals(len(em.mockGetNamedCalls("addWriter")), 1)
......@@ -959,15 +959,15 @@ class testConnection(unittest.TestCase):
self.assertEqual(bc.getUUID(), uuid)
# test next id
cur_id = bc.cur_id
next_id = bc.getNextId()
next_id = bc._getNextId()
self.assertEqual(next_id, cur_id)
next_id = bc.getNextId()
next_id = bc._getNextId()
self.failUnless(next_id > cur_id)
# test overflow of next id
bc.cur_id = 0xffffffff
next_id = bc.getNextId()
next_id = bc._getNextId()
self.assertEqual(next_id, 0xffffffff)
next_id = bc.getNextId()
next_id = bc._getNextId()
self.assertEqual(next_id, 0)
# test abort
bc.abort()
......@@ -1074,15 +1074,15 @@ class testConnection(unittest.TestCase):
# test next id
bc._lock = Mock({'_is_owned': True})
cur_id = bc.cur_id
next_id = bc.getNextId()
next_id = bc._getNextId()
self.assertEqual(next_id, cur_id)
next_id = bc.getNextId()
next_id = bc._getNextId()
self.failUnless(next_id > cur_id)
# test overflow of next id
bc.cur_id = 0xffffffff
next_id = bc.getNextId()
next_id = bc._getNextId()
self.assertEqual(next_id, 0xffffffff)
next_id = bc.getNextId()
next_id = bc._getNextId()
self.assertEqual(next_id, 0)
# test abort
bc.abort()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment