#
# Copyright (C) 2006-2019 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Replication algorithm

Purpose: replicate the content of a reference node into a replicating node,
bringing it up-to-date. This happens in the following cases:
- A new storage is added to an existing cluster.
- A node was separated from the cluster and rejoins it.
- In a backup cluster, the master notifies a node that new data exists upstream
  (note that in this case, the cell is always marked as UP_TO_DATE).

Replication happens per partition. The reference node can change between
partitions.

2 parts, done sequentially:
- Transaction (metadata) replication
- Object (metadata+data) replication

Both parts follow the same mechanism:
- The range of data to replicate is split into chunks of FETCH_COUNT items
  (transactions or objects).
- For every chunk, the requesting node sends to the seeding node the list of
  items it already has.
- Before answering, the seeding node sends 1 packet for every missing item.
  For items that are already on the replicating node, there is no check that
  the values match.
- The seeding node finally answers with the list of items to delete (usually
  empty).
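
A minimal sketch of one transaction chunk, from the replicating side, using
names that appear in this module (simplified: the actual code goes through
self.app.dm and self.current_node, and replies are handled asynchronously):

    tid_list = dm.getReplicationTIDList(min_tid, max_tid, FETCH_COUNT, offset)
    conn.ask(Packets.AskFetchTransactions(
        offset, FETCH_COUNT, min_tid, max_tid, tid_list))
    # The seeding node sends one packet per transaction in the range that is
    # missing from tid_list, then the final answer with the (usually empty)
    # list of transactions to delete; the next chunk, if any, is requested
    # the same way.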

Internal replication, which is similar to RAID1 (as opposed to asynchronous
replication to a backup cluster), requires extra care with respect to
transactions. The transition of a cell from OUT_OF_DATE to UP_TO_DATE is done
in several steps.

A replicating node can not depend on other nodes to fetch the data that is
being committed or was committed recently, because that can not be done
atomically: it could miss writes between the processing of its request by a
source node and the reception of the answer.

Therefore, outdated cells are writable: a storage node asks the master for the
transactions being committed, and then it is expected to fully receive from the
client any transaction that is started after this answer.

This has in turn other consequences:
- The client must not fail to write to a storage node after the above request
  to the master: for this, the storage must have announced that it is ready,
  and it must delay the identification of unknown clients (those for which it
  has not yet received a notification from the master).
- Writes must be accepted blindly (i.e. without taking a write-lock) when a
  storage node lacks the data to check for conflicts. This is possible because
  1 up-to-date cell (for each partition) is enough to do these checks.
- Because the client can not reliably know whether a storage node is expected
  to receive a transaction in full, all writes must succeed.
- Even if the replication is finished, we have to wait until there are no
  lockless writes left before announcing to the master that we're up-to-date.

To sum up:
1. ask unfinished transactions -> (last_transaction, ttid_list)
2. replicate to last_transaction
3. wait for all ttid_list to be finished -> new last_transaction
4. replicate to last_transaction
5. no lockless write anymore, except to (oid, ttid) that were already
   stored/checked without taking a lock
6. wait for all transactions with lockless writes to be finished
7. announce we're up-to-date
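
A minimal sketch of this sequence, with hypothetical helper names (the real
implementation is event-driven, see setUnfinishedTIDList() and
transactionFinished() below):

    last_tid, ttid_list = ask_master_unfinished_transactions()  # step 1
    replicate_up_to(last_tid)                                    # step 2
    last_tid = wait_until_all_finished(ttid_list)                # step 3
    replicate_up_to(last_tid)                                    # step 4
    wait_until_no_lockless_write_left()                          # steps 5-6
    notify_master_up_to_date()                                   # step 7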

For any failed write, the client marks the storage node as failed and stops
writing to it for the transaction. If any write failed, the vote ends with an
extra request to the master: the transaction will only succeed if the failed
nodes can be disconnected, forcing them to replicate the missing data.

TODO: Packing and replication currently fail when they happen at the same time.
"""
import random
from neo.lib import logging
from neo.lib.protocol import CellStates, NodeTypes, NodeStates, \
Packets, INVALID_TID, ZERO_TID, ZERO_OID
from neo.lib.connection import ClientConnection, ConnectionClosed
from neo.lib.util import add64, dump, p64
from .handlers.storage import StorageOperationHandler
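# Maximum number of items (transactions or objects) fetched per replication
# chunk, as described in the module docstring.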
FETCH_COUNT = 1000
class Partition(object):
__slots__ = 'next_trans', 'next_obj', 'max_ttid'
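    # next_trans / next_obj: next TID from which to replicate transaction /
    # object data for this partition.
    # max_ttid: highest TTID that was still unfinished when replication of
    # this partition was requested (INVALID_TID as a placeholder until the
    # master answers, None when there is nothing to wait for).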
def __repr__(self):
return '<%s(%s) at 0x%x>' % (self.__class__.__name__,
', '.join('%s=%r' % (x, getattr(self, x)) for x in self.__slots__
if hasattr(self, x)),
id(self))
class Replicator(object):
# When the replication of a partition is aborted, the connection to the
# feeding node may still be open, e.g. on PT update from the master. In
    # such a case, replication is also aborted on the other side, but there may
# be a few incoming packets that must be discarded.
_conn_msg_id = None
current_node = None
current_partition = None
def __init__(self, app):
self.app = app
def getCurrentConnection(self):
node = self.current_node
if node is not None and node.isConnected(True):
return node.getConnection()
def isReplicatingConnection(self, conn):
return conn is self.getCurrentConnection() and \
conn.getPeerId() == self._conn_msg_id
def setUnfinishedTIDList(self, max_tid, ttid_list, offset_list):
"""This is a callback from MasterOperationHandler."""
assert self.ttid_set.issubset(ttid_list), (self.ttid_set, ttid_list)
if ttid_list:
self.ttid_set.update(ttid_list)
max_ttid = max(ttid_list)
else:
max_ttid = None
for offset in offset_list:
self.partition_dict[offset].max_ttid = max_ttid
self.replicate_dict[offset] = max_tid
self._nextPartition()
def transactionFinished(self, ttid, max_tid=None):
""" Callback from MasterOperationHandler """
try:
self.ttid_set.remove(ttid)
except KeyError:
assert max_tid is None, max_tid
return
min_ttid = min(self.ttid_set) if self.ttid_set else INVALID_TID
for offset, p in self.partition_dict.iteritems():
if p.max_ttid:
if max_tid:
# Filling replicate_dict while there are still unfinished
# transactions for this partition is not the most
# efficient (due to the overhead of potentially replicating
# the last transactions in several times), but that's a
# simple way to make sure it is filled even if the
# remaining unfinished transactions are aborted.
self.replicate_dict[offset] = max_tid
if p.max_ttid < min_ttid:
# no more unfinished transaction for this partition
if not (offset == self.current_partition
or offset in self.replicate_dict):
logging.debug(
"All unfinished transactions have been aborted."
" Mark partition %u as already fully replicated",
offset)
                        # We no longer have the previous value of
                        # self.replicate_dict[offset], but p.max_ttid is not
                        # wrong. In any case, we are not in backup mode here
                        # and this value will be ignored.
# XXX: see NonReadableCell.__doc__
self.app.tm.replicated(offset, p.max_ttid)
p.max_ttid = None
self._nextPartition()
def getBackupTID(self):
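        # Return the TID up to which all readable cells of this node are
        # fully replicated (used as the value of backup_tid in the backend).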
outdated_set = set(self.app.pt.getOutdatedOffsetListFor(self.app.uuid))
tid = INVALID_TID
for offset, p in self.partition_dict.iteritems():
if offset not in outdated_set:
tid = min(tid, p.next_trans, p.next_obj)
if ZERO_TID != tid != INVALID_TID:
return add64(tid, -1)
return ZERO_TID
def updateBackupTID(self, commit=False):
dm = self.app.dm
tid = dm.getBackupTID()
if tid:
new_tid = self.getBackupTID()
if tid != new_tid:
dm._setBackupTID(new_tid)
if commit:
dm.commit()
def startOperation(self, backup):
dm = self.app.dm
if backup:
if dm.getBackupTID():
assert not hasattr(self, 'partition_dict'), self.partition_dict
return
tid = dm.getLastIDs()[0] or ZERO_TID
else:
tid = None
dm._setBackupTID(tid)
dm.commit()
try:
partition_dict = self.partition_dict
except AttributeError:
return
for offset, next_tid in dm.iterCellNextTIDs():
if type(next_tid) is not bytes: # readable
p = partition_dict[offset]
p.next_trans, p.next_obj = next_tid
def populate(self):
self.partition_dict = {}
self.replicate_dict = {}
self.source_dict = {}
self.ttid_set = set()
outdated_list = []
for offset, next_tid in self.app.dm.iterCellNextTIDs():
self.partition_dict[offset] = p = Partition()
if type(next_tid) is bytes: # OUT_OF_DATE
outdated_list.append(offset)
p.next_trans = p.next_obj = next_tid
p.max_ttid = INVALID_TID
else: # readable
p.next_trans, p.next_obj = next_tid or (None, None)
p.max_ttid = None
if outdated_list:
self.app.tm.replicating(outdated_list)
def notifyPartitionChanges(self, cell_list):
"""This is a callback from MasterOperationHandler."""
abort = False
added_list = []
discarded_list = []
readable_list = []
app = self.app
for offset, uuid, state in cell_list:
if uuid == app.uuid:
if state in (CellStates.DISCARDED, CellStates.CORRUPTED):
try:
del self.partition_dict[offset]
except KeyError:
continue
self.replicate_dict.pop(offset, None)
self.source_dict.pop(offset, None)
abort = abort or self.current_partition == offset
discarded_list.append(offset)
elif state == CellStates.OUT_OF_DATE:
assert offset not in self.partition_dict
self.partition_dict[offset] = p = Partition()
# New cell. 0 is also what should be stored by the backend.
# Nothing to optimize.
p.next_trans = p.next_obj = ZERO_TID
p.max_ttid = INVALID_TID
added_list.append(offset)
else:
assert state in (CellStates.UP_TO_DATE,
CellStates.FEEDING), state
readable_list.append(offset)
tm = app.tm
if added_list:
tm.replicating(added_list)
if discarded_list:
tm.discarded(discarded_list)
if readable_list:
tm.readable(readable_list)
if abort:
self.abort()
def backup(self, tid, source_dict):
next_tid = None
for offset, source in source_dict.iteritems():
if source:
self.source_dict[offset] = source
self.replicate_dict[offset] = tid
elif offset != self.current_partition and \
offset not in self.replicate_dict:
# The master did its best to avoid useless replication orders
# but there may still be a few, and we may receive redundant
                # update notifications of backup_tid.
# So, we do nothing here if we are already replicating.
p = self.partition_dict[offset]
if not next_tid:
next_tid = add64(tid, 1)
p.next_trans = p.next_obj = next_tid
if next_tid:
self.updateBackupTID(True)
self._nextPartition()
def _nextPartitionSortKey(self, offset):
p = self.partition_dict[offset]
return p.next_obj, bool(p.max_ttid)
def _nextPartition(self):
# XXX: One connection to another storage may remain open forever.
# All other previous connections are automatically closed
# after some time of inactivity.
# This should be improved in several ways:
# - Keeping connections open between 2 clusters (backup case) is
# quite a good thing because establishing a connection costs
# time/bandwidth and replication is actually never finished.
# - When all storages of a non-backup cluster are up-to-date,
# there's no reason to keep any connection open.
if self.current_partition is not None or not self.replicate_dict:
return
app = self.app
assert app.master_conn and app.operational, (
app.master_conn, app.operational)
# Start replicating the partition which is furthest behind,
# to increase the overall backup_tid as soon as possible.
# Then prefer a partition with no unfinished transaction.
# XXX: When leaving backup mode, we should only consider UP_TO_DATE
# cells.
offset = min(self.replicate_dict, key=self._nextPartitionSortKey)
try:
addr, name = self.source_dict[offset]
except KeyError:
assert app.pt.getCell(offset, app.uuid).isOutOfDate(), (
offset, app.pt.getCell(offset, app.uuid).getState())
node = random.choice([cell.getNode()
for cell in app.pt.getCellList(offset, readable=True)
if cell.getNodeState() == NodeStates.RUNNING])
name = None
else:
node = app.nm.getByAddress(addr)
if node is None:
assert name, addr
node = app.nm.createStorage(address=addr)
self.current_partition = offset
previous_node = self.current_node
self.current_node = node
if node.isConnected(connecting=True):
if node.isIdentified():
node.getConnection().asClient()
self.fetchTransactions()
else:
assert name or node.getUUID() != app.uuid, "loopback connection"
conn = ClientConnection(app, StorageOperationHandler(app), node)
try:
conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
None if name else app.uuid, app.server, name or app.name,
app.id_timestamp, (), ()))
except ConnectionClosed:
if previous_node is self.current_node:
return
if previous_node is not None and previous_node.isConnected():
app.closeClient(previous_node.getConnection())
def connected(self, node):
if self.current_node is node and self.current_partition is not None:
self.fetchTransactions()
def fetchTransactions(self, min_tid=None):
assert self.current_node.getConnection().isClient(), self.current_node
offset = self.current_partition
p = self.partition_dict[offset]
if min_tid:
            # More than one chunk? This could be a full replication, so avoid
            # restarting from the beginning by committing now.
self.app.dm.commit()
p.next_trans = min_tid
else:
try:
addr, name = self.source_dict[offset]
except KeyError:
pass
else:
if addr != self.current_node.getAddress():
return self.abort()
min_tid = p.next_trans
self.replicate_tid = self.replicate_dict.pop(offset)
logging.debug("starting replication of from %r", offset, dump(min_tid),
dump(self.replicate_tid), self.current_node)
max_tid = self.replicate_tid
tid_list = self.app.dm.getReplicationTIDList(min_tid, max_tid,
FETCH_COUNT, offset)
self._conn_msg_id = self.current_node.ask(Packets.AskFetchTransactions(
offset, FETCH_COUNT, min_tid, max_tid, tid_list))
def fetchObjects(self, min_tid=None, min_oid=ZERO_OID):
offset = self.current_partition
p = self.partition_dict[offset]
max_tid = self.replicate_tid
dm = self.app.dm
if min_tid:
p.next_obj = min_tid
self.updateBackupTID()
dm.updateCellTID(offset, add64(min_tid, -1))
dm.commit() # like in fetchTransactions
else:
min_tid = p.next_obj
p.next_trans = add64(max_tid, 1)
object_dict = {}
for serial, oid in dm.getReplicationObjectList(min_tid,
max_tid, FETCH_COUNT, offset, min_oid):
try:
object_dict[serial].append(oid)
except KeyError:
object_dict[serial] = [oid]
self._conn_msg_id = self.current_node.ask(Packets.AskFetchObjects(
offset, FETCH_COUNT, min_tid, max_tid, min_oid, object_dict))
def finish(self):
offset = self.current_partition
tid = self.replicate_tid
del self.current_partition, self._conn_msg_id, self.replicate_tid
p = self.partition_dict[offset]
p.next_obj = add64(tid, 1)
self.updateBackupTID()
app = self.app
app.dm.updateCellTID(offset, tid)
app.dm.commit()
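        # Note that 'and' binds tighter than 'or' below: the partition is not
        # reported as replicated if unfinished transactions may still write to
        # it, or if a new replication order (not coming from an upstream
        # source) was queued in the meantime.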
if p.max_ttid or offset in self.replicate_dict and \
offset not in self.source_dict:
logging.debug("unfinished transactions: %r", self.ttid_set)
else:
app.tm.replicated(offset, tid)
logging.debug("partition %u replicated up to %s from %r",
offset, dump(tid), self.current_node)
self.getCurrentConnection().setReconnectionNoDelay()
self._nextPartition()
def abort(self, message=''):
offset = self.current_partition
if offset is None:
return
del self.current_partition
self._conn_msg_id = None
logging.warning('replication aborted for partition %u%s',
offset, message and ' (%s)' % message)
if offset in self.partition_dict:
            # XXX: Try another partition if possible, to increase the
            #      probability of connecting to another node. It would be
            #      better to explicitly search for another node instead.
tid = self.replicate_dict.pop(offset, None) or self.replicate_tid
if self.replicate_dict:
self._nextPartition()
self.replicate_dict[offset] = tid
else:
self.replicate_dict[offset] = tid
self._nextPartition()
else: # partition removed
self._nextPartition()
def stop(self):
# Close any open connection to an upstream storage,
# possibly aborting current replication.
node = self.current_node
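        # Chained comparison: the node exists and its UUID is None, i.e. it is
        # a node of the upstream cluster rather than of this one.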
if node is not None is node.getUUID():
offset = self.current_partition
if offset is not None:
logging.info('cancel replication of partition %u', offset)
del self.current_partition
if self._conn_msg_id is not None:
self.replicate_dict.setdefault(offset, self.replicate_tid)
del self._conn_msg_id, self.replicate_tid
self.getCurrentConnection().close()
# Cancel all replication orders from upstream cluster.
for offset in self.replicate_dict.keys():
addr, name = self.source_dict.get(offset, (None, None))
if name:
tid = self.replicate_dict.pop(offset)
logging.info('cancel replication of partition %u from %r'
' up to %s', offset, addr, dump(tid))
# Make UP_TO_DATE cells really UP_TO_DATE
self._nextPartition()