Commit 1f2cea9f authored by Aurel's avatar Aurel

define more protocol


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@41 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 34ff1ddb
......@@ -105,7 +105,8 @@ class EventHandler(object):
self.handleUnexpectedPacket(conn, packet)
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port):
uuid, ip_address, port,
num_partitions, num_replicas):
self.handleUnexpectedPacket(conn, packet)
def handlePing(self, conn, packet):
......@@ -185,6 +186,12 @@ class EventHandler(object):
def handleAnswerNewTID(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet)
def handleAskNewOIDList(self, conn, packet):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerNewOIDList(self, conn, packet, oid_list):
self.handleUnexpectedPacket(conn, packet)
def handleFinishTransaction(self, conn, packet, oid_list, tid):
self.handleUnexpectedPacket(conn, packet)
......@@ -203,6 +210,29 @@ class EventHandler(object):
def handleUnlockInformation(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet)
def handleAskStoreObject(self, conn, packet, msg_id, oid, serial,
compressed, data, crc, tid):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerStoreObject(self, conn, packet, status, oid):
self.handleUnexpectedPacket(conn, packet)
def handleAbortTransaction(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet)
def handleAskStoreTransaction(self, conn, packet, tid, user, desc,
ext, oid_list):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerStoreTransaction(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet)
def handleAskObjectByOID(self, conn, packet, oid, serial):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerObjectByOID(self, conn, packet, oid, serial,
compressed, crc, data):
self.handleUnexpectedPacket(conn, packet)
# Error packet handlers.
......
......@@ -124,6 +124,33 @@ INVALIDATE_OBJECTS = 0x0015
# Unlock information on a transaction. PM -> S.
UNLOCK_INFORMATION = 0x0016
# Ask a new object ID list. C -> PM.
ASK_NEW_OID_LIST = 0x0017
# Answer a new object ID list. PM -> C.
ANSWER_NEW_OID_LIST = 0x8017
# Ask to store an object. C -> S.
ASK_STORE_OBJECT = 0x0018
# Answer if object has been stored. S -> C.
ANSWER_STORE_OBJECT = 0x8018
# Abort a transaction. C -> S
ABORT_TRANSACTION = 0x0019
# Ask to store a transaction. C -> S.
ASK_STORE_TRANSACTION = 0x001a
# Answer if transaction has been stored. S -> C.
ANSWER_STORE_TRANSACTION = 0x801a
# Ask a stored object by its OID and serial if given. C -> S.
ASK_OBJECT_BY_OID = 0x001b
# Answer the object reclamed. S -> C.
ANSWER_OBJECT_BY_OID = 0x801b
# Error codes.
NOT_READY_CODE = 1
......@@ -228,7 +255,7 @@ class Packet(object):
return self.error(msg_id, INTERNAL_ERROR_CODE, 'internal error: ' + error_message)
def notReady(self, msg_id, error_message):
return self.error(msg_id, NOT_READY, 'not ready: ' + error_message)
return self.error(msg_id, NOT_READY_CODE, 'not ready: ' + error_message)
def brokenNodeDisallowedError(self, msg_id, error_message):
return self.error(msg_id, BROKEN_NODE_DISALLOWED_ERROR,
......@@ -253,10 +280,12 @@ class Packet(object):
node_type, uuid, inet_aton(ip_address), port, len(name)) + name
return self
def acceptNodeIdentification(self, msg_id, node_type, uuid, ip_address, port):
def acceptNodeIdentification(self, msg_id, node_type, uuid, ip_address,
port, num_partitions, num_replicas):
self._id = msg_id
self._type = ACCEPT_NODE_IDENTIFICATION
self._body = pack('!H16s4sH', node_type, uuid, inet_aton(ip_address), port)
self._body = pack('!H16s4sHHH', node_type, uuid, inet_aton(ip_address), \
port, num_partitions, num_replicas)
return self
def askPrimaryMaster(self, msg_id):
......@@ -424,6 +453,20 @@ class Packet(object):
self._body = tid
return self
def askNewOIDList(self, msg_id, num_oid):
self._id = msg_id
self._type = ASK_NEW_OID_LIST
self._body = num_oid
return self
def answerNewOIDList(self, msg_id, num_oid, oid_list):
self._id = msg_id
self._type = ANSWER_NEW_OID_LIST
body = [pack('!H', num_oid)]
body.extend(oid_list)
self._body = ''.join(body)
return self
def finishTransaction(self, msg_id, oid_list, tid):
self._id = msg_id
self._type = FINISH_TRANSACTION
......@@ -464,7 +507,57 @@ class Packet(object):
self._body = tid
return self
def abortTransaction(self, msg_id, tid):
self._id = msg_id
self._type = ABORT_TRANSACTION
self._body = tid
return self
def askStoreTransaction(self, msg_id, tid, user, desc, ext, oid_list):
self._id = msg_id
self._type = ASK_STORE_TRANSACTION
user_len = len(user)
desc_len = len(desc)
ext_len = len(ext)
body = [pack('!8sLLLL%ds%ds%ds' %(user_len, desc_len, ext_len), tid, \
len(oid_list), user_len, desc_len, ext_len, user, desc, ext)]
body.expend(oid_list)
self._body = ''.join(body)
return self
def answerStoreTransaction(self, msg_id, tid):
self._id = msg_id
self._type = ANSWER_STORE_TRANSACTION
self._body = tid
return self
def askStoreObject(self, msg_id, oid, serial, compressed, data, crc, tid):
self._id = msg_id
self._type = ASK_STORE_OBJECT
body = [pack('!8s8s8sHLQ', oid, serial, tid, compressed, crc, len(data))]
body.append(pack('%ds' %(len(data),), data))
self._body = ''.join(body)
return self
def answerStoreObject(self, msg_id, status, oid):
self._id = msg_id
self._type = ANSWER_STORE_OBJECT
self._body = pack('!H8s', status, oid)
return self
def askObjectByOID(self, msg_id, oid, serial):
self._id = msg_id
self._type = ASK_OBJET_BY_OID
self._body = oid, serial
return self
def answerObjectByOID(self, msg_id, oid, serial, compressed, crc, data):
self._id = msg_id
self._type = ANSWER_OBJECT_BY_OID
body = pack('!8s8sQHL%ds' %(len(data),), oid, serial, len(data), \
compressed, crc, data)
return self
# Decoders.
def decode(self):
try:
......@@ -515,13 +608,13 @@ class Packet(object):
def _decodeAcceptNodeIdentification(self):
try:
node_type, uuid, ip_address, port = unpack('!H16s4sH', self._body)
node_type, uuid, ip_address, port, num_partitions, num_replicas = unpack('!H16s4sHHH', self._body)
ip_address = inet_ntoa(ip_address)
except:
raise ProtocolError(self, 'invalid accept node identification')
if node_type not in VALID_NODE_TYPE_LIST:
raise ProtocolError(self, 'invalid node type %d' % node_type)
return node_type, uuid, ip_address, port
return node_type, uuid, ip_address, port, num_partitions, num_replicas
decode_table[ACCEPT_NODE_IDENTIFICATION] = _decodeAcceptNodeIdentification
def _decodeAskPrimaryMaster(self):
......@@ -732,6 +825,26 @@ class Packet(object):
return tid
decode_table[ANSWER_NEW_TID] = _decodeAnswerNewTID
def _decodeAskNewOIDList(self):
try:
num_oid = unpack('!H', self._body)
except:
raise ProtocolError(self, 'invalid ask new oid list')
return num_oid
decode_table[ASK_NEW_OID_LIST] = _decodeAskNewOIDList
def _decodeAnswerNewOIDList(self):
try:
n = unpack('!H', self._body[:2])
oid_list = []
for i in xrange(n):
oid = unpack('8s', self._body[2+i*8:12+i*8])
oid_list.append(oid)
except:
raise ProtocolError(self, 'invalid new oid list')
return oid_list
decode_table[ANSWER_NEW_OID_LIST] = _decodeAnswerNewOIDList
def _decodeFinishTransaction(self):
try:
tid, n = unpack('!8sL', self._body[:12])
......@@ -788,3 +901,69 @@ class Packet(object):
return tid
decode_table[UNLOCK_INFORMATION] = _decodeUnlockInformation
def _decodeAbortTransaction(self):
try:
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid abort transaction')
return tid
decode_table[ABORT_TRANSACTION] = _decodeAbortTransaction
def _decodeAskStoreObject(self):
try:
oid, serial, tid, compressed, crc, data_len = \
unpack('!8s8s8sHLQ', self._body[:38])
data = unpack('%ds' %(data_len,), self._body[38:])
except:
raise ProtocolError(self, 'invalid ask store object')
return oid, serial, tid, compressed, crc, data
decode_table[ASK_STORE_OBJECT] = _decodeAskStoreObject
def _decodeAnswerStoreObject(self): # XXX maybe to be redefine
try:
status, oid = unpack('!H8s', self._body)
except:
raise ProtocolError(self, 'invalid answer store object')
return status, oid
decode_table[ANSWER_STORE_OBJECT] = _decodeAnswerStoreObject
def _decodeAskStoreTransaction(self):
try:
tid, oid_len, user_len, desc_len, ext_len = unpack('!8sLLLL', self._body[:24])
user = unpack('%ds' %(user_len), self._body[24:24+user_len])
desc = unpack('%ds' %(desc_len), self._body[24+user_len:24+user_len+desc_len])
ext = unpack('%ds' %(ext_len), self._body[24+user_len+desc_len:24+user_len+desc_len+ext_len])
oid_list = []
txn_len = user_len+desc_len+ext_len
for i in xrange(oid_len):
oid = unpack('8s', self._body[24+txn_len+i*8:24+txn_len+8+i*8])
oid_list.append(oid)
except:
raise ProtocolError(self, 'invalid ask store transaction')
return tid, user, desc, ext, oid_list
decode_table[ASK_STORE_TRANSACTION] = _decodeAskStoreTransaction
def _decodeAnswerStoreTransaction(self):
try:
tid = unpack('8s', self._body)
except:
raise ProtocolError(self, 'invalid answer store transaction')
return tid
decode_table[ANSWER_STORE_TRANSACTION] = _decodeAnswerStoreTransaction
def _decodeAskObjectByOID(self):
try:
oid, serial = unpack('8s8s', self._body)
except:
raise ProtocolError(self, 'invalid ask object by oid')
return oid
decode_table[ASK_OBJECT_BY_OID] = _decodeAskObjectByOID
def _decodeAnswerObjectByOID(self):
try:
oid, serial, data_len, compressed, crc = unpack('!8s8sQHL', self._body[:20])
data = unpack('%ds' %(data_len,), self._body[30:])
except:
raise ProtocolError(self, 'invalid answer object by oid')
return oid, serial, compressed, crc, data
decode_table[ANSWER_OBJECT_BY_OID] = _decodeAnswerObjectByOID
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment