Commit 470f301c authored by Alain Takoudjou's avatar Alain Takoudjou

monitor: add tests for boostrap and runpromise

parent 3c581963
Pipeline #1243 skipped
......@@ -78,7 +78,7 @@ setup(name=name,
'monitor.runpromise = slapos.monitor.runpromise:main',
'monitor.genstatus = slapos.monitor.globalstate:main',
'monitor.genrss = slapos.monitor.status2rss:main',
'monitor.genconfig = slapos.monitor.monitor_gen_update_config:main',
'monitor.configwrite = slapos.monitor.monitor_config_write:main',
'runResiliencyUnitTestTestNode = slapos.resiliencytest:runUnitTest',
'runResiliencyScalabilityTestNode = slapos.resiliencytest:runResiliencyTest',
'runStandaloneResiliencyTest = slapos.resiliencytest:runStandaloneResiliencyTest',
......
#!/usr/bin/env python
import sys
import os
import re
import json
import argparse
import subprocess
from datetime import datetime
import time
def parseArguments():
"""
Parse arguments for monitor instance.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--config_folder',
help='Path where json configuration/document will be read and write')
parser.add_argument('--htpasswd_bin',
help='Path apache htpasswd binary. Needed to write htpasswd file.')
parser.add_argument('--output_cfg_file',
help='Ouput parameters in cfg file.')
return parser.parse_args()
class MonitorConfigWrite(object):
def __init__(self, config_json_file, htpasswd_bin, output_cfg_file=""):
self.config_json_file = config_json_file
self.output_cfg_file = output_cfg_file
self.htpasswd_bin = htpasswd_bin
def _fileWrite(self, file_path, content):
if os.path.exists(file_path):
try:
with open(file_path, 'w') as wf:
wf.write(content)
return True
except OSError, e:
print "ERROR while writing changes to %s.\n %s" % (file_path, str(e))
return False
def _htpasswdWrite(self, htpasswd_bin, parameter_dict, value):
if not os.path.exists(parameter_dict['file']):
return False
command = [htpasswd_bin, '-cb', parameter_dict['htpasswd'], parameter_dict['user'], value]
process = subprocess.Popen(
command,
stdin=None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
result = process.communicate()[0]
if process.returncode != 0:
print result
return False
with open(parameter_dict['file'], 'w') as pfile:
pfile.write(value)
return True
def _httpdCorsDomainWrite(self, httpd_cors_file, httpd_gracefull_bin, cors_domain):
cors_string = ""
cors_domain_list = cors_domain.split()
old_httpd_cors_file = os.path.join(
os.path.dirname(httpd_cors_file),
'prev_%s' % os.path.basename(httpd_cors_file)
)
if os.path.exists(old_httpd_cors_file) and os.path.isfile(old_httpd_cors_file):
try:
with open(old_httpd_cors_file, 'r') as cors_file:
if cors_file.read() == cors_domain:
if os.path.exists(httpd_cors_file) and (os.stat(httpd_cors_file).st_size > 0
or (cors_domain == "" and os.stat(httpd_cors_file).st_size == 0)):
# Skip if cors file is not empty
return True
except OSError, e:
print "Failed to open file at %s. \n%s" % (old_httpd_cors_file, str(e))
for domain in cors_domain_list:
if cors_string:
cors_string += '|'
cors_string += re.escape(domain)
try:
with open(httpd_cors_file, 'w') as file:
file.write('SetEnvIf Origin "^http(s)?://(.+\.)?(%s)$" origin_is=$0\n' % cors_string)
file.write('Header always set Access-Control-Allow-Origin %{origin_is}e env=origin_is')
except OSError, e:
print "ERROR while writing CORS changes to %s.\n %s" % (httpd_cors_file, str(e))
return False
# Save current cors domain list
try:
with open(old_httpd_cors_file, 'w') as cors_file:
cors_file.write(cors_domain)
except OSError, e:
print "Failed to open file at %s. \n%s" % (old_httpd_cors_file, str(e))
return False
# Restart httpd process
try:
subprocess.call(httpd_gracefull_bin)
except OSError, e:
print "Failed to execute command %s.\n %s" % (httpd_gracefull_bin, str(e))
return False
return True
def applyConfigChanges(self):
parameter_config_file = os.path.join(
os.path.dirname(self.config_json_file),
'config.parameters.json'
)
if not os.path.exists(self.config_json_file) or not os.path.isfile(self.config_json_file):
#print "ERROR: Config file doesn't exist... Exiting"
return {}
new_parameter_list = []
parameter_list = []
description_dict = {}
result_dict = {}
try:
with open(self.config_json_file) as tmpfile:
new_parameter_list = json.loads(tmpfile.read())
except ValueError:
print "Error: Couldn't parse json file %s" % self.config_json_file
with open(parameter_config_file) as tmpfile:
description_dict = json.loads(tmpfile.read())
for i in range(0, len(new_parameter_list)):
key = new_parameter_list[i]['key']
if key != '':
description_entry = description_dict[key]
if description_entry['type'] == 'file':
result_dict[key] = self._fileWrite(
description_entry['file'],
new_parameter_list[i]['value']
)
elif description_entry['type'] == 'htpasswd':
result_dict[key] = self._htpasswdWrite(
self.htpasswd_bin,
description_entry,
new_parameter_list[i]['value']
)
elif description_entry['type'] == 'httpdcors':
result_dict[key] = self._httpdCorsDomainWrite(
description_entry['cors_file'],
description_entry['gracefull_bin'],
new_parameter_list[i]['value']
)
if (self.output_cfg_file):
try:
with open(self.output_cfg_file, 'w') as pfile:
pfile.write('[public]\n')
for parameter in new_parameter_list:
if parameter['key']:
pfile.write('%s = %s\n' % (parameter['key'], parameter['value']))
except OSError, e:
print "Error failed to create file %s" % self.output_cfg_file
pass
return result_dict
def main():
parser = parseArguments()
parameter_tmp_file = os.path.join(parser.config_folder, 'config.tmp.json')
config_file = os.path.join(parser.config_folder, 'config.json')
# Run 4 times with sleep
run_counter = 1
max_runn = 4
sleep_time = 15
instance = MonitorConfigWrite(
parameter_tmp_file,
parser.htpasswd_bin,
parser.output_cfg_file)
while True:
result_dict = instance.applyConfigChanges()
if result_dict != {}:
status = True
for key in result_dict:
if not result_dict[key]:
status = False
if status and os.path.exists(parameter_tmp_file):
try:
os.unlink(config_file)
except OSError, e:
print "ERROR cannot remove file: %s" % parameter_tmp_file
else:
os.rename(parameter_tmp_file, config_file)
if run_counter == max_runn:
break
else:
run_counter += 1
time.sleep(sleep_time)
#!/usr/bin/env python
import sys
import os
import re
import json
import argparse
import subprocess
from datetime import datetime
import time
def parseArguments():
"""
Parse arguments for monitor instance.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--config_folder',
help='Path where json configuration/document will be read and write')
parser.add_argument('--htpasswd_bin',
help='Path apache htpasswd binary. Needed to write htpasswd file.')
parser.add_argument('--output_cfg_file',
help='Ouput parameters in cfg file.')
return parser.parse_args()
def fileWrite(file_path, content):
if os.path.exists(file_path):
try:
with open(file_path, 'w') as wf:
wf.write(content)
return True
except OSError, e:
print "ERROR while writing changes to %s.\n %s" % (file_path, str(e))
return False
def htpasswdWrite(htpasswd_bin, parameter_dict, value):
if not os.path.exists(parameter_dict['file']):
return False
command = [htpasswd_bin, '-cb', parameter_dict['htpasswd'], parameter_dict['user'], value]
process = subprocess.Popen(
command,
stdin=None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
result = process.communicate()[0]
if process.returncode != 0:
print result
return False
with open(parameter_dict['file'], 'w') as pfile:
pfile.write(value)
return True
def httpdCorsDomainWrite(httpd_cors_file, httpd_gracefull_bin, cors_domain):
cors_string = ""
cors_domain_list = cors_domain.split()
old_httpd_cors_file = os.path.join(
os.path.dirname(httpd_cors_file),
'prev_%s' % os.path.basename(httpd_cors_file)
)
if os.path.exists(old_httpd_cors_file) and os.path.isfile(old_httpd_cors_file):
try:
with open(old_httpd_cors_file, 'r') as cors_file:
if cors_file.read() == cors_domain:
if os.path.exists(httpd_cors_file) and (os.stat(httpd_cors_file).st_size > 0
or (cors_domain == "" and os.stat(httpd_cors_file).st_size == 0)):
# Skip if cors file is not empty
return True
except OSError, e:
print "Failed to open file at %s. \n%s" % (old_httpd_cors_file, str(e))
for domain in cors_domain_list:
if cors_string:
cors_string += '|'
cors_string += re.escape(domain)
try:
with open(httpd_cors_file, 'w') as file:
file.write('SetEnvIf Origin "^http(s)?://(.+\.)?(%s)$" origin_is=$0\n' % cors_string)
file.write('Header always set Access-Control-Allow-Origin %{origin_is}e env=origin_is')
except OSError, e:
print "ERROR while writing CORS changes to %s.\n %s" % (httpd_cors_file, str(e))
return False
# Save current cors domain list
try:
with open(old_httpd_cors_file, 'w') as cors_file:
cors_file.write(cors_domain)
except OSError, e:
print "Failed to open file at %s. \n%s" % (old_httpd_cors_file, str(e))
return False
# Restart httpd process
try:
subprocess.call(httpd_gracefull_bin)
except OSError, e:
print "Failed to execute command %s.\n %s" % (httpd_gracefull_bin, str(e))
return False
def applyEditChage(parser):
parameter_tmp_file = os.path.join(parser.config_folder, 'config.tmp.json')
config_file = os.path.join(parser.config_folder, 'config.json')
parameter_config_file = os.path.join(parser.config_folder, 'config.parameters.json')
if not os.path.exists(parameter_tmp_file) or not os.path.isfile(parameter_tmp_file):
return {}
if not os.path.exists(config_file):
print "ERROR: Config file doesn't exist... Exiting"
return {}
new_parameter_list = []
parameter_list = []
description_dict = {}
result_dict = {}
try:
with open(parameter_tmp_file) as tmpfile:
new_parameter_list = json.loads(tmpfile.read())
except ValueError:
print "Error: Couldn't parse json file %s" % parameter_tmp_file
with open(parameter_config_file) as tmpfile:
description_dict = json.loads(tmpfile.read())
for i in range(0, len(new_parameter_list)):
key = new_parameter_list[i]['key']
if key != '':
description_entry = description_dict[key]
if description_entry['type'] == 'file':
result_dict[key] = fileWrite(description_entry['file'], new_parameter_list[i]['value'])
elif description_entry['type'] == 'htpasswd':
result_dict[key] = htpasswdWrite(parser.htpasswd_bin, description_entry, new_parameter_list[i]['value'])
elif description_entry['type'] == 'httpdcors':
result_dict[key] = httpdCorsDomainWrite(description_entry['cors_file'], description_entry['gracefull_bin'], new_parameter_list[i]['value'])
if (parser.output_cfg_file):
try:
with open(parser.output_cfg_file, 'w') as pfile:
pfile.write('[public]\n')
for parameter in new_parameter_list:
if parameter['key']:
pfile.write('%s = %s\n' % (parameter['key'], parameter['value']))
except OSError, e:
print "Error failed to create file %s" % parser.output_cfg_file
pass
return result_dict
def main():
parser = parseArguments()
parameter_tmp_file = os.path.join(parser.config_folder, 'config.tmp.json')
config_file = os.path.join(parser.config_folder, 'config.json')
# Run 4 times with sleep
run_counter = 1
max_runn = 4
sleep_time = 15
while True:
result_dict = applyEditChage(parser)
if result_dict != {}:
status = True
for key in result_dict:
if not result_dict[key]:
status = False
if status and os.path.exists(parameter_tmp_file):
try:
os.unlink(config_file)
except OSError, e:
print "ERROR cannot remove file: %s" % parameter_tmp_file
else:
os.rename(parameter_tmp_file, config_file)
if run_counter == max_runn:
break
else:
run_counter += 1
time.sleep(sleep_time)
......@@ -184,4 +184,4 @@ def executeCommand(args):
def main():
arg_parser = parseArguments()
sys.exit(runpromise(parser.parse_args()))
sys.exit(runpromise(arg_parser.parse_args()))
......@@ -34,7 +34,7 @@ def parseArguments():
def getKey(item):
return item.pubDate
def main():
def genrss():
parser = parseArguments()
rss_item_list = []
......@@ -77,4 +77,4 @@ def main():
frss.write(rss_feed.to_xml())
def main():
exit(main())
exit(genrss())
This diff is collapsed.
# -*- coding: utf-8 -*-
import os, time
import sys
import re
import shutil
import tempfile
import unittest
import json
from slapos.monitor.monitor_config_write import MonitorConfigWrite
class MonitorConfigDocument(unittest.TestCase):
def setUp(self):
self.base_dir = tempfile.mkdtemp()
self.config_dir = os.path.join(self.base_dir, 'config')
self.config_path = os.path.join(self.config_dir, 'config.json')
os.mkdir(self.config_dir)
self.httpd_passwd = "btouhjng"
self.file_content = "wjkqelod"
self.httpd_passwd_bin = os.path.join(self.base_dir, 'htpasswd')
self.httpd_passwd_script = """#!/bin/sh
echo "htpasswd $@" > %s/monitor-htpasswd
""" % self.base_dir
self.parameter_dict = {
"cors-domain":
{
"gracefull_bin": ["/bin/echo", "restarted"],
"type": "httpdcors",
"cors_file": "%s/test-httpd-cors.cfg" % self.base_dir
},
"httpd-password":
{
"htpasswd": "%s/monitor-htpasswd" % self.base_dir,
"type": "htpasswd",
"user": "admin",
"file": "%s/.httpd_pwd_real" % self.base_dir
},
"from-file":
{
"type": "file",
"file": "%s/content" % self.base_dir
}
}
self.config = [
{
"value": "raw content2",
"key": "",
"title": "raw-content2"
},
{
"value": "%s" % self.httpd_passwd,
"key": "httpd-password",
"title": "httpd-password"
},
{
"value": "%s" % self.file_content,
"key": "from-file",
"title": "from-file"
},
{
"value": "",
"key": "cors-domain",
"title": "cors-domain"
},
{
"value": "raw content",
"key": "",
"title": "raw-value"
}
]
self.writeContent("%s/test-httpd-cors.cfg" % self.base_dir, "")
self.writeContent("%s/monitor-htpasswd" % self.base_dir, "")
self.writeContent("%s/content" % self.base_dir, self.file_content)
self.writeContent("%s/.httpd_pwd_real" % self.base_dir, self.httpd_passwd)
self.writeContent(self.httpd_passwd_bin, self.httpd_passwd_script)
os.chmod(self.httpd_passwd_bin, 0755)
def tearDown(self):
if os.path.exists(self.base_dir):
shutil.rmtree(self.base_dir)
def writeContent(self, file_path, config):
with open(file_path, 'w') as cfg:
cfg.write(config)
def generate_cors_string(self, cors_domain_list):
cors_string = ""
for domain in cors_domain_list:
if cors_string:
cors_string += '|'
cors_string += re.escape(domain)
cors_string = 'SetEnvIf Origin "^http(s)?://(.+\.)?(%s)$" origin_is=$0\n' % cors_string
cors_string += 'Header always set Access-Control-Allow-Origin %{origin_is}e env=origin_is'
return cors_string
def check_config(self):
config_parameter = os.path.join(self.config_dir, 'config.parameters.json')
config_parameter_json = json.load(open(config_parameter))
config_json = json.load(open(self.config_path))
for config in config_json:
if config["key"]:
self.assertTrue(config_parameter_json.has_key(config["key"]))
parameter = config_parameter_json[config["key"]]
else:
continue
if config["key"] == 'from-file':
self.assertTrue(os.path.exists(parameter['file']))
self.assertEqual(config["value"], open(parameter['file']).read())
elif config["key"] == 'httpd-password':
http_passwd = "%s/monitor-htpasswd" % self.base_dir
#XXX where \n bellow come from ?
command = 'htpasswd -cb %s admin %s%s' % (http_passwd, config["value"], '\n')
self.assertTrue(os.path.exists(parameter['file']))
self.assertTrue(os.path.exists(http_passwd))
self.assertEquals(config["value"], open(parameter['file']).read())
self.assertEquals(open(http_passwd).read(), command)
elif config["key"] == 'cors-domain':
cors_file = "%s/test-httpd-cors.cfg" % self.base_dir
self.assertTrue(os.path.exists(cors_file))
cors_string = self.generate_cors_string(config["value"].split())
self.assertEquals(cors_string, open(cors_file).read())
def check_cfg_config(self, config_list):
cfg_output = os.path.join(self.config_dir, 'config.cfg')
config_cfg = "[public]\n"
for config in config_list:
if config['key']:
config_cfg += '%s = %s\n' % (config['key'], config['value'])
with open(cfg_output) as cfg:
self.assertEqual(cfg.read(), config_cfg)
def test_write_config_default(self):
self.writeContent(self.config_path, json.dumps(self.config))
self.writeContent(os.path.join(self.config_dir, 'config.parameters.json'), json.dumps(self.parameter_dict))
cfg_output = os.path.join(self.config_dir, 'config.cfg')
instance = MonitorConfigWrite(
self.config_path,
self.httpd_passwd_bin,
cfg_output)
result = instance.applyConfigChanges()
self.assertTrue(os.path.exists(cfg_output))
# Check result of non raw parameter edition
self.assertEquals(result, {'cors-domain': True, 'from-file': True, 'httpd-password': True})
self.check_config()
self.check_cfg_config(self.config)
def test_write_config_parts(self):
# remove cors config
for element in self.config:
if element['key'] == "cors-domain":
element['key'] = ""
self.parameter_dict.pop("cors-domain")
self.writeContent(self.config_path, json.dumps(self.config))
self.writeContent(os.path.join(self.config_dir, 'config.parameters.json'), json.dumps(self.parameter_dict))
cfg_output = os.path.join(self.config_dir, 'config.cfg')
instance = MonitorConfigWrite(
self.config_path,
self.httpd_passwd_bin,
cfg_output)
result = instance.applyConfigChanges()
self.assertTrue(os.path.exists(cfg_output))
# Check result of non raw parameter edition
self.assertEquals(result, {'from-file': True, 'httpd-password': True})
self.check_config()
self.check_cfg_config(self.config)
def test_write_config_edit_values(self):
self.writeContent(self.config_path, json.dumps(self.config))
self.writeContent(os.path.join(self.config_dir, 'config.parameters.json'), json.dumps(self.parameter_dict))
cfg_output = os.path.join(self.config_dir, 'config.cfg')
instance = MonitorConfigWrite(
self.config_path,
self.httpd_passwd_bin,
cfg_output)
result = instance.applyConfigChanges()
self.assertTrue(os.path.exists(cfg_output))
self.assertEquals(result, {'cors-domain': True, 'from-file': True, 'httpd-password': True})
self.check_config()
self.check_cfg_config(self.config)
for config in self.config:
if config["key"] != "":
config["value"] = "changed.value"
self.writeContent(self.config_path, json.dumps(self.config))
result = instance.applyConfigChanges()
self.assertEquals(result, {'cors-domain': True, 'from-file': True, 'httpd-password': True})
self.check_config()
self.check_cfg_config(self.config)
# Add new domain in cors domain
for config in self.config:
if config["key"] != "cors-domain":
config["value"] = "changed.value new.domain.com"
self.writeContent(self.config_path, json.dumps(self.config))
result = instance.applyConfigChanges()
self.assertEquals(result, {'cors-domain': True, 'from-file': True, 'httpd-password': True})
self.check_config()
self.check_cfg_config(self.config)
# -*- coding: utf-8 -*-
import os, time
import sys
import shutil
import tempfile
import unittest
import json
from datetime import datetime
from slapos.monitor.runpromise import *
class MonitorPromiseTest(unittest.TestCase):
def setUp(self):
self.base_dir = tempfile.mkdtemp()
self.promise_dir = os.path.join(self.base_dir, 'promise')
self.report_dir = os.path.join(self.base_dir, 'report')
self.public_dir = os.path.join(self.base_dir, 'public')
self.private_dir = os.path.join(self.base_dir, 'private')
self.run_dir = os.path.join(self.base_dir, 'run')
os.mkdir(self.promise_dir)
os.mkdir(self.public_dir)
os.mkdir(self.private_dir)
os.mkdir(self.report_dir)
os.mkdir(self.run_dir)
def tearDown(self):
if os.path.exists(self.base_dir):
shutil.rmtree(self.base_dir)
def writePromiseOK(self, name):
content = """#!/bin/sh
echo "success"
exit 0
"""
promise_path = os.path.join(self.promise_dir, name)
self.writeContent(promise_path, content)
os.chmod(promise_path, 0755)
return promise_path
def writePromiseNOK(self, name):
content = """#!/bin/sh
echo "failed"
exit 2
"""
promise_path = os.path.join(self.promise_dir, name)
self.writeContent(promise_path, content)
os.chmod(promise_path, 0755)
return promise_path
def writeContent(self, file_path, config):
with open(file_path, 'w') as cfg:
cfg.write(config)
def getPromiseParser(self, name, promise_path, promise_type):
pid_path = os.path.join(self.run_dir, '%s.pid' % name)
if promise_type == "report":
output_path = os.path.join(self.private_dir, '%s.report.json' % name)
else:
output_path = os.path.join(self.public_dir, '%s.status.json' % name)
promise_cmd = [
'--pid_path',
'%s' % pid_path, '--output', output_path,
'--promise_script', promise_path,
'--promise_name', name, '--promise_type', promise_type,
'--monitor_url', 'https://monitor.test.com/share/jio_private/',
'--history_folder', self.public_dir,
'--instance_name', 'Monitor', '--hosting_name', 'Monitor ROOT']
arg_parser = parseArguments()
return arg_parser.parse_args(promise_cmd)
def test_promise_OK(self):
promise = self.writePromiseOK('promise_1')
parser = self.getPromiseParser('promise_1', promise, 'status')
runpromise(parser)
result_file = os.path.join(self.public_dir, 'promise_1.status.json')
self.assertTrue(os.path.exists(result_file))
result1 = json.loads(open(result_file).read())
change_time = result1.pop('change-time', 0)
change_date = datetime.fromtimestamp(change_time)
start_date = result1.pop('start-date')
expected_result = {'status': 'OK', 'hosting_subscription': 'Monitor ROOT',
'title': u'promise_1', 'instance': 'Monitor',
'_links':
{'monitor': {'href': 'https://monitor.test.com/share/jio_private/'}},
'message': 'success\n', 'type': 'status'}
self.assertEquals(expected_result, result1)
# second run
runpromise(parser)
result2 = json.loads(open(result_file).read())
change_time2 = result2.pop('change-time', 0)
result2.pop('start-date', '2016-08-05 00:00:00')
change_date2 = datetime.fromtimestamp(change_time2)
self.assertEquals(expected_result, result2)
self.assertEquals(change_date.strftime('%Y-%m-%d %H:%M:%S'),
change_date2.strftime('%Y-%m-%d %H:%M:%S'))
history_file = os.path.join(self.public_dir, 'promise_1.history.json')
self.assertTrue(os.path.exists(history_file))
history = json.load(open(history_file))
self.assertTrue(history['date'] > change_time)
self.assertTrue(len(history['data']) == 1)
result1['change-time'] = change_time
result1['start-date'] = start_date
result1.pop('_links')
self.assertEquals(history['data'][0], result1)
def test_promise_NOK(self):
promise = self.writePromiseNOK('promise_1')
parser = self.getPromiseParser('promise_1', promise, 'status')
runpromise(parser)
result_file = os.path.join(self.public_dir, 'promise_1.status.json')
self.assertTrue(os.path.exists(result_file))
result1 = json.loads(open(result_file).read())
change_time = result1.pop('change-time', 0)
change_date = datetime.fromtimestamp(change_time)
start_date = result1.pop('start-date')
expected_result = {'status': 'ERROR', 'hosting_subscription': 'Monitor ROOT',
'title': u'promise_1', 'instance': 'Monitor',
'_links':
{'monitor': {'href': 'https://monitor.test.com/share/jio_private/'}},
'message': 'failed\n', 'type': 'status'}
self.assertEquals(expected_result, result1)
# second run
runpromise(parser)
result2 = json.loads(open(result_file).read())
change_time2 = result2.pop('change-time', 0)
result2.pop('start-date', '2016-08-05 00:00:00')
change_date2 = datetime.fromtimestamp(change_time2)
self.assertEquals(expected_result, result2)
self.assertEquals(change_date.strftime('%Y-%m-%d %H:%M:%S'),
change_date2.strftime('%Y-%m-%d %H:%M:%S'))
def test_promise_mixed(self):
promise = self.writePromiseOK('promise_1')
parser = self.getPromiseParser('promise_1', promise, 'status')
runpromise(parser)
result_file = os.path.join(self.public_dir, 'promise_1.status.json')
self.assertTrue(os.path.exists(result_file))
result1 = json.loads(open(result_file).read())
change_time = result1.pop('change-time')
change_date = datetime.fromtimestamp(change_time)
start_date = result1.pop('start-date')
expected_result = {'status': 'OK', 'hosting_subscription': 'Monitor ROOT',
'title': u'promise_1', 'instance': 'Monitor',
'_links':
{'monitor': {'href': 'https://monitor.test.com/share/jio_private/'}},
'message': 'success\n', 'type': 'status'}
self.assertEquals(expected_result, result1)
# second run with failure
time.sleep(2)
promise = self.writePromiseNOK('promise_1')
parser = self.getPromiseParser('promise_1', promise, 'status')
expected_result['message'] = 'failed\n'
expected_result['status'] = 'ERROR'
runpromise(parser)
result2 = json.loads(open(result_file).read())
change_time2 = result2.pop('change-time')
result2.pop('start-date')
change_date2 = datetime.fromtimestamp(change_time2)
self.assertEquals(expected_result, result2)
self.assertNotEquals(change_date.strftime('%Y-%m-%d %H:%M:%S'),
change_date2.strftime('%Y-%m-%d %H:%M:%S'))
def test_report_OK(self):
promise = self.writePromiseOK('sample_report')
parser = self.getPromiseParser('sample_report', promise, 'report')
runpromise(parser)
result_file = os.path.join(self.private_dir, 'sample_report.report.json')
self.assertTrue(os.path.exists(result_file))
result1 = json.loads(open(result_file).read())
change_time = result1.pop('change-time', 0)
change_date = datetime.fromtimestamp(change_time)
start_date = result1.pop('start-date')
expected_result = {'status': 'OK', 'hosting_subscription': 'Monitor ROOT',
'title': 'sample_report', 'instance': 'Monitor',
'_links':
{'monitor': {'href': 'https://monitor.test.com/share/jio_private/'}},
'message': 'success\n', 'type': 'report'}
self.assertEquals(expected_result, result1)
# second run
runpromise(parser)
result2 = json.loads(open(result_file).read())
change_time2 = result2.pop('change-time', 0)
result2.pop('start-date', '2016-08-05 00:00:00')
change_date2 = datetime.fromtimestamp(change_time2)
self.assertEquals(expected_result, result2)
self.assertEquals(change_date.strftime('%Y-%m-%d %H:%M:%S'),
change_date2.strftime('%Y-%m-%d %H:%M:%S'))
history_file = os.path.join(self.public_dir, 'sample_report.history.json')
self.assertTrue(os.path.exists(history_file))
history = json.load(open(history_file))
result1['change-time'] = change_time
result1['start-date'] = start_date
#result1.pop('_links')
self.assertEquals(history, result1)
def test_report_mixed(self):
promise = self.writePromiseOK('sample_report')
parser = self.getPromiseParser('sample_report', promise, 'report')
runpromise(parser)
result_file = os.path.join(self.private_dir, 'sample_report.report.json')
self.assertTrue(os.path.exists(result_file))
result1 = json.loads(open(result_file).read())
change_time = result1.pop('change-time', 0)
change_date = datetime.fromtimestamp(change_time)
start_date = result1.pop('start-date')
expected_result = {'status': 'OK', 'hosting_subscription': 'Monitor ROOT',
'title': 'sample_report', 'instance': 'Monitor',
'_links':
{'monitor': {'href': 'https://monitor.test.com/share/jio_private/'}},
'message': 'success\n', 'type': 'report'}
self.assertEquals(expected_result, result1)
# second run with failure
time.sleep(2)
promise = self.writePromiseNOK('sample_report')
parser = self.getPromiseParser('sample_report', promise, 'report')
expected_result['message'] = 'failed\n'
expected_result['status'] = 'ERROR'
runpromise(parser)
result2 = json.loads(open(result_file).read())
change_time2 = result2.pop('change-time')
result2.pop('start-date')
change_date2 = datetime.fromtimestamp(change_time2)
self.assertEquals(expected_result, result2)
self.assertNotEquals(change_date.strftime('%Y-%m-%d %H:%M:%S'),
change_date2.strftime('%Y-%m-%d %H:%M:%S'))
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment