Commit b3ae7dbf authored by Yoshinori Okuji's avatar Yoshinori Okuji

Add one more parameter to Answer Transaction Information.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@192 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent bb06437b
......@@ -459,7 +459,8 @@ class ClientEventHandler(EventHandler):
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerTransactionInformation(self, conn, packet, tid, user, desc, oid_list):
def handleAnswerTransactionInformation(self, conn, packet, tid,
user, desc, ext, oid_list):
if isinstance(conn, MTClientConnection):
app = self.app
# transaction information are returned as a dict
......
......@@ -243,7 +243,8 @@ class EventHandler(object):
def handleAskTransactionInformation(self, conn, packet, tid):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerTransactionInformation(self, conn, packet, tid, user, desc, oid_list):
def handleAnswerTransactionInformation(self, conn, packet, tid,
user, desc, ext, oid_list):
self.handleUnexpectedPacket(conn, packet)
def handleAskObjectHistory(self, conn, packet, oid, first, last):
......
......@@ -38,7 +38,7 @@ class MasterEventHandler(EventHandler):
pass
def handleAnswerTransactionInformation(self, conn, packet, tid,
user, desc, oid_list):
user, desc, ext, oid_list):
logging.info('ignoring Answer Transactin Information')
pass
......
......@@ -335,7 +335,7 @@ class VerificationEventHandler(MasterEventHandler):
app.asking_uuid_dict[uuid] = True
def handleAnswerTransactionInformation(self, conn, packet, tid,
user, desc, oid_list):
user, desc, ext, oid_list):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
......
......@@ -596,12 +596,15 @@ class Packet(object):
self._body = pack('!8s', tid)
return self
def answerTransactionInformation(self, msg_id, tid, user, desc, oid_list):
def answerTransactionInformation(self, msg_id, tid, user, desc, ext,
oid_list):
self._id = msg_id
self._type = ANSWER_TRANSACTION_INFORMATION
body = [pack('!8sHHL', tid, len(user), len(desc), len(oid_list))]
body = [pack('!8sHHHL', tid, len(user), len(desc), len(ext),
len(oid_list))]
body.append(user)
body.append(desc)
body.append(ext)
body.extend(oid_list)
self._body = ''.join(body)
return self
......@@ -1054,19 +1057,22 @@ class Packet(object):
def _decodeAnswerTransactionInformation(self):
try:
tid, user_len, desc_len, oid_len = unpack('!8sHHL', self._body[:16])
offset = 16
tid, user_len, desc_len, ext_len, oid_len \
= unpack('!8sHHHL', self._body[:18])
offset = 18
user = self._body[offset:offset+user_len]
offset += user_len
desc = self._body[offset:offset+desc_len]
offset += desc_len
ext = self._body[offset:offset+ext_len]
offset += ext_len
oid_list = []
for i in xrange(oid_len):
oid = unpack('8s', self._body[offset+i*8:offset+8+i*8])[0]
oid_list.append(oid)
except:
raise ProtocolError(self, 'invalid answer transaction information')
return tid, user, desc, oid_list
return tid, user, desc, ext_len, oid_list
decode_table[ANSWER_TRANSACTION_INFORMATION] = _decodeAnswerTransactionInformation
def _decodeAskObjectHistory(self):
......
......@@ -141,3 +141,8 @@ class DatabaseManager(object):
at most the specified length. The list of partitions are passed
to filter out non-applicable TIDs."""
raise NotImplementedError('this method must be overridden')
def getTIDListPresent(self, tid_list):
"""Return a list of TIDs which are present in a database among
the given list."""
raise NotImplementedError('this method must be overridden')
......@@ -477,3 +477,9 @@ class MySQLDatabaseManager(DatabaseManager):
','.join([str(p) for p in partition_list]),
offset, length))
return [p64(t[0]) for t in r]
def getTIDListPresent(self, tid_list):
q = self.query
r = q("""SELECT tid FROM trans WHERE tid in (%s)""" \
% ','.join([str(u64(tid)) for tid in tid_list]))
return [p64(t[0]) for t in r]
......@@ -237,7 +237,7 @@ class OperationEventHandler(StorageEventHandler):
p.tidNotFound(packet.getId(), '%s does not exist' % dump(tid))
else:
p.answerTransactionInformation(packet.getId(), tid,
t[1], t[2], t[0])
t[1], t[2], t[3], t[0])
conn.addPacket(p)
def handleAskObjectPresent(self, conn, packet, oid, tid):
......
......@@ -47,6 +47,38 @@ class ReplicationEventHandler(StorageEventHandler):
# Nothing to do.
pass
def handleAnswerTIDs(self, conn, packet, tid_list):
app = self.app
if tid_list:
present_tid_list = app.dm.getTIDListPresent(tid_list)
tid_set = set(tid_list)
present_tid_set = set(present_tid_list)
tid_set -= present_tid_set
for tid in tid_set:
msg_id = conn.getNextId()
p = Packet()
p.askTransactionInformation(msg_id, tid)
conn.addPacket(p)
conn.expectMessage(timeout = 300)
app.replicator.tid_offset += 1000
offset = app.replicator.tid_offset
msg_id = conn.getNextId()
p = Packet()
p.askTIDs(msg_id, offset, offset + 1000,
app.replicator.current_partition.getRID())
conn.addPacket(p)
conn.expectMessage(timeout = 300)
else:
msg_id = conn.getNextId()
p = Packet()
p.askOIDs(msg_id, 0, 1000,
app.replicator.current_partition.getRID())
conn.addPacket(p)
conn.expectMessage(timeout = 300)
app.replicator.oid_offset = 0
class Replicator(object):
"""This class handles replications of objects and transactions.
......@@ -181,6 +213,7 @@ class Replicator(object):
self.current_connection.addPacket(p)
self.current_connection.expectMessage(msg_id)
self.tid_offset = 0
msg_id = self.current_connection.getNextId()
p = Packet()
p.askTIDs(msg_id, 0, 1000, self.current_partition.getRID())
......
......@@ -241,7 +241,7 @@ class VerificationEventHandler(StorageEventHandler):
p.tidNotFound(packet.getId(), '%s does not exist' % dump(tid))
else:
p.answerTransactionInformation(packet.getId(), tid,
t[1], t[2], t[0])
t[1], t[2], t[3], t[0])
conn.addPacket(p)
def handleAskObjectPresent(self, conn, packet, oid, tid):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment