Commit 63f2e32c authored by Cédric de Saint Martin's avatar Cédric de Saint Martin

Merge branch 'agent'

parents daa69dc0 bce547ed
...@@ -45,14 +45,13 @@ setup(name=name, ...@@ -45,14 +45,13 @@ setup(name=name,
extras_require = { extras_require = {
'lampconfigure': ["mysql-python"], #needed for MySQL Database access 'lampconfigure': ["mysql-python"], #needed for MySQL Database access
'zodbpack': ['ZODB3'], # needed to play with ZODB 'zodbpack': ['ZODB3'], # needed to play with ZODB
'agent': ['erp5.util'],
}, },
zip_safe=False, # proxy depends on Flask, which has issues with zip_safe=False, # proxy depends on Flask, which has issues with
# accessing templates # accessing templates
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'agent = slapos.agent.agent:main', 'agent = slapos.agent.agent:main [agent]',
'report_start = slapos.agent.report_start:main',
'report_stop = slapos.agent.report_stop:main',
'clouddestroy = slapos.cloudmgr.destroy:main', 'clouddestroy = slapos.cloudmgr.destroy:main',
'cloudgetprivatekey = slapos.cloudmgr.getprivatekey:main', 'cloudgetprivatekey = slapos.cloudmgr.getprivatekey:main',
'cloudgetpubliciplist = slapos.cloudmgr.getpubliciplist:main', 'cloudgetpubliciplist = slapos.cloudmgr.getpubliciplist:main',
......
import ConfigParser, argparse import ConfigParser
import argparse
import httplib
import json import json
from random import random, choice import logging
import os, socket, time import os
from datetime import datetime import random
from datetime import timedelta import sys
import tempfile
import traceback
import time
import xmlrpclib import xmlrpclib
from logging import getLogger, basicConfig import slapos.slap
from slapos.slap import slap, Supply
from slapos.grid.utils import setRunning, setFinished from slapos.grid.utils import setRunning, setFinished
from erp5.util.taskdistribution import TaskDistributionTool, RPCRetry
def safeRpcCall(proxy, function_id, *args): class AutoSTemp(object):
while True: """
try: Create a self-destructing temporary file.
function = getattr(proxy, function_id) Uses mkstemp.
return function(*args) """
except (socket.error, xmlrpclib.ProtocolError, xmlrpclib.Fault), e: __unlink = os.unlink
time.sleep(64)
def __init__(self, value):
def _encode_software_dict(software_dict): fd, self.__name = tempfile.mkstemp()
result = dict() os.write(fd, value)
for key, value in software_dict.items(): os.close(fd)
result[key] = datetime.strftime(value, "%Y-%m-%dT%H:%M:%S")
return result @property
def name(self):
def _decode_software_dict(software_dict): return self.__name
result = dict()
for key, value in software_dict.items(): def __del__(self):
result[key] = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S") self.__unlink(self.__name)
return result
# XXX: scripts should be merged in a single one
class Agent: GET_DESTROYING_METHOD_ID = \
def __init__(self, configuration_file): 'Agent_getDestroyingSoftwareReleaseReferenceListOnComputer'
configuration = ConfigParser.SafeConfigParser() GET_INSTALLED_METHOD_ID = \
configuration.readfp(configuration_file) 'Agent_getInstalledSoftwareReleaseReferenceListOnComputer'
self.portal_url = configuration.get("agent", "portal_url") GET_INSTALLING_METHOD_ID = \
self.master_url = configuration.get("agent", "master_url") 'Agent_getInstallingSoftwareReleaseReferenceListOnComputer'
self.key_file = configuration.get("agent", "key_file") SOFTWARE_STATE_UNKNOWN = -1
self.cert_file = configuration.get("agent", "cert_file") SOFTWARE_STATE_INSTALLING = 0
self.maximum_software_installation_duration = \ SOFTWARE_STATE_INSTALLED = 1
timedelta(minutes=configuration.getfloat("agent", "maximum_software_installation_duration")) SOFTWARE_STATE_DESTROYING = 2
self.software_live_duration = \
timedelta(minutes=configuration.getfloat("agent", "software_live_duration")) GET_PARTITION_STATE_METHOD_ID = 'Agent_getComputerPartitionState'
self.computer_list = json.loads(configuration.get("agent", "computer_list")) INSTANCE_STATE_UNKNOWN = -1
self.software_list = json.loads(configuration.get("agent", "software_list")) INSTANCE_STATE_STARTING = 0
self.software_uri = dict() INSTANCE_STATE_STARTED = 1
for (software, uri) in configuration.items("software_uri"): INSTANCE_STATE_STOPPING = 2
self.software_uri[software] = uri INSTANCE_STATE_STOPPED = 3
self.log_directory = configuration.get("agent", "log_directory") INSTANCE_STATE_DESTROYING = 4
self.state_file = configuration.get("agent", "state_file")
INSTANCE_STATE_DICT = {
filename = os.path.join(self.log_directory, "agent-%s.log" % datetime.strftime(datetime.now(), "%Y%m%d")) 'Looking for a free partition': INSTANCE_STATE_UNKNOWN,
basicConfig(filename=filename, format="%(asctime)-15s %(message)s", level="INFO") 'Started': INSTANCE_STATE_STARTED,
self.logger = getLogger() 'Start in progress': INSTANCE_STATE_STARTING,
'Stopped': INSTANCE_STATE_STOPPED,
self.slap = slap() 'Stop in progress': INSTANCE_STATE_STOPPING,
self.slap.initializeConnection(self.master_url, self.key_file, self.cert_file) 'Destruction in progress': INSTANCE_STATE_DESTROYING,
self.supply = Supply() 'Destroyed': INSTANCE_STATE_UNKNOWN,
}
state = ConfigParser.SafeConfigParser()
state.readfp(open(self.state_file)) TESTER_STATE_INITIAL = -1
self.installing_software_dict = dict() TESTER_STATE_NOTHING = 0
self.installed_software_dict = dict() TESTER_STATE_SOFTWARE_INSTALLED = 1
for computer in self.computer_list: TESTER_STATE_INSTANCE_INSTALLED = 2
if state.has_section(computer): TESTER_STATE_INSTANCE_STARTED = 4
self.installing_software_dict[computer] = \ TESTER_STATE_INSTANCE_UNISTALLED = 5
_decode_software_dict(json.loads(state.get(computer, "installing_software", "{}")))
self.installed_software_dict[computer] = \ class x509Transport(xmlrpclib.Transport):
_decode_software_dict(json.loads(state.get(computer, "installed_software", "{}"))) """
else: Similar to xmlrpclib.SecureTransport, but with actually usable x509
self.installing_software_dict[computer] = dict() support.
self.installed_software_dict[computer] = dict() """
def __init__(self, x509, *args, **kw):
def getDestroyingSoftwareReleaseListOnComputer(self, computer): xmlrpclib.Transport.__init__(self, *args, **kw)
portal = xmlrpclib.ServerProxy(self.portal_url, allow_none=1) self.__x509 = x509
return safeRpcCall(portal, "Agent_getDestroyingSoftwareReleaseReferenceListOnComputer", computer, self.software_list)
def make_connection(self, host):
def getInstalledSoftwareReleaseListOnComputer(self, computer): if not self._connection or host != self._connection[0]:
portal = xmlrpclib.ServerProxy(self.portal_url, allow_none=1) try:
return safeRpcCall(portal, "Agent_getInstalledSoftwareReleaseReferenceListOnComputer", computer, self.software_list) HTTPSConnection = httplib.HTTPSConnection
except AttributeError:
def getInstallingSoftwareReleaseListOnComputer(self, computer): raise NotImplementedError("your version of httplib doesn't "
portal = xmlrpclib.ServerProxy(self.portal_url, allow_none=1) "support HTTPS")
return safeRpcCall(portal, "Agent_getInstallingSoftwareReleaseReferenceListOnComputer", computer, self.software_list) else:
chost, self._extra_headers, x509 = self.get_host_info((host,
def getSoftwareReleaseUsageOnComputer(self, computer, software): self.__x509))
portal = xmlrpclib.ServerProxy(self.portal_url, allow_none=1) self._connection = (host, HTTPSConnection(chost, None, **x509))
return safeRpcCall(portal, "Agent_getSoftwareReleaseUsageOnComputer", computer, software) return self._connection[1]
def requestSoftwareReleaseCleanupOnComputer(self, computer, software): class TestTimeout(Exception):
try: pass
self.supply.supply(self.software_uri[software], computer, "destroyed")
self.logger.info("Successfully requested to cleanup %s on %s." % (software, computer)) class SoftwareReleaseTester(RPCRetry):
return True deadline = None
except: latest_state = None
self.logger.info("Failed to request to cleanup %s on %s." % (software, computer))
return False def __init__(self,
name,
def requestSoftwareReleaseInstallationOnComputer(self, computer, software): logger,
master,
slap_supply, # slapos supply to manage software release
slap_order, # slapos open order to manage instance
url, # software release url
computer_guid, # computer to use for this test run
max_install_duration,
max_uninstall_duration,
request_kw=None, # instance parameters, if instanciation
# testing is desired
max_request_duration=None,
max_destroy_duration=None,
):
super(SoftwareReleaseTester, self).__init__(master, 16, logger)
self.name = name
self.slap_supply = slap_supply
self.slap_order = slap_order
self.url = url
self.computer_guid = computer_guid
self.request_kw = request_kw
self.state = TESTER_STATE_INITIAL
self.transition_dict = {
TESTER_STATE_INITIAL: (
None,
None,
TESTER_STATE_NOTHING,
None,
None,
),
TESTER_STATE_NOTHING: (
'install',
max_install_duration,
request_kw is None and TESTER_STATE_INSTANCE_UNISTALLED or \
TESTER_STATE_SOFTWARE_INSTALLED,
SOFTWARE_STATE_INSTALLED,
None,
),
TESTER_STATE_SOFTWARE_INSTALLED: (
'request',
max_request_duration,
TESTER_STATE_INSTANCE_STARTED,
None,
INSTANCE_STATE_STARTED,
),
TESTER_STATE_INSTANCE_STARTED: (
'destroy',
max_destroy_duration,
TESTER_STATE_INSTANCE_UNISTALLED,
None,
INSTANCE_STATE_UNKNOWN,
),
TESTER_STATE_INSTANCE_UNISTALLED: (
'uninstall',
max_uninstall_duration,
None,
SOFTWARE_STATE_UNKNOWN,
None,
),
}
def __repr__(self):
deadline = self.deadline
if deadline is not None:
deadline -= time.time()
deadline = '+%is' % (deadline, )
return '<%s(state=%s, deadline=%s) at %x>' % (
self.__class__.__name__, self.state, deadline, id(self))
def _supply(self, state):
self._logger.info('Supply %s@%s: %s', self.url, self.computer_guid,
state)
return self.slap_supply.supply(self.url, self.computer_guid, state)
def _request(self, state):
self._logger.info('Request %s@%s: %s', self.url, self.name, state)
self.latest_state = state
return self.slap_order.request(
software_release=self.url,
partition_reference=self.name,
state=state,
**self.request_kw
)
def _getSoftwareState(self):
# TODO: replace with simpler slap-based API
# TODO: merge all 3 entrypoints into a single, to reduce server load.
for state, method_id in (
(SOFTWARE_STATE_DESTROYING, GET_DESTROYING_METHOD_ID),
(SOFTWARE_STATE_INSTALLED, GET_INSTALLED_METHOD_ID),
(SOFTWARE_STATE_INSTALLING, GET_INSTALLING_METHOD_ID),
):
if self.url in self._retryRPC(method_id, (self.computer_guid,
[self.url])):
return state
def _getInstanceState(self):
# TODO: replace with simpler slap-based API
latest_state = self.latest_state
self._logger.debug('latest_state = %r', latest_state)
if latest_state is None:
return INSTANCE_STATE_UNKNOWN
try:
requested = self._request(latest_state)
except slapos.slap.ServerError:
self._logger.exception('Got an error requesting partition for '
'its state')
return INSTANCE_STATE_UNKNOWN
part_id = requested.getId()
self._logger.debug('part_id = %r', part_id)
if not part_id:
# Master did not allocate a partition yet.
return INSTANCE_STATE_UNKNOWN
return INSTANCE_STATE_DICT[self._retryRPC(
GET_PARTITION_STATE_METHOD_ID,
(self.computer_guid, part_id)
)]
def install(self):
"""
Make software available on computer.
"""
self._supply('available')
def uninstall(self):
"""
Make software unavailable on computer.
"""
self._supply('destroyed')
def start(self):
"""
Request started instance (or starting existing one)
"""
self._request('started')
request = start
def stop(self):
"""
Request stopped instance (or stopping existing one).
"""
self._request('stopped')
def destroy(self):
"""
Destroy existing instance.
"""
self._request('destroyed')
def teardown(self):
"""
Interrupt a running test sequence, putting it in idle state.
"""
if self.request_kw is not None:
self.destroy()
self.uninstall()
self.state = TESTER_STATE_INSTANCE_UNISTALLED
def tic(self, now):
"""
Check for missed deadlines (-> test failure), conditions for moving to
next state, and actually moving to next state (executing its payload).
"""
deadline = self.deadline
if deadline < now and deadline is not None:
raise TestTimeout(self.state)
_, _, state, software_state, instance_state = self.transition_dict[
self.state]
if (software_state is None or
software_state == self._getSoftwareState()) and (
instance_state is None or
instance_state == self._getInstanceState()):
if state is None:
return None
self._logger.debug('Going to state %i (%r, %r)', state,
software_state, instance_state)
self.state = state
step, delay, _, _, _ = self.transition_dict[state]
self.deadline = now + delay
getattr(self, step)()
return self.deadline
def main():
"""
Note: This code does not test as much as it monitors.
The goal is to regularily try to build & instanciate a software release
on several machines, to monitor vifib stability and SR stability as time
passes (and things once available online become unavailable).
Part of this function could be reused to make an actual test bot, testing
only when actual changes are committed to a software release, to look for
regressions.
Note: This code does not connect to any instanciated service, it relies on
the presence of a promise section to make instanciation fail until promise
is happy.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--pidfile', '-p', help='pidfile preventing parallel '
'execution.')
parser.add_argument('--log', '-l', help='Log file path.')
parser.add_argument('--verbose', '-v', help='Be verbose.',
action='store_true')
parser.add_argument('configuration_file', type=argparse.FileType(),
help='Slap Test Agent configuration file.')
# Just to keep strong references to AutoSTemp instances
key_file_dict = {}
def asFilenamePair(key, cert):
# Note: python's ssl support only supports fetching key & cert data
# from on-disk files. This is why we need to "convert" direct data
# into file paths, using temporary files.
cert = cert.strip()
try:
temp_key, temp_cert = key_file_dict[cert]
except KeyError:
temp_key = AutoSTemp(key.strip())
temp_cert = AutoSTemp(cert)
key_file_dict[cert] = (temp_key, temp_cert)
return temp_key.name, temp_cert.name
args = parser.parse_args()
pidfile = args.pidfile
if pidfile:
setRunning(pidfile)
try: try:
self.supply.supply(self.software_uri[software], computer, "available") log = args.log
self.logger.info("Successfully requested to install %s on %s." % (software, computer)) formatter = logging.Formatter('%(asctime)s %(message)s')
return True logger = logging.getLogger()
except: if args.verbose:
self.logger.info("Failed to request to install %s on %s." % (software, computer)) log_level = logging.DEBUG
return False else:
log_level = logging.INFO
def writeState(self): logger.setLevel(log_level)
state = ConfigParser.SafeConfigParser() handler = logging.StreamHandler(sys.stdout)
for computer in self.computer_list: handler.setFormatter(formatter)
state.add_section(computer) logger.addHandler(handler)
state.set(computer, "installing_software", \ if log:
json.dumps(_encode_software_dict(self.installing_software_dict[computer]))) handler = logging.FileHandler(log)
state.set(computer, "installed_software", \ handler.setFormatter(formatter)
json.dumps(_encode_software_dict(self.installed_software_dict[computer]))) logger.addHandler(handler)
state.write(open(self.state_file, "w")) log_file = open(log)
log_file.seek(0, 2)
def main(*args): section_dict = {}
parser = argparse.ArgumentParser() configuration = ConfigParser.SafeConfigParser()
parser.add_argument("--pidfile", help="The location where pidfile will be created.") configuration.readfp(args.configuration_file)
parser.add_argument("configuration_file", nargs=1, type=argparse.FileType(), for section in configuration.sections():
help="Slap Test Agent configuration file.") if section == 'agent':
if args == (): continue
argument_option_instance = parser.parse_args() section_dict[section] = section_entry_dict = dict(
else: configuration.items(section))
argument_option_instance = \ for key in ('request_kw', 'max_install_duration',
parser.parse_args(list(args)) 'max_destroy_duration', 'max_request_duration',
option_dict = {} 'max_uninstall_duration', 'computer_list',
configuration_file = argument_option_instance.configuration_file[0] ):
for argument_key, argument_value in vars(argument_option_instance if key in section_entry_dict:
).iteritems(): section_entry_dict[key] = json.loads(
option_dict.update({argument_key:argument_value}) section_entry_dict[key])
pidfile = option_dict.get("pidfile", None) if 'key' in section_entry_dict:
if pidfile: key_file, cert_file = asFilenamePair(section_entry_dict['key'],
setRunning(pidfile) section_entry_dict['cert'])
section_entry_dict['key'] = key_file
agent = Agent(configuration_file) section_entry_dict['cert'] = cert_file
now = datetime.now() agent_parameter_dict = dict(configuration.items('agent'))
for computer in agent.computer_list: # XXX: should node title be auto-generated by installation recipe ?
installing_software_list = agent.getInstallingSoftwareReleaseListOnComputer(computer) # For example, using computer guid.
installed_software_list = agent.getInstalledSoftwareReleaseListOnComputer(computer) node_title = agent_parameter_dict['node_title']
destroying_software_list = agent.getDestroyingSoftwareReleaseListOnComputer(computer) test_title = agent_parameter_dict['test_title']
if len(installing_software_list) == 0: project_title = agent_parameter_dict['project_title']
software = choice(agent.software_list) parallel_task_count = int(agent_parameter_dict.get('task_count', 1))
if software in installed_software_list or software in destroying_software_list: task_distribution_tool = TaskDistributionTool(agent_parameter_dict[
pass 'report_url'])
else: master_slap_connection_dict = {}
if agent.requestSoftwareReleaseInstallationOnComputer(computer, software): test_result = task_distribution_tool.createTestResult(
agent.installing_software_dict[computer][software] = datetime.now() revision='',
else: test_name_list=section_dict.keys(),
for installing_software in installing_software_list: node_title=node_title,
if installing_software in agent.installing_software_dict[computer]: allow_restart=True,
start_time = agent.installing_software_dict[computer][installing_software] test_title=test_title,
if now - start_time > agent.maximum_software_installation_duration: project_title=project_title,
agent.logger.info("Failed to install %s on %s in %s." % \ )
(installing_software, computer, agent.maximum_software_installation_duration)) test_result.watcher_period = 60
if agent.requestSoftwareReleaseCleanupOnComputer(computer, installing_software): if log:
del agent.installing_software_dict[computer][installing_software] test_result.addWatch(log, log_file, max_history_bytes=10000)
for installed_software in installed_software_list: assert test_result is not None
if installed_software in agent.installing_software_dict[computer]: ran_test_set = set()
agent.logger.info("Successfully installed %s on %s." % (installed_software, computer)) running_test_dict = {}
del agent.installing_software_dict[computer][installed_software] more_tests = True
agent.installed_software_dict[computer][installed_software] = now while True:
elif installed_software in agent.installed_software_dict[computer] and \ # Get up to parallel_task_count tasks to execute
agent.getSoftwareReleaseUsageOnComputer(computer, installed_software) == 0 and \ while len(running_test_dict) < parallel_task_count and \
now - agent.installed_software_dict[computer][installed_software] > agent.software_live_duration: more_tests:
if agent.requestSoftwareReleaseCleanupOnComputer(computer, installed_software): test_line = test_result.start(
del agent.installed_software_dict[computer][installed_software] exclude_list=list(ran_test_set))
agent.writeState() if test_line is None:
more_tests = False
if pidfile: break
setFinished(pidfile) test_name = test_line.name
try:
if __name__ == "__main__": section_entry_dict = section_dict[test_name]
main() except KeyError:
# We don't know how to execute this test. Assume it doesn't
# exist anymore, and fail it in result.
test_line.stop(stderr='This test does not exist on test '
'node %s' % (node_title, ))
continue
master_url = section_entry_dict['master_url']
master_slap_connection_key = (master_url,
section_entry_dict.get('key'))
try:
supply, order, rpc = master_slap_connection_dict[
master_slap_connection_key]
except KeyError:
key = section_entry_dict.get('key')
cert = section_entry_dict.get('cert')
slap = slapos.slap.slap()
slap.initializeConnection(master_url, key, cert)
supply = slap.registerSupply()
order = slap.registerOpenOrder()
assert master_url.startswith('https:')
rpc = xmlrpclib.ServerProxy(master_url, allow_none=True,
transport=x509Transport(
{'key_file': key, 'cert_file': cert}))
master_slap_connection_dict[
master_slap_connection_key] = (supply, order, rpc)
tester = SoftwareReleaseTester(
test_name + '_' + node_title + time.strftime(
'_%Y/%m/%d_%H:%M:%S_+0000', time.gmtime()),
logger,
rpc,
supply,
order,
section_entry_dict['url'],
random.choice(section_entry_dict['computer_list']),
section_entry_dict['max_install_duration'],
section_entry_dict['max_uninstall_duration'],
section_entry_dict.get('request_kw'),
section_entry_dict.get('max_request_duration'),
section_entry_dict.get('max_destroy_duration'),
)
ran_test_set.add(test_name)
running_test_dict[test_name] = (test_line, tester)
if not running_test_dict:
break
now = time.time()
# Synchronise refreshes on watcher period, so it doesn't report a
# stalled test node where we are actually still sleeping.
# Change test_result.watcher_period outside this loop if you wish
# to change sleep duration.
next_deadline = now + test_result.watcher_period
for section, (test_line, tester) in running_test_dict.items():
logger.info('Checking %s: %r...', section, tester)
try:
deadline = tester.tic(now)
except Exception:
logger.exception('test failed')
test_line.stop(
test_count=1,
error_count=1,
failure_count=0,
skip_count=0,
stderr=traceback.format_exc(),
)
del running_test_dict[section]
try:
tester.teardown()
except Exception:
logger.exception('teardown failed, human '
'assistance needed for cleanup')
raise
else:
logger.info('%r', tester)
if deadline is None:
# TODO: report how long each step took.
logger.info('Finished !')
test_line.stop(
test_count=1,
error_count=0,
failure_count=0,
skip_count=0,
)
del running_test_dict[section]
else:
next_deadline = min(deadline, next_deadline)
if running_test_dict:
to_sleep = next_deadline - time.time()
if to_sleep > 0:
logger.info('Sleeping %is...', to_sleep)
time.sleep(to_sleep)
if not test_result.isAlive():
for _, tester in running_test_dict.itervalues():
tester.teardown()
finally:
if pidfile:
setFinished(pidfile)
# Help interpreter get rid of AutoSTemp instances.
key_file_dict.clear()
if __name__ == '__main__':
main()
import ConfigParser, argparse, pprint, socket, sys, time, xmlrpclib, json
def safeRpcCall(function, *args):
retry = 64
xmlrpc_arg_list = []
for argument in args:
if isinstance(argument, dict):
argument = dict([(x, isinstance(y,str) and xmlrpclib.Binary(y) or y) \
for (x,y) in argument.iteritems()])
xmlrpc_arg_list.append(argument)
while True:
try:
return function(*xmlrpc_arg_list)
except (socket.error, xmlrpclib.ProtocolError), e:
print >>sys.stderr, e
pprint.pprint(args, file(function._Method__name, 'w'))
time.sleep(retry)
retry += retry >> 1
def main(*args):
parser = argparse.ArgumentParser()
parser.add_argument("configuration_file", nargs=1, type=argparse.FileType(),
help="Slap Test Agent configuration file.")
if args == ():
argument_option_instance = parser.parse_args()
else:
argument_option_instance = \
parser.parse_args(list(args))
configuration_file = argument_option_instance.configuration_file[0]
configuration = ConfigParser.SafeConfigParser()
configuration.readfp(configuration_file)
master_url = configuration.get("agent", "report_url")
if master_url[-1] != '/':
master_url += '/'
master = xmlrpclib.ServerProxy("%s%s" %
(master_url, 'portal_task_distribution'),
allow_none=1)
assert master.getProtocolRevision() == 1
software_list = json.loads(configuration.get("agent", "software_list"))
test_result = safeRpcCall(master.createTestResult,
"SlapOS Test", "", software_list,
True, "SlapOS Test", "SlapOS Test Agent", "ViFiB Project")
test_result_path, revision = test_result
state = ConfigParser.SafeConfigParser()
state.add_section("path")
exclude_list = software_list[:]
for software in software_list:
exclude_list.remove(software)
test_path, test_name = safeRpcCall(master.startUnitTest,
test_result_path, exclude_list)
state.set("path", test_name, test_path)
exclude_list.append(software)
path_file = configuration.get("agent", "path_file")
state.write(open(path_file, "w"))
if __name__ == "__main__":
main()
import ConfigParser, argparse, pprint, socket, sys, time, xmlrpclib, json, os
from datetime import datetime
def safeRpcCall(function, *args):
retry = 64
xmlrpc_arg_list = []
for argument in args:
if isinstance(argument, dict):
argument = dict([(x, isinstance(y,str) and xmlrpclib.Binary(y) or y) \
for (x,y) in argument.iteritems()])
xmlrpc_arg_list.append(argument)
while True:
try:
return function(*xmlrpc_arg_list)
except (socket.error, xmlrpclib.ProtocolError), e:
print >>sys.stderr, e
pprint.pprint(args, file(function._Method__name, 'w'))
time.sleep(retry)
retry += retry >> 1
def main(*args):
parser = argparse.ArgumentParser()
parser.add_argument("configuration_file", nargs=1, type=argparse.FileType(),
help="Slap Test Agent configuration file.")
if args == ():
argument_option_instance = parser.parse_args()
else:
argument_option_instance = \
parser.parse_args(list(args))
configuration_file = argument_option_instance.configuration_file[0]
configuration = ConfigParser.SafeConfigParser()
configuration.readfp(configuration_file)
master_url = configuration.get("agent", "report_url")
software_list = json.loads(configuration.get("agent", "software_list"))
if master_url[-1] != '/':
master_url += '/'
master = xmlrpclib.ServerProxy("%s%s" %
(master_url, 'portal_task_distribution'),
allow_none=1)
assert master.getProtocolRevision() == 1
log_directory = configuration.get("agent", "log_directory")
logfile_path = os.path.join(log_directory, "agent-%s.log" % datetime.strftime(datetime.now(), "%Y%m%d"))
logfile = open(logfile_path, 'r')
logline_list = logfile.readlines()
path_file = configuration.get("agent", "path_file")
state = ConfigParser.SafeConfigParser()
state.readfp(open(path_file))
for software in software_list:
test_path = state.get("path", software)
success, failure = 0, 0
log = ''
for logline in logline_list:
success_pattern = "Successfully installed %s" % software
failure_pattern = "Failed to install %s" % software
if success_pattern in logline:
success = success + 1
log = log + logline
if failure_pattern in logline:
failure = failure + 1
log = log + logline
safeRpcCall(master.stopUnitTest, test_path,
{
"status_code": 0,
"stderr": log,
"duration": 0,
"test_count": success + failure,
"failure_count": failure
})
if __name__ == "__main__":
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment