Commit de645092 authored by Julien Muchembled's avatar Julien Muchembled

importer: add support for undo, faster conflict check when import is finished

parent bd5ba87a
...@@ -516,7 +516,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -516,7 +516,7 @@ class ImporterDatabaseManager(DatabaseManager):
" your configuration to use the native backend and restart.") " your configuration to use the native backend and restart.")
self._import = None self._import = None
for x in """getObject getReplicationTIDList getReplicationObjectList for x in """getObject getReplicationTIDList getReplicationObjectList
_fetchObject _fetchObject _getDataTID getLastObjectTID
""".split(): """.split():
setattr(self, x, getattr(self.db, x)) setattr(self, x, getattr(self.db, x))
for zodb in self.zodb: for zodb in self.zodb:
...@@ -574,6 +574,19 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -574,6 +574,19 @@ class ImporterDatabaseManager(DatabaseManager):
return (max(tid, util.p64(self.zodb_ltid)), return (max(tid, util.p64(self.zodb_ltid)),
max(oid, util.p64(self.zodb_loid))) max(oid, util.p64(self.zodb_loid)))
def _getObject(self, oid, tid=None, before_tid=None):
p64 = util.p64
r = self.getObject(p64(oid),
None if tid is None else p64(tid),
None if before_tid is None else p64(before_tid))
if r:
serial, next_serial, compression, checksum, data, data_serial = r
u64 = util.u64
return (u64(serial),
next_serial and u64(next_serial),
compression, checksum, data,
data_serial and u64(data_serial))
def getObject(self, oid, tid=None, before_tid=None): def getObject(self, oid, tid=None, before_tid=None):
u64 = util.u64 u64 = util.u64
u_oid = u64(oid) u_oid = u64(oid)
......
...@@ -29,6 +29,7 @@ from neo.storage.database.importer import \ ...@@ -29,6 +29,7 @@ from neo.storage.database.importer import \
from .. import expectedFailure, getTempDirectory, random_tree, Patch from .. import expectedFailure, getTempDirectory, random_tree, Patch
from . import NEOCluster, NEOThreadedTest from . import NEOCluster, NEOThreadedTest
from ZODB import serialize from ZODB import serialize
from ZODB.DB import TransactionalUndo
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
class Equal: class Equal:
...@@ -242,6 +243,13 @@ class ImporterTests(NEOThreadedTest): ...@@ -242,6 +243,13 @@ class ImporterTests(NEOThreadedTest):
t.commit() t.commit()
if cluster.storage.dm._import: if cluster.storage.dm._import:
last_import = i last_import = i
for x in 0, 1:
undo = TransactionalUndo(c.db(), [storage.lastTransaction()])
txn = transaction.Transaction()
undo.tpc_begin(txn)
undo.commit(txn)
undo.tpc_vote(txn)
undo.tpc_finish(txn)
self.tic() self.tic()
# Same as above. We want last_import smaller enough compared to i # Same as above. We want last_import smaller enough compared to i
assert i < last_import * 3 < 2 * i, (last_import, i) assert i < last_import * 3 < 2 * i, (last_import, i)
...@@ -286,7 +294,7 @@ class ImporterTests(NEOThreadedTest): ...@@ -286,7 +294,7 @@ class ImporterTests(NEOThreadedTest):
Patch(time, sleep=sleep) as p: Patch(time, sleep=sleep) as p:
self._importFromFileStorage() self._importFromFileStorage()
self.assertFalse(p.applied) self.assertFalse(p.applied)
self.assertEqual(len(tid_list), 11) self.assertEqual(len(tid_list), 13)
def testThreadedWritebackWithUnbalancedPartitions(self): def testThreadedWritebackWithUnbalancedPartitions(self):
N = 7 N = 7
...@@ -304,7 +312,7 @@ class ImporterTests(NEOThreadedTest): ...@@ -304,7 +312,7 @@ class ImporterTests(NEOThreadedTest):
Patch(WriteBack, chunk_size=N-2), \ Patch(WriteBack, chunk_size=N-2), \
Patch(WriteBack, committed=committed): Patch(WriteBack, committed=committed):
self._importFromFileStorage() self._importFromFileStorage()
self.assertEqual(nonlocal_[0], 10) self.assertEqual(nonlocal_[0], 12)
def testMerge(self): def testMerge(self):
multi = 1, 2, 3 multi = 1, 2, 3
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment