Commit b9cac3f8 authored by Julien Muchembled's avatar Julien Muchembled

New --new-nid storage option for fast cloning

It is often faster to set up replicas by stopping a node (and any
underlying database server like MariaDB) and do a raw copy of the
database (e.g. with rsync). So far, it required to stop the whole
cluster and use tools like 'mysql' or sqlite3' to edit:
- the 'pt' table in databases,
- the 'config.nid' values of the new nodes.

With this new option, if you already have 1 replica, you can set up
new replicas with such fast raw copy, and without interruption of
service. Obviously, this implies less redundancy during the operation.
parent 250f3fe6
...@@ -226,8 +226,8 @@ class Application(ThreadedApplication): ...@@ -226,8 +226,8 @@ class Application(ThreadedApplication):
self.notifications_handler, self.notifications_handler,
node=node, node=node,
dispatcher=self.dispatcher) dispatcher=self.dispatcher)
p = Packets.RequestIdentification( p = Packets.RequestIdentification(NodeTypes.CLIENT,
NodeTypes.CLIENT, self.uuid, None, self.name, (), None) self.uuid, None, self.name, None, (), ())
try: try:
ask(conn, p, handler=handler) ask(conn, p, handler=handler)
except ConnectionClosed: except ConnectionClosed:
...@@ -270,7 +270,7 @@ class Application(ThreadedApplication): ...@@ -270,7 +270,7 @@ class Application(ThreadedApplication):
conn = MTClientConnection(self, self.storage_event_handler, node, conn = MTClientConnection(self, self.storage_event_handler, node,
dispatcher=self.dispatcher) dispatcher=self.dispatcher)
p = Packets.RequestIdentification(NodeTypes.CLIENT, p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name, (), self.id_timestamp) self.uuid, None, self.name, self.id_timestamp, (), ())
try: try:
self._ask(conn, p, handler=self.storage_bootstrap_handler) self._ask(conn, p, handler=self.storage_bootstrap_handler)
except ConnectionClosed: except ConnectionClosed:
......
...@@ -26,7 +26,7 @@ class BootstrapManager(EventHandler): ...@@ -26,7 +26,7 @@ class BootstrapManager(EventHandler):
Manage the bootstrap stage, lookup for the primary master then connect to it Manage the bootstrap stage, lookup for the primary master then connect to it
""" """
def __init__(self, app, node_type, server=None, devpath=()): def __init__(self, app, node_type, server=None, devpath=(), new_nid=()):
""" """
Manage the bootstrap stage of a non-master node, it lookup for the Manage the bootstrap stage of a non-master node, it lookup for the
primary master node, connect to it then returns when the master node primary master node, connect to it then returns when the master node
...@@ -34,6 +34,7 @@ class BootstrapManager(EventHandler): ...@@ -34,6 +34,7 @@ class BootstrapManager(EventHandler):
""" """
self.server = server self.server = server
self.devpath = devpath self.devpath = devpath
self.new_nid = new_nid
self.node_type = node_type self.node_type = node_type
self.num_replicas = None self.num_replicas = None
self.num_partitions = None self.num_partitions = None
...@@ -44,7 +45,7 @@ class BootstrapManager(EventHandler): ...@@ -44,7 +45,7 @@ class BootstrapManager(EventHandler):
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
EventHandler.connectionCompleted(self, conn) EventHandler.connectionCompleted(self, conn)
conn.ask(Packets.RequestIdentification(self.node_type, self.uuid, conn.ask(Packets.RequestIdentification(self.node_type, self.uuid,
self.server, self.app.name, self.devpath, None)) self.server, self.app.name, None, self.devpath, self.new_nid))
def connectionFailed(self, conn): def connectionFailed(self, conn):
EventHandler.connectionFailed(self, conn) EventHandler.connectionFailed(self, conn)
......
...@@ -685,8 +685,10 @@ class RequestIdentification(Packet): ...@@ -685,8 +685,10 @@ class RequestIdentification(Packet):
PUUID('uuid'), PUUID('uuid'),
PAddress('address'), PAddress('address'),
PString('name'), PString('name'),
PList('devpath', PString('devid')),
PFloat('id_timestamp'), PFloat('id_timestamp'),
# storage:
PList('devpath', PString('devid')),
PList('new_nid', PNumber('offset')),
) )
_answer = PStruct('accept_identification', _answer = PStruct('accept_identification',
......
...@@ -196,7 +196,7 @@ class Application(BaseApplication): ...@@ -196,7 +196,7 @@ class Application(BaseApplication):
node_dict[NodeTypes.MASTER].append(node_info) node_dict[NodeTypes.MASTER].append(node_info)
return node_dict return node_dict
def broadcastNodesInformation(self, node_list, exclude=None): def broadcastNodesInformation(self, node_list):
""" """
Broadcast changes for a set a nodes Broadcast changes for a set a nodes
Send only one packet per connection to reduce bandwidth Send only one packet per connection to reduce bandwidth
...@@ -209,7 +209,7 @@ class Application(BaseApplication): ...@@ -209,7 +209,7 @@ class Application(BaseApplication):
# We don't skip pending storage nodes because we don't send them # We don't skip pending storage nodes because we don't send them
# the full list of nodes when they're added, and it's also quite # the full list of nodes when they're added, and it's also quite
# useful to notify them about new masters. # useful to notify them about new masters.
if node_list and node is not exclude: if node_list:
node.send(Packets.NotifyNodeInformation(now, node_list)) node.send(Packets.NotifyNodeInformation(now, node_list))
def broadcastPartitionChanges(self, cell_list): def broadcastPartitionChanges(self, cell_list):
......
...@@ -17,14 +17,14 @@ ...@@ -17,14 +17,14 @@
from neo.lib import logging from neo.lib import logging
from neo.lib.exception import PrimaryElected from neo.lib.exception import PrimaryElected
from neo.lib.handler import EventHandler from neo.lib.handler import EventHandler
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, \ from neo.lib.protocol import CellStates, ClusterStates, NodeStates, \
NotReadyError, Packets, ProtocolError, uuid_str NodeTypes, NotReadyError, Packets, ProtocolError, uuid_str
from ..app import monotonic_time from ..app import monotonic_time
class IdentificationHandler(EventHandler): class IdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid, def requestIdentification(self, conn, node_type, uuid,
address, name, devpath, id_timestamp): address, name, id_timestamp, devpath, new_nid):
app = self.app app = self.app
self.checkClusterName(name) self.checkClusterName(name)
if address == app.server: if address == app.server:
...@@ -77,6 +77,16 @@ class IdentificationHandler(EventHandler): ...@@ -77,6 +77,16 @@ class IdentificationHandler(EventHandler):
manager = app manager = app
state, handler = manager.identifyStorageNode( state, handler = manager.identifyStorageNode(
uuid is not None and node is not None) uuid is not None and node is not None)
if not address:
if app.cluster_state == ClusterStates.RECOVERING:
raise NotReadyError
if uuid or not new_nid:
raise ProtocolError
state = NodeStates.DOWN
# We'll let the storage node close the connection. If we
# aborted it at the end of the method, BootstrapManager
# (which is used by storage nodes) could see the closure
# and try to reconnect to a master.
human_readable_node_type = ' storage (%s) ' % (state, ) human_readable_node_type = ' storage (%s) ' % (state, )
elif node_type == NodeTypes.MASTER: elif node_type == NodeTypes.MASTER:
if app.election: if app.election:
...@@ -105,9 +115,15 @@ class IdentificationHandler(EventHandler): ...@@ -105,9 +115,15 @@ class IdentificationHandler(EventHandler):
node.devpath = tuple(devpath) node.devpath = tuple(devpath)
node.id_timestamp = monotonic_time() node.id_timestamp = monotonic_time()
node.setState(state) node.setState(state)
app.broadcastNodesInformation([node])
if new_nid:
changed_list = []
for offset in new_nid:
changed_list.append((offset, uuid, CellStates.OUT_OF_DATE))
app.pt._setCell(offset, node, CellStates.OUT_OF_DATE)
app.broadcastPartitionChanges(changed_list)
conn.setHandler(handler) conn.setHandler(handler)
node.setConnection(conn, not node.isIdentified()) node.setConnection(conn, not node.isIdentified())
app.broadcastNodesInformation([node], node)
conn.answer(Packets.AcceptIdentification( conn.answer(Packets.AcceptIdentification(
NodeTypes.MASTER, NodeTypes.MASTER,
...@@ -118,11 +134,10 @@ class IdentificationHandler(EventHandler): ...@@ -118,11 +134,10 @@ class IdentificationHandler(EventHandler):
handler._notifyNodeInformation(conn) handler._notifyNodeInformation(conn)
handler.connectionCompleted(conn, True) handler.connectionCompleted(conn, True)
class SecondaryIdentificationHandler(EventHandler): class SecondaryIdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid, def requestIdentification(self, conn, node_type, uuid,
address, name, devpath, id_timestamp): address, name, id_timestamp, devpath, new_nid):
app = self.app app = self.app
self.checkClusterName(name) self.checkClusterName(name)
if address == app.server: if address == app.server:
......
...@@ -38,7 +38,7 @@ class ElectionHandler(MasterHandler): ...@@ -38,7 +38,7 @@ class ElectionHandler(MasterHandler):
super(ElectionHandler, self).connectionCompleted(conn) super(ElectionHandler, self).connectionCompleted(conn)
app = self.app app = self.app
conn.ask(Packets.RequestIdentification(NodeTypes.MASTER, conn.ask(Packets.RequestIdentification(NodeTypes.MASTER,
app.uuid, app.server, app.name, (), app.election)) app.uuid, app.server, app.name, app.election, (), ()))
def connectionFailed(self, conn): def connectionFailed(self, conn):
super(ElectionHandler, self).connectionFailed(conn) super(ElectionHandler, self).connectionFailed(conn)
......
...@@ -63,6 +63,11 @@ class Application(BaseApplication): ...@@ -63,6 +63,11 @@ class Application(BaseApplication):
help="do not delete data of discarded cells, which is useful for" help="do not delete data of discarded cells, which is useful for"
" big databases because the current implementation is" " big databases because the current implementation is"
" inefficient (this option should disappear in the future)") " inefficient (this option should disappear in the future)")
_.bool('new-nid',
help="request a new NID from a cluster that is already"
" operational, update the database with the new NID and exit,"
" which makes easier to quickly set up a replica by copying"
" the database of another node while it was stopped")
_ = parser.group('database creation') _ = parser.group('database creation')
_.int('i', 'nid', _.int('i', 'nid',
...@@ -118,10 +123,16 @@ class Application(BaseApplication): ...@@ -118,10 +123,16 @@ class Application(BaseApplication):
self.loadConfiguration() self.loadConfiguration()
self.devpath = self.dm.getTopologyPath() self.devpath = self.dm.getTopologyPath()
# force node uuid from command line argument, for testing purpose only if config.get('new_nid'):
if 'nid' in config: self.new_nid = [x[0] for x in self.dm.iterAssignedCells()]
self.uuid = config['nid'] if not self.new_nid:
logging.node(self.name, self.uuid) sys.exit('database is empty')
self.uuid = None
else:
self.new_nid = ()
if 'nid' in config: # for testing purpose only
self.uuid = config['nid']
logging.node(self.name, self.uuid)
registerLiveDebugger(on_log=self.log) registerLiveDebugger(on_log=self.log)
...@@ -250,8 +261,9 @@ class Application(BaseApplication): ...@@ -250,8 +261,9 @@ class Application(BaseApplication):
pt = self.pt pt = self.pt
# search, find, connect and identify to the primary master # search, find, connect and identify to the primary master
bootstrap = BootstrapManager(self, NodeTypes.STORAGE, self.server, bootstrap = BootstrapManager(self, NodeTypes.STORAGE,
self.devpath) None if self.new_nid else self.server,
self.devpath, self.new_nid)
self.master_node, self.master_conn, num_partitions, num_replicas = \ self.master_node, self.master_conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection() bootstrap.getPrimaryConnection()
self.dm.setUUID(self.uuid) self.dm.setUUID(self.uuid)
......
...@@ -51,7 +51,7 @@ class Checker(object): ...@@ -51,7 +51,7 @@ class Checker(object):
else: else:
conn = ClientConnection(app, StorageOperationHandler(app), node) conn = ClientConnection(app, StorageOperationHandler(app), node)
conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE, conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
uuid, app.server, name, (), app.id_timestamp)) uuid, app.server, name, app.id_timestamp, (), ()))
self.conn_dict[conn] = node.isIdentified() self.conn_dict[conn] = node.isIdentified()
conn_set = set(self.conn_dict) conn_set = set(self.conn_dict)
conn_set.discard(None) conn_set.discard(None)
......
...@@ -379,7 +379,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -379,7 +379,7 @@ class ImporterDatabaseManager(DatabaseManager):
db = self.db = buildDatabaseManager(conf['adapter'], db = self.db = buildDatabaseManager(conf['adapter'],
(conf['database'], conf.get('engine'), conf['wait'])) (conf['database'], conf.get('engine'), conf['wait']))
for x in """getConfiguration _setConfiguration setNumPartitions for x in """getConfiguration _setConfiguration setNumPartitions
query erase getPartitionTable _iterAssignedCells query erase getPartitionTable iterAssignedCells
updateCellTID getUnfinishedTIDDict dropUnfinishedData updateCellTID getUnfinishedTIDDict dropUnfinishedData
abortTransaction storeTransaction lockTransaction abortTransaction storeTransaction lockTransaction
loadData storeData getOrphanList _pruneData deferCommit loadData storeData getOrphanList _pruneData deferCommit
......
...@@ -180,6 +180,10 @@ class DatabaseManager(object): ...@@ -180,6 +180,10 @@ class DatabaseManager(object):
def erase(self): def erase(self):
"""""" """"""
def restore(self, dump): # for tests
self.erase()
self._restore(dump)
def _setup(self, dedup=False): def _setup(self, dedup=False):
"""To be overridden by the backend to set up a database """To be overridden by the backend to set up a database
...@@ -532,7 +536,7 @@ class DatabaseManager(object): ...@@ -532,7 +536,7 @@ class DatabaseManager(object):
None if data_serial is None else util.p64(data_serial)) None if data_serial is None else util.p64(data_serial))
@requires(_getPartitionTable) @requires(_getPartitionTable)
def _iterAssignedCells(self): def iterAssignedCells(self):
my_nid = self.getUUID() my_nid = self.getUUID()
return ((offset, tid) for offset, nid, tid in self._getPartitionTable() return ((offset, tid) for offset, nid, tid in self._getPartitionTable()
if my_nid == nid) if my_nid == nid)
...@@ -578,13 +582,13 @@ class DatabaseManager(object): ...@@ -578,13 +582,13 @@ class DatabaseManager(object):
d.append(p << 48 if i is None else i + 1) d.append(p << 48 if i is None else i + 1)
else: else:
readable_set.clear() readable_set.clear()
readable_set.update(x[0] for x in self._iterAssignedCells() readable_set.update(x[0] for x in self.iterAssignedCells()
if -x[1] in READABLE) if -x[1] in READABLE)
@requires(_changePartitionTable, _getLastIDs, _getLastTID) @requires(_changePartitionTable, _getLastIDs, _getLastTID)
def changePartitionTable(self, ptid, cell_list, reset=False): def changePartitionTable(self, ptid, cell_list, reset=False):
my_nid = self.getUUID() my_nid = self.getUUID()
pt = dict(self._iterAssignedCells()) pt = dict(self.iterAssignedCells())
# In backup mode, the last transactions of a readable cell may be # In backup mode, the last transactions of a readable cell may be
# incomplete. # incomplete.
backup_tid = self.getBackupTID() backup_tid = self.getBackupTID()
...@@ -609,7 +613,7 @@ class DatabaseManager(object): ...@@ -609,7 +613,7 @@ class DatabaseManager(object):
@requires(_changePartitionTable) @requires(_changePartitionTable)
def updateCellTID(self, partition, tid): def updateCellTID(self, partition, tid):
t, = (t for p, t in self._iterAssignedCells() if p == partition) t, = (t for p, t in self.iterAssignedCells() if p == partition)
if t < 0: if t < 0:
return return
tid = util.u64(tid) tid = util.u64(tid)
...@@ -631,7 +635,7 @@ class DatabaseManager(object): ...@@ -631,7 +635,7 @@ class DatabaseManager(object):
next_tid = util.u64(backup_tid) next_tid = util.u64(backup_tid)
if next_tid: if next_tid:
next_tid += 1 next_tid += 1
for offset, tid in self._iterAssignedCells(): for offset, tid in self.iterAssignedCells():
if tid >= 0: # OUT_OF_DATE if tid >= 0: # OUT_OF_DATE
yield offset, p64(tid and tid + 1) yield offset, p64(tid and tid + 1)
elif -tid in READABLE: elif -tid in READABLE:
...@@ -873,7 +877,7 @@ class DatabaseManager(object): ...@@ -873,7 +877,7 @@ class DatabaseManager(object):
assert tid, tid assert tid, tid
cell_list = [] cell_list = []
my_nid = self.getUUID() my_nid = self.getUUID()
for partition, state in self._iterAssignedCells(): for partition, state in self.iterAssignedCells():
if state > tid: if state > tid:
cell_list.append((partition, my_nid, tid)) cell_list.append((partition, my_nid, tid))
self._deleteRange(partition, tid) self._deleteRange(partition, tid)
......
...@@ -965,7 +965,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -965,7 +965,7 @@ class MySQLDatabaseManager(DatabaseManager):
cmd += self._cmdline() cmd += self._cmdline()
return subprocess.check_output(cmd) return subprocess.check_output(cmd)
def restore(self, sql): def _restore(self, sql):
import subprocess import subprocess
cmd = ['mysql'] cmd = ['mysql']
cmd += self._cmdline() cmd += self._cmdline()
......
...@@ -712,5 +712,5 @@ class SQLiteDatabaseManager(DatabaseManager): ...@@ -712,5 +712,5 @@ class SQLiteDatabaseManager(DatabaseManager):
main[-1:-1] = data main[-1:-1] = data
return '\n'.join(main) + '\n' return '\n'.join(main) + '\n'
def restore(self, sql): def _restore(self, sql):
self.conn.executescript(sql) self.conn.executescript(sql)
...@@ -32,7 +32,7 @@ class IdentificationHandler(EventHandler): ...@@ -32,7 +32,7 @@ class IdentificationHandler(EventHandler):
return self.app.nm return self.app.nm
def requestIdentification(self, conn, node_type, uuid, address, name, def requestIdentification(self, conn, node_type, uuid, address, name,
devpath, id_timestamp): id_timestamp, devpath, new_nid):
self.checkClusterName(name) self.checkClusterName(name)
app = self.app app = self.app
# reject any incoming connections if not ready # reject any incoming connections if not ready
......
...@@ -350,7 +350,7 @@ class Replicator(object): ...@@ -350,7 +350,7 @@ class Replicator(object):
try: try:
conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE, conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
None if name else app.uuid, app.server, name or app.name, None if name else app.uuid, app.server, name or app.name,
(), app.id_timestamp)) app.id_timestamp, (), ()))
except ConnectionClosed: except ConnectionClosed:
if previous_node is self.current_node: if previous_node is self.current_node:
return return
......
...@@ -383,7 +383,10 @@ class ServerNode(Node): ...@@ -383,7 +383,10 @@ class ServerNode(Node):
assert not self.is_alive() assert not self.is_alive()
init_args = self._init_args init_args = self._init_args
init_args['reset'] = False init_args['reset'] = False
assert set(kw).issubset(init_args), (kw, init_args) if __debug__:
x = set(kw).difference(init_args)
assert not x or x.issubset(self.option_parser.getOptionDict()), (
kw, init_args)
init_args.update(kw) init_args.update(kw)
self.close() self.close()
self.__init__(**init_args) self.__init__(**init_args)
...@@ -852,9 +855,13 @@ class NEOCluster(object): ...@@ -852,9 +855,13 @@ class NEOCluster(object):
expected_state = (NodeStates.PENDING expected_state = (NodeStates.PENDING
if state == ClusterStates.RECOVERING if state == ClusterStates.RECOVERING
else NodeStates.RUNNING) else NodeStates.RUNNING)
for node in self.storage_list if storage_list is None else storage_list: for node, expected_state in (
storage_list if isinstance(storage_list, dict) else
dict.fromkeys(self.storage_list if storage_list is None else
storage_list, expected_state)
).iteritems():
state = self.getNodeState(node) state = self.getNodeState(node)
assert state == expected_state, (repr(node), state) assert state == expected_state, (repr(node), state, expected_state)
def stop(self, clear_database=False, __print_exc=traceback.print_exc, **kw): def stop(self, clear_database=False, __print_exc=traceback.print_exc, **kw):
if self.started: if self.started:
...@@ -1007,18 +1014,18 @@ class NEOCluster(object): ...@@ -1007,18 +1014,18 @@ class NEOCluster(object):
"""Sort storages so that storage_list[i] has partition i for all i""" """Sort storages so that storage_list[i] has partition i for all i"""
pt = [{x.getUUID() for x in x} pt = [{x.getUUID() for x in x}
for x in self.primary_master.pt.partition_list] for x in self.primary_master.pt.partition_list]
n = len(self.storage_list)
r = [] r = []
x = [iter(pt[0])] x = [iter(pt[0])]
try: while 1:
while 1: try:
try: r.append(next(x[-1]))
r.append(next(x[-1])) except StopIteration:
except StopIteration: del r[-1], x[-1]
del r[-1], x[-1] else:
else: if len(r) == n:
x.append(iter(pt[len(r)].difference(r))) break
except IndexError: x.append(iter(pt[len(r)].difference(r)))
assert len(r) == len(self.storage_list)
x = {x.uuid: x for x in self.storage_list} x = {x.uuid: x for x in self.storage_list}
self.storage_list[:] = (x[r] for r in r) self.storage_list[:] = (x[r] for r in r)
return self.storage_list return self.storage_list
......
...@@ -2815,7 +2815,6 @@ class Test(NEOThreadedTest): ...@@ -2815,7 +2815,6 @@ class Test(NEOThreadedTest):
dm = s.dm dm = s.dm
dm.commit() dm.commit()
dump_dict[s.uuid] = dm.dump() dump_dict[s.uuid] = dm.dump()
dm.erase()
with open(path % (s.getAdapter(), s.uuid)) as f: with open(path % (s.getAdapter(), s.uuid)) as f:
dm.restore(f.read()) dm.restore(f.read())
with NEOCluster(storage_count=3, partitions=3, replicas=1, with NEOCluster(storage_count=3, partitions=3, replicas=1,
......
...@@ -29,7 +29,7 @@ from neo.storage.database.manager import DatabaseManager ...@@ -29,7 +29,7 @@ from neo.storage.database.manager import DatabaseManager
from neo.storage import replicator from neo.storage import replicator
from neo.lib.connector import SocketConnector from neo.lib.connector import SocketConnector
from neo.lib.connection import ClientConnection from neo.lib.connection import ClientConnection
from neo.lib.protocol import CellStates, ClusterStates, Packets, \ from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \
ZERO_OID, ZERO_TID, MAX_TID, uuid_str ZERO_OID, ZERO_TID, MAX_TID, uuid_str
from neo.lib.util import add64, p64, u64 from neo.lib.util import add64, p64, u64
from .. import Patch, TransactionalResource from .. import Patch, TransactionalResource
...@@ -928,6 +928,39 @@ class ReplicationTests(NEOThreadedTest): ...@@ -928,6 +928,39 @@ class ReplicationTests(NEOThreadedTest):
def testReplicationBlockedByUnfinished2(self): def testReplicationBlockedByUnfinished2(self):
self.testReplicationBlockedByUnfinished1(True) self.testReplicationBlockedByUnfinished1(True)
@with_cluster(partitions=6, storage_count=4, start_cluster=0)
def testCloneStorage(self, cluster):
"""
Test cloning of storage nodes using --new-nid instead NEO replication.
"""
s01 = cluster.storage_list[:2]
s23 = cluster.storage_list[2:]
cluster.start(storage_list=s01)
cluster.importZODB()(6)
self.tic()
with Patch(cluster, storage_list=s01):
cluster.sortStorageList()
cluster.stop(replicas=1)
cluster.storage_list[:2] = s01
storage_dict = {}
for s, d in zip(s01, s23):
d.dm.restore(s.dm.dump())
d.resetNode(new_nid=True)
storage_dict[s] = NodeStates.RUNNING
storage_dict[d] = NodeStates.DOWN
cluster.start(storage_dict)
cluster.join(s23)
for d in s23:
d.resetNode(new_nid=False)
d.start()
self.tic()
self.checkReplicas(cluster)
expected = '|'.join(['U.U.|.U.U'] * 3)
self.assertPartitionTable(cluster, expected)
cluster.neoctl.tweakPartitionTable()
self.tic()
self.assertPartitionTable(cluster, expected)
@with_cluster(partitions=5, replicas=2, storage_count=3) @with_cluster(partitions=5, replicas=2, storage_count=3)
def testCheckReplicas(self, cluster): def testCheckReplicas(self, cluster):
from neo.storage import checker from neo.storage import checker
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment