Commit 02dcbf60 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Revert previous commit, only file was supposed to be commited. Sorry for the


git-svn-id: 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 921f70dc
......@@ -123,14 +123,6 @@ RC Known bugs
- Fix inconsistencies between client oid pool and the last oid returned by storage nodes.
Currently, the max oid returned by a storage is the max of the oid column, which ignores oids generated by master node which are not used yet in any object.
The oid pool at client side can then contain oids greater than loid known by primary master.
- It should broadcast to storage and client the last OID generated to allow
them dectect OID overlapping.
- The storage must detect when a client store a object with a greater OID
than the last generated by the master node. It should at least notice to
restart the whole cluster to avoid conflicts. This is mainly the case when
importing data.
- Consider auto-generating cluster name upon initial startup (it might actualy be a partition property).
# Default parameters.
# The cluster name
name: main
# The list of master nodes.
# Partition table configuration
......@@ -659,13 +659,14 @@ class Application(object):
if self.local_var.txn is transaction:
# We already begin the same transaction
# ask the primary master to start a transaction, if no tid is supplied,
# the master will supply us one. Otherwise the requested tid will be
# used if possible.
self.local_var.tid = None
if self.local_var.tid is None:
raise NEOStorageError('tpc_begin failed')
# Get a new transaction id if necessary
if tid is None:
self.local_var.tid = None
if self.local_var.tid is None:
raise NEOStorageError('tpc_begin failed')
self.local_var.tid = tid
self.local_var.txn = transaction
......@@ -236,7 +236,7 @@ class EventHandler(object):
def handleCommitTransaction(self, conn, packet, tid):
raise UnexpectedPacketError
def handleAskBeginTransaction(self, conn, packet, tid):
def handleAskBeginTransaction(self, conn, packet):
raise UnexpectedPacketError
def handleAnswerBeginTransaction(self, conn, packet, tid):
......@@ -80,7 +80,7 @@ class MasterHandler(EventHandler):
def handleOidNotFound(self, conn, packet, message):
logging.error('ignoring OID Not Found in %s' % self.__class__.__name__)
def handleAskBeginTransaction(self, conn, packet, tid):
def handleAskBeginTransaction(self, conn, packet):
logging.error('ignoring Ask New TID in %s' % self.__class__.__name__)
def handleAskNewOIDs(self, conn, packet, num_oids):
......@@ -94,15 +94,9 @@ class ClientServiceHandler(BaseServiceHandler):
except KeyError:
logging.warn('aborting transaction %s does not exist', dump(tid))
def handleAskBeginTransaction(self, conn, packet, tid):
def handleAskBeginTransaction(self, conn, packet):
app =
if tid is not None and tid < app.ltid:
# supplied TID is in the past
raise protocol.ProtocolError('invalid TID requested')
if tid is None:
# give a new transaction ID
tid = app.getNextTID()
app.ltid = tid
tid = app.getNextTID()
app.finishing_transaction_dict[tid] = FinishingTransaction(conn)
conn.answer(protocol.answerBeginTransaction(tid), packet)
......@@ -36,7 +36,7 @@ class ShutdownHandler(BaseServiceHandler):
def handleAskBeginTransaction(self, conn, packet, tid):
def handleAskBeginTransaction(self, conn, packet):
logging.error('reject any new demand for new tid')
raise protocol.ProtocolError('cluster is shutting down')
......@@ -172,10 +172,10 @@ packet_types = Enum({
# Commit a transaction. PM -> S.
# Ask to begin a new transaction. C -> PM.
# Ask a new transaction ID. C -> PM.
# Answer when a transaction begin, give a TID if necessary. PM -> C.
# Answer a new transaction ID. PM -> C.
# Finish a transaction. C -> PM.
......@@ -531,16 +531,6 @@ def _encodePTID(ptid):
return ptid
def _decodeTID(tid):
if tid == INVALID_TID:
return None
return tid
def _encodeTID(tid):
if tid is None:
return tid
def _readString(buf, name, offset=0):
buf = buf[offset:]
(size, ) = unpack('!L', buf[:4])
......@@ -762,15 +752,12 @@ decode_table[COMMIT_TRANSACTION] = _decodeCommitTransaction
def _decodeAskBeginTransaction(body):
(tid, ) = unpack('8s', body)
tid = _decodeTID(tid)
return (tid, )
decode_table[ASK_BEGIN_TRANSACTION] = _decodeAskBeginTransaction
def _decodeAnswerBeginTransaction(body):
(tid, ) = unpack('8s', body)
tid = _decodeTID(tid)
return (tid, )
decode_table[ANSWER_BEGIN_TRANSACTION] = _decodeAnswerBeginTransaction
......@@ -883,7 +870,8 @@ def _decodeAskObject(body):
(oid, serial, tid) = unpack('8s8s8s', body)
if serial == INVALID_TID:
serial = None
tid = _decodeTID(tid)
if tid == INVALID_TID:
tid = None
return (oid, serial, tid)
decode_table[ASK_OBJECT] = _decodeAskObject
......@@ -1168,7 +1156,8 @@ def answerLastIDs(loid, ltid, lptid):
# packet when no last IDs are known
if loid is None:
ltid = _encodeTID(ltid)
if ltid is None:
lptid = _encodePTID(lptid)
return Packet(ANSWER_LAST_IDS, loid + ltid + lptid)
......@@ -1237,12 +1226,10 @@ def deleteTransaction(tid):
def commitTransaction(tid):
return Packet(COMMIT_TRANSACTION, tid)
def askBeginTransaction(tid):
tid = _encodeTID(tid)
return Packet(ASK_BEGIN_TRANSACTION, tid)
def askBeginTransaction():
def answerBeginTransaction(tid):
tid = _encodeTID(tid)
def askNewOIDs(num_oids):
......@@ -1310,8 +1297,11 @@ def answerStoreObject(conflicting, oid, serial):
return Packet(ANSWER_STORE_OBJECT, body)
def askObject(oid, serial, tid):
tid = _encodeTID(tid)
serial = _encodeTID(serial) # serial is the previous TID
if tid is None:
# tid can be unspecified
if serial is None:
serial = INVALID_TID
return Packet(ASK_OBJECT, pack('!8s8s8s', oid, serial, tid))
def answerObject(oid, serial_start, serial_end, compression,
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment