Commit bce547ed authored by Vincent Pelletier's avatar Vincent Pelletier

Add support for software instanciation.

Reuse erp5.util.taskdistribution for better TaskDistributionTool
integration (adds a dependency, removes 2 entry points): it is possible to
create several test agents with same configuration, each testing a subset
of the entire test suite. Especially useful if/when test agents gets the
extra task of monitoring commits on software releases, and they locally
build & instanciate the SR they test.
Use a medium-lived process rather than a short-lived one, to drop state
loading & saving complexity.
Allow a single test agent to run several concurrent tests.
Cleanup pidfile, even when exceptions occur.
Factorise code used to test each step.
Rework configuration file layout, to better use magic "[DEFAULT]" section.
Take keys & certs from configuration file, allowing for per-test certs.
Remove the need for storing using login & password by reusing x509 auth.
parent a479c4a5
...@@ -45,14 +45,13 @@ setup(name=name, ...@@ -45,14 +45,13 @@ setup(name=name,
extras_require = { extras_require = {
'lampconfigure': ["mysql-python"], #needed for MySQL Database access 'lampconfigure': ["mysql-python"], #needed for MySQL Database access
'zodbpack': ['ZODB3'], # needed to play with ZODB 'zodbpack': ['ZODB3'], # needed to play with ZODB
'agent': ['erp5.util'],
}, },
zip_safe=False, # proxy depends on Flask, which has issues with zip_safe=False, # proxy depends on Flask, which has issues with
# accessing templates # accessing templates
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'agent = slapos.agent.agent:main', 'agent = slapos.agent.agent:main [agent]',
'report_start = slapos.agent.report_start:main',
'report_stop = slapos.agent.report_stop:main',
'clouddestroy = slapos.cloudmgr.destroy:main', 'clouddestroy = slapos.cloudmgr.destroy:main',
'cloudgetprivatekey = slapos.cloudmgr.getprivatekey:main', 'cloudgetprivatekey = slapos.cloudmgr.getprivatekey:main',
'cloudgetpubliciplist = slapos.cloudmgr.getpubliciplist:main', 'cloudgetpubliciplist = slapos.cloudmgr.getpubliciplist:main',
......
import ConfigParser, argparse import ConfigParser
import argparse
import httplib
import json import json
from random import choice import logging
import os, socket, time import os
from datetime import datetime import random
from datetime import timedelta import sys
import tempfile
import traceback
import time
import xmlrpclib import xmlrpclib
from logging import getLogger, basicConfig import slapos.slap
from slapos.slap import slap
from slapos.grid.utils import setRunning, setFinished from slapos.grid.utils import setRunning, setFinished
from erp5.util.taskdistribution import TaskDistributionTool, RPCRetry
def safeRpcCall(proxy, function_id, *args): class AutoSTemp(object):
while True: """
try: Create a self-destructing temporary file.
function = getattr(proxy, function_id) Uses mkstemp.
return function(*args) """
except (socket.error, xmlrpclib.ProtocolError, xmlrpclib.Fault), e: __unlink = os.unlink
time.sleep(64)
def __init__(self, value):
def _encode_software_dict(software_dict): fd, self.__name = tempfile.mkstemp()
result = dict() os.write(fd, value)
for key, value in software_dict.items(): os.close(fd)
result[key] = datetime.strftime(value, "%Y-%m-%dT%H:%M:%S")
return result @property
def name(self):
def _decode_software_dict(software_dict): return self.__name
result = dict()
for key, value in software_dict.items(): def __del__(self):
result[key] = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S") self.__unlink(self.__name)
return result
# XXX: scripts should be merged in a single one
class Agent: GET_DESTROYING_METHOD_ID = \
def __init__(self, portal_url, master_url, 'Agent_getDestroyingSoftwareReleaseReferenceListOnComputer'
maximum_software_installation_duration, software_live_duration, GET_INSTALLED_METHOD_ID = \
computer_list, software_list, log_directory, state_file, software_uri, 'Agent_getInstalledSoftwareReleaseReferenceListOnComputer'
key_file=None, cert_file=None, GET_INSTALLING_METHOD_ID = \
): 'Agent_getInstallingSoftwareReleaseReferenceListOnComputer'
self.portal_url = portal_url SOFTWARE_STATE_UNKNOWN = -1
self.maximum_software_installation_duration = \ SOFTWARE_STATE_INSTALLING = 0
maximum_software_installation_duration SOFTWARE_STATE_INSTALLED = 1
self.software_live_duration = software_live_duration SOFTWARE_STATE_DESTROYING = 2
self.computer_list = computer_list
self.software_list = software_list GET_PARTITION_STATE_METHOD_ID = 'Agent_getComputerPartitionState'
self.software_uri = software_uri INSTANCE_STATE_UNKNOWN = -1
self.log_directory = log_directory INSTANCE_STATE_STARTING = 0
self.state_file = state_file INSTANCE_STATE_STARTED = 1
INSTANCE_STATE_STOPPING = 2
filename = os.path.join(self.log_directory, "agent-%s.log" % datetime.strftime(datetime.now(), "%Y%m%d")) INSTANCE_STATE_STOPPED = 3
basicConfig(filename=filename, format="%(asctime)-15s %(message)s", level="INFO") INSTANCE_STATE_DESTROYING = 4
self.logger = getLogger()
INSTANCE_STATE_DICT = {
self.slap = slap = slap() 'Looking for a free partition': INSTANCE_STATE_UNKNOWN,
slap.initializeConnection(master_url, key_file, cert_file) 'Started': INSTANCE_STATE_STARTED,
self.supply = slap.registerSupply() 'Start in progress': INSTANCE_STATE_STARTING,
'Stopped': INSTANCE_STATE_STOPPED,
state = ConfigParser.SafeConfigParser() 'Stop in progress': INSTANCE_STATE_STOPPING,
state.readfp(open(self.state_file)) 'Destruction in progress': INSTANCE_STATE_DESTROYING,
self.installing_software_dict = dict() 'Destroyed': INSTANCE_STATE_UNKNOWN,
self.installed_software_dict = dict() }
for computer in self.computer_list:
if state.has_section(computer): TESTER_STATE_INITIAL = -1
self.installing_software_dict[computer] = \ TESTER_STATE_NOTHING = 0
_decode_software_dict(json.loads(state.get(computer, "installing_software", "{}"))) TESTER_STATE_SOFTWARE_INSTALLED = 1
self.installed_software_dict[computer] = \ TESTER_STATE_INSTANCE_INSTALLED = 2
_decode_software_dict(json.loads(state.get(computer, "installed_software", "{}"))) TESTER_STATE_INSTANCE_STARTED = 4
else: TESTER_STATE_INSTANCE_UNISTALLED = 5
self.installing_software_dict[computer] = dict()
self.installed_software_dict[computer] = dict() class x509Transport(xmlrpclib.Transport):
"""
def getDestroyingSoftwareReleaseListOnComputer(self, computer): Similar to xmlrpclib.SecureTransport, but with actually usable x509
portal = xmlrpclib.ServerProxy(self.portal_url, allow_none=1) support.
return safeRpcCall(portal, "Agent_getDestroyingSoftwareReleaseReferenceListOnComputer", computer, self.software_list) """
def __init__(self, x509, *args, **kw):
def getInstalledSoftwareReleaseListOnComputer(self, computer): xmlrpclib.Transport.__init__(self, *args, **kw)
portal = xmlrpclib.ServerProxy(self.portal_url, allow_none=1) self.__x509 = x509
return safeRpcCall(portal, "Agent_getInstalledSoftwareReleaseReferenceListOnComputer", computer, self.software_list)
def make_connection(self, host):
def getInstallingSoftwareReleaseListOnComputer(self, computer): if not self._connection or host != self._connection[0]:
portal = xmlrpclib.ServerProxy(self.portal_url, allow_none=1) try:
return safeRpcCall(portal, "Agent_getInstallingSoftwareReleaseReferenceListOnComputer", computer, self.software_list) HTTPSConnection = httplib.HTTPSConnection
except AttributeError:
def getSoftwareReleaseUsageOnComputer(self, computer, software): raise NotImplementedError("your version of httplib doesn't "
portal = xmlrpclib.ServerProxy(self.portal_url, allow_none=1) "support HTTPS")
return safeRpcCall(portal, "Agent_getSoftwareReleaseUsageOnComputer", computer, software) else:
chost, self._extra_headers, x509 = self.get_host_info((host,
def requestSoftwareReleaseCleanupOnComputer(self, computer, software): self.__x509))
try: self._connection = (host, HTTPSConnection(chost, None, **x509))
self.supply.supply(self.software_uri[software], computer, "destroyed") return self._connection[1]
self.logger.info("Successfully requested to cleanup %s on %s." % (software, computer))
return True class TestTimeout(Exception):
except: pass
self.logger.info("Failed to request to cleanup %s on %s." % (software, computer))
return False class SoftwareReleaseTester(RPCRetry):
deadline = None
def requestSoftwareReleaseInstallationOnComputer(self, computer, software): latest_state = None
try:
self.supply.supply(self.software_uri[software], computer, "available") def __init__(self,
self.logger.info("Successfully requested to install %s on %s." % (software, computer)) name,
return True logger,
except: master,
self.logger.info("Failed to request to install %s on %s." % (software, computer)) slap_supply, # slapos supply to manage software release
return False slap_order, # slapos open order to manage instance
url, # software release url
def requestSoftwareInstanceStartedOnComputer(self, reference, computer, software): computer_guid, # computer to use for this test run
try: max_install_duration,
self.slap.registerOpenOrder().request( max_uninstall_duration,
software_release=software, request_kw=None, # instance parameters, if instanciation
partition_reference=reference, # testing is desired
filter_kw={'computer_guid': computer}, max_request_duration=None,
state='started' max_destroy_duration=None,
) ):
self.logger.info("Successfully requested to start a instance of %s on %s.", (software, computer)) super(SoftwareReleaseTester, self).__init__(master, 16, logger)
return True self.name = name
except: self.slap_supply = slap_supply
self.logger.info("Failed to request to start a instance of %s on %s.", (software, computer)) self.slap_order = slap_order
return False self.url = url
self.computer_guid = computer_guid
def requestSoftwareInstanceStoppedOnComputer(self, reference, computer, software): self.request_kw = request_kw
try: self.state = TESTER_STATE_INITIAL
self.slap.registerOpenOrder().request( self.transition_dict = {
software_release=software, TESTER_STATE_INITIAL: (
partition_reference=reference, None,
filter_kw={'computer_guid': computer}, None,
state='stopped' TESTER_STATE_NOTHING,
) None,
self.logger.info("Successfully requested to stop a instance of %s on %s.", (software, computer)) None,
return True ),
except: TESTER_STATE_NOTHING: (
self.logger.info("Failed to request to stop a instance of %s on %s.", (software, computer)) 'install',
return False max_install_duration,
request_kw is None and TESTER_STATE_INSTANCE_UNISTALLED or \
def requestSoftwareInstanceDestroyedOnComputer(self, reference, computer, software): TESTER_STATE_SOFTWARE_INSTALLED,
SOFTWARE_STATE_INSTALLED,
None,
),
TESTER_STATE_SOFTWARE_INSTALLED: (
'request',
max_request_duration,
TESTER_STATE_INSTANCE_STARTED,
None,
INSTANCE_STATE_STARTED,
),
TESTER_STATE_INSTANCE_STARTED: (
'destroy',
max_destroy_duration,
TESTER_STATE_INSTANCE_UNISTALLED,
None,
INSTANCE_STATE_UNKNOWN,
),
TESTER_STATE_INSTANCE_UNISTALLED: (
'uninstall',
max_uninstall_duration,
None,
SOFTWARE_STATE_UNKNOWN,
None,
),
}
def __repr__(self):
deadline = self.deadline
if deadline is not None:
deadline -= time.time()
deadline = '+%is' % (deadline, )
return '<%s(state=%s, deadline=%s) at %x>' % (
self.__class__.__name__, self.state, deadline, id(self))
def _supply(self, state):
self._logger.info('Supply %s@%s: %s', self.url, self.computer_guid,
state)
return self.slap_supply.supply(self.url, self.computer_guid, state)
def _request(self, state):
self._logger.info('Request %s@%s: %s', self.url, self.name, state)
self.latest_state = state
return self.slap_order.request(
software_release=self.url,
partition_reference=self.name,
state=state,
**self.request_kw
)
def _getSoftwareState(self):
# TODO: replace with simpler slap-based API
# TODO: merge all 3 entrypoints into a single, to reduce server load.
for state, method_id in (
(SOFTWARE_STATE_DESTROYING, GET_DESTROYING_METHOD_ID),
(SOFTWARE_STATE_INSTALLED, GET_INSTALLED_METHOD_ID),
(SOFTWARE_STATE_INSTALLING, GET_INSTALLING_METHOD_ID),
):
if self.url in self._retryRPC(method_id, (self.computer_guid,
[self.url])):
return state
def _getInstanceState(self):
# TODO: replace with simpler slap-based API
latest_state = self.latest_state
self._logger.debug('latest_state = %r', latest_state)
if latest_state is None:
return INSTANCE_STATE_UNKNOWN
try:
requested = self._request(latest_state)
except slapos.slap.ServerError:
self._logger.exception('Got an error requesting partition for '
'its state')
return INSTANCE_STATE_UNKNOWN
part_id = requested.getId()
self._logger.debug('part_id = %r', part_id)
if not part_id:
# Master did not allocate a partition yet.
return INSTANCE_STATE_UNKNOWN
return INSTANCE_STATE_DICT[self._retryRPC(
GET_PARTITION_STATE_METHOD_ID,
(self.computer_guid, part_id)
)]
def install(self):
"""
Make software available on computer.
"""
self._supply('available')
def uninstall(self):
"""
Make software unavailable on computer.
"""
self._supply('destroyed')
def start(self):
"""
Request started instance (or starting existing one)
"""
self._request('started')
request = start
def stop(self):
"""
Request stopped instance (or stopping existing one).
"""
self._request('stopped')
def destroy(self):
"""
Destroy existing instance.
"""
self._request('destroyed')
def teardown(self):
"""
Interrupt a running test sequence, putting it in idle state.
"""
if self.request_kw is not None:
self.destroy()
self.uninstall()
self.state = TESTER_STATE_INSTANCE_UNISTALLED
def tic(self, now):
"""
Check for missed deadlines (-> test failure), conditions for moving to
next state, and actually moving to next state (executing its payload).
"""
deadline = self.deadline
if deadline < now and deadline is not None:
raise TestTimeout(self.state)
_, _, state, software_state, instance_state = self.transition_dict[
self.state]
if (software_state is None or
software_state == self._getSoftwareState()) and (
instance_state is None or
instance_state == self._getInstanceState()):
if state is None:
return None
self._logger.debug('Going to state %i (%r, %r)', state,
software_state, instance_state)
self.state = state
step, delay, _, _, _ = self.transition_dict[state]
self.deadline = now + delay
getattr(self, step)()
return self.deadline
def main():
"""
Note: This code does not test as much as it monitors.
The goal is to regularily try to build & instanciate a software release
on several machines, to monitor vifib stability and SR stability as time
passes (and things once available online become unavailable).
Part of this function could be reused to make an actual test bot, testing
only when actual changes are committed to a software release, to look for
regressions.
Note: This code does not connect to any instanciated service, it relies on
the presence of a promise section to make instanciation fail until promise
is happy.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--pidfile', '-p', help='pidfile preventing parallel '
'execution.')
parser.add_argument('--log', '-l', help='Log file path.')
parser.add_argument('--verbose', '-v', help='Be verbose.',
action='store_true')
parser.add_argument('configuration_file', type=argparse.FileType(),
help='Slap Test Agent configuration file.')
# Just to keep strong references to AutoSTemp instances
key_file_dict = {}
def asFilenamePair(key, cert):
# Note: python's ssl support only supports fetching key & cert data
# from on-disk files. This is why we need to "convert" direct data
# into file paths, using temporary files.
cert = cert.strip()
try:
temp_key, temp_cert = key_file_dict[cert]
except KeyError:
temp_key = AutoSTemp(key.strip())
temp_cert = AutoSTemp(cert)
key_file_dict[cert] = (temp_key, temp_cert)
return temp_key.name, temp_cert.name
args = parser.parse_args()
pidfile = args.pidfile
if pidfile:
setRunning(pidfile)
try: try:
self.slap.registerOpenOrder().request( log = args.log
software_release=software, formatter = logging.Formatter('%(asctime)s %(message)s')
partition_reference=reference, logger = logging.getLogger()
filter_kw={'computer_guid': computer}, if args.verbose:
state='destroyed' log_level = logging.DEBUG
) else:
self.logger.info("Successfully requested to destroy a instance of %s on %s.", (software, computer)) log_level = logging.INFO
return True logger.setLevel(log_level)
except: handler = logging.StreamHandler(sys.stdout)
self.logger.info("Failed to request to destroy a instance of %s on %s.", (software, computer)) handler.setFormatter(formatter)
return False logger.addHandler(handler)
if log:
def writeState(self): handler = logging.FileHandler(log)
state = ConfigParser.SafeConfigParser() handler.setFormatter(formatter)
for computer in self.computer_list: logger.addHandler(handler)
state.add_section(computer) log_file = open(log)
state.set(computer, "installing_software", \ log_file.seek(0, 2)
json.dumps(_encode_software_dict(self.installing_software_dict[computer]))) section_dict = {}
state.set(computer, "installed_software", \ configuration = ConfigParser.SafeConfigParser()
json.dumps(_encode_software_dict(self.installed_software_dict[computer]))) configuration.readfp(args.configuration_file)
state.write(open(self.state_file, "w")) for section in configuration.sections():
if section == 'agent':
def main(*args): continue
parser = argparse.ArgumentParser() section_dict[section] = section_entry_dict = dict(
parser.add_argument("--pidfile", help="The location where pidfile will be created.") configuration.items(section))
parser.add_argument("--log_directory", help="") for key in ('request_kw', 'max_install_duration',
parser.add_argument("--state_file", help="") 'max_destroy_duration', 'max_request_duration',
parser.add_argument("--path_file", help="") 'max_uninstall_duration', 'computer_list',
parser.add_argument("--key_file", help="Key file path for https connection to master.") ):
parser.add_argument("--cert_file", help="Certificate file path for https " if key in section_entry_dict:
"connection to master") section_entry_dict[key] = json.loads(
parser.add_argument("configuration_file", nargs=1, type=argparse.FileType(), section_entry_dict[key])
help="Slap Test Agent configuration file.") if 'key' in section_entry_dict:
if args: key_file, cert_file = asFilenamePair(section_entry_dict['key'],
argument_option_instance = parser.parse_args(list(args)) section_entry_dict['cert'])
else: section_entry_dict['key'] = key_file
argument_option_instance = parser.parse_args() section_entry_dict['cert'] = cert_file
configuration = ConfigParser.SafeConfigParser() agent_parameter_dict = dict(configuration.items('agent'))
configuration.readfp(argument_option_instance.pop("configuration_file")[0]) # XXX: should node title be auto-generated by installation recipe ?
configuration_dict = dict(configuration.items("agent")) # For example, using computer guid.
configuration_dict.update(argument_option_instance.__dict__) node_title = agent_parameter_dict['node_title']
pidfile = configuration_dict.get("pidfile") test_title = agent_parameter_dict['test_title']
if pidfile: project_title = agent_parameter_dict['project_title']
setRunning(pidfile) parallel_task_count = int(agent_parameter_dict.get('task_count', 1))
agent = Agent( task_distribution_tool = TaskDistributionTool(agent_parameter_dict[
portal_url=configuration_dict["portal_url"], 'report_url'])
master_url=configuration_dict["master_url"], master_slap_connection_dict = {}
maximum_software_installation_duration=timedelta(minutes=float( test_result = task_distribution_tool.createTestResult(
configuration_dict["maximum_software_installation_duration"])), revision='',
software_live_duration=timedelta(minutes=float(configuration_dict[ test_name_list=section_dict.keys(),
"software_live_duration"])), node_title=node_title,
computer_list=json.loads(configuration_dict["computer_list"]), allow_restart=True,
software_list=json.loads(configuration_dict["software_list"]), test_title=test_title,
log_directory=configuration_dict["log_directory"], project_title=project_title,
state_file=configuration_dict["state_file"], )
software_uri=dict(configuration.items("software_uri")), test_result.watcher_period = 60
key_file=configuration_dict.get("key_file", None), if log:
cert_file=configuration_dict.get("cert_file", None), test_result.addWatch(log, log_file, max_history_bytes=10000)
) assert test_result is not None
now = datetime.now() ran_test_set = set()
for computer in agent.computer_list: running_test_dict = {}
installing_software_list = agent.getInstallingSoftwareReleaseListOnComputer(computer) more_tests = True
installed_software_list = agent.getInstalledSoftwareReleaseListOnComputer(computer) while True:
destroying_software_list = agent.getDestroyingSoftwareReleaseListOnComputer(computer) # Get up to parallel_task_count tasks to execute
if len(installing_software_list) == 0: while len(running_test_dict) < parallel_task_count and \
software = choice(agent.software_list) more_tests:
if software in installed_software_list or software in destroying_software_list: test_line = test_result.start(
pass exclude_list=list(ran_test_set))
else: if test_line is None:
if agent.requestSoftwareReleaseInstallationOnComputer(computer, software): more_tests = False
agent.installing_software_dict[computer][software] = datetime.now() break
else: test_name = test_line.name
for installing_software in installing_software_list: try:
if installing_software in agent.installing_software_dict[computer]: section_entry_dict = section_dict[test_name]
start_time = agent.installing_software_dict[computer][installing_software] except KeyError:
if now - start_time > agent.maximum_software_installation_duration: # We don't know how to execute this test. Assume it doesn't
agent.logger.info("Failed to install %s on %s in %s." % \ # exist anymore, and fail it in result.
(installing_software, computer, agent.maximum_software_installation_duration)) test_line.stop(stderr='This test does not exist on test '
if agent.requestSoftwareReleaseCleanupOnComputer(computer, installing_software): 'node %s' % (node_title, ))
del agent.installing_software_dict[computer][installing_software] continue
for installed_software in installed_software_list: master_url = section_entry_dict['master_url']
if installed_software in agent.installing_software_dict[computer]: master_slap_connection_key = (master_url,
agent.logger.info("Successfully installed %s on %s." % (installed_software, computer)) section_entry_dict.get('key'))
del agent.installing_software_dict[computer][installed_software] try:
agent.installed_software_dict[computer][installed_software] = now supply, order, rpc = master_slap_connection_dict[
elif installed_software in agent.installed_software_dict[computer] and \ master_slap_connection_key]
agent.getSoftwareReleaseUsageOnComputer(computer, installed_software) == 0 and \ except KeyError:
now - agent.installed_software_dict[computer][installed_software] > agent.software_live_duration: key = section_entry_dict.get('key')
if agent.requestSoftwareReleaseCleanupOnComputer(computer, installed_software): cert = section_entry_dict.get('cert')
del agent.installed_software_dict[computer][installed_software] slap = slapos.slap.slap()
for installed_software in installed_software_list: slap.initializeConnection(master_url, key, cert)
if agent.getSoftwareReleaseUsageOnComputer(computer, installed_software) == 0: supply = slap.registerSupply()
for i in range(3): order = slap.registerOpenOrder()
reference = "%s_%s_%s" % (installed_software, str(now), str(i)) assert master_url.startswith('https:')
if agent.requestSoftwareInstanceStartedOnComputer(reference, computer, software): rpc = xmlrpclib.ServerProxy(master_url, allow_none=True,
agent.logger.info("Successfully requested stated a instance of %s on %s", (software, computer)) transport=x509Transport(
else: {'key_file': key, 'cert_file': cert}))
# agent.requestSoftwareInstanceDestroyedOnComputer() master_slap_connection_dict[
master_slap_connection_key] = (supply, order, rpc)
agent.writeState() tester = SoftwareReleaseTester(
test_name + '_' + node_title + time.strftime(
if pidfile: '_%Y/%m/%d_%H:%M:%S_+0000', time.gmtime()),
setFinished(pidfile) logger,
rpc,
if __name__ == "__main__": supply,
main() order,
section_entry_dict['url'],
random.choice(section_entry_dict['computer_list']),
section_entry_dict['max_install_duration'],
section_entry_dict['max_uninstall_duration'],
section_entry_dict.get('request_kw'),
section_entry_dict.get('max_request_duration'),
section_entry_dict.get('max_destroy_duration'),
)
ran_test_set.add(test_name)
running_test_dict[test_name] = (test_line, tester)
if not running_test_dict:
break
now = time.time()
# Synchronise refreshes on watcher period, so it doesn't report a
# stalled test node where we are actually still sleeping.
# Change test_result.watcher_period outside this loop if you wish
# to change sleep duration.
next_deadline = now + test_result.watcher_period
for section, (test_line, tester) in running_test_dict.items():
logger.info('Checking %s: %r...', section, tester)
try:
deadline = tester.tic(now)
except Exception:
logger.exception('test failed')
test_line.stop(
test_count=1,
error_count=1,
failure_count=0,
skip_count=0,
stderr=traceback.format_exc(),
)
del running_test_dict[section]
try:
tester.teardown()
except Exception:
logger.exception('teardown failed, human '
'assistance needed for cleanup')
raise
else:
logger.info('%r', tester)
if deadline is None:
# TODO: report how long each step took.
logger.info('Finished !')
test_line.stop(
test_count=1,
error_count=0,
failure_count=0,
skip_count=0,
)
del running_test_dict[section]
else:
next_deadline = min(deadline, next_deadline)
if running_test_dict:
to_sleep = next_deadline - time.time()
if to_sleep > 0:
logger.info('Sleeping %is...', to_sleep)
time.sleep(to_sleep)
if not test_result.isAlive():
for _, tester in running_test_dict.itervalues():
tester.teardown()
finally:
if pidfile:
setFinished(pidfile)
# Help interpreter get rid of AutoSTemp instances.
key_file_dict.clear()
if __name__ == '__main__':
main()
import ConfigParser, argparse, pprint, socket, sys, time, xmlrpclib, json
def safeRpcCall(function, *args):
retry = 64
xmlrpc_arg_list = []
for argument in args:
if isinstance(argument, dict):
argument = dict([(x, isinstance(y,str) and xmlrpclib.Binary(y) or y) \
for (x,y) in argument.iteritems()])
xmlrpc_arg_list.append(argument)
while True:
try:
return function(*xmlrpc_arg_list)
except (socket.error, xmlrpclib.ProtocolError), e:
print >>sys.stderr, e
pprint.pprint(args, file(function._Method__name, 'w'))
time.sleep(retry)
retry += retry >> 1
def main(*args):
parser = argparse.ArgumentParser()
parser.add_argument("configuration_file", nargs=1, type=argparse.FileType(),
help="Slap Test Agent configuration file.")
if args == ():
argument_option_instance = parser.parse_args()
else:
argument_option_instance = \
parser.parse_args(list(args))
configuration_file = argument_option_instance.configuration_file[0]
configuration = ConfigParser.SafeConfigParser()
configuration.readfp(configuration_file)
master_url = configuration.get("agent", "report_url")
if master_url[-1] != '/':
master_url += '/'
master = xmlrpclib.ServerProxy("%s%s" %
(master_url, 'portal_task_distribution'),
allow_none=1)
assert master.getProtocolRevision() == 1
software_list = json.loads(configuration.get("agent", "software_list"))
test_result = safeRpcCall(master.createTestResult,
"SlapOS Test", "", software_list,
True, "SlapOS Test", "SlapOS Test Agent", "ViFiB Project")
test_result_path, revision = test_result
state = ConfigParser.SafeConfigParser()
state.add_section("path")
exclude_list = software_list[:]
for software in software_list:
exclude_list.remove(software)
test_path, test_name = safeRpcCall(master.startUnitTest,
test_result_path, exclude_list)
state.set("path", test_name, test_path)
exclude_list.append(software)
path_file = configuration.get("agent", "path_file")
state.write(open(path_file, "w"))
if __name__ == "__main__":
main()
import ConfigParser, argparse, pprint, socket, sys, time, xmlrpclib, json, os
from datetime import datetime
def safeRpcCall(function, *args):
retry = 64
xmlrpc_arg_list = []
for argument in args:
if isinstance(argument, dict):
argument = dict([(x, isinstance(y,str) and xmlrpclib.Binary(y) or y) \
for (x,y) in argument.iteritems()])
xmlrpc_arg_list.append(argument)
while True:
try:
return function(*xmlrpc_arg_list)
except (socket.error, xmlrpclib.ProtocolError), e:
print >>sys.stderr, e
pprint.pprint(args, file(function._Method__name, 'w'))
time.sleep(retry)
retry += retry >> 1
def main(*args):
parser = argparse.ArgumentParser()
parser.add_argument("configuration_file", nargs=1, type=argparse.FileType(),
help="Slap Test Agent configuration file.")
if args == ():
argument_option_instance = parser.parse_args()
else:
argument_option_instance = \
parser.parse_args(list(args))
configuration_file = argument_option_instance.configuration_file[0]
configuration = ConfigParser.SafeConfigParser()
configuration.readfp(configuration_file)
master_url = configuration.get("agent", "report_url")
software_list = json.loads(configuration.get("agent", "software_list"))
if master_url[-1] != '/':
master_url += '/'
master = xmlrpclib.ServerProxy("%s%s" %
(master_url, 'portal_task_distribution'),
allow_none=1)
assert master.getProtocolRevision() == 1
log_directory = configuration.get("agent", "log_directory")
logfile_path = os.path.join(log_directory, "agent-%s.log" % datetime.strftime(datetime.now(), "%Y%m%d"))
logfile = open(logfile_path, 'r')
logline_list = logfile.readlines()
path_file = configuration.get("agent", "path_file")
state = ConfigParser.SafeConfigParser()
state.readfp(open(path_file))
for software in software_list:
test_path = state.get("path", software)
success, failure = 0, 0
log = ''
for logline in logline_list:
success_pattern = "Successfully installed %s" % software
failure_pattern = "Failed to install %s" % software
if success_pattern in logline:
success = success + 1
log = log + logline
if failure_pattern in logline:
failure = failure + 1
log = log + logline
safeRpcCall(master.stopUnitTest, test_path,
{
"status_code": 0,
"stderr": log,
"duration": 0,
"test_count": success + failure,
"failure_count": failure
})
if __name__ == "__main__":
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment