# testnode.py
##############################################################################
#
# Copyright (c) 2011 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly advised to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################
27
from datetime import datetime,timedelta
28
import os
29 30
import subprocess
import sys
31
import time
32
import glob
33
import SlapOSControler
34 35 36
import json
import time
import shutil
37
from ProcessManager import SubprocessError, ProcessManager, CancellationError
38
from subprocess import CalledProcessError
39
from Updater import Updater
40
from erp5.util import taskdistribution
41

42 43
# Sleep duration in seconds; presumably the minimum length of one
# TestNode.run cycle, which works with the same value -- confirm.
DEFAULT_SLEEP_TIMEOUT = 120 # time in seconds to sleep
# NOTE(review): within this file only the attribute of the same name on the
# ProcessManager instance is assigned; confirm whether this module-level
# name is still read anywhere.
supervisord_pid_file = None
44

45
# Key of a vcs_repository dict holding the path, relative to the repository
# checkout, of the software profile that the generated buildout file extends.
PROFILE_PATH_KEY = 'profile_path'
46

47 48 49 50 51
class DummyLogger(object):
  """
  Logger look-alike routing every logging level to a single callable.

  Each usual logging method name ('trace' ... 'fatal') is bound on the
  instance to *func* itself, so code expecting a logger object can be handed
  a plain logging function.
  """

  def __init__(self, func):
    level_name_list = ('trace', 'debug', 'info', 'warn', 'warning', 'error',
                       'critical', 'fatal')
    # Stored as plain instance attributes: func is called as-is, without any
    # implicit "self" argument.
    self.__dict__.update(dict.fromkeys(level_name_list, func))
52

53
class SlapOSInstance(object):
  """
  Mutable record describing one SlapOS deployment handled by the test node.

  Attributes set at construction:
    retry_software_count -- failure counter, starts at 0
    retry -- retry flag, starts at False
  Any other attribute is added through edit().
  """

  def __init__(self):
    self.retry_software_count = 0
    self.retry = False

  def edit(self, **kw):
    """
    Store every keyword argument as an instance attribute, then call
    _checkData() so that subclasses can derive or validate data.
    """
    self.__dict__.update(kw)
    self._checkData()

  def _checkData(self):
    """
    Hook invoked at the end of edit(); does nothing in this base class.
    """
    pass
65 66 67

class NodeTestSuite(SlapOSInstance):
  """
  State kept by the test node for one test suite, identified by its
  reference.
  """

  def __init__(self, reference):
    super(NodeTestSuite, self).__init__()
    self.reference = reference

  def edit(self, **kw):
    """
    Same contract as SlapOSInstance.edit(); kept as an explicit override.
    """
    super(NodeTestSuite, self).edit(**kw)

  def _checkData(self):
    """
    Derive paths from the attributes set through edit().

    - When working_directory is set: make it end with the suite reference,
      create it through SlapOSControler.createFolder and point
      custom_profile_path to "software.cfg" inside it.
    - When vcs_repository_list is set: store 'repository_id' and
      'repository_path' in each repository dict. This step reads
      self.working_directory, so it must have been set beforehand.
    """
    working_directory = getattr(self, "working_directory", None)
    if working_directory is not None:
      if not working_directory.endswith(os.path.sep + self.reference):
        working_directory = os.path.join(working_directory, self.reference)
        self.working_directory = working_directory
      SlapOSControler.createFolder(working_directory)
      self.custom_profile_path = os.path.join(working_directory,
                                              'software.cfg')
    vcs_repository_list = getattr(self, "vcs_repository_list", None)
    if vcs_repository_list is not None:
      for vcs_repository in vcs_repository_list:
        # The buildout section id wins; otherwise use the last component of
        # the url, cut at its first dot ("erp5.git" gives "erp5").
        repository_id = vcs_repository.get('buildout_section_id', None) or \
                        vcs_repository.get('url').split('/')[-1].split('.')[0]
        vcs_repository['repository_id'] = repository_id
        vcs_repository['repository_path'] = os.path.join(
          self.working_directory, repository_id)

92 93 94 95
class TestNode(object):

  def __init__(self, log, config):
    """
    log -- callable used for every log message of the test node
    config -- configuration dict; a false value is replaced by an empty dict
    """
    self.log = log
    self.config = config or {}
    self.process_manager = ProcessManager(log)
    # NodeTestSuite instances, keyed by test suite reference
    self.node_test_suite_dict = {}
    # hack until slapos.cookbook is updated
    legacy_suffix = "slapos/"
    working_directory = self.config.get('working_directory', '')
    if working_directory.endswith(legacy_suffix):
      self.config['working_directory'] = \
        working_directory[:-len(legacy_suffix)] + "testnode"
103

104
  def checkOldTestSuite(self,test_suite_data):
105
    config = self.config
106
    installed_reference_set = set(os.listdir(config['working_directory']))
107
    wished_reference_set = set([x['test_suite_reference'] for x in test_suite_data])
108 109
    to_remove_reference_set = installed_reference_set.difference(
                                 wished_reference_set)
110
    for y in to_remove_reference_set:
111
      fpath = os.path.join(config['working_directory'],y)
112
      self.delNodeTestSuite(y)
113 114 115 116
      if os.path.isdir(fpath):
       shutil.rmtree(fpath)
      else:
       os.remove(fpath)
117

118 119 120
  def getNodeTestSuite(self, reference):
    node_test_suite = self.node_test_suite_dict.get(reference)
    if node_test_suite is None:
121 122
      node_test_suite = NodeTestSuite(reference)
      self.node_test_suite_dict[reference] = node_test_suite
123 124 125
    return node_test_suite

  def delNodeTestSuite(self, reference):
126
    if self.node_test_suite_dict.has_key(reference):
127 128
      self.node_test_suite_dict.pop(reference)

129
  def constructProfile(self, node_test_suite):
130
    config = self.config
131
    profile_content = ''
132
    assert len(node_test_suite.vcs_repository_list), "we must have at least one repository"
133
    profile_path_count = 0
134
    for vcs_repository in node_test_suite.vcs_repository_list:
135 136
      url = vcs_repository['url']
      buildout_section_id = vcs_repository.get('buildout_section_id', None)
137
      repository_path = vcs_repository['repository_path']
138 139 140 141 142 143 144 145 146
      try:
        profile_path = vcs_repository[PROFILE_PATH_KEY]
      except KeyError:
        pass
      else:
        profile_path_count += 1
        if profile_path_count > 1:
          raise ValueError(PROFILE_PATH_KEY + ' defined more than once')
        profile_content = """
147 148
[buildout]
extends = %(software_config_path)s
149
""" %  {'software_config_path': os.path.join(repository_path, profile_path)}
150

151 152
      if not(buildout_section_id is None):
        profile_content += """
153
[%(buildout_section_id)s]
154
repository = %(repository_path)s
155 156
branch = %(branch)s
""" %  {'buildout_section_id': buildout_section_id,
157 158
   'repository_path' : repository_path,
   'branch' : vcs_repository.get('branch','master')}
159 160
    if not profile_path_count:
      raise ValueError(PROFILE_PATH_KEY + ' not defined')
161
    custom_profile = open(node_test_suite.custom_profile_path, 'w')
162 163 164
    custom_profile.write(profile_content)
    custom_profile.close()
    sys.path.append(repository_path)
165

166
  def getAndUpdateFullRevisionList(self, node_test_suite):
167 168 169
    full_revision_list = []
    config = self.config
    log = self.log
170
    for vcs_repository in node_test_suite.vcs_repository_list:
171 172 173 174 175 176 177 178 179 180 181
      repository_path = vcs_repository['repository_path']
      repository_id = vcs_repository['repository_id']
      if not os.path.exists(repository_path):
        parameter_list = [config['git_binary'], 'clone',
                          vcs_repository['url']]
        if vcs_repository.get('branch') is not None:
          parameter_list.extend(['-b',vcs_repository.get('branch')])
        parameter_list.append(repository_path)
        log(subprocess.check_output(parameter_list, stderr=subprocess.STDOUT))
      # Make sure we have local repository
      updater = Updater(repository_path, git_binary=config['git_binary'],
182
         log=log, process_manager=self.process_manager)
183
      updater.checkout()
184 185 186
      revision = "-".join(updater.getRevision())
      full_revision_list.append('%s=%s' % (repository_id, revision))
    node_test_suite.revision = ','.join(full_revision_list)
187 188 189 190 191 192 193 194 195 196 197 198
    return full_revision_list

  def addWatcher(self,test_result):
    config = self.config
    if config.get('log_file'):
     log_file_name = config['log_file']
     log_file = open(log_file_name)
     log_file.seek(0, 2)
     log_file.seek(-min(5000, log_file.tell()), 2)
     test_result.addWatch(log_file_name,log_file,max_history_bytes=10000)
     return log_file_name

199
  def checkRevision(self, test_result, node_test_suite):
200 201
    config = self.config
    log = self.log
202
    if node_test_suite.revision != test_result.revision:
203 204
     log('Disagreement on tested revision, checking out: %r' % (
          (node_test_suite.revision,test_result.revision),))
205
     for i, repository_revision in enumerate(test_result.revision.split(',')):
206
      vcs_repository = node_test_suite.vcs_repository_list[i]
207
      repository_path = vcs_repository['repository_path']
208
      revision = repository_revision.rsplit('-', 1)[1]
209 210
      # other testnodes on other boxes are already ready to test another
      # revision
211
      log('  %s at %s' % (repository_path, node_test_suite.revision))
212
      updater = Updater(repository_path, git_binary=config['git_binary'],
213
                        revision=revision, log=log,
214
                        process_manager=self.process_manager)
215
      updater.checkout()
216
      node_test_suite.revision = test_result.revision
217

218 219 220 221 222 223
  def _prepareSlapOS(self, working_directory, slapos_instance,
          create_partition=1, software_path_list=None, **kw):
    """
    Launch slapos to build software and partitions

    working_directory -- directory handed to SlapOSControler
    slapos_instance -- object whose retry and retry_software_count
      attributes are updated according to the outcome
    create_partition -- when true, also run "runComputerPartition" after
      "runSoftwareRelease"
    software_path_list -- forwarded to SlapOSControler
    kw -- accepted and ignored

    A software reset is requested from SlapOSControler once
    retry_software_count exceeds 10.

    Returns the status dict of the last step run. Raises SubprocessError,
    carrying the status dict, as soon as a step ends with a non zero
    'status_code'.
    """
    slapproxy_log = os.path.join(self.config['log_directory'],
                                 'slapproxy.log')
    self.log('Configured slapproxy log to %r' % slapproxy_log)
    reset_software = slapos_instance.retry_software_count > 10
    self.log('testnode, retry_software_count : %r' % \
             slapos_instance.retry_software_count)
    self.slapos_controler = SlapOSControler.SlapOSControler(
      working_directory, self.config, log=self.log, slapproxy_log=slapproxy_log,
      process_manager=self.process_manager, reset_software=reset_software,
      software_path_list=software_path_list)
    self.process_manager.supervisord_pid_file = os.path.join(
      self.slapos_controler.instance_root, 'var', 'run', 'supervisord.pid')
    method_list = ["runSoftwareRelease"]
    if create_partition:
      method_list.append("runComputerPartition")
    for method_name in method_list:
      slapos_method = getattr(self.slapos_controler, method_name)
      status_dict = slapos_method(self.config,
                                  environment=self.config['environment'])
      if status_dict['status_code'] != 0:
        slapos_instance.retry = True
        slapos_instance.retry_software_count += 1
        raise SubprocessError(status_dict)
      slapos_instance.retry_software_count = 0
    return status_dict

251 252 253 254 255
  def prepareSlapOSForTestNode(self, test_node_slapos):
    """
    We will build slapos software needed by the testnode itself,
    like the building of selenium-runner by default

    Software is built in config['slapos_directory'] from the optional
    config 'software_list'; no partition is created and nothing is
    returned.
    """
    self._prepareSlapOS(self.config['slapos_directory'],
              test_node_slapos, create_partition=0,
              software_path_list=self.config.get("software_list"))

  def prepareSlapOSForTestSuite(self, node_test_suite):
    """
    Build the software described by the custom profile of a test suite and
    create its partitions, inside the working directory of the suite.

    Returns the status dict given by _prepareSlapOS.
    """
    return self._prepareSlapOS(node_test_suite.working_directory,
              node_test_suite,
              software_path_list=[node_test_suite.custom_profile_path])
264

265 266 267 268 269 270 271
  def _dealShebang(self,run_test_suite_path):
    line = open(run_test_suite_path, 'r').readline()
    invocation_list = []
    if line[:2] == '#!':
      invocation_list = line[2:].split()
    return invocation_list

272
  def runTestSuite(self, node_test_suite, portal_url):
273 274
    config = self.config

275
    run_test_suite_path_list = glob.glob("%s/*/bin/runTestSuite" % \
276
        self.slapos_controler.instance_root)
277 278 279
    if not len(run_test_suite_path_list):
      raise ValueError('No runTestSuite provided in installed partitions.')
    run_test_suite_path = run_test_suite_path_list[0]
280
    run_test_suite_revision = node_test_suite.revision
281 282 283
    # Deal with Shebang size limitation
    invocation_list = self._dealShebang(run_test_suite_path)
    invocation_list.extend([run_test_suite_path,
284 285 286
                           '--test_suite', node_test_suite.test_suite,
                           '--revision', node_test_suite.revision,
                           '--test_suite_title', node_test_suite.test_suite_title,
287 288
                           '--node_quantity', config['node_quantity'],
                           '--master_url', portal_url])
289
    firefox_bin_list = glob.glob("%s/soft/*/parts/firefox/firefox-slapos" % \
290
        config["slapos_directory"])
291 292
    if len(firefox_bin_list):
      invocation_list.extend(["--firefox_bin", firefox_bin_list[0]])
293
    xvfb_bin_list = glob.glob("%s/soft/*/parts/xserver/bin/Xvfb" % \
294
        config["slapos_directory"])
295 296 297 298 299 300 301 302
    if len(xvfb_bin_list):
      invocation_list.extend(["--xvfb_bin", xvfb_bin_list[0]])
    bt5_path_list = config.get("bt5_path")
    if bt5_path_list not in ('', None,):
      invocation_list.extend(["--bt5_path", bt5_path_list])
    # From this point, test runner becomes responsible for updating test
    # result. We only do cleanup if the test runner itself is not able
    # to run.
303
    self.process_manager.spawn(*invocation_list,
304 305 306 307 308
                          cwd=config['test_suite_directory'],
                          log_prefix='runTestSuite', get_output=False)

  def cleanUp(self,test_result):
    log = self.log
309
    log('Testnode.cleanUp')
310
    self.process_manager.killPreviousRun()
311
    if test_result is not None:
312
      try:
313
        test_result.removeWatch(self.config['log_file'])
314 315 316 317 318 319 320 321 322 323
      except KeyError:
        log("KeyError, Watcher already deleted or not added correctly")

  def run(self):
    """
    Main loop of the test node; only left through an exception.

    Each cycle:
      - cleans up and builds the software needed by the test node itself,
      - asks the master which test suites this node must handle and removes
        the data of the other ones,
      - for each suite: writes its profile, updates its repositories and
        asks the master for a test result to work on. The first suite for
        which the master returns one is built and run, then the cycle ends.

    SubprocessError / CalledProcessError are reported to the master when a
    test result was created, then the next cycle starts immediately, as it
    does after CancellationError. ValueError only bumps the retry counter
    of the current suite. Any other exception is logged and re-raised.
    A cycle that ends normally or on ValueError lasts at least
    DEFAULT_SLEEP_TIMEOUT seconds.
    """
    log = self.log
    config = self.config
    test_result = None
    # Bound up front so that the exception handlers below never meet an
    # undefined name, whatever the step that failed.
    log_file_name = None
    remote_test_result_needs_cleanup = False
    test_node_slapos = SlapOSInstance()
    test_node_slapos.edit(working_directory=self.config['slapos_directory'])
    try:
      while True:
        # Suite being processed in this cycle, if any
        node_test_suite = None
        try:
          self.cleanUp(None)
          remote_test_result_needs_cleanup = False
          begin = time.time()
          self.prepareSlapOSForTestNode(test_node_slapos)
          portal_url = config['test_suite_master_url']
          portal = taskdistribution.TaskDistributionTool(portal_url, logger=DummyLogger(log))
          test_suite_portal = taskdistribution.TaskDistributor(portal_url, logger=DummyLogger(log))
          test_suite_json =  test_suite_portal.startTestSuite(config['test_node_title'])
          test_suite_data = json.loads(test_suite_json)
          log("Got following test suite data from master : %r" % \
              (test_suite_data,))
          #Clean-up test suites
          self.checkOldTestSuite(test_suite_data)
          for test_suite in test_suite_data:
            remote_test_result_needs_cleanup = False
            node_test_suite = self.getNodeTestSuite(
               test_suite["test_suite_reference"])
            node_test_suite.edit(
               working_directory=self.config['working_directory'])
            node_test_suite.edit(**test_suite)
            # Write our own software.cfg to use the local repository
            self.constructProfile(node_test_suite)
            # kill processes from previous loop if any
            self.process_manager.killPreviousRun()
            self.getAndUpdateFullRevisionList(node_test_suite)
            test_result = portal.createTestResult(node_test_suite.revision, [],
                     config['test_node_title'], False,
                     node_test_suite.test_suite_title,
                     node_test_suite.project_title)
            remote_test_result_needs_cleanup = True
            log("testnode, test_result : %r" % (test_result, ))
            if test_result is not None:
              log_file_name = self.addWatcher(test_result)
              self.checkRevision(test_result,node_test_suite)
              # Now prepare the installation of SlapOS and create instance
              self.prepareSlapOSForTestSuite(node_test_suite)
              # Give some time so computer partitions may start
              # as partitions can be of any kind we have and likely will never have
              # a reliable way to check if they are up or not ...
              time.sleep(20)
              self.runTestSuite(node_test_suite,portal_url)
              test_result.removeWatch(log_file_name)
              # break the loop to get latest priorities from master
              break
            self.cleanUp(test_result)
        except (SubprocessError, CalledProcessError) as e:
          log("SubprocessError", exc_info=sys.exc_info())
          if test_result is not None:
            test_result.removeWatch(log_file_name)
            if remote_test_result_needs_cleanup:
              # CalledProcessError carries no status_dict attribute
              status_dict = getattr(e, 'status_dict', None) or {}
              test_result.reportFailure(
                command=status_dict.get('command'),
                stdout=status_dict.get('stdout'),
                stderr=status_dict.get('stderr'),
              )
          continue
        except ValueError:
          # This could at least happen if runTestSuite is not found
          log("ValueError", exc_info=sys.exc_info())
          if node_test_suite is not None:
            node_test_suite.retry_software_count += 1
        except CancellationError:
          log("CancellationError", exc_info=sys.exc_info())
          self.process_manager.under_cancellation = False
          if node_test_suite is not None:
            node_test_suite.retry = True
          continue
        except:
          log("erp5testnode exception", exc_info=sys.exc_info())
          raise
        now = time.time()
        self.cleanUp(test_result)
        if (now-begin) < DEFAULT_SLEEP_TIMEOUT:
          sleep_time = DEFAULT_SLEEP_TIMEOUT - (now-begin)
          log("End of processing, going to sleep %s" % sleep_time)
          time.sleep(sleep_time)
    finally:
      # Nice way to kill *everything* generated by run process -- process
      # groups working only in POSIX compliant systems
      # NOTE(review): an exception raised by this last cleanUp is not
      # swallowed here, it replaces the one being propagated.
      log("GENERAL EXCEPTION, QUITING")
      self.cleanUp(test_result)