Commit df2bf949 authored by Julien Muchembled's avatar Julien Muchembled

undo: bugfixes

- When undoing current record, fix:
  - crash of storage nodes that don't have the undo data (non-readable cells);
  - and conflict resolution.
- Fix undo deduplication in replication when NEO deduplication is disabled.
- client: minor fixes in undo() about concurrent storage disconnections
  and PT updates.
parent fd95a217
......@@ -499,7 +499,6 @@ class Application(ThreadedApplication):
compression = 0
checksum = ZERO_HASH
else:
assert data_serial is None
size, compression, compressed_data = self.compress(data)
checksum = makeChecksum(compressed_data)
txn_context.data_size += size
......@@ -529,7 +528,7 @@ class Application(ThreadedApplication):
if data is CHECKED_SERIAL:
raise ReadConflictError(oid=oid,
serials=(serial, old_serial))
# TODO: data can be None if a conflict happens during undo
# data can be None if a conflict happens when undoing creation
if data:
txn_context.data_size -= len(data)
if self.last_tid < serial:
......@@ -760,7 +759,7 @@ class Application(ThreadedApplication):
'partition_oid_dict': partition_oid_dict,
'undo_object_tid_dict': undo_object_tid_dict,
}
while partition_oid_dict:
while 1:
for partition, oid_list in partition_oid_dict.iteritems():
cell_list = [cell
for cell in getCellList(partition, readable=True)
......@@ -769,11 +768,17 @@ class Application(ThreadedApplication):
# only between the client and the storage, the latter would
# still be readable until we commit.
if txn_context.conn_dict.get(cell.getUUID(), 0) is not None]
storage_conn = getConnForNode(
conn = getConnForNode(
min(cell_list, key=getCellSortKey).getNode())
storage_conn.ask(Packets.AskObjectUndoSerial(ttid,
try:
conn.ask(Packets.AskObjectUndoSerial(ttid,
snapshot_tid, undone_tid, oid_list),
partition=partition, **kw)
except AttributeError:
if conn is not None:
raise
except ConnectionClosed:
pass
# Wait for all AnswerObjectUndoSerial. We might get
# OidNotFoundError, meaning that objects in transaction's oid_list
......@@ -785,10 +790,37 @@ class Application(ThreadedApplication):
self.dispatcher.forget_queue(queue)
raise UndoError('non-undoable transaction')
if not partition_oid_dict:
break
# Do not retry too quickly, for example
# when there's an incoming PT update.
self.sync()
# Send undo data to all storage nodes.
for oid, (current_serial, undo_serial, is_current) in \
undo_object_tid_dict.iteritems():
if is_current:
if undo_serial:
# The data are used:
# - by outdated cells that don't have them
# - if there's a conflict to resolve
# Otherwise, they're ignored.
# IDEA: So as an optimization, if all cells we're going to
# write are readable, we could move the following
# load to _handleConflicts and simply pass None here.
# But evaluating such condition without race
# condition is not easy:
# 1. The transaction context must have established
# with all nodes that will be involved (e.g.
# doable while processing partition_oid_dict).
# 2. The partition table must be up-to-date by
# pinging the master (i.e. self.sync()).
# 3. At last, the PT can be looked up here.
try:
data = self.load(oid, undo_serial)[0]
except NEOStorageCreationUndoneError:
data = None
else:
data = None
else:
# Serial being undone is not the latest version for this
......
......@@ -505,9 +505,9 @@ class ImporterDatabaseManager(DatabaseManager):
break
if len(txn) == 3:
oid, data_id, data_tid = txn
if data_id is not None:
checksum, data, compression = data_id
data_id = self.holdData(checksum, oid, data, compression)
checksum, data, compression = data_id or (None, None, 0)
data_id = self.holdData(
checksum, oid, data, compression, data_tid)
data_id_list.append(data_id)
object_list.append((oid, data_id, data_tid))
# Give the main loop the opportunity to process requests
......@@ -518,7 +518,7 @@ class ImporterDatabaseManager(DatabaseManager):
# solved when resuming the migration.
# XXX: The leak was solved by the deduplication,
# but it was disabled by default.
else:
else: # len(txn) == 5
tid = txn[-1]
self.storeTransaction(tid, object_list,
((x[0] for x in object_list),) + txn,
......
......@@ -684,9 +684,16 @@ class DatabaseManager(object):
"""
@abstract
def storeData(self, checksum, oid, data, compression):
def storeData(self, checksum, oid, data, compression, data_tid):
"""To be overridden by the backend to store object raw data
'checksum' must be the result of makeChecksum(data).
'compression' indicates if 'data' is compressed.
In the case of undo, 'data_tid' may not be None:
- if (oid, data_tid) exists, the related data_id must be returned;
- else, if it can happen (e.g. cell is not readable), the caller
must have passed valid (checksum, data, compression) as fallback.
If same data was already stored, the storage only has to check there's
no hash collision.
"""
......@@ -696,21 +703,16 @@ class DatabaseManager(object):
"""Inverse of storeData
"""
def holdData(self, checksum_or_id, *args):
"""Store raw data of temporary object
def holdData(self, *args):
"""Store and hold data
If 'checksum_or_id' is a checksum, it must be the result of
makeChecksum(data) and extra parameters must be (data, compression)
where 'compression' indicates if 'data' is compressed.
A volatile reference is set to this data until 'releaseData' is called
with this checksum.
If called with only an id, it only increment the volatile
reference to the data matching the id.
The parameters are same as storeData.
A volatile reference is set to this data until 'releaseData' is called.
"""
if args:
checksum_or_id = self.storeData(checksum_or_id, *args)
self._uncommitted_data[checksum_or_id] += 1
return checksum_or_id
data_id = self.storeData(*args)
if data_id is not None:
self._uncommitted_data[data_id] += 1
return data_id
def releaseData(self, data_id_list, prune=False):
"""Release 1 volatile reference to given list of data ids
......
......@@ -607,19 +607,10 @@ class MySQLDatabaseManager(DatabaseManager):
for oid, data_id, value_serial in object_list:
oid = u64(oid)
partition = self._getPartition(oid)
if value_serial:
value_serial = u64(value_serial)
(data_id,), = q("SELECT data_id FROM obj"
" WHERE `partition`=%d AND oid=%d AND tid=%d"
% (partition, oid, value_serial))
if temporary:
self.holdData(data_id)
else:
value_serial = 'NULL'
value = "(%s,%s,%s,%s,%s)," % (
partition, oid, tid,
'NULL' if data_id is None else data_id,
value_serial)
u64(value_serial) if value_serial else 'NULL')
values_size += len(value)
# actually: max_values < values_size + EXTRA - len(final comma)
# (test_max_allowed_packet checks that EXTRA == 2)
......@@ -687,7 +678,17 @@ class MySQLDatabaseManager(DatabaseManager):
for i in xrange(bigdata_id,
bigdata_id + (length + 0x7fffff >> 23)))
def storeData(self, checksum, oid, data, compression, _pack=_structLL.pack):
def storeData(self, checksum, oid, data, compression, data_tid,
_pack=_structLL.pack):
oid = util.u64(oid)
p = self._getPartition(oid)
if data_tid:
for r, in self.query("SELECT data_id FROM obj"
" WHERE `partition`=%s AND oid=%s AND tid=%s"
% (p, oid, util.u64(data_tid))):
return r
if not checksum:
return # delete
e = self.escape
checksum = e(checksum)
if 0x1000000 <= len(data): # 16M (MEDIUMBLOB limit)
......@@ -715,7 +716,6 @@ class MySQLDatabaseManager(DatabaseManager):
i = bigdata_id = self.conn.insert_id()
i += 1
data = _pack(bigdata_id, length)
p = self._getPartition(util.u64(oid))
r = self._data_last_ids[p]
try:
self.query("INSERT INTO data VALUES (%s, '%s', %d, '%s')" %
......
......@@ -402,11 +402,6 @@ class SQLiteDatabaseManager(DatabaseManager):
partition = self._getPartition(oid)
if value_serial:
value_serial = u64(value_serial)
(data_id,), = q("SELECT data_id FROM obj"
" WHERE partition=? AND oid=? AND tid=?",
(partition, oid, value_serial))
if temporary:
self.holdData(data_id)
try:
q(obj_sql, (partition, oid, tid, data_id, value_serial))
except sqlite3.IntegrityError:
......@@ -445,10 +440,18 @@ class SQLiteDatabaseManager(DatabaseManager):
return len(data_id_list)
return 0
def storeData(self, checksum, oid, data, compression,
def storeData(self, checksum, oid, data, compression, data_tid,
_dup=unique_constraint_message("data", "hash", "compression")):
oid = util.u64(oid)
p = self._getPartition(oid)
if data_tid:
for r, in self.query("SELECT data_id FROM obj"
" WHERE partition=? AND oid=? AND tid=?",
(p, oid, util.u64(data_tid))):
return r
if not checksum:
return # delete
H = buffer(checksum)
p = self._getPartition(util.u64(oid))
r = self._data_last_ids[p]
try:
self.query("INSERT INTO data VALUES (?,?,?,?)",
......
......@@ -121,7 +121,6 @@ class ClientOperationHandler(BaseHandler):
if data or checksum != ZERO_HASH:
# TODO: return an appropriate error packet
assert makeChecksum(data) == checksum
assert data_serial is None
else:
checksum = data = None
try:
......
......@@ -106,13 +106,10 @@ class StorageOperationHandler(EventHandler):
def addObject(self, conn, oid, serial, compression,
checksum, data, data_serial):
dm = self.app.dm
if data or checksum != ZERO_HASH:
data_id = dm.storeData(checksum, oid, data, compression)
else:
data_id = None
# Directly store the transaction.
obj = oid, data_id, data_serial
dm.storeTransaction(serial, (obj,), None, False)
if not data and checksum == ZERO_HASH:
checksum = data = None
data_id = dm.storeData(checksum, oid, data, compression, data_serial)
dm.storeTransaction(serial, ((oid, data_id, data_serial),), None, False)
@checkConnectionIsReplicatorConnection
def replicationError(self, conn, message):
......
......@@ -425,11 +425,8 @@ class TransactionManager(EventQueue):
self._unstore(transaction, oid)
transaction.serial_dict[oid] = serial
# store object
if data is None:
data_id = None
else:
data_id = self._app.dm.holdData(checksum, oid, data, compression)
transaction.store(oid, data_id, value_serial)
transaction.store(oid, self._app.dm.holdData(
checksum, oid, data, compression, value_serial), value_serial)
if not locked:
return ZERO_TID
......@@ -573,8 +570,4 @@ class TransactionManager(EventQueue):
if lock_tid is not None:
transaction = self._transaction_dict[lock_tid]
if transaction.store_dict[oid][2] == orig_serial:
if new_serial:
data_id = None
else:
self._app.dm.holdData(data_id)
transaction.store(oid, data_id, new_serial)
......@@ -96,7 +96,7 @@ class StorageDBTests(NeoUnitTestBase):
self._last_ttid = ttid = add64(self._last_ttid, 1)
transaction = oid_list, 'user', 'desc', 'ext', False, ttid
H = "0" * 20
object_list = [(oid, self.db.holdData(H, oid, '', 1), None)
object_list = [(oid, self.db.holdData(H, oid, '', 1, None), None)
for oid in oid_list]
return (transaction, object_list)
......@@ -383,8 +383,8 @@ class StorageDBTests(NeoUnitTestBase):
tid4 = self.getNextTID()
tid5 = self.getNextTID()
oid1 = p64(1)
foo = db.holdData("3" * 20, oid1, 'foo', 0)
bar = db.holdData("4" * 20, oid1, 'bar', 0)
foo = db.holdData("3" * 20, oid1, 'foo', 0, None)
bar = db.holdData("4" * 20, oid1, 'bar', 0, None)
db.releaseData((foo, bar))
db.storeTransaction(
tid1, (
......
......@@ -102,7 +102,7 @@ class StorageMySQLdbTests(StorageDBTests):
self.assertEqual(2, max(len(self.db.escape(chr(x)))
for x in xrange(256)))
self.assertEqual(2, len(self.db.escape('\0')))
self.db.storeData('\0' * 20, ZERO_OID, '\0' * (2**24-1), 0)
self.db.storeData('\0' * 20, ZERO_OID, '\0' * (2**24-1), 0, None)
size, = query_list
max_allowed = self.db.__class__._max_allowed_packet
self.assertTrue(max_allowed - 1024 < size <= max_allowed, size)
......
......@@ -43,12 +43,12 @@ class TransactionManagerTests(NeoUnitTestBase):
locking_serial = self.getNextTID()
other_serial = self.getNextTID()
new_serial = self.getNextTID()
checksum = "2" * 20
data_id = (1 << 48) + 2
self.register(uuid, locking_serial)
# Object not known, nothing happens
self.assertEqual(self.manager.getObjectFromTransaction(locking_serial,
oid), None)
self.manager.updateObjectDataForPack(oid, orig_serial, None, checksum)
self.manager.updateObjectDataForPack(oid, orig_serial, None, data_id)
self.assertEqual(self.manager.getObjectFromTransaction(locking_serial,
oid), None)
self.manager.abort(locking_serial, even_if_locked=True)
......@@ -57,10 +57,11 @@ class TransactionManagerTests(NeoUnitTestBase):
self.manager.storeObject(locking_serial, ram_serial, oid, 0, "3" * 20,
'bar', None)
holdData = self.app.dm.mockGetNamedCalls('holdData')
self.assertEqual(holdData.pop(0).params, ("3" * 20, oid, 'bar', 0))
self.assertEqual(holdData.pop(0).params,
("3" * 20, oid, 'bar', 0, None))
orig_object = self.manager.getObjectFromTransaction(locking_serial,
oid)
self.manager.updateObjectDataForPack(oid, orig_serial, None, checksum)
self.manager.updateObjectDataForPack(oid, orig_serial, None, data_id)
self.assertEqual(self.manager.getObjectFromTransaction(locking_serial,
oid), orig_object)
self.manager.abort(locking_serial, even_if_locked=True)
......@@ -70,7 +71,7 @@ class TransactionManagerTests(NeoUnitTestBase):
None, other_serial)
orig_object = self.manager.getObjectFromTransaction(locking_serial,
oid)
self.manager.updateObjectDataForPack(oid, orig_serial, None, checksum)
self.manager.updateObjectDataForPack(oid, orig_serial, None, data_id)
self.assertEqual(self.manager.getObjectFromTransaction(locking_serial,
oid), orig_object)
self.manager.abort(locking_serial, even_if_locked=True)
......@@ -79,20 +80,18 @@ class TransactionManagerTests(NeoUnitTestBase):
self.manager.storeObject(locking_serial, ram_serial, oid, None, None,
None, orig_serial)
self.manager.updateObjectDataForPack(oid, orig_serial, new_serial,
checksum)
data_id)
self.assertEqual(self.manager.getObjectFromTransaction(locking_serial,
oid), (oid, None, new_serial))
oid), (oid, data_id, new_serial))
self.manager.abort(locking_serial, even_if_locked=True)
self.register(uuid, locking_serial)
self.manager.storeObject(locking_serial, ram_serial, oid, None, None,
None, orig_serial)
self.manager.updateObjectDataForPack(oid, orig_serial, None, checksum)
self.assertEqual(holdData.pop(0).params, (checksum,))
self.manager.updateObjectDataForPack(oid, orig_serial, None, data_id)
self.assertEqual(self.manager.getObjectFromTransaction(locking_serial,
oid), (oid, checksum, None))
oid), (oid, data_id, None))
self.manager.abort(locking_serial, even_if_locked=True)
self.assertFalse(holdData)
if __name__ == "__main__":
unittest.main()
......@@ -180,9 +180,26 @@ class Test(NEOThreadedTest):
@with_cluster()
def testUndoConflictDuringStore(self, cluster):
with self.expectedFailure(POSException.ConflictError): \
self._testUndoConflict(cluster, 1)
@with_cluster()
def testUndoConflictCreationUndo(self, cluster):
def waitResponses(orig, *args):
orig(*args)
p.revert()
t.commit()
t, c = cluster.getTransaction()
c.root()[0] = ob = PCounterWithResolution()
t.commit()
undo = TransactionalUndo(cluster.db, [ob._p_serial])
txn = transaction.Transaction()
undo.tpc_begin(txn)
ob.value += 1
with Patch(cluster.client, waitResponses=waitResponses) as p:
self.assertRaises(POSException.ConflictError, undo.commit, txn)
t.begin()
self.assertEqual(ob.value, 1)
def testStorageDataLock(self, dedup=False):
with NEOCluster(dedup=dedup) as cluster:
cluster.start()
......@@ -1743,7 +1760,7 @@ class Test(NEOThreadedTest):
bad = []
ok = []
def data_args(value):
return makeChecksum(value), ZERO_OID, value, 0
return makeChecksum(value), ZERO_OID, value, 0, None
node_list = []
for i, s in enumerate(cluster.storage_list):
node_list.append(s.uuid)
......
......@@ -362,9 +362,6 @@ class ReplicationTests(NEOThreadedTest):
"""
Check both IStorage.history and replication when the DB contains a
deletion record.
XXX: This test reveals that without --dedup, the replication does not
preserve the deduplication that is done by the 'undo' code.
"""
storage = backup.upstream.getZODBStorage()
oid = storage.new_oid()
......@@ -385,6 +382,8 @@ class ReplicationTests(NEOThreadedTest):
self.assertFalse(expected)
self.tic()
self.assertEqual(1, self.checkBackup(backup))
for cluster in backup, backup.upstream:
self.assertEqual(1, cluster.storage.sqlCount('data'))
@backup_test()
def testBackupTid(self, backup):
......
......@@ -19,7 +19,8 @@ from ZODB.tests.StorageTestBase import StorageTestBase
from ZODB.tests.TransactionalUndoStorage import TransactionalUndoStorage
from ZODB.tests.ConflictResolution import ConflictResolvingTransUndoStorage
from .. import expectedFailure
from neo.client.app import Application as ClientApplication
from .. import expectedFailure, Patch
from . import ZODBTestCase
class UndoTests(ZODBTestCase, StorageTestBase, TransactionalUndoStorage,
......@@ -28,7 +29,30 @@ class UndoTests(ZODBTestCase, StorageTestBase, TransactionalUndoStorage,
checkTransactionalUndoAfterPack = expectedFailure()(
TransactionalUndoStorage.checkTransactionalUndoAfterPack)
class AltUndoTests(UndoTests):
"""
These tests covers the beginning of an alternate implementation of undo,
as described by the IDEA comment in the undo method of client's app.
More precisely, they check that the protocol keeps support for data=None
in AskStoreObject when cells are readable.
"""
_patch = Patch(ClientApplication, _store=
lambda orig, self, txn_context, oid, serial, data, data_serial=None:
orig(self, txn_context, oid, serial,
None if data_serial else data, data_serial))
def setUp(self):
super(AltUndoTests, self).setUp()
self._patch.apply()
def _tearDown(self, success):
self._patch.revert()
super(AltUndoTests, self)._tearDown(success)
if __name__ == "__main__":
suite = unittest.makeSuite(UndoTests, 'check')
suite = unittest.TestSuite((
unittest.makeSuite(UndoTests, 'check'),
unittest.makeSuite(AltUndoTests, 'check'),
))
unittest.main(defaultTest='suite')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment