Commit 7924cd62 by Julien Muchembled

Add tool to synthesize a ZODB for benchmark

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2778 71dcc9de-d417-0410-9af5-da40c76e7ee4
1 parent d0ee6041
......@@ -8,7 +8,7 @@ import datetime
from email.MIMEMultipart import MIMEMultipart
from email.MIMEText import MIMEText
from neo.tests.functional import NEOCluster
from neo.lib import logger
MAIL_SERVER = '127.0.0.1:25'
......@@ -41,13 +41,13 @@ class BenchmarkRunner(object):
# check specifics arguments
self._config = AttributeDict()
self._config.update(self.load_options(options, self._args))
self._config.update(dict(
self._config.update(
title = options.title or self.__class__.__name__,
verbose = options.verbose,
verbose = bool(options.verbose),
mail_from = options.mail_from,
mail_to = options.mail_to,
mail_server = mail_server.split(':'),
))
)
def add_status(self, key, value):
self._status.append((key, value))
......@@ -91,6 +91,7 @@ class BenchmarkRunner(object):
s.close()
def run(self):
logger.PACKET_LOGGER.enable(self._config.verbose)
subject, report = self.start()
report = self.build_report(report)
if self._config.mail_to:
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import math, os, random, sys
from cStringIO import StringIO
from ZODB.utils import p64
from ZODB.BaseStorage import TransactionRecord
from ZODB.FileStorage import FileStorage
# Stats of a 43.5 GB production Data.fs
# µ σ
# size of object 6.04237779991 1.55811487853
# # objects / transaction 1.04108991045 0.906703192546
# size of transaction 7.98615420517 1.6624220402
#
# % of new object / transaction: 0.810080409164
# # of transactions: 1541194
# compression ratio: 28.5 % (gzip -6)
PROD1 = lambda random=random: DummyZODB(6.04237779991, 1.55811487853,
1.04108991045, 0.906703192546,
0.810080409164, random)
def DummyData(random=random):
# returns data that gzip at about 28.5 %
# make sure sample is bigger than dictionary of compressor
data = ''.join(chr(int(random.gauss(0, .8)) % 256) for x in xrange(100000))
return StringIO(data)
class DummyZODB(object):
"""
Object size and count of generated transaction follows a log normal
distribution, where *_mu and *_sigma are their parameters.
"""
def __init__(self, obj_size_mu, obj_size_sigma,
obj_count_mu, obj_count_sigma,
new_ratio, random=random):
self.obj_size_mu = obj_size_mu
self.obj_size_sigma = obj_size_sigma
self.obj_count_mu = obj_count_mu
self.obj_count_sigma = obj_count_sigma
self.random = random
self.new_ratio = new_ratio
self.next_oid = 0
self.err_count = 0
def __call__(self):
variate = self.random.lognormvariate
oid_set = set()
for i in xrange(int(round(variate(self.obj_count_mu,
self.obj_count_sigma))) or 1):
if len(oid_set) >= self.next_oid or \
self.random.random() < self.new_ratio:
oid = self.next_oid
self.next_oid = oid + 1
else:
while True:
oid = self.random.randrange(self.next_oid)
if oid not in oid_set:
break
oid_set.add(oid)
yield p64(oid), int(round(variate(self.obj_size_mu,
self.obj_size_sigma))) or 1
def as_storage(self, transaction_count, dummy_data_file=None):
if dummy_data_file is None:
dummy_data_file = DummyData(self.random)
class dummy_change(object):
data_txn = None
version = ''
def __init__(self, tid, oid, size):
self.tid = tid
self.oid = oid
data = ''
while size:
d = dummy_data_file.read(size)
size -= len(d)
data += d
if size:
dummy_data_file.seek(0)
self.data = data
class dummy_transaction(TransactionRecord):
def __init__(transaction, *args):
TransactionRecord.__init__(transaction, *args)
transaction_size = 0
transaction.record_list = []
add_record = transaction.record_list.append
for x in self():
oid, size = x
transaction_size += size
add_record(dummy_change(transaction.tid, oid, size))
transaction.size = transaction_size
def __iter__(transaction):
return iter(transaction.record_list)
class dummy_storage(object):
size = 0
def iterator(storage, *args):
args = ' ', '', '', {}
for i in xrange(1, transaction_count+1):
t = dummy_transaction(p64(i), *args)
storage.size += t.size
yield t
def getSize(self):
return self.size
return dummy_storage()
def lognorm_stat(X):
Y = map(math.log, X)
n = len(Y)
mu = sum(Y) / n
s2 = sum(d*d for d in (y - mu for y in Y)) / n
return mu, math.sqrt(s2)
def stat(*storages):
obj_size_list = []
obj_count_list = []
tr_size_list = []
oid_set = set()
for storage in storages:
for transaction in storage.iterator():
obj_count = tr_size = 0
for r in transaction:
if r.data:
obj_count += 1
oid = r.oid
if oid not in oid_set:
oid_set.add(oid)
size = len(r.data)
tr_size += size
obj_size_list.append(size)
obj_count_list.append(obj_count)
tr_size_list.append(tr_size)
new_ratio = float(len(oid_set)) / len(obj_size_list)
return (lognorm_stat(obj_size_list),
lognorm_stat(obj_count_list),
lognorm_stat(tr_size_list),
new_ratio, len(tr_size_list))
def main():
s = stat(*(FileStorage(x, read_only=True) for x in sys.argv[1:]))
print(u" %-15s σ\n"
"size of object %-15s %s\n"
"# objects / transaction %-15s %s\n"
"size of transaction %-15s %s\n"
"\n%% of new object / transaction: %s"
"\n# of transactions: %s"
% ((u"µ",) + s[0] + s[1] + s[2] + s[3:]))
if __name__ == "__main__":
sys.exit(main())
......@@ -16,7 +16,7 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import os, random, socket, sys, threading, time, types
import os, random, socket, sys, tempfile, threading, time, types
from collections import deque
from functools import wraps
from Queue import Queue, Empty
......@@ -270,6 +270,20 @@ class NeoCTL(neo.neoctl.app.NeoCTL):
lambda self, address: setattr(self, '_server', address))
class LoggerThreadName(object):
def __init__(self, default='TEST'):
self.__default = default
def __getattr__(self, attr):
return getattr(str(self), attr)
def __str__(self):
try:
return threading.currentThread().node_name
except AttributeError:
return self.__default
class NEOCluster(object):
BaseConnection_checkTimeout = staticmethod(BaseConnection.checkTimeout)
......@@ -326,14 +340,21 @@ class NEOCluster(object):
def __init__(self, master_count=1, partitions=1, replicas=0,
adapter=os.getenv('NEO_TESTS_ADAPTER', 'BTree'),
storage_count=None, db_list=None,
db_user='neo', db_password='neo'):
storage_count=None, db_list=None, clear_databases=True,
db_user='neo', db_password='neo', verbose=None):
if verbose is not None:
temp_dir = os.getenv('TEMP') or \
os.path.join(tempfile.gettempdir(), 'neo_tests')
os.path.exists(temp_dir) or os.makedirs(temp_dir)
log_file = tempfile.mkstemp('.log', '', temp_dir)[1]
print 'Logging to %r' % log_file
setupLog(LoggerThreadName(), log_file, verbose)
self.name = 'neo_%s' % random.randint(0, 100)
ip = getVirtualIp('master')
self.master_nodes = ' '.join('%s:%s' % (ip, i)
for i in xrange(master_count))
kw = dict(cluster=self, getReplicas=replicas, getPartitions=partitions,
getAdapter=adapter, getReset=True)
getAdapter=adapter, getReset=clear_databases)
self.master_list = [MasterApplication(address=(ip, i), **kw)
for i in xrange(master_count)]
ip = getVirtualIp('storage')
......@@ -383,7 +404,7 @@ class NEOCluster(object):
self.client = ClientApplication(self)
self.neoctl = NeoCTL(self)
def start(self, client=False, storage_list=None, fast_startup=True):
def start(self, storage_list=None, fast_startup=True):
self.__class__._cluster = weak_ref(self)
for node_type in 'master', 'admin':
for node in getattr(self, node_type + '_list'):
......@@ -401,8 +422,6 @@ class NEOCluster(object):
self.tic()
assert self.neoctl.getClusterState() == ClusterStates.RUNNING
self.enableStorageList(storage_list)
if client:
self.startClient()
def enableStorageList(self, storage_list):
self.neoctl.enableStorageList([x.uuid for x in storage_list])
......@@ -410,13 +429,16 @@ class NEOCluster(object):
for node in storage_list:
assert self.getNodeState(node) == NodeStates.RUNNING
def startClient(self):
self.client.setPoll(True)
self.db = ZODB.DB(storage=self.getZODBStorage())
@property
def db(self):
try:
return self._db
except AttributeError:
self._db = db = ZODB.DB(storage=self.getZODBStorage())
return db
def stop(self):
if hasattr(self, 'db'):
self.db.close()
getattr(self, '_db', self.client).close()
#self.neoctl.setClusterState(ClusterStates.STOPPING) # TODO
try:
Serialized.release(stop=1)
......@@ -446,6 +468,9 @@ class NEOCluster(object):
if cell[1] == CellStates.OUT_OF_DATE]
def getZODBStorage(self, **kw):
# automatically put client in master mode
if self.client.em._timeout == 0:
self.client.setPoll(True)
return Storage.Storage(None, self.name, _app=self.client, **kw)
def getTransaction(self):
......@@ -453,17 +478,6 @@ class NEOCluster(object):
return txn, self.db.open(txn)
class LoggerThreadName(object):
def __getattr__(self, attr):
return getattr(str(self), attr)
def __str__(self):
try:
return threading.currentThread().node_name
except AttributeError:
return 'TEST'
class NEOThreadedTest(NeoUnitTestBase):
def setupLog(self):
......
......@@ -29,7 +29,7 @@ class Test(NEOThreadedTest):
def test_commit(self):
cluster = NEOCluster()
cluster.start(1)
cluster.start()
try:
t, c = cluster.getTransaction()
c.root()['foo'] = PObject()
......@@ -42,7 +42,8 @@ class Test(NEOThreadedTest):
# (neo.tests.client.testMasterHandler)
cluster = NEOCluster()
try:
cluster.start(1)
cluster.start()
cluster.db # open DB
cluster.client.setPoll(0)
storage, = cluster.client.nm.getStorageList()
conn = storage.getConnection()
......
......@@ -42,6 +42,7 @@ setup(
'neostorage=neo.scripts.neostorage:main',
'neotestrunner=neo.scripts.runner:main',
'neosimple=neo.scripts.simple:main',
'stat_zodb=neo.tests.stat_zodb:main',
],
},
# Raah!!! I wish I could write something like:
......
......@@ -7,7 +7,6 @@ import traceback
from time import time
from neo.tests.benchmark import BenchmarkRunner
from neo.tests.functional import NEOCluster
from ZODB.FileStorage import FileStorage
MIN_STORAGES = 1
......@@ -25,9 +24,10 @@ class MatrixImportBenchmark(BenchmarkRunner):
parser.add_option('', '--max-storages')
parser.add_option('', '--min-replicas')
parser.add_option('', '--max-replicas')
parser.add_option('', '--threaded', action="store_true")
def load_options(self, options, args):
if not options.datafs or not os.path.exists(options.datafs):
if options.datafs and not os.path.exists(options.datafs):
sys.exit('Missing or wrong data.fs argument')
return dict(
datafs = options.datafs,
......@@ -35,6 +35,7 @@ class MatrixImportBenchmark(BenchmarkRunner):
max_s = int(options.max_storages or MAX_STORAGES),
min_r = int(options.min_replicas or MIN_REPLICAS),
max_r = int(options.max_replicas or MAX_REPLICAS),
threaded = options.threaded,
)
def start(self):
......@@ -49,42 +50,59 @@ class MatrixImportBenchmark(BenchmarkRunner):
if storages[-1] < max_s:
storages.append(max_s)
replicas = range(min_r, max_r + 1)
results = self.runMatrix(storages, replicas)
if self._config.threaded:
from neo.tests.threaded import NEOCluster
NEOCluster.patch() # XXX ugly
try:
results = self.runMatrix(storages, replicas)
finally:
if self._config.threaded:
from neo.tests.threaded import NEOCluster
NEOCluster.unpatch()# XXX ugly
return self.buildReport(storages, replicas, results)
def runMatrix(self, storages, replicas):
stats = {}
size = float(os.path.getsize(self._config.datafs))
for s in storages:
for r in [r for r in replicas if r < s]:
stats.setdefault(s, {})
result = self.runImport(1, s, r, 100)
if result is not None:
result = size / result / 1024
stats[s][r] = result
stats[s][r] = self.runImport(1, s, r, 100)
return stats
def runImport(self, masters, storages, replicas, partitions):
datafs = self._config.datafs
if datafs:
dfs_storage = FileStorage(file_name=self._config.datafs)
else:
datafs = 'PROD1'
import random, neo.tests.stat_zodb
dfs_storage = getattr(neo.tests.stat_zodb, datafs)(
random.Random(0)).as_storage(100)
print "Import of %s with m=%s, s=%s, r=%s, p=%s" % (
self._config.datafs, masters, storages, replicas, partitions)
datafs, masters, storages, replicas, partitions)
# cluster
neo = NEOCluster(
kw = dict(
db_list=['neot_matrix_%d' % i for i in xrange(storages)],
clear_databases=True,
partitions=partitions,
replicas=replicas,
master_node_count=masters,
verbose=self._config.verbose,
)
# import
neo_storage = neo.getZODBStorage()
dfs_storage = FileStorage(file_name=self._config.datafs)
if self._config.threaded:
from neo.tests.threaded import NEOCluster
neo = NEOCluster(master_count=masters, **kw)
else:
from neo.tests.functional import NEOCluster
neo = NEOCluster(master_node_count=masters, **kw)
neo.start()
neo_storage = neo.getZODBStorage()
# import
start = time()
try:
try:
neo_storage.copyTransactionsFrom(dfs_storage)
return time() - start
end = time()
return dfs_storage.getSize() / ((end - start) * 1e3)
except:
traceback.print_exc()
self.error_log += "Import with m=%s, s=%s, r=%s, p=%s:" % (
......
......@@ -24,7 +24,7 @@ class ImportBenchmark(BenchmarkRunner):
parser.add_option('-r', '--replicas')
def load_options(self, options, args):
if not options.datafs or not os.path.exists(options.datafs):
if options.datafs and not os.path.exists(options.datafs):
sys.exit('Missing or wrong data.fs argument')
return dict(
datafs = options.datafs,
......@@ -74,8 +74,12 @@ class ImportBenchmark(BenchmarkRunner):
# open storages clients
datafs = self._config.datafs
neo_storage = neo.getZODBStorage()
dfs_storage = FileStorage(file_name=datafs)
dfs_size = os.path.getsize(datafs)
if datafs:
dfs_storage = FileStorage(file_name=datafs)
else:
from neo.tests.stat_zodb import PROD1
from random import Random
dfs_storage = PROD1(Random(0)).as_storage(10000)
# monkey patch storage
txn_dict, obj_dict = {}, {}
......@@ -92,13 +96,13 @@ class ImportBenchmark(BenchmarkRunner):
'Transactions': txn_dict.values(),
'Objects': obj_dict.values(),
}
return (dfs_size, elapsed, stats)
return (dfs_storage.getSize(), elapsed, stats)
def buildReport(self, dfs_size, elapsed, stats):
""" build a report for the given import data """
config = self._config
dfs_size /= 1024
size = dfs_size / 1024
dfs_size /= 1e3
size = dfs_size / 1e3
speed = dfs_size / elapsed
# configuration
......
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!