Commit 63a9e08a authored by Alain Takoudjou

fixup to make it work

parent 24fb62a4
...@@ -35,10 +35,13 @@ import time ...@@ -35,10 +35,13 @@ import time
import importlib import importlib
import ConfigParser import ConfigParser
import re import re
import traceback
import psutil import psutil
import subprocess
import slapos.slap import slapos.slap
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from multiprocessing import Process, Queue from multiprocessing import Process, Queue as MQueue
import Queue
from datetime import datetime, timedelta from datetime import datetime, timedelta
from threading import Thread from threading import Thread
from slapos.grid.promise import interface from slapos.grid.promise import interface
...@@ -49,13 +52,15 @@ class PromiseError(Exception): ...@@ -49,13 +52,15 @@ class PromiseError(Exception):
pass pass
class BaseResult(object): class BaseResult(object):
def __init__(self, problem=False, message=None, date): def __init__(self, problem=False, message=None, date=None):
self.__problem = problem self.__problem = problem
self.__message = message self.__message = message
self.__date = date self.__date = date
if self.__date is None:
self.__date = datetime.now()
def hasFailed(self): def hasFailed(self):
return not self.__problem return self.__problem
@property @property
def type(self): def type(self):
...@@ -78,8 +83,8 @@ class TestResult(BaseResult): ...@@ -78,8 +83,8 @@ class TestResult(BaseResult):
class AnomalyResult(BaseResult): class AnomalyResult(BaseResult):
@property @property
def type(self): def type(self):
return "Anomaly Result" return "Anomaly Result"
class PromiseQueueResult(object): class PromiseQueueResult(object):
...@@ -99,22 +104,22 @@ class GenericPromise(object): ...@@ -99,22 +104,22 @@ class GenericPromise(object):
self.__log_folder = self.__config.pop('log-folder', None) self.__log_folder = self.__config.pop('log-folder', None)
self.__partition_folder = self.__config.pop('partition-folder', None) self.__partition_folder = self.__config.pop('partition-folder', None)
sef.__check_anomaly = self.___config.pop('check-anomaly', False) self.__check_anomaly = self.__config.pop('check-anomaly', False)
self.__title = self.___config.pop('title', None) self.__title = self.__config.pop('title', None)
self.__periodicity = self.___config.pop('periodicity', None) self.__periodicity = self.__config.pop('periodicity', None)
self.__debug = self.___config.pop('debug', True) self.__debug = self.__config.pop('debug', True)
self.__name = self.___config.pop('name', None) self.__name = self.__config.pop('name', None)
self.__promise_path = self.___config.pop('path', None) self.__promise_path = self.__config.pop('path', None)
self.queue = self.___config.pop('queue', None) self.queue = self.__config.pop('queue', None)
self.__logger_buffer = None self.__logger_buffer = None
self._validateConf() self._validateConf()
self._configureLogger() self._configureLogger()
for key, value in config.items(): for key, value in config.items():
setattr(self, key.replace('-', '_', value) setattr(self, key.replace('-', '_'), value)
def _configureLogger(self): def _configureLogger(self):
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(self.__name)
if self.__log_folder is None: if self.__log_folder is None:
# configure logger with StringIO # configure logger with StringIO
import cStringIO import cStringIO
...@@ -130,8 +135,8 @@ class GenericPromise(object): ...@@ -130,8 +135,8 @@ class GenericPromise(object):
self.logger.setLevel(logging.DEBUG if self.__debug else logging.INFO) self.logger.setLevel(logging.DEBUG if self.__debug else logging.INFO)
logger_handler.setFormatter( logger_handler.setFormatter(
format=logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"), fmt=logging.Formatter("%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S" datefmt="%Y-%m-%d %H:%M:%S")
) )
self.logger.addHandler(logger_handler) self.logger.addHandler(logger_handler)
...@@ -156,8 +161,9 @@ class GenericPromise(object): ...@@ -156,8 +161,9 @@ class GenericPromise(object):
"(?P<day_of_week>\*|[0-6](\-[0-6])?)"\ "(?P<day_of_week>\*|[0-6](\-[0-6])?)"\
) )
) )
if validate_crontab_time_format_regex.match(period) is None: #if validate_crontab_time_format_regex.match(self.__periodicity) is None:
raise ValueError("Periodicity %r is not a valid Cron time format." % period) # raise ValueError("Promise periodicity %r is not a valid Cron time " \
# "format." % self.__periodicity)
def getConfig(self): def getConfig(self):
return self.__config return self.__config
...@@ -171,7 +177,7 @@ class GenericPromise(object): ...@@ -171,7 +177,7 @@ class GenericPromise(object):
def getPartitionFolder(self): def getPartitionFolder(self):
return self.__partition_folder return self.__partition_folder
de getPromiseFile(self): def getPromiseFile(self):
return self.__promise_path return self.__promise_path
def __bang(self, message): def __bang(self, message):
...@@ -194,18 +200,24 @@ class GenericPromise(object): ...@@ -194,18 +200,24 @@ class GenericPromise(object):
) )
computer_partition.bang(message) computer_partition.bang(message)
def __getResultFromString(self, regex, result_string, only_failure=False): def __getLogRegex(self):
return re.compile(r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\-?\s*(\w{4,7})\s+\-?\s*(.*)")
def __getResultFromString(self, result_string, only_failure=False):
line_list = result_string.split('\n') line_list = result_string.split('\n')
result_list = [] result_list = []
line_part = "" line_part = ""
regex = self.__getLogRegex()
for line in line_list: for line in line_list:
if not line:
continue
match = regex.match(line) match = regex.match(line)
if match is not None: if match is not None:
if not only_failure or (only_failure and match.groups()[1] == 'ERROR'): if not only_failure or (only_failure and match.groups()[1] == 'ERROR'):
result_list.append([ result_list.append([
match.groups()[0], match.groups()[0],
match.groups()[1], match.groups()[1],
match.groups()[2] + line_part, (match.groups()[2] + line_part).strip(),
]) ])
line_part = "" line_part = ""
else: else:
...@@ -224,12 +236,9 @@ class GenericPromise(object): ...@@ -224,12 +236,9 @@ class GenericPromise(object):
ex: [['2018-02-02 17:49:01', 'ERROR', "Promise has failed"], ...] ex: [['2018-02-02 17:49:01', 'ERROR', "Promise has failed"], ...]
""" """
regex = re.compile(r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+(\w{4,7})\s+(.*)")
if self.__log_file is None: if self.__log_file is None:
if self.__logger_buffer is not None: if self.__logger_buffer is not None:
return self.__getResultFromString(regex, return self.__getResultFromString(self.__logger_buffer.getvalue(),
self.__logger_buffer.getvalue(),
only_failure) only_failure)
else: else:
return [] return []
...@@ -237,6 +246,7 @@ class GenericPromise(object): ...@@ -237,6 +246,7 @@ class GenericPromise(object):
if not os.path.exists(self.__log_file): if not os.path.exists(self.__log_file):
return [] return []
regex = self.__getLogRegex()
date = datetime.now() - timedelta(minutes=latest_minute) date = datetime.now() - timedelta(minutes=latest_minute)
date_string = date.strftime('%Y-%m-%d %H:%M:%S') date_string = date.strftime('%Y-%m-%d %H:%M:%S')
...@@ -266,7 +276,7 @@ class GenericPromise(object): ...@@ -266,7 +276,7 @@ class GenericPromise(object):
line_list.append([ line_list.append([
result.groups()[0], result.groups()[0],
result.groups()[1], result.groups()[1],
result.groups()[2] + line_part, (result.groups()[2] + line_part).strip(),
]) ])
else: else:
line_part += '\n' + line line_part += '\n' + line
...@@ -276,15 +286,38 @@ class GenericPromise(object): ...@@ -276,15 +286,38 @@ class GenericPromise(object):
line_list.reverse() line_list.reverse()
return line_list return line_list
def defaultTest(self, latest_minute=3):
"""
Fail if the latest 2 messages has failed.
If only there is only one result, fail if that result has error
"""
problem = False
message = ""
latest_result_list = self.getLastPromiseResultList(
latest_minute=latest_minute,
only_failure=False
)
result_size = len(latest_result_list)
if result_size == 0:
return TestResult(problem=False, message="No result!")
i = 0
latest_result_list.reverse()
# we test at most the 2 latest results
while i < result_size and i < 2:
if latest_result_list[i][1] == 'ERROR':
problem = True
i += 1
return TestResult(problem=problem, message=latest_result_list[0][2])
@abstractmethod @abstractmethod
def sense(self): def sense(self):
"""Run the promise code and store the result""" """Run the promise code and store the result"""
@abstractmethod
def anomaly(self): def anomaly(self):
"""Called to detect if there is an anomaly which require to bang.""" """Called to detect if there is an anomaly which require to bang."""
@abstractmethod
def test(self): def test(self):
"""Test promise and say if problem is detected or not""" """Test promise and say if problem is detected or not"""
...@@ -299,7 +332,7 @@ class GenericPromise(object): ...@@ -299,7 +332,7 @@ class GenericPromise(object):
except Exception, e: except Exception, e:
# log the result # log the result
self.logger.error(str(e)) self.logger.error(str(e))
if sef.__check_anomaly: if self.__check_anomaly:
# run sense, anomaly # run sense, anomaly
try: try:
result = self.anomaly() result = self.anomaly()
...@@ -336,7 +369,6 @@ class PromiseWrapper(GenericPromise): ...@@ -336,7 +369,6 @@ class PromiseWrapper(GenericPromise):
A wrapper promise used to run old promises style and bash promises A wrapper promise used to run old promises style and bash promises
""" """
import subprocess
zope_interface.implements(interface.IPromise) zope_interface.implements(interface.IPromise)
def __init__(self, config): def __init__(self, config):
...@@ -357,32 +389,14 @@ class PromiseWrapper(GenericPromise): ...@@ -357,32 +389,14 @@ class PromiseWrapper(GenericPromise):
message = output or "" message = output or ""
if error: if error:
message += "\n" + error message += "\n" + error
if promise_process.return_code != 0: if promise_process.returncode != 0:
self.logger.error(message) self.logger.error(message.strip())
else: else:
self.logger.info(message) self.logger.info(message.strip())
def test(self): def test(self):
""" return self.defaultTest(latest_minute=3)
Fail if the latest 2 messages has failed.
"""
failed_count = 0
message = ""
latest_result_list = self.getLastPromiseResultList(latest_minute=4,
only_failure=False)
if len(latest_result_list) == 0:
return TestResult(problem=False, message="No result")
latest_result_list.reverse()
for in in range(0, 2):
if latest_result_list[i][1] == 'ERROR':
failed_count += 1
return TestResult(
problem=failed_count == 2,
message=latest_result_list[0][2]
)
class PromiseRunner(Process): class PromiseRunner(Process):
...@@ -399,9 +413,9 @@ class PromiseRunner(Process): ...@@ -399,9 +413,9 @@ class PromiseRunner(Process):
@param allow_bang: Bolean saying if bang should be called in case of @param allow_bang: Bolean saying if bang should be called in case of
anomaly failure. anomaly failure.
""" """
Process.__init__() Process.__init__(self)
self.promise_instance = promise_instance self.promise = promise_instance
self.config = config self.logger = logger
self.allow_bang = allow_bang self.allow_bang = allow_bang
self.uid = uid self.uid = uid
self.gid = gid self.gid = gid
...@@ -409,11 +423,11 @@ class PromiseRunner(Process): ...@@ -409,11 +423,11 @@ class PromiseRunner(Process):
# self.daemon = False # self.daemon = False
def run(self): def run(self):
if self.uid is not None and self.gid is not None: if self.uid and self.gid:
dropPrivileges(uid, gid, logger=logger) dropPrivileges(self.uid, self.gid, logger=self.logger)
if self.cwd is not None: if self.cwd is not None:
os.chdir(self.cwd) os.chdir(self.cwd)
promise_instance.run(self.allow_bang) self.promise.run(self.allow_bang)
class PromiseLauncher(object): class PromiseLauncher(object):
...@@ -462,6 +476,7 @@ class PromiseLauncher(object): ...@@ -462,6 +476,7 @@ class PromiseLauncher(object):
If True, show Promise consumption and execution time information If True, show Promise consumption and execution time information
""" """
self.save_method = save_method
self.__config = { self.__config = {
'promise-timeout': 20, 'promise-timeout': 20,
'promise-dir': None, 'promise-dir': None,
...@@ -469,21 +484,26 @@ class PromiseLauncher(object): ...@@ -469,21 +484,26 @@ class PromiseLauncher(object):
'log-folder': None, 'log-folder': None,
'profile': False, 'profile': False,
'uid': None, 'uid': None,
'gid': None 'gid': None,
'master-url': None,
'partition-cert': None,
'partition-key': None,
'partition-id': None,
'computer-id': None,
'debug': True,
'check-anomaly': False
} }
if self.config_file is not None: if config_file is not None:
self.loadConfigFromFile(config_file) self.loadConfigFromFile(config_file)
if config is not None: if config is not None:
self.__config.update(config) self.__config.update(config)
if save_method is not None:
self.save_method = save_method
for key, value in self.__config.items(): for key, value in self.__config.items():
setattr(self, key.replace('-', '_'), value or None) setattr(self, key.replace('-', '_'), value or None)
if self.promise_dir is None: if self.promise_dir is None:
raise ValueError("Promise folder is missing in configuration!") raise ValueError("Promise folder is missing in configuration!")
if logger is None: if logger is None:
self.logger = logging.getLogger(__name__) self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.DEBUG) self.logger.setLevel(logging.DEBUG)
...@@ -492,16 +512,10 @@ class PromiseLauncher(object): ...@@ -492,16 +512,10 @@ class PromiseLauncher(object):
logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
) )
self.logger.addHandler(handler) self.logger.addHandler(handler)
else:
self.logger = logger
init_file = os.path.join(self.promise_dir, '__init__.py') self.queue_result = MQueue()
if not os.path.exists(init_file):
with open(ini_file, 'w') as f:
f.write("")
os.chmod(init_file, 0644)
if sys.path[0] != self.promise_dir:
sys.path[0:0] = [self.promise_dir]
self.queue_result = Queue()
self.bang_called = False self.bang_called = False
def _loadConfigFromFile(self, config_file): def _loadConfigFromFile(self, config_file):
...@@ -549,14 +563,22 @@ class PromiseLauncher(object): ...@@ -549,14 +563,22 @@ class PromiseLauncher(object):
Launch the promise and save the result if `self.save_method` is not None Launch the promise and save the result if `self.save_method` is not None
If no save method is set, raise PromiseError in case of failure If no save method is set, raise PromiseError in case of failure
""" """
self.logger.info("Checking promise %s..." % promise_name) if self.profile:
if promise_module is None: self.logger.info("Checking promise %s..." % promise_name)
promise_instance = PromiseWrapper(argument_dict) try:
else: if promise_module is None:
promise_instance = promise_module.RunPromise(argument_dict) promise_instance = PromiseWrapper(argument_dict)
else:
promise_instance = promise_module.RunPromise(argument_dict)
except Exception:
# to not prevent run other promises
self.logger.error(traceback.format_exc())
self.logger.warning("Promise %s skipped." % promise_name)
return
promise_process = PromiseRunner( promise_process = PromiseRunner(
promise_module, promise_instance,
logger=self.logger logger=self.logger,
allow_bang=not self.bang_called and self.check_anomaly, allow_bang=not self.bang_called and self.check_anomaly,
uid=self.uid, uid=self.uid,
gid=self.gid, gid=self.gid,
...@@ -577,7 +599,7 @@ class PromiseLauncher(object): ...@@ -577,7 +599,7 @@ class PromiseLauncher(object):
pass pass
for current_increment in range(0, increment_limit): for current_increment in range(0, increment_limit):
execution_time = current_increment * sleep_time execution_time = current_increment * sleep_time
if not process.is_alive(): if not promise_process.is_alive():
try: try:
queue_item = self.queue_result.get(False, 2) queue_item = self.queue_result.get(False, 2)
except Queue.Empty: except Queue.Empty:
...@@ -643,23 +665,36 @@ class PromiseLauncher(object): ...@@ -643,23 +665,36 @@ class PromiseLauncher(object):
Run all promises Run all promises
""" """
promise_list = [] promise_list = []
base_config = {
'log-folder': self.log_folder,
'partition-folder': self.partition_folder,
'debug': True,
'master-url': self.master_url,
'partition-cert': self.partition_cert,
'partition-key': self.partition_key,
'partition-id': self.partition_id,
'computer-id': self.computer_id,
'queue': self.queue_result,
}
if os.path.exists(self.promise_dir) and os.path.isdir(self.promise_dir): if os.path.exists(self.promise_dir) and os.path.isdir(self.promise_dir):
# if there is no __init file, add it
init_file = os.path.join(self.promise_dir, '__init__.py')
if not os.path.exists(init_file):
with open(init_file, 'w') as f:
f.write("")
os.chmod(init_file, 0644)
if sys.path[0] != self.promise_dir:
sys.path[0:0] = [self.promise_dir]
promise_list = []
# load all promises so we can catch import errors before launch them # load all promises so we can catch import errors before launch them
promise_list = [(promise_name, self._loadPromiseModule(promise_name)) for promise_name in os.listdir(self.promise_dir):
for promise_name in os.listdir(self.promise_dir)] if promise_name.startswith('__init__'):
continue
base_config = { promise_list.append((promise_name,
'log-folder': self.log_folder, self._loadPromiseModule(promise_name)))
'partition-folder': self.partition_folder,
'debug': True,
'master-url': self.master_url,
'partition-cert': self.partition_cert,
'partition-key': self.partition_key,
'partition-id': self.partition_id,
'computer-id': self.computer_id,
'queue': self.queue_result,
}
for promise in promise_list: for promise in promise_list:
config = { config = {
'path': os.path.join(self.promise_dir, promise[0]), 'path': os.path.join(self.promise_dir, promise[0]),
...@@ -670,11 +705,17 @@ class PromiseLauncher(object): ...@@ -670,11 +705,17 @@ class PromiseLauncher(object):
if os.path.exists(self.old_promise_dir) and os.path.isdir(self.old_promise_dir): if os.path.exists(self.old_promise_dir) and os.path.isdir(self.old_promise_dir):
# run old promise styles # run old promise styles
for promise_name in self.old_promise_dir: for promise_name in os.listdir(self.old_promise_dir):
promise_path = os.path.join(self.old_promise_dir, promise_name)
if not os.path.isfile(promise_path) or \
not os.access(promise_path, os.X_OK):
self.logger.warning("Bad promise file at %r." % promise_path)
continue
config = { config = {
'path': os.path.join(self.old_promise_dir, promise_name), 'path': promise_path,
'name': promise_name 'name': promise_name
} }
config.update(base_config) config.update(base_config)
# We will use PromiseWrapper class to run this # We will use promise wrapper to run this
self._launchPromise(promise_name, config) self._launchPromise(promise_name, config)
...@@ -61,7 +61,7 @@ from slapos.grid.svcbackend import (launchSupervisord, ...@@ -61,7 +61,7 @@ from slapos.grid.svcbackend import (launchSupervisord,
_getSupervisordConfigurationDirectory, _getSupervisordConfigurationDirectory,
_getSupervisordSocketPath) _getSupervisordSocketPath)
from slapos.grid.utils import (md5digest, dropPrivileges, SlapPopen, updateFile) from slapos.grid.utils import (md5digest, dropPrivileges, SlapPopen, updateFile)
from slapos.grid.promise import PromiseLauncher from slapos.grid.promise import PromiseLauncher, PromiseError
from slapos.human import human2bytes from slapos.human import human2bytes
import slapos.slap import slapos.slap
from netaddr import valid_ipv4, valid_ipv6 from netaddr import valid_ipv4, valid_ipv6
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment