Commit fc092505 authored by Alain Takoudjou's avatar Alain Takoudjou

grid.promise: save and use promise periodicity in cache

parent 459e9ac4
...@@ -72,6 +72,7 @@ class PromiseProcess(Process): ...@@ -72,6 +72,7 @@ class PromiseProcess(Process):
@param promise_name: The name of the promise to run @param promise_name: The name of the promise to run
@param promise_path: path of the promise @param promise_path: path of the promise
@param argument_dict: all promise parameters in a dictionary @param argument_dict: all promise parameters in a dictionary
@param queue: Queue used to send promise result
@param allow_bang: Bolean saying if bang should be called in case of @param allow_bang: Bolean saying if bang should be called in case of
anomaly failure. anomaly failure.
@param check_anomaly: Bolean saying if promise anomaly should be run. @param check_anomaly: Bolean saying if promise anomaly should be run.
...@@ -94,74 +95,52 @@ class PromiseProcess(Process): ...@@ -94,74 +95,52 @@ class PromiseProcess(Process):
self._periodicity = None self._periodicity = None
self.cache_folder = os.path.join(self.partition_folder, self.cache_folder = os.path.join(self.partition_folder,
PROMISE_CACHE_FOLDER_NAME) PROMISE_CACHE_FOLDER_NAME)
mkdir_p(self.cache_folder)
self.cache_file = os.path.join(self.cache_folder, self.getPromiseTitle()) self.cache_file = os.path.join(self.cache_folder, self.getPromiseTitle())
self._timestamp_file = os.path.join(partition_folder, # XXX - remove old files used to store promise timestamp and periodicity
PROMISE_STATE_FOLDER_NAME, self._cleanupDeprecated()
'%s.timestamp' % promise_name)
periodicity_file = os.path.join(partition_folder, def _cleanupDeprecated(self):
timestamp_file = os.path.join(self.partition_folder,
PROMISE_STATE_FOLDER_NAME,
'%s.timestamp' % self.name)
periodicity_file = os.path.join(self.partition_folder,
PROMISE_STATE_FOLDER_NAME, PROMISE_STATE_FOLDER_NAME,
PROMISE_PERIOD_FILE_NAME % promise_name) PROMISE_PERIOD_FILE_NAME % self.name)
if os.path.exists(periodicity_file) and os.stat(periodicity_file).st_size: if os.path.exists(timestamp_file) and os.path.isfile(timestamp_file):
with open(periodicity_file) as f: os.unlink(timestamp_file)
try: if os.path.exists(periodicity_file) and os.path.isfile(periodicity_file):
self._periodicity = float(f.read()) os.unlink(periodicity_file)
except ValueError:
# set to None, run the promise and regenerate the file
pass
def isPeriodicityMatch(self): def getNextPromiseTime(self, periodicity):
""" """
Return True if promise should be run now, considering the promise Return the next promise execution timestamp from now
periodicity in minutes
""" """
if self._periodicity is not None and \ return time.time() + (periodicity * 60.0)
os.path.exists(self._timestamp_file) and \
os.stat(self._timestamp_file).st_size:
with open(self._timestamp_file) as f:
try:
latest_timestamp = float(f.read())
current_timediff = (time.time() - latest_timestamp) / 60.0
if current_timediff >= self._periodicity:
return True
#self.logger.debug("Skip Promise %r. periodicity=%s, time_diff=%s" % (
# self.name, self._periodicity, current_timediff))
except ValueError:
# if the file is broken, run the promise and regenerate it
return True
else:
return False
return True
def setPromiseStartTimestamp(self):
"""
Save the promise execution timestamp
"""
state_directory = os.path.dirname(self._timestamp_file)
mkdir_p(state_directory)
with open(self._timestamp_file, 'w') as f:
f.write(str(time.time()))
def getPromiseTitle(self): def getPromiseTitle(self):
return os.path.splitext(self.name)[0] return os.path.splitext(self.name)[0]
def updatePromiseCache(self, promise_class, promise_instance): def updatePromiseCache(self, promise_class, promise_instance, started=True):
""" """
Cache some data from the promise that can be reused Cache some data from the promise that can be reused
""" """
py_file = '%s.py' % os.path.splitext(inspect.getfile(promise_class))[0] py_file = '%s.py' % os.path.splitext(inspect.getfile(promise_class))[0]
stat = os.stat(py_file) stat = os.stat(py_file)
timestamp = time.time()
cache_dict = dict( cache_dict = dict(
is_tested= not hasattr(promise_instance, 'isTested') or \ is_tested= not hasattr(promise_instance, 'isTested') or \
promise_instance.isTested(), promise_instance.isTested(),
is_anomaly_detected=not hasattr(promise_instance, 'isAnomalyDetected') or \ is_anomaly_detected=not hasattr(promise_instance, 'isAnomalyDetected') or \
promise_instance.isAnomalyDetected(), promise_instance.isAnomalyDetected(),
periodicity=promise_instance.getPeriodicity(), periodicity=promise_instance.getPeriodicity(),
next_run_after=timestamp + (promise_instance.getPeriodicity() * 60.0),
timestamp=timestamp,
module_file=py_file, module_file=py_file,
module_file_mtime=stat.st_mtime, module_file_mtime=stat.st_mtime,
module_file_size=stat.st_size,
) )
if not os.path.isdir(self.cache_folder): if not started:
mkdir_p(self.cache_folder) cache_dict['next_run_after'] = timestamp
with open(self.cache_file, 'w') as f: with open(self.cache_file, 'w') as f:
f.write(json.dumps(cache_dict)) f.write(json.dumps(cache_dict))
...@@ -174,8 +153,7 @@ class PromiseProcess(Process): ...@@ -174,8 +153,7 @@ class PromiseProcess(Process):
# file not exists mean path was changed # file not exists mean path was changed
return None return None
current_stat = os.stat(cache_dict['module_file']) current_stat = os.stat(cache_dict['module_file'])
if current_stat.st_mtime != cache_dict['module_file_mtime'] or \ if current_stat.st_mtime != cache_dict['module_file_mtime']:
current_stat.st_size != cache_dict['module_file_size']:
# file was modified, update cache # file was modified, update cache
return None return None
return cache_dict return cache_dict
...@@ -190,23 +168,26 @@ class PromiseProcess(Process): ...@@ -190,23 +168,26 @@ class PromiseProcess(Process):
""" """
try: try:
os.chdir(self.partition_folder) os.chdir(self.partition_folder)
promise_started = False
if self.uid and self.gid: if self.uid and self.gid:
dropPrivileges(self.uid, self.gid, logger=self.logger) dropPrivileges(self.uid, self.gid, logger=self.logger)
if self.wrap_promise: if self.wrap_promise:
promise_instance = WrapPromise(self.argument_dict) promise_instance = WrapPromise(self.argument_dict)
else: else:
self._createInitFile() self._createInitFile()
promise_module = self._loadPromiseModule() promise_module = self._loadPromiseModule()
promise_instance = promise_module.RunPromise(self.argument_dict) promise_instance = promise_module.RunPromise(self.argument_dict)
self.updatePromiseCache(promise_module.RunPromise, promise_instance)
if not hasattr(promise_instance, 'isAnomalyDetected') or not \ if not hasattr(promise_instance, 'isAnomalyDetected') or not \
hasattr(promise_instance, 'isTested') or \ hasattr(promise_instance, 'isTested') or \
(promise_instance.isAnomalyDetected() and self.check_anomaly) or \ (promise_instance.isAnomalyDetected() and self.check_anomaly) or \
(promise_instance.isTested() and not self.check_anomaly): (promise_instance.isTested() and not self.check_anomaly):
# if the promise will run, we save execution timestamp # if the promise will run, we save execution timestamp
self.setPromiseStartTimestamp() promise_started = True
self.updatePromiseCache(
WrapPromise if self.wrap_promise else promise_module.RunPromise,
promise_instance,
started=promise_started)
promise_instance.run(self.check_anomaly, self.allow_bang) promise_instance.run(self.check_anomaly, self.allow_bang)
except Exception: except Exception:
self.logger.error(traceback.format_exc()) self.logger.error(traceback.format_exc())
...@@ -387,7 +368,7 @@ class PromiseLauncher(object): ...@@ -387,7 +368,7 @@ class PromiseLauncher(object):
execution_time=execution_time execution_time=execution_time
) )
def _writePromiseResult(self, result): def _savePromiseResult(self, result):
if not isinstance(result, PromiseQueueResult): if not isinstance(result, PromiseQueueResult):
self.logger.error('Bad result: %s is not type of PromiseQueueResult...' % result) self.logger.error('Bad result: %s is not type of PromiseQueueResult...' % result)
return return
...@@ -421,12 +402,13 @@ class PromiseLauncher(object): ...@@ -421,12 +402,13 @@ class PromiseLauncher(object):
)) ))
return result return result
def _savePromiseResult(self, result_item): def _writePromiseResult(self, result_item):
if result_item.item.type() == "Empty Result": if result_item.item.type() == "Empty Result":
# no result collected (sense skipped) # no result collected (sense skipped)
skipped_method = "Anomaly" if self.check_anomaly else "Test" skipped_method = "Anomaly" if self.check_anomaly else "Test"
self.logger.debug("Skipped, %s is disabled in promise %r" % ( self.logger.debug("Skipped, %s is disabled in promise %r" % (
skipped_method, result_item.name)) skipped_method, result_item.name))
return
elif result_item.item.hasFailed(): elif result_item.item.hasFailed():
self.logger.error(result_item.item.message) self.logger.error(result_item.item.message)
if result_item.execution_time != -1 and \ if result_item.execution_time != -1 and \
...@@ -434,7 +416,7 @@ class PromiseLauncher(object): ...@@ -434,7 +416,7 @@ class PromiseLauncher(object):
# stop to bang as it was called # stop to bang as it was called
self.bang_called = True self.bang_called = True
# Send result # Send result
self._writePromiseResult(result_item) self._savePromiseResult(result_item)
def _emptyQueue(self): def _emptyQueue(self):
"""Remove all entries from queue until it's empty""" """Remove all entries from queue until it's empty"""
...@@ -451,6 +433,11 @@ class PromiseLauncher(object): ...@@ -451,6 +433,11 @@ class PromiseLauncher(object):
PROMISE_STATE_FOLDER_NAME) PROMISE_STATE_FOLDER_NAME)
chownDirectory(folder_path, stat_info.st_uid, stat_info.st_gid) chownDirectory(folder_path, stat_info.st_uid, stat_info.st_gid)
def isPeriodicityMatch(self, next_timestamp):
if next_timestamp:
return time.time() >= next_timestamp
return True
def _launchPromise(self, promise_name, promise_path, argument_dict, def _launchPromise(self, promise_name, promise_path, argument_dict,
wrap_process=False): wrap_process=False):
""" """
...@@ -460,7 +447,6 @@ class PromiseLauncher(object): ...@@ -460,7 +447,6 @@ class PromiseLauncher(object):
If the promise periodicity doesn't match, the previous promise result is If the promise periodicity doesn't match, the previous promise result is
checked. checked.
""" """
self.logger.info("Checking promise %s..." % promise_name)
try: try:
promise_process = PromiseProcess( promise_process = PromiseProcess(
self.partition_folder, self.partition_folder,
...@@ -480,9 +466,10 @@ class PromiseLauncher(object): ...@@ -480,9 +466,10 @@ class PromiseLauncher(object):
if self.check_anomaly and not promise_cache_dict.get('is_anomaly_detected') \ if self.check_anomaly and not promise_cache_dict.get('is_anomaly_detected') \
or not self.check_anomaly and not promise_cache_dict.get('is_tested'): or not self.check_anomaly and not promise_cache_dict.get('is_tested'):
# promise is skipped, send empty result # promise is skipped, send empty result
self._savePromiseResult(PromiseQueueResult()) self._writePromiseResult(PromiseQueueResult())
return return
if not self.force and not promise_process.isPeriodicityMatch(): if not self.force and (promise_cache_dict is not None and not
self.isPeriodicityMatch(promise_cache_dict.get('next_run_after'))):
# we won't start the promise process, just get the latest result # we won't start the promise process, just get the latest result
result = self._loadPromiseResult(promise_process.getPromiseTitle()) result = self._loadPromiseResult(promise_process.getPromiseTitle())
if result is not None: if result is not None:
...@@ -500,6 +487,7 @@ class PromiseLauncher(object): ...@@ -500,6 +487,7 @@ class PromiseLauncher(object):
self.logger.warning("Promise %s skipped." % promise_name) self.logger.warning("Promise %s skipped." % promise_name)
return True return True
self.logger.info("Checking promise %s..." % promise_name)
queue_item = None queue_item = None
sleep_time = 0.1 sleep_time = 0.1
increment_limit = int(self.promise_timeout / sleep_time) increment_limit = int(self.promise_timeout / sleep_time)
...@@ -567,7 +555,7 @@ class PromiseLauncher(object): ...@@ -567,7 +555,7 @@ class PromiseLauncher(object):
execution_time=execution_time execution_time=execution_time
) )
self._savePromiseResult(queue_item) self._writePromiseResult(queue_item)
if self.debug: if self.debug:
self.logger.debug("Finished promise %r in %s second(s)." % ( self.logger.debug("Finished promise %r in %s second(s)." % (
promise_name, execution_time)) promise_name, execution_time))
......
...@@ -236,8 +236,6 @@ class GenericPromise(with_metaclass(ABCMeta, object)): ...@@ -236,8 +236,6 @@ class GenericPromise(with_metaclass(ABCMeta, object)):
if minute <= 0: if minute <= 0:
raise ValueError("Cannot set promise periodicity to a value less than 1") raise ValueError("Cannot set promise periodicity to a value less than 1")
self.__periodicity = minute self.__periodicity = minute
with open(self.__periodicity_file, 'w') as f:
f.write('%s' % minute)
def getPeriodicity(self): def getPeriodicity(self):
return self.__periodicity return self.__periodicity
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment