pt.py 10.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
#
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 18

import neo.pt
19
from struct import pack, unpack
20
from neo.protocol import CellStates
21 22 23 24

class PartitionTable(neo.pt.PartitionTable):
    """This class manages a partition table for the primary master node"""

25 26 27 28
    def setID(self, id):
        """Store `id` as the current partition table ID.

        setNextID() unpacks this value with struct format '!Q', so it is
        expected to be an 8-byte packed string (or None when unknown).
        """
        self.id = id

    def setNextID(self):
29
        if self.id is None:
30 31 32 33 34
            raise RuntimeError, 'I do not know the last Partition Table ID'
        last_id = unpack('!Q', self.id)[0]
        self.id = pack('!Q', last_id + 1)
        return self.id

35 36 37
    def getPartition(self, oid_or_tid):
        return unpack('!Q', oid_or_tid)[0] % self.getPartitions()

38 39
    def make(self, node_list):
        """Make a new partition table from scratch."""
40 41
        # start with the first PTID
        self.id = pack('!Q', 1)
42
        # First, filter the list of nodes.
43 44
        node_list = [n for n in node_list if n.isRunning() \
                and n.getUUID() is not None]
45 46
        if len(node_list) == 0:
            # Impossible.
47 48
            raise RuntimeError, 'cannot make a partition table with an ' \
                    'empty storage node list'
49

50 51
        # Take it into account that the number of storage nodes may be less 
        # than the number of replicas.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
        repeats = min(self.nr + 1, len(node_list))
        index = 0
        for offset in xrange(self.np):
            row = []
            for i in xrange(repeats):
                node = node_list[index]
                row.append(neo.pt.Cell(node))
                self.count_dict[node] = self.count_dict.get(node, 0) + 1
                index += 1
                if index == len(node_list):
                    index = 0
            self.partition_list[offset] = row

        self.num_filled_rows = self.np

    def findLeastUsedNode(self, excluded_node_list = ()):
        min_count = self.np + 1
        min_node = None
        for node, count in self.count_dict.iteritems():
            if min_count > count \
                    and node not in excluded_node_list \
73
                    and node.isRunning():
74 75 76 77 78 79 80 81
                min_node = node
                min_count = count
        return min_node

    def dropNode(self, node):
        cell_list = []
        uuid = node.getUUID()
        for offset, row in enumerate(self.partition_list):
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
            if row is None:
                continue
            for cell in row:
                if cell.getNode() is node:
                    if not cell.isFeeding():
                        # If this cell is not feeding, find another node
                        # to be added.
                        node_list = [c.getNode() for c in row]
                        n = self.findLeastUsedNode(node_list)
                        if n is not None:
                            row.append(neo.pt.Cell(n, CellStates.OUT_OF_DATE))
                            self.count_dict[n] += 1
                            cell_list.append((offset, n.getUUID(),
                                              CellStates.OUT_OF_DATE))
                    row.remove(cell)
                    cell_list.append((offset, uuid, CellStates.DISCARDED))
                    break
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124

        try:
            del self.count_dict[node]
        except KeyError:
            pass

        return cell_list

    def addNode(self, node):
        """Add a node. Take it into account that it might not be really a new
        node. The strategy is, if a row does not contain a good number of
        cells, add this node to the row, unless the node is already present
        in the same row. Otherwise, check if this node should replace another
        cell."""
        cell_list = []
        node_count = self.count_dict.get(node, 0)
        for offset, row in enumerate(self.partition_list):
            feeding_cell = None
            max_count = 0
            max_cell = None
            num_cells = 0
            skip = False
            for cell in row:
                if cell.getNode() == node:
                    skip = True
                    break
125
                if cell.isFeeding():
126 127 128 129 130 131 132 133 134 135 136 137
                    feeding_cell = cell
                else:
                    num_cells += 1
                    count = self.count_dict[cell.getNode()]
                    if count > max_count:
                        max_count = count
                        max_cell = cell

            if skip:
                continue

            if num_cells <= self.nr:
138
                row.append(neo.pt.Cell(node, CellStates.OUT_OF_DATE))
139 140
                cell_list.append((offset, node.getUUID(), 
                    CellStates.OUT_OF_DATE))
141 142 143
                node_count += 1
            else:
                if max_count - node_count > 1:
144
                    if feeding_cell is not None or max_cell.isOutOfDate():
145 146 147 148
                        # If there is a feeding cell already or it is
                        # out-of-date, just drop the node.
                        row.remove(max_cell)
                        cell_list.append((offset, max_cell.getUUID(), 
149
                                          CellStates.DISCARDED))
150 151 152
                        self.count_dict[max_cell.getNode()] -= 1
                    else:
                        # Otherwise, use it as a feeding cell for safety.
153
                        max_cell.setState(CellStates.FEEDING)
154
                        cell_list.append((offset, max_cell.getUUID(), 
155
                                          CellStates.FEEDING))
156 157
                        # Don't count a feeding cell.
                        self.count_dict[max_cell.getNode()] -= 1
158
                    row.append(neo.pt.Cell(node, CellStates.OUT_OF_DATE))
159
                    cell_list.append((offset, node.getUUID(), 
160
                                      CellStates.OUT_OF_DATE))
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
                    node_count += 1

        self.count_dict[node] = node_count
        self.log()
        return cell_list

    def tweak(self):
        """Test if nodes are distributed uniformly. Otherwise, correct the
        partition table."""
        changed_cell_list = []

        for offset, row in enumerate(self.partition_list):
            removed_cell_list = []
            feeding_cell = None
            out_of_date_cell_list = []
            up_to_date_cell_list = []
            for cell in row:
178
                if cell.getNode().isBroken():
179 180
                    # Remove a broken cell.
                    removed_cell_list.append(cell)
181
                elif cell.isFeeding():
182 183 184 185 186
                    if feeding_cell is None:
                        feeding_cell = cell
                    else:
                        # Remove an excessive feeding cell.
                        removed_cell_list.append(cell)
187
                elif cell.isOutOfDate():
188 189 190 191 192 193 194 195 196
                    out_of_date_cell_list.append(cell)
                else:
                    up_to_date_cell_list.append(cell)

            # If all cells are up-to-date, a feeding cell is not required.
            if len(out_of_date_cell_list) == 0 and feeding_cell is not None:
                removed_cell_list.append(feeding_cell)

            ideal_num = self.nr + 1
197 198
            while len(out_of_date_cell_list) + len(up_to_date_cell_list) > \
                    ideal_num:
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
                # This row contains too many cells.
                if len(up_to_date_cell_list) > 1:
                    # There are multiple up-to-date cells, so choose whatever
                    # used too much.
                    cell_list = out_of_date_cell_list + up_to_date_cell_list
                else:
                    # Drop an out-of-date cell.
                    cell_list = out_of_date_cell_list

                max_count = 0
                chosen_cell = None
                for cell in cell_list:
                    count = self.count_dict[cell.getNode()]
                    if max_count < count:
                        max_count = count
                        chosen_cell = cell
                removed_cell_list.append(chosen_cell)
                try:
                    out_of_date_cell_list.remove(chosen_cell)
                except ValueError:
                    up_to_date_cell_list.remove(chosen_cell)

            # Now remove cells really.
            for cell in removed_cell_list:
                row.remove(cell)
224
                if not cell.isFeeding():
225
                    self.count_dict[cell.getNode()] -= 1
226 227
                changed_cell_list.append((offset, cell.getUUID(), 
                    CellStates.DISCARDED))
228 229 230 231 232

        # Add cells, if a row contains less than the number of replicas.
        for offset, row in enumerate(self.partition_list):
            num_cells = 0
            for cell in row:
233
                if not cell.isFeeding():
234 235 236 237 238
                    num_cells += 1
            while num_cells <= self.nr:
                node = self.findLeastUsedNode([cell.getNode() for cell in row])
                if node is None:
                    break
239
                row.append(neo.pt.Cell(node, CellStates.OUT_OF_DATE))
240 241
                changed_cell_list.append((offset, node.getUUID(), 
                    CellStates.OUT_OF_DATE))
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256
                self.count_dict[node] += 1
                num_cells += 1

        # FIXME still not enough. It is necessary to check if it is possible
        # to reduce differences between frequently used nodes and rarely used
        # nodes by replacing cells.

        self.log()
        return changed_cell_list

    def outdate(self):
        """Outdate all non-working nodes."""
        cell_list = []
        for offset, row in enumerate(self.partition_list):
            for cell in row:
257
                if not cell.getNode().isRunning() and not cell.isOutOfDate():
258
                    cell.setState(CellStates.OUT_OF_DATE)
259 260
                    cell_list.append((offset, cell.getUUID(), 
                        CellStates.OUT_OF_DATE))
261 262
        return cell_list