From 6c26f831d5c6b68a0f5205a4724eab12addf82da Mon Sep 17 00:00:00 2001
From: Roque <roqueporchetto@gmail.com>
Date: Fri, 16 Mar 2018 21:07:34 +0100
Subject: [PATCH] scalability-benchmark: refactoring and parametrizable test
 duration

- test duration is get from testsuite definition
- refactoring and cleanup in runScalabilityTestSuite and benchmark
- request timeout in metric thread

/reviewed-on https://lab.nexedi.com/nexedi/erp5/merge_requests/607
---
 erp5/util/benchmark/performance_tester.py     |   5 +
 erp5/util/benchmark/process.py                |  33 ++-
 erp5/util/benchmark/thread.py                 |   5 +-
 .../scalability/runScalabilityTestSuite.py    | 206 +++++++-----------
 4 files changed, 102 insertions(+), 147 deletions(-)

diff --git a/erp5/util/benchmark/performance_tester.py b/erp5/util/benchmark/performance_tester.py
index 75155b5e46..f5258a6ab1 100755
--- a/erp5/util/benchmark/performance_tester.py
+++ b/erp5/util/benchmark/performance_tester.py
@@ -113,6 +113,11 @@ class PerformanceTester(object):
                         help='Repeat the benchmark suite N times for a given '
                              'number of users (default: infinite)')
 
+    parser.add_argument('--duration',
+                        type=int,
+                        default=0,
+                        help='Repeat the benchmark suite until time duration is reached')
+
     parser.add_argument('--repeat-range',
                         type=ArgumentType.checkIntValueWrapper(minimum=1),
                         default=-1,
diff --git a/erp5/util/benchmark/process.py b/erp5/util/benchmark/process.py
index e9175daf52..e47d3539ca 100644
--- a/erp5/util/benchmark/process.py
+++ b/erp5/util/benchmark/process.py
@@ -31,6 +31,7 @@ import traceback
 import signal
 import sys
 import socket
+import time
 
 from ..testbrowser.browser import Browser
 from .result import NothingFlushedException
@@ -158,21 +159,29 @@ class BenchmarkProcess(multiprocessing.Process):
     exit_status = 0
     exit_msg = None
 
+    def runIteration(result):
+      self._logger.info("Iteration: %d" % self._current_repeat)
+      self.runBenchmarkSuiteList(result)
+      if not self._current_repeat % REPEAT_NUMBER_BEFORE_FLUSHING:
+        try:
+          result.flush()
+        except NothingFlushedException:
+          pass
+
     try:
       with result_instance as result:
         self._browser = self.getBrowser(result_instance.log_file)
-
-        while self._current_repeat != (self._argument_namespace.repeat + 1):
-          self._logger.info("Iteration: %d" % self._current_repeat)
-          self.runBenchmarkSuiteList(result)
-
-          if not self._current_repeat % REPEAT_NUMBER_BEFORE_FLUSHING:
-            try:
-              result.flush()
-            except NothingFlushedException:
-              pass
-
-          self._current_repeat += 1
+        if self._argument_namespace.duration > 0:
+          self._logger.info("Iterate until duration %d" % self._argument_namespace.duration)
+          start_time = time.time()
+          while self._argument_namespace.duration > (time.time()-start_time):
+            runIteration(result)
+            self._current_repeat += 1
+        else:
+          self._logger.info("Iterate until repeat %d" % self._argument_namespace.repeat)
+          while self._current_repeat != (self._argument_namespace.repeat + 1):
+            runIteration(result)
+            self._current_repeat += 1
 
     except StopIteration, e:
       self._logger.error(e)
diff --git a/erp5/util/benchmark/thread.py b/erp5/util/benchmark/thread.py
index 947660ce74..52cce26b06 100644
--- a/erp5/util/benchmark/thread.py
+++ b/erp5/util/benchmark/thread.py
@@ -56,7 +56,7 @@ class TestMetricThread(threading.Thread):
     while(not self.stop_event.is_set()):
       self.stop_event.wait(-time.time() % self.interval)
       try:
-        response = requests.get(self.metric_url)
+        response = requests.get(self.metric_url, timeout=60)
         if response.status_code == 200:
           self.metric_list.append(response.text)
         else:
@@ -71,6 +71,3 @@ class TestMetricThread(threading.Thread):
 
   def getErrorMessage(self):
     return self.error_message
-
-
-
diff --git a/erp5/util/scalability/runScalabilityTestSuite.py b/erp5/util/scalability/runScalabilityTestSuite.py
index cc26d0ef55..f3f03c25d7 100644
--- a/erp5/util/scalability/runScalabilityTestSuite.py
+++ b/erp5/util/scalability/runScalabilityTestSuite.py
@@ -27,11 +27,12 @@ import datetime
 MAX_INSTALLATION_TIME = 60*50
 MAX_TESTING_TIME = 60
 MAX_GETTING_CONNECTION_TIME = 60*5
-TEST_METRIC_TIME_INTERVAL = 60*3
+TEST_METRIC_TIME_INTERVAL = 60
+SCALABILITY_TEST_DURATION = 10*60
 
 SCALABILITY_LOG_FILENAME = "runScalabilityTestSuite"
 LOG_FILE_PREFIX = "scalability-test"
-MAX_ERRORS = 2
+MAX_ERRORS = 10
 
 class DummyLogger(object):
   def __init__(self, func):
@@ -39,73 +40,6 @@ class DummyLogger(object):
       'critical', 'fatal'):
        setattr(self, name, func)
 
-def getConnection(instance_url, log):
-  """
-  Return a connection with the instance.
-  """
-  start_time = time.time()
-  count = 0
-  while MAX_GETTING_CONNECTION_TIME > time.time()-start_time:
-    try:
-      count = count + 1
-      parsed = urlparse.urlparse(instance_url)
-      host = "%s:%s" % (parsed.hostname, str(parsed.port))
-      if parsed.port is None: host = parsed.hostname
-      if parsed.scheme == 'https':
-        return httplib.HTTPSConnection(host)
-      elif parsed.scheme == 'http':
-        return httplib.HTTPConnection(host)
-      else:
-        raise ValueError("Protocol not implemented")
-    except:
-      log("Can't get connection to %s, we will retry." %instance_url)
-      time.sleep(10)
-  raise ValueError("Cannot get new connection after %d try (for %s s)" %(count, str(time.time()-start_time)))
-
-# TODO: this will be refactored soon
-def waitFor0PendingActivities(instance_url, log):
-  """
-  Waiting while there are no pending activities on the instance.
-  """
-  log("waiting activities for: " + str(instance_url))
-  start_time = time.time()
-  parsed = urlparse.urlparse(instance_url)
-  user = parsed.username;
-  password = parsed.password;
-  header_dict = {'Authorization': 'Basic %s' % \
-  base64.encodestring('%s:%s' % (user, password)).strip()}
-
-  count = 0
-  ok = False
-  while MAX_INSTALLATION_TIME > time.time()-start_time and not ok:
-    zope_connection = getConnection(instance_url, log)
-    try:
-      count = count + 1
-      zope_connection.request(
-        'GET', '/erp5/portal_activities/getMessageList',
-        headers=header_dict
-      )
-      result = zope_connection.getresponse()
-      message_list_text = result.read()
-      message_list = [s.strip() for s in message_list_text[1:-1].split(',')]
-      if len(message_list)==0:
-        log("There is no pending activities.")
-        ok = True
-      #Hack to do not take into account persistent Alarm_installMailServer acitivities
-      if len(message_list)==1 :
-        log("1 pending activity but ok.")
-        ok = True
-
-      log("There is %d pending activities" %len(message_list))
-      time.sleep(5)
-    except Exception as e:
-      time.sleep(5)
-      log("exception: " + str(e))
-      log("Getting activities failed, retry.")
-
-  if not ok:
-    raise ValueError("Cannot waitFor0PendingActivities after %d try (for %s s)" %(count, str(time.time()-start_time)))
-
 class ScalabilityTest(object):
   def __init__(self, data, test_result):
     self.__dict__ = {}
@@ -144,9 +78,8 @@ class ScalabilityLauncher(object):
   def __init__(self):
     self.__argumentNamespace = self._parseArguments(argparse.ArgumentParser(
           description='Run benchmarking scalability suites.'))
-    logger = createLogger(self.__argumentNamespace.log_path)
-    self.log = logger.info
-    self.logger = logger
+    self.logger = createLogger(self.__argumentNamespace.log_path)
+    self.log = self.logger.info
     self.users_file_original_content = []
     # Proxy to with master test_result
     portal_url = self.__argumentNamespace.test_suite_master_url
@@ -297,13 +230,14 @@ class ScalabilityLauncher(object):
     # Prepare configuration
     current_test_number = int(current_test.title)
     test_duration = suite.getTestDuration(current_test_number)
+    if not test_duration or test_duration == 0:
+      test_duration = SCALABILITY_TEST_DURATION
     benchmarks_path = os.path.join(self.__argumentNamespace.repo_location, suite.getTestPath())
     user_file_full_path = os.path.join(self.__argumentNamespace.repo_location, suite.getUsersFilePath())
     user_file_path = os.path.split(user_file_full_path)[0]
     user_file = os.path.split(user_file_full_path)[1]
     tester_path = self.__argumentNamespace.runner_path
     user_quantity = suite.getUserQuantity(current_test_number)
-    repetition = suite.getTestRepetition(current_test_number)
     instance_url = self.__argumentNamespace.instance_url
     metric_url = self.__argumentNamespace.metric_url
 
@@ -312,47 +246,45 @@ class ScalabilityLauncher(object):
     metric_thread = TestMetricThread(metric_url, self.log, metric_thread_stop_event, interval=TEST_METRIC_TIME_INTERVAL)
     metric_thread.start()
 
-    bootstrap_password = self.__argumentNamespace.bootstrap_password
     try:
+      bootstrap_password = self.__argumentNamespace.bootstrap_password
       self.updateUsersFile(user_quantity, bootstrap_password, user_file_full_path + ".py")
     except Exception as e:
       self.log("ERROR while updating file: " + str(e))
 
     now = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M")
     log_dir = "test-%s_%s" % (current_test.title, now)
-    # the repetition of tests will be refactored soon
-    for i in range(1, repetition+1):
-      self.log("Repetition: %d/%d" %(i, repetition))
-      waitFor0PendingActivities(instance_url, self.log)
-      # Generate commands to run
-      command_list = []
-      user_index = 0
-      for test_suite in test_suite_list:
-        command_list.append([tester_path,
-                             instance_url,
-                             str(user_quantity/len(test_suite_list)),
-                             test_suite,
-                             '--benchmark-path-list', benchmarks_path,
-                             '--users-file-path', user_file_path,
-                             '--users-file', user_file,
-                             '--filename-prefix', "%s_%s_repetition%d_suite_%s" %(LOG_FILE_PREFIX, current_test.title, i, test_suite),
-                             '--report-directory', self.__argumentNamespace.log_path,
-                             '--repeat', "%d"%1,
-                             '--max-errors', str(MAX_ERRORS),
-                             '--user-index', str(user_index),
-                           ])
-        user_index += user_quantity/len(test_suite_list)
-
-      # Launch commands
-      for command in command_list:
-        test_thread = TestThread(process_manager, command, self.log)
-        test_thread.start()
-      # Sleep
-      self.log("Going to sleep for %s seconds (Test duration)." % str(test_duration))
-      time.sleep(test_duration)
-
-      waitFor0PendingActivities(instance_url, self.log)
-      self.moveLogs(log_dir, current_test)
+
+    command_list = []
+    user_index = 0
+    # Prepare commands
+    for test_suite in test_suite_list:
+      log_file_name_prefix = "%s_%s_suite_%s" %(LOG_FILE_PREFIX, current_test.title, test_suite)
+      command_list.append([tester_path,
+                           instance_url,
+                           str(user_quantity/len(test_suite_list)),
+                           test_suite,
+                           '--benchmark-path-list', benchmarks_path,
+                           '--users-file-path', user_file_path,
+                           '--users-file', user_file,
+                           '--filename-prefix', log_file_name_prefix,
+                           '--report-directory', self.__argumentNamespace.log_path,
+                           '--max-errors', str(MAX_ERRORS),
+                           '--user-index', str(user_index),
+                           "--duration", "%d"%test_duration,
+                         ])
+      user_index += user_quantity/len(test_suite_list)
+    # Launch commands
+    exec_env = os.environ.copy()
+    exec_env.update({'raise_error_if_fail': False})
+    for index, command in enumerate(command_list, start=1):
+      test_thread = TestThread(process_manager, command, self.log, env=exec_env)
+      test_thread.start()
+    # Sleep
+    self.log("Going to sleep for %s seconds." % str(test_duration))
+    time.sleep(test_duration)
+    process_manager.killPreviousRun()
+    self.moveLogs(log_dir, current_test)
 
     self.log("Test Case %s has finished" %(current_test.title))
     metric_thread_stop_event.set()
@@ -360,45 +292,57 @@ class ScalabilityLauncher(object):
     metric_list = metric_thread.getMetricList()
     test_output = suite.getScalabilityTestOutput(metric_list)
     if not test_output:
-      self.log("metric list and test output empty. getting metric thread error message.")
-      test_output = metric_thread.getErrorMessage()
-    self.log("test_output: " + str(test_output))
+      metric_error = metric_thread.getErrorMessage()
+      if metric_error:
+        error_message = str(metric_error)
+      else:
+        error_message = "Metric thread couldn't get any metric"
+      error_message_set.add(error_message)
+
+    if error_message_set:
+      self.log("error_message_set: " + str(error_message_set))
 
-    # Send results to master
-    retry_time = 2.0
-    proxy = taskdistribution.ServerProxy(
-                self.__argumentNamespace.test_suite_master_url,
-                allow_none=True
-            ).portal_task_distribution
-    test_result_line_test = taskdistribution.TestResultLineProxy(
-                              proxy, retry_time, self.logger,
-                              current_test.relative_path,
-                              current_test.title
-                            )
     test_details = "number of users=%d\n"\
-                    "number of repetitions=%d\n"\
                     "number of tests=%d\n"\
                     "tests=%s\n"\
                     "duration=%d\n"\
                     %(
                       (user_quantity/len(test_suite_list))*len(test_suite_list),
-                      repetition,
                       len(test_suite_list),
                       '_'.join(test_suite_list),
                       test_duration
                     )
     self.log("Test details: %s" % test_details)
-    self.log("Test output: %s" % test_output)
-    self.log("Stopping the test case...")
+
+    # Send results to master
+    self.log("Connecting to master to set test results...")
     try:
-      test_result_line_test.stop(stdout=test_output,
-                      command=test_details,
-                      test_count=len(test_suite_list),
-                      duration=test_duration)
+      retry_time = 2.0
+      proxy = taskdistribution.ServerProxy(
+                  self.__argumentNamespace.test_suite_master_url,
+                  allow_none=True
+              ).portal_task_distribution
+      test_result_line_test = taskdistribution.TestResultLineProxy(
+                                proxy, retry_time, self.logger,
+                                current_test.relative_path,
+                                current_test.title
+                              )
+      if len(error_message_set):
+        self.log("Test case failed.")
+        error_message = "; ".join(str(error) for error in error_message_set)
+        test_result_line_test.stop(error_count=1, failure_count=1,
+                                   command=test_details,
+                                   stdout=error_message, stderr=error_message)
+      else:
+        self.log("Test case finished. Output: %s" % test_output)
+        test_result_line_test.stop(stdout=test_output,
+                                   command=test_details,
+                                   test_count=len(test_suite_list),
+                                   duration=test_duration)
     except Exception as e:
-      self.log("ERROR stopping test line")
-      self.log(e)
+      self.log("Error during communication to master: " + str(e))
       raise e
+
     self.log("Test Case Stopped")
     self.clearUsersFile(user_file_full_path + ".py")
 
-- 
2.30.9