Commit 76a82e15 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Fix inconsistencies and bugs. Still, handlers are not corrected.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@44 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent dfcf454d
...@@ -729,8 +729,5 @@ class Application(object): ...@@ -729,8 +729,5 @@ class Application(object):
def getPartition(self, oid_or_tid): def getPartition(self, oid_or_tid):
return unpack('!Q', oid_or_tid)[0] % self.num_partitions return unpack('!Q', oid_or_tid)[0] % self.num_partitions
def getNewOidList(self, num_oid): def getNewOidList(self, num_oids):
return [self.getNextOid() for i in xrange(n)] return [self.getNextOid() for i in xrange(num_oids)]
...@@ -57,8 +57,8 @@ class MasterEventHandler(EventHandler): ...@@ -57,8 +57,8 @@ class MasterEventHandler(EventHandler):
logging.info('ignoring Ask New TID') logging.info('ignoring Ask New TID')
pass pass
def handleAskNewOIDList(self, conn, packet): def handleAskNewOIDs(self, conn, packet):
logging.info('ignoring Ask New OID List') logging.info('ignoring Ask New OIDs')
pass pass
def handleFinishTransaction(self, conn, packet, oid_list, tid): def handleFinishTransaction(self, conn, packet, oid_list, tid):
......
...@@ -370,7 +370,7 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -370,7 +370,7 @@ class ServiceEventHandler(MasterEventHandler):
tid = app.getNextTID() tid = app.getNextTID()
conn.addPacket(Packet().answerNewTID(packet.getId(), tid)) conn.addPacket(Packet().answerNewTID(packet.getId(), tid))
def handleAskNewOIDList(self, conn, packet, num_oid): def handleAskNewOIDList(self, conn, packet, num_oids):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None: if uuid is None:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
...@@ -383,8 +383,8 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -383,8 +383,8 @@ class ServiceEventHandler(MasterEventHandler):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
return return
oid = app.getNextOIDList(num_oid) oid_list = app.getNextOIDList(num_oids)
conn.addPacket(Packet().answerNewOIDList(packet.getId(), num_oid, oid_list)) conn.addPacket(Packet().answerNewOIDList(packet.getId(), oid_list))
def handleFinishTransaction(self, conn, packet, oid_list, tid): def handleFinishTransaction(self, conn, packet, oid_list, tid):
uuid = conn.getUUID() uuid = conn.getUUID()
......
...@@ -124,11 +124,11 @@ INVALIDATE_OBJECTS = 0x0015 ...@@ -124,11 +124,11 @@ INVALIDATE_OBJECTS = 0x0015
# Unlock information on a transaction. PM -> S. # Unlock information on a transaction. PM -> S.
UNLOCK_INFORMATION = 0x0016 UNLOCK_INFORMATION = 0x0016
# Ask a new object ID list. C -> PM. # Ask new object IDs. C -> PM.
ASK_NEW_OID_LIST = 0x0017 ASK_NEW_OIDS = 0x0017
# Answer a new object ID list. PM -> C. # Answer new object IDs. PM -> C.
ANSWER_NEW_OID_LIST = 0x8017 ANSWER_NEW_OIDS = 0x8017
# Ask to store an object. C -> S. # Ask to store an object. C -> S.
ASK_STORE_OBJECT = 0x0018 ASK_STORE_OBJECT = 0x0018
...@@ -284,8 +284,9 @@ class Packet(object): ...@@ -284,8 +284,9 @@ class Packet(object):
port, num_partitions, num_replicas): port, num_partitions, num_replicas):
self._id = msg_id self._id = msg_id
self._type = ACCEPT_NODE_IDENTIFICATION self._type = ACCEPT_NODE_IDENTIFICATION
self._body = pack('!H16s4sHHH', node_type, uuid, inet_aton(ip_address), \ self._body = pack('!H16s4sHLL', node_type, uuid,
port, num_partitions, num_replicas) inet_aton(ip_address), port,
num_partitions, num_replicas)
return self return self
def askPrimaryMaster(self, msg_id): def askPrimaryMaster(self, msg_id):
...@@ -453,16 +454,16 @@ class Packet(object): ...@@ -453,16 +454,16 @@ class Packet(object):
self._body = tid self._body = tid
return self return self
def askNewOIDList(self, msg_id, num_oid): def askNewOIDs(self, msg_id, num_oids):
self._id = msg_id self._id = msg_id
self._type = ASK_NEW_OID_LIST self._type = ASK_NEW_OIDS
self._body = num_oid self._body = pack('!H', num_oids)
return self return self
def answerNewOIDList(self, msg_id, num_oid, oid_list): def answerNewOIDList(self, msg_id, oid_list):
self._id = msg_id self._id = msg_id
self._type = ANSWER_NEW_OID_LIST self._type = ANSWER_NEW_OIDS
body = [pack('!H', num_oid)] body = [pack('!H', len(oid_list))]
body.extend(oid_list) body.extend(oid_list)
self._body = ''.join(body) self._body = ''.join(body)
return self return self
...@@ -519,8 +520,10 @@ class Packet(object): ...@@ -519,8 +520,10 @@ class Packet(object):
user_len = len(user) user_len = len(user)
desc_len = len(desc) desc_len = len(desc)
ext_len = len(ext) ext_len = len(ext)
body = [pack('!8sLLLL%ds%ds%ds' %(user_len, desc_len, ext_len), tid, \ body = [pack('!8sLHHH' tid, len(oid_list), user_len, desc_len, ext_len)]
len(oid_list), user_len, desc_len, ext_len, user, desc, ext)] body.append(user)
body.append(desc)
body.append(ext)
body.expend(oid_list) body.expend(oid_list)
self._body = ''.join(body) self._body = ''.join(body)
return self return self
...@@ -531,31 +534,31 @@ class Packet(object): ...@@ -531,31 +534,31 @@ class Packet(object):
self._body = tid self._body = tid
return self return self
def askStoreObject(self, msg_id, oid, serial, compressed, data, crc, tid): def askStoreObject(self, msg_id, oid, serial, compression, checksum, data):
self._id = msg_id self._id = msg_id
self._type = ASK_STORE_OBJECT self._type = ASK_STORE_OBJECT
body = [pack('!8s8s8sHLQ', oid, serial, tid, compressed, crc, len(data))] self._body = pack('!8s8sBLL', oid, serial, compression,
body.append(pack('%ds' %(len(data),), data)) checksum, len(data)) + data
self._body = ''.join(body)
return self return self
def answerStoreObject(self, msg_id, status, oid): def answerStoreObject(self, msg_id, conflicting, oid, serial):
self._id = msg_id self._id = msg_id
self._type = ANSWER_STORE_OBJECT self._type = ANSWER_STORE_OBJECT
self._body = pack('!H8s', status, oid) self._body = pack('!B8s8s', conflicting, oid, serial)
return self return self
def askObjectByOID(self, msg_id, oid, serial): def askObjectByOID(self, msg_id, oid, serial):
self._id = msg_id self._id = msg_id
self._type = ASK_OBJET_BY_OID self._type = ASK_OBJET_BY_OID
self._body = oid, serial self._body = pack('!8s8s', oid, serial)
return self return self
def answerObjectByOID(self, msg_id, oid, serial, compressed, crc, data): def answerObjectByOID(self, msg_id, oid, serial, compression, checksum,
data):
self._id = msg_id self._id = msg_id
self._type = ANSWER_OBJECT_BY_OID self._type = ANSWER_OBJECT_BY_OID
body = pack('!8s8sQHL%ds' %(len(data),), oid, serial, len(data), \ self._body = pack('!8s8sBLL', oid, serial, compression,
compressed, crc, data) checksum, len(data)) + data
return self return self
# Decoders. # Decoders.
...@@ -591,8 +594,8 @@ class Packet(object): ...@@ -591,8 +594,8 @@ class Packet(object):
def _decodeRequestNodeIdentification(self): def _decodeRequestNodeIdentification(self):
try: try:
body = self._body body = self._body
major, minor, node_type, uuid, ip_address, port, size = unpack('!LLH16s4sHL', major, minor, node_type, uuid, ip_address, port, size \
body[:36]) = unpack('!LLH16s4sHL', body[:36])
ip_address = inet_ntoa(ip_address) ip_address = inet_ntoa(ip_address)
name = body[36:] name = body[36:]
except: except:
...@@ -608,7 +611,8 @@ class Packet(object): ...@@ -608,7 +611,8 @@ class Packet(object):
def _decodeAcceptNodeIdentification(self): def _decodeAcceptNodeIdentification(self):
try: try:
node_type, uuid, ip_address, port, num_partitions, num_replicas = unpack('!H16s4sHHH', self._body) node_type, uuid, ip_address, port, num_partitions, num_replicas \
= unpack('!H16s4sHLL', self._body)
ip_address = inet_ntoa(ip_address) ip_address = inet_ntoa(ip_address)
except: except:
raise ProtocolError(self, 'invalid accept node identification') raise ProtocolError(self, 'invalid accept node identification')
...@@ -825,25 +829,25 @@ class Packet(object): ...@@ -825,25 +829,25 @@ class Packet(object):
return tid return tid
decode_table[ANSWER_NEW_TID] = _decodeAnswerNewTID decode_table[ANSWER_NEW_TID] = _decodeAnswerNewTID
def _decodeAskNewOIDList(self): def _decodeAskNewOIDs(self):
try: try:
num_oid = unpack('!H', self._body) num_oids = unpack('!H', self._body)
except: except:
raise ProtocolError(self, 'invalid ask new oid list') raise ProtocolError(self, 'invalid ask new oids')
return num_oid return num_oids
decode_table[ASK_NEW_OID_LIST] = _decodeAskNewOIDList decode_table[ASK_NEW_OIDS] = _decodeAskNewOIDs
def _decodeAnswerNewOIDList(self): def _decodeAnswerNewOIDs(self):
try: try:
n = unpack('!H', self._body[:2]) n = unpack('!H', self._body[:2])
oid_list = [] oid_list = []
for i in xrange(n): for i in xrange(n):
oid = unpack('8s', self._body[2+i*8:12+i*8]) oid = unpack('8s', self._body[2+i*8:10+i*8])
oid_list.append(oid) oid_list.append(oid)
except: except:
raise ProtocolError(self, 'invalid new oid list') raise ProtocolError(self, 'invalid answer new oids')
return oid_list return oid_list
decode_table[ANSWER_NEW_OID_LIST] = _decodeAnswerNewOIDList decode_table[ANSWER_NEW_OIDS] = _decodeAnswerNewOIDs
def _decodeFinishTransaction(self): def _decodeFinishTransaction(self):
try: try:
...@@ -911,32 +915,39 @@ class Packet(object): ...@@ -911,32 +915,39 @@ class Packet(object):
def _decodeAskStoreObject(self): def _decodeAskStoreObject(self):
try: try:
oid, serial, tid, compressed, crc, data_len = \ oid, serial, compression, checksum, data_len \
unpack('!8s8s8sHLQ', self._body[:38]) = unpack('!8s8sBLL', self._body[:25])
data = unpack('%ds' %(data_len,), self._body[38:]) data = self._body[25:]
except: except:
raise ProtocolError(self, 'invalid ask store object') raise ProtocolError(self, 'invalid ask store object')
return oid, serial, tid, compressed, crc, data if data_len != len(data):
raise ProtocolError(self, 'invalid data size')
return oid, serial, compression, checksum, data
decode_table[ASK_STORE_OBJECT] = _decodeAskStoreObject decode_table[ASK_STORE_OBJECT] = _decodeAskStoreObject
def _decodeAnswerStoreObject(self): # XXX maybe to be redefine def _decodeAnswerStoreObject(self): # XXX maybe to be redefine
try: try:
status, oid = unpack('!H8s', self._body) conflicting, oid, serial = unpack('!B8s8s', self._body)
except: except:
raise ProtocolError(self, 'invalid answer store object') raise ProtocolError(self, 'invalid answer store object')
return status, oid return conflicting, oid, serial
decode_table[ANSWER_STORE_OBJECT] = _decodeAnswerStoreObject decode_table[ANSWER_STORE_OBJECT] = _decodeAnswerStoreObject
def _decodeAskStoreTransaction(self): def _decodeAskStoreTransaction(self):
try: try:
tid, oid_len, user_len, desc_len, ext_len = unpack('!8sLLLL', self._body[:24]) tid, oid_len, user_len, desc_len, ext_len \
user = unpack('%ds' %(user_len), self._body[24:24+user_len]) = unpack('!8sLHHH', self._body[:18])
desc = unpack('%ds' %(desc_len), self._body[24+user_len:24+user_len+desc_len]) offset = 18
ext = unpack('%ds' %(ext_len), self._body[24+user_len+desc_len:24+user_len+desc_len+ext_len]) user = unpack('8s', self._body[offset:offset+user_len])
offset += user_len
desc = unpack('8s', self._body[offset:offset+desc_len])
offset += desc_len
ext = unpack('8s', self._body[offset:offset+ext_len])
offset += ext_len
oid_list = [] oid_list = []
txn_len = user_len+desc_len+ext_len
for i in xrange(oid_len): for i in xrange(oid_len):
oid = unpack('8s', self._body[24+txn_len+i*8:24+txn_len+8+i*8]) oid = unpack('8s', self._body[offset:offset+8])
offset += 8
oid_list.append(oid) oid_list.append(oid)
except: except:
raise ProtocolError(self, 'invalid ask store transaction') raise ProtocolError(self, 'invalid ask store transaction')
...@@ -956,14 +967,17 @@ class Packet(object): ...@@ -956,14 +967,17 @@ class Packet(object):
oid, serial = unpack('8s8s', self._body) oid, serial = unpack('8s8s', self._body)
except: except:
raise ProtocolError(self, 'invalid ask object by oid') raise ProtocolError(self, 'invalid ask object by oid')
return oid return oid, serial
decode_table[ASK_OBJECT_BY_OID] = _decodeAskObjectByOID decode_table[ASK_OBJECT_BY_OID] = _decodeAskObjectByOID
def _decodeAnswerObjectByOID(self): def _decodeAnswerObjectByOID(self):
try: try:
oid, serial, data_len, compressed, crc = unpack('!8s8sQHL', self._body[:20]) oid, serial, compression, checksum, data_len \
data = unpack('%ds' %(data_len,), self._body[30:]) = unpack('!8s8sBLL', self._body[:25])
data = self._body[25:]
except: except:
raise ProtocolError(self, 'invalid answer object by oid') raise ProtocolError(self, 'invalid answer object by oid')
return oid, serial, compressed, crc, data if len(data) != data_len:
raise ProtocolError(self, 'invalid data size')
return oid, serial, compression, checksum, data
decode_table[ANSWER_OBJECT_BY_OID] = _decodeAnswerObjectByOID decode_table[ANSWER_OBJECT_BY_OID] = _decodeAnswerObjectByOID
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment