Commit d085369f authored by Alain Takoudjou's avatar Alain Takoudjou

signal

parent b8d4799b
......@@ -40,7 +40,7 @@ import psutil
import functools
import signal
import inspect
from multiprocessing import Process, Queue as MQueue
from multiprocessing import Event, Process, Queue as MQueue
from six.moves import queue, reload_module
from slapos.util import mkdir_p, chownDirectory
from slapos.grid.utils import dropPrivileges, killProcessTree
......@@ -233,8 +233,8 @@ class PromiseProcess(Process):
class PromiseWorker(Process):
def __init__(self, task_queue, done_queue, logger, plugin_folder, output_folder,
promise_timeout, check_anomaly, debug=False, force=False, dry_run=False,
uid=None, gid=None):
promise_timeout, exit_event, check_anomaly, debug=False, force=False,
dry_run=False, uid=None, gid=None):
"""
Launch the promise and send the result to queue.
......@@ -260,19 +260,20 @@ class PromiseWorker(Process):
self.uid = uid
self.gid = gid
self.bang_called = False
self.__close_queue = False
self.exit_event = exit_event
@staticmethod
def terminateProcess(pps, logger, signum, frame):
pps.__close_queue = True
if pps.current_process is None:
#@staticmethod
#def terminateProcess(self, logger, signum, frame):
def terminateProcess(self):
if self.current_process is None:
return
process = pps.current_process
if signum in [signal.SIGINT, signal.SIGTERM] and process.is_alive():
process = self.current_process
#if signum in [signal.SIGINT, signal.SIGTERM] and process.is_alive():
if process.is_alive():
process.terminate()
process.join(1)
if process.is_alive():
logger.info("Killing process %s..." % process.getPromiseTitle())
self.logger.info("Killing process %s..." % process.getPromiseTitle())
killProcessTree(process.pid, logger)
def _createInitFile(self):
......@@ -407,6 +408,8 @@ class PromiseWorker(Process):
# no result found in process result Queue
if not promise_process.is_alive():
break
except KeyboardInterrupt:
return
else:
break
......@@ -459,10 +462,7 @@ class PromiseWorker(Process):
dropPrivileges(self.uid, self.gid, logger=self.logger)
self._createInitFile()
self.current_process = None
handler = functools.partial(self.terminateProcess, self, self.logger)
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
while not self.__close_queue:
while not self.exit_event.is_set():
try:
promise_name, task_dict = self.task_queue.get(True, 1)
except queue.Empty:
......@@ -471,6 +471,7 @@ class PromiseWorker(Process):
except KeyboardInterrupt:
break
self.proceedTask(promise_name, task_dict)
self.terminateProcess()
class PromiseLauncher(object):
......@@ -567,6 +568,8 @@ class PromiseLauncher(object):
self.task_queue = MQueue()
self.done_queue = MQueue()
self.bang_called = False
# this event is set to ask worker to exit
self.exit_event = Event()
self.promise_output_dir = os.path.join(
self.partition_folder,
......@@ -598,12 +601,18 @@ class PromiseLauncher(object):
PROMISE_STATE_FOLDER_NAME)
chownDirectory(folder_path, stat_info.st_uid, stat_info.st_gid)
@staticmethod
def terminateWorker(me, worker, signum, frame):
if signum in [signal.SIGINT, signal.SIGTERM] and worker.is_alive():
me.exit_event.set()
def _launchPromiseWorker(self):
failed_promise_name = ""
promise_worker = PromiseWorker(
task_queue=self.task_queue,
done_queue=self.done_queue,
logger=self.logger,
exit_event=self.exit_event,
output_folder=self.promise_output_dir,
debug=self.debug,
force=self.force,
......@@ -615,14 +624,20 @@ class PromiseLauncher(object):
dry_run=self.dry_run
)
promise_worker.start()
handler = functools.partial(self.terminateWorker, self, promise_worker)
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
while promise_worker.is_alive():
try:
promise_result = self.done_queue.get(True, 0.5)
except queue.Empty:
# no result Yet!
continue
except KeyboardInterrupt:
break
#except KeyboardInterrupt:
# self.exit_event.set()
# break
if promise_result.item.type() == "Empty Result":
# Empty result, promise skipped
continue
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment