Blame view

product/ERP5/Tool/TaskDistributionTool.py 15.2 KB
Ivan Tyagov committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
##############################################################################
#
# Copyright (c) 2009 Nexedi SA and Contributors. All Rights Reserved.
#                    Julien Muchembled <jm@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################

import random
Sebastien Robin committed
30
from DateTime import DateTime
Ivan Tyagov committed
31 32 33
from AccessControl import ClassSecurityInfo
from Products.ERP5Type import Permissions, PropertySheet, Constraint, interfaces
from Products.ERP5Type.Tool.BaseTool import BaseTool
Sebastien Robin committed
34
from Products.ZSQLCatalog.SQLCatalog import SimpleQuery, NegatedQuery
Ivan Tyagov committed
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
from zLOG import LOG
from xmlrpclib import Binary

class TaskDistributionTool(BaseTool):
  """
  A Task distribution tool (used for ERP5 unit test runs).
  """

  id = 'portal_task_distribution'
  meta_type = 'ERP5 Task Distribution Tool'
  portal_type = 'Task Distribution Tool'
  allowed_types = ()

  security = ClassSecurityInfo()
  security.declareObjectProtected(Permissions.AccessContentsInformation)

  security.declarePublic('getProtocolRevision')
  def getProtocolRevision(self):
    """
    """
    return 1

Łukasz Nowak committed
57 58 59 60 61 62 63 64 65
  def _getTestNodeRelativeUrl(self, node_title):
    portal = self.getPortalObject()
    test_node_list = portal.portal_catalog(
        portal_type="Test Node",
        title=SimpleQuery(comparison_operator='=', title=node_title),
    )
    if len(test_node_list) == 1:
      return test_node_list[0].getRelativeUrl()

Ivan Tyagov committed
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
  def _getTestResultNode(self, test_result, node_title):
    node_list = [x for x in test_result.objectValues(
       portal_type='Test Result Node') if x.getTitle() == node_title]
    node_list_len = len(node_list)
    assert node_list_len in (0, 1)
    node = None
    if len(node_list):
      node = node_list[0]
    return node

  security.declarePublic('createTestResult')
  def createTestResult(self, name, revision, test_name_list, allow_restart,
                       test_title=None, node_title=None, project_title=None):
    """(temporary)
      - name (string)
      - revision (string representation of an integer)
      - test_name_list (list of strings)
      - allow_restart (boolean)

      XXX 'revision' should be a string representing the full revision
          of the tested code, because some projects are tested with different
          revisions of ERP5.

      -> (test_result_path, revision) or None if already completed
    """
    LOG('createTestResult', 0, (name, revision, test_title, project_title))
    portal = self.getPortalObject()
    if test_title is None:
      test_title = name
    def createNode(test_result, node_title):
      if node_title is not None:
        node = self._getTestResultNode(test_result, node_title)
        if node is None:
Łukasz Nowak committed
99 100 101 102 103
          # Note: specialise might not be set. This is on purpose, in order
          #       to support cases, when client will call createTestResult
          #       without calling subscribeNode before, and this is required
          #       to find Test Node document returned by
          #       _getTestNodeRelativeUrl.
Ivan Tyagov committed
104
          node = test_result.newContent(portal_type='Test Result Node',
Łukasz Nowak committed
105 106 107
                                 title=node_title,
                                 specialise=self._getTestNodeRelativeUrl(
                                   node_title))
Ivan Tyagov committed
108
          node.start()
Sebastien Robin committed
109
    def createTestResultLineList(test_result, test_name_list):
Jérome Perrin committed
110
      test_priority_list = []
Ivan Tyagov committed
111
      previous_test_result_list = portal.test_result_module.searchFolder(
Kazuhiko Shiozaki committed
112
             title=SimpleQuery(comparison_operator='=', title=test_result.getTitle()),
Ivan Tyagov committed
113
             sort_on=[('creation_date','descending')],
Sebastien Robin committed
114
             simulation_state=('stopped', 'public_stopped'),
Ivan Tyagov committed
115 116 117 118
             limit=1)
      if len(previous_test_result_list):
        previous_test_result = previous_test_result_list[0].getObject()
        for line in previous_test_result.objectValues():
