Commit a6aae268 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Change protocol module design:

- Each packet encoder and decoder is in a class
- A packet registry check if there is duplicate packet code type
- Each packet class got a docstring
Next commits will update the NEO source code to use those changes.


git-svn-id: https://svn.erp5.org/repos/neo/trunk@1356 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 601d9dad
......@@ -28,222 +28,6 @@ MIN_PACKET_SIZE = 10
MAX_PACKET_SIZE = 0x4000000
PACKET_HEADER_SIZE = 10
# Message types.
class PacketTypes(Enum):
# Error is a special type of message, because this can be sent against any other message,
# even if such a message does not expect a reply usually. Any -> Any.
ERROR = Enum.Item(0x8000)
# Check if a peer is still alive. Any -> Any.
PING = Enum.Item(0x0001)
# Request a node identification. This must be the first packet for any connection.
# Any -> Any.
REQUEST_NODE_IDENTIFICATION = Enum.Item(0x0002)
# Accept a node identification. This should be a reply to Request Node Identification.
# Any -> Any.
ACCEPT_NODE_IDENTIFICATION = Enum.Item(0x8002)
# Ask a current primary master node. This must be the second message when connecting
# to a master node. Any -> M.
ASK_PRIMARY_MASTER = Enum.Item(0x0003)
# Reply to Ask Primary Master. This message includes a list of known master nodes,
# to make sure that a peer has the same information. M -> Any.
ANSWER_PRIMARY_MASTER = Enum.Item(0x8003)
# Announce a primary master node election. PM -> SM.
ANNOUNCE_PRIMARY_MASTER = Enum.Item(0x0004)
# Force a re-election of a primary master node. M -> M.
REELECT_PRIMARY_MASTER = Enum.Item(0x0005)
# Notify information about one or more nodes. Any -> PM, PM -> Any.
NOTIFY_NODE_INFORMATION = Enum.Item(0x0006)
# Ask the last OID, the last TID and the last Partition Table ID that
# a storage node stores. Used to recover information. PM -> S, S -> PM.
ASK_LAST_IDS = Enum.Item(0x0007)
# Reply to Ask Last IDs. S -> PM, PM -> S.
ANSWER_LAST_IDS = Enum.Item(0x8007)
# Ask rows in a partition table that a storage node stores. Used to recover
# information. PM -> S.
ASK_PARTITION_TABLE = Enum.Item(0x0008)
# Answer rows in a partition table. S -> PM.
ANSWER_PARTITION_TABLE = Enum.Item(0x8008)
# Send rows in a partition table to update other nodes. PM -> S, C.
SEND_PARTITION_TABLE = Enum.Item(0x0009)
# Notify a subset of a partition table. This is used to notify changes. PM -> S, C.
NOTIFY_PARTITION_CHANGES = Enum.Item(0x000a)
# Tell a storage nodes to start an operation. Until a storage node receives this
# message, it must not serve client nodes. PM -> S.
START_OPERATION = Enum.Item(0x000b)
# Tell a storage node to stop an operation. Once a storage node receives this message,
# it must not serve client nodes. PM -> S.
STOP_OPERATION = Enum.Item(0x000c)
# Ask unfinished transactions =Enum.Item(s) PM -> S.
ASK_UNFINISHED_TRANSACTIONS = Enum.Item(0x000d)
# Answer unfinished transactions =Enum.Item(s) S -> PM.
ANSWER_UNFINISHED_TRANSACTIONS = Enum.Item(0x800d)
# Ask if an object is present. If not present, OID_NOT_FOUND should be returned. PM -> S.
ASK_OBJECT_PRESENT = Enum.Item(0x000f)
# Answer that an object is present. PM -> S.
ANSWER_OBJECT_PRESENT = Enum.Item(0x800f)
# Delete a transaction. PM -> S.
DELETE_TRANSACTION = Enum.Item(0x0010)
# Commit a transaction. PM -> S.
COMMIT_TRANSACTION = Enum.Item(0x0011)
# Ask to begin a new transaction. C -> PM.
ASK_BEGIN_TRANSACTION = Enum.Item(0x0012)
# Answer when a transaction begin, give a TID if necessary. PM -> C.
ANSWER_BEGIN_TRANSACTION = Enum.Item(0x8012)
# Finish a transaction. C -> PM.
FINISH_TRANSACTION = Enum.Item(0x0013)
# Notify a transaction finished. PM -> C.
NOTIFY_TRANSACTION_FINISHED = Enum.Item(0x8013)
# Lock information on a transaction. PM -> S.
LOCK_INFORMATION = Enum.Item(0x0014)
# Notify information on a transaction locked. S -> PM.
NOTIFY_INFORMATION_LOCKED = Enum.Item(0x8014)
# Invalidate objects. PM -> C.
INVALIDATE_OBJECTS = Enum.Item(0x0015)
# Unlock information on a transaction. PM -> S.
UNLOCK_INFORMATION = Enum.Item(0x0016)
# Ask new object IDs. C -> PM.
ASK_NEW_OIDS = Enum.Item(0x0017)
# Answer new object IDs. PM -> C.
ANSWER_NEW_OIDS = Enum.Item(0x8017)
# Ask to store an object. Send an OID, an original serial, a current
# transaction ID, and data. C -> S.
ASK_STORE_OBJECT = Enum.Item(0x0018)
# Answer if an object has been stored. If an object is in conflict,
# a serial of the conflicting transaction is returned. In this case,
# if this serial is newer than the current transaction ID, a client
# node must not try to resolve the conflict. S -> C.
ANSWER_STORE_OBJECT = Enum.Item(0x8018)
# Abort a transaction. C -> S, PM.
ABORT_TRANSACTION = Enum.Item(0x0019)
# Ask to store a transaction. C -> S.
ASK_STORE_TRANSACTION = Enum.Item(0x001a)
# Answer if transaction has been stored. S -> C.
ANSWER_STORE_TRANSACTION = Enum.Item(0x801a)
# Ask a stored object by its OID and a serial or a TID if given. If a serial
# is specified, the specified revision of an object will be returned. If
# a TID is specified, an object right before the TID will be returned. S,C -> S.
ASK_OBJECT = Enum.Item(0x001b)
# Answer the requested object. S -> C.
ANSWER_OBJECT = Enum.Item(0x801b)
# Ask for TIDs between a range of offsets. The order of TIDs is descending,
# and the range is [first, last). C, S -> S.
ASK_TIDS = Enum.Item(0x001d)
# Answer the requested TIDs. S -> C, S.
ANSWER_TIDS = Enum.Item(0x801d)
# Ask information about a transaction. Any -> S.
ASK_TRANSACTION_INFORMATION = Enum.Item(0x001e)
# Answer information (user, description) about a transaction. S -> Any.
ANSWER_TRANSACTION_INFORMATION = Enum.Item(0x801e)
# Ask history information for a given object. The order of serials is
# descending, and the range is [first, last]. C, S -> S.
ASK_OBJECT_HISTORY = Enum.Item(0x001f)
# Answer history information (serial, size) for an object. S -> C, S.
ANSWER_OBJECT_HISTORY = Enum.Item(0x801f)
# Ask for OIDs between a range of offsets. The order of OIDs is descending,
# and the range is [first, last). S -> S.
ASK_OIDS = Enum.Item(0x0020)
# Answer the requested OIDs. S -> S.
ANSWER_OIDS = Enum.Item(0x8020)
# All the following messages are for neoctl to admin node
# Ask information about partition
ASK_PARTITION_LIST = Enum.Item(0x0021)
# Answer information about partition
ANSWER_PARTITION_LIST = Enum.Item(0x8021)
# Ask information about nodes
ASK_NODE_LIST = Enum.Item(0x0022)
# Answer information about nodes
ANSWER_NODE_LIST = Enum.Item(0x8022)
# Set the node state
SET_NODE_STATE = Enum.Item(0x0023)
# Answer state of the node
ANSWER_NODE_STATE = Enum.Item(0x8023)
# Ask the primary to include some pending node in the partition table
ADD_PENDING_NODES = Enum.Item(0x0024)
# Anwer what are the nodes added in the partition table
ANSWER_NEW_NODES = Enum.Item(0x8024)
# Ask node information
ASK_NODE_INFORMATION = Enum.Item(0x0025)
# Answer node information
ANSWER_NODE_INFORMATION = Enum.Item(0x8025)
# Set the cluster state
SET_CLUSTER_STATE = Enum.Item(0x0026)
# Notify information about the cluster
NOTIFY_CLUSTER_INFORMATION = Enum.Item(0x0027)
# Ask state of the cluster
ASK_CLUSTER_STATE = Enum.Item(0x0028)
# Answer state of the cluster
ANSWER_CLUSTER_STATE = Enum.Item(0x8028)
# Notify being alive. Any -> Any.
PONG = Enum.Item(0x0029)
# Notify last OID generated
NOTIFY_LAST_OID = Enum.Item(0x0030)
PacketTypes = PacketTypes()
class ErrorCodes(Enum):
# TODO: clarify the use of each error code
NO_ERROR = Enum.Item(0)
......@@ -346,88 +130,7 @@ class BrokenNodeDisallowedError(ProtocolError):
pass
decode_table = {}
class Packet(object):
"""A packet."""
_id = None
_type = None
def __init__(self, msg_type, body=''):
self._id = None
self._type = msg_type
self._body = body
def getId(self):
return self._id
def setId(self, packet_id):
self._id = packet_id
def getType(self):
return self._type
def __str__(self):
return 'Packet <%s>' % self._type
def __len__(self):
try:
return PACKET_HEADER_SIZE + len(self._body)
except TypeError:
return PACKET_HEADER_SIZE
def encode(self):
msg = pack('!LHL', self._id, self._type, PACKET_HEADER_SIZE + len(self._body)) + self._body
if len(msg) > MAX_PACKET_SIZE:
raise PacketMalformedError('message too big (%d)' % len(msg))
return msg
def decode(self):
try:
method = decode_table[self._type]
except KeyError:
raise PacketMalformedError('unknown message type 0x%x' % self._type)
return method(self._body)
def isResponse(self):
return self._type & 0x8000 == 0x8000
# packet parser
def parse(msg):
if len(msg) < MIN_PACKET_SIZE:
return None
msg_id, msg_type, msg_len = unpack('!LHL', msg[:PACKET_HEADER_SIZE])
try:
msg_type = PacketTypes[msg_type]
except KeyError:
raise PacketMalformedError('Unknown packet type')
if msg_len > MAX_PACKET_SIZE:
raise PacketMalformedError('message too big (%d)' % msg_len)
if msg_len < MIN_PACKET_SIZE:
raise PacketMalformedError('message too small (%d)' % msg_len)
if len(msg) < msg_len:
# Not enough.
return None
packet = Packet(msg_type, msg[PACKET_HEADER_SIZE:msg_len])
packet.setId(msg_id)
return packet
def handle_errors(decoder):
""" Decorator to be used on encoding/decoding methods. Intercept struct
(pack/unpack) exceptions and wrap them in PacketMalformedError """
def wrapper(body):
try:
return decoder(body)
except error, msg: # struct.error
name = decoder.__name__
raise PacketMalformedError("%s fail (%s)" % (name, msg))
except PacketMalformedError, msg:
name = decoder.__name__
raise PacketMalformedError("%s fail (%s)" % (name, msg))
return wrapper
def _decodeClusterState(state):
cluster_state = ClusterStates.get(state)
if cluster_state is None:
......@@ -501,550 +204,1127 @@ def _readString(buf, name, offset=0):
raise PacketMalformedError("can't read string <%s>" % name)
return (string, buf[offset+size:])
# packet decoding
@handle_errors
def _decodeError(body):
(code, ) = unpack('!H', body[:2])
code = _decodeErrorCode(code)
(message, _) = _readString(body, 'message', offset=2)
return (code, message)
decode_table[PacketTypes.ERROR] = _decodeError
@handle_errors
def _decodePing(body):
pass
decode_table[PacketTypes.PING] = _decodePing
def parse(msg):
if len(msg) < MIN_PACKET_SIZE:
return None
msg_id, msg_type, msg_len = unpack('!LHL', msg[:PACKET_HEADER_SIZE])
try:
packet_klass = Packets[msg_type]
except KeyError:
raise PacketMalformedError('Unknown packet type')
if msg_len > MAX_PACKET_SIZE:
raise PacketMalformedError('message too big (%d)' % msg_len)
if msg_len < MIN_PACKET_SIZE:
raise PacketMalformedError('message too small (%d)' % msg_len)
if len(msg) < msg_len:
# Not enough.
return None
packet = packet_klass()
packet.setContent(msg_type, msg_id, msg[PACKET_HEADER_SIZE:msg_len])
return packet
@handle_errors
def _decodePong(body):
pass
decode_table[PacketTypes.PONG] = _decodePong
@handle_errors
def _decodeRequestNodeIdentification(body):
r = unpack('!LLH16s6s', body[:32])
major, minor, node_type, uuid, address = r
address = _decodeAddress(address)
(name, _) = _readString(body, 'name', offset=32)
node_type = _decodeNodeType(node_type)
uuid = _decodeUUID(uuid)
if (major, minor) != PROTOCOL_VERSION:
raise PacketMalformedError('protocol version mismatch')
return node_type, uuid, address, name
decode_table[PacketTypes.REQUEST_NODE_IDENTIFICATION] = _decodeRequestNodeIdentification
@handle_errors
def _decodeAcceptNodeIdentification(body):
r = unpack('!H16s6sLL16s', body)
node_type, uuid, address, num_partitions, num_replicas, your_uuid = r
address = _decodeAddress(address)
node_type = _decodeNodeType(node_type)
uuid = _decodeUUID(uuid)
your_uuid == _decodeUUID(uuid)
return (node_type, uuid, address, num_partitions, num_replicas, your_uuid)
decode_table[PacketTypes.ACCEPT_NODE_IDENTIFICATION] = _decodeAcceptNodeIdentification
@handle_errors
def _decodeAskPrimaryMaster(body):
pass
decode_table[PacketTypes.ASK_PRIMARY_MASTER] = _decodeAskPrimaryMaster
@handle_errors
def _decodeAnswerPrimaryMaster(body):
(primary_uuid, n) = unpack('!16sL', body[:20])
known_master_list = []
for i in xrange(n):
address, uuid = unpack('!6s16s', body[20+i*22:42+i*22])
address = _decodeAddress(address)
uuid = _decodeUUID(uuid)
known_master_list.append((address, uuid))
primary_uuid = _decodeUUID(primary_uuid)
return (primary_uuid, known_master_list)
decode_table[PacketTypes.ANSWER_PRIMARY_MASTER] = _decodeAnswerPrimaryMaster
@handle_errors
def _decodeAnnouncePrimaryMaster(body):
class Packet(object):
# XXX: use a global a class-attribute
_body = None
_code = None
_args = None
_id = None
def __init__(self, *args, **kw):
assert self._code is not None, "Packet class not registered"
if args != ():
body = self._encode(*args, **kw)
else:
body = ''
self._body = body
self._args = args
def decode(self):
assert self._body is not None
try:
return self._decode(self._body)
except error, msg: # struct.error
name = self.__class__.__name__
raise PacketMalformedError("%s fail (%s)" % (name, msg))
except PacketMalformedError, msg:
name = self.__class__.__name__
raise PacketMalformedError("%s fail (%s)" % (name, msg))
def setContent(self, code, msg_id, body):
self._code = code
self._id = msg_id
self._body = body
def setId(self, value):
self._id = value
def getId(self):
assert self._id is not None
return self._id
def getCode(self):
assert self._code is not None
return self._code
def getType(self):
return self.__class__
# TODO: replace this with __call__ that take the id as parameter
def __str__(self):
assert self._id is not None
content = self._body
length = PACKET_HEADER_SIZE + len(content)
return pack('!LHL', self._id, self._code, length) + content
def __len__(self):
return PACKET_HEADER_SIZE + len(self._body)
def __eq__(self, other):
""" Compare packets with their code instead of content """
if other is None:
return False
assert isinstance(other, Packet)
return self._code == other._code
def _encode(self, *args, **kw):
""" Default encoder, join all arguments """
# XXX: this is a convenient method but there is a lack of argument
# checking (tid length...)
assert kw == {}
if args:
return ''.join([str(i) for i in args])
return ''
def _decode(self, body):
""" Default decoder, message must be empty """
assert body == '', "Non-empty packet decoding not implemented """
return ()
def isResponse(self):
# FIXME: usefull ?
return self._code & 0x8000 == 0x8000
class Ping(Packet):
"""
Check if a peer is still alive. Any -> Any.
"""
pass
decode_table[PacketTypes.ANNOUNCE_PRIMARY_MASTER] = _decodeAnnouncePrimaryMaster
@handle_errors
def _decodeReelectPrimaryMaster(body):
class Pong(Packet):
"""
Notify being alive. Any -> Any.
"""
pass
decode_table[PacketTypes.REELECT_PRIMARY_MASTER] = _decodeReelectPrimaryMaster
@handle_errors
def _decodeNotifyNodeInformation(body):
(n,) = unpack('!L', body[:4])
node_list = []
for i in xrange(n):
r = unpack('!H6s16sH', body[4+i*26:30+i*26])
node_type, address, uuid, state = r
class RequestIdentification(Packet):
"""
Request a node identification. This must be the first packet for any
connection. Any -> Any.
"""
def _encode(self, node_type, uuid, address, name):
uuid = _encodeUUID(uuid)
address = _encodeAddress(address)
return pack('!LLH16s6sL', PROTOCOL_VERSION[0], PROTOCOL_VERSION[1],
node_type, uuid, address, len(name)) + name
def _decode(self, body):
r = unpack('!LLH16s6s', body[:32])
major, minor, node_type, uuid, address = r
address = _decodeAddress(address)
(name, _) = _readString(body, 'name', offset=32)
node_type = _decodeNodeType(node_type)
state = _decodeNodeState(state)
uuid = _decodeUUID(uuid)
node_list.append((node_type, address, uuid, state))
return (node_list,)
decode_table[PacketTypes.NOTIFY_NODE_INFORMATION] = _decodeNotifyNodeInformation
if (major, minor) != PROTOCOL_VERSION:
raise PacketMalformedError('protocol version mismatch')
return (node_type, uuid, address, name)
class AcceptIdentification(Packet):
"""
Accept a node identification. This should be a reply to Request Node
Identification. Any -> Any.
"""
def _encode(self, node_type, uuid, address,
num_partitions, num_replicas, your_uuid):
uuid = _encodeUUID(uuid)
your_uuid = _encodeUUID(your_uuid)
address = _encodeAddress(address)
return pack('!H16s6sLL16s', node_type, uuid, address,
num_partitions, num_replicas, your_uuid)
@handle_errors
def _decodeAskLastIDs(body):
pass
decode_table[PacketTypes.ASK_LAST_IDS] = _decodeAskLastIDs
@handle_errors
def _decodeAnswerLastIDs(body):
(loid, ltid, lptid) = unpack('!8s8s8s', body)
lptid = _decodePTID(lptid)
return (loid, ltid, lptid)
decode_table[PacketTypes.ANSWER_LAST_IDS] = _decodeAnswerLastIDs
@handle_errors
def _decodeAskPartitionTable(body):
(n,) = unpack('!L', body[:4])
offset_list = []
for i in xrange(n):
offset = unpack('!L', body[4+i*4:8+i*4])[0]
offset_list.append(offset)
return (offset_list,)
decode_table[PacketTypes.ASK_PARTITION_TABLE] = _decodeAskPartitionTable
@handle_errors
def _decodeAnswerPartitionTable(body):
index = 12
(ptid, n) = unpack('!8sL', body[:index])
ptid = _decodePTID(ptid)
row_list = []
cell_list = []
for i in xrange(n):
offset, m = unpack('!LL', body[index:index+8])
index += 8
for j in xrange(m):
uuid, state = unpack('!16sH', body[index:index+18])
index += 18
state = CellStates.get(state)
uuid = _decodeUUID(uuid)
cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
return (ptid, row_list)
decode_table[PacketTypes.ANSWER_PARTITION_TABLE] = _decodeAnswerPartitionTable
@handle_errors
def _decodeSendPartitionTable(body):
index = 12
(ptid, n,) = unpack('!8sL', body[:index])
ptid = _decodePTID(ptid)
row_list = []
cell_list = []
for i in xrange(n):
offset, m = unpack('!LL', body[index:index+8])
index += 8
for j in xrange(m):
uuid, state = unpack('!16sH', body[index:index+18])
index += 18
state = CellStates.get(state)
uuid = _decodeUUID(uuid)
cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
return (ptid, row_list)
decode_table[PacketTypes.SEND_PARTITION_TABLE] = _decodeSendPartitionTable
@handle_errors
def _decodeNotifyPartitionChanges(body):
(ptid, n) = unpack('!8sL', body[:12])
ptid = _decodePTID(ptid)
cell_list = []
for i in xrange(n):
(offset, uuid, state) = unpack('!L16sH', body[12+i*22:34+i*22])
state = CellStates.get(state)
def _decode(self, body):
r = unpack('!H16s6sLL16s', body)
node_type, uuid, address, num_partitions, num_replicas, your_uuid = r
address = _decodeAddress(address)
node_type = _decodeNodeType(node_type)
uuid = _decodeUUID(uuid)
cell_list.append((offset, uuid, state))
return ptid, cell_list
decode_table[PacketTypes.NOTIFY_PARTITION_CHANGES] = _decodeNotifyPartitionChanges
your_uuid == _decodeUUID(uuid)
return (node_type, uuid, address, num_partitions, num_replicas, your_uuid)
class AskPrimary(Packet):
"""
Ask a current primary master node. This must be the second message when
connecting to a master node. Any -> M.
"""
pass
@handle_errors
def _decodeStartOperation(body):
class AnswerPrimary(Packet):
"""
Reply to Ask Primary Master. This message includes a list of known master
nodes to make sure that a peer has the same information. M -> Any.
"""
def _encode(self, primary_uuid, known_master_list):
primary_uuid = _encodeUUID(primary_uuid)
body = [primary_uuid, pack('!L', len(known_master_list))]
for address, uuid in known_master_list:
uuid = _encodeUUID(uuid)
address = _encodeAddress(address)
body.append(pack('!6s16s', address, uuid))
return ''.join(body)
def _decode(self, body):
(primary_uuid, n) = unpack('!16sL', body[:20])
known_master_list = []
for i in xrange(n):
address, uuid = unpack('!6s16s', body[20+i*22:42+i*22])
address = _decodeAddress(address)
uuid = _decodeUUID(uuid)
known_master_list.append((address, uuid))
primary_uuid = _decodeUUID(primary_uuid)
return (primary_uuid, known_master_list)
class AnnouncePrimary(Packet):
"""
Announce a primary master node election. PM -> SM.
"""
pass
decode_table[PacketTypes.START_OPERATION] = _decodeStartOperation
@handle_errors
def _decodeStopOperation(body):
class ReelectPrimary(Packet):
"""
Force a re-election of a primary master node. M -> M.
"""
pass
decode_table[PacketTypes.STOP_OPERATION] = _decodeStopOperation
@handle_errors
def _decodeAskUnfinishedTransactions(body):
class AskLastIDs(Packet):
"""
Ask the last OID, the last TID and the last Partition Table ID that
a storage node stores. Used to recover information. PM -> S, S -> PM.
"""
pass
decode_table[PacketTypes.ASK_UNFINISHED_TRANSACTIONS] = _decodeAskUnfinishedTransactions
@handle_errors
def _decodeAnswerUnfinishedTransactions(body):
(n,) = unpack('!L', body[:4])
tid_list = []
for i in xrange(n):
tid = unpack('8s', body[4+i*8:12+i*8])[0]
tid_list.append(tid)
return (tid_list,)
decode_table[PacketTypes.ANSWER_UNFINISHED_TRANSACTIONS] = _decodeAnswerUnfinishedTransactions
@handle_errors
def _decodeAskObjectPresent(body):
(oid, tid) = unpack('8s8s', body)
return (oid, tid)
decode_table[PacketTypes.ASK_OBJECT_PRESENT] = _decodeAskObjectPresent
@handle_errors
def _decodeAnswerObjectPresent(body):
(oid, tid) = unpack('8s8s', body)
return (oid, tid)
decode_table[PacketTypes.ANSWER_OBJECT_PRESENT] = _decodeAnswerObjectPresent
@handle_errors
def _decodeDeleteTransaction(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[PacketTypes.DELETE_TRANSACTION] = _decodeDeleteTransaction
@handle_errors
def _decodeCommitTransaction(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[PacketTypes.COMMIT_TRANSACTION] = _decodeCommitTransaction
@handle_errors
def _decodeAskBeginTransaction(body):
(tid, ) = unpack('8s', body)
tid = _decodeTID(tid)
return (tid, )
decode_table[PacketTypes.ASK_BEGIN_TRANSACTION] = _decodeAskBeginTransaction
@handle_errors
def _decodeAnswerBeginTransaction(body):
(tid, ) = unpack('8s', body)
tid = _decodeTID(tid)
return (tid, )
decode_table[PacketTypes.ANSWER_BEGIN_TRANSACTION] = _decodeAnswerBeginTransaction
@handle_errors
def _decodeAskNewOIDs(body):
return unpack('!H', body) # num oids
decode_table[PacketTypes.ASK_NEW_OIDS] = _decodeAskNewOIDs
@handle_errors
def _decodeAnswerNewOIDs(body):
(n,) = unpack('!H', body[:2])
oid_list = []
for i in xrange(n):
oid = unpack('8s', body[2+i*8:10+i*8])[0]
oid_list.append(oid)
return (oid_list,)
decode_table[PacketTypes.ANSWER_NEW_OIDS] = _decodeAnswerNewOIDs
@handle_errors
def _decodeFinishTransaction(body):
(tid, n) = unpack('!8sL', body[:12])
oid_list = []
for i in xrange(n):
oid = unpack('8s', body[12+i*8:20+i*8])[0]
oid_list.append(oid)
return (oid_list, tid)
decode_table[PacketTypes.FINISH_TRANSACTION] = _decodeFinishTransaction
@handle_errors
def _decodeNotifyTransactionFinished(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[PacketTypes.NOTIFY_TRANSACTION_FINISHED] = _decodeNotifyTransactionFinished
@handle_errors
def _decodeLockInformation(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[PacketTypes.LOCK_INFORMATION] = _decodeLockInformation
@handle_errors
def _decodeNotifyInformationLocked(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[PacketTypes.NOTIFY_INFORMATION_LOCKED] = _decodeNotifyInformationLocked
@handle_errors
def _decodeInvalidateObjects(body):
(tid, n) = unpack('!8sL', body[:12])
oid_list = []
for i in xrange(12, 12 + n * 8, 8):
oid = unpack('8s', body[i:i+8])[0]
oid_list.append(oid)
return (oid_list, tid)
decode_table[PacketTypes.INVALIDATE_OBJECTS] = _decodeInvalidateObjects
@handle_errors
def _decodeUnlockInformation(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[PacketTypes.UNLOCK_INFORMATION] = _decodeUnlockInformation
@handle_errors
def _decodeAbortTransaction(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[PacketTypes.ABORT_TRANSACTION] = _decodeAbortTransaction
@handle_errors
def _decodeAskStoreObject(body):
r = unpack('!8s8s8sBL', body[:29])
oid, serial, tid, compression, checksum = r
(data, _) = _readString(body, 'data', offset=29)
return (oid, serial, compression, checksum, data, tid)
decode_table[PacketTypes.ASK_STORE_OBJECT] = _decodeAskStoreObject
@handle_errors
def _decodeAnswerStoreObject(body):
(conflicting, oid, serial) = unpack('!B8s8s', body)
return (conflicting, oid, serial)
decode_table[PacketTypes.ANSWER_STORE_OBJECT] = _decodeAnswerStoreObject
@handle_errors
def _decodeAskStoreTransaction(body):
r = unpack('!8sLHHH', body[:18])
tid, oid_len, user_len, desc_len, ext_len = r
body = body[18:]
user = body[:user_len]
body = body[user_len:]
desc = body[:desc_len]
body = body[desc_len:]
ext = body[:ext_len]
body = body[ext_len:]
oid_list = []
for i in xrange(oid_len):
(oid, ) = unpack('8s', body[:8])
body = body[8:]
oid_list.append(oid)
return (tid, user, desc, ext, oid_list)
decode_table[PacketTypes.ASK_STORE_TRANSACTION] = _decodeAskStoreTransaction
@handle_errors
def _decodeAnswerStoreTransaction(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[PacketTypes.ANSWER_STORE_TRANSACTION] = _decodeAnswerStoreTransaction
@handle_errors
def _decodeAskObject(body):
(oid, serial, tid) = unpack('8s8s8s', body)
if serial == INVALID_TID:
serial = None
tid = _decodeTID(tid)
return (oid, serial, tid)
decode_table[PacketTypes.ASK_OBJECT] = _decodeAskObject
@handle_errors
def _decodeAnswerObject(body):
r = unpack('!8s8s8sBL', body[:29])
oid, serial_start, serial_end, compression, checksum = r
if serial_end == INVALID_TID:
serial_end = None
(data, _) = _readString(body, 'data', offset=29)
return (oid, serial_start, serial_end, compression, checksum, data)
decode_table[PacketTypes.ANSWER_OBJECT] = _decodeAnswerObject
@handle_errors
def _decodeAskTIDs(body):
return unpack('!QQL', body) # first, last, partition
decode_table[PacketTypes.ASK_TIDS] = _decodeAskTIDs
@handle_errors
def _decodeAnswerTIDs(body):
(n, ) = unpack('!L', body[:4])
tid_list = []
for i in xrange(n):
tid = unpack('8s', body[4+i*8:12+i*8])[0]
tid_list.append(tid)
return (tid_list,)
decode_table[PacketTypes.ANSWER_TIDS] = _decodeAnswerTIDs
@handle_errors
def _decodeAskTransactionInformation(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[PacketTypes.ASK_TRANSACTION_INFORMATION] = _decodeAskTransactionInformation
@handle_errors
def _decodeAnswerTransactionInformation(body):
r = unpack('!8sHHHL', body[:18])
tid, user_len, desc_len, ext_len, oid_len = r
body = body[18:]
user = body[:user_len]
body = body[user_len:]
desc = body[:desc_len]
body = body[desc_len:]
ext = body[:ext_len]
body = body[ext_len:]
oid_list = []
for i in xrange(oid_len):
(oid, ) = unpack('8s', body[:8])
body = body[8:]
oid_list.append(oid)
return (tid, user, desc, ext, oid_list)
decode_table[PacketTypes.ANSWER_TRANSACTION_INFORMATION] = _decodeAnswerTransactionInformation
@handle_errors
def _decodeAskObjectHistory(body):
(oid, first, last) = unpack('!8sQQ', body)
return (oid, first, last)
decode_table[PacketTypes.ASK_OBJECT_HISTORY] = _decodeAskObjectHistory
@handle_errors
def _decodeAnswerObjectHistory(body):
(oid, length) = unpack('!8sL', body[:12])
history_list = []
for i in xrange(12, 12 + length * 12, 12):
serial, size = unpack('!8sL', body[i:i+12])
history_list.append((serial, size))
return (oid, history_list)
decode_table[PacketTypes.ANSWER_OBJECT_HISTORY] = _decodeAnswerObjectHistory
@handle_errors
def _decodeAskOIDs(body):
return unpack('!QQL', body) # first, last, partition
decode_table[PacketTypes.ASK_OIDS] = _decodeAskOIDs
@handle_errors
def _decodeAnswerOIDs(body):
(n,) = unpack('!L', body[:4])
oid_list = []
for i in xrange(n):
oid = unpack('8s', body[4+i*8:12+i*8])[0]
oid_list.append(oid)
return (oid_list,)
decode_table[PacketTypes.ANSWER_OIDS] = _decodeAnswerOIDs
@handle_errors
def _decodeAskPartitionList(body):
(min_offset, max_offset, uuid) = unpack('!LL16s', body)
uuid = _decodeUUID(uuid)
return (min_offset, max_offset, uuid)
decode_table[PacketTypes.ASK_PARTITION_LIST] = _decodeAskPartitionList
@handle_errors
def _decodeAnswerPartitionList(body):
index = 12
(ptid, n) = unpack('!8sL', body[:index])
ptid = _decodePTID(ptid)
row_list = []
cell_list = []
for i in xrange(n):
offset, m = unpack('!LL', body[index:index+8])
index += 8
for j in xrange(m):
uuid, state = unpack('!16sH', body[index:index+18])
index += 18
class AnswerLastIDs(Packet):
"""
Reply to Ask Last IDs. S -> PM, PM -> S.
"""
def _encode(self, loid, ltid, lptid):
# in this case, loid is a valid OID but considered as invalid. This is not
# an issue because the OID 0 is hard coded and will never be generated
if loid is None:
loid = INVALID_OID
ltid = _encodeTID(ltid)
lptid = _encodePTID(lptid)
return loid + ltid + lptid
def _decode(self, body):
(loid, ltid, lptid) = unpack('!8s8s8s', body)
lptid = _decodePTID(lptid)
return (loid, ltid, lptid)
class AskPartitionTable(Packet):
"""
Ask rows in a partition table that a storage node stores. Used to recover
information. PM -> S.
"""
def _encode(self, offset_list):
body = [pack('!L', len(offset_list))]
for offset in offset_list:
body.append(pack('!L', offset))
return ''.join(body)
def _decode(self, body):
(n,) = unpack('!L', body[:4])
offset_list = []
for i in xrange(n):
offset = unpack('!L', body[4+i*4:8+i*4])[0]
offset_list.append(offset)
return (offset_list,)
class AnswerPartitionTable(Packet):
"""
Answer rows in a partition table. S -> PM.
"""
def _encode(self, ptid, row_list):
ptid = _encodePTID(ptid)
body = [pack('!8sL', ptid, len(row_list))]
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
for uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack('!16sH', uuid, state))
return ''.join(body)
def _decode(self, body):
index = 12
(ptid, n) = unpack('!8sL', body[:index])
ptid = _decodePTID(ptid)
row_list = []
cell_list = []
for i in xrange(n):
offset, m = unpack('!LL', body[index:index+8])
index += 8
for j in xrange(m):
uuid, state = unpack('!16sH', body[index:index+18])
index += 18
state = CellStates.get(state)
uuid = _decodeUUID(uuid)
cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
return (ptid, row_list)
class SendPartitionTable(Packet):
"""
Send rows in a partition table to update other nodes. PM -> S, C.
"""
def _encode(self, ptid, row_list):
ptid = _encodePTID(ptid)
body = [pack('!8sL', ptid, len(row_list))]
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
for uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack('!16sH', uuid, state))
return ''.join(body)
def _decode(self, body):
index = 12
(ptid, n,) = unpack('!8sL', body[:index])
ptid = _decodePTID(ptid)
row_list = []
cell_list = []
for i in xrange(n):
offset, m = unpack('!LL', body[index:index+8])
index += 8
for j in xrange(m):
uuid, state = unpack('!16sH', body[index:index+18])
index += 18
state = CellStates.get(state)
uuid = _decodeUUID(uuid)
cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
return (ptid, row_list)
class NotifyPartitionChanges(Packet):
"""
Notify a subset of a partition table. This is used to notify changes.
PM -> S, C.
"""
def _encode(self, ptid, cell_list):
ptid = _encodePTID(ptid)
body = [pack('!8sL', ptid, len(cell_list))]
for offset, uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack('!L16sH', offset, uuid, state))
return ''.join(body)
def _decode(self, body):
(ptid, n) = unpack('!8sL', body[:12])
ptid = _decodePTID(ptid)
cell_list = []
for i in xrange(n):
(offset, uuid, state) = unpack('!L16sH', body[12+i*22:34+i*22])
state = CellStates.get(state)
uuid = _decodeUUID(uuid)
cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
return (ptid, row_list)
decode_table[PacketTypes.ANSWER_PARTITION_LIST] = _decodeAnswerPartitionList
@handle_errors
def _decodeAskNodeList(body):
(node_type, ) = unpack('!H', body)
node_type = _decodeNodeType(node_type)
return (node_type,)
decode_table[PacketTypes.ASK_NODE_LIST] = _decodeAskNodeList
@handle_errors
def _decodeAnswerNodeList(body):
(n,) = unpack('!L', body[:4])
node_list = []
for i in xrange(n):
r = unpack('!H6s16sH', body[4+i*26:30+i*26])
node_type, address, uuid, state = r
address = _decodeAddress(address)
cell_list.append((offset, uuid, state))
return (ptid, cell_list)
class StartOperation(Packet):
"""
Tell a storage nodes to start an operation. Until a storage node receives
this message, it must not serve client nodes. PM -> S.
"""
pass
class StopOperation(Packet):
"""
Tell a storage node to stop an operation. Once a storage node receives
this message, it must not serve client nodes. PM -> S.
"""
pass
class AskUnfinishedTransactions(Packet):
"""
Ask unfinished transactions PM -> S.
"""
pass
class AnswerUnfinishedTransactions(Packet):
"""
Answer unfinished transactions S -> PM.
"""
def _encode(self, tid_list):
body = [pack('!L', len(tid_list))]
body.extend(tid_list)
return ''.join(body)
def _decode(self, body):
(n,) = unpack('!L', body[:4])
tid_list = []
for i in xrange(n):
tid = unpack('8s', body[4+i*8:12+i*8])[0]
tid_list.append(tid)
return (tid_list,)
class AskObjectPresent(Packet):
"""
Ask if an object is present. If not present, OID_NOT_FOUND should be
returned. PM -> S.
"""
def _decode(self, body):
(oid, tid) = unpack('8s8s', body)
return (oid, _decodeTID(tid))
class AnswerObjectPresent(Packet):
"""
Answer that an object is present. PM -> S.
"""
def _decode(self, body):
(oid, tid) = unpack('8s8s', body)
return (oid, _decodeTID(tid))
class DeleteTransaction(Packet):
"""
Delete a transaction. PM -> S.
"""
def _encode(self, tid):
return _encodeTID(tid)
def _decode(self, body):
(tid, ) = unpack('8s', body)
return (_decodeTID(tid), )
class CommitTransaction(Packet):
"""
Commit a transaction. PM -> S.
"""
def _encode(self, tid):
return _encodeTID(tid)
def _decode(self, body):
(tid, ) = unpack('8s', body)
return (_decodeTID(tid), )
class AskBeginTransaction(Packet):
"""
Ask to begin a new transaction. C -> PM.
"""
def _encode(self, tid):
return _encodeTID(tid)
def _decode(self, body):
(tid, ) = unpack('8s', body)
return (_decodeTID(tid), )
class AnswerBeginTransaction(Packet):
"""
Answer when a transaction begin, give a TID if necessary. PM -> C.
"""
def _encode(self, tid):
return _encodeTID(tid)
def _decode(self, body):
(tid, ) = unpack('8s', body)
return (tid, )
class FinishTransaction(Packet):
"""
Finish a transaction. C -> PM.
"""
def _encode(self, oid_list, tid):
body = [pack('!8sL', tid, len(oid_list))]
body.extend(oid_list)
return ''.join(body)
def _decode(self, body):
(tid, n) = unpack('!8sL', body[:12])
oid_list = []
for i in xrange(n):
oid = unpack('8s', body[12+i*8:20+i*8])[0]
oid_list.append(oid)
return (oid_list, tid)
class NotifyTransactionFinished(Packet):
"""
Notify a transaction finished. PM -> C.
"""
def _encode(self, tid):
return _encodeTID(tid)
def _decode(self, body):
(tid, ) = unpack('8s', body)
return (_decodeTID(tid), )
class LockInformation(Packet):
"""
Lock information on a transaction. PM -> S.
"""
def _encode(self, tid):
return _encodeTID(tid)
def _decode(self, body):
(tid, ) = unpack('8s', body)
return (_decodeTID(tid), )
class NotifyInformationLocked(Packet):
"""
Notify information on a transaction locked. S -> PM.
"""
def _encode(self, tid):
return _encodeTID(tid)
def _decode(self, body):
(tid, ) = unpack('8s', body)
return (_decodeTID(tid), )
class InvalidateObjects(Packet):
"""
Invalidate objects. PM -> C.
"""
def _encode(self, oid_list, tid):
body = [pack('!8sL', tid, len(oid_list))]
body.extend(oid_list)
return ''.join(body)
def _decode(self, body):
(tid, n) = unpack('!8sL', body[:12])
oid_list = []
for i in xrange(12, 12 + n * 8, 8):
oid = unpack('8s', body[i:i+8])[0]
oid_list.append(oid)
return (oid_list, tid)
class UnlockInformation(Packet):
"""
Unlock information on a transaction. PM -> S.
"""
def _encode(self, tid):
return _encodeTID(tid)
def _decode(self, body):
(tid, ) = unpack('8s', body)
return (_decodeTID(tid), )
class AskNewOIDs(Packet):
"""
Ask new object IDs. C -> PM.
"""
def _encode(self, num_oids):
return pack('!H', num_oids)
def _decode(self, body):
return unpack('!H', body) # num oids
class AnswerNewOIDs(Packet):
"""
Answer new object IDs. PM -> C.
"""
def _encode(self, oid_list):
body = [pack('!H', len(oid_list))]
body.extend(oid_list)
return ''.join(body)
def _decode(self, body):
(n,) = unpack('!H', body[:2])
oid_list = []
for i in xrange(n):
oid = unpack('8s', body[2+i*8:10+i*8])[0]
oid_list.append(oid)
return (oid_list,)
class AskStoreObject(Packet):
"""
Ask to store an object. Send an OID, an original serial, a current
transaction ID, and data. C -> S.
"""
def _encode(self, oid, serial, compression, checksum, data, tid):
if serial is None:
serial = INVALID_TID
return pack('!8s8s8sBLL', oid, serial, tid, compression,
checksum, len(data)) + data
def _decode(self, body):
r = unpack('!8s8s8sBL', body[:29])
oid, serial, tid, compression, checksum = r
(data, _) = _readString(body, 'data', offset=29)
return (oid, serial, compression, checksum, data, tid)
class AnswerStoreObject(Packet):
"""
Answer if an object has been stored. If an object is in conflict,
a serial of the conflicting transaction is returned. In this case,
if this serial is newer than the current transaction ID, a client
node must not try to resolve the conflict. S -> C.
"""
def _encode(self, conflicting, oid, serial):
if serial is None:
serial = INVALID_TID
return pack('!B8s8s', conflicting, oid, serial)
def _decode(self, body):
(conflicting, oid, serial) = unpack('!B8s8s', body)
return (conflicting, oid, serial)
class AbortTransaction(Packet):
"""
Abort a transaction. C -> S, PM.
"""
def _decode(self, body):
(tid, ) = unpack('8s', body)
return (tid, )
class AskStoreTransaction(Packet):
"""
Ask to store a transaction. C -> S.
"""
def _encode(self, tid, user, desc, ext, oid_list):
lengths = (len(oid_list), len(user), len(desc), len(ext))
body = [pack('!8sLHHH', tid, *lengths)]
body.append(user)
body.append(desc)
body.append(ext)
body.extend(oid_list)
return ''.join(body)
def _decode(self, body):
r = unpack('!8sLHHH', body[:18])
tid, oid_len, user_len, desc_len, ext_len = r
body = body[18:]
user = body[:user_len]
body = body[user_len:]
desc = body[:desc_len]
body = body[desc_len:]
ext = body[:ext_len]
body = body[ext_len:]
oid_list = []
for i in xrange(oid_len):
(oid, ) = unpack('8s', body[:8])
body = body[8:]
oid_list.append(oid)
return (tid, user, desc, ext, oid_list)
class AnswerStoreTransaction(Packet):
"""
Answer if transaction has been stored. S -> C.
"""
def _encode(self, tid):
return _encodeTID(tid)
def _decode(self, body):
(tid, ) = unpack('8s', body)
return (tid, )
class AskObject(Packet):
"""
Ask a stored object by its OID and a serial or a TID if given. If a serial
is specified, the specified revision of an object will be returned. If
a TID is specified, an object right before the TID will be returned. S,C -> S.
"""
def _encode(self, oid, serial, tid):
tid = _encodeTID(tid)
serial = _encodeTID(serial) # serial is the previous TID
return pack('!8s8s8s', oid, serial, tid)
def _decode(self, body):
(oid, serial, tid) = unpack('8s8s8s', body)
if serial == INVALID_TID:
serial = None
tid = _decodeTID(tid)
return (oid, serial, tid)
class AnswerObject(Packet):
"""
Answer the requested object. S -> C.
"""
def _encode(self, oid, serial_start, serial_end, compression,
checksum, data):
if serial_start is None:
serial_start = INVALID_TID
if serial_end is None:
serial_end = INVALID_TID
return pack('!8s8s8sBLL', oid, serial_start, serial_end,
compression, checksum, len(data)) + data
def _decode(self, body):
r = unpack('!8s8s8sBL', body[:29])
oid, serial_start, serial_end, compression, checksum = r
if serial_end == INVALID_TID:
serial_end = None
(data, _) = _readString(body, 'data', offset=29)
return (oid, serial_start, serial_end, compression, checksum, data)
class AskTIDs(Packet):
"""
Ask for TIDs between a range of offsets. The order of TIDs is descending,
and the range is [first, last). C, S -> S.
"""
def _encode(self, first, last, partition):
return pack('!QQL', first, last, partition)
def _decode(self, body):
return unpack('!QQL', body) # first, last, partition
class AnswerTIDs(Packet):
"""
Answer the requested TIDs. S -> C, S.
"""
def _encode(self, tid_list):
body = [pack('!L', len(tid_list))]
body.extend(tid_list)
return ''.join(body)
def _decode(self, body):
(n, ) = unpack('!L', body[:4])
tid_list = []
for i in xrange(n):
tid = unpack('8s', body[4+i*8:12+i*8])[0]
tid_list.append(tid)
return (tid_list,)
class AskTransactionInformation(Packet):
"""
Ask information about a transaction. Any -> S.
"""
def _decode(self, body):
(tid, ) = unpack('8s', body)
return (tid, )
class AnswerTransactionInformation(Packet):
"""
Answer information (user, description) about a transaction. S -> Any.
"""
def _encode(self, tid, user, desc, ext, oid_list):
body = [pack('!8sHHHL', tid, len(user), len(desc), len(ext), len(oid_list))]
body.append(user)
body.append(desc)
body.append(ext)
body.extend(oid_list)
return ''.join(body)
def _decode(self, body):
r = unpack('!8sHHHL', body[:18])
tid, user_len, desc_len, ext_len, oid_len = r
body = body[18:]
user = body[:user_len]
body = body[user_len:]
desc = body[:desc_len]
body = body[desc_len:]
ext = body[:ext_len]
body = body[ext_len:]
oid_list = []
for i in xrange(oid_len):
(oid, ) = unpack('8s', body[:8])
body = body[8:]
oid_list.append(oid)
return (tid, user, desc, ext, oid_list)
class AskObjectHistory(Packet):
"""
Ask history information for a given object. The order of serials is
descending, and the range is [first, last]. C, S -> S.
"""
def _encode(self, oid, first, last):
return pack('!8sQQ', oid, first, last)
def _decode(self, body):
(oid, first, last) = unpack('!8sQQ', body)
return (oid, first, last)
class AnswerObjectHistory(Packet):
"""
Answer history information (serial, size) for an object. S -> C, S.
"""
def _encode(self, oid, history_list):
body = [pack('!8sL', oid, len(history_list))]
for serial, size in history_list:
body.append(pack('!8sL', serial, size))
return ''.join(body)
def _decode(self, body):
(oid, length) = unpack('!8sL', body[:12])
history_list = []
for i in xrange(12, 12 + length * 12, 12):
serial, size = unpack('!8sL', body[i:i+12])
history_list.append((serial, size))
return (oid, history_list)
class AskOIDs(Packet):
"""
Ask for OIDs between a range of offsets. The order of OIDs is descending,
and the range is [first, last). S -> S.
"""
def _encode(self, first, last, partition):
return pack('!QQL', first, last, partition)
def _decode(self, body):
return unpack('!QQL', body) # first, last, partition
class AnswerOIDs(Packet):
"""
Answer the requested OIDs. S -> S.
"""
def _encode(self, oid_list):
body = [pack('!L', len(oid_list))]
body.extend(oid_list)
return ''.join(body)
def _decode(self, body):
(n,) = unpack('!L', body[:4])
oid_list = []
for i in xrange(n):
oid = unpack('8s', body[4+i*8:12+i*8])[0]
oid_list.append(oid)
return (oid_list,)
class AskPartitionList(Packet):
"""
All the following messages are for neoctl to admin node
Ask information about partition
"""
def _encode(self, min_offset, max_offset, uuid):
uuid = _encodeUUID(uuid)
body = [pack('!LL16s', min_offset, max_offset, uuid)]
return ''.join(body)
def _decode(self, body):
(min_offset, max_offset, uuid) = unpack('!LL16s', body)
uuid = _decodeUUID(uuid)
return (min_offset, max_offset, uuid)
class AnswerPartitionList(Packet):
"""
Answer information about partition
"""
def _encode(self, ptid, row_list):
ptid = _encodePTID(ptid)
body = [pack('!8sL', ptid, len(row_list))]
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
for uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack('!16sH', uuid, state))
return ''.join(body)
def _decode(self, body):
index = 12
(ptid, n) = unpack('!8sL', body[:index])
ptid = _decodePTID(ptid)
row_list = []
cell_list = []
for i in xrange(n):
offset, m = unpack('!LL', body[index:index+8])
index += 8
for j in xrange(m):
uuid, state = unpack('!16sH', body[index:index+18])
index += 18
state = CellStates.get(state)
uuid = _decodeUUID(uuid)
cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
return (ptid, row_list)
class AskNodeList(Packet):
"""
Ask information about nodes
"""
def _encode(self, node_type):
return ''.join([pack('!H', node_type)])
def _decode(self, body):
(node_type, ) = unpack('!H', body)
node_type = _decodeNodeType(node_type)
return (node_type,)
class AnswerNodeList(Packet):
"""
Answer information about nodes
"""
def _encode(self, node_list):
body = [pack('!L', len(node_list))]
for node_type, address, uuid, state in node_list:
uuid = _encodeUUID(uuid)
address = _encodeAddress(address)
body.append(pack('!H6s16sH', node_type, address, uuid, state))
return ''.join(body)
def _decode(self, body):
(n,) = unpack('!L', body[:4])
node_list = []
for i in xrange(n):
r = unpack('!H6s16sH', body[4+i*26:30+i*26])
node_type, address, uuid, state = r
address = _decodeAddress(address)
node_type = _decodeNodeType(node_type)
state = _decodeNodeState(state)
uuid = _decodeUUID(uuid)
node_list.append((node_type, address, uuid, state))
return (node_list,)
class SetNodeState(Packet):
"""
Set the node state
"""
def _encode(self, uuid, state, modify_partition_table):
uuid = _encodeUUID(uuid)
return ''.join([pack('!16sHB', uuid, state, modify_partition_table)])
def _decode(self, body):
(uuid, state, modify) = unpack('!16sHB', body)
state = _decodeNodeState(state)
uuid = _decodeUUID(uuid)
return (uuid, state, modify)
class AnswerNodeState(Packet):
"""
Answer state of the node
"""
def _encode(self, uuid, state):
uuid = _encodeUUID(uuid)
return ''.join([pack('!16sH', uuid, state)])
def _decode(self, body):
(uuid, state) = unpack('!16sH', body)
state = _decodeNodeState(state)
uuid = _decodeUUID(uuid)
node_list.append((node_type, address, uuid, state))
return (node_list,)
decode_table[PacketTypes.ANSWER_NODE_LIST] = _decodeAnswerNodeList
@handle_errors
def _decodeSetNodeState(body):
(uuid, state, modify) = unpack('!16sHB', body)
state = _decodeNodeState(state)
uuid = _decodeUUID(uuid)
return (uuid, state, modify)
decode_table[PacketTypes.SET_NODE_STATE] = _decodeSetNodeState
@handle_errors
def _decodeAnswerNodeState(body):
(uuid, state) = unpack('!16sH', body)
state = _decodeNodeState(state)
uuid = _decodeUUID(uuid)
return (uuid, state)
decode_table[PacketTypes.ANSWER_NODE_STATE] = _decodeAnswerNodeState
@handle_errors
def _decodeAddPendingNodes(body):
(n, ) = unpack('!H', body[:2])
uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)]
uuid_list = map(_decodeUUID, uuid_list)
return (uuid_list, )
decode_table[PacketTypes.ADD_PENDING_NODES] = _decodeAddPendingNodes
@handle_errors
def _decodeAnswerNewNodes(body):
(n, ) = unpack('!H', body[:2])
uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)]
uuid_list = map(_decodeUUID, uuid_list)
return (uuid_list, )
decode_table[PacketTypes.ANSWER_NEW_NODES] = _decodeAnswerNewNodes
def _decodeAskNodeInformation(body):
pass # No payload
decode_table[PacketTypes.ASK_NODE_INFORMATION] = _decodeAskNodeInformation
decode_table[PacketTypes.ANSWER_NODE_INFORMATION] = _decodeNotifyNodeInformation
def _decodeAskClusterState(body):
return (uuid, state)
class AddPendingNodes(Packet):
"""
Ask the primary to include some pending node in the partition table
"""
def _encode(self, uuid_list=()):
# an empty list means all current pending nodes
uuid_list = [pack('!16s', _encodeUUID(uuid)) for uuid in uuid_list]
return pack('!H', len(uuid_list)) + ''.join(uuid_list)
def _decode(self, body):
(n, ) = unpack('!H', body[:2])
uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)]
uuid_list = map(_decodeUUID, uuid_list)
return (uuid_list, )
class AnswerNewNodes(Packet):
"""
Anwer what are the nodes added in the partition table
"""
def _encode(self, uuid_list):
# an empty list means no new nodes
uuid_list = [pack('!16s', _encodeUUID(uuid)) for uuid in uuid_list]
return pack('!H', len(uuid_list)) + ''.join(uuid_list)
def _decode(self, body):
(n, ) = unpack('!H', body[:2])
uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)]
uuid_list = map(_decodeUUID, uuid_list)
return (uuid_list, )
class NotifyNodeInformation(Packet):
"""
Notify information about one or more nodes. Any -> PM, PM -> Any.
"""
def _encode(self, node_list):
body = [pack('!L', len(node_list))]
for node_type, address, uuid, state in node_list:
uuid = _encodeUUID(uuid)
address = _encodeAddress(address)
body.append(pack('!H6s16sH', node_type, address, uuid, state))
return ''.join(body)
def _decode(self, body):
(n,) = unpack('!L', body[:4])
node_list = []
for i in xrange(n):
r = unpack('!H6s16sH', body[4+i*26:30+i*26])
node_type, address, uuid, state = r
address = _decodeAddress(address)
node_type = _decodeNodeType(node_type)
state = _decodeNodeState(state)
uuid = _decodeUUID(uuid)
node_list.append((node_type, address, uuid, state))
return (node_list,)
class AskNodeInformation(Packet):
"""
Ask node information
"""
pass
class AnswerNodeInformation(Packet):
"""
Answer node information
"""
pass
class SetClusterState(Packet):
"""
Set the cluster state
"""
def _encode(self, state):
return pack('!H', state)
def _decode(self, body):
(state, ) = unpack('!H', body[:2])
state = _decodeClusterState(state)
return (state, )
class NotifyClusterInformation(Packet):
"""
Notify information about the cluster
"""
def _encode(self, state):
return pack('!H', state)
def _decode(self, body):
(state, ) = unpack('!H', body)
state = _decodeClusterState(state)
return (state, )
class AskClusterState(Packet):
"""
Ask state of the cluster
"""
pass
decode_table[PacketTypes.ASK_CLUSTER_STATE] = _decodeAskClusterState
@handle_errors
def _decodeAnswerClusterState(body):
(state, ) = unpack('!H', body)
state = _decodeClusterState(state)
return (state, )
decode_table[PacketTypes.ANSWER_CLUSTER_STATE] = _decodeAnswerClusterState
@handle_errors
def _decodeSetClusterState(body):
(state, ) = unpack('!H', body[:2])
state = _decodeClusterState(state)
return (state, )
decode_table[PacketTypes.SET_CLUSTER_STATE] = _decodeSetClusterState
@handle_errors
def _decodeNotifyClusterInformation(body):
(state, ) = unpack('!H', body)
state = _decodeClusterState(state)
return (state, )
decode_table[PacketTypes.NOTIFY_CLUSTER_INFORMATION] = _decodeNotifyClusterInformation
@handle_errors
def _decodeNotifyLastOID(body):
(loid, ) = unpack('8s', body)
return (loid, )
decode_table[PacketTypes.NOTIFY_LAST_OID] = _decodeNotifyLastOID
# Packet encoding
class AnswerClusterState(Packet):
"""
Answer state of the cluster
"""
def _encode(self, state):
return pack('!H', state)
def _decode(self, body):
(state, ) = unpack('!H', body)
state = _decodeClusterState(state)
return (state, )
class NotifyLastOID(Packet):
"""
Notify last OID generated
"""
def _decode(self, body):
(loid, ) = unpack('8s', body)
return (loid, )
class Error(Packet):
"""
Error is a special type of message, because this can be sent against
any other message, even if such a message does not expect a reply
usually. Any -> Any.
"""
def _encode(self, code, message):
return pack('!HL', code, len(message)) + message
def _decode(self, body):
(code, ) = unpack('!H', body[:2])
code = _decodeErrorCode(code)
(message, _) = _readString(body, 'message', offset=2)
return (code, message)
StaticRegistry = {}
def register(code, cls):
assert code not in StaticRegistry, "Duplicate packet code"
cls._code = code
StaticRegistry[code] = cls
return cls
class PacketRegistry(dict):
def __init__(self):
dict.__init__(self)
# TODO: self load and lookup cls in module from attr name:
# for attr in self:
# if issubclass(module.get(attr), Packet):
# cls = module.get(attr)
# code = ???
# self[code] = cls
self.update(StaticRegistry)
Error = register(0x8000, Error)
Ping = register(0x0001, Ping)
Pong = register(0x8001, Pong)
RequestIdentification = register(0x0002, RequestIdentification)
AcceptIdentification = register(0x8002, AcceptIdentification)
AskPrimary = register(0x0003, AskPrimary)
AnswerPrimary = register(0x8003, AnswerPrimary)
AnnouncePrimary = register(0x0004, AnnouncePrimary)
ReelectPrimary = register(0x0005, ReelectPrimary)
NotifyNodeInformation = register(0x0006, NotifyNodeInformation)
AskLastIDs = register(0x0007, AskLastIDs)
AnswerLastIDs = register(0x8007, AnswerLastIDs)
AskPartitionTable = register(0x0008, AskPartitionTable)
AnswerPartitionTable = register(0x8008, AnswerPartitionTable)
SendPartitionTable = register(0x0009, SendPartitionTable)
NotifyPartitionChanges = register(0x000A, NotifyPartitionChanges)
StartOperation = register(0x000B, StartOperation)
StopOperation = register(0x000C, StopOperation)
AskUnfinishedTransactions = register(0x000D, AskUnfinishedTransactions)
AnswerUnfinishedTransactions = register(0x800d, AnswerUnfinishedTransactions)
AskObjectPresent = register(0x000f, AskObjectPresent)
AnswerObjectPresent = register(0x800f, AnswerObjectPresent)
DeleteTransaction = register(0x0010, DeleteTransaction)
CommitTransaction = register(0x0011, CommitTransaction)
AskBeginTransaction = register(0x0012, AskBeginTransaction)
AnswerBeginTransaction = register(0x8012, AnswerBeginTransaction)
FinishTransaction = register(0x0013, FinishTransaction)
NotifyTransactionFinished = register(0x8013, NotifyTransactionFinished)
LockInformation = register(0x0014, LockInformation)
NotifyInformationLocked = register(0x8014, NotifyInformationLocked)
InvalidateObjects = register(0x0015, InvalidateObjects)
UnlockInformation = register(0x0016, UnlockInformation)
AskNewOIDs = register(0x0017, AskNewOIDs)
AnswerNewOIDs = register(0x8017, AnswerNewOIDs)
AskStoreObject = register(0x0018, AskStoreObject)
AnswerStoreObject = register(0x8018, AnswerStoreObject)
AbortTransaction = register(0x0019, AbortTransaction)
AskStoreTransaction = register(0x001A, AskStoreTransaction)
AnswerStoreTransaction = register(0x801A, AnswerStoreTransaction)
AskObject = register(0x001B, AskObject)
AnswerObject = register(0x801B, AnswerObject)
AskTIDs = register(0x001C, AskTIDs)
AnswerTIDs = register(0x801D, AnswerTIDs)
AskTransactionInformation = register(0x001E, AskTransactionInformation)
AnswerTransactionInformation = register(0x801E, AnswerTransactionInformation)
AskObjectHistory = register(0x001F, AskObjectHistory)
AnswerObjectHistory = register(0x801F, AnswerObjectHistory)
AskOIDs = register(0x0020, AskOIDs)
AnswerOIDs = register(0x8020, AnswerOIDs)
AskPartitionList = register(0x0021, AskPartitionList)
AnswerPartitionList = register(0x8021, AnswerPartitionList)
AskNodeList = register(0x0022, AskNodeList)
AnswerNodeList = register(0x8022, AnswerNodeList)
SetNodeState = register(0x0023, SetNodeState)
AnswerNodeState = register(0x8023, AnswerNodeState)
AddPendingNodes = register(0x0024, AddPendingNodes)
AnswerNewNodes = register(0x8024, AnswerNewNodes)
AskNodeInformation = register(0x0025, AskNodeInformation)
AnswerNodeInformation = register(0x8025, AnswerNodeInformation)
SetClusterState = register(0x0026, SetClusterState)
NotifyClusterInformation = register(0x0027, NotifyClusterInformation)
AskClusterState = register(0x0028, AskClusterState)
AnswerClusterState = register(0x8028, AnswerClusterState)
NotifyLastOID = register(0x0030, NotifyLastOID)
Packets = PacketRegistry()
def _error(error_code, error_message):
body = pack('!HL', error_code, len(error_message)) + error_message
return Packet(PacketTypes.ERROR, body)
return Error(error_code, error_message)
def noError(message):
return _error(ErrorCodes.NO_ERROR, message)
......@@ -1065,339 +1345,3 @@ def oidNotFound(error_message):
def tidNotFound(error_message):
return _error(ErrorCodes.TID_NOT_FOUND, 'tid not found: ' + error_message)
def ping():
return Packet(PacketTypes.PING)
def pong():
return Packet(PacketTypes.PONG)
def requestNodeIdentification(node_type, uuid, address, name):
uuid = _encodeUUID(uuid)
address = _encodeAddress(address)
body = pack('!LLH16s6sL', PROTOCOL_VERSION[0], PROTOCOL_VERSION[1],
node_type, uuid, address, len(name)) + name
return Packet(PacketTypes.REQUEST_NODE_IDENTIFICATION, body)
def acceptNodeIdentification(node_type, uuid, address,
num_partitions, num_replicas, your_uuid):
uuid = _encodeUUID(uuid)
your_uuid = _encodeUUID(your_uuid)
address = _encodeAddress(address)
body = pack('!H16s6sLL16s', node_type, uuid, address,
num_partitions, num_replicas, your_uuid)
return Packet(PacketTypes.ACCEPT_NODE_IDENTIFICATION, body)
def askPrimaryMaster():
return Packet(PacketTypes.ASK_PRIMARY_MASTER)
def answerPrimaryMaster(primary_uuid, known_master_list):
primary_uuid = _encodeUUID(primary_uuid)
body = [primary_uuid, pack('!L', len(known_master_list))]
for address, uuid in known_master_list:
uuid = _encodeUUID(uuid)
address = _encodeAddress(address)
body.append(pack('!6s16s', address, uuid))
body = ''.join(body)
return Packet(PacketTypes.ANSWER_PRIMARY_MASTER, body)
def announcePrimaryMaster():
return Packet(PacketTypes.ANNOUNCE_PRIMARY_MASTER)
def reelectPrimaryMaster():
return Packet(PacketTypes.REELECT_PRIMARY_MASTER)
def notifyNodeInformation(node_list):
body = [pack('!L', len(node_list))]
for node_type, address, uuid, state in node_list:
uuid = _encodeUUID(uuid)
address = _encodeAddress(address)
body.append(pack('!H6s16sH', node_type, address, uuid, state))
body = ''.join(body)
return Packet(PacketTypes.NOTIFY_NODE_INFORMATION, body)
def askLastIDs():
return Packet(PacketTypes.ASK_LAST_IDS)
def answerLastIDs(loid, ltid, lptid):
# in this case, loid is a valid OID but considered as invalid. This is not
# an issue because the OID 0 is hard coded and will never be generated
if loid is None:
loid = INVALID_OID
ltid = _encodeTID(ltid)
lptid = _encodePTID(lptid)
return Packet(PacketTypes.ANSWER_LAST_IDS, loid + ltid + lptid)
def askPartitionTable(offset_list):
body = [pack('!L', len(offset_list))]
for offset in offset_list:
body.append(pack('!L', offset))
body = ''.join(body)
return Packet(PacketTypes.ASK_PARTITION_TABLE, body)
def answerPartitionTable(ptid, row_list):
ptid = _encodePTID(ptid)
body = [pack('!8sL', ptid, len(row_list))]
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
for uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack('!16sH', uuid, state))
body = ''.join(body)
return Packet(PacketTypes.ANSWER_PARTITION_TABLE, body)
def sendPartitionTable(ptid, row_list):
ptid = _encodePTID(ptid)
body = [pack('!8sL', ptid, len(row_list))]
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
for uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack('!16sH', uuid, state))
body = ''.join(body)
return Packet(PacketTypes.SEND_PARTITION_TABLE, body)
def notifyPartitionChanges(ptid, cell_list):
ptid = _encodePTID(ptid)
body = [pack('!8sL', ptid, len(cell_list))]
for offset, uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack('!L16sH', offset, uuid, state))
body = ''.join(body)
return Packet(PacketTypes.NOTIFY_PARTITION_CHANGES, body)
def startOperation():
return Packet(PacketTypes.START_OPERATION)
def stopOperation():
return Packet(PacketTypes.STOP_OPERATION)
def askUnfinishedTransactions():
return Packet(PacketTypes.ASK_UNFINISHED_TRANSACTIONS)
def answerUnfinishedTransactions(tid_list):
body = [pack('!L', len(tid_list))]
body.extend(tid_list)
body = ''.join(body)
return Packet(PacketTypes.ANSWER_UNFINISHED_TRANSACTIONS, body)
def askObjectPresent(oid, tid):
return Packet(PacketTypes.ASK_OBJECT_PRESENT, oid + tid)
def answerObjectPresent(oid, tid):
return Packet(PacketTypes.ANSWER_OBJECT_PRESENT, oid + tid)
def deleteTransaction(tid):
return Packet(PacketTypes.DELETE_TRANSACTION, tid)
def commitTransaction(tid):
return Packet(PacketTypes.COMMIT_TRANSACTION, tid)
def askBeginTransaction(tid):
tid = _encodeTID(tid)
return Packet(PacketTypes.ASK_BEGIN_TRANSACTION, tid)
def answerBeginTransaction(tid):
tid = _encodeTID(tid)
return Packet(PacketTypes.ANSWER_BEGIN_TRANSACTION, tid)
def askNewOIDs(num_oids):
return Packet(PacketTypes.ASK_NEW_OIDS, pack('!H', num_oids))
def answerNewOIDs(oid_list):
body = [pack('!H', len(oid_list))]
body.extend(oid_list)
body = ''.join(body)
return Packet(PacketTypes.ANSWER_NEW_OIDS, body)
def finishTransaction(oid_list, tid):
body = [pack('!8sL', tid, len(oid_list))]
body.extend(oid_list)
body = ''.join(body)
return Packet(PacketTypes.FINISH_TRANSACTION, body)
def notifyTransactionFinished(tid):
return Packet(PacketTypes.NOTIFY_TRANSACTION_FINISHED, tid)
def lockInformation(tid):
return Packet(PacketTypes.LOCK_INFORMATION, tid)
def notifyInformationLocked(tid):
return Packet(PacketTypes.NOTIFY_INFORMATION_LOCKED, tid)
def invalidateObjects(oid_list, tid):
body = [pack('!8sL', tid, len(oid_list))]
body.extend(oid_list)
body = ''.join(body)
return Packet(PacketTypes.INVALIDATE_OBJECTS, body)
def unlockInformation(tid):
return Packet(PacketTypes.UNLOCK_INFORMATION, tid)
def abortTransaction(tid):
return Packet(PacketTypes.ABORT_TRANSACTION, tid)
def askStoreTransaction(tid, user, desc, ext, oid_list):
lengths = (len(oid_list), len(user), len(desc), len(ext))
body = [pack('!8sLHHH', tid, *lengths)]
body.append(user)
body.append(desc)
body.append(ext)
body.extend(oid_list)
body = ''.join(body)
return Packet(PacketTypes.ASK_STORE_TRANSACTION, body)
def answerStoreTransaction(tid):
return Packet(PacketTypes.ANSWER_STORE_TRANSACTION, tid)
def askStoreObject(oid, serial, compression, checksum, data, tid):
if serial is None:
serial = INVALID_TID
body = pack('!8s8s8sBLL', oid, serial, tid, compression,
checksum, len(data)) + data
return Packet(PacketTypes.ASK_STORE_OBJECT, body)
def answerStoreObject(conflicting, oid, serial):
if serial is None:
serial = INVALID_TID
body = pack('!B8s8s', conflicting, oid, serial)
return Packet(PacketTypes.ANSWER_STORE_OBJECT, body)
def askObject(oid, serial, tid):
tid = _encodeTID(tid)
serial = _encodeTID(serial) # serial is the previous TID
return Packet(PacketTypes.ASK_OBJECT, pack('!8s8s8s', oid, serial, tid))
def answerObject(oid, serial_start, serial_end, compression,
checksum, data):
if serial_start is None:
serial_start = INVALID_TID
if serial_end is None:
serial_end = INVALID_TID
body = pack('!8s8s8sBLL', oid, serial_start, serial_end,
compression, checksum, len(data)) + data
return Packet(PacketTypes.ANSWER_OBJECT, body)
def askTIDs(first, last, partition):
return Packet(PacketTypes.ASK_TIDS, pack('!QQL', first, last, partition))
def answerTIDs(tid_list):
body = [pack('!L', len(tid_list))]
body.extend(tid_list)
body = ''.join(body)
return Packet(PacketTypes.ANSWER_TIDS, body)
def askTransactionInformation(tid):
return Packet(PacketTypes.ASK_TRANSACTION_INFORMATION, pack('!8s', tid))
def answerTransactionInformation(tid, user, desc, ext, oid_list):
body = [pack('!8sHHHL', tid, len(user), len(desc), len(ext), len(oid_list))]
body.append(user)
body.append(desc)
body.append(ext)
body.extend(oid_list)
body = ''.join(body)
return Packet(PacketTypes.ANSWER_TRANSACTION_INFORMATION, body)
def askObjectHistory(oid, first, last):
return Packet(PacketTypes.ASK_OBJECT_HISTORY, pack('!8sQQ', oid, first, last))
def answerObjectHistory(oid, history_list):
body = [pack('!8sL', oid, len(history_list))]
for serial, size in history_list:
body.append(pack('!8sL', serial, size))
body = ''.join(body)
return Packet(PacketTypes.ANSWER_OBJECT_HISTORY, body)
def askOIDs(first, last, partition):
return Packet(PacketTypes.ASK_OIDS, pack('!QQL', first, last, partition))
def answerOIDs(oid_list):
body = [pack('!L', len(oid_list))]
body.extend(oid_list)
body = ''.join(body)
return Packet(PacketTypes.ANSWER_OIDS, body)
def askPartitionList(min_offset, max_offset, uuid):
uuid = _encodeUUID(uuid)
body = [pack('!LL16s', min_offset, max_offset, uuid)]
body = ''.join(body)
return Packet(PacketTypes.ASK_PARTITION_LIST, body)
def answerPartitionList(ptid, row_list):
ptid = _encodePTID(ptid)
body = [pack('!8sL', ptid, len(row_list))]
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
for uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack('!16sH', uuid, state))
body = ''.join(body)
return Packet(PacketTypes.ANSWER_PARTITION_LIST, body)
def askNodeList(node_type):
body = [pack('!H', node_type)]
body = ''.join(body)
return Packet(PacketTypes.ASK_NODE_LIST, body)
def answerNodeList(node_list):
body = [pack('!L', len(node_list))]
for node_type, address, uuid, state in node_list:
uuid = _encodeUUID(uuid)
address = _encodeAddress(address)
body.append(pack('!H6s16sH', node_type, address, uuid, state))
body = ''.join(body)
return Packet(PacketTypes.ANSWER_NODE_LIST, body)
def setNodeState(uuid, state, modify_partition_table):
uuid = _encodeUUID(uuid)
body = [pack('!16sHB', uuid, state, modify_partition_table)]
body = ''.join(body)
return Packet(PacketTypes.SET_NODE_STATE, body)
def answerNodeState(uuid, state):
uuid = _encodeUUID(uuid)
body = [pack('!16sH', uuid, state)]
body = ''.join(body)
return Packet(PacketTypes.ANSWER_NODE_STATE, body)
def addPendingNodes(uuid_list=()):
# an empty list means all current pending nodes
uuid_list = [pack('!16s', _encodeUUID(uuid)) for uuid in uuid_list]
body = pack('!H', len(uuid_list)) + ''.join(uuid_list)
return Packet(PacketTypes.ADD_PENDING_NODES, body)
def answerNewNodes(uuid_list):
# an empty list means no new nodes
uuid_list = [pack('!16s', _encodeUUID(uuid)) for uuid in uuid_list]
body = pack('!H', len(uuid_list)) + ''.join(uuid_list)
return Packet(PacketTypes.ANSWER_NEW_NODES, body)
def askNodeInformation():
return Packet(PacketTypes.ASK_NODE_INFORMATION)
def answerNodeInformation(node_list):
body = [pack('!L', len(node_list))]
for node_type, address, uuid, state in node_list:
uuid = _encodeUUID(uuid)
body.append(pack('!H6s16sH', node_type, address, uuid, state))
body = ''.join(body)
return Packet(PacketTypes.ANSWER_NODE_INFORMATION, body)
def askClusterState():
return Packet(PacketTypes.ASK_CLUSTER_STATE)
def answerClusterState(state):
body = pack('!H', state)
return Packet(PacketTypes.ANSWER_CLUSTER_STATE, body)
def setClusterState(state):
body = pack('!H', state)
return Packet(PacketTypes.SET_CLUSTER_STATE, body)
def notifyClusterInformation(state):
body = pack('!H', state)
return Packet(PacketTypes.NOTIFY_CLUSTER_INFORMATION, body)
def notifyLastOID(oid):
body = pack('!8s', oid)
return Packet(PacketTypes.NOTIFY_LAST_OID, body)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment