Commit eec87fde authored by Julien Muchembled's avatar Julien Muchembled

Move slapos.recipe.erp5testnode back into erp5.git, renamed to erp5.util.testnode

parents f4e37cf7 96155114
# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
try:
__import__('pkg_resources').declare_namespace(__name__)
except ImportError:
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
erp5testnode
============
# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
try:
__import__('pkg_resources').declare_namespace(__name__)
except ImportError:
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
import slapos.slap, subprocess, os, time
from xml_marshaller import xml_marshaller
class SlapOSControler(object):
def __init__(self, config, process_group_pid_set=None):
self.config = config
# By erasing everything, we make sure that we are able to "update"
# existing profiles. This is quite dirty way to do updates...
if os.path.exists(config['proxy_database']):
os.unlink(config['proxy_database'])
proxy = subprocess.Popen([config['slapproxy_binary'],
config['slapos_config']], close_fds=True, preexec_fn=os.setsid)
process_group_pid_set.add(proxy.pid)
# XXX: dirty, giving some time for proxy to being able to accept
# connections
time.sleep(10)
slap = slapos.slap.slap()
slap.initializeConnection(config['master_url'])
# register software profile
self.software_profile = config['custom_profile_path']
slap.registerSupply().supply(
self.software_profile,
computer_guid=config['computer_id'])
computer = slap.registerComputer(config['computer_id'])
# create partition and configure computer
partition_reference = config['partition_reference']
partition_path = os.path.join(config['instance_root'], partition_reference)
if not os.path.exists(partition_path):
os.mkdir(partition_path)
os.chmod(partition_path, 0750)
computer.updateConfiguration(xml_marshaller.dumps({
'address': config['ipv4_address'],
'instance_root': config['instance_root'],
'netmask': '255.255.255.255',
'partition_list': [{'address_list': [{'addr': config['ipv4_address'],
'netmask': '255.255.255.255'},
{'addr': config['ipv6_address'],
'netmask': 'ffff:ffff:ffff::'},
],
'path': partition_path,
'reference': partition_reference,
'tap': {'name': partition_reference},
}
],
'reference': config['computer_id'],
'software_root': config['software_root']}))
def runSoftwareRelease(self, config, environment, process_group_pid_set=None,
stdout=None, stderr=None):
print "SlapOSControler.runSoftwareRelease"
cpu_count = os.sysconf("SC_NPROCESSORS_ONLN")
os.putenv('MAKEFLAGS', '-j%s' % cpu_count)
os.environ['PATH'] = environment['PATH']
slapgrid = subprocess.Popen([config['slapgrid_software_binary'], '-v', '-c',
#'--buildout-parameter',"'-U -N' -o",
config['slapos_config']],
stdout=stdout, stderr=stderr,
close_fds=True, preexec_fn=os.setsid)
process_group_pid_set.add(slapgrid.pid)
slapgrid.wait()
stdout.seek(0)
stderr.seek(0)
process_group_pid_set.remove(slapgrid.pid)
status_dict = {'status_code':slapgrid.returncode,
'stdout':stdout.read(),
'stderr':stderr.read()}
stdout.close()
stderr.close()
return status_dict
def runComputerPartition(self, config, environment,
process_group_pid_set=None,
stdout=None, stderr=None):
print "SlapOSControler.runSoftwareRelease"
slap = slapos.slap.slap()
slap.registerOpenOrder().request(self.software_profile,
partition_reference='testing partition',
partition_parameter_kw=config['instance_dict'])
slapgrid = subprocess.Popen([config['slapgrid_partition_binary'],
config['slapos_config'], '-c', '-v'],
stdout=stdout, stderr=stderr,
close_fds=True, preexec_fn=os.setsid)
process_group_pid_set.add(slapgrid.pid)
slapgrid.wait()
stdout.seek(0)
stderr.seek(0)
process_group_pid_set.remove(slapgrid.pid)
status_dict = {'status_code':slapgrid.returncode,
'stdout':stdout.read(),
'stderr':stderr.read()}
stdout.close()
stderr.close()
return status_dict
import os, sys, subprocess, re, threading
from testnode import SubprocessError
_format_command_search = re.compile("[[\\s $({?*\\`#~';<>&|]").search
_format_command_escape = lambda s: "'%s'" % r"'\''".join(s.split("'"))
def format_command(*args, **kw):
cmdline = []
for k, v in sorted(kw.items()):
if _format_command_search(v):
v = _format_command_escape(v)
cmdline.append('%s=%s' % (k, v))
for v in args:
if _format_command_search(v):
v = _format_command_escape(v)
cmdline.append(v)
return ' '.join(cmdline)
def subprocess_capture(p, quiet=False):
def readerthread(input, output, buffer):
while True:
data = input.readline()
if not data:
break
output(data)
buffer.append(data)
if p.stdout:
stdout = []
output = quiet and (lambda data: None) or sys.stdout.write
stdout_thread = threading.Thread(target=readerthread,
args=(p.stdout, output, stdout))
stdout_thread.setDaemon(True)
stdout_thread.start()
if p.stderr:
stderr = []
stderr_thread = threading.Thread(target=readerthread,
args=(p.stderr, sys.stderr.write, stderr))
stderr_thread.setDaemon(True)
stderr_thread.start()
if p.stdout:
stdout_thread.join()
if p.stderr:
stderr_thread.join()
p.wait()
return (p.stdout and ''.join(stdout),
p.stderr and ''.join(stderr))
GIT_TYPE = 'git'
SVN_TYPE = 'svn'
class Updater(object):
_git_cache = {}
realtime_output = True
stdin = file(os.devnull)
def __init__(self, repository_path, revision=None, git_binary=None):
self.revision = revision
self._path_list = []
self.repository_path = repository_path
self.git_binary = git_binary
def getRepositoryPath(self):
return self.repository_path
def getRepositoryType(self):
try:
return self.repository_type
except AttributeError:
# guess the type of repository we have
if os.path.isdir(os.path.join(
self.getRepositoryPath(), '.git')):
repository_type = GIT_TYPE
elif os.path.isdir(os.path.join(
self.getRepositoryPath(), '.svn')):
repository_type = SVN_TYPE
else:
raise NotImplementedError
self.repository_type = repository_type
return repository_type
def deletePycFiles(self, path):
"""Delete *.pyc files so that deleted/moved files can not be imported"""
for path, dir_list, file_list in os.walk(path):
for file in file_list:
if file[-4:] in ('.pyc', '.pyo'):
# allow several processes clean the same folder at the same time
try:
os.remove(os.path.join(path, file))
except OSError, e:
if e.errno != errno.ENOENT:
raise
def spawn(self, *args, **kw):
quiet = kw.pop('quiet', False)
env = kw and dict(os.environ, **kw) or None
command = format_command(*args, **kw)
print '\n$ ' + command
sys.stdout.flush()
p = subprocess.Popen(args, stdin=self.stdin, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, env=env,
cwd=self.getRepositoryPath())
if self.realtime_output:
stdout, stderr = subprocess_capture(p, quiet)
else:
stdout, stderr = p.communicate()
if not quiet:
sys.stdout.write(stdout)
sys.stderr.write(stderr)
result = dict(status_code=p.returncode, command=command,
stdout=stdout, stderr=stderr)
if p.returncode:
raise SubprocessError(result)
return result
def _git(self, *args, **kw):
return self.spawn(self.git_binary, *args, **kw)['stdout'].strip()
def _git_find_rev(self, ref):
try:
return self._git_cache[ref]
except KeyError:
if os.path.exists('.git/svn'):
r = self._git('svn', 'find-rev', ref)
assert r
self._git_cache[ref[0] != 'r' and 'r%u' % int(r) or r] = ref
else:
r = self._git('rev-list', '--topo-order', '--count', ref), ref
self._git_cache[ref] = r
return r
def getRevision(self, *path_list):
if not path_list:
path_list = self._path_list
if self.getRepositoryType() == GIT_TYPE:
h = self._git('log', '-1', '--format=%H', '--', *path_list)
return self._git_find_rev(h)
elif self.getRepositoryType() == SVN_TYPE:
stdout = self.spawn('svn', 'info', *path_list)['stdout']
return str(max(map(int, SVN_CHANGED_REV.findall(stdout))))
raise NotImplementedError
def checkout(self, *path_list):
if not path_list:
path_list = '.',
revision = self.revision
if self.getRepositoryType() == GIT_TYPE:
# edit .git/info/sparse-checkout if you want sparse checkout
if revision:
if type(revision) is str:
h = revision
else:
h = revision[1]
if h != self._git('rev-parse', 'HEAD'):
self.deletePycFiles('.')
self._git('reset', '--merge', h)
else:
self.deletePycFiles('.')
if os.path.exists('.git/svn'):
self._git('svn', 'rebase')
else:
self._git('pull', '--ff-only')
self.revision = self._git_find_rev(self._git('rev-parse', 'HEAD'))
elif self.getRepositoryType() == SVN_TYPE:
# following code allows sparse checkout
def svn_mkdirs(path):
path = os.path.dirname(path)
if path and not os.path.isdir(path):
svn_mkdirs(path)
self.spawn(*(args + ['--depth=empty', path]))
for path in path_list:
args = ['svn', 'up', '--force', '--non-interactive']
if revision:
args.append('-r%s' % revision)
svn_mkdirs(path)
args += '--set-depth=infinity', path
self.deletePycFiles(path)
try:
status_dict = self.spawn(*args)
except SubprocessError, e:
if 'cleanup' not in e.stderr:
raise
self.spawn('svn', 'cleanup', path)
status_dict = self.spawn(*args)
if not revision:
self.revision = revision = SVN_UP_REV.findall(
status_dict['stdout'].splitlines()[-1])[0]
else:
raise NotImplementedError
self._path_list += path_list
##############################################################################
#
# Copyright (c) 2010 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from slapos.recipe.librecipe import BaseSlapRecipe
import os
import pkg_resources
import zc.buildout
import zc.recipe.egg
import sys
CONFIG = dict(
proxy_port='5000',
computer_id='COMPUTER',
partition_reference='test0',
)
class Recipe(BaseSlapRecipe):
def __init__(self, buildout, name, options):
self.egg = zc.recipe.egg.Egg(buildout, options['recipe'], options)
BaseSlapRecipe.__init__(self, buildout, name, options)
def installSlapOs(self):
CONFIG['slapos_directory'] = self.createDataDirectory('slapos')
CONFIG['working_directory'] = self.createDataDirectory('testnode')
CONFIG['software_root'] = os.path.join(CONFIG['slapos_directory'],
'software')
CONFIG['instance_root'] = os.path.join(CONFIG['slapos_directory'],
'instance')
CONFIG['proxy_database'] = os.path.join(CONFIG['slapos_directory'],
'proxy.db')
CONFIG['proxy_host'] = self.getLocalIPv4Address()
CONFIG['master_url'] = 'http://%s:%s' % (CONFIG['proxy_host'],
CONFIG['proxy_port'])
self._createDirectory(CONFIG['software_root'])
self._createDirectory(CONFIG['instance_root'])
CONFIG['slapos_config'] = self.createConfigurationFile('slapos.cfg',
self.substituteTemplate(pkg_resources.resource_filename(__name__,
'template/slapos.cfg.in'), CONFIG))
self.path_list.append(CONFIG['slapos_config'])
def setupRunningWrapper(self):
self.path_list.extend(zc.buildout.easy_install.scripts([(
'testnode',
__name__+'.testnode', 'run')], self.ws,
sys.executable, self.wrapper_directory, arguments=[
dict(
computer_id=CONFIG['computer_id'],
instance_dict=eval(self.parameter_dict.get('instance_dict', '{}')),
instance_root=CONFIG['instance_root'],
ipv4_address=self.getLocalIPv4Address(),
ipv6_address=self.getGlobalIPv6Address(),
master_url=CONFIG['master_url'],
profile_path=self.parameter_dict['profile_path'],
proxy_database=CONFIG['proxy_database'],
proxy_port=CONFIG['proxy_port'],
slapgrid_partition_binary=self.options['slapgrid_partition_binary'],
slapgrid_software_binary=self.options['slapgrid_software_binary'],
slapos_config=CONFIG['slapos_config'],
slapproxy_binary=self.options['slapproxy_binary'],
git_binary=self.options['git_binary'],
software_root=CONFIG['software_root'],
working_directory=CONFIG['working_directory'],
vcs_repository_list=eval(self.parameter_dict.get('vcs_repository_list'),),
node_quantity=self.parameter_dict.get('node_quantity', '1'),
test_suite_master_url=self.parameter_dict.get(
'test_suite_master_url', None),
test_suite=self.parameter_dict.get('test_suite'),
test_suite_title=self.parameter_dict.get('test_suite_title'),
test_node_title=self.parameter_dict.get('test_node_title'),
project_title=self.parameter_dict.get('project_title'),
bin_directory=self.bin_directory,
# botenvironemnt is splittable string of key=value to substitute
# environment of running bot
bot_environment=self.parameter_dict.get('bot_environment', ''),
partition_reference=CONFIG['partition_reference'],
environment=dict(PATH=os.environ['PATH']),
vcs_authentication_list=eval(self.parameter_dict.get(
'vcs_authentication_list', 'None')),
)
]))
def installLocalGit(self):
git_dict = dict(git_binary = self.options['git_binary'])
git_dict.update(self.parameter_dict)
double_slash_end_position = 1
# XXX, this should be provided by slapos
print "bin_directory : %r" % self.bin_directory
home_directory = os.path.join(*os.path.split(self.bin_directory)[0:-1])
print "home_directory : %r" % home_directory
git_dict.setdefault("git_server_name", "git.erp5.org")
if git_dict.get('vcs_authentication_list', None) is not None:
vcs_authentication_list = eval(git_dict['vcs_authentication_list'])
netrc_file = open(os.path.join(home_directory, '.netrc'), 'w')
for vcs_authentication_dict in vcs_authentication_list:
netrc_file.write("""
machine %(host)s
login %(user_name)s
password %(password)s
""" % vcs_authentication_dict)
netrc_file.close()
def installLocalRepository(self):
self.installLocalGit()
def installLocalZip(self):
zip = os.path.join(self.bin_directory, 'zip')
if os.path.lexists(zip):
os.unlink(zip)
os.symlink(self.options['zip_binary'], zip)
def installLocalPython(self):
"""Installs local python fully featured with eggs"""
self.path_list.extend(zc.buildout.easy_install.scripts([], self.ws,
sys.executable, self.bin_directory, scripts=None,
interpreter='python'))
def installLocalRunUnitTest(self):
link = os.path.join(self.bin_directory, 'runUnitTest')
destination = os.path.join(CONFIG['instance_root'],
CONFIG['partition_reference'], 'bin', 'runUnitTest')
if os.path.lexists(link):
if not os.readlink(link) != destination:
os.unlink(link)
if not os.path.lexists(link):
os.symlink(destination, link)
def _install(self):
self.requirements, self.ws = self.egg.working_set()
self.path_list = []
self.installSlapOs()
self.setupRunningWrapper()
self.installLocalRepository()
self.installLocalZip()
self.installLocalPython()
self.installLocalRunUnitTest()
return self.path_list
[slapos]
software_root = %(software_root)s
instance_root = %(instance_root)s
master_url = %(master_url)s
computer_id = %(computer_id)s
[slapproxy]
host = %(proxy_host)s
port = %(proxy_port)s
database_uri = %(proxy_database)s
from xml_marshaller import xml_marshaller
import os, xmlrpclib, time, imp
from glob import glob
import signal
import slapos.slap
import subprocess
import sys
import socket
import pprint
from SlapOSControler import SlapOSControler
class SubprocessError(EnvironmentError):
def __init__(self, status_dict):
self.status_dict = status_dict
def __getattr__(self, name):
return self.status_dict[name]
def __str__(self):
return 'Error %i' % self.status_code
from Updater import Updater
process_group_pid_set = set()
process_pid_file_list = []
process_command_list = []
def sigterm_handler(signal, frame):
for pgpid in process_group_pid_set:
try:
os.killpg(pgpid, signal.SIGTERM)
except:
pass
for pid_file in process_pid_file_list:
try:
os.kill(int(open(pid_file).read().strip()), signal.SIGTERM)
except:
pass
for p in process_command_list:
try:
subprocess.call(p)
except:
pass
sys.exit(1)
signal.signal(signal.SIGTERM, sigterm_handler)
def safeRpcCall(function, *args):
retry = 64
while True:
try:
return function(*args)
except (socket.error, xmlrpclib.ProtocolError), e:
print >>sys.stderr, e
pprint.pprint(args, file(function._Method__name, 'w'))
time.sleep(retry)
retry += retry >> 1
def getInputOutputFileList(config, command_name):
stdout = open(os.path.join(
config['instance_root'],'.%s_out' % command_name),
'w+')
stdout.write("%s\n" % command_name)
stderr = open(os.path.join(
config['instance_root'],'.%s_err' % command_name),
'w+')
return (stdout, stderr)
slapos_controler = None
def run(args):
config = args[0]
slapgrid = None
supervisord_pid_file = os.path.join(config['instance_root'], 'var', 'run',
'supervisord.pid')
subprocess.check_call([config['git_binary'],
"config", "--global", "http.sslVerify", "false"])
previous_revision = None
run_software = True
# Write our own software.cfg to use the local repository
custom_profile_path = os.path.join(config['working_directory'], 'software.cfg')
config['custom_profile_path'] = custom_profile_path
vcs_repository_list = config['vcs_repository_list']
profile_content = None
assert len(vcs_repository_list), "we must have at least one repository"
for vcs_repository in vcs_repository_list:
url = vcs_repository['url']
buildout_section_id = vcs_repository.get('buildout_section_id', None)
repository_id = buildout_section_id or \
url.split('/')[-1].split('.')[0]
repository_path = os.path.join(config['working_directory'],repository_id)
vcs_repository['repository_id'] = repository_id
vcs_repository['repository_path'] = repository_path
if profile_content is None:
profile_content = """
[buildout]
extends = %(software_config_path)s
""" % {'software_config_path': os.path.join(repository_path,
config['profile_path'])}
if not(buildout_section_id is None):
profile_content += """
[%(buildout_section_id)s]
repository = %(repository_path)s
branch = %(branch)s
""" % {'buildout_section_id': buildout_section_id,
'repository_path' : repository_path,
'branch' : vcs_repository.get('branch','master')}
custom_profile = open(custom_profile_path, 'w')
custom_profile.write(profile_content)
custom_profile.close()
config['repository_path'] = repository_path
sys.path.append(repository_path)
test_suite_title = config['test_suite_title'] or config['test_suite']
retry_software = False
try:
while True:
# kill processes from previous loop if any
try:
for pgpid in process_group_pid_set:
try:
os.killpg(pgpid, signal.SIGTERM)
except:
pass
process_group_pid_set.clear()
full_revision_list = []
# Make sure we have local repository
for vcs_repository in vcs_repository_list:
repository_path = vcs_repository['repository_path']
repository_id = vcs_repository['repository_id']
if not os.path.exists(repository_path):
parameter_list = [config['git_binary'], 'clone',
vcs_repository['url']]
if vcs_repository.get('branch') is not None:
parameter_list.extend(['-b',vcs_repository.get('branch')])
parameter_list.append(repository_path)
subprocess.check_call(parameter_list)
# Make sure we have local repository
updater = Updater(repository_path, git_binary=config['git_binary'])
updater.checkout()
revision = "-".join(updater.getRevision())
full_revision_list.append('%s=%s' % (repository_id, revision))
revision = ','.join(full_revision_list)
if previous_revision == revision:
time.sleep(120)
if not(retry_software):
continue
retry_software = False
previous_revision = revision
print config
portal_url = config['test_suite_master_url']
test_result_path = None
test_result = (test_result_path, revision)
if portal_url:
if portal_url[-1] != '/':
portal_url += '/'
portal = xmlrpclib.ServerProxy("%s%s" %
(portal_url, 'portal_task_distribution'),
allow_none=1)
master = portal.portal_task_distribution
assert master.getProtocolRevision() == 1
test_result = safeRpcCall(master.createTestResult,
config['test_suite'], revision, [],
False, test_suite_title,
config['test_node_title'], config['project_title'])
print "testnode, test_result : %r" % (test_result,)
if test_result:
test_result_path, test_revision = test_result
if revision != test_revision:
for i, repository_revision in enumerate(test_revision.split(',')):
vcs_repository = vcs_repository_list[i]
repository_path = vcs_repository['repository_path']
# other testnodes on other boxes are already ready to test another
# revision
updater = Updater(repository_path, git_binary=config['git_binary'],
revision=repository_revision.split('-')[1])
updater.checkout()
# Now prepare the installation of SlapOS and create instance
slapos_controler = SlapOSControler(config,
process_group_pid_set=process_group_pid_set)
for method_name in ("runSoftwareRelease", "runComputerPartition"):
stdout, stderr = getInputOutputFileList(config, method_name)
slapos_method = getattr(slapos_controler, method_name)
status_dict = slapos_method(config,
environment=config['environment'],
process_group_pid_set=process_group_pid_set,
stdout=stdout, stderr=stderr
)
if status_dict['status_code'] != 0:
break
if status_dict['status_code'] != 0:
safeRpcCall(master.reportTaskFailure,
test_result_path, status_dict, config['test_node_title'])
retry_software = True
continue
partition_path = os.path.join(config['instance_root'],
config['partition_reference'])
run_test_suite_path = os.path.join(partition_path, 'bin',
'runTestSuite')
if not os.path.exists(run_test_suite_path):
raise ValueError('No %r provided' % run_test_suite_path)
run_test_suite_revision = revision
if isinstance(revision, tuple):
revision = ','.join(revision)
# Deal with Shebang size limitation
file_object = open(run_test_suite_path, 'r')
line = file_object.readline()
file_object.close()
invocation_list = []
if line[:2] == '#!':
invocation_list = line[2:].split()
invocation_list.extend([run_test_suite_path,
'--test_suite', config['test_suite'],
'--revision', revision,
'--test_suite_title', test_suite_title,
'--node_quantity', config['node_quantity'],
'--master_url', config['test_suite_master_url']])
run_test_suite = subprocess.Popen(invocation_list)
process_group_pid_set.add(run_test_suite.pid)
run_test_suite.wait()
process_group_pid_set.remove(run_test_suite.pid)
except SubprocessError:
time.sleep(120)
continue
finally:
# Nice way to kill *everything* generated by run process -- process
# groups working only in POSIX compilant systems
# Exceptions are swallowed during cleanup phase
print "going to kill %r" % (process_group_pid_set,)
for pgpid in process_group_pid_set:
try:
os.killpg(pgpid, signal.SIGTERM)
except:
pass
try:
if os.path.exists(supervisord_pid_file):
os.kill(int(open(supervisord_pid_file).read().strip()), signal.SIGTERM)
except:
pass
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment