Commit 7fb88480 authored by Alain Takoudjou

Update promise management in slapgrid

parent 63e7ec10
...@@ -30,49 +30,56 @@ ...@@ -30,49 +30,56 @@
import os import os
import sys import sys
import subprocess
import logging import logging
import time import time
import importlib import importlib
import ConfigParser import ConfigParser
import re import re
import psutil
import slapos.slap import slapos.slap
from abc import ABCMeta, abstractmethod
from multiprocessing import Process, Queue from multiprocessing import Process, Queue
from datetime import datetime from datetime import datetime, timedelta
from threading import Thread from threading import Thread
from slapos.grid.promise import interface from slapos.grid.promise import interface
from slapos.grid.utils import dropPrivileges
from zope import interface as zope_interface
class PromiseError(Exception):
    """Error raised by the promise launcher when a promise reports a failure."""
class BaseResult(object):
    """Outcome of a promise execution.

    :param problem: True when the promise detected a problem.
    :param message: text describing the outcome.
    :param date: datetime of the result; defaults to the creation time so
        that callers which do not pass a date keep working.
    """

    def __init__(self, problem=False, message=None, date=None):
        self.__problem = problem
        self.__message = message
        # Local time, like the timestamps compared in the promise log reader.
        self.__date = date if date is not None else datetime.now()

    def hasFailed(self):
        """Return True when a problem was reported."""
        return bool(self.__problem)

    @property
    def type(self):
        """Human readable name of the result kind."""
        return "Base Result"

    @property
    def message(self):
        """Message given at construction."""
        return self.__message

    @property
    def date(self):
        """Date attached to the result."""
        return self.__date
class TestResult(BaseResult):
    """Result reported for the ``test`` check of a promise."""

    @property
    def type(self):
        """Human readable name of the result kind."""
        return "Test Result"
class AnomalyResult(BaseResult):
    """Result reported for the ``anomaly`` check of a promise."""

    @property
    def type(self):
        """Human readable name of the result kind."""
        return "Anomaly Result"
class PromiseQueueResult(object): class PromiseQueueResult(object):
...@@ -82,7 +89,10 @@ class PromiseQueueResult(object): ...@@ -82,7 +89,10 @@ class PromiseQueueResult(object):
self.result = result self.result = result
self.execution_time = execution_time self.execution_time = execution_time
class GenericPromise(object) class GenericPromise(object):
# Abstract class
__metaclass__ = ABCMeta
def __init__(self, config): def __init__(self, config):
self.__config = config self.__config = config
...@@ -96,22 +106,34 @@ class GenericPromise(object) ...@@ -96,22 +106,34 @@ class GenericPromise(object)
self.__name = self.___config.pop('name', None) self.__name = self.___config.pop('name', None)
self.__promise_path = self.___config.pop('path', None) self.__promise_path = self.___config.pop('path', None)
self.queue = self.___config.pop('queue', None) self.queue = self.___config.pop('queue', None)
self.__logger_buffer = None
self._validateConf() self._validateConf()
self.__log_file = os.path.join(self.__log_folder,
'%s.log' % os.path.splitext(self.__name)[0])
self._configureLogger() self._configureLogger()
for key, value in config.items(): for key, value in config.items():
setattr(self, key.replace('-', '_', value) setattr(self, key.replace('-', '_', value)
def _configureLogger(self):
    """Create ``self.logger`` and attach the handler promises log through.

    Without a log folder, records go to an in-memory buffer and the log file
    path is set to None. Otherwise they go to ``<log folder>/<name>.log``,
    where ``<name>`` is the promise name without its extension.
    """
    self.logger = logging.getLogger(__name__)
    if self.__log_folder is None:
        # Keep the Python 2 buffer when available, fall back to io otherwise.
        try:
            from cStringIO import StringIO
        except ImportError:
            from io import StringIO
        self.__logger_buffer = StringIO()
        logger_handler = logging.StreamHandler(self.__logger_buffer)
        self.__log_file = None
    else:
        self.__log_file = os.path.join(
            self.__log_folder,
            '%s.log' % os.path.splitext(self.__name)[0]
        )
        logger_handler = logging.FileHandler(self.__log_file)

    self.logger.setLevel(logging.DEBUG if self.__debug else logging.INFO)
    # setFormatter() takes a single Formatter object; the record layout and
    # the date format are both arguments of that Formatter.
    logger_handler.setFormatter(logging.Formatter(
        fmt="%(asctime)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    ))
    self.logger.addHandler(logger_handler)
def _validateConf(self): def _validateConf(self):
if self.queue is None: if self.queue is None:
...@@ -122,8 +144,6 @@ class GenericPromise(object) ...@@ -122,8 +144,6 @@ class GenericPromise(object)
raise ValueError("Promise path is not set in configuration") raise ValueError("Promise path is not set in configuration")
if self.__title is None: if self.__title is None:
raise ValueError("Monitor title is not set in configuration") raise ValueError("Monitor title is not set in configuration")
if self.__log_folder is None:
raise ValueError("Monitor log folder is not set in configuration")
if self.__partition_folder is None: if self.__partition_folder is None:
raise ValueError("Monitor partition folder is not set in configuration") raise ValueError("Monitor partition folder is not set in configuration")
...@@ -151,6 +171,9 @@ class GenericPromise(object) ...@@ -151,6 +171,9 @@ class GenericPromise(object)
def getPartitionFolder(self):
    """Return the partition folder held by this promise."""
    return self.__partition_folder
def getPromiseFile(self):
    """Return the promise path taken from the ``path`` configuration entry."""
    return self.__promise_path
def __bang(self, message): def __bang(self, message):
""" """
Call bang if requested Call bang if requested
...@@ -171,6 +194,100 @@ class GenericPromise(object) ...@@ -171,6 +194,100 @@ class GenericPromise(object)
) )
computer_partition.bang(message) computer_partition.bang(message)
def __getResultFromString(self, regex, result_string, only_failure=False):
    """Parse log text into ``[date, status, message]`` entries.

    :param regex: compiled pattern whose first three groups are the date,
        the status and the first line of the message.
    :param result_string: log text, one record per line; lines which do not
        match ``regex`` are continuation lines of the record they follow.
    :param only_failure: keep only the entries whose status is ``ERROR``.
    :return: list of ``[date, status, message]`` in the order of the text.
    """
    result_list = []
    # Entry which the following continuation lines belong to, or None when
    # the last record was filtered out (or no record was seen yet).
    current_entry = None
    for line in result_string.split('\n'):
        match = regex.match(line)
        if match is None:
            # Empty lines are ignored, like in the file based reader.
            if current_entry is not None and line != "":
                current_entry[2] += '\n' + line
            continue
        date_string, status, message = match.groups()[:3]
        if not only_failure or status == 'ERROR':
            current_entry = [date_string, status, message]
            result_list.append(current_entry)
        else:
            current_entry = None
    return result_list
def getLastPromiseResultList(self, latest_minute=1, only_failure=False):
    """Return the latest log entries written by the promise.

    :param latest_minute: number of minutes to look back in the log file.
        With 1, only entries of the last minute are returned. It is not
        applied when the promise logs to the in-memory buffer.
    :param only_failure: only return the entries whose status is ``ERROR``.
    :return: list of ``[date, status, message]`` ordered from oldest to
        newest, ex: [['2018-02-02 17:49:01', 'ERROR', "Promise has failed"]]
    """
    # Fields may be separated by plain spaces or by " - ", which is what the
    # formatter of the promise logger emits. Status is up to 8 letters so
    # that CRITICAL is recognised too.
    regex = re.compile(
        r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})"
        r"(?:\s+-\s+|\s+)(\w{4,8})(?:\s+-\s+|\s+)(.*)"
    )

    if self.__log_file is None:
        if self.__logger_buffer is not None:
            return self.__getResultFromString(regex,
                                              self.__logger_buffer.getvalue(),
                                              only_failure)
        return []

    if not os.path.exists(self.__log_file):
        return []

    date = datetime.now() - timedelta(minutes=latest_minute)
    date_string = date.strftime('%Y-%m-%d %H:%M:%S')

    def iterLinesFromEnd(path, chunk_size=4096):
        """Yield the lines of ``path`` from the last one to the first one."""
        with open(path, 'rb') as log_file:
            log_file.seek(0, os.SEEK_END)
            position = log_file.tell()
            head = b''
            while position > 0:
                step = min(chunk_size, position)
                position -= step
                log_file.seek(position)
                piece_list = (log_file.read(step) + head).split(b'\n')
                # The first piece may continue in the previous chunk.
                head = piece_list.pop(0)
                for piece in reversed(piece_list):
                    yield piece
            yield head

    line_list = []
    # Continuation lines met (newest first) before reaching their record.
    pending_part_list = []
    line_iterator = iterLinesFromEnd(self.__log_file)
    try:
        for line in line_iterator:
            if not isinstance(line, str):
                # Python 3 reads bytes; Python 2 already gives str.
                line = line.decode('utf-8', 'replace')
            if line == "":
                continue
            result = regex.match(line)
            if result is None:
                pending_part_list.append(line)
                continue
            entry_date, status, message = result.groups()
            if entry_date < date_string:
                # Older than the requested window: nothing more to read.
                break
            if not only_failure or status == 'ERROR':
                # Restore the written order of the continuation lines.
                for part in reversed(pending_part_list):
                    message += '\n' + part
                line_list.append([entry_date, status, message])
            pending_part_list = []
    finally:
        line_iterator.close()

    line_list.reverse()
    return line_list
@abstractmethod
def sense(self):
    """Run the promise code and store the result.

    Abstract: concrete promises must override it.
    """
@abstractmethod
def anomaly(self):
    """Detect whether there is an anomaly which requires a bang.

    Abstract: concrete promises must override it.
    """
@abstractmethod
def test(self):
    """Test the promise and report whether a problem is detected.

    Abstract: concrete promises must override it.
    """
def run(self, can_bang=True): def run(self, can_bang=True):
""" """
Method called to run the Promise Method called to run the Promise
...@@ -201,6 +318,9 @@ class GenericPromise(object) ...@@ -201,6 +318,9 @@ class GenericPromise(object)
except Exception, e: except Exception, e:
result = TestResult(problem=True, message=str(e)) result = TestResult(problem=True, message=str(e))
if self.__logger_buffer is not None:
self.__logger_buffer.close()
# send the result of this promise # send the result of this promise
# should not raise Queue.Full exception as limit is not set to constructor # should not raise Queue.Full exception as limit is not set to constructor
self.queue.put(PromiseQueueResult( self.queue.put(PromiseQueueResult(
...@@ -211,13 +331,57 @@ class GenericPromise(object) ...@@ -211,13 +331,57 @@ class GenericPromise(object)
), True) ), True)
class PromiseWrapper(GenericPromise):
    """
    A wrapper promise used to run old-style promises and bash promises.

    The wrapped promise is the executable returned by getPromiseFile(). Its
    output is written to the promise logger by sense(), and test() and
    anomaly() build their result from the latest logged entries.
    """
    zope_interface.implements(interface.IPromise)

    def __init__(self, config):
        """Complete ``config`` with a title and a periodicity, then set up.

        :param config: promise configuration; it is updated in place.
        """
        config.update({
            "title": config.get('name'),
            "periodicity": "*/1 * * * *"  # which periodicity to use ?
        })
        GenericPromise.__init__(self, config)

    def sense(self):
        """Execute the promise file and log its output.

        The output is logged as an error when the process exits with a
        non-zero status, and as information otherwise.
        """
        import subprocess

        promise_process = subprocess.Popen(
            [self.getPromiseFile()],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            cwd=self.getPartitionFolder()
        )
        # stderr is merged into stdout, so ``error`` is expected to be None;
        # it is still appended in case the redirection changes.
        output, error = promise_process.communicate()
        message = output or ""
        if error:
            message += "\n" + error
        if promise_process.returncode != 0:
            self.logger.error(message)
        else:
            self.logger.info(message)

    def _checkLatestResult(self):
        """Return ``(failed, message)`` built from the last two minutes of log.

        ``failed`` is True as soon as one entry has the ERROR status.
        NOTE(review): the message concatenates every entry, not only the
        failures; confirm this is the intent of only_failure=False.
        """
        failed = False
        message = ""
        latest_result_list = self.getLastPromiseResultList(latest_minute=2,
                                                           only_failure=False)
        for result in latest_result_list:
            if result[1] == 'ERROR':
                failed = True
            message += '\n' + result[2]
        return failed, message

    def test(self):
        """Return a TestResult describing the latest logged entries."""
        failed, message = self._checkLatestResult()
        return TestResult(problem=failed, message=message)

    def anomaly(self):
        """Return an AnomalyResult describing the latest logged entries.

        Needed because ``anomaly`` is abstract in GenericPromise: without an
        implementation the wrapper cannot be instantiated.
        """
        failed, message = self._checkLatestResult()
        return AnomalyResult(problem=failed, message=message)
class PromiseRunner(Process): class PromiseRunner(Process):
""" """
Run a promise in a new Process Run a promise in a new Process
""" """
def __init__(self, promise_instance, allow_bang=True): def __init__(self, promise_instance, logger, allow_bang=True, uid=None,
gid=None, cwd=None):
""" """
Initialise Promise Runner Initialise Promise Runner
...@@ -229,9 +393,16 @@ class PromiseRunner(Process): ...@@ -229,9 +393,16 @@ class PromiseRunner(Process):
self.promise_instance = promise_instance self.promise_instance = promise_instance
self.config = config self.config = config
self.allow_bang = allow_bang self.allow_bang = allow_bang
self.daemon = False self.uid = uid
self.gid = gid
self.cwd = cwd
# self.daemon = False
def run(self):
    """Run the promise inside the child process.

    Privileges are dropped first when both ``uid`` and ``gid`` are set, then
    the working directory is changed to ``cwd`` when it is set, and finally
    the promise instance is run with the ``allow_bang`` flag.
    """
    if self.uid is not None and self.gid is not None:
        # NOTE(review): the visible part of __init__ does not store the
        # ``logger`` argument; fall back to a module logger if it is absent.
        logger = getattr(self, 'logger', None) or logging.getLogger(__name__)
        dropPrivileges(self.uid, self.gid, logger=logger)
    if self.cwd is not None:
        os.chdir(self.cwd)
    self.promise_instance.run(self.allow_bang)
...@@ -255,6 +426,8 @@ class PromiseLauncher(object): ...@@ -255,6 +426,8 @@ class PromiseLauncher(object):
Base path of the partition Base path of the partition
promise-dir promise-dir
Promises folder, all promises scripts will be imported from that folder Promises folder, all promises scripts will be imported from that folder
log-folder
Folder where promises will write logs. Can be None
check-anomaly check-anomaly
Ask to check anomaly instead of test. Default: False Ask to check anomaly instead of test. Default: False
debug debug
...@@ -269,11 +442,21 @@ class PromiseLauncher(object): ...@@ -269,11 +442,21 @@ class PromiseLauncher(object):
Computer Partition ID, ex: slappart13 Computer Partition ID, ex: slappart13
computer-id computer-id
Computer ID, ex: COMP-1234 Computer ID, ex: COMP-1234
uid
User UID
gid
User GID
profile
If True, show Promise consumption and execution time information
""" """
self.__config = { self.__config = {
'promise-timeout': 20, 'promise-timeout': 20,
'promise-dir': None, 'promise-dir': None,
'log-folder': None,
'profile': False,
'uid': None,
'gid': None
} }
if self.config_file is not None: if self.config_file is not None:
self.loadConfigFromFile(config_file) self.loadConfigFromFile(config_file)
...@@ -284,7 +467,7 @@ class PromiseLauncher(object): ...@@ -284,7 +467,7 @@ class PromiseLauncher(object):
self.save_method = save_method self.save_method = save_method
for key, value in self.__config.items(): for key, value in self.__config.items():
setattr(self, key.replace('-', '_'), value) setattr(self, key.replace('-', '_'), value or None)
if self.promise_dir is None: if self.promise_dir is None:
raise ValueError("Promise folder is missing in configuration!") raise ValueError("Promise folder is missing in configuration!")
...@@ -357,14 +540,27 @@ class PromiseLauncher(object): ...@@ -357,14 +540,27 @@ class PromiseLauncher(object):
promise_instance = promise_module.RunPromise(argument_dict) promise_instance = promise_module.RunPromise(argument_dict)
promise_process = PromiseRunner( promise_process = PromiseRunner(
promise_module, promise_module,
argument_dict, logger=self.logger
not self.bang_called and self.check_anomaly allow_bang=not self.bang_called and self.check_anomaly,
uid=self.uid,
gid=self.gid,
cwd=self.partition_folder
) )
promise_process.start() promise_process.start()
sleep_time = 0.1 sleep_time = 0.1
increment_limit = int(self.promise_timeout / sleep_time) increment_limit = int(self.promise_timeout / sleep_time)
execution_time = self.promise_timeout
ps_profile = False
if self.profile:
try:
psutil_process = psutil.Process(promise_process.pid)
ps_profile = True
except psutil.NoSuchProcess:
# process is gone
pass
for current_increment in range(0, increment_limit): for current_increment in range(0, increment_limit):
execution_time = current_increment * sleep_time
if not process.is_alive(): if not process.is_alive():
try: try:
queue_item = self.queue_result.get(False, 2) queue_item = self.queue_result.get(False, 2)
...@@ -376,7 +572,7 @@ class PromiseLauncher(object): ...@@ -376,7 +572,7 @@ class PromiseLauncher(object):
promise_instance, promise_instance,
promise_name=promise_name, promise_name=promise_name,
messsage="No output returned by the promise", messsage="No output returned by the promise",
execution_time=current_increment * sleep_time execution_time=execution_time
) )
self.save_method(queue_item) self.save_method(queue_item)
break break
...@@ -385,13 +581,31 @@ class PromiseLauncher(object): ...@@ -385,13 +581,31 @@ class PromiseLauncher(object):
if self.save_method is None: if self.save_method is None:
raise PromiseError(queue_item.result.message) raise PromiseError(queue_item.result.message)
elif isinstance(queue_item.result, AnomalyResult): elif isinstance(queue_item.result, AnomalyResult):
# stop to bang is it was called # stop to bang as it was called
self.bang_called = True self.bang_called = True
if self.save_method is not None: if self.save_method is not None:
queue_item.execution_time = current_increment * sleep_time queue_item.execution_time = execution_time
self.save_method(queue_item) self.save_method(queue_item)
break break
if ps_profile:
try:
io_counter = psutil_process.io_counters()
self.logger.debug(
"[t=%ss] CPU: %s%%, MEM: %s MB (%s%%), DISK: %s Read - %s Write" % (
execution_time,
psutil_process.cpu_percent(),
psutil_process.memory_info().rss / float(2 ** 20),
round(psutil_process.memory_percent(), 4),
io_counter.read_count,
io_counter.write_count
)
)
except (psutil.AccessDenied, psutil.NoSuchProcess):
# defunct process will raise AccessDenied
pass
time.sleep(sleep_time)
else: else:
promise_process.terminate() promise_process.terminate()
message = 'Promise timed out after %s seconds' % self.promise_timeout message = 'Promise timed out after %s seconds' % self.promise_timeout
...@@ -402,10 +616,16 @@ class PromiseLauncher(object): ...@@ -402,10 +616,16 @@ class PromiseLauncher(object):
promise_instance, promise_instance,
promise_name=promise_name, promise_name=promise_name,
message=message, message=message,
execution_time=self.promise_timeout execution_time=execution_time
)) ))
if self.profile:
self.logger.info("Finished promise %r in %s second(s)." % (
promise_name, execution_time))
def run(self): def run(self):
"""
Run all promises
"""
promise_list = [] promise_list = []
# load all promises so we can catch import errors before launch them # load all promises so we can catch import errors before launch them
...@@ -413,14 +633,14 @@ class PromiseLauncher(object): ...@@ -413,14 +633,14 @@ class PromiseLauncher(object):
for promise_name in os.listdir(self.promise_dir)] for promise_name in os.listdir(self.promise_dir)]
base_config = { base_config = {
'log-folder': '', 'log-folder': self.log_folder,
'partition-folder': '', 'partition-folder': self.partition_folder,
'debug': True, 'debug': True,
'master-url': '', 'master-url': self.master_url,
'partition-cert': '', 'partition-cert': self.partition_cert,
'partition-key': '', 'partition-key': self.partition_key,
'partition-id': '', 'partition-id': self.partition_id,
'computer-id': '', 'computer-id': self.computer_id,
'queue': self.queue_result, 'queue': self.queue_result,
} }
for promise in promise_list: for promise in promise_list:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment