Commit c44b3723 authored by Cédric de Saint Martin's avatar Cédric de Saint Martin

Merge branch 'agent2'

parents 848a10f0 df9ac291
import ConfigParser
import argparse
import datetime
import httplib
import json
import logging
......@@ -10,9 +11,13 @@ import tempfile
import traceback
import time
import xmlrpclib
import slapos.slap
from slapos.grid.utils import setRunning, setFinished
from erp5.util.taskdistribution import TaskDistributionTool, RPCRetry
from erp5.util.taskdistribution import SAFE_RPC_EXCEPTION_LIST
class AutoSTemp(object):
"""
......@@ -33,19 +38,11 @@ class AutoSTemp(object):
def __del__(self):
self.__unlink(self.__name)
# XXX: scripts should be merged in a single one
GET_DESTROYING_METHOD_ID = \
'Agent_getDestroyingSoftwareReleaseReferenceListOnComputer'
GET_INSTALLED_METHOD_ID = \
'Agent_getInstalledSoftwareReleaseReferenceListOnComputer'
GET_INSTALLING_METHOD_ID = \
'Agent_getInstallingSoftwareReleaseReferenceListOnComputer'
SOFTWARE_STATE_UNKNOWN = -1
SOFTWARE_STATE_INSTALLING = 0
SOFTWARE_STATE_INSTALLED = 1
SOFTWARE_STATE_DESTROYING = 2
GET_PARTITION_STATE_METHOD_ID = 'Agent_getComputerPartitionState'
INSTANCE_STATE_UNKNOWN = -1
INSTANCE_STATE_STARTING = 0
INSTANCE_STATE_STARTED = 1
......@@ -53,22 +50,12 @@ INSTANCE_STATE_STOPPING = 2
INSTANCE_STATE_STOPPED = 3
INSTANCE_STATE_DESTROYING = 4
INSTANCE_STATE_DICT = {
'Looking for a free partition': INSTANCE_STATE_UNKNOWN,
'Started': INSTANCE_STATE_STARTED,
'Start in progress': INSTANCE_STATE_STARTING,
'Stopped': INSTANCE_STATE_STOPPED,
'Stop in progress': INSTANCE_STATE_STOPPING,
'Destruction in progress': INSTANCE_STATE_DESTROYING,
'Destroyed': INSTANCE_STATE_UNKNOWN,
}
TESTER_STATE_INITIAL = -1
TESTER_STATE_NOTHING = 0
TESTER_STATE_SOFTWARE_INSTALLED = 1
TESTER_STATE_INSTANCE_INSTALLED = 2
TESTER_STATE_INSTANCE_STARTED = 4
TESTER_STATE_INSTANCE_UNISTALLED = 5
TESTER_STATE_INSTANCE_UNINSTALLED = 5
class x509Transport(xmlrpclib.Transport):
"""
......@@ -95,6 +82,25 @@ class x509Transport(xmlrpclib.Transport):
class TestTimeout(Exception):
pass
# Simple decorator to prevent raise due small
# network failures.
def retryOnNetworkFailure(func):
def wrapper(*args, **kwargs):
retry_time = 64
while True:
try:
return func(*args, **kwargs)
except SAFE_RPC_EXCEPTION_LIST, e:
print 'Network failure: %s , %s' % (sys.exc_info(), e)
print 'Retry method %s in %i seconds' % (func, retry_time)
time.sleep(retry_time)
retry_time += retry_time >> 1
wrapper.__name__ = func.__name__
wrapper.__doc__ = func.__doc__
return wrapper
class SoftwareReleaseTester(RPCRetry):
deadline = None
latest_state = None
......@@ -109,7 +115,7 @@ class SoftwareReleaseTester(RPCRetry):
computer_guid, # computer to use for this test run
max_install_duration,
max_uninstall_duration,
request_kw=None, # instance parameters, if instanciation
request_kw=None, # instance parameters, if instantiation
# testing is desired
max_request_duration=None,
max_destroy_duration=None,
......@@ -123,41 +129,46 @@ class SoftwareReleaseTester(RPCRetry):
self.request_kw = request_kw
self.state = TESTER_STATE_INITIAL
self.transition_dict = {
# step function
# delay
# next_state
# software_state
# instance_state
TESTER_STATE_INITIAL: (
None,
lambda t: None,
None,
TESTER_STATE_NOTHING,
None,
None,
),
TESTER_STATE_NOTHING: (
'install',
lambda t: t.install(),
max_install_duration,
request_kw is None and TESTER_STATE_INSTANCE_UNISTALLED or \
request_kw is None and TESTER_STATE_INSTANCE_UNINSTALLED or \
TESTER_STATE_SOFTWARE_INSTALLED,
SOFTWARE_STATE_INSTALLED,
None,
),
TESTER_STATE_SOFTWARE_INSTALLED: (
'request',
lambda t: t.request(),
max_request_duration,
TESTER_STATE_INSTANCE_STARTED,
None,
INSTANCE_STATE_STARTED,
),
TESTER_STATE_INSTANCE_STARTED: (
'destroy',
lambda t: t.destroy(),
max_destroy_duration,
TESTER_STATE_INSTANCE_UNISTALLED,
TESTER_STATE_INSTANCE_UNINSTALLED,
None,
INSTANCE_STATE_UNKNOWN,
),
TESTER_STATE_INSTANCE_UNISTALLED: (
'uninstall',
TESTER_STATE_INSTANCE_UNINSTALLED: (
lambda t: t.uninstall(),
max_uninstall_duration,
None,
SOFTWARE_STATE_UNKNOWN,
None,
INSTANCE_STATE_UNKNOWN,
),
}
......@@ -169,11 +180,13 @@ class SoftwareReleaseTester(RPCRetry):
return '<%s(state=%s, deadline=%s) at %x>' % (
self.__class__.__name__, self.state, deadline, id(self))
@retryOnNetworkFailure
def _supply(self, state):
self._logger.info('Supply %s@%s: %s', self.url, self.computer_guid,
state)
return self.slap_supply.supply(self.url, self.computer_guid, state)
@retryOnNetworkFailure
def _request(self, state):
self._logger.info('Request %s@%s: %s', self.url, self.name, state)
self.latest_state = state
......@@ -185,39 +198,41 @@ class SoftwareReleaseTester(RPCRetry):
)
def _getSoftwareState(self):
# TODO: replace with simpler slap-based API
# TODO: merge all 3 entrypoints into a single, to reduce server load.
for state, method_id in (
(SOFTWARE_STATE_DESTROYING, GET_DESTROYING_METHOD_ID),
(SOFTWARE_STATE_INSTALLED, GET_INSTALLED_METHOD_ID),
(SOFTWARE_STATE_INSTALLING, GET_INSTALLING_METHOD_ID),
):
if self.url in self._retryRPC(method_id, (self.computer_guid,
[self.url])):
return state
return SOFTWARE_STATE_UNKNOWN
return SOFTWARE_STATE_INSTALLED
def _getInstanceState(self):
# TODO: replace with simpler slap-based API
latest_state = self.latest_state
self._logger.debug('latest_state = %r', latest_state)
if latest_state is None:
return INSTANCE_STATE_UNKNOWN
try:
requested = self._request(latest_state)
if requested._computer_id is None:
return INSTANCE_STATE_UNKNOWN
except slapos.slap.ServerError:
self._logger.exception('Got an error requesting partition for '
'its state')
return INSTANCE_STATE_UNKNOWN
part_id = requested.getId()
self._logger.debug('part_id = %r', part_id)
if not part_id:
# Master did not allocate a partition yet.
try:
instance_state = requested.getStatus()
# the following does NOT take TZ into account
created_at = datetime.datetime.strptime(instance_state['created_at'], '%a, %d %b %Y %H:%M:%S %Z')
gmt_now = datetime.datetime(*time.gmtime()[:6])
self._logger.debug('Instance state: ', instance_state)
self._logger.debug('Created at: %s (%d)' % (instance_state['created_at'], (gmt_now - created_at).seconds))
if instance_state['text'].startswith('#error no data found'):
return INSTANCE_STATE_UNKNOWN
elif instance_state['text'].startswith('#access') \
and (gmt_now - created_at).seconds < 300:
return INSTANCE_STATE_STARTED
except slapos.slap.ResourceNotReady:
return INSTANCE_STATE_UNKNOWN
return INSTANCE_STATE_DICT[self._retryRPC(
GET_PARTITION_STATE_METHOD_ID,
(self.computer_guid, part_id)
)]
return INSTANCE_STATE_UNKNOWN
def install(self):
"""
......@@ -254,47 +269,85 @@ class SoftwareReleaseTester(RPCRetry):
"""
Interrupt a running test sequence, putting it in idle state.
"""
self._logger.info('Invoking TearDown for %s@%s' % (self.url, self.name))
if self.request_kw is not None:
self.destroy()
self.uninstall()
self.state = TESTER_STATE_INSTANCE_UNISTALLED
self.state = TESTER_STATE_INSTANCE_UNINSTALLED
def tic(self, now):
"""
Check for missed deadlines (-> test failure), conditions for moving to
next state, and actually moving to next state (executing its payload).
"""
self._logger.debug('TIC')
deadline = self.deadline
if deadline < now and deadline is not None:
raise TestTimeout(self.state)
_, _, state, software_state, instance_state = self.transition_dict[
_, _, next_state, software_state, instance_state = self.transition_dict[
self.state]
if (software_state is None or
software_state == self._getSoftwareState()) and (
instance_state is None or
instance_state == self._getInstanceState()):
if state is None:
return None
self._logger.debug('Going to state %i (%r, %r)', state,
self._logger.debug('Going to state %s (%r, %r)', next_state,
software_state, instance_state)
self.state = state
step, delay, _, _, _ = self.transition_dict[state]
if next_state is None:
return None
self.state = next_state
stepfunc, delay, _, _, _ = self.transition_dict[next_state]
self.deadline = now + delay
getattr(self, step)()
stepfunc(self)
return self.deadline
class TestMap(object):
def __init__(self, test_dict):
self.test_map_dict = {}
for key in test_dict:
target_computer = test_dict[key]["target_computer"]
if target_computer not in self.test_map_dict:
self.test_map_dict[target_computer] = [key]
else:
self.test_map_dict[target_computer].append(key)
def getExcludeList(self, computer_id):
exclude_list = []
for key in self.test_map_dict:
if key != computer_id:
exclude_list.extend(self.test_map_dict[key])
return set(exclude_list)
def getComputerList(self):
return self.test_map_dict.keys()
def dropComputer(self, computer_id):
del self.test_map_dict[computer_id]
def cleanUp(self):
for key in self.test_map_dict.copy():
if len(self.test_map_dict[key]) == 0:
del self.test_map_dict[key]
def getNextComputer(self, used_computer_list):
for computer in self.getComputerList():
if computer not in used_computer_list:
return computer
return None
def main():
"""
Note: This code does not test as much as it monitors.
The goal is to regularily try to build & instanciate a software release
The goal is to regularily try to build & instantiate a software release
on several machines, to monitor vifib stability and SR stability as time
passes (and things once available online become unavailable).
Part of this function could be reused to make an actual test bot, testing
only when actual changes are committed to a software release, to look for
regressions.
Note: This code does not connect to any instanciated service, it relies on
the presence of a promise section to make instanciation fail until promise
Note: This code does not connect to any instantiated service, it relies on
the presence of a promise section to make instantiation fail until promise
is happy.
"""
parser = argparse.ArgumentParser()
......@@ -351,23 +404,31 @@ def main():
configuration.items(section))
for key in ('request_kw', 'max_install_duration',
'max_destroy_duration', 'max_request_duration',
'max_uninstall_duration', 'computer_list',
'max_uninstall_duration', 'computer_list'
):
if key in section_entry_dict:
section_entry_dict[key] = json.loads(
section_entry_dict[key])
try:
if isinstance(section_entry_dict[key], str) or \
isinstance(section_entry_dict[key], unicode):
section_entry_dict[key] = json.loads(
section_entry_dict[key])
except Exception as exc:
logger.error("Fail to load %s on %s" % (key, section_entry_dict))
raise
if 'key' in section_entry_dict:
key_file, cert_file = asFilenamePair(section_entry_dict['key'],
section_entry_dict['cert'])
section_entry_dict['key'] = key_file
section_entry_dict['cert'] = cert_file
if "computer_list" in section_entry_dict:
section_entry_dict["target_computer"] = \
random.choice(section_entry_dict["computer_list"])
agent_parameter_dict = dict(configuration.items('agent'))
# XXX: should node title be auto-generated by installation recipe ?
# For example, using computer guid.
node_title = agent_parameter_dict['node_title']
test_title = agent_parameter_dict['test_title']
project_title = agent_parameter_dict['project_title']
parallel_task_count = int(agent_parameter_dict.get('task_count', 1))
task_distribution_tool = TaskDistributionTool(agent_parameter_dict[
'report_url'])
master_slap_connection_dict = {}
......@@ -379,22 +440,39 @@ def main():
test_title=test_title,
project_title=project_title,
)
test_result.watcher_period = 60
test_result.watcher_period = 300
if log:
test_result.addWatch(log, log_file, max_history_bytes=10000)
assert test_result is not None
test_mapping = TestMap(section_dict)
logger.info("Running %s tests in parallel." % \
len(test_mapping.getComputerList()))
ran_test_set = set()
running_test_dict = {}
more_tests = True
logger.info('Starting Test Agent run %s ' % node_title)
while True:
# Get up to parallel_task_count tasks to execute
while len(running_test_dict) < parallel_task_count and \
more_tests:
while len(running_test_dict) < len(test_mapping.getComputerList())\
and more_tests:
test_mapping.cleanUp()
target_computer = test_mapping.getNextComputer([computer \
for _, _, computer in running_test_dict.itervalues()])
test_line = test_result.start(
exclude_list=list(ran_test_set))
exclude_list= list(ran_test_set) + \
list(test_mapping.getExcludeList(target_computer)))
logger.info("Test Line: %s " % test_line)
logger.info("Ran Test Set: %s " % ran_test_set)
logger.info("Running test dict: %s " % running_test_dict)
logger.info("Target Computer: %s " % target_computer)
if test_line is None:
more_tests = False
break
test_mapping.dropComputer(target_computer)
if len(test_mapping.getComputerList()) == 0:
more_tests = False
continue
test_name = test_line.name
try:
section_entry_dict = section_dict[test_name]
......@@ -431,7 +509,7 @@ def main():
supply,
order,
section_entry_dict['url'],
random.choice(section_entry_dict['computer_list']),
section_entry_dict['target_computer'],
section_entry_dict['max_install_duration'],
section_entry_dict['max_uninstall_duration'],
section_entry_dict.get('request_kw'),
......@@ -439,21 +517,22 @@ def main():
section_entry_dict.get('max_destroy_duration'),
)
ran_test_set.add(test_name)
running_test_dict[test_name] = (test_line, tester)
running_test_dict[test_name] = (test_line, tester, target_computer)
if not running_test_dict:
break
break
now = time.time()
# Synchronise refreshes on watcher period, so it doesn't report a
# stalled test node where we are actually still sleeping.
# Change test_result.watcher_period outside this loop if you wish
# to change sleep duration.
next_deadline = now + test_result.watcher_period
for section, (test_line, tester) in running_test_dict.items():
for section, (test_line, tester, target_computer) in running_test_dict.items():
logger.info('Checking %s: %r...', section, tester)
try:
deadline = tester.tic(now)
except Exception:
logger.exception('test failed')
logger.exception('Test execution fail for %s' % (section))
test_line.stop(
test_count=1,
error_count=1,
......@@ -464,6 +543,11 @@ def main():
del running_test_dict[section]
try:
tester.teardown()
except slapos.slap.NotFoundError:
# This exception is ignored because we cannot
# Teardown if SR URL do not exist.
logger.exception('Fail and not found')
pass
except Exception:
logger.exception('teardown failed, human '
'assistance needed for cleanup')
......@@ -472,7 +556,7 @@ def main():
logger.info('%r', tester)
if deadline is None:
# TODO: report how long each step took.
logger.info('Finished !')
logger.info('Test execution finished for %s' % (section))
test_line.stop(
test_count=1,
error_count=0,
......@@ -480,6 +564,18 @@ def main():
skip_count=0,
)
del running_test_dict[section]
try:
tester.teardown()
except slapos.slap.NotFoundError:
# This exception is ignored because we cannot
# Teardown if SR URL do not exist.
logger.exception('Fail and not found')
pass
except Exception:
logger.exception('teardown failed, human '
'assistance needed for cleanup')
raise
else:
next_deadline = min(deadline, next_deadline)
if running_test_dict:
......@@ -488,7 +584,7 @@ def main():
logger.info('Sleeping %is...', to_sleep)
time.sleep(to_sleep)
if not test_result.isAlive():
for _, tester in running_test_dict.itervalues():
for _, tester, computer_id in running_test_dict.itervalues():
tester.teardown()
finally:
if pidfile:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment