Commit 6c26f831 authored by Roque's avatar Roque

scalability-benchmark: refactoring and parametrizable test duration

- test duration is get from testsuite definition
- refactoring and cleanup in runScalabilityTestSuite and benchmark
- request timeout in metric thread

/reviewed-on nexedi/erp5!607
parent 7d442894
...@@ -113,6 +113,11 @@ class PerformanceTester(object): ...@@ -113,6 +113,11 @@ class PerformanceTester(object):
help='Repeat the benchmark suite N times for a given ' help='Repeat the benchmark suite N times for a given '
'number of users (default: infinite)') 'number of users (default: infinite)')
parser.add_argument('--duration',
type=int,
default=0,
help='Repeat the benchmark suite until time duration is reached')
parser.add_argument('--repeat-range', parser.add_argument('--repeat-range',
type=ArgumentType.checkIntValueWrapper(minimum=1), type=ArgumentType.checkIntValueWrapper(minimum=1),
default=-1, default=-1,
......
...@@ -31,6 +31,7 @@ import traceback ...@@ -31,6 +31,7 @@ import traceback
import signal import signal
import sys import sys
import socket import socket
import time
from ..testbrowser.browser import Browser from ..testbrowser.browser import Browser
from .result import NothingFlushedException from .result import NothingFlushedException
...@@ -158,21 +159,29 @@ class BenchmarkProcess(multiprocessing.Process): ...@@ -158,21 +159,29 @@ class BenchmarkProcess(multiprocessing.Process):
exit_status = 0 exit_status = 0
exit_msg = None exit_msg = None
def runIteration(result):
self._logger.info("Iteration: %d" % self._current_repeat)
self.runBenchmarkSuiteList(result)
if not self._current_repeat % REPEAT_NUMBER_BEFORE_FLUSHING:
try:
result.flush()
except NothingFlushedException:
pass
try: try:
with result_instance as result: with result_instance as result:
self._browser = self.getBrowser(result_instance.log_file) self._browser = self.getBrowser(result_instance.log_file)
if self._argument_namespace.duration > 0:
while self._current_repeat != (self._argument_namespace.repeat + 1): self._logger.info("Iterate until duration %d" % self._argument_namespace.duration)
self._logger.info("Iteration: %d" % self._current_repeat) start_time = time.time()
self.runBenchmarkSuiteList(result) while self._argument_namespace.duration > (time.time()-start_time):
runIteration(result)
if not self._current_repeat % REPEAT_NUMBER_BEFORE_FLUSHING: self._current_repeat += 1
try: else:
result.flush() self._logger.info("Iterate until repeat %d" % self._argument_namespace.repeat)
except NothingFlushedException: while self._current_repeat != (self._argument_namespace.repeat + 1):
pass runIteration(result)
self._current_repeat += 1
self._current_repeat += 1
except StopIteration, e: except StopIteration, e:
self._logger.error(e) self._logger.error(e)
......
...@@ -56,7 +56,7 @@ class TestMetricThread(threading.Thread): ...@@ -56,7 +56,7 @@ class TestMetricThread(threading.Thread):
while(not self.stop_event.is_set()): while(not self.stop_event.is_set()):
self.stop_event.wait(-time.time() % self.interval) self.stop_event.wait(-time.time() % self.interval)
try: try:
response = requests.get(self.metric_url) response = requests.get(self.metric_url, timeout=60)
if response.status_code == 200: if response.status_code == 200:
self.metric_list.append(response.text) self.metric_list.append(response.text)
else: else:
...@@ -71,6 +71,3 @@ class TestMetricThread(threading.Thread): ...@@ -71,6 +71,3 @@ class TestMetricThread(threading.Thread):
def getErrorMessage(self): def getErrorMessage(self):
return self.error_message return self.error_message
...@@ -27,11 +27,12 @@ import datetime ...@@ -27,11 +27,12 @@ import datetime
MAX_INSTALLATION_TIME = 60*50 MAX_INSTALLATION_TIME = 60*50
MAX_TESTING_TIME = 60 MAX_TESTING_TIME = 60
MAX_GETTING_CONNECTION_TIME = 60*5 MAX_GETTING_CONNECTION_TIME = 60*5
TEST_METRIC_TIME_INTERVAL = 60*3 TEST_METRIC_TIME_INTERVAL = 60
SCALABILITY_TEST_DURATION = 10*60
SCALABILITY_LOG_FILENAME = "runScalabilityTestSuite" SCALABILITY_LOG_FILENAME = "runScalabilityTestSuite"
LOG_FILE_PREFIX = "scalability-test" LOG_FILE_PREFIX = "scalability-test"
MAX_ERRORS = 2 MAX_ERRORS = 10
class DummyLogger(object): class DummyLogger(object):
def __init__(self, func): def __init__(self, func):
...@@ -39,73 +40,6 @@ class DummyLogger(object): ...@@ -39,73 +40,6 @@ class DummyLogger(object):
'critical', 'fatal'): 'critical', 'fatal'):
setattr(self, name, func) setattr(self, name, func)
def getConnection(instance_url, log):
"""
Return a connection with the instance.
"""
start_time = time.time()
count = 0
while MAX_GETTING_CONNECTION_TIME > time.time()-start_time:
try:
count = count + 1
parsed = urlparse.urlparse(instance_url)
host = "%s:%s" % (parsed.hostname, str(parsed.port))
if parsed.port is None: host = parsed.hostname
if parsed.scheme == 'https':
return httplib.HTTPSConnection(host)
elif parsed.scheme == 'http':
return httplib.HTTPConnection(host)
else:
raise ValueError("Protocol not implemented")
except:
log("Can't get connection to %s, we will retry." %instance_url)
time.sleep(10)
raise ValueError("Cannot get new connection after %d try (for %s s)" %(count, str(time.time()-start_time)))
# TODO: this will be refactored soon
def waitFor0PendingActivities(instance_url, log):
"""
Waiting while there are no pending activities on the instance.
"""
log("waiting activities for: " + str(instance_url))
start_time = time.time()
parsed = urlparse.urlparse(instance_url)
user = parsed.username;
password = parsed.password;
header_dict = {'Authorization': 'Basic %s' % \
base64.encodestring('%s:%s' % (user, password)).strip()}
count = 0
ok = False
while MAX_INSTALLATION_TIME > time.time()-start_time and not ok:
zope_connection = getConnection(instance_url, log)
try:
count = count + 1
zope_connection.request(
'GET', '/erp5/portal_activities/getMessageList',
headers=header_dict
)
result = zope_connection.getresponse()
message_list_text = result.read()
message_list = [s.strip() for s in message_list_text[1:-1].split(',')]
if len(message_list)==0:
log("There is no pending activities.")
ok = True
#Hack to do not take into account persistent Alarm_installMailServer acitivities
if len(message_list)==1 :
log("1 pending activity but ok.")
ok = True
log("There is %d pending activities" %len(message_list))
time.sleep(5)
except Exception as e:
time.sleep(5)
log("exception: " + str(e))
log("Getting activities failed, retry.")
if not ok:
raise ValueError("Cannot waitFor0PendingActivities after %d try (for %s s)" %(count, str(time.time()-start_time)))
class ScalabilityTest(object): class ScalabilityTest(object):
def __init__(self, data, test_result): def __init__(self, data, test_result):
self.__dict__ = {} self.__dict__ = {}
...@@ -144,9 +78,8 @@ class ScalabilityLauncher(object): ...@@ -144,9 +78,8 @@ class ScalabilityLauncher(object):
def __init__(self): def __init__(self):
self.__argumentNamespace = self._parseArguments(argparse.ArgumentParser( self.__argumentNamespace = self._parseArguments(argparse.ArgumentParser(
description='Run benchmarking scalability suites.')) description='Run benchmarking scalability suites.'))
logger = createLogger(self.__argumentNamespace.log_path) self.logger = createLogger(self.__argumentNamespace.log_path)
self.log = logger.info self.log = self.logger.info
self.logger = logger
self.users_file_original_content = [] self.users_file_original_content = []
# Proxy to with master test_result # Proxy to with master test_result
portal_url = self.__argumentNamespace.test_suite_master_url portal_url = self.__argumentNamespace.test_suite_master_url
...@@ -297,13 +230,14 @@ class ScalabilityLauncher(object): ...@@ -297,13 +230,14 @@ class ScalabilityLauncher(object):
# Prepare configuration # Prepare configuration
current_test_number = int(current_test.title) current_test_number = int(current_test.title)
test_duration = suite.getTestDuration(current_test_number) test_duration = suite.getTestDuration(current_test_number)
if not test_duration or test_duration == 0:
test_duration = SCALABILITY_TEST_DURATION
benchmarks_path = os.path.join(self.__argumentNamespace.repo_location, suite.getTestPath()) benchmarks_path = os.path.join(self.__argumentNamespace.repo_location, suite.getTestPath())
user_file_full_path = os.path.join(self.__argumentNamespace.repo_location, suite.getUsersFilePath()) user_file_full_path = os.path.join(self.__argumentNamespace.repo_location, suite.getUsersFilePath())
user_file_path = os.path.split(user_file_full_path)[0] user_file_path = os.path.split(user_file_full_path)[0]
user_file = os.path.split(user_file_full_path)[1] user_file = os.path.split(user_file_full_path)[1]
tester_path = self.__argumentNamespace.runner_path tester_path = self.__argumentNamespace.runner_path
user_quantity = suite.getUserQuantity(current_test_number) user_quantity = suite.getUserQuantity(current_test_number)
repetition = suite.getTestRepetition(current_test_number)
instance_url = self.__argumentNamespace.instance_url instance_url = self.__argumentNamespace.instance_url
metric_url = self.__argumentNamespace.metric_url metric_url = self.__argumentNamespace.metric_url
...@@ -312,47 +246,45 @@ class ScalabilityLauncher(object): ...@@ -312,47 +246,45 @@ class ScalabilityLauncher(object):
metric_thread = TestMetricThread(metric_url, self.log, metric_thread_stop_event, interval=TEST_METRIC_TIME_INTERVAL) metric_thread = TestMetricThread(metric_url, self.log, metric_thread_stop_event, interval=TEST_METRIC_TIME_INTERVAL)
metric_thread.start() metric_thread.start()
bootstrap_password = self.__argumentNamespace.bootstrap_password
try: try:
bootstrap_password = self.__argumentNamespace.bootstrap_password
self.updateUsersFile(user_quantity, bootstrap_password, user_file_full_path + ".py") self.updateUsersFile(user_quantity, bootstrap_password, user_file_full_path + ".py")
except Exception as e: except Exception as e:
self.log("ERROR while updating file: " + str(e)) self.log("ERROR while updating file: " + str(e))
now = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M") now = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M")
log_dir = "test-%s_%s" % (current_test.title, now) log_dir = "test-%s_%s" % (current_test.title, now)
# the repetition of tests will be refactored soon
for i in range(1, repetition+1): command_list = []
self.log("Repetition: %d/%d" %(i, repetition)) user_index = 0
waitFor0PendingActivities(instance_url, self.log) # Prepare commands
# Generate commands to run for test_suite in test_suite_list:
command_list = [] log_file_name_prefix = "%s_%s_suite_%s" %(LOG_FILE_PREFIX, current_test.title, test_suite)
user_index = 0 command_list.append([tester_path,
for test_suite in test_suite_list: instance_url,
command_list.append([tester_path, str(user_quantity/len(test_suite_list)),
instance_url, test_suite,
str(user_quantity/len(test_suite_list)), '--benchmark-path-list', benchmarks_path,
test_suite, '--users-file-path', user_file_path,
'--benchmark-path-list', benchmarks_path, '--users-file', user_file,
'--users-file-path', user_file_path, '--filename-prefix', log_file_name_prefix,
'--users-file', user_file, '--report-directory', self.__argumentNamespace.log_path,
'--filename-prefix', "%s_%s_repetition%d_suite_%s" %(LOG_FILE_PREFIX, current_test.title, i, test_suite), '--max-errors', str(MAX_ERRORS),
'--report-directory', self.__argumentNamespace.log_path, '--user-index', str(user_index),
'--repeat', "%d"%1, "--duration", "%d"%test_duration,
'--max-errors', str(MAX_ERRORS), ])
'--user-index', str(user_index), user_index += user_quantity/len(test_suite_list)
]) # Launch commands
user_index += user_quantity/len(test_suite_list) exec_env = os.environ.copy()
exec_env.update({'raise_error_if_fail': False})
# Launch commands for index, command in enumerate(command_list, start=1):
for command in command_list: test_thread = TestThread(process_manager, command, self.log, env=exec_env)
test_thread = TestThread(process_manager, command, self.log) test_thread.start()
test_thread.start() # Sleep
# Sleep self.log("Going to sleep for %s seconds." % str(test_duration))
self.log("Going to sleep for %s seconds (Test duration)." % str(test_duration)) time.sleep(test_duration)
time.sleep(test_duration) process_manager.killPreviousRun()
self.moveLogs(log_dir, current_test)
waitFor0PendingActivities(instance_url, self.log)
self.moveLogs(log_dir, current_test)
self.log("Test Case %s has finished" %(current_test.title)) self.log("Test Case %s has finished" %(current_test.title))
metric_thread_stop_event.set() metric_thread_stop_event.set()
...@@ -360,45 +292,57 @@ class ScalabilityLauncher(object): ...@@ -360,45 +292,57 @@ class ScalabilityLauncher(object):
metric_list = metric_thread.getMetricList() metric_list = metric_thread.getMetricList()
test_output = suite.getScalabilityTestOutput(metric_list) test_output = suite.getScalabilityTestOutput(metric_list)
if not test_output: if not test_output:
self.log("metric list and test output empty. getting metric thread error message.") metric_error = metric_thread.getErrorMessage()
test_output = metric_thread.getErrorMessage() if metric_error:
self.log("test_output: " + str(test_output)) error_message = str(metric_error)
else:
error_message = "Metric thread couldn't get any metric"
error_message_set.add(error_message)
if error_message_set:
self.log("error_message_set: " + str(error_message_set))
# Send results to master
retry_time = 2.0
proxy = taskdistribution.ServerProxy(
self.__argumentNamespace.test_suite_master_url,
allow_none=True
).portal_task_distribution
test_result_line_test = taskdistribution.TestResultLineProxy(
proxy, retry_time, self.logger,
current_test.relative_path,
current_test.title
)
test_details = "number of users=%d\n"\ test_details = "number of users=%d\n"\
"number of repetitions=%d\n"\
"number of tests=%d\n"\ "number of tests=%d\n"\
"tests=%s\n"\ "tests=%s\n"\
"duration=%d\n"\ "duration=%d\n"\
%( %(
(user_quantity/len(test_suite_list))*len(test_suite_list), (user_quantity/len(test_suite_list))*len(test_suite_list),
repetition,
len(test_suite_list), len(test_suite_list),
'_'.join(test_suite_list), '_'.join(test_suite_list),
test_duration test_duration
) )
self.log("Test details: %s" % test_details) self.log("Test details: %s" % test_details)
self.log("Test output: %s" % test_output)
self.log("Stopping the test case...") # Send results to master
self.log("Connecting to master to set test results...")
try: try:
test_result_line_test.stop(stdout=test_output, retry_time = 2.0
command=test_details, proxy = taskdistribution.ServerProxy(
test_count=len(test_suite_list), self.__argumentNamespace.test_suite_master_url,
duration=test_duration) allow_none=True
).portal_task_distribution
test_result_line_test = taskdistribution.TestResultLineProxy(
proxy, retry_time, self.logger,
current_test.relative_path,
current_test.title
)
if len(error_message_set):
self.log("Test case failed.")
error_message = "; ".join(str(error) for error in error_message_set)
test_result_line_test.stop(error_count=1, failure_count=1,
command=test_details,
stdout=error_message, stderr=error_message)
else:
self.log("Test case finished. Output: %s" % test_output)
test_result_line_test.stop(stdout=test_output,
command=test_details,
test_count=len(test_suite_list),
duration=test_duration)
except Exception as e: except Exception as e:
self.log("ERROR stopping test line") self.log("Error during communication to master: " + str(e))
self.log(e)
raise e raise e
self.log("Test Case Stopped") self.log("Test Case Stopped")
self.clearUsersFile(user_file_full_path + ".py") self.clearUsersFile(user_file_full_path + ".py")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment