handler.py 7.21 KB
Newer Older
1
#
Grégory Wisniewski's avatar
Grégory Wisniewski committed
2
# Copyright (C) 2009-2010  Nexedi SA
Aurel's avatar
Aurel committed
3
#
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
Aurel's avatar
Aurel committed
8
#
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17

18
import neo
19

20 21 22 23 24
from neo.lib.handler import EventHandler
from neo.lib import protocol
from neo.lib.protocol import Packets, Errors
from neo.lib.exception import PrimaryFailure
from neo.lib.util import dump
25

26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
def forward_ask(klass):
    def wrapper(self, conn, *args, **kw):
        app = self.app
        if app.master_conn is None:
            raise protocol.NotReadyError('Not connected to a primary master.')
        msg_id = app.master_conn.ask(klass(*args, **kw))
        app.dispatcher.register(msg_id, conn, {'msg_id': conn.getPeerId()})
    return wrapper

def forward_answer(klass):
    def wrapper(self, conn, *args, **kw):
        packet = klass(*args, **kw)
        self._answerNeoCTL(conn, packet)
    return wrapper

41
class AdminEventHandler(EventHandler):
Aurel's avatar
Aurel committed
42 43
    """This class deals with events for administrating cluster."""

44
    def askPartitionList(self, conn, min_offset, max_offset, uuid):
45
        neo.lib.logging.info("ask partition list from %s to %s for %s" %
46
                (min_offset, max_offset, dump(uuid)))
Aurel's avatar
Aurel committed
47
        app = self.app
48
        # check we have one pt otherwise ask it to PMN
49
        if app.pt is None:
50
            if self.app.master_conn is None:
51 52
                raise protocol.NotReadyError('Not connected to a primary ' \
                        'master.')
53
            msg_id = self.app.master_conn.ask(Packets.AskPartitionTable())
54 55 56 57
            app.dispatcher.register(msg_id, conn,
                                    {'min_offset' : min_offset,
                                     'max_offset' : max_offset,
                                     'uuid' : uuid,
58
                                     'msg_id' : conn.getPeerId()})
59
        else:
60
            app.sendPartitionTable(conn, min_offset, max_offset, uuid)
Aurel's avatar
Aurel committed
61

Aurel's avatar
Aurel committed
62

63
    def askNodeList(self, conn, node_type):
64 65 66 67 68 69
        if node_type is None:
            node_type = 'all'
            node_filter = None
        else:
            node_filter = lambda n: n.getType() is node_type
        neo.lib.logging.info("ask list of %s nodes", node_type)
70
        node_list = self.app.nm.getList(node_filter)
71
        node_information_list = [node.asTuple() for node in node_list ]
72
        p = Packets.AnswerNodeList(node_information_list)
73
        conn.answer(p)
Aurel's avatar
Aurel committed
74

75
    def setNodeState(self, conn, uuid, state, modify_partition_table):
76
        neo.lib.logging.info("set node state for %s-%s" %(dump(uuid), state))
77
        node = self.app.nm.getByUUID(uuid)
Aurel's avatar
Aurel committed
78
        if node is None:
79
            raise protocol.ProtocolError('invalid uuid')
80
        if node.getState() == state and modify_partition_table is False:
81
            # no change
82
            p = Errors.Ack('no change')
83
            conn.answer(p)
84
            return
85
        # forward to primary master node
86 87
        if self.app.master_conn is None:
            raise protocol.NotReadyError('Not connected to a primary master.')
88
        p = Packets.SetNodeState(uuid, state, modify_partition_table)
89
        msg_id = self.app.master_conn.ask(p)
90
        self.app.dispatcher.register(msg_id, conn, {'msg_id' : conn.getPeerId()})
Aurel's avatar
Aurel committed
91

92
    def askClusterState(self, conn):
93
        if self.app.cluster_state is None:
94
            if self.app.master_conn is None:
95 96
                raise protocol.NotReadyError('Not connected to a primary ' \
                        'master.')
97
            # required it from PMN first
98
            msg_id = self.app.master_conn.ask(Packets.AskClusterState())
99
            self.app.dispatcher.register(msg_id, conn,
100
                    {'msg_id' : conn.getPeerId()})
101
        else:
102
            conn.answer(Packets.AnswerClusterState(self.app.cluster_state))
103

104
    def askPrimary(self, conn):
105 106
        if self.app.master_conn is None:
            raise protocol.NotReadyError('Not connected to a primary master.')
107
        master_node = self.app.master_node
108
        conn.answer(Packets.AnswerPrimary(master_node.getUUID(), []))
109

110 111 112 113
    addPendingNodes = forward_ask(Packets.AddPendingNodes)
    setClusterState = forward_ask(Packets.SetClusterState)


114
class MasterEventHandler(EventHandler):
115
    """ This class is just used to dispacth message to right handler"""
Aurel's avatar
Aurel committed
116

117
    def _connectionLost(self, conn):
118
        app = self.app
119 120 121 122 123 124
        if app.listening_conn: # if running
            assert app.master_conn in (conn, None)
            app.dispatcher.clear()
            app.reset()
            app.uuid = None
            raise PrimaryFailure
125 126 127

    def connectionFailed(self, conn):
        self._connectionLost(conn)
Aurel's avatar
Aurel committed
128

129
    def connectionClosed(self, conn):
130
        self._connectionLost(conn)
131

132
    def dispatch(self, conn, packet):
133 134
        if packet.isResponse() and \
           self.app.dispatcher.registered(packet.getId()):
135
            # expected answer
136 137
            self.app.request_handler.dispatch(conn, packet)
        else:
138 139
            # unexpectexd answers and notifications
            super(MasterEventHandler, self).dispatch(conn, packet)
140

141
    def answerNodeInformation(self, conn):
142 143
        # XXX: This will no more exists when the initialization module will be
        # implemented for factorize code (as done for bootstrap)
144
        neo.lib.logging.debug("answerNodeInformation")
145

146
    def notifyPartitionChanges(self, conn, ptid, cell_list):
147
        self.app.pt.update(ptid, cell_list, self.app.nm)
148

149 150
    def answerPartitionTable(self, conn, ptid, row_list):
        self.app.pt.load(ptid, row_list, self.app.nm)
151
        self.app.bootstrapped = True
152

153
    def sendPartitionTable(self, conn, ptid, row_list):
154 155
        if self.app.bootstrapped:
            self.app.pt.load(ptid, row_list, self.app.nm)
156

157
    def notifyClusterInformation(self, conn, cluster_state):
158 159
        self.app.cluster_state = cluster_state

160
    def notifyNodeInformation(self, conn, node_list):
161 162
        app = self.app
        app.nm.update(node_list)
163

164
class MasterRequestEventHandler(EventHandler):
165 166
    """ This class handle all answer from primary master node"""

167
    def _answerNeoCTL(self, conn, packet):
168
        msg_id = conn.getPeerId()
169
        client_conn, kw = self.app.dispatcher.pop(msg_id)
170
        client_conn.answer(packet)
171

172
    def answerClusterState(self, conn, state):
173
        neo.lib.logging.info("answerClusterState for a conn")
174
        self.app.cluster_state = state
175
        self._answerNeoCTL(conn, Packets.AnswerClusterState(state))
176

177
    def answerPartitionTable(self, conn, ptid, row_list):
178
        neo.lib.logging.info("answerPartitionTable for a conn")
179
        client_conn, kw = self.app.dispatcher.pop(conn.getPeerId())
180
        # sent client the partition table
181
        self.app.sendPartitionTable(client_conn)
Aurel's avatar
Aurel committed
182

183 184
    ack = forward_answer(Errors.Ack)
    protocolError = forward_answer(Errors.ProtocolError)