Commit aa4995fd authored by Yoshinori Okuji's avatar Yoshinori Okuji

Add Ask OIDs and Answer OIDs.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@203 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 09336971
...@@ -253,6 +253,12 @@ class EventHandler(object): ...@@ -253,6 +253,12 @@ class EventHandler(object):
def handleAnswerObjectHistory(self, conn, packet, oid, history_list): def handleAnswerObjectHistory(self, conn, packet, oid, history_list):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
def handleAskOIDs(self, conn, packet, first, last, partition):
self.handleUnexpectedPacket(conn, packet)
def handleAnswerOIDs(self, conn, packet, tid_list):
self.handleUnexpectedPacket(conn, packet)
# Error packet handlers. # Error packet handlers.
handleNotReady = handleUnexpectedPacket handleNotReady = handleUnexpectedPacket
......
...@@ -158,10 +158,10 @@ ASK_TIDS = 0x001d ...@@ -158,10 +158,10 @@ ASK_TIDS = 0x001d
# Answer the requested TIDs. S -> C, S. # Answer the requested TIDs. S -> C, S.
ANSWER_TIDS = 0x801d ANSWER_TIDS = 0x801d
# Ask information about a transaction. PM, C -> S. # Ask information about a transaction. Any -> S.
ASK_TRANSACTION_INFORMATION = 0x001e ASK_TRANSACTION_INFORMATION = 0x001e
# Answer information (user, description) about a transaction. S -> C, PM. # Answer information (user, description) about a transaction. S -> Any.
ANSWER_TRANSACTION_INFORMATION = 0x801e ANSWER_TRANSACTION_INFORMATION = 0x801e
# Ask history information for a given object. The order of serials is # Ask history information for a given object. The order of serials is
...@@ -171,6 +171,14 @@ ASK_OBJECT_HISTORY = 0x001f ...@@ -171,6 +171,14 @@ ASK_OBJECT_HISTORY = 0x001f
# Answer history information (serial, size) for an object. S -> C, S. # Answer history information (serial, size) for an object. S -> C, S.
ANSWER_OBJECT_HISTORY = 0x801f ANSWER_OBJECT_HISTORY = 0x801f
# Ask for OIDs between a range of offsets. The order of OIDs is descending,
# and the range is [first, last). S -> S.
ASK_OIDS = 0x0020
# Answer the requested OIDs. S -> S.
ANSWER_OIDS = 0x8020
# Error codes. # Error codes.
NOT_READY_CODE = 1 NOT_READY_CODE = 1
OID_NOT_FOUND_CODE = 2 OID_NOT_FOUND_CODE = 2
...@@ -625,6 +633,20 @@ class Packet(object): ...@@ -625,6 +633,20 @@ class Packet(object):
self._body = ''.join(body) self._body = ''.join(body)
return self return self
def askOIDs(self, msg_id, first, last, partition):
self._id = msg_id
self._type = ASK_OIDS
self._body = pack('!QQL', first, last, partition)
return self
def answerOIDs(self, msg_id, oid_list):
self._id = msg_id
self._type = ANSWER_OIDS
body = [pack('!L', len(oid_list))]
body.extend(oid_list)
self._body = ''.join(body)
return self
# Decoders. # Decoders.
def decode(self): def decode(self):
try: try:
...@@ -1095,3 +1117,23 @@ class Packet(object): ...@@ -1095,3 +1117,23 @@ class Packet(object):
return oid, history_list return oid, history_list
decode_table[ANSWER_OBJECT_HISTORY] = _decodeAnswerObjectHistory decode_table[ANSWER_OBJECT_HISTORY] = _decodeAnswerObjectHistory
def _decodeAskOIDs(self):
try:
first, last, partition = unpack('!QQL', self._body)
except:
raise ProtocolError(self, 'invalid ask oids')
return first, last, partition
decode_table[ASK_OIDS] = _decodeAskOIDs
def _decodeAnswerOIDs(self):
try:
n = unpack('!L', self._body[:4])[0]
oid_list = []
for i in xrange(n):
oid = unpack('8s', self._body[4+i*8:12+i*8])[0]
oid_list.append(oid)
except:
raise ProtocolError(self, 'invalid answer oids')
return (oid_list,)
decode_table[ANSWER_OIDS] = _decodeAnswerOIDs
...@@ -101,13 +101,13 @@ class DatabaseManager(object): ...@@ -101,13 +101,13 @@ class DatabaseManager(object):
"""Drop any unfinished data from a database.""" """Drop any unfinished data from a database."""
raise NotImplementedError('this method must be overridden') raise NotImplementedError('this method must be overridden')
def storeTransaction(self, tid, object_list, transaction): def storeTransaction(self, tid, object_list, transaction, temporary = True):
"""Store a transaction temporarily. Note that this transaction """Store a transaction temporarily, if temporary is true. Note
is not finished yet. The list of objects contains tuples, that this transaction is not finished yet. The list of objects
each of which consists of an object ID, a compression specification, contains tuples, each of which consists of an object ID,
a checksum and object data. The transaction is either None or a compression specification, a checksum and object data.
a tuple of the list of OIDs, user information, a description and The transaction is either None or a tuple of the list of OIDs,
extension information.""" user information, a description and extension information."""
raise NotImplementedError('this method must be overridden') raise NotImplementedError('this method must be overridden')
def finishTransaction(self, tid): def finishTransaction(self, tid):
......
...@@ -375,10 +375,18 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -375,10 +375,18 @@ class MySQLDatabaseManager(DatabaseManager):
raise raise
self.commit() self.commit()
def storeTransaction(self, tid, object_list, transaction): def storeTransaction(self, tid, object_list, transaction, temporary = True):
q = self.query q = self.query
e = self.escape e = self.escape
tid = u64(tid) tid = u64(tid)
if temporary:
obj_table = 'tobj'
trans_table = 'ttrans'
else:
obj_table = 'obj'
trans_table = 'trans'
self.begin() self.begin()
try: try:
# XXX it might be more efficient to insert multiple objects # XXX it might be more efficient to insert multiple objects
...@@ -390,16 +398,16 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -390,16 +398,16 @@ class MySQLDatabaseManager(DatabaseManager):
for oid, compression, checksum, data in object_list: for oid, compression, checksum, data in object_list:
oid = u64(oid) oid = u64(oid)
data = e(data) data = e(data)
q("""INSERT INTO tobj VALUES (%d, %d, %d, %d, '%s')""" \ q("""INSERT INTO %s VALUES (%d, %d, %d, %d, '%s')""" \
% (oid, tid, compression, checksum, data)) % (obj_table, oid, tid, compression, checksum, data))
if transaction is not None: if transaction is not None:
oid_list, user, desc, ext = transaction oid_list, user, desc, ext = transaction
oids = e(''.join(oid_list)) oids = e(''.join(oid_list))
user = e(user) user = e(user)
desc = e(desc) desc = e(desc)
ext = e(ext) ext = e(ext)
q("""INSERT INTO ttrans VALUES (%d, '%s', '%s', '%s', '%s')""" \ q("""INSERT INTO %s VALUES (%d, '%s', '%s', '%s', '%s')""" \
% (tid, oids, user, desc, ext)) % (trans_table, tid, oids, user, desc, ext))
except: except:
self.rollback() self.rollback()
raise raise
......
...@@ -78,6 +78,14 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -78,6 +78,14 @@ class ReplicationEventHandler(StorageEventHandler):
conn.expectMessage(timeout = 300) conn.expectMessage(timeout = 300)
app.replicator.oid_offset = 0 app.replicator.oid_offset = 0
def handleAnswerTransactionInformation(self, conn, packet, tid,
user, desc, ext, oid_list):
app = self.app
# Directly store the transaction.
app.dm.storeTransaction(tid, (), (oid_list, user, desc, ext), True)
class Replicator(object): class Replicator(object):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment