replicator.py 9.18 KB
Newer Older
Aurel's avatar
Aurel committed
1
#
Grégory Wisniewski's avatar
Grégory Wisniewski committed
2
# Copyright (C) 2006-2010  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Aurel's avatar
Aurel committed
17

18
from neo import logging
19 20
from random import choice

Grégory Wisniewski's avatar
Grégory Wisniewski committed
21
from neo.storage.handlers import replication
22
from neo.protocol import NodeTypes, NodeStates, CellStates, Packets, ZERO_TID
23
from neo.connection import ClientConnection
24
from neo.util import dump
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39

class Partition(object):
    """State of one partition that this storage node must replicate.

    It remembers the partition offset (``rid``) and its critical TID,
    which stays None until it is provided through setCriticalTID().
    """

    def __init__(self, rid):
        self.rid = rid
        self.tid = None

    def getRID(self):
        """Return the partition offset."""
        return self.rid

    def getCriticalTID(self):
        """Return the critical TID, or None while it is still unknown."""
        return self.tid

    def setCriticalTID(self, tid):
        """Store the critical TID; a None value is recorded as ZERO_TID."""
        self.tid = ZERO_TID if tid is None else tid

    def safe(self, min_pending_tid):
        """Tell whether replication of this partition may start.

        This requires a known critical TID which, when there is a pending
        transaction, must be strictly lower than ``min_pending_tid``.
        """
        critical_tid = self.tid
        if critical_tid is None:
            return False
        if min_pending_tid is None:
            return True
        return critical_tid < min_pending_tid
48 49 50

class Replicator(object):
    """This class handles replications of objects and transactions.

    Assumptions:

        - Client nodes recognize partition changes reasonably quickly.

        - When an out of date partition is added, next transaction ID
          is given after the change is notified and serialized.

    Procedures:

        - Get the last TID right after a partition is added. This TID
          is called a "critical TID", because this and TIDs before this
          may not be present in this storage node yet. After a critical
          TID, all transactions must exist in this storage node.

        - Check if a primary master node still has pending transactions
          before and at a critical TID. If so, I must wait for them to be
          committed or aborted.

        - In order to copy data, first get the list of TIDs. This is done
          part by part, because the list can be very large. When getting
          a part of the list, I verify if they are in my database, and
          ask data only for non-existing TIDs. This is performed until
          the check reaches a critical TID.

        - Next, get the list of OIDs. And, for each OID, ask the history,
          namely, a list of serials. This is also done part by part, and
          I ask only non-existing data. """

    def __init__(self, app):
        self.app = app
        # Give every attribute a value up front so that pending(), act()
        # and the master callbacks do not raise AttributeError when they
        # run before populate().
        # offset -> Partition added but whose critical TID is not asked yet
        self.new_partition_dict = {}
        # master UUID -> Partitions waiting for the answer to AskLastIDs
        self.critical_tid_dict = {}
        self.reset()

    def populate(self):
        """
        Populate partitions to replicate. Must be called when partition
        table is the one accepted by primary master.
        Implies a reset.
        """
        self.new_partition_dict = self._getOutdatedPartitionList()
        self.critical_tid_dict = {}
        self.reset()

    def reset(self):
        """Reset attributes to restart replicating."""
        self.current_partition = None
        self.current_connection = None
        self.waiting_for_unfinished_tids = False
        self.unfinished_tid_list = None
        self.replication_done = True
        # offset -> Partition whose critical TID has been asked
        self.partition_dict = {}

    def _getOutdatedPartitionList(self):
        """Return {offset: Partition} for every cell of this node that the
        partition table marks as OUT_OF_DATE."""
        app = self.app
        partition_dict = {}
        for offset in xrange(app.pt.getPartitions()):
            for uuid, state in app.pt.getRow(offset):
                if uuid == app.uuid and state == CellStates.OUT_OF_DATE:
                    partition_dict[offset] = Partition(offset)
        return partition_dict

    def pending(self):
        """Return whether there is any pending partition."""
        return bool(self.partition_dict or self.new_partition_dict)

    def setCriticalTID(self, uuid, tid):
        """This is a callback from MasterOperationHandler.

        Give ``tid`` as critical TID to every partition that was waiting
        for the answer of the master identified by ``uuid``.
        """
        # Only the lookup may legitimately fail: keep the try body minimal
        # so that errors raised while applying the TID are not hidden.
        try:
            partition_list = self.critical_tid_dict.pop(uuid)
        except KeyError:
            logging.debug("setCriticalTID raised KeyError for %s",
                          dump(uuid))
            return
        logging.debug('setting critical TID %s to %s', dump(tid),
                      ', '.join([str(p.getRID()) for p in partition_list]))
        for partition in partition_list:
            partition.setCriticalTID(tid)

    def _askCriticalTID(self):
        """Ask the primary master for the last IDs on behalf of all newly
        added partitions, then move them to partition_dict."""
        conn = self.app.master_conn
        conn.ask(Packets.AskLastIDs())
        uuid = conn.getUUID()
        # NOTE(review): if a previous request to the same master is still
        # unanswered, this assignment replaces its partition list, and
        # those partitions would never get a critical TID — confirm this
        # cannot happen.
        self.critical_tid_dict[uuid] = list(self.new_partition_dict.values())
        self.partition_dict.update(self.new_partition_dict)
        self.new_partition_dict = {}

    def setUnfinishedTIDList(self, tid_list):
        """This is a callback from MasterOperationHandler."""
        logging.debug('setting unfinished TIDs %s',
                      ','.join([dump(tid) for tid in tid_list]))
        self.waiting_for_unfinished_tids = False
        self.unfinished_tid_list = tid_list

    def _askUnfinishedTIDs(self):
        """Ask the primary master for its unfinished transactions."""
        conn = self.app.master_conn
        conn.ask(Packets.AskUnfinishedTransactions())
        self.waiting_for_unfinished_tids = True

    def _startReplication(self):
        """Start replicating self.current_partition.

        A random running storage node with a readable cell of the partition
        is chosen as source. The node is (re)connected if needed, and the
        first chunk of TIDs is requested. When no usable source exists, the
        current partition is dropped so that act() selects one again later.
        """
        app = self.app
        offset = self.current_partition.getRID()
        # Choose a storage node for the source.
        try:
            cell_list = app.pt.getCellList(offset, readable=True)
            node_list = [cell.getNode() for cell in cell_list
                            if cell.getNodeState() == NodeStates.RUNNING]
            node = choice(node_list)
        except IndexError:
            # Not operational.
            logging.error('not operational', exc_info = 1)
            self.current_partition = None
            return

        addr = node.getAddress()
        if addr is None:
            logging.error("no address known for the selected node %s",
                    dump(node.getUUID()))
            # replication_done is only cleared at the end of this method.
            # Keeping current_partition set would make the next act() call
            # report this partition as replicated to the master.
            self.current_partition = None
            return

        # Reuse the current connection only if it goes to the chosen node.
        if self.current_connection is not None:
            if self.current_connection.getAddress() != addr:
                self.current_connection.close()
                self.current_connection = None

        if self.current_connection is None:
            handler = replication.ReplicationHandler(app)
            self.current_connection = ClientConnection(app.em, handler,
                   addr=addr, connector=app.connector_handler())
            p = Packets.RequestIdentification(NodeTypes.STORAGE,
                    app.uuid, app.server, app.name)
            self.current_connection.ask(p)

        p = Packets.AskTIDsFrom(ZERO_TID, 1000, offset)
        self.current_connection.ask(p, timeout=300)

        self.replication_done = False

    def _finishReplication(self):
        """Conclude the replication of self.current_partition.

        The master is notified only if the partition is still tracked: it
        may have been dropped meanwhile by removePartition().
        """
        offset = self.current_partition.getRID()
        if self.partition_dict.pop(offset, None) is not None:
            # Notify to a primary master node that my cell is now up-to-date.
            conn = self.app.master_conn
            conn.notify(Packets.NotifyReplicationDone(offset))
        self.current_partition = None

    def act(self):
        """Advance the replication state machine by one step.

        Critical TIDs are first requested for newly added partitions.
        Then exactly one of the following happens:
            - a finished replication is concluded;
            - the method keeps waiting for the master's unfinished TIDs;
            - those TIDs are requested;
            - a partition that is safe to replicate is selected and its
              replication is started.
        """
        # If the new partition list is not empty, I must ask a critical
        # TID to a primary master node.
        if self.new_partition_dict:
            self._askCriticalTID()

        if self.current_partition is not None:
            # A replication is in progress: act only once it completed.
            if self.replication_done:
                # finish a replication
                logging.info('replication is done for %s',
                        self.current_partition.getRID())
                self._finishReplication()
            return

        if self.waiting_for_unfinished_tids:
            # Still waiting.
            logging.debug('waiting for unfinished tids')
            return

        if self.unfinished_tid_list is None:
            # Ask pending transactions.
            logging.debug('asking unfinished tids')
            self._askUnfinishedTIDs()
            return

        # Try to select something. The unfinished TID list is consumed
        # here, so the master is asked again before the next selection.
        if self.unfinished_tid_list:
            min_unfinished_tid = min(self.unfinished_tid_list)
        else:
            min_unfinished_tid = None
        self.unfinished_tid_list = None
        for partition in self.partition_dict.values():
            if partition.safe(min_unfinished_tid):
                self.current_partition = partition
                break
        else:
            # Not yet.
            logging.debug('not ready yet')
            return

        self._startReplication()

    def removePartition(self, rid):
        """This is a callback from MasterOperationHandler."""
        self.partition_dict.pop(rid, None)
        self.new_partition_dict.pop(rid, None)

    def addPartition(self, rid):
        """This is a callback from MasterOperationHandler."""
        if rid not in self.partition_dict \
                and rid not in self.new_partition_dict:
            self.new_partition_dict[rid] = Partition(rid)
252