Commit 7e4a3a1a authored by Pere Cortes, committed by Sebastien Robin

Implementation of the taskdistribution library (erp5.util.taskdistribution) in erp5testnode

parent 4e96e83e
@@ -58,7 +58,8 @@ class SlapOSControler(object):
     # connections
     time.sleep(10)
     slap = slapos.slap.slap()
-    slap.initializeConnection(config['master_url'])
+    self.slap = slap
+    self.slap.initializeConnection(config['master_url'])
     # register software profile
     self.software_profile = config['custom_profile_path']
     slap.registerSupply().supply(
@@ -80,6 +81,7 @@ class SlapOSControler(object):
       # In order to be able to change partition naming scheme, do this at
       # instance_root level (such change happened already, causing problems).
       shutil.rmtree(instance_root)
-    os.mkdir(instance_root)
+    if not(os.path.exists(instance_root)):
+      os.mkdir(instance_root)
     for i in range(0, MAX_PARTIONS):
       # create partition and configure computer
@@ -87,6 +89,7 @@ class SlapOSControler(object):
       # this is not a problem as usually all services are on different ports
       partition_reference = '%s-%s' %(config['partition_reference'], i)
       partition_path = os.path.join(instance_root, partition_reference)
-      os.mkdir(partition_path)
+      if not(os.path.exists(partition_path)):
+        os.mkdir(partition_path)
       os.chmod(partition_path, 0750)
       computer.updateConfiguration(xml_marshaller.xml_marshaller.dumps({
@@ -116,7 +119,7 @@ class SlapOSControler(object):
     # a SR may fail for number of reasons (incl. network failures)
     # so be tolerant and run it a few times before giving up
     for runs in range(0, MAX_SR_RETRIES):
-      status_dict = self.spawn(config['slapgrid_software_binary'], '-v', '-c',
+      status_dict = self.spawn(config['slapgrid_software_binary'], '-v', '-c','--now',
         config['slapos_config'], raise_error_if_fail=False,
         log_prefix='slapgrid_sr', get_output=False)
     return status_dict
@@ -124,10 +127,9 @@ class SlapOSControler(object):
   def runComputerPartition(self, config, environment,
                            stdout=None, stderr=None):
     self.log("SlapOSControler.runComputerPartition")
-    slap = slapos.slap.slap()
     # cloudooo-json is required but this is a hack which should be removed
     config['instance_dict']['cloudooo-json'] = "{}"
-    slap.registerOpenOrder().request(self.software_profile,
+    self.slap.registerOpenOrder().request(self.software_profile,
       partition_reference='testing partition',
       partition_parameter_kw=config['instance_dict'])
@@ -138,4 +140,5 @@ class SlapOSControler(object):
     status_dict = self.spawn(config['slapgrid_partition_binary'], '-v', '-c',
       config['slapos_config'], raise_error_if_fail=False,
       log_prefix='slapgrid_cp', get_output=False)
+    self.log('slapgrid_cp status_dict : %r' % (status_dict,))
     return status_dict
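
The net effect of the SlapOSControler hunks above: the slap connection is initialized once in __init__ and cached on the instance, so runComputerPartition reuses it instead of creating a fresh slapos.slap.slap() object that apparently never called initializeConnection. A minimal sketch of the pattern, using only calls that appear in this diff; the Controler class name and request_test_partition helper are hypothetical:

    import slapos.slap

    class Controler(object):  # hypothetical name, for illustration only
      def __init__(self, config):
        # initialize the master connection once and keep it on the instance
        self.slap = slapos.slap.slap()
        self.slap.initializeConnection(config['master_url'])

      def request_test_partition(self, software_profile, instance_dict):
        # reuse the cached, already-initialized connection instead of
        # instantiating a new, unconnected slap object here
        self.slap.registerOpenOrder().request(software_profile,
          partition_reference='testing partition',
          partition_parameter_kw=instance_dict)
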
@@ -26,129 +26,27 @@
 ##############################################################################
 from datetime import datetime
 import os
-import signal
-import socket
 import subprocess
 import sys
 import time
-import xmlrpclib
 import glob
 import SlapOSControler
-import threading
 from ProcessManager import SubprocessError, ProcessManager, CancellationError
 from Updater import Updater
+from erp5.util import taskdistribution

 DEFAULT_SLEEP_TIMEOUT = 120 # time in seconds to sleep
 supervisord_pid_file = None

-# XXX: below section is shared with product/ERP5Type/tests/runTestSuite.py .
-# They are supposed to be merged into a common library/tool someday, until
-# then please keep them synchronised.
-# Depending on used xmlrpc backend, different exceptions can be thrown.
-SAFE_RPC_EXCEPTION_LIST = [socket.error, xmlrpclib.ProtocolError, xmlrpclib.Fault]
-parser, _ = xmlrpclib.getparser()
-if xmlrpclib.ExpatParser and isinstance(parser, xmlrpclib.ExpatParser):
-  SAFE_RPC_EXCEPTION_LIST.append(xmlrpclib.expat.ExpatError)
-else:
-  print >>sys.stderr, 'Warning: unhandled xmlrpclib parser %r, some ' \
-    'exceptions might get through safeRpcCall' % (parser, )
-SAFE_RPC_EXCEPTION_LIST = tuple(SAFE_RPC_EXCEPTION_LIST)
-del parser, _
-
-def safeRpcCall(log, proxy, function_id, retry, *args):
-  # this method will try infinitive calls to backend
-  # this can cause testnode to looked "stalled"
-  retry_time = 64
-  while True:
-    try:
-      # it safer to pass proxy and function_id so we avoid httplib.ResponseNotReady
-      # by trying reconnect before server keep-alive ends and the socket closes
-      log('safeRpcCall called with method : %s' % function_id)
-      function = getattr(proxy, function_id)
-      return function(*args)
-    except SAFE_RPC_EXCEPTION_LIST, e:
-      log('Exception in safeRpcCall when trying %s with %r' % (function_id, args),
-          exc_info=sys.exc_info())
-      if not(retry):
-        return
-      log('will retry safeRpcCall in %i seconds' % retry_time)
-      time.sleep(retry_time)
-      retry_time += retry_time >> 1
-
-class RemoteLogger(object):
-
-  def __init__(self, log, log_file, test_node_title, process_manager):
-    self.portal = None
-    self.test_result_path = None
-    self.test_node_title = test_node_title
-    self.log = log
-    self.log_file = log_file
-    self.process_manager = process_manager
-    self.finish = False
-    self.quit = False
-
-  def update(self, portal, test_result_path):
-    self.portal = portal
-    self.test_result_path = test_result_path
-
-  def getSize(self):
-    erp5testnode_log = open(self.log_file, 'r')
-    erp5testnode_log.seek(0, 2)
-    size = erp5testnode_log.tell()
-    erp5testnode_log.close()
-    return size
-
-  def __call__(self):
-    size = self.getSize()
-    while True:
-      for x in xrange(0,60):
-        if self.quit or self.finish:
-          break
-        time.sleep(1)
-      if self.quit:
-        return
-      finish = retry = self.finish
-      if self.test_result_path is None:
-        if finish:
-          return
-        continue
-      start_size = size
-      end_size = self.getSize()
-      # file was truncated
-      if end_size < start_size:
-        size = end_size
-        continue
-      # display some previous data
-      if start_size >= 5000:
-        start_size -= 5000
-      # do not send tons of log, only last logs
-      if (end_size-start_size >= 10000):
-        start_size = end_size-10000
-      erp5testnode_log = open(self.log_file, 'r')
-      erp5testnode_log.seek(start_size)
-      output = erp5testnode_log.read()
-      erp5testnode_log.close()
-      if end_size == size:
-        output += '%s : stucked ??' % datetime.now().strftime("%Y/%m/%d %H:%M:%S")
-      # check if the test result is still alive
-      is_alive = safeRpcCall(self.log, self.portal, "isTaskAlive", False,
-                             self.test_result_path)
-      self.log('isTaskAlive result %r' % is_alive)
-      if is_alive is not None and is_alive == 0:
-        self.log('Test Result cancelled on server side, stop current test')
-        self.process_manager.killPreviousRun(cancellation=True)
-        return
-      status_dict = dict(command='erp5testnode', status_code=0,
-                         stdout=''.join(output), stderr='')
-      safeRpcCall(self.log, self.portal, "reportTaskStatus", retry,
-                  self.test_result_path, status_dict, self.test_node_title)
-      size = end_size
-      if finish:
-        return
-
-PROFILE_PATH_KEY = 'profile_path'
+PROFILE_PATH_KEY = 'profile_path'
+
+class DummyLogger(object):
+
+  def __init__(self, func):
+    for name in ('trace', 'debug', 'info', 'warn', 'warning', 'error',
+                 'critical', 'fatal'):
+      setattr(self, name, func)

 class TestNode(object):
@@ -227,9 +125,6 @@ branch = %(branch)s
     same_revision_count = 0
     try:
       while True:
-        remote_test_result_needs_cleanup = False
-        remote_logger = None
-        remote_logger_thread = None
         try:
           # kill processes from previous loop if any
           process_manager.killPreviousRun()
...@@ -264,36 +159,21 @@ branch = %(branch)s ...@@ -264,36 +159,21 @@ branch = %(branch)s
retry = False retry = False
previous_revision = revision previous_revision = revision
portal_url = config['test_suite_master_url'] portal_url = config['test_suite_master_url']
test_result_path = None portal = taskdistribution.TaskDistributionTool(portal_url, logger = DummyLogger(log))
test_result = (test_result_path, revision) test_result = portal.createTestResult(revision,[],config['test_node_title'],False,test_suite_title,config['project_title'])
if portal_url:
if portal_url[-1] != '/':
portal_url += '/'
portal = xmlrpclib.ServerProxy("%s%s" %
(portal_url, 'portal_task_distribution'),
allow_none=1)
assert safeRpcCall(log, portal, "getProtocolRevision", True) == 1
test_result = safeRpcCall(log, portal, "createTestResult", True,
config['test_suite'], revision, [],
False, test_suite_title,
config['test_node_title'], config['project_title'])
remote_test_result_needs_cleanup = True remote_test_result_needs_cleanup = True
log("testnode, test_result : %r" % (test_result, )) log("testnode, test_result : %r" % (test_result, ))
if test_result: if test_result is not None:
test_result_path, test_revision = test_result
if config.get('log_file'): if config.get('log_file'):
remote_logger = RemoteLogger(log, config['log_file'], log_file_name = config['log_file']
config['test_node_title'], log_file = open(log_file_name)
process_manager) log_file.seek(0,2)
remote_logger.portal = portal log_file.seek(-min(5000,log_file.tell()),2)
remote_logger.test_result_path = test_result_path test_result.addWatch(log_file_name,log_file,max_history_bytes=10000)
remote_logger_thread = threading.Thread(target=remote_logger) if revision != test_result.revision:
remote_logger_thread.start() previous_revision = test_result.revision
if revision != test_revision:
previous_revision = test_revision
log('Disagreement on tested revision, checking out:') log('Disagreement on tested revision, checking out:')
for i, repository_revision in enumerate(test_revision.split(',')): for i, repository_revision in enumerate(previous_revision.split(',')):
vcs_repository = vcs_repository_list[i] vcs_repository = vcs_repository_list[i]
repository_path = vcs_repository['repository_path'] repository_path = vcs_repository['repository_path']
revision = repository_revision.rsplit('-', 1)[1] revision = repository_revision.rsplit('-', 1)[1]
@@ -328,7 +208,6 @@ branch = %(branch)s
           # as partitions can be of any kind we have and likely will never have
           # a reliable way to check if they are up or not ...
           time.sleep(20)
-
           run_test_suite_path_list = glob.glob("%s/*/bin/runTestSuite" %config['instance_root'])
           if not len(run_test_suite_path_list):
             raise ValueError('No runTestSuite provided in installed partitions.')
@@ -356,17 +235,19 @@ branch = %(branch)s
           process_manager.spawn(*invocation_list,
             cwd=config['test_suite_directory'],
             log_prefix='runTestSuite', get_output=False)
-          if remote_logger:
-            remote_logger.quit = True
-            remote_logger_thread.join()
+          if test_result is not None:
+            test_result.removeWatch(log_file_name)
         except SubprocessError, e:
           log("SubprocessError", exc_info=sys.exc_info())
-          if remote_logger:
-            remote_logger.finish = True
-            remote_logger_thread.join()
+          if test_result is not None:
+            test_result.removeWatch(log_file_name)
           if remote_test_result_needs_cleanup:
-            safeRpcCall(log, portal, "reportTaskFailure", True,
-              test_result_path, e.status_dict, config['test_node_title'])
+            status_dict = e.status_dict or {}
+            test_result.reportFailure(
+              command = status_dict.get('command'),
+              stdout = status_dict.get('stdout'),
+              stderr = status_dict.get('stderr'),
+            )
           log("SubprocessError, going to sleep %s" % DEFAULT_SLEEP_TIMEOUT)
           time.sleep(DEFAULT_SLEEP_TIMEOUT)
           continue
@@ -384,5 +265,9 @@ branch = %(branch)s
       # Exceptions are swallowed during cleanup phase
       log('Testnode.run, finally close')
       process_manager.killPreviousRun()
-      if remote_logger:
-        remote_logger.quit = True
+      if test_result is not None:
+        try:
+          test_result.removeWatch(log_file_name)
+        except KeyError:
+          log("KeyError, Watcher already deleted or not added correctly")