Commit b8d4799b authored by Alain Takoudjou's avatar Alain Takoudjou

grid.promise: save and use promise periodicity in cache

parent 38cba22f
......@@ -73,6 +73,7 @@ class PromiseProcess(Process):
@param promise_name: The name of the promise to run
@param promise_path: path of the promise
@param argument_dict: all promise parameters in a dictionary
@param queue: Queue used to send promise result
@param allow_bang: Bolean saying if bang should be called in case of
anomaly failure.
@param check_anomaly: Bolean saying if promise anomaly should be run.
......@@ -94,74 +95,52 @@ class PromiseProcess(Process):
self._periodicity = None
self.cache_folder = os.path.join(self.partition_folder,
PROMISE_CACHE_FOLDER_NAME)
mkdir_p(self.cache_folder)
self.cache_file = os.path.join(self.cache_folder, self.getPromiseTitle())
self._timestamp_file = os.path.join(partition_folder,
PROMISE_STATE_FOLDER_NAME,
'%s.timestamp' % promise_name)
periodicity_file = os.path.join(partition_folder,
# XXX - remove old files used to store promise timestamp and periodicity
self._cleanupDeprecated()
def _cleanupDeprecated(self):
timestamp_file = os.path.join(self.partition_folder,
PROMISE_STATE_FOLDER_NAME,
'%s.timestamp' % self.name)
periodicity_file = os.path.join(self.partition_folder,
PROMISE_STATE_FOLDER_NAME,
PROMISE_PERIOD_FILE_NAME % promise_name)
if os.path.exists(periodicity_file) and os.stat(periodicity_file).st_size:
with open(periodicity_file) as f:
try:
self._periodicity = float(f.read())
except ValueError:
# set to None, run the promise and regenerate the file
pass
PROMISE_PERIOD_FILE_NAME % self.name)
if os.path.exists(timestamp_file) and os.path.isfile(timestamp_file):
os.unlink(timestamp_file)
if os.path.exists(periodicity_file) and os.path.isfile(periodicity_file):
os.unlink(periodicity_file)
def isPeriodicityMatch(self):
def getNextPromiseTime(self, periodicity):
"""
Return True if promise should be run now, considering the promise
periodicity in minutes
Return the next promise execution timestamp from now
"""
if self._periodicity is not None and \
os.path.exists(self._timestamp_file) and \
os.stat(self._timestamp_file).st_size:
with open(self._timestamp_file) as f:
try:
latest_timestamp = float(f.read())
current_timediff = (time.time() - latest_timestamp) / 60.0
if current_timediff >= self._periodicity:
return True
#self.logger.debug("Skip Promise %r. periodicity=%s, time_diff=%s" % (
# self.name, self._periodicity, current_timediff))
except ValueError:
# if the file is broken, run the promise and regenerate it
return True
else:
return False
return True
def setPromiseStartTimestamp(self):
"""
Save the promise execution timestamp
"""
state_directory = os.path.dirname(self._timestamp_file)
mkdir_p(state_directory)
with open(self._timestamp_file, 'w') as f:
f.write(str(time.time()))
return time.time() + (periodicity * 60.0)
def getPromiseTitle(self):
return os.path.splitext(self.name)[0]
def updatePromiseCache(self, promise_class, promise_instance):
def updatePromiseCache(self, promise_class, promise_instance, started=True):
"""
Cache some data from the promise that can be reused
"""
py_file = '%s.py' % os.path.splitext(inspect.getfile(promise_class))[0]
stat = os.stat(py_file)
timestamp = time.time()
cache_dict = dict(
is_tested= not hasattr(promise_instance, 'isTested') or \
promise_instance.isTested(),
is_anomaly_detected=not hasattr(promise_instance, 'isAnomalyDetected') or \
promise_instance.isAnomalyDetected(),
periodicity=promise_instance.getPeriodicity(),
next_run_after=timestamp + (promise_instance.getPeriodicity() * 60.0),
timestamp=timestamp,
module_file=py_file,
module_file_mtime=stat.st_mtime,
module_file_size=stat.st_size,
)
if not os.path.isdir(self.cache_folder):
mkdir_p(self.cache_folder)
if not started:
cache_dict['next_run_after'] = timestamp
with open(self.cache_file, 'w') as f:
f.write(json.dumps(cache_dict))
......@@ -174,8 +153,7 @@ class PromiseProcess(Process):
# file not exists mean path was changed
return None
current_stat = os.stat(cache_dict['module_file'])
if current_stat.st_mtime != cache_dict['module_file_mtime'] or \
current_stat.st_size != cache_dict['module_file_size']:
if current_stat.st_mtime != cache_dict['module_file_mtime']:
# file was modified, update cache
return None
return cache_dict
......@@ -190,19 +168,24 @@ class PromiseProcess(Process):
"""
try:
os.chdir(self.partition_folder)
promise_started = False
if self.wrap_promise:
promise_instance = WrapPromise(self.argument_dict)
else:
promise_module = self._loadPromiseModule()
promise_instance = promise_module.RunPromise(self.argument_dict)
self.updatePromiseCache(promise_module.RunPromise, promise_instance)
if not hasattr(promise_instance, 'isAnomalyDetected') or not \
hasattr(promise_instance, 'isTested') or \
(promise_instance.isAnomalyDetected() and self.check_anomaly) or \
(promise_instance.isTested() and not self.check_anomaly):
# if the promise will run, we save execution timestamp
self.setPromiseStartTimestamp()
#self.setPromiseStartTimestamp()
promise_started = True
self.updatePromiseCache(
WrapPromise if self.wrap_promise else promise_module.RunPromise,
promise_instance,
started=promise_started)
promise_instance.run(self.check_anomaly, self.allow_bang)
except Exception:
self.logger.error(traceback.format_exc())
......@@ -285,7 +268,7 @@ class PromiseWorker(Process):
if pps.current_process is None:
return
process = pps.current_process
if signum in [signal.SIGINT, signal.SIGTERM] and process.is_alive() is None:
if signum in [signal.SIGINT, signal.SIGTERM] and process.is_alive():
process.terminate()
process.join(1)
if process.is_alive():
......@@ -364,8 +347,12 @@ class PromiseWorker(Process):
# Send result
self.done_queue.put(result_item)
def isPeriodicityMatch(self, next_timestamp):
if next_timestamp:
return time.time() >= next_timestamp
return True
def proceedTask(self, promise_name, task_dict):
self.logger.info("Checking promise %s..." % promise_name)
try:
promise_process = PromiseProcess(
queue=self.queue_result,
......@@ -381,7 +368,8 @@ class PromiseWorker(Process):
# promise is skipped, send empty result
self._sendPromiseResult(PromiseQueueResult())
return
if not self.force and not promise_process.isPeriodicityMatch():
if not self.force and (promise_cache_dict is not None and not
self.isPeriodicityMatch(promise_cache_dict.get('next_run_after'))):
# we won't start the promise process, just get the latest result
result = self._loadPromiseResult(promise_process.getPromiseTitle())
if result is not None:
......@@ -398,6 +386,7 @@ class PromiseWorker(Process):
self.logger.warning("Promise %s skipped." % promise_name)
return
self.logger.info("Checking promise %s..." % promise_name)
queue_item = None
sleep_time = 1
increment_limit = int(self.promise_timeout / sleep_time)
......
......@@ -236,8 +236,6 @@ class GenericPromise(with_metaclass(ABCMeta, object)):
if minute <= 0:
raise ValueError("Cannot set promise periodicity to a value less than 1")
self.__periodicity = minute
with open(self.__periodicity_file, 'w') as f:
f.write('%s' % minute)
def getPeriodicity(self):
return self.__periodicity
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment