Commit 4a202f4a authored by Pere Cortes's avatar Pere Cortes Committed by Sebastien Robin

Security Thread added to kill blocked process

parent e2c62ce3
......@@ -23,7 +23,8 @@ sys.path.extend([
from erp5.util.testnode.testnode import TestNode
from erp5.util.testnode.testnode import SlapOSInstance
from erp5.util.testnode.ProcessManager import ProcessManager
from erp5.util.testnode.ProcessManager import ProcessManager, SubprocessError
from erp5.util.testnode.SlapOSControler import SlapOSControler
from erp5.util.taskdistribution import TaskDistributor
from erp5.util.taskdistribution import TaskDistributionTool
......@@ -78,12 +79,12 @@ class ERP5TestNode(TestCase):
config["test_node_title"] = "Foo-Test-Node"
return TestNode(log, config)
def getTestSuiteData(self, add_third_repository=False):
def getTestSuiteData(self, add_third_repository=False, reference="foo"):
data = [{
"test_suite": "Foo",
"project_title": "Foo",
"project_title": reference,
"test_suite_title": "Foo-Test",
"test_suite_reference": "foo",
"test_suite_reference": reference,
"vcs_repository_list": [
{'url': self.remote_repository0,
'profile_path': 'software.cfg',
......@@ -275,7 +276,7 @@ branch = foo
self.assertEquals([commit_dict['rep0'][0][0],commit_dict['rep1'][1][0]],
getRepInfo(hash=1))
def test_07_checkOldTestSuite(self):
def test_07_checkExistingTestSuite(self):
test_node = self.getTestNode()
test_suite_data = self.getTestSuiteData(add_third_repository=True)
self.assertEquals([], os.listdir(self.working_directory))
......@@ -313,6 +314,9 @@ branch = foo
Check parameters passed to runTestSuite
Also make sure that --firefox_bin and --xvfb_bin are passed when needed
"""
original_getSupportedParameter = ProcessManager.getSupportedParameterSet
original_spawn = ProcessManager.spawn
try:
def _createPath(path_to_create, end_path):
os.makedirs(path_to_create)
return os.close(os.open(os.path.join(path_to_create,
......@@ -326,7 +330,7 @@ branch = foo
return []
test_node = self.getTestNode()
test_node.slapos_controler = SlapOSControler(self.working_directory,
test_node.config,test_node.log)
test_node.config)
node_test_suite = test_node.getNodeTestSuite('foo')
self.updateNodeTestSuiteData(node_test_suite)
node_test_suite.revision = 'dummy'
......@@ -354,27 +358,46 @@ branch = foo
'--xvfb_bin',
'%s/soft/a/parts/xserver/bin/Xvfb'
%(test_node.config['slapos_directory'])])
finally:
ProcessManager.getSupportedParameterSet = original_getSupportedParameter
ProcessManager.spawn = original_spawn
def test_10_prepareSlapOS(self):
def patch_initializeSlapOSControler(self, *args, **kw):
pass
test_node = self.getTestNode()
test_node_slapos = SlapOSInstance()
node_test_suite = test_node.getNodeTestSuite('foo')
node_test_suite.edit(working_directory=self.working_directory)
status_dict = {"status_code" : 0}
def patch_run(self, *args,**kw):
return status_dict
SlapOSControler.initializeSlapOSControler = patch_initializeSlapOSControler
SlapOSControler.runSoftwareRelease = patch_run
SlapOSControler.runComputerPartition = patch_run
global call_list
call_list = []
class Patch:
def __init__(self, method_name, status_code=0):
self.method_name = method_name
self.status_code = status_code
def __call__(self, *args, **kw):
global call_list
call_list.append({"method_name": self.method_name,
"args": [x for x in args],
"kw": kw})
return {"status_code": self.status_code}
SlapOSControler.initializeSlapOSControler = Patch("initializeSlapOSControler")
SlapOSControler.runSoftwareRelease = Patch("runSoftwareRelease")
SlapOSControler.runComputerPartition = Patch("runComputerPartition")
test_node.prepareSlapOSForTestNode(test_node_slapos)
self.assertEqual(status_dict,{'status_code':0})
self.assertEquals(["initializeSlapOSControler", "runSoftwareRelease"],
[x["method_name"] for x in call_list])
call_list = []
test_node.prepareSlapOSForTestSuite(node_test_suite)
self.assertEqual(status_dict,{'status_code':0})
self.assertEquals(["initializeSlapOSControler", "runSoftwareRelease",
"runComputerPartition"],
[x["method_name"] for x in call_list])
call_list = []
SlapOSControler.runSoftwareRelease = Patch("runSoftwareRelease", status_code=1)
self.assertRaises(SubprocessError, test_node.prepareSlapOSForTestSuite,
node_test_suite)
def test_11_run(self):
def patch_function(self, *args, **kw):
def doNothing(self, *args, **kw):
pass
def patch_killPrevious():
pass
......@@ -385,29 +408,74 @@ branch = foo
counter = 0
def patch_startTestSuite(self,test_node_title):
global counter
if counter == 2:
raise StopIteration
counter += 1
config_list = []
def _checkExistingTestSuite(reference_set):
test_self.assertEquals(set(reference_set),
set(os.listdir(test_node.config["working_directory"])))
for x in reference_set:
test_self.assertTrue(os.path.exists(os.path.join(
test_node.config["working_directory"],x)),True)
if counter == 0:
config_list.append(test_self.getTestSuiteData()[0])
config_list.append(test_self.getTestSuiteData(reference='bar')[0])
elif counter == 1:
_checkExistingTestSuite(set(['foo']))
config_list.append(test_self.getTestSuiteData(reference='bar')[0])
config_list.append(test_self.getTestSuiteData()[0])
config_list.append(test_self.getTestSuiteData(add_third_repository=True)[0])
elif counter == 2:
_checkExistingTestSuite(set(['foo','bar']))
config_list.append(test_self.getTestSuiteData()[0])
config_list.append(test_self.getTestSuiteData(reference='qux')[0])
elif counter == 3:
_checkExistingTestSuite(set(['foo','qux']))
test_node.process_manager.under_cancellation = True
config_list.append(test_self.getTestSuiteData(reference='foox')[0])
elif counter == 4:
test_node.process_manager.under_cancellation = False
config_list.append(test_self.getTestSuiteData(reference='bax')[0])
elif counter == 5:
_checkExistingTestSuite(set(['bax']))
raise StopIteration
counter += 1
return json.dumps(config_list)
def patch_createTestResult(self, revision, test_name_list, node_title,
allow_restart=False, test_title=None, project_title=None):
global counter
if counter == 3 and project_title != 'qux':
result = None
else:
test_result_path = os.path.join(test_result_path_root, test_title)
result = TestResultProxy(self._proxy, self._retry_time,
self._logger, test_result_path, node_title, revision)
return result
time.sleep = patch_function
original_sleep = time.sleep
time.sleep = doNothing
self.generateTestRepositoryList()
TaskDistributor.startTestSuite = patch_startTestSuite
TaskDistributionTool.createTestResult = patch_createTestResult
test_node = self.getTestNode()
test_node._prepareSlapOS = patch_function
test_node.runTestSuite = patch_function
SlapOSControler.initializeSlapOSControler = patch_function
test_node._prepareSlapOS = doNothing
test_node.runTestSuite = doNothing
SlapOSControler.initializeSlapOSControler = doNothing
test_node.process_manager.killPreviousRun = patch_killPrevious
try:
test_node.run()
except Exception as e:
self.assertEqual(type(e),StopIteration)
finally:
time.sleep = original_sleep
def test_12_spawn(self):
def _log(*args,**kw):
for arg in args:
print "TESTNODE LOG : %r" % (arg,)
def _checkCorrectStatus(expected_status,*args):
result = process_manager.spawn(*args)
self.assertEqual(result['status_code'], expected_status)
def patch_sleep(n):
subprocess.check_call(["sleep", n/10000])
original_sleep = time.sleep
process_manager = ProcessManager(log=_log)
_checkCorrectStatus(0, *['sleep','3'])
_checkCorrectStatus(-15, *['sleep','8'])
time.sleep = original_sleep
......@@ -30,6 +30,9 @@ import subprocess
import threading
import signal
import sys
import time
MAX_TIMEOUT = 5
class SubprocessError(EnvironmentError):
def __init__(self, status_dict):
......@@ -39,6 +42,12 @@ class SubprocessError(EnvironmentError):
def __str__(self):
return 'Error %i' % self.status_code
class TimeoutError(EnvironmentError):
def __init__(self):
pass
def __str__(self):
return 'Timeout expired. Process killed'
class CancellationError(EnvironmentError):
pass
......@@ -98,8 +107,16 @@ class ProcessManager(object):
self.process_pid_set = set()
signal.signal(signal.SIGTERM, self.sigterm_handler)
self.under_cancellation = False
self.p = None
self.result = None
def spawn(self, *args, **kw):
def timeoutExpired(p):
time.sleep(MAX_TIMEOUT)
if p.poll() is None:
p.terminate()
raise TimeoutError
if self.under_cancellation:
raise CancellationError("Test Result was cancelled")
get_output = kw.pop('get_output', True)
......@@ -120,6 +137,8 @@ class ProcessManager(object):
p = subprocess.Popen(args, stdin=self.stdin, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, env=env, **subprocess_kw)
self.process_pid_set.add(p.pid)
thread = threading.Thread(target=timeoutExpired, args=(p,))
thread.start()
stdout, stderr = subprocess_capture(p, self.log, log_prefix,
get_output=get_output)
result = dict(status_code=p.returncode, command=command,
......@@ -127,7 +146,7 @@ class ProcessManager(object):
self.process_pid_set.discard(p.pid)
if self.under_cancellation:
raise CancellationError("Test Result was cancelled")
if raise_error_if_fail and p.returncode:
if raise_error_if_fail and p.returncode != -15 and p.returncode:
raise SubprocessError(result)
return result
......
......@@ -257,7 +257,7 @@ branch = %(branch)s
We will build slapos software needed by the testnode itself,
like the building of selenium-runner by default
"""
self._prepareSlapOS(self.config['slapos_directory'],
return self._prepareSlapOS(self.config['slapos_directory'],
test_node_slapos, create_partition=0,
software_path_list=self.config.get("software_list"))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment