app.py 10.2 KB
Newer Older
Aurel's avatar
Aurel committed
1
#
2
# Copyright (C) 2006-2012  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
Aurel's avatar
Aurel committed
16

17
from operator import itemgetter
18
from .neoctl import NeoCTL, NotReadyException
19 20
from neo.lib.util import bin, p64
from neo.lib.protocol import uuid_str, ClusterStates, NodeStates, NodeTypes, \
21
    UUID_NAMESPACES, ZERO_TID
Aurel's avatar
Aurel committed
22

23 24
action_dict = {
    'print': {
25 26 27
        'pt': 'getPartitionRowList',
        'node': 'getNodeList',
        'cluster': 'getClusterState',
28
        'primary': 'getPrimary',
29 30
    },
    'set': {
31 32
        'node': 'setNodeState',
        'cluster': 'setClusterState',
33
    },
34
    'check': 'checkReplicas',
35
    'start': 'startCluster',
36
    'add': 'enableStorageList',
37
    'tweak': 'tweakPartitionTable',
38
    'drop': 'dropNode',
39 40
}

41 42 43 44
uuid_int = (lambda ns: lambda uuid:
    (ns[uuid[0]] << 24) + int(uuid[1:])
    )(dict((str(k)[0], v) for k, v in UUID_NAMESPACES.iteritems()))

45
class TerminalNeoCTL(object):
Olivier Cros's avatar
Olivier Cros committed
46 47
    def __init__(self, address):
        self.neoctl = NeoCTL(address)
48

49 50 51
    def __del__(self):
        self.neoctl.close()

52 53
    # Utility methods (could be functions)
    def asNodeState(self, value):
54
        return getattr(NodeStates, value.upper())
55 56

    def asNodeType(self, value):
57
        return getattr(NodeTypes, value.upper())
58 59

    def asClusterState(self, value):
60
        return getattr(ClusterStates, value.upper())
61

62
    asNode = staticmethod(uuid_int)
63 64

    def formatRowList(self, row_list):
65
        return '\n'.join('%03d | %s' % (offset,
66
            ''.join('%s - %s |' % (uuid_str(uuid), state)
67 68
            for (uuid, state) in cell_list))
            for (offset, cell_list) in row_list)
69

70
    def formatNodeList(self, node_list, _sort_key=itemgetter(2, 0, 1)):
71 72
        if not node_list:
            return 'Empty list!'
73 74 75 76 77
        node_list.sort(key=_sort_key)
        return '\n'.join(
            '%s - %s - %s - %s' % (node_type, uuid_str(uuid),
                                   address and '%s:%s' % address, state)
            for node_type, address, uuid, state in node_list)
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94

    # Actual actions
    def getPartitionRowList(self, params):
        """
          Get a list of partition rows, bounded by min & max and involving
          given node.
          Parameters: [min [max [node]]]
            min: offset of the first row to fetch (starts at 0)
            max: offset of the last row to fetch (0 for no limit)
            node: filters the list of nodes serving a line to this node
        """
        params = params + [0, 0, None][len(params):]
        min_offset, max_offset, node = params
        min_offset = int(min_offset)
        max_offset = int(max_offset)
        if node is not None:
            node = self.asNode(node)
95 96
        ptid, row_list = self.neoctl.getPartitionRowList(
                min_offset=min_offset, max_offset=max_offset, node=node)
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
        # TODO: return ptid
        return self.formatRowList(row_list)

    def getNodeList(self, params):
        """
          Get a list of nodes, filtering with given type.
          Parameters: [type]
            type: type of node to display
        """
        assert len(params) < 2
        if len(params):
            node_type = self.asNodeType(params[0])
        else:
            node_type = None
        node_list = self.neoctl.getNodeList(node_type=node_type)
        return self.formatNodeList(node_list)

    def getClusterState(self, params):
        """
          Get cluster state.
        """
        assert len(params) == 0
        return str(self.neoctl.getClusterState())

    def setNodeState(self, params):
        """
          Set node state, and allow (or not) updating partition table.
          Parameters: node state [update]
            node: node to modify
            state: state to put the node in
            update: disallow (0, default) or allow (other integer) partition
                    table to be updated
        """
        assert len(params) in (2, 3)
        node = self.asNode(params[0])
        state = self.asNodeState(params[1])
        if len(params) == 3:
134
            update_partition_table = bool(int(params[2]))
135 136
        else:
            update_partition_table = False
137
        return self.neoctl.setNodeState(node, state,
138 139 140 141 142 143 144 145 146
            update_partition_table=update_partition_table)

    def setClusterState(self, params):
        """
          Set cluster state.
          Parameters: state
            state: state to put the cluster in
        """
        assert len(params) == 1
147
        return self.neoctl.setClusterState(self.asClusterState(params[0]))
148 149 150 151 152

    def startCluster(self, params):
        """
          Starts cluster operation after a startup.
          Equivalent to:
153
            set cluster verifying
154 155
        """
        assert len(params) == 0
156
        return self.neoctl.startCluster()
157 158 159 160 161 162 163 164 165 166

    def enableStorageList(self, params):
        """
          Enable cluster to make use of pending storages.
          Parameters: all
                      node [node [...]]
            node: if "all", add all pending storage nodes.
                  otherwise, the list of storage nodes to enable.
        """
        if len(params) == 1 and params[0] == 'all':
167
            node_list = self.neoctl.getNodeList(NodeTypes.STORAGE)
168
            uuid_list = [node[2] for node in node_list]
169
        else:
170
            uuid_list = map(self.asNode, params)
171
        return self.neoctl.enableStorageList(uuid_list)
172

173 174 175 176 177 178 179 180
    def tweakPartitionTable(self, params):
        """
          Optimize partition table.
          No partitition will be assigned to specified storage nodes.
          Parameters: [node [...]]
        """
        return self.neoctl.tweakPartitionTable(map(self.asNode, params))

181 182 183 184 185 186 187 188 189
    def dropNode(self, params):
        """
          Set node into DOWN state.
          Parameters: node
            node: node the pu into DOWN state
          Equivalent to:
            set node state (node) DOWN
        """
        assert len(params) == 1
190
        return self.neoctl.dropNode(self.asNode(params[0]))
191

192
    def getPrimary(self, params):
193 194 195
        """
          Get primary master node.
        """
196
        return uuid_str(self.neoctl.getPrimary())
197

198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
    def checkReplicas(self, params):
        """
          Parameters: [partition]:[reference] ... [min_tid [max_tid]]
        """
        partition_dict = {}
        params = iter(params)
        min_tid = ZERO_TID
        max_tid = None
        for p in params:
            try:
                partition, source = p.split(':')
            except ValueError:
                min_tid = p64(p)
                try:
                    max_tid = p64(params.next())
                except StopIteration:
                    pass
                break
216
            source = self.asNode(source) if source else None
217 218 219 220 221 222 223 224
            if partition:
                partition_dict[int(partition)] = source
            else:
                assert not partition_dict
                np = len(self.neoctl.getPartitionRowList()[1])
                partition_dict = dict.fromkeys(xrange(np), source)
        self.neoctl.checkReplicas(partition_dict, min_tid, max_tid)

Aurel's avatar
Aurel committed
225 226 227
class Application(object):
    """The storage node application."""

Olivier Cros's avatar
Olivier Cros committed
228 229
    def __init__(self, address):
        self.neoctl = TerminalNeoCTL(address)
230

231
    def execute(self, args):
Aurel's avatar
Aurel committed
232
        """Execute the command given."""
233
        # print node type : print list of node of the given type
234
        # (STORAGE_NODE_TYPE, MASTER_NODE_TYPE...)
235
        # set node uuid state [1|0] : set the node for the given uuid to the
236
        # state (RUNNING, DOWN...) and modify the partition if asked
237
        # set cluster name [shutdown|operational] : either shutdown the
238
        # cluster or mark it as operational
239
        current_action = action_dict
240
        level = 0
241 242 243 244 245
        while current_action is not None and \
              level < len(args) and \
              isinstance(current_action, dict):
            current_action = current_action.get(args[level])
            level += 1
246
        action = None
247 248 249
        if isinstance(current_action, basestring):
            action = getattr(self.neoctl, current_action, None)
        if action is None:
250 251 252 253 254
            return self.usage('unknown command')
        try:
            return action(args[level:])
        except NotReadyException, message:
            return 'ERROR: %s' % (message, )
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287

    def _usage(self, action_dict, level=0):
        result = []
        append = result.append
        sub_level = level + 1
        for name, action in action_dict.iteritems():
            append('%s%s' % ('  ' * level, name))
            if isinstance(action, dict):
                append(self._usage(action, level=sub_level))
            else:
                real_action = getattr(self.neoctl, action, None)
                if real_action is None:
                    continue
                docstring = getattr(real_action, '__doc__', None)
                if docstring is None:
                    docstring = '(no docstring)'
                docstring_line_list = docstring.split('\n')
                # Strip empty lines at begining & end of line list
                for end in (0, -1):
                    while len(docstring_line_list) \
                          and docstring_line_list[end] == '':
                        docstring_line_list.pop(end)
                # Get the indentation of first line, to preserve other lines
                # relative indentation.
                first_line = docstring_line_list[0]
                base_indentation = len(first_line) - len(first_line.lstrip())
                result.extend([('  ' * sub_level) + x[base_indentation:] \
                               for x in docstring_line_list])
        return '\n'.join(result)

    def usage(self, message):
        output_list = [message, 'Available commands:', self._usage(action_dict)]
        return '\n'.join(output_list)
288