Commit 813f8853 authored by Rafael Monnerat's avatar Rafael Monnerat

Fully reimplementation of Test Agent

  - Get precise state of all software instances from a Hosting Subscription
  - Get the connection parameters from  all software instances from a Hosting Subscription
  - Get the instance parameters from all software instances from a Hosting Subscription
  - Move all API to connect slapos master to HAL implementation
  - Get the state of the Software Installation
  - Try to connect to monitoring tool in case of error.
  - Use TaskDistributor to determinate the tests (instead local file)
  - Use erp5.util and few erp5testnode features to determinate test revisions.
  - Download informations from the Task distributor instead use parameters (it makes implementation more flexible).
parent dc9a069a
import ConfigParser
import argparse
import collections
import datetime
import httplib
import json
import logging
import os
import random
import sys
import tempfile
import traceback
import time
import xmlrpclib
import os
import tempfile
import slapos.slap
from slapos.grid.utils import setRunning, setFinished
from erp5.util.taskdistribution import TaskDistributionTool, RPCRetry
from erp5.util.taskdistribution import SAFE_RPC_EXCEPTION_LIST
from erp5.util.taskdistribution import TaskDistributor, TaskDistributionTool
from erp5.util.testnode.Updater import Updater
from erp5.util.testnode.ProcessManager import SubprocessError, ProcessManager, CancellationError
class AutoSTemp(object):
......@@ -39,562 +38,366 @@ class AutoSTemp(object):
def __del__(self):
self.__unlink(self.__name)
SOFTWARE_STATE_UNKNOWN = -1
SOFTWARE_STATE_INSTALLING = 0
SOFTWARE_STATE_INSTALLED = 1
SOFTWARE_STATE_DESTROYING = 2
INSTANCE_STATE_UNKNOWN = -1
INSTANCE_STATE_STARTING = 0
INSTANCE_STATE_STARTED = 1
INSTANCE_STATE_STOPPING = 2
INSTANCE_STATE_STOPPED = 3
INSTANCE_STATE_DESTROYING = 4
TESTER_STATE_INITIAL = -1
TESTER_STATE_NOTHING = 0
TESTER_STATE_SOFTWARE_INSTALLED = 1
TESTER_STATE_INSTANCE_INSTALLED = 2
TESTER_STATE_INSTANCE_STARTED = 4
TESTER_STATE_INSTANCE_UNINSTALLED = 5
class x509Transport(xmlrpclib.Transport):
"""
Similar to xmlrpclib.SecureTransport, but with actually usable x509
support.
"""
def __init__(self, x509, *args, **kw):
xmlrpclib.Transport.__init__(self, *args, **kw)
self.__x509 = x509
def make_connection(self, host):
if not self._connection or host != self._connection[0]:
try:
HTTPSConnection = httplib.HTTPSConnection
except AttributeError:
raise NotImplementedError("your version of httplib doesn't "
"support HTTPS")
else:
chost, self._extra_headers, x509 = self.get_host_info((host,
self.__x509))
self._connection = (host, HTTPSConnection(chost, None, **x509))
return self._connection[1]
class TestTimeout(Exception):
pass
# Simple decorator to prevent raise due small
# network failures.
def retryOnNetworkFailure(func):
def wrapper(*args, **kwargs):
retry_time = 64
while True:
try:
return func(*args, **kwargs)
except SAFE_RPC_EXCEPTION_LIST, e:
print 'Network failure: %s , %s' % (sys.exc_info(), e)
print 'Retry method %s in %i seconds' % (func, retry_time)
time.sleep(retry_time)
retry_time += retry_time >> 1
wrapper.__name__ = func.__name__
wrapper.__doc__ = func.__doc__
return wrapper
class SoftwareReleaseTester(RPCRetry):
deadline = None
latest_state = None
def __init__(self,
name,
logger,
master,
slap_supply, # slapos supply to manage software release
slap_order, # slapos open order to manage instance
url, # software release url
computer_guid, # computer to use for this test run
max_install_duration,
max_uninstall_duration,
request_kw=None, # instance parameters, if instantiation
# testing is desired
max_request_duration=None,
max_destroy_duration=None,
):
super(SoftwareReleaseTester, self).__init__(master, 16, logger)
self.name = name
self.slap_supply = slap_supply
self.slap_order = slap_order
self.url = url
self.computer_guid = computer_guid
self.request_kw = request_kw
self.state = TESTER_STATE_INITIAL
self.transition_dict = {
# step function
# delay
# next_state
# software_state
# instance_state
TESTER_STATE_INITIAL: (
lambda t: None,
None,
TESTER_STATE_NOTHING,
None,
None,
),
TESTER_STATE_NOTHING: (
lambda t: t.install(),
max_install_duration,
request_kw is None and TESTER_STATE_INSTANCE_UNINSTALLED or \
TESTER_STATE_SOFTWARE_INSTALLED,
SOFTWARE_STATE_INSTALLED,
None,
),
TESTER_STATE_SOFTWARE_INSTALLED: (
lambda t: t.request(),
max_request_duration,
TESTER_STATE_INSTANCE_STARTED,
None,
INSTANCE_STATE_STARTED,
),
TESTER_STATE_INSTANCE_STARTED: (
lambda t: t.destroy(),
max_destroy_duration,
TESTER_STATE_INSTANCE_UNINSTALLED,
None,
INSTANCE_STATE_UNKNOWN,
),
TESTER_STATE_INSTANCE_UNINSTALLED: (
lambda t: t.uninstall(),
max_uninstall_duration,
None,
None,
INSTANCE_STATE_UNKNOWN,
),
}
def __repr__(self):
deadline = self.deadline
if deadline is not None:
deadline -= time.time()
deadline = '+%is' % (deadline, )
return '<%s(state=%s, deadline=%s) at %x>' % (
self.__class__.__name__, self.state, deadline, id(self))
@retryOnNetworkFailure
def _supply(self, state):
self._logger.info('Supply %s@%s: %s', self.url, self.computer_guid,
state)
return self.slap_supply.supply(self.url, self.computer_guid, state)
@retryOnNetworkFailure
def _request(self, state):
self._logger.info('Request %s@%s: %s', self.url, self.name, state)
self.latest_state = state
return self.slap_order.request(
software_release=self.url,
partition_reference=self.name,
state=state,
**self.request_kw
)
def _getSoftwareState(self):
return SOFTWARE_STATE_INSTALLED
def _getInstanceState(self):
latest_state = self.latest_state
self._logger.debug('latest_state = %r', latest_state)
if latest_state is None:
return INSTANCE_STATE_UNKNOWN
try:
requested = self._request(latest_state)
if requested._computer_id is None:
return INSTANCE_STATE_UNKNOWN
except slapos.slap.ServerError:
self._logger.exception('Got an error requesting partition for '
'its state')
return INSTANCE_STATE_UNKNOWN
try:
instance_state = requested.getStatus()
# the following does NOT take TZ into account
created_at = datetime.datetime.strptime(instance_state['created_at'], '%a, %d %b %Y %H:%M:%S %Z')
gmt_now = datetime.datetime(*time.gmtime()[:6])
self._logger.debug('Instance state: ', instance_state)
self._logger.debug('Created at: %s (%d)' % (instance_state['created_at'], (gmt_now - created_at).seconds))
if instance_state['text'].startswith('#error no data found'):
return INSTANCE_STATE_UNKNOWN
elif instance_state['text'].startswith('#access') \
and (gmt_now - created_at).seconds < 300:
return INSTANCE_STATE_STARTED
except slapos.slap.ResourceNotReady:
return INSTANCE_STATE_UNKNOWN
return INSTANCE_STATE_UNKNOWN
def install(self):
"""
Make software available on computer.
"""
self._supply('available')
def uninstall(self):
"""
Make software unavailable on computer.
"""
self._supply('destroyed')
def start(self):
"""
Request started instance (or starting existing one)
"""
self._request('started')
request = start
def stop(self):
"""
Request stopped instance (or stopping existing one).
"""
self._request('stopped')
def destroy(self):
"""
Destroy existing instance.
"""
self._request('destroyed')
def teardown(self):
"""
Interrupt a running test sequence, putting it in idle state.
"""
self._logger.info('Invoking TearDown for %s@%s' % (self.url, self.name))
if self.request_kw is not None:
self.destroy()
self.uninstall()
self.state = TESTER_STATE_INSTANCE_UNINSTALLED
def tic(self, now):
"""
Check for missed deadlines (-> test failure), conditions for moving to
next state, and actually moving to next state (executing its payload).
"""
self._logger.debug('TIC')
deadline = self.deadline
if deadline < now and deadline is not None:
raise TestTimeout(self.state)
_, _, next_state, software_state, instance_state = self.transition_dict[
self.state]
if (software_state is None or
software_state == self._getSoftwareState()) and (
instance_state is None or
instance_state == self._getInstanceState()):
self._logger.debug('Going to state %s (%r, %r)', next_state,
software_state, instance_state)
if next_state is None:
return None
self.state = next_state
stepfunc, delay, _, _, _ = self.transition_dict[next_state]
self.deadline = now + delay
stepfunc(self)
return self.deadline
from tester import SoftwareReleaseTester
class TestMap(object):
def __init__(self, test_dict):
self.test_map_dict = collections.OrderedDict()
for key in test_dict:
target_computer = test_dict[key]["target_computer"]
if target_computer not in self.test_map_dict:
self.test_map_dict[target_computer] = [key]
else:
self.test_map_dict[target_computer].append(key)
def getExcludeList(self, computer_id):
exclude_list = []
for key in self.test_map_dict:
if key != computer_id:
exclude_list.extend(self.test_map_dict[key])
return set(exclude_list)
def getComputerList(self):
return self.test_map_dict.keys()
def dropComputer(self, computer_id):
del self.test_map_dict[computer_id]
def cleanUp(self):
for key in self.test_map_dict.copy():
if len(self.test_map_dict[key]) == 0:
del self.test_map_dict[key]
def getNextComputer(self, used_computer_list):
for computer in self.getComputerList():
if computer not in used_computer_list:
return computer
return None
def main():
"""
Note: This code does not test as much as it monitors.
The goal is to regularily try to build & instantiate a software release
on several machines, to monitor vifib stability and SR stability as time
passes (and things once available online become unavailable).
Part of this function could be reused to make an actual test bot, testing
only when actual changes are committed to a software release, to look for
regressions.
Note: This code does not connect to any instantiated service, it relies on
the presence of a promise section to make instantiation fail until promise
is happy.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--pidfile', '-p', help='pidfile preventing parallel '
'execution.')
parser.add_argument('--log', '-l', help='Log file path.')
parser.add_argument('--verbose', '-v', help='Be verbose.',
action='store_true')
parser.add_argument('configuration_file', type=argparse.FileType(),
help='Slap Test Agent configuration file.')
# Just to keep strong references to AutoSTemp instances
key_file_dict = {}
def asFilenamePair(key, cert):
# Note: python's ssl support only supports fetching key & cert data
# from on-disk files. This is why we need to "convert" direct data
# into file paths, using temporary files.
cert = cert.strip()
def __init__(self, test_dict):
self.test_map_dict = collections.OrderedDict()
for key in test_dict:
group = test_dict[key].get("group", "default")
if group not in self.test_map_dict:
self.test_map_dict[group] = [key]
def getExcludeList(self, group):
exclude_list = []
for key in self.test_map_dict:
if key != group:
exclude_list.extend(self.test_map_dict[key])
return set(exclude_list)
def getGroupList(self):
return self.test_map_dict.keys()
def dropGroup(self, group):
del self.test_map_dict[group]
def cleanEmptyGroup(self):
for key in self.test_map_dict.copy():
if len(self.test_map_dict[key]) == 0:
del self.test_map_dict[key]
def getNextGroup(self, ignore_list):
for group in self.getGroupList():
if group not in ignore_list:
return group
return None
def loadConfiguration(configuration, logger):
section_dict = collections.OrderedDict()
for section in configuration.sections():
if section == 'agent':
continue
section_dict[section] = section_entry_dict = dict(
configuration.items(section))
for key in ('request_kw', ):
if key in section_entry_dict:
try:
temp_key, temp_cert = key_file_dict[cert]
except KeyError:
temp_key = AutoSTemp(key.strip())
temp_cert = AutoSTemp(cert)
key_file_dict[cert] = (temp_key, temp_cert)
return temp_key.name, temp_cert.name
args = parser.parse_args()
log = args.log
formatter = logging.Formatter('%(asctime)s %(message)s')
logger = logging.getLogger()
if args.verbose:
log_level = logging.DEBUG
else:
log_level = logging.INFO
logger.setLevel(log_level)
handler = logging.StreamHandler(sys.stdout)
if isinstance(section_entry_dict[key], str) or \
isinstance(section_entry_dict[key], unicode):
section_entry_dict[key] = json.loads(section_entry_dict[key])
except Exception:
logger.error("Fail to load %s on %s" % (key, section_entry_dict))
raise
if "group" not in section_entry_dict:
section_entry_dict["group"] = "default"
return section_dict
def getLogger(log, verbose):
formatter = logging.Formatter('%(asctime)s %(message)s')
logger = logging.getLogger()
if verbose:
log_level = logging.DEBUG
else:
log_level = logging.INFO
logger.setLevel(log_level)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
logger.addHandler(handler)
log_file = None
if log:
handler = logging.FileHandler(log)
handler.setFormatter(formatter)
logger.addHandler(handler)
if log:
handler = logging.FileHandler(log)
handler.setFormatter(formatter)
logger.addHandler(handler)
log_file = open(log)
log_file.seek(0, 2)
pidfile = args.pidfile
if pidfile:
setRunning(pidfile)
try:
section_dict = collections.OrderedDict()
configuration = ConfigParser.SafeConfigParser()
configuration.readfp(args.configuration_file)
for section in configuration.sections():
if section == 'agent':
continue
section_dict[section] = section_entry_dict = dict(
configuration.items(section))
for key in ('request_kw', 'max_install_duration',
'max_destroy_duration', 'max_request_duration',
'max_uninstall_duration', 'computer_list'
):
if key in section_entry_dict:
try:
if isinstance(section_entry_dict[key], str) or \
isinstance(section_entry_dict[key], unicode):
section_entry_dict[key] = json.loads(
section_entry_dict[key])
except Exception as exc:
logger.error("Fail to load %s on %s" % (key, section_entry_dict))
raise
if 'key' in section_entry_dict:
key_file, cert_file = asFilenamePair(section_entry_dict['key'],
section_entry_dict['cert'])
section_entry_dict['key'] = key_file
section_entry_dict['cert'] = cert_file
if "computer_list" in section_entry_dict:
section_entry_dict["target_computer"] = \
random.choice(section_entry_dict["computer_list"])
agent_parameter_dict = dict(configuration.items('agent'))
# XXX: should node title be auto-generated by installation recipe ?
# For example, using computer guid.
node_title = agent_parameter_dict['node_title']
test_title = agent_parameter_dict['test_title']
project_title = agent_parameter_dict['project_title']
task_distribution_tool = TaskDistributionTool(agent_parameter_dict[
'report_url'])
master_slap_connection_dict = {}
log_file = open(log)
log_file.seek(0, 2)
return logger, log_file
def getAndUpdateFullRevisionList(node_test_suite, working_directory,
logger, process_manager, git_binary="git"):
full_revision_list = []
base_path = os.path.join(working_directory,
node_test_suite.get('test_suite_reference'))
for vcs_repository in node_test_suite.get('vcs_repository_list', []):
repository_path = os.path.join(base_path,
vcs_repository['buildout_section_id'])
repository_id = vcs_repository['buildout_section_id']
branch = vcs_repository.get('branch')
# Make sure we have local repository
updater = Updater(repository_path, git_binary=git_binary,
branch=branch, log=logger.info, process_manager=process_manager,
working_directory=working_directory,
url=vcs_repository["url"])
updater.checkout()
revision = "-".join(updater.getRevision())
full_revision_list.append('%s=%s' % (repository_id, revision))
node_test_suite['revision'] = ','.join(full_revision_list)
return full_revision_list
def main():
"""
Note: This code does not test as much as it monitors.
The goal is to regularily try to build & instantiate a software release
on several machines, to monitor vifib stability and SR stability as time
passes (and things once available online become unavailable).
Part of this function could be reused to make an actual test bot, testing
only when actual changes are committed to a software release, to look for
regressions.
Note: This code does not connect to any instantiated service, it relies on
the presence of a promise section to make instantiation fail until promise
is happy.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--pidfile', '-p', help='pidfile preventing parallel '
'execution.')
parser.add_argument('--log', '-l', help='Log file path.')
parser.add_argument('--verbose', '-v', help='Be verbose.',
action='store_true')
parser.add_argument('configuration_file', type=argparse.FileType(),
help='Slap Test Agent configuration file.')
key_file_dict = {}
args = parser.parse_args()
log = args.log
logger, log_file = getLogger(log, args.verbose)
pidfile = args.pidfile
if pidfile:
setRunning(logger=logger, pidfile=pidfile)
try:
while True:
configuration = ConfigParser.SafeConfigParser()
configuration.readfp(args.configuration_file)
section_dict = loadConfiguration(configuration, logger)
agent_parameter_dict = dict(configuration.items('agent'))
task_distributor = TaskDistributor(agent_parameter_dict['report_url'])
task_distributor.subscribeNode(
node_title=agent_parameter_dict['node_title'],
computer_guid="None")
test_suite_data = task_distributor.startTestSuite(
node_title=agent_parameter_dict['node_title'],
computer_guid="None")
task_distribution_tool = TaskDistributionTool(
agent_parameter_dict['report_url'],
logger=logger)
if type(test_suite_data) == str:
# Backward compatiblity
test_suite_data = json.loads(test_suite_data)
slap_account_key = task_distributor.getSlaposAccountKey()
slap_certificate = task_distributor.getSlaposAccountCertificate()
master_url = task_distributor.getSlaposUrl()
key_file_dict = {}
def asFilenamePair(key, cert):
# Note: python's ssl support only supports fetching key & cert data
# from on-disk files. This is why we need to "convert" direct data
# into file paths, using temporary files.
cert = cert.strip()
try:
temp_key, temp_cert = key_file_dict[cert]
except KeyError:
temp_key = AutoSTemp(key.strip())
temp_cert = AutoSTemp(cert)
key_file_dict[cert] = (temp_key, temp_cert)
return temp_key.name, temp_cert.name
key_file, cert_file = asFilenamePair(slap_account_key,
slap_certificate)
process_manager = ProcessManager(logger.info)
for test_suite in test_suite_data:
full_revision_list = getAndUpdateFullRevisionList(test_suite,
agent_parameter_dict["working_directory"], logger, process_manager)
unit_test_dict = task_distributor.generateConfiguration(
test_suite['test_suite_title'])
if not len(full_revision_list):
# We don't watch git revision but we periodically
# run the test, once a day.
full_revision_list = ["day=%s" % time.strftime('%Y/%m/%d', time.gmtime())]
if type(unit_test_dict) == str:
# Backward compatiblity
unit_test_dict = json.loads(unit_test_dict)
test_result = task_distribution_tool.createTestResult(
revision='',
test_name_list=section_dict.keys(),
node_title=node_title,
allow_restart=True,
test_title=test_title,
project_title=project_title,
revision=','.join(full_revision_list),
test_name_list=unit_test_dict.keys(),
node_title=agent_parameter_dict['node_title'],
allow_restart=False,
test_title=test_suite['test_suite_title'],
project_title=agent_parameter_dict['project_title'],
)
test_result.watcher_period = 300
if log:
test_result.addWatch(log, log_file, max_history_bytes=10000)
if test_result is None:
# We already have a test result
logger.info('Skiping test for %s, result already available (%s)' %
(test_suite['test_suite_title'], ','.join(full_revision_list)))
continue
test_result.watcher_period = 5 #120
assert test_result is not None
test_mapping = TestMap(section_dict)
if log_file is not None:
test_result.addWatch(log, log_file, max_history_bytes=10000)
logger.info("Starting to run for %s" % test_result )
test_mapping = TestMap(unit_test_dict)
logger.info("Running %s tests in parallel." % \
len(test_mapping.getComputerList()))
len(test_mapping.getGroupList()))
assert master_url.startswith('https:')
slap = slapos.slap.slap()
slap.initializeConnection(
master_url, key_file, cert_file)
supply = slap.registerSupply()
order = slap.registerOpenOrder()
ran_test_set = set()
running_test_dict = {}
more_tests = True
logger.info('Starting Test Agent run %s ' % node_title)
logger.info('Starting Test Agent run %s ' % agent_parameter_dict['node_title'])
while True:
# Get up to parallel_task_count tasks to execute
while len(running_test_dict) < len(test_mapping.getComputerList())\
and more_tests:
test_mapping.cleanUp()
target_computer = test_mapping.getNextComputer([computer \
for _, _, computer in running_test_dict.itervalues()])
test_line = test_result.start(
exclude_list= list(ran_test_set) + \
list(test_mapping.getExcludeList(target_computer)))
logger.info("Test Line: %s " % test_line)
logger.info("Ran Test Set: %s " % ran_test_set)
logger.info("Running test dict: %s " % running_test_dict)
logger.info("Target Computer: %s " % target_computer)
if test_line is None:
test_mapping.dropComputer(target_computer)
if len(test_mapping.getComputerList()) == 0:
more_tests = False
continue
test_name = test_line.name
try:
section_entry_dict = section_dict[test_name]
except KeyError:
# We don't know how to execute this test. Assume it doesn't
# exist anymore, and fail it in result.
test_line.stop(stderr='This test does not exist on test '
'node %s' % (node_title, ))
continue
master_url = section_entry_dict['master_url']
master_slap_connection_key = (master_url,
section_entry_dict.get('key'))
try:
supply, order, rpc = master_slap_connection_dict[
master_slap_connection_key]
except KeyError:
key = section_entry_dict.get('key')
cert = section_entry_dict.get('cert')
slap = slapos.slap.slap()
slap.initializeConnection(master_url, key, cert)
supply = slap.registerSupply()
order = slap.registerOpenOrder()
assert master_url.startswith('https:')
rpc = xmlrpclib.ServerProxy(master_url, allow_none=True,
transport=x509Transport(
{'key_file': key, 'cert_file': cert}))
master_slap_connection_dict[
master_slap_connection_key] = (supply, order, rpc)
tester = SoftwareReleaseTester(
test_name + '_' + node_title + time.strftime(
'_%Y/%m/%d_%H:%M:%S_+0000', time.gmtime()),
logger,
rpc,
supply,
order,
section_entry_dict['url'],
section_entry_dict['target_computer'],
section_entry_dict['max_install_duration'],
section_entry_dict['max_uninstall_duration'],
section_entry_dict.get('request_kw'),
section_entry_dict.get('max_request_duration'),
section_entry_dict.get('max_destroy_duration'),
)
ran_test_set.add(test_name)
running_test_dict[test_name] = (test_line, tester, target_computer)
if not running_test_dict:
break
now = time.time()
# Synchronise refreshes on watcher period, so it doesn't report a
# stalled test node where we are actually still sleeping.
# Change test_result.watcher_period outside this loop if you wish
# to change sleep duration.
next_deadline = now + test_result.watcher_period
for section, (test_line, tester, target_computer) in running_test_dict.items():
logger.info('Checking %s: %r...', section, tester)
# Get up to parallel_task_count tasks to execute
while len(running_test_dict) < len(test_mapping.getGroupList())\
and (len(test_mapping.getGroupList()) > 0):
test_mapping.cleanEmptyGroup()
# Select an unused computer to run the test.
group = test_mapping.getNextGroup(
ignore_list = [group for _, _, group in \
running_test_dict.itervalues()])
# Select a test
test_line = test_result.start(
exclude_list= list(ran_test_set) + \
list(test_mapping.getExcludeList(group)))
logger.info("Test Line: %s " % test_line)
logger.info("Ran Test Set: %s " % ran_test_set)
logger.info("Running test dict: %s " % running_test_dict)
logger.info("Group: %s " % group)
if test_line is None:
logger.info("Removing Group (empty test line): %s " % group)
test_mapping.dropGroup(group)
continue
test_name = test_line.name
try:
section_entry_dict = unit_test_dict[test_name]
except KeyError:
# We don't know how to execute this test. Assume it doesn't
# exist anymore, and fail it in result.
test_line.stop(stderr='This test does not exist on test '
'node %s' % (agent_parameter_dict['node_title'], ))
continue
general_timeout = agent_parameter_dict.get('timeout', 3600)
tester = SoftwareReleaseTester(
test_name + time.strftime('_%Y/%m/%d_%H:%M:%S_+0000', time.gmtime()),
logger,
slap,
order,
supply,
section_entry_dict['url'],
section_entry_dict.get('supply_computer'),
section_entry_dict.get('request_kw'),
agent_parameter_dict.get('software_timeout', general_timeout),
agent_parameter_dict.get('instance_timeout', general_timeout)
)
ran_test_set.add(test_name)
running_test_dict[test_name] = (test_line, tester, group)
if not running_test_dict:
logger.info('No more tests to run...')
break
now = time.time()
# Synchronise refreshes on watcher period, so it doesn't report a
# stalled test node where we are actually still sleeping.
# Change test_result.watcher_period outside this loop if you wish
# to change sleep duration.
next_deadline = now + test_result.watcher_period
for section, (test_line, tester, group) in running_test_dict.items():
logger.info('Checking %s: %r...', section, tester)
try:
deadline = tester.tic(now)
except Exception:
logger.exception('Test execution fail for %s' % (section))
test_line.stop(test_count=1, error_count=1, failure_count=0,
skip_count=0, command=tester.getInfo(),
stdout=tester.getFormatedLastMessage(),
stderr=traceback.format_exc())
del running_test_dict[section]
try:
tester.teardown()
except slapos.slap.NotFoundError:
# This exception is ignored because we cannot
# Teardown if SR URL do not exist.
logger.exception('Fail and not found')
pass
except Exception:
logger.exception('teardown failed, human assistance needed for cleanup')
raise
else:
logger.info('%r' % tester)
if deadline is None:
# TODO: report how long each step took.
logger.info('Test execution finished for %s' % (section))
test_line.stop(test_count=1, error_count=0, failure_count=0,
skip_count=0, command=tester.getInfo(), stdout=tester.getFormatedLastMessage())
del running_test_dict[section]
try:
deadline = tester.tic(now)
pass #tester.teardown()
except slapos.slap.NotFoundError:
# This exception is ignored because we cannot
# Teardown if SR URL do not exist.
logger.exception('Fail and not found')
pass
except Exception:
logger.exception('Test execution fail for %s' % (section))
test_line.stop(
test_count=1,
error_count=1,
failure_count=0,
skip_count=0,
stderr=traceback.format_exc(),
)
del running_test_dict[section]
try:
tester.teardown()
except slapos.slap.NotFoundError:
# This exception is ignored because we cannot
# Teardown if SR URL do not exist.
logger.exception('Fail and not found')
pass
except Exception:
logger.exception('teardown failed, human '
'assistance needed for cleanup')
raise
else:
logger.info('%r', tester)
if deadline is None:
# TODO: report how long each step took.
logger.info('Test execution finished for %s' % (section))
test_line.stop(
test_count=1,
error_count=0,
failure_count=0,
skip_count=0,
)
del running_test_dict[section]
try:
tester.teardown()
except slapos.slap.NotFoundError:
# This exception is ignored because we cannot
# Teardown if SR URL do not exist.
logger.exception('Fail and not found')
pass
except Exception:
logger.exception('teardown failed, human '
'assistance needed for cleanup')
raise
else:
next_deadline = min(deadline, next_deadline)
if running_test_dict:
to_sleep = next_deadline - time.time()
if to_sleep > 0:
logger.info('Sleeping %is...', to_sleep)
time.sleep(to_sleep)
if not test_result.isAlive():
for _, tester, computer_id in running_test_dict.itervalues():
tester.teardown()
finally:
if pidfile:
setFinished(pidfile)
# Help interpreter get rid of AutoSTemp instances.
key_file_dict.clear()
logger.exception('teardown failed, human assistance needed for cleanup')
raise
else:
next_deadline = min(deadline, next_deadline)
if running_test_dict:
to_sleep = next_deadline - time.time()
if to_sleep > 0:
logger.info('Sleeping %is...', to_sleep)
time.sleep(to_sleep)
if not test_result.isAlive():
for _, tester, computer_id in running_test_dict.itervalues():
tester.teardown()
finally:
if pidfile:
setFinished(pidfile)
key_file_dict.clear()
if __name__ == '__main__':
main()
main()
import datetime
import json
import sys
import traceback
import time
import feedparser
from uritemplate import expand
import slapos.slap
from slapos.slap import SoftwareProductCollection
from erp5.util.taskdistribution import SAFE_RPC_EXCEPTION_LIST
SOFTWARE_PRODUCT_NAMESPACE = "product."
SOFTWARE_STATE_UNKNOWN = "SOFTWARE_STATE_UNKNOWN"
SOFTWARE_STATE_INSTALLING = "SOFTWARE_STATE_INSTALLING"
SOFTWARE_STATE_INSTALLED = "SOFTWARE_STATE_INSTALLED"
SOFTWARE_STATE_DESTROYING = "SOFTWARE_STATE_DESTROYING"
INSTANCE_STATE_UNKNOWN = "INSTANCE_STATE_UNKNOWN"
INSTANCE_STATE_STARTING = "INSTANCE_STATE_STARTING"
INSTANCE_STATE_STARTED = "INSTANCE_STATE_STARTED"
INSTANCE_STATE_STARTED_WITH_ERROR = "INSTANCE_STATE_STARTED_WITH_ERROR"
INSTANCE_STATE_STOPPING = "INSTANCE_STATE_STOPPING"
INSTANCE_STATE_STOPPED = "INSTANCE_STATE_STOPPED"
INSTANCE_STATE_DESTROYING = "INSTANCE_STATE_DESTROYING"
TESTER_STATE_INITIAL = "TESTER_STATE_INITIAL"
TESTER_STATE_NOTHING = "TESTER_STATE_NOTHING"
TESTER_STATE_SOFTWARE_INSTALLED = "TESTER_STATE_SOFTWARE_INSTALLED"
TESTER_STATE_INSTANCE_INSTALLED = "TESTER_STATE_INSTANCE_INSTALLED"
TESTER_STATE_INSTANCE_STARTED = "TESTER_STATE_INSTANCE_STARTED"
TESTER_STATE_INSTANCE_UNINSTALLED = "TESTER_STATE_INSTANCE_UNINSTALLED"
class TestTimeout(Exception):
pass
# Simple decorator to prevent raise due small
# network failures.
def retryOnNetworkFailure(func):
def wrapper(*args, **kwargs):
retry_time = 64
while True:
try:
return func(*args, **kwargs)
except SAFE_RPC_EXCEPTION_LIST, e:
print 'Network failure: %s , %s' % (sys.exc_info(), e)
print 'Retry method %s in %i seconds' % (func, retry_time)
time.sleep(retry_time)
retry_time += retry_time >> 1
wrapper.__name__ = func.__name__
wrapper.__doc__ = func.__doc__
return wrapper
class SlapOSMasterCommunicator(object):
def __init__(self, slap, slap_supply, slap_order, url, logger):
self._logger = logger
self.slap = slap
self.slap_order = slap_order
self.slap_supply = slap_supply
self.hateoas_navigator = self.slap._hateoas_navigator
self.hosting_subscription_url = None
if url is not None and \
url.startswith(SOFTWARE_PRODUCT_NAMESPACE):
product = SoftwareProductCollection(self._logger, self.slap)
try:
url = product.__getattr__(url[len(SOFTWARE_PRODUCT_NAMESPACE):])
except AttributeError as e:
self._logger.warning('Error on get software release : %s ' % e.message)
self.url = url
@retryOnNetworkFailure
def _supply(self, state):
if self.computer_guid is None:
self._logger.info('Nothing to supply for %s.' % (self.name))
return None
self._logger.info('Supply %s@%s: %s', self.url, self.computer_guid,
state)
return self.slap_supply.supply(self.url, self.computer_guid, state)
@retryOnNetworkFailure
def _request(self, state):
self._logger.info('Request %s@%s: %s', self.url, self.name, state)
self.latest_state = state
return self.slap_order.request(
software_release=self.url,
partition_reference=self.name,
state=state,
**self.request_kw)
def _hateoas_getComputer(self, reference):
root_document = self.hateoas_navigator.getRootDocument()
search_url = root_document["_links"]['raw_search']['href']
getter_link = expand(search_url, {
"query": "reference:%s AND portal_type:Computer" % reference,
"select_list": ["relative_url"],
"limit": 1})
result = self.hateoas_navigator.GET(getter_link)
content_list = json.loads(result)['_embedded']['contents']
if len(content_list) == 0:
raise Exception('No Computer found.')
computer_relative_url = content_list[0]["relative_url"]
getter_url = self.hateoas_navigator.getDocumentAndHateoas(
computer_relative_url)
return json.loads(self.hateoas_navigator.GET(getter_url))
def getSoftwareInstallationList(self):
# XXX Move me to slap.py API
computer = self._hateoas_getComputer(self.computer_guid)
# Not a list ?
action = computer['_links']['action_object_slap']
if action.get('title') == 'getHateoasSoftwareInstallationList':
getter_link = action['href']
else:
raise Exception('No Link found found.')
result = self.hateoas_navigator.GET(getter_link)
return json.loads(result)['_links']['content']
def getSoftwareInstallationNews(self):
for si in self.getSoftwareInstallationList():
if si["title"] == self.url:
getter_link = si["href"]
break
result = self.hateoas_navigator.GET(getter_link)
action_object_slap_list = json.loads(result)['_links']['action_object_slap']
for action in action_object_slap_list:
if action.get('title') == 'getHateoasNews':
getter_link = action['href']
break
else:
raise Exception('getHateoasNews not found.')
result = self.hateoas_navigator.GET(getter_link)
if len(json.loads(result)['news']) > 0:
return json.loads(result)['news'][0]["text"]
return ""
def getInstanceUrlList(self):
if self.hosting_subscription_url is None:
for hs in self.hateoas_navigator._hateoas_getHostingSubscriptionDict():
if hs['title'] == self.name:
self.hosting_subscription_url = hs['href']
break
if self.hosting_subscription_url is None:
return None
return self.hateoas_navigator.getHateoasInstanceList(
self.hosting_subscription_url)
def getNewsFromInstance(self, url):
result = self.hateoas_navigator.GET(url)
result = json.loads(result)
if result['_links'].get('action_object_slap', None) is None:
return None
object_link = self.hateoas_navigator.hateoasGetLinkFromLinks(
result['_links']['action_object_slap'], 'getHateoasNews')
result = self.hateoas_navigator.GET(object_link)
return json.loads(result)['news']
def getInformationFromInstance(self, url):
result = self.hateoas_navigator.GET(url)
result = json.loads(result)
if result['_links'].get('action_object_slap', None) is None:
return None
object_link = self.hateoas_navigator.hateoasGetLinkFromLinks(
result['_links']['action_object_slap'], 'getHateoasInformation')
result = self.hateoas_navigator.GET(object_link)
return json.loads(result)
class SoftwareReleaseTester(SlapOSMasterCommunicator):
deadline = None
latest_state = None
def __init__(self,
name,
logger,
slap,
slap_order,
slap_supply,
url, # software release url
computer_guid=None, # computer for supply if desired
request_kw=None, # instance parameters, if instantiation
# testing is desired
software_timeout=3600,
instance_timeout=3600,
):
super(SoftwareReleaseTester, self).__init__(
slap, slap_supply, slap_order, url, logger)
self.name = name
self.computer_guid = computer_guid
if isinstance(request_kw, str) or \
isinstance(request_kw, unicode):
self.request_kw = json.loads(request_kw)
else:
self.request_kw = request_kw
self.message_history = []
self.state = TESTER_STATE_INITIAL
self.transition_dict = {
# step function
# delay
# next_state
# software_state
# instance_state
TESTER_STATE_INITIAL: (
lambda t: None,
None,
TESTER_STATE_NOTHING,
None,
None,
),
TESTER_STATE_NOTHING: (
lambda t: t._supply("available"),
int(software_timeout),
request_kw is None and TESTER_STATE_INSTANCE_UNINSTALLED or \
TESTER_STATE_SOFTWARE_INSTALLED,
SOFTWARE_STATE_INSTALLED,
None,
),
TESTER_STATE_SOFTWARE_INSTALLED: (
lambda t: t._request("started"),
int(instance_timeout),
TESTER_STATE_INSTANCE_STARTED,
None,
INSTANCE_STATE_STARTED,
),
TESTER_STATE_INSTANCE_STARTED: (
lambda t: t._request("destroyed"),
int(1200),
TESTER_STATE_INSTANCE_UNINSTALLED,
None,
INSTANCE_STATE_STOPPED,
),
TESTER_STATE_INSTANCE_UNINSTALLED: (
lambda t: t._supply("destroyed"),
int(1200),
None,
None,
None,
),
}
def __repr__(self):
deadline = self.deadline
if deadline is not None:
deadline -= time.time()
deadline = '+%is' % (deadline, )
return '<%s(state=%s, deadline=%s) at %x>' % (
self.__class__.__name__, self.state, deadline, id(self))
def getInfo(self):
info = ""
info += "Software Release URL: %s\n" % (self.url)
if self.computer_guid is not None:
info += "Supply requested on: %s\n" % (self.computer_guid)
info += "Instance Requested (Parameters): %s\n" % self.request_kw
return info
def getFormatedLastMessage(self):
if len(self.message_history) == 0:
return "No message"
summary = "Summary about the test. Instance List and Status:\n"
message = "Last information about the tester:\n"
if self.message_history[-1] is not None:
message_list = self.message_history[-1]
for entry in message_list:
summary += "%s %s -> %s\n" % (
entry['title'], entry["slave"] and "(slave)" or "", entry['state'])
for prop in entry:
if prop != "information":
message += "%s = %s\n" % (prop, json.dumps(entry[prop], indent=2))
message += "=== connection_dict === \n%s\n" % (
json.dumps(entry["information"]["connection_dict"], indent=2))
message += "\n"
message += "=== parameter_dict === \n%s\n" % (
json.dumps(entry["information"]["parameter_dict"], indent=2))
message += "\n"
message += "="*79
message += "\n\n\n"
return summary + message
def _getSoftwareState(self):
if self.computer_guid is None:
return SOFTWARE_STATE_INSTALLED
message = self.getSoftwareInstallationNews()
if message.startswith("#error no data found"):
return SOFTWARE_STATE_UNKNOWN
if message.startswith('#access software release'):
return SOFTWARE_STATE_INSTALLED
if message.startswith('#error'):
return SOFTWARE_STATE_INSTALLING
return SOFTWARE_STATE_UNKNOWN
def getRSSEntryFromMonitoring(self, base_url):
if base_url is None:
return {}
feed_url = base_url + '/monitor-public/rssfeed.html'
d = feedparser.parse(feed_url)
if len(d.entries) > 0:
return {"date": d.entries[0].published,
"message": d.entries[0].description,
"title" : d.entries[0].title}
return {}
def _getInstanceState(self):
latest_state = self.latest_state
self._logger.debug('latest_state = %r', latest_state)
if latest_state is None:
return INSTANCE_STATE_UNKNOWN
message_list = []
try:
for instance in self.getInstanceUrlList():
news = self.getNewsFromInstance(instance["href"])
information = self.getInformationFromInstance(instance["href"])
state = INSTANCE_STATE_UNKNOWN
monitor_information_dict = {}
info_created_at = "-1"
is_slave = information['slave']
if is_slave:
self._logger.debug('Instance is slave')
if (information["connection_dict"]) > 0:
state = INSTANCE_STATE_STARTED
else:
# not slave
instance_state = news[0]
if instance_state.get('created_at', '-1') != "-1":
# the following does NOT take TZ into account
created_at = datetime.datetime.strptime(instance_state['created_at'],
'%a, %d %b %Y %H:%M:%S %Z')
gmt_now = datetime.datetime(*time.gmtime()[:6])
info_created_at = '%s (%d)' % (
instance_state['created_at'], (gmt_now - created_at).seconds)
if instance_state['text'].startswith('#access'):
state = INSTANCE_STATE_STARTED
if instance_state['text'].startswith('#access Instance correctly stopped'):
state = INSTANCE_STATE_STOPPED
if instance_state['text'].startswith('#error'):
state = INSTANCE_STATE_STARTED_WITH_ERROR
if state == INSTANCE_STATE_STARTED_WITH_ERROR:
# search for monitor url
monitor_v6_url = information["connection_dict"].get("monitor_v6_url")
try:
monitor_information_dict = self.getRSSEntryFromMonitoring(monitor_v6_url)
except Exception:
self._logger.exception('Unable to download promises for: %s' % (instance["title"]))
self._logger.info(traceback.format_exc())
monitor_information_dict = {"message": "Unable to download"}
self._logger.info('Instance state: %s -> %s' % (instance['title'], state))
self._logger.info('Instance Created at: %s -> %s' % (instance['title'], info_created_at))
message_list.append({
'title': instance["title"],
'slave': is_slave,
'news': news[0],
'information': information,
'monitor': monitor_information_dict,
'state': state
})
except slapos.slap.ServerError:
self._logger.exception('Got an error requesting partition for '
'its state')
return INSTANCE_STATE_UNKNOWN
started = 0
stopped = 0
self.message_history.append(message_list)
for instance in message_list:
if not instance['slave'] and \
instance['state'] in (INSTANCE_STATE_UNKNOWN, INSTANCE_STATE_STARTED_WITH_ERROR):
return instance['state']
elif not instance['slave'] and instance['state'] == INSTANCE_STATE_STARTED:
started = 1
elif not instance['slave'] and instance['state'] == INSTANCE_STATE_STOPPED:
stopped = 1
if instance['slave'] and instance['state'] == INSTANCE_STATE_UNKNOWN:
return instance['state']
if started and stopped:
return INSTANCE_STATE_UNKNOWN
if started:
return INSTANCE_STATE_STARTED
if stopped:
return INSTANCE_STATE_STOPPED
def teardown(self):
"""
Interrupt a running test sequence, putting it in idle state.
"""
self._logger.info('Invoking TearDown for %s@%s' % (self.url, self.name))
if self.request_kw is not None:
self._request('destroyed')
if self.computer_guid is not None:
self._supply('destroyed')
self.state = TESTER_STATE_INSTANCE_UNINSTALLED
def tic(self, now):
"""
Check for missed deadlines (-> test failure), conditions for moving to
next state, and actually moving to next state (executing its payload).
"""
self._logger.debug('TIC')
deadline = self.deadline
if deadline < now and deadline is not None:
raise TestTimeout(self.state)
_, _, next_state, software_state, instance_state = self.transition_dict[
self.state]
if (software_state is None or
software_state == self._getSoftwareState()) and (
instance_state is None or
instance_state == self._getInstanceState()):
self._logger.debug('Going to state %s (%r)', next_state, instance_state)
if next_state is None:
return None
self.state = next_state
stepfunc, delay, _, _, _ = self.transition_dict[next_state]
self.deadline = now + delay
stepfunc(self)
return self.deadline
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment