Commit 887aa655 authored by Łukasz Nowak's avatar Łukasz Nowak

Apply changes from slapos/erp5testnode branch.

Changes are done manually, as files were moved.

Original log:

02666ca Enclose runTestSuite in its workspace.
d029684 Use safeRpcCall always during calling master.
70d2e4b Setup process group for test suite.
e94ff72 Always kill supervisord.
677895a Remove not filled kill lists.
b228b13 Make import list diff/patch friendly.
8ef8b9b Missing import.
37da900 Always set partition's permission.
346ff3b Cleanup test result when SubprocessError is raised.
5515a97 Reuse SubprocessError exception in more error cases.
e72a7c8 Remove no-op.
048d47d Remove debugging log.
dc8276c Increase verbosity.
2d47034 Get rid of empty lines between each commands.
bdd79ce Use timestamped logger in Updater & SlapOSControler.
e0ec540 Fix typo in log message.
d46f496 Output formated timestamp in front of each message.
2dfa0a9 Don't write to some dynamically-named, pwd-relative file.
de91c1f Catch all RPC exceptions.
d413f61 Move log files to testrunner's log_directory.
4d357a4 Provide and use run_directory instead of computing it.
0cd300b Define profile_path on individual repository list entry.
1f5e7a3 save memory by stopping supervisor

Original commits are available in http://git.erp5.org/repos/slapos.git
repository.
parent b80072bc
......@@ -29,7 +29,12 @@ from xml_marshaller import xml_marshaller
class SlapOSControler(object):
def __init__(self, config, process_group_pid_set=None):
def log(self, message):
print message
def __init__(self, config, process_group_pid_set=None, log=None):
if log is not None:
self.log = log
self.config = config
# By erasing everything, we make sure that we are able to "update"
# existing profiles. This is quite dirty way to do updates...
......@@ -54,7 +59,7 @@ class SlapOSControler(object):
partition_path = os.path.join(config['instance_root'], partition_reference)
if not os.path.exists(partition_path):
os.mkdir(partition_path)
os.chmod(partition_path, 0750)
os.chmod(partition_path, 0750)
computer.updateConfiguration(xml_marshaller.dumps({
'address': config['ipv4_address'],
'instance_root': config['instance_root'],
......@@ -74,13 +79,14 @@ class SlapOSControler(object):
def runSoftwareRelease(self, config, environment, process_group_pid_set=None,
stdout=None, stderr=None):
print "SlapOSControler.runSoftwareRelease"
self.log("SlapOSControler.runSoftwareRelease")
cpu_count = os.sysconf("SC_NPROCESSORS_ONLN")
os.putenv('MAKEFLAGS', '-j%s' % cpu_count)
os.environ['PATH'] = environment['PATH']
slapgrid = subprocess.Popen([config['slapgrid_software_binary'], '-v', '-c',
command = [config['slapgrid_software_binary'], '-v', '-c',
#'--buildout-parameter',"'-U -N' -o",
config['slapos_config']],
config['slapos_config']]
slapgrid = subprocess.Popen(command,
stdout=stdout, stderr=stderr,
close_fds=True, preexec_fn=os.setsid)
process_group_pid_set.add(slapgrid.pid)
......@@ -89,6 +95,7 @@ class SlapOSControler(object):
stderr.seek(0)
process_group_pid_set.remove(slapgrid.pid)
status_dict = {'status_code':slapgrid.returncode,
'command': repr(command),
'stdout':stdout.read(),
'stderr':stderr.read()}
stdout.close()
......@@ -98,13 +105,14 @@ class SlapOSControler(object):
def runComputerPartition(self, config, environment,
process_group_pid_set=None,
stdout=None, stderr=None):
print "SlapOSControler.runSoftwareRelease"
self.log("SlapOSControler.runComputerPartition")
slap = slapos.slap.slap()
slap.registerOpenOrder().request(self.software_profile,
partition_reference='testing partition',
partition_parameter_kw=config['instance_dict'])
slapgrid = subprocess.Popen([config['slapgrid_partition_binary'],
config['slapos_config'], '-c', '-v'],
command = [config['slapgrid_partition_binary'],
config['slapos_config'], '-c', '-v']
slapgrid = subprocess.Popen(command,
stdout=stdout, stderr=stderr,
close_fds=True, preexec_fn=os.setsid)
process_group_pid_set.add(slapgrid.pid)
......@@ -113,6 +121,7 @@ class SlapOSControler(object):
stderr.seek(0)
process_group_pid_set.remove(slapgrid.pid)
status_dict = {'status_code':slapgrid.returncode,
'command': repr(command),
'stdout':stdout.read(),
'stderr':stderr.read()}
stdout.close()
......
##############################################################################
#
# Copyright (c) 2011 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import os, sys, subprocess, re, threading
import errno
import os
import re
import subprocess
import sys
import threading
from testnode import SubprocessError
_format_command_search = re.compile("[[\\s $({?*\\`#~';<>&|]").search
......@@ -79,7 +59,13 @@ class Updater(object):
realtime_output = True
stdin = file(os.devnull)
def __init__(self, repository_path, revision=None, git_binary=None):
def log(self, message):
print message
def __init__(self, repository_path, revision=None, git_binary=None,
log=None):
if log is not None:
self.log = log
self.revision = revision
self._path_list = []
self.repository_path = repository_path
......@@ -120,7 +106,7 @@ class Updater(object):
quiet = kw.pop('quiet', False)
env = kw and dict(os.environ, **kw) or None
command = format_command(*args, **kw)
print '\n$ ' + command
self.log('$ ' + command)
sys.stdout.flush()
p = subprocess.Popen(args, stdin=self.stdin, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, env=env,
......
......@@ -53,7 +53,12 @@ def main(*args):
return config.get('testnode', o)
CONFIG['slapos_directory'] = geto('slapos_directory')
CONFIG['working_directory'] = geto('working_directory')
for d in CONFIG['slapos_directory'], CONFIG['working_directory']:
CONFIG['test_suite_directory'] = geto('test_suite_directory')
CONFIG['log_directory'] = geto('log_directory')
CONFIG['run_directory'] = geto('run_directory')
for d in CONFIG['slapos_directory'], CONFIG['working_directory'], \
CONFIG['test_suite_directory'], CONFIG['log_directory'], \
CONFIG['run_directory']:
if not os.path.isdir(d):
raise ValueError('Directory %r does not exists.' % d)
CONFIG['software_root'] = os.path.join(CONFIG['slapos_directory'],
......@@ -87,7 +92,6 @@ def main(*args):
vcs_repository_list.append(dict(config.items(section)))
CONFIG['vcs_repository_list'] = vcs_repository_list
CONFIG['profile_path'] = geto('profile_path')
CONFIG['test_suite_title'] = geto('test_suite_title')
CONFIG['test_node_title'] = geto('test_node_title')
CONFIG['test_suite'] = geto('test_suite')
......
##############################################################################
#
# Copyright (c) 2011 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from xml_marshaller import xml_marshaller
import os, xmlrpclib, time, imp
from glob import glob
......@@ -33,8 +7,9 @@ import subprocess
import sys
import socket
import pprint
import traceback
from SlapOSControler import SlapOSControler
import time
class SubprocessError(EnvironmentError):
def __init__(self, status_dict):
......@@ -47,25 +22,18 @@ class SubprocessError(EnvironmentError):
from Updater import Updater
def log(message):
# Log to stdout, with a timestamp.
print time.strftime('%Y/%m/%d %H:%M:%S'), message
supervisord_pid_file = None
process_group_pid_set = set()
process_pid_file_list = []
process_command_list = []
def sigterm_handler(signal, frame):
for pgpid in process_group_pid_set:
try:
os.killpg(pgpid, signal.SIGTERM)
except:
pass
for pid_file in process_pid_file_list:
try:
os.kill(int(open(pid_file).read().strip()), signal.SIGTERM)
except:
pass
for p in process_command_list:
try:
subprocess.call(p)
except:
pass
sys.exit(1)
signal.signal(signal.SIGTERM, sigterm_handler)
......@@ -83,18 +51,33 @@ def safeRpcCall(function, *args):
def getInputOutputFileList(config, command_name):
stdout = open(os.path.join(
config['instance_root'],'.%s_out' % command_name),
config['log_directory'],'%s_out' % command_name),
'w+')
stdout.write("%s\n" % command_name)
stderr = open(os.path.join(
config['instance_root'],'.%s_err' % command_name),
config['log_directory'],'%s_err' % command_name),
'w+')
return (stdout, stderr)
slapos_controler = None
def killPreviousRun():
for pgpid in process_group_pid_set:
try:
os.killpg(pgpid, signal.SIGTERM)
except:
pass
try:
if os.path.exists(supervisord_pid_file):
os.kill(int(open(supervisord_pid_file).read().strip()), signal.SIGTERM)
except:
pass
PROFILE_PATH_KEY = 'profile_path'
def run(config):
slapgrid = None
global supervisord_pid_file
supervisord_pid_file = os.path.join(config['instance_root'], 'var', 'run',
'supervisord.pid')
subprocess.check_call([config['git_binary'],
......@@ -108,6 +91,14 @@ def run(config):
vcs_repository_list = config['vcs_repository_list']
profile_content = None
assert len(vcs_repository_list), "we must have at least one repository"
try:
# BBB: Accept global profile_path, which is the same as setting it for the
# first configured repository.
profile_path = config.pop(PROFILE_PATH_KEY)
except KeyError:
pass
else:
vcs_repository_list[0][PROFILE_PATH_KEY] = profile_path
for vcs_repository in vcs_repository_list:
url = vcs_repository['url']
buildout_section_id = vcs_repository.get('buildout_section_id', None)
......@@ -116,12 +107,17 @@ def run(config):
repository_path = os.path.join(config['working_directory'],repository_id)
vcs_repository['repository_id'] = repository_id
vcs_repository['repository_path'] = repository_path
if profile_content is None:
try:
profile_path = vcs_repository[PROFILE_PATH_KEY]
except KeyError:
pass
else:
if profile_content is not None:
raise ValueError(PROFILE_PATH_KEY + ' defined more than once')
profile_content = """
[buildout]
extends = %(software_config_path)s
""" % {'software_config_path': os.path.join(repository_path,
config['profile_path'])}
""" % {'software_config_path': os.path.join(repository_path, profile_path)}
if not(buildout_section_id is None):
profile_content += """
[%(buildout_section_id)s]
......@@ -131,6 +127,8 @@ branch = %(branch)s
'repository_path' : repository_path,
'branch' : vcs_repository.get('branch','master')}
if profile_content is None:
raise ValueError(PROFILE_PATH_KEY + ' not defined')
custom_profile = open(custom_profile_path, 'w')
custom_profile.write(profile_content)
custom_profile.close()
......@@ -141,13 +139,10 @@ branch = %(branch)s
retry_software = False
try:
while True:
remote_test_result_needs_cleanup = False
# kill processes from previous loop if any
try:
for pgpid in process_group_pid_set:
try:
os.killpg(pgpid, signal.SIGTERM)
except:
pass
killPreviousRun()
process_group_pid_set.clear()
full_revision_list = []
# Make sure we have local repository
......@@ -162,19 +157,21 @@ branch = %(branch)s
parameter_list.append(repository_path)
subprocess.check_call(parameter_list)
# Make sure we have local repository
updater = Updater(repository_path, git_binary=config['git_binary'])
updater = Updater(repository_path, git_binary=config['git_binary'],
log=log)
updater.checkout()
revision = "-".join(updater.getRevision())
full_revision_list.append('%s=%s' % (repository_id, revision))
revision = ','.join(full_revision_list)
if previous_revision == revision:
log('Sleeping a bit')
time.sleep(120)
if not(retry_software):
continue
log('Retrying install')
retry_software = False
previous_revision = revision
print config
portal_url = config['test_suite_master_url']
test_result_path = None
test_result = (test_result_path, revision)
......@@ -185,27 +182,31 @@ branch = %(branch)s
(portal_url, 'portal_task_distribution'),
allow_none=1)
master = portal.portal_task_distribution
assert master.getProtocolRevision() == 1
assert safeRpcCall(master.getProtocolRevision) == 1
test_result = safeRpcCall(master.createTestResult,
config['test_suite'], revision, [],
False, test_suite_title,
config['test_node_title'], config['project_title'])
print "testnode, test_result : %r" % (test_result,)
remote_test_result_needs_cleanup = True
log("testnode, test_result : %r" % (test_result, ))
if test_result:
test_result_path, test_revision = test_result
if revision != test_revision:
log('Disagreement on tested revision, checking out:')
for i, repository_revision in enumerate(test_revision.split(',')):
vcs_repository = vcs_repository_list[i]
repository_path = vcs_repository['repository_path']
revision = repository_revision.split('-')[1]
# other testnodes on other boxes are already ready to test another
# revision
log(' %s at %s' % (repository_path, revision))
updater = Updater(repository_path, git_binary=config['git_binary'],
revision=repository_revision.split('-')[1])
revision=revision)
updater.checkout()
# Now prepare the installation of SlapOS and create instance
slapos_controler = SlapOSControler(config,
process_group_pid_set=process_group_pid_set)
process_group_pid_set=process_group_pid_set, log=log)
for method_name in ("runSoftwareRelease", "runComputerPartition"):
stdout, stderr = getInputOutputFileList(config, method_name)
slapos_method = getattr(slapos_controler, method_name)
......@@ -215,19 +216,20 @@ branch = %(branch)s
stdout=stdout, stderr=stderr
)
if status_dict['status_code'] != 0:
break
if status_dict['status_code'] != 0:
safeRpcCall(master.reportTaskFailure,
test_result_path, status_dict, config['test_node_title'])
retry_software = True
continue
retry_software = True
raise SubprocessError(status_dict)
partition_path = os.path.join(config['instance_root'],
config['partition_reference'])
run_test_suite_path = os.path.join(partition_path, 'bin',
'runTestSuite')
if not os.path.exists(run_test_suite_path):
raise ValueError('No %r provided' % run_test_suite_path)
raise SubprocessError({
'command': 'os.path.exists(run_test_suite_path)',
'status_code': 1,
'stdout': '',
'stderr': 'File does not exist: %r' % (run_test_suite_path, ),
})
run_test_suite_revision = revision
if isinstance(revision, tuple):
......@@ -245,11 +247,20 @@ branch = %(branch)s
'--test_suite_title', test_suite_title,
'--node_quantity', config['node_quantity'],
'--master_url', config['test_suite_master_url']])
run_test_suite = subprocess.Popen(invocation_list)
# From this point, test runner becomes responsible for updating test
# result.
# XXX: is it good for all cases (eg: test runner fails too early for
# any custom code to pick the failure up and react ?)
remote_test_result_needs_cleanup = False
run_test_suite = subprocess.Popen(invocation_list,
preexec_fn=os.setsid, cwd=config['test_suite_directory'])
process_group_pid_set.add(run_test_suite.pid)
run_test_suite.wait()
process_group_pid_set.remove(run_test_suite.pid)
except SubprocessError:
except SubprocessError, e:
if remote_test_result_needs_cleanup:
safeRpcCall(master.reportTaskFailure,
test_result_path, e.status_dict, config['test_node_title'])
time.sleep(120)
continue
......@@ -257,15 +268,4 @@ branch = %(branch)s
# Nice way to kill *everything* generated by run process -- process
# groups working only in POSIX compilant systems
# Exceptions are swallowed during cleanup phase
print "going to kill %r" % (process_group_pid_set,)
for pgpid in process_group_pid_set:
try:
os.killpg(pgpid, signal.SIGTERM)
except:
pass
try:
if os.path.exists(supervisord_pid_file):
os.kill(int(open(supervisord_pid_file).read().strip()), signal.SIGTERM)
except:
pass
killPreviousRun()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment