Commit ee5cb1f9 authored by Julien Muchembled

Fix several issues when undoing transactions with conflict resolutions

parent c42baaef
...@@ -655,7 +655,13 @@ class Application(ThreadedApplication): ...@@ -655,7 +655,13 @@ class Application(ThreadedApplication):
undo_object_tid_dict = {} undo_object_tid_dict = {}
snapshot_tid = p64(u64(self.last_tid) + 1) snapshot_tid = p64(u64(self.last_tid) + 1)
for partition, oid_list in partition_oid_dict.iteritems(): for partition, oid_list in partition_oid_dict.iteritems():
cell_list = getCellList(partition, readable=True) cell_list = [cell
for cell in getCellList(partition, readable=True)
# Exclude nodes that may have missed previously resolved
# conflicts. For example, if a network failure happened only
# between the client and the storage, the latter would still
# be readable until we commit.
if txn_context.involved_nodes.get(cell.getUUID(), 0) < 2]
# We do want to shuffle before getting one with the smallest # We do want to shuffle before getting one with the smallest
# key, so that all cells with the same (smallest) key has # key, so that all cells with the same (smallest) key has
# identical chance to be chosen. # identical chance to be chosen.
...@@ -685,7 +691,11 @@ class Application(ThreadedApplication): ...@@ -685,7 +691,11 @@ class Application(ThreadedApplication):
# object. This is an undo conflict, try to resolve it. # object. This is an undo conflict, try to resolve it.
try: try:
# Load the latest version we are supposed to see # Load the latest version we are supposed to see
data = self.load(oid, current_serial)[0] if current_serial == ttid:
# XXX: see TODO below
data = txn_context.cache_dict[oid]
else:
data = self.load(oid, current_serial)[0]
# Load the version we were undoing to # Load the version we were undoing to
undo_data = self.load(oid, undo_serial)[0] undo_data = self.load(oid, undo_serial)[0]
except NEOStorageNotFoundError: except NEOStorageNotFoundError:
...@@ -699,8 +709,14 @@ class Application(ThreadedApplication): ...@@ -699,8 +709,14 @@ class Application(ThreadedApplication):
raise UndoError('Some data were modified by a later ' \ raise UndoError('Some data were modified by a later ' \
'transaction', oid) 'transaction', oid)
undo_serial = None undo_serial = None
# TODO: The situation is similar to deadlock avoidance.
# Re-enable the cache size limit to avoid OOM when there's
# a huge amount of conflicting data, and get the data back
# from the storage when it's not in cache_dict anymore.
txn_context.cache_size = - float('inf')
self._store(txn_context, oid, current_serial, data, undo_serial) self._store(txn_context, oid, current_serial, data, undo_serial)
self.waitStoreResponses(txn_context)
return None, txn_oid_list return None, txn_oid_list
def _insertMetadata(self, txn_info, extension): def _insertMetadata(self, txn_info, extension):
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import threading import struct, threading
from collections import defaultdict from collections import defaultdict
from contextlib import contextmanager from contextlib import contextmanager
from functools import wraps from functools import wraps
...@@ -569,11 +569,14 @@ class DatabaseManager(object): ...@@ -569,11 +569,14 @@ class DatabaseManager(object):
return current_tid, current_tid return current_tid, current_tid
return current_tid, tid return current_tid, tid
if transaction_object: if transaction_object:
current_tid = current_data_tid = u64(transaction_object[2]) try:
current_tid = current_data_tid = u64(transaction_object[2])
except struct.error:
current_tid = current_data_tid = tid
else: else:
current_tid, current_data_tid = getDataTID(before_tid=ltid) current_tid, current_data_tid = getDataTID(before_tid=ltid)
if current_tid is None: if current_tid is None:
return (None, None, False) return None, None, False
found_undone_tid, undone_data_tid = getDataTID(tid=undone_tid) found_undone_tid, undone_data_tid = getDataTID(tid=undone_tid)
assert found_undone_tid is not None, (oid, undone_tid) assert found_undone_tid is not None, (oid, undone_tid)
is_current = undone_data_tid in (current_data_tid, tid) is_current = undone_data_tid in (current_data_tid, tid)
......
...@@ -156,37 +156,45 @@ class Test(NEOThreadedTest): ...@@ -156,37 +156,45 @@ class Test(NEOThreadedTest):
self.assertEqual((x['tid'], x['size']), expected.pop()) self.assertEqual((x['tid'], x['size']), expected.pop())
self.assertFalse(expected) self.assertFalse(expected)
@with_cluster() def _testUndoConflict(self, cluster, *inc):
def testUndoConflict(self, cluster, conflict_during_store=False):
def waitResponses(orig, *args): def waitResponses(orig, *args):
orig(*args) orig(*args)
p.revert() p.revert()
ob.value += 3
t.commit()
if 1:
t, c = cluster.getTransaction()
c.root()[0] = ob = PCounterWithResolution()
t.commit() t.commit()
ob.value += 1 t, c = cluster.getTransaction()
c.root()[0] = ob = PCounterWithResolution()
t.commit()
tids = []
for x in inc:
ob.value += x
t.commit() t.commit()
undo = TransactionalUndo(cluster.db, (ob._p_serial,)) tids.append(ob._p_serial)
txn = transaction.Transaction() undo = TransactionalUndo(cluster.db, tids)
undo.tpc_begin(txn) txn = transaction.Transaction()
if conflict_during_store: undo.tpc_begin(txn)
with Patch(cluster.client, waitResponses=waitResponses) as p: ob.value += 5
undo.commit(txn) with Patch(cluster.client, waitResponses=waitResponses) as p:
else: undo.commit(txn)
ob.value += 3 undo.tpc_vote(txn)
t.commit() undo.tpc_finish(txn)
undo.commit(txn) t.begin()
undo.tpc_vote(txn) self.assertEqual(ob.value, 5)
undo.tpc_finish(txn) return ob
t.begin()
self.assertEqual(ob.value, 3) @with_cluster()
def testUndoConflictSmallCache(self, cluster):
big = 'x' * cluster.cache_size
def resolve(orig, *args):
state = orig(*args)
state['x'] = big
return state
with Patch(PCounterWithResolution, _p_resolveConflict=resolve):
self.assertEqual(self._testUndoConflict(cluster, 1, 3).x, big)
@expectedFailure(POSException.ConflictError) @expectedFailure(POSException.ConflictError)
def testUndoConflictDuringStore(self): @with_cluster()
self.testUndoConflict(True) def testUndoConflictDuringStore(self, cluster):
self._testUndoConflict(cluster, 1)
@with_cluster() @with_cluster()
def testStorageDataLock(self, cluster): def testStorageDataLock(self, cluster):
......
...@@ -28,14 +28,6 @@ class UndoTests(ZODBTestCase, StorageTestBase, TransactionalUndoStorage, ...@@ -28,14 +28,6 @@ class UndoTests(ZODBTestCase, StorageTestBase, TransactionalUndoStorage,
checkTransactionalUndoAfterPack = expectedFailure()( checkTransactionalUndoAfterPack = expectedFailure()(
TransactionalUndoStorage.checkTransactionalUndoAfterPack) TransactionalUndoStorage.checkTransactionalUndoAfterPack)
for x in ('checkUndoMultipleConflictResolution',
'checkUndoMultipleConflictResolutionReversed'):
try:
setattr(UndoTests, x,
expectedFailure(KeyError)(getattr(TransactionalUndoStorage, x)))
except AttributeError:
pass
if __name__ == "__main__": if __name__ == "__main__":
suite = unittest.makeSuite(UndoTests, 'check') suite = unittest.makeSuite(UndoTests, 'check')
unittest.main(defaultTest='suite') unittest.main(defaultTest='suite')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment