#
# Copyright (C) 2006-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

from struct import pack

import neo
from neo.lib.util import dump
from neo.lib.protocol import Packets, ProtocolError, ClusterStates, NodeStates
from neo.lib.protocol import NotReadyError, ZERO_OID, ZERO_TID
from .handlers import MasterHandler


class RecoveryManager(MasterHandler):
    """
      Manage the cluster recovery

      The manager is its own handler (see getHandler): storage nodes are
      identified as PENDING, asked for their last IDs when their connection
      completes, and switched to RUNNING once their answer (and, if
      requested, their partition table) has been processed.
    """

    def __init__(self, app):
        super(RecoveryManager, self).__init__(app)
        # Highest partition table ID announced by a storage node so far.
        # None until a node reports one; answerPartitionTable() only accepts
        # a table whose ID matches this value.
        self.target_ptid = None

    def getHandler(self):
        """
            Returns the handler to use during recovery: the manager itself
        """
        return self

    def identifyStorageNode(self, uuid, node):
        """
            Returns a (uuid, state, handler) tuple for an identifying storage
            node: the uuid is passed through unchanged, the state is always
            PENDING and the handler is this manager. 'node' is not used.
        """
        return uuid, NodeStates.PENDING, self

    def run(self):
        """
        Recover the status about the cluster. Obtain the last OID, the last
        TID, and the last Partition Table ID from storage nodes, then get
        back the latest partition table or make a new table from scratch,
        if this is the first time.
        """
        neo.lib.logging.info('begin the recovery of the status')

        app = self.app
        app.changeClusterState(ClusterStates.RECOVERING)
        em = app.em

        # forget previously known values, they are collected again from the
        # answers of the storage nodes
        app.tm.setLastOID(None)
        app.pt.setID(None)

        # collect the last partition table available
        while True:
            em.poll(1)
            if not app._startup_allowed:
                continue
            allowed_node_set = set()
            for node in app.nm.getStorageList():
                if node.isPending():
                    # still waiting for an answer from this node, poll again
                    break
                if node.isRunning():
                    allowed_node_set.add(node)
            else:
                # no storage node is pending anymore: leave the polling loop
                # as soon as at least one of them is running, otherwise keep
                # waiting for a storage node to show up
                if allowed_node_set:
                    break

        neo.lib.logging.info('startup allowed')

        if app.pt.getID() is None:
            neo.lib.logging.info('creating a new partition table')
            # reset IDs generators & build new partition with running nodes
            app.tm.setLastOID(ZERO_OID)
            app.pt.make(allowed_node_set)
            self._broadcastPartitionTable(app.pt.getID(),
                                          app.pt.getRowList())

        # collect node that are connected but not in the selected partition
        # table and set them in pending state
        refused_node_set = allowed_node_set.difference(app.pt.getNodeList())
        if refused_node_set:
            for node in refused_node_set:
                node.setPending()
            app.broadcastNodesInformation(refused_node_set)

        app.setLastTransaction(app.tm.getLastTID())
        neo.lib.logging.debug(
            'cluster starts with loid=%s and this partition '
            'table :', dump(app.tm.getLastOID()))
        app.pt.log()

    def connectionLost(self, conn, new_state):
        """
            Record the new state of the node behind a lost connection and
            broadcast it, unless the node is already in that state.
        """
        node = self.app.nm.getByUUID(conn.getUUID())
        assert node is not None
        if node.getState() == new_state:
            return
        node.setState(new_state)
        # broadcast to all so that admin nodes gets informed
        self.app.broadcastNodesInformation([node])

    def connectionCompleted(self, conn):
        """
            Start the exchange with a newly connected node.
        """
        # ask the last IDs to perform the recovery
        conn.ask(Packets.AskLastIDs())

    def _lastIDsCompleted(self, conn):
        """
            Mark the node behind 'conn' as running, once everything expected
            from it has been received, and broadcast its new state.
            The node must still be pending at this point.
        """
        node = self.app.nm.getByUUID(conn.getUUID())
        assert node.isPending()
        node.setRunning()
        self.app.broadcastNodesInformation([node])

    def answerLastIDs(self, conn, loid, ltid, lptid):
        """
            Process the last OID, TID and partition table ID reported by a
            node. Any of them may be None, in which case it is ignored.

            If the reported partition table ID is newer than every ID seen
            so far, the partition table is requested from this node and the
            node stays pending until answerPartitionTable(). Otherwise the
            node is marked as running right away.
        """
        tm = self.app.tm
        # Get max values.
        if loid is not None:
            # The known last OID is None until a first node answered (see
            # run()), so compare explicitly instead of ordering against None.
            known_loid = tm.getLastOID()
            if known_loid is not None:
                loid = max(loid, known_loid)
            tm.setLastOID(loid)
        if ltid is not None:
            tm.setLastTID(ltid)
        # Same here: both IDs may be None, never order a value against None.
        is_newer = lptid is not None and (
            self.target_ptid is None or lptid > self.target_ptid)
        if is_newer:
            # something newer
            self.target_ptid = lptid
            conn.ask(Packets.AskPartitionTable())
        else:
            self._lastIDsCompleted(conn)

    def answerPartitionTable(self, conn, ptid, row_list):
        """
            Load and broadcast the received partition table if it is the one
            currently targeted, otherwise log a warning and ignore it.
            In both cases, the node is then marked as running.
        """
        if ptid != self.target_ptid:
            # If this is not from a target node, ignore it.
            neo.lib.logging.warn('Got %s while waiting %s', dump(ptid),
                    dump(self.target_ptid))
        else:
            self._broadcastPartitionTable(ptid, row_list)
        self._lastIDsCompleted(conn)

    def _broadcastPartitionTable(self, ptid, row_list):
        """
            Load the given rows into the partition table, then send the node
            information returned by the load and the resulting partition
            table to every identified admin node.

            Raises ProtocolError if loading fails with an IndexError.
        """
        try:
            new_nodes = self.app.pt.load(ptid, row_list, self.app.nm)
        except IndexError:
            raise ProtocolError('Invalid offset')
        else:
            notification = Packets.NotifyNodeInformation(new_nodes)
            # send what the partition table holds after the load
            ptid = self.app.pt.getID()
            row_list = self.app.pt.getRowList()
            partition_table = Packets.SendPartitionTable(ptid, row_list)
            # notify the admin nodes
            for node in self.app.nm.getAdminList(only_identified=True):
                node.notify(notification)
                node.notify(partition_table)