__init__.py 9.65 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2013 Vifib SARL and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 3
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
#
##############################################################################

import argparse
import json
31
import importlib
32 33
import logging
import os
34 35
import sys
import tempfile
36 37 38
import time
import traceback
from erp5.util import taskdistribution
39 40 41 42
try:
  from erp5.util.testnode import Utils
except ImportError:
  pass
43

44 45 46 47 48 49
def importFrom(name):
  """
  Import a test suite module (in the suites module) and return it.
  """
  return importlib.import_module('.suites.%s' % name, package=__name__)

50
def parseArguments():
51 52 53
  """
  Parse arguments.
  """
54
  parser = argparse.ArgumentParser()
55
  parser.add_argument('--test-result-path', '--test_result_path',
56 57 58 59 60 61 62
                      metavar='ERP5_TEST_RESULT_PATH',
                      help='ERP5 relative path of the test result')

  parser.add_argument('--revision',
                      metavar='REVISION',
                      help='Revision of the test_suite')

63
  parser.add_argument('--test-suite', '--test_suite',
64 65 66
                      metavar='TEST_SUITE',
                      help='Name of the test suite')

67 68 69 70 71
  parser.add_argument('--test-suite-title', '--test_suite_title',
                      metavar='TEST_SUITE',
                      help='The test suite title')

  parser.add_argument('--node-title', '--test_node_title',
72 73 74 75
                      metavar='NODE_TITLE',
                      help='Title of the testnode which is running this'
                            'launcher')

76 77 78 79
  parser.add_argument('--node_quantity', help='Number of parallel tests to run',
                      default=1, type=int)

  parser.add_argument('--test-suite-master-url', '--master_url',
80 81 82
                      metavar='TEST_SUITE_MASTER_URL',
                      help='Url to connect to the ERP5 Master testsuite taskditributor')

83
  parser.add_argument('--log-path', '--log_path',
84 85 86
                      metavar='LOG_PATH',
                      help='Log Path')

87
  parser.add_argument('additional_arguments', nargs=argparse.REMAINDER)
88 89 90

  return parser.parse_args()

91
def setupLogging(name=__name__, log_path=None):
92 93
  logger_format = '%(asctime)s %(name)-13s: %(levelname)-8s %(message)s'
  formatter = logging.Formatter(logger_format)
94 95 96 97 98 99 100 101
  logging.basicConfig(level=logging.INFO, format=logger_format)

  root_logger = logging.getLogger('')
  fd, fname = tempfile.mkstemp()
  file_handler = logging.FileHandler(fname)
  file_handler.setFormatter(formatter)
  root_logger.addHandler(file_handler)

102 103 104 105 106
  if log_path:
    file_handler = logging.handlers.RotatingFileHandler(
        filename=log_path,
        maxBytes=20000000, backupCount=4)
    file_handler.setFormatter(formatter)
107 108 109 110
    root_logger.addHandler(file_handler)

  logger = logging.getLogger(name)
  return logger, fname
111

112
def runTestSuite(test_suite_title, test_suite_arguments, logger):
113 114 115 116 117 118 119 120 121 122 123
  """
  Run a specified test suite, by dynamically loading the module and calling
  its "runTestSuite" method.
  """
  try:
    # Generate the additional arguments that were given using the syntax
    # additionalargument1=value1 additionalargument2=value2
    parsed_arguments = dict(key.split('=') for key in test_suite_arguments)
    test_suite_module = importFrom(test_suite_title)
    success = test_suite_module.runTestSuite(**parsed_arguments)
  except:
124
    logger.exception('Impossible to run resiliency test:')
125 126 127
    success = False
  return success

128
class ScalabilityTest(object):
129 130 131
  """
  Simple structure carrying test data.
  """
132 133 134 135 136 137
  def __init__(self, data, test_result):
    self.__dict__ = {}
    self.__dict__.update(data)
    self.test_result = test_result

class ScalabilityLauncher(object):
138 139 140 141
  """
  Core part of the code, responsible of speaking with the ERP5 testnode Master
  and running tests.
  """
142 143
  def __init__(self):
    self._argumentNamespace = parseArguments()
144 145 146 147 148
    if self._argumentNamespace.log_path:
      log_path = os.path.join(self._argumentNamespace.log_path,
                              'runScalabilityTestSuite.log')
    else:
      log_path = None
149
    logger, fname = setupLogging('runScalabilityTestSuite', log_path)
150
    self.log = logger.info
151 152

    # Proxy to erp5 master test_result
153
    self.test_result = taskdistribution.TestResultProxy(
154
                        self._argumentNamespace.test_suite_master_url,
155
                        1.0, logger,
156 157 158 159 160 161 162 163 164 165
                        self._argumentNamespace.test_result_path,
                        self._argumentNamespace.node_title,
                        self._argumentNamespace.revision
                      )

  def getNextTest(self):
    """
    Return a ScalabilityTest with current running test case informations,
    or None if no test_case ready
    """
166
    data = self.test_result.getRunningTestCase()
167
    if data == None:
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
      return None
    decoded_data = Utils.deunicodeData(json.loads(
                  data
                ))
    next_test = ScalabilityTest(decoded_data, self.test_result)
    return next_test

  def run(self):
    self.log('Resiliency Launcher started, with:')
    self.log('Test suite master url: %s' % self._argumentNamespace.test_suite_master_url)
    self.log('Test suite: %s' % self._argumentNamespace.test_suite)
    self.log('Test result path: %s' % self._argumentNamespace.test_result_path)
    self.log('Revision: %s' % self._argumentNamespace.revision)
    self.log('Node title: %s' % self._argumentNamespace.node_title)

    while True:
      time.sleep(5)
      current_test = self.getNextTest()
      if current_test == None:
        self.log('No Test Case Ready')
      else:
        start_time = time.time()
        error_message_set, exit_status = set(), 0

        proxy = taskdistribution.ServerProxy(
                    self._argumentNamespace.test_suite_master_url,
                    allow_none=True
                ).portal_task_distribution
        retry_time = 2.0
        test_result_line_test = taskdistribution.TestResultLineProxy(
                                  proxy, retry_time, self.log,
                                  current_test.relative_path,
                                  current_test.title
                                )

203 204 205 206 207
        success = runTestSuite(
            self._argumentNamespace.test_suite,
            self._argumentNamespace.additional_arguments,
            self.log,
        )
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222

        if success:
          error_count = 0
        else:
          error_count = 1

        test_duration = time.time() - start_time
        test_result_line_test.stop(stdout='Success',
                        test_count=1,
                        error_count=error_count,
                        duration=test_duration)
        self.log('Test Case Stopped')

    return error_message_set, exit_status

223
def runResiliencyTest():
224 225 226 227 228
  """
  Used for automated test suite run from "Scalability" Test Node infrastructure.
  It means the instance running this code should have been deployed by a
  "Scalability" Testnode.
  """
229 230 231 232 233
  error_message_set, exit_status = ScalabilityLauncher().run()
  for error_message in error_message_set:
    print >>sys.stderr, 'ERROR: %s' % error_message

  sys.exit(exit_status)
234

235 236 237 238 239
def runUnitTest():
  """
  Function meant to be run by "classical" (a.k.a UnitTest) erp5testnode.
  """
  args = parseArguments()
240 241
  logger, fname = setupLogging('runScalabilityTestSuite', None)
  try:
242
    master = taskdistribution.TaskDistributor(args.test_suite_master_url)
243 244 245 246 247
    test_suite_title = args.test_suite_title or args.test_suite
    revision = args.revision

    test_result = master.createTestResult(revision, [test_suite_title],
      args.node_title, True, test_suite_title, 'foo')
248 249 250

    if test_result is None:
      # No test to run.
251
      logger.info("There is no test to run. Exiting...")
252 253
      return

254 255 256 257 258 259 260
    test_line = test_result.start()
    start_time = time.time()

    args.additional_arguments.append('type=UnitTest')
    success = runTestSuite(
        args.test_suite,
        args.additional_arguments,
261
        logger,
262 263 264 265 266 267
    )

    if success:
      error_count = 0
    else:
      error_count = 1
268

269
    test_duration = time.time() - start_time
270 271 272 273 274 275 276 277 278 279 280 281 282 283
    max_size = 4096
    fsize = os.stat(fname).st_size
    with open(fname) as f:
      if fsize <= max_size:
        stdout = f.read()
      else:
        # Read only 2048 bytes from the file. The whole
        # file will be available on the server.
        stdout = f.read(2048)
        stdout += "\n[...] File truncated\n"
        f.seek(-2048, os.SEEK_END)
        stdout += f.read()

    test_line.stop(stdout=stdout,
284 285 286 287 288 289 290
                    test_count=1,
                    error_count=error_count,
                    duration=test_duration)
  except:
    raise
  finally:
    os.remove(fname)