# testnode.py
from xml_marshaller import xml_marshaller
import os, xmlrpclib, time, imp
from glob import glob
import signal
import slapos.slap
import subprocess
import sys
Sebastien Robin's avatar
Sebastien Robin committed
8
import socket
9
import pprint
10 11 12 13 14 15 16 17 18 19 20 21 22 23
from SlapOSControler import SlapOSControler


class SubprocessError(EnvironmentError):
  """Error carrying the status dictionary of a failed subprocess.

  Every key of ``status_dict`` can be read as an attribute of the
  exception (for example ``error.status_code``).  Unknown names raise
  AttributeError, as the attribute protocol requires.
  """

  def __init__(self, status_dict):
    self.status_dict = status_dict

  def __getattr__(self, name):
    # __getattr__ is only called when normal lookup fails.  ``status_dict``
    # itself must never be resolved through the dictionary: on an instance
    # whose __init__ has not run that would recurse without end.
    if name == 'status_dict':
      raise AttributeError(name)
    try:
      return self.status_dict[name]
    except KeyError:
      # Translate to AttributeError so hasattr() and getattr(..., default)
      # behave normally.
      raise AttributeError(name)

  def __str__(self):
    return 'Error %i' % self.status_code


from Updater import Updater

24
# Process group ids of children started by the test node.  run() and
# sigterm_handler() send SIGTERM to every group listed here.
process_group_pid_set = set()
# Paths of pid files; sigterm_handler() sends SIGTERM to the pid stored in
# each of them.
process_pid_file_list = []
# Commands (arguments for subprocess.call) executed by sigterm_handler()
# before the process exits.
process_command_list = []
def sigterm_handler(signal, frame):
  """Signal handler: terminate everything that was registered, then exit.

  signal -- number of the received signal (unused)
  frame  -- interrupted stack frame (unused)

  Sends SIGTERM to every process group in process_group_pid_set and to
  every pid read from the files in process_pid_file_list, runs each command
  of process_command_list, and finally exits the process with status 1.
  Each step is best effort: a failure never prevents the following ones.
  """
  # The ``signal`` parameter hides the ``signal`` module inside this
  # function, so ``signal.SIGTERM`` would be an attribute lookup on an
  # integer.  Import the constant under its own name instead.
  from signal import SIGTERM
  for pgpid in process_group_pid_set:
    try:
      os.killpg(pgpid, SIGTERM)
    except Exception:
      # The group may already be gone.
      pass
  for pid_file in process_pid_file_list:
    try:
      with open(pid_file) as pid_stream:
        pid = int(pid_stream.read().strip())
      os.kill(pid, SIGTERM)
    except Exception:
      # Missing or malformed pid file, or process already dead.
      pass
  for p in process_command_list:
    try:
      subprocess.call(p)
    except Exception:
      pass
  sys.exit(1)

# Register sigterm_handler so that a SIGTERM received by this process runs
# the cleanup above instead of the default action.
signal.signal(signal.SIGTERM, sigterm_handler)

def safeRpcCall(function, *args):
  """Call function(*args), retrying for ever on network level failures.

  function -- callable to invoke, typically an xmlrpclib method proxy
  args     -- positional arguments forwarded to function

  Returns whatever function returns.  When the call raises socket.error or
  xmlrpclib.ProtocolError the error is written to stderr, the arguments are
  dumped to a file in the current directory, and the call is retried after
  a pause.  The pause starts at 64 seconds and grows by half after every
  failure.  Any other exception propagates to the caller.
  """
  retry = 64
  while True:
    try:
      return function(*args)
    except (socket.error, xmlrpclib.ProtocolError) as e:
      sys.stderr.write('%s\n' % (e,))
      # xmlrpclib method proxies keep their remote name in the private
      # attribute ``_Method__name``; fall back to ``__name__`` so a plain
      # callable does not crash the retry path.
      dump_name = (getattr(function, '_Method__name', None)
                   or getattr(function, '__name__', 'safeRpcCall'))
      with open(dump_name, 'w') as dump_file:
        pprint.pprint(args, dump_file)
      time.sleep(retry)
      retry += retry >> 1

58 59 60 61 62 63 64 65 66 67
def getInputOutputFileList(config, command_name):
  """Open the files that capture the output of a command.

  config       -- configuration dictionary; ``config['instance_root']`` is
                  the directory in which the files are created
  command_name -- name used to build the file names; it is also written as
                  the first line of the output file

  Returns a ``(stdout, stderr)`` tuple of files opened in ``'w+'`` mode,
  named ``.<command_name>_out`` and ``.<command_name>_err``.  Closing them
  is the caller's responsibility.
  """
  instance_root = config['instance_root']
  stdout = open(os.path.join(instance_root, '.%s_out' % command_name), 'w+')
  try:
    stdout.write("%s\n" % command_name)
    # Flush immediately so the header cannot end up after output written
    # through the underlying descriptor (run() passes this file on as
    # ``stdout=``).
    stdout.flush()
    stderr = open(os.path.join(instance_root, '.%s_err' % command_name), 'w+')
  except Exception:
    # Do not leak the first file when the second one cannot be created.
    stdout.close()
    raise
  return (stdout, stderr)

68 69 70 71 72 73 74 75 76 77 78
# Module-level placeholder.  run() binds a local variable of the same name
# without a ``global`` statement, so run() never updates this value.
slapos_controler = None

def _terminateProcessGroups(pgpid_set):
  """Send SIGTERM to every process group id of pgpid_set, ignoring errors."""
  for pgpid in pgpid_set:
    try:
      os.killpg(pgpid, signal.SIGTERM)
    except Exception:
      # Best effort: the group may already be gone.
      pass


def _getShebangInvocation(script_path):
  """Return the interpreter command of script_path's '#!' line as a list.

  The list is empty when the first line is not a shebang.
  """
  with open(script_path, 'r') as script_file:
    first_line = script_file.readline()
  if first_line[:2] == '#!':
    return first_line[2:].split()
  return []


def run(args):
  """Main loop of the test node.

  args -- sequence whose first item is the configuration dictionary.  Keys
          read here: instance_root, git_binary, working_directory,
          vcs_repository_list, profile_path, test_suite_title, test_suite,
          test_suite_master_url, test_node_title, project_title,
          environment, partition_reference and node_quantity.  The keys
          custom_profile_path and repository_path are written, and every
          entry of vcs_repository_list gains repository_id and
          repository_path.

  Setup: disable git SSL verification globally, write a ``software.cfg``
  extending the profile of the first repository, and append the path of the
  last repository to sys.path.

  Then loop for ever:
    * clone/update every repository and compute the combined revision;
    * when nothing changed, sleep 120 seconds and start over, unless the
      previous SlapOS step failed, in which case it is retried;
    * when a master URL is configured, create a test result there;
    * run the "runSoftwareRelease" and "runComputerPartition" steps of
      SlapOSControler, reporting a failure to the master;
    * start the partition's ``bin/runTestSuite`` and wait for it.

  A SubprocessError raised inside an iteration makes the loop sleep 120
  seconds and start over.  Whatever ends the loop, SIGTERM is sent to all
  registered process groups and to supervisord.

  Raises ValueError when ``bin/runTestSuite`` is missing in the partition.
  """
  config = args[0]
  supervisord_pid_file = os.path.join(config['instance_root'], 'var', 'run',
        'supervisord.pid')
  subprocess.check_call([config['git_binary'],
                "config", "--global", "http.sslVerify", "false"])
  previous_revision = None

  # Write our own software.cfg to use the local repository
  custom_profile_path = os.path.join(config['working_directory'],
                                     'software.cfg')
  config['custom_profile_path'] = custom_profile_path
  vcs_repository_list = config['vcs_repository_list']
  profile_content = None
  assert len(vcs_repository_list), "we must have at least one repository"
  for vcs_repository in vcs_repository_list:
    url = vcs_repository['url']
    buildout_section_id = vcs_repository.get('buildout_section_id', None)
    # Without an explicit section id, use the last URL component up to its
    # first dot.
    repository_id = buildout_section_id or \
                                  url.split('/')[-1].split('.')[0]
    repository_path = os.path.join(config['working_directory'], repository_id)
    vcs_repository['repository_id'] = repository_id
    vcs_repository['repository_path'] = repository_path
    if profile_content is None:
      # Only the first repository provides the profile that is extended.
      profile_content = (
        "\n"
        "[buildout]\n"
        "extends = %(software_config_path)s\n"
      ) % {'software_config_path': os.path.join(repository_path,
                                                config['profile_path'])}
    if buildout_section_id is not None:
      profile_content += (
        "\n"
        "[%(buildout_section_id)s]\n"
        "repository = %(repository_path)s\n"
        "branch = %(branch)s\n"
      ) % {'buildout_section_id': buildout_section_id,
           'repository_path': repository_path,
           'branch': vcs_repository.get('branch', 'master')}

  with open(custom_profile_path, 'w') as custom_profile:
    custom_profile.write(profile_content)
  # repository_path is the path of the last repository of the list here.
  config['repository_path'] = repository_path
  sys.path.append(repository_path)
  test_suite_title = config['test_suite_title'] or config['test_suite']

  retry_software = False
  try:
    while True:
      try:
        # kill processes from previous loop if any
        _terminateProcessGroups(process_group_pid_set)
        process_group_pid_set.clear()
        full_revision_list = []
        # Make sure we have local repository
        for vcs_repository in vcs_repository_list:
          repository_path = vcs_repository['repository_path']
          repository_id = vcs_repository['repository_id']
          if not os.path.exists(repository_path):
            parameter_list = [config['git_binary'], 'clone',
                              vcs_repository['url']]
            if vcs_repository.get('branch') is not None:
              parameter_list.extend(['-b', vcs_repository.get('branch')])
            parameter_list.append(repository_path)
            subprocess.check_call(parameter_list)
          # Check out the repository and record the revision it is at
          updater = Updater(repository_path, git_binary=config['git_binary'])
          updater.checkout()
          revision = "-".join(updater.getRevision())
          full_revision_list.append('%s=%s' % (repository_id, revision))
        revision = ','.join(full_revision_list)
        if previous_revision == revision:
          time.sleep(120)
          # Nothing new to test, unless the last SlapOS step failed and
          # must be attempted again.
          if not retry_software:
            continue
        retry_software = False
        previous_revision = revision

        print(config)
        portal_url = config['test_suite_master_url']
        test_result_path = None
        test_result = (test_result_path, revision)
        # Stays None when no master URL is configured, so that a later
        # failure report can be skipped instead of raising NameError.
        master = None
        if portal_url:
          if portal_url[-1] != '/':
            portal_url += '/'
          portal = xmlrpclib.ServerProxy("%s%s" %
                      (portal_url, 'portal_task_distribution'),
                      allow_none=1)
          master = portal.portal_task_distribution
          assert master.getProtocolRevision() == 1
          test_result = safeRpcCall(master.createTestResult,
            config['test_suite'], revision, [],
            False, test_suite_title,
            config['test_node_title'], config['project_title'])
        print("testnode, test_result : %r" % (test_result,))
        if test_result:
          test_result_path, test_revision = test_result
          if revision != test_revision:
            # other testnodes on other boxes are already ready to test another
            # revision
            # NOTE(review): assumes the master lists repositories in the same
            # order as vcs_repository_list -- confirm.
            for i, repository_revision in enumerate(test_revision.split(',')):
              vcs_repository = vcs_repository_list[i]
              repository_path = vcs_repository['repository_path']
              updater = Updater(repository_path,
                                git_binary=config['git_binary'],
                                revision=repository_revision.split('-')[1])
              updater.checkout()

          # Now prepare the installation of SlapOS and create instance
          slapos_controler = SlapOSControler(config,
            process_group_pid_set=process_group_pid_set)
          for method_name in ("runSoftwareRelease", "runComputerPartition"):
            stdout, stderr = getInputOutputFileList(config, method_name)
            slapos_method = getattr(slapos_controler, method_name)
            status_dict = slapos_method(config,
              environment=config['environment'],
              process_group_pid_set=process_group_pid_set,
              stdout=stdout, stderr=stderr
              )
            if status_dict['status_code'] != 0:
              break
          if status_dict['status_code'] != 0:
            if master is not None:
              safeRpcCall(master.reportTaskFailure,
                test_result_path, status_dict, config['test_node_title'])
            retry_software = True
            continue

          partition_path = os.path.join(config['instance_root'],
                                        config['partition_reference'])
          run_test_suite_path = os.path.join(partition_path, 'bin',
                                            'runTestSuite')
          if not os.path.exists(run_test_suite_path):
            raise ValueError('No %r provided' % run_test_suite_path)

          # Deal with Shebang size limitation: start the interpreter named
          # on the '#!' line explicitly rather than executing the script.
          invocation_list = _getShebangInvocation(run_test_suite_path)
          # NOTE(review): the locally computed revision is passed even when
          # the master answered with a different test_revision -- confirm
          # this is intended.
          invocation_list.extend([run_test_suite_path,
                                  '--test_suite', config['test_suite'],
                                  '--revision', revision,
                                  '--test_suite_title', test_suite_title,
                                  '--node_quantity', config['node_quantity'],
                                  '--master_url',
                                  config['test_suite_master_url']])
          run_test_suite = subprocess.Popen(invocation_list)
          # Registered while running so that cleanup can reach it; on an
          # exception the pid deliberately stays registered.
          process_group_pid_set.add(run_test_suite.pid)
          run_test_suite.wait()
          process_group_pid_set.remove(run_test_suite.pid)
      except SubprocessError:
        time.sleep(120)
        continue

  finally:
    # Nice way to kill *everything* generated by run process -- process
    # groups working only in POSIX compliant systems
    # Exceptions are swallowed during cleanup phase
    print("going to kill %r" % (process_group_pid_set,))
    _terminateProcessGroups(process_group_pid_set)
    try:
      if os.path.exists(supervisord_pid_file):
        with open(supervisord_pid_file) as pid_file:
          supervisord_pid = int(pid_file.read().strip())
        os.kill(supervisord_pid, signal.SIGTERM)
    except Exception:
      pass