handler.py 8.69 KB
Newer Older
1 2 3
import logging

from neo.handler import EventHandler
4 5
from neo.protocol import Packet, \
        INVALID_UUID, RUNNING_STATE, BROKEN_STATE, \
6
        MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE
Yoshinori Okuji's avatar
Yoshinori Okuji committed
7
from neo.util import dump
8
from neo.node import MasterNode, StorageNode, ClientNode
Yoshinori Okuji's avatar
Yoshinori Okuji committed
9
from neo.connection import ClientConnection
10
from neo.exception import PrimaryFailure
11 12 13 14 15 16 17

class StorageEventHandler(EventHandler):
    """This class implements a generic part of the event handlers."""
    def __init__(self, app):
        self.app = app
        EventHandler.__init__(self)

18 19 20
    def dealWithClientFailure(self, uuid):
        pass

21 22 23 24 25
    def handleRequestNodeIdentification(self, conn, packet, node_type,
                                        uuid, ip_address, port, name):
        raise NotImplementedError('this method must be overridden')

    def handleAcceptNodeIdentification(self, conn, packet, node_type,
Yoshinori Okuji's avatar
Yoshinori Okuji committed
26 27
                                       uuid, ip_address, port,
                                       num_partitions, num_replicas):
28 29 30 31 32
        raise NotImplementedError('this method must be overridden')

    def handleAskPrimaryMaster(self, conn, packet):
        """This should not be used in reality, because I am not a master
        node. But? If someone likes to ask me, I can help."""
33
        logging.info('asked a primary master node')
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
        app = self.app

        if app.primary_master_node is not None:
            primary_uuid = app.primary_master_node.getUUID()
        else:
            primary_uuid = INVALID_UUID

        known_master_list = []
        for n in app.nm.getMasterNodeList():
            if n.getState() == BROKEN_STATE:
                continue
            info = n.getServer() + (n.getUUID() or INVALID_UUID,)
            known_master_list.append(info)

        p = Packet()
        p.answerPrimaryMaster(packet.getId(), primary_uuid, known_master_list)
        conn.addPacket(p)

    def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
                                  known_master_list):
        raise NotImplementedError('this method must be overridden')

    def handleAnnouncePrimaryMaster(self, conn, packet):
        """Theoretically speaking, I should not get this message,
        because the primary master election must happen when I am
        not connected to any master node."""
        uuid = conn.getUUID()
        if uuid is None:
            self.handleUnexpectedPacket(conn, packet)
            return

        app = self.app
        node = app.nm.getNodeByUUID(uuid)
        if node is None:
            raise RuntimeError('I do not know the uuid %r' % dump(uuid))

        if not isinstance(node, MasterNode):
            self.handleUnexpectedPacket(conn, packet)
            return

        if app.primary_master_node is None:
            # Hmm... I am somehow connected to the primary master already.
            app.primary_master_node = node
            if not isinstance(conn, ClientConnection):
                # I do not want a connection from any master node. I rather
                # want to connect from myself.
                conn.close()
        elif app.primary_master_node.getUUID() == uuid:
            # Yes, I know you are the primary master node.
            pass
        else:
            # It seems that someone else claims taking over the primary
            # master node...
            app.primary_master_node = None
            raise PrimaryFailure('another master node wants to take over')

    def handleReelectPrimaryMaster(self, conn, packet):
        raise PrimaryFailure('re-election occurs')

    def handleNotifyNodeInformation(self, conn, packet, node_list):
        """Store information on nodes, only if this is sent by a primary
        master node."""
        # XXX it might be better to implement this callback in each handler.
        uuid = conn.getUUID()
        if uuid is None:
            self.handleUnexpectedPacket(conn, packet)
            return

