app.py 10.5 KB
Newer Older
Aurel's avatar
Aurel committed
1
#
2
# Copyright (C) 2006-2015  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
Aurel's avatar
Aurel committed
16

17
from operator import itemgetter
18
from .neoctl import NeoCTL, NotReadyException
19
from neo.lib.util import p64, u64, tidFromTime
20
from neo.lib.protocol import uuid_str, ClusterStates, NodeTypes, \
21
    UUID_NAMESPACES, ZERO_TID
Aurel's avatar
Aurel committed
22

23 24
action_dict = {
    'print': {
25
        'ids': 'getLastIds',
26 27 28
        'pt': 'getPartitionRowList',
        'node': 'getNodeList',
        'cluster': 'getClusterState',
29
        'primary': 'getPrimary',
30 31
    },
    'set': {
32
        'cluster': 'setClusterState',
33
    },
34
    'check': 'checkReplicas',
35
    'start': 'startCluster',
36
    'add': 'enableStorageList',
37
    'tweak': 'tweakPartitionTable',
38
    'drop': 'dropNode',
39
    'kill': 'killNode',
40 41
}

42 43
uuid_int = (lambda ns: lambda uuid:
    (ns[uuid[0]] << 24) + int(uuid[1:])
44
    )({str(k)[0]: v for k, v in UUID_NAMESPACES.iteritems()})
45

46
class TerminalNeoCTL(object):
Olivier Cros's avatar
Olivier Cros committed
47 48
    def __init__(self, address):
        self.neoctl = NeoCTL(address)
49

50 51 52
    def __del__(self):
        self.neoctl.close()

53 54
    # Utility methods (could be functions)
    def asNodeType(self, value):
55
        return getattr(NodeTypes, value.upper())
56 57

    def asClusterState(self, value):
58
        return getattr(ClusterStates, value.upper())
59

60
    def asTID(self, value):
61 62
        if '.' in value:
            return tidFromTime(float(value))
63 64
        return p64(int(value, 0))

65
    asNode = staticmethod(uuid_int)
66 67

    def formatRowList(self, row_list):
68
        return '\n'.join('%03d | %s' % (offset,
69
            ''.join('%s - %s |' % (uuid_str(uuid), state)
70 71
            for (uuid, state) in cell_list))
            for (offset, cell_list) in row_list)
72

73
    def formatNodeList(self, node_list, _sort_key=itemgetter(2, 0, 1)):
74 75
        if not node_list:
            return 'Empty list!'
76 77 78 79 80
        node_list.sort(key=_sort_key)
        return '\n'.join(
            '%s - %s - %s - %s' % (node_type, uuid_str(uuid),
                                   address and '%s:%s' % address, state)
            for node_type, address, uuid, state in node_list)
81 82

    # Actual actions
83 84 85 86 87 88 89 90 91 92 93
    def getLastIds(self, params):
        """
          Get last ids.
        """
        assert not params
        r = self.neoctl.getLastIds()
        if r[3]:
            return "last_tid = 0x%x" % u64(self.neoctl.getLastTransaction())
        return "last_oid = 0x%x\nlast_tid = 0x%x\nlast_ptid = %u" % (
            u64(r[0]), u64(r[1]), r[2])

94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
    def getPartitionRowList(self, params):
        """
          Get a list of partition rows, bounded by min & max and involving
          given node.
          Parameters: [min [max [node]]]
            min: offset of the first row to fetch (starts at 0)
            max: offset of the last row to fetch (0 for no limit)
            node: filters the list of nodes serving a line to this node
        """
        params = params + [0, 0, None][len(params):]
        min_offset, max_offset, node = params
        min_offset = int(min_offset)
        max_offset = int(max_offset)
        if node is not None:
            node = self.asNode(node)
109 110
        ptid, row_list = self.neoctl.getPartitionRowList(
                min_offset=min_offset, max_offset=max_offset, node=node)
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
        # TODO: return ptid
        return self.formatRowList(row_list)

    def getNodeList(self, params):
        """
          Get a list of nodes, filtering with given type.
          Parameters: [type]
            type: type of node to display
        """
        assert len(params) < 2
        if len(params):
            node_type = self.asNodeType(params[0])
        else:
            node_type = None
        node_list = self.neoctl.getNodeList(node_type=node_type)
        return self.formatNodeList(node_list)

    def getClusterState(self, params):
        """
          Get cluster state.
        """
        assert len(params) == 0
        return str(self.neoctl.getClusterState())

    def setClusterState(self, params):
        """
          Set cluster state.
          Parameters: state
            state: state to put the cluster in
        """
        assert len(params) == 1
142
        return self.neoctl.setClusterState(self.asClusterState(params[0]))
143 144 145 146 147

    def startCluster(self, params):
        """
          Starts cluster operation after a startup.
          Equivalent to:
148
            set cluster verifying
149 150
        """
        assert len(params) == 0
151
        return self.neoctl.startCluster()
152 153 154 155 156 157 158 159 160 161

    def enableStorageList(self, params):
        """
          Enable cluster to make use of pending storages.
          Parameters: all
                      node [node [...]]
            node: if "all", add all pending storage nodes.
                  otherwise, the list of storage nodes to enable.
        """
        if len(params) == 1 and params[0] == 'all':
162
            node_list = self.neoctl.getNodeList(NodeTypes.STORAGE)
163
            uuid_list = [node[2] for node in node_list]
164
        else:
165
            uuid_list = map(self.asNode, params)
166
        return self.neoctl.enableStorageList(uuid_list)
167

168 169 170 171 172 173 174 175
    def tweakPartitionTable(self, params):
        """
          Optimize partition table.
          No partitition will be assigned to specified storage nodes.
          Parameters: [node [...]]
        """
        return self.neoctl.tweakPartitionTable(map(self.asNode, params))

176 177 178 179 180 181 182
    def killNode(self, params):
        """
          Kill redundant nodes (either a storage or a secondary master).
          Parameters: node
        """
        return self.neoctl.killNode(self.asNode(*params))

183 184
    def dropNode(self, params):
        """
185
          Remove storage node permanently.
186 187
          Parameters: node
        """
188
        return self.neoctl.dropNode(self.asNode(*params))
189

190
    def getPrimary(self, params):
191 192 193
        """
          Get primary master node.
        """
194
        return uuid_str(self.neoctl.getPrimary())
195

196 197
    def checkReplicas(self, params):
        """
Julien Muchembled's avatar
Julien Muchembled committed
198
          Test whether partitions have corrupted metadata
199 200 201 202

          Any corrupted cell is put in CORRUPTED state, possibly make the
          cluster non operational.

203
          Parameters: [partition]:[reference] ... [min_tid [max_tid]]
204 205 206
            reference: node id of a storage with known good data
              If not given, and if the cluster is in backup mode, an upstream
              cell is automatically taken as reference.
207 208 209 210 211 212 213 214 215
        """
        partition_dict = {}
        params = iter(params)
        min_tid = ZERO_TID
        max_tid = None
        for p in params:
            try:
                partition, source = p.split(':')
            except ValueError:
216
                min_tid = self.asTID(p)
217
                try:
218
                    max_tid = self.asTID(params.next())
219 220 221
                except StopIteration:
                    pass
                break
222
            source = self.asNode(source) if source else None
223 224 225 226 227 228 229 230
            if partition:
                partition_dict[int(partition)] = source
            else:
                assert not partition_dict
                np = len(self.neoctl.getPartitionRowList()[1])
                partition_dict = dict.fromkeys(xrange(np), source)
        self.neoctl.checkReplicas(partition_dict, min_tid, max_tid)

Aurel's avatar
Aurel committed
231 232 233
class Application(object):
    """The storage node application."""

Olivier Cros's avatar
Olivier Cros committed
234 235
    def __init__(self, address):
        self.neoctl = TerminalNeoCTL(address)
236

237
    def execute(self, args):
Aurel's avatar
Aurel committed
238
        """Execute the command given."""
239
        # print node type : print list of node of the given type
240
        # (STORAGE_NODE_TYPE, MASTER_NODE_TYPE...)
241
        # set node uuid state [1|0] : set the node for the given uuid to the
242
        # state (RUNNING, DOWN...) and modify the partition if asked
243
        # set cluster name [shutdown|operational] : either shutdown the
244
        # cluster or mark it as operational
245
        current_action = action_dict
246
        level = 0
247 248 249 250 251
        while current_action is not None and \
              level < len(args) and \
              isinstance(current_action, dict):
            current_action = current_action.get(args[level])
            level += 1
252
        action = None
253 254 255
        if isinstance(current_action, basestring):
            action = getattr(self.neoctl, current_action, None)
        if action is None:
256 257 258 259 260
            return self.usage('unknown command')
        try:
            return action(args[level:])
        except NotReadyException, message:
            return 'ERROR: %s' % (message, )
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291

    def _usage(self, action_dict, level=0):
        result = []
        append = result.append
        sub_level = level + 1
        for name, action in action_dict.iteritems():
            append('%s%s' % ('  ' * level, name))
            if isinstance(action, dict):
                append(self._usage(action, level=sub_level))
            else:
                real_action = getattr(self.neoctl, action, None)
                if real_action is None:
                    continue
                docstring = getattr(real_action, '__doc__', None)
                if docstring is None:
                    docstring = '(no docstring)'
                docstring_line_list = docstring.split('\n')
                # Strip empty lines at begining & end of line list
                for end in (0, -1):
                    while len(docstring_line_list) \
                          and docstring_line_list[end] == '':
                        docstring_line_list.pop(end)
                # Get the indentation of first line, to preserve other lines
                # relative indentation.
                first_line = docstring_line_list[0]
                base_indentation = len(first_line) - len(first_line.lstrip())
                result.extend([('  ' * sub_level) + x[base_indentation:] \
                               for x in docstring_line_list])
        return '\n'.join(result)

    def usage(self, message):
292
        output_list = (message, 'Available commands:', self._usage(action_dict),
293 294 295
            "TID arguments can be either integers or timestamps as floats,"
            " e.g. '257684787499560686', '0x3937af2eeeeeeee' or '1325421296.'"
            " for 2012-01-01 12:34:56 UTC")
296
        return '\n'.join(output_list)
297