Commit ababf1f6 authored by Alain Takoudjou's avatar Alain Takoudjou

grid.promise: cache some promise information to speedup testless and anomalyless checks

parent d259174f
...@@ -37,6 +37,7 @@ import json ...@@ -37,6 +37,7 @@ import json
import importlib import importlib
import traceback import traceback
import psutil import psutil
import inspect
from multiprocessing import Process, Queue as MQueue from multiprocessing import Process, Queue as MQueue
from six.moves import queue, reload_module from six.moves import queue, reload_module
from slapos.util import mkdir_p, chownDirectory from slapos.util import mkdir_p, chownDirectory
...@@ -51,6 +52,8 @@ from slapos.grid.promise.generic import (GenericPromise, PromiseQueueResult, ...@@ -51,6 +52,8 @@ from slapos.grid.promise.generic import (GenericPromise, PromiseQueueResult,
from slapos.grid.promise.wrapper import WrapPromise from slapos.grid.promise.wrapper import WrapPromise
from slapos.version import version from slapos.version import version
PROMISE_CACHE_FOLDER_NAME = '.slapgrid/promise/cache'
class PromiseError(Exception): class PromiseError(Exception):
pass pass
...@@ -89,6 +92,9 @@ class PromiseProcess(Process): ...@@ -89,6 +92,9 @@ class PromiseProcess(Process):
self.partition_folder = partition_folder self.partition_folder = partition_folder
self.wrap_promise = wrap self.wrap_promise = wrap
self._periodicity = None self._periodicity = None
self.cache_folder = os.path.join(self.partition_folder,
PROMISE_CACHE_FOLDER_NAME)
self.cache_file = os.path.join(self.cache_folder, self.getPromiseTitle())
self._timestamp_file = os.path.join(partition_folder, self._timestamp_file = os.path.join(partition_folder,
PROMISE_STATE_FOLDER_NAME, PROMISE_STATE_FOLDER_NAME,
'%s.timestamp' % promise_name) '%s.timestamp' % promise_name)
...@@ -138,6 +144,44 @@ class PromiseProcess(Process): ...@@ -138,6 +144,44 @@ class PromiseProcess(Process):
def getPromiseTitle(self): def getPromiseTitle(self):
return os.path.splitext(self.name)[0] return os.path.splitext(self.name)[0]
def updatePromiseCache(self, promise_class, promise_instance):
"""
Cache some data from the promise that can be reused
"""
py_file = '%s.py' % os.path.splitext(inspect.getfile(promise_class))[0]
stat = os.stat(py_file)
cache_dict = dict(
is_tested= not hasattr(promise_instance, 'isTested') or \
promise_instance.isTested(),
is_anomaly_detected=not hasattr(promise_instance, 'isAnomalyDetected') or \
promise_instance.isAnomalyDetected(),
periodicity=promise_instance.getPeriodicity(),
module_file=py_file,
module_file_mtime=stat.st_mtime,
module_file_size=stat.st_size,
)
if not os.path.isdir(self.cache_folder):
mkdir_p(self.cache_folder)
with open(self.cache_file, 'w') as f:
f.write(json.dumps(cache_dict))
def loadPromiseCacheDict(self):
if os.path.exists(self.cache_file):
try:
with open(self.cache_file) as f:
cache_dict = json.loads(f.read())
if not os.path.exists(cache_dict['module_file']):
# file not exists mean path was changed
return None
current_stat = os.stat(cache_dict['module_file'])
if current_stat.st_mtime != cache_dict['module_file_mtime'] or \
current_stat.st_size != cache_dict['module_file_size']:
# file was modified, update cache
return None
return cache_dict
except ValueError:
return None
def run(self): def run(self):
""" """
Run the promise Run the promise
...@@ -155,6 +199,7 @@ class PromiseProcess(Process): ...@@ -155,6 +199,7 @@ class PromiseProcess(Process):
self._createInitFile() self._createInitFile()
promise_module = self._loadPromiseModule() promise_module = self._loadPromiseModule()
promise_instance = promise_module.RunPromise(self.argument_dict) promise_instance = promise_module.RunPromise(self.argument_dict)
self.updatePromiseCache(promise_module.RunPromise, promise_instance)
if not hasattr(promise_instance, 'isAnomalyDetected') or not \ if not hasattr(promise_instance, 'isAnomalyDetected') or not \
hasattr(promise_instance, 'isTested') or \ hasattr(promise_instance, 'isTested') or \
...@@ -218,6 +263,7 @@ class PromiseProcess(Process): ...@@ -218,6 +263,7 @@ class PromiseProcess(Process):
key, extra_dict)) key, extra_dict))
self.argument_dict[key] = extra_dict[key] self.argument_dict[key] = extra_dict[key]
class PromiseLauncher(object): class PromiseLauncher(object):
def __init__(self, config=None, logger=None, dry_run=False): def __init__(self, config=None, logger=None, dry_run=False):
...@@ -341,7 +387,7 @@ class PromiseLauncher(object): ...@@ -341,7 +387,7 @@ class PromiseLauncher(object):
execution_time=execution_time execution_time=execution_time
) )
def _savePromiseResult(self, result): def _writePromiseResult(self, result):
if not isinstance(result, PromiseQueueResult): if not isinstance(result, PromiseQueueResult):
self.logger.error('Bad result: %s is not type of PromiseQueueResult...' % result) self.logger.error('Bad result: %s is not type of PromiseQueueResult...' % result)
return return
...@@ -375,6 +421,21 @@ class PromiseLauncher(object): ...@@ -375,6 +421,21 @@ class PromiseLauncher(object):
)) ))
return result return result
def _savePromiseResult(self, result_item):
if result_item.item.type() == "Empty Result":
# no result collected (sense skipped)
skipped_method = "Anomaly" if self.check_anomaly else "Test"
self.logger.debug("Skipped, %s is disabled in promise %r" % (
skipped_method, result_item.name))
elif result_item.item.hasFailed():
self.logger.error(result_item.item.message)
if result_item.execution_time != -1 and \
isinstance(result_item.item, AnomalyResult) and self.check_anomaly:
# stop to bang as it was called
self.bang_called = True
# Send result
self._writePromiseResult(result_item)
def _emptyQueue(self): def _emptyQueue(self):
"""Remove all entries from queue until it's empty""" """Remove all entries from queue until it's empty"""
while True: while True:
...@@ -414,6 +475,13 @@ class PromiseLauncher(object): ...@@ -414,6 +475,13 @@ class PromiseLauncher(object):
wrap=wrap_process, wrap=wrap_process,
) )
promise_cache_dict = promise_process.loadPromiseCacheDict()
if promise_cache_dict is not None:
if self.check_anomaly and not promise_cache_dict.get('is_anomaly_detected') \
or not self.check_anomaly and not promise_cache_dict.get('is_tested'):
# promise is skipped, send empty result
self._savePromiseResult(PromiseQueueResult())
return
if not self.force and not promise_process.isPeriodicityMatch(): if not self.force and not promise_process.isPeriodicityMatch():
# we won't start the promise process, just get the latest result # we won't start the promise process, just get the latest result
result = self._loadPromiseResult(promise_process.getPromiseTitle()) result = self._loadPromiseResult(promise_process.getPromiseTitle())
...@@ -498,21 +566,8 @@ class PromiseLauncher(object): ...@@ -498,21 +566,8 @@ class PromiseLauncher(object):
message="Error: No output returned by the promise", message="Error: No output returned by the promise",
execution_time=execution_time execution_time=execution_time
) )
elif queue_item.item.type() == "Empty Result":
# no result collected (sense skipped)
skipped_method = "Anomaly" if self.check_anomaly else "Test"
self.logger.debug("Skipped, %s is disabled in promise %r." % (
skipped_method, promise_name))
return False
if not self.dry_run:
self._savePromiseResult(queue_item)
if queue_item.item.hasFailed():
self.logger.error(queue_item.item.message)
if isinstance(queue_item.item, AnomalyResult) and self.check_anomaly:
# stop to bang as it was called
self.bang_called = True
self._savePromiseResult(queue_item)
if self.debug: if self.debug:
self.logger.debug("Finished promise %r in %s second(s)." % ( self.logger.debug("Finished promise %r in %s second(s)." % (
promise_name, execution_time)) promise_name, execution_time))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment