protocol.py 36 KB
Newer Older
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1 2
from struct import pack, unpack
from socket import inet_ntoa, inet_aton
3 4
import logging

Yoshinori Okuji's avatar
Yoshinori Okuji committed
5
from neo.util import dump
Yoshinori Okuji's avatar
Yoshinori Okuji committed
6 7 8 9 10 11 12 13 14

# The protocol version (major, minor).
PROTOCOL_VERSION = (4, 0)

# Size restrictions.
MIN_PACKET_SIZE = 10
MAX_PACKET_SIZE = 0x100000

# Message types.
Yoshinori Okuji's avatar
Yoshinori Okuji committed
15 16 17

# Error is a special type of message, because this can be sent against any other message,
# even if such a message does not expect a reply usually. Any -> Any.
18
ERROR = 0x8000
Yoshinori Okuji's avatar
Yoshinori Okuji committed
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35

# Check if a peer is still alive. Any -> Any.
PING = 0x0001

# Notify being alive. Any -> Any.
PONG = 0x8001

# Request a node identification. This must be the first packet for any connection.
# Any -> Any.
REQUEST_NODE_IDENTIFICATION = 0x0002

# Accept a node identification. This should be a reply to Request Node Identification.
# Any -> Any.
ACCEPT_NODE_IDENTIFICATION = 0x8002

# Ask a current primary master node. This must be the second message when connecting
# to a master node. Any -> M.
Yoshinori Okuji's avatar
Yoshinori Okuji committed
36
ASK_PRIMARY_MASTER = 0x0003
Yoshinori Okuji's avatar
Yoshinori Okuji committed
37 38 39

# Reply to Ask Primary Master. This message includes a list of known master nodes,
# to make sure that a peer has the same information. M -> Any.
Yoshinori Okuji's avatar
Yoshinori Okuji committed
40
ANSWER_PRIMARY_MASTER = 0x8003
Yoshinori Okuji's avatar
Yoshinori Okuji committed
41 42

# Announce a primary master node election. PM -> SM.
Yoshinori Okuji's avatar
Yoshinori Okuji committed
43
ANNOUNCE_PRIMARY_MASTER = 0x0004
Yoshinori Okuji's avatar
Yoshinori Okuji committed
44 45

# Force a re-election of a primary master node. M -> M.
46
REELECT_PRIMARY_MASTER = 0x0005
Yoshinori Okuji's avatar
Yoshinori Okuji committed
47 48

# Notify information about one or more nodes. Any -> PM, PM -> Any.
Yoshinori Okuji's avatar
Yoshinori Okuji committed
49
NOTIFY_NODE_INFORMATION = 0x0006
Yoshinori Okuji's avatar
Yoshinori Okuji committed
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77

# Ask the last OID, the last TID and the last Partition Table ID that a storage node
# stores. Used to recover information. PM -> S.
ASK_LAST_IDS = 0x0007

# Reply to Ask Last IDs. S -> PM.
ANSWER_LAST_IDS = 0x8007

# Ask rows in a partition table that a storage node stores. Used to recover
# information. PM -> S.
ASK_PARTITION_TABLE = 0x0008

# Answer rows in a partition table. S -> PM.
ANSWER_PARTITION_TABLE = 0x8008

# Send rows in a partition table to update other nodes. PM -> S, C.
SEND_PARTITION_TABLE = 0x0009

# Notify a subset of a partition table. This is used to notify changes. PM -> S, C.
NOTIFY_PARTITION_CHANGES = 0x000a

# Tell a storage nodes to start an operation. Until a storage node receives this
# message, it must not serve client nodes. PM -> S.
START_OPERATION = 0x000b

# Tell a storage node to stop an operation. Once a storage node receives this message,
# it must not serve client nodes. PM -> S.
STOP_OPERATION = 0x000c
Yoshinori Okuji's avatar
Yoshinori Okuji committed
78

79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
# Ask unfinished transactions' IDs. PM -> S.
ASK_UNFINISHED_TRANSACTIONS = 0x000d

# Answer unfinished transactions' IDs. S -> PM.
ANSWER_UNFINISHED_TRANSACTIONS = 0x800d

# Ask if an object is present. If not present, OID_NOT_FOUND should be returned. PM -> S.
ASK_OBJECT_PRESENT = 0x000f

# Answer that an object is present. PM -> S.
ANSWER_OBJECT_PRESENT = 0x800f

# Delete a transaction. PM -> S.
DELETE_TRANSACTION = 0x0010

# Commit a transaction. PM -> S.
COMMIT_TRANSACTION = 0x0011

Yoshinori Okuji's avatar
Yoshinori Okuji committed
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
# Ask a new transaction ID. C -> PM.
ASK_NEW_TID = 0x0012

# Answer a new transaction ID. PM -> C.
ANSWER_NEW_TID = 0x8012

# Finish a transaction. C -> PM.
FINISH_TRANSACTION = 0x0013

# Notify a transaction finished. PM -> C.
NOTIFY_TRANSACTION_FINISHED = 0x8013

# Lock information on a transaction. PM -> S.
LOCK_INFORMATION = 0x0014

# Notify information on a transaction locked. S -> PM.
NOTIFY_INFORMATION_LOCKED = 0x8014

# Invalidate objects. PM -> C.
INVALIDATE_OBJECTS = 0x0015

# Unlock information on a transaction. PM -> S.
UNLOCK_INFORMATION = 0x0016

121 122
# Ask new object IDs. C -> PM.
ASK_NEW_OIDS = 0x0017
Aurel's avatar
Aurel committed
123

124 125
# Answer new object IDs. PM -> C.
ANSWER_NEW_OIDS = 0x8017
Aurel's avatar
Aurel committed
126

127 128
# Ask to store an object. Send an OID, an original serial, a current
# transaction ID, and data. C -> S.
Aurel's avatar
Aurel committed
129 130
ASK_STORE_OBJECT = 0x0018

131 132 133 134
# Answer if an object has been stored. If an object is in conflict,
# a serial of the conflicting transaction is returned. In this case,
# if this serial is newer than the current transaction ID, a client
# node must not try to resolve the conflict. S -> C.
Aurel's avatar
Aurel committed
135 136 137 138 139 140 141 142 143 144 145
ANSWER_STORE_OBJECT = 0x8018

# Abort a transaction. C -> S
ABORT_TRANSACTION = 0x0019

# Ask to store a transaction. C -> S.
ASK_STORE_TRANSACTION = 0x001a

# Answer if transaction has been stored. S -> C.
ANSWER_STORE_TRANSACTION = 0x801a

146 147 148 149
# Ask a stored object by its OID and a serial or a TID if given. If a serial
# is specified, the specified revision of an object will be returned. If
# a TID is specified, an object right before the TID will be returned. C -> S.
ASK_OBJECT = 0x001b
Aurel's avatar
Aurel committed
150

151
# Answer the requested object. S -> C.
152
ANSWER_OBJECT = 0x801b
Aurel's avatar
Aurel committed
153

154 155
# Ask for TIDs between a range of offsets. The order of TIDs is descending,
# and the range is [first, last). C -> S.
Aurel's avatar
Aurel committed
156 157
ASK_TIDS = 0x001d

158
# Answer the requested TIDs. S -> C.
Aurel's avatar
Aurel committed
159 160
ANSWER_TIDS = 0x801d

161
# Ask information about a transaction. PM, C -> S.
162
ASK_TRANSACTION_INFORMATION = 0x001e
Aurel's avatar
Aurel committed
163

164
# Answer information (user, description) about a transaction. S -> C, PM.
Aurel's avatar
Aurel committed
165 166 167
ANSWER_TRANSACTION_INFORMATION = 0x801e

# Ask history information for a given object. C -> S.
168
ASK_OBJECT_HISTORY = 0x001f
Aurel's avatar
Aurel committed
169 170 171

# Answer history information (serial, size) for an object. S -> C.
ANSWER_OBJECT_HISTORY = 0x801f
172

Yoshinori Okuji's avatar
Yoshinori Okuji committed
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
# Error codes.
NOT_READY_CODE = 1
OID_NOT_FOUND_CODE = 2
SERIAL_NOT_FOUND_CODE = 3
TID_NOT_FOUND_CODE = 4
PROTOCOL_ERROR_CODE = 5
TIMEOUT_ERROR_CODE = 6
BROKEN_NODE_DISALLOWED_CODE = 7
INTERNAL_ERROR_CODE = 8

# Node types.
MASTER_NODE_TYPE = 1
STORAGE_NODE_TYPE = 2
CLIENT_NODE_TYPE = 3

VALID_NODE_TYPE_LIST = (MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE)

190 191
# Node states.
RUNNING_STATE = 0
Yoshinori Okuji's avatar
Yoshinori Okuji committed
192 193 194
TEMPORARILY_DOWN_STATE = 1
DOWN_STATE = 2
BROKEN_STATE = 3
195 196 197

VALID_NODE_STATE_LIST = (RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, BROKEN_STATE)

Yoshinori Okuji's avatar
Yoshinori Okuji committed
198 199 200 201 202 203
# Partition cell states.
UP_TO_DATE_STATE = 0
OUT_OF_DATE_STATE = 1
FEEDING_STATE = 2
DISCARDED_STATE = 3

204
VALID_CELL_STATE_LIST = (UP_TO_DATE_STATE, OUT_OF_DATE_STATE, FEEDING_STATE,
Yoshinori Okuji's avatar
Yoshinori Okuji committed
205 206
                         DISCARDED_STATE)

207 208 209 210 211
# Other constants.
INVALID_UUID = '\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0'
INVALID_TID = '\0\0\0\0\0\0\0\0'
INVALID_SERIAL = '\0\0\0\0\0\0\0\0'
INVALID_OID = '\0\0\0\0\0\0\0\0'
Yoshinori Okuji's avatar
Yoshinori Okuji committed
212
INVALID_PTID = '\0\0\0\0\0\0\0\0'
213

Yoshinori Okuji's avatar
Yoshinori Okuji committed
214 215
class ProtocolError(Exception): pass

Yoshinori Okuji's avatar
Yoshinori Okuji committed
216
class Packet(object):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
217 218 219 220 221 222 223 224
    """A packet."""

    _id = None
    _type = None
    _len = None

    @classmethod
    def parse(cls, msg):
225
        # logging.debug('parsing %s', dump(msg))
Yoshinori Okuji's avatar
Yoshinori Okuji committed
226 227
        if len(msg) < MIN_PACKET_SIZE:
            return None
Yoshinori Okuji's avatar
Yoshinori Okuji committed
228
        msg_id, msg_type, msg_len = unpack('!LHL', msg[:10])
Yoshinori Okuji's avatar
Yoshinori Okuji committed
229
        if msg_len > MAX_PACKET_SIZE:
230
            raise ProtocolError(cls(msg_id, msg_type),
Yoshinori Okuji's avatar
Yoshinori Okuji committed
231 232
                                'message too big (%d)' % msg_len)
        if msg_len < MIN_PACKET_SIZE:
233
            raise ProtocolError(cls(msg_id, msg_type),
Yoshinori Okuji's avatar
Yoshinori Okuji committed
234 235 236 237
                                'message too small (%d)' % msg_len)
        if len(msg) < msg_len:
            # Not enough.
            return None
238
        return cls(msg_id, msg_type, msg[10:msg_len])
Yoshinori Okuji's avatar
Yoshinori Okuji committed
239 240 241 242 243 244 245 246 247 248 249 250 251

    def __init__(self, msg_id = None, msg_type = None, body = None):
        self._id = msg_id
        self._type = msg_type
        self._body = body

    def getId(self):
        return self._id

    def getType(self):
        return self._type

    def __len__(self):
252
        return 10 + len(self._body)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
253 254 255

    # Encoders.
    def encode(self):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
256
        msg = pack('!LHL', self._id, self._type, 10 + len(self._body)) + self._body
Yoshinori Okuji's avatar
Yoshinori Okuji committed
257 258 259 260 261 262 263 264 265 266
        if len(msg) > MAX_PACKET_SIZE:
            raise ProtocolError(self, 'message too big (%d)' % len(msg))
        return msg

    __str__ = encode

    def error(self, msg_id, error_code, error_message):
        self._id = msg_id
        self._type = ERROR
        self._body = pack('!HL', error_code, len(error_message)) + error_message
267
        return self
Yoshinori Okuji's avatar
Yoshinori Okuji committed
268 269

    def protocolError(self, msg_id, error_message):
