Commit 491f4c89 authored by Julien Muchembled's avatar Julien Muchembled

qa: new tools/stress options to evaluate MySQL engines

--kill-mysqld should be combined with something like -f .3 -r .1
to give storage nodes enough time to recover.
And also -D 0 to focus testing on the storage backend rather than NEO.
parent c11410ef
......@@ -200,6 +200,7 @@ class MySQLDatabaseManager(DatabaseManager):
def _commit(self):
# XXX: Should we translate OperationalError into MysqlError ?
self._active = 0
......@@ -310,7 +310,7 @@ class NEOCluster(object):
cleanup_on_delete=False, temp_dir=None, clear_databases=True,
address_type=ADDRESS_TYPE, bind_ip=None, logger=True,
importer=None, storage_kw={}):
if not adapter:
adapter = 'MySQL'
self.adapter = adapter
......@@ -372,7 +372,8 @@ class NEOCluster(object):
# create storage nodes
for i, db in enumerate(db_list):
self._newProcess(NodeTypes.STORAGE, logger and 'storage_%u' % i,
0, adapter=adapter, database=self.db_template(db))
0, adapter=adapter, database=self.db_template(db),
# create neoctl
self.neoctl = NeoCTL((self.local_ip, admin_port), ssl=self.SSL)
......@@ -50,6 +50,7 @@ class StressApplication(AdminApplication):
cluster_state = server = uuid = None
listening_conn = True
fault_probability = 1
restart_ratio = float('inf') # no firewall support
_stress = False
......@@ -191,7 +192,7 @@ class StressApplication(AdminApplication):
self.loid = loid
self.ltid = ltid
self.em.setTimeout(int(time.time() + 1), self.askLastIDs)
if self._stress:
if self._stress and random.random() < self.fault_probability:
node_list = self.nm.getStorageList()
fw = []
......@@ -7,6 +7,7 @@ from contextlib import contextmanager
from datetime import datetime
from functools import partial
from multiprocessing import Lock, RawArray
from multiprocessing.queues import SimpleQueue
from struct import Struct
from netfilterqueue import NetfilterQueue
import gevent.socket # preload for subprocesses
......@@ -19,7 +20,7 @@ from neo.lib.protocol import NodeTypes
from neo.lib.util import timeStringFromTID, p64, u64
Application as StorageApplication
from neo.tests import getTempDirectory
from neo.tests import getTempDirectory, mysql_pool
from neo.tests.ConflictFree import ConflictFreeLog
from neo.tests.functional import AlreadyStopped, NEOCluster, Process
from neo.tests.stress import StressApplication
......@@ -312,13 +313,15 @@ class NEOCluster(NEOCluster):
class Application(StressApplication):
_blocking = None
_blocking = _kill_mysqld = None
def __init__(self, client_count, thread_count, restart_ratio, logrotate,
*args, **kw):
def __init__(self, client_count, thread_count,
fault_probability, restart_ratio, kill_mysqld,
logrotate, *args, **kw):
self.client_count = client_count
self.thread_count = thread_count
self.logrotate = logrotate
self.fault_probability = fault_probability
self.restart_ratio = restart_ratio
self.cluster = cluster = NEOCluster(*args, **kw)
# Make the firewall also affect connections between storage nodes.
......@@ -326,7 +329,24 @@ class Application(StressApplication):
def __init__(self, config):
StorageApplication__init__(self, config)
StorageApplication.__init__ = __init__
if kill_mysqld:
from neo.scripts import neostorage
from import mysqldb
neostorage_main = neostorage.main
self._kill_mysqld = kill_mysqld = SimpleQueue()
def main():
pid = os.getpid()
except mysqldb.OperationalError as e:
code = e.args[0]
except mysqldb.MysqlError as e:
code = e.code
if mysqldb.SERVER_LOST != code != mysqldb.SERVER_GONE_ERROR:
neostorage.main = main
super(Application, self).__init__(cluster.SSL,
......@@ -398,6 +418,10 @@ class Application(StressApplication):
t = threading.Thread(target=self._logrotate_thread)
t.daemon = 1
if self._kill_mysqld:
t = threading.Thread(target=self._watch_storage_thread)
t.daemon = 1
def stopCluster(self, wait=None):
......@@ -471,13 +495,30 @@ class Application(StressApplication):
except ValueError:
def _watch_storage_thread(self):
get = self._kill_mysqld.get
storage_list = self.cluster.getStorageProcessList()
while 1:
pid = get()
p, = (p for p in storage_list if == pid)
def restartStorages(self, nids):
processes = [p for p in self.cluster.getStorageProcessList()
if p.uuid in nids]
for p in processes: p.kill(signal.SIGKILL)
for p in processes: p.wait()
for p in processes: p.start()
storage_list = self.cluster.getStorageProcessList()
if self._kill_mysqld:
db_list = [db for db, p in zip(self.cluster.db_list, storage_list)
if p.uuid in nids]
with open(os.devnull, "wb") as f:
mysql_pool.start(*db_list, stderr=f)
processes = [p for p in storage_list if p.uuid in nids]
for p in processes: p.kill(signal.SIGKILL)
for p in processes: p.wait()
for p in processes: p.start()
def _cleanFirewall(self):
with open(os.devnull, "wb") as f:
......@@ -548,6 +589,7 @@ def main():
default=socket.AF_INET, const=socket.AF_INET6, help='(default: IPv4)')
_('-a', '--adapter', choices=adapters, default=default_adapter)
_('-d', '--datadir', help="(default: same as unit tests)")
_('-e', '--engine', help="database engine (MySQL only)")
_('-l', '--logdir', help="(default: same as --datadir)")
_('-m', '--masters', type=int, default=1)
_('-s', '--storages', type=int, default=8)
......@@ -571,9 +613,14 @@ def main():
help='number of client processes')
_('-t', '--threads', type=int, default=1,
help='number of thread workers per client process')
_('-f', '--fault-probability', type=ratio, default=1, metavar='P',
help='probability to cause faults every second')
_('-r', '--restart-ratio', type=ratio, default=.5, metavar='RATIO',
help='probability to kill/restart a storage node, rather than just'
' RSTing a TCP connection with this node')
_('--kill-mysqld', action='store_true',
help='if r != 0 and if NEO_DB_MYCNF is set,'
' kill mysqld rather than storage node')
_('-C', '--console', type=int, default=0,
help='console port (localhost) (default: any)')
_('-D', '--delay', type=float, default=.01,
......@@ -594,18 +641,31 @@ def main():
db_list = ['stress_neo%s' % x for x in xrange(args.storages)]
if args.datadir:
if args.adapter != 'SQLite':
parser.error('--datadir is only for SQLite adapter')
db_list = [os.path.join(args.datadir, x + '.sqlite') for x in db_list]
if args.adapter == 'SQLite':
db_list = [os.path.join(args.datadir, x + '.sqlite')
for x in db_list]
elif mysql_pool:
'--datadir: meaningless when using an existing MySQL server')
kw = dict(db_list=db_list, name='stress',
partitions=args.partitions, replicas=args.replicas,
adapter=args.adapter, address_type=args.address_type,
temp_dir=args.logdir or args.datadir or getTempDirectory())
temp_dir=args.logdir or args.datadir or getTempDirectory(),
storage_kw={'engine': args.engine, 'wait': -1})
if args.command == 'run':
NFQueue.delay = args.delay
app = Application(args.clients, args.threads, args.restart_ratio,
error = args.kill_mysqld and (
'invalid adapter' if args.adapter != 'MySQL' else
None if mysql_pool else 'NEO_DB_MYCNF not set'
if error:
parser.error('--kill-mysqld: ' + error)
app = Application(args.clients, args.threads,
args.fault_probability, args.restart_ratio, args.kill_mysqld,
int(round(args.logrotate * 3600, 0)), **kw)
t = threading.Thread(target=console, args=(args.console, app))
t.daemon = 1
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment