Commit 10b108f7 authored by Vincent Pelletier's avatar Vincent Pelletier

Remove unused imports.

Make it simpler to call NEOCluster.__newProcess.
Categorise process by node type, and add accessors to retrieve all nodes of a given type.
Generate node uuids in the test and implement accessors for this.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@1146 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 5367588c
...@@ -26,9 +26,9 @@ import MySQLdb ...@@ -26,9 +26,9 @@ import MySQLdb
import tempfile import tempfile
import traceback import traceback
from neo import setupLog
from neo import logging
from neo.client.Storage import Storage from neo.client.Storage import Storage
from neo.tests import getNewUUID
from neo.util import dump
NEO_CONFIG_HEADER = """ NEO_CONFIG_HEADER = """
[DEFAULT] [DEFAULT]
...@@ -71,16 +71,21 @@ class AlreadyStopped(Exception): ...@@ -71,16 +71,21 @@ class AlreadyStopped(Exception):
class NEOProcess: class NEOProcess:
pid = 0 pid = 0
def __init__(self, command, *args): def __init__(self, command, uuid, arg_dict):
self.command = command self.command = command
self.args = args self.arg_dict = arg_dict
self.setUUID(uuid)
def start(self): def start(self):
# Prevent starting when already forked and wait wasn't called. # Prevent starting when already forked and wait wasn't called.
if self.pid != 0: if self.pid != 0:
raise AlreadyRunning, 'Already running with PID %r' % (self.pid, ) raise AlreadyRunning, 'Already running with PID %r' % (self.pid, )
command = self.command command = self.command
args = self.args args = []
for arg, param in self.arg_dict.iteritems():
args.append(arg)
if param is not None:
args.append(param)
self.pid = os.fork() self.pid = os.fork()
if self.pid == 0: if self.pid == 0:
# Child # Child
...@@ -124,6 +129,16 @@ class NEOProcess: ...@@ -124,6 +129,16 @@ class NEOProcess:
self.pid = 0 self.pid = 0
return result return result
def getUUID(self):
return self.uuid
def setUUID(self, uuid):
"""
Note: for this change to take effect, the node must be restarted.
"""
self.uuid = uuid
self.arg_dict['-u'] = uuid
class NEOCluster(object): class NEOCluster(object):
def __init__(self, db_list, master_node_count=1, def __init__(self, db_list, master_node_count=1,
partitions=1, replicas=0, port_base=10000, partitions=1, replicas=0, port_base=10000,
...@@ -131,16 +146,17 @@ class NEOCluster(object): ...@@ -131,16 +146,17 @@ class NEOCluster(object):
db_super_user='root', db_super_password=None, db_super_user='root', db_super_password=None,
cleanup_on_delete=False): cleanup_on_delete=False):
self.cleanup_on_delete = cleanup_on_delete self.cleanup_on_delete = cleanup_on_delete
self.uuid_set = set()
self.db_super_user = db_super_user self.db_super_user = db_super_user
self.db_super_password = db_super_password self.db_super_password = db_super_password
self.db_user = db_user self.db_user = db_user
self.db_password = db_password self.db_password = db_password
self.db_list = db_list self.db_list = db_list
self.process_list = [] self.process_dict = {}
self.last_port = port_base self.last_port = port_base
self.temp_dir = temp_dir = tempfile.mkdtemp(prefix='neo_') self.temp_dir = temp_dir = tempfile.mkdtemp(prefix='neo_')
print 'Using temp directory %r.' % (temp_dir, ) print 'Using temp directory %r.' % (temp_dir, )
config_file_path = os.path.join(temp_dir, 'neo.conf') self.config_file_path = config_file_path = os.path.join(temp_dir, 'neo.conf')
config_file = open(config_file_path, 'w') config_file = open(config_file_path, 'w')
neo_admin_port = self.__allocatePort() neo_admin_port = self.__allocatePort()
self.cluster_name = cluster_name = 'neo_%s' % (random.randint(0, 100), ) self.cluster_name = cluster_name = 'neo_%s' % (random.randint(0, 100), )
...@@ -159,16 +175,13 @@ class NEOCluster(object): ...@@ -159,16 +175,13 @@ class NEOCluster(object):
'password': db_password, 'password': db_password,
'port': neo_admin_port, 'port': neo_admin_port,
}) })
self.__newProcess(NEO_ADMIN, '-vc', config_file_path, '-s', 'admin', self.__newProcess(NEO_ADMIN, 'admin')
'-l', os.path.join(temp_dir, 'admin.log'))
for config_id, port in master_node_dict.iteritems(): for config_id, port in master_node_dict.iteritems():
config_file.write(NEO_CONFIG_MASTER % { config_file.write(NEO_CONFIG_MASTER % {
'id': config_id, 'id': config_id,
'port': port, 'port': port,
}) })
self.__newProcess(NEO_MASTER, '-vc', config_file_path, '-s', self.__newProcess(NEO_MASTER, config_id)
config_id, '-l',
os.path.join(temp_dir, '%s.log' % (config_id)))
for storage, db in enumerate(db_list): for storage, db in enumerate(db_list):
config_id = NEO_STORAGE_ID % (storage, ) config_id = NEO_STORAGE_ID % (storage, )
config_file.write(NEO_CONFIG_STORAGE % { config_file.write(NEO_CONFIG_STORAGE % {
...@@ -176,21 +189,34 @@ class NEOCluster(object): ...@@ -176,21 +189,34 @@ class NEOCluster(object):
'db': db, 'db': db,
'port': self.__allocatePort(), 'port': self.__allocatePort(),
}) })
self.__newProcess(NEO_STORAGE, '-vc', config_file_path, '-s', self.__newProcess(NEO_STORAGE, config_id)
config_id, '-l',
os.path.join(temp_dir, '%s.log' % (config_id)))
config_file.close() config_file.close()
self.neoctl = NeoCTL('127.0.0.1', neo_admin_port, self.neoctl = NeoCTL('127.0.0.1', neo_admin_port,
'SocketConnector') 'SocketConnector')
def __newProcess(self, command, *args): def __newProcess(self, command, section):
self.process_list.append(NEOProcess(command, *args)) uuid = self.__allocateUUID()
self.process_dict.setdefault(command, []).append(
NEOProcess(command, dump(uuid), {
'-v': None,
'-c': self.config_file_path,
'-s': section,
'-l': os.path.join(self.temp_dir, '%s.log' % (section, ))
}))
def __allocatePort(self): def __allocatePort(self):
port = self.last_port port = self.last_port
self.last_port += 1 self.last_port += 1
return port return port
def __allocateUUID(self):
uuid_set = self.uuid_set
uuid = None
while uuid is None or uuid in uuid_set:
uuid = getNewUUID()
uuid_set.add(uuid)
return uuid
def setupDB(self): def setupDB(self):
# Cleanup or bootstrap databases # Cleanup or bootstrap databases
connect_arg_dict = {'user': self.db_super_user} connect_arg_dict = {'user': self.db_super_user}
...@@ -210,9 +236,10 @@ class NEOCluster(object): ...@@ -210,9 +236,10 @@ class NEOCluster(object):
def start(self): def start(self):
neoctl = self.neoctl neoctl = self.neoctl
assert len(self.process_list) assert len(self.process_dict)
for process in self.process_list: for process_list in self.process_dict.itervalues():
process.start() for process in process_list:
process.start()
# Try to put cluster in running state. This will succeed as soon as # Try to put cluster in running state. This will succeed as soon as
# admin node could connect to the primary master node. # admin node could connect to the primary master node.
while True: while True:
...@@ -232,12 +259,13 @@ class NEOCluster(object): ...@@ -232,12 +259,13 @@ class NEOCluster(object):
neoctl.enableStorageList([x[2] for x in storage_node_list]) neoctl.enableStorageList([x[2] for x in storage_node_list])
def stop(self): def stop(self):
for process in self.process_list: for process_list in self.process_dict.itervalues():
try: for process in process_list:
process.kill() try:
process.wait() process.kill()
except AlreadyStopped: process.wait()
pass except AlreadyStopped:
pass
def getNEOCTL(self): def getNEOCTL(self):
return self.neoctl return self.neoctl
...@@ -248,6 +276,18 @@ class NEOCluster(object): ...@@ -248,6 +276,18 @@ class NEOCluster(object):
name=self.cluster_name, name=self.cluster_name,
connector='SocketConnector') connector='SocketConnector')
def _getProcessList(self, type):
return self.process_dict.get(type)
def getMasterProcessList(self):
return self._getProcessList(NEO_MASTER)
def getStorageProcessList(self):
return self._getProcessList(NEO_STORAGE)
def getAdminProcessList(self):
return self._getProcessList(NEO_ADMIN)
def __del__(self): def __del__(self):
if self.cleanup_on_delete: if self.cleanup_on_delete:
os.removedirs(self.temp_dir) os.removedirs(self.temp_dir)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment