Commit 2a3392eb authored by Julien Muchembled's avatar Julien Muchembled

importer: merge all inserts to 'obj' into a single request per transaction

parent cd33de9f
...@@ -358,11 +358,14 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -358,11 +358,14 @@ class ImporterDatabaseManager(DatabaseManager):
tid = None tid = None
def finish(): def finish():
if tid: if tid:
self.storeTransaction(tid, (), (oid_list, self.storeTransaction(tid, object_list, (
(x[0] for x in object_list),
str(txn.user), str(txn.description), str(txn.user), str(txn.description),
cPickle.dumps(txn.extension), False, tid), False) cPickle.dumps(txn.extension), False, tid), False)
self.releaseData(data_id_list)
logging.debug("TXN %s imported (user=%r, desc=%r, len(oid)=%s)", logging.debug("TXN %s imported (user=%r, desc=%r, len(oid)=%s)",
util.dump(tid), txn.user, txn.description, len(oid_list)) util.dump(tid), txn.user, txn.description, len(object_list))
del object_list[:], data_id_list[:]
if self._last_commit + 1 < time.time(): if self._last_commit + 1 < time.time():
self.commit() self.commit()
self.zodb_tid = u64(tid) self.zodb_tid = u64(tid)
...@@ -371,6 +374,8 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -371,6 +374,8 @@ class ImporterDatabaseManager(DatabaseManager):
else: else:
compress = None compress = None
compression = 0 compression = 0
object_list = []
data_id_list = []
while zodb_list: while zodb_list:
zodb_list.sort() zodb_list.sort()
z = zodb_list[0] z = zodb_list[0]
...@@ -378,7 +383,6 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -378,7 +383,6 @@ class ImporterDatabaseManager(DatabaseManager):
# user/desc/ext from first ZODB are kept. # user/desc/ext from first ZODB are kept.
if tid != z.tid: if tid != z.tid:
finish() finish()
oid_list = []
txn = z.transaction txn = z.transaction
tid = txn.tid tid = txn.tid
yield 1 yield 1
...@@ -396,14 +400,16 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -396,14 +400,16 @@ class ImporterDatabaseManager(DatabaseManager):
if compression: if compression:
data = compressed_data data = compressed_data
checksum = util.makeChecksum(data) checksum = util.makeChecksum(data)
data_id = self.storeData(util.makeChecksum(data), data, data_id = self.holdData(util.makeChecksum(data), data,
compression) compression)
# Write metadata before next yield. This may not be efficient data_id_list.append(data_id)
# but if they were written at the same time as the transaction, object_list.append((oid, data_id, data_tid))
# _pruneData could delete imported but not yet referenced data. # Give the main loop the opportunity to process requests
self.storeTransaction(tid, ((oid, data_id, data_tid),), (), # from other nodes. In particular, clients may commit. If the
False) # storage node exits after such commit, and before we actually
oid_list.append(oid) # update 'obj' with 'object_list', some rows in 'data' may be
# unreferenced. This is not a problem because the leak is
# solved when resuming the migration.
yield 1 yield 1
try: try:
z.next() z.next()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment