Commit 9bc65393 authored by Rafael Monnerat's avatar Rafael Monnerat

Implement new parallel test approach

The tests runs always 1 test per available computer (no more
no less), this approach prevent pick several tests on the same
machine which leads to timeouts and lack of resources (Disk) on
some machines.
parent e6b66fad
......@@ -309,6 +309,39 @@ class SoftwareReleaseTester(RPCRetry):
getattr(self, step)()
return self.deadline
class TestMap(object):
def __init__(self, test_dict):
self.test_map_dict = {}
for key in test_dict:
target_computer = test_dict[key]["target_computer"]
if target_computer not in self.test_map_dict:
self.test_map_dict[target_computer] = [key]
else:
self.test_map_dict[target_computer].append(key)
def getExcludeList(self, computer_id):
exclude_list = []
for key in self.test_map_dict:
if key != computer_id:
exclude_list.extend(self.test_map_dict[key])
return set(exclude_list)
def getComputerList(self):
return self.test_map_dict.keys()
def cleanUp(self):
for key in self.test_map_dict.copy():
if len(self.test_map_dict[key]) == 0:
del self.test_map_dict[key]
def getNextComputer(self, used_computer_list):
for computer in self.getComputerList():
if computer not in used_computer_list:
return computer
return None
def main():
"""
Note: This code does not test as much as it monitors.
......@@ -387,13 +420,15 @@ def main():
section_entry_dict['cert'])
section_entry_dict['key'] = key_file
section_entry_dict['cert'] = cert_file
if "computer_list" in section_entry_dict:
section_entry_dict["target_computer"] = \
random.choice(section_entry_dict["computer_list"])
agent_parameter_dict = dict(configuration.items('agent'))
# XXX: should node title be auto-generated by installation recipe ?
# For example, using computer guid.
node_title = agent_parameter_dict['node_title']
test_title = agent_parameter_dict['test_title']
project_title = agent_parameter_dict['project_title']
parallel_task_count = int(agent_parameter_dict.get('task_count', 1))
task_distribution_tool = TaskDistributionTool(agent_parameter_dict[
'report_url'])
master_slap_connection_dict = {}
......@@ -409,19 +444,30 @@ def main():
if log:
test_result.addWatch(log, log_file, max_history_bytes=10000)
assert test_result is not None
test_mapping = TestMap(section_dict)
logger.info("Running %s tests in parallel." % \
len(test_mapping.getComputerList()))
ran_test_set = set()
running_test_dict = {}
more_tests = True
logger.info('Starting Test Agent run %s ' % node_title)
while True:
# Get up to parallel_task_count tasks to execute
while len(running_test_dict) < parallel_task_count and \
more_tests:
while len(running_test_dict) < len(test_mapping.getComputerList())\
and more_tests:
test_mapping.cleanUp()
target_computer = test_mapping.getNextComputer([computer \
for _, _, computer in running_test_dict.itervalues()])
test_line = test_result.start(
exclude_list=list(ran_test_set))
exclude_list= list(ran_test_set) + \
list(test_mapping.getExcludeList(target_computer)))
logger.info("Test Line: %s " % test_line)
logger.info("Ran Test Set: %s " % ran_test_set)
logger.info("Running test dict: %s " % running_test_dict)
logger.info("Target Computer: %s " % target_computer)
if test_line is None:
more_tests = False
break
......@@ -461,7 +507,7 @@ def main():
supply,
order,
section_entry_dict['url'],
random.choice(section_entry_dict['computer_list']),
section_entry_dict['target_computer'],
section_entry_dict['max_install_duration'],
section_entry_dict['max_uninstall_duration'],
section_entry_dict.get('request_kw'),
......@@ -469,7 +515,7 @@ def main():
section_entry_dict.get('max_destroy_duration'),
)
ran_test_set.add(test_name)
running_test_dict[test_name] = (test_line, tester)
running_test_dict[test_name] = (test_line, tester, target_computer)
if not running_test_dict:
break
now = time.time()
......@@ -478,7 +524,7 @@ def main():
# Change test_result.watcher_period outside this loop if you wish
# to change sleep duration.
next_deadline = now + test_result.watcher_period
for section, (test_line, tester) in running_test_dict.items():
for section, (test_line, tester, target_computer) in running_test_dict.items():
logger.info('Checking %s: %r...', section, tester)
try:
deadline = tester.tic(now)
......@@ -523,7 +569,7 @@ def main():
logger.info('Sleeping %is...', to_sleep)
time.sleep(to_sleep)
if not test_result.isAlive():
for _, tester in running_test_dict.itervalues():
for _, tester, computer_id in running_test_dict.itervalues():
tester.teardown()
finally:
if pidfile:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment