Commit d86e6370 authored by Pere Cortes, committed by Sebastien Robin

erp5testnode: get test suites from the master

- the test node now supports far fewer parameters; it fetches them from
  the master
- the test node gets the list of test suites to install (see the sketch
  below)
- the instance folder structure has changed to support multiple
  test suites
- the test node deletes obsolete test suite data when needed
- the master url should be a sub object (distributor) inside
  the task distribution tool
- the long run method has been split into many smaller methods
  (it is still possible to go further)
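
In rough terms, the new interaction with the master looks like the sketch
below. This is illustrative only: the portal URL and node title are made-up
values, error handling is left out, and the real entry point lives in
erp5.util.testnode.

    import json
    from erp5.util import taskdistribution

    # Placeholder values; the real test node reads them from its
    # configuration file (test_suite_master_url, test_node_title).
    portal_url = 'https://example.org/portal_task_distribution/test_distributor'
    node_title = 'my-test-node'

    # The distributor is a sub object of the task distribution tool.
    distributor = taskdistribution.TaskDistributor(portal_url)

    # startTestSuite returns a JSON string describing every test suite this
    # node should run, or None when there is nothing to do.
    test_suite_json = distributor.startTestSuite(node_title)
    if test_suite_json is not None:
        for test_suite in json.loads(test_suite_json):
            # Each suite then gets its own folder under slapos_directory,
            # e.g. <slapos_directory>/<test-suite-reference>/{soft,inst,...}
            print(test_suite['test-suite-title'])
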
parent 65681d8a
......@@ -173,7 +173,7 @@ class TestResultLineProxy(RPCRetry):
if kw:
self._logger.info('Extra parameters provided: %r', kw)
status_dict.update(kw)
self._retryRPC('stopUnitTest', (self._test_result_line_path,
self._retryRPC('stopUnitTest', (self._test_result_line_path,self.node_title,
status_dict))
class TestResultProxy(RPCRetry):
......@@ -204,7 +204,6 @@ class TestResultProxy(RPCRetry):
self._watcher_period = 60
self._watcher_dict = {}
self._watcher_condition = threading.Condition()
def __repr__(self):
return '<%s(%r, %r, %r) at %x>' % (self.__class__.__name__,
self._test_result_path, self._node_title, self._revision, id(self))
......@@ -220,7 +219,7 @@ class TestResultProxy(RPCRetry):
Return an TestResultLineProxy instance, or None if there is nothing to
do.
"""
result = self._retryRPC('startUnitTest', (self._test_result_path,
result = self._retryRPC('startUnitTest', (self._test_result_path,self.node_title,
exclude_list))
if result:
line_url, test_name = result
......@@ -414,6 +413,33 @@ class TaskDistributionTool(RPCRetry):
self._logger, test_result_path, node_title, revision)
return result
class TaskDistributor(RPCRetry):
def __init__(self,portal_url,retry_time=64,logger=None):
if logger is None:
logger = null_logger
if portal_url is None:
proxy = DummyTaskDistributionTool()
else:
proxy = xmlrpclib.ServerProxy(
portal_url,
allow_none=True,
)
super(TaskDistributor, self).__init__(proxy, retry_time,logger)
protocol_revision = self._retryRPC('getProtocolRevision')
if protocol_revision != 1:
raise ValueError('Unsupported protocol revision: %r',
protocol_revision)
def startTestSuite(self,node_title):
"""
Returns None if no test suite is needed.
Otherwise, returns a JSON string with all the test suite parameters.
"""
result = self._retryRPC('startTestSuite',(node_title,))
return result
class DummyTaskDistributionTool(object):
"""
Fake remote server.
......
......@@ -70,48 +70,23 @@ def main(*args):
# do not change case of option keys
config.optionxform = str
config.readfp(parsed_argument.configuration_file[0])
for key in ('slapos_directory', 'working_directory', 'test_suite_directory',
'log_directory', 'run_directory', 'proxy_host', 'proxy_port',
'git_binary', 'zip_binary', 'test_suite_title', 'test_node_title',
'test_suite', 'project_title', 'node_quantity', 'ipv4_address',
'ipv6_address', 'test_suite_master_url', 'slapgrid_partition_binary',
'slapgrid_software_binary', 'slapproxy_binary'):
CONFIG[key] = config.get('testnode', key)
for key in ('slapos_directory','working_directory','test_suite_directory',
'log_directory','run_directory','proxy_host','proxy_port',
'git_binary','zip_binary','node_quantity','test_node_title',
'ipv4_address','ipv6_address','test_suite_master_url',
'slapgrid_partition_binary','slapgrid_software_binary',
'slapproxy_binary'):
CONFIG[key] = config.get('testnode',key)
for key in ('slapos_directory', 'working_directory', 'test_suite_directory',
'log_directory', 'run_directory'):
d = CONFIG[key]
if not os.path.isdir(d):
raise ValueError('Directory %r does not exist.' % d)
slapos_directory = CONFIG['slapos_directory']
CONFIG['software_root'] = software_root = os.path.join(slapos_directory,
'software')
CONFIG['instance_root'] = instance_root = os.path.join(slapos_directory,
'instance')
CONFIG['proxy_database'] = os.path.join(slapos_directory, 'proxy.db')
CONFIG['slapos_config'] = slapos_config = os.path.join(slapos_directory,
'slapos.cfg')
if not os.path.lexists(software_root):
os.mkdir(software_root)
CONFIG['master_url'] = 'http://%s:%s' % (CONFIG['proxy_host'],
CONFIG['proxy_port'])
open(slapos_config, 'w').write(pkg_resources.resource_string(
'erp5.util.testnode', 'template/slapos.cfg.in') % CONFIG)
CONFIG['runTestSuite'] = os.path.join(instance_root,
CONFIG['partition_reference'], 'bin', 'runTestSuite')
# generate vcs_repository_list
vcs_repository_list = []
for section in config.sections():
if section.startswith('vcs_repository'):
vcs_repository_list.append(dict(config.items(section)))
CONFIG['bt5_path'] = None
if 'bt5_path' in config.options("testnode"):
bt5_path = config.get("testnode", 'bt5_path')
if bt5_path.lower() != "none":
CONFIG['bt5_path'] = bt5_path
CONFIG['vcs_repository_list'] = vcs_repository_list
if 'bot_environment' in config.sections():
bot_environment = dict(config.items('bot_environment'))
else:
......
......@@ -24,14 +24,17 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from datetime import datetime
from datetime import datetime,timedelta
import os
import subprocess
import sys
import time
import glob
import SlapOSControler
import json
import time
import shutil
import pkg_resources
from ProcessManager import SubprocessError, ProcessManager, CancellationError
from Updater import Updater
from erp5.util import taskdistribution
......@@ -41,35 +44,67 @@ supervisord_pid_file = None
PROFILE_PATH_KEY = 'profile_path'
class DummyLogger(object):
def __init__(self, func):
for name in ('trace', 'debug', 'info', 'warn', 'warning', 'error',
'critical', 'fatal'):
setattr(self, name, func)
class TestNode(object):
def __init__(self, log, config):
self.log = log
self.config = config
self.process_manager = ProcessManager(log)
self.process_manager.supervisord_pid_file = os.path.join(config['instance_root'], 'var', 'run',
'supervisord.pid')
def run(self):
log = self.log
process_manager = self.process_manager
def checkOldTestSuite(self,test_suite_data):
config = self.config
slapgrid = None
previous_revision = None
installed_reference_set = set(os.listdir(config['slapos_directory']))
wished_reference_set = set([x['test_suite_reference'] for x in test_suite_data])
to_remove_reference_set = installed_reference_set - wished_reference_set
for y in to_remove_reference_set:
fpath = os.path.join(config['slapos_directory'],y)
if os.path.isdir(fpath):
shutil.rmtree(fpath)
else:
os.remove(fpath)
pass
run_software = True
# Write our own software.cfg to use the local repository
def updateConfigForTestSuite(self, test_suite_data):
config = self.config
config["project_title"] = test_suite_data["project-title"]
config["test_suite"] = test_suite_data["test-suite"]
config["test_suite_title"] = test_suite_data["test-suite-title"]
config["test_suite_reference"] = test_suite_data["test-suite-reference"]
try:
config["additional_bt5_repository_id"] = test_suite_data["additional-bt5-repository-id"]
except KeyError:
pass
config["vcs_repository_list"] = test_suite_data["vcs-repository-list"]
config['working_directory'] = os.path.join(config['slapos_directory'],
config['test_suite_reference'])
if not(os.path.exists(config['working_directory'])):
os.mkdir(config['working_directory'])
config['instance_root'] = os.path.join(config['working_directory'],
'inst')
if not(os.path.exists(config['instance_root'])):
os.mkdir(config['instance_root'])
config['software_root'] = os.path.join(config['working_directory'],
'soft')
if not(os.path.exists(config['software_root'])):
os.mkdir(config['software_root'])
config['proxy_database'] = os.path.join(config['working_directory'],
'proxy.db')
custom_profile_path = os.path.join(config['working_directory'], 'software.cfg')
config['custom_profile_path'] = custom_profile_path
vcs_repository_list = config['vcs_repository_list']
config['slapos_config'] = os.path.join(config['working_directory'],
'slapos.cfg')
open(config['slapos_config'], 'w').write(pkg_resources.resource_string(
'erp5.util.testnode', 'template/slapos.cfg.in') % config)
def constructProfile(self):
vcs_repository_list = self.config['vcs_repository_list']
config = self.config
profile_content = ''
assert len(vcs_repository_list), "we must have at least one repository"
try:
......@@ -85,7 +120,7 @@ class TestNode(object):
url = vcs_repository['url']
buildout_section_id = vcs_repository.get('buildout_section_id', None)
repository_id = buildout_section_id or \
url.split('/')[-1].split('.')[0]
url.split('/')[-1].split('.')[0]
repository_path = os.path.join(config['working_directory'],repository_id)
vcs_repository['repository_id'] = repository_id
vcs_repository['repository_path'] = repository_path
......@@ -108,171 +143,245 @@ extends = %(software_config_path)s
repository = %(repository_path)s
branch = %(branch)s
""" % {'buildout_section_id': buildout_section_id,
'repository_path' : repository_path,
'branch' : vcs_repository.get('branch','master')}
'repository_path' : repository_path,
'branch' : vcs_repository.get('branch','master')}
if not profile_path_count:
raise ValueError(PROFILE_PATH_KEY + ' not defined')
custom_profile = open(custom_profile_path, 'w')
custom_profile = open(config['custom_profile_path'], 'w')
custom_profile.write(profile_content)
custom_profile.close()
config['repository_path'] = repository_path
sys.path.append(repository_path)
test_suite_title = config['test_suite_title'] or config['test_suite']
return vcs_repository_list
retry = False
retry_software_count = 0
same_revision_count = 0
def getFullRevisionList(self, revision_dict):
full_revision_list = []
config = self.config
log = self.log
test_suite_title = config['test_suite_title']
process_manager = self.process_manager
vcs_repository_list = self.config['vcs_repository_list']
for vcs_repository in vcs_repository_list:
repository_path = vcs_repository['repository_path']
repository_id = vcs_repository['repository_id']
if not os.path.exists(repository_path):
parameter_list = [config['git_binary'], 'clone',
vcs_repository['url']]
if vcs_repository.get('branch') is not None:
parameter_list.extend(['-b',vcs_repository.get('branch')])
parameter_list.append(repository_path)
log(subprocess.check_output(parameter_list, stderr=subprocess.STDOUT))
# Make sure we have local repository
updater = Updater(repository_path, git_binary=config['git_binary'],
log=log, process_manager=process_manager)
updater.checkout()
revision_dict[test_suite_title] = "-".join(updater.getRevision())
full_revision_list.append('%s=%s' % (repository_id, revision_dict[test_suite_title]))
revision_dict[test_suite_title] = ','.join(full_revision_list)
return full_revision_list
def addWatcher(self,test_result):
config = self.config
if config.get('log_file'):
log_file_name = config['log_file']
log_file = open(log_file_name)
log_file.seek(0, 2)
log_file.seek(-min(5000, log_file.tell()), 2)
test_result.addWatch(log_file_name,log_file,max_history_bytes=10000)
return log_file_name
def checkRevision(self,test_result,revision_dict,previous_revision_dict,
vcs_repository_list):
config = self.config
log = self.log
process_manager = self.process_manager
test_suite_title = config['test_suite_title']
if revision_dict[test_suite_title] != test_result.revision:
previous_revision_dict[test_suite_title] = test_result.revision
log('Disagreement on tested revision, checking out:')
for i, repository_revision in enumerate(previous_revision_dict[test_suite_title].split(',')):
vcs_repository = vcs_repository_list[i]
repository_path = vcs_repository['repository_path']
revision_dict[test_suite_title] = repository_revision.rsplit('-', 1)[1]
# other testnodes on other boxes are already ready to test another
# revision
log(' %s at %s' % (repository_path, revision_dict[test_suite_title]))
updater = Updater(repository_path, git_binary=config['git_binary'],
revision=revision_dict[test_suite_title], log=log,
process_manager=process_manager)
updater.checkout()
def prepareSlapOS(self,retry_software_count=0,retry=False):
config = self.config
log = self.log
process_manager = self.process_manager
slapproxy_log = os.path.join(config['log_directory'],
'slapproxy.log')
log('Configured slapproxy log to %r' % slapproxy_log)
log('testnode, retry_software_count : %r' % retry_software_count)
slapos_controler = SlapOSControler.SlapOSControler(config,
log=log, slapproxy_log=slapproxy_log, process_manager=process_manager,
reset_software=(retry_software_count>0 and retry_software_count%10 == 0))
for method_name in ("runSoftwareRelease", "runComputerPartition",):
slapos_method = getattr(slapos_controler, method_name)
status_dict = slapos_method(config,
environment=config['environment'],
)
if status_dict['status_code'] != 0:
retry = True
retry_software_count += 1
raise SubprocessError(status_dict)
else:
retry_software_count = 0
return status_dict
def _dealShebang(self,run_test_suite_path):
line = open(run_test_suite_path, 'r').readline()
invocation_list = []
if line[:2] == '#!':
invocation_list = line[2:].split()
return invocation_list
def runTestSuite(self,revision_dict,portal_url):
config = self.config
test_suite_title = config['test_suite_title']
run_test_suite_path_list = glob.glob("%s/*/bin/runTestSuite" %config['instance_root'])
if not len(run_test_suite_path_list):
raise ValueError('No runTestSuite provided in installed partitions.')
run_test_suite_path = run_test_suite_path_list[0]
run_test_suite_revision = revision_dict[test_suite_title]
if isinstance(revision_dict[test_suite_title], tuple):
revision_dict[test_suite_title] = ','.join(revision_dict[test_suite_title])
# Deal with Shebang size limitation
invocation_list = self._dealShebang(run_test_suite_path)
invocation_list.extend([run_test_suite_path,
'--test_suite', config['test_suite'],
'--revision', revision_dict[test_suite_title],
'--test_suite_title', test_suite_title,
'--node_quantity', config['node_quantity'],
'--master_url', portal_url])
firefox_bin_list = glob.glob("%s/*/parts/firefox/firefox-slapos" % config["software_root"])
if len(firefox_bin_list):
invocation_list.extend(["--firefox_bin", firefox_bin_list[0]])
xvfb_bin_list = glob.glob("%s/*/parts/xserver/bin/Xvfb" % config["software_root"])
if len(xvfb_bin_list):
invocation_list.extend(["--xvfb_bin", xvfb_bin_list[0]])
bt5_path_list = config.get("bt5_path")
if bt5_path_list not in ('', None,):
invocation_list.extend(["--bt5_path", bt5_path_list])
# From this point, test runner becomes responsible for updating test
# result. We only do cleanup if the test runner itself is not able
# to run.
process_manager.spawn(*invocation_list,
cwd=config['test_suite_directory'],
log_prefix='runTestSuite', get_output=False)
def cleanUp(self,test_result):
process_manager = self.process_manager
log = self.log
log('Testnode.run, finally close')
process_manager.killPreviousRun()
if 0 and test_result is not None:
try:
test_result.removeWatch(log_file_name)
except KeyError:
log("KeyError, Watcher already deleted or not added correctly")
def run(self):
log = self.log
config = self.config
process_manager = self.process_manager
slapgrid = None
previous_revision_dict = {}
revision_dict = {}
now = time.time()
test_result = None
try:
while True:
try:
# kill processes from previous loop if any
process_manager.killPreviousRun()
full_revision_list = []
# Make sure we have local repository
for vcs_repository in vcs_repository_list:
repository_path = vcs_repository['repository_path']
repository_id = vcs_repository['repository_id']
if not os.path.exists(repository_path):
parameter_list = [config['git_binary'], 'clone',
vcs_repository['url']]
if vcs_repository.get('branch') is not None:
parameter_list.extend(['-b',vcs_repository.get('branch')])
parameter_list.append(repository_path)
log(subprocess.check_output(parameter_list, stderr=subprocess.STDOUT))
# Make sure we have local repository
updater = Updater(repository_path, git_binary=config['git_binary'],
log=log, process_manager=process_manager)
updater.checkout()
revision = "-".join(updater.getRevision())
full_revision_list.append('%s=%s' % (repository_id, revision))
revision = ','.join(full_revision_list)
if previous_revision == revision:
log('Same Revision')
same_revision_count += 1
if not(retry) and same_revision_count <= 2:
log('Sleeping a bit since same revision')
time.sleep(DEFAULT_SLEEP_TIMEOUT)
continue
same_revision_count = 0
log('Retrying install or checking if previous test was cancelled')
begin = time.time()
portal_url = config['test_suite_master_url']
portal = taskdistribution.TaskDistributionTool(portal_url, logger=DummyLogger(log))
test_suite_portal = taskdistribution.TaskDistributor(portal_url, logger=DummyLogger(log))
test_suite_json = test_suite_portal.startTestSuite(config['test_node_title'])
test_suite_data = json.loads(test_suite_json)
#Clean-up test suites
self.checkOldTestSuite(test_suite_data)
for test_suite in test_suite_data:
self.updateConfigForTestSuite(test_suite)
run_software = True
self.process_manager.supervisord_pid_file = os.path.join(config['instance_root'], 'var', 'run',
'supervisord.pid')
# Write our own software.cfg to use the local repository
vcs_repository_list = self.constructProfile()
retry = False
previous_revision = revision
portal_url = config['test_suite_master_url']
portal = taskdistribution.TaskDistributionTool(portal_url, logger = DummyLogger(log))
test_result = portal.createTestResult(revision,[],config['test_node_title'],False,test_suite_title,config['project_title'])
remote_test_result_needs_cleanup = True
log("testnode, test_result : %r" % (test_result, ))
if test_result is not None:
if config.get('log_file'):
log_file_name = config['log_file']
log_file = open(log_file_name)
log_file.seek(0,2)
log_file.seek(-min(5000,log_file.tell()),2)
test_result.addWatch(log_file_name,log_file,max_history_bytes=10000)
if revision != test_result.revision:
previous_revision = test_result.revision
log('Disagreement on tested revision, checking out:')
for i, repository_revision in enumerate(test_result.revision.split(',')):
vcs_repository = vcs_repository_list[i]
repository_path = vcs_repository['repository_path']
checkout_revision = repository_revision.rsplit('-', 1)[1]
# other testnodes on other boxes are already ready to test another
# revision
log(' %s at %s' % (repository_path, checkout_revision))
updater = Updater(repository_path, git_binary=config['git_binary'],
revision=checkout_revision, log=log,
process_manager=process_manager)
updater.checkout()
# Now prepare the installation of SlapOS and create instance
slapproxy_log = os.path.join(config['log_directory'],
'slapproxy.log')
log('Configured slapproxy log to %r' % slapproxy_log)
log('testnode, retry_software_count : %r' % retry_software_count)
slapos_controler = SlapOSControler.SlapOSControler(config,
log=log, slapproxy_log=slapproxy_log, process_manager=process_manager,
reset_software=(retry_software_count>0 and retry_software_count%10 == 0))
for method_name in ("runSoftwareRelease", "runComputerPartition",):
slapos_method = getattr(slapos_controler, method_name)
status_dict = slapos_method(config,
environment=config['environment'],
)
if status_dict['status_code'] != 0:
retry = True
retry_software_count += 1
raise SubprocessError(status_dict)
else:
retry_software_count = 0
# Give some time so computer partitions may start
# as partitions can be of any kind we have and likely will never have
# a reliable way to check if they are up or not ...
time.sleep(20)
run_test_suite_path_list = glob.glob("%s/*/bin/runTestSuite" %config['instance_root'])
if not len(run_test_suite_path_list):
raise ValueError('No runTestSuite provided in installed partitions.')
run_test_suite_path = run_test_suite_path_list[0]
run_test_suite_revision = revision
if isinstance(revision, tuple):
revision = ','.join(revision)
# Deal with Shebang size limitation
line = open(run_test_suite_path, 'r').readline()
invocation_list = []
if line[:2] == '#!':
invocation_list = line[2:].split()
invocation_list.extend([run_test_suite_path,
'--test_suite', config['test_suite'],
'--revision', revision,
'--test_suite_title', test_suite_title,
'--node_quantity', config['node_quantity'],
'--master_url', portal_url])
firefox_bin_list = glob.glob("%s/*/parts/firefox/firefox-slapos" % config["software_root"])
if len(firefox_bin_list):
invocation_list.extend(["--firefox_bin", firefox_bin_list[0]])
xvfb_bin_list = glob.glob("%s/*/parts/xserver/bin/Xvfb" % config["software_root"])
if len(xvfb_bin_list):
invocation_list.extend(["--xvfb_bin", xvfb_bin_list[0]])
bt5_path_list = config.get("bt5_path")
if bt5_path_list not in ('', None,):
invocation_list.extend(["--bt5_path", bt5_path_list])
# From this point, test runner becomes responsible for updating test
# result. We only do cleanup if the test runner itself is not able
# to run.
process_manager.spawn(*invocation_list,
cwd=config['test_suite_directory'],
log_prefix='runTestSuite', get_output=False)
retry_software_count = 0
same_revision_count = 0
test_suite_title = config['test_suite_title']
try:
# kill processes from previous loop if any
process_manager.killPreviousRun()
full_revision_list = self.getFullRevisionList(revision_dict)
# Make sure we have local repository
now = time.time()
try:
if previous_revision_dict[test_suite_title] == revision_dict[test_suite_title]:
log('Same Revision')
same_revision_count += 1
if not(retry) and same_revision_count <= 2:
log('Sleeping a bit since same revision')
time.sleep(DEFAULT_SLEEP_TIMEOUT)
continue
same_revision_count = 0
log('Retrying install or checking if previous test was cancelled')
except KeyError:
pass
retry = False
previous_revision_dict[test_suite_title] = revision_dict[test_suite_title]
now = time.time()
test_result = portal.createTestResult(revision_dict[test_suite_title],[],
config['test_node_title'],False,test_suite_title,config['project_title'])
remote_test_result_needs_cleanup = True
log("testnode, test_result : %r" % (test_result, ))
if test_result is not None:
log_file_name = self.addWatcher(test_result)
self.checkRevision(test_result,revision_dict,previous_revision_dict,
vcs_repository_list)
# Now prepare the installation of SlapOS and create instance
status_dict = self.prepareSlapOS(retry_software_count,retry)
# Give some time so computer partitions may start
# as partitions can be of any kind we have and likely will never have
# a reliable way to check if they are up or not ...
time.sleep(20)
self.runTestSuite(revision_dict,portal_url)
if test_result is not None:
test_result.removeWatch(log_file_name)
except SubprocessError, e:
log("SubprocessError", exc_info=sys.exc_info())
if test_result is not None:
test_result.removeWatch(log_file_name)
except SubprocessError, e:
log("SubprocessError", exc_info=sys.exc_info())
if test_result is not None:
test_result.removeWatch(log_file_name)
if remote_test_result_needs_cleanup:
status_dict = e.status_dict or {}
test_result.reportFailure(
command = status_dict.get('command'),
stdout = status_dict.get('stdout'),
stderr = status_dict.get('stderr'),
)
log("SubprocessError, going to sleep %s" % DEFAULT_SLEEP_TIMEOUT)
time.sleep(DEFAULT_SLEEP_TIMEOUT)
continue
except CancellationError, e:
log("CancellationError", exc_info=sys.exc_info())
process_manager.under_cancellation = False
retry = True
continue
except:
log("erp5testnode exception", exc_info=sys.exc_info())
raise
if remote_test_result_needs_cleanup:
status_dict = e.status_dict or {}
test_result.reportFailure(
command=status_dict.get('command'),
stdout=status_dict.get('stdout'),
stderr=status_dict.get('stderr'),
)
log("SubprocessError, going to sleep %s" % DEFAULT_SLEEP_TIMEOUT)
time.sleep(DEFAULT_SLEEP_TIMEOUT)
continue
except CancellationError, e:
log("CancellationError", exc_info=sys.exc_info())
process_manager.under_cancellation = False
retry = True
continue
except:
log("erp5testnode exception", exc_info=sys.exc_info())
raise
if (now-begin) < 120:
time.sleep(120 - (now-begin))
finally:
# Nice way to kill *everything* generated by run process -- process
# groups working only in POSIX compilant systems
# Exceptions are swallowed during cleanup phase
log('Testnode.run, finally close')
process_manager.killPreviousRun()
if test_result is not None:
try:
test_result.removeWatch(log_file_name)
except KeyError:
log("KeyError, Watcher already deleted or not added correctly")
# Exceptions are swallowed during cleanup phase
self.cleanUp(test_result)