app.py 11.4 KB
Newer Older
Aurel's avatar
Aurel committed
1
#
2
# Copyright (C) 2006-2017  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
Aurel's avatar
Aurel committed
16

17
from .neoctl import NeoCTL, NotReadyException
18
from neo.lib.util import p64, u64, tidFromTime, timeStringFromTID
19 20
from neo.lib.protocol import uuid_str, formatNodeList, \
    ClusterStates, NodeTypes, UUID_NAMESPACES, ZERO_TID
Aurel's avatar
Aurel committed
21

22 23
action_dict = {
    'print': {
24
        'ids': 'getLastIds',
25 26 27
        'pt': 'getPartitionRowList',
        'node': 'getNodeList',
        'cluster': 'getClusterState',
28
        'primary': 'getPrimary',
29 30
    },
    'set': {
31
        'cluster': 'setClusterState',
32
    },
33
    'check': 'checkReplicas',
34
    'start': 'startCluster',
35
    'add': 'enableStorageList',
36
    'tweak': 'tweakPartitionTable',
37
    'drop': 'dropNode',
38
    'kill': 'killNode',
39
    'prune_orphan': 'pruneOrphan',
40
    'truncate': 'truncate',
41 42
}

43 44
uuid_int = (lambda ns: lambda uuid:
    (ns[uuid[0]] << 24) + int(uuid[1:])
45
    )({str(k)[0]: v for k, v in UUID_NAMESPACES.iteritems()})
46

47
class TerminalNeoCTL(object):
Julien Muchembled's avatar
Julien Muchembled committed
48 49
    def __init__(self, *args, **kw):
        self.neoctl = NeoCTL(*args, **kw)
50

51 52 53
    def __del__(self):
        self.neoctl.close()

54 55
    # Utility methods (could be functions)
    def asNodeType(self, value):
56
        return getattr(NodeTypes, value.upper())
57 58

    def asClusterState(self, value):
59
        return getattr(ClusterStates, value.upper())
60

61
    def asTID(self, value):
62 63
        if '.' in value:
            return tidFromTime(float(value))
64 65
        return p64(int(value, 0))

66
    asNode = staticmethod(uuid_int)
67 68

    def formatRowList(self, row_list):
69
        return '\n'.join('%03d | %s' % (offset,
70
            ''.join('%s - %s |' % (uuid_str(uuid), state)
71 72
            for (uuid, state) in cell_list))
            for (offset, cell_list) in row_list)
73 74

    # Actual actions
75 76 77 78 79
    def getLastIds(self, params):
        """
          Get last ids.
        """
        assert not params
80 81 82
        ptid, backup_tid, truncate_tid = self.neoctl.getRecovery()
        if backup_tid:
            ltid = self.neoctl.getLastTransaction()
83 84
            r = "backup_tid = 0x%x (%s)" % (u64(backup_tid),
                                            timeStringFromTID(backup_tid))
85 86
        else:
            loid, ltid = self.neoctl.getLastIds()
87
            r = "last_oid = 0x%x" % (u64(loid))
88
        return r + "\nlast_tid = 0x%x (%s)\nlast_ptid = %s" % \
89
                                    (u64(ltid), timeStringFromTID(ltid), ptid)
90

91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
    def getPartitionRowList(self, params):
        """
          Get a list of partition rows, bounded by min & max and involving
          given node.
          Parameters: [min [max [node]]]
            min: offset of the first row to fetch (starts at 0)
            max: offset of the last row to fetch (0 for no limit)
            node: filters the list of nodes serving a line to this node
        """
        params = params + [0, 0, None][len(params):]
        min_offset, max_offset, node = params
        min_offset = int(min_offset)
        max_offset = int(max_offset)
        if node is not None:
            node = self.asNode(node)
106 107
        ptid, row_list = self.neoctl.getPartitionRowList(
                min_offset=min_offset, max_offset=max_offset, node=node)
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
        # TODO: return ptid
        return self.formatRowList(row_list)

    def getNodeList(self, params):
        """
          Get a list of nodes, filtering with given type.
          Parameters: [type]
            type: type of node to display
        """
        assert len(params) < 2
        if len(params):
            node_type = self.asNodeType(params[0])
        else:
            node_type = None
        node_list = self.neoctl.getNodeList(node_type=node_type)
123
        return '\n'.join(formatNodeList(node_list)) or 'Empty list!'
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138

    def getClusterState(self, params):
        """
          Get cluster state.
        """
        assert len(params) == 0
        return str(self.neoctl.getClusterState())

    def setClusterState(self, params):
        """
          Set cluster state.
          Parameters: state
            state: state to put the cluster in
        """
        assert len(params) == 1
139
        return self.neoctl.setClusterState(self.asClusterState(params[0]))
140 141 142 143 144

    def startCluster(self, params):
        """
          Starts cluster operation after a startup.
          Equivalent to:
145
            set cluster verifying
146 147
        """
        assert len(params) == 0
148
        return self.neoctl.startCluster()
149

150 151 152 153 154 155
    def _getStorageList(self, params):
        if len(params) == 1 and params[0] == 'all':
            node_list = self.neoctl.getNodeList(NodeTypes.STORAGE)
            return [node[2] for node in node_list]
        return map(self.asNode, params)

156 157 158
    def enableStorageList(self, params):
        """
          Enable cluster to make use of pending storages.
159 160
          Parameters: node [node [...]]
            node: if "all", add all pending storage nodes,
161 162
                  otherwise, the list of storage nodes to enable.
        """
163
        return self.neoctl.enableStorageList(self._getStorageList(params))
164

165 166 167
    def tweakPartitionTable(self, params):
        """
          Optimize partition table.
168
          No partition will be assigned to specified storage nodes.
169 170 171 172
          Parameters: [node [...]]
        """
        return self.neoctl.tweakPartitionTable(map(self.asNode, params))

173 174 175 176 177 178 179
    def killNode(self, params):
        """
          Kill redundant nodes (either a storage or a secondary master).
          Parameters: node
        """
        return self.neoctl.killNode(self.asNode(*params))

180 181
    def dropNode(self, params):
        """
182
          Remove storage node permanently.
183 184
          Parameters: node
        """
185
        return self.neoctl.dropNode(self.asNode(*params))
186

187
    def getPrimary(self, params):
188 189 190
        """
          Get primary master node.
        """
191
        return uuid_str(self.neoctl.getPrimary())
192

193 194 195 196 197 198 199 200 201 202 203 204 205 206
    def pruneOrphan(self, params):
        """
          Fix database by deleting unreferenced raw data

          This can take a long time.

          Parameters: dry_run node [node [...]]
            dry_run: 0 or 1
            node: if "all", ask all connected storage nodes to repair,
                  otherwise, only the given list of storage nodes.
        """
        dry_run = "01".index(params.pop(0))
        return self.neoctl.repair(self._getStorageList(params), dry_run)

207 208 209 210 211 212 213 214 215 216 217 218 219
    def truncate(self, params):
        """
          Truncate the database at the given tid.

          The cluster must be in RUNNING state, without any pending transaction.
          This causes the cluster to go back in RECOVERING state, waiting all
          nodes to be pending (do not use 'start' command unless you're sure
          the missing nodes don't need to be truncated).

          Parameters: tid
        """
        self.neoctl.truncate(self.asTID(*params))

220 221
    def checkReplicas(self, params):
        """
Julien Muchembled's avatar
Julien Muchembled committed
222
          Test whether partitions have corrupted metadata
223 224 225 226

          Any corrupted cell is put in CORRUPTED state, possibly make the
          cluster non operational.

227
          Parameters: [partition]:[reference] ... [min_tid [max_tid]]
228 229 230
            reference: node id of a storage with known good data
              If not given, and if the cluster is in backup mode, an upstream
              cell is automatically taken as reference.
231 232 233 234 235 236 237 238 239
        """
        partition_dict = {}
        params = iter(params)
        min_tid = ZERO_TID
        max_tid = None
        for p in params:
            try:
                partition, source = p.split(':')
            except ValueError:
240
                min_tid = self.asTID(p)
241
                try:
242
                    max_tid = self.asTID(params.next())
243 244 245
                except StopIteration:
                    pass
                break
246
            source = self.asNode(source) if source else None
247 248 249 250 251 252 253 254
            if partition:
                partition_dict[int(partition)] = source
            else:
                assert not partition_dict
                np = len(self.neoctl.getPartitionRowList()[1])
                partition_dict = dict.fromkeys(xrange(np), source)
        self.neoctl.checkReplicas(partition_dict, min_tid, max_tid)

Aurel's avatar
Aurel committed
255 256 257
class Application(object):
    """The storage node application."""

Julien Muchembled's avatar
Julien Muchembled committed
258 259
    def __init__(self, *args, **kw):
        self.neoctl = TerminalNeoCTL(*args, **kw)
260

261
    def execute(self, args):
Aurel's avatar
Aurel committed
262
        """Execute the command given."""
263
        # print node type : print list of node of the given type
264
        # (STORAGE_NODE_TYPE, MASTER_NODE_TYPE...)
265
        # set node uuid state [1|0] : set the node for the given uuid to the
266
        # state (RUNNING, DOWN...) and modify the partition if asked
267
        # set cluster name [shutdown|operational] : either shutdown the
268
        # cluster or mark it as operational
269
        current_action = action_dict
270
        level = 0
271 272 273 274 275
        while current_action is not None and \
              level < len(args) and \
              isinstance(current_action, dict):
            current_action = current_action.get(args[level])
            level += 1
276
        action = None
277 278 279
        if isinstance(current_action, basestring):
            action = getattr(self.neoctl, current_action, None)
        if action is None:
280 281 282 283 284
            return self.usage('unknown command')
        try:
            return action(args[level:])
        except NotReadyException, message:
            return 'ERROR: %s' % (message, )
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301

    def _usage(self, action_dict, level=0):
        result = []
        append = result.append
        sub_level = level + 1
        for name, action in action_dict.iteritems():
            append('%s%s' % ('  ' * level, name))
            if isinstance(action, dict):
                append(self._usage(action, level=sub_level))
            else:
                real_action = getattr(self.neoctl, action, None)
                if real_action is None:
                    continue
                docstring = getattr(real_action, '__doc__', None)
                if docstring is None:
                    docstring = '(no docstring)'
                docstring_line_list = docstring.split('\n')
302
                # Strip empty lines at beginning & end of line list
303 304 305 306 307 308 309 310 311 312 313 314 315
                for end in (0, -1):
                    while len(docstring_line_list) \
                          and docstring_line_list[end] == '':
                        docstring_line_list.pop(end)
                # Get the indentation of first line, to preserve other lines
                # relative indentation.
                first_line = docstring_line_list[0]
                base_indentation = len(first_line) - len(first_line.lstrip())
                result.extend([('  ' * sub_level) + x[base_indentation:] \
                               for x in docstring_line_list])
        return '\n'.join(result)

    def usage(self, message):
316
        output_list = (message, 'Available commands:', self._usage(action_dict),
317 318 319
            "TID arguments can be either integers or timestamps as floats,"
            " e.g. '257684787499560686', '0x3937af2eeeeeeee' or '1325421296.'"
            " for 2012-01-01 12:34:56 UTC")
320
        return '\n'.join(output_list)