Commit fe3cd624 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Rename packets *NewTID to *BeginTransaction to be consistent with future

modifications where a TID can be given when starting a transaction. This is
necessary to support the copyTransactionsFrom() from BaseStorage and have a
better support of the ZODB API, where the tid parameter to tpc_begin() can 
be not None.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@1005 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent b1b29e6c
......@@ -662,7 +662,7 @@ class Application(object):
# Get a new transaction id if necessary
if tid is None:
self.local_var.tid = None
self._askPrimary(protocol.askNewTID())
self._askPrimary(protocol.askBeginTransaction())
if self.local_var.tid is None:
raise NEOStorageError('tpc_begin failed')
else:
......
......@@ -212,7 +212,7 @@ class PrimaryNotificationsHandler(BaseHandler):
class PrimaryAnswersHandler(AnswerBaseHandler):
""" Handle that process expected packets from the primary master """
def handleAnswerNewTID(self, conn, packet, tid):
def handleAnswerBeginTransaction(self, conn, packet, tid):
app = self.app
app.setTID(tid)
......
......@@ -29,7 +29,7 @@ from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICAT
ANSWER_PARTITION_TABLE, SEND_PARTITION_TABLE, NOTIFY_PARTITION_CHANGES, \
ASK_UNFINISHED_TRANSACTIONS, ANSWER_UNFINISHED_TRANSACTIONS, \
ASK_OBJECT_PRESENT, ANSWER_OBJECT_PRESENT, \
DELETE_TRANSACTION, COMMIT_TRANSACTION, ASK_NEW_TID, ANSWER_NEW_TID, \
DELETE_TRANSACTION, COMMIT_TRANSACTION, ASK_BEGIN_TRANSACTION, ANSWER_BEGIN_TRANSACTION, \
FINISH_TRANSACTION, NOTIFY_TRANSACTION_FINISHED, LOCK_INFORMATION, \
NOTIFY_INFORMATION_LOCKED, INVALIDATE_OBJECTS, UNLOCK_INFORMATION, \
ASK_NEW_OIDS, ANSWER_NEW_OIDS, ASK_STORE_OBJECT, ANSWER_STORE_OBJECT, \
......@@ -236,10 +236,10 @@ class EventHandler(object):
def handleCommitTransaction(self, conn, packet, tid):
raise UnexpectedPacketError
def handleAskNewTID(self, conn, packet):
def handleAskBeginTransaction(self, conn, packet):
raise UnexpectedPacketError
def handleAnswerNewTID(self, conn, packet, tid):
def handleAnswerBeginTransaction(self, conn, packet, tid):
raise UnexpectedPacketError
def handleAskNewOIDs(self, conn, packet, num_oids):
......@@ -406,8 +406,8 @@ class EventHandler(object):
d[ANSWER_OBJECT_PRESENT] = self.handleAnswerObjectPresent
d[DELETE_TRANSACTION] = self.handleDeleteTransaction
d[COMMIT_TRANSACTION] = self.handleCommitTransaction
d[ASK_NEW_TID] = self.handleAskNewTID
d[ANSWER_NEW_TID] = self.handleAnswerNewTID
d[ASK_BEGIN_TRANSACTION] = self.handleAskBeginTransaction
d[ANSWER_BEGIN_TRANSACTION] = self.handleAnswerBeginTransaction
d[FINISH_TRANSACTION] = self.handleFinishTransaction
d[NOTIFY_TRANSACTION_FINISHED] = self.handleNotifyTransactionFinished
d[LOCK_INFORMATION] = self.handleLockInformation
......
......@@ -80,7 +80,7 @@ class MasterHandler(EventHandler):
def handleOidNotFound(self, conn, packet, message):
logging.error('ignoring OID Not Found in %s' % self.__class__.__name__)
def handleAskNewTID(self, conn, packet):
def handleAskBeginTransaction(self, conn, packet):
logging.error('ignoring Ask New TID in %s' % self.__class__.__name__)
def handleAskNewOIDs(self, conn, packet, num_oids):
......
......@@ -94,11 +94,11 @@ class ClientServiceHandler(BaseServiceHandler):
except KeyError:
logging.warn('aborting transaction %s does not exist', dump(tid))
def handleAskNewTID(self, conn, packet):
def handleAskBeginTransaction(self, conn, packet):
app = self.app
tid = app.getNextTID()
app.finishing_transaction_dict[tid] = FinishingTransaction(conn)
conn.answer(protocol.answerNewTID(tid), packet)
conn.answer(protocol.answerBeginTransaction(tid), packet)
def handleAskNewOIDs(self, conn, packet, num_oids):
app = self.app
......
......@@ -36,7 +36,7 @@ class ShutdownHandler(BaseServiceHandler):
@decorators.identification_required
@decorators.restrict_node_types(CLIENT_NODE_TYPE)
def handleAskNewTID(self, conn, packet):
def handleAskBeginTransaction(self, conn, packet):
logging.error('reject any new demand for new tid')
raise protocol.ProtocolError('cluster is shutting down')
......
......@@ -173,10 +173,10 @@ packet_types = Enum({
'COMMIT_TRANSACTION': 0x0011,
# Ask a new transaction ID. C -> PM.
'ASK_NEW_TID': 0x0012,
'ASK_BEGIN_TRANSACTION': 0x0012,
# Answer a new transaction ID. PM -> C.
'ANSWER_NEW_TID': 0x8012,
'ANSWER_BEGIN_TRANSACTION': 0x8012,
# Finish a transaction. C -> PM.
'FINISH_TRANSACTION': 0x0013,
......@@ -751,15 +751,15 @@ def _decodeCommitTransaction(body):
decode_table[COMMIT_TRANSACTION] = _decodeCommitTransaction
@handle_errors
def _decodeAskNewTID(body):
def _decodeAskBeginTransaction(body):
pass
decode_table[ASK_NEW_TID] = _decodeAskNewTID
decode_table[ASK_BEGIN_TRANSACTION] = _decodeAskBeginTransaction
@handle_errors
def _decodeAnswerNewTID(body):
def _decodeAnswerBeginTransaction(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[ANSWER_NEW_TID] = _decodeAnswerNewTID
decode_table[ANSWER_BEGIN_TRANSACTION] = _decodeAnswerBeginTransaction
@handle_errors
def _decodeAskNewOIDs(body):
......@@ -1226,11 +1226,11 @@ def deleteTransaction(tid):
def commitTransaction(tid):
return Packet(COMMIT_TRANSACTION, tid)
def askNewTID():
return Packet(ASK_NEW_TID)
def askBeginTransaction():
return Packet(ASK_BEGIN_TRANSACTION)
def answerNewTID(tid):
return Packet(ANSWER_NEW_TID, tid)
def answerBeginTransaction(tid):
return Packet(ANSWER_BEGIN_TRANSACTION, tid)
def askNewOIDs(num_oids):
return Packet(ASK_NEW_OIDS, pack('!H', num_oids))
......
......@@ -281,7 +281,7 @@ class NeoTestBase(unittest.TestCase):
return self.checkAskPacket(conn, protocol.FINISH_TRANSACTION, **kw)
def checkAskNewTid(self, conn, **kw):
return self.checkAskPacket(conn, protocol.ASK_NEW_TID, **kw)
return self.checkAskPacket(conn, protocol.ASK_BEGIN_TRANSACTION, **kw)
def checkAskLastIDs(self, conn, **kw):
return self.checkAskPacket(conn, protocol.ASK_LAST_IDS, **kw)
......
......@@ -345,7 +345,7 @@ class ClientApplicationTests(NeoTestBase):
# no connection -> NEOStorageError (wait until connected to primary)
#self.assertRaises(NEOStorageError, app.tpc_begin, transaction=txn, tid=None)
# ask a tid to pmn
packet = protocol.answerNewTID(tid=tid)
packet = protocol.answerBeginTransaction(tid=tid)
app.master_conn = Mock({
'getNextId': 1,
'expectMessage': None,
......@@ -583,7 +583,7 @@ class ClientApplicationTests(NeoTestBase):
def hook(tid):
self.f_called = True
self.f_called_with_tid = tid
packet = protocol.answerNewTID(INVALID_TID)
packet = protocol.answerBeginTransaction(INVALID_TID)
app.master_conn = Mock({
'getNextId': 1,
'getAddress': ('127.0.0.1', 10000),
......@@ -856,7 +856,7 @@ class ClientApplicationTests(NeoTestBase):
def _waitMessage_hook(app, conn=None, msg_id=None, handler=None):
self.test_ok = True
_waitMessage_old = Application._waitMessage
packet = protocol.askNewTID()
packet = protocol.askBeginTransaction()
Application._waitMessage = _waitMessage_hook
try:
app._askStorage(conn, packet)
......@@ -882,7 +882,7 @@ class ClientApplicationTests(NeoTestBase):
self.test_ok = True
_waitMessage_old = Application._waitMessage
Application._waitMessage = _waitMessage_hook
packet = protocol.askNewTID()
packet = protocol.askBeginTransaction()
try:
app._askPrimary(packet)
finally:
......
......@@ -790,13 +790,13 @@ class ClientHandlerTests(NeoTestBase):
self.assertEquals(calls[2].getParam(1).getUUID(), uuid3)
self.assertEquals(calls[3].getParam(1).getUUID(), uuid4)
def test_AnswerNewTID(self):
def test_AnswerBeginTransaction(self):
app = Mock({'setTID': None})
dispatcher = self.getDispatcher()
client_handler = PrimaryAnswersHandler(app)
conn = self.getConnection()
test_tid = 1
client_handler.handleAnswerNewTID(conn, None, test_tid)
client_handler.handleAnswerBeginTransaction(conn, None, test_tid)
setTID_call_list = app.mockGetNamedCalls('setTID')
self.assertEquals(len(setTID_call_list), 1)
self.assertEquals(setTID_call_list[0].getParam(0), test_tid)
......
......@@ -29,7 +29,7 @@ from neo.master.app import Application
from neo.protocol import ERROR, PING, PONG, ANNOUNCE_PRIMARY_MASTER, \
REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, \
ASK_LAST_IDS, ANSWER_LAST_IDS, NOTIFY_PARTITION_CHANGES, \
ASK_UNFINISHED_TRANSACTIONS, ASK_NEW_TID, FINISH_TRANSACTION, \
ASK_UNFINISHED_TRANSACTIONS, ASK_BEGIN_TRANSACTION, FINISH_TRANSACTION, \
NOTIFY_INFORMATION_LOCKED, ASK_NEW_OIDS, ABORT_TRANSACTION, \
STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, MASTER_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
......@@ -167,15 +167,15 @@ class MasterClientHandlerTests(NeoTestBase):
self.assertEquals(lptid, self.app.pt.getID())
def test_07_handleAskNewTID(self):
def test_07_handleAskBeginTransaction(self):
service = self.service
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=ASK_NEW_TID)
packet = Packet(msg_type=ASK_BEGIN_TRANSACTION)
ltid = self.app.ltid
# client call it
client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE, port=self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
service.handleAskNewTID(conn, packet)
service.handleAskBeginTransaction(conn, packet)
self.failUnless(ltid < self.app.ltid)
self.assertEquals(len(self.app.finishing_transaction_dict), 1)
tid = self.app.finishing_transaction_dict.keys()[0]
......@@ -215,7 +215,7 @@ class MasterClientHandlerTests(NeoTestBase):
storage_conn = self.getFakeConnection(storage_uuid, self.storage_address)
self.assertNotEquals(uuid, client_uuid)
conn = self.getFakeConnection(client_uuid, self.client_address)
service.handleAskNewTID(conn, packet)
service.handleAskBeginTransaction(conn, packet)
oid_list = []
tid = self.app.ltid
conn = self.getFakeConnection(client_uuid, self.client_address)
......@@ -281,9 +281,9 @@ class MasterClientHandlerTests(NeoTestBase):
client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE,
port=self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
service.handleAskNewTID(conn, packet)
service.handleAskNewTID(conn, packet)
service.handleAskNewTID(conn, packet)
service.handleAskBeginTransaction(conn, packet)
service.handleAskBeginTransaction(conn, packet)
service.handleAskBeginTransaction(conn, packet)
conn = self.getFakeConnection(uuid, self.storage_address)
service.handleAskUnfinishedTransactions(conn, packet)
packet = self.checkAnswerUnfinishedTransactions(conn, answered_packet=packet)
......@@ -319,10 +319,10 @@ class MasterClientHandlerTests(NeoTestBase):
port = self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
lptid = self.app.pt.getID()
packet = Packet(msg_type=ASK_NEW_TID)
service.handleAskNewTID(conn, packet)
service.handleAskNewTID(conn, packet)
service.handleAskNewTID(conn, packet)
packet = Packet(msg_type=ASK_BEGIN_TRANSACTION)
service.handleAskBeginTransaction(conn, packet)
service.handleAskBeginTransaction(conn, packet)
service.handleAskBeginTransaction(conn, packet)
self.assertEquals(self.app.nm.getNodeByUUID(client_uuid).getState(), RUNNING_STATE)
self.assertEquals(len(self.app.finishing_transaction_dict.keys()), 3)
service.peerBroken(conn)
......@@ -360,10 +360,10 @@ class MasterClientHandlerTests(NeoTestBase):
port = self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
lptid = self.app.pt.getID()
packet = Packet(msg_type=ASK_NEW_TID)
service.handleAskNewTID(conn, packet)
service.handleAskNewTID(conn, packet)
service.handleAskNewTID(conn, packet)
packet = Packet(msg_type=ASK_BEGIN_TRANSACTION)
service.handleAskBeginTransaction(conn, packet)
service.handleAskBeginTransaction(conn, packet)
service.handleAskBeginTransaction(conn, packet)
self.assertEquals(self.app.nm.getNodeByUUID(client_uuid).getState(), RUNNING_STATE)
self.assertEquals(len(self.app.finishing_transaction_dict.keys()), 3)
service.timeoutExpired(conn)
......@@ -401,10 +401,10 @@ class MasterClientHandlerTests(NeoTestBase):
port = self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
lptid = self.app.pt.getID()
packet = Packet(msg_type=ASK_NEW_TID)
service.handleAskNewTID(conn, packet)
service.handleAskNewTID(conn, packet)
service.handleAskNewTID(conn, packet)
packet = Packet(msg_type=ASK_BEGIN_TRANSACTION)
service.handleAskBeginTransaction(conn, packet)
service.handleAskBeginTransaction(conn, packet)
service.handleAskBeginTransaction(conn, packet)
self.assertEquals(self.app.nm.getNodeByUUID(client_uuid).getState(), RUNNING_STATE)
self.assertEquals(len(self.app.finishing_transaction_dict.keys()), 3)
service.connectionClosed(conn)
......
......@@ -32,7 +32,7 @@ from neo.protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIF
ANSWER_PARTITION_TABLE, SEND_PARTITION_TABLE, NOTIFY_PARTITION_CHANGES, \
ASK_UNFINISHED_TRANSACTIONS, ANSWER_UNFINISHED_TRANSACTIONS, \
ASK_OBJECT_PRESENT, ANSWER_OBJECT_PRESENT, \
DELETE_TRANSACTION, COMMIT_TRANSACTION, ASK_NEW_TID, ANSWER_NEW_TID, \
DELETE_TRANSACTION, COMMIT_TRANSACTION, ASK_BEGIN_TRANSACTION, ANSWER_BEGIN_TRANSACTION, \
FINISH_TRANSACTION, NOTIFY_TRANSACTION_FINISHED, LOCK_INFORMATION, \
NOTIFY_INFORMATION_LOCKED, INVALIDATE_OBJECTS, UNLOCK_INFORMATION, \
ASK_NEW_OIDS, ANSWER_NEW_OIDS, ASK_STORE_OBJECT, ANSWER_STORE_OBJECT, \
......
......@@ -32,7 +32,7 @@ from neo.protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIF
ANSWER_PARTITION_TABLE, SEND_PARTITION_TABLE, NOTIFY_PARTITION_CHANGES, \
ASK_UNFINISHED_TRANSACTIONS, ANSWER_UNFINISHED_TRANSACTIONS, \
ASK_OBJECT_PRESENT, ANSWER_OBJECT_PRESENT, \
DELETE_TRANSACTION, COMMIT_TRANSACTION, ASK_NEW_TID, ANSWER_NEW_TID, \
DELETE_TRANSACTION, COMMIT_TRANSACTION, ASK_BEGIN_TRANSACTION, ANSWER_BEGIN_TRANSACTION, \
FINISH_TRANSACTION, NOTIFY_TRANSACTION_FINISHED, LOCK_INFORMATION, \
NOTIFY_INFORMATION_LOCKED, INVALIDATE_OBJECTS, UNLOCK_INFORMATION, \
ASK_NEW_OIDS, ANSWER_NEW_OIDS, ASK_STORE_OBJECT, ANSWER_STORE_OBJECT, \
......
......@@ -29,7 +29,7 @@ from neo.master.app import Application
from neo.protocol import ERROR, PING, PONG, ANNOUNCE_PRIMARY_MASTER, \
REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, \
ASK_LAST_IDS, ANSWER_LAST_IDS, NOTIFY_PARTITION_CHANGES, \
ASK_UNFINISHED_TRANSACTIONS, ASK_NEW_TID, FINISH_TRANSACTION, \
ASK_UNFINISHED_TRANSACTIONS, ASK_BEGIN_TRANSACTION, FINISH_TRANSACTION, \
NOTIFY_INFORMATION_LOCKED, ASK_NEW_OIDS, ABORT_TRANSACTION, \
STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, MASTER_NODE_TYPE, \
RUNNING_STATE, BROKEN_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
......@@ -174,7 +174,7 @@ class MasterStorageHandlerTests(NeoTestBase):
storage_conn_1 = self.getFakeConnection(storage_uuid_1, ("127.0.0.1", self.storage_port))
storage_conn_2 = self.getFakeConnection(storage_uuid_2, ("127.0.0.1", 10022))
conn = self.getFakeConnection(client_uuid, self.client_address)
service.handleAskNewTID(conn, packet)
service.handleAskBeginTransaction(conn, packet)
# clean mock object
conn.mockCalledMethods = {}
conn.mockAllCalledMethods = []
......@@ -226,9 +226,9 @@ class MasterStorageHandlerTests(NeoTestBase):
client_uuid = self.identifyToMasterNode(node_type=CLIENT_NODE_TYPE,
port=self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
service.handleAskNewTID(conn, packet)
service.handleAskNewTID(conn, packet)
service.handleAskNewTID(conn, packet)
service.handleAskBeginTransaction(conn, packet)
service.handleAskBeginTransaction(conn, packet)
service.handleAskBeginTransaction(conn, packet)
conn = self.getFakeConnection(uuid, self.storage_address)
service.handleAskUnfinishedTransactions(conn, packet)
packet = self.checkAnswerUnfinishedTransactions(conn, answered_packet=packet)
......@@ -333,10 +333,10 @@ class MasterStorageHandlerTests(NeoTestBase):
port = self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
lptid = self.app.pt.getID()
packet = Packet(msg_type=ASK_NEW_TID)
service.handleAskNewTID(conn, packet)
service.handleAskNewTID(conn, packet)
service.handleAskNewTID(conn, packet)
packet = Packet(msg_type=ASK_BEGIN_TRANSACTION)
service.handleAskBeginTransaction(conn, packet)
service.handleAskBeginTransaction(conn, packet)
service.handleAskBeginTransaction(conn, packet)
self.assertEquals(self.app.nm.getNodeByUUID(client_uuid).getState(), RUNNING_STATE)
self.assertEquals(len(self.app.finishing_transaction_dict.keys()), 3)
service.peerBroken(conn)
......@@ -379,10 +379,10 @@ class MasterStorageHandlerTests(NeoTestBase):
port = self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
lptid = self.app.pt.getID()
packet = Packet(msg_type=ASK_NEW_TID)
service.handleAskNewTID(conn, packet)
service.handleAskNewTID(conn, packet)
service.handleAskNewTID(conn, packet)
packet = Packet(msg_type=ASK_BEGIN_TRANSACTION)
service.handleAskBeginTransaction(conn, packet)
service.handleAskBeginTransaction(conn, packet)
service.handleAskBeginTransaction(conn, packet)
self.assertEquals(self.app.nm.getNodeByUUID(client_uuid).getState(), RUNNING_STATE)
self.assertEquals(len(self.app.finishing_transaction_dict.keys()), 3)
service.timeoutExpired(conn)
......@@ -425,10 +425,10 @@ class MasterStorageHandlerTests(NeoTestBase):
port = self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
lptid = self.app.pt.getID()
packet = Packet(msg_type=ASK_NEW_TID)
service.handleAskNewTID(conn, packet)
service.handleAskNewTID(conn, packet)
service.handleAskNewTID(conn, packet)
packet = Packet(msg_type=ASK_BEGIN_TRANSACTION)
service.handleAskBeginTransaction(conn, packet)
service.handleAskBeginTransaction(conn, packet)
service.handleAskBeginTransaction(conn, packet)
self.assertEquals(self.app.nm.getNodeByUUID(client_uuid).getState(), RUNNING_STATE)
self.assertEquals(len(self.app.finishing_transaction_dict.keys()), 3)
service.connectionClosed(conn)
......
......@@ -287,13 +287,13 @@ class ProtocolTests(unittest.TestCase):
self.assertEqual(ptid, tid)
def test_32_askNewTID(self):
p = protocol.askNewTID()
def test_32_askBeginTransaction(self):
p = protocol.askBeginTransaction()
self.assertEqual(p.decode(), None)
def test_33_answerNewTID(self):
def test_33_answerBeginTransaction(self):
tid = self.getNextTID()
p = protocol.answerNewTID(tid)
p = protocol.answerBeginTransaction(tid)
ptid = p.decode()[0]
self.assertEqual(ptid, tid)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment