Commit ed7ee13b authored by Kirill Smelkov's avatar Kirill Smelkov

Merge tag 'v1.10'

NEO 1.10

* tag 'v1.10': (55 commits)
  Release version 1.10
  Maximize resiliency by taking into account the topology of storage nodes
  storage: also commit updated cell TID at each replicated chunk of 'obj' records
  storage: skip useless work when unlocking transactions
  qa: flush logs at the end of each test when -L is not used
  qa: add a log in case that a mysterious bug happens again
  storage: clarify log about data deletion of discarded cells
  debug: new example to run the profiler for 1 minute
  mysql: fix replication of big oids (> 16M)
  tests/cluster: speedup waiting a bit
  protocol: update packet docstrings
  Bump protocol version
  protocol: a single byte is more than enough to encode enums
  protocol: small cleanup in packet registration
  Optimize resumption of replication by starting from a greater TID
  importer: update comment about a workaround for ZODB3
  Micro-optimization of p64/u64
  qa: add a log in testBackupNodeLost for easier debugging
  Document that the bug when checking replicas may also cause the master to crash
  storage: stop logging 'Abort TXN' for txn that have been locked
  storage: split _migrate2() for reusable _alterTable()
  qa: new testStorageUpgrade
  qa: update testStorageUpgrade data for what is not automatically upgraded
  qa: original data for the future testStorageUpgrade
  sqlite: fix indexes of upgraded db
  importer: fix NameError when recovering during tpc_finish
  fixup! importer: fetch and process the data to import in a separate process
  Serialize empty transaction extension with an empty string
  client: fix partial import from a source storage
  qa: give a title to subprocesses of functional tests
  importer: give a title to the 'import' and 'writeback' subprocesses
  importer: fetch and process the data to import in a separate process
  importer: new option to write back new transactions to the source database
  importer: log when the transaction index for FileStorage DB is built
  importer: open imported zodb in read-only whenever possible
  fixup! mysql: fix remaining places where a server disconnection was not catched
  fixup! storage: speed up replication by sending bigger network packets
  mysql: do not full-scan for duplicates of big oids if deduplication is disabled
  mysql: fix remaining places where a server disconnection was not catched
  fixup! Add support for custom compression levels
  importer: reenable compression by default
  qa: review testImporter
  qa: remove a few uses of 'chr'
  Fix a few issues with ZODB5
  importer: small code cleanup in speedupFileStorageTxnLookup patch
  importer: do not trigger speedupFileStorageTxnLookup uselessly
  Add support for custom compression levels
  setup: update MANIFEST.in
  importer: do not checksum data twice
  client: store uncompressed if compressed size is equal
  fixup! master: automatically discard feeding cells that get out-of-date
  master: automatically discard feeding cells that get out-of-date
  qa: remove useless indentation in testSafeTweak
  bench: new option to mesure ZEO perfs in matrix test
  bench: reduce number of partitions in matrix test
  storage: fix replication of creation undone
parents 27df6fe8 1ef5c1ba
......@@ -16,6 +16,19 @@ This happens in the following conditions:
4. the cell is checked completely before it could replicate up to the max tid
to check
Sometimes, it causes the master to crash::
File "neo/lib/handler.py", line 72, in dispatch
method(conn, *args, **kw)
File "neo/master/handlers/storage.py", line 93, in notifyReplicationDone
cell_list = app.backup_app.notifyReplicationDone(node, offset, tid)
File "neo/master/backup_app.py", line 337, in notifyReplicationDone
assert cell.isReadable()
AssertionError
Workaround: make sure all cells are up-to-date before checking replicas.
Found by running testBackupNodeLost many times.
Found by running testBackupNodeLost many times:
- either a failureException: 12 != 11
- or the above assert failure, in which case the unit test freezes
Change History
==============
1.10 (2018-07-16)
-----------------
A important performance improvement is that the replication now remembers where
it was interrupted: a storage node that gets disconnected for a short time now
gets fully operational quite instantaneously because it only has to replicate
the new data. Before, the time to recover depended on the size of the DB, just
to verify that most of the data are already transferred.
As a small optimization, an empty transaction extension is now serialized with
an empty string.
The above 2 changes required a bump of the protocol version, as well as an
upgrade of the storage format. Once upgraded (this is done automatically as
usual), databases can't be opened anymore by older versions of NEO.
Other general changes:
- Add support for custom compression levels.
- Maximize resiliency by taking into account the topology of storage nodes.
- Fix a few issues with ZODB5. Note however that merging several DB with the
Importer backend only works if they were only used with ZODB < 5.
Master:
- Automatically discard feeding cells that get out-of-date.
Client:
- Fix partial import from a source storage.
- Store uncompressed if compressed size is equal.
Storage:
- Fixed v1.9 code that sped up the replication by sending bigger network
packets.
- Fix replication of creation undone.
- Stop logging 'Abort TXN' for txn that have been locked.
- Clarify log about data deletion of discarded cells.
MySQL backend:
- Fix replication of big OIDs (> 16M).
- Do not full-scan for duplicates of big OIDs if deduplication is disabled.
- Fix remaining places where a server disconnection was not catched.
SQlite backend:
- Fix indexes of upgraded databases.
Importer backend:
- Fetch and process the data to import in a separate process. It is even
usually free to use the best compression level.
- New option to write back new transactions to the source database.
See 'importer.conf' for more information.
- Give a title to the 'import' and 'writeback' subprocesses,
if the 'setproctitle' egg is installed.
- Log when the transaction index for FileStorage DB is built.
- Open imported database in read-only whenever possible.
- Do not trigger speedupFileStorageTxnLookup uselessly.
- Do not checksum data twice.
- Fix NameError when recovering during tpc_finish.
1.9 (2018-03-13)
----------------
......
graft tools
include neo.conf CHANGELOG.rst TODO TESTS.txt ZODB3.patch
include neo.conf CHANGELOG.rst TODO ZODB3.patch
......@@ -45,6 +45,12 @@
# (instead of adapter=Importer & database=/path_to_this_file).
adapter=MySQL
database=neo
# Keep writing back new transactions to the source database, provided it is
# not splitted. In case of any issue, the import can be aborted without losing
# data. Note however that it is asynchronous so don't stop the storage node
# too quickly after the last committed transaction (e.g. check with tools like
# fstail).
writeback=true
# The other sections are for source databases.
[root]
......@@ -52,7 +58,8 @@ database=neo
# ZEO is possible but less efficient: ZEO servers must be stopped
# if NEO opens FileStorage DBs directly.
# Note that NEO uses 'new_oid' method to get the last OID, that's why the
# source DB can't be open read-only. NEO never modifies a FileStorage DB.
# source DB can't be open read-only. Unless 'writeback' is enabled, NEO never
# modifies a FileStorage DB.
storage=
<filestorage>
path /path/to/root.fs
......
......@@ -160,11 +160,7 @@ class Storage(BaseStorage.BaseStorage,
def copyTransactionsFrom(self, source, verbose=False):
""" Zope compliant API """
return self.importFrom(source)
def importFrom(self, source, start=None, stop=None, preindex=None):
""" Allow import only a part of the source storage """
return self.app.importFrom(self, source, start, stop, preindex)
return self.app.importFrom(self, source)
def pack(self, t, referencesf, gc=False):
if gc:
......
......@@ -14,11 +14,14 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from cPickle import dumps, loads
from zlib import compress, decompress
import heapq
import time
try:
from ZODB._compat import dumps, loads, _protocol
except ImportError:
from cPickle import dumps, loads
_protocol = 1
from ZODB.POSException import UndoError, ConflictError, ReadConflictError
from . import OLD_ZODB
if OLD_ZODB:
......@@ -26,6 +29,7 @@ if OLD_ZODB:
from persistent.TimeStamp import TimeStamp
from neo.lib import logging
from neo.lib.compress import decompress_list, getCompress
from neo.lib.protocol import NodeTypes, Packets, \
INVALID_PARTITION, MAX_TID, ZERO_HASH, ZERO_TID
from neo.lib.util import makeChecksum, dump
......@@ -50,7 +54,6 @@ if SignalHandler:
import signal
SignalHandler.registerHandler(signal.SIGUSR2, logging.reopen)
class Application(ThreadedApplication):
"""The client node application."""
......@@ -99,7 +102,7 @@ class Application(ThreadedApplication):
# _connecting_to_master_node is used to prevent simultaneous master
# node connection attempts
self._connecting_to_master_node = Lock()
self.compress = compress
self.compress = getCompress(compress)
def __getattr__(self, attr):
if attr in ('last_tid', 'pt'):
......@@ -215,7 +218,7 @@ class Application(ThreadedApplication):
node=node,
dispatcher=self.dispatcher)
p = Packets.RequestIdentification(
NodeTypes.CLIENT, self.uuid, None, self.name, None)
NodeTypes.CLIENT, self.uuid, None, self.name, (), None)
try:
ask(conn, p, handler=handler)
except ConnectionClosed:
......@@ -388,7 +391,7 @@ class Application(ThreadedApplication):
logging.error('wrong checksum from %s for oid %s',
conn, dump(oid))
raise NEOStorageReadRetry(False)
return (decompress(data) if compression else data,
return (decompress_list[compression](data),
tid, next_tid, data_tid)
raise NEOStorageCreationUndoneError(dump(oid))
return self._askStorageForRead(oid,
......@@ -435,17 +438,7 @@ class Application(ThreadedApplication):
checksum = ZERO_HASH
else:
assert data_serial is None
size = len(data)
if self.compress:
compressed_data = compress(data)
if size < len(compressed_data):
compressed_data = data
compression = 0
else:
compression = 1
else:
compression = 0
compressed_data = data
size, compression, compressed_data = self.compress(data)
checksum = makeChecksum(compressed_data)
txn_context.data_size += size
# Store object in tmp cache
......@@ -554,9 +547,12 @@ class Application(ThreadedApplication):
txn_context = self._txn_container.get(transaction)
self.waitStoreResponses(txn_context)
ttid = txn_context.ttid
ext = transaction._extension
ext = dumps(ext, _protocol) if ext else ''
# user and description are cast to str in case they're unicode.
# BBB: This is not required anymore with recent ZODB.
packet = Packets.AskStoreTransaction(ttid, str(transaction.user),
str(transaction.description), dumps(transaction._extension),
txn_context.cache_dict)
str(transaction.description), ext, txn_context.cache_dict)
queue = txn_context.queue
involved_nodes = txn_context.involved_nodes
# Ask in parallel all involved storage nodes to commit object metadata.
......@@ -786,10 +782,6 @@ class Application(ThreadedApplication):
self.waitStoreResponses(txn_context)
return None, txn_oid_list
def _insertMetadata(self, txn_info, extension):
for k, v in loads(extension).items():
txn_info[k] = v
def _getTransactionInformation(self, tid):
return self._askStorageForRead(tid,
Packets.AskTransactionInformation(tid))
......@@ -829,7 +821,8 @@ class Application(ThreadedApplication):
if filter is None or filter(txn_info):
txn_info.pop('packed')
txn_info.pop("oids")
self._insertMetadata(txn_info, txn_ext)
if txn_ext:
txn_info.update(loads(txn_ext))
append(txn_info)
if len(undo_info) >= last - first:
break
......@@ -857,7 +850,7 @@ class Application(ThreadedApplication):
tid = None
for tid in tid_list:
(txn_info, txn_ext) = self._getTransactionInformation(tid)
txn_info['ext'] = loads(txn_ext)
txn_info['ext'] = loads(txn_ext) if txn_ext else {}
append(txn_info)
return (tid, txn_list)
......@@ -876,23 +869,29 @@ class Application(ThreadedApplication):
txn_info['size'] = size
if filter is None or filter(txn_info):
result.append(txn_info)
self._insertMetadata(txn_info, txn_ext)
if txn_ext:
txn_info.update(loads(txn_ext))
return result
def importFrom(self, storage, source, start, stop, preindex=None):
def importFrom(self, storage, source):
# TODO: The main difference with BaseStorage implementation is that
# preindex can't be filled with the result 'store' (tid only
# known after 'tpc_finish'. This method could be dropped if we
# implemented IStorageRestoreable (a wrapper around source would
# still be required for partial import).
if preindex is None:
preindex = {}
for transaction in source.iterator(start, stop):
for transaction in source.iterator():
tid = transaction.tid
self.tpc_begin(storage, transaction, tid, transaction.status)
for r in transaction:
oid = r.oid
pre = preindex.get(oid)
try:
pre = preindex[oid]
except KeyError:
try:
pre = self.load(oid)[1]
except NEOStorageNotFoundError:
pre = ZERO_TID
self.store(oid, pre, r.data, r.version, transaction)
preindex[oid] = tid
conflicted = self.tpc_vote(transaction)
......
......@@ -14,10 +14,14 @@
Give the name of the cluster
</description>
</key>
<key name="compress" datatype="boolean">
<key name="compress" datatype=".compress">
<description>
If true, data is automatically compressed (unless compressed size is
not smaller). This is the default behaviour.
The value is either of 'boolean' type or an explicit algorithm that
matches the regex 'zlib(=\d+)?', where the optional number is
the compression level.
Any record that is not smaller once compressed is stored uncompressed.
True is the default and its meaning may change over time:
currently, it is the same as 'zlib'.
</description>
</key>
<key name="read-only" datatype="boolean">
......
......@@ -23,3 +23,11 @@ class NeoStorage(BaseConfig):
config = self.config
return Storage(**{k: getattr(config, k)
for k in config.getSectionAttributes()})
def compress(value):
from ZConfig.datatypes import asBoolean
try:
return asBoolean(value)
except ValueError:
from neo.lib.compress import parseOption
return parseOption(value)
......@@ -14,10 +14,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from zlib import decompress
from ZODB.TimeStamp import TimeStamp
from neo.lib import logging
from neo.lib.compress import decompress_list
from neo.lib.protocol import Packets, uuid_str
from neo.lib.util import dump, makeChecksum
from neo.lib.exception import NodeNotReady
......@@ -129,8 +129,7 @@ class StorageAnswersHandler(AnswerBaseHandler):
'wrong checksum while getting back data for'
' object %s during rebase of transaction %s'
% (dump(oid), dump(txn_context.ttid)))
if compression:
data = decompress(data)
data = decompress_list[compression](data)
size = len(data)
txn_context.data_size += size
if cached:
......
......@@ -47,7 +47,7 @@ class ConnectionPool(object):
conn = MTClientConnection(app, app.storage_event_handler, node,
dispatcher=app.dispatcher)
p = Packets.RequestIdentification(NodeTypes.CLIENT,
app.uuid, None, app.name, app.id_timestamp)
app.uuid, None, app.name, (), app.id_timestamp)
try:
app._ask(conn, p, handler=app.storage_bootstrap_handler)
except ConnectionClosed:
......
......@@ -164,3 +164,17 @@ elif IF == 'frames':
write("Thread %s:\n" % thread_id)
traceback.print_stack(frame)
write("End of dump\n")
elif IF == 'profile':
DURATION = 60
def stop(prof, path):
prof.disable()
prof.dump_stats(path)
@defer
def profile(app):
import cProfile, threading, time
from .lib.protocol import uuid_str
path = 'neo-%s-%s.prof' % (uuid_str(app.uuid), time.time())
prof = cProfile.Profile()
threading.Timer(DURATION, stop, (prof, path)).start()
prof.enable()
......@@ -26,13 +26,14 @@ class BootstrapManager(EventHandler):
Manage the bootstrap stage, lookup for the primary master then connect to it
"""
def __init__(self, app, node_type, server=None):
def __init__(self, app, node_type, server=None, devpath=()):
"""
Manage the bootstrap stage of a non-master node, it lookup for the
primary master node, connect to it then returns when the master node
is ready.
"""
self.server = server
self.devpath = devpath
self.node_type = node_type
self.num_replicas = None
self.num_partitions = None
......@@ -43,7 +44,7 @@ class BootstrapManager(EventHandler):
def connectionCompleted(self, conn):
EventHandler.connectionCompleted(self, conn)
conn.ask(Packets.RequestIdentification(self.node_type, self.uuid,
self.server, self.app.name, None))
self.server, self.app.name, self.devpath, None))
def connectionFailed(self, conn):
EventHandler.connectionFailed(self, conn)
......
#
# Copyright (C) 2018 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import zlib
decompress_list = (
lambda data: data,
zlib.decompress,
)
def parseOption(value):
x = value.split('=', 1)
try:
alg = ('zlib',).index(x[0])
if len(x) == 1:
return alg, None
level = int(x[1])
except Exception:
raise ValueError("not a valid 'compress' option: %r" % value)
if 0 < level <= zlib.Z_BEST_COMPRESSION:
return alg, level
raise ValueError("invalid compression level: %r" % level)
def getCompress(value):
if value:
alg, level = (0, None) if value is True else value
_compress = zlib.compress
if level:
zlib_compress = _compress
_compress = lambda data: zlib_compress(data, level)
alg += 1
assert 0 < alg < len(decompress_list), 'invalid compression algorithm'
def compress(data):
size = len(data)
compressed = _compress(data)
if len(compressed) < size:
return size, alg, compressed
return size, 0, data
compress._compress = _compress # for testBasicStore
return compress
return lambda data: (len(data), 0, data)
......@@ -34,6 +34,7 @@ class SocketConnector(object):
is_closed = is_server = None
connect_limit = {}
CONNECT_LIMIT = 1 # XXX actually this is (RE-)CONNECT_THROTTLE
SOMAXCONN = 5 # for threaded tests
def __new__(cls, addr, s=None):
if s is None:
......@@ -78,6 +79,7 @@ class SocketConnector(object):
def queue(self, data):
was_empty = not self.queued
self.queued += data
for data in data:
self.queue_size += len(data)
return was_empty
......@@ -123,7 +125,7 @@ class SocketConnector(object):
try:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._bind(self.addr)
self.socket.listen(5)
self.socket.listen(self.SOMAXCONN)
except socket.error, e:
self.socket.close()
self._error('listen', e)
......
......@@ -26,9 +26,6 @@ class PrimaryFailure(NeoException):
class StoppedOperation(NeoException):
pass
class DatabaseFailure(NeoException):
pass
class NodeNotReady(NeoException):
pass
......@@ -22,14 +22,13 @@ def check_signature(reference, function):
a, b, c, d = inspect.getargspec(function)
x = len(A) - len(a)
if x < 0: # ignore extra default parameters
if x + len(d) < 0:
if B or x + len(d) < 0:
return False
del a[x:]
d = d[:x] or None
elif x: # different signature
# We have no need yet to support methods with default parameters.
return a == A[:-x] and (b or a and c) and not (d or D)
return a == A and b == B and c == C and d == D
return a == A[:-x] and (b or a and c) and (d or ()) == (D or ())[:-x]
return a == A and (b or not B) and (c or not C) and d == D
def implements(obj, ignore=()):
ignore = set(ignore)
......@@ -55,7 +54,7 @@ def implements(obj, ignore=()):
while 1:
name, func = base.pop()
x = getattr(obj, name)
if x.im_class is tobj:
if type(getattr(x, '__self__', None)) is tobj:
x = x.__func__
if x is func:
try:
......
......@@ -290,3 +290,16 @@ class NEOLogger(Logger):
logging = NEOLogger()
signal.signal(signal.SIGRTMIN, lambda signum, frame: logging.flush())
signal.signal(signal.SIGRTMIN+1, lambda signum, frame: logging.reopen())
def patch():
def fork():
with logging:
pid = os_fork()
if not pid:
logging._setup()
return pid
os_fork = os.fork
os.fork = fork
patch()
del patch
......@@ -28,6 +28,7 @@ class Node(object):
_connection = None
_identified = False
devpath = ()
id_timestamp = None
def __init__(self, manager, address=None, uuid=None, state=NodeStates.DOWN):
......
......@@ -25,6 +25,7 @@ def speedupFileStorageTxnLookup():
from array import array
from bisect import bisect
from collections import defaultdict
from neo.lib import logging
from ZODB.FileStorage.FileStorage import FileStorage, FileIterator
typecode = 'L' if array('I').itemsize < 4 else 'I'
......@@ -44,6 +45,8 @@ def speedupFileStorageTxnLookup():
try:
index = self._tidindex
except AttributeError:
logging.info("Building index for faster lookup of"
" transactions in the FileStorage DB.")
# Cache a sorted list of all the file pos from oid index.
# To reduce memory usage, the list is splitted in arrays of
# low order 32-bit words.
......@@ -52,10 +55,10 @@ def speedupFileStorageTxnLookup():
tindex[x >> 32].append(x & 0xffffffff)
index = self._tidindex = []
for h, l in sorted(tindex.iteritems()):
x = array('I')
x.fromlist(sorted(l))
l = self._read_data_header(h << 32 | x[0])
index.append((l.tid, h, x))
l = array(typecode, sorted(l))
x = self._read_data_header(h << 32 | l[0])
index.append((x.tid, h, l))
logging.info("... index built")
x = bisect(index, (start,)) - 1
if x >= 0:
x, h, index = index[x]
......
This diff is collapsed.
......@@ -15,12 +15,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import socket
import os, socket
from binascii import a2b_hex, b2a_hex
from datetime import timedelta, datetime
from hashlib import sha1
from Queue import deque
from struct import pack, unpack
from struct import pack, unpack, Struct
from time import gmtime
TID_LOW_OVERFLOW = 2**32
......@@ -102,11 +102,10 @@ def addTID(ptid, offset):
higher = (d.year, d.month, d.day, d.hour, d.minute)
return packTID(higher, lower)
def u64(s):
return unpack('!Q', s)[0]
def p64(n):
return pack('!Q', n)
p64, u64 = (lambda unpack: (
unpack.__self__.pack,
lambda s: unpack(s)[0]
))(Struct('!Q').unpack)
def add64(packed, offset):
"""Add a python number to a 64-bits packed value"""
......@@ -226,3 +225,25 @@ class cached_property(object):
if obj is None: return self
value = obj.__dict__[self.func.__name__] = self.func(obj)
return value
# This module is always imported before multiprocessing is used, and the
# main process does not want to change name when task are run in threads.
spt_pid = os.getpid()
def setproctitle(title):
global spt_pid
pid = os.getpid()
if spt_pid == pid:
return
spt_pid = pid
# Try using https://pypi.org/project/setproctitle/
try:
# On Linux, this is done by clobbering argv, and the main process
# usually has a longer command line than the title of subprocesses.
os.environ['SPT_NOENV'] = '1'
from setproctitle import setproctitle
except ImportError:
return
finally:
del os.environ['SPT_NOENV']
setproctitle(title)
......@@ -24,7 +24,7 @@ from ..app import monotonic_time
class IdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid,
address, name, id_timestamp):
address, name, devpath, id_timestamp):
app = self.app
self.checkClusterName(name)
if address == app.server:
......@@ -101,6 +101,8 @@ class IdentificationHandler(EventHandler):
uuid=uuid, address=address)
else:
node.setUUID(uuid)
if devpath:
node.devpath = tuple(devpath)
node.id_timestamp = monotonic_time()
node.setState(state)
conn.setHandler(handler)
......@@ -120,7 +122,7 @@ class IdentificationHandler(EventHandler):
class SecondaryIdentificationHandler(EventHandler):
def requestIdentification(self, conn, node_type, uuid,
address, name, id_timestamp):
address, name, devpath, id_timestamp):
app = self.app
self.checkClusterName(name)
if address == app.server:
......
......@@ -38,7 +38,7 @@ class ElectionHandler(MasterHandler):
super(ElectionHandler, self).connectionCompleted(conn)
app = self.app
conn.ask(Packets.RequestIdentification(NodeTypes.MASTER,
app.uuid, app.server, app.name, app.election))
app.uuid, app.server, app.name, (), app.election))
def connectionFailed(self, conn):
super(ElectionHandler, self).connectionFailed(conn)
......
......@@ -178,7 +178,7 @@ class PartitionTable(neo.lib.pt.PartitionTable):
def tweak(self, drop_list=()):
"""Optimize partition table
This reassigns cells in 3 ways:
This reassigns cells in 4 ways:
- Discard cells of nodes listed in 'drop_list'. For partitions with too
few readable cells, some cells are instead marked as FEEDING. This is
a preliminary step to drop these nodes, otherwise the partition table
......@@ -187,6 +187,8 @@ class PartitionTable(neo.lib.pt.PartitionTable):
- When a transaction creates new objects (oids are roughly allocated
sequentially), we expect better performance by maximizing the number
of involved nodes (i.e. parallelizing writes).
- For maximum resiliency, cells of each partition are assigned as far
as possible from each other, by checking the topology path of nodes.
Examples of optimal partition tables with np=10, nr=1 and 5 nodes:
......@@ -215,6 +217,17 @@ class PartitionTable(neo.lib.pt.PartitionTable):
U. .U U.
.U U. U.
U. U. .U
For the topology, let's consider an example with paths of the form
(room, machine, disk):
- if there are more rooms than the number of replicas, 2 cells of the
same partition must not be assigned in the same room;
- otherwise, topology paths are checked at a deeper depth,
e.g. not on the same machine and distributed evenly
(off by 1) among rooms.
But the topology is expected to be optimal, otherwise it is ignored.
In some cases, we could fall back to a non-optimal topology but
that would cause extra replication if the user wants to fix it.
"""
# Collect some data in a usable form for the rest of the method.
node_list = {node: {} for node in self.count_dict
......@@ -242,6 +255,67 @@ class PartitionTable(neo.lib.pt.PartitionTable):
i += 1
option_dict = Counter(map(tuple, x))
# Initialize variables/functions to optimize the topology.
devpath_max = []
devpaths = [()] * node_count
if repeats > 1:
_devpaths = [x[0].devpath for x in node_list]
max_depth = min(map(len, _devpaths))
depth = 0
while 1:
if depth < max_depth:
depth += 1
x = Counter(x[:depth] for x in _devpaths)
n = len(x)
x = set(x.itervalues())
# TODO: Prove it works. If the code turns out to be:
# - too pessimistic, the topology is ignored when
# resiliency could be maximized;
# - or worse too optimistic, in which case this
# method raises, possibly after a very long time.
if len(x) == 1 or max(x) * repeats <= node_count:
i, x = divmod(repeats, n)
devpath_max.append((i + 1, x) if x else (i, n))
if n < repeats:
continue
devpaths = [x[:depth] for x in _devpaths]
break
logging.warning("Can't maximize resiliency: fix the topology"
" of your storage nodes and make sure they're all running."
" %s storage device failure(s) may be enough to lose all"
" the database." % (repeats - 1))
break
topology = [{} for _ in xrange(self.np)]
def update_topology():
for offset in option:
n = topology[offset]
for i, (j, k) in zip(devpath, devpath_max):
try:
i, x = n[i]
except KeyError:
n[i] = i, x = [0, {}]
if i == j or i + 1 == j and k == sum(
1 for i in n.itervalues() if i[0] == j):
# Too many cells would be assigned at this topology
# node.
return False
n = x
# The topology may be optimal with this option. Apply it.
for offset in option:
n = topology[offset]
for i in devpath:
n = n[i]
n[0] += 1
n = n[1]
return True
def revert_topology():
for offset in option:
n = topology[offset]
for i in devpath:
n = n[i]
n[0] -= 1
n = n[1]
# Strategies to find the "best" permutation of nodes.
def node_options():
# The second part of the key goes with the above cosmetic sort.
......@@ -291,24 +365,27 @@ class PartitionTable(neo.lib.pt.PartitionTable):
new = [] # the solution
stack = [] # data recursion
def options():
return iter(node_options[len(new)][-1])
x = node_options[len(new)]
return devpaths[x[-2]], iter(x[-1])
for node_options in node_options(): # for each strategy
iter_option = options()
devpath, iter_option = options()
while 1:
try:
option = next(iter_option)
except StopIteration: # 1st strategy only
except StopIteration:
if new:
iter_option = stack.pop()
option_dict[new.pop()] += 1
devpath, iter_option = stack.pop()
option = new.pop()
revert_topology()
option_dict[option] += 1
continue
break
if option_dict[option]:
if option_dict[option] and update_topology():
new.append(option)
if len(new) == len(node_list):
if len(new) == node_count:
break
stack.append(iter_option)
iter_option = options()
stack.append((devpath, iter_option))
devpath, iter_option = options()
option_dict[option] -= 1
if new:
break
......@@ -384,13 +461,18 @@ class PartitionTable(neo.lib.pt.PartitionTable):
if cell.isReadable():
if cell.getNode().isRunning():
lost = None
else :
else:
cell_list.append(cell)
for cell in cell_list:
if cell.getNode() is not lost:
cell.setState(CellStates.OUT_OF_DATE)
change_list.append((offset, cell.getUUID(),
CellStates.OUT_OF_DATE))
node = cell.getNode()
if node is not lost:
if cell.isFeeding():
self.removeCell(offset, node)
state = CellStates.DISCARDED
else:
state = CellStates.OUT_OF_DATE
cell.setState(state)
change_list.append((offset, node.getUUID(), state))
if fully_readable and change_list:
logging.warning(self._first_outdated_message)
return change_list
......
......@@ -65,6 +65,7 @@ UNIT_TEST_MODULES = [
'neo.tests.client.testZODBURI',
# light functional tests
'neo.tests.threaded.test',
'neo.tests.threaded.testConfig',
'neo.tests.threaded.testImporter',
'neo.tests.threaded.testReplication',
'neo.tests.threaded.testSSL',
......
......@@ -71,6 +71,7 @@ class Application(BaseApplication):
self.dm.setup(reset=config.getReset(), dedup=config.getDedup())
self.loadConfiguration()
self.devpath = self.dm.getTopologyPath()
# force node uuid from command line argument, for testing purpose only
if config.getUUID() is not None:
......@@ -203,7 +204,8 @@ class Application(BaseApplication):
pt = self.pt
# search, find, connect and identify to the primary master
bootstrap = BootstrapManager(self, NodeTypes.STORAGE, self.server)
bootstrap = BootstrapManager(self, NodeTypes.STORAGE, self.server,
self.devpath)
self.master_node, self.master_conn, num_partitions, num_replicas = \
bootstrap.getPrimaryConnection()
uuid = self.uuid
......
......@@ -51,7 +51,7 @@ class Checker(object):
else:
conn = ClientConnection(app, StorageOperationHandler(app), node)
conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
uuid, app.server, name, app.id_timestamp))
uuid, app.server, name, (), app.id_timestamp))
self.conn_dict[conn] = node.isIdentified()
conn_set = set(self.conn_dict)
conn_set.discard(None)
......
......@@ -16,8 +16,6 @@
LOG_QUERIES = False
from neo.lib.exception import DatabaseFailure
DATABASE_MANAGER_DICT = {
'Importer': 'importer.ImporterDatabaseManager',
'MySQL': 'mysqldb.MySQLDatabaseManager',
......@@ -33,3 +31,6 @@ def getAdapterKlass(name):
def buildDatabaseManager(name, args=(), kw={}):
return getAdapterKlass(name)(*args, **kw)
class DatabaseFailure(Exception):
pass
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -25,7 +25,7 @@ from . import LOG_QUERIES
from .manager import DatabaseManager, splitOIDField
from neo.lib import logging, util
from neo.lib.interfaces import implements
from neo.lib.protocol import CellStates, ZERO_OID, ZERO_TID, ZERO_HASH
from neo.lib.protocol import ZERO_OID, ZERO_TID, ZERO_HASH
def unique_constraint_message(table, *columns):
c = sqlite3.connect(":memory:")
......@@ -68,7 +68,7 @@ class SQLiteDatabaseManager(DatabaseManager):
never be used for small requests.
"""
VERSION = 2
VERSION = 3
def _parse(self, database):
self.db = os.path.expanduser(database)
......@@ -86,6 +86,9 @@ class SQLiteDatabaseManager(DatabaseManager):
q("PRAGMA journal_mode = MEMORY")
self._config = {}
def _getDevPath(self):
return self.db
def _commit(self):
retry_if_locked(self.conn.commit)
......@@ -113,23 +116,33 @@ class SQLiteDatabaseManager(DatabaseManager):
if not e.args[0].startswith("no such table:"):
raise
def _migrate1(self, *_):
self._checkNoUnfinishedTransactions()
self.query("DROP TABLE IF EXISTS ttrans")
def _migrate2(self, schema_dict, index_dict):
def _alterTable(self, schema_dict, table, select="*"):
# BBB: As explained in _setup, no transactional DDL
# so let's do the same dance as for MySQL.
q = self.query
if self.nonempty('obj') is None:
if self.nonempty('new_obj') is None:
new = 'new_' + table
if self.nonempty(table) is None:
if self.nonempty(new) is None:
return
else:
q("DROP TABLE IF EXISTS new_obj")
q(schema_dict.pop('obj') % 'new_obj')
q("INSERT INTO new_obj SELECT * FROM obj")
q("DROP TABLE obj")
q("ALTER TABLE new_obj RENAME TO obj")
q("DROP TABLE IF EXISTS " + new)
q(schema_dict.pop(table) % new)
q("INSERT INTO %s SELECT %s FROM %s" % (new, select, table))
q("DROP TABLE " + table)
q("ALTER TABLE %s RENAME TO %s" % (new, table))
def _migrate1(self, *_):
self._checkNoUnfinishedTransactions()
self.query("DROP TABLE IF EXISTS ttrans")
def _migrate2(self, schema_dict, index_dict):
self._alterTable(schema_dict, 'obj')
def _migrate3(self, schema_dict, index_dict):
self._alterTable(schema_dict, 'pt', "rid, nid, CASE state"
" WHEN 0 THEN -1" # UP_TO_DATE
" WHEN 2 THEN -2" # FEEDING
" ELSE 1-state END")
def _setup(self, dedup=False):
# BBB: SQLite has transactional DDL but before Python 3.6,
......@@ -150,10 +163,10 @@ class SQLiteDatabaseManager(DatabaseManager):
# The table "pt" stores a partition table.
schema_dict['pt'] = """CREATE TABLE %s (
rid INTEGER NOT NULL,
partition INTEGER NOT NULL,
nid INTEGER NOT NULL,
state INTEGER NOT NULL,
PRIMARY KEY (rid, nid))
tid INTEGER NOT NULL,
PRIMARY KEY (partition, nid))
"""
# The table "trans" stores information on committed transactions.
......@@ -223,7 +236,8 @@ class SQLiteDatabaseManager(DatabaseManager):
for table, schema in schema_dict.iteritems():
q(schema % ('IF NOT EXISTS ' + table))
for i, index in enumerate(index_dict.get(table, ()), 1):
for table, index in index_dict.iteritems():
for i, index in enumerate(index, 1):
q(index % ('IF NOT EXISTS _%s_i%s' % (table, i), table))
self._uncommitted_data.update(q("SELECT data_id, count(*)"
......@@ -249,42 +263,23 @@ class SQLiteDatabaseManager(DatabaseManager):
else:
q("REPLACE INTO config VALUES (?,?)", (key, str(value)))
def getPartitionTable(self, *nid):
if nid:
return self.query("SELECT rid, state FROM pt WHERE nid=?", nid)
def _getPartitionTable(self):
return self.query("SELECT * FROM pt")
# A test with a table of 20 million lines and SQLite 3.8.7.1 shows that
# it's not worth changing getLastTID:
# - It already returns the result in less than 2 seconds, without reading
# the whole table (this is 4-7 times faster than MySQL).
# - Strangely, a "GROUP BY partition" clause makes SQLite almost twice
# slower.
# - Getting MAX(tid) is immediate with a "AND partition=?" condition so one
# way to speed up the following 2 methods is to repeat the queries for
# each partition (and finish in Python with max() for getLastTID).
def getLastTID(self, max_tid):
return self.query(
"SELECT MAX(tid) FROM pt, trans"
" WHERE nid=? AND rid=partition AND tid<=?",
(self.getUUID(), max_tid,)).next()[0]
def _getLastIDs(self):
p64 = util.p64
def _getLastTID(self, partition, max_tid=None):
x = self.query
if max_tid is None:
x = x("SELECT MAX(tid) FROM trans WHERE partition=?", (partition,))
else:
x = x("SELECT MAX(tid) FROM trans WHERE partition=? AND tid<=?",
(partition, max_tid))
return x.next()[0]
def _getLastIDs(self, *args):
q = self.query
args = self.getUUID(),
trans = {partition: p64(tid)
for partition, tid in q(
"SELECT partition, MAX(tid) FROM pt, trans"
" WHERE nid=? AND rid=partition GROUP BY partition", args)}
obj = {partition: p64(tid)
for partition, tid in q(
"SELECT partition, MAX(tid) FROM pt, obj"
" WHERE nid=? AND rid=partition GROUP BY partition", args)}
oid = q("SELECT MAX(oid) oid FROM pt, obj"
" WHERE nid=? AND rid=partition", args).next()[0]
return trans, obj, None if oid is None else p64(oid)
(oid,), = q("SELECT MAX(oid) FROM obj WHERE `partition`=?", args)
(tid,), = q("SELECT MAX(tid) FROM obj WHERE `partition`=?", args)
return tid, oid
def _getDataLastId(self, partition):
return self.query("SELECT MAX(id) FROM data WHERE %s <= id AND id < %s"
......@@ -352,8 +347,8 @@ class SQLiteDatabaseManager(DatabaseManager):
# whereas we try to replace only 1 value ?
# We don't want to remove the 'NOT NULL' constraint
# so we must simulate a "REPLACE OR FAIL".
q("DELETE FROM pt WHERE rid=? AND nid=?", (offset, nid))
if state != CellStates.DISCARDED:
q("DELETE FROM pt WHERE partition=? AND nid=?", (offset, nid))
if state is not None:
q("INSERT OR FAIL INTO pt VALUES (?,?,?)",
(offset, nid, int(state)))
......@@ -478,10 +473,15 @@ class SQLiteDatabaseManager(DatabaseManager):
(u64(tid), u64(ttid)))
self.commit()
def unlockTransaction(self, tid, ttid):
def unlockTransaction(self, tid, ttid, trans, obj):
q = self.query
u64 = util.u64
tid = u64(tid)
if trans:
q("INSERT INTO trans SELECT * FROM ttrans WHERE tid=?", (tid,))
q("DELETE FROM ttrans WHERE tid=?", (tid,))
if not obj:
return
ttid = u64(ttid)
sql = " FROM tobj WHERE tid=?"
data_id_list = [x for x, in q("SELECT data_id%s AND data_id IS NOT NULL"
......@@ -489,8 +489,6 @@ class SQLiteDatabaseManager(DatabaseManager):
q("INSERT INTO obj SELECT partition, oid, ?, data_id, value_tid" + sql,
(tid, ttid))
q("DELETE" + sql, (ttid,))
q("INSERT INTO trans SELECT * FROM ttrans WHERE tid=?", (tid,))
q("DELETE FROM ttrans WHERE tid=?", (tid,))
self.releaseData(data_id_list)
def abortTransaction(self, ttid):
......@@ -520,12 +518,12 @@ class SQLiteDatabaseManager(DatabaseManager):
def _deleteRange(self, partition, min_tid=None, max_tid=None):
sql = " WHERE partition=?"
args = [partition]
if min_tid:
if min_tid is not None:
sql += " AND ? < tid"
args.append(util.u64(min_tid))
if max_tid:
args.append(min_tid)
if max_tid is not None:
sql += " AND tid <= ?"
args.append(util.u64(max_tid))
args.append(max_tid)
q = self.query
q("DELETE FROM trans" + sql, args)
sql = " FROM obj" + sql
......@@ -693,3 +691,24 @@ class SQLiteDatabaseManager(DatabaseManager):
sha1(','.join(str(x[1]) for x in r)).digest(),
p64(r[-1][1]))
return 0, ZERO_HASH, ZERO_TID, ZERO_HASH, ZERO_OID
def dump(self):
main = []
data = []
for line in self.conn.iterdump():
if line.startswith('INSERT '):
assert line.endswith(';'), line
data.append(line)
continue
if line.startswith('CREATE TABLE '):
# ALTER TABLE adds quotes.
create, table, name, tail = line.split(' ', 3)
line = ' '.join((create, table, name.strip('"'), tail))
main.append(line)
assert line == 'COMMIT;', line
data.sort()
main[-1:-1] = data
return '\n'.join(main) + '\n'
def restore(self, sql):
self.conn.executescript(sql)
......@@ -42,11 +42,11 @@ class ClientOperationHandler(BaseHandler):
# for read rpc
return self.app.tm.read_queue
def askObject(self, conn, oid, serial, tid):
def askObject(self, conn, oid, at, before):
app = self.app
if app.tm.loadLocked(oid):
raise DelayEvent
o = app.dm.getObject(oid, serial, tid)
o = app.dm.getObject(oid, at, before)
try:
serial, next_serial, compression, checksum, data, data_serial = o
except TypeError:
......
......@@ -32,7 +32,7 @@ class IdentificationHandler(EventHandler):
return self.app.nm
def requestIdentification(self, conn, node_type, uuid, address, name,
id_timestamp):
devpath, id_timestamp):
self.checkClusterName(name)
app = self.app
# reject any incoming connections if not ready
......
......@@ -28,21 +28,21 @@ class InitializationHandler(BaseMasterHandler):
raise ProtocolError('Partial partition table received')
# Install the partition table into the database for persistence.
cell_list = []
offset_list = xrange(pt.getPartitions())
unassigned_set = set(offset_list)
for offset in offset_list:
unassigned = range(pt.getPartitions())
for offset in reversed(unassigned):
for cell in pt.getCellList(offset):
cell_list.append((offset, cell.getUUID(), cell.getState()))
if cell.getUUID() == app.uuid:
unassigned_set.remove(offset)
unassigned.remove(offset)
# delete objects database
dm = app.dm
if unassigned_set:
if unassigned:
if app.disable_drop_partitions:
logging.info("don't drop data for partitions %r", unassigned_set)
logging.info('partitions %r are discarded but actual deletion'
' of data is disabled', unassigned)
else:
logging.debug('drop data for partitions %r', unassigned_set)
dm.dropPartitions(unassigned_set)
logging.debug('drop data for partitions %r', unassigned)
dm.dropPartitions(unassigned)
dm.changePartitionTable(ptid, cell_list, reset=True)
dm.commit()
......@@ -63,7 +63,7 @@ class InitializationHandler(BaseMasterHandler):
def askLastIDs(self, conn):
dm = self.app.dm
dm.truncate()
ltid, _, _, loid = dm.getLastIDs()
ltid, loid = dm.getLastIDs()
conn.answer(Packets.AnswerLastIDs(loid, ltid))
def askPartitionTable(self, conn):
......@@ -77,18 +77,10 @@ class InitializationHandler(BaseMasterHandler):
def validateTransaction(self, conn, ttid, tid):
dm = self.app.dm
dm.lockTransaction(tid, ttid)
dm.unlockTransaction(tid, ttid)
dm.unlockTransaction(tid, ttid, True, True)
dm.commit()
def startOperation(self, conn, backup):
self.app.operational = True
# XXX: see comment in protocol
dm = self.app.dm
if backup:
if dm.getBackupTID():
return
tid = dm.getLastIDs()[0] or ZERO_TID
else:
tid = None
dm._setBackupTID(tid)
dm.commit()
self.app.operational = True
self.app.replicator.startOperation(backup)
......@@ -26,10 +26,7 @@ class MasterOperationHandler(BaseMasterHandler):
def startOperation(self, conn, backup):
# XXX: see comment in protocol
assert self.app.operational and backup
dm = self.app.dm
if not dm.getBackupTID():
dm._setBackupTID(dm.getLastIDs()[0] or ZERO_TID)
dm.commit()
self.app.replicator.startOperation(backup)
def askLockInformation(self, conn, ttid, tid):
self.app.tm.lock(ttid, tid)
......
......@@ -75,9 +75,6 @@ class StorageOperationHandler(EventHandler):
deleteTransaction(tid)
assert not pack_tid, "TODO"
if next_tid:
# More than one chunk ? This could be a full replication so avoid
# restarting from the beginning by committing now.
self.app.dm.commit()
self.app.replicator.fetchTransactions(next_tid)
else:
self.app.replicator.fetchObjects()
......@@ -97,15 +94,12 @@ class StorageOperationHandler(EventHandler):
for serial, oid_list in object_dict.iteritems():
for oid in oid_list:
deleteObject(oid, serial)
# XXX: It should be possible not to commit here if it was the last
# chunk, because we'll either commit again when updating
# 'backup_tid' or the partition table.
self.app.dm.commit()
assert not pack_tid, "TODO"
if next_tid:
# TODO also provide feedback to master about current replication state (tid)
self.app.replicator.fetchObjects(next_tid, next_oid)
else:
# This will also commit.
self.app.replicator.finish()
@checkConnectionIsReplicatorConnection
......@@ -267,6 +261,8 @@ class StorageOperationHandler(EventHandler):
"partition %u dropped or truncated"
% partition), msg_id)
return
if not object[2]: # creation undone
object = object[0], 0, ZERO_HASH, '', object[4]
# Same as in askFetchTransactions.
conn.send(Packets.AddObject(oid, *object), msg_id)
yield conn.buffering
......
......@@ -93,7 +93,7 @@ from neo.lib import logging
from neo.lib.protocol import CellStates, NodeTypes, NodeStates, \
Packets, INVALID_TID, ZERO_TID, ZERO_OID
from neo.lib.connection import ClientConnection, ConnectionClosed
from neo.lib.util import add64, dump
from neo.lib.util import add64, dump, p64
from .handlers.storage import StorageOperationHandler
FETCH_COUNT = 1000
......@@ -190,41 +190,50 @@ class Replicator(object):
return add64(tid, -1)
return ZERO_TID
def updateBackupTID(self):
def updateBackupTID(self, commit=False):
dm = self.app.dm
tid = dm.getBackupTID()
if tid:
new_tid = self.getBackupTID()
if tid != new_tid:
dm._setBackupTID(new_tid)
if commit:
dm.commit()
def startOperation(self, backup):
dm = self.app.dm
if backup:
if dm.getBackupTID():
assert not hasattr(self, 'partition_dict'), self.partition_dict
return
tid = dm.getLastIDs()[0] or ZERO_TID
else:
tid = None
dm._setBackupTID(tid)
dm.commit()
try:
partition_dict = self.partition_dict
except AttributeError:
return
for offset, next_tid in dm.iterCellNextTIDs():
if type(next_tid) is not bytes: # readable
p = partition_dict[offset]
p.next_trans, p.next_obj = next_tid
def populate(self):
app = self.app
pt = app.pt
uuid = app.uuid
self.partition_dict = {}
self.replicate_dict = {}
self.source_dict = {}
self.ttid_set = set()
last_tid, last_trans_dict, last_obj_dict, _ = app.dm.getLastIDs()
next_tid = app.dm.getBackupTID() or last_tid
next_tid = add64(next_tid, 1) if next_tid else ZERO_TID
outdated_list = []
for offset in xrange(pt.getPartitions()):
for cell in pt.getCellList(offset):
if cell.getUUID() == uuid and not cell.isCorrupted():
for offset, next_tid in self.app.dm.iterCellNextTIDs():
self.partition_dict[offset] = p = Partition()
if cell.isOutOfDate():
if type(next_tid) is bytes: # OUT_OF_DATE
outdated_list.append(offset)
try:
p.next_trans = add64(last_trans_dict[offset], 1)
except KeyError:
p.next_trans = ZERO_TID
p.next_obj = last_obj_dict.get(offset, ZERO_TID)
p.max_ttid = INVALID_TID
else:
p.next_trans = p.next_obj = next_tid
p.max_ttid = INVALID_TID
else: # readable
p.next_trans, p.next_obj = next_tid or (None, None)
p.max_ttid = None
if outdated_list:
self.app.tm.replicating(outdated_list)
......@@ -236,7 +245,6 @@ class Replicator(object):
discarded_list = []
readable_list = []
app = self.app
last_tid, last_trans_dict, last_obj_dict, _ = app.dm.getLastIDs()
for offset, uuid, state in cell_list:
if uuid == app.uuid:
if state in (CellStates.DISCARDED, CellStates.CORRUPTED):
......@@ -251,11 +259,9 @@ class Replicator(object):
elif state == CellStates.OUT_OF_DATE:
assert offset not in self.partition_dict
self.partition_dict[offset] = p = Partition()
try:
p.next_trans = add64(last_trans_dict[offset], 1)
except KeyError:
p.next_trans = ZERO_TID
p.next_obj = last_obj_dict.get(offset, ZERO_TID)
# New cell. 0 is also what should be stored by the backend.
# Nothing to optimize.
p.next_trans = p.next_obj = ZERO_TID
p.max_ttid = INVALID_TID
added_list.append(offset)
else:
......@@ -289,7 +295,7 @@ class Replicator(object):
next_tid = add64(tid, 1)
p.next_trans = p.next_obj = next_tid
if next_tid:
self.updateBackupTID()
self.updateBackupTID(True)
self._nextPartition()
def _nextPartitionSortKey(self, offset):
......@@ -344,7 +350,7 @@ class Replicator(object):
try:
conn.ask(Packets.RequestIdentification(NodeTypes.STORAGE,
None if name else app.uuid, app.server, name or app.name,
app.id_timestamp))
(), app.id_timestamp))
except ConnectionClosed:
if previous_node is self.current_node:
return
......@@ -360,6 +366,9 @@ class Replicator(object):
offset = self.current_partition
p = self.partition_dict[offset]
if min_tid:
# More than one chunk ? This could be a full replication so avoid
# restarting from the beginning by committing now.
self.app.dm.commit()
p.next_trans = min_tid
else:
try:
......@@ -384,13 +393,17 @@ class Replicator(object):
offset = self.current_partition
p = self.partition_dict[offset]
max_tid = self.replicate_tid
dm = self.app.dm
if min_tid:
p.next_obj = min_tid
self.updateBackupTID()
dm.updateCellTID(offset, add64(min_tid, -1))
dm.commit() # like in fetchTransactions
else:
min_tid = p.next_obj
p.next_trans = add64(max_tid, 1)
object_dict = {}
for serial, oid in self.app.dm.getReplicationObjectList(min_tid,
for serial, oid in dm.getReplicationObjectList(min_tid,
max_tid, FETCH_COUNT, offset, min_oid):
try:
object_dict[serial].append(oid)
......@@ -406,11 +419,14 @@ class Replicator(object):
p = self.partition_dict[offset]
p.next_obj = add64(tid, 1)
self.updateBackupTID()
app = self.app
app.dm.updateCellTID(offset, tid)
app.dm.commit()
if p.max_ttid or offset in self.replicate_dict and \
offset not in self.source_dict:
logging.debug("unfinished transactions: %r", self.ttid_set)
else:
self.app.tm.replicated(offset, tid)
app.tm.replicated(offset, tid)
logging.debug("partition %u replicated up to %s from %r",
offset, dump(tid), self.current_node)
self.getCurrentConnection().setReconnectionNoDelay()
......
#
# Copyright (C) 2018 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from msgpack import Packer, Unpacker
class Queue(object):
"""Unidirectional pipe for asynchronous and fast exchange of big amounts
of data between 2 processes.
It is implemented using shared memory, a few locks and msgpack
serialization. While the latter is faster than C pickle, it was mainly
chosen for its streaming API while deserializing, which greatly reduces
the locking overhead for the consumer process.
There is no mechanism to end a communication, so this information must be
in the exchanged data, for example by choosing a marker object like None:
- the last object sent by the producer is this marker
- the consumer stops iterating when it gets this marker
As long as there are data being exchanged, the 2 processes can't change
roles (producer/consumer).
"""
def __init__(self, max_size):
from multiprocessing import Lock, RawArray, RawValue
self._max_size = max_size
self._array = RawArray('c', max_size)
self._pos = RawValue('L')
self._size = RawValue('L')
self._locks = Lock(), Lock(), Lock()
def __repr__(self):
return "<%s pos=%s size=%s max_size=%s>" % (self.__class__.__name__,
self._pos.value, self._size.value, self._max_size)
def __iter__(self):
"""Iterate endlessly over all objects sent by the producer
Internally, this method uses a receiving buffer that is lost if
interrupted (GeneratorExit). If this buffer was not empty, the queue
is left in a inconsistent state and this method can't be called again.
So the correct way to split a loop is to first get an iterator
explicitly:
iq = iter(queue)
for x in iq:
if ...:
break
for x in iq:
...
"""
unpacker = Unpacker(use_list=False, raw=True)
feed = unpacker.feed
max_size = self._max_size
array = self._array
pos = self._pos
size = self._size
lock, get_lock, put_lock = self._locks
left = 0
while 1:
for data in unpacker:
yield data
while 1:
with lock:
p = pos.value
s = size.value
if s:
break
get_lock.acquire()
e = p + s
if e < max_size:
feed(array[p:e])
else:
feed(array[p:])
e -= max_size
feed(array[:e])
with lock:
pos.value = e
n = size.value
size.value = n - s
if n == max_size:
put_lock.acquire(0)
put_lock.release()
def __call__(self, iterable):
"""Fill the queue with given objects
Hoping than msgpack.Packer gets a streaming API, 'iterable' should not
be split (i.e. this method should be called only once, like __iter__).
"""
pack = Packer(use_bin_type=True).pack
max_size = self._max_size
array = self._array
pos = self._pos
size = self._size
lock, get_lock, put_lock = self._locks
left = 0
for data in iterable:
data = pack(data)
n = len(data)
i = 0
while 1:
if not left:
while 1:
with lock:
p = pos.value
j = size.value
left = max_size - j
if left:
break
put_lock.acquire()
p += j
if p >= max_size:
p -= max_size
e = min(p + min(n, left), max_size)
j = e - p
array[p:e] = data[i:i+j]
n -= j
i += j
with lock:
p = pos.value
s = size.value
j += s
size.value = j
if not s:
get_lock.acquire(0)
get_lock.release()
p += j
if p >= max_size:
p -= max_size
left = max_size - j
if not n:
break
def test(self):
import multiprocessing, random, sys, threading
from traceback import print_tb
r = range(50)
random.shuffle(r)
for P in threading.Thread, multiprocessing.Process:
q = Queue(23)
def t():
for n in xrange(len(r)):
yield '.' * n
yield
for n in r:
yield '.' * n
i = j = 0
p = P(target=q, args=(t(),))
p.daemon = 1
p.start()
try:
q = iter(q)
for i, x in enumerate(q):
if x is None:
break
self.assertEqual(x, '.' * i)
self.assertEqual(i, len(r))
for j in r:
self.assertEqual(next(q), '.' * j)
except KeyboardInterrupt:
print_tb(sys.exc_info()[2])
self.fail((i, j))
p.join()
if __name__ == '__main__':
import unittest
unittest.TextTestRunner().run(type('', (unittest.TestCase,), {
'runTest': test})())
......@@ -314,12 +314,15 @@ class TransactionManager(EventQueue):
Unlock transaction
"""
try:
tid = self._transaction_dict[ttid].tid
transaction = self._transaction_dict[ttid]
except KeyError:
raise ProtocolError("unknown ttid %s" % dump(ttid))
tid = transaction.tid
logging.debug('Unlock TXN %s (ttid=%s)', dump(tid), dump(ttid))
dm = self._app.dm
dm.unlockTransaction(tid, ttid)
dm.unlockTransaction(tid, ttid,
transaction.voted == 2,
transaction.store_dict)
self._app.em.setTimeout(time() + 1, dm.deferCommit())
self.abort(ttid, even_if_locked=True)
......@@ -521,7 +524,6 @@ class TransactionManager(EventQueue):
assert not even_if_locked
# See how the master processes AbortTransaction from the client.
return
logging.debug('Abort TXN %s', dump(ttid))
transaction = self._transaction_dict[ttid]
locked = transaction.tid
# if the transaction is locked, ensure we can drop it
......@@ -529,6 +531,7 @@ class TransactionManager(EventQueue):
if not even_if_locked:
return
else:
logging.debug('Abort TXN %s', dump(ttid))
dm = self._app.dm
dm.abortTransaction(ttid)
dm.releaseData([x[1] for x in transaction.store_dict.itervalues()],
......
......@@ -28,6 +28,7 @@ import weakref
import MySQLdb
import transaction
from ConfigParser import SafeConfigParser
from cStringIO import StringIO
try:
from ZODB._compat import Unpickler
......@@ -155,8 +156,22 @@ def setupMySQLdb(db_list, user=DB_USER, password='', clear_databases=True):
conn.commit()
conn.close()
def ImporterConfigParser(adapter, zodb, **kw):
cfg = SafeConfigParser()
cfg.add_section("neo")
cfg.set("neo", "adapter", adapter)
for x in kw.iteritems():
cfg.set("neo", *x)
for name, zodb in zodb:
cfg.add_section(name)
for x in zodb.iteritems():
cfg.set(name, *x)
return cfg
class NeoTestBase(unittest.TestCase):
maxDiff = None
def setUp(self):
logging.name = self.setupLog()
unittest.TestCase.setUp(self)
......@@ -175,6 +190,8 @@ class NeoTestBase(unittest.TestCase):
# Note we don't even abort them because it may require a valid
# connection to a master node (see Storage.sync()).
transaction.manager.__init__()
if logging._max_size is not None:
logging.flush()
class failureException(AssertionError):
def __init__(self, msg=None):
......
......@@ -21,6 +21,7 @@ from .. import NeoUnitTestBase, buildUrlFromString
from neo.client.app import Application
from neo.client.cache import test as testCache
from neo.client.exception import NEOStorageError
from neo.lib.util import p64
class ClientApplicationTests(NeoUnitTestBase):
......@@ -51,9 +52,7 @@ class ClientApplicationTests(NeoUnitTestBase):
def makeOID(self, value=None):
from random import randint
if value is None:
value = randint(1, 255)
return '\00' * 7 + chr(value)
return p64(randint(1, 255) if value is None else value)
makeTID = makeOID
def makeTransactionObject(self, user='u', description='d', _extension='e'):
......
......@@ -221,7 +221,7 @@ class ClusterPdb(object):
def wait(self, test, timeout):
end_time = time() + timeout
period = 0.1
period = 0.01
while not test():
cluster_dict.acquire()
try:
......@@ -232,7 +232,6 @@ class ClusterPdb(object):
next_sleep = max(last_pdb + timeout, end_time) - time()
if next_sleep > period:
next_sleep = period
period *= 1.5
elif next_sleep < 0:
return False
finally:
......
#
# Copyright (C) 2014-2017 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os, stat, time
from persistent import Persistent
from BTrees.OOBTree import OOBTree
class Inode(OOBTree):
data = None
def __init__(self, up=None, mode=stat.S_IFDIR):
self[os.pardir] = self if up is None else up
self.mode = mode
self.mtime = time.time()
def __getstate__(self):
return Persistent.__getstate__(self), OOBTree.__getstate__(self)
def __setstate__(self, state):
Persistent.__setstate__(self, state[0])
OOBTree.__setstate__(self, state[1])
def edit(self, data=None, mtime=None):
fmt = stat.S_IFMT(self.mode)
if data is None:
assert fmt == stat.S_IFDIR, oct(fmt)
else:
assert fmt == stat.S_IFREG or fmt == stat.S_IFLNK, oct(fmt)
if self.data != data:
self.data = data
if self.mtime != mtime:
self.mtime = mtime or time.time()
def root(self):
try:
self = self[os.pardir]
except KeyError:
return self
return self.root()
def traverse(self, path, followlinks=True):
path = iter(path.split(os.sep) if isinstance(path, basestring) and path
else path)
for d in path:
if not d:
return self.root().traverse(path, followlinks)
if d != os.curdir:
d = self[d]
if followlinks and stat.S_ISLNK(d.mode):
d = self.traverse(d.data, True)
return d.traverse(path, followlinks)
return self
def inodeFromFs(self, path):
s = os.lstat(path)
mode = s.st_mode
name = os.path.basename(path)
try:
i = self[name]
assert stat.S_IFMT(i.mode) == stat.S_IFMT(mode)
changed = False
except KeyError:
i = self[name] = self.__class__(self, mode)
changed = True
i.edit(open(path).read() if stat.S_ISREG(mode) else
os.readlink(p) if stat.S_ISLNK(mode) else
None, s.st_mtime)
return changed or i._p_changed
def treeFromFs(self, path, yield_interval=None, filter=None):
prefix_len = len(path) + len(os.sep)
n = 0
for dirpath, dirnames, filenames in os.walk(path):
inodeFromFs = self.traverse(dirpath[prefix_len:]).inodeFromFs
for names in dirnames, filenames:
skipped = []
for j, name in enumerate(names):
p = os.path.join(dirpath, name)
if filter and not filter(p[prefix_len:]):
skipped.append(j)
elif inodeFromFs(p):
n += 1
if n == yield_interval:
n = 0
yield self
while skipped:
del names[skipped.pop()]
if n:
yield self
def walk(self):
s = [(None, self)]
while s:
top, self = s.pop()
dirs = []
nondirs = []
for name, inode in self.iteritems():
if name != os.pardir:
(dirs if stat.S_ISDIR(inode.mode) else nondirs).append(name)
yield top or os.curdir, dirs, nondirs
for name in dirs:
s.append((os.path.join(top, name) if top else name, self[name]))
......@@ -29,17 +29,16 @@ import tempfile
import traceback
import threading
import psutil
from ConfigParser import SafeConfigParser
import neo.scripts
from neo.neoctl.neoctl import NeoCTL, NotReadyException
from neo.lib import logging
from neo.lib.protocol import ClusterStates, NodeTypes, CellStates, NodeStates, \
UUID_NAMESPACES
from neo.lib.util import dump
from neo.lib.util import dump, setproctitle
from .. import (ADDRESS_TYPE, DB_SOCKET, DB_USER, IP_VERSION_FORMAT_DICT, SSL,
buildUrlFromString, cluster, getTempDirectory, NeoTestBase, Patch,
setupMySQLdb)
buildUrlFromString, cluster, getTempDirectory, setupMySQLdb,
ImporterConfigParser, NeoTestBase, Patch)
from neo.client.Storage import Storage
from neo.storage.database import manager, buildDatabaseManager
......@@ -116,36 +115,31 @@ class PortAllocator(object):
__del__ = release
class NEOProcess(object):
class Process(object):
_coverage_fd = None
_coverage_prefix = os.path.join(getTempDirectory(), 'coverage-')
_coverage_index = 0
pid = 0
def __init__(self, command, uuid, arg_dict):
try:
__import__('neo.scripts.' + command, level=0)
except ImportError:
raise NotFound, '%s not found' % (command)
def __init__(self, command, arg_dict={}):
self.command = command
self.arg_dict = arg_dict
self.with_uuid = True
self.setUUID(uuid)
def start(self, with_uuid=True):
# Prevent starting when already forked and wait wasn't called.
if self.pid != 0:
raise AlreadyRunning, 'Already running with PID %r' % (self.pid, )
command = self.command
def _args(self):
args = []
self.with_uuid = with_uuid
for arg, param in self.arg_dict.iteritems():
args.append('--' + arg)
if param is not None:
args.append(str(param))
if with_uuid:
args += '--uuid', str(self.uuid)
return args
def start(self):
# Prevent starting when already forked and wait wasn't called.
if self.pid != 0:
raise AlreadyRunning('Already running with PID %r' % self.pid)
command = self.command
args = self._args()
global coverage
if coverage:
cls = self.__class__
......@@ -159,7 +153,7 @@ class NEOProcess(object):
if args:
os.close(w)
os.kill(os.getpid(), signal.SIGSTOP)
self.pid = logging.fork()
self.pid = os.fork()
if self.pid:
# Wait that the signal to kill the child is set up.
os.close(w)
......@@ -179,7 +173,8 @@ class NEOProcess(object):
os.close(self._coverage_fd)
os.write(w, '\0')
sys.argv = [command] + args
getattr(neo.scripts, command).main()
setproctitle(self.command)
self.run()
status = 0
except SystemExit, e:
status = e.code
......@@ -203,6 +198,9 @@ class NEOProcess(object):
logging.info('pid %u: %s %s',
self.pid, command, ' '.join(map(repr, args)))
def run(self):
raise NotImplementedError
def child_coverage(self):
r = self._coverage_fd
if r is not None:
......@@ -249,11 +247,32 @@ class NEOProcess(object):
self.kill()
self.wait()
def getPID(self):
return self.pid
def isAlive(self):
try:
return psutil.Process(self.pid).status() != psutil.STATUS_ZOMBIE
except psutil.NoSuchProcess:
return False
class NEOProcess(Process):
def __init__(self, command, uuid, arg_dict):
try:
__import__('neo.scripts.' + command, level=0)
except ImportError:
raise NotFound(command + ' not found')
super(NEOProcess, self).__init__(command, arg_dict)
self.setUUID(uuid)
def _args(self):
args = super(NEOProcess, self)._args()
if self.uuid:
args += '--uuid', str(self.uuid)
return args
def run(self):
getattr(neo.scripts, self.command).main()
def getUUID(self):
assert self.with_uuid, 'UUID disabled on this process'
return self.uuid
def setUUID(self, uuid):
......@@ -262,12 +281,6 @@ class NEOProcess(object):
"""
self.uuid = uuid
def isAlive(self):
try:
return psutil.Process(self.pid).status() != psutil.STATUS_ZOMBIE
except psutil.NoSuchProcess:
return False
class NEOCluster(object):
SSL = None
......@@ -304,14 +317,8 @@ class NEOCluster(object):
IP_VERSION_FORMAT_DICT[self.address_type]
self.setupDB(clear_databases)
if importer:
cfg = SafeConfigParser()
cfg.add_section("neo")
cfg.set("neo", "adapter", adapter)
cfg = ImporterConfigParser(adapter, **importer)
cfg.set("neo", "database", self.db_template(*db_list))
for name, zodb in importer:
cfg.add_section(name)
for x in zodb.iteritems():
cfg.set(name, *x)
importer_conf = os.path.join(temp_dir, 'importer.cfg')
with open(importer_conf, 'w') as f:
cfg.write(f)
......
......@@ -202,9 +202,9 @@ class ClientTests(NEOFunctionalTest):
self.neo.stop()
self.neo = NEOCluster(db_list=['test_neo1'], partitions=3,
importer=[("root", {
importer={"zodb": [("root", {
"storage": "<filestorage>\npath %s\n</filestorage>"
% dfs_storage.getName()})],
% dfs_storage.getName()})]},
temp_dir=self.getTempDirectory())
self.neo.start()
neo_db, neo_conn = self.neo.getZODBConnection()
......
......@@ -15,7 +15,7 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import random, time, unittest
from collections import defaultdict
from collections import Counter, defaultdict
from .. import NeoUnitTestBase
from neo.lib import logging
from neo.lib.protocol import NodeStates, CellStates
......@@ -291,13 +291,17 @@ class MasterPartitionTableTests(NeoUnitTestBase):
self.update(pt, self.tweak(pt, sn[:1]))
self.assertPartitionTable(pt, '.U.|..U|.U.|..U|.U.|..U|.U.')
def test_18_tweak(self):
s = repr(time.time())
logging.info("using seed %r", s)
r = random.Random(s)
def test_18_tweakBigPT(self):
seed = repr(time.time())
logging.info("using seed %r", seed)
sn_count = 11
sn = [self.createStorage(None, i + 1, NodeStates.RUNNING)
for i in xrange(sn_count)]
for topo in 0, 1:
r = random.Random(seed)
if topo:
for i, s in enumerate(sn, sn_count):
s.devpath = str(i % 5),
pt = PartitionTable(1000, 2)
pt.setID(1)
for offset in xrange(pt.np):
......@@ -311,6 +315,70 @@ class MasterPartitionTableTests(NeoUnitTestBase):
self.tweak(pt)
self.update(pt)
def test_19_topology(self):
sn_count = 16
sn = [self.createStorage(None, i + 1, NodeStates.RUNNING)
for i in xrange(sn_count)]
pt = PartitionTable(48, 2)
pt.make(sn)
pt.log()
for i, s in enumerate(sn, sn_count):
s.devpath = tuple(bin(i)[3:-1])
self.assertEqual(Counter(x[2] for x in self.tweak(pt)), {
CellStates.OUT_OF_DATE: 96,
CellStates.FEEDING: 96,
})
self.update(pt)
x = lambda n, *x: ('|'.join(x[:1]*n), '|'.join(x[1:]*n))
for even, np, i, topo, expected in (
## Optimal topology.
# All nodes have same number of cells.
(1, 2, 2, ("00", "01", "02", "10", "11", "12"), ('UU...U|..UUU.',
'UU.U..|..U.UU')),
(1, 7, 1, "0001122", (
'U.....U|.U.U...|..U.U..|U....U.|.U....U|..UU...|....UU.',
'U..U...|.U...U.|..U.U..|U.....U|.U.U...|..U..U.|....U.U')),
(1, 4, 1, "00011122", ('U......U|.U.U....|..U.U...|.....UU.',
'U..U....|.U..U...|..U...U.|.....U.U')),
(1, 9, 1, "000111222", ('U.......U|.U.U.....|..U.U....|'
'.....UU..|U......U.|.U......U|'
'..UU.....|....U.U..|.....U.U.',
'U..U.....|.U....U..|..U.U....|'
'.....U.U.|U.......U|.U.U.....|'
'..U...U..|....U..U.|.....U..U')),
# Some nodes have a extra cell.
(0, 8, 1, "0001122", ('U.....U|.U.U...|..U.U..|U....U.|'
'.U....U|..UU...|....UU.|U.....U',
'U..U...|.U...U.|..U.U..|U.....U|'
'.U.U...|..U..U.|....U.U|U..U...')),
## Topology ignored.
(1, 6, 1, ("00", "01", "1"), 'UU.|U.U|.UU|UU.|U.U|.UU'),
(1, 5, 2, "01233", 'UUU..|U..UU|.UUU.|UU..U|..UUU'),
):
assert len(topo) <= sn_count
sn2 = sn[:len(topo)]
for s in sn2:
s.devpath = ()
k = (1,7)[even]
pt = PartitionTable(np*k, i)
pt.make(sn2)
for devpath, s in zip(topo, sn2):
s.devpath = tuple(devpath)
if type(expected) is tuple:
self.assertTrue(self.tweak(pt))
self.update(pt)
self.assertPartitionTable(pt, '|'.join(expected[:1]*k))
pt.clear()
pt.make(sn2)
self.assertPartitionTable(pt, '|'.join(expected[1:]*k))
self.assertFalse(pt.tweak())
else:
expected = '|'.join((expected,)*k)
self.assertFalse(pt.tweak())
self.assertPartitionTable(pt, expected)
pt.clear()
pt.make(sn2)
self.assertPartitionTable(pt, expected)
if __name__ == '__main__':
unittest.main()
......
#
# Copyright (C) 2018 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import hashlib, random
from collections import deque
from itertools import islice
from persistent import Persistent
from BTrees.IOBTree import IOBTree
from .stat_zodb import _DummyData
def generateTree(random=random):
tree = []
N = 5
fifo = deque()
path = ()
size = lambda: max(int(random.gauss(40,30)), 0)
while 1:
tree.extend(path + (i, size())
for i in xrange(-random.randrange(N), 0))
n = N * (1 - len(path)) + random.randrange(N)
for i in xrange(n):
fifo.append(path + (i,))
try:
path = fifo.popleft()
except IndexError:
break
change = tree
while change:
change = [x[:-1] + (size(),) for x in change if random.randrange(2)]
tree += change
random.shuffle(tree)
return tree
class Leaf(Persistent):
pass
Node = IOBTree
def importTree(root, tree, yield_interval=None, filter=None):
n = 0
for path in tree:
node = root
for i, x in enumerate(path[:-1], 1):
if filter and not filter(path[:i]):
break
if x < 0:
try:
node = node[x]
except KeyError:
node[x] = node = Leaf()
node.data = bytes(_DummyData(random.Random(path), path[-1]))
else:
try:
node = node[x]
continue
except KeyError:
node[x] = node = Node()
n += 1
if n == yield_interval:
n = 0
yield root
if n:
yield root
class hashTree(object):
_hash = None
_new = hashlib.md5
def __init__(self, node):
s = [((), node)]
def walk():
h = self._new()
update = h.update
while s:
top, node = s.pop()
try:
update('%s %s %s\n' % (top, len(node.data),
self._new(node.data).hexdigest()))
yield
except AttributeError:
update('%s %s\n' % (top, tuple(node.keys())))
yield
for k, v in reversed(node.items()):
s.append((top + (k,), v))
del self._walk
self._hash = h
self._walk = walk()
def __getattr__(self, attr):
return getattr(self._hash, attr)
def __call__(self, n=None):
if n is None:
return sum(1 for _ in self._walk)
next(islice(self._walk, n - 1, None))
......@@ -19,11 +19,13 @@ PROD1 = lambda random=random: DummyZODB(6.04237779991, 1.55811487853,
1.04108991045, 0.906703192546,
0.810080409164, random)
def DummyData(random=random):
def _DummyData(random, size):
# returns data that gzip at about 28.5 %
return bytearray(int(random.gauss(0, .8)) % 256 for x in xrange(size))
def DummyData(random=random):
# make sure sample is bigger than dictionary of compressor
data = ''.join(chr(int(random.gauss(0, .8)) % 256) for x in xrange(100000))
return StringIO(data)
return StringIO(_DummyData(random, 100000))
class DummyZODB(object):
......
......@@ -89,7 +89,7 @@ class StorageDBTests(NeoUnitTestBase):
self.db.lockTransaction(tid, ttid)
yield
if commit:
self.db.unlockTransaction(tid, ttid)
self.db.unlockTransaction(tid, ttid, True, objs)
self.db.commit()
elif commit is not None:
self.db.abortTransaction(ttid)
......@@ -227,6 +227,7 @@ class StorageDBTests(NeoUnitTestBase):
def test_changePartitionTable(self):
db = self.getDB()
db.setNumPartitions(3)
ptid = 1
uuid = self.getStorageUUID()
cell1 = 0, uuid, CellStates.OUT_OF_DATE
......@@ -253,7 +254,7 @@ class StorageDBTests(NeoUnitTestBase):
txn1, objs1 = self.getTransaction([oid1])
txn2, objs2 = self.getTransaction([oid2])
# nothing in database
self.assertEqual(self.db.getLastIDs(), (None, {}, {}, None))
self.assertEqual(self.db.getLastIDs(), (None, None))
self.assertEqual(self.db.getUnfinishedTIDDict(), {})
self.assertEqual(self.db.getObject(oid1), None)
self.assertEqual(self.db.getObject(oid2), None)
......@@ -319,13 +320,17 @@ class StorageDBTests(NeoUnitTestBase):
expected = [(t, oid_list[offset+i]) for t in tids for i in (0, np)]
self.assertEqual(self.db.getReplicationObjectList(ZERO_TID,
MAX_TID, len(expected) + 1, offset, ZERO_OID), expected)
self.db._deleteRange(0, MAX_TID)
self.db._deleteRange(0, max_tid=ZERO_TID)
def deleteRange(partition, min_tid=None, max_tid=None):
self.db._deleteRange(partition,
None if min_tid is None else u64(min_tid),
None if max_tid is None else u64(max_tid))
deleteRange(0, MAX_TID)
deleteRange(0, max_tid=ZERO_TID)
check(0, [], t1, t2, t3)
self.db._deleteRange(0); check(0, [])
self.db._deleteRange(1, t2); check(1, [t1], t1, t2)
self.db._deleteRange(2, max_tid=t2); check(2, [], t3)
self.db._deleteRange(3, t1, t2); check(3, [t3], t1, t3)
deleteRange(0); check(0, [])
deleteRange(1, t2); check(1, [t1], t1, t2)
deleteRange(2, max_tid=t2); check(2, [], t3)
deleteRange(3, t1, t2); check(3, [t3], t1, t3)
def test_getTransaction(self):
oid1, oid2 = self.getOIDs(2)
......
......@@ -15,17 +15,32 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from MySQLdb import NotSupportedError, OperationalError
from contextlib import contextmanager
from MySQLdb import NotSupportedError, OperationalError, ProgrammingError
from MySQLdb.constants.CR import SERVER_GONE_ERROR
from MySQLdb.constants.ER import UNKNOWN_STORAGE_ENGINE
from ..mock import Mock
from neo.lib.exception import DatabaseFailure
from neo.lib.protocol import ZERO_OID
from neo.lib.util import p64
from .. import DB_PREFIX, DB_SOCKET, DB_USER
from .. import DB_PREFIX, DB_SOCKET, DB_USER, Patch
from .testStorageDBTests import StorageDBTests
from neo.storage.database import DatabaseFailure
from neo.storage.database.mysqldb import MySQLDatabaseManager
class ServerGone(object):
@contextmanager
def __new__(cls, db):
self = object.__new__(cls)
with Patch(db, conn=self) as self._p:
yield self._p
def query(self, *args):
self._p.revert()
raise OperationalError(SERVER_GONE_ERROR, 'this is a test')
class StorageMySQLdbTests(StorageDBTests):
engine = None
......@@ -67,23 +82,9 @@ class StorageMySQLdbTests(StorageDBTests):
calls[0].checkArgs('SELECT ')
def test_query2(self):
# test the OperationalError exception
# fake object, raise exception during the first call
from MySQLdb.constants.CR import SERVER_GONE_ERROR
class FakeConn(object):
def query(*args):
raise OperationalError(SERVER_GONE_ERROR, 'this is a test')
self.db.conn = FakeConn()
self.connect_called = False
def connect_hook():
# mock object, break raise/connect loop
self.db.conn = Mock()
self.connect_called = True
self.db._connect = connect_hook
# make a query, exception will be raised then connect() will be
# called and the second query will use the mock object
self.db.query('INSERT')
self.assertTrue(self.connect_called)
with ServerGone(self.db) as p:
self.assertRaises(ProgrammingError, self.db.query, 'QUERY')
self.assertFalse(p.applied)
def test_query3(self):
# OperationalError > raise DatabaseFailure exception
......
......@@ -21,6 +21,8 @@ from neo.lib.util import ReadBuffer, parseNodeAddress
class UtilTests(NeoUnitTestBase):
from neo.storage.shared_queue import test as testSharedQueue
def test_parseNodeAddress(self):
""" Parsing of addresses """
def test(parsed, *args):
......
......@@ -19,7 +19,6 @@
import os, random, select, socket, sys, tempfile
import thread, threading, time, traceback, weakref
from collections import deque
from ConfigParser import SafeConfigParser
from contextlib import contextmanager
from itertools import count
from functools import partial, wraps
......@@ -37,8 +36,9 @@ from neo.lib.handler import EventHandler
from neo.lib.locking import SimpleQueue
from neo.lib.protocol import ClusterStates, Enum, NodeStates, NodeTypes, Packets
from neo.lib.util import cached_property, parseMasterList, p64
from .. import NeoTestBase, Patch, getTempDirectory, setupMySQLdb, \
ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, DB_PREFIX, DB_SOCKET, DB_USER
from .. import (getTempDirectory, setupMySQLdb,
ImporterConfigParser, NeoTestBase, Patch,
ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, DB_PREFIX, DB_SOCKET, DB_USER)
BIND = IP_VERSION_FORMAT_DICT[ADDRESS_TYPE], 0
LOCAL_IP = socket.inet_pton(ADDRESS_TYPE, IP_VERSION_FORMAT_DICT[ADDRESS_TYPE])
......@@ -185,6 +185,8 @@ class Serialized(object):
# a single-core CPU, other threads are still busy and haven't
# sent anything yet on the network. This causes tic() to
# return prematurely. Passing a non-zero value is a hack.
# We also increase SocketConnector.SOMAXCONN in tests so that
# a connection attempt is never delayed inside the kernel.
timeout=0):
# If you're in a pdb here, 'n' switches to another thread
# (the following lines are not supposed to be debugged into)
......@@ -634,6 +636,7 @@ class NEOCluster(object):
Patch(BaseConnection, getTimeout=lambda orig, self: None),
Patch(SimpleQueue, __init__=__init__),
Patch(SocketConnector, CONNECT_LIMIT=0),
Patch(SocketConnector, SOMAXCONN=128), # see Serialized.tic comment
Patch(SocketConnector, _bind=lambda orig, self, addr: orig(self, BIND)),
Patch(SocketConnector, _connect = lambda orig, self, addr:
orig(self, ServerNode.resolv(addr))))
......@@ -674,8 +677,8 @@ class NEOCluster(object):
adapter=os.getenv('NEO_TESTS_ADAPTER', 'SQLite'),
storage_count=None, db_list=None, clear_databases=True,
db_user=DB_USER, db_password='', compress=True,
importer=None, autostart=None, dedup=False):
self.name = 'neo_%s' % self._allocate('name',
importer=None, autostart=None, dedup=False, name=None):
self.name = name or 'neo_%s' % self._allocate('name',
lambda: random.randint(0, 100))
self.compress = compress
self.num_partitions = partitions
......@@ -707,14 +710,8 @@ class NEOCluster(object):
else:
assert False, adapter
if importer:
cfg = SafeConfigParser()
cfg.add_section("neo")
cfg.set("neo", "adapter", adapter)
cfg = ImporterConfigParser(adapter, **importer)
cfg.set("neo", "database", db % tuple(db_list))
for name, zodb in importer:
cfg.add_section(name)
for x in zodb.iteritems():
cfg.set(name, *x)
db = os.path.join(getTempDirectory(), '%s.conf')
with open(db % tuple(db_list), "w") as f:
cfg.write(f)
......@@ -813,7 +810,7 @@ class NEOCluster(object):
else NodeStates.RUNNING)
for node in self.storage_list if storage_list is None else storage_list:
state = self.getNodeState(node)
assert state == expected_state, (node, state)
assert state == expected_state, (repr(node), state)
def stop(self, clear_database=False, __print_exc=traceback.print_exc, **kw):
if self.started:
......@@ -933,10 +930,9 @@ class NEOCluster(object):
if dummy_zodb is None:
from ..stat_zodb import PROD1
dummy_zodb = PROD1(random)
preindex = {}
as_storage = dummy_zodb.as_storage
return lambda count: self.getZODBStorage().importFrom(
as_storage(count), preindex=preindex)
return lambda count: self.getZODBStorage().copyTransactionsFrom(
as_storage(count))
def populate(self, transaction_list, tid=lambda i: p64(i+1),
oid=lambda i: p64(i+1)):
......@@ -1061,7 +1057,11 @@ class NEOThreadedTest(NeoTestBase):
with Patch(client, _getFinalTID=lambda *_: None):
self.assertRaises(ConnectionClosed, txn.commit)
def assertPartitionTable(self, cluster, expected, pt_node=None):
def assertPartitionTable(self, cluster, expected, pt_node=None,
sort_by_nid=False):
if sort_by_nid:
index = lambda x: x
else:
index = [x.uuid for x in cluster.storage_list].index
super(NEOThreadedTest, self).assertPartitionTable(
(pt_node or cluster.admin).pt, expected,
......
......@@ -23,7 +23,6 @@ import unittest
from collections import defaultdict
from contextlib import contextmanager
from thread import get_ident
from zlib import compress
from persistent import Persistent, GHOST
from transaction.interfaces import TransientError
from ZODB import DB, POSException
......@@ -31,7 +30,7 @@ from ZODB.DB import TransactionalUndo
from neo.storage.transactions import TransactionManager, ConflictError
from neo.lib.connection import ConnectionClosed, \
ServerConnection, MTClientConnection
from neo.lib.exception import DatabaseFailure, StoppedOperation
from neo.lib.exception import StoppedOperation
from neo.lib.handler import DelayEvent, EventHandler
from neo.lib import logging
from neo.lib.protocol import (CellStates, ClusterStates, NodeStates, NodeTypes,
......@@ -43,6 +42,7 @@ from neo.lib.util import add64, makeChecksum, p64, u64
from neo.client.exception import NEOPrimaryMasterLost, NEOStorageError
from neo.client.transactions import Transaction
from neo.master.handlers.client import ClientServiceHandler
from neo.storage.database import DatabaseFailure
from neo.storage.handlers.client import ClientOperationHandler
from neo.storage.handlers.identification import IdentificationHandler
from neo.storage.handlers.initialization import InitializationHandler
......@@ -60,12 +60,13 @@ class PCounterWithResolution(PCounter):
class Test(NEOThreadedTest):
@with_cluster()
def testBasicStore(self, cluster):
if 1:
def testBasicStore(self, dedup=False):
with NEOCluster(dedup=dedup) as cluster:
cluster.start()
storage = cluster.getZODBStorage()
storage.sync()
storage.app.max_reconnection_to_master = 0
compress = storage.app.compress._compress
data_info = {}
compressible = 'x' * 20
compressed = compress(compressible)
......@@ -137,27 +138,6 @@ class Test(NEOThreadedTest):
self.assertRaises(POSException.POSKeyError,
storage.load, oid, '')
@with_cluster()
def testCreationUndoneHistory(self, cluster):
if 1:
storage = cluster.getZODBStorage()
oid = storage.new_oid()
txn = transaction.Transaction()
storage.tpc_begin(txn)
storage.store(oid, None, 'foo', '', txn)
storage.tpc_vote(txn)
tid1 = storage.tpc_finish(txn)
storage.tpc_begin(txn)
storage.undo(tid1, txn)
tid2 = storage.tpc_finish(txn)
storage.tpc_begin(txn)
storage.undo(tid2, txn)
tid3 = storage.tpc_finish(txn)
expected = [(tid1, 3), (tid2, 0), (tid3, 3)]
for x in storage.history(oid, 10):
self.assertEqual((x['tid'], x['size']), expected.pop())
self.assertFalse(expected)
def _testUndoConflict(self, cluster, *inc):
def waitResponses(orig, *args):
orig(*args)
......@@ -738,8 +718,9 @@ class Test(NEOThreadedTest):
@with_cluster()
def testStorageUpgrade1(self, cluster):
if 1:
storage = cluster.storage
# Disable migration steps that aren't idempotent.
with Patch(storage.dm.__class__, _migrate3=lambda *_: None):
t, c = cluster.getTransaction()
storage.dm.setConfiguration("version", None)
c.root()._p_changed = 1
......@@ -1309,7 +1290,7 @@ class Test(NEOThreadedTest):
s1.resetNode()
with Patch(s1.dm, truncate=dieFirst(1)):
s1.start()
self.assertEqual(s0.dm.getLastIDs()[0], truncate_tid)
self.assertFalse(s0.dm.getLastIDs()[0])
self.assertEqual(s1.dm.getLastIDs()[0], r._p_serial)
self.tic()
self.assertEqual(calls, [1, 2])
......@@ -1723,7 +1704,18 @@ class Test(NEOThreadedTest):
x.value += 1
c2.root()['x'].value += 2
TransactionalResource(t1, 1, tpc_begin=begin1)
s1m, = s1.getConnectionList(cluster.master)
# BUG: Very rarely, getConnectionList returns more that 1
# connection ("too many values to unpack"), which is
# a mystery and impossible to reproduce:
# - 1st time: v1.8.1 on a test machine (no SSL)
# - last: current revision on my laptop (SSL),
# at the first iteration of this loop
_sm = list(s1.getConnectionList(cluster.master))
try:
s1m, = _sm
except ValueError:
self.fail((_sm, list(
s1.getConnectionList(cluster.master))))
try:
s1.em.removeReader(s1m)
with ConnectionFilter() as f, \
......@@ -2371,7 +2363,7 @@ class Test(NEOThreadedTest):
oid, tid = big_id_list[i]
for j, expected in (
(1 - i, (dm.getLastTID(u64(MAX_TID)), dm.getLastIDs())),
(i, (u64(tid), (tid, {}, {}, oid)))):
(i, (u64(tid), (tid, oid)))):
oid, tid = big_id_list[j]
# Somehow we abuse 'storeTransaction' because we ask it to
# write data for unassigned partitions. This is not checked
......@@ -2381,6 +2373,44 @@ class Test(NEOThreadedTest):
self.assertEqual(expected,
(dm.getLastTID(u64(MAX_TID)), dm.getLastIDs()))
def testStorageUpgrade(self):
path = os.path.join(os.path.dirname(__file__),
self._testMethodName + '-%s',
's%s.sql')
dump_dict = {}
def switch(s):
dm = s.dm
dm.commit()
dump_dict[s.uuid] = dm.dump()
dm.erase()
with open(path % (s.getAdapter(), s.uuid)) as f:
dm.restore(f.read())
with NEOCluster(storage_count=3, partitions=3, replicas=1,
name=self._testMethodName) as cluster:
s1, s2, s3 = cluster.storage_list
cluster.start(storage_list=(s1,))
for s in s2, s3:
s.start()
self.tic()
cluster.neoctl.enableStorageList([s.uuid])
cluster.neoctl.tweakPartitionTable()
self.tic()
nid_list = [s.uuid for s in cluster.storage_list]
switch(s3)
s3.stop()
storage = cluster.getZODBStorage()
txn = transaction.Transaction()
storage.tpc_begin(txn, p64(85**9)) # partition 1
storage.store(p64(0), None, 'foo', '', txn)
storage.tpc_vote(txn)
storage.tpc_finish(txn)
self.tic()
switch(s1)
switch(s2)
cluster.stop()
for i, s in zip(nid_list, cluster.storage_list):
self.assertMultiLineEqual(s.dm.dump(), dump_dict[i])
if __name__ == "__main__":
unittest.main()
#
# Copyright (C) 2018 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
from contextlib import contextmanager
from ZConfig import ConfigurationSyntaxError
from ZODB.config import databaseFromString
from .. import Patch
from . import ClientApplication, NEOThreadedTest, with_cluster
from neo.client import Storage
def databaseFromDict(**kw):
return databaseFromString("%%import neo.client\n"
"<zodb>\n <NEOStorage>\n%s </NEOStorage>\n</zodb>\n"
% ''.join(' %s %s\n' % x for x in kw.iteritems()))
class ConfigTests(NEOThreadedTest):
dummy_required = {'name': 'cluster', 'master_nodes': '127.0.0.1:10000'}
@contextmanager
def _db(self, cluster, **kw):
kw['name'] = cluster.name
kw['master_nodes'] = cluster.master_nodes
def newClient(_, *args, **kw):
client = ClientApplication(*args, **kw)
t.append(client.poll_thread)
return client
t = []
with Patch(Storage, Application=newClient):
db = databaseFromDict(**kw)
try:
yield db
finally:
db.close()
cluster.join(t)
@with_cluster()
def testCompress(self, cluster):
kw = self.dummy_required.copy()
valid = ['false', 'true', 'zlib', 'zlib=9']
for kw['compress'] in '9', 'best', 'zlib=0', 'zlib=100':
self.assertRaises(ConfigurationSyntaxError, databaseFromDict, **kw)
for compress in valid:
with self._db(cluster, compress=compress) as db:
self.assertEqual((0,0,''), db.storage.app.compress(''))
if __name__ == "__main__":
unittest.main()
......@@ -16,20 +16,19 @@
from cPickle import Pickler, Unpickler
from cStringIO import StringIO
from itertools import islice, izip_longest
import os, shutil, unittest
import neo, transaction, ZODB
from itertools import izip_longest
import os, random, shutil, time, unittest
import transaction, ZODB
from neo.client.exception import NEOPrimaryMasterLost
from neo.lib import logging
from neo.lib.util import u64
from neo.storage.database.importer import Repickler
from ..fs2zodb import Inode
from .. import expectedFailure, getTempDirectory
from neo.storage.database import getAdapterKlass, importer, manager
from neo.storage.database.importer import Repickler, TransactionRecord
from .. import expectedFailure, getTempDirectory, random_tree, Patch
from . import NEOCluster, NEOThreadedTest
from ZODB import serialize
from ZODB.FileStorage import FileStorage
class Equal:
_recurse = {}
......@@ -129,61 +128,58 @@ class ImporterTests(NEOThreadedTest):
self.assertIs(Obj, load())
self.assertDictEqual(state, load())
def test(self):
# XXX: Using NEO source files as test data was a bad idea because
# the test breaks easily in case of massive changes in the code,
# or if there are many untracked files.
importer = []
def _importFromFileStorage(self, multi=(),
root_filter=None, sub_filter=None):
import_hash = '1d4ff03730fe6bcbf235e3739fbe5f5b'
txn_size = 10
tree = random_tree.generateTree(random.Random(0))
i = len(tree) // 3
assert i > txn_size
before_tree = tree[:i]
after_tree = tree[i:]
fs_dir = os.path.join(getTempDirectory(), self.id())
shutil.rmtree(fs_dir, 1) # for --loop
os.mkdir(fs_dir)
src_root, = neo.__path__
fs_list = "root", "client", "master", "tests"
def not_pyc(name):
return not name.endswith(".pyc")
# We use 'hash' to skip roughly half of files.
# They'll be added after the migration has started.
def root_filter(name):
if not_pyc(name):
i = name.find(os.sep)
return (i < 0 or name[:i] not in fs_list) and (
'.' not in name or hash(name) & 1)
def sub_filter(name):
return lambda n: not_pyc(n) and (
hash(n) & 1 if '.' in n else
os.sep in n or n in (name, "scripts"))
conn_list = []
iter_list = []
db_list = []
# Setup several FileStorage databases.
for i, name in enumerate(fs_list):
fs_path = os.path.join(fs_dir, name + ".fs")
for i, db in enumerate(('root',) + multi):
fs_path = os.path.join(fs_dir, '%s.fs' % db)
c = ZODB.DB(FileStorage(fs_path)).open()
r = c.root()["neo"] = Inode()
r = c.root()['tree'] = random_tree.Node()
transaction.commit()
conn_list.append(c)
iter_list.append(r.treeFromFs(src_root, 10,
sub_filter(name) if i else root_filter))
importer.append((name, {
iter_list.append(random_tree.importTree(r, before_tree, txn_size,
sub_filter(db) if i else root_filter))
db_list.append((db, r, {
"storage": "<filestorage>\npath %s\n</filestorage>" % fs_path
}))
# Populate FileStorage databases.
for iter_list in izip_longest(*iter_list):
for i in iter_list:
if i:
for i, iter_list in enumerate(izip_longest(*iter_list)):
for r in iter_list:
if r:
transaction.commit()
del iter_list
# Get oids of mount points and close.
for (name, cfg), c in zip(importer, conn_list):
r = c.root()["neo"]
if name == "root":
for name in fs_list[1:]:
cfg[name] = str(u64(r[name]._p_oid))
zodb = []
importer = {'zodb': zodb}
for db, r, cfg in db_list:
if db == 'root':
if multi:
for x in multi:
cfg['_%s' % x] = str(u64(r[x]._p_oid))
else:
h = random_tree.hashTree(r)
h()
self.assertEqual(import_hash, h.hexdigest())
importer['writeback'] = 'true'
else:
cfg["oid"] = str(u64(r[name]._p_oid))
c.db().close()
#del importer[0][1][importer.pop()[0]]
# Start NEO cluster with transparent import of a multi-base ZODB.
with NEOCluster(compress=False, importer=importer) as cluster:
cfg["oid"] = str(u64(r[db]._p_oid))
db = '_%s' % db
r._p_jar.db().close()
zodb.append((db, cfg))
del db_list, iter_list
#del zodb[0][1][zodb.pop()[0]]
# Start NEO cluster with transparent import.
with NEOCluster(importer=importer) as cluster:
# Suspend import for a while, so that import
# is finished in the middle of the below 'for' loop.
# Use a slightly different main loop for storage so that it
......@@ -202,7 +198,7 @@ class ImporterTests(NEOThreadedTest):
dm.doOperation = doOperation
cluster.start()
t, c = cluster.getTransaction()
r = c.root()["neo"]
r = c.root()['tree']
# Test retrieving of an object from ZODB when next serial is in NEO.
r._p_changed = 1
t.commit()
......@@ -213,31 +209,81 @@ class ImporterTests(NEOThreadedTest):
##
self.assertRaisesRegexp(NotImplementedError, " getObjectHistory$",
c.db().history, r._p_oid)
i = r.walk()
next(islice(i, 4, None))
h = random_tree.hashTree(r)
h(30)
logging.info("start migration")
dm.doOperation(cluster.storage)
# Adjust if needed. Must remain > 0.
assert 14 == sum(1 for i in i)
self.assertEqual(22, h())
self.assertEqual(import_hash, h.hexdigest())
# New writes after the switch to NEO.
last_import = -1
for i, r in enumerate(r.treeFromFs(src_root, 6, not_pyc)):
for i, r in enumerate(random_tree.importTree(
r, after_tree, txn_size)):
t.commit()
if cluster.storage.dm._import:
last_import = i
self.tic()
# Same as above. We want last_import smaller enough compared to i
assert i / 3 < last_import < i - 2, (last_import, i)
assert i < last_import * 3 < 2 * i, (last_import, i)
self.assertFalse(cluster.storage.dm._import)
i = len(src_root) + 1
self.assertEqual(sorted(r.walk()), sorted(
(x[i:] or '.', sorted(y), sorted(filter(not_pyc, z)))
for x, y, z in os.walk(src_root)))
t.commit()
storage._cache.clear()
def finalCheck(r):
h = random_tree.hashTree(r)
self.assertEqual(93, h())
self.assertEqual('6bf0f0cb2d6c1aae9e52c412ef0e25b6',
h.hexdigest())
finalCheck(r)
if dm._writeback:
dm.commit()
dm._writeback.wait()
if dm._writeback:
db = ZODB.DB(FileStorage(fs_path, read_only=True))
finalCheck(db.open().root()['tree'])
db.close()
@unittest.skipUnless(importer.FORK, 'no os.fork')
def test1(self):
self._importFromFileStorage()
def testThreadedWriteback(self):
# Also check reconnection to the underlying DB for relevant backends.
tid_list = []
def __init__(orig, tr, db, tid):
orig(tr, db, tid)
tid_list.append(tid)
def fetchObject(orig, db, *args):
if len(tid_list) == 5:
if isinstance(db, getAdapterKlass('MySQL')):
from neo.tests.storage.testStorageMySQL import ServerGone
with ServerGone(db):
orig(db, *args)
self.fail()
else:
tid_list.append(None)
p.revert()
return orig(db, *args)
def sleep(orig, seconds):
self.assertEqual(len(tid_list), 5)
p.revert()
with Patch(importer, FORK=False), \
Patch(TransactionRecord, __init__=__init__), \
Patch(manager.DatabaseManager, fetchObject=fetchObject), \
Patch(time, sleep=sleep) as p:
self._importFromFileStorage()
self.assertFalse(p.applied)
self.assertEqual(len(tid_list), 11)
def testMerge(self):
multi = 1, 2, 3
self._importFromFileStorage(multi,
(lambda path: path[0] not in multi or len(path) == 1),
(lambda db: lambda path: path[0] in (db, 4)))
if getattr(serialize, '_protocol', 1) > 1:
# XXX: With ZODB5, we should at least keep a working test that does not
# merge several DB.
test = expectedFailure(NEOPrimaryMasterLost)(test)
testMerge = expectedFailure(NEOPrimaryMasterLost)(testMerge)
if __name__ == "__main__":
unittest.main()
This diff is collapsed.
......@@ -37,6 +37,11 @@ class SSLTests(SSLMixin, test.Test):
testStorageDataLock2 = None
testUndoConflictDuringStore = None
# With MySQL, this test is expensive.
# Let's check deduplication of big oids here.
def testBasicStore(self):
super(SSLTests, self).testBasicStore(True)
def testAbortConnection(self, after_handshake=1):
with self.getLoopbackConnection() as conn:
conn.ask(Packets.Ping())
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -38,7 +38,7 @@ extras_require = {
'master': [],
'storage-sqlite': [],
'storage-mysqldb': ['mysqlclient'],
'storage-importer': zodb_require,
'storage-importer': zodb_require + ['msgpack>=0.5.6', 'setproctitle'],
}
extras_require['tests'] = ['coverage', 'zope.testing', 'psutil>=2',
'neoppod[%s]' % ', '.join(extras_require)]
......@@ -60,7 +60,7 @@ else:
setup(
name = 'neoppod',
version = '1.9',
version = '1.10',
description = __doc__.strip(),
author = 'Nexedi SA',
author_email = 'neo-dev@erp5.org',
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment