pt.py 11.1 KB
Newer Older
Aurel's avatar
Aurel committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

18
from neo import logging
19

20
from neo.protocol import UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE, \
21
        DISCARDED_STATE, RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
22
        BROKEN_STATE, VALID_CELL_STATE_LIST, HIDDEN_STATE, PENDING_STATE
23
from neo.util import dump, u64
24 25
from neo.locking import RLock

26 27 28 29 30 31

class Cell(object):
    """A single entry of a partition table row: a node paired with a cell
    state taken from VALID_CELL_STATE_LIST."""

    def __init__(self, node, state=UP_TO_DATE_STATE):
        self.node = node
        # Validation and assignment are shared with setState().
        self.setState(state)

    def setState(self, state):
        """Replace the cell state; it must belong to VALID_CELL_STATE_LIST."""
        assert state in VALID_CELL_STATE_LIST
        self.state = state

    def getState(self):
        """Return the current cell state."""
        return self.state

    def getNode(self):
        """Return the node this cell belongs to."""
        return self.node

    # The accessors below simply forward to the underlying node object.

    def getNodeState(self):
        """Shorthand for self.getNode().getState()."""
        return self.node.getState()

    def getUUID(self):
        """Shorthand for self.getNode().getUUID()."""
        return self.node.getUUID()

    def getServer(self):
        """Shorthand for self.getNode().getServer()."""
        return self.node.getServer()

55 56 57 58
class PartitionTable(object):
    """This class manages a partition table.

    The table holds ``np`` rows, one per partition; each row is a list of
    Cell objects.  Bookkeeping kept in sync with the rows:
    - ``count_dict`` maps each node to its number of non-FEEDING cells;
    - ``num_filled_rows`` is the number of rows holding at least one cell.
    """

    def __init__(self, num_partitions, num_replicas):
        self.id = None
        self.np = num_partitions
        self.nr = num_replicas
        self.num_filled_rows = 0
        # Note: don't use [[]] * num_partition construct, as it duplicates
        # instance *references*, so the outer list contains really just one
        # inner list instance.
        self.partition_list = [[] for x in xrange(num_partitions)]
        self.count_dict = {}

    def getID(self):
        return self.id

    def getPartitions(self):
        return self.np

    def getReplicas(self):
        return self.nr

    def clear(self):
        """Forget an existing partition table."""
        self.num_filled_rows = 0
        # Note: don't use [[]] * self.np construct, as it duplicates
        # instance *references*, so the outer list contains really just one
        # inner list instance.
        self.partition_list = [[] for x in xrange(self.np)]
        self.count_dict.clear()

    def hasOffset(self, offset):
        """Return True if partition ``offset`` holds at least one cell.

        An offset past the end of the table returns False.
        """
        try:
            return len(self.partition_list[offset]) > 0
        except IndexError:
            return False

    def getNodeList(self):
        """Return all used nodes, i.e. nodes holding at least one
        non-FEEDING cell."""
        return [node for node, count in self.count_dict.iteritems()
                if count > 0]

    def getCellList(self, offset, readable=False, writable=False):
        """Return the cells of partition ``offset``.

        readable -- if true, drop DISCARDED and OUT_OF_DATE cells.
        writable -- if true, drop DISCARDED cells.
        An invalid offset (out of range, or not usable as a list index)
        yields an empty list.
        """
        # allow all cell states
        state_set = set(VALID_CELL_STATE_LIST)
        if readable or writable:
            # discarded cells are neither readable nor writable
            state_set.discard(DISCARDED_STATE)
        if readable:
            # out-of-date cells are kept for writers but not for readers
            state_set.discard(OUT_OF_DATE_STATE)
        try:
            row = self.partition_list[offset]
        except (TypeError, IndexError, KeyError):
            # IndexError is what an out-of-range list index actually raises.
            return []
        return [cell for cell in row
                if cell is not None and cell.getState() in state_set]

    def getCellListForTID(self, tid, readable=False, writable=False):
        """Return the cells of the partition that ``tid`` maps to."""
        return self.getCellList(self._getPartitionFromIndex(u64(tid)),
                                readable, writable)

    def getCellListForOID(self, oid, readable=False, writable=False):
        """Return the cells of the partition that ``oid`` maps to."""
        return self.getCellList(self._getPartitionFromIndex(u64(oid)),
                                readable, writable)

    def _getPartitionFromIndex(self, index):
        # Map an integer index onto a partition number by modulo.
        return index % self.np

    def _removeNodeFromRow(self, row, node):
        """Remove the cell of ``node`` from ``row`` if present, keeping
        count_dict in sync.  Return True if a cell was removed."""
        for cell in row:
            if cell.getNode() == node:
                row.remove(cell)
                # FEEDING cells are never counted, so never uncounted.
                if cell.getState() != FEEDING_STATE:
                    self.count_dict[node] = self.count_dict.get(node, 0) - 1
                return True
        return False

    def setCell(self, offset, node, state):
        """Assign ``node`` to partition ``offset`` with cell ``state``.

        DISCARDED_STATE removes the cell instead.  Nodes in BROKEN_STATE or
        DOWN_STATE are ignored.  An existing cell of the same node in that
        row is replaced.  FEEDING cells are not counted in count_dict.
        """
        assert state in VALID_CELL_STATE_LIST
        if state == DISCARDED_STATE:
            return self.removeCell(offset, node)
        if node.getState() in (BROKEN_STATE, DOWN_STATE):
            return

        row = self.partition_list[offset]
        if len(row) == 0:
            # First cell of this partition: the row becomes filled.
            self.num_filled_rows += 1
        else:
            # XXX this can be slow, but it is necessary to remove a duplicate,
            # if any.
            self._removeNodeFromRow(row, node)
        row.append(Cell(node, state))
        if state != FEEDING_STATE:
            self.count_dict[node] = self.count_dict.get(node, 0) + 1

    def removeCell(self, offset, node):
        """Remove the cell of ``node`` from partition ``offset``, if any."""
        row = self.partition_list[offset]
        if row and self._removeNodeFromRow(row, node) and len(row) == 0:
            # The row became empty, so it no longer counts as filled;
            # without this, a later setCell on this row would count it twice.
            self.num_filled_rows -= 1

    # XXX: node manager is given here just to verify that any node in the
    # partition table is known, this will be removed when checked.
    def update(self, cell_list, nm):
        """Apply the (offset, uuid, state) changes listed in ``cell_list``.

        A UUID unknown to ``nm`` is logged; a StorageNode is then created
        for it and registered with ``nm`` before the cell is set.
        """
        for offset, uuid, state in cell_list:
            node = nm.getNodeByUUID(uuid)
            if node is None:
                logging.warning('Updating partition table with an unknown UUID : %s',
                        dump(uuid))
                from neo.node import StorageNode
                node = StorageNode(uuid=uuid)
                nm.add(node)
            self.setCell(offset, node, state)
        logging.debug('partition table updated')
        self.log()

    def filled(self):
        """Return True if every partition holds at least one cell."""
        return self.num_filled_rows == self.np

    def log(self):
        """Help debugging partition table management.

        Output sample:
        DEBUG:root:pt: node 0: ad7ffe8ceef4468a0c776f3035c7a543, R
        DEBUG:root:pt: node 1: a68a01e8bf93e287bd505201c1405bc2, R
        DEBUG:root:pt: node 2: 67ae354b4ed240a0594d042cf5c01b28, R
        DEBUG:root:pt: node 3: df57d7298678996705cd0092d84580f4, R
        DEBUG:root:pt: 00000000: .UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.
        DEBUG:root:pt: 00000009: U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U

        Here, there are 4 nodes in RUNNING_STATE.
        The first partition has 2 replicas in UP_TO_DATE_STATE, on nodes 1 and
        2 (nodes 0 and 3 are displayed as unused for that partition by
        displaying a dot).
        The 8-digit number on the left represents the number of the first
        partition on the line (here, line length is 9 to keep the docstring
        width under 80 columns).
        """
        node_state_dict = { RUNNING_STATE: 'R',
                            TEMPORARILY_DOWN_STATE: 'T',
                            DOWN_STATE: 'D',
                            BROKEN_STATE: 'B',
                            HIDDEN_STATE: 'H',
                            PENDING_STATE: 'P'}
        cell_state_dict = { UP_TO_DATE_STATE: 'U',
                            OUT_OF_DATE_STATE: 'O',
                            FEEDING_STATE: 'F',
                            DISCARDED_STATE: 'D'}
        # A node whose only cells are FEEDING has no count_dict entry, so
        # collect nodes from the rows as well; otherwise the node_dict
        # lookups below would raise KeyError.
        node_set = set(self.count_dict)
        for row in self.partition_list:
            if row is not None:
                node_set.update(cell.getNode() for cell in row)
        node_list = sorted(node_set)
        node_count = len(node_list)
        node_dict = dict((node, i) for i, node in enumerate(node_list))
        for i, node in enumerate(node_list):
            logging.debug('pt: node %d: %s, %s', i, dump(node.getUUID()),
                          node_state_dict.get(node.getState(), '?'))
        line = []
        max_line_len = 20 # XXX: hardcoded number of partitions per line
        for offset, row in enumerate(self.partition_list):
            if len(line) == max_line_len:
                logging.debug('pt: %08d: %s', offset - max_line_len,
                              '|'.join(line))
                line = []
            if row is None:
                line.append('X' * node_count)
            else:
                cell_dict = dict((node_dict[cell.getNode()], cell)
                                 for cell in row)
                line.append(''.join(
                    cell_state_dict.get(cell_dict[i].getState(), '?')
                    if i in cell_dict else '.'
                    for i in xrange(node_count)))
        if line:
            logging.debug('pt: %08d: %s', offset - len(line) + 1,
                          '|'.join(line))

    def operational(self):
        """Return True if the table is filled and every partition has at
        least one UP_TO_DATE or FEEDING cell on a RUNNING node."""
        if not self.filled():
            return False

        # FIXME it is better to optimize this code, as this could be extremely
        # slow. The possible fix is to have a handler to notify a change on
        # a node state, and record which rows are ready.
        for row in self.partition_list:
            if not any(cell.getState() in (UP_TO_DATE_STATE, FEEDING_STATE)
                       and cell.getNodeState() == RUNNING_STATE
                       for cell in row):
                return False
        return True

    def getRow(self, offset):
        """Return the cells of partition ``offset`` as (uuid, state) pairs."""
        row = self.partition_list[offset]
        if row is None:
            return []
        return [(cell.getUUID(), cell.getState()) for cell in row]

268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293

def thread_safe(method):
    """Decorator running ``method`` between ``self.lock()`` and
    ``self.unlock()``.

    ``unlock()`` is always called, even when ``method`` raises.
    functools.wraps copies the wrapped method's name and docstring onto the
    wrapper, so decorated methods stay introspectable.
    """
    from functools import wraps

    @wraps(method)
    def wrapper(self, *args, **kwargs):
        self.lock()
        try:
            return method(self, *args, **kwargs)
        finally:
            self.unlock()
    return wrapper


class MTPartitionTable(PartitionTable):
    """Partition table whose client-facing methods hold a lock.

    Only the methods used by the client are overridden; each override runs
    the PartitionTable implementation under ``self._lock`` via thread_safe.
    """

    def __init__(self, *args, **kwargs):
        # Create the lock first so it already exists while the base
        # initializer runs.
        self._lock = RLock()
        super(MTPartitionTable, self).__init__(*args, **kwargs)

    def lock(self):
        """Acquire the table lock (an RLock, presumably reentrant)."""
        self._lock.acquire()

    def unlock(self):
        """Release the table lock."""
        self._lock.release()

    # Locked delegations to the PartitionTable implementation.

    @thread_safe
    def getCellListForTID(self, *args, **kwargs):
        return super(MTPartitionTable, self).getCellListForTID(*args, **kwargs)

    @thread_safe
    def getCellListForOID(self, *args, **kwargs):
        return super(MTPartitionTable, self).getCellListForOID(*args, **kwargs)

    @thread_safe
    def setCell(self, *args, **kwargs):
        return super(MTPartitionTable, self).setCell(*args, **kwargs)

    @thread_safe
    def clear(self, *args, **kwargs):
        return super(MTPartitionTable, self).clear(*args, **kwargs)

    @thread_safe
    def operational(self, *args, **kwargs):
        return super(MTPartitionTable, self).operational(*args, **kwargs)

    @thread_safe
    def getNodeList(self, *args, **kwargs):
        return super(MTPartitionTable, self).getNodeList(*args, **kwargs)