neoctl.py 6.39 KB
Newer Older
1
#
2
# Copyright (C) 2006-2016  Nexedi SA
3 4 5 6 7 8 9 10 11 12 13 14
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

17
from neo.lib.app import BaseApplication
18
from neo.lib.connection import ClientConnection, ConnectionClosed
19
from neo.lib.protocol import ClusterStates, NodeStates, ErrorCodes, Packets
20
from .handler import CommandEventHandler
21

22 23 24
class NotReadyException(Exception):
    """Signal that a control request could not be served right now.

    NeoCTL raises it when the connection cannot be established, when the
    connection is closed while waiting for a response, or when the peer
    answers with a NOT_READY error.
    """

25
class NeoCTL(BaseApplication):
    """Send control requests to an admin node and wait for the answers.

    Every public method builds a packet, sends it through a connection to
    the node registered with ``self.nm.createAdmin`` and then polls the
    event manager until ``response_queue`` holds a response.

    NOTE(review): ``connected`` and ``response_queue`` are never written by
    this class; presumably CommandEventHandler updates them - confirm.
    """

    connection = None
    connected = False

    def __init__(self, address, **kw):
        """Register the admin node at `address`; no connection is opened yet.

        Extra keyword arguments are forwarded to BaseApplication.
        """
        super(NeoCTL, self).__init__(**kw)
        self.server = self.nm.createAdmin(address=address)
        self.handler = CommandEventHandler(self)
        self.response_queue = []

    def __getConnection(self):
        """Return the connection to the admin node, opening it if needed.

        Polls the event manager until ``self.connected`` becomes true.
        Raises NotReadyException as soon as ``self.connection`` is found to
        be None while waiting.
        """
        if not self.connected:
            self.connection = ClientConnection(self, self.handler, self.server)
            # Never delay reconnection to master. This speeds up unit tests
            # and it should not change anything for normal use.
            try:
                self.connection.setReconnectionNoDelay()
            except ConnectionClosed:
                self.connection = None
            while not self.connected:
                # NOTE(review): besides the 'except' above, the connection
                # is presumably reset to None from outside this class when
                # the attempt fails - confirm against the handler.
                if self.connection is None:
                    raise NotReadyException('not connected')
                self.em.poll(1)
        return self.connection

    def __ask(self, packet):
        """Send `packet` and block until a response is available.

        Returns the response popped from ``response_queue``; callers index
        it as a sequence whose first item is the packet type.
        Raises NotReadyException if the connection is closed before any
        response arrives, or if the response is a NOT_READY error.
        """
        # TODO: make thread-safe
        connection = self.__getConnection()
        connection.ask(packet)
        response_queue = self.response_queue
        assert len(response_queue) == 0
        while self.connected:
            self.em.poll(1)
            if response_queue:
                break
        else:
            # Only reached when the loop condition became false, i.e. the
            # connection was lost without any response being queued.
            raise NotReadyException('Connection closed')
        response = response_queue.pop()
        if response[0] == Packets.Error and \
           response[1] == ErrorCodes.NOT_READY:
            raise NotReadyException(response[2])
        return response

    def __askAck(self, packet):
        """Send `packet` and require an Error/ACK response.

        Returns the third item of the response.
        Raises RuntimeError(response) for any other response.
        """
        response = self.__ask(packet)
        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
            raise RuntimeError(response)
        return response[2]

    def __askAnswer(self, packet, answer_type):
        """Send `packet` and require a response of type `answer_type`.

        Returns the whole response so that callers can pick the items they
        need. Raises RuntimeError(response) on any other response type.
        """
        response = self.__ask(packet)
        if response[0] != answer_type:
            raise RuntimeError(response)
        return response

    def enableStorageList(self, uuid_list):
        """
          Put all given storage nodes in "running" state.
        """
        return self.__askAck(Packets.AddPendingNodes(uuid_list))

    def tweakPartitionTable(self, uuid_list=()):
        """
          Send a TweakPartitionTable request for the given nodes.
        """
        return self.__askAck(Packets.TweakPartitionTable(uuid_list))

    def setClusterState(self, state):
        """
          Set cluster state.
        """
        return self.__askAck(Packets.SetClusterState(state))

    def _setNodeState(self, node, state):
        """
          Kill node, or remove it permanently
        """
        return self.__askAck(Packets.SetNodeState(node, state))

    def getClusterState(self):
        """
          Get cluster state.
        """
        return self.__askAnswer(Packets.AskClusterState(),
                                Packets.AnswerClusterState)[1]

    def getLastIds(self):
        """
          Return every item of the AnswerLastIDs response after its type.
        """
        return self.__askAnswer(Packets.AskLastIDs(),
                                Packets.AnswerLastIDs)[1:]

    def getLastTransaction(self):
        """
          Return the second item of the AnswerLastTransaction response.
        """
        return self.__askAnswer(Packets.AskLastTransaction(),
                                Packets.AnswerLastTransaction)[1]

    def getRecovery(self):
        """
          Return every item of the AnswerRecovery response after its type.
        """
        return self.__askAnswer(Packets.AskRecovery(),
                                Packets.AnswerRecovery)[1:]

    def getNodeList(self, node_type=None):
        """
          Get a list of nodes, filtering with given type.
        """
        return self.__askAnswer(Packets.AskNodeList(node_type),
                                Packets.AnswerNodeList)[1] # node_list

    def getPartitionRowList(self, min_offset=0, max_offset=0, node=None):
        """
          Get a list of partition rows, bounded by min & max and involving
          given node.
        """
        packet = Packets.AskPartitionList(min_offset, max_offset, node)
        return self.__askAnswer(
            packet, Packets.AnswerPartitionList)[1:3] # ptid, row_list

    def startCluster(self):
        """
          Set cluster into "verifying" state.
        """
        return self.setClusterState(ClusterStates.VERIFYING)

    def killNode(self, node):
        """
          Request the UNKNOWN state for the given node.
        """
        return self._setNodeState(node, NodeStates.UNKNOWN)

    def dropNode(self, node):
        """
          Request the DOWN state for the given node.
        """
        return self._setNodeState(node, NodeStates.DOWN)

    def getPrimary(self):
        """
          Return the primary master UUID.
        """
        return self.__askAnswer(Packets.AskPrimary(),
                                Packets.AnswerPrimary)[1]

    def truncate(self, tid):
        """
          Send a Truncate request for the given tid.
        """
        return self.__askAck(Packets.Truncate(tid))

    def checkReplicas(self, *args):
        """
          Send a CheckReplicas request built from the given arguments.
        """
        return self.__askAck(Packets.CheckReplicas(*args))