neoctl.py 5.86 KB
Newer Older
1
#
2
# Copyright (C) 2006-2015  Nexedi SA
3 4 5 6 7 8 9 10 11 12 13 14
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
15
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

17 18 19
from neo.lib.connection import ClientConnection
from neo.lib.event import EventManager
from neo.lib.protocol import ClusterStates, NodeStates, ErrorCodes, Packets
20
from neo.lib.node import NodeManager
21
from .handler import CommandEventHandler
22

23 24 25
class NotReadyException(Exception):
    pass

26 27 28 29 30
class NeoCTL(object):

    connection = None
    connected = False

Olivier Cros's avatar
Olivier Cros committed
31
    def __init__(self, address):
32 33
        self.nm = nm = NodeManager()
        self.server = nm.createAdmin(address=address)
34 35 36 37
        self.em = EventManager()
        self.handler = CommandEventHandler(self)
        self.response_queue = []

38 39
    def close(self):
        self.em.close()
40
        self.nm.close()
41 42
        del self.__dict__

43
    def __getConnection(self):
Grégory Wisniewski's avatar
Grégory Wisniewski committed
44
        if not self.connected:
45
            self.connection = ClientConnection(self.em, self.handler,
46
                                               self.server)
47
            while not self.connected:
48
                self.em.poll(1)
49 50
                if self.connection is None:
                    raise NotReadyException('not connected')
51 52 53 54 55 56 57 58
        return self.connection

    def __ask(self, packet):
        # TODO: make thread-safe
        connection = self.__getConnection()
        connection.ask(packet)
        response_queue = self.response_queue
        assert len(response_queue) == 0
59 60 61 62 63 64
        while self.connected:
            self.em.poll(1)
            if response_queue:
                break
        else:
            raise NotReadyException, 'Connection closed'
65
        response = response_queue.pop()
66
        if response[0] == Packets.Error and \
67
           response[1] == ErrorCodes.NOT_READY:
68 69
            raise NotReadyException(response[2])
        return response
70

71
    def enableStorageList(self, uuid_list):
72 73 74
        """
          Put all given storage nodes in "running" state.
        """
75
        packet = Packets.AddPendingNodes(uuid_list)
76
        response = self.__ask(packet)
77 78
        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
            raise RuntimeError(response)
79
        return response[2]
80

81 82 83 84 85 86
    def tweakPartitionTable(self, uuid_list=()):
        response = self.__ask(Packets.TweakPartitionTable(uuid_list))
        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
            raise RuntimeError(response)
        return response[2]

87 88 89 90
    def setClusterState(self, state):
        """
          Set cluster state.
        """
91
        packet = Packets.SetClusterState(state)
92
        response = self.__ask(packet)
93 94
        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
            raise RuntimeError(response)
95
        return response[2]
96

97
    def _setNodeState(self, node, state):
98
        """
99
          Kill node, or remove it permanently
100
        """
101
        response = self.__ask(Packets.SetNodeState(node, state))
102 103
        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
            raise RuntimeError(response)
104
        return response[2]
105 106 107 108 109

    def getClusterState(self):
        """
          Get cluster state.
        """
110
        packet = Packets.AskClusterState()
111
        response = self.__ask(packet)
112 113
        if response[0] != Packets.AnswerClusterState:
            raise RuntimeError(response)
114 115
        return response[1]

116 117 118 119 120 121 122 123 124 125 126 127
    def getLastIds(self):
        response = self.__ask(Packets.AskLastIDs())
        if response[0] != Packets.AnswerLastIDs:
            raise RuntimeError(response)
        return response[1:]

    def getLastTransaction(self):
        response = self.__ask(Packets.AskLastTransaction())
        if response[0] != Packets.AnswerLastTransaction:
            raise RuntimeError(response)
        return response[1]

128 129 130 131
    def getNodeList(self, node_type=None):
        """
          Get a list of nodes, filtering with given type.
        """
132
        packet = Packets.AskNodeList(node_type)
133
        response = self.__ask(packet)
134 135
        if response[0] != Packets.AnswerNodeList:
            raise RuntimeError(response)
136
        return response[1] # node_list
137 138 139 140 141 142

    def getPartitionRowList(self, min_offset=0, max_offset=0, node=None):
        """
          Get a list of partition rows, bounded by min & max and involving
          given node.
        """
143
        packet = Packets.AskPartitionList(min_offset, max_offset, node)
144
        response = self.__ask(packet)
145 146
        if response[0] != Packets.AnswerPartitionList:
            raise RuntimeError(response)
147
        return response[1:3] # ptid, row_list
148 149 150 151 152

    def startCluster(self):
        """
          Set cluster into "verifying" state.
        """
153
        return self.setClusterState(ClusterStates.VERIFYING)
154

155 156 157
    def killNode(self, node):
        return self._setNodeState(node, NodeStates.UNKNOWN)

158
    def dropNode(self, node):
159
        return self._setNodeState(node, NodeStates.DOWN)
160

161
    def getPrimary(self):
162 163 164
        """
          Return the primary master UUID.
        """
165
        packet = Packets.AskPrimary()
166
        response = self.__ask(packet)
167 168
        if response[0] != Packets.AnswerPrimary:
            raise RuntimeError(response)
169 170
        return response[1]

171 172 173 174 175
    def checkReplicas(self, *args):
        response = self.__ask(Packets.CheckReplicas(*args))
        if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
            raise RuntimeError(response)
        return response[2]