Commit 40652c1c authored by Vincent Pelletier's avatar Vincent Pelletier

Provide ReadVerifyingStorage.

Also, fix bogus checkCurrentSerialInTransaction implementation (it was not
properly locking object, allowing them to become non-current by the time
tpc_finish occurs).

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2515 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent dfa89729
......@@ -53,6 +53,7 @@ class Storage(BaseStorage.BaseStorage,
# ZODB.interfaces.IStorageIteration,
ZODB.interfaces.IStorageUndoable,
ZODB.interfaces.IExternalGC,
ZODB.interfaces.ReadVerifyingStorage,
)
def __init__(self, master_nodes, name, connector=None, read_only=False,
......
......@@ -702,33 +702,40 @@ class Application(object):
data = data_dict[oid]
tid = local_var.tid
resolved = False
if conflict_serial <= tid:
new_data = tryToResolveConflict(oid, conflict_serial, serial,
data)
if new_data is not None:
neo.logging.info('Conflict resolution succeed for ' \
'%r:%r with %r', dump(oid), dump(serial),
dump(conflict_serial))
# Mark this conflict as resolved
resolved_serial_set.update(conflict_serial_dict.pop(oid))
# Try to store again
self._store(oid, conflict_serial, new_data)
append(oid)
resolved = True
if data is not None:
if conflict_serial <= tid:
new_data = tryToResolveConflict(oid, conflict_serial,
serial, data)
if new_data is not None:
neo.logging.info('Conflict resolution succeed for ' \
'%r:%r with %r', dump(oid), dump(serial),
dump(conflict_serial))
# Mark this conflict as resolved
resolved_serial_set.update(conflict_serial_dict.pop(
oid))
# Try to store again
self._store(oid, conflict_serial, new_data)
append(oid)
resolved = True
else:
neo.logging.info('Conflict resolution failed for ' \
'%r:%r with %r', dump(oid), dump(serial),
dump(conflict_serial))
else:
neo.logging.info('Conflict resolution failed for ' \
'%r:%r with %r', dump(oid), dump(serial),
dump(conflict_serial))
else:
neo.logging.info('Conflict reported for %r:%r with later ' \
'transaction %r , cannot resolve conflict.', dump(oid),
dump(serial), dump(conflict_serial))
neo.logging.info('Conflict reported for %r:%r with ' \
'later transaction %r , cannot resolve conflict.',
dump(oid), dump(serial), dump(conflict_serial))
if not resolved:
# XXX: Is it really required to remove from data_dict ?
del data_dict[oid]
local_var.data_list.remove(oid)
raise ConflictError(oid=oid,
serials=(tid, serial), data=data)
if data is None:
exc = ReadConflictError(oid=oid, serials=(conflict_serial,
serial))
else:
exc = ConflictError(oid=oid, serials=(tid, serial),
data=data)
raise exc
return result
@profiler_decorator
......@@ -1252,9 +1259,31 @@ class Application(object):
return self._load(oid)[1]
def checkCurrentSerialInTransaction(self, oid, serial, transaction):
if transaction is not self.local_var.txn:
local_var = self.local_var
if transaction is not local_var.txn:
raise StorageTransactionError(self, transaction)
committed_tid = self.getLastTID(oid)
if committed_tid != serial:
raise ReadConflictError(oid=oid, serials=(committed_tid, serial))
cell_list = self._getCellListForOID(oid, writable=True)
if len(cell_list) == 0:
raise NEOStorageError
p = Packets.AskCheckCurrentSerial(local_var.tid, serial, oid)
getConnForCell = self.cp.getConnForCell
queue = local_var.queue
local_var.object_serial_dict[oid] = serial
# Placeholders
local_var.object_stored_counter_dict[oid] = {}
data_dict = local_var.data_dict
if oid not in data_dict:
# Marker value so we don't try to resolve conflicts.
data_dict[oid] = None
local_var.data_list.append(oid)
for cell in cell_list:
conn = getConnForCell(cell)
if conn is None:
continue
try:
conn.ask(p, queue=queue)
except ConnectionClosed:
continue
self._waitAnyMessage(False)
......@@ -89,6 +89,8 @@ class StorageAnswersHandler(AnswerBaseHandler):
object_stored_counter_dict[serial] = \
object_stored_counter_dict.get(serial, 0) + 1
answerCheckCurrentSerial = answerStoreObject
def answerStoreTransaction(self, conn, tid):
if tid != self.app.getTID():
raise ProtocolError('Wrong TID, transaction not started')
......
......@@ -372,6 +372,11 @@ class EventHandler(object):
def answerLastTransaction(self, conn, tid):
raise UnexpectedPacketError
def askCheckCurrentSerial(self, conn, tid, serial, oid):
raise UnexpectedPacketError
answerCheckCurrentSerial = answerStoreObject
# Error packet handlers.
def error(self, conn, code, message):
......@@ -492,6 +497,8 @@ class EventHandler(object):
d[Packets.NotifyReady] = self.notifyReady
d[Packets.AskLastTransaction] = self.askLastTransaction
d[Packets.AnswerLastTransaction] = self.answerLastTransaction
d[Packets.AskCheckCurrentSerial] = self.askCheckCurrentSerial
d[Packets.AnswerCheckCurrentSerial] = self.answerCheckCurrentSerial
return d
......
......@@ -1591,6 +1591,28 @@ class AnswerHasLock(Packet):
oid, state = unpack(self._header_format, body)
return (oid, _decodeLockState(state))
class AskCheckCurrentSerial(Packet):
"""
Verifies if given serial is current for object oid in the database, and
take a write lock on it (so that this state is not altered until
transaction ends).
"""
_header_format = '!8s8s8s'
def _encode(self, tid, serial, oid):
return tid + serial + oid
def _decode(self, body):
return unpack(self._header_format, body)
class AnswerCheckCurrentSerial(AnswerStoreObject):
"""
Answer to AskCheckCurrentSerial.
Same structure as AnswerStoreObject, to handle the same way, except there
is nothing to invalidate in any client's cache.
"""
pass
class AskBarrier(Packet):
"""
Initates a "network barrier", allowing the node sending this packet to know
......@@ -1993,6 +2015,11 @@ class PacketRegistry(dict):
AskLastTransaction,
AnswerLastTransaction,
)
AskCheckCurrentSerial, AnswerCheckCurrentSerial = register(
0x003D,
AskCheckCurrentSerial,
AnswerCheckCurrentSerial,
)
# build a "singleton"
Packets = PacketRegistry()
......
......@@ -139,3 +139,33 @@ class ClientOperationHandler(BaseClientAndStorageOperationHandler):
history_list = []
conn.answer(Packets.AnswerObjectHistory(oid, history_list))
def askCheckCurrentSerial(self, conn, tid, serial, oid):
self._askCheckCurrentSerial(conn, tid, serial, oid, time.time())
def _askCheckCurrentSerial(self, conn, tid, serial, oid, request_time):
if tid not in self.app.tm:
# transaction was aborted, cancel this event
neo.logging.info('Forget serial check of %s:%s by %s delayed by '
'%s', dump(oid), dump(serial), dump(tid),
dump(self.app.tm.getLockingTID(oid)))
# send an answer as the client side is waiting for it
conn.answer(Packets.AnswerStoreObject(0, oid, serial))
return
try:
self.app.tm.checkCurrentSerial(tid, serial, oid)
except ConflictError, err:
# resolvable or not
conn.answer(Packets.AnswerCheckCurrentSerial(1, oid,
err.getTID()))
except DelayedError:
# locked by a previous transaction, retry later
self.app.queueEvent(self._askCheckCurrentSerial, conn, tid, serial,
oid, request_time)
else:
if SLOW_STORE is not None:
duration = time.time() - request_time
if duration > SLOW_STORE:
neo.logging.info('CheckCurrentSerial delay: %.02fs',
duration)
conn.answer(Packets.AnswerCheckCurrentSerial(0, oid, serial))
......@@ -52,6 +52,7 @@ class Transaction(object):
self._transaction = None
self._locked = False
self._birth = time()
self._checked_set = set()
def __repr__(self):
return "<%s(tid=%r, uuid=%r, locked=%r, age=%.2fs)> at %x" % (
......@@ -63,6 +64,9 @@ class Transaction(object):
id(self),
)
def addCheckedObject(self, oid):
self._checked_set.add(oid)
def getTID(self):
return self._tid
......@@ -99,6 +103,9 @@ class Transaction(object):
def getOIDList(self):
return self._object_dict.keys()
def getLockedOIDList(self):
return self._object_dict.keys() + list(self._checked_set)
def getTransactionInformations(self):
return self._transaction
......@@ -191,10 +198,13 @@ class TransactionManager(object):
def getLockingTID(self, oid):
return self._store_lock_dict.get(oid)
def storeObject(self, tid, serial, oid, compression, checksum, data,
value_serial):
def lockObject(self, tid, serial, oid):
"""
Store an object received from client node
Take a write lock on given object, checking that "serial" is
current.
Raises:
DelayedError
ConflictError
"""
# check if the object if locked
locking_tid = self._store_lock_dict.get(oid)
......@@ -222,6 +232,18 @@ class TransactionManager(object):
dump(oid), dump(tid), dump(locking_tid))
raise ConflictError(locking_tid)
def checkCurrentSerial(self, tid, serial, oid):
self.lockObject(tid, serial, oid)
assert tid in self, "Transaction not registered"
transaction = self._transaction_dict[tid]
transaction.addCheckedObject(oid)
def storeObject(self, tid, serial, oid, compression, checksum, data,
value_serial):
"""
Store an object received from client node
"""
self.lockObject(tid, serial, oid)
# store object
assert tid in self, "Transaction not registered"
transaction = self._transaction_dict[tid]
......@@ -245,12 +267,12 @@ class TransactionManager(object):
if not even_if_locked and has_load_lock:
return
# unlock any object
for oid in transaction.getOIDList():
for oid in transaction.getLockedOIDList():
if has_load_lock:
lock_tid = self._load_lock_dict.pop(oid)
assert lock_tid == tid, 'Transaction %s tried to release ' \
'the lock on oid %s, but it was held by %s' % (dump(tid),
dump(oid), dump(lock_tid))
lock_tid = self._load_lock_dict.pop(oid, None)
assert lock_tid in (tid, None), 'Transaction %s tried to ' \
'release the lock on oid %s, but it was held by %s' % (
dump(tid), dump(oid), dump(lock_tid))
del self._store_lock_dict[oid]
# remove the transaction
uuid = transaction.getUUID()
......
......@@ -702,6 +702,16 @@ class ProtocolTests(NeoUnitTestBase):
ptid = p.decode()[0]
self.assertEqual(ptid, tid)
def test_AskCheckCurrentSerial(self):
tid = self.getNextTID()
serial = self.getNextTID()
oid = self.getNextTID()
p = Packets.AskCheckCurrentSerial(tid, serial, oid)
ptid, pserial, poid = p.decode()
self.assertEqual(ptid, tid)
self.assertEqual(pserial, serial)
self.assertEqual(poid, oid)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment