Commit 27e3f620 by Julien Muchembled

New --new-nid storage option for fast cloning

It is often faster to set up replicas by stopping a node (and any
underlying database server like MariaDB) and do a raw copy of the
database (e.g. with rsync). So far, it required to stop the whole
cluster and use tools like 'mysql' or sqlite3' to edit:
- the 'pt' table in databases,
- the 'config.nid' values of the new nodes.

With this new option, if you already have 1 replica, you can set up
new replicas with such fast raw copy, and without interruption of
service. Obviously, this implies less redundancy during the operation.
1 parent 64e02391
......@@ -226,8 +226,8 @@ class Application(ThreadedApplication):
self.notifications_handler,
node=node,
dispatcher=self.dispatcher)
p = Packets.RequestIdentification(
NodeTypes.CLIENT, self.uuid, None, self.name, (), None)
p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name, None, (), ())
try:
ask(conn, p, handler=handler)
except ConnectionClosed:
......@@ -270,7 +270,7 @@ class Application(ThreadedApplication):
conn = MTClientConnection(self, self.storage_event_handler, node,
dispatcher=self.dispatcher)
p = Packets.RequestIdentification(NodeTypes.CLIENT,
self.uuid, None, self.name, (), self.id_timestamp)
self.uuid, None, self.name, self.id_timestamp, (), ())
try:
self._ask(conn, p, handler=self.storage_bootstrap_handler)
except ConnectionClosed:
......
......@@ -26,7 +26,7 @@ class BootstrapManager(EventHandler):
Manage the bootstrap stage, lookup for the primary master then connect to it
"""
def __init__(self, app, node_type, server=None, devpath=()):
def __init__(self, app, node_type, server=None, devpath=(), new_nid=()):
"""
Manage the bootstrap stage of a non-master node, it lookup for the
primary master node, connect to it then returns when the master node
......@@ -34,6 +34,7 @@ class BootstrapManager(EventHandler):
"""
self.server = server
self.devpath = devpath
self.new_nid = new_nid
self.node_type = node_type
self.num_replicas = None
self.num_partitions = None
......@@ -44,7 +45,7 @@ class BootstrapManager(EventHandler):
def connectionCompleted(self, conn):
EventHandler.connectionCompleted(self, conn)
conn.ask(Packets.RequestIdentification(self.node_type, self.uuid,
self.server, self.app.name, self.devpath, None))
self.server, self.app.name, None, self.devpath, self.new_nid))
def connectionFailed(self, conn):
EventHandler.connectionFailed(self, conn)
......
......@@ -685,8 +685,10 @@ class RequestIdentification(Packet):
PUUID('uuid'),
PAddress('address'),
PString('name'),
PList('devpath', PString('devid')),
PFloat('id_timestamp'),
# storage:
PList('devpath', PString('devid')),
PList('new_nid', PNumber('offset')),
)
_answer = PStruct('accept_identification',
......
......@@ -196,7 +196,7 @@ class Application(BaseApplication):
node_dict[NodeTypes.MASTER].append(node_info)
return node_dict
def broadcastNodesInformation(self, node_list, exclude=None):
def broadcastNodesInformation(self, node_list):
"""
Broadcast changes for a set a nodes
Send only one packet per connection to reduce bandwidth
......@@ -209,7 +209,7 @@ class Application(BaseApplication):
# We don't skip pending storage nodes because we don't send them
# the full list of nodes when they're added, and it's also quite
# useful to notify them about new masters.
if node_list and node is not exclude:
if node_list:
node.send(Packets.NotifyNodeInformation(now, node_list))
def broadcastPartitionChanges(self, cell_list):
......
......@@ -17,14 +17,14 @@
from neo.lib import logging
from neo.lib.exception import PrimaryElected
from neo.lib.handler import EventHandler
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, \
NotReadyError, Packets, ProtocolError, uuid_str
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, \
NodeTypes, NotReadyError, Packets, ProtocolError, uuid_str
from ..app import monotonic_time
class IdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid,
address, name, devpath, id_timestamp):
address, name, id_timestamp, devpath, new_nid):
app = self.app
self.checkClusterName(name)
if address == app.server:
......@@ -77,6 +77,16 @@ class IdentificationHandler(EventHandler):
manager = app
state, handler = manager.identifyStorageNode(
uuid is not None and node is not None)
if not address:
if app.cluster_state == ClusterStates.RECOVERING:
raise NotReadyError
if uuid or not new_nid:
raise ProtocolError
state = NodeStates.DOWN
# We'll let the storage node close the connection. If we
# aborted it at the end of the method, BootstrapManager
# (which is used by storage nodes) could see the closure
# and try to reconnect to a master.
human_readable_node_type = ' storage (%s) ' % (state, )
elif node_type == NodeTypes.MASTER:
if app.election:
......@@ -105,9 +115,15 @@ class IdentificationHandler(EventHandler):
node.devpath = tuple(devpath)
node.id_timestamp = monotonic_time()
node.setState(state)
app.broadcastNodesInformation([node])
if new_nid:
changed_list = []
for offset in new_nid:
changed_list.append((offset, uuid, CellStates.OUT_OF_DATE))
app.pt._setCell(offset, node, CellStates.OUT_OF_DATE)
app.broadcastPartitionChanges(changed_list)
conn.setHandler(handler)
node.setConnection(conn, not node.isIdentified())
app.broadcastNodesInformation([node], node)
conn.answer(Packets.AcceptIdentification(
NodeTypes.MASTER,
......@@ -118,11 +134,10 @@ class IdentificationHandler(EventHandler):
handler._notifyNodeInformation(conn)
handler.connectionCompleted(conn, True)
class SecondaryIdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid,
address, name, devpath, id_timestamp):
address, name, id_timestamp, devpath, new_nid):
app = self.app
self.checkClusterName(name)
if address == app.server:
......
......@@ -38,7 +38,7 @@ class ElectionHandler(MasterHandler):
super(ElectionHandler, self).connectionCompleted(conn)
app = self.app
conn.ask(Packets.RequestIdentification(NodeTypes.MASTER,
app.uuid, app.server, app.name, (), app.election))
app.uuid, app.server, app.name, app.election, (), ()))
def connectionFailed(self, conn):
super(ElectionHandler, self).connectionFailed(conn)
......
......@@ -63,6 +63,11 @@ class Application(BaseApplication):
help="do not delete data of discarded cells, which is useful for"
" big databases because the current implementation is"
" inefficient (this option should disappear in the future)")
_.bool('new-nid',
help="request a new NID from a cluster that is already"
" operational, update the database with the new NID and exit,"
" which makes easier to quickly set up a replica by copying"
" the database of another node while it was stopped")
_ = parser.group('database creation')
_.int('i', 'nid',
......@@ -118,10 +123,16 @@ class Application(BaseApplication):
self.loadConfiguration()
self.devpath = self.dm.getTopologyPath()
# force node uuid from command line argument, for testing purpose only
if 'nid' in config:
self.uuid = config['nid']
logging.node(self.name, self.uuid)
if config.get('new_nid'):
self.new_nid = [x[0] for x in self.dm.iterAssignedCells()]
if not self.new_nid:
sys.exit('database is empty')
self.uuid = None
else:
self.new_nid = ()
if 'nid' in config: # for testing purpose only
self.uuid = config['nid']
logging.node(self.name, self.uuid)
registerLiveDebugger(on_log=self.log)
......@@ -250,8 +261,9 @@ class Application(BaseApplication):
pt = self.pt
# search, find, connect and identify to the primary master
bootstrap = BootstrapManager(self, NodeTypes.STORAGE, self.server,
self.devpath)
bootstrap = BootstrapManager(self, NodeTypes.STORAGE,
None if self.new_nid else self.server,
self.devpath, self.new_nid)
self.master_node, self.master_conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection()
self.dm.setUUID(self.uuid)
......
......@@ -51,7 +51,7 @@ class Checker(object):
else:
conn = ClientConnection(app, StorageOperationHandler(app), node)
conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
uuid, app.server, name, (), app.id_timestamp))
uuid, app.server, name, app.id_timestamp, (), ()))
self.conn_dict[conn] = node.isIdentified()
conn_set = set(self.conn_dict)
conn_set.discard(None)
......
......@@ -379,7 +379,7 @@ class ImporterDatabaseManager(DatabaseManager):
db = self.db = buildDatabaseManager(conf['adapter'],
(conf['database'], conf.get('engine'), conf['wait']))
for x in """getConfiguration _setConfiguration setNumPartitions
query erase getPartitionTable _iterAssignedCells
query erase getPartitionTable iterAssignedCells
updateCellTID getUnfinishedTIDDict dropUnfinishedData
abortTransaction storeTransaction lockTransaction
loadData storeData getOrphanList _pruneData deferCommit
......
......@@ -180,6 +180,10 @@ class DatabaseManager(object):
def erase(self):
""""""
def restore(self, dump): # for tests
self.erase()
self._restore(dump)
def _setup(self, dedup=False):
"""To be overridden by the backend to set up a database
......@@ -532,7 +536,7 @@ class DatabaseManager(object):
None if data_serial is None else util.p64(data_serial))
@requires(_getPartitionTable)
def _iterAssignedCells(self):
def iterAssignedCells(self):
my_nid = self.getUUID()
return ((offset, tid) for offset, nid, tid in self._getPartitionTable()
if my_nid == nid)
......@@ -578,13 +582,13 @@ class DatabaseManager(object):
d.append(p << 48 if i is None else i + 1)
else:
readable_set.clear()
readable_set.update(x[0] for x in self._iterAssignedCells()
readable_set.update(x[0] for x in self.iterAssignedCells()
if -x[1] in READABLE)
@requires(_changePartitionTable, _getLastIDs, _getLastTID)
def changePartitionTable(self, ptid, cell_list, reset=False):
my_nid = self.getUUID()
pt = dict(self._iterAssignedCells())
pt = dict(self.iterAssignedCells())
# In backup mode, the last transactions of a readable cell may be
# incomplete.
backup_tid = self.getBackupTID()
......@@ -609,7 +613,7 @@ class DatabaseManager(object):
@requires(_changePartitionTable)
def updateCellTID(self, partition, tid):
t, = (t for p, t in self._iterAssignedCells() if p == partition)
t, = (t for p, t in self.iterAssignedCells() if p == partition)
if t < 0:
return
tid = util.u64(tid)
......@@ -631,7 +635,7 @@ class DatabaseManager(object):
next_tid = util.u64(backup_tid)
if next_tid:
next_tid += 1
for offset, tid in self._iterAssignedCells():
for offset, tid in self.iterAssignedCells():
if tid >= 0: # OUT_OF_DATE
yield offset, p64(tid and tid + 1)
elif -tid in READABLE:
......@@ -873,7 +877,7 @@ class DatabaseManager(object):
assert tid, tid
cell_list = []
my_nid = self.getUUID()
for partition, state in self._iterAssignedCells():
for partition, state in self.iterAssignedCells():
if state > tid:
cell_list.append((partition, my_nid, tid))
self._deleteRange(partition, tid)
......
......@@ -979,7 +979,7 @@ class MySQLDatabaseManager(DatabaseManager):
cmd += self._cmdline()
return subprocess.check_output(cmd)
def restore(self, sql):
def _restore(self, sql):
import subprocess
cmd = ['mysql']
cmd += self._cmdline()
......
......@@ -713,5 +713,5 @@ class SQLiteDatabaseManager(DatabaseManager):
main[-1:-1] = data
return '\n'.join(main) + '\n'
def restore(self, sql):
def _restore(self, sql):
self.conn.executescript(sql)
......@@ -32,7 +32,7 @@ class IdentificationHandler(EventHandler):
return self.app.nm
def requestIdentification(self, conn, node_type, uuid, address, name,
devpath, id_timestamp):
id_timestamp, devpath, new_nid):
self.checkClusterName(name)
app = self.app
# reject any incoming connections if not ready
......
......@@ -350,7 +350,7 @@ class Replicator(object):
try:
conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
None if name else app.uuid, app.server, name or app.name,
(), app.id_timestamp))
app.id_timestamp, (), ()))
except ConnectionClosed:
if previous_node is self.current_node:
return
......
......@@ -383,7 +383,10 @@ class ServerNode(Node):
assert not self.is_alive()
init_args = self._init_args
init_args['reset'] = False
assert set(kw).issubset(init_args), (kw, init_args)
if __debug__:
x = set(kw).difference(init_args)
assert not x or x.issubset(self.option_parser.getOptionDict()), (
kw, init_args)
init_args.update(kw)
self.close()
self.__init__(**init_args)
......@@ -851,9 +854,13 @@ class NEOCluster(object):
expected_state = (NodeStates.PENDING
if state == ClusterStates.RECOVERING
else NodeStates.RUNNING)
for node in self.storage_list if storage_list is None else storage_list:
for node, expected_state in (
storage_list if isinstance(storage_list, dict) else
dict.fromkeys(self.storage_list if storage_list is None else
storage_list, expected_state)
).iteritems():
state = self.getNodeState(node)
assert state == expected_state, (repr(node), state)
assert state == expected_state, (repr(node), state, expected_state)
def stop(self, clear_database=False, __print_exc=traceback.print_exc, **kw):
if self.started:
......@@ -1006,18 +1013,18 @@ class NEOCluster(object):
"""Sort storages so that storage_list[i] has partition i for all i"""
pt = [{x.getUUID() for x in x}
for x in self.primary_master.pt.partition_list]
n = len(self.storage_list)
r = []
x = [iter(pt[0])]
try:
while 1:
try:
r.append(next(x[-1]))
except StopIteration:
del r[-1], x[-1]
else:
x.append(iter(pt[len(r)].difference(r)))
except IndexError:
assert len(r) == len(self.storage_list)
while 1:
try:
r.append(next(x[-1]))
except StopIteration:
del r[-1], x[-1]
else:
if len(r) == n:
break
x.append(iter(pt[len(r)].difference(r)))
x = {x.uuid: x for x in self.storage_list}
self.storage_list[:] = (x[r] for r in r)
return self.storage_list
......
......@@ -2823,7 +2823,6 @@ class Test(NEOThreadedTest):
dm = s.dm
dm.commit()
dump_dict[s.uuid] = dm.dump()
dm.erase()
with open(path % (s.getAdapter(), s.uuid)) as f:
dm.restore(f.read())
with NEOCluster(storage_count=3, partitions=3, replicas=1,
......
......@@ -29,7 +29,7 @@ from neo.storage.database.manager import DatabaseManager
from neo.storage import replicator
from neo.lib.connector import SocketConnector
from neo.lib.connection import ClientConnection
from neo.lib.protocol import CellStates, ClusterStates, Packets, \
from neo.lib.protocol import CellStates, ClusterStates, NodeStates, Packets, \
ZERO_OID, ZERO_TID, MAX_TID, uuid_str
from neo.lib.util import add64, p64, u64
from .. import Patch, TransactionalResource
......@@ -928,6 +928,39 @@ class ReplicationTests(NEOThreadedTest):
def testReplicationBlockedByUnfinished2(self):
self.testReplicationBlockedByUnfinished1(True)
@with_cluster(partitions=6, storage_count=4, start_cluster=0)
def testCloneStorage(self, cluster):
"""
Test cloning of storage nodes using --new-nid instead NEO replication.
"""
s01 = cluster.storage_list[:2]
s23 = cluster.storage_list[2:]
cluster.start(storage_list=s01)
cluster.importZODB()(6)
self.tic()
with Patch(cluster, storage_list=s01):
cluster.sortStorageList()
cluster.stop(replicas=1)
cluster.storage_list[:2] = s01
storage_dict = {}
for s, d in zip(s01, s23):
d.dm.restore(s.dm.dump())
d.resetNode(new_nid=True)
storage_dict[s] = NodeStates.RUNNING
storage_dict[d] = NodeStates.DOWN
cluster.start(storage_dict)
cluster.join(s23)
for d in s23:
d.resetNode(new_nid=False)
d.start()
self.tic()
self.checkReplicas(cluster)
expected = '|'.join(['U.U.|.U.U'] * 3)
self.assertPartitionTable(cluster, expected)
cluster.neoctl.tweakPartitionTable()
self.tic()
self.assertPartitionTable(cluster, expected)
@with_cluster(partitions=5, replicas=2, storage_count=3)
def testCheckReplicas(self, cluster):
from neo.storage import checker
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!