Commit 073a88a4 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Change some protocols to be more generic. Only Ask Object History has changed the API.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@178 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent cf282a92
......@@ -771,7 +771,7 @@ class Application(object):
try:
msg_id = conn.getNextId()
p = Packet()
p.askObjectHistory(msg_id, oid, length)
p.askObjectHistory(msg_id, oid, 0, length)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue())
......
......@@ -246,7 +246,7 @@ class EventHandler(object):
def handleAnswerTransactionInformation(self, conn, packet, tid, user, desc, oid_list):
self.handleUnexpectedPacket(conn, packet)
def handleAskObjectHistory(self, conn, packet, oid, length):
def handleAskObjectHistory(self, conn, packet, oid, first, last):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerObjectHistory(self, conn, packet, oid, history_list):
......
......@@ -152,10 +152,10 @@ ASK_OBJECT = 0x001b
ANSWER_OBJECT = 0x801b
# Ask for TIDs between a range of offsets. The order of TIDs is descending,
# and the range is [first, last). C -> S.
# and the range is [first, last). C, S -> S.
ASK_TIDS = 0x001d
# Answer the requested TIDs. S -> C.
# Answer the requested TIDs. S -> C, S.
ANSWER_TIDS = 0x801d
# Ask information about a transaction. PM, C -> S.
......@@ -164,10 +164,11 @@ ASK_TRANSACTION_INFORMATION = 0x001e
# Answer information (user, description) about a transaction. S -> C, PM.
ANSWER_TRANSACTION_INFORMATION = 0x801e
# Ask history information for a given object. C -> S.
# Ask history information for a given object. The order of serials is
# descending, and the range is [first, last]. C, S -> S.
ASK_OBJECT_HISTORY = 0x001f
# Answer history information (serial, size) for an object. S -> C.
# Answer history information (serial, size) for an object. S -> C, S.
ANSWER_OBJECT_HISTORY = 0x801f
# Error codes.
......@@ -577,7 +578,7 @@ class Packet(object):
def askTIDs(self, msg_id, first, last):
self._id = msg_id
self._type = ASK_TIDS
self._body = pack('!LL', first, last)
self._body = pack('!QQ', first, last)
return self
def answerTIDs(self, msg_id, tid_list):
......@@ -604,19 +605,19 @@ class Packet(object):
self._body = ''.join(body)
return self
def askObjectHistory(self, msg_id, oid, length):
def askObjectHistory(self, msg_id, oid, first, last):
self._id = msg_id
self._type = ASK_OBJECT_HISTORY
self._body = pack('!8sH', oid, length)
self._body = pack('!8sQQ', oid, first, last)
return self
def answerObjectHistory(self, msg_id, oid, history_list):
self._id = msg_id
self._type = ANSWER_OBJECT_HISTORY
body = [pack('!8sH', oid, len(history_list))]
body = [pack('!8sL', oid, len(history_list))]
# history_list is a list of tuple (serial, size)
for history_tuple in history_list:
body.append(pack('8sL', history_tuple[0], history_tuple[1]))
body.append(pack('!8sL', history_tuple[0], history_tuple[1]))
self._body = ''.join(body)
return self
......@@ -1024,7 +1025,7 @@ class Packet(object):
def _decodeAskTIDs(self):
try:
first, last = unpack('!LL', self._body[:8])
first, last = unpack('!QQ', self._body)
except:
raise ProtocolError(self, 'invalid ask tids')
return first, last
......@@ -1069,18 +1070,18 @@ class Packet(object):
def _decodeAskObjectHistory(self):
try:
oid, length = unpack('!8sH', self._body)
oid, first, last = unpack('!8sQQ', self._body)
except:
raise ProtocolError(self, 'invalid ask object history')
return oid, length
return oid, first, last
decode_table[ASK_OBJECT_HISTORY] = _decodeAskObjectHistory
def _decodeAnswerObjectHistory(self):
try:
oid, length = unpack('!8sH', self._body[:10])
oid, length = unpack('!8sL', self._body[:12])
history_list = []
for i in xrange(length):
serial, size = unpack('!8sL', self._body[10+i*12:22+i*12])
for i in xrange(12, 12 + length * 12, 12):
serial, size = unpack('!8sL', self._body[i:i+12])
history_list.append(tuple(serial, size))
except:
raise ProtocolError(self, 'invalid answer object history')
......
......@@ -129,7 +129,7 @@ class DatabaseManager(object):
area as well."""
raise NotImplementedError('this method must be overridden')
def getObjectHistory(self, oid, length = 1):
def getObjectHistory(self, oid, offset = 0, length = 1):
"""Return a list of serials and sizes for a given object ID.
The length specifies the maximum size of such a list. The first serial
must be the last serial, and the list must be sorted in descending
......
......@@ -195,7 +195,7 @@ class StorageEventHandler(EventHandler):
def handleAskTIDs(self, conn, packet, first, last):
self.handleUnexpectedPacket(conn, packet)
def handleAskObjectHistory(self, conn, packet, oid, length):
def handleAskObjectHistory(self, conn, packet, oid, first, last):
self.handleUnexpectedPacket(conn, packet)
def handleAskStoreTransaction(self, conn, packet, tid, user, desc,
......
......@@ -459,12 +459,12 @@ class MySQLDatabaseManager(DatabaseManager):
return oid_list, user, desc, ext
return None
def getObjectHistory(self, oid, length = 1):
def getObjectHistory(self, oid, offset = 0, length = 1):
q = self.query
oid = u64(oid)
r = q("""SELECT serial, LENGTH(value) FROM obj WHERE oid = %d
ORDER BY serial DESC LIMIT %d""" \
% (oid, length))
ORDER BY serial DESC LIMIT %d,%d""" \
% (oid, offset, length))
if r:
return [(p64(serial), length) for serial, length in r]
return None
......
......@@ -336,9 +336,14 @@ class OperationEventHandler(StorageEventHandler):
app.num_partitions, partition_list)
conn.addPacket(Packet().answerTIDs(packet.getId(), tid_list))
def handleAskObjectHistory(self, conn, packet, oid, length):
def handleAskObjectHistory(self, conn, packet, oid, first, last):
if first >= last:
conn.addPacket(Packet().protocolError(packet.getId(),
'invalid offsets'))
return
app = self.app
history_list = app.dm.getObjectHistory(oid, length)
history_list = app.dm.getObjectHistory(oid, first, last - first)
conn.addPacket(Packet().answerObjectHistory(packet.getId(),
history_list))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment