Commit 5529b30f authored by Grégory Wisniewski's avatar Grégory Wisniewski

New protocol parser, semantic oriented.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2652 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 6ffb55d5
...@@ -160,7 +160,7 @@ class PrimaryAnswersHandler(AnswerBaseHandler): ...@@ -160,7 +160,7 @@ class PrimaryAnswersHandler(AnswerBaseHandler):
self.app.setHandlerData(ttid) self.app.setHandlerData(ttid)
def answerNewOIDs(self, conn, oid_list): def answerNewOIDs(self, conn, oid_list):
self.app.new_oid_list = oid_list self.app.new_oid_list = list(oid_list)
def answerTransactionFinished(self, conn, _, tid): def answerTransactionFinished(self, conn, _, tid):
self.app.setHandlerData(tid) self.app.setHandlerData(tid)
......
...@@ -133,10 +133,13 @@ class EventHandler(object): ...@@ -133,10 +133,13 @@ class EventHandler(object):
def notify(self, conn, message): def notify(self, conn, message):
neo.lib.logging.info('notification from %r: %s', conn, message) neo.lib.logging.info('notification from %r: %s', conn, message)
def requestIdentification(self, conn, node_type, def requestIdentification(self, conn, node_type, uuid, address, name):
uuid, address, name):
raise UnexpectedPacketError raise UnexpectedPacketError
def _requestIdentification(self, conn, protocol, node_type,
uuid, address, name):
self.requestIdentification(conn, node_type, uuid, address, name)
def acceptIdentification(self, conn, node_type, def acceptIdentification(self, conn, node_type,
uuid, num_partitions, num_replicas, your_uuid): uuid, num_partitions, num_replicas, your_uuid):
raise UnexpectedPacketError raise UnexpectedPacketError
...@@ -428,7 +431,7 @@ class EventHandler(object): ...@@ -428,7 +431,7 @@ class EventHandler(object):
d[Packets.Error] = self.error d[Packets.Error] = self.error
d[Packets.Notify] = self.notify d[Packets.Notify] = self.notify
d[Packets.RequestIdentification] = self.requestIdentification d[Packets.RequestIdentification] = self._requestIdentification
d[Packets.AcceptIdentification] = self.acceptIdentification d[Packets.AcceptIdentification] = self.acceptIdentification
d[Packets.AskPrimary] = self.askPrimary d[Packets.AskPrimary] = self.askPrimary
d[Packets.AnswerPrimary] = self.answerPrimary d[Packets.AnswerPrimary] = self.answerPrimary
......
...@@ -37,8 +37,11 @@ class PacketLogger(object): ...@@ -37,8 +37,11 @@ class PacketLogger(object):
klass = packet.getType() klass = packet.getType()
uuid = dump(conn.getUUID()) uuid = dump(conn.getUUID())
ip, port = conn.getAddress() ip, port = conn.getAddress()
packet_name = packet.__class__.__name__
if packet.isResponse() and packet._request is not None:
packet_name += packet._request.__name__
neo.lib.logging.debug('#0x%08x %-30s %s %s (%s:%d)', packet.getId(), neo.lib.logging.debug('#0x%08x %-30s %s %s (%s:%d)', packet.getId(),
packet.__class__.__name__, direction, uuid, ip, port) packet_name, direction, uuid, ip, port)
# look for custom packet logger # look for custom packet logger
logger = self.packet_dispatch_table.get(klass, None) logger = self.packet_dispatch_table.get(klass, None)
logger = logger and getattr(self, logger.im_func.__name__, None) logger = logger and getattr(self, logger.im_func.__name__, None)
......
...@@ -15,12 +15,13 @@ ...@@ -15,12 +15,13 @@
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from struct import pack, unpack, error, calcsize import sys
import traceback
from types import ClassType
from socket import inet_ntoa, inet_aton from socket import inet_ntoa, inet_aton
from neo.lib.profiling import profiler_decorator
from cStringIO import StringIO from cStringIO import StringIO
from neo.lib.util import Enum from neo.lib.util import Enum, Struct
# The protocol version (major, minor). # The protocol version (major, minor).
PROTOCOL_VERSION = (4, 1) PROTOCOL_VERSION = (4, 1)
...@@ -28,13 +29,13 @@ PROTOCOL_VERSION = (4, 1) ...@@ -28,13 +29,13 @@ PROTOCOL_VERSION = (4, 1)
# Size restrictions. # Size restrictions.
MIN_PACKET_SIZE = 10 MIN_PACKET_SIZE = 10
MAX_PACKET_SIZE = 0x4000000 MAX_PACKET_SIZE = 0x4000000
PACKET_HEADER_FORMAT = '!LHL' PACKET_HEADER_FORMAT = Struct('!LHL')
PACKET_HEADER_SIZE = calcsize(PACKET_HEADER_FORMAT)
# Check that header size is the expected value. # Check that header size is the expected value.
# If it is not, it means that struct module result is incompatible with # If it is not, it means that struct module result is incompatible with
# "reference" platform (python 2.4 on x86-64). # "reference" platform (python 2.4 on x86-64).
assert PACKET_HEADER_SIZE == 10, \ assert PACKET_HEADER_FORMAT.size == 10, \
'Unsupported platform, packet header length = %i' % (PACKET_HEADER_SIZE, ) 'Unsupported platform, packet header length = %i' % \
(PACKET_HEADER_FORMAT.size, )
RESPONSE_MASK = 0x8000 RESPONSE_MASK = 0x8000
class ErrorCodes(Enum): class ErrorCodes(Enum):
...@@ -109,6 +110,7 @@ INVALID_UUID = '\0' * 16 ...@@ -109,6 +110,7 @@ INVALID_UUID = '\0' * 16
INVALID_TID = '\xff' * 8 INVALID_TID = '\xff' * 8
INVALID_OID = '\xff' * 8 INVALID_OID = '\xff' * 8
INVALID_PARTITION = 0xffffffff INVALID_PARTITION = 0xffffffff
INVALID_ADDRESS = ('0.0.0.0', 0)
ZERO_TID = '\0' * 8 ZERO_TID = '\0' * 8
ZERO_OID = '\0' * 8 ZERO_OID = '\0' * 8
OID_LEN = len(INVALID_OID) OID_LEN = len(INVALID_OID)
...@@ -141,128 +143,42 @@ class BrokenNodeDisallowedError(ProtocolError): ...@@ -141,128 +143,42 @@ class BrokenNodeDisallowedError(ProtocolError):
""" Just close the connection """ """ Just close the connection """
pass pass
# packet parser
def _decodeClusterState(state):
cluster_state = ClusterStates.get(state)
if cluster_state is None:
raise PacketMalformedError('invalid cluster state %d' % state)
return cluster_state
def _decodeNodeState(state):
node_state = NodeStates.get(state)
if node_state is None:
raise PacketMalformedError('invalid node state %d' % state)
return node_state
def _decodeNodeType(original_node_type):
node_type = NodeTypes.get(original_node_type)
if node_type is None:
raise PacketMalformedError('invalid node type %d' % original_node_type)
return node_type
def _decodeErrorCode(original_error_code):
error_code = ErrorCodes.get(original_error_code)
if error_code is None:
raise PacketMalformedError('invalid error code %d' %
original_error_code)
return error_code
def _decodeLockState(original_lock_state):
lock_state = LockState.get(original_lock_state)
if lock_state is None:
raise PacketMalformedError('invalid lock state %d' % (
original_lock_state, ))
return lock_state
def _decodeAddress(address):
if address == '\0' * 6:
return None
(ip, port) = unpack('!4sH', address)
return (inet_ntoa(ip), port)
def _encodeAddress(address):
if address is None:
return '\0' * 6
# address is a tuple (ip, port)
return pack('!4sH', inet_aton(address[0]), address[1])
def _decodeUUID(uuid):
if uuid == INVALID_UUID:
return None
return uuid
def _encodeUUID(uuid):
if uuid is None:
return INVALID_UUID
return uuid
def _decodePTID(ptid):
ptid = unpack('!Q', ptid)[0]
if ptid == 0:
return None
return ptid
def _encodePTID(ptid):
if ptid is None:
ptid = 0
assert isinstance(ptid, (int, long)), ptid
return pack('!Q', ptid)
def _decodeTID(tid):
if tid == INVALID_TID:
return None
return tid
def _encodeTID(tid):
if tid is None:
return INVALID_TID
return tid
def _decodeString(buf, name, offset=0):
buf = buf[offset:]
(size, ) = unpack('!L', buf[:4])
string = buf[4:4+size]
if len(string) != size:
raise PacketMalformedError("can't read string <%s>" % name)
return (string, buf[offset+4+size:])
@profiler_decorator
def _encodeString(buf):
return pack('!L', len(buf)) + buf
class Packet(object): class Packet(object):
""" """
Base class for any packet definition. Base class for any packet definition. The _fmt class attribute must be
Each subclass should override _encode() and _decode() and return a string or defined for any non-empty packet.
a tuple respectively.
""" """
_ignore_when_closed = False _ignore_when_closed = False
_header_format = None
_header_len = None
_request = None _request = None
_answer = None _answer = None
_body = None _body = None
_code = None _code = None
_fmt = None
_id = None _id = None
def __init__(self, *args, **kw): def __init__(self, *args, **kw):
args = list(args)
assert self._code is not None, "Packet class not registered" assert self._code is not None, "Packet class not registered"
if args != () or kw != {}: if args or kw:
body = self._encode(*args, **kw) assert self._fmt is not None
buf = StringIO()
# load named arguments
for item in self._fmt._items[len(args):]:
args.append(kw.get(item._name))
self._fmt.encode(buf.write, args)
body = buf.getvalue()
else: else:
body = '' body = ''
self._body = body self._body = body
def decode(self): def decode(self):
assert self._body is not None assert self._body is not None
if self._fmt is None:
return ()
buf = StringIO(self._body)
try: try:
return self._decode(self._body) return self._fmt.decode(buf.read)
except error, msg: # struct.error except ParseError, msg:
name = self.__class__.__name__
raise PacketMalformedError("%s fail (%s)" % (name, msg))
except PacketMalformedError, msg:
name = self.__class__.__name__ name = self.__class__.__name__
raise PacketMalformedError("%s fail (%s)" % (name, msg)) raise PacketMalformedError("%s fail (%s)" % (name, msg))
...@@ -278,23 +194,17 @@ class Packet(object): ...@@ -278,23 +194,17 @@ class Packet(object):
assert self._id is not None, "No identifier applied on the packet" assert self._id is not None, "No identifier applied on the packet"
return self._id return self._id
def getCode(self):
return self._code
def getType(self): def getType(self):
return self.__class__ return self.__class__
@profiler_decorator
def encode(self): def encode(self):
""" Encode a packet as a string to send it over the network """ """ Encode a packet as a string to send it over the network """
content = self._body content = self._body
length = PACKET_HEADER_SIZE + len(content) length = PACKET_HEADER_FORMAT.size + len(content)
return (pack(PACKET_HEADER_FORMAT, self._id, self._code, length), return (PACKET_HEADER_FORMAT.pack(self._id, self._code, length), content)
content)
@profiler_decorator
def __len__(self): def __len__(self):
return PACKET_HEADER_SIZE + len(self._body) return PACKET_HEADER_FORMAT.size + len(self._body)
def __repr__(self): def __repr__(self):
return '%s[%r]' % (self.__class__.__name__, self._id) return '%s[%r]' % (self.__class__.__name__, self._id)
...@@ -306,17 +216,6 @@ class Packet(object): ...@@ -306,17 +216,6 @@ class Packet(object):
assert isinstance(other, Packet) assert isinstance(other, Packet)
return self._code == other._code return self._code == other._code
def _encode(self, *args, **kw):
""" Default encoder, join all arguments """
args = list(args)
args.extend(kw.values())
return ''.join([str(i) for i in args] or '')
def _decode(self, body):
""" Default decoder, message must be empty """
assert body == '', "Non-empty packet decoding not implemented """
return ()
def isError(self): def isError(self):
return isinstance(self, Error) return isinstance(self, Error)
...@@ -333,1230 +232,907 @@ class Packet(object): ...@@ -333,1230 +232,907 @@ class Packet(object):
""" """
return self._ignore_when_closed return self._ignore_when_closed
class Notify(Packet): class ParseError(Exception):
""" """
General purpose notification (remote logging) An exception that encapsulate another and build the 'path' of the
packet item that generate the error.
""" """
def _encode(self, message): def __init__(self, item, trace):
return message Exception.__init__(self)
self._trace = trace
self._items = [item]
def _decode(self, body): def append(self, item):
return (body, ) self._items.append(item)
class Ping(Packet): def __repr__(self):
chain = '/'.join([item.getName() for item in reversed(self._items)])
return 'at %s:\n%s' % (chain, self._trace)
__str__ = __repr__
# packet parsers
class PItem(object):
""" """
Check if a peer is still alive. Any -> Any. Base class for any packet item, _encode and _decode must be overriden
by subclasses.
""" """
pass def __init__(self, name):
self._name = name
def __repr__(self):
return self.__class__.__name__
def getName(self):
return self._name
def _trace(self, method, *args):
try:
return method(*args)
except ParseError, e:
# trace and forward exception
e.append(self)
raise
except Exception:
# original exception, encapsulate it
trace = ''.join(traceback.format_exception(*sys.exc_info())[2:])
raise ParseError(self, trace)
def encode(self, writer, items):
return self._trace(self._encode, writer, items)
def decode(self, reader):
return self._trace(self._decode, reader)
def _encode(self, writer):
raise NotImplementedError, self.__class__.__name__
class Pong(Packet): def _decode(self, reader):
raise NotImplementedError, self.__class__.__name__
class PStruct(PItem):
""" """
Notify being alive. Any -> Any. Aggregate other items
""" """
pass def __init__(self, name, *items):
PItem.__init__(self, name)
self._items = items
class RequestIdentification(Packet): def _encode(self, writer, items):
assert len(self._items) == len(items), (items, self._items)
for item, value in zip(self._items, items):
item.encode(writer, value)
def _decode(self, reader):
return tuple([item.decode(reader) for item in self._items])
class PStructItem(PItem):
""" """
Request a node identification. This must be the first packet for any A single value encoded with struct
connection. Any -> Any. """
def __init__(self, name, fmt):
PItem.__init__(self, name)
struct = Struct(fmt)
self.pack = struct.pack
self.unpack = struct.unpack
self.size = struct.size
def _encode(self, writer, value):
writer(self.pack(value))
def _decode(self, reader):
return self.unpack(reader(self.size))[0]
class PList(PStructItem):
"""
A list of homogeneous items
"""
def __init__(self, name, item):
PStructItem.__init__(self, name, '!L')
self._item = item
def _encode(self, writer, items):
assert isinstance(items, (list, tuple, set)), (type(items), items)
writer(self.pack(len(items)))
item = self._item
for value in items:
item.encode(writer, value)
def _decode(self, reader):
length = self.unpack(reader(self.size))[0]
item = self._item
return [item.decode(reader) for _ in xrange(length)]
class PDict(PStructItem):
"""
A dictionary with custom key and value formats
"""
def __init__(self, name, key, value):
PStructItem.__init__(self, name, '!L')
self._key = key
self._value = value
def _encode(self, writer, item):
assert isinstance(item , dict), (type(item), item)
writer(self.pack(len(item)))
key, value = self._key, self._value
for k, v in item.iteritems():
key.encode(writer, k)
value.encode(writer, v)
def _decode(self, reader):
length = self.unpack(reader(self.size))[0]
key, value = self._key, self._value
new_dict = {}
for _ in xrange(length):
k = key.decode(reader)
v = value.decode(reader)
new_dict[k] = v
return new_dict
class PEnum(PStructItem):
"""
Encapsulate an enumeration value
"""
def __init__(self, name, enum):
PStructItem.__init__(self, name, '!L')
self._enum = enum
def _encode(self, writer, item):
assert isinstance(item, int), item
writer(self.pack(item))
def _decode(self, reader):
code = self.unpack(reader(self.size))[0]
try:
return self._enum[code]
except KeyError:
enum = self._enum.__class__.__name__
raise ValueError, 'Invalid code for %s enum: %r' % (enum, code)
class PAddress(PStructItem):
""" """
_header_format = '!LLH16s6s' An host address (IPv4 for now)
"""
def _encode(self, node_type, uuid, address, name): def __init__(self, name):
uuid = _encodeUUID(uuid) PStructItem.__init__(self, name, '!4sH')
address = _encodeAddress(address)
return pack(self._header_format, PROTOCOL_VERSION[0], def _encode(self, writer, address):
PROTOCOL_VERSION[1], node_type, uuid, address) + \ if address is None:
_encodeString(name) address = INVALID_ADDRESS
assert len(address) == 2, address
def _decode(self, body): host, port = address
r = unpack(self._header_format, body[:self._header_len]) host = inet_aton(host)
major, minor, node_type, uuid, address = r writer(self.pack(host, port))
address = _decodeAddress(address)
(name, _) = _decodeString(body, 'name', offset=self._header_len) def _decode(self, reader):
node_type = _decodeNodeType(node_type) data = reader(self.size)
uuid = _decodeUUID(uuid) host, port = self.unpack(data)
host = inet_ntoa(host)
if (host, port) == INVALID_ADDRESS:
return None
return (host, port)
class PString(PStructItem):
"""
A variable-length string
"""
def __init__(self, name):
PStructItem.__init__(self, name, '!L')
def _encode(self, writer, value):
writer(self.pack(len(value)))
writer(value)
def _decode(self, reader):
length = self.unpack(reader(self.size))[0]
return reader(length)
class PBoolean(PStructItem):
"""
A boolean value, encoded as a single byte
"""
def __init__(self, name):
PStructItem.__init__(self, name, '!B')
def _encode(self, writer, value):
writer(self.pack(bool(value)))
def _decode(self, reader):
return bool(self.unpack(reader(self.size))[0])
class PNumber(PStructItem):
"""
A integer number (4-bytes length)
"""
def __init__(self, name):
PStructItem.__init__(self, name, '!L')
class PChecksum(PStructItem):
"""
A checksum
"""
def __init__(self, name):
PStructItem.__init__(self, name, '!Q')
class PIndex(PStructItem):
"""
A big integer to defined indexes in a huge list.
"""
def __init__(self, name):
PStructItem.__init__(self, name, '!Q')
class PPTID(PStructItem):
"""
A None value means an invalid PTID
"""
def __init__(self, name):
PStructItem.__init__(self, name, '!Q')
def _encode(self, writer, value):
if value is None:
value = 0
PStructItem._encode(self, writer, value)
def _decode(self, reader):
value = PStructItem._decode(self, reader)
if value == 0:
value = None
return value
class PProtocol(PStructItem):
"""
The protocol version definition
"""
def __init__(self, name):
PStructItem.__init__(self, name, '!LL')
def _encode(self, writer, version):
writer(self.pack(*version))
def _decode(self, reader):
major, minor = self.unpack(reader(self.size))
if (major, minor) != PROTOCOL_VERSION: if (major, minor) != PROTOCOL_VERSION:
raise PacketMalformedError('protocol version mismatch') raise ProtocolError('protocol version mismatch')
return (node_type, uuid, address, name) return (major, minor)
class AcceptIdentification(Packet): class PUUID(PItem):
""" """
Accept a node identification. This should be a reply to Request Node An UUID (node identifier)
Identification. Any -> Any.
""" """
_header_format = '!H16sLL16s' def _encode(self, writer, uuid):
if uuid is None:
uuid = INVALID_UUID
assert len(uuid) == 16, (len(uuid), uuid)
writer(uuid)
def _encode(self, node_type, uuid, def _decode(self, reader):
num_partitions, num_replicas, your_uuid): uuid = reader(16)
uuid = _encodeUUID(uuid) if uuid == INVALID_UUID:
your_uuid = _encodeUUID(your_uuid) uuid = None
return pack(self._header_format, node_type, uuid, return uuid
num_partitions, num_replicas, your_uuid)
class PTID(PItem):
"""
A transaction identifier
"""
def _encode(self, writer, tid):
if tid is None:
tid = INVALID_TID
assert len(tid) == 8, (len(tid), tid)
writer(tid)
def _decode(self, body): def _decode(self, reader):
r = unpack(self._header_format, body) tid = reader(8)
node_type, uuid, num_partitions, num_replicas, your_uuid = r if tid == INVALID_TID:
node_type = _decodeNodeType(node_type) tid = None
uuid = _decodeUUID(uuid) return tid
your_uuid = _decodeUUID(your_uuid)
return (node_type, uuid, num_partitions, num_replicas, your_uuid) # same definition, for now
POID = PTID
# common definitions
PFEmpty = PStruct('no_content')
PFNodeType = PEnum('type', NodeTypes)
PFNodeState = PEnum('state', NodeStates)
PFCellState = PEnum('state', CellStates)
PFNodeList = PList('node_list',
PStruct('node',
PFNodeType,
PAddress('address'),
PUUID('uuid'),
PFNodeState,
),
)
PFCellList = PList('cell_list',
PStruct('cell',
PUUID('uuid'),
PFCellState,
),
)
PFRowList = PList('row_list',
PStruct('row',
PNumber('offset'),
PFCellList,
),
)
PFHistoryList = PList('history_list',
PStruct('history_entry',
PTID('serial'),
PNumber('size'),
),
)
PFUUIDList = PList('uuid_list',
PUUID('uuid'),
)
PFTidList = PList('tid_list',
PTID('tid'),
)
PFOidList = PList('oid_list',
POID('oid'),
)
# packets definition
class AskPrimary(Packet): class Notify(Packet):
""" """
Ask a current primary master node. This must be the second message when General purpose notification (remote logging)
connecting to a master node. Any -> M.
""" """
pass _fmt = PStruct('notify',
PString('message'),
)
class Error(Packet):
"""
Error is a special type of message, because this can be sent against
any other message, even if such a message does not expect a reply
usually. Any -> Any.
"""
_fmt = PStruct('error',
PNumber('code'),
PString('message'),
)
class Ping(Packet):
"""
Check if a peer is still alive. Any -> Any.
"""
_answer = PFEmpty
class RequestIdentification(Packet):
"""
Request a node identification. This must be the first packet for any
connection. Any -> Any.
"""
_fmt = PStruct('request_identification',
PProtocol('protocol_version'),
PFNodeType,
PUUID('uuid'),
PAddress('address'),
PString('name'),
)
_answer = PStruct('accept_identification',
PFNodeType,
PUUID('my_uuid'),
PNumber('num_partitions'),
PNumber('num_replicas'),
PUUID('your_uuid'),
)
def __init__(self, *args, **kw):
if args or kw:
# always announce current protocol version
args = list(args)
args.insert(0, PROTOCOL_VERSION)
super(RequestIdentification, self).__init__(*args, **kw)
class AnswerPrimary(Packet): class PrimaryMaster(Packet):
""" """
Ask a current primary master node. This must be the second message when
connecting to a master node. Any -> M.
Reply to Ask Primary Master. This message includes a list of known master Reply to Ask Primary Master. This message includes a list of known master
nodes to make sure that a peer has the same information. M -> Any. nodes to make sure that a peer has the same information. M -> Any.
""" """
_header_format = '!16sL' _answer = PStruct('answer_primary',
_list_entry_format = '!6s16s' PUUID('primary_uuid'),
_list_entry_len = calcsize(_list_entry_format) PList('known_master_list',
PStruct('master',
def _encode(self, primary_uuid, known_master_list): PAddress('address'),
primary_uuid = _encodeUUID(primary_uuid) PUUID('uuid'),
body = [pack(self._header_format, primary_uuid, ),
len(known_master_list))] ),
for address, uuid in known_master_list: )
uuid = _encodeUUID(uuid)
address = _encodeAddress(address)
body.append(pack(self._list_entry_format, address, uuid))
return ''.join(body)
def _decode(self, body):
packet_offset = self._header_len
(primary_uuid, n) = unpack(self._header_format,
body[:packet_offset])
known_master_list = []
list_entry_len = self._list_entry_len
list_entry_format = self._list_entry_format
for _ in xrange(n):
next_packet_offset = packet_offset + list_entry_len
address, uuid = unpack(list_entry_format,
body[packet_offset:next_packet_offset])
packet_offset = next_packet_offset
address = _decodeAddress(address)
uuid = _decodeUUID(uuid)
known_master_list.append((address, uuid))
primary_uuid = _decodeUUID(primary_uuid)
return (primary_uuid, known_master_list)
class AnnouncePrimary(Packet): class AnnouncePrimary(Packet):
""" """
Announce a primary master node election. PM -> SM. Announce a primary master node election. PM -> SM.
""" """
pass
class ReelectPrimary(Packet): class ReelectPrimary(Packet):
""" """
Force a re-election of a primary master node. M -> M. Force a re-election of a primary master node. M -> M.
""" """
pass
class AskLastIDs(Packet): class LastIDs(Packet):
""" """
Ask the last OID, the last TID and the last Partition Table ID that Ask the last OID, the last TID and the last Partition Table ID that
a storage node stores. Used to recover information. PM -> S, S -> PM. a storage node stores. Used to recover information. PM -> S, S -> PM.
"""
pass
class AnswerLastIDs(Packet):
"""
Reply to Ask Last IDs. S -> PM, PM -> S. Reply to Ask Last IDs. S -> PM, PM -> S.
""" """
def _encode(self, loid, ltid, lptid): _answer = PStruct('answer_last_ids',
# in this case, loid is a valid OID but considered as invalid. This is POID('last_oid'),
# not an issue because the OID 0 is hard coded and will never be PTID('last_tid'),
# generated PPTID('last_ptid'),
if loid is None: )
loid = INVALID_OID
ltid = _encodeTID(ltid)
lptid = _encodePTID(lptid)
return loid + ltid + lptid
def _decode(self, body):
(loid, ltid, lptid) = unpack('!8s8s8s', body)
if loid == INVALID_OID:
loid = None
ltid = _decodeTID(ltid)
lptid = _decodePTID(lptid)
return (loid, ltid, lptid)
class AskPartitionTable(Packet):
"""
Ask the full partition table. PM -> S.
"""
pass
class AnswerPartitionTable(Packet): class PartitionTable(Packet):
""" """
Ask the full partition table. PM -> S.
Answer rows in a partition table. S -> PM. Answer rows in a partition table. S -> PM.
""" """
_header_format = '!8sL' _answer = PStruct('answer_partition_table',
_row_entry_format = '!LL' PPTID('ptid'),
_row_entry_len = calcsize(_row_entry_format) PFRowList,
_cell_entry_format = '!16sH' )
_cell_entry_len = calcsize(_cell_entry_format)
class NotifyPartitionTable(Packet):
def _encode(self, ptid, row_list):
ptid = _encodePTID(ptid)
body = [pack(self._header_format, ptid, len(row_list))]
row_entry_format = self._row_entry_format
cell_entry_format = self._cell_entry_format
for offset, cell_list in row_list:
body.append(pack(row_entry_format, offset, len(cell_list)))
for uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack(cell_entry_format, uuid, state))
return ''.join(body)
def _decode(self, body):
index = self._header_len
(ptid, n) = unpack(self._header_format, body[:index])
ptid = _decodePTID(ptid)
row_list = []
cell_list = []
row_entry_format = self._row_entry_format
row_entry_len = self._row_entry_len
cell_entry_format = self._cell_entry_format
cell_entry_len = self._cell_entry_len
for _ in xrange(n):
next_index = index + row_entry_len
offset, m = unpack(row_entry_format, body[index:next_index])
index = next_index
for _ in xrange(m):
next_index = index + cell_entry_len
uuid, state = unpack(cell_entry_format, body[index:next_index])
index = next_index
state = CellStates.get(state)
uuid = _decodeUUID(uuid)
cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
return (ptid, row_list)
class SendPartitionTable(Packet):
""" """
Send rows in a partition table to update other nodes. PM -> S, C. Send rows in a partition table to update other nodes. PM -> S, C.
""" """
_header_format = '!8sL' _fmt = PStruct('send_partition_table',
_row_entry_format = '!LL' PPTID('ptid'),
_row_entry_len = calcsize(_row_entry_format) PFRowList,
_cell_entry_format = '!16sH' )
_cell_entry_len = calcsize(_cell_entry_format)
class PartitionChanges(Packet):
def _encode(self, ptid, row_list):
ptid = _encodePTID(ptid)
body = [pack(self._header_format, ptid, len(row_list))]
row_entry_format = self._row_entry_format
cell_entry_format = self._cell_entry_format
for offset, cell_list in row_list:
body.append(pack(row_entry_format, offset, len(cell_list)))
for uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack(cell_entry_format, uuid, state))
return ''.join(body)
def _decode(self, body):
index = self._header_len
(ptid, n,) = unpack(self._header_format, body[:index])
ptid = _decodePTID(ptid)
row_list = []
cell_list = []
row_entry_format = self._row_entry_format
row_entry_len = self._row_entry_len
cell_entry_format = self._cell_entry_format
cell_entry_len = self._cell_entry_len
for _ in xrange(n):
next_index = index + row_entry_len
offset, m = unpack(row_entry_format, body[index:next_index])
index = next_index
for _ in xrange(m):
next_index = index + cell_entry_len
uuid, state = unpack(cell_entry_format, body[index:next_index])
index = next_index
state = CellStates.get(state)
uuid = _decodeUUID(uuid)
cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
return (ptid, row_list)
class NotifyPartitionChanges(Packet):
""" """
Notify a subset of a partition table. This is used to notify changes. Notify a subset of a partition table. This is used to notify changes.
PM -> S, C. PM -> S, C.
""" """
_header_format = '!8sL' _fmt = PStruct('notify_partition_changes',
_list_entry_format = '!L16sH' PPTID('ptid'),
_list_entry_len = calcsize(_list_entry_format) PList('cell_list',
PStruct('cell',
def _encode(self, ptid, cell_list): PNumber('offset'),
ptid = _encodePTID(ptid) PUUID('uuid'),
body = [pack(self._header_format, ptid, len(cell_list))] PFNodeState,
list_entry_format = self._list_entry_format ),
for offset, uuid, state in cell_list: ),
uuid = _encodeUUID(uuid) )
body.append(pack(list_entry_format, offset, uuid, state))
return ''.join(body) class ReplicationDone(Packet):
def _decode(self, body):
packet_offset = self._header_len
(ptid, n) = unpack(self._header_format, body[:packet_offset])
ptid = _decodePTID(ptid)
cell_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for _ in xrange(n):
next_packet_offset = packet_offset + list_entry_len
(offset, uuid, state) = unpack(list_entry_format,
body[packet_offset:next_packet_offset])
packet_offset = next_packet_offset
state = CellStates.get(state)
uuid = _decodeUUID(uuid)
cell_list.append((offset, uuid, state))
return (ptid, cell_list)
class NotifyReplicationDone(Packet):
""" """
Notify the master node that a partition has been successully replicated from Notify the master node that a partition has been successully replicated from
a storage to another. a storage to another.
S -> M S -> M
""" """
_header_format = '!L' _fmt = PStruct('notify_replication_done',
PNumber('offset'),
def _encode(self, offset): )
return pack(self._header_format, offset)
def _decode(self, body):
(offset, ) = unpack(self._header_format, body)
return (offset, )
class StartOperation(Packet): class StartOperation(Packet):
""" """
Tell a storage nodes to start an operation. Until a storage node receives Tell a storage nodes to start an operation. Until a storage node receives
this message, it must not serve client nodes. PM -> S. this message, it must not serve client nodes. PM -> S.
""" """
pass
class StopOperation(Packet): class StopOperation(Packet):
""" """
Tell a storage node to stop an operation. Once a storage node receives Tell a storage node to stop an operation. Once a storage node receives
this message, it must not serve client nodes. PM -> S. this message, it must not serve client nodes. PM -> S.
""" """
pass
class AskUnfinishedTransactions(Packet): class UnfinishedTransactions(Packet):
""" """
Ask unfinished transactions PM -> S. Ask unfinished transactions PM -> S.
"""
pass
class AnswerUnfinishedTransactions(Packet):
"""
Answer unfinished transactions S -> PM. Answer unfinished transactions S -> PM.
""" """
_header_format = '!8sL' _answer = PStruct('answer_unfinished_transactions',
_list_entry_format = '8s' PTID('max_tid'),
_list_entry_len = calcsize(_list_entry_format) PList('tid_list',
PTID('unfinished_tid'),
def _encode(self, max_tid, tid_list): ),
body = [pack(self._header_format, max_tid, len(tid_list))] )
body.extend(tid_list)
return ''.join(body) class ObjectPresent(Packet):
def _decode(self, body):
offset = self._header_len
(max_tid, n) = unpack(self._header_format, body[:offset])
tid_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for _ in xrange(n):
next_offset = offset + list_entry_len
tid = unpack(list_entry_format, body[offset:next_offset])[0]
offset = next_offset
tid_list.append(tid)
return (max_tid, tid_list)
class AskObjectPresent(Packet):
""" """
Ask if an object is present. If not present, OID_NOT_FOUND should be Ask if an object is present. If not present, OID_NOT_FOUND should be
returned. PM -> S. returned. PM -> S.
"""
def _decode(self, body):
(oid, tid) = unpack('8s8s', body)
return (oid, _decodeTID(tid))
class AnswerObjectPresent(Packet):
"""
Answer that an object is present. PM -> S. Answer that an object is present. PM -> S.
""" """
def _decode(self, body): _fmt = PStruct('object_present',
(oid, tid) = unpack('8s8s', body) POID('oid'),
return (oid, _decodeTID(tid)) PTID('tid'),
)
_answer = PStruct('object_present',
POID('oid'),
PTID('tid'),
)
class DeleteTransaction(Packet): class DeleteTransaction(Packet):
""" """
Delete a transaction. PM -> S. Delete a transaction. PM -> S.
""" """
_header_format = '!8sL' _fmt = PStruct('delete_transaction',
_list_entry_format = '8s' PTID('tid'),
_list_entry_len = calcsize(_list_entry_format) PFOidList,
)
def _encode(self, tid, oid_list):
body = [pack(self._header_format, tid, len(oid_list))]
body.extend(oid_list)
return ''.join(body)
def _decode(self, body):
offset = self._header_len
(tid, n) = unpack(self._header_format, body[:offset])
oid_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for _ in xrange(n):
next_offset = offset + list_entry_len
oid = unpack(list_entry_format, body[offset:next_offset])[0]
offset = next_offset
oid_list.append(oid)
return (tid, oid_list)
class CommitTransaction(Packet): class CommitTransaction(Packet):
""" """
Commit a transaction. PM -> S. Commit a transaction. PM -> S.
""" """
def _encode(self, tid): _fmt = PStruct('commit_transaction',
return _encodeTID(tid) PTID('tid'),
)
def _decode(self, body):
(tid, ) = unpack('8s', body)
return (_decodeTID(tid), )
class AskBeginTransaction(Packet): class BeginTransaction(Packet):
""" """
Ask to begin a new transaction. C -> PM. Ask to begin a new transaction. C -> PM.
"""
def _encode(self, tid):
return _encodeTID(tid)
def _decode(self, body):
return (_decodeTID(unpack('8s', body)[0]), )
class AnswerBeginTransaction(Packet):
"""
Answer when a transaction begin, give a TID if necessary. PM -> C. Answer when a transaction begin, give a TID if necessary. PM -> C.
""" """
def _encode(self, tid): _fmt = PStruct('ask_begin_transaction',
return _encodeTID(tid) PTID('tid'),
)
def _decode(self, body): _answer = PStruct('answer_begin_transaction',
(tid, ) = unpack('8s', body) PTID('tid'),
return (tid, ) )
class AskFinishTransaction(Packet): class FinishTransaction(Packet):
""" """
Finish a transaction. C -> PM. Finish a transaction. C -> PM.
Answer when a transaction is finished. PM -> C.
""" """
_header_format = '!8sL' _fmt = PStruct('ask_finish_transaction',
_list_entry_format = '8s' PTID('tid'),
_list_entry_len = calcsize(_list_entry_format) PFOidList,
)
def _encode(self, tid, oid_list):
body = [pack(self._header_format, tid, len(oid_list))] _answer = PStruct('answer_information_locked',
body.extend(oid_list) PTID('ttid'),
return ''.join(body) PTID('tid'),
)
def _decode(self, body):
offset = self._header_len
(tid, n) = unpack(self._header_format, body[:offset])
oid_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for _ in xrange(n):
next_offset = offset + list_entry_len
oid = unpack(list_entry_format, body[offset:next_offset])[0]
offset = next_offset
oid_list.append(oid)
return (tid, oid_list)
class NotifyTransactionFinished(Packet): class NotifyTransactionFinished(Packet):
""" """
Notify that a transaction blocking a replication is now finished Notify that a transaction blocking a replication is now finished
M -> S M -> S
""" """
def _encode(self, ttid, max_tid): _fmt = PStruct('notify_transaction_finished',
return _encodeTID(ttid) + _encodeTID(max_tid) PTID('ttid'),
PTID('max_tid'),
def _decode(self, body): )
(ttid, max_tid) = unpack('8s8s', body)
return (ttid, max_tid)
class AnswerTransactionFinished(Packet):
"""
Answer when a transaction is finished. PM -> C.
"""
def _encode(self, ttid, tid):
return _encodeTID(ttid) + _encodeTID(tid)
def _decode(self, body):
(ttid, tid) = unpack('8s8s', body)
return (_decodeTID(ttid), _decodeTID(tid))
class AskLockInformation(Packet): class LockInformation(Packet):
""" """
Lock information on a transaction. PM -> S. Lock information on a transaction. PM -> S.
"""
# XXX: Identical to InvalidateObjects and AskFinishTransaction
_header_format = '!8s8sL'
_list_entry_format = '8s'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, ttid, tid, oid_list):
body = [pack(self._header_format, ttid, tid, len(oid_list))]
body.extend(oid_list)
return ''.join(body)
def _decode(self, body):
offset = self._header_len
(ttid, tid, n) = unpack(self._header_format, body[:offset])
oid_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for _ in xrange(n):
next_offset = offset + list_entry_len
oid = unpack(list_entry_format, body[offset:next_offset])[0]
offset = next_offset
oid_list.append(oid)
return (ttid, tid, oid_list)
class AnswerInformationLocked(Packet):
"""
Notify information on a transaction locked. S -> PM. Notify information on a transaction locked. S -> PM.
""" """
def _encode(self, tid): _fmt = PStruct('ask_lock_informations',
return _encodeTID(tid) PTID('ttid'),
PTID('tid'),
PFOidList,
)
def _decode(self, body): _answer = PStruct('answer_information_locked',
(tid, ) = unpack('8s', body) PTID('tid'),
return (_decodeTID(tid), ) )
class InvalidateObjects(Packet): class InvalidateObjects(Packet):
""" """
Invalidate objects. PM -> C. Invalidate objects. PM -> C.
""" """
_header_format = '!8sL' _fmt = PStruct('ask_finish_transaction',
_list_entry_format = '8s' PTID('tid'),
_list_entry_len = calcsize(_list_entry_format) PFOidList,
)
def _encode(self, tid, oid_list):
body = [pack(self._header_format, tid, len(oid_list))] class UnlockInformation(Packet):
body.extend(oid_list)
return ''.join(body)
def _decode(self, body):
offset = self._header_len
(tid, n) = unpack(self._header_format, body[:offset])
oid_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for _ in xrange(n):
next_offset = offset + list_entry_len
oid = unpack(list_entry_format, body[offset:next_offset])[0]
offset = next_offset
oid_list.append(oid)
return (tid, oid_list)
class NotifyUnlockInformation(Packet):
""" """
Unlock information on a transaction. PM -> S. Unlock information on a transaction. PM -> S.
""" """
def _encode(self, tid): _fmt = PStruct('notify_unlock_information',
return _encodeTID(tid) PTID('tid'),
)
def _decode(self, body):
(tid, ) = unpack('8s', body)
return (_decodeTID(tid), )
class AskNewOIDs(Packet): class GenerateOIDs(Packet):
""" """
Ask new object IDs. C -> PM. Ask new object IDs. C -> PM.
Answer new object IDs. PM -> C.
""" """
_header_format = '!H' _fmt = PStruct('ask_new_oids',
PNumber('num_oids'),
def _encode(self, num_oids): )
return pack(self._header_format, num_oids)
def _decode(self, body): _answer = PStruct('answer_new_oids',
return unpack(self._header_format, body) # num oids PFOidList,
)
class AnswerNewOIDs(Packet): class StoreObject(Packet):
"""
Answer new object IDs. PM -> C.
"""
_header_format = '!H'
_list_entry_format = '8s'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, oid_list):
body = [pack(self._header_format, len(oid_list))]
body.extend(oid_list)
return ''.join(body)
def _decode(self, body):
offset = self._header_len
(n,) = unpack(self._header_format, body[:offset])
oid_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for _ in xrange(n):
next_offset = offset + list_entry_len
oid = unpack(list_entry_format, body[offset:next_offset])[0]
offset = next_offset
oid_list.append(oid)
return (oid_list,)
class AskStoreObject(Packet):
""" """
Ask to store an object. Send an OID, an original serial, a current Ask to store an object. Send an OID, an original serial, a current
transaction ID, and data. C -> S. transaction ID, and data. C -> S.
"""
_header_format = '!8s8s8sBL8sB'
@profiler_decorator
def _encode(self, oid, serial, compression, checksum, data, data_serial,
tid, unlock):
if serial is None:
serial = INVALID_TID
if data_serial is None:
data_serial = INVALID_TID
unlock = unlock and 1 or 0
return pack(self._header_format, oid, serial, tid, compression,
checksum, data_serial, unlock) + _encodeString(data)
def _decode(self, body):
header_len = self._header_len
r = unpack(self._header_format, body[:header_len])
oid, serial, tid, compression, checksum, data_serial, unlock = r
serial = _decodeTID(serial)
data_serial = _decodeTID(data_serial)
(data, _) = _decodeString(body, 'data', offset=header_len)
return (oid, serial, compression, checksum, data, data_serial, tid,
bool(unlock))
class AnswerStoreObject(Packet):
"""
Answer if an object has been stored. If an object is in conflict, Answer if an object has been stored. If an object is in conflict,
a serial of the conflicting transaction is returned. In this case, a serial of the conflicting transaction is returned. In this case,
if this serial is newer than the current transaction ID, a client if this serial is newer than the current transaction ID, a client
node must not try to resolve the conflict. S -> C. node must not try to resolve the conflict. S -> C.
""" """
_header_format = '!B8s8s' _fmt = PStruct('ask_store_object',
POID('oid'),
def _encode(self, conflicting, oid, serial): PTID('serial'),
if serial is None: PBoolean('compression'),
serial = INVALID_TID PNumber('checksum'),
return pack(self._header_format, conflicting, oid, serial) PString('data'),
PTID('data_serial'),
PTID('tid'),
PBoolean('unlock'),
)
def _decode(self, body): _answer = PStruct('answer_store_object',
(conflicting, oid, serial) = unpack(self._header_format, body) PBoolean('conflicting'),
return (conflicting, oid, serial) POID('oid'),
PTID('serial'),
)
class AbortTransaction(Packet): class AbortTransaction(Packet):
""" """
Abort a transaction. C -> S, PM. Abort a transaction. C -> S, PM.
""" """
def _decode(self, body): _fmt = PStruct('abort_transaction',
(tid, ) = unpack('8s', body) PTID('tid'),
return (tid, ) )
class AskStoreTransaction(Packet): class StoreTransaction(Packet):
""" """
Ask to store a transaction. C -> S. Ask to store a transaction. C -> S.
"""
_header_format = '!8sLHHH'
def _encode(self, tid, user, desc, ext, oid_list):
lengths = (len(oid_list), len(user), len(desc), len(ext))
body = [pack(self._header_format, tid, *lengths)]
body.append(user)
body.append(desc)
body.append(ext)
body.extend(oid_list)
return ''.join(body)
def _decode(self, body):
r = unpack(self._header_format, body[:self._header_len])
tid, oid_len, user_len, desc_len, ext_len = r
body = body[self._header_len:]
user = body[:user_len]
body = body[user_len:]
desc = body[:desc_len]
body = body[desc_len:]
ext = body[:ext_len]
body = body[ext_len:]
oid_list = []
for _ in xrange(oid_len):
(oid, ) = unpack('8s', body[:8])
body = body[8:]
oid_list.append(oid)
return (tid, user, desc, ext, oid_list)
class AnswerStoreTransaction(Packet):
"""
Answer if transaction has been stored. S -> C. Answer if transaction has been stored. S -> C.
""" """
def _encode(self, tid): _fmt = PStruct('ask_store_transaction',
return _encodeTID(tid) PTID('tid'),
PString('user'),
PString('description'),
PString('extension'),
PFOidList,
)
def _decode(self, body): _answer = PStruct('answer_store_transaction',
(tid, ) = unpack('8s', body) PTID('tid'),
return (tid, ) )
class AskObject(Packet): class GetObject(Packet):
""" """
Ask a stored object by its OID and a serial or a TID if given. If a serial Ask a stored object by its OID and a serial or a TID if given. If a serial
is specified, the specified revision of an object will be returned. If is specified, the specified revision of an object will be returned. If
a TID is specified, an object right before the TID will be returned. S,C -> S. a TID is specified, an object right before the TID will be returned. S,C -> S.
Answer the requested object. S -> C.
""" """
_header_format = '!8s8s8s' _fmt = PStruct('ask_object',
POID('oid'),
def _encode(self, oid, serial, tid): PTID('serial'),
tid = _encodeTID(tid) PTID('tid'),
serial = _encodeTID(serial) # serial is the previous TID )
return pack(self._header_format, oid, serial, tid)
def _decode(self, body): _answer = PStruct('answer_object',
(oid, serial, tid) = unpack(self._header_format, body) POID('oid'),
if serial == INVALID_TID: PTID('serial_start'),
serial = None PTID('serial_end'),
tid = _decodeTID(tid) PBoolean('compression'),
return (oid, serial, tid) PNumber('checksum'),
PString('data'),
PTID('data_serial'),
)
class AnswerObject(Packet): class TIDList(Packet):
"""
Answer the requested object. S -> C.
"""
_header_format = '!8s8s8s8sBL'
def _encode(self, oid, serial_start, serial_end, compression,
checksum, data, data_serial):
if serial_start is None:
serial_start = INVALID_TID
if serial_end is None:
serial_end = INVALID_TID
if data_serial is None:
data_serial = INVALID_TID
return pack(self._header_format, oid, serial_start, serial_end,
data_serial, compression, checksum) + _encodeString(data)
def _decode(self, body):
header_len = self._header_len
r = unpack(self._header_format, body[:header_len])
oid, serial_start, serial_end, data_serial, compression, checksum = r
if serial_end == INVALID_TID:
serial_end = None
if data_serial == INVALID_TID:
data_serial = None
(data, _) = _decodeString(body, 'data', offset=header_len)
return (oid, serial_start, serial_end, compression, checksum, data,
data_serial)
class AskTIDs(Packet):
""" """
Ask for TIDs between a range of offsets. The order of TIDs is descending, Ask for TIDs between a range of offsets. The order of TIDs is descending,
and the range is [first, last). C -> S. and the range is [first, last). C -> S.
Answer the requested TIDs. S -> C.
""" """
_header_format = '!QQL' _fmt = PStruct('ask_tids',
PIndex('first'),
def _encode(self, first, last, partition): PIndex('last'),
return pack(self._header_format, first, last, partition) PNumber('partition'),
)
def _decode(self, body): _answer = PStruct('answer_tids',
return unpack(self._header_format, body) # first, last, partition PFTidList,
)
class AnswerTIDs(Packet): class TIDListFrom(Packet):
"""
Answer the requested TIDs. S -> C.
"""
_header_format = '!L'
_list_entry_format = '8s'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, tid_list):
body = [pack(self._header_format, len(tid_list))]
body.extend(tid_list)
return ''.join(body)
def _decode(self, body):
offset = self._header_len
(n, ) = unpack(self._header_format, body[:offset])
tid_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for _ in xrange(n):
next_offset = offset + list_entry_len
tid = unpack(list_entry_format, body[offset:next_offset])[0]
offset = next_offset
tid_list.append(tid)
return (tid_list,)
class AskTIDsFrom(Packet):
""" """
Ask for length TIDs starting at min_tid. The order of TIDs is ascending. Ask for length TIDs starting at min_tid. The order of TIDs is ascending.
S -> S. S -> S.
"""
_header_format = '!8s8sLL'
_list_entry_format = 'L'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, min_tid, max_tid, length, partition_list):
body = [pack(self._header_format, min_tid, max_tid, length,
len(partition_list))]
list_entry_format = self._list_entry_format
for partition in partition_list:
body.append(pack(list_entry_format, partition))
return ''.join(body)
def _decode(self, body):
body = StringIO(body)
read = body.read
header = unpack(self._header_format, read(self._header_len))
min_tid, max_tid, length, list_length = header
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
partition_list = []
for _ in xrange(list_length):
partition = unpack(list_entry_format, read(list_entry_len))[0]
partition_list.append(partition)
return (min_tid, max_tid, length, partition_list)
class AnswerTIDsFrom(AnswerTIDs):
"""
Answer the requested TIDs. S -> S Answer the requested TIDs. S -> S
""" """
pass _fmt = PStruct('tid_list_from',
PTID('min_tid'),
PTID('max_tid'),
PNumber('length'),
PList('partition_list',
PNumber('partition'),
),
)
class AskTransactionInformation(Packet): _answer = PStruct('answer_tids',
""" PFTidList,
Ask information about a transaction. Any -> S. )
"""
def _decode(self, body):
(tid, ) = unpack('8s', body)
return (tid, )
class AnswerTransactionInformation(Packet): class TransactionInformation(Packet):
""" """
Ask information about a transaction. Any -> S.
Answer information (user, description) about a transaction. S -> Any. Answer information (user, description) about a transaction. S -> Any.
""" """
_header_format = '!8sHHHBL' _fmt = PStruct('ask_transaction_information',
PTID('tid'),
def _encode(self, tid, user, desc, ext, packed, oid_list): )
packed = packed and 1 or 0
body = [pack(self._header_format, tid, len(user), len(desc), len(ext),
packed, len(oid_list))]
body.append(user)
body.append(desc)
body.append(ext)
body.extend(oid_list)
return ''.join(body)
def _decode(self, body):
r = unpack(self._header_format, body[:self._header_len])
tid, user_len, desc_len, ext_len, packed, oid_len = r
packed = bool(packed)
body = body[self._header_len:]
user = body[:user_len]
body = body[user_len:]
desc = body[:desc_len]
body = body[desc_len:]
ext = body[:ext_len]
body = body[ext_len:]
oid_list = []
for _ in xrange(oid_len):
(oid, ) = unpack('8s', body[:8])
body = body[8:]
oid_list.append(oid)
return (tid, user, desc, ext, packed, oid_list)
class AskObjectHistory(Packet):
"""
Ask history information for a given object. The order of serials is
descending, and the range is [first, last]. C -> S.
"""
_header_format = '!8sQQ'
def _encode(self, oid, first, last):
return pack(self._header_format, oid, first, last)
def _decode(self, body): _answer = PStruct('answer_transaction_information',
(oid, first, last) = unpack(self._header_format, body) PTID('tid'),
return (oid, first, last) PString('user'),
PString('description'),
PString('extension'),
PBoolean('packed'),
PFOidList,
)
class AnswerObjectHistory(Packet): class ObjectHistory(Packet):
""" """
Ask history information for a given object. The order of serials is
descending, and the range is [first, last]. C -> S.
Answer history information (serial, size) for an object. S -> C. Answer history information (serial, size) for an object. S -> C.
""" """
_header_format = '!8sL' _fmt = PStruct('ask_object_history',
_list_entry_format = '!8sL' POID('oid'),
_list_entry_len = calcsize(_list_entry_format) PIndex('first'),
PIndex('last'),
def _encode(self, oid, history_list): )
body = [pack(self._header_format, oid, len(history_list))]
list_entry_format = self._list_entry_format _answer = PStruct('answer_object_history',
for serial, size in history_list: POID('oid'),
body.append(pack(list_entry_format, serial, size)) PFHistoryList,
return ''.join(body) )
def _decode(self, body):
offset = self._header_len
(oid, length) = unpack(self._header_format, body[:offset])
history_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for _ in xrange(length):
next_offset = offset + list_entry_len
serial, size = unpack(list_entry_format, body[offset:next_offset])
offset = next_offset
history_list.append((serial, size))
return (oid, history_list)
class AskObjectHistoryFrom(Packet): class ObjectHistoryFrom(Packet):
""" """
Ask history information for a given object. The order of serials is Ask history information for a given object. The order of serials is
ascending, and starts at (or above) min_serial for min_oid. S -> S. ascending, and starts at (or above) min_serial for min_oid. S -> S.
Answer the requested serials. S -> S.
""" """
_header_format = '!8s8s8sLL' _fmt = PStruct('ask_object_history',
POID('min_oid'),
def _encode(self, min_oid, min_serial, max_serial, length, partition): PTID('min_serial'),
return pack(self._header_format, min_oid, min_serial, max_serial, PTID('max_serial'),
length, partition) PNumber('length'),
PNumber('partition'),
)
def _decode(self, body): _answer = PStruct('ask_finish_transaction',
# min_oid, min_serial, length, partition PDict('object_dict',
return unpack(self._header_format, body) POID('oid'),
PFTidList,
),
)
class AnswerObjectHistoryFrom(Packet): class PartitionList(Packet):
"""
Answer the requested serials. S -> S.
"""
_header_format = '!L'
_list_entry_format = '!8sL'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, object_dict):
body = [pack(self._header_format, len(object_dict))]
append = body.append
extend = body.extend
list_entry_format = self._list_entry_format
for oid, serial_list in object_dict.iteritems():
append(pack(list_entry_format, oid, len(serial_list)))
extend(serial_list)
return ''.join(body)
def _decode(self, body):
body = StringIO(body)
read = body.read
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
object_dict = {}
dict_len = unpack(self._header_format, read(self._header_len))[0]
for _ in xrange(dict_len):
oid, serial_len = unpack(list_entry_format, read(list_entry_len))
object_dict[oid] = [read(TID_LEN) for _ in xrange(serial_len)]
return (object_dict, )
class AskPartitionList(Packet):
""" """
All the following messages are for neoctl to admin node All the following messages are for neoctl to admin node
Ask information about partition Ask information about partition
"""
_header_format = '!LL16s'
def _encode(self, min_offset, max_offset, uuid):
uuid = _encodeUUID(uuid)
body = [pack(self._header_format, min_offset, max_offset, uuid)]
return ''.join(body)
def _decode(self, body):
(min_offset, max_offset, uuid) = unpack(self._header_format, body)
uuid = _decodeUUID(uuid)
return (min_offset, max_offset, uuid)
class AnswerPartitionList(Packet):
"""
Answer information about partition Answer information about partition
""" """
_header_format = '!8sL' _fmt = PStruct('ask_partition_list',
_row_entry_format = '!LL' PNumber('min_offset'),
_row_entry_len = calcsize(_row_entry_format) PNumber('max_offset'),
_cell_entry_format = '!16sH' PUUID('uuid'),
_cell_entry_len = calcsize(_cell_entry_format) )
def _encode(self, ptid, row_list):
ptid = _encodePTID(ptid)
body = [pack(self._header_format, ptid, len(row_list))]
row_entry_format = self._row_entry_format
cell_entry_format = self._cell_entry_format
for offset, cell_list in row_list:
body.append(pack(row_entry_format, offset, len(cell_list)))
for uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack(cell_entry_format, uuid, state))
return ''.join(body)
def _decode(self, body):
index = self._header_len
(ptid, n) = unpack(self._header_format, body[:index])
ptid = _decodePTID(ptid)
row_list = []
cell_list = []
row_entry_format = self._row_entry_format
row_entry_len = self._row_entry_len
cell_entry_format = self._cell_entry_format
cell_entry_len = self._cell_entry_len
for _ in xrange(n):
next_index = index + row_entry_len
offset, m = unpack(row_entry_format, body[index:next_index])
index = next_index
for _ in xrange(m):
next_index = index + cell_entry_len
uuid, state = unpack(cell_entry_format, body[index:next_index])
index = next_index
state = CellStates.get(state)
uuid = _decodeUUID(uuid)
cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
return (ptid, row_list)
class AskNodeList(Packet):
"""
Ask information about nodes
"""
_header_format = '!H'
def _encode(self, node_type):
return ''.join([pack(self._header_format, node_type)])
def _decode(self, body): _answer = PStruct('answer_partition_list',
(node_type, ) = unpack(self._header_format, body) PPTID('ptid'),
node_type = _decodeNodeType(node_type) PFRowList,
return (node_type,) )
class AnswerNodeList(Packet): class NodeList(Packet):
""" """
Ask information about nodes
Answer information about nodes Answer information about nodes
""" """
_header_format = '!L' _fmt = PStruct('ask_node_list',
_list_entry_format = '!H6s16sH' PFNodeType,
_list_entry_len = calcsize(_list_entry_format) )
def _encode(self, node_list): _answer = PStruct('answer_node_list',
body = [pack(self._header_format, len(node_list))] PFNodeList,
list_entry_format = self._list_entry_format )
for node_type, address, uuid, state in node_list:
uuid = _encodeUUID(uuid)
address = _encodeAddress(address)
body.append(pack(list_entry_format, node_type, address, uuid,
state))
return ''.join(body)
def _decode(self, body):
offset = self._header_len
(n,) = unpack(self._header_format, body[:offset])
node_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for _ in xrange(n):
next_offset = offset + list_entry_len
r = unpack(list_entry_format, body[offset:next_offset])
offset = next_offset
node_type, address, uuid, state = r
address = _decodeAddress(address)
node_type = _decodeNodeType(node_type)
state = _decodeNodeState(state)
uuid = _decodeUUID(uuid)
node_list.append((node_type, address, uuid, state))
return (node_list,)
class SetNodeState(Packet): class SetNodeState(Packet):
""" """
Set the node state Set the node state
""" """
_header_format = '!16sHB' _fmt = PStruct('set_node_state',
PUUID('uuid'),
def _encode(self, uuid, state, modify_partition_table): PFNodeState,
uuid = _encodeUUID(uuid) PBoolean('modify_partition_table'),
return ''.join([pack(self._header_format, uuid, state, )
modify_partition_table)])
def _decode(self, body): _answer = Error
(uuid, state, modify) = unpack(self._header_format, body)
state = _decodeNodeState(state)
uuid = _decodeUUID(uuid)
return (uuid, state, modify)
class AddPendingNodes(Packet): class AddPendingNodes(Packet):
""" """
Ask the primary to include some pending node in the partition table Ask the primary to include some pending node in the partition table
""" """
_header_format = '!H' _fmt = PStruct('add_pending_nodes',
_list_header_format = '!16s' PFUUIDList,
_list_header_len = calcsize(_list_header_format) )
def _encode(self, uuid_list=()): _answer = Error
list_header_format = self._list_header_format
# an empty list means all current pending nodes
uuid_list = [pack(list_header_format, _encodeUUID(uuid)) \
for uuid in uuid_list]
return pack(self._header_format, len(uuid_list)) + ''.join(uuid_list)
def _decode(self, body):
header_len = self._header_len
(n, ) = unpack(self._header_format, body[:header_len])
list_header_format = self._list_header_format
list_header_len = self._list_header_len
uuid_list = [unpack(list_header_format,
body[header_len+i*list_header_len:\
header_len+(i+1)*list_header_len])[0] for i in xrange(n)]
uuid_list = [_decodeUUID(x) for x in uuid_list]
return (uuid_list, )
class NotifyNodeInformation(Packet): class NotifyNodeInformation(Packet):
""" """
Notify information about one or more nodes. PM -> Any. Notify information about one or more nodes. PM -> Any.
""" """
_header_format = '!L' _fmt = PStruct('notify_node_informations',
_list_entry_format = '!H6s16sH' PFNodeList,
_list_entry_len = calcsize(_list_entry_format) )
def _encode(self, node_list):
body = [pack(self._header_format, len(node_list))]
list_entry_format = self._list_entry_format
for node_type, address, uuid, state in node_list:
uuid = _encodeUUID(uuid)
address = _encodeAddress(address)
body.append(pack(list_entry_format, node_type, address, uuid,
state))
return ''.join(body)
def _decode(self, body):
offset = self._header_len
(n,) = unpack(self._header_format, body[:offset])
node_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for _ in xrange(n):
next_offset = offset + list_entry_len
r = unpack(list_entry_format, body[offset:next_offset])
offset = next_offset
node_type, address, uuid, state = r
address = _decodeAddress(address)
node_type = _decodeNodeType(node_type)
state = _decodeNodeState(state)
uuid = _decodeUUID(uuid)
node_list.append((node_type, address, uuid, state))
return (node_list,)
class AskNodeInformation(Packet):
"""
Ask node information
"""
pass
class AnswerNodeInformation(Packet): class NodeInformation(Packet):
""" """
Answer node information Ask node information
""" """
pass _answer = PFEmpty
class SetClusterState(Packet): class SetClusterState(Packet):
""" """
Set the cluster state Set the cluster state
""" """
_header_format = '!H' _fmt = PStruct('set_cluster_state',
PEnum('state', ClusterStates),
def _encode(self, state): )
return pack(self._header_format, state)
def _decode(self, body): _answer = Error
(state, ) = unpack(self._header_format, body[:self._header_len])
state = _decodeClusterState(state)
return (state, )
class NotifyClusterInformation(Packet): class ClusterInformation(Packet):
""" """
Notify information about the cluster Notify information about the cluster
""" """
_header_format = '!H' _fmt = PStruct('notify_cluster_information',
PEnum('state', ClusterStates),
def _encode(self, state): )
return pack(self._header_format, state)
def _decode(self, body):
(state, ) = unpack(self._header_format, body)
state = _decodeClusterState(state)
return (state, )
class AskClusterState(Packet): class ClusterState(Packet):
""" """
Ask state of the cluster Ask state of the cluster
"""
pass
class AnswerClusterState(Packet):
"""
Answer state of the cluster Answer state of the cluster
""" """
_header_format = '!H'
def _encode(self, state):
return pack(self._header_format, state)
def _decode(self, body): _answer = PStruct('answer_cluster_state',
(state, ) = unpack(self._header_format, body) PEnum('state', ClusterStates),
state = _decodeClusterState(state) )
return (state, )
class NotifyLastOID(Packet): class NotifyLastOID(Packet):
""" """
Notify last OID generated Notify last OID generated
""" """
def _decode(self, body): _fmt = PStruct('notify_last_oid',
(loid, ) = unpack('8s', body) POID('last_oid'),
return (loid, ) )
class AskObjectUndoSerial(Packet): class ObjectUndoSerial(Packet):
""" """
Ask storage the serial where object data is when undoing given transaction, Ask storage the serial where object data is when undoing given transaction,
for a list of OIDs. for a list of OIDs.
C -> S C -> S
"""
_header_format = '!8s8s8sL'
def _encode(self, tid, ltid, undone_tid, oid_list):
body = StringIO()
write = body.write
write(pack(self._header_format, tid, ltid, undone_tid, len(oid_list)))
for oid in oid_list:
write(oid)
return body.getvalue()
def _decode(self, body):
body = StringIO(body)
read = body.read
tid, ltid, undone_tid, oid_list_len = unpack(self._header_format,
read(self._header_len))
oid_list = [read(8) for _ in xrange(oid_list_len)]
return tid, ltid, undone_tid, oid_list
class AnswerObjectUndoSerial(Packet):
"""
Answer serials at which object data is when undoing a given transaction. Answer serials at which object data is when undoing a given transaction.
object_tid_dict has the following format: object_tid_dict has the following format:
key: oid key: oid
...@@ -1569,207 +1145,154 @@ class AnswerObjectUndoSerial(Packet): ...@@ -1569,207 +1145,154 @@ class AnswerObjectUndoSerial(Packet):
If current_serial's data is current on storage. If current_serial's data is current on storage.
S -> C S -> C
""" """
_header_format = '!L' _fmt = PStruct('ask_undo_transaction',
_list_entry_format = '!8s8s8sB' PTID('tid'),
_list_entry_len = calcsize(_list_entry_format) PTID('ltid'),
PTID('undone_tid'),
def _encode(self, object_tid_dict): PFOidList,
body = StringIO() )
write = body.write
write(pack(self._header_format, len(object_tid_dict)))
list_entry_format = self._list_entry_format
for oid, (current_serial, undo_serial, is_current) in \
object_tid_dict.iteritems():
if undo_serial is None:
undo_serial = ZERO_TID
write(pack(list_entry_format, oid, current_serial, undo_serial,
is_current))
return body.getvalue()
def _decode(self, body):
body = StringIO(body)
read = body.read
object_tid_dict = {}
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
object_tid_len = unpack(self._header_format, read(self._header_len))[0]
for _ in xrange(object_tid_len):
oid, current_serial, undo_serial, is_current = unpack(
list_entry_format, read(list_entry_len))
if undo_serial == ZERO_TID:
undo_serial = None
object_tid_dict[oid] = (current_serial, undo_serial,
bool(is_current))
return (object_tid_dict, )
class AskHasLock(Packet):
"""
Ask a storage is oid is locked by another transaction.
C -> S
"""
def _encode(self, tid, oid):
return _encodeTID(tid) + _encodeTID(oid)
def _decode(self, body): _answer = PStruct('answer_undo_transaction',
return (_decodeTID(body[:8]), _decodeTID(body[8:])) PDict('object_tid_dict',
POID('oid'),
PStruct('object_tid_value',
PTID('current_serial'),
PTID('undo_serial'),
PBoolean('is_current'),
),
),
)
class AnswerHasLock(Packet): class HasLock(Packet):
""" """
Ask a storage is oid is locked by another transaction.
C -> S
Answer whether a transaction holds the write lock for requested object. Answer whether a transaction holds the write lock for requested object.
""" """
_header_format = '!8sH' _fmt = PStruct('has_load_lock',
PTID('tid'),
def _encode(self, oid, state): POID('oid'),
return pack(self._header_format, oid, state) )
def _decode(self, body): _answer = PStruct('answer_has_lock',
oid, state = unpack(self._header_format, body) POID('oid'),
return (oid, _decodeLockState(state)) PEnum('lock_state', LockState),
)
class AskCheckCurrentSerial(Packet): class CheckCurrentSerial(Packet):
""" """
Verifies if given serial is current for object oid in the database, and Verifies if given serial is current for object oid in the database, and
take a write lock on it (so that this state is not altered until take a write lock on it (so that this state is not altered until
transaction ends). transaction ends).
"""
_header_format = '!8s8s8s'
def _encode(self, tid, serial, oid):
return tid + serial + oid
def _decode(self, body):
return unpack(self._header_format, body)
class AnswerCheckCurrentSerial(AnswerStoreObject):
"""
Answer to AskCheckCurrentSerial. Answer to AskCheckCurrentSerial.
Same structure as AnswerStoreObject, to handle the same way, except there Same structure as AnswerStoreObject, to handle the same way, except there
is nothing to invalidate in any client's cache. is nothing to invalidate in any client's cache.
""" """
pass _fmt = PStruct('ask_check_current_serial',
PTID('tid'),
PTID('serial'),
POID('oid'),
)
_answer = PStruct('answer_store_object',
PBoolean('conflicting'),
POID('oid'),
PTID('serial'),
)
class AskBarrier(Packet): class Barrier(Packet):
""" """
Initates a "network barrier", allowing the node sending this packet to know Initates a "network barrier", allowing the node sending this packet to know
when all packets sent previously on the same connection have been handled when all packets sent previously on the same connection have been handled
by its peer. by its peer.
""" """
pass
class AnswerBarrier(Packet): _answer = PFEmpty
pass
class AskPack(Packet): class Pack(Packet):
""" """
Request a pack at given TID. Request a pack at given TID.
C -> M C -> M
M -> S M -> S
"""
def _encode(self, tid):
return _encodeTID(tid)
def _decode(self, body):
return (_decodeTID(body), )
class AnswerPack(Packet):
"""
Inform that packing it over. Inform that packing it over.
S -> M S -> M
M -> C M -> C
""" """
_header_format = '!H' _fmt = PStruct('ask_pack',
PTID('tid'),
def _encode(self, status): )
return pack(self._header_format, int(status))
def _decode(self, body): _answer = PStruct('answer_pack',
return (bool(unpack(self._header_format, body)[0]), ) PBoolean('status'),
)
class AskCheckTIDRange(Packet): class CheckTIDRange(Packet):
""" """
Ask some stats about a range of transactions. Ask some stats about a range of transactions.
Used to know if there are differences between a replicating node and Used to know if there are differences between a replicating node and
reference node. reference node.
S -> S S -> S
"""
_header_format = '!8s8sLL'
def _encode(self, min_tid, max_tid, length, partition):
return pack(self._header_format, min_tid, max_tid, length, partition)
def _decode(self, body):
# min_tid, max_tid, length, partition
return unpack(self._header_format, body)
class AnswerCheckTIDRange(Packet):
"""
Stats about a range of transactions. Stats about a range of transactions.
Used to know if there are differences between a replicating node and Used to know if there are differences between a replicating node and
reference node. reference node.
S -> S S -> S
""" """
_header_format = '!8sLLQ8s' _fmt = PStruct('ask_check_tid_range',
def _encode(self, min_tid, length, count, tid_checksum, max_tid): PTID('min_tid'),
return pack(self._header_format, min_tid, length, count, tid_checksum, PTID('max_tid'),
max_tid) PNumber('length'),
PNumber('partition'),
)
def _decode(self, body): _answer = PStruct('answer_check_tid_range',
# min_tid, length, partition, count, tid_checksum, max_tid PTID('min_tid'),
return unpack(self._header_format, body) PNumber('length'),
PNumber('count'),
PChecksum('checksum'),
PTID('max_tid'),
)
class AskCheckSerialRange(Packet): class CheckSerialRange(Packet):
""" """
Ask some stats about a range of object history. Ask some stats about a range of object history.
Used to know if there are differences between a replicating node and Used to know if there are differences between a replicating node and
reference node. reference node.
S -> S S -> S
"""
_header_format = '!8s8s8sLL'
def _encode(self, min_oid, min_serial, max_tid, length, partition):
return pack(self._header_format, min_oid, min_serial, max_tid, length,
partition)
def _decode(self, body):
# min_oid, min_serial, max_tid, length, partition
return unpack(self._header_format, body)
class AnswerCheckSerialRange(Packet):
"""
Stats about a range of object history. Stats about a range of object history.
Used to know if there are differences between a replicating node and Used to know if there are differences between a replicating node and
reference node. reference node.
S -> S S -> S
""" """
_header_format = '!8s8sLLQ8sQ8s' _fmt = PStruct('ask_check_serial_range',
POID('min_oid'),
def _encode(self, min_oid, min_serial, length, count, oid_checksum, PTID('min_serial'),
max_oid, serial_checksum, max_serial): PTID('max_tid'),
return pack(self._header_format, min_oid, min_serial, length, count, PNumber('length'),
oid_checksum, max_oid, serial_checksum, max_serial) PNumber('partition'),
)
def _decode(self, body): _answer = PStruct('answer_check_serial_range',
# min_oid, min_serial, length, count, oid_checksum, max_oid, POID('min_oid'),
# serial_checksum, max_serial PTID('min_serial'),
return unpack(self._header_format, body) PNumber('length'),
PNumber('count'),
PChecksum('oid_checksum'),
POID('max_oid'),
PChecksum('serial_checksum'),
PTID('max_serial'),
)
class AskLastTransaction(Packet): class LastTransaction(Packet):
""" """
Ask last committed TID. Ask last committed TID.
C -> M C -> M
"""
pass
class AnswerLastTransaction(Packet):
"""
Answer last committed TID. Answer last committed TID.
M -> C M -> C
""" """
def _encode(self, tid):
return tid
def _decode(self, body): _answer = PStruct('answer_last_transaction',
return (body, ) PTID('tid'),
)
class NotifyReady(Packet): class NotifyReady(Packet):
""" """
...@@ -1778,38 +1301,12 @@ class NotifyReady(Packet): ...@@ -1778,38 +1301,12 @@ class NotifyReady(Packet):
""" """
pass pass
class Error(Packet):
"""
Error is a special type of message, because this can be sent against
any other message, even if such a message does not expect a reply
usually. Any -> Any.
"""
_header_format = '!H'
def _encode(self, code, message):
return pack(self._header_format, code) + _encodeString(message)
def _decode(self, body):
offset = self._header_len
(code, ) = unpack(self._header_format, body[:offset])
code = _decodeErrorCode(code)
(message, _) = _decodeString(body, 'message', offset=offset)
return (code, message)
def initMessage(klass):
if klass._header_format is not None:
klass._header_len = calcsize(klass._header_format)
StaticRegistry = {} StaticRegistry = {}
def register(code, request, answer=None, ignore_when_closed=None): def register(code, request, answer=None, ignore_when_closed=None):
""" Register a packet in the packet registry """ """ Register a packet in the packet registry """
# register the request # register the request
# assert code & RESPONSE_MASK == 0
assert code not in StaticRegistry, "Duplicate request packet code" assert code not in StaticRegistry, "Duplicate request packet code"
initMessage(request)
request._code = code request._code = code
request._answer = answer
StaticRegistry[code] = request StaticRegistry[code] = request
if ignore_when_closed is None: if ignore_when_closed is None:
# By default, on a closed connection: # By default, on a closed connection:
...@@ -1818,17 +1315,21 @@ def register(code, request, answer=None, ignore_when_closed=None): ...@@ -1818,17 +1315,21 @@ def register(code, request, answer=None, ignore_when_closed=None):
# - nofitication: keep # - nofitication: keep
ignore_when_closed = answer is not None ignore_when_closed = answer is not None
request._ignore_when_closed = ignore_when_closed request._ignore_when_closed = ignore_when_closed
if answer not in (None, Error): if request._answer in (Error, None):
initMessage(answer) return request
# build a class for the answer
answer = ClassType('Answer%s' % (request.__name__, ), (Packet, ), {})
answer._fmt = request._answer
# compute the answer code # compute the answer code
code = code | RESPONSE_MASK code = code | RESPONSE_MASK
answer._request = request answer._request = request
assert answer._code is None, "Answer of %s is already used" % (request, )
answer._code = code answer._code = code
request._answer = answer
# and register the answer packet # and register the answer packet
assert code not in StaticRegistry, "Duplicate response packet code" assert code not in StaticRegistry, "Duplicate response packet code"
StaticRegistry[code] = answer StaticRegistry[code] = answer
return (request, answer) return (request, answer)
return request
class ParserState(object): class ParserState(object):
""" """
...@@ -1850,7 +1351,6 @@ class PacketRegistry(dict): ...@@ -1850,7 +1351,6 @@ class PacketRegistry(dict):
""" """
Packet registry that check packet code unicity and provide an index Packet registry that check packet code unicity and provide an index
""" """
def __init__(self): def __init__(self):
dict.__init__(self) dict.__init__(self)
# load packet classes # load packet classes
...@@ -1859,10 +1359,10 @@ class PacketRegistry(dict): ...@@ -1859,10 +1359,10 @@ class PacketRegistry(dict):
def parse(self, buf, state_container): def parse(self, buf, state_container):
state = state_container.get() state = state_container.get()
if state is None: if state is None:
header = buf.read(PACKET_HEADER_SIZE) header = buf.read(PACKET_HEADER_FORMAT.size)
if header is None: if header is None:
return None return None
msg_id, msg_type, msg_len = unpack(PACKET_HEADER_FORMAT, header) msg_id, msg_type, msg_len = PACKET_HEADER_FORMAT.unpack(header)
try: try:
packet_klass = self[msg_type] packet_klass = self[msg_type]
except KeyError: except KeyError:
...@@ -1871,7 +1371,7 @@ class PacketRegistry(dict): ...@@ -1871,7 +1371,7 @@ class PacketRegistry(dict):
raise PacketMalformedError('message too big (%d)' % msg_len) raise PacketMalformedError('message too big (%d)' % msg_len)
if msg_len < MIN_PACKET_SIZE: if msg_len < MIN_PACKET_SIZE:
raise PacketMalformedError('message too small (%d)' % msg_len) raise PacketMalformedError('message too small (%d)' % msg_len)
msg_len -= PACKET_HEADER_SIZE msg_len -= PACKET_HEADER_FORMAT.size
else: else:
msg_id, packet_klass, msg_len = state msg_id, packet_klass, msg_len = state
data = buf.read(msg_len) data = buf.read(msg_len)
...@@ -1886,180 +1386,113 @@ class PacketRegistry(dict): ...@@ -1886,180 +1386,113 @@ class PacketRegistry(dict):
packet.setContent(msg_id, data) packet.setContent(msg_id, data)
return packet return packet
# packets registration # notifications
Error = register(0x8000, Error) Error = register(
Notify = register(0x0032, Notify) 0x8000, Error)
Ping, Pong = register( Ping, Pong = register(
0x0001, 0x0001, Ping)
Ping, Notify = register(
Pong) 0x0002, Notify)
RequestIdentification, AcceptIdentification = register( RequestIdentification, AcceptIdentification = register(
0x0002, 0x0003, RequestIdentification)
RequestIdentification,
AcceptIdentification)
AskPrimary, AnswerPrimary = register( AskPrimary, AnswerPrimary = register(
0x0003, 0x0004, PrimaryMaster)
AskPrimary, AnnouncePrimary = register(
AnswerPrimary) 0x0005, AnnouncePrimary)
AnnouncePrimary = register(0x0004, AnnouncePrimary) ReelectPrimary = register(
ReelectPrimary = register(0x0005, ReelectPrimary) 0x0006, ReelectPrimary)
NotifyNodeInformation = register(0x0006, NotifyNodeInformation) NotifyNodeInformation = register(
0x0007, NotifyNodeInformation)
AskLastIDs, AnswerLastIDs = register( AskLastIDs, AnswerLastIDs = register(
0x0007, 0x0008, LastIDs)
AskLastIDs,
AnswerLastIDs)
AskPartitionTable, AnswerPartitionTable = register( AskPartitionTable, AnswerPartitionTable = register(
0x0008, 0x0009, PartitionTable)
AskPartitionTable, SendPartitionTable = register(
AnswerPartitionTable) 0x000A, NotifyPartitionTable)
SendPartitionTable = register(0x0009, SendPartitionTable) NotifyPartitionChanges = register(
NotifyPartitionChanges = register(0x000A, NotifyPartitionChanges) 0x000B, PartitionChanges)
StartOperation = register(0x000B, StartOperation) StartOperation = register(
StopOperation = register(0x000C, StopOperation) 0x000C, StartOperation)
StopOperation = register(
0x000D, StopOperation)
AskUnfinishedTransactions, AnswerUnfinishedTransactions = register( AskUnfinishedTransactions, AnswerUnfinishedTransactions = register(
0x000D, 0x000E, UnfinishedTransactions)
AskUnfinishedTransactions,
AnswerUnfinishedTransactions)
AskObjectPresent, AnswerObjectPresent = register( AskObjectPresent, AnswerObjectPresent = register(
0x000f, 0x000F, ObjectPresent)
AskObjectPresent, DeleteTransaction = register(
AnswerObjectPresent) 0x0010, DeleteTransaction)
DeleteTransaction = register(0x0010, DeleteTransaction) CommitTransaction = register(
CommitTransaction = register(0x0011, CommitTransaction) 0x0011, CommitTransaction)
AskBeginTransaction, AnswerBeginTransaction = register( AskBeginTransaction, AnswerBeginTransaction = register(
0x0012, 0x0012, BeginTransaction)
AskBeginTransaction,
AnswerBeginTransaction)
AskFinishTransaction, AnswerTransactionFinished = register( AskFinishTransaction, AnswerTransactionFinished = register(
0x0013, 0x0013, FinishTransaction, ignore_when_closed=False)
AskFinishTransaction,
AnswerTransactionFinished,
ignore_when_closed=False,
)
AskLockInformation, AnswerInformationLocked = register( AskLockInformation, AnswerInformationLocked = register(
0x0014, 0x0014, LockInformation, ignore_when_closed=False)
AskLockInformation, InvalidateObjects = register(
AnswerInformationLocked, 0x0015, InvalidateObjects)
) NotifyUnlockInformation = register(
InvalidateObjects = register(0x0015, InvalidateObjects) 0x0016, UnlockInformation)
NotifyUnlockInformation = register(0x0016, NotifyUnlockInformation)
AskNewOIDs, AnswerNewOIDs = register( AskNewOIDs, AnswerNewOIDs = register(
0x0017, 0x0017, GenerateOIDs)
AskNewOIDs,
AnswerNewOIDs)
AskStoreObject, AnswerStoreObject = register( AskStoreObject, AnswerStoreObject = register(
0x0018, 0x0018, StoreObject)
AskStoreObject, AbortTransaction = register(
AnswerStoreObject) 0x0019, AbortTransaction)
AbortTransaction = register(0x0019, AbortTransaction)
AskStoreTransaction, AnswerStoreTransaction = register( AskStoreTransaction, AnswerStoreTransaction = register(
0x001A, 0x001A, StoreTransaction)
AskStoreTransaction,
AnswerStoreTransaction)
AskObject, AnswerObject = register( AskObject, AnswerObject = register(
0x001B, 0x001B, GetObject)
AskObject,
AnswerObject)
AskTIDs, AnswerTIDs = register( AskTIDs, AnswerTIDs = register(
0x001C, 0x001C, TIDList)
AskTIDs,
AnswerTIDs)
AskTransactionInformation, AnswerTransactionInformation = register( AskTransactionInformation, AnswerTransactionInformation = register(
0x001E, 0x001D, TransactionInformation)
AskTransactionInformation,
AnswerTransactionInformation)
AskObjectHistory, AnswerObjectHistory = register( AskObjectHistory, AnswerObjectHistory = register(
0x001F, 0x001E, ObjectHistory)
AskObjectHistory,
AnswerObjectHistory)
AskPartitionList, AnswerPartitionList = register( AskPartitionList, AnswerPartitionList = register(
0x0021, 0x001F, PartitionList)
AskPartitionList,
AnswerPartitionList)
AskNodeList, AnswerNodeList = register( AskNodeList, AnswerNodeList = register(
0x0022, 0x0020, NodeList)
AskNodeList,
AnswerNodeList)
SetNodeState = register( SetNodeState = register(
0x0023, 0x0021, SetNodeState, ignore_when_closed=False)
SetNodeState,
Error,
ignore_when_closed=False,
)
AddPendingNodes = register( AddPendingNodes = register(
0x0024, 0x0022, AddPendingNodes, ignore_when_closed=False)
AddPendingNodes,
Error,
ignore_when_closed=False,
)
AskNodeInformation, AnswerNodeInformation = register( AskNodeInformation, AnswerNodeInformation = register(
0x0025, 0x0023, NodeInformation)
AskNodeInformation,
AnswerNodeInformation)
SetClusterState = register( SetClusterState = register(
0x0026, 0x0024, SetClusterState, ignore_when_closed=False)
SetClusterState, NotifyClusterInformation = register(
Error, 0x0025, ClusterInformation)
ignore_when_closed=False,
)
NotifyClusterInformation = register(0x0027, NotifyClusterInformation)
AskClusterState, AnswerClusterState = register( AskClusterState, AnswerClusterState = register(
0x0028, 0x0026, ClusterState)
AskClusterState, NotifyLastOID = register(
AnswerClusterState) 0x0027, NotifyLastOID)
NotifyLastOID = register(0x0030, NotifyLastOID) NotifyReplicationDone = register(
NotifyReplicationDone = register(0x0031, NotifyReplicationDone) 0x0028, ReplicationDone)
AskObjectUndoSerial, AnswerObjectUndoSerial = register( AskObjectUndoSerial, AnswerObjectUndoSerial = register(
0x0033, 0x0029, ObjectUndoSerial)
AskObjectUndoSerial,
AnswerObjectUndoSerial)
AskHasLock, AnswerHasLock = register( AskHasLock, AnswerHasLock = register(
0x0034, 0x002A, HasLock)
AskHasLock,
AnswerHasLock)
AskTIDsFrom, AnswerTIDsFrom = register( AskTIDsFrom, AnswerTIDsFrom = register(
0x0035, 0x002B, TIDListFrom)
AskTIDsFrom,
AnswerTIDsFrom)
AskObjectHistoryFrom, AnswerObjectHistoryFrom = register( AskObjectHistoryFrom, AnswerObjectHistoryFrom = register(
0x0036, 0x002C, ObjectHistoryFrom)
AskObjectHistoryFrom,
AnswerObjectHistoryFrom)
AskBarrier, AnswerBarrier = register( AskBarrier, AnswerBarrier = register(
0x0037, 0x002D, Barrier)
AskBarrier,
AnswerBarrier)
AskPack, AnswerPack = register( AskPack, AnswerPack = register(
0x0038, 0x002E, Pack, ignore_when_closed=False)
AskPack,
AnswerPack,
ignore_when_closed=False,
)
AskCheckTIDRange, AnswerCheckTIDRange = register( AskCheckTIDRange, AnswerCheckTIDRange = register(
0x0039, 0x002F, CheckTIDRange)
AskCheckTIDRange,
AnswerCheckTIDRange,
)
AskCheckSerialRange, AnswerCheckSerialRange = register( AskCheckSerialRange, AnswerCheckSerialRange = register(
0x003A, 0x0030, CheckSerialRange)
AskCheckSerialRange, NotifyReady = register(
AnswerCheckSerialRange, 0x0031, NotifyReady)
)
NotifyReady = register(0x003B, NotifyReady)
AskLastTransaction, AnswerLastTransaction = register( AskLastTransaction, AnswerLastTransaction = register(
0x003C, 0x0032, LastTransaction)
AskLastTransaction,
AnswerLastTransaction,
)
AskCheckCurrentSerial, AnswerCheckCurrentSerial = register( AskCheckCurrentSerial, AnswerCheckCurrentSerial = register(
0x003D, 0x0033, CheckCurrentSerial)
AskCheckCurrentSerial,
AnswerCheckCurrentSerial,
)
NotifyTransactionFinished = register( NotifyTransactionFinished = register(
0x003E, 0x003E, NotifyTransactionFinished)
NotifyTransactionFinished,
)
# build a "singleton" # build a "singleton"
Packets = PacketRegistry() Packets = PacketRegistry()
...@@ -2073,7 +1506,6 @@ class ErrorRegistry(dict): ...@@ -2073,7 +1506,6 @@ class ErrorRegistry(dict):
""" """
Error packet packet registry Error packet packet registry
""" """
def __init__(self): def __init__(self):
dict.__init__(self) dict.__init__(self)
......
...@@ -22,6 +22,23 @@ from zlib import adler32 ...@@ -22,6 +22,23 @@ from zlib import adler32
from Queue import deque from Queue import deque
from struct import pack, unpack from struct import pack, unpack
try:
from struct import Struct
except ImportError:
import struct
# support for python 2.4
class Struct(object):
def __init__(self, fmt):
self._fmt = fmt
self.size = struct.calcsize(fmt)
def pack(self, *args):
return struct.pack(self._fmt, *args)
def unpack(self, *args):
return struct.unpack(self._fmt, *args)
def u64(s): def u64(s):
return unpack('!Q', s)[0] return unpack('!Q', s)[0]
......
...@@ -70,9 +70,9 @@ class ProtocolTests(NeoUnitTestBase): ...@@ -70,9 +70,9 @@ class ProtocolTests(NeoUnitTestBase):
def test_11_RequestIdentification(self): def test_11_RequestIdentification(self):
uuid = self.getNewUUID() uuid = self.getNewUUID()
p = Packets.RequestIdentification(NodeTypes.CLIENT, uuid, p = Packets.RequestIdentification(NodeTypes.CLIENT,
("127.0.0.1", 9080), "unittest") uuid, ("127.0.0.1", 9080), "unittest")
node, p_uuid, (ip, port), name = p.decode() (plow, phigh), node, p_uuid, (ip, port), name = p.decode()
self.assertEqual(node, NodeTypes.CLIENT) self.assertEqual(node, NodeTypes.CLIENT)
self.assertEqual(p_uuid, uuid) self.assertEqual(p_uuid, uuid)
self.assertEqual(ip, "127.0.0.1") self.assertEqual(ip, "127.0.0.1")
...@@ -148,9 +148,11 @@ class ProtocolTests(NeoUnitTestBase): ...@@ -148,9 +148,11 @@ class ProtocolTests(NeoUnitTestBase):
uuid1 = self.getNewUUID() uuid1 = self.getNewUUID()
uuid2 = self.getNewUUID() uuid2 = self.getNewUUID()
uuid3 = self.getNewUUID() uuid3 = self.getNewUUID()
cell_list = [(0, ((uuid1, CellStates.UP_TO_DATE), (uuid2, CellStates.OUT_OF_DATE))), cell_list = [
(43, ((uuid2, CellStates.OUT_OF_DATE),(uuid3, CellStates.DISCARDED))), (0, [(uuid1, CellStates.UP_TO_DATE), (uuid2, CellStates.OUT_OF_DATE)]),
(124, ((uuid1, CellStates.DISCARDED), (uuid3, CellStates.UP_TO_DATE)))] (43, [(uuid2, CellStates.OUT_OF_DATE), (uuid3, CellStates.DISCARDED)]),
(124, [(uuid1, CellStates.DISCARDED), (uuid3, CellStates.UP_TO_DATE)]),
]
p = Packets.AnswerPartitionTable(ptid, cell_list) p = Packets.AnswerPartitionTable(ptid, cell_list)
pptid, p_cell_list = p.decode() pptid, p_cell_list = p.decode()
self.assertEqual(pptid, ptid) self.assertEqual(pptid, ptid)
...@@ -161,9 +163,11 @@ class ProtocolTests(NeoUnitTestBase): ...@@ -161,9 +163,11 @@ class ProtocolTests(NeoUnitTestBase):
uuid1 = self.getNewUUID() uuid1 = self.getNewUUID()
uuid2 = self.getNewUUID() uuid2 = self.getNewUUID()
uuid3 = self.getNewUUID() uuid3 = self.getNewUUID()
cell_list = [(0, ((uuid1, CellStates.UP_TO_DATE), (uuid2, CellStates.OUT_OF_DATE))), cell_list = [
(43, ((uuid2, CellStates.OUT_OF_DATE),(uuid3, CellStates.DISCARDED))), (0, [(uuid1, CellStates.UP_TO_DATE), (uuid2, CellStates.OUT_OF_DATE)]),
(124, ((uuid1, CellStates.DISCARDED), (uuid3, CellStates.UP_TO_DATE)))] (43, [(uuid2, CellStates.OUT_OF_DATE), (uuid3, CellStates.DISCARDED)]),
(124, [(uuid1, CellStates.DISCARDED), (uuid3, CellStates.UP_TO_DATE)]),
]
p = Packets.AnswerPartitionTable(ptid, cell_list) p = Packets.AnswerPartitionTable(ptid, cell_list)
pptid, p_cell_list = p.decode() pptid, p_cell_list = p.decode()
self.assertEqual(pptid, ptid) self.assertEqual(pptid, ptid)
...@@ -176,8 +180,7 @@ class ProtocolTests(NeoUnitTestBase): ...@@ -176,8 +180,7 @@ class ProtocolTests(NeoUnitTestBase):
cell_list = [(0, uuid1, CellStates.UP_TO_DATE), cell_list = [(0, uuid1, CellStates.UP_TO_DATE),
(43, uuid2, CellStates.OUT_OF_DATE), (43, uuid2, CellStates.OUT_OF_DATE),
(124, uuid1, CellStates.DISCARDED)] (124, uuid1, CellStates.DISCARDED)]
p = Packets.NotifyPartitionChanges(ptid, p = Packets.NotifyPartitionChanges(ptid, cell_list)
cell_list)
pptid, p_cell_list = p.decode() pptid, p_cell_list = p.decode()
self.assertEqual(pptid, ptid) self.assertEqual(pptid, ptid)
self.assertEqual(p_cell_list, cell_list) self.assertEqual(p_cell_list, cell_list)
...@@ -235,7 +238,6 @@ class ProtocolTests(NeoUnitTestBase): ...@@ -235,7 +238,6 @@ class ProtocolTests(NeoUnitTestBase):
ptid = p.decode()[0] ptid = p.decode()[0]
self.assertEqual(ptid, tid) self.assertEqual(ptid, tid)
def test_32_askBeginTransaction(self): def test_32_askBeginTransaction(self):
tid = self.getNextTID() tid = self.getNextTID()
p = Packets.AskBeginTransaction(tid) p = Packets.AskBeginTransaction(tid)
...@@ -370,11 +372,11 @@ class ProtocolTests(NeoUnitTestBase): ...@@ -370,11 +372,11 @@ class ProtocolTests(NeoUnitTestBase):
def test_46_answerStoreObject(self): def test_46_answerStoreObject(self):
oid = self.getNextTID() oid = self.getNextTID()
serial = self.getNextTID() serial = self.getNextTID()
p = Packets.AnswerStoreObject(1, oid, serial) p = Packets.AnswerStoreObject(True, oid, serial)
conflicting, poid, pserial = p.decode() conflicting, poid, pserial = p.decode()
self.assertEqual(oid, poid) self.assertEqual(oid, poid)
self.assertEqual(serial, pserial) self.assertEqual(serial, pserial)
self.assertEqual(conflicting, 1) self.assertTrue(conflicting)
def test_47_askObject(self): def test_47_askObject(self):
oid = self.getNextTID() oid = self.getNextTID()
...@@ -532,7 +534,7 @@ class ProtocolTests(NeoUnitTestBase): ...@@ -532,7 +534,7 @@ class ProtocolTests(NeoUnitTestBase):
def test_AddPendingNodes(self): def test_AddPendingNodes(self):
uuid1, uuid2 = self.getNewUUID(), self.getNewUUID() uuid1, uuid2 = self.getNewUUID(), self.getNewUUID()
p = Packets.AddPendingNodes([uuid1, uuid2]) p = Packets.AddPendingNodes((uuid1, uuid2))
self.assertEqual(p.decode(), ([uuid1, uuid2], )) self.assertEqual(p.decode(), ([uuid1, uuid2], ))
def test_SetNodeState(self): def test_SetNodeState(self):
...@@ -551,7 +553,7 @@ class ProtocolTests(NeoUnitTestBase): ...@@ -551,7 +553,7 @@ class ProtocolTests(NeoUnitTestBase):
self.getNewUUID(), NodeStates.DOWN) self.getNewUUID(), NodeStates.DOWN)
node2 = (NodeTypes.MASTER, ('127.0.0.1', 2000), node2 = (NodeTypes.MASTER, ('127.0.0.1', 2000),
self.getNewUUID(), NodeStates.RUNNING) self.getNewUUID(), NodeStates.RUNNING)
p = Packets.AnswerNodeList([node1, node2]) p = Packets.AnswerNodeList((node1, node2))
self.assertEqual(p.decode(), ([node1, node2], )) self.assertEqual(p.decode(), ([node1, node2], ))
def test_AskPartitionList(self): def test_AskPartitionList(self):
...@@ -564,14 +566,14 @@ class ProtocolTests(NeoUnitTestBase): ...@@ -564,14 +566,14 @@ class ProtocolTests(NeoUnitTestBase):
def test_AnswerPartitionList(self): def test_AnswerPartitionList(self):
ptid = self.getPTID(1) ptid = self.getPTID(1)
row_list = [ row_list = [
(0, ( (0, [
(self.getNewUUID(), CellStates.UP_TO_DATE), (self.getNewUUID(), CellStates.UP_TO_DATE),
(self.getNewUUID(), CellStates.OUT_OF_DATE), (self.getNewUUID(), CellStates.OUT_OF_DATE),
)), ]),
(1, ( (1, [
(self.getNewUUID(), CellStates.FEEDING), (self.getNewUUID(), CellStates.FEEDING),
(self.getNewUUID(), CellStates.DISCARDED), (self.getNewUUID(), CellStates.DISCARDED),
)), ]),
] ]
p = Packets.AnswerPartitionList(ptid, row_list) p = Packets.AnswerPartitionList(ptid, row_list)
self.assertEqual(p.decode(), (ptid, row_list)) self.assertEqual(p.decode(), (ptid, row_list))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment