Commit bc0ce9e3 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Rename tid to ttid to clarify the meaning.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2611 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent d700898b
......@@ -747,14 +747,14 @@ class Application(object):
self._waitAnyMessage(False)
def onStoreTimeout(self, conn, msg_id, tid, oid):
def onStoreTimeout(self, conn, msg_id, ttid, oid):
# NOTE: this method is called from poll thread, don't use
# local_var !
# Stop expecting the timed-out store request.
queue = self.dispatcher.forget(conn, msg_id)
# Ask the storage if someone locks the object.
# Shorten timeout to react earlier to an unresponding storage.
conn.ask(Packets.AskHasLock(tid, oid), timeout=5, queue=queue)
conn.ask(Packets.AskHasLock(ttid, oid), timeout=5, queue=queue)
return True
@profiler_decorator
......
......@@ -202,7 +202,7 @@ class EventHandler(object):
def askBeginTransaction(self, conn, tid):
raise UnexpectedPacketError
def answerBeginTransaction(self, conn, tid):
def answerBeginTransaction(self, conn, ttid):
raise UnexpectedPacketError
def askNewOIDs(self, conn, num_oids):
......@@ -211,7 +211,7 @@ class EventHandler(object):
def answerNewOIDs(self, conn, num_oids):
raise UnexpectedPacketError
def askFinishTransaction(self, conn, tid, oid_list):
def askFinishTransaction(self, conn, ttid, oid_list):
raise UnexpectedPacketError
def answerTransactionFinished(self, conn, ttid, tid):
......@@ -226,27 +226,27 @@ class EventHandler(object):
def invalidateObjects(self, conn, tid, oid_list):
raise UnexpectedPacketError
def notifyUnlockInformation(self, conn, tid):
def notifyUnlockInformation(self, conn, ttid):
raise UnexpectedPacketError
def askStoreObject(self, conn, oid, serial,
compression, checksum, data, data_serial, tid, unlock):
compression, checksum, data, data_serial, ttid, unlock):
raise UnexpectedPacketError
def answerStoreObject(self, conn, conflicting, oid, serial):
raise UnexpectedPacketError
def abortTransaction(self, conn, tid):
def abortTransaction(self, conn, ttid):
raise UnexpectedPacketError
def askStoreTransaction(self, conn, tid, user, desc,
def askStoreTransaction(self, conn, ttid, user, desc,
ext, oid_list):
raise UnexpectedPacketError
def answerStoreTransaction(self, conn, tid):
def answerStoreTransaction(self, conn, ttid):
raise UnexpectedPacketError
def askObject(self, conn, oid, serial, tid):
def askObject(self, conn, oid, serial, ttid):
raise UnexpectedPacketError
def answerObject(self, conn, oid, serial_start,
......@@ -327,13 +327,13 @@ class EventHandler(object):
def notifyReplicationDone(self, conn, offset):
raise UnexpectedPacketError
def askObjectUndoSerial(self, conn, tid, ltid, undone_tid, oid_list):
def askObjectUndoSerial(self, conn, ttid, ltid, undone_tid, oid_list):
raise UnexpectedPacketError
def answerObjectUndoSerial(self, conn, object_tid_dict):
raise UnexpectedPacketError
def askHasLock(self, conn, tid, oid):
def askHasLock(self, conn, ttid, oid):
raise UnexpectedPacketError
def answerHasLock(self, conn, oid, status):
......@@ -375,7 +375,7 @@ class EventHandler(object):
def answerLastTransaction(self, conn, tid):
raise UnexpectedPacketError
def askCheckCurrentSerial(self, conn, tid, serial, oid):
def askCheckCurrentSerial(self, conn, ttid, serial, oid):
raise UnexpectedPacketError
answerCheckCurrentSerial = answerStoreObject
......
......@@ -30,8 +30,8 @@ SLOW_STORE = 2
class ClientOperationHandler(BaseClientAndStorageOperationHandler):
def _askObject(self, oid, serial, tid):
return self.app.dm.getObject(oid, serial, tid)
def _askObject(self, oid, serial, ttid):
return self.app.dm.getObject(oid, serial, ttid)
def connectionLost(self, conn, new_state):
uuid = conn.getUUID()
......@@ -39,31 +39,31 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
assert node is not None, conn
self.app.nm.remove(node)
def abortTransaction(self, conn, tid):
self.app.tm.abort(tid)
def abortTransaction(self, conn, ttid):
self.app.tm.abort(ttid)
def askStoreTransaction(self, conn, tid, user, desc, ext, oid_list):
self.app.tm.register(conn.getUUID(), tid)
self.app.tm.storeTransaction(tid, oid_list, user, desc, ext, False)
conn.answer(Packets.AnswerStoreTransaction(tid))
def askStoreTransaction(self, conn, ttid, user, desc, ext, oid_list):
self.app.tm.register(conn.getUUID(), ttid)
self.app.tm.storeTransaction(ttid, oid_list, user, desc, ext, False)
conn.answer(Packets.AnswerStoreTransaction(ttid))
def _askStoreObject(self, conn, oid, serial, compression, checksum, data,
data_serial, tid, unlock, request_time):
if tid not in self.app.tm:
data_serial, ttid, unlock, request_time):
if ttid not in self.app.tm:
# transaction was aborted, cancel this event
neo.logging.info('Forget store of %s:%s by %s delayed by %s',
dump(oid), dump(serial), dump(tid),
dump(oid), dump(serial), dump(ttid),
dump(self.app.tm.getLockingTID(oid)))
# send an answer as the client side is waiting for it
conn.answer(Packets.AnswerStoreObject(0, oid, serial))
return
try:
self.app.tm.storeObject(tid, serial, oid, compression,
self.app.tm.storeObject(ttid, serial, oid, compression,
checksum, data, data_serial, unlock)
except ConflictError, err:
# resolvable or not
tid_or_serial = err.getTID()
conn.answer(Packets.AnswerStoreObject(1, oid, tid_or_serial))
ttid_or_serial = err.getTID()
conn.answer(Packets.AnswerStoreObject(1, oid, ttid_or_serial))
except DelayedError:
# locked by a previous transaction, retry later
# If we are unlocking, we want queueEvent to raise
......@@ -71,8 +71,8 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
# response.
try:
self.app.queueEvent(self._askStoreObject, conn, (oid, serial,
compression, checksum, data, data_serial, tid,
unlock, request_time), key=(oid, tid),
compression, checksum, data, data_serial, ttid,
unlock, request_time), key=(oid, ttid),
raise_on_duplicate=unlock)
except AlreadyPendingError:
conn.answer(Errors.AlreadyPending(dump(oid)))
......@@ -84,16 +84,16 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
conn.answer(Packets.AnswerStoreObject(0, oid, serial))
def askStoreObject(self, conn, oid, serial,
compression, checksum, data, data_serial, tid, unlock):
compression, checksum, data, data_serial, ttid, unlock):
# register the transaction
self.app.tm.register(conn.getUUID(), tid)
self.app.tm.register(conn.getUUID(), ttid)
if data_serial is not None:
assert data == '', repr(data)
# Change data to None here, to do it only once, even if store gets
# delayed.
data = None
self._askStoreObject(conn, oid, serial, compression, checksum, data,
data_serial, tid, unlock, time.time())
data_serial, ttid, unlock, time.time())
def askTIDsFrom(self, conn, min_tid, max_tid, length, partition_list):
app = self.app
......@@ -122,14 +122,14 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
app.pt.getPartitions(), partition_list)
conn.answer(Packets.AnswerTIDs(tid_list))
def askObjectUndoSerial(self, conn, tid, ltid, undone_tid, oid_list):
def askObjectUndoSerial(self, conn, ttid, ltid, undone_tid, oid_list):
app = self.app
findUndoTID = app.dm.findUndoTID
getObjectFromTransaction = app.tm.getObjectFromTransaction
object_tid_dict = {}
for oid in oid_list:
current_serial, undo_serial, is_current = findUndoTID(oid, tid,
ltid, undone_tid, getObjectFromTransaction(tid, oid))
current_serial, undo_serial, is_current = findUndoTID(oid, ttid,
ltid, undone_tid, getObjectFromTransaction(ttid, oid))
if current_serial is None:
p = Errors.OidNotFound(dump(oid))
break
......@@ -138,12 +138,12 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
p = Packets.AnswerObjectUndoSerial(object_tid_dict)
conn.answer(p)
def askHasLock(self, conn, tid, oid):
def askHasLock(self, conn, ttid, oid):
locking_tid = self.app.tm.getLockingTID(oid)
neo.logging.info('%r check lock of %r:%r', conn, dump(tid), dump(oid))
neo.logging.info('%r check lock of %r:%r', conn, dump(ttid), dump(oid))
if locking_tid is None:
state = LockState.NOT_LOCKED
elif locking_tid is tid:
elif locking_tid is ttid:
state = LockState.GRANTED
else:
state = LockState.GRANTED_TO_OTHER
......@@ -161,20 +161,20 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
p = Packets.AnswerObjectHistory(oid, history_list)
conn.answer(p)
def askCheckCurrentSerial(self, conn, tid, serial, oid):
self._askCheckCurrentSerial(conn, tid, serial, oid, time.time())
def askCheckCurrentSerial(self, conn, ttid, serial, oid):
self._askCheckCurrentSerial(conn, ttid, serial, oid, time.time())
def _askCheckCurrentSerial(self, conn, tid, serial, oid, request_time):
if tid not in self.app.tm:
def _askCheckCurrentSerial(self, conn, ttid, serial, oid, request_time):
if ttid not in self.app.tm:
# transaction was aborted, cancel this event
neo.logging.info('Forget serial check of %s:%s by %s delayed by '
'%s', dump(oid), dump(serial), dump(tid),
'%s', dump(oid), dump(serial), dump(ttid),
dump(self.app.tm.getLockingTID(oid)))
# send an answer as the client side is waiting for it
conn.answer(Packets.AnswerStoreObject(0, oid, serial))
return
try:
self.app.tm.checkCurrentSerial(tid, serial, oid)
self.app.tm.checkCurrentSerial(ttid, serial, oid)
except ConflictError, err:
# resolvable or not
conn.answer(Packets.AnswerCheckCurrentSerial(1, oid,
......@@ -182,8 +182,8 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
except DelayedError:
# locked by a previous transaction, retry later
try:
self.app.queueEvent(self._askCheckCurrentSerial, conn, (tid,
serial, oid, request_time), key=(oid, tid))
self.app.queueEvent(self._askCheckCurrentSerial, conn, (ttid,
serial, oid, request_time), key=(oid, ttid))
except AlreadyPendingError:
conn.answer(Errors.AlreadyPending(dump(oid)))
else:
......
......@@ -60,11 +60,11 @@ class MasterOperationHandler(BaseMasterHandler):
if not conn.isClosed():
conn.answer(Packets.AnswerInformationLocked(tid))
def notifyUnlockInformation(self, conn, tid):
if not tid in self.app.tm:
def notifyUnlockInformation(self, conn, ttid):
if not ttid in self.app.tm:
raise ProtocolError('Unknown transaction')
# TODO: send an answer
self.app.tm.unlock(tid)
self.app.tm.unlock(ttid)
def askPack(self, conn, tid):
app = self.app
......
......@@ -135,29 +135,29 @@ class TransactionManager(object):
self._load_lock_dict = {}
self._uuid_dict = {}
def __contains__(self, tid):
def __contains__(self, ttid):
"""
Returns True if the TID is known by the manager
"""
return tid in self._transaction_dict
return ttid in self._transaction_dict
def register(self, uuid, tid):
def register(self, uuid, ttid):
"""
Register a transaction, it may be already registered
"""
transaction = self._transaction_dict.get(tid, None)
transaction = self._transaction_dict.get(ttid, None)
if transaction is None:
transaction = Transaction(uuid, tid)
transaction = Transaction(uuid, ttid)
self._uuid_dict.setdefault(uuid, set()).add(transaction)
self._transaction_dict[tid] = transaction
self._transaction_dict[ttid] = transaction
return transaction
def getObjectFromTransaction(self, tid, oid):
def getObjectFromTransaction(self, ttid, oid):
"""
Return object data for given running transaction.
Return None if not found.
"""
result = self._transaction_dict.get(tid)
result = self._transaction_dict.get(ttid)
if result is not None:
result = result.getObject(oid)
return result
......@@ -206,18 +206,18 @@ class TransactionManager(object):
self._app.dm.finishTransaction(self.getTIDFromTTID(ttid))
self.abort(ttid, even_if_locked=True)
def storeTransaction(self, tid, oid_list, user, desc, ext, packed):
def storeTransaction(self, ttid, oid_list, user, desc, ext, packed):
"""
Store transaction information received from client node
"""
assert tid in self, "Transaction not registered"
transaction = self._transaction_dict[tid]
assert ttid in self, "Transaction not registered"
transaction = self._transaction_dict[ttid]
transaction.prepare(oid_list, user, desc, ext, packed)
def getLockingTID(self, oid):
return self._store_lock_dict.get(oid)
def lockObject(self, tid, serial, oid, unlock=False):
def lockObject(self, ttid, serial, oid, unlock=False):
"""
Take a write lock on given object, checking that "serial" is
current.
......@@ -227,30 +227,30 @@ class TransactionManager(object):
"""
# check if the object if locked
locking_tid = self._store_lock_dict.get(oid)
if locking_tid == tid and unlock:
if locking_tid == ttid and unlock:
neo.logging.info('Deadlock resolution on %r:%r', dump(oid),
dump(tid))
dump(ttid))
# A duplicate store means client is resolving a deadlock, so
# drop the lock it held on this object, and drop object data for
# consistency.
del self._store_lock_dict[oid]
self._transaction_dict[tid].delObject(oid)
self._transaction_dict[ttid].delObject(oid)
# Give a chance to pending events to take that lock now.
self._app.executeQueuedEvents()
# Attemp to acquire lock again.
locking_tid = self._store_lock_dict.get(oid)
if locking_tid in (None, tid):
if locking_tid in (None, ttid):
# check if this is generated from the latest revision.
if locking_tid == tid:
if locking_tid == ttid:
# If previous store was an undo, next store must be based on
# undo target.
_, _, _, _, previous_serial = self._transaction_dict[
tid].getObject(oid)
ttid].getObject(oid)
if previous_serial is None:
# XXX: use some special serial when previous store was not
# an undo ? Maybe it should just not happen.
neo.logging.info('Transaction %s storing %s more than '
'once', dump(tid), dump(oid))
'once', dump(ttid), dump(oid))
else:
previous_serial = None
if previous_serial is None:
......@@ -259,56 +259,56 @@ class TransactionManager(object):
previous_serial = history_list[0][0]
if previous_serial is not None and previous_serial != serial:
neo.logging.info('Resolvable conflict on %r:%r', dump(oid),
dump(tid))
dump(ttid))
raise ConflictError(previous_serial)
neo.logging.info('Transaction %s storing %s', dump(tid), dump(oid))
self._store_lock_dict[oid] = tid
elif locking_tid > tid:
neo.logging.info('Transaction %s storing %s', dump(ttid), dump(oid))
self._store_lock_dict[oid] = ttid
elif locking_tid > ttid:
# We have a smaller TID than locking transaction, so we are older:
# enter waiting queue so we are handled when lock gets released.
neo.logging.info('Store delayed for %r:%r by %r', dump(oid),
dump(tid), dump(locking_tid))
dump(ttid), dump(locking_tid))
raise DelayedError
else:
# We have a bigger TID than locking transaction, so we are
# We have a bigger TTID than locking transaction, so we are
# younger: this is a possible deadlock case, as we might already
# hold locks that older transaction is waiting upon. Make client
# release locks & reacquire them by notifying it of the possible
# deadlock.
neo.logging.info('Possible deadlock on %r:%r with %r',
dump(oid), dump(tid), dump(locking_tid))
dump(oid), dump(ttid), dump(locking_tid))
raise ConflictError(ZERO_TID)
def checkCurrentSerial(self, tid, serial, oid):
self.lockObject(tid, serial, oid, unlock=True)
assert tid in self, "Transaction not registered"
transaction = self._transaction_dict[tid]
def checkCurrentSerial(self, ttid, serial, oid):
self.lockObject(ttid, serial, oid, unlock=True)
assert ttid in self, "Transaction not registered"
transaction = self._transaction_dict[ttid]
transaction.addCheckedObject(oid)
def storeObject(self, tid, serial, oid, compression, checksum, data,
def storeObject(self, ttid, serial, oid, compression, checksum, data,
value_serial, unlock=False):
"""
Store an object received from client node
"""
self.lockObject(tid, serial, oid, unlock=unlock)
self.lockObject(ttid, serial, oid, unlock=unlock)
# store object
assert tid in self, "Transaction not registered"
transaction = self._transaction_dict[tid]
assert ttid in self, "Transaction not registered"
transaction = self._transaction_dict[ttid]
transaction.addObject(oid, compression, checksum, data, value_serial)
def abort(self, tid, even_if_locked=False):
def abort(self, ttid, even_if_locked=False):
"""
Abort a transaction
Releases locks held on all transaction objects, deletes Transaction
instance, and executed queued events.
Note: does not alter persistent content.
"""
if tid not in self._transaction_dict:
if ttid not in self._transaction_dict:
# the tid may be unknown as the transaction is aborted on every node
# of the partition, even if no data was received (eg. conflict on
# another node)
return
transaction = self._transaction_dict[tid]
transaction = self._transaction_dict[ttid]
has_load_lock = transaction.isLocked()
# if the transaction is locked, ensure we can drop it
if not even_if_locked and has_load_lock:
......@@ -316,23 +316,23 @@ class TransactionManager(object):
# unlock any object
for oid in transaction.getLockedOIDList():
if has_load_lock:
lock_tid = self._load_lock_dict.pop(oid, None)
assert lock_tid in (tid, None), 'Transaction %s tried to ' \
lock_ttid = self._load_lock_dict.pop(oid, None)
assert lock_ttid in (ttid, None), 'Transaction %s tried to ' \
'release the lock on oid %s, but it was held by %s' % (
dump(tid), dump(oid), dump(lock_tid))
dump(ttid), dump(oid), dump(lock_tid))
try:
del self._store_lock_dict[oid]
except KeyError:
# all locks might not have been acquiredwhen aborting
neo.logging.warning('%s write lock was not held by %s',
dump(oid), dump(tid))
dump(oid), dump(ttid))
# remove the transaction
uuid = transaction.getUUID()
self._uuid_dict[uuid].discard(transaction)
# clean node index if there is no more current transactions
if not self._uuid_dict[uuid]:
del self._uuid_dict[uuid]
del self._transaction_dict[tid]
del self._transaction_dict[ttid]
# some locks were released, some pending locks may now succeed
self._app.executeQueuedEvents()
......@@ -356,11 +356,11 @@ class TransactionManager(object):
for txn in self._transaction_dict.values():
neo.logging.info(' %r', txn)
neo.logging.info(' Read locks:')
for oid, tid in self._load_lock_dict.items():
neo.logging.info(' %r by %r', dump(oid), dump(tid))
for oid, ttid in self._load_lock_dict.items():
neo.logging.info(' %r by %r', dump(oid), dump(ttid))
neo.logging.info(' Write locks:')
for oid, tid in self._store_lock_dict.items():
neo.logging.info(' %r by %r', dump(oid), dump(tid))
for oid, ttid in self._store_lock_dict.items():
neo.logging.info(' %r by %r', dump(oid), dump(ttid))
def updateObjectDataForPack(self, oid, orig_serial, new_serial,
getObjectData):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment