Commit 192b2ba8 authored by Julien Muchembled's avatar Julien Muchembled

backup: delay processing of replication when requested transaction is still locked

Without this, transactions and even objects may be missing on the backup
database.
parent ebabc92e
...@@ -19,7 +19,7 @@ from functools import wraps ...@@ -19,7 +19,7 @@ from functools import wraps
import neo.lib import neo.lib
from neo.lib.connector import ConnectorConnectionClosedException from neo.lib.connector import ConnectorConnectionClosedException
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.protocol import Errors, NodeStates, Packets, \ from neo.lib.protocol import Errors, NodeStates, Packets, ProtocolError, \
ZERO_HASH, ZERO_TID, ZERO_OID ZERO_HASH, ZERO_TID, ZERO_OID
from neo.lib.util import add64 from neo.lib.util import add64
...@@ -168,6 +168,12 @@ class StorageOperationHandler(EventHandler): ...@@ -168,6 +168,12 @@ class StorageOperationHandler(EventHandler):
def askFetchTransactions(self, conn, partition, length, min_tid, max_tid, def askFetchTransactions(self, conn, partition, length, min_tid, max_tid,
tid_list): tid_list):
app = self.app app = self.app
if app.tm.isLockedTid(max_tid):
# Wow, backup cluster is fast. Requested transactions are still in
# ttrans/ttobj so wait a little.
app.queueEvent(self.askFetchTransactions, conn,
(partition, length, min_tid, max_tid, tid_list))
return
msg_id = conn.getPeerId() msg_id = conn.getPeerId()
conn = weakref.proxy(conn) conn = weakref.proxy(conn)
peer_tid_set = set(tid_list) peer_tid_set = set(tid_list)
...@@ -202,6 +208,8 @@ class StorageOperationHandler(EventHandler): ...@@ -202,6 +208,8 @@ class StorageOperationHandler(EventHandler):
def askFetchObjects(self, conn, partition, length, min_tid, max_tid, def askFetchObjects(self, conn, partition, length, min_tid, max_tid,
min_oid, object_dict): min_oid, object_dict):
app = self.app app = self.app
if app.tm.isLockedTid(max_tid):
raise ProtocolError("transactions must be fetched before objects")
msg_id = conn.getPeerId() msg_id = conn.getPeerId()
conn = weakref.proxy(conn) conn = weakref.proxy(conn)
dm = app.dm dm = app.dm
......
...@@ -85,6 +85,11 @@ class Replicator(object): ...@@ -85,6 +85,11 @@ class Replicator(object):
if node is not None and node.isConnected(): if node is not None and node.isConnected():
return node.getConnection() return node.getConnection()
# XXX: We can't replicate unfinished transactions but do we need such
# complex code ? Backup mechanism does not rely on this: instead
# the upstream storage delays the answer. Maybe we can do the same
# for internal replication.
def setUnfinishedTIDList(self, max_tid, ttid_list, offset_list): def setUnfinishedTIDList(self, max_tid, ttid_list, offset_list):
"""This is a callback from MasterOperationHandler.""" """This is a callback from MasterOperationHandler."""
if ttid_list: if ttid_list:
......
...@@ -361,6 +361,12 @@ class TransactionManager(object): ...@@ -361,6 +361,12 @@ class TransactionManager(object):
if transaction_set is not None and not transaction_set: if transaction_set is not None and not transaction_set:
del self._uuid_dict[uuid] del self._uuid_dict[uuid]
def isLockedTid(self, tid):
for t in self._transaction_dict.itervalues():
if t.isLocked() and t.getTID() <= tid:
return True
return False
def loadLocked(self, oid): def loadLocked(self, oid):
return oid in self._load_lock_dict return oid in self._load_lock_dict
......
...@@ -264,6 +264,18 @@ class ReplicationTests(NEOThreadedTest): ...@@ -264,6 +264,18 @@ class ReplicationTests(NEOThreadedTest):
new_conn, = backup.master.getConnectionList(backup.upstream.master) new_conn, = backup.master.getConnectionList(backup.upstream.master)
self.assertFalse(new_conn is conn) self.assertFalse(new_conn is conn)
@backup_test()
def testBackupDelayedUnlockTransaction(self, backup):
upstream = backup.upstream
with upstream.master.filterConnection(upstream.storage) as f:
f.add(lambda conn, packet:
isinstance(packet, Packets.NotifyUnlockInformation))
upstream.importZODB()(1)
upstream.client.setPoll(0)
backup.tic()
backup.tic(force=1)
self.assertEqual(1, self.checkBackup(backup))
def testReplicationAbortedBySource(self): def testReplicationAbortedBySource(self):
""" """
Check that a feeding node aborts replication when its partition is Check that a feeding node aborts replication when its partition is
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment