Commit 238b167d authored by Alain Takoudjou's avatar Alain Takoudjou

monitor: run all promises in a single cron task with timeout of 12 seconds per promises

parent 54399dda
......@@ -75,7 +75,7 @@ class Monitoring(object):
self.crond_folder = config.get("monitor", "crond-folder")
self.logrotate_d = config.get("monitor", "logrotate-folder")
self.promise_runner = config.get("monitor", "promise-runner")
self.promise_folder_list = config.get("monitor", "promise-folder-list").split()
self.promise_folder = config.get("monitor", "promise-folder")
self.public_folder = config.get("monitor", "public-folder")
self.private_folder = config.get("monitor", "private-folder")
self.collector_db = config.get("monitor", "collector-db")
......@@ -98,10 +98,6 @@ class Monitoring(object):
self.promise_output_file = config.get("monitor", "promise-output-file")
self.bootstrap_is_ok = True
self.promise_dict = {}
for promise_folder in self.promise_folder_list:
self.setupPromiseDictFromFolder(promise_folder)
def loadConfig(self, pathes, config=None):
if config is None:
config = ConfigParser.ConfigParser()
......@@ -182,13 +178,6 @@ class Monitoring(object):
pass
return configuration_list
def setupPromiseDictFromFolder(self, folder):
for filename in os.listdir(folder):
path = os.path.join(folder, filename)
if os.path.isfile(path) and os.access(path, os.X_OK):
self.promise_dict[filename] = {"path": path,
"configuration": ConfigParser.ConfigParser()}
def createSymlinksFromConfig(self, destination_folder, source_path_list, name=""):
if destination_folder:
if source_path_list:
......@@ -406,37 +395,30 @@ class Monitoring(object):
def generateServiceCronEntries(self):
# XXX only if at least one configuration file is modified, then write in the cron
#cron_line_list = ['PATH=%s\n' % os.environ['PATH']]
cron_line_list = []
service_name_list = [name.replace('.status.json', '')
for name in os.listdir(self.public_folder) if name.endswith('.status.json')]
for service_name, promise in self.promise_items:
service_config = promise["configuration"]
service_status_path = "%s/%s.status.json" % (self.public_folder, service_name)
mkdirAll(os.path.dirname(service_status_path))
promise_cmd_line = [
softConfigGet(service_config, "service", "frequency") or "* * * * *",
self.promise_runner,
'--pid_path "%s"' % os.path.join(self.service_pid_folder,
"%s.pid" % service_name),
'--output "%s"' % service_status_path,
'--promise_script "%s"' % promise["path"],
'--promise_name "%s"' % service_name,
'--monitor_url "%s/jio_private/"' % self.webdav_url, # XXX hardcoded,
'--history_folder "%s"' % self.public_folder,
'--instance_name "%s"' % self.title,
'--hosting_name "%s"' % self.root_title]
cron_line_list.append(' '.join(promise_cmd_line))
if service_name in service_name_list:
promise_cmd_line = [
"* * * * *",
"sleep $((1 + RANDOM % 30)) &&", # Sleep between 1 to 30 seconds
self.promise_runner,
'--pid_path "%s"' % os.path.join(self.service_pid_folder,
"monitor-promises.pid"),
'--output "%s"' % self.public_folder,
'--promise_folder "%s"' % self.promise_folder,
'--monitor_url "%s/jio_private/"' % self.webdav_url, # XXX hardcoded,
'--history_folder "%s"' % self.public_folder,
'--instance_name "%s"' % self.title,
'--hosting_name "%s"' % self.root_title]
registered_promise_list = os.listdir(self.promise_folder)
for service_name in service_name_list:
if service_name in registered_promise_list:
service_name_list.pop(service_name_list.index(service_name))
if service_name_list != []:
# XXX Some service was removed, delete his status file so monitor will not consider his status anymore
# XXX Some service was removed, delete his status file so monitor will not consider the status anymore
for service_name in service_name_list:
status_path = os.path.join(self.public_folder, '%s.status.json' % service_name)
if os.path.exists(status_path):
......@@ -447,7 +429,7 @@ class Monitoring(object):
pass
with open(self.crond_folder + "/monitor-promises", "w") as fp:
fp.write("\n".join(cron_line_list))
fp.write(' '.join(promise_cmd_line))
def addCronEntry(self, name, frequency, command):
entry_line = '%s %s' % (frequency, command)
......@@ -466,21 +448,6 @@ class Monitoring(object):
self.configureFolders()
# create symlinks from service configurations
self.promise_items = self.promise_dict.items()
for service_name, promise in self.promise_items:
service_config = promise["configuration"]
public_path_list = softConfigGet(service_config, "service", "public-path-list")
private_path_list = softConfigGet(service_config, "service", "private-path-list")
if public_path_list:
self.createSymlinksFromConfig(self.public_folder,
public_path_list.split(),
service_name)
if private_path_list:
self.createSymlinksFromConfig(self.private_folder,
private_path_list.split(),
service_name)
# Generate OPML file
self.generateOpmlFile(self.monitor_url_list,
os.path.join(self.public_folder, 'feeds'))
......@@ -514,7 +481,8 @@ class Monitoring(object):
self.generateLogrotateEntry('monitor.service.status', file_list, option_list)
# Add cron entry for SlapOS Collect
command = "%s %s --output_folder %s --collector_db %s" % (self.python,
command = "sleep $((1 + RANDOM % 60)) && " # Random sleep between 1 to 60 seconds
command += "%s %s --output_folder %s --collector_db %s" % (self.python,
self.collect_script, self.data_folder, self.collector_db)
self.addCronEntry('monitor_collect', '* * * * *', command)
......
......@@ -12,6 +12,9 @@ import glob
import argparse
import traceback
# Promise timeout after 12 seconds
promise_timeout = 12
def parseArguments():
"""
Parse arguments for monitor collector instance.
......@@ -23,6 +26,8 @@ def parseArguments():
help='The Path of file where Json result of this promise will be saved.')
parser.add_argument('--promise_script',
help='Promise script to execute.')
parser.add_argument('--promise_folder',
help='Folder where to find promises to execute. Hide --promise_script and --promise_name')
parser.add_argument('--promise_name',
help='Title to give to this promise.')
parser.add_argument('--promise_type',
......@@ -41,147 +46,276 @@ def parseArguments():
return parser
def runpromise(parser):
class RunPromise(object):
def __init__(self, config_parser):
self.config = config_parser
def runpromise(self):
if self.config.promise_folder:
# run all promises from the given folder
return self.runpromise_synchronous()
if os.path.exists(self.config.pid_path):
with open(self.config.pid_path, "r") as pidfile:
try:
pid = int(pidfile.read(6))
except ValueError:
pid = None
if pid and os.path.exists("/proc/" + str(pid)):
print("A process is already running with pid " + str(pid))
return 1
start_date = ""
with open(self.config.pid_path, "w") as pidfile:
process = self.executeCommand(self.config.promise_script)
ps_process = psutil.Process(process.pid)
start_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ps_process.create_time()))
pidfile.write(str(process.pid))
status_json = self.generateStatusJsonFromProcess(process, start_date=start_date)
status_json['_links'] = {"monitor": {"href": self.config.monitor_url}}
status_json['title'] = self.config.promise_name
status_json['instance'] = self.config.instance_name
status_json['hosting_subscription'] = self.config.hosting_name
status_json['type'] = self.config.promise_type
# Save the lastest status change date (needed for rss)
status_json['change-time'] = ps_process.create_time()
if os.path.exists(self.config.output):
with open(self.config.output) as f:
try:
last_result = json.loads(f.read())
if status_json['status'] == last_result['status'] and last_result.has_key('change-time'):
status_json['change-time'] = last_result['change-time']
except ValueError:
pass
self.updateStatusHistoryFolder(
self.config.promise_name,
self.config.output,
self.config.history_folder,
self.config.promise_type
)
with open(self.config.output, "w") as outputfile:
json.dump(status_json, outputfile)
os.remove(self.config.pid_path)
def runpromise_synchronous(self):
if os.path.exists(self.config.pid_path):
# Check if another run promise is running
with open(self.config.pid_path) as fpid:
try:
pid = int(fpid.read(6))
except ValueError:
pid = None
if pid and os.path.exists("/proc/" + str(pid)):
print("A process is already running with pid " + str(pid))
return []
with open(self.config.pid_path, 'w') as fpid:
fpid.write(str(os.getpid()))
status_list = self.checkPromises(self.config.promise_folder)
promises_status_file = os.path.join(self.config.output, '_promise_status')
previous_state_dict = {}
new_state_dict = {}
base_dict = {
'_links': {"monitor": {"href": self.config.monitor_url}},
'instance': self.config.instance_name,
'hosting_subscription': self.config.hosting_name,
}
if os.path.exists(promises_status_file):
with open(promises_status_file) as f:
try:
previous_state_dict = json.loads(f.read())
except ValueError:
pass
for status_dict in status_list:
status_dict.update(base_dict)
if previous_state_dict.has_key(status_dict['title']):
status, time = previous_state_dict[status_dict['title']].split('#')
if status_dict['status'] == status:
status_dict['change-time'] = float(time)
promise_result_file = os.path.join(self.config.output,
"%s.status.json" % status_dict['title'])
with open(promise_result_file, "w") as outputfile:
json.dump(status_dict, outputfile)
new_state_dict[status_dict['title']] = '%s#%s' % (status_dict['status'],
status_dict['change-time'])
self.updateStatusHistoryFolder(
status_dict['title'],
promise_result_file,
self.config.history_folder,
'status'
)
with open(promises_status_file, "w") as outputfile:
json.dump(new_state_dict, outputfile)
if os.path.exists(parser.pid_path):
with open(parser.pid_path, "r") as pidfile:
os.remove(self.config.pid_path)
def updateStatusHistoryFolder(self, name, status_file, history_folder, promise_type):
history_path = os.path.join(history_folder)
if not os.path.exists(status_file):
return
if not os.path.exists(history_folder):
return
if not os.path.exists(history_path):
try:
pid = int(pidfile.read(6))
except ValueError:
pid = None
if pid and os.path.exists("/proc/" + str(pid)):
print("A process is already running with pid " + str(pid))
return 1
start_date = ""
with open(parser.pid_path, "w") as pidfile:
process = executeCommand(parser.promise_script)
ps_process = psutil.Process(process.pid)
start_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ps_process.create_time()))
pidfile.write(str(process.pid))
status_json = generateStatusJsonFromProcess(process, start_date=start_date)
status_json['_links'] = {"monitor": {"href": parser.monitor_url}}
status_json['title'] = parser.promise_name
status_json['instance'] = parser.instance_name
status_json['hosting_subscription'] = parser.hosting_name
status_json['type'] = parser.promise_type
# Save the lastest status change date (needed for rss)
status_json['change-time'] = ps_process.create_time()
if os.path.exists(parser.output):
with open(parser.output) as f:
os.makedirs(history_path)
except OSError, e:
if e.errno == os.errno.EEXIST and os.path.isdir(history_path):
pass
else: raise
with open(status_file, 'r') as sf:
try:
last_result = json.loads(f.read())
if status_json['status'] == last_result['status'] and last_result.has_key('change-time'):
status_json['change-time'] = last_result['change-time']
status_dict = json.loads(sf.read())
except ValueError:
pass
updateStatusHistoryFolder(
parser.promise_name,
parser.output,
parser.history_folder,
parser.promise_type
)
with open(parser.output, "w") as outputfile:
json.dump(status_json, outputfile)
os.remove(parser.pid_path)
def updateStatusHistoryFolder(name, status_file, history_folder, promise_type):
history_path = os.path.join(history_folder)
if not os.path.exists(status_file):
return
if not os.path.exists(history_folder):
return
if not os.path.exists(history_path):
try:
os.makedirs(history_path)
except OSError, e:
if e.errno == os.errno.EEXIST and os.path.isdir(history_path):
pass
else: raise
with open(status_file, 'r') as sf:
try:
status_dict = json.loads(sf.read())
except ValueError:
traceback.print_exc()
return
traceback.print_exc()
return
if promise_type == 'status':
filename = '%s.history.json' % name
history_file = os.path.join(history_path, filename)
# Remove links from history (not needed)
status_dict.pop('_links', None)
if not os.path.exists(history_file):
with open(history_file, 'w') as f_history:
data_dict = {
"date": time.time(),
"data": [status_dict]
}
f_history.write(json.dumps(data_dict))
else:
# Remove useless informations
status_dict.pop('hosting_subscription', '')
status_dict.pop('title', '')
status_dict.pop('instance', '')
status_dict.pop('type', '')
if promise_type == 'status':
filename = '%s.history.json' % name
history_file = os.path.join(history_path, filename)
# Remove links from history (not needed)
status_dict.pop('_links', None)
if not os.path.exists(history_file):
with open(history_file, 'w') as f_history:
data_dict = {
"date": time.time(),
"data": [status_dict]
}
f_history.write(json.dumps(data_dict))
with open (history_file, mode="r+") as f_history:
f_history.seek(0,2)
position = f_history.tell() -2
f_history.seek(position)
#f_history.write(',%s]}' % str(status_dict))
f_history.write('%s}' % ',{}]'.format(json.dumps(status_dict)))
elif promise_type == 'report':
# keep_item_amount = 3
filename = '%s.history.json' % (
name)
copyfile(status_file, os.path.join(history_path, filename))
"""# Don't let history foler grow too much, keep xx files
file_list = filter(os.path.isfile,
glob.glob("%s/*.%s.history.json" % (history_path, promise_type))
)
file_count = len(file_list)
if file_count > keep_item_amount:
file_list.sort(key=lambda x: os.path.getmtime(x))
while file_count > keep_item_amount:
to_delete = file_list.pop(0)
try:
os.unlink(to_delete)
file_count -= 1
except OSError:
raise"""
def generateStatusJsonFromProcess(self, process, start_date=None, title=None):
stdout, stderr = process.communicate()
status_json = {}
if process.returncode != 0:
status_json["status"] = "ERROR"
else:
# Remove useless informations
status_dict.pop('hosting_subscription', '')
status_dict.pop('title', '')
status_dict.pop('instance', '')
status_dict.pop('type', '')
with open (history_file, mode="r+") as f_history:
f_history.seek(0,2)
position = f_history.tell() -2
f_history.seek(position)
#f_history.write(',%s]}' % str(status_dict))
f_history.write('%s}' % ',{}]'.format(json.dumps(status_dict)))
elif promise_type == 'report':
# keep_item_amount = 3
filename = '%s.history.json' % (
name)
copyfile(status_file, os.path.join(history_path, filename))
"""# Don't let history foler grow too much, keep xx files
file_list = filter(os.path.isfile,
glob.glob("%s/*.%s.history.json" % (history_path, promise_type))
)
file_count = len(file_list)
if file_count > keep_item_amount:
file_list.sort(key=lambda x: os.path.getmtime(x))
while file_count > keep_item_amount:
to_delete = file_list.pop(0)
try:
os.unlink(to_delete)
file_count -= 1
except OSError:
raise"""
def generateStatusJsonFromProcess(process, start_date=None, title=None):
stdout, stderr = process.communicate()
status_json = {}
if process.returncode != 0:
status_json["status"] = "ERROR"
else:
status_json["status"] = "OK"
if stderr:
status_json["message"] = stderr
elif stdout:
status_json["message"] = stdout
if start_date:
status_json["start-date"] = start_date
if title:
status_json["title"] = title
return status_json
def executeCommand(args):
return subprocess.Popen(
args,
#cwd=instance_path,
#env=None if sys.platform == 'cygwin' else {},
stdin=None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
status_json["status"] = "OK"
if stderr:
status_json["message"] = stderr
elif stdout:
status_json["message"] = stdout
if start_date:
status_json["start-date"] = start_date
if title:
status_json["title"] = title
return status_json
def executeCommand(self, args):
return subprocess.Popen(
args,
#cwd=instance_path,
#env=None if sys.platform == 'cygwin' else {},
stdin=None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
def checkPromises(self, promise_dir):
"""
Run all promises found into specified folder
"""
if not os.path.exists(promise_dir) or not os.path.isdir(promise_dir):
return []
promise_result_list = []
# Check whether every promise is kept
for promise in os.listdir(promise_dir):
promise_script = os.path.join(promise_dir, promise)
if not os.path.isfile(promise_script) or not os.access(promise_script, os.X_OK):
# Not executable file
continue
command = [promise_script]
promise_name = os.path.basename(command[0])
result_dict = {
"status": "ERROR",
"type": "status",
"title": promise_name,
"start-date" : time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),
"change-time": time.time()
}
process_handler = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE)
process_handler.stdin.flush()
process_handler.stdin.close()
process_handler.stdin = None
sleep_time = 0.1
increment_limit = int(promise_timeout / sleep_time)
for current_increment in range(0, increment_limit):
if process_handler.poll() is None:
time.sleep(sleep_time)
continue
if process_handler.poll() == 0:
# Success!
result_dict["message"] = process_handler.communicate()[0]
result_dict["status"] = "OK"
else:
result_dict["message"] = process_handler.communicate()[1]
break
else:
process_handler.terminate()
message = process_handler.stderr.read()
if message is None:
message = process_handler.stdout.read() or ""
message += '\nPROMISE TIME OUT AFTER %s SECONDS' % promise_timeout
result_dict["message"] = message
promise_result_list.append(result_dict)
return promise_result_list
def main():
arg_parser = parseArguments()
sys.exit(runpromise(arg_parser.parse_args()))
promise_runner = RunPromise(arg_parser.parse_args())
sys.exit(promise_runner.runpromise())
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment