Commit 34ae978b authored by Ivan Tyagov's avatar Ivan Tyagov

Add a task distribution tool (currently used for parallel unit test runs).

parent 917d1961
##############################################################################
#
# Copyright (c) 2009 Nexedi SA and Contributors. All Rights Reserved.
# Julien Muchembled <jm@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
import random
from AccessControl import ClassSecurityInfo
from Products.ERP5Type import Permissions, PropertySheet, Constraint, interfaces
from Products.ERP5Type.Tool.BaseTool import BaseTool
from zLOG import LOG
from xmlrpclib import Binary
class TaskDistributionTool(BaseTool):
"""
A Task distribution tool (used for ERP5 unit test runs).
"""
id = 'portal_task_distribution'
meta_type = 'ERP5 Task Distribution Tool'
portal_type = 'Task Distribution Tool'
allowed_types = ()
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
def __init__(self, *args, **kw):
BaseTool.__init__(self, *args, **kw)
# XXX Cache information about running test results, because the protocol
# is synchronous and we can't rely on the catalog.
# This is a hack until a better (and asynchronous) protocol is
# implemented.
self.test_result_dict = {}
security.declarePublic('getProtocolRevision')
def getProtocolRevision(self):
"""
"""
return 1
def _getTestResultNode(self, test_result, node_title):
node_list = [x for x in test_result.objectValues(
portal_type='Test Result Node') if x.getTitle() == node_title]
node_list_len = len(node_list)
assert node_list_len in (0, 1)
node = None
if len(node_list):
node = node_list[0]
return node
security.declarePublic('createTestResult')
def createTestResult(self, name, revision, test_name_list, allow_restart,
test_title=None, node_title=None, project_title=None):
"""(temporary)
- name (string)
- revision (string representation of an integer)
- test_name_list (list of strings)
- allow_restart (boolean)
XXX 'revision' should be a string representing the full revision
of the tested code, because some projects are tested with different
revisions of ERP5.
-> (test_result_path, revision) or None if already completed
"""
LOG('createTestResult', 0, (name, revision, test_title, project_title))
self._p_changed = 1
portal = self.getPortalObject()
if test_title is None:
test_title = name
test_result_path, line_dict = self.test_result_dict.get(
test_title, ('', {}))
duration_dict = {}
def createNode(test_result, node_title):
if node_title is not None:
node = self._getTestResultNode(test_result, node_title)
if node is None:
node = test_result.newContent(portal_type='Test Result Node',
title=node_title)
node.start()
def createTestResultLineList(test_result, test_name_list, line_dict):
previous_test_result_list = portal.test_result_module.searchFolder(
title='=%s' % test_result.getTitle(),
sort_on=[('creation_date','descending')],
simulation_state='stopped',
limit=1)
if len(previous_test_result_list):
previous_test_result = previous_test_result_list[0].getObject()
for line in previous_test_result.objectValues():
if line.getSimulationState() == 'stopped':
duration_dict[line.getTitle()] = line.getProperty('duration')
for test_name in test_name_list:
line = test_result.newContent(portal_type='Test Result Line',
title=test_name)
line_dict[line.getId()] = duration_dict.get(test_name)
reference_list_string = None
if type(revision) is str and '=' in revision:
reference_list_string = revision
int_index, reference = None, revision
elif type(revision) is str:
# backward compatibility
int_index, reference = revision, None
else:
# backward compatibility
int_index, reference = revision
test_result = None
if test_result_path:
test_result = portal.unrestrictedTraverse(test_result_path, None)
if test_result is None or test_result.getSimulationState() in \
('cancelled', 'failed'):
del self.test_result_dict[test_title]
line_dict = {}
else:
last_state = test_result.getSimulationState()
last_revision = str(test_result.getIntIndex())
if last_state == 'started':
createNode(test_result, node_title)
reference = test_result.getReference()
if reference_list_string:
last_revision = reference
elif reference:
last_revision = last_revision, reference
if len(line_dict) == 0 and len(test_name_list):
createTestResultLineList(test_result, test_name_list, line_dict)
return test_result_path, last_revision
if last_state == 'stopped':
if reference_list_string is not None:
if reference_list_string == test_result.getReference():
return
elif last_revision == int_index and not allow_restart:
return
test_result = portal.test_result_module.newContent(
portal_type='Test Result',
title=test_title,
reference=reference,
predecessor=test_result_path)
if int_index is not None:
test_result.setIntIndex(int_index)
if project_title is not None:
project_list = portal.portal_catalog(portal_type='Project',
title='="%s"' % project_title)
if len(project_list) == 1:
test_result.setSourceProjectValue(project_list[0].getObject())
else:
raise ValueError('found this list of project : %r for title %r' % \
([x.path for x in project_list], project_title))
else:
# Backward compatibility
project = portal.ERP5Site_getProjectFromTestSuite(name)
test_result.setSourceProjectValue(project)
test_result.updateLocalRolesOnSecurityGroups() # XXX
test_result_path = test_result.getRelativeUrl()
self.test_result_dict[test_title] = test_result_path, line_dict
test_result.start()
createTestResultLineList(test_result, test_name_list, line_dict)
createNode(test_result, node_title)
return test_result_path, revision
security.declarePublic('startUnitTest')
def startUnitTest(self, test_result_path, exclude_list=()):
"""(temporary)
- test_result_path (string)
- exclude_list (list of strings)
-> test_path (string), test_name (string)
or None if finished
"""
portal = self.getPortalObject()
test_result = portal.restrictedTraverse(test_result_path)
if test_result.getSimulationState() != 'started':
return
path, line_dict = self.test_result_dict[test_result.getTitle()]
assert path == test_result_path
started_list = []
for line_id, duration in sorted(line_dict.iteritems(),
key=lambda x: x[1], reverse=1):
line = test_result[line_id]
test = line.getTitle()
if test not in exclude_list:
state = line.getSimulationState()
test = line.getRelativeUrl(), test
if state == 'draft':
line.start()
return test
# XXX Make sure we finish all tests.
if state == 'started':
started_list.append(test)
if started_list:
return random.choice(started_list)
security.declarePublic('stopUnitTest')
def stopUnitTest(self, test_path, status_dict):
"""(temporary)
- test_path (string)
- status_dict (dict)
"""
status_dict = self._extractXMLRPCDict(status_dict)
LOG("TaskDistributionTool.stopUnitTest", 0, repr((test_path,status_dict)))
portal = self.getPortalObject()
line = portal.restrictedTraverse(test_path)
test_result = line.getParentValue()
path, line_dict = self.test_result_dict[test_result.getTitle()]
if test_result.getSimulationState() == 'started':
assert path == test_result.getRelativeUrl()
line_id = line.getId()
if line_id in line_dict:
line.stop(**status_dict)
del line_dict[line_id]
self._p_changed = 1
if not line_dict:
test_result.stop()
def _extractXMLRPCDict(self, xmlrpc_dict):
"""
extract all xmlrpclib.Binary instance
"""
return dict([(x,isinstance(y, Binary) and y.data or y) \
for (x, y) in xmlrpc_dict.iteritems()])
security.declarePublic('reportTaskFailure')
def reportTaskFailure(self, test_result_path, status_dict, node_title):
"""report failure when a node can not handle task
"""
status_dict = self._extractXMLRPCDict(status_dict)
LOG("TaskDistributionTool.reportTaskFailure", 0, repr((test_result_path,
status_dict)))
portal = self.getPortalObject()
test_result = portal.restrictedTraverse(test_result_path)
node = self._getTestResultNode(test_result, node_title)
assert node is not None
node.fail(**status_dict)
for node in test_result.objectValues(portal_type='Test Result Node'):
if node.getSimulationState() != 'failed':
break
else:
test_result.fail()
......@@ -51,7 +51,7 @@ from Tool import CategoryTool, SimulationTool, RuleTool, IdTool, TemplateTool,\
GadgetTool, ContributionRegistryTool, IntrospectionTool,\
AcknowledgementTool, SolverTool, SolverProcessTool,\
ConversionTool, RoundingTool, UrlRegistryTool,\
CertificateAuthorityTool, InotifyTool
CertificateAuthorityTool, InotifyTool, TaskDistributionTool
import ERP5Site
from Document import PythonScript
object_classes = ( ERP5Site.ERP5Site,
......@@ -82,6 +82,7 @@ portal_tools = ( CategoryTool.CategoryTool,
UrlRegistryTool.UrlRegistryTool,
CertificateAuthorityTool.CertificateAuthorityTool,
InotifyTool.InotifyTool,
TaskDistributionTool.TaskDistributionTool,
)
content_classes = ()
content_constructors = ()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment