Commit 93eabad6 authored by Julien Muchembled's avatar Julien Muchembled

Improve NEOCluster from functional test for use with ERP5 unit tests

parent 42e85ff1
...@@ -126,7 +126,7 @@ class NEOProcess(object): ...@@ -126,7 +126,7 @@ class NEOProcess(object):
except ImportError: except ImportError:
raise NotFound, '%s not found' % (command) raise NotFound, '%s not found' % (command)
self.command = command self.command = command
self.arg_dict = arg_dict self.arg_dict = dict(('--' + k, v) for k, v in arg_dict.iteritems())
self.with_uuid = True self.with_uuid = True
self.setUUID(uuid) self.setUUID(uuid)
...@@ -234,10 +234,10 @@ class NEOProcess(object): ...@@ -234,10 +234,10 @@ class NEOProcess(object):
class NEOCluster(object): class NEOCluster(object):
def __init__(self, db_list, master_count=1, partitions=1, replicas=0, def __init__(self, db_list, master_count=1, partitions=1, replicas=0,
db_user=DB_USER, db_password='', db_user=DB_USER, db_password='', name=None,
cleanup_on_delete=False, temp_dir=None, clear_databases=True, cleanup_on_delete=False, temp_dir=None, clear_databases=True,
adapter=os.getenv('NEO_TESTS_ADAPTER'), adapter=os.getenv('NEO_TESTS_ADAPTER'),
address_type=ADDRESS_TYPE, bind_ip=None, address_type=ADDRESS_TYPE, bind_ip=None, logfile=True,
): ):
if not adapter: if not adapter:
adapter = 'MySQL' adapter = 'MySQL'
...@@ -252,9 +252,11 @@ class NEOCluster(object): ...@@ -252,9 +252,11 @@ class NEOCluster(object):
if adapter == 'MySQL': if adapter == 'MySQL':
self.db_user = db_user self.db_user = db_user
self.db_password = db_password self.db_password = db_password
self.db_template = '%s:%s@%%s' % (db_user, db_password) self.db_template = ('%s:%s@%%s' % (db_user, db_password)).__mod__
elif adapter == 'SQLite': elif adapter == 'SQLite':
self.db_template = os.path.join(temp_dir, '%s.sqlite') self.db_template = (lambda t: lambda db:
':memory:' if db is None else db if os.sep in db else t % db
)(os.path.join(temp_dir, '%s.sqlite'))
else: else:
assert False, adapter assert False, adapter
self.address_type = address_type self.address_type = address_type
...@@ -265,58 +267,37 @@ class NEOCluster(object): ...@@ -265,58 +267,37 @@ class NEOCluster(object):
self.temp_dir = temp_dir self.temp_dir = temp_dir
self.port_allocator = PortAllocator() self.port_allocator = PortAllocator()
admin_port = self.port_allocator.allocate(address_type, local_ip) admin_port = self.port_allocator.allocate(address_type, local_ip)
self.cluster_name = 'neo_%s' % (random.randint(0, 100), ) self.cluster_name = name or 'neo_%s' % random.randint(0, 100)
master_node_list = [self.port_allocator.allocate(address_type, local_ip) master_node_list = [self.port_allocator.allocate(address_type, local_ip)
for i in xrange(master_count)] for i in xrange(master_count)]
self.master_nodes = ' '.join('%s:%s' % ( self.master_nodes = ' '.join('%s:%s' % (
buildUrlFromString(self.local_ip), x, ) buildUrlFromString(self.local_ip), x, )
for x in master_node_list) for x in master_node_list)
# create admin node # create admin node
self.__newProcess(NodeTypes.ADMIN, { self._newProcess(NodeTypes.ADMIN, logfile and 'admin', admin_port)
'--cluster': self.cluster_name,
'--logfile': os.path.join(self.temp_dir, 'admin.log'),
'--bind': '%s:%d' % (buildUrlFromString(
self.local_ip), admin_port, ),
'--masters': self.master_nodes,
})
# create master nodes # create master nodes
for i, port in enumerate(master_node_list): for i, port in enumerate(master_node_list):
self.__newProcess(NodeTypes.MASTER, { self._newProcess(NodeTypes.MASTER, logfile and 'master_%u' % i,
'--cluster': self.cluster_name, port, partitions=partitions, replicas=replicas)
'--logfile': os.path.join(self.temp_dir, 'master_%u.log' % i),
'--bind': '%s:%d' % (buildUrlFromString(
self.local_ip), port, ),
'--masters': self.master_nodes,
'--replicas': replicas,
'--partitions': partitions,
})
# create storage nodes # create storage nodes
for i, db in enumerate(db_list): for i, db in enumerate(db_list):
self.__newProcess(NodeTypes.STORAGE, { self._newProcess(NodeTypes.STORAGE, logfile and 'storage_%u' % i,
'--cluster': self.cluster_name, 0, adapter=adapter, database=self.db_template(db))
'--logfile': os.path.join(self.temp_dir, 'storage_%u.log' % i),
'--bind': '%s:%d' % (buildUrlFromString(
self.local_ip),
0 ),
'--masters': self.master_nodes,
'--database': self.db_template % db,
'--adapter': adapter,
})
# create neoctl # create neoctl
self.neoctl = NeoCTL((self.local_ip, admin_port)) self.neoctl = NeoCTL((self.local_ip, admin_port))
def __newProcess(self, node_type, arguments): def _newProcess(self, node_type, logfile=None, port=None, **kw):
self.uuid_dict[node_type] = uuid = 1 + self.uuid_dict.get(node_type, 0) self.uuid_dict[node_type] = uuid = 1 + self.uuid_dict.get(node_type, 0)
uuid += UUID_NAMESPACES[node_type] << 24 uuid += UUID_NAMESPACES[node_type] << 24
arguments['--uuid'] = uuid kw['uuid'] = uuid
kw['cluster'] = self.cluster_name
kw['masters'] = self.master_nodes
if logfile:
kw['logfile'] = os.path.join(self.temp_dir, logfile + '.log')
if port is not None:
kw['bind'] = '%s:%u' % (buildUrlFromString(self.local_ip), port)
self.process_dict.setdefault(node_type, []).append( self.process_dict.setdefault(node_type, []).append(
NEOProcess(command_dict[node_type], uuid, arguments)) NEOProcess(command_dict[node_type], uuid, kw))
def __allocateUUID(self):
uuid = ('%032x' % random.getrandbits(128)).decode('hex')
self.uuid_set.add(uuid)
return uuid
def setupDB(self, clear_databases=True): def setupDB(self, clear_databases=True):
if self.adapter == 'MySQL': if self.adapter == 'MySQL':
...@@ -325,7 +306,9 @@ class NEOCluster(object): ...@@ -325,7 +306,9 @@ class NEOCluster(object):
elif self.adapter == 'SQLite': elif self.adapter == 'SQLite':
if clear_databases: if clear_databases:
for db in self.db_list: for db in self.db_list:
db = self.db_template % db if db is None:
continue
db = self.db_template(db)
try: try:
os.remove(db) os.remove(db)
except OSError, e: except OSError, e:
...@@ -357,29 +340,25 @@ class NEOCluster(object): ...@@ -357,29 +340,25 @@ class NEOCluster(object):
""" Do a complete start of a cluster """ """ Do a complete start of a cluster """
self.run(except_storages=except_storages) self.run(except_storages=except_storages)
neoctl = self.neoctl neoctl = self.neoctl
target_count = len(self.db_list) - len(except_storages) target = [len(self.db_list) - len(except_storages)]
storage_node_list = []
def test(): def test():
storage_node_list[:] = [x
for x in neoctl.getNodeList(node_type=NodeTypes.STORAGE)
if x[3] == NodeStates.PENDING]
# wait at least number of started storages, admin node can know
# more nodes when the cluster restart with an existing partition
# table referencing non-running nodes
result = len(storage_node_list) >= target_count
if result:
try: try:
state = neoctl.getClusterState()
if state == ClusterStates.RUNNING:
return True
if state == ClusterStates.RECOVERING and target[0]:
pending_count = 0
for x in neoctl.getNodeList(node_type=NodeTypes.STORAGE):
if x[3] != NodeStates.PENDING:
target[0] = None # cluster must start automatically
break
pending_count += 1
if pending_count == target[0]:
neoctl.startCluster() neoctl.startCluster()
except RuntimeError, exc: except (NotReadyException, RuntimeError):
result = False pass
else:
result = True
return result
if not pdb.wait(test, MAX_START_TIME): if not pdb.wait(test, MAX_START_TIME):
raise AssertionError('Timeout when starting cluster') raise AssertionError('Timeout when starting cluster')
if storage_node_list:
self.expectClusterRunning()
neoctl.enableStorageList([x[2] for x in storage_node_list])
def stop(self, clients=True): def stop(self, clients=True):
error_list = [] error_list = []
...@@ -426,8 +405,8 @@ class NEOCluster(object): ...@@ -426,8 +405,8 @@ class NEOCluster(object):
return (db, db.open()) return (db, db.open())
def getSQLConnection(self, db): def getSQLConnection(self, db):
assert db in self.db_list assert db is not None and db in self.db_list
return buildDatabaseManager(self.adapter, (self.db_template % db,)) return buildDatabaseManager(self.adapter, (self.db_template(db),))
def getMasterProcessList(self): def getMasterProcessList(self):
return self.process_dict.get(NodeTypes.MASTER) return self.process_dict.get(NodeTypes.MASTER)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment