neoctl.py 7.57 KB
Newer Older
1
#
2
# Copyright (C) 2006-2019  Nexedi SA
3 4 5 6 7 8 9 10 11 12 13 14
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

17 18 19
import argparse
from neo.lib import util
from neo.lib.app import BaseApplication, buildOptionParser
20
from neo.lib.connection import ClientConnection, ConnectionClosed
21
from neo.lib.protocol import ClusterStates, NodeStates, ErrorCodes, Packets
22
from .handler import CommandEventHandler
23

24 25 26
class NotReadyException(Exception):
    pass

27
@buildOptionParser
28
class NeoCTL(BaseApplication):
29 30 31 32

    connection = None
    connected = False

33 34 35 36 37 38 39 40 41 42 43 44
    @classmethod
    def _buildOptionParser(cls):
        # XXX: Use argparse sub-commands.
        parser = cls.option_parser
        parser.description = "NEO Control node"
        parser('a', 'address', default='127.0.0.1:9999',
            parse=lambda x: util.parseNodeAddress(x, 9999),
            help="address of an admin node")
        parser.argument('cmd', nargs=argparse.REMAINDER,
            help="command to execute; if not supplied,"
                 " the list of available commands is displayed")

Julien Muchembled's avatar
Julien Muchembled committed
45 46
    def __init__(self, address, **kw):
        super(NeoCTL, self).__init__(**kw)
47
        self.server = self.nm.createAdmin(address=address)
48 49 50 51
        self.handler = CommandEventHandler(self)
        self.response_queue = []

    def __getConnection(self):
Grégory Wisniewski's avatar
Grégory Wisniewski committed
52
        if not self.connected:
53
            self.connection = ClientConnection(self, self.handler, self.server)
54 55
            # Never delay reconnection to master. This speeds up unit tests
            # and it should not change anything for normal use.
56 57 58 59
            try:
                self.connection.setReconnectionNoDelay()
            except ConnectionClosed:
                self.connection = None
60 61 62
            while not self.connected:
                if self.connection is None:
                    raise NotReadyException('not connected')
63
                self.em.poll(1)
64 65 66 67 68 69 70 71
        return self.connection

    def __ask(self, packet):
        # TODO: make thread-safe
        connection = self.__getConnection()
        connection.ask(packet)
        response_queue = self.response_queue
        assert len(response_queue) == 0
72 73 74 75 76 77
        while self.connected:
            self.em.poll(1)
            if response_queue:
                break
        else:
            raise NotReadyException, 'Connection closed'
78
        response = response_queue.pop()
79
        if response[0] == Packets.Error and \
80
           response[1] == ErrorCodes.NOT_READY:
81 82
            raise NotReadyException(response[2])
        return response
83

84
    def enableStorageList(self, uuid_list):
85 86 87
        """
          Put all given storage nodes in "running" state.
        """
88
        packet = Packets.AddPendingNodes(uuid_list)
89
        response = self.__ask(packet)
90 91
        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
            raise RuntimeError(response)
92
        return response[2]
93

94 95 96
    def tweakPartitionTable(self, uuid_list=(), dry_run=False):
        response = self.__ask(Packets.TweakPartitionTable(dry_run, uuid_list))
        if response[0] != Packets.AnswerTweakPartitionTable:
97
            raise RuntimeError(response)
98
        return response[1:]
99

100 101 102 103 104 105
    def setNumReplicas(self, nr):
        response = self.__ask(Packets.SetNumReplicas(nr))
        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
            raise RuntimeError(response)
        return response[2]

106 107 108 109
    def setClusterState(self, state):
        """
          Set cluster state.
        """
110
        packet = Packets.SetClusterState(state)
111
        response = self.__ask(packet)
112 113
        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
            raise RuntimeError(response)
114
        return response[2]
115

116
    def _setNodeState(self, node, state):
117
        """
118
          Kill node, or remove it permanently
119
        """
120
        response = self.__ask(Packets.SetNodeState(node, state))
121 122
        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
            raise RuntimeError(response)
123
        return response[2]
124 125 126 127 128

    def getClusterState(self):
        """
          Get cluster state.
        """
129
        packet = Packets.AskClusterState()
130
        response = self.__ask(packet)
131 132
        if response[0] != Packets.AnswerClusterState:
            raise RuntimeError(response)
133 134
        return response[1]

135 136 137 138 139 140 141 142 143 144 145 146
    def getLastIds(self):
        response = self.__ask(Packets.AskLastIDs())
        if response[0] != Packets.AnswerLastIDs:
            raise RuntimeError(response)
        return response[1:]

    def getLastTransaction(self):
        response = self.__ask(Packets.AskLastTransaction())
        if response[0] != Packets.AnswerLastTransaction:
            raise RuntimeError(response)
        return response[1]

147 148 149 150 151 152
    def getRecovery(self):
        response = self.__ask(Packets.AskRecovery())
        if response[0] != Packets.AnswerRecovery:
            raise RuntimeError(response)
        return response[1:]

153 154 155 156
    def getNodeList(self, node_type=None):
        """
          Get a list of nodes, filtering with given type.
        """
157
        packet = Packets.AskNodeList(node_type)
158
        response = self.__ask(packet)
159 160
        if response[0] != Packets.AnswerNodeList:
            raise RuntimeError(response)
161
        return response[1] # node_list
162 163 164 165 166 167

    def getPartitionRowList(self, min_offset=0, max_offset=0, node=None):
        """
          Get a list of partition rows, bounded by min & max and involving
          given node.
        """
168
        packet = Packets.AskPartitionList(min_offset, max_offset, node)
169
        response = self.__ask(packet)
170 171
        if response[0] != Packets.AnswerPartitionList:
            raise RuntimeError(response)
172
        return response[1:]
173 174 175 176 177

    def startCluster(self):
        """
          Set cluster into "verifying" state.
        """
178
        return self.setClusterState(ClusterStates.VERIFYING)
179

180
    def killNode(self, node):
181
        return self._setNodeState(node, NodeStates.DOWN)
182

183
    def dropNode(self, node):
184
        return self._setNodeState(node, NodeStates.UNKNOWN)
185

186
    def getPrimary(self):
187 188 189
        """
          Return the primary master UUID.
        """
190
        packet = Packets.AskPrimary()
191
        response = self.__ask(packet)
192 193
        if response[0] != Packets.AnswerPrimary:
            raise RuntimeError(response)
194 195
        return response[1]

196 197 198 199 200 201
    def repair(self, *args):
        response = self.__ask(Packets.Repair(*args))
        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
            raise RuntimeError(response)
        return response[2]

202 203 204 205 206 207
    def truncate(self, tid):
        response = self.__ask(Packets.Truncate(tid))
        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
            raise RuntimeError(response)
        return response[2]

208 209 210 211 212
    def checkReplicas(self, *args):
        response = self.__ask(Packets.CheckReplicas(*args))
        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
            raise RuntimeError(response)
        return response[2]
213 214 215 216 217 218

    def flushLog(self):
        conn = self.__getConnection()
        conn.send(Packets.FlushLog())
        while conn.pending():
            self.em.poll(1)