Commit 4ed0fe57 by Julien Muchembled

qa: new --engine & --kill-mysqld tools/stress options

1 parent 1f340f4a
......@@ -200,6 +200,7 @@ class MySQLDatabaseManager(DatabaseManager):
def _commit(self):
# XXX: Should we translate OperationalError into MysqlError ?
self._active = 0
......@@ -310,7 +310,7 @@ class NEOCluster(object):
cleanup_on_delete=False, temp_dir=None, clear_databases=True,
address_type=ADDRESS_TYPE, bind_ip=None, logger=True,
importer=None, storage_kw={}):
if not adapter:
adapter = 'MySQL'
self.adapter = adapter
......@@ -372,7 +372,8 @@ class NEOCluster(object):
# create storage nodes
for i, db in enumerate(db_list):
self._newProcess(NodeTypes.STORAGE, logger and 'storage_%u' % i,
0, adapter=adapter, database=self.db_template(db))
0, adapter=adapter, database=self.db_template(db),
# create neoctl
self.neoctl = NeoCTL((self.local_ip, admin_port), ssl=self.SSL)
......@@ -7,6 +7,7 @@ from contextlib import contextmanager
from datetime import datetime
from functools import partial
from multiprocessing import Lock, RawArray
from multiprocessing.queues import SimpleQueue
from struct import Struct
from netfilterqueue import NetfilterQueue
import gevent.socket # preload for subprocesses
......@@ -19,7 +20,7 @@ from neo.lib.protocol import NodeTypes
from neo.lib.util import timeStringFromTID, p64, u64
Application as StorageApplication
from neo.tests import getTempDirectory
from neo.tests import getTempDirectory, mysql_pool
from neo.tests.ConflictFree import ConflictFreeLog
from neo.tests.functional import AlreadyStopped, NEOCluster, Process
from neo.tests.stress import StressApplication
......@@ -312,10 +313,11 @@ class NEOCluster(NEOCluster):
class Application(StressApplication):
_blocking = None
_blocking = _kill_mysqld = None
def __init__(self, client_count, thread_count, restart_ratio, logrotate,
*args, **kw):
def __init__(self, client_count, thread_count,
restart_ratio, kill_mysqld,
logrotate, *args, **kw):
self.client_count = client_count
self.thread_count = thread_count
self.logrotate = logrotate
......@@ -326,7 +328,24 @@ class Application(StressApplication):
def __init__(self, config):
StorageApplication__init__(self, config)
StorageApplication.__init__ = __init__
if kill_mysqld:
from neo.scripts import neostorage
from import mysqldb
neostorage_main = neostorage.main
self._kill_mysqld = kill_mysqld = SimpleQueue()
def main():
pid = os.getpid()
except mysqldb.OperationalError as e:
code = e.args[0]
except mysqldb.MysqlError as e:
code = e.code
if mysqldb.SERVER_LOST != code != mysqldb.SERVER_GONE_ERROR:
neostorage.main = main
super(Application, self).__init__(cluster.SSL,
......@@ -398,6 +417,10 @@ class Application(StressApplication):
t = threading.Thread(target=self._logrotate_thread)
t.daemon = 1
if self._kill_mysqld:
t = threading.Thread(target=self._watch_storage_thread)
t.daemon = 1
def stopCluster(self, wait=None):
......@@ -471,13 +494,30 @@ class Application(StressApplication):
except ValueError:
def _watch_storage_thread(self):
get = self._kill_mysqld.get
storage_list = self.cluster.getStorageProcessList()
while 1:
pid = get()
p, = (p for p in storage_list if == pid)
def restartStorages(self, nids):
processes = [p for p in self.cluster.getStorageProcessList()
if p.uuid in nids]
for p in processes: p.kill(signal.SIGKILL)
for p in processes: p.wait()
for p in processes: p.start()
storage_list = self.cluster.getStorageProcessList()
if self._kill_mysqld:
db_list = [db for db, p in zip(self.cluster.db_list, storage_list)
if p.uuid in nids]
with open(os.devnull, "wb") as f:
mysql_pool.start(*db_list, stderr=f)
processes = [p for p in storage_list if p.uuid in nids]
for p in processes: p.kill(signal.SIGKILL)
for p in processes: p.wait()
for p in processes: p.start()
def _cleanFirewall(self):
with open(os.devnull, "wb") as f:
......@@ -548,6 +588,7 @@ def main():
default=socket.AF_INET, const=socket.AF_INET6, help='(default: IPv4)')
_('-a', '--adapter', choices=adapters, default=default_adapter)
_('-d', '--datadir', help="(default: same as unit tests)")
_('-e', '--engine', help="database engine (MySQL only)")
_('-l', '--logdir', help="(default: same as --datadir)")
_('-m', '--masters', type=int, default=1)
_('-s', '--storages', type=int, default=8)
......@@ -574,6 +615,9 @@ def main():
_('-r', '--restart-ratio', type=ratio, default=.5, metavar='RATIO',
help='probability to kill/restart a storage node, rather than just'
' RSTing a TCP connection with this node')
_('--kill-mysqld', action='store_true',
help='if r != 0 and if NEO_DB_MYCNF is set,'
' kill mysqld rather than storage node')
_('-C', '--console', type=int, default=0,
help='console port (localhost) (default: any)')
_('-D', '--delay', type=float, default=.01,
......@@ -601,11 +645,19 @@ def main():
kw = dict(db_list=db_list, name='stress',
partitions=args.partitions, replicas=args.replicas,
adapter=args.adapter, address_type=args.address_type,
temp_dir=args.logdir or args.datadir or getTempDirectory())
temp_dir=args.logdir or args.datadir or getTempDirectory(),
storage_kw={'engine': args.engine, 'wait': -1})
if args.command == 'run':
NFQueue.delay = args.delay
app = Application(args.clients, args.threads, args.restart_ratio,
error = args.kill_mysqld and (
'invalid adapter' if args.adapter != 'MySQL' else
None if mysql_pool else 'NEO_DB_MYCNF not set'
if error:
parser.error('--kill-mysqld: ' + error)
app = Application(args.clients, args.threads,
args.restart_ratio, args.kill_mysqld,
int(round(args.logrotate * 3600, 0)), **kw)
t = threading.Thread(target=console, args=(args.console, app))
t.daemon = 1
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!