diff --git a/erp5/util/testnode/SlapOSControler.py b/erp5/util/testnode/SlapOSControler.py
index ade1ce2e9b65a241cef7c107748b0cb6ec7583ae..7ef85c2e8ba143606f4b357f0ed2f29abc8e8765 100644
--- a/erp5/util/testnode/SlapOSControler.py
+++ b/erp5/util/testnode/SlapOSControler.py
@@ -58,7 +58,8 @@ class SlapOSControler(object):
     # connections
     time.sleep(10)
     slap = slapos.slap.slap()
-    slap.initializeConnection(config['master_url'])
+    self.slap = slap
+    self.slap.initializeConnection(config['master_url'])
     # register software profile
     self.software_profile = config['custom_profile_path']
     slap.registerSupply().supply(
@@ -80,14 +81,16 @@ class SlapOSControler(object):
       # In order to be able to change partition naming scheme, do this at
       # instance_root level (such change happened already, causing problems).
       shutil.rmtree(instance_root)
-    os.mkdir(instance_root)
+    if not(os.path.exists(instance_root)):
+      os.mkdir(instance_root)
     for i in range(0, MAX_PARTIONS):
       # create partition and configure computer
       # XXX: at the moment all partitions do share same virtual interface address
       # this is not a problem as usually all services are on different ports
       partition_reference = '%s-%s' %(config['partition_reference'], i)
       partition_path = os.path.join(instance_root, partition_reference)
-      os.mkdir(partition_path)
+      if not(os.path.exists(partition_path)):
+        os.mkdir(partition_path)
       os.chmod(partition_path, 0750)
       computer.updateConfiguration(xml_marshaller.xml_marshaller.dumps({
         'address': config['ipv4_address'],
@@ -116,7 +119,7 @@ class SlapOSControler(object):
     # a SR may fail for number of reasons (incl. network failures)
     # so be tolerant and run it a few times before giving up
     for runs in range(0, MAX_SR_RETRIES):
-      status_dict = self.spawn(config['slapgrid_software_binary'], '-v', '-c',
+      status_dict = self.spawn(config['slapgrid_software_binary'], '-v', '-c','--now',
                  config['slapos_config'], raise_error_if_fail=False,
                  log_prefix='slapgrid_sr', get_output=False)
     return status_dict
@@ -124,10 +127,9 @@ class SlapOSControler(object):
 
   def runComputerPartition(self, config, environment,
                            stdout=None, stderr=None):
     self.log("SlapOSControler.runComputerPartition")
-    slap = slapos.slap.slap()
     # cloudooo-json is required but this is a hack which should be removed
     config['instance_dict']['cloudooo-json'] = "{}"
-    slap.registerOpenOrder().request(self.software_profile,
+    self.slap.registerOpenOrder().request(self.software_profile,
                                      partition_reference='testing partition',
                                      partition_parameter_kw=config['instance_dict'])
@@ -138,4 +140,5 @@ class SlapOSControler(object):
     status_dict = self.spawn(config['slapgrid_partition_binary'], '-v', '-c',
                  config['slapos_config'], raise_error_if_fail=False,
                  log_prefix='slapgrid_cp', get_output=False)
+    self.log('slapgrid_cp status_dict : %r' % (status_dict,))
     return status_dict
diff --git a/erp5/util/testnode/testnode.py b/erp5/util/testnode/testnode.py
index 806adf2b50e319a01f3fc4856c6163a2a420ed85..bbddd30891cdcc51c6c565bd450cf7ffbeb1128c 100644
--- a/erp5/util/testnode/testnode.py
+++ b/erp5/util/testnode/testnode.py
@@ -26,129 +26,27 @@
 ##############################################################################
 from datetime import datetime
 import os
-import signal
-import socket
 import subprocess
 import sys
 import time
-import xmlrpclib
 import glob
 import SlapOSControler
-import threading
 from ProcessManager import SubprocessError, ProcessManager, CancellationError
 from Updater import Updater
+from erp5.util import taskdistribution
 
 DEFAULT_SLEEP_TIMEOUT = 120 # time in seconds to sleep
-supervisord_pid_file = None
-# XXX: below section is shared with product/ERP5Type/tests/runTestSuite.py .
-# They are supposed to be merged into a common library/tool someday, until
-# then please keep them synchronised.
-# Depending on used xmlrpc backend, different exceptions can be thrown.
-SAFE_RPC_EXCEPTION_LIST = [socket.error, xmlrpclib.ProtocolError, xmlrpclib.Fault]
-parser, _ = xmlrpclib.getparser()
-if xmlrpclib.ExpatParser and isinstance(parser, xmlrpclib.ExpatParser):
-  SAFE_RPC_EXCEPTION_LIST.append(xmlrpclib.expat.ExpatError)
-else:
-  print >>sys.stderr, 'Warning: unhandled xmlrpclib parser %r, some ' \
-      'exceptions might get through safeRpcCall' % (parser, )
-SAFE_RPC_EXCEPTION_LIST = tuple(SAFE_RPC_EXCEPTION_LIST)
-del parser, _
-
-def safeRpcCall(log, proxy, function_id, retry, *args):
-  # this method will try infinitive calls to backend
-  # this can cause testnode to looked "stalled"
-  retry_time = 64
-  while True:
-    try:
-      # it safer to pass proxy and function_id so we avoid httplib.ResponseNotReady
-      # by trying reconnect before server keep-alive ends and the socket closes
-      log('safeRpcCall called with method : %s' % function_id)
-      function = getattr(proxy, function_id)
-      return function(*args)
-    except SAFE_RPC_EXCEPTION_LIST, e:
-      log('Exception in safeRpcCall when trying %s with %r' % (function_id, args),
-          exc_info=sys.exc_info())
-      if not(retry):
-        return
-      log('will retry safeRpcCall in %i seconds' % retry_time)
-      time.sleep(retry_time)
-      retry_time += retry_time >> 1
-
-class RemoteLogger(object):
-
-  def __init__(self, log, log_file, test_node_title, process_manager):
-    self.portal = None
-    self.test_result_path = None
-    self.test_node_title = test_node_title
-    self.log = log
-    self.log_file = log_file
-    self.process_manager = process_manager
-    self.finish = False
-    self.quit = False
-
-  def update(self, portal, test_result_path):
-    self.portal = portal
-    self.test_result_path = test_result_path
-
-  def getSize(self):
-    erp5testnode_log = open(self.log_file, 'r')
-    erp5testnode_log.seek(0, 2)
-    size = erp5testnode_log.tell()
-    erp5testnode_log.close()
-    return size
+PROFILE_PATH_KEY = 'profile_path'
 
-  def __call__(self):
-    size = self.getSize()
-    while True:
-      for x in xrange(0,60):
-        if self.quit or self.finish:
-          break
-        time.sleep(1)
-      if self.quit:
-        return
-      finish = retry = self.finish
-      if self.test_result_path is None:
-        if finish:
-          return
-        continue
-      start_size = size
-      end_size = self.getSize()
-      # file was truncated
-      if end_size < start_size:
-        size = end_size
-        continue
-      # display some previous data
-      if start_size >= 5000:
-        start_size -= 5000
-      # do not send tons of log, only last logs
-      if (end_size-start_size >= 10000):
-        start_size = end_size-10000
-      erp5testnode_log = open(self.log_file, 'r')
-      erp5testnode_log.seek(start_size)
-      output = erp5testnode_log.read()
-      erp5testnode_log.close()
-      if end_size == size:
-        output += '%s : stucked ??' % datetime.now().strftime("%Y/%m/%d %H:%M:%S")
-      # check if the test result is still alive
-      is_alive = safeRpcCall(self.log, self.portal, "isTaskAlive", False,
-                             self.test_result_path)
-      self.log('isTaskAlive result %r' % is_alive)
-      if is_alive is not None and is_alive == 0:
-        self.log('Test Result cancelled on server side, stop current test')
-        self.process_manager.killPreviousRun(cancellation=True)
-        return
-      status_dict = dict(command='erp5testnode', status_code=0,
-                         stdout=''.join(output), stderr='')
-      safeRpcCall(self.log, self.portal, "reportTaskStatus", retry,
-                  self.test_result_path, status_dict, self.test_node_title)
-      size = end_size
-      if finish:
-        return
-PROFILE_PATH_KEY = 'profile_path'
+class DummyLogger(object):
+  def __init__(self, func):
+    for name in ('trace', 'debug', 'info', 'warn', 'warning', 'error',
+                 'critical', 'fatal'):
+      setattr(self, name, func)
 
 class TestNode(object):
@@ -227,9 +125,6 @@ branch = %(branch)s
     same_revision_count = 0
     try:
      while True:
-        remote_test_result_needs_cleanup = False
-        remote_logger = None
-        remote_logger_thread = None
        try:
          # kill processes from previous loop if any
          process_manager.killPreviousRun()
@@ -264,36 +159,21 @@ branch = %(branch)s
          retry = False
          previous_revision = revision
          portal_url = config['test_suite_master_url']
-          test_result_path = None
-          test_result = (test_result_path, revision)
-          if portal_url:
-            if portal_url[-1] != '/':
-              portal_url += '/'
-            portal = xmlrpclib.ServerProxy("%s%s" %
-                (portal_url, 'portal_task_distribution'),
-                allow_none=1)
-            assert safeRpcCall(log, portal, "getProtocolRevision", True) == 1
-            test_result = safeRpcCall(log, portal, "createTestResult", True,
-              config['test_suite'], revision, [],
-              False, test_suite_title,
-              config['test_node_title'], config['project_title'])
-            remote_test_result_needs_cleanup = True
-
+          portal = taskdistribution.TaskDistributionTool(portal_url, logger = DummyLogger(log))
+          test_result = portal.createTestResult(revision,[],config['test_node_title'],False,test_suite_title,config['project_title'])
+          remote_test_result_needs_cleanup = True
          log("testnode, test_result : %r" % (test_result, ))
-          if test_result:
-            test_result_path, test_revision = test_result
+          if test_result is not None:
            if config.get('log_file'):
-              remote_logger = RemoteLogger(log, config['log_file'],
-                                           config['test_node_title'],
-                                           process_manager)
-              remote_logger.portal = portal
-              remote_logger.test_result_path = test_result_path
-              remote_logger_thread = threading.Thread(target=remote_logger)
-              remote_logger_thread.start()
-            if revision != test_revision:
-              previous_revision = test_revision
+              log_file_name = config['log_file']
+              log_file = open(log_file_name)
+              log_file.seek(0,2)
+              log_file.seek(-min(5000,log_file.tell()),2)
+              test_result.addWatch(log_file_name,log_file,max_history_bytes=10000)
+            if revision != test_result.revision:
+              previous_revision = test_result.revision
              log('Disagreement on tested revision, checking out:')
-              for i, repository_revision in enumerate(test_revision.split(',')):
+              for i, repository_revision in enumerate(previous_revision.split(',')):
                vcs_repository = vcs_repository_list[i]
                repository_path = vcs_repository['repository_path']
                revision = repository_revision.rsplit('-', 1)[1]
@@ -328,7 +208,6 @@ branch = %(branch)s
          # as partitions can be of any kind we have and likely will never have
          # a reliable way to check if they are up or not ...
          time.sleep(20)
-
          run_test_suite_path_list = glob.glob("%s/*/bin/runTestSuite" %config['instance_root'])
          if not len(run_test_suite_path_list):
            raise ValueError('No runTestSuite provided in installed partitions.')
@@ -356,17 +235,19 @@ branch = %(branch)s
          process_manager.spawn(*invocation_list,
                                cwd=config['test_suite_directory'],
                                log_prefix='runTestSuite', get_output=False)
-          if remote_logger:
-            remote_logger.quit = True
-            remote_logger_thread.join()
+          if test_result is not None:
+            test_result.removeWatch(log_file_name)
        except SubprocessError, e:
          log("SubprocessError", exc_info=sys.exc_info())
-          if remote_logger:
-            remote_logger.finish = True
-            remote_logger_thread.join()
+          if test_result is not None:
+            test_result.removeWatch(log_file_name)
          if remote_test_result_needs_cleanup:
-            safeRpcCall(log, portal, "reportTaskFailure", True,
-              test_result_path, e.status_dict, config['test_node_title'])
+            status_dict = e.status_dict or {}
+            test_result.reportFailure(
+              command = status_dict.get('command'),
+              stdout = status_dict.get('stdout'),
+              stderr = status_dict.get('stderr'),
+            )
          log("SubprocessError, going to sleep %s" % DEFAULT_SLEEP_TIMEOUT)
          time.sleep(DEFAULT_SLEEP_TIMEOUT)
          continue
@@ -384,5 +265,9 @@ branch = %(branch)s
        # Exceptions are swallowed during cleanup phase
        log('Testnode.run, finally close')
        process_manager.killPreviousRun()
-        if remote_logger:
-          remote_logger.quit = True
+        if test_result is not None:
+          try:
+            test_result.removeWatch(log_file_name)
+          except KeyError:
+            log("KeyError, Watcher already deleted or not added correctly")
+
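
Note (not part of the patch): a minimal, hedged sketch of the reporting flow this change switches to, using only the calls that appear in the diff above (TaskDistributionTool, createTestResult, addWatch, removeWatch, reportFailure). The master URL, revision string, node/suite/project titles and log path below are made-up placeholders, and the simple log() helper is invented for the example only.

# Hedged illustration only: mirrors the calls used in the patched testnode.py.
# URL, revision, titles and log path are hypothetical placeholders.
from erp5.util import taskdistribution

def log(message, *args, **kw):
  # placeholder logger; the real testnode passes its own log callable
  print(message)

class DummyLogger(object):
  # same trick as in the patch: route every logging level to one callable
  def __init__(self, func):
    for name in ('trace', 'debug', 'info', 'warn', 'warning', 'error',
                 'critical', 'fatal'):
      setattr(self, name, func)

portal = taskdistribution.TaskDistributionTool(
    'https://master.example.invalid/portal_task_distribution',  # placeholder URL
    logger=DummyLogger(log))
# argument order follows the patch: revision, test list, node title,
# allow_restart flag, suite title, project title
test_result = portal.createTestResult(
    'repo-1234', [], 'dummy test node', False, 'Dummy Suite', 'Dummy Project')
if test_result is not None:
  log_file_name = '/tmp/erp5testnode.log'  # placeholder path
  log_file = open(log_file_name)
  log_file.seek(0, 2)
  log_file.seek(-min(5000, log_file.tell()), 2)  # start near the tail, as the patch does
  test_result.addWatch(log_file_name, log_file, max_history_bytes=10000)
  # ... run the test suite here; on SubprocessError the patch calls
  # test_result.reportFailure(command=..., stdout=..., stderr=...) ...
  test_result.removeWatch(log_file_name)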