Commit bb0141e2 authored by Vincent Pelletier's avatar Vincent Pelletier

Move all common test functions into a single class, making it more reusable between tests.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@1127 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent bacc91d9
......@@ -21,18 +21,66 @@ import os
import sys
import time
import signal
import random
import MySQLdb
import tempfile
import traceback
# No need to protect this list with a lock, NEOProcess instanciations and
# killallNeo calls are done from a single thread.
neo_process_list = []
from neo import setupLog
from neo import logging
from neo.client.Storage import Storage
NEO_CONFIG_HEADER = """
[DEFAULT]
master_nodes: %(master_nodes)s
replicas: %(replicas)s
partitions: %(partitions)s
name: %(name)s
user: %(user)s
password: %(password)s
connector: SocketConnector
[admin]
server: 127.0.0.1:%(port)s
"""
NEO_CONFIG_MASTER = """
[%(id)s]
server: 127.0.0.1:%(port)s
"""
NEO_CONFIG_STORAGE = """
[%(id)s]
database: %(db)s
server: 127.0.0.1:%(port)s
"""
NEO_MASTER_ID = 'master%s'
NEO_STORAGE_ID = 'storage%s'
NEO_MASTER = 'neomaster'
NEO_STORAGE = 'neostorage'
NEO_ADMIN = 'neoadmin'
class AlreadyRunning(Exception):
pass
class AlreadyStopped(Exception):
pass
class NEOProcess:
pid = 0
def __init__(self, command, *args):
self.command = command
self.args = args
def start(self):
# Prevent starting when already forked and wait wasn't called.
if self.pid != 0:
raise AlreadyRunning, 'Already running with PID %r' % (self.pid, )
command = self.command
args = self.args
self.pid = os.fork()
if self.pid == 0:
# Child
......@@ -49,8 +97,6 @@ class NEOProcess:
# traceback, replace stdout & stderr.
sys.stdout = sys.stderr = open('/dev/null', 'w')
raise KeyboardInterrupt
else:
neo_process_list.append(self)
def kill(self, sig=signal.SIGTERM):
if self.pid:
......@@ -58,6 +104,8 @@ class NEOProcess:
os.kill(self.pid, sig)
except OSError:
traceback.print_last()
else:
raise AlreadyStopped
def __del__(self):
# If we get killed, kill subprocesses aswell.
......@@ -70,162 +118,101 @@ class NEOProcess:
pass
def wait(self, options=0):
assert self.pid
return os.WEXITSTATUS(os.waitpid(self.pid, options)[1])
def killallNeo():
while len(neo_process_list):
process = neo_process_list.pop()
process.kill()
process.wait()
NEO_MASTER = 'neomaster'
NEO_STORAGE = 'neostorage'
NEO_ADMIN = 'neoadmin'
NEO_PORT_BASE = 10010
NEO_CLUSTER_NAME = 'test'
NEO_MASTER_PORT_1 = NEO_PORT_BASE
NEO_MASTER_PORT_2 = NEO_MASTER_PORT_1 + 1
NEO_MASTER_PORT_3 = NEO_MASTER_PORT_2 + 1
NEO_STORAGE_PORT_1 = NEO_MASTER_PORT_3 + 1
NEO_STORAGE_PORT_2 = NEO_STORAGE_PORT_1 + 1
NEO_STORAGE_PORT_3 = NEO_STORAGE_PORT_2 + 1
NEO_STORAGE_PORT_4 = NEO_STORAGE_PORT_3 + 1
NEO_ADMIN_PORT = NEO_STORAGE_PORT_4 + 1
NEO_MASTER_NODES = '127.0.0.1:%(port_1)s 127.0.0.1:%(port_2)s 127.0.0.1:%(port_3)s' % {
'port_1': NEO_MASTER_PORT_1,
'port_2': NEO_MASTER_PORT_2,
'port_3': NEO_MASTER_PORT_3
}
NEO_SQL_USER = 'test'
NEO_SQL_PASSWORD = ''
NEO_SQL_DATABASE_1 = 'test_neo1'
NEO_SQL_DATABASE_2 = 'test_neo2'
NEO_SQL_DATABASE_3 = 'test_neo3'
NEO_SQL_DATABASE_4 = 'test_neo4'
# Used to create & drop above databases and grant test users privileges.
SQL_ADMIN_USER = 'root'
SQL_ADMIN_PASSWORD = None
NEO_CONFIG = '''
# Default parameters.
[DEFAULT]
# The list of master nodes.
master_nodes: %(master_nodes)s
# The number of replicas.
replicas: 2
# The number of partitions.
partitions: 1009
# The name of this cluster.
name: %(name)s
# The user name for the database.
user: %(user)s
# The password for the database.
password: %(password)s
# The connector class used
connector: SocketConnector
# The admin node.
[admin]
server: 127.0.0.1:%(admin_port)s
# The first master.
[master1]
server: 127.0.0.1:%(master1_port)s
# The second master.
[master2]
server: 127.0.0.1:%(master2_port)s
# The third master.
[master3]
server: 127.0.0.1:%(master3_port)s
# The first storage.
[storage1]
database: %(storage1_db)s
server: 127.0.0.1:%(storage1_port)s
# The first storage.
[storage2]
database: %(storage2_db)s
server: 127.0.0.1:%(storage2_port)s
# The third storage.
[storage3]
database: %(storage3_db)s
server: 127.0.0.1:%(storage3_port)s
# The fourth storage.
[storage4]
database: %(storage4_db)s
server: 127.0.0.1:%(storage4_port)s
''' % {
'master_nodes': NEO_MASTER_NODES,
'name': NEO_CLUSTER_NAME,
'user': NEO_SQL_USER,
'password': NEO_SQL_PASSWORD,
'admin_port': NEO_ADMIN_PORT,
'master1_port': NEO_MASTER_PORT_1,
'master2_port': NEO_MASTER_PORT_2,
'master3_port': NEO_MASTER_PORT_3,
'storage1_port': NEO_STORAGE_PORT_1,
'storage1_db': NEO_SQL_DATABASE_1,
'storage2_port': NEO_STORAGE_PORT_2,
'storage2_db': NEO_SQL_DATABASE_2,
'storage3_port': NEO_STORAGE_PORT_3,
'storage3_db': NEO_SQL_DATABASE_3,
'storage4_port': NEO_STORAGE_PORT_4,
'storage4_db': NEO_SQL_DATABASE_4
}
temp_dir = tempfile.mkdtemp(prefix='neo_')
print 'Using temp directory %r.' % (temp_dir, )
config_file_path = os.path.join(temp_dir, 'neo.conf')
config_file = open(config_file_path, 'w')
config_file.write(NEO_CONFIG)
config_file.close()
m1_log = os.path.join(temp_dir, 'm1.log')
m2_log = os.path.join(temp_dir, 'm2.log')
m3_log = os.path.join(temp_dir, 'm3.log')
s1_log = os.path.join(temp_dir, 's1.log')
s2_log = os.path.join(temp_dir, 's2.log')
s3_log = os.path.join(temp_dir, 's3.log')
s4_log = os.path.join(temp_dir, 's4.log')
a_log = os.path.join(temp_dir, 'a.log')
if self.pid == 0:
raise AlreadyStopped
result = os.WEXITSTATUS(os.waitpid(self.pid, options)[1])
self.pid = 0
return result
class NEOCluster(object):
def __init__(self, db_list, master_node_count=1,
partitions=1, replicas=0, port_base=10000,
db_user='neo', db_password='neo',
db_super_user='root', db_super_password=None,
cleanup_on_delete=False):
self.cleanup_on_delete = cleanup_on_delete
self.db_super_user = db_super_user
self.db_super_password = db_super_password
self.db_user = db_user
self.db_password = db_password
self.db_list = db_list
self.process_list = []
self.last_port = port_base
self.temp_dir = temp_dir = tempfile.mkdtemp(prefix='neo_')
print 'Using temp directory %r.' % (temp_dir, )
config_file_path = os.path.join(temp_dir, 'neo.conf')
config_file = open(config_file_path, 'w')
neo_admin_port = self.__allocatePort()
self.cluster_name = cluster_name = 'neo_%s' % (random.randint(0, 100), )
master_node_dict = {}
for master in xrange(master_node_count):
master_node_dict[NEO_MASTER_ID % (master, )] = \
self.__allocatePort()
self.master_nodes = master_nodes = ' '.join('127.0.0.1:%s' %
(x, ) for x in master_node_dict.itervalues())
config_file.write(NEO_CONFIG_HEADER % {
'master_nodes': master_nodes,
'replicas': replicas,
'partitions': partitions,
'name': cluster_name,
'user': db_user,
'password': db_password,
'port': neo_admin_port,
})
self.__newProcess(NEO_ADMIN, '-vc', config_file_path, '-s', 'admin',
'-l', os.path.join(temp_dir, 'admin.log'))
for config_id, port in master_node_dict.iteritems():
config_file.write(NEO_CONFIG_MASTER % {
'id': config_id,
'port': port,
})
self.__newProcess(NEO_MASTER, '-vc', config_file_path, '-s',
config_id, '-l',
os.path.join(temp_dir, '%s.log' % (config_id)))
for storage, db in enumerate(db_list):
config_id = NEO_STORAGE_ID % (storage, )
config_file.write(NEO_CONFIG_STORAGE % {
'id': config_id,
'db': db,
'port': self.__allocatePort(),
})
self.__newProcess(NEO_STORAGE, '-vc', config_file_path, '-s',
config_id, '-l',
os.path.join(temp_dir, '%s.log' % (config_id)))
config_file.close()
self.neoctl = NeoCTL('127.0.0.1', neo_admin_port,
'SocketConnector')
from neo import setupLog
client_log = os.path.join(temp_dir, 'c.log')
setupLog('CLIENT', filename=client_log, verbose=True)
from neo import logging
from neo.client.Storage import Storage
def __newProcess(self, command, *args):
self.process_list.append(NEOProcess(command, *args))
neoctl = NeoCTL('127.0.0.1', NEO_ADMIN_PORT, 'SocketConnector')
def __allocatePort(self):
port = self.last_port
self.last_port += 1
return port
def startNeo():
# Stop NEO cluster (if running)
killallNeo()
def setupDB(self):
# Cleanup or bootstrap databases
connect_arg_dict = {'user': SQL_ADMIN_USER}
if SQL_ADMIN_PASSWORD is not None:
connect_arg_dict['passwd'] = SQL_ADMIN_PASSWORD
connect_arg_dict = {'user': self.db_super_user}
password = self.db_super_password
if password is not None:
connect_arg_dict['passwd'] = password
sql_connection = MySQLdb.Connect(**connect_arg_dict)
cursor = sql_connection.cursor()
for database in (NEO_SQL_DATABASE_1, NEO_SQL_DATABASE_2, NEO_SQL_DATABASE_3, NEO_SQL_DATABASE_4):
cursor.execute('DROP DATABASE IF EXISTS %s' % (database, ))
cursor.execute('CREATE DATABASE %s' % (database, ))
cursor.execute('GRANT ALL ON %s.* TO "%s"@"localhost" IDENTIFIED BY "%s"' % (database, NEO_SQL_USER, NEO_SQL_PASSWORD))
for database in self.db_list:
cursor.execute('DROP DATABASE IF EXISTS `%s`' % (database, ))
cursor.execute('CREATE DATABASE `%s`' % (database, ))
cursor.execute('GRANT ALL ON `%s`.* TO "%s"@"localhost" '\
'IDENTIFIED BY "%s"' % (database, self.db_user,
self.db_password))
cursor.close()
sql_connection.close()
# Start NEO cluster
NEOProcess(NEO_MASTER, '-vc', config_file_path, '-s', 'master1', '-l', m1_log)
NEOProcess(NEO_MASTER, '-vc', config_file_path, '-s', 'master2', '-l', m2_log)
NEOProcess(NEO_MASTER, '-vc', config_file_path, '-s', 'master3', '-l', m3_log)
NEOProcess(NEO_STORAGE, '-vRc', config_file_path, '-s', 'storage1', '-l', s1_log)
NEOProcess(NEO_STORAGE, '-vRc', config_file_path, '-s', 'storage2', '-l', s2_log)
NEOProcess(NEO_STORAGE, '-vRc', config_file_path, '-s', 'storage3', '-l', s3_log)
NEOProcess(NEO_STORAGE, '-vRc', config_file_path, '-s', 'storage4', '-l', s4_log)
NEOProcess(NEO_ADMIN, '-vc', config_file_path, '-s', 'admin', '-l', a_log)
def start(self):
neoctl = self.neoctl
assert len(self.process_list)
for process in self.process_list:
process.start()
# Try to put cluster in running state. This will succeed as soon as
# admin node could connect to the primary master node.
while True:
......@@ -235,17 +222,35 @@ def startNeo():
time.sleep(0.5)
else:
break
target_count = len(self.db_list)
while True:
storage_node_list = neoctl.getNodeList(
node_type=protocol.STORAGE_NODE_TYPE)
if len(storage_node_list) == 4:
if len(storage_node_list) == target_count:
break
time.sleep(0.5)
neoctl.enableStorageList([x[2] for x in storage_node_list])
def getNeoStorage():
def stop(self):
for process in self.process_list:
try:
process.kill()
process.wait()
except AlreadyStopped:
pass
def getNEOCTL(self):
return self.neoctl
def getStorage(self):
#setupLog('CLIENT', filename=os.path.join(self.temp_dir, 'client.log'),
# verbose=True)
return Storage(
master_nodes=NEO_MASTER_NODES,
name=NEO_CLUSTER_NAME,
master_nodes=self.master_nodes,
name=self.cluster_name,
connector='SocketConnector')
def __del__(self):
if self.cleanup_on_delete:
os.removedirs(self.temp_dir)
......@@ -23,7 +23,7 @@ from persistent import Persistent
from persistent.mapping import PersistentMapping
import transaction
from neo.tests.functional import startNeo, getNeoStorage, killallNeo
from neo.tests.functional import NEOCluster
class P(Persistent):
pass
......@@ -38,11 +38,17 @@ class DecoyIndependent(Persistent):
def _p_independent(self):
return 0
neo = NEOCluster(['test_neo1', 'test_neo2', 'test_neo3', 'test_neo4'],
partitions=1009, replicas=1, port_base=20000,
master_node_count=3)
class ZODBTests(unittest.TestCase):
def setUp(self):
startNeo()
self._storage = getNeoStorage()
neo.stop()
neo.setupDB()
neo.start()
self._storage = neo.getStorage()
self._db = ZODB.DB(self._storage)
def populate(self):
......@@ -59,7 +65,7 @@ class ZODBTests(unittest.TestCase):
def tearDown(self):
self._db.close()
self._storage.cleanup()
killallNeo()
neo.stop()
def checkExportImport(self, abort_it=False):
self.populate()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment