diff --git a/erp5/util/scalability/runScalabilityTestSuite.py b/erp5/util/scalability/runScalabilityTestSuite.py
index 2900f1744da7e7bdab9f6daf335990d95cd7ac25..cc26d0ef554956a087e1a1a070682778fa401be8 100644
--- a/erp5/util/scalability/runScalabilityTestSuite.py
+++ b/erp5/util/scalability/runScalabilityTestSuite.py
@@ -6,7 +6,6 @@ import shutil
 import time
 import sys
 import multiprocessing
-import subprocess
 import signal
 import errno
 import json
@@ -16,14 +15,23 @@ import glob
 import urlparse
 import httplib
 import base64
+import threading
 from erp5.util.benchmark.argument import ArgumentType
 from erp5.util.benchmark.performance_tester import PerformanceTester
+from erp5.util.benchmark.thread import TestThread, TestMetricThread
 from erp5.util import taskdistribution
 from erp5.util.testnode import Utils
+from erp5.util.testnode.ProcessManager import SubprocessError, ProcessManager, CancellationError
+import datetime
 
 MAX_INSTALLATION_TIME = 60*50
 MAX_TESTING_TIME = 60
 MAX_GETTING_CONNECTION_TIME = 60*5
+TEST_METRIC_TIME_INTERVAL = 60*3
+
+SCALABILITY_LOG_FILENAME = "runScalabilityTestSuite"
+LOG_FILE_PREFIX = "scalability-test"
+MAX_ERRORS = 2
 
 class DummyLogger(object):
   def __init__(self, func):
@@ -31,17 +39,18 @@ class DummyLogger(object):
                  'critical', 'fatal'):
       setattr(self, name, func)
 
-def getConnection(erp5_url, log):
+def getConnection(instance_url, log):
   """
-  Return a connection with the erp5 instance.
+  Return a connection to the instance.
   """
   start_time = time.time()
   count = 0
   while MAX_GETTING_CONNECTION_TIME > time.time()-start_time:
     try:
       count = count + 1
-      parsed = urlparse.urlparse(erp5_url)
+      parsed = urlparse.urlparse(instance_url)
       host = "%s:%s" % (parsed.hostname, str(parsed.port))
+      if parsed.port is None: host = parsed.hostname
       if parsed.scheme == 'https':
         return httplib.HTTPSConnection(host)
       elif parsed.scheme == 'http':
@@ -49,16 +58,18 @@ def getConnection(erp5_url, log):
       else:
         raise ValueError("Protocol not implemented")
     except:
-      log("Can't get connection to %s, we will retry." %erp5_url)
+      log("Can't get connection to %s, we will retry." %instance_url)
       time.sleep(10)
   raise ValueError("Cannot get new connection after %d try (for %s s)"
                    %(count, str(time.time()-start_time)))
 
-def waitFor0PendingActivities(erp5_url, log):
+# TODO: this will be refactored soon
+def waitFor0PendingActivities(instance_url, log):
   """
-  Waiting while there are no pending activities on the erp5 instance.
+  Wait until there are no more pending activities on the instance.
""" + log("waiting activities for: " + str(instance_url)) start_time = time.time() - parsed = urlparse.urlparse(erp5_url) + parsed = urlparse.urlparse(instance_url) user = parsed.username; password = parsed.password; header_dict = {'Authorization': 'Basic %s' % \ @@ -67,7 +78,7 @@ def waitFor0PendingActivities(erp5_url, log): count = 0 ok = False while MAX_INSTALLATION_TIME > time.time()-start_time and not ok: - zope_connection = getConnection(erp5_url, log) + zope_connection = getConnection(instance_url, log) try: count = count + 1 zope_connection.request( @@ -87,54 +98,14 @@ def waitFor0PendingActivities(erp5_url, log): log("There is %d pending activities" %len(message_list)) time.sleep(5) - except: + except Exception as e: time.sleep(5) + log("exception: " + str(e)) log("Getting activities failed, retry.") if not ok: raise ValueError("Cannot waitFor0PendingActivities after %d try (for %s s)" %(count, str(time.time()-start_time))) - -def getCreatedDocumentNumberFromERP5(erp5_url, log): - """ - Get the number of created documents from erp5 instance. - """ - log("count docs number from ERP5 instance") - count_retry = 0 - parsed = urlparse.urlparse(erp5_url) - user = 'zope' - password = 'insecure' - header_dict = {'Authorization': 'Basic %s' % \ - base64.encodestring('%s:%s' % (user, password)).strip()} - zope_connection = getConnection(erp5_url, log) - while count_retry < 100 : - try: - zope_connection.request( - 'GET', '/erp5/count_docs_scalability', - headers=header_dict - ) - result = zope_connection.getresponse() - return int(result.read()) - except: - log("retry..") - count_retry += 1 - time.sleep(15) - raise ValueError("Impossible to get number of docs from ERP5") - - -# XXX: This import is required, just to populate sys.modules['test_suite']. -# Even if it's not used in this file. Yuck. 
-import product.ERP5Type.tests.ERP5TypeTestSuite
-
-
-from subprocess import call
-
-LOG_FILE_PREFIX = "performance_tester_erp5"
-# Duration of a test case
-TEST_CASE_DURATION = 60
-# Maximum limit of documents to create during a test case
-MAX_DOCUMENTS = 100000
-
 class ScalabilityTest(object):
   def __init__(self, data, test_result):
     self.__dict__ = {}
@@ -144,52 +115,45 @@ class ScalabilityTest(object):
 def doNothing(**kwargs):
   pass
 
-def makeSuite(test_suite=None, log=doNothing, **kwargs):
-  # BBB tests (plural form) is only checked for backward compatibility
-  for k in sys.modules.keys():
-    if k in ('tests', 'test',) or k.startswith('tests.') or k.startswith('test.'):
-      del sys.modules[k]
-  singular_succeed = True
-  while True:
-    module_name, class_name = ('%s.%s' % (singular_succeed and 'test' or 'tests',
-                                          test_suite)).rsplit('.', 1)
-    try:
-      suite_class = getattr(__import__(module_name, None, None, [class_name]),
-                            class_name)
-    except (AttributeError, ImportError):
-      if not singular_succeed:
-        raise
-      singular_succeed = False
-    else:
-      break
-  suite = suite_class(max_instance_count=1, **kwargs)
+def makeSuite(test_suite=None, location=None, log=doNothing, **kwargs):
+  import imp
+  try:
+    module = imp.load_source('scalability_test', location + '__init__.py')
+    suite_class = getattr(module, test_suite)
+    suite = suite_class(**kwargs)
+  except Exception as e:
+    log("[ERROR] While making suite: " + str(e))
+    raise
   return suite
 
+def createLogger(log_path):
+  log_path = os.path.join(log_path, SCALABILITY_LOG_FILENAME + ".log")
+  logger_format = '%(asctime)s %(name)-13s: %(levelname)-8s %(message)s'
+  formatter = logging.Formatter(logger_format)
+  logging.basicConfig(level=logging.INFO,
+                      format=logger_format)
+  logger = logging.getLogger(SCALABILITY_LOG_FILENAME)
+  file_handler = logging.handlers.RotatingFileHandler(
+    filename=log_path,
+    maxBytes=20000000, backupCount=4)
+  file_handler.setFormatter(formatter)
+  logger.addHandler(file_handler)
+  return logger
+
 class ScalabilityLauncher(object):
   def __init__(self):
-    # Parse arguments
     self.__argumentNamespace = self._parseArguments(argparse.ArgumentParser(
-      description='Run ERP5 benchmarking scalability suites.'))
-    # Create Logger
-    log_path = os.path.join(self.__argumentNamespace.log_path,
-                            "runScalabilityTestSuite.log")
-    logger_format = '%(asctime)s %(name)-13s: %(levelname)-8s %(message)s'
-    formatter = logging.Formatter(logger_format)
-    logging.basicConfig(level=logging.INFO,
-                        format=logger_format)
-    logger = logging.getLogger('runScalabilityTestSuite')
-    logger.addHandler(logging.NullHandler())
-    file_handler = logging.handlers.RotatingFileHandler(
-      filename=log_path,
-      maxBytes=20000000, backupCount=4)
-    file_handler.setFormatter(formatter)
-    logger.addHandler(file_handler)
+      description='Run benchmarking scalability suites.'))
+    logger = createLogger(self.__argumentNamespace.log_path)
     self.log = logger.info
-
-    # Proxy to with erp5 master test_result
+    self.logger = logger
+    self.users_file_original_content = []
+    # Proxy to the master test_result
+    portal_url = self.__argumentNamespace.test_suite_master_url
+    distributor = taskdistribution.TaskDistributor(portal_url, logger=DummyLogger(self.log))
+    self.log(self.__argumentNamespace.test_suite_master_url)
     self.test_result = taskdistribution.TestResultProxy(
-      self.__argumentNamespace.test_suite_master_url,
+      distributor,
       1.0, DummyLogger(self.log),
       self.__argumentNamespace.test_result_path,
       self.__argumentNamespace.node_title,
@@ -198,18 +162,26 @@ class ScalabilityLauncher(object):
   @staticmethod
   def _addParserArguments(parser):
     # Mandatory arguments
-    parser.add_argument('--erp5-url',
-                        metavar='ERP5_URL',
-                        help='Main url of ERP5 instance to test')
+    parser.add_argument('--instance-url',
+                        metavar='INSTANCE_URL',
+                        help='Main url of instance to test')
+
+    parser.add_argument('--bootstrap-password',
+                        metavar='BOOTSTRAP_PASSWORD',
+                        help='Bootstrap password of instance objects')
 
     parser.add_argument('--test-result-path',
-                        metavar='ERP5_TEST_RESULT_PATH',
-                        help='ERP5 relative path of the test result')
+                        metavar='TEST_RESULT_PATH',
+                        help='Relative path of the test result')
 
     parser.add_argument('--revision',
                         metavar='REVISION',
                         help='Revision of the test_suite')
 
+    parser.add_argument('--current-test-data',
+                        metavar='CURRENT_TEST_DATA',
+                        help='Data of the current test')
+
     parser.add_argument('--test-suite',
                         metavar='TEST_SUITE',
                         help='Name of the test suite')
@@ -221,20 +193,24 @@ class ScalabilityLauncher(object):
 
     parser.add_argument('--test-suite-master-url',
                         metavar='TEST_SUITE_MASTER_URL',
-                        help='Url to connect to the ERP5 Master testsuite taskditributor')
+                        help='Url to connect to the Master testsuite taskdistributor')
 
     parser.add_argument('--log-path',
                         metavar='LOG_PATH',
                         help='Log Path')
 
-    parser.add_argument('--erp5-location',
-                        metavar='ERP5_LOCATION',
-                        help='Path to erp5 depository')
+    parser.add_argument('--repo-location',
+                        metavar='REPO_LOCATION',
+                        help='Path to repository')
 
     parser.add_argument('--runner-path',
-                        metavar='Runner_PATH',
+                        metavar='RUNNER_PATH',
                         help='runner Path')
 
+    parser.add_argument('--metric-url',
+                        metavar='METRIC_URL',
+                        help='Url to connect to instance metric generator')
+
   @staticmethod
   def _checkParsedArguments(namespace):
     return namespace
@@ -246,31 +222,54 @@ class ScalabilityLauncher(object):
     ScalabilityLauncher._checkParsedArguments(namespace)
     return namespace
 
-  def moveLogs(self, folder_name):
-    # Get file paths
+  def moveLogs(self, folder_name, current_test):
     file_to_move_list = glob.glob(os.path.join(self.__argumentNamespace.log_path,
-                                  "%s*.csv" %LOG_FILE_PREFIX))
+                                               "%s*.csv" %LOG_FILE_PREFIX))
     file_to_move_list += glob.glob(os.path.join(self.__argumentNamespace.log_path,
-                                  "%s*.log" %LOG_FILE_PREFIX))
-    # Create folder
-    new_directory_path = os.path.join(self.__argumentNamespace.log_path,
-                                      folder_name)
-    if not os.path.exists(new_directory_path): os.makedirs(new_directory_path)
-    # Move files
+                                                "%s*.log" %LOG_FILE_PREFIX))
+    root_test_dir = os.path.join(self.__argumentNamespace.log_path,
+                                 "scalability-test-%s/" % current_test.relative_path.split("/")[1])
+    if not os.path.exists(root_test_dir):
+      os.makedirs(root_test_dir)
+    new_directory_path = os.path.join(root_test_dir,
+                                      folder_name)
+    if not os.path.exists(new_directory_path):
+      os.makedirs(new_directory_path)
     for file_to_move in file_to_move_list:
       shutil.move(file_to_move, new_directory_path)
 
   def getRunningTest(self):
     """
-    Return a ScalabilityTest with current running test case informations,
-    or None if no test_case ready
+    Return a ScalabilityTest with the current running test case information
     """
-    data = self.test_result.getRunningTestCase()
-    if not data:
-      return None
+    data_array = self.__argumentNamespace.current_test_data.split(',')
+    data = json.dumps({"count": data_array[0], "title": data_array[1], "relative_path": data_array[2]})
     decoded_data = Utils.deunicodeData(json.loads(data))
     return ScalabilityTest(decoded_data, self.test_result)
 
+  def clearUsersFile(self, user_file_path):
+    self.log("Clearing users file: %s" % user_file_path)
+    os.remove(user_file_path)
+    users_file = open(user_file_path, "w")
+    for line in self.users_file_original_content:
+      users_file.write(line)
+    users_file.close()
+
+  def updateUsersFile(self, user_quantity, password, user_file_path):
+    self.log("Updating users file: %s" % user_file_path)
+    users_file = open(user_file_path, "r")
+    file_content = users_file.readlines()
+    self.users_file_original_content = file_content
+    new_file_content = []
+    for line in file_content:
+      new_file_content.append(line.replace('<password>', password).replace('<user_quantity>', str(user_quantity)))
+    users_file.close()
+    os.remove(user_file_path)
+    users_file = open(user_file_path, "w")
+    for line in new_file_content:
+      users_file.write(line)
+    users_file.close()
+
   def run(self):
     self.log("Scalability Launcher started, with:")
     self.log("Test suite master url: %s" %self.__argumentNamespace.test_suite_master_url)
@@ -278,140 +277,136 @@ class ScalabilityLauncher(object):
     self.log("Test result path: %s" %self.__argumentNamespace.test_result_path)
     self.log("Revision: %s" %self.__argumentNamespace.revision)
     self.log("Node title: %s" %self.__argumentNamespace.node_title)
-    self.log("ERP5 url: %s" %self.__argumentNamespace.erp5_url)
+    self.log("Instance url: %s" %self.__argumentNamespace.instance_url)
     error_message_set, exit_status = set(), 0
+    process_manager = ProcessManager(self.log)
 
     # Get suite informations
-    suite = makeSuite(self.__argumentNamespace.test_suite, self.log)
+    suite = makeSuite(self.__argumentNamespace.test_suite, self.__argumentNamespace.repo_location, self.log)
     test_suite_list = suite.getTestList()
 
-    # Main loop
-    while True:
-
-      # Loop for getting new test case
+    try:
       current_test = self.getRunningTest()
-      while not current_test:
-        time.sleep(15)
-        current_test = self.getRunningTest()
-      self.log("Test Case %s going to be run." %(current_test.title))
-
-      # Prepare configuration
-      current_test_number = int(current_test.title)
-      test_duration = suite.getTestDuration(current_test_number)
-      benchmarks_path = os.path.join(self.__argumentNamespace.erp5_location, suite.getTestPath())
-      user_file_full_path = os.path.join(self.__argumentNamespace.erp5_location, suite.getUsersFilePath())
-      user_file_path = os.path.split(user_file_full_path)[0]
-      user_file = os.path.split(user_file_full_path)[1]
-      tester_path = self.__argumentNamespace.runner_path
-      user_number = suite.getUserNumber(current_test_number)
-      repetition = suite.getTestRepetition(current_test_number)
-
-      self.log("user_number: %s" %str(user_number))
-      self.log("test_duration: %s seconds" %str(test_duration))
-
-      # Store the number of documents generated for each iteration
-      document_number = []
-
-      # Repeat the same test several times to accurate test result
-      for i in range(1, repetition+1):
-        self.log("Repetition: %d/%d" %(i, repetition))
-
-        # Get the number of documents present before running the test.
-        waitFor0PendingActivities(self.__argumentNamespace.erp5_url, self.log)
-        previous_document_number = getCreatedDocumentNumberFromERP5(self.__argumentNamespace.erp5_url, self.log)
-        self.log("previous_document_number: %d" %previous_document_number)
-
-        # Generate commands to run
-        command_list = []
-        user_index = 0
-        for test_suite in test_suite_list:
-          command_list.append([tester_path,
-                               self.__argumentNamespace.erp5_url,
-                               str(user_number/len(test_suite_list)),
-                               test_suite,
-                               '--benchmark-path-list', benchmarks_path,
-                               '--users-file-path', user_file_path,
-                               '--users-file', user_file,
-                               '--filename-prefix', "%s_%s_repetition%d" %(LOG_FILE_PREFIX, current_test.title, i),
-                               '--report-directory', self.__argumentNamespace.log_path,
-                               '--repeat', "%s" %str(MAX_DOCUMENTS),
-                               '--max-errors', str(1000000),
-                               '--user-index', str(user_index),
-                               ])
-          user_index += user_number/len(test_suite_list)
-
-        # Launch commands
-        tester_process_list = []
-        for command in command_list:
-          self.log("command: %s" %str(command))
-          tester_process_list.append(subprocess.Popen(command))
-
-        # Sleep
-        time.sleep(test_duration)
-
-        # Stop
-        for tester_process in tester_process_list:
-          tester_process.send_signal(signal.SIGINT)
-        self.log("End signal sent to the tester.")
-
-        # Count created documents
-        # Wait for 0 pending activities before counting
-        waitFor0PendingActivities(self.__argumentNamespace.erp5_url, self.log)
-        current_document_number = getCreatedDocumentNumberFromERP5(self.__argumentNamespace.erp5_url, self.log)
-        created_document_number = current_document_number - previous_document_number
-        self.log("previous_document_number: %d" %previous_document_number)
-        self.log("current_document_number: %d" %current_document_number)
-        self.log("created_document_number: %d" %created_document_number)
-        document_number.append(created_document_number)
-        # Move csv/logs
-        self.moveLogs(current_test.title)
-
-      self.log("Test Case %s is finish" %(current_test.title))
-
-      # Get the maximum as choice
-      maximum = 0
-      for i in range(0,len(document_number)):
-        if document_number[i] > maximum:
-          maximum = document_number[i]
-
-      # Send results to ERP5 master
-      retry_time = 2.0
-      proxy = taskdistribution.ServerProxy(
-          self.__argumentNamespace.test_suite_master_url,
-          allow_none=True
-          ).portal_task_distribution
-      test_result_line_test = taskdistribution.TestResultLineProxy(
-          proxy, retry_time, self.log,
-          current_test.relative_path,
-          current_test.title
-          )
-      results = "created docs=%d\n"\
-                "duration=%d\n"\
-                "number of tests=%d\n"\
-                "number of users=%d\n"\
-                "tests=%s\n"\
-                %(
-                  maximum,
-                  test_duration,
-                  len(test_suite_list),
-                  (user_number/len(test_suite_list))*len(test_suite_list),
-                  '_'.join(test_suite_list)
-                )
-      self.log("Results: %s" %results)
-      test_result_line_test.stop(stdout=results,
+    except Exception as e:
+      error_message = "ERROR while getting current running test: " + str(e)
+      self.log(error_message)
+      return error_message, 1
+
+    self.log("Test Case %s going to be run." %(current_test.title))
+    # Prepare configuration
+    current_test_number = int(current_test.title)
+    test_duration = suite.getTestDuration(current_test_number)
+    benchmarks_path = os.path.join(self.__argumentNamespace.repo_location, suite.getTestPath())
+    user_file_full_path = os.path.join(self.__argumentNamespace.repo_location, suite.getUsersFilePath())
+    user_file_path = os.path.split(user_file_full_path)[0]
+    user_file = os.path.split(user_file_full_path)[1]
+    tester_path = self.__argumentNamespace.runner_path
+    user_quantity = suite.getUserQuantity(current_test_number)
+    repetition = suite.getTestRepetition(current_test_number)
+    instance_url = self.__argumentNamespace.instance_url
+    metric_url = self.__argumentNamespace.metric_url
+
+    # Collect metrics in a background thread
+    metric_thread_stop_event = threading.Event()
+    metric_thread = TestMetricThread(metric_url, self.log, metric_thread_stop_event, interval=TEST_METRIC_TIME_INTERVAL)
+    metric_thread.start()
+
+    bootstrap_password = self.__argumentNamespace.bootstrap_password
+    try:
+      self.updateUsersFile(user_quantity, bootstrap_password, user_file_full_path + ".py")
+    except Exception as e:
+      self.log("ERROR while updating users file: " + str(e))
+
+    now = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M")
+    log_dir = "test-%s_%s" % (current_test.title, now)
+    # The test repetition logic will be refactored soon
+    for i in range(1, repetition+1):
+      self.log("Repetition: %d/%d" %(i, repetition))
+      waitFor0PendingActivities(instance_url, self.log)
+      # Generate commands to run
+      command_list = []
+      user_index = 0
+      for test_suite in test_suite_list:
+        command_list.append([tester_path,
+                             instance_url,
+                             str(user_quantity/len(test_suite_list)),
+                             test_suite,
+                             '--benchmark-path-list', benchmarks_path,
+                             '--users-file-path', user_file_path,
+                             '--users-file', user_file,
+                             '--filename-prefix', "%s_%s_repetition%d_suite_%s" %(LOG_FILE_PREFIX, current_test.title, i, test_suite),
+                             '--report-directory', self.__argumentNamespace.log_path,
+                             '--repeat', "%d"%1,
+                             '--max-errors', str(MAX_ERRORS),
+                             '--user-index', str(user_index),
+                             ])
+        user_index += user_quantity/len(test_suite_list)
+
+      # Launch commands
+      for command in command_list:
+        test_thread = TestThread(process_manager, command, self.log)
+        test_thread.start()
+      # Sleep
+      self.log("Going to sleep for %s seconds (Test duration)." % str(test_duration))
+      time.sleep(test_duration)
+
+      waitFor0PendingActivities(instance_url, self.log)
+      self.moveLogs(log_dir, current_test)
+
+    self.log("Test Case %s has finished" %(current_test.title))
+    metric_thread_stop_event.set()
+    time.sleep(15)  # wait for the metric thread to stop
+    metric_list = metric_thread.getMetricList()
+    test_output = suite.getScalabilityTestOutput(metric_list)
+    if not test_output:
+      self.log("Metric list and test output empty; getting metric thread error message.")
+      test_output = metric_thread.getErrorMessage()
+    self.log("test_output: " + str(test_output))
+
+    # Send results to master
+    retry_time = 2.0
+    proxy = taskdistribution.ServerProxy(
+        self.__argumentNamespace.test_suite_master_url,
+        allow_none=True
+        ).portal_task_distribution
+    test_result_line_test = taskdistribution.TestResultLineProxy(
+        proxy, retry_time, self.logger,
+        current_test.relative_path,
+        current_test.title
+        )
+    test_details = "number of users=%d\n"\
+                   "number of repetitions=%d\n"\
+                   "number of tests=%d\n"\
+                   "tests=%s\n"\
+                   "duration=%d\n"\
+                   %(
+                     (user_quantity/len(test_suite_list))*len(test_suite_list),
+                     repetition,
+                     len(test_suite_list),
+                     '_'.join(test_suite_list),
+                     test_duration
+                   )
+    self.log("Test details: %s" % test_details)
+    self.log("Test output: %s" % test_output)
+    self.log("Stopping the test case...")
+    try:
+      test_result_line_test.stop(stdout=test_output,
+                                 command=test_details,
                                  test_count=len(test_suite_list),
                                  duration=test_duration)
-      self.log("Test Case Stopped")
+    except Exception as e:
+      self.log("ERROR stopping test line")
+      self.log(e)
+      raise
+    self.log("Test Case Stopped")
+    self.clearUsersFile(user_file_full_path + ".py")
 
-    # error_message_set = None
     exit_status = 0
+    self.log("Scalability Launcher finished.")
     return error_message_set, exit_status
 
 def main():
   error_message_set, exit_status = ScalabilityLauncher().run()
-  for error_message in error_message_set:
-    print >>sys.stderr, "ERROR: %s" % error_message
-  sys.exit(exit_status)
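
Note on the new suite loading: makeSuite() now loads `location + '__init__.py'` with imp.load_source() and instantiates the class named by --test-suite from that module, instead of importing erp5 test.*/tests.* modules. Below is a minimal sketch of such an __init__.py; the class name and return values are hypothetical, and only the methods this launcher actually calls are shown.

# Hypothetical <repo-location>/__init__.py; names and values are examples only.
class ExampleScalabilitySuite(object):
  def __init__(self, **kwargs):
    # One entry per tester process the launcher will spawn.
    self.test_list = ['createPerson']

  def getTestList(self):
    return self.test_list

  def getTestPath(self):
    # Benchmark scripts directory, relative to --repo-location.
    return 'benchmarks/'

  def getUsersFilePath(self):
    # Users file, relative to --repo-location, without the ".py" suffix
    # (the launcher appends ".py" itself).
    return 'benchmarks/userInfo'

  def getTestDuration(self, test_number):
    return 60 * 10  # seconds the testers are left running

  def getTestRepetition(self, test_number):
    return 1

  def getUserQuantity(self, test_number):
    return 10  # split evenly across the entries of getTestList()

  def getScalabilityTestOutput(self, metric_list):
    # Reduce the samples gathered by TestMetricThread to a report string;
    # returning None makes the launcher fall back to the thread's error message.
    if not metric_list:
      return None
    return '\n'.join(str(metric) for metric in metric_list)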
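
getRunningTest() no longer polls the master for the next test case: the test is injected through --current-test-data as a comma-separated triple, which the method re-serializes to JSON before building the ScalabilityTest. A sketch of the expected shape (the value itself is made up):

import json

# Expected shape: "<count>,<title>,<relative_path>". The title must parse as
# an int (it is used as the test number), and the relative path must contain
# a "/", since moveLogs() uses current_test.relative_path.split("/")[1].
current_test_data = "1,001,test_result_module/20190101-1A2B3C"
count, title, relative_path = current_test_data.split(',')
data = json.dumps({"count": count, "title": title, "relative_path": relative_path})
print json.loads(data)["relative_path"]  # -> test_result_module/20190101-1A2B3C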
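
updateUsersFile() rewrites the suite's users file in place before the run, substituting the literal placeholders <password> and <user_quantity>, and clearUsersFile() restores the saved original afterwards. The template format itself is not part of this patch; a plausible line and its substitution:

# Assumed users-file template line (illustrative only, not from this patch):
template_line = "user_tuple = [('user%i' % i, '<password>') for i in range(<user_quantity>)]"
# updateUsersFile(10, 'secret', path) would rewrite it as:
print template_line.replace('<password>', 'secret').replace('<user_quantity>', str(10))
# -> user_tuple = [('user%i' % i, 'secret') for i in range(10)]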
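
For completeness, an invocation with the renamed and added options could look as follows. Every value is a made-up example; --node-title is assumed from the unchanged part of the parser, and the script is assumed to be run directly with python. Note that --repo-location should end with a slash, because makeSuite() appends '__init__.py' by plain string concatenation.

import subprocess

subprocess.call([
  'python', 'runScalabilityTestSuite.py',
  '--instance-url', 'https://zope:insecure@example.com:2150/erp5/',
  '--bootstrap-password', 'insecure',
  '--test-result-path', 'test_result_module/20190101-1A2B3C',
  '--revision', 'r0=1234',
  '--current-test-data', '1,001,test_result_module/20190101-1A2B3C',
  '--test-suite', 'ExampleScalabilitySuite',
  '--node-title', 'scalability-node-1',
  '--test-suite-master-url', 'https://master.example.com/',
  '--log-path', '/tmp/scalability-logs',
  '--repo-location', '/srv/scalability-suite/',
  '--runner-path', '/opt/erp5/bin/performance_tester_erp5',
  '--metric-url', 'https://zope:insecure@example.com:2150/erp5/metric',
])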