Commit 9180f343 authored by Roque's avatar Roque

testnode: scalability test runner update - requestUrl script

- requests to bootstrap and site availability urls using a script run with userhosts
- hardcoded parameters removed
- try-exception controls in communication with master
- general refactoring and cleanup
parent cfed9ece
#!/usr/bin/env python
import sys
import requests
class Connection(object):
def get(self, url):
error_message_set = set()
result = None
try:
response = requests.get(url, timeout=300)
if response.status_code != 200:
raise ValueError("response status %d" % response.status_code)
result = response.text
except Exception as e:
error_message_set.add("Error: %s" % str(e))
return result, error_message_set
return result, error_message_set
def main():
exit_status = 0
error_message_set = set()
if len(sys.argv) != 2:
error_message_set.add("Error: wrong parameters")
else:
result, error_message_set = Connection().get(sys.argv[1])
if error_message_set:
exit_status = 1
for error in error_message_set:
print error
elif result:
print result
sys.exit(exit_status)
......@@ -54,20 +54,17 @@ import signal
from . import logger
# TODO: some hardcoded values for dev purposes that will be removed soon
FRONTEND_MASTER_REFERENCE = "Scalability-FRONTEND-MASTER-SOFT1-COMP-2732"
FRONTEND_MASTER_DOMAIN = "scalability.nexedi.com"
FRONTEND_PUBLIC_IPV4 = "163.172.72.96"
FRONTEND_SOFTWARE = "https://lab.node.vifib.com/nexedi/slapos/raw/1.0.56/software/apache-frontend/software.cfg"
USERHOSTS_BIN = "/opt/slapgrid/488fc141078e0494c60d437972b3fe55/parts/userhosts/userhosts"
FRONTEND_MASTER_DOMAIN = "scalability.nexedi.com"
# max time to instance changing state: 2 hour
MAX_INSTANCE_TIME = 60*60*2
# max time to generate frontend instance: 1.5 hour
MAX_FRONTEND_TIME = 60*90
# max time to register instance to slapOSMaster: 5 minutes
MAX_CREATION_INSTANCE_TIME = 60*10
# max time for a test: 2 hour
MAX_TEST_CASE_TIME = 60*60*2
# max time for a test: 20 minutes
MAX_TEST_CASE_TIME = 60*20
# max time to prepare SlapOS for testsuite (software installation, instances requests, etc.)
MAX_PREPARE_TEST_SUITE = 3600*10*1.0 # 10 hours
# max time for a test line creation: 5 minutes
......@@ -82,6 +79,7 @@ CHECK_ACTIVITIES_TIME = 60*2
# runner names
PERFORMANCE_RUNNER_SCRIPT = "performance_tester_erp5"
SCALABILITY_RUNNER_SCRIPT = "runScalabilityTestSuite"
REQUEST_URL_SCRIPT = "requestUrl"
SCALABILITY_TEST = "scalability_test"
TEST_SUITE_INIT = "__init__.py"
......@@ -130,6 +128,7 @@ class ScalabilityTestRunner():
# Used to simulate SlapOS answer (used as a queue)
self.last_slapos_answer = []
self.last_slapos_answer_request = []
self.frontend_instance_guid = None
def _prepareSlapOS(self, software_path, computer_guid, create_partition=0):
# create_partition is kept for compatibility
......@@ -158,7 +157,7 @@ class ScalabilityTestRunner():
return instance_title
def _generateInstanceXML(self, software_configuration,
test_result, test_suite, frontend_instance_guid=None):
test_result, test_suite, frontend_software, frontend_instance_guid=None):
"""
Generate a complete scalability instance XML configuration
"""
......@@ -170,21 +169,21 @@ class ScalabilityTestRunner():
config.update({'test-suite':test_suite})
config.update({'test-suite-master-url':self.testnode.config['test_suite_master_url']})
if frontend_instance_guid:
config["frontend"] = {"software-url": FRONTEND_SOFTWARE,
config["frontend"] = {"software-url": frontend_software,
"virtualhostroot-http-port" : 8080,
"virtualhostroot-https-port" : 4443}
config["sla-dict"]["instance_guid=%s" % frontend_instance_guid] = ["frontend"]
return config
def _createInstance(self, software_path, software_configuration, instance_title,
test_result, test_suite, frontend_instance_guid=None):
test_result, test_suite, frontend_software, frontend_instance_guid=None):
"""
Create scalability instance
"""
if self.authorize_request:
logger.info("testnode, request : %s", instance_title)
config = self._generateInstanceXML(software_configuration,
test_result, test_suite, frontend_instance_guid)
test_result, test_suite, frontend_software, frontend_instance_guid)
request_kw = {"partition_parameter_kw": {"_" : json.dumps(config)} }
self.slapos_communicator.requestInstanceStart(instance_title, request_kw)
self.authorize_request = False
......@@ -244,12 +243,19 @@ ces or already launched.")
return len(self.remaining_software_installation_dict) > 0
def _updateInstanceXML(self, software_configuration, instance_title,
test_result, test_suite):
test_result, test_suite, frontend_software, frontend_instance_guid=None):
logger.info("testnode, updateInstanceXML : %s", instance_title)
config = self._generateInstanceXML(software_configuration,
test_result, test_suite)
test_result, test_suite, frontend_software, frontend_instance_guid)
request_kw = {"partition_parameter_kw": {"_" : json.dumps(config)} }
self.slapos_communicator.requestInstanceStart(instance_title, request_kw)
logger.info("Waiting for new configuration")
time.sleep(60*5)
self.slapos_communicator.requestInstanceStop(instance_title, request_kw)
self.slapos_communicator.waitInstanceStopped(instance_title)
self.slapos_communicator.requestInstanceStart(instance_title, request_kw)
return {'status_code' : 0}
def _waitInstanceCreation(self, instance_title, max_time=MAX_CREATION_INSTANCE_TIME):
......@@ -361,14 +367,14 @@ Require valid-user
software_file.write(line)
software_file.close()
def prepareFrontendMasterInstance(self, computer, test_suite_title):
public_ipv4 = FRONTEND_PUBLIC_IPV4
frontend_software = FRONTEND_SOFTWARE
frontend_master_reference = FRONTEND_MASTER_REFERENCE
domain = FRONTEND_MASTER_DOMAIN
def prepareFrontendMasterInstance(self, computer, frontend_software, test_suite_title):
self.frontend_master_reference = "Scalability-Frontend-Master-"
self.frontend_master_reference += "("+test_suite_title+")-"
self.frontend_master_reference += str(computer).replace("'","")
self.frontend_master_reference += "-root"
slap, supply, order = self._initializeSlapOSConnection()
slapos_communicator = SlapOSMasterCommunicator.SlapOSTester(
frontend_master_reference, slap, order, supply,
self.frontend_master_reference, slap, order, supply,
frontend_software, computer_guid=computer)
# Ask for software installation
start_time = time.time()
......@@ -381,13 +387,15 @@ Require valid-user
raise ValueError("ERROR while installing frontend software")
logger.info("Frontend software installed.")
# Request master frontend instance
master_request_kw = {"partition_parameter_kw": {"domain": domain, "public-ipv4": public_ipv4 } }
slapos_communicator.requestInstanceStart(frontend_master_reference, master_request_kw)
slapos_communicator.waitInstanceStarted(frontend_master_reference)
master_request_kw = {"partition_parameter_kw": {"domain" : FRONTEND_MASTER_DOMAIN } }
slapos_communicator.requestInstanceStart(self.frontend_master_reference, master_request_kw)
slapos_communicator.waitInstanceStarted(self.frontend_master_reference)
logger.info("Master Frontend instance started.")
frontend_master_dict = slapos_communicator.getMasterFrontendDict()
if not frontend_master_dict['frontend_master_ipv6']:
raise ValueError("ERROR with master frontend: ipv6 url not found")
if not frontend_master_dict['instance_guid']:
raise ValueError("ERROR in master frontend: instance guid not found")
parsed_url = urlparse.urlparse(frontend_master_dict['frontend_master_ipv6'])
self.frontend_master_ipv6 = parsed_url.hostname
return frontend_master_dict['instance_guid']
......@@ -436,7 +444,6 @@ Require valid-user
supply,
self.reachable_profile,
computer_guid=self.launcher_nodes_computer_guid[0])
# Ask for SR installation
for computer_guid in self.involved_nodes_computer_guid:
self._prepareSlapOS(self.reachable_profile, computer_guid)
......@@ -454,7 +461,11 @@ Require valid-user
# TODO : remove the line below wich simulate an answer from slapos master
self._comeBackFromDummySlapOS()
# Ask for frontend SR installation and instance request
frontend_instance_guid = self.prepareFrontendMasterInstance(self.launcher_nodes_computer_guid[0], node_test_suite.test_suite_title)
# XXX TODO: frontend software url will come from test_configuration soon
self.frontend_software = FRONTEND_SOFTWARE
self.frontend_instance_guid = self.prepareFrontendMasterInstance(self.launcher_nodes_computer_guid[0],
self.frontend_software,
node_test_suite.test_suite_title)
if self.remainSoftwareToInstall() :
# All softwares are not installed, however maxtime is elapsed, that's a failure.
logger.error("All softwares are not installed.")
......@@ -468,7 +479,8 @@ Require valid-user
self.instance_title,
node_test_suite.test_result,
node_test_suite.test_suite,
frontend_instance_guid)
self.frontend_software,
self.frontend_instance_guid)
logger.debug("Scalability instance requested.")
except Exception:
msg = "Unable to launch instance"
......@@ -479,69 +491,38 @@ Require valid-user
return {'status_code' : 0}
return {'status_code' : 1}
# TODO: this will be refactored soon
# this information must be retrieved by the testsuite itself
def getConnection(self, instance_url):
start_time = time.time()
count = 0
while MAX_CONNECTION_TIME > time.time()-start_time:
try:
count = count + 1
parsed = urlparse.urlparse(instance_url)
host = "%s:%s" % (parsed.hostname, str(parsed.port))
if parsed.port is None: host = parsed.hostname
if parsed.scheme == 'https':
return httplib.HTTPSConnection(host)
elif parsed.scheme == 'http':
return httplib.HTTPConnection(host)
else:
raise ValueError("Protocol not implemented")
except Exception as e:
logger.info("Can't get connection to %s, we will retry." %instance_url)
logger.info("Exception: " + str(e))
raise ValueError("Cannot get new connection after %d try (for %s s)" %(count, str(time.time()-start_time)))
def waitForPendingActivities(self, instance_url):
def isInstanceReady(self, site_availability_url, userhosts_bin, requestUrlScript, exec_env):
start_time = time.time()
parsed = urlparse.urlparse(instance_url)
user = parsed.username;
password = parsed.password;
header_dict = {'Authorization': 'Basic %s' % \
base64.encodestring('%s:%s' % (user, password)).strip()}
count = 0
no_pending_activities = False
while MAX_BOOTSRAPPING_TIME > time.time()-start_time and not no_pending_activities:
zope_connection = self.getConnection(instance_url)
while MAX_BOOTSRAPPING_TIME > time.time()-start_time:
try:
count = count + 1
zope_connection.request(
'GET', '/erp5/portal_activities/getMessageList',
headers=header_dict
)
result = zope_connection.getresponse()
message_list_text = result.read()
message_list = [s.strip() for s in message_list_text[1:-1].split(',')]
if len(message_list)==0 or len(message_list)==1: # hack to ignore alarm activities
logger.info("There are no pending activities.")
command = [userhosts_bin, requestUrlScript, site_availability_url]
result = self.testnode.process_manager.spawn(*command, **exec_env)
if result['status_code']:
logger.info("Error checking site availability. Response: " + str(result['stdout']))
if int(result['stdout']) == 0:
logger.info("Site ready for test.")
return True
logger.info("There are %d pending activities" %len(message_list))
logger.info("Site is not available yet.")
logger.info("[DEBUG] Site availability url response: %d" % int(result['stdout']))
time.sleep(CHECK_ACTIVITIES_TIME)
except Exception as e:
logger.info("ERROR getting activities: " + str(e))
return no_pending_activities
logger.info("Error checking site availability: " + str(e))
time.sleep(CHECK_ACTIVITIES_TIME)
return False
def runTestSuite(self, node_test_suite, portal_url):
if not self.launchable:
logger.info("Current test_suite is not actually launchable.")
return {'status_code' : 1} # Unable to continue due to not realizable configuration
return {'status_code' : 1, 'error_message': "Current test_suite is not actually launchable." }
configuration_list = node_test_suite.configuration_list
test_list = range(0, len(configuration_list))
test_result_proxy = self.testnode.taskdistribution.createTestResult(
node_test_suite.revision, test_list,
self.testnode.config['test_node_title'],
True, node_test_suite.test_suite_title,
node_test_suite.project_title)
try:
test_result_proxy = self.testnode.taskdistribution.createTestResult(
node_test_suite.revision, test_list,
self.testnode.config['test_node_title'],
True, node_test_suite.test_suite_title,
node_test_suite.project_title)
except Exception as e:
return {'status_code' : 1, 'error_message': "Error in communication with master: " + str(e) }
logger.debug("Test Result created.")
count = 0
error_message = None
......@@ -565,93 +546,110 @@ Require valid-user
error_message = "Could not find test suite %s in the repositories" % node_test_suite.test_suite
return {'status_code' : 1, 'error_message': error_message }
# Each cluster configuration are tested
# Each cluster configuration is tested
for configuration in configuration_list:
# First configuration doesn't need XML configuration update.
if count > 0:
logger.info("Requesting instance with configuration %d" % count)
self.slapos_communicator.requestInstanceStop()
self.slapos_communicator.waitInstanceStopped(self.instance_title)
self._updateInstanceXML(configuration, self.instance_title,
node_test_suite.test_result, node_test_suite.test_suite)
node_test_suite.test_result, node_test_suite.test_suite, self.frontend_software, self.frontend_instance_guid)
self.slapos_communicator.waitInstanceStarted(self.instance_title)
logger.debug("INSTANCE CORRECTLY STARTED")
# loop for getting instance information (with frontend)
# Start only the current test
exclude_list=[x for x in test_list if x!=test_list[count]]
try:
test_result_line_proxy = test_result_proxy.start(exclude_list)
except Exception as e:
error_message = "Error in communication with master: " + str(e)
break
if test_result_line_proxy == None :
error_message = "Test case already tested."
break
logger.info("Test case for count : %d is in a running state." % count)
# loop for getting instance information
logger.info("Getting instance information:")
instance_information_time = time.time()
instance_information = self.slapos_communicator.getInstanceUrlDict()
logger.info("Getting instance information:")
while not instance_information['frontend-url-list'] and \
time.time() - instance_information_time < MAX_FRONTEND_TIME:
time.sleep(5*60)
instance_information = self.slapos_communicator.getInstanceUrlDict()
logger.info(instance_information)
if not instance_information['frontend-url-list']:
return {'status_code' : 1, 'error_message': "Error getting instance information: frontend url not available" }
error_message = "Error getting instance information: frontend url not available"
break
instance_url = suite.getScalabilityTestUrl(instance_information)
bootstrap_url = suite.getBootstrapScalabilityTestUrl(instance_information, count)
metric_url = suite.getScalabilityTestMetricUrl(instance_information)
logger.info("Instance url: " + str(instance_url))
# get information from testsuite
try:
instance_url = suite.getScalabilityTestUrl(instance_information)
bootstrap_url = suite.getBootstrapScalabilityTestUrl(instance_information, count)
metric_url = suite.getScalabilityTestMetricUrl(instance_information)
site_availability_url = suite.getSiteAvailabilityUrl(instance_information)
logger.info("Instance url: " + str(instance_url))
except Exception as e:
error_message = "Error getting testsuite information: " + str(e)
break
# adding frontend url to userhosts
logger.info("Updating userhots with instance frontend url...")
parsed_url = urlparse.urlparse(instance_url)
testsuite_directory = self.testnode.config['repository_path_list'][0].rsplit('/', 1)[0]
hosts_file = open(testsuite_directory + HOSTFILE, "w")
file_content = """%s %s
try:
parsed_url = urlparse.urlparse(instance_url)
testsuite_directory = self.testnode.config['repository_path_list'][0].rsplit('/', 1)[0]
host_file_path = testsuite_directory + HOSTFILE
hosts_file = open(host_file_path, "w")
file_content = """%s %s
""" % (self.frontend_master_ipv6, parsed_url.hostname)
hosts_file.write(file_content)
hosts_file.close()
hosts_file.write(file_content)
hosts_file.close()
except Exception as e:
error_message = "Error preparing userhost file: " + str(e)
break
software_bin_directory = self.testnode.config['slapos_binary'].rsplit("slapos", 1)[0]
exec_env = os.environ.copy()
exec_env.update({'HOSTS': host_file_path})
exec_env.update({'raise_error_if_fail': False})
software_hash_directory = self.testnode.config['slapos_binary'].rsplit("bin/slapos", 1)[0]
userhosts_bin = software_hash_directory + "parts/userhosts/userhosts"
requestUrlScript = software_bin_directory + REQUEST_URL_SCRIPT
bootstrap_password = "insecure"
if bootstrap_url is not None:
logger.info("Bootstrapping instance...")
if bootstrap_url:
try:
response = requests.get(bootstrap_url)
logger.info("Bootstrapping instance...")
command = [userhosts_bin, requestUrlScript, bootstrap_url]
result = self.testnode.process_manager.spawn(*command, **exec_env)
if result['status_code']:
error_message = "Error while bootstrapping: " + str(result['stdout'])
else:
response_dict = json.loads(result['stdout'])
if response_dict["status_code"] != 0:
error_message = "Error while bootstrapping: " + response_dict["error_message"]
break
bootstrap_password = response_dict["password"]
logger.info("Bootstrap password: " + bootstrap_password)
except Exception as e:
error_message = "Site error. %s not found. Instance site not properly created? Error: %s" % (instance_url, e)
return {'status_code' : 1, 'error_message': error_message }
try:
result = json.loads(response.text)
except:
return {'status_code' : 1, 'error_message': "ERROR bootstrapping: " + str(response.text)}
if result["status_code"] != 0:
return {'status_code' : result["status_code"], 'error_message': "ERROR bootstrapping: " + result["error_message"]}
bootstrap_password = result["password"]
if not self.waitForPendingActivities(bootstrap_url):
return {'status_code' : 1, 'error_message': "ERROR waiting for pending activities."}
logger.info("Bootstrap password: " + bootstrap_password)
error_message = "Error while bootstrapping: " + str(e)
break
if not self.isInstanceReady(site_availability_url, userhosts_bin, requestUrlScript, exec_env):
error_message = "Error while bootstrapping: max bootstrap time exceded."
if error_message:
break
software_bin_directory = self.testnode.config['slapos_binary'].rsplit("slapos", 1)[0]
runner = software_bin_directory + PERFORMANCE_RUNNER_SCRIPT
scalabilityRunner = software_bin_directory + SCALABILITY_RUNNER_SCRIPT
slappart_directory = self.testnode.config['srv_directory'].rsplit("srv", 1)[0]
log_path = slappart_directory + "var/log/"
# Start only the current test
exclude_list=[x for x in test_list if x!=test_list[count]]
test_result_line_proxy = test_result_proxy.start(exclude_list)
logger.info("Test case for count : %d is in a running state." %count)
count += 1
# Loop for getting the running test case
test_line_time = time.time()
current_test = test_result_proxy.getRunningTestCase()
while not current_test and \
time.time() - test_line_time < MAX_CREATION_TEST_LINE:
time.sleep(15)
try:
current_test = test_result_proxy.getRunningTestCase()
if test_result_line_proxy == None :
error_message = "Test case already tested."
logger.info("ERROR: " + error_message)
current_test_data_json = json.loads(current_test)
current_test_data = "%d,%s,%s" % (current_test_data_json["count"], current_test_data_json["title"], current_test_data_json["relative_path"])
except Exception as e:
error_message = "Error getting current test case information: " + str(e)
break
current_test_data_json = json.loads(current_test)
current_test_data = "%d,%s,%s" % (current_test_data_json["count"], current_test_data_json["title"], current_test_data_json["relative_path"])
command = [ USERHOSTS_BIN,
command = [ userhosts_bin,
scalabilityRunner,
"--instance-url", instance_url,
"--bootstrap-password", bootstrap_password,
......@@ -666,53 +664,48 @@ Require valid-user
"--log-path", log_path,
"--metric-url", metric_url
]
hosts_file = testsuite_directory + HOSTFILE
exec_env = os.environ.copy()
exec_env.update({'HOSTS': hosts_file})
logger.info("Running test case...")
test_thread = TestThread(self.testnode.process_manager, command, logger.info, env=exec_env)
test_thread.start()
# Wait for test case ending
test_case_start_time = time.time()
while test_result_line_proxy.isTestCaseAlive() and \
test_result_proxy.isAlive() and \
time.time() - test_case_start_time < MAX_TEST_CASE_TIME:
time.sleep(60)
# Max time limit reach for current test case: failure.
if test_result_line_proxy.isTestCaseAlive():
error_message = "Test case during for %s seconds, too long. (max: %s seconds). Test failure." \
%(str(time.time() - test_case_start_time), MAX_TEST_CASE_TIME)
logger.info("ERROR: " + str(error_message))
break
logger.info("[DEBUG] Test case for current configuration finished")
# Test cancelled, finished or in an undeterminate state.
if not test_result_proxy.isAlive():
logger.info("[DEBUG] Test case finished")
# Test finished
if count == len(configuration_list):
try:
while test_result_line_proxy.isTestCaseAlive() and \
test_result_proxy.isAlive() and \
time.time() - test_case_start_time < MAX_TEST_CASE_TIME:
time.sleep(60)
if test_result_line_proxy.isTestCaseAlive():
error_message = "Test case during for %s seconds, too long. (max: %s seconds). Test failure." \
%(str(time.time() - test_case_start_time), MAX_TEST_CASE_TIME)
break
# Cancelled or in an undeterminate state.
error_message = "Test case cancelled or undeterminate state."
logger.info("ERROR: " + str(error_message))
logger.info("[DEBUG] Test case for current configuration finished")
count += 1
# Test cancelled, finished or in an undeterminate state.
if not test_result_proxy.isAlive():
logger.info("[DEBUG] Test case finished")
if count == len(configuration_list):
break
# Cancelled or in an undeterminate state.
error_message = "Test case cancelled or undeterminate state."
break
except Exception as e:
error_message = "Error in communication with master: " + str(e)
break
# Stop current instance
self.slapos_communicator.requestInstanceStop()
self.slapos_communicator.waitInstanceStopped(self.instance_title)
# Delete old instances
self._cleanUpOldInstance()
logger.info("Destroying requested instances")
self.slapos_communicator.requestInstanceDestroy(self.instance_title)
# If error appears then that's a test failure.
if error_message:
test_result_line_proxy.stop(error_count=1, failure_count=1,
stdout=error_message, stderr=error_message)
test_result_proxy.reportFailure(stdout=error_message)
logger.info("There was an error during the testsuite run: " + str(error_message))
try:
test_result_line_proxy.stop(error_count=1, failure_count=1,
stdout=error_message, stderr=error_message)
test_result_proxy.reportFailure(stdout=error_message)
except Exception as e:
error_message = "Error in communication with master: " + str(e)
logger.debug("Test Failed.")
return {'status_code' : 1, 'error_message': error_message}
# Test is finished.
......@@ -720,12 +713,6 @@ Require valid-user
return {'status_code' : 0}
def _cleanUpOldInstance(self):
self.slapos_communicator.requestInstanceDestroy(self.instance_title)
# TODO: check status? Is it possible to check status 'destroy requested' ?
# does make sense to wait the destruction? For now it waits until stop
self.slapos_communicator.waitInstanceStopped(self.instance_title)
def getRelativePathUsage(self):
"""
Used by the method testnode.constructProfile() to know
......
......@@ -494,7 +494,7 @@ class SlapOSTester(SlapOSMasterCommunicator):
self._request(INSTANCE_STATE_DESTROYED, instance["title"])
else:
root_instance = instance
self._request(INSTANCE_STATE_DESTROYED, instance_title)
self._request(INSTANCE_STATE_DESTROYED, root_instance["title"])
else:
logger.info("Instance not found")
......
......@@ -71,6 +71,8 @@ setup(name=name,
'erp5.util.benchmark.scalability_tester:main [scalability_tester]',
'runScalabilityTestSuite = '\
'erp5.util.scalability.runScalabilityTestSuite:main',
'requestUrl = '\
'erp5.util.scalability.requestUrl:main',
'generate_erp5_tester_report = '\
'erp5.util.benchmark.report:generateReport [benchmark-report]',
'web_checker_utility = erp5.util.webchecker:web_checker_utility'
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment