Commit de4c9e1d authored by Julien Muchembled

New 'Importer' storage backend

parent 53df0e1a
@@ -33,7 +33,9 @@ ZODB API is fully implemented except:
   (full implementation is considered)
 - blobs: not implemented (not considered yet)
 
-There is a simple way to convert FileStorage to NEO and back again.
+Any ZODB, like FileStorage, can be converted to NEO instantaneously,
+which means the database is operational before all data are imported.
+There's also a tool to convert back to FileStorage.
 
 See also http://www.neoppod.org/links for more detailed information about
 features related to scalability.
@@ -88,7 +90,7 @@ e. Clients can connect when the cluster is in RUNNING state::
 
     $ neoctl -a <admin> print cluster
     RUNNING
 
-f. See `neomigrate` command to import an existing FileStorage database,
+f. See `importer.conf` file to import an existing database,
    or `neoctl` command for more administrative tasks.
 
Alternatively, you can use `neosimple` command to quickly setup a cluster for
......
# This file describes how to use the NEO Importer storage backend,
# which is the recommended way to import the data of an existing Zope database
# into a NEO cluster.
#
# Note that the 'neomigrate' command is another way to migrate to NEO, but:
# - the database is unusable until the migration is finished;
# - it can not merge bases at mountpoints;
# - it can not resume an interrupted migration;
# - it does not preserve metadata that are specific to "undo" transactions
#   (which would then behave like normal transactions);
# - it only supports conversion from FileStorage;
# - it is slower.
# The only advantage of 'neomigrate' over the Importer is that data can be
# imported directly into a NEO cluster with replicas or several storage
# nodes; the Importer backend can only be used with a single storage node.
#
# Here is how to proceed once this file is ready:
# 1. Restart your ZODB clients so that they connect to the new NEO cluster
#    (which is not started yet).
# 2. Start the NEO cluster and run the 'neoctl -a <admin> start' command.
# 3. Your Zope applications should work again, and the background migration
#    of data starts automatically. The only downtime depends on how fast you
#    perform the first 2 steps. The only limitations in this mode are that:
#    - pack is not supported;
#    - IStorage.history() is not implemented (this is used for example by
#      the history tab in the Zope Management Interface).
# 4. A warning message reporting that "All data are imported"
#    is emitted to the storage node's log when the migration is finished.
#    This can take days with big databases.
# The following steps can be scheduled any time after migration is over,
# at your convenience:
# 5. Change the NEO configuration to stop using the Importer backend.
# 6. Stop clients.
# 7. Restart NEO cluster.
# 8. Start clients. This was the second, very short, downtime.
# 9. Optionally, add storage nodes and balance partitions.
#
# Data are migrated even if your ZODB clients are stopped.
# The first section describes the destination NEO storage.
# See neo.conf for description of parameters.
[neo]
# Once migration is finished, restart the NEO storage node to use the section
# below directly (instead of adapter=Importer & database=/path_to_this_file).
adapter=MySQL
database=neo
# The other sections are for source databases.
[root]
# This example uses FileStorage but the source can be any other ZODB storage.
# ZEO is possible but less efficient: ZEO servers must be stopped
# if NEO opens their FileStorage DBs directly.
storage=
 <filestorage>
 path /path/to/root.fs
 </filestorage>
# (leading spaces indicate value continuation)

# This file can stop here if your source DB is not split.
# Otherwise, you need to describe the mountpoints and the other ZODBs.
# The OID mapping can't be changed once the NEO cluster has been started
# with this configuration.

# Mountpoints for this ZODB (see the use case at the end of this file).
# <section_name>=<oid>
foo=421
bar=666
# The following sections must define the 'oid' parameter.
[foo]
# Any reference to oid 421 in 'root' is changed to point to oid 123 of 'foo'.
# Of course, the original oid 421 in 'root' becomes unreferenced.
oid=123
storage=
 <filestorage>
 path /path/to/foo.fs
 </filestorage>
# 'foo' itself has a mountpoint: oid 1000 in 'foo' refers to 'baz'.
baz=1000

[bar]
oid=4567
storage=
 <filestorage>
 path /path/to/bar.fs
 </filestorage>

[baz]
oid=2000
storage=
 <filestorage>
 path /path/to/baz.fs
 </filestorage>
## Case of several databases linked with MountedObject objects
#
# MountedObject is provided by ZODBMountPoint Zope product.
# It relies on IAcquirer and ITraversable to fetch the real object.
#
# Given the following multi-base:
# - in 'foo', /a/b is the real object
# - in 'root', /c/d is a MountedObject object configured to point to
# /a/b in 'foo'
#
# So you need to get the oid of /a/b in 'foo':
#   unrestrictedTraverse("/a/b")._p_oid
# which equals 123 in the above example.
#
# And that of /c/d in 'root':
#   unrestrictedTraverse("/c").__dict__["d"]._p_oid -> 421
# The way to retrieve the mount point depends on the container type.
# For a BTreeFolder2, it would be: c._tree["d"]._p_oid
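#
# As a sketch of how the two numbers above could be obtained in a Zope
# debug shell ('app' and the /a/b, /c/d paths are the placeholders of this
# use case, not NEO API):
#
#   from neo.lib.util import u64   # 8-byte oid string -> integer
#   foo_oid = u64(app.unrestrictedTraverse("/a/b")._p_oid)               # 123
#   root_oid = u64(app.unrestrictedTraverse("/c").__dict__["d"]._p_oid)  # 421
#
# These integers are what goes into the 'oid' parameter of [foo] and into
# the 'foo=<oid>' mountpoint line of [root].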
@@ -2,6 +2,9 @@
 # must be identical for all nodes in a given cluster.
 # This file is optional: parameters can be given at the command line.
+# See SafeConfigParser at https://docs.python.org/2/library/configparser.html
+# for more information about the syntax.
+
 # Common parameters.
 [DEFAULT]
 # The cluster name
......
@@ -122,6 +122,11 @@ class Storage(BaseStorage.BaseStorage,
             raise POSException.POSKeyError(oid)
 
     def loadBefore(self, oid, tid):
+        # XXX: FileStorage returns an empty string for a deleted object,
+        #      but it may cause EOFError exceptions in ZODB.Connection
+        #      and it makes it impossible to store empty values.
+        #      We think this behaviour is wrong and raise POSKeyError
+        #      instead. Or maybe we should return None?
         try:
             return self.app.load(oid, None, tid)
         except NEOStorageDoesNotExistError:
......
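To illustrate what the new comment implies for callers, a minimal sketch
(assuming an already connected NEO `storage` object; the helper name is
hypothetical):

    from ZODB.POSException import POSKeyError

    def load_before_or_none(storage, oid, tid):
        # NEO raises POSKeyError where FileStorage would return an empty
        # string for a deleted object; normalize both cases to None here.
        try:
            return storage.loadBefore(oid, tid)
        except POSKeyError:
            return None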
@@ -46,9 +46,12 @@ def main(args=None):
     from ZODB.FileStorage import FileStorage
     from neo.client.Storage import Storage as NEOStorage
     if os.path.exists(source):
-        print("NOTE: NEO does not implement IStorageRestoreable interface,"
-              " which means that undo information is not preserved: conflict"
-              " resolution could happen when undoing an old transaction.")
+        print("WARNING: This is not the recommended way to import data to"
+              " NEO: you should use the Importer backend instead.\n"
+              "NEO also does not implement IStorageRestoreable interface,"
+              " which means that undo information is not preserved when using"
+              " this tool: conflict resolution could happen when undoing an"
+              " old transaction.")
     src = FileStorage(file_name=source, read_only=True)
     dst = NEOStorage(master_nodes=destination, name=cluster,
                      logfile=options.logfile)
......
@@ -65,6 +65,7 @@ UNIT_TEST_MODULES = [
     'neo.tests.client.testConnectionPool',
     # light functional tests
     'neo.tests.threaded.test',
+    'neo.tests.threaded.testImporter',
     'neo.tests.threaded.testReplication',
 ]
......
@@ -289,7 +289,8 @@ class Application(object):
         """Handle everything, including replications and transactions."""
         logging.info('doing operation')
 
-        _poll = self._poll
+        poll = self._poll
+        _poll = self.em._poll
         isIdle = self.em.isIdle
 
         handler = master.MasterOperationHandler(self)
@@ -301,14 +302,18 @@ class Application(object):
         self.task_queue = task_queue = deque()
         try:
+            self.dm.doOperation(self)
             while True:
-                while task_queue and isIdle():
+                while task_queue:
                     try:
-                        task_queue[-1].next()
-                        task_queue.rotate()
+                        while isIdle():
+                            if task_queue[-1].next():
+                                _poll(0)
+                            task_queue.rotate()
+                        break
                     except StopIteration:
                         task_queue.pop()
-                _poll()
+                poll()
         finally:
             del self.task_queue
             # XXX: Although no handled exception should happen between
......
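The new main loop above interleaves background import with normal service:
importer tasks are generators kept in a deque, stepped only while the event
manager is idle, polled non-blockingly between steps, and rotated for
fairness. A self-contained sketch of the pattern (the poll/is_idle callables
stand in for NEO's event manager and are assumptions of this sketch):

    from collections import deque

    def run_tasks(task_queue, poll, is_idle):
        # task_queue: deque of generators; each next() performs one unit
        # of work and returns a truthy value when the network should be
        # polled before continuing.
        while True:
            while task_queue:
                try:
                    while is_idle():
                        if next(task_queue[-1]):
                            poll(0)              # non-blocking poll
                        task_queue.rotate()      # round-robin fairness
                    break
                except StopIteration:
                    task_queue.pop()             # this task is finished
            poll()                               # block until next event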
@@ -20,6 +20,7 @@ from neo.lib.exception import DatabaseFailure
 from .manager import DatabaseManager
 
 DATABASE_MANAGER_DICT = {
+    'Importer': 'importer.ImporterDatabaseManager',
     'MySQL': 'mysqldb.MySQLDatabaseManager',
     'SQLite': 'sqlite.SQLiteDatabaseManager',
 }
......
#
# Copyright (C) 2014 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import cPickle as pickle
from bisect import bisect, insort
from collections import defaultdict
from ConfigParser import SafeConfigParser
from ZODB.config import storageFromString
from ZODB.POSException import POSKeyError
from . import buildDatabaseManager, DatabaseManager
from neo.lib import logging, util
from neo.lib.exception import DatabaseFailure
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH, MAX_TID
class Reference(object):

    __slots__ = "value",

    def __init__(self, value):
        self.value = value


class ZODB(object):

    def __init__(self, storage, oid=0, **kw):
        self.oid = int(oid)
        self.mountpoints = {k: int(v) for k, v in kw.iteritems()}
        self.connect(storage)
        self.ltid = util.u64(self.lastTransaction())
        if not self.ltid:
            raise DatabaseFailure("Can not import empty storage: %s" % storage)
        self.mapping = {}

    def __getstate__(self):
        state = self.__dict__.copy()
        del state["data_tid"], state["storage"]
        return state

    def connect(self, storage):
        self.data_tid = {}
        self.storage = storageFromString(storage)

    def setup(self, zodb_dict, shift_oid=0):
        self.shift_oid = shift_oid
        self.next_oid = util.u64(self.new_oid())
        shift_oid += self.next_oid
        for mp, oid in self.mountpoints.iteritems():
            mp = zodb_dict[mp]
            new_oid = mp.oid
            try:
                new_oid += mp.shift_oid
            except AttributeError:
                new_oid += shift_oid
                shift_oid = mp.setup(zodb_dict, shift_oid)
            self.mapping[oid] = new_oid
        del self.mountpoints
        return shift_oid
    def translate(self, data):
        if not (self.shift_oid or self.mapping):
            self.translate = lambda x: x
            return data
        # We'll have to map oids, so define a reusable pickler for this,
        # and also a method that will transform pickles.
        pickler = pickle.Pickler(1)
        u64 = util.u64
        p64 = util.p64
        def persistent_id(obj):
            if type(obj) is Reference:
                obj = obj.value
                if isinstance(obj, tuple):
                    oid = u64(obj[0])
                    cls = obj[1]
                    assert not hasattr(cls, '__getnewargs__'), cls
                    try:
                        return p64(self.mapping[oid]), cls
                    except KeyError:
                        if not self.shift_oid:
                            return obj # common case for root db
                elif isinstance(obj, str):
                    oid = u64(obj)
                else:
                    raise NotImplementedError(
                        "Unsupported external reference: %r" % obj)
                return p64(self.mapping.get(oid, oid + self.shift_oid))
        pickler.inst_persistent_id = persistent_id
        dump = pickler.dump
        from cStringIO import StringIO
        from ZODB.broken import find_global
        Unpickler = pickle.Unpickler
        def translate(data):
            u = Unpickler(StringIO(data))
            u.persistent_load = Reference
            u.find_global = find_global
            # A ZODB data record is 2 concatenated pickles (class metadata
            # and object state), hence the double load/dump.
            return dump(u.load()).dump(u.load()).getvalue()
        self.translate = translate
        return translate(data)
    def __getattr__(self, attr):
        return getattr(self.storage, attr)

    def getDataTid(self, oid, tid):
        try:
            return self.data_tid[tid][oid]
        except KeyError:
            assert tid not in self.data_tid, (oid, tid)
            p_tid = util.p64(tid)
            txn = next(self.storage.iterator(p_tid))
            if txn.tid != p_tid:
                raise
            u64 = util.u64
            txn = self.data_tid[tid] = {u64(x.oid): x.data_txn for x in txn}
            return txn[oid]


class ZODBIterator(object):

    def __init__(self, zodb, *args, **kw):
        iterator = zodb.iterator(*args, **kw)
        def _next():
            self.transaction = next(iterator)
        _next()
        self.zodb = zodb
        self.next = _next

    tid = property(lambda self: self.transaction.tid)

    def __lt__(self, other):
        return self.tid < other.tid or self.tid == other.tid \
            and self.zodb.shift_oid < other.zodb.shift_oid
class ImporterDatabaseManager(DatabaseManager):
    """Proxy that transparently imports data from a ZODB storage
    """

    def __init__(self, *args, **kw):
        super(ImporterDatabaseManager, self).__init__(*args, **kw)
        self.db._connect()

    _uncommitted_data = property(
        lambda self: self.db._uncommitted_data,
        lambda self, value: setattr(self.db, "_uncommitted_data", value))

    def _parse(self, database):
        config = SafeConfigParser()
        config.read(database)
        sections = config.sections()
        # XXX: defaults copy & pasted from elsewhere - refactoring needed
        main = {'adapter': 'MySQL', 'wait': 0}
        main.update(config.items(sections.pop(0)))
        self.zodb = ((x, dict(config.items(x))) for x in sections)
        self.compress = main.get('compress', 1)
        self.db = buildDatabaseManager(main['adapter'],
            (main['database'], main['wait']))
        for x in """commit query erase getConfiguration _setConfiguration
                    getPartitionTable changePartitionTable getUnfinishedTIDList
                    dropUnfinishedData storeTransaction finishTransaction
                    storeData
                 """.split():
            setattr(self, x, getattr(self.db, x))

    def setNumPartitions(self, num_partitions):
        self.db.setNumPartitions(num_partitions)
        try:
            del self._getPartition
        except AttributeError:
            pass

    def close(self):
        self.db.close()
        if isinstance(self.zodb, list): # _setup called
            for zodb in self.zodb:
                zodb.close()

    def _setup(self):
        self.db._setup()
        zodb_state = self.getConfiguration("zodb")
        if zodb_state:
            logging.warning("Ignoring configuration file for oid mapping."
                            " Reloading it from NEO storage.")
            zodb = pickle.loads(zodb_state)
            for k, v in self.zodb:
                zodb[k].connect(v["storage"])
        else:
            zodb = {k: ZODB(**v) for k, v in self.zodb}
            x, = (x for x in zodb.itervalues() if not x.oid)
            x.setup(zodb)
            self.setConfiguration("zodb", pickle.dumps(zodb))
        self.zodb_index, self.zodb = zip(*sorted(
            (x.shift_oid, x) for x in zodb.itervalues()))
        self.zodb_ltid = max(x.ltid for x in self.zodb)
        zodb = self.zodb[-1]
        self.zodb_loid = zodb.shift_oid + zodb.next_oid - 1
        self.zodb_tid = self.db.getLastTID(self.zodb_ltid) or 0
        self._import = self._import()

    def doOperation(self, app):
        if self._import:
            app.newTask(self._import)
    def _import(self):
        p64 = util.p64
        u64 = util.u64
        tid = p64(self.zodb_tid + 1)
        zodb_list = []
        for zodb in self.zodb:
            try:
                zodb_list.append(ZODBIterator(zodb, tid, p64(self.zodb_ltid)))
            except StopIteration:
                pass
        tid = None
        def finish():
            if tid:
                self.storeTransaction(tid, (), (oid_list,
                    str(txn.user), str(txn.description),
                    pickle.dumps(txn.extension), False, tid), False)
                logging.debug("TXN %s imported (user=%r, desc=%r, len(oid)=%s)",
                    util.dump(tid), txn.user, txn.description, len(oid_list))
                self.commit()
                self.zodb_tid = u64(tid)
        if self.compress:
            from zlib import compress
        else:
            compress = None
            compression = 0
        while zodb_list:
            zodb_list.sort()
            z = zodb_list[0]
            # Merge transactions with same tid. Only
            # user/desc/ext from first ZODB are kept.
            if tid != z.tid:
                finish()
                oid_list = []
                txn = z.transaction
                tid = txn.tid
                yield 1
            zodb = z.zodb
            for r in z.transaction:
                oid = p64(u64(r.oid) + zodb.shift_oid)
                data_tid = r.data_txn
                if data_tid or r.data is None:
                    data_id = None
                else:
                    data = zodb.translate(r.data)
                    if compress:
                        compressed_data = compress(data)
                        compression = len(compressed_data) < len(data)
                        if compression:
                            data = compressed_data
                    # Checksum the data that is actually stored (i.e. after
                    # compression) and compute it only once.
                    checksum = util.makeChecksum(data)
                    data_id = self.storeData(checksum, data, compression)
                # Write metadata before next yield. This may not be efficient
                # but if they were written at the same time as the transaction,
                # _pruneData could delete imported but not yet referenced data.
                self.storeTransaction(tid, ((oid, data_id, data_tid),), (),
                                      False)
                oid_list.append(oid)
                yield 1
            try:
                z.next()
            except StopIteration:
                del zodb_list[0]
        finish()
        logging.warning("All data are imported. You should change"
            " your configuration to use the native backend and restart.")
        self._import = None
        for x in """getObject objectPresent getReplicationTIDList
                 """.split():
            setattr(self, x, getattr(self.db, x))
    def inZodb(self, oid, tid=None, before_tid=None):
        return oid <= self.zodb_loid and (
            self.zodb_tid < before_tid if before_tid else
            tid is None or self.zodb_tid < tid <= self.zodb_ltid)

    def zodbFromOid(self, oid):
        zodb = self.zodb[bisect(self.zodb_index, oid) - 1]
        return zodb, oid - zodb.shift_oid

    def getLastIDs(self, all=True):
        tid, _, _, oid = self.db.getLastIDs(all)
        return (util.p64(max(tid, self.zodb_ltid)), None, None,
                util.p64(max(oid, self.zodb_loid)))
    def objectPresent(self, oid, tid, all=True):
        r = self.db.objectPresent(oid, tid, all)
        if not r:
            u_oid = util.u64(oid)
            u_tid = util.u64(tid)
            if self.inZodb(u_oid, u_tid):
                zodb, oid = self.zodbFromOid(u_oid)
                try:
                    return zodb.loadSerial(util.p64(oid), tid)
                except POSKeyError:
                    pass
        return r
    def getObject(self, oid, tid=None, before_tid=None):
        u64 = util.u64
        u_oid = u64(oid)
        u_tid = tid and u64(tid)
        u_before_tid = before_tid and u64(before_tid)
        db = self.db
        if self.zodb_tid < (u_before_tid - 1 if before_tid else
                            u_tid or 0) <= self.zodb_ltid:
            o = None
        else:
            o = db.getObject(oid, tid, before_tid)
            if o and self.zodb_ltid < u64(o[0]) or \
               not self.inZodb(u_oid, u_tid, u_before_tid):
                return o
        p64 = util.p64
        zodb, z_oid = self.zodbFromOid(u_oid)
        try:
            value, serial, next_serial = zodb.loadBefore(p64(z_oid),
                before_tid or (util.p64(u_tid + 1) if tid else MAX_TID))
        except TypeError: # loadBefore returned None
            return False
        except POSKeyError:
            assert not o, o
            return o
        if serial != tid:
            if tid:
                return False
            u_tid = u64(serial)
        if u_tid <= self.zodb_tid and o:
            return o
        if value:
            value = zodb.translate(value)
            checksum = util.makeChecksum(value)
        else:
            # CAVEAT: Although we think loadBefore should not return an empty
            #         value for a deleted object (see comment in NEO Storage),
            #         there's no need to distinguish this case in the above
            #         except clause because it would be crazy to import a
            #         NEO DB using this backend.
            checksum = None
        return (serial, next_serial or
            db._getNextTID(db._getPartition(u_oid), u_oid, u_tid),
            0, checksum, value, zodb.getDataTid(z_oid, u_tid))
    def getTransaction(self, tid, all=False):
        u64 = util.u64
        if self.zodb_tid < u64(tid) <= self.zodb_ltid:
            for zodb in self.zodb:
                for txn in zodb.iterator(tid, tid):
                    p64 = util.p64
                    shift_oid = zodb.shift_oid
                    return ([p64(u64(x.oid) + shift_oid) for x in txn],
                            txn.user, txn.description,
                            pickle.dumps(txn.extension), 0, tid)
        else:
            return self.db.getTransaction(tid, all)

    def getReplicationTIDList(self, min_tid, max_tid, length, partition):
        p64 = util.p64
        tid = p64(self.zodb_tid)
        if min_tid <= tid:
            r = self.db.getReplicationTIDList(min_tid, min(max_tid, tid),
                                              length, partition)
            if max_tid <= tid:
                return r
            length -= len(r)
            min_tid = p64(self.zodb_tid + 1)
        else:
            r = []
        if length:
            tid = p64(self.zodb_ltid)
            if min_tid <= tid:
                u64 = util.u64
                def next_tid(i):
                    for txn in i:
                        tid = u64(txn.tid)
                        if self._getPartition(tid) == partition:
                            insort(z, (-tid, i))
                            break
                z = []
                for zodb in self.zodb:
                    next_tid(zodb.iterator(min_tid, min(max_tid, tid)))
                while z:
                    t, i = z.pop()
                    r.append(p64(-t))
                    length -= 1
                    if not length:
                        return r
                    next_tid(i)
            if tid < max_tid:
                r += self.db.getReplicationTIDList(max(min_tid, tid), max_tid,
                                                   length, partition)
        return r
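
The ZODB.translate() method above is the heart of the oid remapping: it uses
pickle's persistent-reference hooks (persistent_load to capture references,
persistent_id to rewrite them). A standalone toy version of the same
technique, independent of ZODB's two-pickle record format (all names here
are illustrative, not NEO code):

    import pickle
    from io import BytesIO

    class Ref(object):
        """Captures a persistent reference, like importer's Reference."""
        def __init__(self, value):
            self.value = value

    class RefUnpickler(pickle.Unpickler):
        def persistent_load(self, pid):
            return Ref(pid)  # keep the reference symbolic

    class MapPickler(pickle.Pickler):
        """Rewrites integer 'oids' found in persistent references."""
        def __init__(self, f, mapping):
            pickle.Pickler.__init__(self, f, 1)
            self.mapping = mapping
        def persistent_id(self, obj):
            if isinstance(obj, Ref):
                return self.mapping.get(obj.value, obj.value)
            return None  # not a reference: pickle normally

    def translate(data, mapping):
        obj = RefUnpickler(BytesIO(data)).load()
        f = BytesIO()
        MapPickler(f, mapping).dump(obj)  # re-serialize with mapped oids
        return f.getvalue()

    # Toy record referencing "oid" 421, remapped to 123:
    f = BytesIO()
    MapPickler(f, {}).dump(["payload", Ref(421)])
    print(translate(f.getvalue(), {421: 123}))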
@@ -81,6 +81,9 @@ class DatabaseManager(object):
         """
         raise NotImplementedError
 
+    def doOperation(self, app):
+        pass
+
     def commit(self):
         pass
@@ -198,6 +201,14 @@ class DatabaseManager(object):
            node, and a cell state."""
         raise NotImplementedError
 
+    def getLastTID(self, max_tid):
+        """Return the greatest tid in the 'trans' table that is <= max_tid
+
+        Required only to import a DB using the Importer backend.
+        max_tid must be in unpacked format.
+        """
+        raise NotImplementedError
+
     def _getLastIDs(self, all=True):
         raise NotImplementedError
@@ -228,6 +239,19 @@ class DatabaseManager(object):
         r = self.getObject(oid)
         return r and r[0]
 
+    def _getNextTID(self, partition, oid, tid):
+        """
+        partition (int)
+            Must be the result of (oid % self.getPartition(oid))
+        oid (int)
+            Identifier of object to retrieve.
+        tid (int)
+            Exact serial to retrieve.
+
+        If tid is the last revision of oid, None is returned.
+        """
+        raise NotImplementedError
+
     def _getObject(self, oid, tid=None, before_tid=None):
         """
         oid (int)
......
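To make the two new DatabaseManager contracts above concrete, a toy
in-memory index satisfying them (illustration only, not NEO code; tids and
oids are plain integers):

    from bisect import bisect_right

    class ToyIndex(object):
        def __init__(self, tids, obj_tids):
            self.tids = sorted(tids)          # all committed tids
            self.obj_tids = {oid: sorted(v)   # oid -> tids touching it
                             for oid, v in obj_tids.items()}

        def getLastTID(self, max_tid):
            # greatest committed tid <= max_tid, or None
            i = bisect_right(self.tids, max_tid)
            return self.tids[i - 1] if i else None

        def _getNextTID(self, partition, oid, tid):
            # first tid of 'oid' strictly greater than 'tid',
            # or None if 'tid' is the last revision of 'oid'
            tids = self.obj_tids.get(oid, [])
            i = bisect_right(tids, tid)
            return tids[i] if i < len(tids) else None

    index = ToyIndex([1, 3, 7], {42: [3, 7]})
    assert index.getLastTID(6) == 3
    assert index._getNextTID(0, 42, 3) == 7
    assert index._getNextTID(0, 42, 7) is None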
@@ -16,9 +16,9 @@
 from binascii import a2b_hex
 import MySQLdb
-from MySQLdb import IntegrityError, OperationalError
+from MySQLdb import DataError, IntegrityError, OperationalError
 from MySQLdb.constants.CR import SERVER_GONE_ERROR, SERVER_LOST
-from MySQLdb.constants.ER import DUP_ENTRY
+from MySQLdb.constants.ER import DATA_TOO_LONG, DUP_ENTRY
 from array import array
 from hashlib import sha1
 import re
@@ -237,16 +237,27 @@ class MySQLDatabaseManager(DatabaseManager):
         q = self.query
         e = self.escape
         self._config[key] = value
-        key = e(str(key))
+        k = e(str(key))
         if value is None:
-            q("DELETE FROM config WHERE name = '%s'" % key)
-        else:
-            value = e(str(value))
-            q("REPLACE INTO config VALUES ('%s', '%s')" % (key, value))
+            q("DELETE FROM config WHERE name = '%s'" % k)
+            return
+        value = str(value)
+        sql = "REPLACE INTO config VALUES ('%s', '%s')" % (k, e(value))
+        try:
+            q(sql)
+        except DataError, (code, _):
+            if code != DATA_TOO_LONG or len(value) < 256 or key != "zodb":
+                raise
+            q("ALTER TABLE config MODIFY value VARBINARY(%s) NULL" % len(value))
+            q(sql)
 
     def getPartitionTable(self):
         return self.query("SELECT * FROM pt")
 
+    def getLastTID(self, max_tid):
+        return self.query("SELECT MAX(tid) FROM trans WHERE tid<=%s"
+                          % max_tid)[0][0]
+
     def _getLastIDs(self, all=True):
         p64 = util.p64
         q = self.query
@@ -290,6 +301,12 @@ class MySQLDatabaseManager(DatabaseManager):
             % (self._getPartition(oid), oid))
         return util.p64(r[0][0]) if r else None
 
+    def _getNextTID(self, *args): # partition, oid, tid
+        r = self.query("SELECT tid FROM obj"
+                       " WHERE partition=%d AND oid=%d AND tid>%d"
+                       " ORDER BY tid LIMIT 1" % args)
+        return r[0][0] if r else None
+
     def _getObject(self, oid, tid=None, before_tid=None):
         q = self.query
         partition = self._getPartition(oid)
@@ -309,10 +326,8 @@ class MySQLDatabaseManager(DatabaseManager):
             serial, compression, checksum, data, value_serial = r[0]
         except IndexError:
             return None
-        r = q("SELECT tid FROM obj WHERE partition=%d AND oid=%d AND tid>%d"
-              " ORDER BY tid LIMIT 1" % (partition, oid, serial))
-        return (serial, r[0][0] if r else None, compression, checksum, data,
-                value_serial)
+        return (serial, self._getNextTID(partition, oid, serial),
+                compression, checksum, data, value_serial)
 
     def changePartitionTable(self, ptid, cell_list, reset=False):
         offset_list = []
......
@@ -208,6 +208,10 @@ class SQLiteDatabaseManager(DatabaseManager):
     def getPartitionTable(self):
         return self.query("SELECT * FROM pt")
 
+    def getLastTID(self, max_tid):
+        return self.query("SELECT MAX(tid) FROM trans WHERE tid<=?",
+                          (max_tid,)).next()[0]
+
     def _getLastIDs(self, all=True):
         p64 = util.p64
         q = self.query
@@ -252,6 +256,12 @@ class SQLiteDatabaseManager(DatabaseManager):
             (self._getPartition(oid), oid)).fetchone()
         return r and util.p64(r[0])
 
+    def _getNextTID(self, *args): # partition, oid, tid
+        r = self.query("""SELECT tid FROM obj
+                          WHERE partition=? AND oid=? AND tid>?
+                          ORDER BY tid LIMIT 1""", args).fetchone()
+        return r and r[0]
+
     def _getObject(self, oid, tid=None, before_tid=None):
         q = self.query
         partition = self._getPartition(oid)
@@ -269,14 +279,11 @@ class SQLiteDatabaseManager(DatabaseManager):
             serial, compression, checksum, data, value_serial = r.fetchone()
         except TypeError:
             return None
-        r = q("""SELECT tid FROM obj
-                 WHERE partition=? AND oid=? AND tid>?
-                 ORDER BY tid LIMIT 1""",
-              (partition, oid, serial)).fetchone()
         if checksum:
             checksum = str(checksum)
             data = str(data)
-        return serial, r and r[0], compression, checksum, data, value_serial
+        return (serial, self._getNextTID(partition, oid, serial),
+                compression, checksum, data, value_serial)
 
     def changePartitionTable(self, ptid, cell_list, reset=False):
         q = self.query
......
#
# Copyright (C) 2014 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os, stat, time
from persistent import Persistent
from persistent.TimeStamp import TimeStamp
from BTrees.OOBTree import OOBTree
class Inode(OOBTree):

    data = None

    def __init__(self, up=None, mode=stat.S_IFDIR):
        self[os.pardir] = self if up is None else up
        self.mode = mode
        self.mtime = time.time()

    def __getstate__(self):
        return Persistent.__getstate__(self), OOBTree.__getstate__(self)

    def __setstate__(self, state):
        Persistent.__setstate__(self, state[0])
        OOBTree.__setstate__(self, state[1])

    def edit(self, data=None, mtime=None):
        fmt = stat.S_IFMT(self.mode)
        if data is None:
            assert fmt == stat.S_IFDIR, oct(fmt)
        else:
            assert fmt == stat.S_IFREG or fmt == stat.S_IFLNK, oct(fmt)
        if self.data != data:
            self.data = data
        if self.mtime != mtime:
            self.mtime = mtime or time.time()
    def root(self):
        # The root inode is its own parent (cf. __init__), so walk up
        # until the parent is the inode itself.
        up = self[os.pardir]
        return self if up is self else up.root()
    def traverse(self, path, followlinks=True):
        path = iter(path.split(os.sep)
                    if isinstance(path, basestring) and path else path)
        for d in path:
            if not d:
                return self.root().traverse(path, followlinks)
            if d != os.curdir:
                d = self[d]
                if followlinks and stat.S_ISLNK(d.mode):
                    d = self.traverse(d.data, True)
                return d.traverse(path, followlinks)
        return self
    def inodeFromFs(self, path):
        s = os.lstat(path)
        mode = s.st_mode
        name = os.path.basename(path)
        try:
            i = self[name]
            assert stat.S_IFMT(i.mode) == stat.S_IFMT(mode)
            changed = False
        except KeyError:
            i = self[name] = self.__class__(self, mode)
            changed = True
        i.edit(open(path).read() if stat.S_ISREG(mode) else
               os.readlink(path) if stat.S_ISLNK(mode) else
               None, s.st_mtime)
        return changed or i._p_changed
    def treeFromFs(self, path, yield_interval=None, filter=None):
        prefix_len = len(path) + len(os.sep)
        n = 0
        for dirpath, dirnames, filenames in os.walk(path):
            inodeFromFs = self.traverse(dirpath[prefix_len:]).inodeFromFs
            for names in dirnames, filenames:
                skipped = []
                for j, name in enumerate(names):
                    p = os.path.join(dirpath, name)
                    if filter and not filter(p[prefix_len:]):
                        skipped.append(j)
                    elif inodeFromFs(p):
                        n += 1
                        if n == yield_interval:
                            n = 0
                            yield self
                while skipped:
                    del names[skipped.pop()]
        if n:
            yield self
    def walk(self):
        s = [(None, self)]
        while s:
            top, self = s.pop()
            dirs = []
            nondirs = []
            for name, inode in self.iteritems():
                if name != os.pardir:
                    (dirs if stat.S_ISDIR(inode.mode) else nondirs
                     ).append(name)
            yield top or os.curdir, dirs, nondirs
            for name in dirs:
                s.append((os.path.join(top, name) if top else name,
                          self[name]))
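
A short usage sketch of this Inode tree, close to what testImporter below
does (both paths are placeholders):

    import transaction, ZODB
    from ZODB.FileStorage import FileStorage
    from neo.tests.fs2zodb import Inode  # module added by this commit

    db = ZODB.DB(FileStorage("/tmp/tree.fs"))      # placeholder path
    root = db.open().root()
    root["tree"] = tree = Inode()
    transaction.commit()
    # Mirror a source directory into the database, getting control back
    # (to commit) every 10 modified inodes.
    for _ in tree.treeFromFs("/path/to/src", 10):  # placeholder path
        transaction.commit()
    for dirpath, dirs, files in tree.walk():       # os.walk-like traversal
        print(dirpath, dirs, files)
    db.close()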
@@ -30,6 +30,7 @@ import tempfile
 import traceback
 import threading
 import psutil
+from ConfigParser import SafeConfigParser
 
 import neo.scripts
 from neo.neoctl.neoctl import NeoCTL, NotReadyException
@@ -238,7 +239,7 @@ class NEOCluster(object):
                  cleanup_on_delete=False, temp_dir=None, clear_databases=True,
                  adapter=os.getenv('NEO_TESTS_ADAPTER'),
                  address_type=ADDRESS_TYPE, bind_ip=None, logger=True,
-                 ):
+                 importer=None):
         if not adapter:
             adapter = 'MySQL'
         self.adapter = adapter
@@ -263,6 +264,21 @@ class NEOCluster(object):
         self.local_ip = local_ip = bind_ip or \
             IP_VERSION_FORMAT_DICT[self.address_type]
         self.setupDB(clear_databases)
+        if importer:
+            cfg = SafeConfigParser()
+            cfg.add_section("neo")
+            cfg.set("neo", "adapter", adapter)
+            cfg.set("neo", "database", self.db_template(*db_list))
+            for name, zodb in importer:
+                cfg.add_section(name)
+                for x in zodb.iteritems():
+                    cfg.set(name, *x)
+            importer_conf = os.path.join(temp_dir, 'importer.cfg')
+            with open(importer_conf, 'w') as f:
+                cfg.write(f)
+            adapter = "Importer"
+            self.db_template = str
+            db_list = importer_conf,
         self.process_dict = {}
         self.temp_dir = temp_dir
         self.port_allocator = PortAllocator()
......
@@ -22,6 +22,7 @@ import socket
 from struct import pack
 
 from neo.neoctl.neoctl import NeoCTL
+from neo.lib.util import makeChecksum, u64
 from ZODB.FileStorage import FileStorage
 from ZODB.POSException import ConflictError
 from ZODB.tests.StorageTestBase import zodb_pickle
@@ -168,9 +169,7 @@ class ClientTests(NEOFunctionalTest):
         name = os.path.join(self.getTempDirectory(), 'data.fs')
         if reset and os.path.exists(name):
             os.remove(name)
-        storage = FileStorage(file_name=name)
-        db = ZODB.DB(storage=storage)
-        return (db, storage)
+        return FileStorage(file_name=name)
 
     def __populate(self, db, tree_size=TREE_SIZE):
         if isinstance(db.storage, FileStorage):
@@ -204,7 +203,8 @@ class ClientTests(NEOFunctionalTest):
     def testImport(self):
 
         # source database
-        dfs_db, dfs_storage = self.__getDataFS()
+        dfs_storage = self.__getDataFS()
+        dfs_db = ZODB.DB(dfs_storage)
         self.__populate(dfs_db)
 
         # create a neo storage
@@ -213,28 +213,48 @@ class ClientTests(NEOFunctionalTest):
 
         # copy data fs to neo
         neo_storage.copyTransactionsFrom(dfs_storage, verbose=0)
+        dfs_db.close()
 
         # check neo content
         (neo_db, neo_conn) = self.neo.getZODBConnection()
         self.__checkTree(neo_conn.root()['trees'])
 
+    def __dump(self, storage):
+        return {u64(t.tid): [(u64(o.oid), o.data_txn and u64(o.data_txn),
+                              None if o.data is None else makeChecksum(o.data))
+                             for o in t]
+                for t in storage.iterator()}
+
     def testExport(self):
 
         # create a neo storage
         self.neo.start()
         (neo_db, neo_conn) = self.neo.getZODBConnection()
         self.__populate(neo_db)
+        dump = self.__dump(neo_db.storage)
 
         # copy neo to data fs
-        dfs_db, dfs_storage = self.__getDataFS(reset=True)
+        dfs_storage = self.__getDataFS(reset=True)
         neo_storage = self.neo.getZODBStorage()
         dfs_storage.copyTransactionsFrom(neo_storage)
 
         # check data fs content
-        conn = dfs_db.open()
-        root = conn.root()
+        dfs_db = ZODB.DB(dfs_storage)
+        root = dfs_db.open().root()
 
         self.__checkTree(root['trees'])
+        dfs_db.close()
+        self.neo.stop()
+        self.neo = NEOCluster(db_list=['test_neo1'], partitions=3,
+            importer=[("root", {
+                "storage": "<filestorage>\npath %s\n</filestorage>"
+                           % dfs_storage.getName()})],
+            temp_dir=self.getTempDirectory())
+        self.neo.start()
+        neo_db, neo_conn = self.neo.getZODBConnection()
+        self.__checkTree(neo_conn.root()['trees'])
+        self.assertEqual(dump, self.__dump(neo_db.storage))
 
     def testLockTimeout(self):
         """ Hold a lock on an object to block a second transaction """
......
@@ -19,6 +19,7 @@
 import os, random, socket, sys, tempfile, threading, time, types, weakref
 import traceback
 from collections import deque
+from ConfigParser import SafeConfigParser
 from contextlib import contextmanager
 from itertools import count
 from functools import wraps
@@ -545,7 +546,8 @@ class NEOCluster(object):
     def __init__(self, master_count=1, partitions=1, replicas=0, upstream=None,
                  adapter=os.getenv('NEO_TESTS_ADAPTER', 'SQLite'),
                  storage_count=None, db_list=None, clear_databases=True,
-                 db_user=DB_USER, db_password='', compress=True):
+                 db_user=DB_USER, db_password='', compress=True,
+                 importer=None):
         self.name = 'neo_%s' % self._allocate('name',
             lambda: random.randint(0, 100))
         master_list = [MasterApplication.newAddress()
@@ -573,6 +575,19 @@ class NEOCluster(object):
             db = os.path.join(getTempDirectory(), '%s.sqlite')
         else:
             assert False, adapter
+        if importer:
+            cfg = SafeConfigParser()
+            cfg.add_section("neo")
+            cfg.set("neo", "adapter", adapter)
+            cfg.set("neo", "database", db % tuple(db_list))
+            for name, zodb in importer:
+                cfg.add_section(name)
+                for x in zodb.iteritems():
+                    cfg.set(name, *x)
+            db = os.path.join(getTempDirectory(), '%s.conf')
+            with open(db % tuple(db_list), "w") as f:
+                cfg.write(f)
+            kw["getAdapter"] = "Importer"
         self.storage_list = [StorageApplication(getDatabase=db % x, **kw)
                              for x in db_list]
         self.admin_list = [AdminApplication(**kw)]
......
#
# Copyright (C) 2014 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from collections import deque
from itertools import islice, izip_longest
import os, time, unittest
import neo, transaction, ZODB
from neo.lib import logging
from neo.lib.util import u64
from ..fs2zodb import Inode
from .. import getTempDirectory
from . import NEOCluster, NEOThreadedTest
from ZODB.FileStorage import FileStorage
class ImporterTests(NEOThreadedTest):

    def test(self):
        importer = []
        fs_dir = os.path.join(getTempDirectory(), self.id())
        os.mkdir(fs_dir)
        src_root, = neo.__path__
        fs_list = "root", "client", "master", "tests"
        def root_filter(name):
            if not name.endswith(".pyc"):
                i = name.find(os.sep)
                return i < 0 or name[:i] not in fs_list
        def sub_filter(name):
            return lambda n: n[-4:] != '.pyc' and \
                n.split(os.sep, 1)[0] in (name, "scripts")
        conn_list = []
        iter_list = []
        # Setup several FileStorage databases.
        for i, name in enumerate(fs_list):
            fs_path = os.path.join(fs_dir, name + ".fs")
            c = ZODB.DB(FileStorage(fs_path)).open()
            r = c.root()["neo"] = Inode()
            transaction.commit()
            conn_list.append(c)
            iter_list.append(r.treeFromFs(src_root, 10,
                sub_filter(name) if i else root_filter))
            importer.append((name, {
                "storage": "<filestorage>\npath %s\n</filestorage>" % fs_path
            }))
        # Populate FileStorage databases.
        for iter_list in izip_longest(*iter_list):
            for i in iter_list:
                if i:
                    transaction.commit()
        del iter_list
        # Get oids of mount points and close.
        for (name, cfg), c in zip(importer, conn_list):
            r = c.root()["neo"]
            if name == "root":
                for name in fs_list[1:]:
                    cfg[name] = str(u64(r[name]._p_oid))
            else:
                cfg["oid"] = str(u64(r[name]._p_oid))
            c.db().close()
        #del importer[0][1][importer.pop()[0]]
        # Start NEO cluster with transparent import of a multi-base ZODB.
        cluster = NEOCluster(compress=False, importer=importer)
        try:
            # Suspend import for a while, so that import
            # is finished in the middle of the below 'for' loop.
            # Use a slightly different main loop for storage so that it
            # does not import data too fast and we test read/write access
            # by the client during the import.
            dm = cluster.storage.dm
            def doOperation(app):
                del dm.doOperation
                try:
                    while True:
                        if app.task_queue:
                            app.task_queue[-1].next()
                        app._poll()
                except StopIteration:
                    app.task_queue.pop()
            dm.doOperation = doOperation
            cluster.start()
            t, c = cluster.getTransaction()
            r = c.root()["neo"]
            i = r.walk()
            next(islice(i, 9, None))
            dm.doOperation(cluster.storage) # resume
            deque(i, maxlen=0)
            last_import = None
            for i, r in enumerate(r.treeFromFs(src_root, 10)):
                t.commit()
                if cluster.storage.dm._import:
                    last_import = i
            self.assertTrue(last_import and not cluster.storage.dm._import)
            i = len(src_root) + 1
            self.assertEqual(sorted(r.walk()), sorted(
                (x[i:] or '.', sorted(y), sorted(z))
                for x, y, z in os.walk(src_root)))
            t.commit()
        finally:
            cluster.stop()

if __name__ == "__main__":
    unittest.main()
@@ -23,13 +23,16 @@ if not os.path.exists('mock.py'):
         raise EnvironmentError("MD5 checksum mismatch downloading 'mock.py'")
     open('mock.py', 'w').write(mock_py)
 
+zodb_require = ['ZODB3>=3.10', 'ZODB3<3.11dev']
+
 extras_require = {
     'admin': [],
-    'client': ['ZODB3>=3.10', 'ZODB3<3.11dev'],
+    'client': zodb_require,
     'ctl': [],
     'master': [],
     'storage-sqlite': [],
     'storage-mysqldb': ['MySQL-python'],
+    'storage-importer': zodb_require,
 }
 extras_require['tests'] = ['zope.testing', 'psutil>=2',
     'neoppod[%s]' % ', '.join(extras_require)]
......
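With the new extra, the dependencies of this backend can presumably be
pulled in with:

    pip install neoppod[storage-importer]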