handler.py 15.4 KB
Newer Older
Aurel's avatar
Aurel committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

Yoshinori Okuji's avatar
Yoshinori Okuji committed
18 19
import logging

Yoshinori Okuji's avatar
Yoshinori Okuji committed
20 21
from neo.protocol import Packet, ProtocolError
from neo.connection import ServerConnection
Yoshinori Okuji's avatar
Yoshinori Okuji committed
22 23 24 25

from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \
        PING, PONG, ASK_PRIMARY_MASTER, ANSWER_PRIMARY_MASTER, ANNOUNCE_PRIMARY_MASTER, \
        REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, START_OPERATION, \
26 27
        STOP_OPERATION, ASK_LAST_IDS, ANSWER_LAST_IDS, ASK_PARTITION_TABLE, \
        ANSWER_PARTITION_TABLE, SEND_PARTITION_TABLE, NOTIFY_PARTITION_CHANGES, \
28
        ASK_UNFINISHED_TRANSACTIONS, ANSWER_UNFINISHED_TRANSACTIONS, \
29
        ASK_OBJECT_PRESENT, ANSWER_OBJECT_PRESENT, \
Yoshinori Okuji's avatar
Yoshinori Okuji committed
30 31 32
        DELETE_TRANSACTION, COMMIT_TRANSACTION, ASK_NEW_TID, ANSWER_NEW_TID, \
        FINISH_TRANSACTION, NOTIFY_TRANSACTION_FINISHED, LOCK_INFORMATION, \
        NOTIFY_INFORMATION_LOCKED, INVALIDATE_OBJECTS, UNLOCK_INFORMATION, \
33 34
        ASK_NEW_OIDS, ANSWER_NEW_OIDS, ASK_STORE_OBJECT, ANSWER_STORE_OBJECT, \
        ABORT_TRANSACTION, ASK_STORE_TRANSACTION, ANSWER_STORE_TRANSACTION, \
35
        ASK_OBJECT, ANSWER_OBJECT, ASK_TIDS, ANSWER_TIDS, ASK_TRANSACTION_INFORMATION, \
36
        ANSWER_TRANSACTION_INFORMATION, ASK_OBJECT_HISTORY, ANSWER_OBJECT_HISTORY, \
37
        ASK_OIDS, ANSWER_OIDS, \
Yoshinori Okuji's avatar
Yoshinori Okuji committed
38 39 40 41 42 43 44 45 46 47 48 49
        NOT_READY_CODE, OID_NOT_FOUND_CODE, SERIAL_NOT_FOUND_CODE, TID_NOT_FOUND_CODE, \
        PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \
        INTERNAL_ERROR_CODE

class EventHandler(object):
    """This class handles events."""
    def __init__(self):
        self.initPacketDispatchTable()
        self.initErrorDispatchTable()

    def connectionStarted(self, conn):
        """Called when a connection is started."""
50
        logging.debug('connection started for %s:%d', *(conn.getAddress()))
Yoshinori Okuji's avatar
Yoshinori Okuji committed
51 52 53

    def connectionCompleted(self, conn):
        """Called when a connection is completed."""
54
        logging.debug('connection completed for %s:%d', *(conn.getAddress()))
Yoshinori Okuji's avatar
Yoshinori Okuji committed
55 56 57

    def connectionFailed(self, conn):
        """Called when a connection failed."""
58
        logging.debug('connection failed for %s:%d', *(conn.getAddress()))
Yoshinori Okuji's avatar
Yoshinori Okuji committed
59

60
    def connectionAccepted(self, conn, connector, addr):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
61
        """Called when a connection is accepted."""
62
        logging.debug('connection accepted from %s:%d', *addr)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
63
        new_conn = ServerConnection(conn.getEventManager(), conn.getHandler(),
64
                                    connector = connector, addr = addr)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
65 66 67 68 69
        # A request for a node identification should arrive.
        new_conn.expectMessage(timeout = 10, additional_timeout = 0)

    def timeoutExpired(self, conn):
        """Called when a timeout event occurs."""
70
        logging.debug('timeout expired for %s:%d', *(conn.getAddress()))
Yoshinori Okuji's avatar
Yoshinori Okuji committed
71 72 73

    def connectionClosed(self, conn):
        """Called when a connection is closed by the peer."""
74
        logging.debug('connection closed for %s:%d', *(conn.getAddress()))
Yoshinori Okuji's avatar
Yoshinori Okuji committed
75 76 77 78 79 80 81

    def packetReceived(self, conn, packet):
        """Called when a packet is received."""
        self.dispatch(conn, packet)

    def packetMalformed(self, conn, packet, error_message):
        """Called when a packet is malformed."""
82 83 84
        logging.info('malformed packet %x from %s:%d: %s',
                     packet.getType(), conn.getAddress()[0], 
                     conn.getAddress()[1], error_message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
85 86 87 88 89 90 91 92 93 94 95 96 97
        conn.addPacket(Packet().protocolError(packet.getId(), error_message))
        conn.abort()
        self.peerBroken(conn)

    def peerBroken(self, conn):
        """Called when a peer is broken."""
        logging.error('%s:%d is broken', *(conn.getAddress()))

    def dispatch(self, conn, packet):
        """This is a helper method to handle various packet types."""
        t = packet.getType()
        try:
            method = self.packet_dispatch_table[t]
98
            args = packet.decode() or ()
Yoshinori Okuji's avatar
Yoshinori Okuji committed
99
            method(conn, packet, *args)
100
        except (KeyError, ValueError):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
101 102 103 104 105 106 107
            self.handleUnexpectedPacket(conn, packet)
        except ProtocolError, m:
            self.packetMalformed(conn, packet, m[1])

    def handleUnexpectedPacket(self, conn, packet, message = None):
        """Handle an unexpected packet."""
        if message is None:
108
            message = 'unexpected packet type %s' % packet.getType()
Yoshinori Okuji's avatar
Yoshinori Okuji committed
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
        else:
            message = 'unexpected packet: ' + message
        logging.info('%s', message)
        conn.addPacket(Packet().protocolError(packet.getId(), message))
        conn.abort()
        self.peerBroken(conn)

    # Packet handlers.

    def handleError(self, conn, packet, code, message):
        try:
            method = self.error_dispatch_table[code]
            method(conn, packet, message)
        except ValueError:
            self.handleUnexpectedPacket(conn, packet, message)

    def handleRequestNodeIdentification(self, conn, packet, node_type,
                                        uuid, ip_address, port, name):
        self.handleUnexpectedPacket(conn, packet)

    def handleAcceptNodeIdentification(self, conn, packet, node_type,
Aurel's avatar
Aurel committed
130
                                       uuid, ip_address, port,
131
                                       num_partitions, num_replicas, your_uuid):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
132 133 134 135 136 137 138 139 140
        self.handleUnexpectedPacket(conn, packet)

    def handlePing(self, conn, packet):
        logging.info('got a ping packet; am I overloaded?')
        conn.addPacket(Packet().pong(packet.getId()))

    def handlePong(self, conn, packet):
        pass

141
    def handleAskPrimaryMaster(self, conn, packet):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
142 143
        self.handleUnexpectedPacket(conn, packet)

144 145
    def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
                                  known_master_list):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
146 147 148 149 150 151 152 153 154 155 156
        self.handleUnexpectedPacket(conn, packet)

    def handleAnnouncePrimaryMaster(self, conn, packet):
        self.handleUnexpectedPacket(conn, packet)

    def handleReelectPrimaryMaster(self, conn, packet):
        self.handleUnexpectedPacket(conn, packet)

    def handleNotifyNodeInformation(self, conn, packet, node_list):
        self.handleUnexpectedPacket(conn, packet)

157 158 159 160 161 162 163 164 165
    def handleAskLastIDs(self, conn, packet):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
        self.handleUnexpectedPacket(conn, packet)

    def handleAskPartitionTable(self, conn, packet, offset_list):
        self.handleUnexpectedPacket(conn, packet)

166
    def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
167 168
        self.handleUnexpectedPacket(conn, packet)

169
    def handleSendPartitionTable(self, conn, packet, ptid, row_list):
170 171
        self.handleUnexpectedPacket(conn, packet)

172
    def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
173 174 175 176 177 178 179 180
        self.handleUnexpectedPacket(conn, packet)

    def handleStartOperation(self, conn, packet):
        self.handleUnexpectedPacket(conn, packet)

    def handleStopOperation(self, conn, packet):
        self.handleUnexpectedPacket(conn, packet)

181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
    def handleAskUnfinishedTransactions(self, conn, packet):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerUnfinishedTransactions(self, conn, packet, tid_list):
        self.handleUnexpectedPacket(conn, packet)

    def handleAskObjectPresent(self, conn, packet, oid, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerObjectPresent(self, conn, packet, oid, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleDeleteTransaction(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleCommitTransaction(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

Yoshinori Okuji's avatar
Yoshinori Okuji committed
199 200 201 202 203 204
    def handleAskNewTID(self, conn, packet):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerNewTID(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

205
    def handleAskNewOIDs(self, conn, packet):
Aurel's avatar
Aurel committed
206 207
        self.handleUnexpectedPacket(conn, packet)

208
    def handleAnswerNewOIDs(self, conn, packet, oid_list):
Aurel's avatar
Aurel committed
209 210
        self.handleUnexpectedPacket(conn, packet)

Yoshinori Okuji's avatar
Yoshinori Okuji committed
211 212 213 214 215 216 217 218 219 220 221 222
    def handleFinishTransaction(self, conn, packet, oid_list, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleNotifyTransactionFinished(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleLockInformation(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleNotifyInformationLocked(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

223
    def handleInvalidateObjects(self, conn, packet, oid_list, tid):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
224 225 226 227 228
        self.handleUnexpectedPacket(conn, packet)

    def handleUnlockInformation(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

229
    def handleAskStoreObject(self, conn, packet, oid, serial,
230
                             compression, checksum, data, tid):
Aurel's avatar
Aurel committed
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerStoreObject(self, conn, packet, status, oid):
        self.handleUnexpectedPacket(conn, packet)

    def handleAbortTransaction(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleAskStoreTransaction(self, conn, packet, tid, user, desc,
                                  ext, oid_list):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerStoreTransaction(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

246
    def handleAskObject(self, conn, packet, oid, serial, tid):
Aurel's avatar
Aurel committed
247 248
        self.handleUnexpectedPacket(conn, packet)

249 250
    def handleAnswerObject(self, conn, packet, oid, serial_start,
                           serial_end, compression, checksum, data):
Aurel's avatar
Aurel committed
251
        self.handleUnexpectedPacket(conn, packet)
252

253
    def handleAskTIDs(self, conn, packet, first, last, partition):
254
        self.handleUnexpectedPacket(conn, packet)
255 256

    def handleAnswerTIDs(self, conn, packet, tid_list):
257
        self.handleUnexpectedPacket(conn, packet)
258 259

    def handleAskTransactionInformation(self, conn, packet, tid):
260
        self.handleUnexpectedPacket(conn, packet)
261

262 263
    def handleAnswerTransactionInformation(self, conn, packet, tid, 
                                           user, desc, ext, oid_list):
264
        self.handleUnexpectedPacket(conn, packet)
265

266
    def handleAskObjectHistory(self, conn, packet, oid, first, last):
267 268
        self.handleUnexpectedPacket(conn, packet)

269
    def handleAnswerObjectHistory(self, conn, packet, oid, history_list):
270
        self.handleUnexpectedPacket(conn, packet)
271

272 273 274
    def handleAskOIDs(self, conn, packet, first, last, partition):
        self.handleUnexpectedPacket(conn, packet)

275
    def handleAnswerOIDs(self, conn, packet, oid_list):
276 277
        self.handleUnexpectedPacket(conn, packet)

Yoshinori Okuji's avatar
Yoshinori Okuji committed
278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
    # Error packet handlers.

    handleNotReady = handleUnexpectedPacket
    handleOidNotFound = handleUnexpectedPacket
    handleSerialNotFound = handleUnexpectedPacket
    handleTidNotFound = handleUnexpectedPacket

    def handleProtocolError(self, conn, packet, message):
        raise RuntimeError, 'protocol error: %s' % (message,)

    def handleTimeoutError(self, conn, packet, message):
        raise RuntimeError, 'timeout error: %s' % (message,)

    def handleBrokenNodeDisallowedError(self, conn, packet, message):
        raise RuntimeError, 'broken node disallowed error: %s' % (message,)

    def handleInternalError(self, conn, packet, message):
        self.peerBroken(conn)
        conn.close()

    def initPacketDispatchTable(self):
        d = {}

        d[ERROR] = self.handleError
        d[REQUEST_NODE_IDENTIFICATION] = self.handleRequestNodeIdentification
        d[ACCEPT_NODE_IDENTIFICATION] = self.handleAcceptNodeIdentification
        d[PING] = self.handlePing
        d[PONG] = self.handlePong
        d[ASK_PRIMARY_MASTER] = self.handleAskPrimaryMaster
        d[ANSWER_PRIMARY_MASTER] = self.handleAnswerPrimaryMaster
        d[ANNOUNCE_PRIMARY_MASTER] = self.handleAnnouncePrimaryMaster
        d[REELECT_PRIMARY_MASTER] = self.handleReelectPrimaryMaster
        d[NOTIFY_NODE_INFORMATION] = self.handleNotifyNodeInformation
311 312 313 314 315 316 317 318
        d[ASK_LAST_IDS] = self.handleAskLastIDs
        d[ANSWER_LAST_IDS] = self.handleAnswerLastIDs
        d[ASK_PARTITION_TABLE] = self.handleAskPartitionTable
        d[ANSWER_PARTITION_TABLE] = self.handleAnswerPartitionTable
        d[SEND_PARTITION_TABLE] = self.handleSendPartitionTable
        d[NOTIFY_PARTITION_CHANGES] = self.handleNotifyPartitionChanges
        d[START_OPERATION] = self.handleStartOperation
        d[STOP_OPERATION] = self.handleStopOperation
319 320 321 322 323 324
        d[ASK_UNFINISHED_TRANSACTIONS] = self.handleAskUnfinishedTransactions
        d[ANSWER_UNFINISHED_TRANSACTIONS] = self.handleAnswerUnfinishedTransactions
        d[ASK_OBJECT_PRESENT] = self.handleAskObjectPresent
        d[ANSWER_OBJECT_PRESENT] = self.handleAnswerObjectPresent
        d[DELETE_TRANSACTION] = self.handleDeleteTransaction
        d[COMMIT_TRANSACTION] = self.handleCommitTransaction
Yoshinori Okuji's avatar
Yoshinori Okuji committed
325 326 327 328 329 330 331 332
        d[ASK_NEW_TID] = self.handleAskNewTID
        d[ANSWER_NEW_TID] = self.handleAnswerNewTID
        d[FINISH_TRANSACTION] = self.handleFinishTransaction
        d[NOTIFY_TRANSACTION_FINISHED] = self.handleNotifyTransactionFinished
        d[LOCK_INFORMATION] = self.handleLockInformation
        d[NOTIFY_INFORMATION_LOCKED] = self.handleNotifyInformationLocked
        d[INVALIDATE_OBJECTS] = self.handleInvalidateObjects
        d[UNLOCK_INFORMATION] = self.handleUnlockInformation
333 334
        d[ASK_NEW_OIDS] = self.handleAskNewOIDs
        d[ANSWER_NEW_OIDS] = self.handleAnswerNewOIDs
335 336
        d[ASK_STORE_OBJECT] = self.handleAskStoreObject
        d[ANSWER_STORE_OBJECT] = self.handleAnswerStoreObject
337
        d[ABORT_TRANSACTION] = self.handleAbortTransaction
338 339
        d[ASK_STORE_TRANSACTION] = self.handleAskStoreTransaction
        d[ANSWER_STORE_TRANSACTION] = self.handleAnswerStoreTransaction
340 341
        d[ASK_OBJECT] = self.handleAskObject
        d[ANSWER_OBJECT] = self.handleAnswerObject
342 343 344 345 346 347
        d[ASK_TIDS] = self.handleAskTIDs
        d[ANSWER_TIDS] = self.handleAnswerTIDs
        d[ASK_TRANSACTION_INFORMATION] = self.handleAskTransactionInformation
        d[ANSWER_TRANSACTION_INFORMATION] = self.handleAnswerTransactionInformation
        d[ASK_OBJECT_HISTORY] = self.handleAskObjectHistory
        d[ANSWER_OBJECT_HISTORY] = self.handleAnswerObjectHistory
348 349
        d[ASK_OIDS] = self.handleAskOIDs
        d[ANSWER_OIDS] = self.handleAnswerOIDs
350

Yoshinori Okuji's avatar
Yoshinori Okuji committed
351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
        self.packet_dispatch_table = d

    def initErrorDispatchTable(self):
        d = {}

        d[NOT_READY_CODE] = self.handleNotReady
        d[OID_NOT_FOUND_CODE] = self.handleOidNotFound
        d[SERIAL_NOT_FOUND_CODE] = self.handleSerialNotFound
        d[TID_NOT_FOUND_CODE] = self.handleTidNotFound
        d[PROTOCOL_ERROR_CODE] = self.handleProtocolError
        d[TIMEOUT_ERROR_CODE] = self.handleTimeoutError
        d[BROKEN_NODE_DISALLOWED_CODE] = self.handleBrokenNodeDisallowedError
        d[INTERNAL_ERROR_CODE] = self.handleInternalError

        self.error_dispatch_table = d