Commit 8c90e61c authored by Sebastien Robin's avatar Sebastien Robin

erp5_test_result: stop assigning last remaining tests to all test nodes

Up to now, once all test result lines in draft were processed,
test result lines already started were assigned to all test nodes.
It was designed like this in case the initially assigned test node was
unable to finish its work (a test node or machine could die for various
reasons). But a testnode dying should be rare, so the optimisation
should not assume that this happens all the time, even though we
must take into account that it could happen.

This was leading to cases where a testnode, instead of quitting a test
suite to process another one, was assigned a test that was already
assigned. So it happened that we lost one hour of a testnode while it
could have done much more useful work than repeating the work of another
testnode.

Thus, consider that testnodes are usually able to process their work,
and make testnodes immediately work on another test suite once all tests
of a test result are started.

Then, regularly run an alarm looking for stuck tests to restart them,
so that work already assigned is reassigned only when required.

This change should make the system more reactive when things are working
(which is the majority of the time). Non-working cases would still finish,
but in a less reactive way. If we urgently need a test result
and we see that a test is stuck, it is also possible to unblock
it by hand (if we do not want to wait for the alarm).
parent 1cb86461
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Alarm" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>active_sense_method_id</string> </key>
<value> <string>TestResultAlarm_restartStuckTestResult</string> </value>
</item>
<item>
<key> <string>automatic_solve</string> </key>
<value> <int>0</int> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>enabled</string> </key>
<value> <int>1</int> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>test_result_alarm_restarted_stuck_test_result</string> </value>
</item>
<item>
<key> <string>periodicity_hour</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>periodicity_minute</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>periodicity_minute_frequency</string> </key>
<value> <int>32</int> </value>
</item>
<item>
<key> <string>periodicity_month</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>periodicity_month_day</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>periodicity_start_date</string> </key>
<value>
<object>
<klass>
<global name="DateTime" module="DateTime.DateTime"/>
</klass>
<tuple>
<none/>
</tuple>
<state>
<tuple>
<float>1451606400.0</float>
<string>GMT</string>
</tuple>
</state>
</object>
</value>
</item>
<item>
<key> <string>periodicity_week</string> </key>
<value>
<tuple/>
</value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Alarm</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string>Restart Stuck Test Results</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
"""
Alarm entry point: go through every Test Result whose simulation state is
"started" and schedule TestResult_restartStuckLine on each of them.

A line that stays in state "started" for too long surely means that its
testnode is dead (for example the machine was turned off). The scheduled
script sets such lines back to "draft" so that another testnode does
the job.
"""
catalog = context.getPortalObject().portal_catalog
started_result_list = catalog(portal_type="Test Result",
                              simulation_state="started")
for brain in started_result_list:
  # One call per test result, issued through activate(priority=5)
  test_result = brain.getObject()
  test_result.activate(priority=5).TestResult_restartStuckLine()
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>_proxy_roles</string> </key>
<value>
<tuple>
<string>Manager</string>
</tuple>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>TestResultAlarm_restartStuckTestResult</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
from DateTime import DateTime
now = DateTime()
# Purpose: on a Test Result in state "started", put back to "draft" every
# Test Result Line that has been "started" for too long, so that another
# testnode can take it.
#
# Consider that a test running for more than 3 hours is a stuck
# test. Very long tests are not good, they should be split to
# let testnodes work on other test suites. So if a line has been started
# for 3 hours, it should mean the testnode is dead and this test should
# be restarted.
# DateTime arithmetic is expressed in days, so 1.0/24*3 is 3 hours.
old_date = now-1.0/24*3
if context.getSimulationState() == "started":
  for line in context.objectValues(portal_type="Test Result Line"):
    if line.getSimulationState() == "started":
      history_list = line.Base_getWorkflowHistoryItemList('test_result_workflow', display=0)
      # The list is reversed before scanning; presumably it is returned
      # oldest first, so the scan starts with the latest entry -- TODO confirm
      history_list.reverse()
      for history in history_list:
        if history.action == 'start':
          if history.time < old_date:
            line.redraft()
            # Sanity check: the redraft transition must lead back to "draft"
            assert line.getSimulationState() == "draft"
          # NOTE(review): indentation was reconstructed; this break is
          # assumed to stop the scan at the first 'start' entry found,
          # whether or not it was old enough -- confirm against the
          # original script.
          break
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>TestResult_restartStuckLine</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
import json
from time import sleep
from DateTime import DateTime
class TestTaskDistribution(ERP5TypeTestCase):
def afterSetUp(self):
......@@ -143,6 +144,10 @@ class TestTaskDistribution(ERP5TypeTestCase):
self.portal.portal_alarms.task_distributor_alarm_optimize.activeSense()
self.tic()
def _callRestartStuckTestResultAlarm(self):
self.portal.portal_alarms.test_result_alarm_restarted_stuck_test_result.activeSense()
self.tic()
def test_03_startTestSuiteWithOneTestNode(self):
config_list = json.loads(self.distributor.startTestSuite(
title="COMP32-Node1"))
......@@ -202,17 +207,28 @@ class TestTaskDistribution(ERP5TypeTestCase):
set(getTestSuiteList()))
# Check that if test suite 1 and test suite 2 are recently processed,
# then next work must be test suite 3
def processTest(test_title, revision):
def processTest(test_title, revision, start_count=2, stop_count=2):
"""start_count: number of test line to start
stop_count: number of test line to stop
"""
status_dict = {}
test_result_path, revision = self._createTestResult(revision=revision,
test_list=['testFoo', 'testBar'], test_title=test_title)
line_url, test = self.tool.startUnitTest(test_result_path)
next_line_url, next_test = self.tool.startUnitTest(test_result_path)
self.assertEqual(set(['testFoo', 'testBar']), set([test, next_test]))
self.tool.stopUnitTest(line_url, status_dict)
self.tool.stopUnitTest(next_line_url, status_dict)
if start_count:
line_url, test = self.tool.startUnitTest(test_result_path)
if start_count == 2:
next_line_url, next_test = self.tool.startUnitTest(test_result_path)
self.assertEqual(set(['testFoo', 'testBar']), set([test, next_test]))
if stop_count:
self.tool.stopUnitTest(line_url, status_dict)
if stop_count == 2:
self.tool.stopUnitTest(next_line_url, status_dict)
test_result = self.portal.restrictedTraverse(test_result_path)
self.assertEquals(test_result.getSimulationState(), "stopped")
if stop_count == 2:
self.assertEquals(test_result.getSimulationState(), "stopped")
else:
self.assertEquals(test_result.getSimulationState(), "started")
processTest("test suite 1", "r0=a")
self.tic()
sleep(1) # needed because creation date sql value does not record millesecond
......@@ -227,7 +243,7 @@ class TestTaskDistribution(ERP5TypeTestCase):
sleep(1)
self.assertEquals(getTestSuiteList()[0], "test suite 1")
processTest("test suite 1", "r0=c")
# after test suite 2, we now have to process test suite 2
# after test suite 1, we now have to process test suite 2
# since it is the oldest one
self.tic()
sleep(1)
......@@ -235,14 +251,26 @@ class TestTaskDistribution(ERP5TypeTestCase):
processTest("test suite 2", "r0=d")
self.tic()
sleep(1)
# now let's say for any reasyon test suite 1 has been done
# now let's say for any reason test suite 1 has been done
processTest("test suite 1", "r0=e")
self.tic()
sleep(1)
# we should then have by order 3, 2, 1
self.assertEquals(["test suite 3", "test suite 2", "test suite 1"],
getTestSuiteList())
# now launch all test of test 3, even if they are not finished yet
processTest("test suite 3", "r0=f", stop_count=1)
self.tic()
sleep(1)
self.assertEquals(["test suite 2", "test suite 1", "test suite 3"],
getTestSuiteList())
# now launch partially tests of suite 2, it must have priority over
# test 3, even if test 3 is older because all tests of test 3 are ongoing
processTest("test suite 2", "r0=g", start_count=1, stop_count=0)
self.tic()
sleep(1)
self.assertEquals(["test suite 1", "test suite 2", "test suite 3"],
getTestSuiteList())
def _cleanupTestResult(self):
self.tic()
......@@ -290,6 +318,19 @@ class TestTaskDistribution(ERP5TypeTestCase):
self.assertEqual(2, len(line_list))
self.assertEqual(set(['testFoo', 'testBar']), set([x.getTitle() for x
in line_list]))
line_url, test = self.tool.startUnitTest(test_result_path)
result = self._createTestResult(test_list=['testFoo', 'testBar'])
self.assertEqual((test_result_path, revision), result)
next_line_url, next_test = self.tool.startUnitTest(test_result_path)
# all tests of this test suite are now started, stop affecting test node to it
result = self._createTestResult(test_list=['testFoo', 'testBar'])
self.assertEqual(None, result)
# though, if we restart one line, the test result will be assigned again
self.portal.restrictedTraverse(line_url).redraft()
self.commit()
result = self._createTestResult(test_list=['testFoo', 'testBar'])
self.assertEqual((test_result_path, revision), result)
next_line_url, next_test = self.tool.startUnitTest(test_result_path)
def test_06_startStopUnitTest(self):
"""
......@@ -300,6 +341,9 @@ class TestTaskDistribution(ERP5TypeTestCase):
test_result = self.getPortalObject().unrestrictedTraverse(test_result_path)
line_url, test = self.tool.startUnitTest(test_result_path)
next_line_url, next_test = self.tool.startUnitTest(test_result_path)
# once all tests are affected, stop affecting resources on this test result
next_result = self.tool.startUnitTest(test_result_path)
self.assertEqual(None, next_result)
# first launch, we have no time optimisations, so tests are
# launched in alphabetical order
self.assertEqual(['testBar', 'testFoo'], [test, next_test])
......@@ -327,6 +371,37 @@ class TestTaskDistribution(ERP5TypeTestCase):
next_line_url, next_test = self.tool.startUnitTest(next_test_result_path)
self.assertEqual(['testFoo', 'testBar'], [test, next_test])
def test_06b_restartStuckTest(self):
"""
Check that a test result line does not stay stuck in 'started'; if it
does, the alarm redrafts it to give another test node the opportunity to work on it
"""
test_result_path, revision = self._createTestResult(
test_list=['testFoo', 'testBar'])
test_result = self.portal.unrestrictedTraverse(test_result_path)
line_url, test = self.tool.startUnitTest(test_result_path)
now = DateTime()
def checkTestResultLine(expected):
line_list = test_result.objectValues(portal_type="Test Result Line")
found_list = [(x.getTitle(), x.getSimulationState()) for x in line_list]
found_list.sort(key=lambda x: x[0])
self.assertEqual(expected, found_list)
checkTestResultLine([('testBar', 'started'), ('testFoo', 'draft')])
self._callRestartStuckTestResultAlarm()
checkTestResultLine([('testBar', 'started'), ('testFoo', 'draft')])
line_url, test = self.tool.startUnitTest(test_result_path)
checkTestResultLine([('testBar', 'started'), ('testFoo', 'started')])
self._callRestartStuckTestResultAlarm()
checkTestResultLine([('testBar', 'started'), ('testFoo', 'started')])
# now let change history to do like if a test result line was started
# a long time ago
line = self.portal.restrictedTraverse(line_url)
for history_line in line.workflow_history["test_result_workflow"]:
if history_line['action'] == 'start':
history_line['time'] = now - 1
self._callRestartStuckTestResultAlarm()
checkTestResultLine([('testBar', 'started'), ('testFoo', 'draft')])
def test_07_reportTaskFailure(self):
test_result_path, revision = self._createTestResult(node_title="Node0")
next_test_result_path, revision = self._createTestResult(node_title="Node1")
......
......@@ -31,6 +31,7 @@
<string>cancel</string>
<string>cancel_action</string>
<string>start</string>
<string>stop</string>
</tuple>
</value>
</item>
......
......@@ -31,6 +31,8 @@
<string>cancel</string>
<string>cancel_action</string>
<string>fail</string>
<string>redraft</string>
<string>redraft_action</string>
<string>stop</string>
</tuple>
</value>
......
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="TransitionDefinition" module="Products.DCWorkflow.Transitions"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>actbox_category</string> </key>
<value> <string>workflow</string> </value>
</item>
<item>
<key> <string>actbox_icon</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>actbox_name</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>actbox_url</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>after_script_name</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>guard</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>redraft</string> </value>
</item>
<item>
<key> <string>new_state_id</string> </key>
<value> <string>draft</string> </value>
</item>
<item>
<key> <string>script_name</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>trigger_type</string> </key>
<value> <int>2</int> </value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="Guard" module="Products.DCWorkflow.Guard"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>permissions</string> </key>
<value>
<tuple>
<string>View</string>
</tuple>
</value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="TransitionDefinition" module="Products.DCWorkflow.Transitions"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>actbox_category</string> </key>
<value> <string>workflow</string> </value>
</item>
<item>
<key> <string>actbox_icon</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>actbox_name</string> </key>
<value> <string>Redraft Test Result Line</string> </value>
</item>
<item>
<key> <string>actbox_url</string> </key>
<value> <string>%(content_url)s/Base_viewWorkflowActionDialog?workflow_action=%(transition_id)s</string> </value>
</item>
<item>
<key> <string>after_script_name</string> </key>
<value> <string>redraft</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>guard</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>redraft_action</string> </value>
</item>
<item>
<key> <string>new_state_id</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>script_name</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>trigger_type</string> </key>
<value> <int>1</int> </value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="Guard" module="Products.DCWorkflow.Guard"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>expr</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
<item>
<key> <string>permissions</string> </key>
<value>
<tuple>
<string>View</string>
</tuple>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="Expression" module="Products.CMFCore.Expression"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>text</string> </key>
<value> <string>python: here.getPortalType() == "Test Result Line"</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
portal_alarms/task_distributor_alarm_optimize
\ No newline at end of file
portal_alarms/task_distributor_alarm_optimize
portal_alarms/test_result_alarm_restarted_stuck_test_result
\ No newline at end of file
......@@ -246,7 +246,7 @@ class ERP5ProjectUnitTestDistributor(XMLObject):
now = DateTime()
from_date = now - 30
def getTestSuiteSortKey(test_suite):
test_result = portal.portal_catalog(portal_type="Test Result",
test_result_list = portal.portal_catalog(portal_type="Test Result",
title=SimpleQuery(title=test_suite.getTitle()),
creation_date=SimpleQuery(
creation_date=from_date,
......@@ -254,8 +254,16 @@ class ERP5ProjectUnitTestDistributor(XMLObject):
),
sort_on=[("modification_date", "descending")],
limit=1)
if len(test_result):
key = test_result[0].getObject().getModificationDate().timeTime()
if len(test_result_list):
test_result = test_result_list[0].getObject()
key = test_result.getModificationDate().timeTime()
# if a test result has all its tests already ongoing, it is not a
# priority at all to process it, therefore push it to the end of the list
if test_result.getSimulationState() == "started":
result_line_list = test_result.objectValues(portal_type="Test Result Line")
if len(result_line_list):
if len([x for x in result_line_list if x.getSimulationState() == "draft"]) == 0:
key = now.timeTime()
else:
key = random.random()
return key
......
......@@ -143,10 +143,15 @@ class TaskDistributionTool(BaseTool):
last_revision = reference
elif reference:
last_revision = last_revision, reference
if len(test_result.objectValues(portal_type="Test Result Line")) == 0 \
and len(test_name_list):
result_line_list = test_result.objectValues(portal_type="Test Result Line")
result_line_list_len = len(result_line_list)
if result_line_list_len == 0 and len(test_name_list):
test_result.serialize() # prevent duplicate test result lines
createTestResultLineList(test_result, test_name_list)
elif result_line_list_len:
# Do not process test result if all test result lines are already affected
if len([x for x in result_line_list if x.getSimulationState() == 'draft']) == 0:
return
return test_result.getRelativeUrl(), last_revision
if last_state in ('stopped', 'public_stopped'):
if reference_list_string is not None:
......@@ -202,11 +207,6 @@ class TaskDistributionTool(BaseTool):
if state == 'draft':
line.start()
return test
# XXX Make sure we finish all tests.
if state == 'started':
started_list.append(test)
if started_list:
return random.choice(started_list)
security.declarePublic('stopUnitTest')
def stopUnitTest(self, test_path, status_dict):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment