Commit 5daf9565 authored by Alain Takoudjou's avatar Alain Takoudjou

slapgrid.promise: Improve promise check speed

Use cache to  save some promise instance information.
First run of slapgrid fill each promise cached (when there is no cache). The next run will check some usefull info like if promise `is test less` or promise `is anomaly less` directly from cache without need to import the module.

Cache is updated if promise file or module change, or if the promise periodicity expire (when the promise it update cache).

/reviewed-on !101
parents d259174f 0c398b4d
...@@ -37,6 +37,7 @@ import json ...@@ -37,6 +37,7 @@ import json
import importlib import importlib
import traceback import traceback
import psutil import psutil
import inspect
from multiprocessing import Process, Queue as MQueue from multiprocessing import Process, Queue as MQueue
from six.moves import queue, reload_module from six.moves import queue, reload_module
from slapos.util import mkdir_p, chownDirectory from slapos.util import mkdir_p, chownDirectory
...@@ -46,11 +47,12 @@ from slapos.grid.promise.generic import (GenericPromise, PromiseQueueResult, ...@@ -46,11 +47,12 @@ from slapos.grid.promise.generic import (GenericPromise, PromiseQueueResult,
AnomalyResult, TestResult, AnomalyResult, TestResult,
PROMISE_STATE_FOLDER_NAME, PROMISE_STATE_FOLDER_NAME,
PROMISE_RESULT_FOLDER_NAME, PROMISE_RESULT_FOLDER_NAME,
PROMISE_PARAMETER_NAME, PROMISE_PARAMETER_NAME)
PROMISE_PERIOD_FILE_NAME)
from slapos.grid.promise.wrapper import WrapPromise from slapos.grid.promise.wrapper import WrapPromise
from slapos.version import version from slapos.version import version
PROMISE_CACHE_FOLDER_NAME = '.slapgrid/promise/cache'
class PromiseError(Exception): class PromiseError(Exception):
pass pass
...@@ -69,6 +71,7 @@ class PromiseProcess(Process): ...@@ -69,6 +71,7 @@ class PromiseProcess(Process):
@param promise_name: The name of the promise to run @param promise_name: The name of the promise to run
@param promise_path: path of the promise @param promise_path: path of the promise
@param argument_dict: all promise parameters in a dictionary @param argument_dict: all promise parameters in a dictionary
@param queue: Queue used to send promise result
@param allow_bang: Bolean saying if bang should be called in case of @param allow_bang: Bolean saying if bang should be called in case of
anomaly failure. anomaly failure.
@param check_anomaly: Bolean saying if promise anomaly should be run. @param check_anomaly: Bolean saying if promise anomaly should be run.
...@@ -89,54 +92,66 @@ class PromiseProcess(Process): ...@@ -89,54 +92,66 @@ class PromiseProcess(Process):
self.partition_folder = partition_folder self.partition_folder = partition_folder
self.wrap_promise = wrap self.wrap_promise = wrap
self._periodicity = None self._periodicity = None
self._timestamp_file = os.path.join(partition_folder, self.cache_folder = os.path.join(self.partition_folder,
PROMISE_STATE_FOLDER_NAME, PROMISE_CACHE_FOLDER_NAME)
'%s.timestamp' % promise_name) self.cache_file = os.path.join(self.cache_folder, self.getPromiseTitle())
periodicity_file = os.path.join(partition_folder, # XXX - remove old files used to store promise timestamp and periodicity
self._cleanupDeprecated()
def _cleanupDeprecated(self):
timestamp_file = os.path.join(self.partition_folder,
PROMISE_STATE_FOLDER_NAME,
'%s.timestamp' % self.name)
periodicity_file = os.path.join(self.partition_folder,
PROMISE_STATE_FOLDER_NAME, PROMISE_STATE_FOLDER_NAME,
PROMISE_PERIOD_FILE_NAME % promise_name) '%s.periodicity' % self.name)
if os.path.exists(periodicity_file) and os.stat(periodicity_file).st_size: if os.path.exists(timestamp_file) and os.path.isfile(timestamp_file):
with open(periodicity_file) as f: os.unlink(timestamp_file)
try: if os.path.exists(periodicity_file) and os.path.isfile(periodicity_file):
self._periodicity = float(f.read()) os.unlink(periodicity_file)
except ValueError:
# set to None, run the promise and regenerate the file def getPromiseTitle(self):
pass return os.path.splitext(self.name)[0]
def isPeriodicityMatch(self): def updatePromiseCache(self, promise_class, promise_instance, started=True):
""" """
Return True if promise should be run now, considering the promise Cache some data from the promise that can be reused
periodicity in minutes
""" """
if self._periodicity is not None and \ py_file = '%s.py' % os.path.splitext(inspect.getfile(promise_class))[0]
os.path.exists(self._timestamp_file) and \ stat = os.stat(py_file)
os.stat(self._timestamp_file).st_size: timestamp = time.time()
with open(self._timestamp_file) as f: cache_dict = dict(
try: is_tested= not hasattr(promise_instance, 'isTested') or \
latest_timestamp = float(f.read()) promise_instance.isTested(),
current_timediff = (time.time() - latest_timestamp) / 60.0 is_anomaly_detected=not hasattr(promise_instance, 'isAnomalyDetected') or \
if current_timediff >= self._periodicity: promise_instance.isAnomalyDetected(),
return True periodicity=promise_instance.getPeriodicity(),
#self.logger.debug("Skip Promise %r. periodicity=%s, time_diff=%s" % ( next_run_after=timestamp + (promise_instance.getPeriodicity() * 60.0),
# self.name, self._periodicity, current_timediff)) timestamp=timestamp,
except ValueError: module_file=py_file,
# if the file is broken, run the promise and regenerate it module_file_mtime=stat.st_mtime,
return True )
else: if not started:
return False cache_dict['next_run_after'] = timestamp
return True with open(self.cache_file, 'w') as f:
f.write(json.dumps(cache_dict))
def setPromiseStartTimestamp(self): def loadPromiseCacheDict(self):
""" """
Save the promise execution timestamp Load cached data for this promise.
If saved promise module file is not exists then invalidate cache.
Cache will be updated when promise run
""" """
state_directory = os.path.dirname(self._timestamp_file) if os.path.exists(self.cache_file):
mkdir_p(state_directory) try:
with open(self._timestamp_file, 'w') as f: with open(self.cache_file) as f:
f.write(str(time.time())) cache_dict = json.loads(f.read())
if not os.path.exists(cache_dict['module_file']):
def getPromiseTitle(self): # file not exists mean path was changed
return os.path.splitext(self.name)[0] return None
return cache_dict
except ValueError:
return None
def run(self): def run(self):
""" """
...@@ -146,9 +161,10 @@ class PromiseProcess(Process): ...@@ -146,9 +161,10 @@ class PromiseProcess(Process):
""" """
try: try:
os.chdir(self.partition_folder) os.chdir(self.partition_folder)
promise_started = False
if self.uid and self.gid: if self.uid and self.gid:
dropPrivileges(self.uid, self.gid, logger=self.logger) dropPrivileges(self.uid, self.gid, logger=self.logger)
mkdir_p(self.cache_folder)
if self.wrap_promise: if self.wrap_promise:
promise_instance = WrapPromise(self.argument_dict) promise_instance = WrapPromise(self.argument_dict)
else: else:
...@@ -161,7 +177,11 @@ class PromiseProcess(Process): ...@@ -161,7 +177,11 @@ class PromiseProcess(Process):
(promise_instance.isAnomalyDetected() and self.check_anomaly) or \ (promise_instance.isAnomalyDetected() and self.check_anomaly) or \
(promise_instance.isTested() and not self.check_anomaly): (promise_instance.isTested() and not self.check_anomaly):
# if the promise will run, we save execution timestamp # if the promise will run, we save execution timestamp
self.setPromiseStartTimestamp() promise_started = True
self.updatePromiseCache(
WrapPromise if self.wrap_promise else promise_module.RunPromise,
promise_instance,
started=promise_started)
promise_instance.run(self.check_anomaly, self.allow_bang) promise_instance.run(self.check_anomaly, self.allow_bang)
except Exception: except Exception:
self.logger.error(traceback.format_exc()) self.logger.error(traceback.format_exc())
...@@ -218,6 +238,7 @@ class PromiseProcess(Process): ...@@ -218,6 +238,7 @@ class PromiseProcess(Process):
key, extra_dict)) key, extra_dict))
self.argument_dict[key] = extra_dict[key] self.argument_dict[key] = extra_dict[key]
class PromiseLauncher(object): class PromiseLauncher(object):
def __init__(self, config=None, logger=None, dry_run=False): def __init__(self, config=None, logger=None, dry_run=False):
...@@ -311,6 +332,7 @@ class PromiseLauncher(object): ...@@ -311,6 +332,7 @@ class PromiseLauncher(object):
self.queue_result = MQueue() self.queue_result = MQueue()
self.bang_called = False self.bang_called = False
self._skipped_amount = 0
self.promise_output_dir = os.path.join( self.promise_output_dir = os.path.join(
self.partition_folder, self.partition_folder,
...@@ -375,6 +397,19 @@ class PromiseLauncher(object): ...@@ -375,6 +397,19 @@ class PromiseLauncher(object):
)) ))
return result return result
def _writePromiseResult(self, result_item):
if result_item.item.type() == "Empty Result":
# no result collected (sense skipped)
return
elif result_item.item.hasFailed():
self.logger.error(result_item.item.message)
if result_item.execution_time != -1 and \
isinstance(result_item.item, AnomalyResult) and self.check_anomaly:
# stop to bang as it was called
self.bang_called = True
# Send result
self._savePromiseResult(result_item)
def _emptyQueue(self): def _emptyQueue(self):
"""Remove all entries from queue until it's empty""" """Remove all entries from queue until it's empty"""
while True: while True:
...@@ -390,6 +425,11 @@ class PromiseLauncher(object): ...@@ -390,6 +425,11 @@ class PromiseLauncher(object):
PROMISE_STATE_FOLDER_NAME) PROMISE_STATE_FOLDER_NAME)
chownDirectory(folder_path, stat_info.st_uid, stat_info.st_gid) chownDirectory(folder_path, stat_info.st_uid, stat_info.st_gid)
def isPeriodicityMatch(self, next_timestamp):
if next_timestamp:
return time.time() >= next_timestamp
return True
def _launchPromise(self, promise_name, promise_path, argument_dict, def _launchPromise(self, promise_name, promise_path, argument_dict,
wrap_process=False): wrap_process=False):
""" """
...@@ -399,7 +439,6 @@ class PromiseLauncher(object): ...@@ -399,7 +439,6 @@ class PromiseLauncher(object):
If the promise periodicity doesn't match, the previous promise result is If the promise periodicity doesn't match, the previous promise result is
checked. checked.
""" """
self.logger.info("Checking promise %s..." % promise_name)
try: try:
promise_process = PromiseProcess( promise_process = PromiseProcess(
self.partition_folder, self.partition_folder,
...@@ -414,8 +453,18 @@ class PromiseLauncher(object): ...@@ -414,8 +453,18 @@ class PromiseLauncher(object):
wrap=wrap_process, wrap=wrap_process,
) )
if not self.force and not promise_process.isPeriodicityMatch(): promise_cache_dict = promise_process.loadPromiseCacheDict()
if promise_cache_dict is not None:
if self.check_anomaly and not promise_cache_dict.get('is_anomaly_detected') \
or not self.check_anomaly and not promise_cache_dict.get('is_tested'):
# promise is skipped, send empty result
self._writePromiseResult(PromiseQueueResult())
self._skipped_amount += 1
return
if not self.force and (promise_cache_dict is not None and not
self.isPeriodicityMatch(promise_cache_dict.get('next_run_after'))):
# we won't start the promise process, just get the latest result # we won't start the promise process, just get the latest result
self._skipped_amount += 1
result = self._loadPromiseResult(promise_process.getPromiseTitle()) result = self._loadPromiseResult(promise_process.getPromiseTitle())
if result is not None: if result is not None:
if result.item.hasFailed(): if result.item.hasFailed():
...@@ -432,6 +481,7 @@ class PromiseLauncher(object): ...@@ -432,6 +481,7 @@ class PromiseLauncher(object):
self.logger.warning("Promise %s skipped." % promise_name) self.logger.warning("Promise %s skipped." % promise_name)
return True return True
self.logger.info("Checking promise %s..." % promise_name)
queue_item = None queue_item = None
sleep_time = 0.1 sleep_time = 0.1
increment_limit = int(self.promise_timeout / sleep_time) increment_limit = int(self.promise_timeout / sleep_time)
...@@ -498,21 +548,8 @@ class PromiseLauncher(object): ...@@ -498,21 +548,8 @@ class PromiseLauncher(object):
message="Error: No output returned by the promise", message="Error: No output returned by the promise",
execution_time=execution_time execution_time=execution_time
) )
elif queue_item.item.type() == "Empty Result":
# no result collected (sense skipped)
skipped_method = "Anomaly" if self.check_anomaly else "Test"
self.logger.debug("Skipped, %s is disabled in promise %r." % (
skipped_method, promise_name))
return False
if not self.dry_run:
self._savePromiseResult(queue_item)
if queue_item.item.hasFailed():
self.logger.error(queue_item.item.message)
if isinstance(queue_item.item, AnomalyResult) and self.check_anomaly:
# stop to bang as it was called
self.bang_called = True
self._writePromiseResult(queue_item)
if self.debug: if self.debug:
self.logger.debug("Finished promise %r in %s second(s)." % ( self.logger.debug("Finished promise %r in %s second(s)." % (
promise_name, execution_time)) promise_name, execution_time))
...@@ -584,6 +621,8 @@ class PromiseLauncher(object): ...@@ -584,6 +621,8 @@ class PromiseLauncher(object):
failed_promise_name = promise_name failed_promise_name = promise_name
self._updateFolderOwner(self.promise_output_dir) self._updateFolderOwner(self.promise_output_dir)
if self._skipped_amount > 0:
self.logger.info("%s promises didn't need to be checked." % \
self._skipped_amount)
if failed_promise_name: if failed_promise_name:
raise PromiseError("Promise %r failed." % failed_promise_name) raise PromiseError("Promise %r failed." % failed_promise_name)
...@@ -46,7 +46,6 @@ PROMISE_RESULT_FOLDER_NAME = '.slapgrid/promise/result' ...@@ -46,7 +46,6 @@ PROMISE_RESULT_FOLDER_NAME = '.slapgrid/promise/result'
PROMISE_LOG_FOLDER_NAME = '.slapgrid/promise/log' PROMISE_LOG_FOLDER_NAME = '.slapgrid/promise/log'
PROMISE_PARAMETER_NAME = 'extra_config_dict' PROMISE_PARAMETER_NAME = 'extra_config_dict'
PROMISE_PERIOD_FILE_NAME = '%s.periodicity'
LOGLINE_RE = r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\-?\s*(\w{4,7})\s+\-?\s+(\d+\-\d{3})\s+\-?\s*(.*)" LOGLINE_RE = r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\-?\s*(\w{4,7})\s+\-?\s+(\d+\-\d{3})\s+\-?\s*(.*)"
matchLogStr = re.compile(LOGLINE_RE).match matchLogStr = re.compile(LOGLINE_RE).match
...@@ -159,10 +158,6 @@ class GenericPromise(with_metaclass(ABCMeta, object)): ...@@ -159,10 +158,6 @@ class GenericPromise(with_metaclass(ABCMeta, object)):
self.__promise_path = self.__config.pop('path', None) self.__promise_path = self.__config.pop('path', None)
self.__queue = self.__config.pop('queue', None) self.__queue = self.__config.pop('queue', None)
self.__logger_buffer = None self.__logger_buffer = None
self.__periodicity_file = os.path.join(
self.__partition_folder,
PROMISE_STATE_FOLDER_NAME,
PROMISE_PERIOD_FILE_NAME % self.__name)
self.setPeriodicity(self.__config.pop('periodicity', 2)) self.setPeriodicity(self.__config.pop('periodicity', 2))
self.__transaction_id = '%s-%s' % (int(time.time()), random.randint(100, 999)) self.__transaction_id = '%s-%s' % (int(time.time()), random.randint(100, 999))
...@@ -236,8 +231,6 @@ class GenericPromise(with_metaclass(ABCMeta, object)): ...@@ -236,8 +231,6 @@ class GenericPromise(with_metaclass(ABCMeta, object)):
if minute <= 0: if minute <= 0:
raise ValueError("Cannot set promise periodicity to a value less than 1") raise ValueError("Cannot set promise periodicity to a value less than 1")
self.__periodicity = minute self.__periodicity = minute
with open(self.__periodicity_file, 'w') as f:
f.write('%s' % minute)
def getPeriodicity(self): def getPeriodicity(self):
return self.__periodicity return self.__periodicity
......
...@@ -36,7 +36,8 @@ import logging ...@@ -36,7 +36,8 @@ import logging
from datetime import datetime, timedelta from datetime import datetime, timedelta
import six import six
from six.moves import queue from six.moves import queue
from slapos.grid.promise import interface, PromiseLauncher, PromiseProcess, PromiseError from slapos.grid.promise import (interface, PromiseLauncher, PromiseProcess,
PromiseError, PROMISE_CACHE_FOLDER_NAME)
from slapos.grid.promise.generic import (GenericPromise, TestResult, AnomalyResult, from slapos.grid.promise.generic import (GenericPromise, TestResult, AnomalyResult,
PromiseQueueResult, PROMISE_STATE_FOLDER_NAME, PromiseQueueResult, PROMISE_STATE_FOLDER_NAME,
PROMISE_RESULT_FOLDER_NAME, PROMISE_RESULT_FOLDER_NAME,
...@@ -93,7 +94,7 @@ class TestSlapOSPromiseMixin(unittest.TestCase): ...@@ -93,7 +94,7 @@ class TestSlapOSPromiseMixin(unittest.TestCase):
self.launcher = PromiseLauncher( self.launcher = PromiseLauncher(
config=parameter_dict, config=parameter_dict,
logger=logging.getLogger('slapos.test.promise'), #logger=logging.getLogger('slapos.test.promise'),
dry_run=dry_run dry_run=dry_run
) )
if save_method: if save_method:
...@@ -1213,6 +1214,102 @@ exit 1 ...@@ -1213,6 +1214,102 @@ exit 1
# no result returned by the promise # no result returned by the promise
self.assertTrue(self.called) self.assertTrue(self.called)
def test_promise_cache(self):
promise_name = 'my_promise.py'
promise_file = os.path.join(self.plugin_dir, promise_name)
self.configureLauncher(timeout=1, enable_anomaly=True)
self.generatePromiseScript(promise_name, success=True, periodicity=0.01,
with_anomaly=True, is_tested=False)
# run promise, no failure
self.launcher.run()
cache_folder = os.path.join(self.partition_dir, PROMISE_CACHE_FOLDER_NAME)
cache_file = os.path.join(cache_folder, 'my_promise')
self.assertTrue(os.path.exists(cache_folder))
self.assertTrue(os.path.exists(cache_file))
file_stat = os.stat(promise_file)
with open(cache_file) as f:
cache_dict = json.load(f)
timestamp = cache_dict.pop('timestamp')
info_dict = {
u'is_tested': False,
u'is_anomaly_detected': True,
u'periodicity': 0.01,
u'next_run_after' : (timestamp + 0.01 * 60.0),
u'module_file': u'%s' % promise_file,
u'module_file_mtime': file_stat.st_mtime,
}
# next run is in future
self.assertTrue(info_dict['next_run_after'] > time.time())
self.assertEqual(info_dict, cache_dict)
def test_promise_cache_expire_with_periodicity(self):
self.called = False
def test_method(result):
self.called = True
promise_name = 'my_promise.py'
promise_file = os.path.join(self.plugin_dir, promise_name)
self.configureLauncher(save_method=test_method, timeout=1, enable_anomaly=True)
self.generatePromiseScript(promise_name, success=True, periodicity=0.01,
with_anomaly=True, is_tested=False)
# run promise, no failure
self.launcher.run()
cache_folder = os.path.join(self.partition_dir, PROMISE_CACHE_FOLDER_NAME)
cache_file = os.path.join(cache_folder, 'my_promise')
self.assertTrue(os.path.exists(cache_folder))
self.assertTrue(os.path.exists(cache_file))
file_stat = os.stat(promise_file)
with open(cache_file) as f:
cache_dict = json.load(f)
timestamp = cache_dict.pop('timestamp')
info_dict = {
u'is_tested': False,
u'is_anomaly_detected': True,
u'periodicity': 0.01,
u'next_run_after' : (timestamp + 0.01 * 60.0),
u'module_file': u'%s' % promise_file,
u'module_file_mtime': file_stat.st_mtime,
}
self.assertEqual(info_dict, cache_dict)
self.assertTrue(self.called)
next_run_after = cache_dict['next_run_after']
# periodicity not match
self.called = False
self.configureLauncher(save_method=test_method, timeout=1, enable_anomaly=True)
self.launcher.run()
self.assertFalse(self.called)
with open(cache_file) as f:
cache_dict = json.load(f)
# no change!
current_timestamp = cache_dict.pop('timestamp')
self.assertEqual(current_timestamp, timestamp)
self.assertEqual(info_dict, cache_dict)
time.sleep(1)
# periodicity match
self.configureLauncher(save_method=test_method, timeout=1, enable_anomaly=True)
self.launcher.run()
# cached was updated
with open(cache_file) as f:
cache_dict = json.load(f)
new_timestamp = cache_dict.pop('timestamp')
info_dict = {
u'is_tested': False,
u'is_anomaly_detected': True,
u'periodicity': 0.01,
u'next_run_after' : (new_timestamp + 0.01 * 60.0),
u'module_file': u'%s' % promise_file,
u'module_file_mtime': file_stat.st_mtime,
}
self.assertTrue(new_timestamp > timestamp)
# next run is in future
self.assertTrue(cache_dict['next_run_after'] > next_run_after)
self.assertEqual(info_dict, cache_dict)
class TestSlapOSGenericPromise(TestSlapOSPromiseMixin): class TestSlapOSGenericPromise(TestSlapOSPromiseMixin):
def initialisePromise(self, promise_content="", success=True, timeout=60): def initialisePromise(self, promise_content="", success=True, timeout=60):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment