11.5 KB
# Copyright (C) 2006-2010  Nexedi SA
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

import math
from functools import wraps
import neo

from . import protocol
from .protocol import CellStates
from .util import dump, u64
from .locking import RLock

class PartitionTableException(Exception):
        Base class for partition table exceptions

class Cell(object):
    """This class represents a cell in a partition table."""

    def __init__(self, node, state = CellStates.UP_TO_DATE):
        self.node = node
        self.state = state

    def __repr__(self):
        return "<Cell(uuid=%s, address=%s, state=%s)>" % (

    def getState(self):
        return self.state

    def setState(self, state):
        assert state != CellStates.DISCARDED
        self.state = state

    def isUpToDate(self):
        return self.state == CellStates.UP_TO_DATE

    def isOutOfDate(self):
        return self.state == CellStates.OUT_OF_DATE

    def isFeeding(self):
        return self.state == CellStates.FEEDING

    def isCorrupted(self):
        return self.state == CellStates.CORRUPTED

    def isReadable(self):
        return self.state == CellStates.UP_TO_DATE or \
               self.state == CellStates.FEEDING

    def getNode(self):
        return self.node

    def getNodeState(self):
        """This is a short hand."""
        return self.node.getState()

    def getUUID(self):
        return self.node.getUUID()

    def getAddress(self):
        return self.node.getAddress()

class PartitionTable(object):
    """This class manages a partition table."""

    def __init__(self, num_partitions, num_replicas):
        self._id = None = num_partitions = num_replicas
        self.num_filled_rows = 0
        # Note: don't use [[]] * num_partition construct, as it duplicates
        # instance *references*, so the outer list contains really just one
        # inner list instance.
        self.partition_list = [[] for _ in xrange(num_partitions)]
        self.count_dict = {}

    def getID(self):
        return self._id

    def getPartitions(self):

    def getReplicas(self):

    def clear(self):
        """Forget an existing partition table."""
        self._id = None
        self.num_filled_rows = 0
        # Note: don't use [[]] * construct, as it duplicates
        # instance *references*, so the outer list contains really just one
        # inner list instance.
        self.partition_list = [[] for _ in xrange(]

    def getAssignedPartitionList(self, uuid):
        """ Return the partition assigned to the specified UUID """
        assigned_partitions = []
        for offset in xrange(
            for cell in self.getCellList(offset, readable=True):
                if cell.getUUID() == uuid:
        return assigned_partitions

    def hasOffset(self, offset):
            return len(self.partition_list[offset]) > 0
        except IndexError:
            return False

    def getNodeSet(self):
        return set(x.getNode() for row in self.partition_list for x in row)

    def getConnectedNodeList(self):
        return [node for node in self.getNodeSet() if node.isConnected()]

    def getNodeList(self):
        """Return all used nodes."""
        return [node for node, count in self.count_dict.iteritems() \
                if count > 0]

    def getCellList(self, offset, readable=False):
        if readable:
            return filter(Cell.isReadable, self.partition_list[offset])
        return list(self.partition_list[offset])

    def getPartition(self, oid_or_tid):
        return u64(oid_or_tid) % self.getPartitions()

    def getOutdatedOffsetListFor(self, uuid):
        return [
            offset for offset in xrange(
            for c in self.partition_list[offset]
            if c.getUUID() == uuid and c.getState() == CellStates.OUT_OF_DATE

    def isAssigned(self, oid, uuid):
        """ Check if the oid is assigned to the given node """
        for cell in self.partition_list[u64(oid) %]:
            if cell.getUUID() == uuid:
                return True
        return False

    def getCell(self, offset, uuid):
        for cell in self.partition_list[offset]:
            if cell.getUUID() == uuid:
                return cell

    def setCell(self, offset, node, state):
        if state == CellStates.DISCARDED:
            return self.removeCell(offset, node)
        if node.isBroken() or node.isDown():
            raise PartitionTableException('Invalid node state')

        self.count_dict.setdefault(node, 0)
        for cell in self.partition_list[offset]:
            if cell.getNode() is node:
                if not cell.isFeeding():
                    self.count_dict[node] -= 1
            row = self.partition_list[offset]
            self.num_filled_rows += not row
            row.append(Cell(node, state))
        if state != CellStates.FEEDING:
            self.count_dict[node] += 1
        return offset, node.getUUID(), state

    def removeCell(self, offset, node):
        row = self.partition_list[offset]
        for cell in row:
            if cell.getNode() == node:
                if not cell.isFeeding():
                    self.count_dict[node] -= 1
        return (offset, node.getUUID(), CellStates.DISCARDED)

    def load(self, ptid, row_list, nm):
        Load the partition table with the specified PTID, discard all previous
        self._id = ptid
        for offset, row in row_list:
            if offset >= self.getPartitions():
                raise IndexError
            for uuid, state in row:
                node = nm.getByUUID(uuid)
                # the node must be known by the node manager
                assert node is not None
                self.setCell(offset, node, state)
        neo.lib.logging.debug('partition table loaded (ptid=%s)', ptid)

    def update(self, ptid, cell_list, nm):
        Update the partition with the cell list supplied. Ignore those changes
        if the partition table ID is not greater than the current one. If a node
        is not known, it is created in the node manager and set as unavailable
        if ptid <= self._id:
            neo.lib.logging.warning('ignoring older partition changes')
        self._id = ptid
        for offset, uuid, state in cell_list:
            node = nm.getByUUID(uuid)
            assert node is not None, 'No node found for uuid %r' % (dump(uuid), )
            self.setCell(offset, node, state)
        neo.lib.logging.debug('partition table updated (ptid=%s)', ptid)

    def filled(self):
        return self.num_filled_rows ==

    def log(self):
        for line in self._format():

    def format(self):
        return '\n'.join(self._format())

    def _format(self):
        """Help debugging partition table management.

        Output sample:
          pt: node 0: 67ae354b4ed240a0594d042cf5c01b28, R
          pt: node 1: a68a01e8bf93e287bd505201c1405bc2, R
          pt: node 2: ad7ffe8ceef4468a0c776f3035c7a543, R
          pt: node 3: df57d7298678996705cd0092d84580f4, R
          pt: 00: .UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.
          pt: 11: U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U|.UU.|U..U

        Here, there are 4 nodes in RUNNING state.
        The first partition has 2 replicas in UP_TO_DATE state, on nodes 1 and
        2 (nodes 0 and 3 are displayed as unused for that partition by
        displaying a dot).
        The first number on the left represents the number of the first
        partition on the line (here, line length is 11 to keep the docstring
        width under 80 column).
        node_list = sorted(self.count_dict)
        result = ['pt: node %u: %s, %s' % (i, dump(node.getUUID()),
                  for i, node in enumerate(node_list)]
        append = result.append
        line = []
        max_line_len = 20 # XXX: hardcoded number of partitions per line
        cell_state_dict = protocol.cell_state_prefix_dict
        prefix = 0
        prefix_len = int(math.ceil(math.log10(
        for offset, row in enumerate(self.partition_list):
            if len(line) == max_line_len:
                append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line)))
                line = []
                prefix = offset
            if row is None:
                line.append('X' * len(node_list))
                cell_dict = dict((x.getNode(), cell_state_dict[x.getState()])
                                 for x in row)
                line.append(''.join(cell_dict.get(x, '.') for x in node_list))
        if line:
            append('pt: %0*u: %s' % (prefix_len, prefix, '|'.join(line)))
        return result

    def operational(self):
        if not self.filled():
            return False
        for row in self.partition_list:
            for cell in row:
                if cell.isReadable() and cell.getNode().isRunning():
                return False
        return True

    def getRow(self, offset):
        row = self.partition_list[offset]
        if row is None:
            return []
        return [(cell.getUUID(), cell.getState()) for cell in row]

    def getRowList(self):
        getRow = self.getRow
        return [(x, getRow(x)) for x in xrange(]

def thread_safe(method):
    def wrapper(self, *args, **kwargs):
            return method(self, *args, **kwargs)
    return wraps(method)(wrapper)

class MTPartitionTable(PartitionTable):
    """ Thread-safe aware version of the partition table, override only methods
        used in the client """

    def __init__(self, *args, **kwargs):
        self._lock = RLock()
        PartitionTable.__init__(self, *args, **kwargs)

    def lock(self):

    def unlock(self):

    def setCell(self, *args, **kwargs):
        return PartitionTable.setCell(self, *args, **kwargs)

    def clear(self, *args, **kwargs):
        return PartitionTable.clear(self, *args, **kwargs)

    def operational(self, *args, **kwargs):
        return PartitionTable.operational(self, *args, **kwargs)

    def getNodeList(self, *args, **kwargs):
        return PartitionTable.getNodeList(self, *args, **kwargs)