Sebastien Robin committed
119
          if line.getSimulationState() in ('stopped', 'public_stopped'):
Jérome Perrin committed
120 121 122 123 124 125 126 127 128 129 130 131
            # Execute first the tests that failed on previous run (so that we
            # can see quickly if a fix was effective) and the slowest tests (to
            # make sure slow tests are executed in parrallel and prevent
            # situations where at the end all test nodes are waiting for the
            # latest to finish).
            test_priority_list.append(
                (line.getStringIndex() == 'PASSED',
                 -line.getProperty('duration'),
                 line.getTitle()))
      sorted_test_list = [x[2] for x in sorted(test_priority_list)]
      # Sort tests by name to have consistent ids for test result line on a
      # test suite.
Jérome Perrin committed
132
      for test_name in sorted(test_name_list):
Sebastien Robin committed
133 134 135 136 137 138
        index = 0
        if sorted_test_list:
          try:
            index = sorted_test_list.index(test_name)
          except ValueError:
            pass
Ivan Tyagov committed
139
        line = test_result.newContent(portal_type='Test Result Line',
Sebastien Robin committed
140 141
                                      title=test_name,
                                      int_index=index)
Ivan Tyagov committed
142 143 144 145 146 147 148 149 150 151
    reference_list_string = None
    if type(revision) is str and '=' in revision:
      reference_list_string = revision
      int_index, reference = None, revision
    elif type(revision) is str:
      # backward compatibility
      int_index, reference = revision, None
    else:
      # backward compatibility
      int_index, reference = revision
Sebastien Robin committed
152 153 154
    catalog_kw = {'portal_type': 'Test Result',
                  'title': SimpleQuery(comparison_operator='=', title=test_title),
                  'sort_on': (("creation_date","descending"),),
Sebastien Robin committed
155
                  'simulation_state': NegatedQuery(SimpleQuery(simulation_state="cancelled")),
Sebastien Robin committed
156 157
                  'limit': 1}
    result_list = portal.test_result_module.searchFolder(**catalog_kw)
Julien Muchembled committed
158 159
    if result_list:
      test_result = result_list[0].getObject()
Julien Muchembled committed
160
      if test_result is not None:
Ivan Tyagov committed
161 162 163 164 165 166 167 168 169
        last_state = test_result.getSimulationState()
        last_revision = str(test_result.getIntIndex())
        if last_state == 'started':
          createNode(test_result, node_title)
          reference = test_result.getReference()
          if reference_list_string:
            last_revision = reference
          elif reference:
            last_revision = last_revision, reference
Sebastien Robin committed
170 171 172
          result_line_list = test_result.objectValues(portal_type="Test Result Line")
          result_line_list_len = len(result_line_list)
          if result_line_list_len == 0 and len(test_name_list):
Julien Muchembled committed
173
            test_result.serialize() # prevent duplicate test result lines
Sebastien Robin committed
174
            createTestResultLineList(test_result, test_name_list)
Sebastien Robin committed
175 176 177 178
          elif result_line_list_len:
            # Do not process test result if all test result lines are already affected
            if len([x for x in result_line_list if x.getSimulationState() == 'draft']) == 0:
              return
Julien Muchembled committed
179
          return test_result.getRelativeUrl(), last_revision
Sebastien Robin committed
180
        if last_state in ('stopped', 'public_stopped'):
Sebastien Robin committed
181 182 183 184
          if not allow_restart:
            if reference_list_string is not None:
              if reference_list_string == test_result.getReference():
                return
Sebastien Robin committed
185 186 187
              # If we are here, latest test result might be an old revision created
              # by hand, then we should not test a newer revision already tested
              catalog_kw['simulation_state'] = ["stopped", "public_stopped"]
Sebastien Robin committed
188 189 190 191 192
              if portal.test_result_module.searchFolder(
                   reference=SimpleQuery(comparison_operator='=', reference=reference_list_string),
                   **catalog_kw):
                return
            if last_revision == int_index:
Ivan Tyagov committed
193 194 195 196 197
              return
    test_result = portal.test_result_module.newContent(
      portal_type='Test Result',
      title=test_title,
      reference=reference,
Julien Muchembled committed
198
      is_indexable=False)
Ivan Tyagov committed
199
    if int_index is not None:
Julien Muchembled committed
200
      test_result._setIntIndex(int_index)
Ivan Tyagov committed
201 202
    if project_title is not None:
      project_list = portal.portal_catalog(portal_type='Project',
Julien Muchembled committed
203 204
        title=SimpleQuery(comparison_operator='=',
          title=project_title.encode('utf-8')))
Julien Muchembled committed
205
      if len(project_list) != 1:
Ivan Tyagov committed
206 207
        raise ValueError('found this list of project : %r for title %r' % \
                      ([x.path for x in project_list], project_title))
Julien Muchembled committed
208
      test_result._setSourceProjectValue(project_list[0].getObject())
Ivan Tyagov committed
209 210
    test_result.updateLocalRolesOnSecurityGroups() # XXX
    test_result.start()
Julien Muchembled committed
211 212 213 214
    del test_result.isIndexable
    test_result.immediateReindexObject()
    self.serialize() # prevent duplicate test result
    # following 2 functions only call 'newContent' on test_result
Sebastien Robin committed
215
    createTestResultLineList(test_result, test_name_list)
Ivan Tyagov committed
216
    createNode(test_result, node_title)
Julien Muchembled committed
217
    return test_result.getRelativeUrl(), revision
Ivan Tyagov committed
218 219

  security.declarePublic('startUnitTest')
Sebastien Robin committed
220
  def startUnitTest(self, test_result_path, exclude_list=(), node_title=None):
Ivan Tyagov committed
221 222 223 224 225 226 227 228 229 230 231 232
    """(temporary)
      - test_result_path (string)
      - exclude_list (list of strings)

      -> test_path (string), test_name (string)
         or None if finished
    """
    portal = self.getPortalObject()
    test_result = portal.restrictedTraverse(test_result_path)
    if test_result.getSimulationState() != 'started':
      return
    started_list = []
Sebastien Robin committed
233 234
    for line in test_result.objectValues(portal_type="Test Result Line",
                                         sort_on=[("int_index","ascending")]):
Ivan Tyagov committed
235 236 237 238 239
      test = line.getTitle()
      if test not in exclude_list:
        state = line.getSimulationState()
        test = line.getRelativeUrl(), test
        if state == 'draft':
Sebastien Robin committed
240 241 242
          if node_title:
            node = self._getTestNodeRelativeUrl(node_title)
            line.setSource(node)
Ivan Tyagov committed
243 244 245 246
          line.start()
          return test

  security.declarePublic('stopUnitTest')
Sebastien Robin committed
247
  def stopUnitTest(self, test_path, status_dict, node_title=None):
Ivan Tyagov committed
248 249 250 251 252 253 254 255 256 257
    """(temporary)
      - test_path (string)
      - status_dict (dict)
    """
    status_dict = self._extractXMLRPCDict(status_dict)
    LOG("TaskDistributionTool.stopUnitTest", 0, repr((test_path,status_dict)))
    portal = self.getPortalObject()
    line = portal.restrictedTraverse(test_path)
    test_result = line.getParentValue()
    if test_result.getSimulationState() == 'started':
Łukasz Nowak committed
258
      if line.getSimulationState() in ["draft", "started"]:
Ivan Tyagov committed
259
        line.stop(**status_dict)
Sebastien Robin committed
260 261 262
      # Check by activity is all lines are finished. Do not check synchrnonously
      # in case another test line is stopped in parallel
      test_result.activate().TestResult_stopIfFinished()
Ivan Tyagov committed
263 264 265 266 267

  def _extractXMLRPCDict(self, xmlrpc_dict):
    """
    extract all xmlrpclib.Binary instance
    """
Julien Muchembled committed
268 269
    return {x: y.data if isinstance(y, Binary) else y
       for x, y in xmlrpc_dict.iteritems()}
Ivan Tyagov committed
270 271 272 273 274 275 276 277 278 279

  security.declarePublic('reportTaskFailure')
  def reportTaskFailure(self, test_result_path, status_dict, node_title):
    """report failure when a node can not handle task
    """
    status_dict = self._extractXMLRPCDict(status_dict)
    LOG("TaskDistributionTool.reportTaskFailure", 0, repr((test_result_path,
                                                          status_dict)))
    portal = self.getPortalObject()
    test_result = portal.restrictedTraverse(test_result_path)
Sebastien Robin committed
280 281 282 283 284 285 286 287 288 289 290 291 292 293
    test_result_node = self._getTestResultNode(test_result, node_title)
    assert test_result_node is not None
    test_result_node.fail(**status_dict)
    # Redraft all test result lines that were affected to that test node
    # to allow immediate reexecution (useful in case of timeout raised
    # by a runTestSuite process)
    for line in test_result.objectValues(portal_type="Test Result Line"):
      if line.getSimulationState() == "started" and line.getSourceTitle() == node_title:
        line.redraft()
    # If all test nodes failed, we would like to cancel the test result, giving
    # opportunity to testnode to start working on a newer version of repository,
    # possibly coming with a fix avoiding current failure
    for test_result_node in test_result.objectValues(portal_type='Test Result Node'):
      if test_result_node.getSimulationState() != 'failed':
Ivan Tyagov committed
294 295
        break
    else:
Sebastien Robin committed
296 297 298 299 300 301
      # now check if we had recent work on test line, if so, this means
      # we might just add timeout due to too much tests to execute for too
      # little nodes. In that case we would like to continue the work later
      recent_time = DateTime() - 1.0/24
      for test_result_line in test_result.objectValues(
          portal_type="Test Result Line"):
Sebastien Robin committed
302
        if test_result_line.getModificationDate() >= recent_time:
Sebastien Robin committed
303
          # do not take into account redrafted lines, this means we already
Sebastien Robin committed
304
          # had issues with them (just one time, since we already redraft above)
Sebastien Robin committed
305 306 307
          if len([x for x in portal.portal_workflow.getInfoFor(
                  ob=test_result_line,
                  name='history',
Sebastien Robin committed
308
                  wf_id='test_result_workflow') if x['action']=='redraft']) <= 1:
Sebastien Robin committed
309
            break
Sebastien Robin committed
310 311 312
      else:
        if test_result.getSimulationState() not in ('failed', 'cancelled'):
          test_result.fail()
Sebastien Robin committed
313 314 315 316 317 318 319 320 321 322 323 324

  security.declarePublic('reportTaskStatus')
  def reportTaskStatus(self, test_result_path, status_dict, node_title):
    """report status of node
    """
    status_dict = self._extractXMLRPCDict(status_dict)
    LOG("TaskDistributionTool.reportTaskStatus", 0, repr((test_result_path,
                                                          status_dict)))
    portal = self.getPortalObject()
    test_result = portal.restrictedTraverse(test_result_path)
    node = self._getTestResultNode(test_result, node_title)
    assert node is not None
Tristan Cavelier committed
325
    node._edit(cmdline=status_dict['command'],
Sebastien Robin committed
326
              stdout=status_dict['stdout'], stderr=status_dict['stderr'])
Sebastien Robin committed
327 328 329 330 331 332 333 334 335

  security.declarePublic('isTaskAlive')
  def isTaskAlive(self, test_result_path):
    """check status of a test suite
    """
    LOG("TaskDistributionTool.checkTaskStatus", 0, repr(test_result_path))
    portal = self.getPortalObject()
    test_result = portal.restrictedTraverse(test_result_path)
    return test_result.getSimulationState() == "started" and 1 or 0
Sebastien Robin committed
336

Kazuhiko Shiozaki committed
337
  security.declareProtected(Permissions.AccessContentsInformation, 'getMemcachedDict')
Sebastien Robin committed
338 339 340 341 342 343
  def getMemcachedDict(self):
    """ Return a dictionary used for non persistent data related to distribution
    """
    portal = self.getPortalObject()
    memcached_dict = portal.portal_memcached.getMemcachedDict(
                            "task_distribution", "default_memcached_plugin")
Sebastien Robin committed
344
    return memcached_dict