Commit 62f85140 authored by Cédric Le Ninivin's avatar Cédric Le Ninivin

slapos_erp5: Move Default Scenario Test to jIO API

parent 74ef6cd6
...@@ -22,13 +22,14 @@ ...@@ -22,13 +22,14 @@
import six import six
import six.moves.urllib.parse import six.moves.urllib.parse
from erp5.component.test.testSlapOSCloudSecurityGroup import TestSlapOSSecurityMixin from erp5.component.test.testSlapOSCloudSecurityGroup import TestSlapOSSecurityMixin
from erp5.component.test.testSlapOSJIOAPI import TestSlapOSJIOAPIMixin
from erp5.component.test.SlapOSTestCaseMixin import changeSkin from erp5.component.test.SlapOSTestCaseMixin import changeSkin
import re import re
import xml_marshaller import xml_marshaller
from AccessControl.SecurityManagement import getSecurityManager, \ from AccessControl.SecurityManagement import getSecurityManager, \
setSecurityManager setSecurityManager
class DefaultScenarioMixin(TestSlapOSSecurityMixin): class DefaultScenarioMixin(TestSlapOSSecurityMixin, TestSlapOSJIOAPIMixin):
def afterSetUp(self): def afterSetUp(self):
TestSlapOSSecurityMixin.afterSetUp(self) TestSlapOSSecurityMixin.afterSetUp(self)
...@@ -41,6 +42,9 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin): ...@@ -41,6 +42,9 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin):
preferred_cloud_contract_enabled=True preferred_cloud_contract_enabled=True
) )
self.web_site = self.portal.web_site_module.hostingjs
self.callUpdateRevision()
# Enable alarms for regularisation request # Enable alarms for regularisation request
self.portal.portal_alarms.slapos_crm_create_regularisation_request.setEnabled(True) self.portal.portal_alarms.slapos_crm_create_regularisation_request.setEnabled(True)
self.portal.portal_alarms.slapos_crm_invalidate_suspended_regularisation_request.setEnabled(True) self.portal.portal_alarms.slapos_crm_invalidate_suspended_regularisation_request.setEnabled(True)
...@@ -123,17 +127,23 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin): ...@@ -123,17 +127,23 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin):
agent.setAccessStatus("#access ") agent.setAccessStatus("#access ")
def requestComputeNode(self, title): def requestComputeNode(self, title):
requestXml = self.portal.portal_slap.requestComputer(title) request_data = self.postToApi({
"portal_type": "Compute Node",
"title": title
})
self.tic() self.tic()
self.assertTrue('marshal' in requestXml) self.callUpdateRevisionAndTic()
compute_node = xml_marshaller.xml_marshaller.loads(requestXml) return request_data["compute_node_id"]
compute_node_id = getattr(compute_node, '_computer_id', None)
self.assertNotEqual(None, compute_node_id)
return compute_node_id.encode('UTF-8')
def supplySoftware(self, server, url, state='available'): def supplySoftware(self, server, url, state='available'):
self.portal.portal_slap.supplySupply(url, server.getReference(), state) self.postToApi({
"portal_type": "Software Installation",
"compute_node_id": server.getReference(),
"software_release_uri": url,
"state": state,
})
self.tic() self.tic()
self.callUpdateRevisionAndTic()
software_installation = self.portal.portal_catalog.getResultValue( software_installation = self.portal.portal_catalog.getResultValue(
portal_type='Software Installation', portal_type='Software Installation',
...@@ -192,28 +202,43 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin): ...@@ -192,28 +202,43 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin):
def formatComputeNode(self, compute_node, partition_count=10): def formatComputeNode(self, compute_node, partition_count=10):
compute_node_dict = dict( compute_node_dict = dict(
software_root='/opt', compute_node_id=compute_node.getReference(),
reference=compute_node.getReference(), portal_type="Compute Node",
netmask='255.255.255.0',
address='128.0.0.1',
instance_root='/srv'
) )
compute_node_dict['partition_list'] = [] compute_node_dict['compute_partition_list'] = []
a = compute_node_dict['partition_list'].append a = compute_node_dict['compute_partition_list'].append
for i in range(1, partition_count+1): for i in range(1, partition_count+1):
a(dict( a(dict(
reference='part%s' % i, partition_id='part%s' % i,
tap=dict(name='tap%s' % i), ip_list=[
address_list=[ {
dict(addr='p%sa1' % i, netmask='p%sn1' % i), "ip-address":'p%sa1' % i,
dict(addr='p%sa2' % i, netmask='p%sn2' % i) "network-interface": 'tap%s' % i,
},
{
"ip-address":'p%sa2' % i,
"network-interface": 'tap%s' % i,
},
{
"ip-address":'p%sa1' % i,
"gateway-ip-address":'p%sn1' % i,
"network-interface": 'tap%s' % i,
"netmask": '255.255.255.0',
"network-address": '128.0.0.1',
},
{
"ip-address":'p%sa2' % i,
"gateway-ip-address":'p%sn2' % i,
"network-interface": 'tap%s' % i,
"netmask": '255.255.255.0',
"network-address": '128.0.0.1',
}
] ]
)) ))
sm = getSecurityManager() sm = getSecurityManager()
try: try:
self.login(compute_node.getUserId()) self.login(compute_node.getUserId())
self.portal.portal_slap.loadComputerConfigurationFromXML( self.putToApi(compute_node_dict)
xml_marshaller.xml_marshaller.dumps(compute_node_dict))
self.tic() self.tic()
self.assertEqual(partition_count, self.assertEqual(partition_count,
len(compute_node.contentValues(portal_type='Compute Partition'))) len(compute_node.contentValues(portal_type='Compute Partition')))
...@@ -225,21 +250,25 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin): ...@@ -225,21 +250,25 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin):
compute_node_user_id = compute_node.getUserId() compute_node_user_id = compute_node.getUserId()
try: try:
self.login(compute_node_user_id) self.login(compute_node_user_id)
compute_node_xml = self.portal.portal_slap.getFullComputerInformation( software_release_list = self.allDocsToApi({
computer_id=compute_node.getReference()) "portal_type": "Software Installation",
if not isinstance(compute_node_xml, str): "compute_node_id": compute_node.getReference()
compute_node_xml = compute_node_xml.getBody() })["result_list"]
slap_compute_node = xml_marshaller.xml_marshaller.loads(compute_node_xml) for software_release in software_release_list:
self.assertEqual('Computer', slap_compute_node.__class__.__name__) if software_release["state"] == 'destroyed':
for software_release in slap_compute_node._software_release_list: self.putToApi({
if software_release._requested_state == 'destroyed': "portal_type": "Software Installation",
self.portal.portal_slap.destroyedSoftwareRelease( "compute_noode_id": compute_node.getReference(),
software_release._software_release, "software_release_uri": software_release["software_release_uri"],
compute_node.getReference()) "reported_state": "destroyed",
})
else: else:
self.portal.portal_slap.availableSoftwareRelease( self.putToApi({
software_release._software_release, "portal_type": "Software Installation",
compute_node.getReference()) "compute_noode_id": compute_node.getReference(),
"software_release_uri": software_release["software_release_uri"],
"reported_state": "available",
})
finally: finally:
setSecurityManager(sm) setSecurityManager(sm)
self.tic() self.tic()
...@@ -248,26 +277,27 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin): ...@@ -248,26 +277,27 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin):
sm = getSecurityManager() sm = getSecurityManager()
compute_node_user_id = compute_node.getUserId() compute_node_user_id = compute_node.getUserId()
try: try:
self.callUpdateRevisionAndTic()
self.login(compute_node_user_id) self.login(compute_node_user_id)
compute_node_xml = self.portal.portal_slap.getFullComputerInformation( instance_list = self.allDocsToApi({
computer_id=compute_node.getReference()) "portal_type": "Software Instance",
if not isinstance(compute_node_xml, str): "compute_node_id": compute_node.getReference()
compute_node_xml = compute_node_xml.getBody() })["result_list"]
slap_compute_node = xml_marshaller.xml_marshaller.loads(compute_node_xml)
self.assertEqual('Computer', slap_compute_node.__class__.__name__)
destroyed_partition_id_list = [] destroyed_partition_id_list = []
for partition in slap_compute_node._computer_partition_list: for partition in instance_list:
if partition._requested_state == 'destroyed' \ if partition["state"] == 'destroyed':
and partition._need_modification == 1: self.putToApi({
self.portal.portal_slap.destroyedComputerPartition(compute_node.getReference(), "portal_type": "Software Instance",
partition._partition_id.encode("UTF-8") "reference": partition["reference"],
) "reported_state": "destroyed"
destroyed_partition_id_list.append(partition._partition_id.encode("UTF-8")) })
destroyed_partition_id_list.append(partition["compute_partition_id"])
finally: finally:
setSecurityManager(sm) setSecurityManager(sm)
self.tic() self.tic()
self.stepCallSlaposFreeComputePartitionAlarm() self.stepCallSlaposFreeComputePartitionAlarm()
self.tic() self.tic()
self.callUpdateRevisionAndTic()
free_partition_id_list = [] free_partition_id_list = []
for partition in compute_node.contentValues(portal_type='Compute Partition'): for partition in compute_node.contentValues(portal_type='Compute Partition'):
if partition.getReference() in destroyed_partition_id_list \ if partition.getReference() in destroyed_partition_id_list \
...@@ -277,25 +307,25 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin): ...@@ -277,25 +307,25 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin):
def simulateSlapgridCP(self, compute_node): def simulateSlapgridCP(self, compute_node):
sm = getSecurityManager() sm = getSecurityManager()
compute_node_reference = compute_node.getReference()
compute_node_user_id = compute_node.getUserId() compute_node_user_id = compute_node.getUserId()
try: try:
self.login(compute_node_user_id) self.login(compute_node_user_id)
compute_node_xml = self.portal.portal_slap.getFullComputerInformation( instance_list = self.allDocsToApi({
computer_id=compute_node.getReference()) "portal_type": "Software Instance",
if not isinstance(compute_node_xml, str): "compute_node_id": compute_node.getReference()
compute_node_xml = compute_node_xml.getBody() })["result_list"]
slap_compute_node = xml_marshaller.xml_marshaller.loads(compute_node_xml) for partition in instance_list:
self.assertEqual('Computer', slap_compute_node.__class__.__name__) if partition["state"] in ('started', 'stopped'):
for partition in slap_compute_node._computer_partition_list: partition = self.getToApi({
if partition._requested_state in ('started', 'stopped') \ "portal_type": "Software Instance",
and partition._need_modification == 1: "reference": partition["reference"],
instance_reference = partition._instance_guid.encode('UTF-8') })
ip_list = partition._parameter_dict['ip_list'] instance_reference = partition["reference"]
connection_xml = xml_marshaller.xml_marshaller.dumps(dict( ip_list = partition['ip_list']
connection_dict = dict(
url_1 = 'http://%s/' % ip_list[0][1], url_1 = 'http://%s/' % ip_list[0][1],
url_2 = 'http://%s/' % ip_list[1][1], url_2 = 'http://%s/' % ip_list[1][1],
)) )
self.login() self.login()
instance_user_id = self.portal.portal_catalog.getResultValue( instance_user_id = self.portal.portal_catalog.getResultValue(
reference=instance_reference, portal_type="Software Instance").getUserId() reference=instance_reference, portal_type="Software Instance").getUserId()
...@@ -303,23 +333,27 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin): ...@@ -303,23 +333,27 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin):
oldsm = getSecurityManager() oldsm = getSecurityManager()
try: try:
self.login(instance_user_id) self.login(instance_user_id)
self.portal.portal_slap.setComputerPartitionConnectionXml( self.putToApi({
computer_id=compute_node_reference, "portal_type": "Software Instance",
computer_partition_id=partition._partition_id, "reference": partition["reference"],
connection_xml=connection_xml "connection_parameters": connection_dict,
) })
for slave in partition._parameter_dict['slave_instance_list']: hosted_instance_list = self.allDocsToApi({
slave_reference = slave['slave_reference'] "portal_type": "Shared Instance",
connection_xml = xml_marshaller.xml_marshaller.dumps(dict( "host_instance_reference": partition["reference"],
url_1 = 'http://%s/%s' % (ip_list[0][1], slave_reference), "state": "started"
url_2 = 'http://%s/%s' % (ip_list[1][1], slave_reference) })["result_list"]
)) for hosted_instance in hosted_instance_list:
self.portal.portal_slap.setComputerPartitionConnectionXml( hosted_reference = hosted_instance['reference']
computer_id=compute_node_reference, connection_dict = dict(
computer_partition_id=partition._partition_id, url_1 = 'http://%s/%s' % (ip_list[0][1], hosted_reference),
connection_xml=connection_xml, url_2 = 'http://%s/%s' % (ip_list[1][1], hosted_reference)
slave_reference=slave_reference
) )
self.putToApi({
"portal_type": "Software Instance",
"reference": hosted_reference,
"connection_parameters": connection_dict,
})
finally: finally:
setSecurityManager(oldsm) setSecurityManager(oldsm)
...@@ -328,16 +362,25 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin): ...@@ -328,16 +362,25 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin):
self.tic() self.tic()
def personRequestInstanceNotReady(self, **kw): def personRequestInstanceNotReady(self, **kw):
response = self.portal.portal_slap.requestComputerPartition(**kw) kw["portal_type"] = "Software Instance"
status = getattr(response, 'status', None) response = self.postToApi(kw)
self.assertEqual(408, status) response.pop("$schema")
response.pop("debug_id", None)
expected_dict = {
"message": "Software Instance Not Ready",
"name": "SoftwareInstanceNotReady",
"status": 102,
}
if response != expected_dict:
self.assertIsNone(response['compute_node_id'])
self.assertIsNone(response['compute_partition_id'])
self.tic() self.tic()
def personRequestInstance(self, **kw): def personRequestInstance(self, **kw):
response = self.portal.portal_slap.requestComputerPartition(**kw) kw["portal_type"] = "Software Instance"
self.assertTrue(isinstance(response, str), "response is not a string: %s" % response) software_instance = self.postToApi(kw)
software_instance = xml_marshaller.xml_marshaller.loads(response) self.assertTrue(software_instance.get("portal_type", "") in ("Slave Instance", "Software Instance"))
self.assertEqual('SoftwareInstance', software_instance.__class__.__name__)
self.tic() self.tic()
return software_instance return software_instance
...@@ -346,20 +389,21 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin): ...@@ -346,20 +389,21 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin):
self.login(person_user_id) self.login(person_user_id)
self.personRequestInstanceNotReady( self.personRequestInstanceNotReady(
software_release=software_release, software_release_uri=software_release,
software_type=software_type, software_type=software_type,
partition_reference=instance_title, title=instance_title,
shared_xml='<marshal><bool>1</bool></marshal>' shared=True
) )
self.stepCallSlaposAllocateInstanceAlarm() self.stepCallSlaposAllocateInstanceAlarm()
self.tic() self.tic()
self.callUpdateRevisionAndTic()
self.personRequestInstance( self.personRequestInstance(
software_release=software_release, software_release_uri=software_release,
software_type=software_type, software_type=software_type,
partition_reference=instance_title, title=instance_title,
shared_xml='<marshal><bool>1</bool></marshal>' shared=True
) )
# now instantiate it on compute_node and set some nice connection dict # now instantiate it on compute_node and set some nice connection dict
...@@ -391,11 +435,11 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin): ...@@ -391,11 +435,11 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin):
self.login(person_user_id) self.login(person_user_id)
self.personRequestInstanceNotReady( self.personRequestInstanceNotReady(
software_release=software_release, software_release_uri=software_release,
software_type=software_type, software_type=software_type,
partition_reference=instance_title, title=instance_title,
shared_xml='<marshal><bool>1</bool></marshal>', shared=True,
state='<marshal><string>destroyed</string></marshal>' state='destroyed',
) )
# let's find instances of user and check connection strings # let's find instances of user and check connection strings
...@@ -411,10 +455,10 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin): ...@@ -411,10 +455,10 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin):
self.login(person_user_id) self.login(person_user_id)
self.personRequestInstanceNotReady( self.personRequestInstanceNotReady(
software_release=software_release, software_release_uri=software_release,
software_type=software_type, software_type=software_type,
partition_reference=instance_title, title=instance_title,
state='<marshal><string>destroyed</string></marshal>' state='destroyed'
) )
# now instantiate it on compute_node and set some nice connection dict # now instantiate it on compute_node and set some nice connection dict
...@@ -495,9 +539,9 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin): ...@@ -495,9 +539,9 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin):
self.login(person_user_id) self.login(person_user_id)
self.personRequestInstanceNotReady( self.personRequestInstanceNotReady(
software_release=software_release, software_release_uri=software_release,
software_type=software_type, software_type=software_type,
partition_reference=instance_title, title=instance_title,
) )
self.checkCloudContract(person_user_id, person_reference, self.checkCloudContract(person_user_id, person_reference,
...@@ -505,11 +549,12 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin): ...@@ -505,11 +549,12 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin):
self.stepCallSlaposAllocateInstanceAlarm() self.stepCallSlaposAllocateInstanceAlarm()
self.tic() self.tic()
self.callUpdateRevisionAndTic()
self.personRequestInstance( self.personRequestInstance(
software_release=software_release, software_release_uri=software_release,
software_type=software_type, software_type=software_type,
partition_reference=instance_title, title=instance_title,
) )
# now instantiate it on compute_node and set some nice connection dict # now instantiate it on compute_node and set some nice connection dict
...@@ -716,7 +761,201 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin): ...@@ -716,7 +761,201 @@ class DefaultScenarioMixin(TestSlapOSSecurityMixin):
self.login(person_user_id) self.login(person_user_id)
self.personRequestInstanceNotReady( self.personRequestInstanceNotReady(
software_release=software_release, software_release_uri=software_release,
software_type=software_type, software_type=software_type,
partition_reference=instance_title, title=instance_title,
) )
class DefaultScenarioMixinSlapTool(DefaultScenarioMixin):
"""
Same as default scenario mixin but with SlapTool methods
"""
def requestComputeNode(self, title):
requestXml = self.portal.portal_slap.requestComputer(title)
self.tic()
self.assertTrue('marshal' in requestXml)
compute_node = xml_marshaller.xml_marshaller.loads(requestXml)
compute_node_id = getattr(compute_node, '_computer_id', None)
self.assertNotEqual(None, compute_node_id)
return compute_node_id.encode('UTF-8')
def supplySoftware(self, server, url, state='available'):
self.portal.portal_slap.supplySupply(url, server.getReference(), state)
self.tic()
software_installation = self.portal.portal_catalog.getResultValue(
portal_type='Software Installation',
url_string=url,
default_aggregate_uid=server.getUid())
self.assertNotEqual(None, software_installation)
if state=='available':
self.assertEqual('start_requested', software_installation.getSlapState())
else:
self.assertEqual('destroy_requested', software_installation.getSlapState())
def formatComputeNode(self, compute_node, partition_count=10):
compute_node_dict = dict(
software_root='/opt',
reference=compute_node.getReference(),
netmask='255.255.255.0',
address='128.0.0.1',
instance_root='/srv'
)
compute_node_dict['partition_list'] = []
a = compute_node_dict['partition_list'].append
for i in range(1, partition_count+1):
a(dict(
reference='part%s' % i,
tap=dict(name='tap%s' % i),
address_list=[
dict(addr='p%sa1' % i, netmask='p%sn1' % i),
dict(addr='p%sa2' % i, netmask='p%sn2' % i)
]
))
sm = getSecurityManager()
try:
self.login(compute_node.getUserId())
self.portal.portal_slap.loadComputerConfigurationFromXML(
xml_marshaller.xml_marshaller.dumps(compute_node_dict))
self.tic()
self.assertEqual(partition_count,
len(compute_node.contentValues(portal_type='Compute Partition')))
finally:
setSecurityManager(sm)
def simulateSlapgridSR(self, compute_node):
sm = getSecurityManager()
compute_node_user_id = compute_node.getUserId()
try:
self.login(compute_node_user_id)
compute_node_xml = self.portal.portal_slap.getFullComputerInformation(
computer_id=compute_node.getReference())
if not isinstance(compute_node_xml, str):
compute_node_xml = compute_node_xml.getBody()
slap_compute_node = xml_marshaller.xml_marshaller.loads(compute_node_xml)
self.assertEqual('Computer', slap_compute_node.__class__.__name__)
for software_release in slap_compute_node._software_release_list:
if software_release._requested_state == 'destroyed':
self.portal.portal_slap.destroyedSoftwareRelease(
software_release._software_release,
compute_node.getReference())
else:
self.portal.portal_slap.availableSoftwareRelease(
software_release._software_release,
compute_node.getReference())
finally:
setSecurityManager(sm)
self.tic()
def simulateSlapgridUR(self, compute_node):
sm = getSecurityManager()
compute_node_user_id = compute_node.getUserId()
try:
self.login(compute_node_user_id)
compute_node_xml = self.portal.portal_slap.getFullComputerInformation(
computer_id=compute_node.getReference())
if not isinstance(compute_node_xml, str):
compute_node_xml = compute_node_xml.getBody()
slap_compute_node = xml_marshaller.xml_marshaller.loads(compute_node_xml)
self.assertEqual('Computer', slap_compute_node.__class__.__name__)
destroyed_partition_id_list = []
for partition in slap_compute_node._computer_partition_list:
if partition._requested_state == 'destroyed' \
and partition._need_modification == 1:
self.portal.portal_slap.destroyedComputerPartition(compute_node.getReference(),
partition._partition_id.encode("UTF-8")
)
destroyed_partition_id_list.append(partition._partition_id.encode("UTF-8"))
finally:
setSecurityManager(sm)
self.tic()
self.stepCallSlaposFreeComputePartitionAlarm()
self.tic()
free_partition_id_list = []
for partition in compute_node.contentValues(portal_type='Compute Partition'):
if partition.getReference() in destroyed_partition_id_list \
and partition.getSlapState() == 'free':
free_partition_id_list.append(partition.getReference())
self.assertSameSet(destroyed_partition_id_list, free_partition_id_list)
def simulateSlapgridCP(self, compute_node):
sm = getSecurityManager()
compute_node_reference = compute_node.getReference()
compute_node_user_id = compute_node.getUserId()
try:
self.login(compute_node_user_id)
compute_node_xml = self.portal.portal_slap.getFullComputerInformation(
computer_id=compute_node.getReference())
if not isinstance(compute_node_xml, str):
compute_node_xml = compute_node_xml.getBody()
slap_compute_node = xml_marshaller.xml_marshaller.loads(compute_node_xml)
self.assertEqual('Computer', slap_compute_node.__class__.__name__)
for partition in slap_compute_node._computer_partition_list:
if partition._requested_state in ('started', 'stopped') \
and partition._need_modification == 1:
instance_reference = partition._instance_guid.encode('UTF-8')
ip_list = partition._parameter_dict['ip_list']
connection_xml = xml_marshaller.xml_marshaller.dumps(dict(
url_1 = 'http://%s/' % ip_list[0][1],
url_2 = 'http://%s/' % ip_list[1][1],
))
self.login()
instance_user_id = self.portal.portal_catalog.getResultValue(
reference=instance_reference, portal_type="Software Instance").getUserId()
oldsm = getSecurityManager()
try:
self.login(instance_user_id)
self.portal.portal_slap.setComputerPartitionConnectionXml(
computer_id=compute_node_reference,
computer_partition_id=partition._partition_id,
connection_xml=connection_xml
)
for slave in partition._parameter_dict['slave_instance_list']:
slave_reference = slave['slave_reference']
connection_xml = xml_marshaller.xml_marshaller.dumps(dict(
url_1 = 'http://%s/%s' % (ip_list[0][1], slave_reference),
url_2 = 'http://%s/%s' % (ip_list[1][1], slave_reference)
))
self.portal.portal_slap.setComputerPartitionConnectionXml(
computer_id=compute_node_reference,
computer_partition_id=partition._partition_id,
connection_xml=connection_xml,
slave_reference=slave_reference
)
finally:
setSecurityManager(oldsm)
finally:
setSecurityManager(sm)
self.tic()
def requestParameterBackwardCompatibility(self, request_dict):
request_dict["partition_reference"] = request_dict.pop("title")
if "software_release_uri" in request_dict:
request_dict["software_release"] = request_dict.pop("software_release_uri")
if "shared" in request_dict:
shared = request_dict.pop("shared")
if shared:
request_dict["shared_xml"] = '<marshal><bool>1</bool></marshal>'
if "state" in request_dict:
request_dict["state"] = "<marshal><string>%s</string></marshal>" % request_dict["state"]
return request_dict
def personRequestInstanceNotReady(self, **kw):
response = self.portal.portal_slap.requestComputerPartition(**self.requestParameterBackwardCompatibility(kw))
status = getattr(response, 'status', None)
self.assertEqual(408, status)
self.tic()
def personRequestInstance(self, **kw):
response = self.portal.portal_slap.requestComputerPartition(**self.requestParameterBackwardCompatibility(kw))
self.assertTrue(isinstance(response, str), "response is not a string: %s" % response)
software_instance = xml_marshaller.xml_marshaller.loads(response)
self.assertEqual('SoftwareInstance', software_instance.__class__.__name__)
self.tic()
return software_instance
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# #
############################################################################## ##############################################################################
from erp5.component.test.SlapOSTestCaseDefaultScenarioMixin import DefaultScenarioMixin from erp5.component.test.SlapOSTestCaseDefaultScenarioMixin import DefaultScenarioMixin, DefaultScenarioMixinSlapTool
from DateTime import DateTime from DateTime import DateTime
import re import re
...@@ -571,3 +571,6 @@ class TestSlapOSDefaultCRMEscalation(DefaultScenarioMixin): ...@@ -571,3 +571,6 @@ class TestSlapOSDefaultCRMEscalation(DefaultScenarioMixin):
# check final document state # check final document state
self.assertPersonDocumentCoverage(person) self.assertPersonDocumentCoverage(person)
class TestSlapOSDefaultScenarioSlapTool(TestSlapOSDefaultScenario, DefaultScenarioMixinSlapTool):
pass
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment