protocol.py 42.4 KB
Newer Older
1

Grégory Wisniewski's avatar
Grégory Wisniewski committed
2
# Copyright (C) 2006-2010  Nexedi SA
3
#
Aurel's avatar
Aurel committed
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
Aurel's avatar
Aurel committed
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
Aurel's avatar
Aurel committed
17

18
from struct import pack, unpack, error
Yoshinori Okuji's avatar
Yoshinori Okuji committed
19
from socket import inet_ntoa, inet_aton
20

21 22
from neo.util import Enum

Yoshinori Okuji's avatar
Yoshinori Okuji committed
23
# The protocol version (major, minor).
24
PROTOCOL_VERSION = (4, 1)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
25 26 27

# Size restrictions.
MIN_PACKET_SIZE = 10
28
MAX_PACKET_SIZE = 0x4000000
29
PACKET_HEADER_SIZE = 10
30
RESPONSE_MASK = 0x8000
Yoshinori Okuji's avatar
Yoshinori Okuji committed
31

32
class ErrorCodes(Enum):
33
    ACK = Enum.Item(0)
34 35 36 37 38 39
    NOT_READY = Enum.Item(1)
    OID_NOT_FOUND = Enum.Item(2)
    TID_NOT_FOUND = Enum.Item(3)
    PROTOCOL_ERROR = Enum.Item(4)
    BROKEN_NODE = Enum.Item(5)
ErrorCodes = ErrorCodes()
Yoshinori Okuji's avatar
Yoshinori Okuji committed
40

41
class ClusterStates(Enum):
42 43 44 45
    RECOVERING = Enum.Item(1)
    VERIFYING = Enum.Item(2)
    RUNNING = Enum.Item(3)
    STOPPING = Enum.Item(4)
46
ClusterStates = ClusterStates()
47

48 49 50 51 52 53
class NodeTypes(Enum):
    MASTER = Enum.Item(1)
    STORAGE = Enum.Item(2)
    CLIENT = Enum.Item(3)
    ADMIN = Enum.Item(4)
NodeTypes = NodeTypes()
Yoshinori Okuji's avatar
Yoshinori Okuji committed
54

55 56 57 58 59 60 61 62 63
class NodeStates(Enum):
    RUNNING = Enum.Item(1)
    TEMPORARILY_DOWN = Enum.Item(2)
    DOWN = Enum.Item(3)
    BROKEN = Enum.Item(4)
    HIDDEN = Enum.Item(5)
    PENDING = Enum.Item(6)
    UNKNOWN = Enum.Item(7)
NodeStates = NodeStates()
64

65 66 67 68 69 70 71
class CellStates(Enum):
    UP_TO_DATE = Enum.Item(1)
    OUT_OF_DATE = Enum.Item(2)
    FEEDING = Enum.Item(3)
    DISCARDED = Enum.Item(4)
CellStates = CellStates()

72 73
# used for logging
node_state_prefix_dict = {
74 75 76 77 78 79 80
    NodeStates.RUNNING: 'R',
    NodeStates.TEMPORARILY_DOWN: 'T',
    NodeStates.DOWN: 'D',
    NodeStates.BROKEN: 'B',
    NodeStates.HIDDEN: 'H',
    NodeStates.PENDING: 'P',
    NodeStates.UNKNOWN: 'U',
81 82 83
}

# used for logging
84 85 86
cell_state_prefix_dict = {
    CellStates.UP_TO_DATE: 'U',
    CellStates.OUT_OF_DATE: 'O',
87 88
    CellStates.FEEDING: 'F',
    CellStates.DISCARDED: 'D',
89 90
}

91
# Other constants.
92 93
INVALID_UUID = '\0' * 16
INVALID_TID = '\0' * 8
94
INVALID_OID = '\0' * 8
95
INVALID_PTID = '\0' * 8
96
INVALID_SERIAL = INVALID_TID
97
INVALID_PARTITION = 0xffffffff
98

99
UUID_NAMESPACES = {
100 101 102 103
    NodeTypes.STORAGE: 'S',
    NodeTypes.MASTER: 'M',
    NodeTypes.CLIENT: 'C',
    NodeTypes.ADMIN: 'A',
104 105
}

106
class ProtocolError(Exception):
107 108 109 110 111 112 113
    """ Base class for protocol errors, close the connection """
    pass

class PacketMalformedError(ProtocolError):
    """ Close the connection and set the node as broken"""
    pass

114
class UnexpectedPacketError(ProtocolError):
115 116 117
    """ Close the connection and set the node as broken"""
    pass

118
class NotReadyError(ProtocolError):
119 120 121
    """ Just close the connection """
    pass

122
class BrokenNodeDisallowedError(ProtocolError):
123
    """ Just close the connection """
124
    pass
125

Yoshinori Okuji's avatar
Yoshinori Okuji committed
126

127
# packet parser
128
def _decodeClusterState(state):
129
    cluster_state = ClusterStates.get(state)
130
    if cluster_state is None:
131
        raise PacketMalformedError('invalid cluster state %d' % state)
132 133
    return cluster_state

134
def _decodeNodeState(state):
135
    node_state = NodeStates.get(state)
136 137 138 139
    if node_state is None:
        raise PacketMalformedError('invalid node state %d' % state)
    return node_state

140
def _decodeNodeType(original_node_type):
141
    node_type = NodeTypes.get(original_node_type)
142
    if node_type is None:
143 144
        raise PacketMalformedError('invalid node type %d' % original_node_type)
    return node_type
145

146
def _decodeErrorCode(original_error_code):
147
    error_code = ErrorCodes.get(original_error_code)
148
    if error_code is None:
149
        raise PacketMalformedError('invalid error code %d' %
150
                original_error_code)
151 152
    return error_code

153
def _decodeAddress(address):
154 155 156 157 158 159 160 161
    if address == '\0' * 6:
        return None
    (ip, port) = unpack('!4sH', address)
    return (inet_ntoa(ip), port)

def _encodeAddress(address):
    if address is None:
        return '\0' * 6
162
    # address is a tuple (ip, port)
163 164
    return pack('!4sH', inet_aton(address[0]), address[1])

165
def _decodeUUID(uuid):
166 167 168 169 170 171 172 173 174
    if uuid == INVALID_UUID:
        return None
    return uuid

def _encodeUUID(uuid):
    if uuid is None:
        return INVALID_UUID
    return uuid

175
def _decodePTID(ptid):
176 177 178 179
    if ptid == INVALID_PTID:
        return None
    return ptid

180 181 182 183 184
def _encodePTID(ptid):
    if ptid is None:
        return INVALID_PTID
    return ptid

185 186 187 188 189 190 191 192 193 194
def _decodeTID(tid):
    if tid == INVALID_TID:
        return None
    return tid

def _encodeTID(tid):
    if tid is None:
        return INVALID_TID
    return tid

195 196 197 198
def _readString(buf, name, offset=0):
    buf = buf[offset:]
    (size, ) = unpack('!L', buf[:4])
    string = buf[4:4+size]
199 200
    if len(string) != size:
        raise PacketMalformedError("can't read string <%s>" % name)
201
    return (string, buf[offset+size:])
202

203

204
class Packet(object):
205 206
    """
    Base class for any packet definition.
207
    Each subclass should override _encode() and _decode() and return a string or
208
    a tuple respectively.
209
    """
210

211 212
    _request = None
    _answer = None
213 214 215 216 217 218 219
    _body = None
    _code = None
    _args = None
    _id = None

    def __init__(self, *args, **kw):
        assert self._code is not None, "Packet class not registered"
220
        if args != () or kw != {}:
221 222 223 224 225
            body = self._encode(*args, **kw)
        else:
            body = ''
        self._body = body
        self._args = args
226

227 228 229 230 231 232 233 234 235 236 237
    def decode(self):
        assert self._body is not None
        try:
            return self._decode(self._body)
        except error, msg: # struct.error
            name = self.__class__.__name__
            raise PacketMalformedError("%s fail (%s)" % (name, msg))
        except PacketMalformedError, msg:
            name = self.__class__.__name__
            raise PacketMalformedError("%s fail (%s)" % (name, msg))

238 239
    def setContent(self, msg_id, body):
        """ Register the packet content for future decoding """
240 241 242 243 244 245 246
        self._id = msg_id
        self._body = body

    def setId(self, value):
        self._id = value

    def getId(self):
247
        assert self._id is not None, "No identifier applied on the packet"
248 249 250 251 252 253 254 255
        return self._id

    def getCode(self):
        return self._code

    def getType(self):
        return self.__class__

256 257
    def __call__(self):
        """ Encode a packet as a string to send it over the network """
258 259
        content = self._body
        length = PACKET_HEADER_SIZE + len(content)
260
        return pack('!LHL', self.getId(), self._code, length) + content
261 262 263 264 265 266 267 268 269 270 271 272 273

    def __len__(self):
        return PACKET_HEADER_SIZE + len(self._body)

    def __eq__(self, other):
        """ Compare packets with their code instead of content """
        if other is None:
            return False
        assert isinstance(other, Packet)
        return self._code == other._code

    def _encode(self, *args, **kw):
        """ Default encoder, join all arguments """
274 275 276
        args = list(args)
        args.extend(kw.values())
        return ''.join([str(i) for i in args] or '')
277 278 279 280 281 282 283

    def _decode(self, body):
        """ Default decoder, message must be empty """
        assert body == '', "Non-empty packet decoding not implemented """
        return ()

    def isResponse(self):
284
        return self._code & RESPONSE_MASK == RESPONSE_MASK
285

286 287 288 289 290 291
    def answerMatch(self, answer):
        id_match = self._id == answer._id
        is_error = answer.__class__ == Error
        assert self._answer is not None
        return id_match and (is_error or isinstance(answer, self._answer))

292 293 294
    def getAnswer(self):
        return self._answer

295 296

class Ping(Packet):
297
    """
298 299
    Check if a peer is still alive. Any -> Any.
    """
300 301
    pass

302 303

class Pong(Packet):
304
    """
305 306
    Notify being alive. Any -> Any.
    """
307
    pass
308 309

class RequestIdentification(Packet):
310
    """
311 312 313 314 315 316 317 318 319 320 321 322
    Request a node identification. This must be the first packet for any
    connection. Any -> Any.
    """
    def _encode(self, node_type, uuid, address, name):
        uuid = _encodeUUID(uuid)
        address = _encodeAddress(address)
        return pack('!LLH16s6sL', PROTOCOL_VERSION[0], PROTOCOL_VERSION[1],
                          node_type, uuid, address, len(name)) + name

    def _decode(self, body):
        r = unpack('!LLH16s6s', body[:32])
        major, minor, node_type, uuid, address = r
323
        address = _decodeAddress(address)
324
        (name, _) = _readString(body, 'name', offset=32)
325 326
        node_type = _decodeNodeType(node_type)
        uuid = _decodeUUID(uuid)
327 328 329 330 331 332
        if (major, minor) != PROTOCOL_VERSION:
            raise PacketMalformedError('protocol version mismatch')
        return (node_type, uuid, address, name)

class AcceptIdentification(Packet):
    """
333
    Accept a node identification. This should be a reply to Request Node
334 335
    Identification. Any -> Any.
    """
336
    def _encode(self, node_type, uuid,
337 338 339
             num_partitions, num_replicas, your_uuid):
        uuid = _encodeUUID(uuid)
        your_uuid = _encodeUUID(your_uuid)
340
        return pack('!H16sLL16s', node_type, uuid,
341
                          num_partitions, num_replicas, your_uuid)
342

343
    def _decode(self, body):
344 345
        r = unpack('!H16sLL16s', body)
        node_type, uuid, num_partitions, num_replicas, your_uuid = r
346
        node_type = _decodeNodeType(node_type)
347
        uuid = _decodeUUID(uuid)
348
        your_uuid == _decodeUUID(uuid)
349
        return (node_type, uuid, num_partitions, num_replicas, your_uuid)
350 351

class AskPrimary(Packet):
352 353
    """
    Ask a current primary master node. This must be the second message when
354 355 356
    connecting to a master node. Any -> M.
    """
    pass
357

358
class AnswerPrimary(Packet):
359 360
    """
    Reply to Ask Primary Master. This message includes a list of known master
361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383
    nodes to make sure that a peer has the same information. M -> Any.
    """
    def _encode(self, primary_uuid, known_master_list):
        primary_uuid = _encodeUUID(primary_uuid)
        body = [primary_uuid, pack('!L', len(known_master_list))]
        for address, uuid in known_master_list:
            uuid = _encodeUUID(uuid)
            address = _encodeAddress(address)
            body.append(pack('!6s16s', address, uuid))
        return ''.join(body)

    def _decode(self, body):
        (primary_uuid, n) = unpack('!16sL', body[:20])
        known_master_list = []
        for i in xrange(n):
            address, uuid = unpack('!6s16s', body[20+i*22:42+i*22])
            address = _decodeAddress(address)
            uuid = _decodeUUID(uuid)
            known_master_list.append((address, uuid))
        primary_uuid = _decodeUUID(primary_uuid)
        return (primary_uuid, known_master_list)

class AnnouncePrimary(Packet):
384
    """
385 386
    Announce a primary master node election. PM -> SM.
    """
387 388
    pass

389 390 391 392
class ReelectPrimary(Packet):
    """
    Force a re-election of a primary master node. M -> M.
    """
393 394
    pass

395
class AskLastIDs(Packet):
396
    """
397 398 399
    Ask the last OID, the last TID and the last Partition Table ID that
    a storage node stores. Used to recover information. PM -> S, S -> PM.
    """
400
    pass
401 402

class AnswerLastIDs(Packet):
403
    """
404 405 406
    Reply to Ask Last IDs. S -> PM, PM -> S.
    """
    def _encode(self, loid, ltid, lptid):
407 408
        # in this case, loid is a valid OID but considered as invalid. This is
        # not an issue because the OID 0 is hard coded and will never be
409
        # generated
410 411 412 413 414 415 416 417 418 419 420 421
        if loid is None:
            loid = INVALID_OID
        ltid = _encodeTID(ltid)
        lptid = _encodePTID(lptid)
        return loid + ltid + lptid

    def _decode(self, body):
        (loid, ltid, lptid) = unpack('!8s8s8s', body)
        lptid = _decodePTID(lptid)
        return (loid, ltid, lptid)

class AskPartitionTable(Packet):
422
    """
423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440
    Ask rows in a partition table that a storage node stores. Used to recover
    information. PM -> S.
    """
    def _encode(self, offset_list):
        body = [pack('!L', len(offset_list))]
        for offset in offset_list:
            body.append(pack('!L', offset))
        return ''.join(body)

    def _decode(self, body):
        (n,) = unpack('!L', body[:4])
        offset_list = []
        for i in xrange(n):
            offset = unpack('!L', body[4+i*4:8+i*4])[0]
            offset_list.append(offset)
        return (offset_list,)

class AnswerPartitionTable(Packet):
441
    """
442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473
    Answer rows in a partition table. S -> PM.
    """
    def _encode(self, ptid, row_list):
        ptid = _encodePTID(ptid)
        body = [pack('!8sL', ptid, len(row_list))]
        for offset, cell_list in row_list:
            body.append(pack('!LL', offset, len(cell_list)))
            for uuid, state in cell_list:
                uuid = _encodeUUID(uuid)
                body.append(pack('!16sH', uuid, state))
        return ''.join(body)

    def _decode(self, body):
        index = 12
        (ptid, n) = unpack('!8sL', body[:index])
        ptid = _decodePTID(ptid)
        row_list = []
        cell_list = []
        for i in xrange(n):
            offset, m = unpack('!LL', body[index:index+8])
            index += 8
            for j in xrange(m):
                uuid, state = unpack('!16sH', body[index:index+18])
                index += 18
                state = CellStates.get(state)
                uuid = _decodeUUID(uuid)
                cell_list.append((uuid, state))
            row_list.append((offset, tuple(cell_list)))
            del cell_list[:]
        return (ptid, row_list)

class SendPartitionTable(Packet):
474
    """
475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506
    Send rows in a partition table to update other nodes. PM -> S, C.
    """
    def _encode(self, ptid, row_list):
        ptid = _encodePTID(ptid)
        body = [pack('!8sL', ptid, len(row_list))]
        for offset, cell_list in row_list:
            body.append(pack('!LL', offset, len(cell_list)))
            for uuid, state in cell_list:
                uuid = _encodeUUID(uuid)
                body.append(pack('!16sH', uuid, state))
        return ''.join(body)

    def _decode(self, body):
        index = 12
        (ptid, n,) = unpack('!8sL', body[:index])
        ptid = _decodePTID(ptid)
        row_list = []
        cell_list = []
        for i in xrange(n):
            offset, m = unpack('!LL', body[index:index+8])
            index += 8
            for j in xrange(m):
                uuid, state = unpack('!16sH', body[index:index+18])
                index += 18
                state = CellStates.get(state)
                uuid = _decodeUUID(uuid)
                cell_list.append((uuid, state))
            row_list.append((offset, tuple(cell_list)))
            del cell_list[:]
        return (ptid, row_list)

class NotifyPartitionChanges(Packet):
507 508
    """
    Notify a subset of a partition table. This is used to notify changes.
509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524
    PM -> S, C.
    """
    def _encode(self, ptid, cell_list):
        ptid = _encodePTID(ptid)
        body = [pack('!8sL', ptid, len(cell_list))]
        for offset, uuid, state in cell_list:
            uuid = _encodeUUID(uuid)
            body.append(pack('!L16sH', offset, uuid, state))
        return ''.join(body)

    def _decode(self, body):
        (ptid, n) = unpack('!8sL', body[:12])
        ptid = _decodePTID(ptid)
        cell_list = []
        for i in xrange(n):
            (offset, uuid, state) = unpack('!L16sH', body[12+i*22:34+i*22])
525
            state = CellStates.get(state)
526
            uuid = _decodeUUID(uuid)
527 528 529
            cell_list.append((offset, uuid, state))
        return (ptid, cell_list)

530 531 532 533 534 535 536 537 538 539 540 541 542 543
class NotifyReplicationDone(Packet):
    """
    Notify the master node that a partition has been successully replicated from
    a storage to another.
    S -> M
    """

    def _encode(self, offset):
        return pack('!L', offset)

    def _decode(self, body):
        (offset, ) = unpack('!L', body)
        return (offset, )

544 545
class StartOperation(Packet):
    """
546
    Tell a storage nodes to start an operation. Until a storage node receives
547 548 549 550 551 552
    this message, it must not serve client nodes. PM -> S.
    """
    pass

class StopOperation(Packet):
    """
553
    Tell a storage node to stop an operation. Once a storage node receives
554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582
    this message, it must not serve client nodes. PM -> S.
    """
    pass

class AskUnfinishedTransactions(Packet):
    """
    Ask unfinished transactions  PM -> S.
    """
    pass

class AnswerUnfinishedTransactions(Packet):
    """
    Answer unfinished transactions  S -> PM.
    """
    def _encode(self, tid_list):
        body = [pack('!L', len(tid_list))]
        body.extend(tid_list)
        return ''.join(body)

    def _decode(self, body):
        (n,) = unpack('!L', body[:4])
        tid_list = []
        for i in xrange(n):
            tid = unpack('8s', body[4+i*8:12+i*8])[0]
            tid_list.append(tid)
        return (tid_list,)

class AskObjectPresent(Packet):
    """
583
    Ask if an object is present. If not present, OID_NOT_FOUND should be
584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641
    returned. PM -> S.
    """
    def _decode(self, body):
        (oid, tid) = unpack('8s8s', body)
        return (oid, _decodeTID(tid))

class AnswerObjectPresent(Packet):
    """
    Answer that an object is present. PM -> S.
    """
    def _decode(self, body):
        (oid, tid) = unpack('8s8s', body)
        return (oid, _decodeTID(tid))

class DeleteTransaction(Packet):
    """
    Delete a transaction. PM -> S.
    """
    def _encode(self, tid):
        return _encodeTID(tid)

    def _decode(self, body):
        (tid, ) = unpack('8s', body)
        return (_decodeTID(tid), )

class CommitTransaction(Packet):
    """
    Commit a transaction. PM -> S.
    """
    def _encode(self, tid):
        return _encodeTID(tid)

    def _decode(self, body):
        (tid, ) = unpack('8s', body)
        return (_decodeTID(tid), )

class AskBeginTransaction(Packet):
    """
    Ask to begin a new transaction. C -> PM.
    """
    def _encode(self, tid):
        return _encodeTID(tid)

    def _decode(self, body):
        (tid, ) = unpack('8s', body)
        return (_decodeTID(tid), )

class AnswerBeginTransaction(Packet):
    """
    Answer when a transaction begin, give a TID if necessary. PM -> C.
    """
    def _encode(self, tid):
        return _encodeTID(tid)

    def _decode(self, body):
        (tid, ) = unpack('8s', body)
        return (tid, )

642
class AskFinishTransaction(Packet):
643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658
    """
    Finish a transaction. C -> PM.
    """
    def _encode(self, oid_list, tid):
        body = [pack('!8sL', tid, len(oid_list))]
        body.extend(oid_list)
        return ''.join(body)

    def _decode(self, body):
        (tid, n) = unpack('!8sL', body[:12])
        oid_list = []
        for i in xrange(n):
            oid = unpack('8s', body[12+i*8:20+i*8])[0]
            oid_list.append(oid)
        return (oid_list, tid)

659
class AnswerTransactionFinished(Packet):
660
    """
661
    Answer when a transaction is finished. PM -> C.
662 663 664 665 666 667 668 669
    """
    def _encode(self, tid):
        return _encodeTID(tid)

    def _decode(self, body):
        (tid, ) = unpack('8s', body)
        return (_decodeTID(tid), )

670
class AskLockInformation(Packet):
671 672 673 674 675 676 677 678 679 680
    """
    Lock information on a transaction. PM -> S.
    """
    def _encode(self, tid):
        return _encodeTID(tid)

    def _decode(self, body):
        (tid, ) = unpack('8s', body)
        return (_decodeTID(tid), )

681
class AnswerInformationLocked(Packet):
682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708
    """
    Notify information on a transaction locked. S -> PM.
    """
    def _encode(self, tid):
        return _encodeTID(tid)

    def _decode(self, body):
        (tid, ) = unpack('8s', body)
        return (_decodeTID(tid), )

class InvalidateObjects(Packet):
    """
    Invalidate objects. PM -> C.
    """
    def _encode(self, oid_list, tid):
        body = [pack('!8sL', tid, len(oid_list))]
        body.extend(oid_list)
        return ''.join(body)

    def _decode(self, body):
        (tid, n) = unpack('!8sL', body[:12])
        oid_list = []
        for i in xrange(12, 12 + n * 8, 8):
            oid = unpack('8s', body[i:i+8])[0]
            oid_list.append(oid)
        return (oid_list, tid)

709
class NotifyUnlockInformation(Packet):
710 711 712 713 714 715 716
    """
    Unlock information on a transaction. PM -> S.
    """
    def _encode(self, tid):
        return _encodeTID(tid)

    def _decode(self, body):
717
        (tid, ) = unpack('8s', body)
718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907
        return (_decodeTID(tid), )

class AskNewOIDs(Packet):
    """
    Ask new object IDs. C -> PM.
    """
    def _encode(self, num_oids):
        return pack('!H', num_oids)

    def _decode(self, body):
        return unpack('!H', body) # num oids

class AnswerNewOIDs(Packet):
    """
    Answer new object IDs. PM -> C.
    """
    def _encode(self, oid_list):
        body = [pack('!H', len(oid_list))]
        body.extend(oid_list)
        return ''.join(body)

    def _decode(self, body):
        (n,) = unpack('!H', body[:2])
        oid_list = []
        for i in xrange(n):
            oid = unpack('8s', body[2+i*8:10+i*8])[0]
            oid_list.append(oid)
        return (oid_list,)

class AskStoreObject(Packet):
    """
    Ask to store an object. Send an OID, an original serial, a current
    transaction ID, and data. C -> S.
    """
    def _encode(self, oid, serial, compression, checksum, data, tid):
        if serial is None:
            serial = INVALID_TID
        return pack('!8s8s8sBLL', oid, serial, tid, compression,
                          checksum, len(data)) + data

    def _decode(self, body):
        r = unpack('!8s8s8sBL', body[:29])
        oid, serial, tid, compression, checksum = r
        (data, _) = _readString(body, 'data', offset=29)
        return (oid, serial, compression, checksum, data, tid)

class AnswerStoreObject(Packet):
    """
    Answer if an object has been stored. If an object is in conflict,
    a serial of the conflicting transaction is returned. In this case,
    if this serial is newer than the current transaction ID, a client
    node must not try to resolve the conflict. S -> C.
    """
    def _encode(self, conflicting, oid, serial):
        if serial is None:
            serial = INVALID_TID
        return pack('!B8s8s', conflicting, oid, serial)

    def _decode(self, body):
        (conflicting, oid, serial) = unpack('!B8s8s', body)
        return (conflicting, oid, serial)

class AbortTransaction(Packet):
    """
    Abort a transaction. C -> S, PM.
    """
    def _decode(self, body):
        (tid, ) = unpack('8s', body)
        return (tid, )

class AskStoreTransaction(Packet):
    """
    Ask to store a transaction. C -> S.
    """
    def _encode(self, tid, user, desc, ext, oid_list):
        lengths = (len(oid_list), len(user), len(desc), len(ext))
        body = [pack('!8sLHHH', tid, *lengths)]
        body.append(user)
        body.append(desc)
        body.append(ext)
        body.extend(oid_list)
        return ''.join(body)

    def _decode(self, body):
        r = unpack('!8sLHHH', body[:18])
        tid, oid_len, user_len, desc_len, ext_len = r
        body = body[18:]
        user = body[:user_len]
        body = body[user_len:]
        desc = body[:desc_len]
        body = body[desc_len:]
        ext = body[:ext_len]
        body = body[ext_len:]
        oid_list = []
        for i in xrange(oid_len):
            (oid, ) = unpack('8s', body[:8])
            body = body[8:]
            oid_list.append(oid)
        return (tid, user, desc, ext, oid_list)

class AnswerStoreTransaction(Packet):
    """
    Answer if transaction has been stored. S -> C.
    """
    def _encode(self, tid):
        return _encodeTID(tid)

    def _decode(self, body):
        (tid, ) = unpack('8s', body)
        return (tid, )

class AskObject(Packet):
    """
    Ask a stored object by its OID and a serial or a TID if given. If a serial
    is specified, the specified revision of an object will be returned. If
    a TID is specified, an object right before the TID will be returned. S,C -> S.
    """
    def _encode(self, oid, serial, tid):
        tid = _encodeTID(tid)
        serial = _encodeTID(serial) # serial is the previous TID
        return pack('!8s8s8s', oid, serial, tid)

    def _decode(self, body):
        (oid, serial, tid) = unpack('8s8s8s', body)
        if serial == INVALID_TID:
            serial = None
        tid = _decodeTID(tid)
        return (oid, serial, tid)

class AnswerObject(Packet):
    """
    Answer the requested object. S -> C.
    """
    def _encode(self, oid, serial_start, serial_end, compression,
            checksum, data):
        if serial_start is None:
            serial_start = INVALID_TID
        if serial_end is None:
            serial_end = INVALID_TID
        return pack('!8s8s8sBLL', oid, serial_start, serial_end,
                          compression, checksum, len(data)) + data

    def _decode(self, body):
        r = unpack('!8s8s8sBL', body[:29])
        oid, serial_start, serial_end, compression, checksum = r
        if serial_end == INVALID_TID:
            serial_end = None
        (data, _) = _readString(body, 'data', offset=29)
        return (oid, serial_start, serial_end, compression, checksum, data)

class AskTIDs(Packet):
    """
    Ask for TIDs between a range of offsets. The order of TIDs is descending,
    and the range is [first, last). C, S -> S.
    """
    def _encode(self, first, last, partition):
        return pack('!QQL', first, last, partition)

    def _decode(self, body):
        return unpack('!QQL', body) # first, last, partition

class AnswerTIDs(Packet):
    """
    Answer the requested TIDs. S -> C, S.
    """
    def _encode(self, tid_list):
        body = [pack('!L', len(tid_list))]
        body.extend(tid_list)
        return ''.join(body)

    def _decode(self, body):
        (n, ) = unpack('!L', body[:4])
        tid_list = []
        for i in xrange(n):
            tid = unpack('8s', body[4+i*8:12+i*8])[0]
            tid_list.append(tid)
        return (tid_list,)

class AskTransactionInformation(Packet):
    """
    Ask information about a transaction. Any -> S.
    """
    def _decode(self, body):
        (tid, ) = unpack('8s', body)
        return (tid, )

class AnswerTransactionInformation(Packet):
    """
    Answer information (user, description) about a transaction. S -> Any.
    """
908 909 910 911
    def _encode(self, tid, user, desc, ext, packed, oid_list):
        packed = packed and 1 or 0
        body = [pack('!8sHHHBL', tid, len(user), len(desc), len(ext),
            packed, len(oid_list))]
912 913 914 915 916 917 918
        body.append(user)
        body.append(desc)
        body.append(ext)
        body.extend(oid_list)
        return ''.join(body)

    def _decode(self, body):
919 920 921
        r = unpack('!8sHHHBL', body[:18])
        tid, user_len, desc_len, ext_len, packed, oid_len = r
        packed = bool(packed)
922 923 924 925 926 927 928 929 930 931 932 933
        body = body[18:]
        user = body[:user_len]
        body = body[user_len:]
        desc = body[:desc_len]
        body = body[desc_len:]
        ext = body[:ext_len]
        body = body[ext_len:]
        oid_list = []
        for i in xrange(oid_len):
            (oid, ) = unpack('8s', body[:8])
            body = body[8:]
            oid_list.append(oid)
934
        return (tid, user, desc, ext, packed, oid_list)
935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050

class AskObjectHistory(Packet):
    """
    Ask history information for a given object. The order of serials is
    descending, and the range is [first, last]. C, S -> S.
    """
    def _encode(self, oid, first, last):
        return pack('!8sQQ', oid, first, last)

    def _decode(self, body):
        (oid, first, last) = unpack('!8sQQ', body)
        return (oid, first, last)

class AnswerObjectHistory(Packet):
    """
    Answer history information (serial, size) for an object. S -> C, S.
    """
    def _encode(self, oid, history_list):
        body = [pack('!8sL', oid, len(history_list))]
        for serial, size in history_list:
            body.append(pack('!8sL', serial, size))
        return ''.join(body)

    def _decode(self, body):
        (oid, length) = unpack('!8sL', body[:12])
        history_list = []
        for i in xrange(12, 12 + length * 12, 12):
            serial, size = unpack('!8sL', body[i:i+12])
            history_list.append((serial, size))
        return (oid, history_list)

class AskOIDs(Packet):
    """
    Ask for OIDs between a range of offsets. The order of OIDs is descending,
    and the range is [first, last). S -> S.
    """
    def _encode(self, first, last, partition):
        return pack('!QQL', first, last, partition)

    def _decode(self, body):
        return unpack('!QQL', body) # first, last, partition

class AnswerOIDs(Packet):
    """
    Answer the requested OIDs. S -> S.
    """
    def _encode(self, oid_list):
        body = [pack('!L', len(oid_list))]
        body.extend(oid_list)
        return ''.join(body)

    def _decode(self, body):
        (n,) = unpack('!L', body[:4])
        oid_list = []
        for i in xrange(n):
            oid = unpack('8s', body[4+i*8:12+i*8])[0]
            oid_list.append(oid)
        return (oid_list,)

class AskPartitionList(Packet):
    """
    All the following messages are for neoctl to admin node
    Ask information about partition
    """
    def _encode(self, min_offset, max_offset, uuid):
        uuid = _encodeUUID(uuid)
        body = [pack('!LL16s', min_offset, max_offset, uuid)]
        return ''.join(body)

    def _decode(self, body):
        (min_offset, max_offset, uuid) =  unpack('!LL16s', body)
        uuid = _decodeUUID(uuid)
        return (min_offset, max_offset, uuid)

class AnswerPartitionList(Packet):
    """
    Answer information about partition
    """
    def _encode(self, ptid, row_list):
        ptid = _encodePTID(ptid)
        body = [pack('!8sL', ptid, len(row_list))]
        for offset, cell_list in row_list:
            body.append(pack('!LL', offset, len(cell_list)))
            for uuid, state in cell_list:
                uuid = _encodeUUID(uuid)
                body.append(pack('!16sH', uuid, state))
        return ''.join(body)

    def _decode(self, body):
        index = 12
        (ptid, n) = unpack('!8sL', body[:index])
        ptid = _decodePTID(ptid)
        row_list = []
        cell_list = []
        for i in xrange(n):
            offset, m = unpack('!LL', body[index:index+8])
            index += 8
            for j in xrange(m):
                uuid, state = unpack('!16sH', body[index:index+18])
                index += 18
                state = CellStates.get(state)
                uuid = _decodeUUID(uuid)
                cell_list.append((uuid, state))
            row_list.append((offset, tuple(cell_list)))
            del cell_list[:]
        return (ptid, row_list)

class AskNodeList(Packet):
    """
    Ask information about nodes
    """
    def _encode(self, node_type):
        return ''.join([pack('!H', node_type)])

    def _decode(self, body):
        (node_type, ) = unpack('!H', body)
1051
        node_type = _decodeNodeType(node_type)
1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102
        return (node_type,)

class AnswerNodeList(Packet):
    """
    Answer information about nodes
    """
    def _encode(self, node_list):
        body = [pack('!L', len(node_list))]
        for node_type, address, uuid, state in node_list:
            uuid = _encodeUUID(uuid)
            address = _encodeAddress(address)
            body.append(pack('!H6s16sH', node_type, address, uuid, state))
        return ''.join(body)

    def _decode(self, body):
        (n,) = unpack('!L', body[:4])
        node_list = []
        for i in xrange(n):
            r = unpack('!H6s16sH', body[4+i*26:30+i*26])
            node_type, address, uuid, state = r
            address = _decodeAddress(address)
            node_type = _decodeNodeType(node_type)
            state = _decodeNodeState(state)
            uuid = _decodeUUID(uuid)
            node_list.append((node_type, address, uuid, state))
        return (node_list,)

class SetNodeState(Packet):
    """
    Set the node state
    """
    def _encode(self, uuid, state, modify_partition_table):
        uuid = _encodeUUID(uuid)
        return ''.join([pack('!16sHB', uuid, state, modify_partition_table)])

    def _decode(self, body):
        (uuid, state, modify) = unpack('!16sHB', body)
        state = _decodeNodeState(state)
        uuid = _decodeUUID(uuid)
        return (uuid, state, modify)

class AnswerNodeState(Packet):
    """
    Answer state of the node
    """
    def _encode(self, uuid, state):
        uuid = _encodeUUID(uuid)
        return ''.join([pack('!16sH', uuid, state)])

    def _decode(self, body):
        (uuid, state) = unpack('!16sH', body)
1103 1104
        state = _decodeNodeState(state)
        uuid = _decodeUUID(uuid)
1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118
        return (uuid, state)

class AddPendingNodes(Packet):
    """
    Ask the primary to include some pending node in the partition table
    """
    def _encode(self, uuid_list=()):
        # an empty list means all current pending nodes
        uuid_list = [pack('!16s', _encodeUUID(uuid)) for uuid in uuid_list]
        return pack('!H', len(uuid_list)) + ''.join(uuid_list)

    def _decode(self, body):
        (n, ) = unpack('!H', body[:2])
        uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)]
1119
        uuid_list = [_decodeUUID(x) for x in uuid_list]
1120 1121 1122 1123
        return (uuid_list, )

class AnswerNewNodes(Packet):
    """
Grégory Wisniewski's avatar
Grégory Wisniewski committed
1124
    Answer what are the nodes added in the partition table
1125 1126 1127 1128 1129 1130 1131 1132 1133
    """
    def _encode(self, uuid_list):
        # an empty list means no new nodes
        uuid_list = [pack('!16s', _encodeUUID(uuid)) for uuid in uuid_list]
        return pack('!H', len(uuid_list)) + ''.join(uuid_list)

    def _decode(self, body):
        (n, ) = unpack('!H', body[:2])
        uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)]
1134
        uuid_list = [_decodeUUID(x) for x in uuid_list]
1135 1136 1137
        return (uuid_list, )

class NotifyNodeInformation(Packet):
1138
    """
1139
    Notify information about one or more nodes. PM -> Any.
1140 1141 1142 1143 1144 1145 1146 1147
    """
    def _encode(self, node_list):
        body = [pack('!L', len(node_list))]
        for node_type, address, uuid, state in node_list:
            uuid = _encodeUUID(uuid)
            address = _encodeAddress(address)
            body.append(pack('!H6s16sH', node_type, address, uuid, state))
        return ''.join(body)
1148

1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201
    def _decode(self, body):
        (n,) = unpack('!L', body[:4])
        node_list = []
        for i in xrange(n):
            r = unpack('!H6s16sH', body[4+i*26:30+i*26])
            node_type, address, uuid, state = r
            address = _decodeAddress(address)
            node_type = _decodeNodeType(node_type)
            state = _decodeNodeState(state)
            uuid = _decodeUUID(uuid)
            node_list.append((node_type, address, uuid, state))
        return (node_list,)

class AskNodeInformation(Packet):
    """
    Ask node information
    """
    pass

class AnswerNodeInformation(Packet):
    """
    Answer node information
    """
    pass

class SetClusterState(Packet):
    """
    Set the cluster state
    """
    def _encode(self, state):
        return pack('!H', state)

    def _decode(self, body):
        (state, ) = unpack('!H', body[:2])
        state = _decodeClusterState(state)
        return (state, )

class NotifyClusterInformation(Packet):
    """
    Notify information about the cluster
    """
    def _encode(self, state):
        return pack('!H', state)

    def _decode(self, body):
        (state, ) = unpack('!H', body)
        state = _decodeClusterState(state)
        return (state, )

class AskClusterState(Packet):
    """
    Ask state of the cluster
    """
1202
    pass
1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225

class AnswerClusterState(Packet):
    """
    Answer state of the cluster
    """
    def _encode(self, state):
        return pack('!H', state)

    def _decode(self, body):
        (state, ) = unpack('!H', body)
        state = _decodeClusterState(state)
        return (state, )

class NotifyLastOID(Packet):
    """
    Notify last OID generated
    """
    def _decode(self, body):
        (loid, ) = unpack('8s', body)
        return (loid, )

class Error(Packet):
    """
1226 1227
    Error is a special type of message, because this can be sent against
    any other message, even if such a message does not expect a reply
1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240
    usually. Any -> Any.
    """
    def _encode(self, code, message):
        return pack('!HL', code, len(message)) + message

    def _decode(self, body):
        (code, ) = unpack('!H', body[:2])
        code = _decodeErrorCode(code)
        (message, _) = _readString(body, 'message', offset=2)
        return (code, message)


StaticRegistry = {}
1241
def register(code, request, answer=None):
1242
    """ Register a packet in the packet registry """
1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258
    # register the request
    # assert code & RESPONSE_MASK == 0
    assert code not in StaticRegistry, "Duplicate request packet code"
    request._code = code
    request._answer = answer
    StaticRegistry[code] = request
    if answer not in (None, Error):
        # compute the answer code
        code = code | RESPONSE_MASK
        answer._request = request
        answer._code = code
        # and register the answer packet
        assert code not in StaticRegistry, "Duplicate response packet code"
        StaticRegistry[code] = answer
        return (request, answer)
    return request
1259 1260 1261


class PacketRegistry(dict):
1262
    """
1263 1264
    Packet registry that check packet code unicity and provide an index
    """
1265 1266 1267

    def __init__(self):
        dict.__init__(self)
1268
        # load packet classes
1269 1270
        self.update(StaticRegistry)

1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290
    def parse(self, msg):
        if len(msg) < MIN_PACKET_SIZE:
            return None
        msg_id, msg_type, msg_len = unpack('!LHL', msg[:PACKET_HEADER_SIZE])
        try:
            packet_klass = self[msg_type]
        except KeyError:
            raise PacketMalformedError('Unknown packet type')
        if msg_len > MAX_PACKET_SIZE:
            raise PacketMalformedError('message too big (%d)' % msg_len)
        if msg_len < MIN_PACKET_SIZE:
            raise PacketMalformedError('message too small (%d)' % msg_len)
        if len(msg) < msg_len:
            # Not enough.
            return None
        packet = packet_klass()
        packet.setContent(msg_id, msg[PACKET_HEADER_SIZE:msg_len])
        return packet

    # packets registration
1291
    Error = register(0x8000, Error)
1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303
    Ping, Pong = register(
            0x0001,
            Ping,
            Pong)
    RequestIdentification, AcceptIdentification = register(
            0x0002,
            RequestIdentification,
            AcceptIdentification)
    AskPrimary, AnswerPrimary = register(
            0x0003,
            AskPrimary,
            AnswerPrimary)
1304 1305 1306
    AnnouncePrimary = register(0x0004, AnnouncePrimary)
    ReelectPrimary = register(0x0005, ReelectPrimary)
    NotifyNodeInformation = register(0x0006, NotifyNodeInformation)
1307 1308 1309 1310 1311 1312 1313 1314
    AskLastIDs, AnswerLastIDs = register(
            0x0007,
            AskLastIDs,
            AnswerLastIDs)
    AskPartitionTable, AnswerPartitionTable = register(
            0x0008,
            AskPartitionTable,
            AnswerPartitionTable)
1315 1316 1317 1318
    SendPartitionTable = register(0x0009, SendPartitionTable)
    NotifyPartitionChanges = register(0x000A, NotifyPartitionChanges)
    StartOperation = register(0x000B, StartOperation)
    StopOperation = register(0x000C, StopOperation)
1319 1320 1321
    AskUnfinishedTransactions, AnswerUnfinishedTransactions = register(
            0x000D,
            AskUnfinishedTransactions,
1322
            AnswerUnfinishedTransactions)
1323 1324 1325 1326
    AskObjectPresent, AnswerObjectPresent = register(
            0x000f,
            AskObjectPresent,
            AnswerObjectPresent)
1327 1328
    DeleteTransaction = register(0x0010, DeleteTransaction)
    CommitTransaction = register(0x0011, CommitTransaction)
1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340
    AskBeginTransaction, AnswerBeginTransaction = register(
            0x0012,
            AskBeginTransaction,
            AnswerBeginTransaction)
    AskFinishTransaction, AnswerTransactionFinished = register(
            0x0013,
            AskFinishTransaction,
            AnswerTransactionFinished)
    AskLockInformation, AnswerInformationLocked = register(
            0x0014,
            AskLockInformation,
            AnswerInformationLocked)
1341
    InvalidateObjects = register(0x0015, InvalidateObjects)
1342
    NotifyUnlockInformation = register(0x0016, NotifyUnlockInformation)
1343 1344 1345 1346 1347 1348 1349 1350
    AskNewOIDs, AnswerNewOIDs = register(
            0x0017,
            AskNewOIDs,
            AnswerNewOIDs)
    AskStoreObject, AnswerStoreObject = register(
            0x0018,
            AskStoreObject,
            AnswerStoreObject)
1351
    AbortTransaction = register(0x0019, AbortTransaction)
1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366
    AskStoreTransaction, AnswerStoreTransaction = register(
            0x001A,
            AskStoreTransaction,
            AnswerStoreTransaction)
    AskObject, AnswerObject = register(
            0x001B,
            AskObject,
            AnswerObject)
    AskTIDs, AnswerTIDs = register(
            0x001C,
            AskTIDs,
            AnswerTIDs)
    AskTransactionInformation, AnswerTransactionInformation = register(
            0x001E,
            AskTransactionInformation,
1367
            AnswerTransactionInformation)
1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399
    AskObjectHistory, AnswerObjectHistory = register(
            0x001F,
            AskObjectHistory,
            AnswerObjectHistory)
    AskOIDs, AnswerOIDs = register(
            0x0020,
            AskOIDs,
            AnswerOIDs)
    AskPartitionList, AnswerPartitionList = register(
            0x0021,
            AskPartitionList,
            AnswerPartitionList)
    AskNodeList, AnswerNodeList = register(
            0x0022,
            AskNodeList,
            AnswerNodeList)
    SetNodeState, AnswerNodeState = register(
            0x0023,
            SetNodeState,
            AnswerNodeState)
    AddPendingNodes, AnswerNewNodes = register(
            0x0024,
            AddPendingNodes,
            AnswerNewNodes)
    AskNodeInformation, AnswerNodeInformation = register(
            0x0025,
            AskNodeInformation,
            AnswerNodeInformation)
    SetClusterState = register(
            0x0026,
            SetClusterState,
            Error)
1400
    NotifyClusterInformation = register(0x0027, NotifyClusterInformation)
1401 1402 1403 1404
    AskClusterState, AnswerClusterState = register(
            0x0028,
            AskClusterState,
            AnswerClusterState)
1405
    NotifyLastOID = register(0x0030, NotifyLastOID)
1406
    NotifyReplicationDone = register(0x0031, NotifyReplicationDone)
1407

1408
# build a "singleton"
1409 1410
Packets = PacketRegistry()

1411 1412 1413 1414
def register(code):
    def wrapper(registry, message=''):
        return Error(code, message)
    return wrapper
1415

1416 1417 1418 1419
class ErrorRegistry(dict):
    """
        Error packet packet registry
    """
1420

1421 1422
    def __init__(self):
        dict.__init__(self)
1423

1424 1425 1426 1427 1428 1429
    Ack = register(ErrorCodes.ACK)
    ProtocolError = register(ErrorCodes.PROTOCOL_ERROR)
    TidNotFound = register(ErrorCodes.TID_NOT_FOUND)
    OidNotFound = register(ErrorCodes.OID_NOT_FOUND)
    NotReady = register(ErrorCodes.NOT_READY)
    Broken = register(ErrorCodes.BROKEN_NODE)
1430

1431
Errors = ErrorRegistry()
1432