270
        return self.error(msg_id, PROTOCOL_ERROR_CODE, 'protocol error: ' + error_message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
271 272

    def internalError(self, msg_id, error_message):
273 274 275
        return self.error(msg_id, INTERNAL_ERROR_CODE, 'internal error: ' + error_message)

    def notReady(self, msg_id, error_message):
Aurel's avatar
Aurel committed
276
        return self.error(msg_id, NOT_READY_CODE, 'not ready: ' + error_message)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
277

278
    def brokenNodeDisallowedError(self, msg_id, error_message):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
279
        return self.error(msg_id, BROKEN_NODE_DISALLOWED_CODE,
280 281
                          'broken node disallowed error: ' + error_message)

282
    def oidNotFound(self, msg_id, error_message):
283
        return self.error(msg_id, OID_NOT_FOUND_CODE,
284 285 286
                          'oid not found: ' + error_message)

    def tidNotFound(self, msg_id, error_message):
287
        return self.error(msg_id, TID_NOT_FOUND_CODE,
288 289
                          'tid not found: ' + error_message)

Yoshinori Okuji's avatar
Yoshinori Okuji committed
290 291 292 293
    def ping(self, msg_id):
        self._id = msg_id
        self._type = PING
        self._body = ''
294
        return self
Yoshinori Okuji's avatar
Yoshinori Okuji committed
295 296 297 298 299

    def pong(self, msg_id):
        self._id = msg_id
        self._type = PONG
        self._body = ''
300
        return self
Yoshinori Okuji's avatar
Yoshinori Okuji committed
301 302 303 304

    def requestNodeIdentification(self, msg_id, node_type, uuid, ip_address, port, name):
        self._id = msg_id
        self._type = REQUEST_NODE_IDENTIFICATION
305
        self._body = pack('!LLH16s4sHL', PROTOCOL_VERSION[0], PROTOCOL_VERSION[1],
Yoshinori Okuji's avatar
Yoshinori Okuji committed
306
                          node_type, uuid, inet_aton(ip_address), port, len(name)) + name
307
        return self
Yoshinori Okuji's avatar
Yoshinori Okuji committed
308

Aurel's avatar
Aurel committed
309 310
    def acceptNodeIdentification(self, msg_id, node_type, uuid, ip_address,
                                 port, num_partitions, num_replicas):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
311 312
        self._id = msg_id
        self._type = ACCEPT_NODE_IDENTIFICATION
313 314
        self._body = pack('!H16s4sHLL', node_type, uuid,
                          inet_aton(ip_address), port,
315
                          num_partitions, num_replicas)
316 317
        return self

Yoshinori Okuji's avatar
Yoshinori Okuji committed
318
    def askPrimaryMaster(self, msg_id):
319 320
        self._id = msg_id
        self._type = ASK_PRIMARY_MASTER
Yoshinori Okuji's avatar
Yoshinori Okuji committed
321
        self._body = ''
322 323
        return self

Yoshinori Okuji's avatar
Yoshinori Okuji committed
324
    def answerPrimaryMaster(self, msg_id, primary_uuid, known_master_list):
325 326
        self._id = msg_id
        self._type = ANSWER_PRIMARY_MASTER
Yoshinori Okuji's avatar
Yoshinori Okuji committed
327
        body = [primary_uuid, pack('!L', len(known_master_list))]
328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343
        for master in known_master_list:
            body.append(pack('!4sH16s', inet_aton(master[0]), master[1], master[2]))
        self._body = ''.join(body)
        return self

    def announcePrimaryMaster(self, msg_id):
        self._id = msg_id
        self._type = ANNOUNCE_PRIMARY_MASTER
        self._body = ''
        return self

    def reelectPrimaryMaster(self, msg_id):
        self._id = msg_id
        self._type = REELECT_PRIMARY_MASTER
        self._body = ''
        return self
Yoshinori Okuji's avatar
Yoshinori Okuji committed
344

Yoshinori Okuji's avatar
Yoshinori Okuji committed
345
    def notifyNodeInformation(self, msg_id, node_list):
346
        self._id = msg_id
Yoshinori Okuji's avatar
Yoshinori Okuji committed
347
        self._type = NOTIFY_NODE_INFORMATION
348 349 350
        body = [pack('!L', len(node_list))]
        for node_type, ip_address, port, uuid, state in node_list:
            body.append(pack('!H4sH16sH', node_type, inet_aton(ip_address), port,
Yoshinori Okuji's avatar
Yoshinori Okuji committed
351
                             uuid, state))
352 353 354
        self._body = ''.join(body)
        return self

Yoshinori Okuji's avatar
Yoshinori Okuji committed
355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386
    def askLastIDs(self, msg_id):
        self._id = msg_id
        self._type = ASK_LAST_IDS
        self._body = ''
        return self

    def answerLastIDs(self, msg_id, loid, ltid, lptid):
        self._id = msg_id
        self._type = ANSWER_LAST_IDS
        self._body = loid + ltid + lptid
        return self

    def askPartitionTable(self, msg_id, offset_list):
        self._id = msg_id
        self._type = ASK_PARTITION_TABLE
        body = [pack('!L', len(offset_list))]
        for offset in offset_list:
            body.append(pack('!L', offset))
        self._body = ''.join(body)
        return self

    def answerPartitionTable(self, msg_id, ptid, row_list):
        self._id = msg_id
        self._type = ANSWER_PARTITION_TABLE
        body = [pack('!8sL', ptid, len(row_list))]
        for offset, cell_list in row_list:
            body.append(pack('!LL', offset, len(cell_list)))
            for uuid, state in cell_list:
                body.append(pack('!16sH', uuid, state))
        self._body = ''.join(body)
        return self

387
    def sendPartitionTable(self, msg_id, ptid, row_list):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
        self._id = msg_id
        self._type = SEND_PARTITION_TABLE
        body = [pack('!8sL', ptid, len(row_list))]
        for offset, cell_list in row_list:
            body.append(pack('!LL', offset, len(cell_list)))
            for uuid, state in cell_list:
                body.append(pack('!16sH', uuid, state))
        self._body = ''.join(body)
        return self

    def notifyPartitionChanges(self, msg_id, ptid, cell_list):
        self._id = msg_id
        self._type = NOTIFY_PARTITION_CHANGES
        body = [pack('!8sL', ptid, len(cell_list))]
        for offset, uuid, state in cell_list:
            body.append(pack('!L16sH', offset, uuid, state))
        self._body = ''.join(body)
        return self

    def startOperation(self, msg_id):
        self._id = msg_id
        self._type = START_OPERATION
        self._body = ''
        return self

    def stopOperation(self, msg_id):
        self._id = msg_id
415 416 417 418 419 420 421
        self._type = STOP_OPERATION
        self._body = ''
        return self

    def askUnfinishedTransactions(self, msg_id):
        self._id = msg_id
        self._type = ASK_UNFINISHED_TRANSACTIONS
Yoshinori Okuji's avatar
Yoshinori Okuji committed
422 423 424
        self._body = ''
        return self

425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456
    def answerUnfinishedTransactions(self, msg_id, tid_list):
        self._id = msg_id
        self._type = ANSWER_UNFINISHED_TRANSACTIONS
        body = [pack('!L', len(tid_list))]
        body.extend(tid_list)
        self._body = ''.join(body)
        return self

    def askObjectPresent(self, msg_id, oid, tid):
        self._id = msg_id
        self._type = ASK_OBJECT_PRESENT
        self._body = oid + tid
        return self

    def answerObjectPresent(self, msg_id, oid, tid):
        self._id = msg_id
        self._type = ANSWER_OBJECT_PRESENT
        self._body = oid + tid
        return self

    def deleteTransaction(self, msg_id, tid):
        self._id = msg_id
        self._type = DELETE_TRANSACTION
        self._body = tid
        return self

    def commitTransaction(self, msg_id, tid):
        self._id = msg_id
        self._type = COMMIT_TRANSACTION
        self._body = tid
        return self

Yoshinori Okuji's avatar
Yoshinori Okuji committed
457 458 459 460 461 462 463 464 465 466 467 468
    def askNewTID(self, msg_id):
        self._id = msg_id
        self._type = ASK_NEW_TID
        self._body = ''
        return self

    def answerNewTID(self, msg_id, tid):
        self._id = msg_id
        self._type = ANSWER_NEW_TID
        self._body = tid
        return self

469
    def askNewOIDs(self, msg_id, num_oids):
Aurel's avatar
Aurel committed
470
        self._id = msg_id
471 472
        self._type = ASK_NEW_OIDS
        self._body = pack('!H', num_oids)
Aurel's avatar
Aurel committed
473 474
        return self

Yoshinori Okuji's avatar
Yoshinori Okuji committed
475
    def answerNewOIDs(self, msg_id, oid_list):
Aurel's avatar
Aurel committed
476
        self._id = msg_id
477 478
        self._type = ANSWER_NEW_OIDS
        body = [pack('!H', len(oid_list))]
Aurel's avatar
Aurel committed
479 480 481 482
        body.extend(oid_list)
        self._body = ''.join(body)
        return self

Yoshinori Okuji's avatar
Yoshinori Okuji committed
483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522
    def finishTransaction(self, msg_id, oid_list, tid):
        self._id = msg_id
        self._type = FINISH_TRANSACTION
        body = [pack('!8sL', tid, len(oid_list))]
        body.extend(oid_list)
        self._body = ''.join(body)
        return self

    def notifyTransactionFinished(self, msg_id, tid):
        self._id = msg_id
        self._type = NOTIFY_TRANSACTION_FINISHED
        self._body = tid
        return self

    def lockInformation(self, msg_id, tid):
        self._id = msg_id
        self._type = LOCK_INFORMATION
        self._body = tid
        return self

    def notifyInformationLocked(self, msg_id, tid):
        self._id = msg_id
        self._type = NOTIFY_INFORMATION_LOCKED
        self._body = tid
        return self

    def invalidateObjects(self, msg_id, oid_list):
        self._id = msg_id
        self._type = INVALIDATE_OBJECTS
        body = [pack('!L', len(oid_list))]
        body.extend(oid_list)
        self._body = ''.join(body)
        return self

    def unlockInformation(self, msg_id, tid):
        self._id = msg_id
        self._type = UNLOCK_INFORMATION
        self._body = tid
        return self

Aurel's avatar
Aurel committed
523 524 525 526 527
    def abortTransaction(self, msg_id, tid):
        self._id = msg_id
        self._type = ABORT_TRANSACTION
        self._body = tid
        return self
Yoshinori Okuji's avatar
Yoshinori Okuji committed
528

Aurel's avatar
Aurel committed
529 530 531 532 533 534
    def askStoreTransaction(self, msg_id, tid, user, desc, ext, oid_list):
        self._id = msg_id
        self._type = ASK_STORE_TRANSACTION
        user_len = len(user)
        desc_len = len(desc)
        ext_len = len(ext)
535
        body = [pack('!8sLHHH', tid, len(oid_list), user_len, desc_len, ext_len)]
536 537 538
        body.append(user)
        body.append(desc)
        body.append(ext)
Aurel's avatar
Aurel committed
539 540
        for oid in oid_list:
            body.append(pack('8s', oid))
Aurel's avatar
Aurel committed
541 542 543 544 545 546 547 548 549
        self._body = ''.join(body)
        return self

    def answerStoreTransaction(self, msg_id, tid):
        self._id = msg_id
        self._type = ANSWER_STORE_TRANSACTION
        self._body = tid
        return self

Aurel's avatar
Aurel committed
550
    def askStoreObject(self, msg_id, oid, serial, compression, checksum, data, tid):
Aurel's avatar
Aurel committed
551 552
        self._id = msg_id
        self._type = ASK_STORE_OBJECT
553
        self._body = pack('!8s8s8sBLL', oid, serial, tid, compression,
554
                          checksum, len(data)) + data
Aurel's avatar
Aurel committed
555
        return self
556

557
    def answerStoreObject(self, msg_id, conflicting, oid, serial):
Aurel's avatar
Aurel committed
558 559
        self._id = msg_id
        self._type = ANSWER_STORE_OBJECT
560
        self._body = pack('!B8s8s', conflicting, oid, serial)
Aurel's avatar
Aurel committed
561 562
        return self

563
    def askObject(self, msg_id, oid, serial, tid):
Aurel's avatar
Aurel committed
564
        self._id = msg_id
565 566
        self._type = ASK_OBJECT
        self._body = pack('!8s8s8s', oid, serial, tid)
Aurel's avatar
Aurel committed
567
        return self
568

569 570
    def answerObject(self, msg_id, oid, serial_start, serial_end, compression,
                     checksum, data):
Aurel's avatar
Aurel committed
571
        self._id = msg_id
572 573 574
        self._type = ANSWER_OBJECT
        self._body = pack('!8s8s8sBLL', oid, serial_start, serial_end,
                          compression, checksum, len(data)) + data
Aurel's avatar
Aurel committed
575 576
        return self

577
    def askTIDs(self, msg_id, first, last):
Aurel's avatar
Aurel committed
578 579
        self._id = msg_id
        self._type = ASK_TIDS
580
        self._body = pack('!LL', first, last)
Aurel's avatar
Aurel committed
581 582 583 584 585
        return self

    def answerTIDs(self, msg_id, tid_list):
        self._id = msg_id
        self._type = ANSWER_TIDS
586
        body = [pack('!L', len(tid_list))]
Aurel's avatar
Aurel committed
587 588 589 590 591 592 593 594 595
        body.extend(tid_list)
        self._body = ''.join(body)
        return self

    def askTransactionInformation(self, msg_id, tid):
        self._id = msg_id
        self._type = ASK_TRANSACTION_INFORMATION
        self._body = pack('!8s', tid)
        return self
596

597
    def answerTransactionInformation(self, msg_id, tid, user, desc, oid_list):
Aurel's avatar
Aurel committed
598 599
        self._id = msg_id
        self._type = ANSWER_TRANSACTION_INFORMATION
600
        body = [pack('!8sHHL', tid, len(user), len(desc), len(oid_list))]
Aurel's avatar
Aurel committed
601 602
        body.append(user)
        body.append(desc)
603
        body.extend(oid_list)
Aurel's avatar
Aurel committed
604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621
        self._body = ''.join(body)
        return self

    def askObjectHistory(self, msg_id, oid, length):
        self._id = msg_id
        self._type = ASK_OBJECT_HISTORY
        self._body = pack('!8sH', oid, length)
        return self

    def answerObjectHistory(self, msg_id, oid, history_list):
        self._id = msg_id
        self._type = ANSWER_OBJECT_HISTORY
        body = [pack('!8sH', oid, len(history_list))]
        # history_list is a list of tuple (serial, size)
        for history_tuple in history_list:
            body.append(pack('8sL', history_tuple[0], history_tuple[1]))
        self._body = ''.join(body)
        return self
622

Yoshinori Okuji's avatar
Yoshinori Okuji committed
623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655
    # Decoders.
    def decode(self):
        try:
            method = self.decode_table[self._type]
        except KeyError:
            raise ProtocolError(self, 'unknown message type 0x%x' % self._type)
        return method(self)

    decode_table = {}

    def _decodeError(self):
        try:
            body = self._body
            code, size = unpack('!HL', body[:6])
            message = body[6:]
        except:
            raise ProtocolError(self, 'invalid error message')
        if len(message) != size:
            raise ProtocolError(self, 'invalid error message size')
        return code, message
    decode_table[ERROR] = _decodeError

    def _decodePing(self):
        pass
    decode_table[PING] = _decodePing

    def _decodePong(self):
        pass
    decode_table[PONG] = _decodePong

    def _decodeRequestNodeIdentification(self):
        try:
            body = self._body
656 657
            major, minor, node_type, uuid, ip_address, port, size \
                    = unpack('!LLH16s4sHL', body[:36])
Yoshinori Okuji's avatar
Yoshinori Okuji committed
658 659 660 661 662 663 664 665
            ip_address = inet_ntoa(ip_address)
            name = body[36:]
        except:
            raise ProtocolError(self, 'invalid request node identification')
        if size != len(name):
            raise ProtocolError(self, 'invalid name size')
        if node_type not in VALID_NODE_TYPE_LIST:
            raise ProtocolError(self, 'invalid node type %d' % node_type)
666
        if (major, minor) != PROTOCOL_VERSION:
Yoshinori Okuji's avatar
Yoshinori Okuji committed
667 668 669 670 671 672
            raise ProtocolError(self, 'protocol version mismatch')
        return node_type, uuid, ip_address, port, name
    decode_table[REQUEST_NODE_IDENTIFICATION] = _decodeRequestNodeIdentification

    def _decodeAcceptNodeIdentification(self):
        try:
673 674
            node_type, uuid, ip_address, port, num_partitions, num_replicas \
                    = unpack('!H16s4sHLL', self._body)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
675 676 677 678 679
            ip_address = inet_ntoa(ip_address)
        except:
            raise ProtocolError(self, 'invalid accept node identification')
        if node_type not in VALID_NODE_TYPE_LIST:
            raise ProtocolError(self, 'invalid node type %d' % node_type)
Aurel's avatar
Aurel committed
680
        return node_type, uuid, ip_address, port, num_partitions, num_replicas
Yoshinori Okuji's avatar
Yoshinori Okuji committed
681 682
    decode_table[ACCEPT_NODE_IDENTIFICATION] = _decodeAcceptNodeIdentification

683
    def _decodeAskPrimaryMaster(self):
Yoshinori Okuji's avatar
Yoshinori Okuji committed
684
        pass
685 686 687 688
    decode_table[ASK_PRIMARY_MASTER] = _decodeAskPrimaryMaster

    def _decodeAnswerPrimaryMaster(self):
        try:
689
            primary_uuid, n = unpack('!16sL', self._body[:20])
690 691
            known_master_list = []
            for i in xrange(n):
692
                ip_address, port, uuid = unpack('!4sH16s', self._body[20+i*22:42+i*22])
693 694 695 696
                ip_address = inet_ntoa(ip_address)
                known_master_list.append((ip_address, port, uuid))
        except:
            raise ProtocolError(self, 'invalid answer primary master')
Yoshinori Okuji's avatar
Yoshinori Okuji committed
697
        return primary_uuid, known_master_list
698 699 700 701 702 703 704 705 706
    decode_table[ANSWER_PRIMARY_MASTER] = _decodeAnswerPrimaryMaster

    def _decodeAnnouncePrimaryMaster(self):
        pass
    decode_table[ANNOUNCE_PRIMARY_MASTER] = _decodeAnnouncePrimaryMaster

    def _decodeReelectPrimaryMaster(self):
        pass
    decode_table[REELECT_PRIMARY_MASTER] = _decodeReelectPrimaryMaster
707

Yoshinori Okuji's avatar
Yoshinori Okuji committed
708
    def _decodeNotifyNodeInformation(self):
709
        try:
Yoshinori Okuji's avatar
Yoshinori Okuji committed
710
            n = unpack('!L', self._body[:4])[0]
711 712 713 714 715 716 717 718 719 720 721 722 723 724
            node_list = []
            for i in xrange(n):
                r = unpack('!H4sH16sH', self._body[4+i*26:30+i*26])
                node_type, ip_address, port, uuid, state = r
                ip_address = inet_ntoa(ip_address)
                if node_type not in VALID_NODE_TYPE_LIST:
                    raise ProtocolError(self, 'invalid node type %d' % node_type)
                if state not in VALID_NODE_STATE_LIST:
                    raise ProtocolError(self, 'invalid node state %d' % state)
                node_list.append((node_type, ip_address, port, uuid, state))
        except ProtocolError:
            raise
        except:
            raise ProtocolError(self, 'invalid answer node information')
Yoshinori Okuji's avatar
Yoshinori Okuji committed
725
        return (node_list,)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
726
    decode_table[NOTIFY_NODE_INFORMATION] = _decodeNotifyNodeInformation
Yoshinori Okuji's avatar
Yoshinori Okuji committed
727 728 729 730 731 732 733 734 735 736 737 738 739 740 741

    def _decodeAskLastIDs(self):
        pass
    decode_table[ASK_LAST_IDS] = _decodeAskLastIDs

    def _decodeAnswerLastIDs(self):
        try:
            loid, ltid, lptid = unpack('!8s8s8s', self._body)
        except:
            raise ProtocolError(self, 'invalid answer last ids')
        return loid, ltid, lptid
    decode_table[ANSWER_LAST_IDS] = _decodeAnswerLastIDs

    def _decodeAskPartitionTable(self):
        try:
742
            n = unpack('!L', self._body[:4])[0]
Yoshinori Okuji's avatar
Yoshinori Okuji committed
743 744
            offset_list = []
            for i in xrange(n):
745
                offset = unpack('!L', self._body[4+i*4:8+i*4])[0]
Yoshinori Okuji's avatar
Yoshinori Okuji committed
746 747 748
                offset_list.append(offset)
        except:
            raise ProtocolError(self, 'invalid ask partition table')
Yoshinori Okuji's avatar
Yoshinori Okuji committed
749
        return (offset_list,)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768
    decode_table[ASK_PARTITION_TABLE] = _decodeAskPartitionTable

    def _decodeAnswerPartitionTable(self):
        try:
            ptid, n = unpack('!8sL', self._body[:12])
            index = 12
            row_list = []
            cell_list = []
            for i in xrange(n):
                offset, m = unpack('!LL', self._body[index:index+8])
                index += 8
                for j in xrange(m):
                    cell = unpack('!16sH', self._body[index:index+18])
                    index += 18
                    cell_list.append(cell)
                row_list.append((offset, cell_list))
                del cell_list[:]
        except:
            raise ProtocolError(self, 'invalid answer partition table')
769
        return ptid, row_list
Yoshinori Okuji's avatar
Yoshinori Okuji committed
770 771 772 773 774 775 776 777 778 779 780 781 782 783 784
    decode_table[ANSWER_PARTITION_TABLE] = _decodeAnswerPartitionTable

    def _decodeSendPartitionTable(self):
        try:
            ptid, n = unpack('!8sL', self._body[:12])
            index = 12
            row_list = []
            cell_list = []
            for i in xrange(n):
                offset, m = unpack('!LL', self._body[index:index+8])
                index += 8
                for j in xrange(m):
                    cell = unpack('!16sH', self._body[index:index+18])
                    index += 18
                    cell_list.append(cell)
785
                row_list.append((offset, tuple(cell_list)))
Yoshinori Okuji's avatar
Yoshinori Okuji committed
786 787 788
                del cell_list[:]
        except:
            raise ProtocolError(self, 'invalid send partition table')
789
        return ptid, row_list
Yoshinori Okuji's avatar
Yoshinori Okuji committed
790 791 792 793 794
    decode_table[SEND_PARTITION_TABLE] = _decodeSendPartitionTable

    def _decodeNotifyPartitionChanges(self):
        try:
            ptid, n = unpack('!8sL', self._body[:12])
Yoshinori Okuji's avatar
Yoshinori Okuji committed
795
            cell_list = []
Yoshinori Okuji's avatar
Yoshinori Okuji committed
796 797 798 799 800
            for i in xrange(n):
                cell = unpack('!L16sH', self._body[12+i*22:34+i*22])
                cell_list.append(cell)
        except:
            raise ProtocolError(self, 'invalid notify partition changes')
801
        return ptid, cell_list
Yoshinori Okuji's avatar
Yoshinori Okuji committed
802 803 804 805 806 807 808 809
    decode_table[NOTIFY_PARTITION_CHANGES] = _decodeNotifyPartitionChanges

    def _decodeStartOperation(self):
        pass
    decode_table[START_OPERATION] = _decodeStartOperation

    def _decodeStopOperation(self):
        pass
810
    decode_table[STOP_OPERATION] = _decodeStopOperation
811 812 813 814 815 816 817

    def _decodeAskUnfinishedTransactions(self):
        pass
    decode_table[ASK_UNFINISHED_TRANSACTIONS] = _decodeAskUnfinishedTransactions

    def _decodeAnswerUnfinishedTransactions(self):
        try:
818
            n = unpack('!L', self._body[:4])[0]
819 820
            tid_list = []
            for i in xrange(n):
821
                tid = unpack('8s', self._body[4+i*8:12+i*8])[0]
822 823 824
                tid_list.append(tid)
        except:
            raise ProtocolError(self, 'invalid answer unfinished transactions')
Yoshinori Okuji's avatar
Yoshinori Okuji committed
825
        return (tid_list,)
826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845
    decode_table[ANSWER_UNFINISHED_TRANSACTIONS] = _decodeAnswerUnfinishedTransactions

    def _decodeAskObjectPresent(self):
        try:
            oid, tid = unpack('8s8s', self._body)
        except:
            raise ProtocolError(self, 'invalid ask object present')
        return oid, tid
    decode_table[ASK_OBJECT_PRESENT] = _decodeAskObjectPresent

    def _decodeAnswerObjectPresent(self):
        try:
            oid, tid = unpack('8s8s', self._body)
        except:
            raise ProtocolError(self, 'invalid answer object present')
        return oid, tid
    decode_table[ANSWER_OBJECT_PRESENT] = _decodeAnswerObjectPresent

    def _decodeDeleteTransaction(self):
        try:
846
            tid = unpack('8s', self._body)[0]
847 848
        except:
            raise ProtocolError(self, 'invalid delete transaction')
Yoshinori Okuji's avatar
Yoshinori Okuji committed
849
        return (tid,)
850 851 852 853
    decode_table[DELETE_TRANSACTION] = _decodeDeleteTransaction

    def _decodeCommitTransaction(self):
        try:
854
            tid = unpack('8s', self._body)[0]
855 856
        except:
            raise ProtocolError(self, 'invalid commit transaction')
Yoshinori Okuji's avatar
Yoshinori Okuji committed
857
        return (tid,)
858
    decode_table[COMMIT_TRANSACTION] = _decodeCommitTransaction
Yoshinori Okuji's avatar
Yoshinori Okuji committed
859 860 861 862 863 864 865

    def _decodeAskNewTID(self):
        pass
    decode_table[ASK_NEW_TID] = _decodeAskNewTID

    def _decodeAnswerNewTID(self):
        try:
866
            tid = unpack('8s', self._body)[0]
Yoshinori Okuji's avatar
Yoshinori Okuji committed
867 868
        except:
            raise ProtocolError(self, 'invalid answer new tid')
Yoshinori Okuji's avatar
Yoshinori Okuji committed
869
        return (tid,)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
870 871
    decode_table[ANSWER_NEW_TID] = _decodeAnswerNewTID

872
    def _decodeAskNewOIDs(self):
Aurel's avatar
Aurel committed
873
        try:
874
            num_oids = unpack('!H', self._body)[0]
Aurel's avatar
Aurel committed
875
        except:
876
            raise ProtocolError(self, 'invalid ask new oids')
Yoshinori Okuji's avatar
Yoshinori Okuji committed
877
        return (num_oids,)
878
    decode_table[ASK_NEW_OIDS] = _decodeAskNewOIDs
Aurel's avatar
Aurel committed
879

880
    def _decodeAnswerNewOIDs(self):
Aurel's avatar
Aurel committed
881
        try:
882
            n = unpack('!H', self._body[:2])[0]
Aurel's avatar
Aurel committed
883 884
            oid_list = []
            for i in xrange(n):
885
                oid = unpack('8s', self._body[2+i*8:10+i*8])[0]
Aurel's avatar
Aurel committed
886 887
                oid_list.append(oid)
        except:
888
            raise ProtocolError(self, 'invalid answer new oids')
Yoshinori Okuji's avatar
Yoshinori Okuji committed
889
        return (oid_list,)
890
    decode_table[ANSWER_NEW_OIDS] = _decodeAnswerNewOIDs
Aurel's avatar
Aurel committed
891

Yoshinori Okuji's avatar
Yoshinori Okuji committed
892 893 894 895 896
    def _decodeFinishTransaction(self):
        try:
            tid, n = unpack('!8sL', self._body[:12])
            oid_list = []
            for i in xrange(n):
897
                oid = unpack('8s', self._body[12+i*8:20+i*8])[0]
Yoshinori Okuji's avatar
Yoshinori Okuji committed
898 899 900 901 902 903 904 905
                oid_list.append(oid)
        except:
            raise ProtocolError(self, 'invalid finish transaction')
        return oid_list, tid
    decode_table[FINISH_TRANSACTION] = _decodeFinishTransaction

    def _decodeNotifyTransactionFinished(self):
        try:
906
            tid = unpack('8s', self._body)[0]
Yoshinori Okuji's avatar
Yoshinori Okuji committed
907 908
        except:
            raise ProtocolError(self, 'invalid notify transactin finished')
Yoshinori Okuji's avatar
Yoshinori Okuji committed
909
        return (tid,)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
910 911 912 913
    decode_table[NOTIFY_TRANSACTION_FINISHED] = _decodeNotifyTransactionFinished

    def _decodeLockInformation(self):
        try:
914
            tid = unpack('8s', self._body)[0]
Yoshinori Okuji's avatar
Yoshinori Okuji committed
915 916
        except:
            raise ProtocolError(self, 'invalid lock information')
Yoshinori Okuji's avatar
Yoshinori Okuji committed
917
        return (tid,)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
918 919 920 921
    decode_table[LOCK_INFORMATION] = _decodeLockInformation

    def _decodeNotifyInformationLocked(self):
        try:
922
            tid = unpack('8s', self._body)[0]
Yoshinori Okuji's avatar
Yoshinori Okuji committed
923 924
        except:
            raise ProtocolError(self, 'invalid notify information locked')
Yoshinori Okuji's avatar
Yoshinori Okuji committed
925
        return (tid,)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
926 927 928 929
    decode_table[NOTIFY_INFORMATION_LOCKED] = _decodeNotifyInformationLocked

    def _decodeInvalidateObjects(self):
        try:
930
            n = unpack('!L', self._body[:4])[0]
Yoshinori Okuji's avatar
Yoshinori Okuji committed
931 932
            oid_list = []
            for i in xrange(n):
933
                oid = unpack('8s', self._body[4+i*8:12+i*8])[0]
Yoshinori Okuji's avatar
Yoshinori Okuji committed
934 935 936
                oid_list.append(oid)
        except:
            raise ProtocolError(self, 'invalid finish transaction')
Yoshinori Okuji's avatar
Yoshinori Okuji committed
937
        return (oid_list,)
Yoshinori Okuji's avatar
Yoshinori Okuji committed
938 939 940 941
    decode_table[INVALIDATE_OBJECTS] = _decodeInvalidateObjects

    def _decodeUnlockInformation(self):
        try:
942
            tid = unpack('8s', self._body)[0]
Yoshinori Okuji's avatar
Yoshinori Okuji committed
943 944 945 946 947
        except:
            raise ProtocolError(self, 'invalid unlock information')
        return tid
    decode_table[UNLOCK_INFORMATION] = _decodeUnlockInformation

Aurel's avatar
Aurel committed
948 949
    def _decodeAbortTransaction(self):
        try:
950
            tid = unpack('8s', self._body)[0]
Aurel's avatar
Aurel committed
951 952
        except:
            raise ProtocolError(self, 'invalid abort transaction')
Yoshinori Okuji's avatar
Yoshinori Okuji committed
953
        return (tid,)
Aurel's avatar
Aurel committed
954 955 956 957
    decode_table[ABORT_TRANSACTION] = _decodeAbortTransaction

    def _decodeAskStoreObject(self):
        try:
Aurel's avatar
Aurel committed
958
            oid, serial, tid, compression, checksum, data_len \
Aurel's avatar
Aurel committed
959 960
                 = unpack('!8s8s8sBLL', self._body[:33])
            data = self._body[33:]
Aurel's avatar
Aurel committed
961 962
        except:
            raise ProtocolError(self, 'invalid ask store object')
963 964
        if data_len != len(data):
            raise ProtocolError(self, 'invalid data size')
Aurel's avatar
Aurel committed
965
        return oid, serial, tid, compression, checksum, data
Aurel's avatar
Aurel committed
966
    decode_table[ASK_STORE_OBJECT] = _decodeAskStoreObject
967 968

    def _decodeAnswerStoreObject(self):
Aurel's avatar
Aurel committed
969
        try:
970
            conflicting, oid, serial = unpack('!B8s8s', self._body)
Aurel's avatar
Aurel committed
971 972
        except:
            raise ProtocolError(self, 'invalid answer store object')
973
        return conflicting, oid, serial
Aurel's avatar
Aurel committed
974 975 976 977
    decode_table[ANSWER_STORE_OBJECT] = _decodeAnswerStoreObject

    def _decodeAskStoreTransaction(self):
        try:
978 979 980
            tid, oid_len, user_len, desc_len, ext_len \
                    = unpack('!8sLHHH', self._body[:18])
            offset = 18
981
            user = self._body[offset:offset+user_len]
982
            offset += user_len
983
            desc = self._body[offset:offset+desc_len]
984
            offset += desc_len
985
            ext = self._body[offset:offset+ext_len]
986
            offset += ext_len
Aurel's avatar
Aurel committed
987 988
            oid_list = []
            for i in xrange(oid_len):
989
                oid = unpack('8s', self._body[offset:offset+8])[0]
990
                offset += 8
991
                oid_list.append(oid)
Aurel's avatar
Aurel committed
992 993
        except:
            raise ProtocolError(self, 'invalid ask store transaction')
994
        return tid, user, desc, ext, oid_list
Aurel's avatar
Aurel committed
995
    decode_table[ASK_STORE_TRANSACTION] = _decodeAskStoreTransaction
996

Aurel's avatar
Aurel committed
997 998
    def _decodeAnswerStoreTransaction(self):
        try:
999
            tid = unpack('8s', self._body)[0]
Aurel's avatar
Aurel committed
1000 1001
        except:
            raise ProtocolError(self, 'invalid answer store transaction')
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1002
        return (tid,)
Aurel's avatar
Aurel committed
1003 1004
    decode_table[ANSWER_STORE_TRANSACTION] = _decodeAnswerStoreTransaction

1005
    def _decodeAskObject(self):
Aurel's avatar
Aurel committed
1006
        try:
1007
            oid, serial, tid = unpack('8s8s8s', self._body)
Aurel's avatar
Aurel committed
1008
        except:
1009 1010 1011
            raise ProtocolError(self, 'invalid ask object')
        return oid, serial, tid
    decode_table[ASK_OBJECT] = _decodeAskObject
1012

1013
    def _decodeAnswerObject(self):
Aurel's avatar
Aurel committed
1014
        try:
1015 1016 1017
            oid, serial_start, serial_end, compression, checksum, data_len \
                 = unpack('!8s8s8sBLL', self._body[:33])
            data = self._body[33:]
Aurel's avatar
Aurel committed
1018
        except:
1019
            raise ProtocolError(self, 'invalid answer object')
1020 1021
        if len(data) != data_len:
            raise ProtocolError(self, 'invalid data size')
1022 1023
        return oid, serial_start, serial_end, compression, checksum, data
    decode_table[ANSWER_OBJECT] = _decodeAnswerObject
Aurel's avatar
Aurel committed
1024 1025 1026

    def _decodeAskTIDs(self):
        try:
1027
            first, last = unpack('!LL', self._body[:8])
Aurel's avatar
Aurel committed
1028 1029
        except:
            raise ProtocolError(self, 'invalid ask tids')
1030
        return first, last
Aurel's avatar
Aurel committed
1031 1032 1033 1034
    decode_table[ASK_TIDS] = _decodeAskTIDs

    def _decodeAnswerTIDs(self):
        try:
1035
            n = unpack('!L', self._body[:4])[0]
Aurel's avatar
Aurel committed
1036 1037
            tid_list = []
            for i in xrange(n):
1038
                tid = unpack('8s', self._body[4+i*8:12+i*8])[0]
Aurel's avatar
Aurel committed
1039 1040 1041
                tid_list.append(tid)
        except:
            raise ProtocolError(self, 'invalid answer tids')
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1042
        return (tid_list,)
Aurel's avatar
Aurel committed
1043
    decode_table[ANSWER_TIDS] = _decodeAnswerTIDs
1044

Aurel's avatar
Aurel committed
1045 1046
    def _decodeAskTransactionInformation(self):
        try:
1047
            tid = unpack('8s', self._body)[0]
Aurel's avatar
Aurel committed
1048 1049
        except:
            raise ProtocolError(self, 'invalid ask transaction information')
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1050
        return (tid,)
Aurel's avatar
Aurel committed
1051 1052 1053 1054
    decode_table[ASK_TRANSACTION_INFORMATION] = _decodeAskTransactionInformation

    def _decodeAnswerTransactionInformation(self):
        try:
1055 1056
            tid, user_len, desc_len, oid_len = unpack('!8sHHL', self._body[:16])
            offset = 16
Aurel's avatar
Aurel committed
1057 1058 1059
            user = self._body[offset:offset+user_len]
            offset += user_len
            desc = self._body[offset:offset+desc_len]
1060 1061 1062
            offset += desc_len
            oid_list = []
            for i in xrange(oid_len):
1063 1064
                oid = unpack('8s', self._body[offset+i*8:offset+8+i*8])[0]
                oid_list.append(oid)
Aurel's avatar
Aurel committed
1065 1066
        except:
            raise ProtocolError(self, 'invalid answer transaction information')
1067
        return tid, user, desc, oid_list
1068
    decode_table[ANSWER_TRANSACTION_INFORMATION] = _decodeAnswerTransactionInformation
Aurel's avatar
Aurel committed
1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080

    def _decodeAskObjectHistory(self):
        try:
            oid, length = unpack('!8sH', self._body)
        except:
            raise ProtocolError(self, 'invalid ask object history')
        return oid, length
    decode_table[ASK_OBJECT_HISTORY] = _decodeAskObjectHistory

    def _decodeAnswerObjectHistory(self):
        try:
            oid, length = unpack('!8sH', self._body[:10])
Yoshinori Okuji's avatar
Yoshinori Okuji committed
1081 1082
            history_list = []
            for i in xrange(length):
Aurel's avatar
Aurel committed
1083
                serial, size = unpack('!8sL', self._body[10+i*12:22+i*12])
1084
                history_list.append(tuple(serial, size))
Aurel's avatar
Aurel committed
1085 1086 1087 1088
        except:
            raise ProtocolError(self, 'invalid answer object history')
        return oid, history_list
    decode_table[ANSWER_OBJECT_HISTORY] = _decodeAnswerObjectHistory
1089