#
# Copyright (C) 2006-2017  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
Replication algorithm

Purpose: replicate the content of a reference node into a replicating node,
bringing it up-to-date. This happens in the following cases:
- A new storage is added to an existing cluster.
- A node was separated from cluster and rejoins it.
- In a backup cluster, the master notifies a node that new data exists upstream
  (note that in this case, the cell is always marked as UP_TO_DATE).

Replication happens per partition. The reference node can change between
partitions.

2 parts, done sequentially:
- Transaction (metadata) replication
- Object (metadata+data) replication

Both parts follow the same mechanism:
- The range of data to replicate is split into chunks of FETCH_COUNT items
  (transaction or object).
- For every chunk, the requesting node sends to seeding node the list of items
  it already has.
- Before answering, the seeding node sends 1 packet for every missing item.
  For items that are already on the replicating node, there is no check that
  values match.
- The seeding node finally answers with the list of items to delete (usually
  empty).
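
As a rough illustrative sketch of one such chunk exchange (plain Python, not
the actual NEO packet handlers; replicate_chunk and its arguments are invented
for illustration), the exchange amounts to a set difference on item ids:

```python
FETCH_COUNT = 1000  # chunk size, as in the module below

def replicate_chunk(seeder_items, requester_ids):
    # seeder_items: dict of item id -> value on the seeding node
    # requester_ids: ids the replicating node already has for this chunk
    # The seeder pushes 1 packet per missing item; ids the requester
    # already has are skipped without comparing values.
    pushed = dict((i, v) for i, v in seeder_items.items()
                  if i not in requester_ids)
    # The final answer lists the items to delete (usually empty).
    to_delete = requester_ids - set(seeder_items)
    return pushed, to_delete
```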

Internal replication, which is similar to RAID1 (as opposed to asynchronous
replication to a backup cluster), requires extra care with respect to
transactions. The transition of a cell from OUT_OF_DATE to UP_TO_DATE is done
in several steps.

A replicating node can not depend on other nodes to fetch data that was
recently committed or is being committed, because that can not be done
atomically: it could miss writes between the processing of its request by a
source node and the reception of the answer.

Therefore, outdated cells are writable: a storage node asks the master for the
transactions being committed, and is then expected to fully receive from the
client any transaction started after this answer.

This in turn has other consequences:
- The client must not fail to write to a storage node after the above request
  to the master: for this, the storage must have announced it is ready, and it
  must delay identification of unknown clients (those for which it hasn't
  received yet a notification from the master).
- Writes must be accepted blindly (i.e. without taking a write-lock) when a
  storage node lacks the data to check for conflicts. This is possible because
  1 up-to-date cell (for each partition) is enough to do these checks.
- Because the client can not reliably know if a storage node is expected to
  receive a transaction in full, all writes must succeed.
- Even if the replication is finished, we have to wait until there are no
  lockless writes left before announcing to the master that we're up-to-date.

To sum up:
1. ask unfinished transactions -> (last_transaction, ttid_list)
2. replicate to last_transaction
3. wait for all ttid_list to be finished -> new last_transaction
4. replicate to last_transaction
5. no lockless write anymore, except to (oid, ttid) that were already
   stored/checked without taking a lock
6. wait for all transactions with lockless writes to be finished
7. announce we're up-to-date
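
A minimal sketch of these seven steps as sequential calls, assuming a
hypothetical master stand-in (FakeMaster and its method names are invented
for illustration, not NEO's real API):

```python
class FakeMaster(object):
    # Hypothetical stand-in for the master node, for this sketch only.
    def ask_unfinished(self):
        # step 1 -> (last_transaction, ttid_list)
        return 'tid10', ['ttid1', 'ttid2']
    def wait_finished(self, ttid_list):
        # step 3: wait for all ttid_list to finish -> new last_transaction
        return 'tid20'
    def wait_no_lockless_write(self):
        # steps 5-6: wait until no lockless write remains
        pass

def become_up_to_date(master, replicate_to):
    last_tid, ttid_list = master.ask_unfinished()  # step 1
    replicate_to(last_tid)                         # step 2
    last_tid = master.wait_finished(ttid_list)     # step 3
    replicate_to(last_tid)                         # step 4
    master.wait_no_lockless_write()                # steps 5-6
    return 'UP_TO_DATE'                            # step 7: announce
```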

For any failed write, the client marks the storage node as failed and stops
writing to it for the transaction. If there was any failed write, vote ends
with an extra request to the master: the transaction will only succeed if the
failed nodes can be disconnected, forcing them to replicate the missing data.

TODO: Packing and replication currently fail when they happen at the same time.
"""

import random

from neo.lib import logging
from neo.lib.protocol import CellStates, NodeTypes, NodeStates, \
    Packets, INVALID_TID, ZERO_TID, ZERO_OID
from neo.lib.connection import ClientConnection, ConnectionClosed
from neo.lib.util import add64, dump, p64
from .handlers.storage import StorageOperationHandler

FETCH_COUNT = 1000


class Partition(object):

    __slots__ = 'next_trans', 'next_obj', 'max_ttid'

    def __repr__(self):
        return '<%s(%s) at 0x%x>' % (self.__class__.__name__,
            ', '.join('%s=%r' % (x, getattr(self, x)) for x in self.__slots__
                                                      if hasattr(self, x)),
            id(self))

class Replicator(object):

    # When the replication of a partition is aborted, the connection to the
    # feeding node may still be open, e.g. on PT update from the master. In
    # such a case, replication is also aborted on the other side, but there
    # may be a few incoming packets that must be discarded.
    _conn_msg_id = None

    current_node = None
    current_partition = None

    def __init__(self, app):
        self.app = app

    def getCurrentConnection(self):
        node = self.current_node
        if node is not None and node.isConnected(True):
            return node.getConnection()

    def isReplicatingConnection(self, conn):
        return conn is self.getCurrentConnection() and \
            conn.getPeerId() == self._conn_msg_id

    def setUnfinishedTIDList(self, max_tid, ttid_list, offset_list):
        """This is a callback from MasterOperationHandler."""
        assert self.ttid_set.issubset(ttid_list), (self.ttid_set, ttid_list)
        if ttid_list:
            self.ttid_set.update(ttid_list)
            max_ttid = max(ttid_list)
        else:
            max_ttid = None
        for offset in offset_list:
            self.partition_dict[offset].max_ttid = max_ttid
            self.replicate_dict[offset] = max_tid
        self._nextPartition()

    def transactionFinished(self, ttid, max_tid=None):
        """ Callback from MasterOperationHandler """
        try:
            self.ttid_set.remove(ttid)
        except KeyError:
            assert max_tid is None, max_tid
            return
        min_ttid = min(self.ttid_set) if self.ttid_set else INVALID_TID
        for offset, p in self.partition_dict.iteritems():
            if p.max_ttid:
                if max_tid:
                    # Filling replicate_dict while there are still unfinished
                    # transactions for this partition is not the most
                    # efficient (due to the overhead of potentially replicating
                    # the last transactions in several times), but that's a
                    # simple way to make sure it is filled even if the
                    # remaining unfinished transactions are aborted.
                    self.replicate_dict[offset] = max_tid
                if p.max_ttid < min_ttid:
                    # no more unfinished transaction for this partition
                    if not (offset == self.current_partition
                            or offset in self.replicate_dict):
                        logging.debug(
                            "All unfinished transactions have been aborted."
                            " Mark partition %u as already fully replicated",
                            offset)
                        # We don't have anymore the previous value of
                        # self.replicate_dict[offset], but p.max_ttid is not
                        # wrong. Anyway here, we're not in backup mode and this
                        # value will be ignored.
                        # XXX: see NonReadableCell.__doc__
                        self.app.tm.replicated(offset, p.max_ttid)
                    p.max_ttid = None
        self._nextPartition()

    def getBackupTID(self):
        outdated_set = set(self.app.pt.getOutdatedOffsetListFor(self.app.uuid))
        tid = INVALID_TID
        for offset, p in self.partition_dict.iteritems():
            if offset not in outdated_set:
                tid = min(tid, p.next_trans, p.next_obj)
        if ZERO_TID != tid != INVALID_TID:
            return add64(tid, -1)
        return ZERO_TID

    def updateBackupTID(self, commit=False):
        dm = self.app.dm
        tid = dm.getBackupTID()
        if tid:
            new_tid = self.getBackupTID()
            if tid != new_tid:
                dm._setBackupTID(new_tid)
                if commit:
                    dm.commit()

    def startOperation(self, backup):
        dm = self.app.dm
        if backup:
            if dm.getBackupTID():
                assert not hasattr(self, 'partition_dict'), self.partition_dict
                return
            tid = dm.getLastIDs()[0] or ZERO_TID
        else:
            tid = None
        dm._setBackupTID(tid)
        dm.commit()
        try:
            partition_dict = self.partition_dict
        except AttributeError:
            return
        for offset, next_tid in dm.iterCellNextTIDs():
            if type(next_tid) is not bytes: # readable
                p = partition_dict[offset]
                p.next_trans, p.next_obj = next_tid

    def populate(self):
        self.partition_dict = {}
        self.replicate_dict = {}
        self.source_dict = {}
        self.ttid_set = set()
        outdated_list = []
        for offset, next_tid in self.app.dm.iterCellNextTIDs():
            self.partition_dict[offset] = p = Partition()
            if type(next_tid) is bytes: # OUT_OF_DATE
                outdated_list.append(offset)
                p.next_trans = p.next_obj = next_tid
                p.max_ttid = INVALID_TID
            else: # readable
                p.next_trans, p.next_obj = next_tid or (None, None)
                p.max_ttid = None
        if outdated_list:
            self.app.tm.replicating(outdated_list)

    def notifyPartitionChanges(self, cell_list):
        """This is a callback from MasterOperationHandler."""
        abort = False
        added_list = []
        discarded_list = []
        readable_list = []
        app = self.app
        for offset, uuid, state in cell_list:
            if uuid == app.uuid:
                if state in (CellStates.DISCARDED, CellStates.CORRUPTED):
                    try:
                        del self.partition_dict[offset]
                    except KeyError:
                        continue
                    self.replicate_dict.pop(offset, None)
                    self.source_dict.pop(offset, None)
                    abort = abort or self.current_partition == offset
                    discarded_list.append(offset)
                elif state == CellStates.OUT_OF_DATE:
                    assert offset not in self.partition_dict
                    self.partition_dict[offset] = p = Partition()
                    # New cell. 0 is also what should be stored by the backend.
                    # Nothing to optimize.
                    p.next_trans = p.next_obj = ZERO_TID
                    p.max_ttid = INVALID_TID
                    added_list.append(offset)
                else:
                    assert state in (CellStates.UP_TO_DATE,
                                     CellStates.FEEDING), state
                    readable_list.append(offset)
        tm = app.tm
        if added_list:
            tm.replicating(added_list)
        if discarded_list:
            tm.discarded(discarded_list)
        if readable_list:
            tm.readable(readable_list)
        if abort:
            self.abort()

    def backup(self, tid, source_dict):
        next_tid = None
        for offset, source in source_dict.iteritems():
            if source:
                self.source_dict[offset] = source
                self.replicate_dict[offset] = tid
            elif offset != self.current_partition and \
                 offset not in self.replicate_dict:
                # The master did its best to avoid useless replication orders
                # but there may still be a few, and we may receive redundant
                # update notification of backup_tid.
                # So, we do nothing here if we are already replicating.
                p = self.partition_dict[offset]
                if not next_tid:
                    next_tid = add64(tid, 1)
                p.next_trans = p.next_obj = next_tid
        if next_tid:
            self.updateBackupTID(True)
        self._nextPartition()

    def _nextPartitionSortKey(self, offset):
        p = self.partition_dict[offset]
        return p.next_obj, bool(p.max_ttid)

    def _nextPartition(self):
        # XXX: One connection to another storage may remain open forever.
        #      All other previous connections are automatically closed
        #      after some time of inactivity.
        #      This should be improved in several ways:
        #      - Keeping connections open between 2 clusters (backup case) is
        #        quite a good thing because establishing a connection costs
        #        time/bandwidth and replication is actually never finished.
        #      - When all storages of a non-backup cluster are up-to-date,
        #        there's no reason to keep any connection open.
        if self.current_partition is not None or not self.replicate_dict:
            return
        app = self.app
        assert app.master_conn and app.operational, (
            app.master_conn, app.operational)
        # Start replicating the partition which is furthest behind,
        # to increase the overall backup_tid as soon as possible.
        # Then prefer a partition with no unfinished transaction.
        # XXX: When leaving backup mode, we should only consider UP_TO_DATE
        #      cells.
        offset = min(self.replicate_dict, key=self._nextPartitionSortKey)
        try:
            addr, name = self.source_dict[offset]
        except KeyError:
            assert app.pt.getCell(offset, app.uuid).isOutOfDate(), (
                offset, app.pt.getCell(offset, app.uuid).getState())
            node = random.choice([cell.getNode()
                for cell in app.pt.getCellList(offset, readable=True)
                if cell.getNodeState() == NodeStates.RUNNING])
            name = None
        else:
            node = app.nm.getByAddress(addr)
            if node is None:
                assert name, addr
                node = app.nm.createStorage(address=addr)
        self.current_partition = offset
        previous_node = self.current_node
        self.current_node = node
        if node.isConnected(connecting=True):
            if node.isIdentified():
                node.getConnection().asClient()
                self.fetchTransactions()
        else:
            assert name or node.getUUID() != app.uuid, "loopback connection"
            conn = ClientConnection(app, StorageOperationHandler(app), node)
            try:
                conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
                    None if name else app.uuid, app.server, name or app.name,
                    app.id_timestamp))
            except ConnectionClosed:
                if previous_node is self.current_node:
                    return
        if previous_node is not None and previous_node.isConnected():
            app.closeClient(previous_node.getConnection())

    def connected(self, node):
        if self.current_node is node and self.current_partition is not None:
            self.fetchTransactions()

    def fetchTransactions(self, min_tid=None):
        assert self.current_node.getConnection().isClient(), self.current_node
        offset = self.current_partition
        p = self.partition_dict[offset]
        if min_tid:
            p.next_trans = min_tid
        else:
            try:
                addr, name = self.source_dict[offset]
            except KeyError:
                pass
            else:
                if addr != self.current_node.getAddress():
                    return self.abort()
            min_tid = p.next_trans
            self.replicate_tid = self.replicate_dict.pop(offset)
            logging.debug("starting replication of <partition=%u"
                " min_tid=%s max_tid=%s> from %r", offset, dump(min_tid),
                dump(self.replicate_tid), self.current_node)
        max_tid = self.replicate_tid
        tid_list = self.app.dm.getReplicationTIDList(min_tid, max_tid,
            FETCH_COUNT, offset)
        self._conn_msg_id = self.current_node.ask(Packets.AskFetchTransactions(
            offset, FETCH_COUNT, min_tid, max_tid, tid_list))

    def fetchObjects(self, min_tid=None, min_oid=ZERO_OID):
        offset = self.current_partition
        p = self.partition_dict[offset]
        max_tid = self.replicate_tid
        if min_tid:
            p.next_obj = min_tid
        else:
            min_tid = p.next_obj
            p.next_trans = add64(max_tid, 1)
        object_dict = {}
        for serial, oid in self.app.dm.getReplicationObjectList(min_tid,
                max_tid, FETCH_COUNT, offset, min_oid):
            try:
                object_dict[serial].append(oid)
            except KeyError:
                object_dict[serial] = [oid]
        self._conn_msg_id = self.current_node.ask(Packets.AskFetchObjects(
            offset, FETCH_COUNT, min_tid, max_tid, min_oid, object_dict))

    def finish(self):
        offset = self.current_partition
        tid = self.replicate_tid
        del self.current_partition, self._conn_msg_id, self.replicate_tid
        p = self.partition_dict[offset]
        p.next_obj = add64(tid, 1)
        self.updateBackupTID()
        app = self.app
        app.dm.updateCellTID(offset, tid)
        app.dm.commit()
        if p.max_ttid or offset in self.replicate_dict and \
                         offset not in self.source_dict:
            logging.debug("unfinished transactions: %r", self.ttid_set)
        else:
            app.tm.replicated(offset, tid)
        logging.debug("partition %u replicated up to %s from %r",
                      offset, dump(tid), self.current_node)
        self.getCurrentConnection().setReconnectionNoDelay()
        self._nextPartition()

    def abort(self, message=''):
        offset = self.current_partition
        if offset is None:
            return
        del self.current_partition
        self._conn_msg_id = None
        logging.warning('replication aborted for partition %u%s',
                        offset, message and ' (%s)' % message)
        if offset in self.partition_dict:
            # XXX: Try another partition if possible, to increase the
            #      probability of connecting to another node. It would be
            #      better to explicitly search for another node instead.
            tid = self.replicate_dict.pop(offset, None) or self.replicate_tid
            if self.replicate_dict:
                self._nextPartition()
                self.replicate_dict[offset] = tid
            else:
                self.replicate_dict[offset] = tid
                self._nextPartition()
        else: # partition removed
            self._nextPartition()

    def stop(self):
        # Close any open connection to an upstream storage,
        # possibly aborting current replication.
        node = self.current_node
        if node is not None is node.getUUID():
            offset = self.current_partition
            if offset is not None:
                logging.info('cancel replication of partition %u', offset)
                del self.current_partition
                if self._conn_msg_id is not None:
                    self.replicate_dict.setdefault(offset, self.replicate_tid)
                    del self._conn_msg_id, self.replicate_tid
                self.getCurrentConnection().close()
        # Cancel all replication orders from upstream cluster.
        for offset in self.replicate_dict.keys():
            addr, name = self.source_dict.get(offset, (None, None))
            if name:
                tid = self.replicate_dict.pop(offset)
                logging.info('cancel replication of partition %u from %r'
                             ' up to %s', offset, addr, dump(tid))
        # Make UP_TO_DATE cells really UP_TO_DATE
        self._nextPartition()