Commit f2f0e7be authored by Grégory Wisniewski's avatar Grégory Wisniewski

Implement support of a new transaction with a TID supplied by the client.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@1010 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 8c830924
......@@ -659,14 +659,13 @@ class Application(object):
if self.local_var.txn is transaction:
# We already begin the same transaction
return
# Get a new transaction id if necessary
if tid is None:
# ask the primary master to start a transaction, if no tid is supplied,
# the master will supply us one. Otherwise the requested tid will be
# used if possible.
self.local_var.tid = None
self._askPrimary(protocol.askBeginTransaction())
self._askPrimary(protocol.askBeginTransaction(tid))
if self.local_var.tid is None:
raise NEOStorageError('tpc_begin failed')
else:
self.local_var.tid = tid
self.local_var.txn = transaction
......
......@@ -236,7 +236,7 @@ class EventHandler(object):
def handleCommitTransaction(self, conn, packet, tid):
raise UnexpectedPacketError
def handleAskBeginTransaction(self, conn, packet):
def handleAskBeginTransaction(self, conn, packet, tid):
raise UnexpectedPacketError
def handleAnswerBeginTransaction(self, conn, packet, tid):
......
......@@ -80,7 +80,7 @@ class MasterHandler(EventHandler):
def handleOidNotFound(self, conn, packet, message):
logging.error('ignoring OID Not Found in %s' % self.__class__.__name__)
def handleAskBeginTransaction(self, conn, packet):
def handleAskBeginTransaction(self, conn, packet, tid):
logging.error('ignoring Ask New TID in %s' % self.__class__.__name__)
def handleAskNewOIDs(self, conn, packet, num_oids):
......
......@@ -94,9 +94,15 @@ class ClientServiceHandler(BaseServiceHandler):
except KeyError:
logging.warn('aborting transaction %s does not exist', dump(tid))
def handleAskBeginTransaction(self, conn, packet):
def handleAskBeginTransaction(self, conn, packet, tid):
app = self.app
if tid is not None and tid < app.ltid:
# supplied TID is in the past
raise protocol.ProtocolError('invalid TID requested')
if tid is None:
# give a new transaction ID
tid = app.getNextTID()
app.ltid = tid
app.finishing_transaction_dict[tid] = FinishingTransaction(conn)
conn.answer(protocol.answerBeginTransaction(tid), packet)
......
......@@ -36,7 +36,7 @@ class ShutdownHandler(BaseServiceHandler):
@decorators.identification_required
@decorators.restrict_node_types(CLIENT_NODE_TYPE)
def handleAskBeginTransaction(self, conn, packet):
def handleAskBeginTransaction(self, conn, packet, tid):
logging.error('reject any new demand for new tid')
raise protocol.ProtocolError('cluster is shutting down')
......
......@@ -172,10 +172,10 @@ packet_types = Enum({
# Commit a transaction. PM -> S.
'COMMIT_TRANSACTION': 0x0011,
# Ask a new transaction ID. C -> PM.
# Ask to begin a new transaction. C -> PM.
'ASK_BEGIN_TRANSACTION': 0x0012,
# Answer a new transaction ID. PM -> C.
# Answer when a transaction begin, give a TID if necessary. PM -> C.
'ANSWER_BEGIN_TRANSACTION': 0x8012,
# Finish a transaction. C -> PM.
......@@ -531,6 +531,16 @@ def _encodePTID(ptid):
return INVALID_PTID
return ptid
def _decodeTID(tid):
if tid == INVALID_TID:
return None
return tid
def _encodeTID(tid):
if tid is None:
return INVALID_TID
return tid
def _readString(buf, name, offset=0):
buf = buf[offset:]
(size, ) = unpack('!L', buf[:4])
......@@ -752,12 +762,15 @@ decode_table[COMMIT_TRANSACTION] = _decodeCommitTransaction
@handle_errors
def _decodeAskBeginTransaction(body):
pass
(tid, ) = unpack('8s', body)
tid = _decodeTID(tid)
return (tid, )
decode_table[ASK_BEGIN_TRANSACTION] = _decodeAskBeginTransaction
@handle_errors
def _decodeAnswerBeginTransaction(body):
(tid, ) = unpack('8s', body)
tid = _decodeTID(tid)
return (tid, )
decode_table[ANSWER_BEGIN_TRANSACTION] = _decodeAnswerBeginTransaction
......@@ -870,8 +883,7 @@ def _decodeAskObject(body):
(oid, serial, tid) = unpack('8s8s8s', body)
if serial == INVALID_TID:
serial = None
if tid == INVALID_TID:
tid = None
tid = _decodeTID(tid)
return (oid, serial, tid)
decode_table[ASK_OBJECT] = _decodeAskObject
......@@ -1156,8 +1168,7 @@ def answerLastIDs(loid, ltid, lptid):
# packet when no last IDs are known
if loid is None:
loid = INVALID_OID
if ltid is None:
ltid = INVALID_TID
ltid = _encodeTID(ltid)
lptid = _encodePTID(lptid)
return Packet(ANSWER_LAST_IDS, loid + ltid + lptid)
......@@ -1226,10 +1237,12 @@ def deleteTransaction(tid):
def commitTransaction(tid):
return Packet(COMMIT_TRANSACTION, tid)
def askBeginTransaction():
return Packet(ASK_BEGIN_TRANSACTION)
def askBeginTransaction(tid):
tid = _encodeTID(tid)
return Packet(ASK_BEGIN_TRANSACTION, tid)
def answerBeginTransaction(tid):
tid = _encodeTID(tid)
return Packet(ANSWER_BEGIN_TRANSACTION, tid)
def askNewOIDs(num_oids):
......@@ -1297,11 +1310,8 @@ def answerStoreObject(conflicting, oid, serial):
return Packet(ANSWER_STORE_OBJECT, body)
def askObject(oid, serial, tid):
if tid is None:
# tid can be unspecified
tid = INVALID_TID
if serial is None:
serial = INVALID_TID
tid = _encodeTID(tid)
serial = _encodeTID(serial) # serial is the previous TID
return Packet(ASK_OBJECT, pack('!8s8s8s', oid, serial, tid))
def answerObject(oid, serial_start, serial_end, compression,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment