Commit 2acef14b authored by Thomas Gambier's avatar Thomas Gambier 🚴🏼

Fixes for "slapos service info"

 * xml2dict and dict2xml support the JSON format of parameter dict
 * since xml2dict doesn't support unicode, we convert unicode to str in
getInformation
parent c5c597de
......@@ -46,7 +46,7 @@ import six
from .exception import ResourceNotReady, ServerError, NotFoundError, \
ConnectionError
from .hateoas import SlapHateoasNavigator, ConnectionHelper
from slapos.util import loads, dumps, bytes2str, xml2dict, dict2xml, calculate_dict_hash
from slapos.util import loads, dumps, bytes2str, unicode2str, xml2dict, dict2xml, calculate_dict_hash
from xml.sax import saxutils
from zope.interface import implementer
......@@ -279,7 +279,7 @@ class OpenOrder(SlapRequester):
setattr(software_instance, '_%s' % key, value)
if raw_information["data"].get("text_content", None) is not None:
setattr(software_instance, '_parameter_dict', xml2dict(raw_information["data"]['text_content']))
setattr(software_instance, '_parameter_dict', xml2dict(unicode2str(raw_information["data"]['text_content'])))
else:
setattr(software_instance, '_parameter_dict', {})
......
......@@ -402,7 +402,7 @@ class TestCliNode(CliMixin):
class TestCliList(CliMixin):
def test_list(self):
"""
Test "slapos list" command output.
Test "slapos service list" command output.
"""
return_value = {
'instance1': slapos.slap.SoftwareInstance(_title='instance1', _software_release_url='SR1'),
......@@ -424,7 +424,7 @@ class TestCliList(CliMixin):
class TestCliInfo(CliMixin):
def test_info(self, _):
"""
Test "slapos info" command output.
Test "slapos service info" command output.
"""
setattr(self.conf, 'reference', 'instance1')
instance = slapos.slap.SoftwareInstance(
......@@ -444,7 +444,7 @@ class TestCliInfo(CliMixin):
def test_unknownReference(self, _):
"""
Test "slapos info" command output in case reference
Test "slapos service info" command output in case reference
of service is not known.
"""
setattr(self.conf, 'reference', 'instance1')
......@@ -456,7 +456,7 @@ class TestCliInfo(CliMixin):
class TestCliComputerList(CliMixin):
def test_computer_list(self):
"""
Test "slapos list" command output.
Test "slapos computer list" command output.
"""
return_value = {
'computer1': slapos.slap.hateoas.TempDocument(title='computer1', _reference='COMP-1'),
......@@ -492,8 +492,8 @@ class TestComputerCliInfo(CliMixin):
def test_computer_unknownReference(self, _):
"""
Test "slapos info" command output in case reference
of service is not known.
Test "slapos computer info" command output in case reference
of computer is not known.
"""
setattr(self.conf, 'reference', 'COMP-0')
with patch.object(slapos.slap.Computer, 'getInformation', side_effect=raiseNotFoundError):
......
......@@ -39,7 +39,7 @@ import httmock
import mock
import slapos.slap
from slapos.util import dumps, calculate_dict_hash
from slapos.util import dumps, calculate_dict_hash, dict2xml
class UndefinedYetException(Exception):
......@@ -1253,6 +1253,213 @@ class TestOpenOrder(SlapMixin):
self.assertEqual("URL_CONNECTION_PARAMETER",
computer_partition.getConnectionParameter('url'))
def test_getInformation(self):
self.slap = slapos.slap.slap()
parameter_dict = {
"_": {
"param1": "value1",
"param2_dict": {
"param2_param1": "",
"param2_param2_dict": {},
"param2_param3_dict": {"param": "value"}
}
}
}
link_keys = {
"type": {
"href": "urn:jio:get:portal_types/Hosting Subscription",
"name": "Hosting Subscription"
},
}
hosting_subscription_info_dict = {
"connection_parameter_list": {
"title": "my_connection_parameter_dict",
"default": [
{
"connection_key": "key1",
"connection_value": "value1"
},
{
"connection_key": "key2",
"connection_value": "value2"
},
],
"key": "field_my_connection_parameter_list",
"type": "StringField"
},
"title": {
"title": "Title",
"default": "myrefe",
"key": "field_my_title",
"type": "StringField"
},
"text_content": {
"title": "Parameter XML",
"default": dict2xml(parameter_dict),
"key": "field_my_text_content",
"type": "TextAreaField"
},
"slap_state": {
"title": "Slap State",
"default": "stop_requested",
"key": "field_my_slap_state",
"type": "StringField"
},
"source_title": {
"title": "Current Organisation",
"default": "",
"key": "field_my_source_title",
"type": "StringField"
},
"url_string": {
"title": "Url String",
"default": "https://lab.nexedi.com/nexedi/slapos/raw/1.0.115/software/kvm/software.cfg",
"key": "field_my_url_string",
"type": "StringField"
}
}
view_dict = {}
for k in hosting_subscription_info_dict:
view_dict["my_%s" % k] = hosting_subscription_info_dict[k]
view_dict["_embedded"] = {
"form_definition": {
"update_action_title": "",
"_debug": "traverse",
"pt": "form_view_editable",
"title": "HostingSubscription_viewAsHateoas",
"_links": {
"portal": {
"href": "urn:jio:get:erp5",
"name": "ERP5"
},
},
"action": "HostingSubscription_editWebMode",
"update_action": ""
}
}
view_dict["form_id"] = {
"title": "form_id",
"default": "HostingSubscription_viewAsHateoas",
"key": "form_id",
"type": "StringField"
}
view_dict["_links"] = {
"traversed_document": {
"href": "urn:jio:get:hosting_subscription_module/my_refe_id",
"name": "hosting_subscription_module/my_refe_id",
"title": "myrefe"
},
"self": {
"href": "https://localhost/hosting_subscription_module/my_refe_id/HostingSubscription_viewAsHateoas"
},
"form_definition": {
"href": "urn:jio:get:portal_skins/slapos_hal_json_style/HostingSubscription_viewAsHateoas",
"name": "HostingSubscription_viewAsHateoas"
}
}
hateoas_url = "/custom_hateoas_url"
def handler(url, req):
qs = parse.parse_qs(url.query)
if (url.path == '/getHateoasUrl'):
return {
'status_code': 200,
'content': "http://localhost" + hateoas_url
}
elif (url.path == hateoas_url):
return {
'status_code': 200,
'content': {
"default_view": "view",
"_links": {
"traverse": {
"href": "https://localhost/SLAPTEST_getHateoas?mode=traverse{&relative_url,view}",
"name": "Traverse",
"templated": True
},
"raw_search": {
"href": "https://localhost/SLAPTEST_getHateoas?mode=search{&query,select_list*,limit*,group_by*,sort_on*,local_roles*,selection_domain*}",
"name": "Raw Search",
"templated": True
},
},
"_debug": "root",
"title": "Hateoas"
}
}
elif (url.path == '/SLAPTEST_getHateoas'):
if ("mode=search" in url.query):
return {
'status_code': 200,
'content': {
"_sort_on": "",
"_embedded": {
"contents": [
{
"_links": {
"self": {
"href": "urn:jio:get:hosting_subscription_module/my_refe_id"
}
},
"relative_url": "hosting_subscription_module/my_refe_id",
"title": "myrefe"
}
]
},
"_debug": "search",
"_limit": "200",
"_local_roles": "",
"_query": "portal_type:\"Hosting Subscription\" AND validation_state:validated AND title:=\"myrefe\"",
"_select_list": [
"title",
"relative_url"
],
"_links": {
"self": {
"href": "https://localhost/SLAPTEST_getHateoas"
},
"portal": {
"href": "urn:jio:get:erp5",
"name": "ERP5"
},
"site_root": {
"href": "urn:jio:get:web_site_module/hateoas",
"name": "Hateoas"
}
},
"_group_by": "",
"_selection_domain": ""
}
}
elif ("mode=traverse" in url.query):
return {
'status_code': 200,
'content': {
"_embedded": {
"_view": view_dict
},
"_links": link_keys,
"_debug": "traverse",
"title": "myrefe"
}
}
with httmock.HTTMock(handler):
self.slap.initializeConnection('http://%s' % self.id())
open_order = self.slap.registerOpenOrder()
computer_guid = self._getTestComputerId()
requested_partition_id = self.id()
software_instance = open_order.getInformation('myrefe')
self.assertIsInstance(software_instance, slapos.slap.SoftwareInstance)
for key in hosting_subscription_info_dict:
if key not in link_keys:
self.assertEqual(getattr(software_instance, '_' + key), hosting_subscription_info_dict[key]["default"])
self.assertEqual(software_instance._parameter_dict, parameter_dict)
self.assertEqual(software_instance._requested_state, hosting_subscription_info_dict['slap_state']["default"])
self.assertEqual(software_instance._connection_dict, hosting_subscription_info_dict['connection_parameter_list']["default"])
self.assertEqual(software_instance._software_release_url, hosting_subscription_info_dict['url_string']["default"])
class TestSoftwareProductCollection(SlapMixin):
def setUp(self):
......
......@@ -26,7 +26,7 @@
##############################################################################
import os
import slapos.util
from slapos.util import string_to_boolean
from slapos.util import string_to_boolean, unicode2str
import tempfile
import unittest
import shutil
......@@ -120,7 +120,7 @@ class TestUtil(unittest.TestCase):
for value in [True, False, 1, '1', 't', 'tru', 'truelle', 'f', 'fals', 'falsey']:
self.assertRaises(ValueError, string_to_boolean, value)
xml2dict_xml = slapos.util.bytes2str(b"""<?xml version='1.0' encoding='utf-8'?>
xml2dict0_xml = slapos.util.bytes2str(b"""<?xml version='1.0' encoding='utf-8'?>
<instance>
<parameter id="badstr">\xc5\x81</parameter>
<parameter id="badu">\xc5\x81</parameter>
......@@ -134,7 +134,7 @@ class TestUtil(unittest.TestCase):
</instance>
""")
xml2dict_indict = {
xml2dict0_indict = {
u'ukey': u'ustr',
'key': 'str',
'int': 1,
......@@ -146,7 +146,7 @@ class TestUtil(unittest.TestCase):
'badu': u'\u0141'
}
xml2dict_outdict = {
xml2dict0_outdict = {
'badstr': u'\u0141',
'badu': u'\u0141',
'emptystr': None,
......@@ -157,17 +157,61 @@ class TestUtil(unittest.TestCase):
'none': 'None',
'ukey': 'ustr'}
def test_xml2dict(self):
def test_xml2dict0(self):
self.assertEqual(
self.xml2dict_outdict,
slapos.util.xml2dict(self.xml2dict_xml)
dict,
type(slapos.util.xml2dict(self.xml2dict0_xml))
)
self.assertEqual(
self.xml2dict0_outdict,
slapos.util.xml2dict(self.xml2dict0_xml)
)
def test_dict2xml0(self):
self.maxDiff = None
self.assertEqual(
self.xml2dict0_xml,
slapos.util.dict2xml(self.xml2dict0_indict)
)
xml2dict1_xml = """<?xml version='1.0' encoding='utf-8'?>
<instance>
<parameter id="_">{
"param1": "value1",
"param2_dict": {
"param2_param1": "",
"param2_param2_dict": {},
"param2_param3_dict": {
"param": "value"
}
}
}</parameter>
</instance>
"""
xml2dict1_dict = {
"_": {
"param1": "value1",
"param2_dict": {
"param2_param1": "",
"param2_param2_dict": {},
"param2_param3_dict": {"param": "value"}
}
}
}
def test_xml2dict1(self):
self.assertEqual(
self.xml2dict1_dict,
slapos.util.xml2dict(self.xml2dict1_xml)
)
def test_dict2xml(self):
def test_dict2xml1(self):
self.maxDiff = None
self.assertEqual(
self.xml2dict_xml,
slapos.util.dict2xml(self.xml2dict_indict)
self.xml2dict1_xml,
slapos.util.dict2xml(self.xml2dict1_dict)
)
......
......@@ -33,6 +33,7 @@ import socket
import struct
import subprocess
import sqlite3
import json
from xml_marshaller.xml_marshaller import dumps, loads
from lxml import etree
import six
......@@ -162,14 +163,22 @@ else:
def dict2xml(dictionary):
instance = etree.Element('instance')
for k, v in sorted(six.iteritems(dictionary)):
if isinstance(k, bytes):
k = k.decode('utf-8')
if isinstance(v, bytes):
v = v.decode('utf-8')
elif not isinstance(v, six.text_type):
v = str(v)
if len(dictionary) == 1 and '_' in dictionary:
etree.SubElement(instance, "parameter",
attrib={'id': '_'}).text = json.dumps(
dictionary['_'],
separators=(',', ': '),
sort_keys=True,
indent=4)
else:
for k, v in sorted(six.iteritems(dictionary)):
if isinstance(k, bytes):
k = k.decode('utf-8')
if isinstance(v, bytes):
v = v.decode('utf-8')
elif not isinstance(v, six.text_type):
v = str(v)
etree.SubElement(instance, "parameter",
attrib={'id': k}).text = v
return bytes2str(etree.tostring(instance,
pretty_print=True,
......@@ -189,6 +198,8 @@ def xml2dict(xml):
else:
value = element.text
result_dict[key] = value
if len(result_dict) == 1 and '_' in result_dict:
result_dict['_'] = json.loads(result_dict['_'])
return result_dict
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment