Commit 1b1a9bcd authored by Julien Muchembled's avatar Julien Muchembled

importer: when mapping oids, repickle without loading any object

The use of ZODB.broken was completely wrong so previous code only worked
if NEO could import all classes used by the application.
parent 764b41c0
...@@ -14,9 +14,10 @@ ...@@ -14,9 +14,10 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import cPickle as pickle, time import cPickle, pickle, time
from bisect import bisect, insort from bisect import bisect, insort
from collections import defaultdict from collections import deque
from cStringIO import StringIO
from ConfigParser import SafeConfigParser from ConfigParser import SafeConfigParser
from ZODB.config import storageFromString from ZODB.config import storageFromString
from ZODB.POSException import POSKeyError from ZODB.POSException import POSKeyError
...@@ -34,6 +35,141 @@ class Reference(object): ...@@ -34,6 +35,141 @@ class Reference(object):
self.value = value self.value = value
class Repickler(pickle.Unpickler):
def __init__(self, persistent_map):
self._f = StringIO()
# Use python implementation for unpickling because loading can not
# be customized enough with cPickle.
pickle.Unpickler.__init__(self, self._f)
# For pickling, it is possible to use the fastest implementation,
# which also generates fewer useless PUT opcodes.
self._p = cPickle.Pickler(self._f, 1)
self.memo = self._p.memo # just a tiny optimization
def persistent_id(obj):
if isinstance(obj, Reference):
r = obj.value
del obj.value # minimize refcnt like for deque+popleft
return r
self._p.inst_persistent_id = persistent_id
def persistent_load(obj):
new_obj = persistent_map(obj)
if new_obj is not obj:
self._changed = True
return Reference(new_obj)
self.persistent_load = persistent_load
def _save(self, data):
self._p.dump(data.popleft())
# remove STOP (no need to truncate since it will always be overridden)
self._f.seek(-1, 1)
def __call__(self, data):
f = self._f
f.truncate(0)
f.write(data)
f.reset()
self._changed = False
try:
classmeta = self.load()
state = self.load()
finally:
self.memo.clear()
if self._changed:
f.truncate(0)
try:
self._p.dump(classmeta).dump(state)
finally:
self.memo.clear()
return f.getvalue()
return data
dispatch = pickle.Unpickler.dispatch.copy()
class _noload(object):
state = None
def __new__(cls, dump):
def load(*args):
self = object.__new__(cls)
self.dump = dump
# We use deque+popleft everywhere to minimize the number of
# references at the moment cPickle considers memoizing an
# object. This reduces the number of useless PUT opcodes and
# usually produces smaller pickles than ZODB. Without this,
# they would, on the contrary, increase in size.
# We could also use optimize from pickletools module.
self.args = deque(args)
self._list = deque()
self.append = self._list.append
self.extend = self._list.extend
self._dict = deque()
return self
return load
def __setitem__(self, *args):
self._dict.append(args)
def dict(self):
while self._dict:
yield self._dict.popleft()
def list(self, pos):
pt = self.args.popleft()
f = pt._f
f.seek(pos + 3) # NONE + EMPTY_TUPLE + REDUCE
put = f.read() # preserve memo if any
f.truncate(pos)
f.write(self.dump(pt, self.args) + put)
while self._list:
yield self._list.popleft()
def __reduce__(self):
return None, (), self.state, \
self.list(self.args[0]._f.tell()), self.dict()
@_noload
def _obj(self, args):
self._f.write(pickle.MARK)
while args:
self._save(args)
return pickle.OBJ
def _instantiate(self, klass, k):
args = self.stack[k+1:]
self.stack[k:] = self._obj(klass, *args),
del dispatch[pickle.NEWOBJ] # ZODB has never used protocol 2
@_noload
def find_class(self, args):
module, name = args
return pickle.GLOBAL + module + '\n' + name + '\n'
@_noload
def _reduce(self, args):
self._save(args)
self._save(args)
return pickle.REDUCE
def load_reduce(self):
stack = self.stack
args = stack.pop()
stack[-1] = self._reduce(stack[-1], args)
dispatch[pickle.REDUCE] = load_reduce
def load_build(self):
stack = self.stack
state = stack.pop()
inst = stack[-1]
assert inst.state is None
inst.state = state
dispatch[pickle.BUILD] = load_build
class ZODB(object): class ZODB(object):
def __init__(self, storage, oid=0, **kw): def __init__(self, storage, oid=0, **kw):
...@@ -70,45 +206,31 @@ class ZODB(object): ...@@ -70,45 +206,31 @@ class ZODB(object):
del self.mountpoints del self.mountpoints
return shift_oid return shift_oid
def translate(self, data): def repickle(self, data):
if not (self.shift_oid or self.mapping): if not (self.shift_oid or self.mapping):
self.translate = lambda x: x self.repickle = lambda x: x
return data return data
# We'll have to map oids, so define a reusable pickler for this,
# and also a method that will transform pickles.
pickler = pickle.Pickler(1)
u64 = util.u64 u64 = util.u64
p64 = util.p64 p64 = util.p64
def persistent_id(obj): def map_oid(obj):
if type(obj) is Reference: if isinstance(obj, tuple) and len(obj) == 2:
obj = obj.value
if isinstance(obj, tuple):
oid = u64(obj[0]) oid = u64(obj[0])
cls = obj[1] # If this oid pointed to a mount point, drop 2nd item because
assert not hasattr(cls, '__getnewargs__'), cls # it's probably different than the real class of the new oid.
try:
return p64(self.mapping[oid]), cls
except KeyError:
if not self.shift_oid:
return obj # common case for root db
elif isinstance(obj, str): elif isinstance(obj, str):
oid = u64(obj) oid = u64(obj)
else: else:
raise NotImplementedError( raise NotImplementedError(
"Unsupported external reference: %r" % obj) "Unsupported external reference: %r" % obj)
return p64(self.mapping.get(oid, oid + self.shift_oid)) try:
pickler.inst_persistent_id = persistent_id return p64(self.mapping[oid])
dump = pickler.dump except KeyError:
from cStringIO import StringIO if not self.shift_oid:
from ZODB.broken import find_global return obj # common case for root db
Unpickler = pickle.Unpickler oid = p64(oid + self.shift_oid)
def translate(data): return oid if isinstance(obj, str) else (oid, obj[1])
u = Unpickler(StringIO(data)) self.repickle = Repickler(map_oid)
u.persistent_load = Reference return self.repickle(data)
u.find_global = find_global
return dump(u.load()).dump(u.load()).getvalue()
self.translate = translate
return translate(data)
def __getattr__(self, attr): def __getattr__(self, attr):
if attr == '__setstate__': if attr == '__setstate__':
...@@ -200,14 +322,14 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -200,14 +322,14 @@ class ImporterDatabaseManager(DatabaseManager):
if zodb_state: if zodb_state:
logging.warning("Ignoring configuration file for oid mapping." logging.warning("Ignoring configuration file for oid mapping."
" Reloading it from NEO storage.") " Reloading it from NEO storage.")
zodb = pickle.loads(zodb_state) zodb = cPickle.loads(zodb_state)
for k, v in self.zodb: for k, v in self.zodb:
zodb[k].connect(v["storage"]) zodb[k].connect(v["storage"])
else: else:
zodb = {k: ZODB(**v) for k, v in self.zodb} zodb = {k: ZODB(**v) for k, v in self.zodb}
x, = (x for x in zodb.itervalues() if not x.oid) x, = (x for x in zodb.itervalues() if not x.oid)
x.setup(zodb) x.setup(zodb)
self.setConfiguration("zodb", pickle.dumps(zodb)) self.setConfiguration("zodb", cPickle.dumps(zodb))
self.zodb_index, self.zodb = zip(*sorted( self.zodb_index, self.zodb = zip(*sorted(
(x.shift_oid, x) for x in zodb.itervalues())) (x.shift_oid, x) for x in zodb.itervalues()))
self.zodb_ltid = max(x.ltid for x in self.zodb) self.zodb_ltid = max(x.ltid for x in self.zodb)
...@@ -235,7 +357,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -235,7 +357,7 @@ class ImporterDatabaseManager(DatabaseManager):
if tid: if tid:
self.storeTransaction(tid, (), (oid_list, self.storeTransaction(tid, (), (oid_list,
str(txn.user), str(txn.description), str(txn.user), str(txn.description),
pickle.dumps(txn.extension), False, tid), False) cPickle.dumps(txn.extension), False, tid), False)
logging.debug("TXN %s imported (user=%r, desc=%r, len(oid)=%s)", logging.debug("TXN %s imported (user=%r, desc=%r, len(oid)=%s)",
util.dump(tid), txn.user, txn.description, len(oid_list)) util.dump(tid), txn.user, txn.description, len(oid_list))
if self._last_commit + 1 < time.time(): if self._last_commit + 1 < time.time():
...@@ -264,7 +386,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -264,7 +386,7 @@ class ImporterDatabaseManager(DatabaseManager):
if data_tid or r.data is None: if data_tid or r.data is None:
data_id = None data_id = None
else: else:
data = zodb.translate(r.data) data = zodb.repickle(r.data)
if compress: if compress:
compressed_data = compress(data) compressed_data = compress(data)
compression = len(compressed_data) < len(data) compression = len(compressed_data) < len(data)
...@@ -350,7 +472,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -350,7 +472,7 @@ class ImporterDatabaseManager(DatabaseManager):
if u_tid <= self.zodb_tid and o: if u_tid <= self.zodb_tid and o:
return o return o
if value: if value:
value = zodb.translate(value) value = zodb.repickle(value)
checksum = util.makeChecksum(value) checksum = util.makeChecksum(value)
else: else:
# CAVEAT: Although we think loadBefore should not return an empty # CAVEAT: Although we think loadBefore should not return an empty
...@@ -372,7 +494,7 @@ class ImporterDatabaseManager(DatabaseManager): ...@@ -372,7 +494,7 @@ class ImporterDatabaseManager(DatabaseManager):
shift_oid = zodb.shift_oid shift_oid = zodb.shift_oid
return ([p64(u64(x.oid) + shift_oid) for x in txn], return ([p64(u64(x.oid) + shift_oid) for x in txn],
txn.user, txn.description, txn.user, txn.description,
pickle.dumps(txn.extension), 0, tid) cPickle.dumps(txn.extension), 0, tid)
else: else:
return self.db.getTransaction(tid, all) return self.db.getTransaction(tid, all)
......
...@@ -15,19 +15,119 @@ ...@@ -15,19 +15,119 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from collections import deque from collections import deque
from cPickle import Pickler, Unpickler
from cStringIO import StringIO
from itertools import islice, izip_longest from itertools import islice, izip_longest
import os, time, unittest import os, time, unittest
import neo, transaction, ZODB import neo, transaction, ZODB
from neo.lib import logging from neo.lib import logging
from neo.lib.util import u64 from neo.lib.util import u64
from neo.storage.database.importer import Repickler
from ..fs2zodb import Inode from ..fs2zodb import Inode
from .. import getTempDirectory from .. import getTempDirectory
from . import NEOCluster, NEOThreadedTest from . import NEOCluster, NEOThreadedTest
from ZODB.FileStorage import FileStorage from ZODB.FileStorage import FileStorage
class Equal:
_recurse = {}
def __hash__(self):
return 1
def __eq__(self, other):
return type(self) is type(other) and self.__dict__ == other.__dict__
def __repr__(self):
return "<%s(%s)>" % (self.__class__.__name__,
", ".join("%s=%r" % k for k in self.__dict__.iteritems()))
class Reduce(Equal, object):
state = None
def __init__(self, *args):
self.args = args
self._l = []
self._d = []
def append(self, item):
self._l.append(item)
def extend(self, item):
self._l.extend(item)
def __setitem__(self, *args):
self._d.append(args)
def __setstate__(self, state):
self.state = state
def __reduce__(self):
r = self.__class__, self.args, self.state, iter(self._l), iter(self._d)
return r[:5 if self._d else
4 if self._l else
3 if self.state is not None else
2]
class Obj(Equal):
state = None
def __getinitargs__(self):
return self.args
def __init__(self, *args):
self.args = args
def __getstate__(self):
return self.state
def __setstate__(self, state):
self.state = state
class NewObj(Obj, object):
def __init__(self):
pass # __getinitargs__ only work with old-style classes
class DummyRepickler(Repickler):
def __init__(self):
Repickler.__init__(self, None)
_changed = True
def __setattr__(self, name, value):
if name != "_changed":
self.__dict__[name] = value
class ImporterTests(NEOThreadedTest): class ImporterTests(NEOThreadedTest):
def testRepickler(self):
r2 = Obj("foo")
r2.__setstate__("bar")
r2 = Reduce(r2)
r3 = Reduce(1, 2)
r3.__setstate__(NewObj())
r4 = Reduce()
r4.args = r2.args
r4.__setstate__("bar")
r4.extend("!!!")
r5 = Reduce()
r5.append("!!!")
r5["foo"] = "bar"
state = {r2: r3, r4: r5}
p = StringIO()
Pickler(p, 1).dump(Obj).dump(state)
p = p.getvalue()
r = DummyRepickler()(p)
load = Unpickler(StringIO(r)).load
self.assertIs(Obj, load())
self.assertDictEqual(state, load())
def test(self): def test(self):
importer = [] importer = []
fs_dir = os.path.join(getTempDirectory(), self.id()) fs_dir = os.path.join(getTempDirectory(), self.id())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment