Commit 1154f37b authored by Yoshinori Okuji's avatar Yoshinori Okuji

This is the first complete prototype of the replicator.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@205 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 67f4f5aa
......@@ -152,3 +152,9 @@ class DatabaseManager(object):
"""Return a list of TIDs which are present in a database among
the given list."""
raise NotImplementedError('this method must be overridden')
def getSerialListPresent(self, oid, serial_list):
"""Return a list of serials which are present in a database among
the given list."""
raise NotImplementedError('this method must be overridden')
......@@ -500,3 +500,11 @@ class MySQLDatabaseManager(DatabaseManager):
r = q("""SELECT tid FROM trans WHERE tid in (%s)""" \
% ','.join([str(u64(tid)) for tid in tid_list]))
return [p64(t[0]) for t in r]
def getSerialListPresent(self, oid, serial_list):
q = self.query
oid = u64(oid)
r = q("""SELECT serial FROM obj WHERE oid = %d AND serial in (%s)""" \
% (oid, ','.join([str(u64(serial)) for serial in serial_list])))
return [p64(t[0]) for t in r]
......@@ -65,9 +65,7 @@ class ReplicationEventHandler(StorageEventHandler):
# If I have pending TIDs, check which TIDs I don't have, and
# request the data.
present_tid_list = app.dm.getTIDListPresent(tid_list)
tid_set = set(tid_list)
present_tid_set = set(present_tid_list)
tid_set -= present_tid_set
tid_set = set(tid_list) - set(present_tid_list)
for tid in tid_set:
msg_id = conn.getNextId()
p = Packet()
......@@ -122,7 +120,24 @@ class ReplicationEventHandler(StorageEventHandler):
app = self.app
if history_list:
# Check if I have objects, request those which I don't have.
raise NotImplementedError
serial_list = [t[0] for t in history_list]
present_serial_list = app.dm.getSerialListPresent(oid, serial_list)
serial_set = set(serial_list) - set(present_serial_list)
for serial in serial_set:
msg_id = conn.getNextId()
p = Packet()
p.askObject(msg_id, oid, serial, INVALID_TID)
conn.addPacket(p)
conn.expectMessage(timeout = 300)
# And, ask more serials.
app.replicator.serial_offset += 1000
offset = app.replicator.serial_offset
msg_id = conn.getNextId()
p = Packet()
p.askObjectHistory(msg_id, oid, offset, offset + 1000)
conn.addPacket(p)
conn.expectMessage(timeout = 300)
else:
# This OID is finished. So advance to next.
oid_list = app.replicator.oid_list
......@@ -146,7 +161,14 @@ class ReplicationEventHandler(StorageEventHandler):
conn.addPacket(p)
conn.expectMessage(timeout = 300)
def answerObject(self, msg_id, oid, serial_start, serial_end, compression,
checksum, data):
app = self.app
# Directly store the transaction.
obj = (oid, compression, checksum, data)
app.dm.storeTransaction(serial_start, [obj], None, True)
del obj
del data
class Replicator(object):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment