Commit a507993c authored by Alain Takoudjou's avatar Alain Takoudjou

signal

parent b8d4799b
......@@ -40,7 +40,7 @@ import psutil
import functools
import signal
import inspect
from multiprocessing import Process, Queue as MQueue
from multiprocessing import Event, Process, Queue as MQueue
from six.moves import queue, reload_module
from slapos.util import mkdir_p, chownDirectory
from slapos.grid.utils import dropPrivileges, killProcessTree
......@@ -233,8 +233,8 @@ class PromiseProcess(Process):
class PromiseWorker(Process):
def __init__(self, task_queue, done_queue, logger, plugin_folder, output_folder,
promise_timeout, check_anomaly, debug=False, force=False, dry_run=False,
uid=None, gid=None):
promise_timeout, exit_event, check_anomaly, debug=False, force=False,
dry_run=False, uid=None, gid=None):
"""
Launch the promise and send the result to queue.
......@@ -260,20 +260,17 @@ class PromiseWorker(Process):
self.uid = uid
self.gid = gid
self.bang_called = False
self.__close_queue = False
self.exit_event = exit_event
@staticmethod
def terminateProcess(pps, logger, signum, frame):
pps.__close_queue = True
if pps.current_process is None:
def cleanupRunningProcess(self):
if self.current_process is None:
return
process = pps.current_process
if signum in [signal.SIGINT, signal.SIGTERM] and process.is_alive():
process.terminate()
process.join(1)
if process.is_alive():
logger.info("Killing process %s..." % process.getPromiseTitle())
killProcessTree(process.pid, logger)
if self.current_process.is_alive():
self.current_process.terminate()
self.current_process.join(1)
if self.current_process.is_alive():
self.logger.info("Killing process %s..." % self.current_process.name)
killProcessTree(self.current_process.pid, logger)
def _createInitFile(self):
# if there is no __init__ file, add it
......@@ -390,6 +387,8 @@ class PromiseWorker(Process):
queue_item = None
sleep_time = 1
increment_limit = int(self.promise_timeout / sleep_time)
if increment_limit == 0:
sleep_time = self.promise_timeout
psutil_process = None
if self.debug:
try:
......@@ -407,6 +406,8 @@ class PromiseWorker(Process):
# no result found in process result Queue
if not promise_process.is_alive():
break
except KeyboardInterrupt:
return
else:
break
......@@ -459,10 +460,7 @@ class PromiseWorker(Process):
dropPrivileges(self.uid, self.gid, logger=self.logger)
self._createInitFile()
self.current_process = None
handler = functools.partial(self.terminateProcess, self, self.logger)
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
while not self.__close_queue:
while not self.exit_event.is_set():
try:
promise_name, task_dict = self.task_queue.get(True, 1)
except queue.Empty:
......@@ -471,6 +469,7 @@ class PromiseWorker(Process):
except KeyboardInterrupt:
break
self.proceedTask(promise_name, task_dict)
self.cleanupRunningProcess()
class PromiseLauncher(object):
......@@ -567,6 +566,8 @@ class PromiseLauncher(object):
self.task_queue = MQueue()
self.done_queue = MQueue()
self.bang_called = False
# this event is set to ask worker to exit
self.exit_event = Event()
self.promise_output_dir = os.path.join(
self.partition_folder,
......@@ -598,12 +599,20 @@ class PromiseLauncher(object):
PROMISE_STATE_FOLDER_NAME)
chownDirectory(folder_path, stat_info.st_uid, stat_info.st_gid)
@staticmethod
def terminateWorker(me, worker, signum, frame):
if signum in [signal.SIGINT, signal.SIGTERM] and worker.is_alive():
me.exit_event.set()
def _launchPromiseWorker(self):
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN)
failed_promise_name = ""
promise_worker = PromiseWorker(
task_queue=self.task_queue,
done_queue=self.done_queue,
logger=self.logger,
exit_event=self.exit_event,
output_folder=self.promise_output_dir,
debug=self.debug,
force=self.force,
......@@ -615,14 +624,17 @@ class PromiseLauncher(object):
dry_run=self.dry_run
)
promise_worker.start()
handler = functools.partial(self.terminateWorker, self, promise_worker)
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
while promise_worker.is_alive():
try:
promise_result = self.done_queue.get(True, 0.5)
except queue.Empty:
# no result Yet!
continue
except KeyboardInterrupt:
break
if promise_result.item.type() == "Empty Result":
# Empty result, promise skipped
continue
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment