Commit c11410ef authored by Julien Muchembled's avatar Julien Muchembled

qa: provide a way to let tests start 1 mysqld per storage node

parent 74ec44e3
......@@ -21,6 +21,7 @@ import gc
import os
import random
import socket
import subprocess
import sys
import tempfile
import unittest
......@@ -41,7 +42,7 @@ from .mock import Mock
from neo.lib import debug, logging, protocol
from neo.lib.protocol import NodeTypes, Packets, UUID_NAMESPACES
from neo.lib.util import cached_property
from time import time
from time import time, sleep
from struct import pack, unpack
from unittest.case import _ExpectedFailure, _UnexpectedSuccess
try:
......@@ -72,6 +73,9 @@ DB_ADMIN = os.getenv('NEO_DB_ADMIN', 'root')
DB_PASSWD = os.getenv('NEO_DB_PASSWD', '')
DB_USER = os.getenv('NEO_DB_USER', 'test')
DB_SOCKET = os.getenv('NEO_DB_SOCKET', '')
DB_INSTALL = os.getenv('NEO_DB_INSTALL', 'mysql_install_db')
DB_MYSQLD = os.getenv('NEO_DB_MYSQLD', '/usr/sbin/mysqld')
DB_MYCNF = os.getenv('NEO_DB_MYCNF')
IP_VERSION_FORMAT_DICT = {
socket.AF_INET: '127.0.0.1',
......@@ -134,8 +138,12 @@ def getTempDirectory():
print 'Using temp directory %r.' % temp_dir
return temp_dir
def setupMySQLdb(db_list, user=DB_USER, password='', clear_databases=True):
def setupMySQLdb(db_list, clear_databases=True):
if mysql_pool:
return mysql_pool.setup(db_list, clear_databases)
from MySQLdb.constants.ER import BAD_DB_ERROR
user = DB_USER
password = ''
kw = {'unix_socket': os.path.expanduser(DB_SOCKET)} if DB_SOCKET else {}
conn = MySQLdb.connect(user=DB_ADMIN, passwd=DB_PASSWD, **kw)
cursor = conn.cursor()
......@@ -154,6 +162,88 @@ def setupMySQLdb(db_list, user=DB_USER, password='', clear_databases=True):
cursor.close()
conn.commit()
conn.close()
return '{}:{}@%s{}'.format(user, password, DB_SOCKET).__mod__
class MySQLPool(object):
def __init__(self, pool_dir=None):
self._args = {}
self._mysqld_dict = {}
if not pool_dir:
pool_dir = getTempDirectory()
self._base = pool_dir + os.sep
self._sock_template = os.path.join(pool_dir, '%s', 'mysql.sock')
def __del__(self):
self.kill(*self._mysqld_dict)
def setup(self, db_list, clear_databases):
start_list = set(db_list).difference(self._mysqld_dict)
if start_list:
start_list = sorted(start_list)
x = []
with open(os.devnull, 'wb') as f:
for db in start_list:
base = self._base + db
datadir = os.path.join(base, 'datadir')
sock = self._sock_template % db
tmpdir = os.path.join(base, 'tmp')
args = [DB_INSTALL,
'--defaults-file=' + DB_MYCNF,
'--datadir=' + datadir,
'--socket=' + sock,
'--tmpdir=' + tmpdir,
'--log_error=' + os.path.join(base, 'error.log')]
if os.path.exists(datadir):
try:
os.remove(sock)
except OSError, e:
if e.errno != errno.ENOENT:
raise
else:
os.makedirs(tmpdir)
x.append(subprocess.Popen(args,
stdout=f, stderr=subprocess.STDOUT))
args[0] = DB_MYSQLD
self._args[db] = args
for x in x:
x = x.wait()
if x:
raise subprocess.CalledProcessError(x, DB_INSTALL)
self.start(*start_list)
for db in start_list:
sock = self._sock_template % db
p = self._mysqld_dict[db]
while not os.path.exists(sock):
sleep(1)
x = p.poll()
if x is not None:
raise subprocess.CalledProcessError(x, DB_MYSQLD)
for db in db_list:
db = MySQLdb.connect(unix_socket=self._sock_template % db,
user='root')
if clear_databases:
db.query('DROP DATABASE IF EXISTS neo')
db.query('CREATE DATABASE IF NOT EXISTS neo')
db.close()
return ('root@neo' + self._sock_template).__mod__
def start(self, *db, **kw):
assert set(db).isdisjoint(self._mysqld_dict)
for db in db:
self._mysqld_dict[db] = subprocess.Popen(self._args[db], **kw)
def kill(self, *db):
processes = []
for db in db:
p = self._mysqld_dict.pop(db)
processes.append(p)
p.kill()
for p in processes:
p.wait()
mysql_pool = MySQLPool() if DB_MYCNF else None
def ImporterConfigParser(adapter, zodb, **kw):
cfg = SafeConfigParser()
......@@ -244,13 +334,15 @@ class NeoUnitTestBase(NeoTestBase):
""" create empty databases """
adapter = os.getenv('NEO_TESTS_ADAPTER', 'MySQL')
if adapter == 'MySQL':
setupMySQLdb([prefix + str(i) for i in xrange(number)])
db_template = setupMySQLdb(
[prefix + str(i) for i in xrange(number)])
self.db_template = lambda i: db_template(prefix + str(i))
elif adapter == 'SQLite':
temp_dir = getTempDirectory()
self.db_template = os.path.join(getTempDirectory(),
prefix + '%s.sqlite').__mod__
for i in xrange(number):
try:
os.remove(os.path.join(temp_dir,
'%s%s.sqlite' % (prefix, i)))
os.remove(self.db_template(i))
except OSError, e:
if e.errno != errno.ENOENT:
raise
......@@ -274,21 +366,14 @@ class NeoUnitTestBase(NeoTestBase):
def getStorageConfiguration(self, cluster='main', master_number=2,
index=0, prefix=DB_PREFIX, uuid=None):
assert master_number >= 1 and master_number <= 10
assert index >= 0 and index <= 9
masters = [(buildUrlFromString(self.local_ip),
10010 + i) for i in xrange(master_number)]
adapter = os.getenv('NEO_TESTS_ADAPTER', 'MySQL')
if adapter == 'MySQL':
db = '%s@%s%s%s' % (DB_USER, prefix, index, DB_SOCKET)
elif adapter == 'SQLite':
db = os.path.join(getTempDirectory(), 'test_neo%s.sqlite' % index)
else:
assert False, adapter
return {
'cluster': cluster,
'bind': (masters[0], 10020 + index),
'masters': masters,
'database': db,
'database': self.db_template(index),
'uuid': uuid,
'adapter': adapter,
'wait': 0,
......
......@@ -36,7 +36,7 @@ from neo.lib import logging
from neo.lib.protocol import ClusterStates, NodeTypes, CellStates, NodeStates, \
UUID_NAMESPACES
from neo.lib.util import dump, setproctitle
from .. import (ADDRESS_TYPE, DB_SOCKET, DB_USER, IP_VERSION_FORMAT_DICT, SSL,
from .. import (ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, SSL,
buildUrlFromString, cluster, getTempDirectory, setupMySQLdb,
ImporterConfigParser, NeoTestBase, Patch)
from neo.client.Storage import Storage
......@@ -306,7 +306,7 @@ class NEOCluster(object):
SSL = None
def __init__(self, db_list, master_count=1, partitions=1, replicas=0,
db_user=DB_USER, db_password='', name=None,
name=None,
cleanup_on_delete=False, temp_dir=None, clear_databases=True,
adapter=os.getenv('NEO_TESTS_ADAPTER'),
address_type=ADDRESS_TYPE, bind_ip=None, logger=True,
......@@ -322,20 +322,28 @@ class NEOCluster(object):
temp_dir = tempfile.mkdtemp(prefix='neo_')
print 'Using temp directory ' + temp_dir
if adapter == 'MySQL':
self.db_user = db_user
self.db_password = db_password
self.db_template = ('%s:%s@%%s%s' % (db_user, db_password,
DB_SOCKET)).__mod__
self.db_template = setupMySQLdb(db_list, clear_databases)
elif adapter == 'SQLite':
self.db_template = (lambda t: lambda db:
':memory:' if db is None else db if os.sep in db else t % db
)(os.path.join(temp_dir, '%s.sqlite'))
if clear_databases:
for db in self.db_list:
if db is None:
continue
db = self.db_template(db)
try:
os.remove(db)
except OSError, e:
if e.errno != errno.ENOENT:
raise
else:
logging.debug('%r deleted', db)
else:
assert False, adapter
self.address_type = address_type
self.local_ip = local_ip = bind_ip or \
IP_VERSION_FORMAT_DICT[self.address_type]
self.setupDB(clear_databases)
if importer:
cfg = ImporterConfigParser(adapter, **importer)
cfg.set("neo", "database", self.db_template(*db_list))
......@@ -382,23 +390,10 @@ class NEOCluster(object):
self.process_dict.setdefault(node_type, []).append(
NEOProcess(command_dict[node_type], uuid=uuid, **kw))
def setupDB(self, clear_databases=True):
if self.adapter == 'MySQL':
setupMySQLdb(self.db_list, self.db_user, self.db_password,
clear_databases)
elif self.adapter == 'SQLite':
if clear_databases:
def resetDB(self):
for db in self.db_list:
if db is None:
continue
db = self.db_template(db)
try:
os.remove(db)
except OSError, e:
if e.errno != errno.ENOENT:
raise
else:
logging.debug('%r deleted', db)
dm = buildDatabaseManager(self.adapter, (self.db_template(db),))
dm.setup(True)
def run(self, except_storages=()):
""" Start cluster processes except some storage nodes """
......
......@@ -62,8 +62,6 @@ class ClientTests(NEOFunctionalTest):
NEOFunctionalTest._tearDown(self, success)
def __setup(self):
# start cluster
self.neo.setupDB()
self.neo.start()
self.neo.expectClusterRunning()
self.db = ZODB.DB(self.neo.getZODBStorage())
......
......@@ -71,7 +71,6 @@ class ClusterTests(NEOFunctionalTest):
def testClusterBreaks(self):
self.neo = NEOCluster(['test_neo1'],
master_count=1, temp_dir=self.getTempDirectory())
self.neo.setupDB()
self.neo.start()
self.neo.expectClusterRunning()
self.neo.expectOudatedCells(number=0)
......@@ -82,7 +81,6 @@ class ClusterTests(NEOFunctionalTest):
self.neo = NEOCluster(['test_neo1', 'test_neo2'],
partitions=2, master_count=1, replicas=0,
temp_dir=self.getTempDirectory())
self.neo.setupDB()
self.neo.start()
self.neo.expectClusterRunning()
self.neo.expectOudatedCells(number=0)
......@@ -93,7 +91,6 @@ class ClusterTests(NEOFunctionalTest):
self.neo = NEOCluster(['test_neo1', 'test_neo2'],
partitions=2, replicas=1, master_count=1,
temp_dir=self.getTempDirectory())
self.neo.setupDB()
self.neo.start()
self.neo.expectClusterRunning()
self.neo.expectOudatedCells(number=0)
......
......@@ -22,7 +22,7 @@ from MySQLdb.constants.ER import UNKNOWN_STORAGE_ENGINE
from ..mock import Mock
from neo.lib.protocol import ZERO_OID
from neo.lib.util import p64
from .. import DB_PREFIX, DB_SOCKET, DB_USER, Patch
from .. import DB_PREFIX, DB_USER, Patch, setupMySQLdb
from .testStorageDBTests import StorageDBTests
from neo.storage.database import DatabaseFailure
from neo.storage.database.mysqldb import MySQLDatabaseManager
......@@ -46,8 +46,8 @@ class StorageMySQLdbTests(StorageDBTests):
engine = None
def _test_lockDatabase_open(self):
self.prepareDatabase(number=1, prefix=DB_PREFIX)
database = '%s@%s0%s' % (DB_USER, DB_PREFIX, DB_SOCKET)
self.prepareDatabase(1)
database = self.db_template(0)
return MySQLDatabaseManager(database, self.engine)
def getDB(self, reset=0):
......
......@@ -40,7 +40,7 @@ from neo.lib.util import cached_property, parseMasterList, p64
from neo.master.recovery import RecoveryManager
from .. import (getTempDirectory, setupMySQLdb,
ImporterConfigParser, NeoTestBase, Patch,
ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, DB_PREFIX, DB_SOCKET, DB_USER)
ADDRESS_TYPE, IP_VERSION_FORMAT_DICT, DB_PREFIX)
BIND = IP_VERSION_FORMAT_DICT[ADDRESS_TYPE], 0
LOCAL_IP = socket.inet_pton(ADDRESS_TYPE, IP_VERSION_FORMAT_DICT[ADDRESS_TYPE])
......@@ -714,7 +714,7 @@ class NEOCluster(object):
def __init__(self, master_count=1, partitions=1, replicas=0, upstream=None,
adapter=os.getenv('NEO_TESTS_ADAPTER', 'SQLite'),
storage_count=None, db_list=None, clear_databases=True,
db_user=DB_USER, db_password='', compress=True,
compress=True,
importer=None, autostart=None, dedup=False, name=None):
self.name = name or 'neo_%s' % self._allocate('name',
lambda: random.randint(0, 100))
......@@ -741,21 +741,20 @@ class NEOCluster(object):
db_list = ['%s%u' % (DB_PREFIX, self._allocate('db', index))
for _ in xrange(storage_count)]
if adapter == 'MySQL':
setupMySQLdb(db_list, db_user, db_password, clear_databases)
db = '%s:%s@%%s%s' % (db_user, db_password, DB_SOCKET)
db = setupMySQLdb(db_list, clear_databases)
elif adapter == 'SQLite':
db = os.path.join(getTempDirectory(), '%s.sqlite')
db = os.path.join(getTempDirectory(), '%s.sqlite').__mod__
else:
assert False, adapter
if importer:
cfg = ImporterConfigParser(adapter, **importer)
cfg.set("neo", "database", db % tuple(db_list))
db = os.path.join(getTempDirectory(), '%s.conf')
with open(db % tuple(db_list), "w") as f:
cfg.set("neo", "database", db(*db_list))
db = os.path.join(getTempDirectory(), '%s.conf').__mod__
with open(db(*db_list), "w") as f:
cfg.write(f)
kw["adapter"] = "Importer"
kw['wait'] = 0
self.storage_list = [StorageApplication(database=db % x, **kw)
self.storage_list = [StorageApplication(database=db(x), **kw)
for x in db_list]
self.admin_list = [AdminApplication(**kw)]
......
......@@ -33,8 +33,6 @@ class RecoveryTests(ZODBTestCase, StorageTestBase, RecoveryStorage):
os.makedirs(dst_temp_dir)
self.neo_dst = NEOCluster(['test_neo1-dst'], partitions=1, replicas=0,
master_count=1, temp_dir=dst_temp_dir)
self.neo_dst.stop()
self.neo_dst.setupDB()
self.neo_dst.start()
self._dst = self.neo.getZODBStorage()
self._dst_db = ZODB.DB(self._dst)
......
......@@ -129,7 +129,7 @@ class MatrixImportBenchmark(BenchmarkRunner):
finally:
zodb.stop()
# Clear DB if no error happened.
zodb.setupDB()
zodb.resetDB()
return end - start
except:
traceback.print_exc()
......
......@@ -53,7 +53,7 @@ class ImportBenchmark(BenchmarkRunner):
finally:
neo.stop()
# Clear DB if no error happened.
neo.setupDB()
neo.resetDB()
return result
except:
return 'Perf: import failed', ''.join(traceback.format_exc())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment