Commit a8f9fedb authored by Julien Muchembled's avatar Julien Muchembled

storage: simplify Transaction API

parent 87eee431
...@@ -77,7 +77,7 @@ class ClientOperationHandler(EventHandler): ...@@ -77,7 +77,7 @@ class ClientOperationHandler(EventHandler):
checksum, data, data_serial, unlock) checksum, data, data_serial, unlock)
except ConflictError, err: except ConflictError, err:
# resolvable or not # resolvable or not
conn.answer(Packets.AnswerStoreObject(1, oid, err.getTID())) conn.answer(Packets.AnswerStoreObject(1, oid, err.tid))
except DelayedError: except DelayedError:
# locked by a previous transaction, retry later # locked by a previous transaction, retry later
# If we are unlocking, we want queueEvent to raise # If we are unlocking, we want queueEvent to raise
...@@ -194,8 +194,7 @@ class ClientOperationHandler(EventHandler): ...@@ -194,8 +194,7 @@ class ClientOperationHandler(EventHandler):
self.app.tm.checkCurrentSerial(ttid, serial, oid) self.app.tm.checkCurrentSerial(ttid, serial, oid)
except ConflictError, err: except ConflictError, err:
# resolvable or not # resolvable or not
conn.answer(Packets.AnswerCheckCurrentSerial(1, oid, conn.answer(Packets.AnswerCheckCurrentSerial(1, oid, err.tid))
err.getTID()))
except DelayedError: except DelayedError:
# locked by a previous transaction, retry later # locked by a previous transaction, retry later
try: try:
......
...@@ -27,10 +27,7 @@ class ConflictError(Exception): ...@@ -27,10 +27,7 @@ class ConflictError(Exception):
def __init__(self, tid): def __init__(self, tid):
Exception.__init__(self) Exception.__init__(self)
self._tid = tid self.tid = tid
def getTID(self):
return self._tid
class DelayedError(Exception): class DelayedError(Exception):
...@@ -47,76 +44,41 @@ class Transaction(object): ...@@ -47,76 +44,41 @@ class Transaction(object):
""" """
Container for a pending transaction Container for a pending transaction
""" """
_tid = None tid = None
has_trans = False has_trans = False
def __init__(self, uuid, ttid): def __init__(self, uuid, ttid):
self._uuid = uuid
self._ttid = ttid
self._object_dict = {}
self._locked = False
self._birth = time() self._birth = time()
self._checked_set = set() self.uuid = uuid
# Consider using lists.
self.store_dict = {}
self.checked_set = set()
def __repr__(self): def __repr__(self):
return "<%s(ttid=%r, tid=%r, uuid=%r, locked=%r, age=%.2fs) at 0x%x>" \ return "<%s(tid=%r, uuid=%r, age=%.2fs) at 0x%x>" \
% (self.__class__.__name__, % (self.__class__.__name__,
dump(self._ttid), dump(self.tid),
dump(self._tid), uuid_str(self.uuid),
uuid_str(self._uuid),
self.isLocked(),
time() - self._birth, time() - self._birth,
id(self)) id(self))
def addCheckedObject(self, oid): def check(self, oid):
assert oid not in self._object_dict, dump(oid) assert oid not in self.store_dict, dump(oid)
self._checked_set.add(oid) assert oid not in self.checked_set, dump(oid)
self.checked_set.add(oid)
def getTTID(self):
return self._ttid
def setTID(self, tid):
assert self._tid is None, dump(self._tid)
assert tid is not None
self._tid = tid
def getTID(self):
return self._tid
def getUUID(self):
return self._uuid
def lock(self):
assert not self._locked
self._locked = True
def isLocked(self): def store(self, oid, data_id, value_serial):
return self._locked
def addObject(self, oid, data_id, value_serial):
""" """
Add an object to the transaction Add an object to the transaction
""" """
assert oid not in self._checked_set, dump(oid) assert oid not in self.checked_set, dump(oid)
self._object_dict[oid] = oid, data_id, value_serial self.store_dict[oid] = oid, data_id, value_serial
def delObject(self, oid): def cancel(self, oid):
try: try:
return self._object_dict.pop(oid)[1] return self.store_dict.pop(oid)[1]
except KeyError: except KeyError:
self._checked_set.remove(oid) self.checked_set.remove(oid)
def getObject(self, oid):
return self._object_dict[oid]
def getObjectList(self):
return self._object_dict.values()
def getOIDList(self):
return self._object_dict.keys()
def getLockedOIDList(self):
return self._object_dict.keys() + list(self._checked_set)
class TransactionManager(object): class TransactionManager(object):
...@@ -145,7 +107,7 @@ class TransactionManager(object): ...@@ -145,7 +107,7 @@ class TransactionManager(object):
Return None if not found. Return None if not found.
""" """
try: try:
return self._transaction_dict[ttid].getObject(oid) return self._transaction_dict[ttid].store_dict[oid]
except KeyError: except KeyError:
return None return None
...@@ -166,7 +128,7 @@ class TransactionManager(object): ...@@ -166,7 +128,7 @@ class TransactionManager(object):
transaction = self._transaction_dict[ttid] transaction = self._transaction_dict[ttid]
except KeyError: except KeyError:
raise ProtocolError("unknown ttid %s" % dump(ttid)) raise ProtocolError("unknown ttid %s" % dump(ttid))
object_list = transaction.getObjectList() object_list = transaction.store_dict.itervalues()
if txn_info: if txn_info:
user, desc, ext, oid_list = txn_info user, desc, ext, oid_list = txn_info
txn_info = oid_list, user, desc, ext, False, ttid txn_info = oid_list, user, desc, ext, False, ttid
...@@ -185,21 +147,20 @@ class TransactionManager(object): ...@@ -185,21 +147,20 @@ class TransactionManager(object):
transaction = self._transaction_dict[ttid] transaction = self._transaction_dict[ttid]
except KeyError: except KeyError:
raise ProtocolError("unknown ttid %s" % dump(ttid)) raise ProtocolError("unknown ttid %s" % dump(ttid))
# remember that the transaction has been locked assert transaction.tid is None, dump(transaction.tid)
transaction.lock() assert ttid <= tid, (ttid, tid)
transaction.tid = tid
self._load_lock_dict.update( self._load_lock_dict.update(
dict.fromkeys(transaction.getOIDList(), ttid)) dict.fromkeys(transaction.store_dict, ttid))
# commit transaction and remember its definitive TID
if transaction.has_trans: if transaction.has_trans:
self._app.dm.lockTransaction(tid, ttid) self._app.dm.lockTransaction(tid, ttid)
transaction.setTID(tid)
def unlock(self, ttid): def unlock(self, ttid):
""" """
Unlock transaction Unlock transaction
""" """
try: try:
tid = self._transaction_dict[ttid].getTID() tid = self._transaction_dict[ttid].tid
except KeyError: except KeyError:
raise ProtocolError("unknown ttid %s" % dump(ttid)) raise ProtocolError("unknown ttid %s" % dump(ttid))
logging.debug('Unlock TXN %s (ttid=%s)', dump(tid), dump(ttid)) logging.debug('Unlock TXN %s (ttid=%s)', dump(tid), dump(ttid))
...@@ -210,7 +171,7 @@ class TransactionManager(object): ...@@ -210,7 +171,7 @@ class TransactionManager(object):
def getFinalTID(self, ttid): def getFinalTID(self, ttid):
try: try:
return self._transaction_dict[ttid].getTID() return self._transaction_dict[ttid].tid
except KeyError: except KeyError:
return self._app.dm.getFinalTID(ttid) return self._app.dm.getFinalTID(ttid)
...@@ -233,7 +194,7 @@ class TransactionManager(object): ...@@ -233,7 +194,7 @@ class TransactionManager(object):
# drop the lock it held on this object, and drop object data for # drop the lock it held on this object, and drop object data for
# consistency. # consistency.
del self._store_lock_dict[oid] del self._store_lock_dict[oid]
data_id = self._transaction_dict[ttid].delObject(oid) data_id = self._transaction_dict[ttid].cancel(oid)
if data_id: if data_id:
self._app.dm.pruneData((data_id,)) self._app.dm.pruneData((data_id,))
# Give a chance to pending events to take that lock now. # Give a chance to pending events to take that lock now.
...@@ -245,7 +206,7 @@ class TransactionManager(object): ...@@ -245,7 +206,7 @@ class TransactionManager(object):
elif locking_tid == ttid: elif locking_tid == ttid:
# If previous store was an undo, next store must be based on # If previous store was an undo, next store must be based on
# undo target. # undo target.
previous_serial = self._transaction_dict[ttid].getObject(oid)[2] previous_serial = self._transaction_dict[ttid].store_dict[oid][2]
if previous_serial is None: if previous_serial is None:
# XXX: use some special serial when previous store was not # XXX: use some special serial when previous store was not
# an undo ? Maybe it should just not happen. # an undo ? Maybe it should just not happen.
...@@ -290,7 +251,7 @@ class TransactionManager(object): ...@@ -290,7 +251,7 @@ class TransactionManager(object):
except KeyError: except KeyError:
raise NotRegisteredError raise NotRegisteredError
self.lockObject(ttid, serial, oid, unlock=True) self.lockObject(ttid, serial, oid, unlock=True)
transaction.addCheckedObject(oid) transaction.check(oid)
def storeObject(self, ttid, serial, oid, compression, checksum, data, def storeObject(self, ttid, serial, oid, compression, checksum, data,
value_serial, unlock=False): value_serial, unlock=False):
...@@ -307,7 +268,7 @@ class TransactionManager(object): ...@@ -307,7 +268,7 @@ class TransactionManager(object):
data_id = None data_id = None
else: else:
data_id = self._app.dm.holdData(checksum, data, compression) data_id = self._app.dm.holdData(checksum, data, compression)
transaction.addObject(oid, data_id, value_serial) transaction.store(oid, data_id, value_serial)
def abort(self, ttid, even_if_locked=False): def abort(self, ttid, even_if_locked=False):
""" """
...@@ -323,24 +284,25 @@ class TransactionManager(object): ...@@ -323,24 +284,25 @@ class TransactionManager(object):
return return
logging.debug('Abort TXN %s', dump(ttid)) logging.debug('Abort TXN %s', dump(ttid))
transaction = self._transaction_dict[ttid] transaction = self._transaction_dict[ttid]
has_load_lock = transaction.isLocked() locked = transaction.tid
# if the transaction is locked, ensure we can drop it # if the transaction is locked, ensure we can drop it
if has_load_lock: if locked:
if not even_if_locked: if not even_if_locked:
return return
else: else:
self._app.dm.abortTransaction(ttid) self._app.dm.abortTransaction(ttid)
# unlock any object # unlock any object
for oid in transaction.getLockedOIDList(): for oid in transaction.store_dict, transaction.checked_set:
if has_load_lock: for oid in oid:
lock_ttid = self._load_lock_dict.pop(oid, None) if locked:
assert lock_ttid in (ttid, None), 'Transaction %s tried to ' \ lock_ttid = self._load_lock_dict.pop(oid, None)
'release the lock on oid %s, but it was held by %s' % ( assert lock_ttid in (ttid, None), ('Transaction %s tried'
dump(ttid), dump(oid), dump(lock_ttid)) ' to release the lock on oid %s, but it was held by %s'
write_locking_tid = self._store_lock_dict.pop(oid) % (dump(ttid), dump(oid), dump(lock_ttid)))
assert write_locking_tid == ttid, 'Inconsistent locking state: ' \ write_locking_tid = self._store_lock_dict.pop(oid)
'aborting %s:%s but %s has the lock.' % (dump(ttid), dump(oid), assert write_locking_tid == ttid, ('Inconsistent locking'
dump(write_locking_tid)) ' state: aborting %s:%s but %s has the lock.'
% (dump(ttid), dump(oid), dump(write_locking_tid)))
# remove the transaction # remove the transaction
del self._transaction_dict[ttid] del self._transaction_dict[ttid]
# some locks were released, some pending locks may now succeed # some locks were released, some pending locks may now succeed
...@@ -352,37 +314,35 @@ class TransactionManager(object): ...@@ -352,37 +314,35 @@ class TransactionManager(object):
""" """
logging.debug('Abort for %s', uuid_str(uuid)) logging.debug('Abort for %s', uuid_str(uuid))
# abort any non-locked transaction of this node # abort any non-locked transaction of this node
for transaction in self._transaction_dict.values(): for ttid, transaction in self._transaction_dict.items():
if transaction.getUUID() == uuid: if transaction.uuid == uuid:
self.abort(transaction.getTTID()) self.abort(ttid)
def isLockedTid(self, tid): def isLockedTid(self, tid):
for t in self._transaction_dict.itervalues(): return any(None is not t.tid <= tid
if t.isLocked() and t.getTID() <= tid: for t in self._transaction_dict.itervalues())
return True
return False
def loadLocked(self, oid): def loadLocked(self, oid):
return oid in self._load_lock_dict return oid in self._load_lock_dict
def log(self): def log(self):
logging.info("Transactions:") logging.info("Transactions:")
for txn in self._transaction_dict.values(): for ttid, txn in self._transaction_dict.iteritems():
logging.info(' %r', txn) logging.info(' %s %r', dump(ttid), txn)
logging.info(' Read locks:') logging.info(' Read locks:')
for oid, ttid in self._load_lock_dict.items(): for oid, ttid in self._load_lock_dict.iteritems():
logging.info(' %r by %r', dump(oid), dump(ttid)) logging.info(' %r by %r', dump(oid), dump(ttid))
logging.info(' Write locks:') logging.info(' Write locks:')
for oid, ttid in self._store_lock_dict.items(): for oid, ttid in self._store_lock_dict.iteritems():
logging.info(' %r by %r', dump(oid), dump(ttid)) logging.info(' %r by %r', dump(oid), dump(ttid))
def updateObjectDataForPack(self, oid, orig_serial, new_serial, data_id): def updateObjectDataForPack(self, oid, orig_serial, new_serial, data_id):
lock_tid = self.getLockingTID(oid) lock_tid = self.getLockingTID(oid)
if lock_tid is not None: if lock_tid is not None:
transaction = self._transaction_dict[lock_tid] transaction = self._transaction_dict[lock_tid]
if transaction.getObject(oid)[2] == orig_serial: if transaction.store_dict[oid][2] == orig_serial:
if new_serial: if new_serial:
data_id = None data_id = None
else: else:
self._app.dm.holdData(data_id) self._app.dm.holdData(data_id)
transaction.addObject(oid, data_id, new_serial) transaction.store(oid, data_id, new_serial)
...@@ -17,19 +17,9 @@ ...@@ -17,19 +17,9 @@
import unittest import unittest
from mock import Mock from mock import Mock
from .. import NeoUnitTestBase from .. import NeoUnitTestBase
from neo.storage.transactions import Transaction, TransactionManager from neo.storage.transactions import TransactionManager
class TransactionTests(NeoUnitTestBase):
def testLock(self):
txn = Transaction(self.getClientUUID(), self.getNextTID())
self.assertFalse(txn.isLocked())
txn.lock()
self.assertTrue(txn.isLocked())
# disallow lock more than once
self.assertRaises(AssertionError, txn.lock)
class TransactionManagerTests(NeoUnitTestBase): class TransactionManagerTests(NeoUnitTestBase):
def setUp(self): def setUp(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment