handler.py 16.1 KB
Newer Older
Aurel's avatar
Aurel committed
1
#
Grégory Wisniewski's avatar
Grégory Wisniewski committed
2
# Copyright (C) 2006-2010  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Aurel's avatar
Aurel committed
17

18
from neo import logging
19
from neo.protocol import NodeStates, ErrorCodes, Packets, Errors
20
from neo.protocol import PacketMalformedError, UnexpectedPacketError, \
21
        BrokenNodeDisallowedError, NotReadyError, ProtocolError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
22

23

Yoshinori Okuji's avatar
Yoshinori Okuji committed
24 25
class EventHandler(object):
    """This class handles events."""
26

27 28
    def __init__(self, app):
        self.app = app
29 30
        self.packet_dispatch_table = self.__initPacketDispatchTable()
        self.error_dispatch_table = self.__initErrorDispatchTable()
Yoshinori Okuji's avatar
Yoshinori Okuji committed
31

32 33 34
    def __repr__(self):
        return self.__class__.__name__

35
    def __unexpectedPacket(self, conn, packet, message=None):
36 37
        """Handle an unexpected packet."""
        if message is None:
38 39
            message = 'unexpected packet type %s in %s' % (packet.getType(),
                    self.__class__.__name__)
40
        else:
41 42
            message = 'unexpected packet: %s in %s' % (message,
                    self.__class__.__name__)
43
        logging.error(message)
44
        conn.answer(Errors.ProtocolError(message))
45 46
        conn.abort()
        self.peerBroken(conn)
47

Yoshinori Okuji's avatar
Yoshinori Okuji committed
48 49 50
    def dispatch(self, conn, packet):
        """This is a helper method to handle various packet types."""
        try:
51
            try:
52
                method = self.packet_dispatch_table[packet.getType()]
53 54
            except KeyError:
                raise UnexpectedPacketError('no handler found')
55
            args = packet.decode() or ()
56
            conn.setPeerId(packet.getId())
57
            method(conn, *args)
58
        except UnexpectedPacketError, e:
59
            self.__unexpectedPacket(conn, packet, *e.args)
60
        except PacketMalformedError:
61
            logging.error('malformed packet from %r', conn)
62 63 64
            conn.notify(Packets.Notify('Malformed packet: %r' % (packet, )))
            conn.abort()
            self.peerBroken(conn)
65
        except BrokenNodeDisallowedError:
66
            conn.answer(Errors.Broken('go away'))
67
            conn.abort()
68 69 70 71
        except NotReadyError, message:
            if not message.args:
                message = 'Retry Later'
            message = str(message)
72
            conn.answer(Errors.NotReady(message))
73 74
            conn.abort()
        except ProtocolError, message:
75
            message = str(message)
76
            conn.answer(Errors.ProtocolError(message))
77
            conn.abort()
78

79 80 81 82
    def checkClusterName(self, name):
        # raise an exception if the fiven name mismatch the current cluster name
        if self.app.name != name:
            logging.error('reject an alien cluster')
83
            raise ProtocolError('invalid cluster name')
84

Yoshinori Okuji's avatar
Yoshinori Okuji committed
85

86 87 88 89 90 91 92 93
    # Network level handlers

    def packetReceived(self, conn, packet):
        """Called when a packet is received."""
        self.dispatch(conn, packet)

    def connectionStarted(self, conn):
        """Called when a connection is started."""
94
        logging.debug('connection started for %r', conn)
95 96 97

    def connectionCompleted(self, conn):
        """Called when a connection is completed."""
98
        logging.debug('connection completed for %r', conn)
99 100 101

    def connectionFailed(self, conn):
        """Called when a connection failed."""
102
        logging.debug('connection failed for %r', conn)
103

104
    def connectionAccepted(self, conn):
105 106 107 108
        """Called when a connection is accepted."""

    def timeoutExpired(self, conn):
        """Called when a timeout event occurs."""
109
        logging.debug('timeout expired for %r', conn)
110
        self.connectionLost(conn, NodeStates.TEMPORARILY_DOWN)
111 112 113

    def connectionClosed(self, conn):
        """Called when a connection is closed by the peer."""
114
        logging.debug('connection closed for %r', conn)
115
        self.connectionLost(conn, NodeStates.TEMPORARILY_DOWN)
116 117 118

    def peerBroken(self, conn):
        """Called when a peer is broken."""
119
        logging.error('%r is broken', conn)
120
        self.connectionLost(conn, NodeStates.BROKEN)
121

122
    def connectionLost(self, conn, new_state):
123 124 125 126 127 128
        """ this is a method to override in sub-handlers when there is no need
        to make distinction from the kind event that closed the connection  """
        pass


    # Packet handlers.
Yoshinori Okuji's avatar
Yoshinori Okuji committed
129

Grégory Wisniewski's avatar
Grégory Wisniewski committed
130
    def notify(self, conn, message):
131
        logging.info('notification from %r: %s', conn, message)
Grégory Wisniewski's avatar
Grégory Wisniewski committed
132

133
    def requestIdentification(self, conn, node_type,
134
                                        uuid, address, name):
135
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
136

137
    def acceptIdentification(self, conn, node_type,
138
                       uuid, num_partitions, num_replicas, your_uuid):
139
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
140

141
    def askPrimary(self, conn):
142
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
143

144
    def answerPrimary(self, conn, primary_uuid,
145
                                  known_master_list):
146
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
147

148
    def announcePrimary(self, con):
149
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
150

151
    def reelectPrimary(self, conn):
152
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
153

154
    def notifyNodeInformation(self, conn, node_list):
155
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
156

157
    def askLastIDs(self, conn):
158
        raise UnexpectedPacketError
159

160
    def answerLastIDs(self, conn, loid, ltid, lptid):
161
        raise UnexpectedPacketError
162

163
    def askPartitionTable(self, conn, offset_list):
164
        raise UnexpectedPacketError
165

166
    def answerPartitionTable(self, conn, ptid, row_list):
167
        raise UnexpectedPacketError
168

169
    def sendPartitionTable(self, conn, ptid, row_list):
170
        raise UnexpectedPacketError
171

172
    def notifyPartitionChanges(self, conn, ptid, cell_list):
173
        raise UnexpectedPacketError
174

175
    def startOperation(self, conn):
176
        raise UnexpectedPacketError
177

178
    def stopOperation(self, conn):
179
        raise UnexpectedPacketError
180

181
    def askUnfinishedTransactions(self, conn):
182
        raise UnexpectedPacketError
183

184
    def answerUnfinishedTransactions(self, conn, tid_list):
185
        raise UnexpectedPacketError
186

187
    def askObjectPresent(self, conn, oid, tid):
188
        raise UnexpectedPacketError
189

190
    def answerObjectPresent(self, conn, oid, tid):
191
        raise UnexpectedPacketError
192

193
    def deleteTransaction(self, conn, tid):
194
        raise UnexpectedPacketError
195

196
    def commitTransaction(self, conn, tid):
197
        raise UnexpectedPacketError
198

199
    def askBeginTransaction(self, conn, tid):
200
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
201

202
    def answerBeginTransaction(self, conn, tid):
203
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
204

205
    def askNewOIDs(self, conn, num_oids):
206
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
207

208
    def answerNewOIDs(self, conn, num_oids):
209
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
210

211
    def askFinishTransaction(self, conn, oid_list, tid):
212
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
213

214
    def answerTransactionFinished(self, conn, tid):
215
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
216

217
    def askLockInformation(self, conn, tid):
218
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
219

220
    def answerInformationLocked(self, conn, tid):
221
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
222

223
    def invalidateObjects(self, conn, oid_list, tid):
224
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
225

226
    def notifyUnlockInformation(self, conn, tid):
227
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
228

229
    def askStoreObject(self, conn, oid, serial,
230
                             compression, checksum, data, tid):
231
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
232

233
    def answerStoreObject(self, conn, conflicting, oid, serial):
234
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
235

236
    def abortTransaction(self, conn, tid):
237
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
238

239
    def askStoreTransaction(self, conn, tid, user, desc,
Aurel's avatar
Aurel committed
240
                                  ext, oid_list):
241
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
242

243
    def answerStoreTransaction(self, conn, tid):
244
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
245

246
    def askObject(self, conn, oid, serial, tid):
247
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
248

249
    def answerObject(self, conn, oid, serial_start,
250
            serial_end, compression, checksum, data, data_serial):
251
        raise UnexpectedPacketError
252

253
    def askTIDs(self, conn, first, last, partition):
254
        raise UnexpectedPacketError
255

256
    def answerTIDs(self, conn, tid_list):
257
        raise UnexpectedPacketError
258

259
    def askTransactionInformation(self, conn, tid):
260
        raise UnexpectedPacketError
261

262
    def answerTransactionInformation(self, conn, tid,
263
                                           user, desc, ext, packed, oid_list):
264
        raise UnexpectedPacketError
265

266
    def askObjectHistory(self, conn, oid, first, last):
267
        raise UnexpectedPacketError
268

269
    def answerObjectHistory(self, conn, oid, history_list):
270
        raise UnexpectedPacketError
271

272
    def askOIDs(self, conn, first, last, partition):
273
        raise UnexpectedPacketError
274

275
    def answerOIDs(self, conn, oid_list):
276
        raise UnexpectedPacketError
277

278
    def askPartitionList(self, conn, min_offset, max_offset, uuid):
279
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
280

281
    def answerPartitionList(self, conn, ptid, row_list):
282
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
283

284
    def askNodeList(self, conn, offset_list):
285
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
286

287
    def answerNodeList(self, conn, node_list):
288
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
289

290
    def setNodeState(self, conn, uuid, state, modify_partition_table):
291
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
292

293
    def answerNodeState(self, conn, uuid, state):
294
        raise UnexpectedPacketError
Aurel's avatar
Aurel committed
295

296
    def addPendingNodes(self, conn, uuid_list):
297 298
        raise UnexpectedPacketError

299
    def answerNewNodes(self, conn, uuid_list):
300 301
        raise UnexpectedPacketError

302
    def askNodeInformation(self, conn):
303 304
        raise UnexpectedPacketError

305
    def answerNodeInformation(self, conn):
306 307
        raise UnexpectedPacketError

308
    def askClusterState(self, conn):
309 310
        raise UnexpectedPacketError

311
    def answerClusterState(self, conn, state):
312 313
        raise UnexpectedPacketError

314
    def setClusterState(self, conn, state):
315 316
        raise UnexpectedPacketError

317
    def notifyClusterInformation(self, conn, state):
318
        raise UnexpectedPacketError
319

320
    def notifyLastOID(self, conn, oid):
321 322
        raise UnexpectedPacketError

323
    def notifyReplicationDone(self, conn, offset):
324 325
        raise UnexpectedPacketError

326 327 328 329 330
    def askUndoTransaction(self, conn, tid, undone_tid):
        raise UnexpectedPacketError

    def answerUndoTransaction(self, conn, oid_list, error_oid_list, conflict_oid_list):
        raise UnexpectedPacketError
331

Yoshinori Okuji's avatar
Yoshinori Okuji committed
332 333
    # Error packet handlers.

334
    def error(self, conn, code, message):
335 336
        try:
            method = self.error_dispatch_table[code]
337
            method(conn, message)
338 339 340
        except ValueError:
            raise UnexpectedPacketError(message)

341
    def notReady(self, conn, message):
342 343
        raise UnexpectedPacketError

344
    def oidNotFound(self, conn, message):
345 346
        raise UnexpectedPacketError

347
    def tidNotFound(self, conn, message):
348
        raise UnexpectedPacketError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
349

350
    def protocolError(self, conn, message):
351 352
        # the connection should have been closed by the remote peer
        logging.error('protocol error: %s' % (message,))
Yoshinori Okuji's avatar
Yoshinori Okuji committed
353

354
    def timeoutError(self, conn, message):
355
        logging.error('timeout error: %s' % (message,))
Yoshinori Okuji's avatar
Yoshinori Okuji committed
356

357
    def brokenNodeDisallowedError(self, conn, message):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
358 359
        raise RuntimeError, 'broken node disallowed error: %s' % (message,)

360
    def ack(self, conn, message):
361 362
        logging.debug("no error message : %s" % (message))

Aurel's avatar
Aurel committed
363

364 365
    # Fetch tables initialization

366
    def __initPacketDispatchTable(self):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
367 368
        d = {}

369
        d[Packets.Error] = self.error
Grégory Wisniewski's avatar
Grégory Wisniewski committed
370
        d[Packets.Notify] = self.notify
371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386
        d[Packets.RequestIdentification] = self.requestIdentification
        d[Packets.AcceptIdentification] = self.acceptIdentification
        d[Packets.AskPrimary] = self.askPrimary
        d[Packets.AnswerPrimary] = self.answerPrimary
        d[Packets.AnnouncePrimary] = self.announcePrimary
        d[Packets.ReelectPrimary] = self.reelectPrimary
        d[Packets.NotifyNodeInformation] = self.notifyNodeInformation
        d[Packets.AskLastIDs] = self.askLastIDs
        d[Packets.AnswerLastIDs] = self.answerLastIDs
        d[Packets.AskPartitionTable] = self.askPartitionTable
        d[Packets.AnswerPartitionTable] = self.answerPartitionTable
        d[Packets.SendPartitionTable] = self.sendPartitionTable
        d[Packets.NotifyPartitionChanges] = self.notifyPartitionChanges
        d[Packets.StartOperation] = self.startOperation
        d[Packets.StopOperation] = self.stopOperation
        d[Packets.AskUnfinishedTransactions] = self.askUnfinishedTransactions
387 388
        d[Packets.AnswerUnfinishedTransactions] = \
            self.answerUnfinishedTransactions
389 390 391 392 393 394
        d[Packets.AskObjectPresent] = self.askObjectPresent
        d[Packets.AnswerObjectPresent] = self.answerObjectPresent
        d[Packets.DeleteTransaction] = self.deleteTransaction
        d[Packets.CommitTransaction] = self.commitTransaction
        d[Packets.AskBeginTransaction] = self.askBeginTransaction
        d[Packets.AnswerBeginTransaction] = self.answerBeginTransaction
395
        d[Packets.AskFinishTransaction] = self.askFinishTransaction
396
        d[Packets.AnswerTransactionFinished] = self.answerTransactionFinished
397
        d[Packets.AskLockInformation] = self.askLockInformation
398
        d[Packets.AnswerInformationLocked] = self.answerInformationLocked
399
        d[Packets.InvalidateObjects] = self.invalidateObjects
400
        d[Packets.NotifyUnlockInformation] = self.notifyUnlockInformation
401 402 403 404 405 406 407 408 409 410 411 412
        d[Packets.AskNewOIDs] = self.askNewOIDs
        d[Packets.AnswerNewOIDs] = self.answerNewOIDs
        d[Packets.AskStoreObject] = self.askStoreObject
        d[Packets.AnswerStoreObject] = self.answerStoreObject
        d[Packets.AbortTransaction] = self.abortTransaction
        d[Packets.AskStoreTransaction] = self.askStoreTransaction
        d[Packets.AnswerStoreTransaction] = self.answerStoreTransaction
        d[Packets.AskObject] = self.askObject
        d[Packets.AnswerObject] = self.answerObject
        d[Packets.AskTIDs] = self.askTIDs
        d[Packets.AnswerTIDs] = self.answerTIDs
        d[Packets.AskTransactionInformation] = self.askTransactionInformation
413 414
        d[Packets.AnswerTransactionInformation] = \
            self.answerTransactionInformation
415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433
        d[Packets.AskObjectHistory] = self.askObjectHistory
        d[Packets.AnswerObjectHistory] = self.answerObjectHistory
        d[Packets.AskOIDs] = self.askOIDs
        d[Packets.AnswerOIDs] = self.answerOIDs
        d[Packets.AskPartitionList] = self.askPartitionList
        d[Packets.AnswerPartitionList] = self.answerPartitionList
        d[Packets.AskNodeList] = self.askNodeList
        d[Packets.AnswerNodeList] = self.answerNodeList
        d[Packets.SetNodeState] = self.setNodeState
        d[Packets.AnswerNodeState] = self.answerNodeState
        d[Packets.SetClusterState] = self.setClusterState
        d[Packets.AddPendingNodes] = self.addPendingNodes
        d[Packets.AnswerNewNodes] = self.answerNewNodes
        d[Packets.AskNodeInformation] = self.askNodeInformation
        d[Packets.AnswerNodeInformation] = self.answerNodeInformation
        d[Packets.AskClusterState] = self.askClusterState
        d[Packets.AnswerClusterState] = self.answerClusterState
        d[Packets.NotifyClusterInformation] = self.notifyClusterInformation
        d[Packets.NotifyLastOID] = self.notifyLastOID
434
        d[Packets.NotifyReplicationDone] = self.notifyReplicationDone
435 436
        d[Packets.AskUndoTransaction] = self.askUndoTransaction
        d[Packets.AnswerUndoTransaction] = self.answerUndoTransaction
437

438
        return d
Yoshinori Okuji's avatar
Yoshinori Okuji committed
439

440
    def __initErrorDispatchTable(self):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
441
        d = {}
442

443
        d[ErrorCodes.ACK] = self.ack
444 445 446 447 448
        d[ErrorCodes.NOT_READY] = self.notReady
        d[ErrorCodes.OID_NOT_FOUND] = self.oidNotFound
        d[ErrorCodes.TID_NOT_FOUND] = self.tidNotFound
        d[ErrorCodes.PROTOCOL_ERROR] = self.protocolError
        d[ErrorCodes.BROKEN_NODE] = self.brokenNodeDisallowedError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
449

450 451
        return d