Yoshinori Okuji's avatar
Yoshinori Okuji committed
102
        logging.debug('handleNotifyNodeInformation: node_list = %r', node_list)
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
        app = self.app
        node = app.nm.getNodeByUUID(uuid)
        if not isinstance(node, MasterNode) \
                or app.primary_master_node is None \
                or app.primary_master_node.getUUID() != uuid:
            return

        for node_type, ip_address, port, uuid, state in node_list:
            addr = (ip_address, port)

            if node_type == MASTER_NODE_TYPE:
                n = app.nm.getNodeByServer(addr)
                if n is None:
                    n = MasterNode(server = addr)
                    app.nm.add(n)

                n.setState(state)
                if uuid != INVALID_UUID:
                    if n.getUUID() is None:
                        n.setUUID(uuid)

            elif node_type == STORAGE_NODE_TYPE:
                if uuid == INVALID_UUID:
                    # No interest.
                    continue

                n = app.nm.getNodeByUUID(uuid)
                if n is None:
                    n = StorageNode(server = addr, uuid = uuid)
                    app.nm.add(n)
                else:
                    n.setServer(addr)

                n.setState(state)

            elif node_type == CLIENT_NODE_TYPE:
                if uuid == INVALID_UUID:
                    # No interest.
                    continue

                if state == RUNNING_STATE:
                    n = app.nm.getNodeByUUID(uuid)
                    if n is None:
Yoshinori Okuji's avatar
Yoshinori Okuji committed
146
                        logging.debug('adding client node %s', dump(uuid))
147 148
                        n = ClientNode(uuid = uuid)
                        app.nm.add(n)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
149
                        assert app.nm.getNodeByUUID(uuid) is n
150
                else:
151
                    self.dealWithClientFailure(uuid)
152 153
                    n = app.nm.getNodeByUUID(uuid)
                    if n is not None:
Yoshinori Okuji's avatar
Yoshinori Okuji committed
154
                        logging.debug('removing client node %s', dump(uuid))
155 156 157 158 159 160 161 162
                        app.nm.remove(n)

    def handleAskLastIDs(self, conn, packet):
        raise NotImplementedError('this method must be overridden')

    def handleAskPartitionTable(self, conn, packet, offset_list):
        raise NotImplementedError('this method must be overridden')

163
    def handleSendPartitionTable(self, conn, packet, ptid, row_list):
164 165
        raise NotImplementedError('this method must be overridden')

166
    def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
167 168 169 170 171 172 173 174 175 176 177
        raise NotImplementedError('this method must be overridden')

    def handleStartOperation(self, conn, packet):
        raise NotImplementedError('this method must be overridden')

    def handleStopOperation(self, conn, packet):
        raise NotImplementedError('this method must be overridden')

    def handleAskUnfinishedTransactions(self, conn, packet):
        raise NotImplementedError('this method must be overridden')

178
    def handleAskTransactionInformation(self, conn, packet, tid):
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
        raise NotImplementedError('this method must be overridden')

    def handleAskObjectPresent(self, conn, packet, oid, tid):
        raise NotImplementedError('this method must be overridden')

    def handleDeleteTransaction(self, conn, packet, tid):
        raise NotImplementedError('this method must be overridden')

    def handleCommitTransaction(self, conn, packet, tid):
        raise NotImplementedError('this method must be overridden')

    def handleLockInformation(self, conn, packet, tid):
        raise NotImplementedError('this method must be overridden')

    def handleUnlockInformation(self, conn, packet, tid):
        raise NotImplementedError('this method must be overridden')
195

196 197 198
    def handleAskObject(self, conn, packet, oid, serial, tid):
        self.handleUnexpectedPacket(conn, packet)

199
    def handleAskTIDs(self, conn, packet, first, last, partition):
200 201
        self.handleUnexpectedPacket(conn, packet)

202
    def handleAskObjectHistory(self, conn, packet, oid, first, last):
203 204 205 206 207 208
        self.handleUnexpectedPacket(conn, packet)

    def handleAskStoreTransaction(self, conn, packet, tid, user, desc,
                                  ext, oid_list):
        self.handleUnexpectedPacket(conn, packet)

209
    def handleAskStoreObject(self, conn, packet, oid, serial,
210
                             compression, checksum, data, tid):
211 212
        self.handleUnexpectedPacket(conn, packet)

213 214 215
    def handleAbortTransaction(self, conn, packet, tid):
        logging.info('ignoring abort transaction')
        pass
216 217 218 219 220 221 222 223

    def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
        logging.info('ignoring answer last ids')
        pass

    def handleAnswerUnfinishedTransactions(self, conn, packet, tid_list):
        logging.info('ignoring answer unfinished transactions')
        pass
224 225 226 227

    def handleAskOIDs(self, conn, packet, first, last, partition):
        logging.info('ignoring ask oids')
        pass