Commit e387ad59 authored by Julien Muchembled's avatar Julien Muchembled

importer: fix possible data loss on writeback

If the source DB is lost during the import and then restored from a backup,
all new transactions have to written back again on resume. It is the most
common case for which the writeback hits the maximum number of transactions
per partition to process at each iteration; the previous code was buggy in
that it could skip transactions.
parent 48d936cb
...@@ -678,6 +678,7 @@ class WriteBack(object): ...@@ -678,6 +678,7 @@ class WriteBack(object):
_changed = False _changed = False
_process = None _process = None
chunk_size = 100
def __init__(self, db, storage): def __init__(self, db, storage):
self._db = db self._db = db
...@@ -737,7 +738,6 @@ class WriteBack(object): ...@@ -737,7 +738,6 @@ class WriteBack(object):
def iterator(self): def iterator(self):
db = self._db db = self._db
np = self._np np = self._np
chunk_size = max(2, 1000 // np)
offset_list = xrange(np) offset_list = xrange(np)
while 1: while 1:
with db: with db:
...@@ -748,23 +748,26 @@ class WriteBack(object): ...@@ -748,23 +748,26 @@ class WriteBack(object):
if np == len(db._readable_set): if np == len(db._readable_set):
while 1: while 1:
tid_list = [] tid_list = []
loop = False max_tid = MAX_TID
for offset in offset_list: for offset in offset_list:
x = db.getReplicationTIDList( x = db.getReplicationTIDList(
self.min_tid, MAX_TID, chunk_size, offset) self.min_tid, max_tid, self.chunk_size, offset)
tid_list += x tid_list += x
if len(x) == chunk_size: if len(x) == self.chunk_size:
loop = True max_tid = x[-1]
if tid_list: if not tid_list:
break
tid_list.sort() tid_list.sort()
for tid in tid_list: for tid in tid_list:
if self._stop.is_set(): if self._stop.is_set():
return return
yield TransactionRecord(db, tid) yield TransactionRecord(db, tid)
if tid == max_tid:
break
else:
self.min_tid = util.add64(tid, 1) self.min_tid = util.add64(tid, 1)
if loop:
continue
break break
self.min_tid = util.add64(tid, 1)
if not self._event.is_set(): if not self._event.is_set():
self._idle.set() self._idle.set()
self._event.wait() self._event.wait()
......
...@@ -21,9 +21,11 @@ import os, random, shutil, time, unittest ...@@ -21,9 +21,11 @@ import os, random, shutil, time, unittest
import transaction, ZODB import transaction, ZODB
from neo.client.exception import NEOPrimaryMasterLost from neo.client.exception import NEOPrimaryMasterLost
from neo.lib import logging from neo.lib import logging
from neo.lib.util import u64 from neo.lib.util import p64, u64
from neo.master.transactions import TransactionManager
from neo.storage.database import getAdapterKlass, importer, manager from neo.storage.database import getAdapterKlass, importer, manager
from neo.storage.database.importer import Repickler, TransactionRecord from neo.storage.database.importer import \
Repickler, TransactionRecord, WriteBack
from .. import expectedFailure, getTempDirectory, random_tree, Patch from .. import expectedFailure, getTempDirectory, random_tree, Patch
from . import NEOCluster, NEOThreadedTest from . import NEOCluster, NEOThreadedTest
from ZODB import serialize from ZODB import serialize
...@@ -179,7 +181,7 @@ class ImporterTests(NEOThreadedTest): ...@@ -179,7 +181,7 @@ class ImporterTests(NEOThreadedTest):
del db_list, iter_list del db_list, iter_list
#del zodb[0][1][zodb.pop()[0]] #del zodb[0][1][zodb.pop()[0]]
# Start NEO cluster with transparent import. # Start NEO cluster with transparent import.
with NEOCluster(importer=importer) as cluster: with NEOCluster(importer=importer, partitions=2) as cluster:
# Suspend import for a while, so that import # Suspend import for a while, so that import
# is finished in the middle of the below 'for' loop. # is finished in the middle of the below 'for' loop.
# Use a slightly different main loop for storage so that it # Use a slightly different main loop for storage so that it
...@@ -243,10 +245,10 @@ class ImporterTests(NEOThreadedTest): ...@@ -243,10 +245,10 @@ class ImporterTests(NEOThreadedTest):
db.close() db.close()
@unittest.skipUnless(importer.FORK, 'no os.fork') @unittest.skipUnless(importer.FORK, 'no os.fork')
def test1(self): def testMultiProcessWriteBack(self):
self._importFromFileStorage() self._importFromFileStorage()
def testThreadedWriteback(self): def testThreadedWritebackAndDBReconnection(self):
# Also check reconnection to the underlying DB for relevant backends. # Also check reconnection to the underlying DB for relevant backends.
tid_list = [] tid_list = []
def __init__(orig, tr, db, tid): def __init__(orig, tr, db, tid):
...@@ -274,6 +276,24 @@ class ImporterTests(NEOThreadedTest): ...@@ -274,6 +276,24 @@ class ImporterTests(NEOThreadedTest):
self.assertFalse(p.applied) self.assertFalse(p.applied)
self.assertEqual(len(tid_list), 11) self.assertEqual(len(tid_list), 11)
def testThreadedWritebackWithUnbalancedPartitions(self):
N = 7
nonlocal_ = [0]
def committed(orig, self):
if nonlocal_[0] > N:
orig(self)
def _nextTID(orig, self, *args):
if args:
return orig(self, *args)
nonlocal_[0] += 1
return orig(self, p64(nonlocal_[0] == N), 2)
with Patch(importer, FORK=False), \
Patch(TransactionManager, _nextTID=_nextTID), \
Patch(WriteBack, chunk_size=N-2), \
Patch(WriteBack, committed=committed):
self._importFromFileStorage()
self.assertEqual(nonlocal_[0], 10)
def testMerge(self): def testMerge(self):
multi = 1, 2, 3 multi = 1, 2, 3
self._importFromFileStorage(multi, self._importFromFileStorage(multi,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment