#
# Copyright (C) 2006-2017  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
Replication algorithm

Purpose: replicate the content of a reference node into a replicating node,
bringing it up-to-date. This happens in the following cases:
- A new storage is added to an existing cluster.
- A node was separated from the cluster and rejoins it.
- In a backup cluster, the master notifies a node that new data exists upstream
  (note that in this case, the cell is always marked as UP_TO_DATE).

Replication happens per partition. The reference node can change from one
partition to another.

2 parts, done sequentially:
- Transaction (metadata) replication
- Object (metadata+data) replication

Both parts follow the same mechanism (sketched below):
- The range of data to replicate is split into chunks of FETCH_COUNT items
  (transactions or objects).
- For every chunk, the requesting node sends the seeding node the list of items
  it already has.
- Before answering, the seeding node sends 1 packet for every missing item.
  For items that are already on the replicating node, there is no check that
  the values match.
- The seeding node finally answers with the list of items to delete (usually
  empty).
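
A minimal sketch of the seeding node's decision for one chunk, using plain
sets instead of the real packet layer (all data here is hypothetical):

    local_items = {1, 2, 3, 5}   # items the replicating node reported having
    seed_items = {1, 2, 3, 4}    # items the seeding node has in this range
    missing = sorted(seed_items - local_items)    # sent as 1 packet per item
    to_delete = sorted(local_items - seed_items)  # sent in the final answer
    assert missing == [4] and to_delete == [5]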

Internal replication, which is similar to RAID1 (as opposed to asynchronous
replication to a backup cluster), requires extra care with respect to
transactions. The transition of a cell from OUT_OF_DATE to UP_TO_DATE is done
in several steps.

A replicating node cannot rely on other nodes to fetch data that is being, or
has just been, committed, because that cannot be done atomically: it could miss
writes happening between the processing of its request by a source node and the
reception of the answer.

Therefore, outdated cells are writable: a storage node asks the master for the
transactions being committed, and it is then expected to receive in full, from
the client, any transaction started after this answer.

This in turn has other consequences:
- The client must not fail to write to a storage node after the above request
  to the master: for this, the storage must have announced it is ready, and it
  must delay identification of unknown clients (those for which it has not yet
  received a notification from the master).
- Writes must be accepted blindly (i.e. without taking a write-lock) when a
  storage node lacks the data to check for conflicts. This is possible because
  1 up-to-date cell (for each partition) is enough to do these checks.
- Because the client cannot reliably know whether a storage node is expected to
  receive a transaction in full, all writes must succeed.
- Even when replication is finished, we have to wait until there are no
  lockless writes left before announcing to the master that we're up-to-date.

To sum up (sketched below):
1. ask unfinished transactions -> (last_transaction, ttid_list)
2. replicate to last_transaction
3. wait for all ttid_list to be finished -> new last_transaction
4. replicate to last_transaction
5. no lockless write anymore, except to (oid, ttid) that were already
   stored/checked without taking a lock
6. wait for all transactions with lockless writes to be finished
7. announce we're up-to-date
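
Expressed as a sketch, with hypothetical helpers standing in for the real
master/storage exchanges:

    last_tid, ttid_list = ask_unfinished_transactions()  # step 1
    replicate_up_to(last_tid)                            # step 2
    last_tid = wait_for_finished(ttid_list)              # step 3
    replicate_up_to(last_tid)                            # step 4
    wait_for_lockless_write_transactions()               # steps 5-6
    notify_up_to_date()                                  # step 7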

For any failed write, the client marks the storage node as failed and stops
writing to it for the rest of the transaction. If there was any failed write,
the vote ends with an extra request to the master: the transaction will only
succeed if the failed nodes can be disconnected, forcing them to replicate the
missing data.

TODO: Packing and replication currently fail when they happen at the same time.
"""

import random

from neo.lib import logging
from neo.lib.protocol import CellStates, NodeTypes, NodeStates, \
    Packets, INVALID_TID, ZERO_TID, ZERO_OID
from neo.lib.connection import ClientConnection, ConnectionClosed
from neo.lib.util import add64, dump, p64
from .handlers.storage import StorageOperationHandler

# Number of items (transactions or objects) fetched per replication request.
FETCH_COUNT = 1000


class Partition(object):

    __slots__ = 'next_trans', 'next_obj', 'max_ttid'

    def __repr__(self):
        return '<%s(%s) at 0x%x>' % (self.__class__.__name__,
            ', '.join('%s=%r' % (x, getattr(self, x)) for x in self.__slots__
                                                      if hasattr(self, x)),
            id(self))

class Replicator(object):

    # When the replication of a partition is aborted, the connection to the
    # feeding node may still be open, e.g. on a PT update from the master. In
    # such a case, replication is also aborted on the other side, but there may
    # be a few incoming packets that must be discarded.
    _conn_msg_id = None

    current_node = None
    current_partition = None

    def __init__(self, app):
        self.app = app

    def getCurrentConnection(self):
        node = self.current_node
        if node is not None and node.isConnected(True):
            return node.getConnection()

    def isReplicatingConnection(self, conn):
        return conn is self.getCurrentConnection() and \
            conn.getPeerId() == self._conn_msg_id

    def setUnfinishedTIDList(self, max_tid, ttid_list, offset_list):
        """This is a callback from MasterOperationHandler."""
        assert self.ttid_set.issubset(ttid_list), (self.ttid_set, ttid_list)
        if ttid_list:
            self.ttid_set.update(ttid_list)
            max_ttid = max(ttid_list)
        else:
            max_ttid = None
        for offset in offset_list:
            self.partition_dict[offset].max_ttid = max_ttid
            self.replicate_dict[offset] = max_tid
        self._nextPartition()

    def transactionFinished(self, ttid, max_tid=None):
        """ Callback from MasterOperationHandler """
        try:
            self.ttid_set.remove(ttid)
        except KeyError:
            assert max_tid is None, max_tid
            return
        min_ttid = min(self.ttid_set) if self.ttid_set else INVALID_TID
        for offset, p in self.partition_dict.iteritems():
            if p.max_ttid:
                if max_tid:
                    # Filling replicate_dict while there are still unfinished
                    # transactions for this partition is not the most
                    # efficient (due to the overhead of potentially replicating
                    # the last transactions several times), but that's a
                    # simple way to make sure it is filled even if the
                    # remaining unfinished transactions are aborted.
                    self.replicate_dict[offset] = max_tid
                if p.max_ttid < min_ttid:
                    # no more unfinished transaction for this partition
                    if not (offset == self.current_partition
                            or offset in self.replicate_dict):
                        logging.debug(
                            "All unfinished transactions have been aborted."
                            " Mark partition %u as already fully replicated",
                            offset)
                        # We no longer have the previous value of
                        # self.replicate_dict[offset], but p.max_ttid is not
                        # wrong. Anyway, we're not in backup mode here and this
                        # value will be ignored.
                        # XXX: see NonReadableCell.__doc__
                        self.app.tm.replicated(offset, p.max_ttid)
                    p.max_ttid = None
        self._nextPartition()

    def getBackupTID(self):
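        # next_trans/next_obj are the next TIDs to fetch, so everything
        # strictly below their minimum over readable cells is complete;
        # the backup TID is that minimum minus 1.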
        outdated_set = set(self.app.pt.getOutdatedOffsetListFor(self.app.uuid))
        tid = INVALID_TID
        for offset, p in self.partition_dict.iteritems():
            if offset not in outdated_set:
                tid = min(tid, p.next_trans, p.next_obj)
        if ZERO_TID != tid != INVALID_TID:
            return add64(tid, -1)
        return ZERO_TID

    def updateBackupTID(self, commit=False):
        dm = self.app.dm
        tid = dm.getBackupTID()
        if tid:
            new_tid = self.getBackupTID()
            if tid != new_tid:
                dm._setBackupTID(new_tid)
                if commit:
                    dm.commit()

    def startOperation(self, backup):
        dm = self.app.dm
        if backup:
            if dm.getBackupTID():
                assert not hasattr(self, 'partition_dict'), self.partition_dict
                return
            tid = dm.getLastIDs()[0] or ZERO_TID
        else:
            tid = None
        dm._setBackupTID(tid)
        dm.commit()
        try:
            partition_dict = self.partition_dict
        except AttributeError:
            return
        for offset, next_tid in dm.iterCellNextTIDs():
            if type(next_tid) is not bytes: # readable
                p = partition_dict[offset]
                p.next_trans, p.next_obj = next_tid

    def populate(self):
        self.partition_dict = {}
        self.replicate_dict = {}
        self.source_dict = {}
        self.ttid_set = set()
        outdated_list = []
        for offset, next_tid in self.app.dm.iterCellNextTIDs():
            self.partition_dict[offset] = p = Partition()
            if type(next_tid) is bytes: # OUT_OF_DATE
                outdated_list.append(offset)
                p.next_trans = p.next_obj = next_tid
                p.max_ttid = INVALID_TID
            else: # readable
                p.next_trans, p.next_obj = next_tid or (None, None)
                p.max_ttid = None
        if outdated_list:
            self.app.tm.replicating(outdated_list)

    def notifyPartitionChanges(self, cell_list):
        """This is a callback from MasterOperationHandler."""
        abort = False
        added_list = []
        discarded_list = []
        readable_list = []
        app = self.app
        for offset, uuid, state in cell_list:
            if uuid == app.uuid:
                if state in (CellStates.DISCARDED, CellStates.CORRUPTED):
                    try:
                        del self.partition_dict[offset]
                    except KeyError:
                        continue
                    self.replicate_dict.pop(offset, None)
                    self.source_dict.pop(offset, None)
                    abort = abort or self.current_partition == offset
                    discarded_list.append(offset)
                elif state == CellStates.OUT_OF_DATE:
                    assert offset not in self.partition_dict
                    self.partition_dict[offset] = p = Partition()
                    # New cell. 0 is also what should be stored by the backend.
                    # Nothing to optimize.
                    p.next_trans = p.next_obj = ZERO_TID
                    p.max_ttid = INVALID_TID
                    added_list.append(offset)
                else:
                    assert state in (CellStates.UP_TO_DATE,
                                     CellStates.FEEDING), state
                    readable_list.append(offset)
        tm = app.tm
        if added_list:
            tm.replicating(added_list)
        if discarded_list:
            tm.discarded(discarded_list)
        if readable_list:
            tm.readable(readable_list)
        if abort:
            self.abort()

    def backup(self, tid, source_dict):
        next_tid = None
        for offset, source in source_dict.iteritems():
            if source:
                self.source_dict[offset] = source
                self.replicate_dict[offset] = tid
            elif offset != self.current_partition and \
                 offset not in self.replicate_dict:
                # The master did its best to avoid useless replication orders
                # but there may still be a few, and we may receive redundant
                # update notification of backup_tid.
                # So, we do nothing here if we are already replicating.
                p = self.partition_dict[offset]
                if not next_tid:
                    next_tid = add64(tid, 1)
                p.next_trans = p.next_obj = next_tid
        if next_tid:
            self.updateBackupTID(True)
        self._nextPartition()

    def _nextPartitionSortKey(self, offset):
        p = self.partition_dict[offset]
        return p.next_obj, bool(p.max_ttid)

    def _nextPartition(self):
        # XXX: One connection to another storage may remain open forever.
        #      All other previous connections are automatically closed
        #      after some time of inactivity.
        #      This should be improved in several ways:
        #      - Keeping connections open between 2 clusters (backup case) is
        #        quite a good thing because establishing a connection costs
        #        time/bandwidth and replication is actually never finished.
        #      - When all storages of a non-backup cluster are up-to-date,
        #        there's no reason to keep any connection open.
        if self.current_partition is not None or not self.replicate_dict:
            return
        app = self.app
        assert app.master_conn and app.operational, (
            app.master_conn, app.operational)
        # Start replicating the partition which is furthest behind,
        # to increase the overall backup_tid as soon as possible.
        # Then prefer a partition with no unfinished transaction.
        # XXX: When leaving backup mode, we should only consider UP_TO_DATE
        #      cells.
        offset = min(self.replicate_dict, key=self._nextPartitionSortKey)
        try:
            addr, name = self.source_dict[offset]
        except KeyError:
            assert app.pt.getCell(offset, app.uuid).isOutOfDate(), (
                offset, app.pt.getCell(offset, app.uuid).getState())
            node = random.choice([cell.getNode()
                for cell in app.pt.getCellList(offset, readable=True)
                if cell.getNodeState() == NodeStates.RUNNING])
            name = None
        else:
            node = app.nm.getByAddress(addr)
            if node is None:
                assert name, addr
                node = app.nm.createStorage(address=addr)
        self.current_partition = offset
        previous_node = self.current_node
        self.current_node = node
        if node.isConnected(connecting=True):
            if node.isIdentified():
                node.getConnection().asClient()
                self.fetchTransactions()
        else:
            assert name or node.getUUID() != app.uuid, "loopback connection"
            conn = ClientConnection(app, StorageOperationHandler(app), node)
            try:
                conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
                    None if name else app.uuid, app.server, name or app.name,
                    app.id_timestamp))
            except ConnectionClosed:
                if previous_node is self.current_node:
                    return
        if previous_node is not None and previous_node.isConnected():
            app.closeClient(previous_node.getConnection())

    def connected(self, node):
        if self.current_node is node and self.current_partition is not None:
            self.fetchTransactions()

    def fetchTransactions(self, min_tid=None):
        assert self.current_node.getConnection().isClient(), self.current_node
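        # min_tid is set when resuming after a previous chunk; otherwise we are
        # starting (or restarting) replication of the current partition.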
        offset = self.current_partition
        p = self.partition_dict[offset]
        if min_tid:
            p.next_trans = min_tid
        else:
            try:
                addr, name = self.source_dict[offset]
            except KeyError:
                pass
            else:
                if addr != self.current_node.getAddress():
                    return self.abort()
            min_tid = p.next_trans
            self.replicate_tid = self.replicate_dict.pop(offset)
            logging.debug("starting replication of <partition=%u"
                " min_tid=%s max_tid=%s> from %r", offset, dump(min_tid),
                dump(self.replicate_tid), self.current_node)
        max_tid = self.replicate_tid
        tid_list = self.app.dm.getReplicationTIDList(min_tid, max_tid,
            FETCH_COUNT, offset)
        self._conn_msg_id = self.current_node.ask(Packets.AskFetchTransactions(
            offset, FETCH_COUNT, min_tid, max_tid, tid_list))

    def fetchObjects(self, min_tid=None, min_oid=ZERO_OID):
        offset = self.current_partition
        p = self.partition_dict[offset]
        max_tid = self.replicate_tid
        if min_tid:
            p.next_obj = min_tid
        else:
            min_tid = p.next_obj
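            # First object chunk: transaction metadata is fully replicated up
            # to max_tid, so record where transaction fetching would resume.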
            p.next_trans = add64(max_tid, 1)
        object_dict = {}
        for serial, oid in self.app.dm.getReplicationObjectList(min_tid,
                max_tid, FETCH_COUNT, offset, min_oid):
            try:
                object_dict[serial].append(oid)
            except KeyError:
                object_dict[serial] = [oid]
        self._conn_msg_id = self.current_node.ask(Packets.AskFetchObjects(
            offset, FETCH_COUNT, min_tid, max_tid, min_oid, object_dict))

    def finish(self):
        offset = self.current_partition
        tid = self.replicate_tid
        del self.current_partition, self._conn_msg_id, self.replicate_tid
        p = self.partition_dict[offset]
        p.next_obj = add64(tid, 1)
        self.updateBackupTID()
        app = self.app
        app.dm.updateCellTID(offset, tid)
        app.dm.commit()
        if p.max_ttid or (offset in self.replicate_dict and
                          offset not in self.source_dict):
            logging.debug("unfinished transactions: %r", self.ttid_set)
        else:
            app.tm.replicated(offset, tid)
        logging.debug("partition %u replicated up to %s from %r",
                      offset, dump(tid), self.current_node)
        self.getCurrentConnection().setReconnectionNoDelay()
        self._nextPartition()

    def abort(self, message=''):
        offset = self.current_partition
        if offset is None:
            return
        del self.current_partition
        self._conn_msg_id = None
        logging.warning('replication aborted for partition %u%s',
                        offset, message and ' (%s)' % message)
        if offset in self.partition_dict:
            # XXX: Try another partition if possible, to increase probability to
            #      connect to another node. It would be better to explicitly
            #      search for another node instead.
            tid = self.replicate_dict.pop(offset, None) or self.replicate_tid
            if self.replicate_dict:
                self._nextPartition()
                self.replicate_dict[offset] = tid
            else:
                self.replicate_dict[offset] = tid
                self._nextPartition()
        else: # partition removed
            self._nextPartition()

    def stop(self):
        # Close any open connection to an upstream storage,
        # possibly aborting current replication.
        node = self.current_node
        # Storage nodes of an upstream cluster have no UUID
        # (see _nextPartition), unlike nodes of our own cluster.
        if node is not None and node.getUUID() is None:
            offset = self.current_partition
            if offset is not None:
                logging.info('cancel replication of partition %u', offset)
                del self.current_partition
                if self._conn_msg_id is not None:
                    self.replicate_dict.setdefault(offset, self.replicate_tid)
                    del self._conn_msg_id, self.replicate_tid
                self.getCurrentConnection().close()
        # Cancel all replication orders from upstream cluster.
        for offset in self.replicate_dict.keys():
            addr, name = self.source_dict.get(offset, (None, None))
            if name:
                tid = self.replicate_dict.pop(offset)
                logging.info('cancel replication of partition %u from %r'
                             ' up to %s', offset, addr, dump(tid))
        # Make UP_TO_DATE cells really UP_TO_DATE
        self._nextPartition()