Commit dc220d04 authored by Julien Muchembled's avatar Julien Muchembled

fixup! importer: fetch and process the data to import in a separate process

- for FileStorage DB, make sure a transaction index is built at most once
- for other DB types, reopen the DB in the subprocess

Now that we have specific code for FileStorage, the generic case is not tested
anymore. We should add a test using ZEO. Or better, and in some way crazy,
one with NEO, but one would need to fix a special case in getObject.
parent a6d4c4e9
...@@ -29,6 +29,7 @@ try: ...@@ -29,6 +29,7 @@ try:
except ImportError: except ImportError:
from cPickle import dumps, loads from cPickle import dumps, loads
_protocol = 1 _protocol = 1
from ZODB.FileStorage import FileStorage
from . import buildDatabaseManager, DatabaseFailure from . import buildDatabaseManager, DatabaseFailure
from .manager import DatabaseManager from .manager import DatabaseManager
...@@ -202,15 +203,19 @@ class ZODB(object): ...@@ -202,15 +203,19 @@ class ZODB(object):
def __getstate__(self): def __getstate__(self):
state = self.__dict__.copy() state = self.__dict__.copy()
del state["data_tid"], state["storage"] del state["_connect"], state["data_tid"], state["storage"]
return state return state
def connect(self, storage): def connect(self, storage):
self.data_tid = {} self.data_tid = {}
config, _ = loadConfigFile(getStorageSchema(), StringIO(storage)) config, _ = loadConfigFile(getStorageSchema(), StringIO(storage))
section = config.storage section = config.storage
if 'read_only' in section.config.getSectionAttributes(): def _connect():
has_next_oid = section.config.read_only = hasattr(self, 'next_oid') self.storage = section.open()
self._connect = _connect
config = section.config
if 'read_only' in config.getSectionAttributes():
has_next_oid = config.read_only = hasattr(self, 'next_oid')
if not has_next_oid: if not has_next_oid:
import gc import gc
# This will reopen read-only as soon as we know the last oid. # This will reopen read-only as soon as we know the last oid.
...@@ -224,11 +229,11 @@ class ZODB(object): ...@@ -224,11 +229,11 @@ class ZODB(object):
del self.storage del self.storage
gc.collect() # to be sure (maybe only required for PyPy, gc.collect() # to be sure (maybe only required for PyPy,
# if one day we support it) # if one day we support it)
section.config.read_only = True config.read_only = True
self.storage = section.open() self._connect()
return new_oid return new_oid
self.new_oid = new_oid self.new_oid = new_oid
self.storage = section.open() self._connect()
def setup(self, zodb_dict, shift_oid=0): def setup(self, zodb_dict, shift_oid=0):
self.shift_oid = shift_oid self.shift_oid = shift_oid
...@@ -295,13 +300,35 @@ class ZODB(object): ...@@ -295,13 +300,35 @@ class ZODB(object):
class ZODBIterator(object): class ZODBIterator(object):
def __init__(self, zodb, *args, **kw): def __new__(cls, zodb_list, *args):
iterator = zodb.iterator(*args, **kw) def _init(zodb):
self = object.__new__(cls)
iterator = zodb.iterator(*args)
def _next(): def _next():
self.transaction = next(iterator) self.transaction = next(iterator)
_next()
self.zodb = zodb self.zodb = zodb
self.next = _next self.next = _next
return self
def init(zodb):
# FileStorage is fork-safe and in case we don't start iteration
# from the beginning, we want the tid index built at most once
# (by speedupFileStorageTxnLookup).
if FORK and not isinstance(zodb.storage, FileStorage):
def init():
zodb._connect()
return _init(zodb)
return init
return _init(zodb)
def result(zodb_list):
for self in zodb_list:
if callable(self):
self = self()
try:
self.next()
yield self
except StopIteration:
pass
return result(map(init, zodb_list))
tid = property(lambda self: self.transaction.tid) tid = property(lambda self: self.transaction.tid)
...@@ -418,18 +445,23 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -418,18 +445,23 @@ class ImporterDatabaseManager(DatabaseManager):
app.newTask(self._import) app.newTask(self._import)
def _import(self): def _import(self):
p64 = util.p64
u64 = util.u64 u64 = util.u64
tid = p64(self.zodb_tid + 1) if self.zodb_tid else None
zodb_list = ZODBIterator(self.zodb, tid, p64(self.zodb_ltid))
if FORK: if FORK:
from multiprocessing import Process from multiprocessing import Process
from ..shared_queue import Queue from ..shared_queue import Queue
queue = Queue(1<<24) queue = Queue(1<<24)
process = self._import_process = Process( process = self._import_process = Process(
target=lambda: queue(self._iter_zodb())) target=lambda zodb_list: queue(self._iter_zodb(zodb_list)),
args=(zodb_list,))
process.daemon = True process.daemon = True
process.start() process.start()
else: else:
queue = self._iter_zodb() queue = self._iter_zodb(zodb_list)
process = None process = None
del zodb_list
object_list = [] object_list = []
data_id_list = [] data_id_list = []
for txn in queue: for txn in queue:
...@@ -473,17 +505,11 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -473,17 +505,11 @@ class ImporterDatabaseManager(DatabaseManager):
""".split(): """.split():
setattr(self, x, getattr(self.db, x)) setattr(self, x, getattr(self.db, x))
def _iter_zodb(self): def _iter_zodb(self, zodb_list):
util.setproctitle('neostorage: import') util.setproctitle('neostorage: import')
p64 = util.p64 p64 = util.p64
u64 = util.u64 u64 = util.u64
tid = p64(self.zodb_tid + 1) if self.zodb_tid else None zodb_list = list(zodb_list)
zodb_list = []
for zodb in self.zodb:
try:
zodb_list.append(ZODBIterator(zodb, tid, p64(self.zodb_ltid)))
except StopIteration:
pass
if zodb_list: if zodb_list:
tid = None tid = None
_compress = compress.getCompress(self.compress) _compress = compress.getCompress(self.compress)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment