Commit bd5e1658 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 6c679687
...@@ -1472,7 +1472,6 @@ class AddObject(Packet): ...@@ -1472,7 +1472,6 @@ class AddObject(Packet):
PTID('data_serial'), PTID('data_serial'),
) )
# NOTE
class Replicate(Packet): class Replicate(Packet):
""" """
Notify a storage node to replicate partitions up to given 'tid' Notify a storage node to replicate partitions up to given 'tid'
......
...@@ -213,7 +213,8 @@ class BackupApplication(object): ...@@ -213,7 +213,8 @@ class BackupApplication(object):
for cell in pt.getCellList(offset, readable=True): for cell in pt.getCellList(offset, readable=True):
node = cell.getNode() node = cell.getNode()
assert node.isConnected(), node assert node.isConnected(), node
if cell.backup_tid == prev_tid: # XXX ? if cell.backup_tid == prev_tid:
"""
# Let's given 4 TID t0,t1,t2,t3: if a cell is only # Let's given 4 TID t0,t1,t2,t3: if a cell is only
# modified by t0 & t3 and has all data for t0, 4 values # modified by t0 & t3 and has all data for t0, 4 values
# are possible for its 'backup_tid' until it replicates # are possible for its 'backup_tid' until it replicates
...@@ -224,10 +225,12 @@ class BackupApplication(object): ...@@ -224,10 +225,12 @@ class BackupApplication(object):
# all partitions. t1 is wrong for the same reason. # all partitions. t1 is wrong for the same reason.
# So we have chosen the highest one (t3 - 1). # So we have chosen the highest one (t3 - 1).
# t2 should also work but maybe harder to implement. # t2 should also work but maybe harder to implement.
cell.backup_tid = add64(tid, -1) cell.backup_tid = add64(tid, -1) # XXX wrong! (we did not yet pulled the data)
logging.debug( logging.debug(
"partition %u: updating backup_tid of %r to %s", "partition %u: updating backup_tid of %r to %s",
offset, cell, dump(cell.backup_tid)) offset, cell, dump(cell.backup_tid))
"""
pass
else: else:
assert cell.backup_tid < last_max_tid, ( assert cell.backup_tid < last_max_tid, (
cell.backup_tid, last_max_tid, prev_tid, tid) cell.backup_tid, last_max_tid, prev_tid, tid)
...@@ -240,8 +243,8 @@ class BackupApplication(object): ...@@ -240,8 +243,8 @@ class BackupApplication(object):
self.primary_partition_dict[offset] = \ self.primary_partition_dict[offset] = \
random.choice(node_list) random.choice(node_list)
else: else:
# Partition not touched, so increase 'backup_tid' of all NOTE # Partition not touched, so increase 'backup_tid' of all
# "up-to-date" replicas, without having to replicate. (probably relates to backup_tid=tid initial bug) # "up-to-date" replicas, without having to replicate.
for cell in pt.getCellList(offset, readable=True): for cell in pt.getCellList(offset, readable=True):
if last_max_tid <= cell.backup_tid: if last_max_tid <= cell.backup_tid:
cell.backup_tid = tid cell.backup_tid = tid
......
...@@ -21,6 +21,8 @@ from neo.lib.protocol import (uuid_str, NodeTypes, NodeStates, Packets, ...@@ -21,6 +21,8 @@ from neo.lib.protocol import (uuid_str, NodeTypes, NodeStates, Packets,
BrokenNodeDisallowedError, BrokenNodeDisallowedError,
) )
X = 0
class MasterHandler(EventHandler): class MasterHandler(EventHandler):
"""This class implements a generic part of the event handlers.""" """This class implements a generic part of the event handlers."""
...@@ -68,9 +70,17 @@ class MasterHandler(EventHandler): ...@@ -68,9 +70,17 @@ class MasterHandler(EventHandler):
def askRecovery(self, conn): def askRecovery(self, conn):
app = self.app app = self.app
backup_tid = app.backup_tid
pt_backup_tid = None
if backup_tid:
pt_backup_tid = app.pt.getBackupTid()
if X:
print 'MASTER askRecovery .backup_tid: %r pt.getBackupTid(): %r' % (
backup_tid, pt_backup_tid)
conn.answer(Packets.AnswerRecovery( conn.answer(Packets.AnswerRecovery(
app.pt.getID(), app.pt.getID(),
app.backup_tid and app.pt.getBackupTid(), #app.backup_tid and app.pt.getBackupTid(),
backup_tid and pt_backup_tid,
app.truncate_tid)) app.truncate_tid))
def askLastIDs(self, conn): def askLastIDs(self, conn):
......
...@@ -61,8 +61,8 @@ UNIT_TEST_MODULES = [ ...@@ -61,8 +61,8 @@ UNIT_TEST_MODULES = [
# 'neo.tests.client.testStorageHandler', # 'neo.tests.client.testStorageHandler',
# 'neo.tests.client.testConnectionPool', # 'neo.tests.client.testConnectionPool',
# light functional tests # light functional tests
'neo.tests.threaded.test', # 'neo.tests.threaded.test',
'neo.tests.threaded.testImporter', # 'neo.tests.threaded.testImporter',
'neo.tests.threaded.testReplication', 'neo.tests.threaded.testReplication',
# 'neo.tests.threaded.testSSL', # 'neo.tests.threaded.testSSL',
] ]
......
...@@ -21,6 +21,8 @@ from neo.lib.exception import DatabaseFailure ...@@ -21,6 +21,8 @@ from neo.lib.exception import DatabaseFailure
from neo.lib.interfaces import abstract, requires from neo.lib.interfaces import abstract, requires
from neo.lib.protocol import ZERO_TID from neo.lib.protocol import ZERO_TID
X = 0
def lazymethod(func): def lazymethod(func):
def getter(self): def getter(self):
cls = self.__class__ cls = self.__class__
...@@ -246,6 +248,10 @@ class DatabaseManager(object): ...@@ -246,6 +248,10 @@ class DatabaseManager(object):
return util.bin(self.getConfiguration('backup_tid')) return util.bin(self.getConfiguration('backup_tid'))
def _setBackupTID(self, tid): def _setBackupTID(self, tid):
if X:
print
print 'SET backup_tid: %r' % tid
import traceback; traceback.print_stack()
tid = util.dump(tid) tid = util.dump(tid)
logging.debug('backup_tid = %s', tid) logging.debug('backup_tid = %s', tid)
return self._setConfiguration('backup_tid', tid) return self._setConfiguration('backup_tid', tid)
...@@ -633,6 +639,7 @@ class DatabaseManager(object): ...@@ -633,6 +639,7 @@ class DatabaseManager(object):
value value
""" """
# FIXME checks only from tids, not oids/objects inside transactions
@abstract @abstract
def checkTIDRange(self, partition, length, min_tid, max_tid): def checkTIDRange(self, partition, length, min_tid, max_tid):
""" """
......
...@@ -477,8 +477,9 @@ class LoggerThreadName(str): ...@@ -477,8 +477,9 @@ class LoggerThreadName(str):
return str.__str__(self) return str.__str__(self)
# filters-out packet which are detected by filter-criterions setup with .add() # catch & delay packets which are detected by filter-criterions setup with .add().
# for a packed detected tobe filtered; further pkts on same conn are always filtered # for a packed detected to be filtered; further pkts on same conn are always filtered.
# delayed packets are delivered after `with ConnnectionFilter` ends.
class ConnectionFilter(object): class ConnectionFilter(object):
filtered_count = 0 filtered_count = 0
...@@ -488,7 +489,7 @@ class ConnectionFilter(object): ...@@ -488,7 +489,7 @@ class ConnectionFilter(object):
_addPacket = Connection._addPacket _addPacket = Connection._addPacket
@contextmanager @contextmanager
def __new__(cls, conn_list=()): def __new__(cls, conn_list=()): # NOTE conn_list=() -> for all connections
self = object.__new__(cls) self = object.__new__(cls)
self.filter_dict = {} self.filter_dict = {}
self.conn_list = frozenset(conn_list) self.conn_list = frozenset(conn_list)
......
...@@ -33,7 +33,6 @@ from .. import Patch ...@@ -33,7 +33,6 @@ from .. import Patch
from . import ConnectionFilter, NEOCluster, NEOThreadedTest, predictable_random from . import ConnectionFilter, NEOCluster, NEOThreadedTest, predictable_random
# NOTE
def backup_test(partitions=1, upstream_kw={}, backup_kw={}): def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
def decorator(wrapped): def decorator(wrapped):
def wrapper(self): def wrapper(self):
...@@ -54,7 +53,38 @@ def backup_test(partitions=1, upstream_kw={}, backup_kw={}): ...@@ -54,7 +53,38 @@ def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
return decorator return decorator
# NOTE # handy tool to get various ids of a cluster in tests
class IDs:
def __init__(self, cluster):
self.cluster = cluster
def _recovery(self):
return self.cluster.neoctl.getRecovery()
@property
def ptid(self):
return self._recovery()[0]
@property
def backup_tid(self):
return self._recovery()[1]
@property
def truncated_tid(self):
return self._recovery()[2]
@property
def last_tid(self):
return self.cluster.master.getLastTransaction()
# XXX and attributes
@property
def cluster_state(self):
return self.cluster.neoctl.getClusterState()
class ReplicationTests(NEOThreadedTest): class ReplicationTests(NEOThreadedTest):
def checksumPartition(self, storage, partition, max_tid=MAX_TID): def checksumPartition(self, storage, partition, max_tid=MAX_TID):
...@@ -95,6 +125,10 @@ class ReplicationTests(NEOThreadedTest): ...@@ -95,6 +125,10 @@ class ReplicationTests(NEOThreadedTest):
importZODB(3) importZODB(3)
backup = NEOCluster(partitions=np, replicas=nr-1, storage_count=5, backup = NEOCluster(partitions=np, replicas=nr-1, storage_count=5,
upstream=upstream) upstream=upstream)
U = IDs(upstream)
B = IDs(backup)
# U -> B propagation
try: try:
backup.start() backup.start()
# Initialize & catch up. # Initialize & catch up.
...@@ -104,8 +138,12 @@ class ReplicationTests(NEOThreadedTest): ...@@ -104,8 +138,12 @@ class ReplicationTests(NEOThreadedTest):
# Normal case, following upstream cluster closely. # Normal case, following upstream cluster closely.
importZODB(17) importZODB(17)
self.tic() self.tic()
self.assertEqual(B.backup_tid, U.last_tid)
self.assertEqual(B.last_tid, U.last_tid)
self.assertEqual(np*nr, self.checkBackup(backup)) self.assertEqual(np*nr, self.checkBackup(backup))
# Check that a backup cluster can be restarted. # Check that a backup cluster can be restarted.
# (U -> B propagation after restart)
finally: finally:
backup.stop() backup.stop()
backup.reset() backup.reset()
...@@ -115,38 +153,78 @@ class ReplicationTests(NEOThreadedTest): ...@@ -115,38 +153,78 @@ class ReplicationTests(NEOThreadedTest):
ClusterStates.BACKINGUP) ClusterStates.BACKINGUP)
importZODB(17) importZODB(17)
self.tic() self.tic()
self.assertEqual(B.backup_tid, U.last_tid)
self.assertEqual(B.last_tid, U.last_tid)
self.assertEqual(np*nr, self.checkBackup(backup)) self.assertEqual(np*nr, self.checkBackup(backup))
backup.neoctl.checkReplicas(check_dict, ZERO_TID, None) backup.neoctl.checkReplicas(check_dict, ZERO_TID, None)
self.tic() self.tic()
# Stop backing up, nothing truncated. # Stop backing up, nothing truncated.
backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP) backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP)
self.tic() self.tic()
self.assertEqual(B.backup_tid, None)
self.assertEqual(B.last_tid, U.last_tid)
self.assertEqual(np*nr, self.checkBackup(backup)) self.assertEqual(np*nr, self.checkBackup(backup))
self.assertEqual(backup.neoctl.getClusterState(), self.assertEqual(backup.neoctl.getClusterState(),
ClusterStates.RUNNING) ClusterStates.RUNNING)
finally: finally:
backup.stop() backup.stop()
# U -> B propagation with Mb -> Sb (Replicate) delayed
from neo.storage.database import manager as dbmanager
from neo.master import handlers as mhandler
#dbmanager.X = 1
#mhandler.X = 1
def delaySecondary(conn, packet): def delaySecondary(conn, packet):
if isinstance(packet, Packets.Replicate): if isinstance(packet, Packets.Replicate):
tid, upstream_name, source_dict = packet.decode() tid, upstream_name, source_dict = packet.decode()
return not upstream_name and all(source_dict.itervalues()) print 'REPLICATE tid: %r, upstream_name: %r, source_dict: %r' % \
(tid, upstream_name, source_dict)
return True
#return not upstream_name and all(source_dict.itervalues())
return upstream_name != ""
backup.reset() backup.reset()
try: try:
backup.start() backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP) backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
self.tic() self.tic()
u_last_tid0 = U.last_tid
self.assertEqual(B.backup_tid, u_last_tid0)
self.assertEqual(B.last_tid, u_last_tid0)
print
print 'B.backup_tid: %r' % B.backup_tid
with backup.master.filterConnection(*backup.storage_list) as f: with backup.master.filterConnection(*backup.storage_list) as f:
f.add(delaySecondary) f.add(delaySecondary) # NOTE delays backup pickup Mb -> Sb
print 'B.backup_tid: %r' % B.backup_tid
while not f.filtered_count: while not f.filtered_count:
print 'B.backup_tid: %r' % B.backup_tid
importZODB(1) importZODB(1)
self.tic() print 'B.backup_tid: %r' % B.backup_tid
self.tic()
print 'B.backup_tid: %r' % B.backup_tid
print
print 'u_last_tid0: %r' % u_last_tid0
print 'U.last_tid: %r' % U.last_tid
print 'B.backup_tid: %r' % B.backup_tid
self.assertGreater(U.last_tid, u_last_tid0)
self.assertEqual(B.backup_tid, u_last_tid0)
self.assertEqual(B.last_tid, U.last_tid)
backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP) backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP)
self.tic() self.tic()
self.assertEqual(B.cluster_state, ClusterStates.RECOVERING)
self.assertEqual(B.backup_tid, None)
self.assertEqual(B.last_tid, U.last_tid) # not-yet truncated
self.tic() self.tic()
self.assertEqual(B.cluster_state, ClusterStates.RUNNING)
self.assertEqual(B.backup_tid, None)
self.assertEqual(B.last_tid, u_last_tid0) # truncated after recovery
self.assertEqual(np*nr, self.checkBackup(backup, self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.master.getLastTransaction())) max_tid=backup.master.getLastTransaction()))
finally: finally:
backup.stop() backup.stop()
dbmanager.X = 0
mhandler.X = 0
# S -> Sb (AddObject) delayed
backup.reset() backup.reset()
try: try:
backup.start() backup.start()
...@@ -158,8 +236,11 @@ class ReplicationTests(NEOThreadedTest): ...@@ -158,8 +236,11 @@ class ReplicationTests(NEOThreadedTest):
while not f.filtered_count: while not f.filtered_count:
importZODB(1) importZODB(1)
self.tic() self.tic()
# TODO assert B.last_tid = U.last_tid
# assert B.backup_tid < U.last_tid
backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP) backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP)
self.tic() self.tic()
# TODO assert B.last_tid = B^.backup_tid ( < U.last_tid )
self.tic() self.tic()
self.assertEqual(np*nr, self.checkBackup(backup, self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.master.getLastTransaction())) max_tid=backup.master.getLastTransaction()))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment