Commit 73c42017 authored by Vivien Alger's avatar Vivien Alger

Mass Upgrade: Upgraded tests and small refactoring

parent 8500e7ad
......@@ -50,26 +50,30 @@
</item>
<item>
<key> <string>_body</string> </key>
<value> <string>person = context.getDestinationSectionValue(portal_type="Person")\n
<value> <string>instance = context.getAggregateRelatedValue()\n
\n
status = context.getSlapState()\n
if instance is not None:\n
host_sub = instance.getSpecialiseValue()\n
person = host_sub.getDestinationSectionValue(portal_type="Person")\n
\n
if status == "start_requested":\n
status = host_sub.getSlapState()\n
\n
if status == "start_requested":\n
state = "started"\n
elif status == "stop_requested":\n
elif status == "stop_requested":\n
state = "stopped"\n
elif status == "destroy_requested":\n
elif status == "destroy_requested":\n
state = "destroyed"\n
\n
person.requestSoftwareInstance(\n
person.requestSoftwareInstance(\n
state=state,\n
software_release=new_sr_url,\n
software_title=context.getTitle(),\n
software_type=context.getSourceReference(),\n
instance_xml=context.getTextContent(),\n
sla_xml=context.getSlaXml(),\n
shared=context.isRootSlave()\n
)\n
software_title=host_sub.getTitle(),\n
software_type=host_sub.getSourceReference(),\n
instance_xml=host_sub.getTextContent(),\n
sla_xml=host_sub.getSlaXml(),\n
shared=host_sub.isRootSlave()\n
)\n
</string> </value>
</item>
<item>
......@@ -78,7 +82,7 @@ person.requestSoftwareInstance(\n
</item>
<item>
<key> <string>id</string> </key>
<value> <string>HostingSubscription_changeSoftwareRelease</string> </value>
<value> <string>ComputerPartition_changeHostingSubscriptionSoftwareRelease</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -56,18 +56,12 @@
old_sr_url = context.getUrlString()\n
portal = context.getPortalObject()\n
\n
hosting_sub_list = portal.portal_catalog.searchAndActivate(\n
portal_type = \'Hosting Subscription\',\n
default_specialise_related_uid = [x.getUid() for x in portal.portal_catalog(\n
portal_type = [\'Software Instance\', \'Slave Instance\'],\n
url_string = old_sr_url,\n
default_aggregate_uid = [y.getUid() for y in portal.portal_catalog(\n
portal.portal_catalog.searchAndActivate( \n
portal_type = \'Computer Partition\',\n
parent_reference = comp_reference_list,\n
free_for_request = 0\n
)]\n
)],\n
method_id = \'HostingSubscription_changeSoftwareRelease\',\n
free_for_request = 0,\n
software_release_url = old_sr_url,\n
method_id = \'ComputerPartition_changeHostingSubscriptionSoftwareRelease\',\n
method_kw = {\'new_sr_url\': new_sr_url},\n
activate_kw = {\'tag\': tag}\n
)\n
......
......@@ -100,34 +100,6 @@ class TestSlapOSMassUpgrade(testSlapOSMixin):
return software_release
def _makeSoftwareInstallations(self, new_id):
self.start_requested_software_installation = self.portal\
.software_installation_module.template_software_installation\
.Base_createCloneDocument(batch_mode=1)
self.start_requested_software_installation.edit(
url_string=self.start_requested_software_release_url,
aggregate=self.computer.getRelativeUrl(),
reference='TESTSOFTINSTS-%s' % new_id,
title='Start requested for %s' % self.computer.getUid()
)
self.start_requested_software_installation.validate()
self.start_requested_software_installation.requestStart()
self.destroy_requested_software_installation = self.portal\
.software_installation_module.template_software_installation\
.Base_createCloneDocument(batch_mode=1)
self.destroy_requested_software_installation.edit(
url_string=self.destroy_requested_software_release_url,
aggregate=self.computer.getRelativeUrl(),
reference='TESTSOFTINSTD-%s' % new_id,
title='Destroy requested for %s' % self.computer\
.getUid()
)
self.destroy_requested_software_installation.validate()
self.destroy_requested_software_installation.requestStart()
def _makeHostingSubscription(self, new_id):
hosting_subscription = self.portal\
.hosting_subscription_module.template_hosting_subscription\
......@@ -154,24 +126,29 @@ class TestSlapOSMassUpgrade(testSlapOSMixin):
hosting_subscription.requestStart(**kw)
hosting_subscription.requestInstance(**kw)
def test_SoftwareRelease_getUsage(self):
def test_SoftwareRelease_getUsage_no_instance(self):
software_release = self._makeSoftwareRelease(self.new_id)
self.assertEqual(0,software_release.SoftwareRelease_getUsage())
def test_SoftwareRelease_getUsage_with_instance(self):
software_release = self._makeSoftwareRelease(self.new_id)
hosting_subscription = self._makeHostingSubscription(self.new_id)
self._makeSoftwareInstance(hosting_subscription,
software_release.getUrlString())
self.tic()
self.assertEqual(1,software_release.SoftwareRelease_getUsage())
def test_HostingSubscription_changeSoftwareRelease(self):
def test_ComputerPartition_changeHostingSubscriptionSoftwareRelease(self):
computer = self._makeComputer(self.new_id)
self._makeComputerPartitions(computer)
person = self._makePerson(self.new_id)
host_sub = self._makeHostingSubscription(self.new_id)
host_sub.edit(destination_section_value = person.getRelativeUrl())
old_sr = self.generateNewSoftwareReleaseUrl()
self._makeSoftwareInstance(host_sub,old_sr)
instance = host_sub.getPredecessorValue()
instance.edit(aggregate_value = computer.partition1.getRelativeUrl())
self.tic()
# Check setup
......@@ -181,14 +158,38 @@ class TestSlapOSMassUpgrade(testSlapOSMixin):
new_sr = self.generateNewSoftwareReleaseUrl()
host_sub.HostingSubscription_changeSoftwareRelease(new_sr)
computer.partition1.ComputerPartition_changeHostingSubscriptionSoftwareRelease(new_sr)
# Check that url_string change, but slap state doesn't
self.assertEqual(new_sr,host_sub.getUrlString())
self.assertEqual(slap_state,host_sub.getSlapState())
def _simulateHostingSubscription_changeSoftwareRelease(self):
script_name = 'HostingSubscription_changeSoftwareRelease'
def test_ComputerPartition_changeHostingSubscriptionSoftwareRelease_instance_unallocated(self):
computer = self._makeComputer(self.new_id)
self._makeComputerPartitions(computer)
person = self._makePerson(self.new_id)
host_sub = self._makeHostingSubscription(self.new_id)
host_sub.edit(destination_section_value = person.getRelativeUrl())
old_sr = self.generateNewSoftwareReleaseUrl()
self._makeSoftwareInstance(host_sub,old_sr)
self.tic()
# Check setup
self.assertEqual(old_sr, host_sub.getUrlString())
slap_state = host_sub.getSlapState()
new_sr = self.generateNewSoftwareReleaseUrl()
computer.partition1.ComputerPartition_changeHostingSubscriptionSoftwareRelease(new_sr)
# Check that nothing change
self.assertEqual(old_sr,host_sub.getUrlString())
self.assertEqual(slap_state,host_sub.getSlapState())
def _simulateComputerPartition_changeHostingSubscriptionSoftwareRelease(self):
script_name = 'ComputerPartition_changeHostingSubscriptionSoftwareRelease'
if script_name in self.portal.portal_skins.custom.objectIds():
raise ValueError('Precondition failed: %s exists in custom' % script_name)
createZODBPythonScript(self.portal.portal_skins.custom,
......@@ -196,30 +197,78 @@ class TestSlapOSMassUpgrade(testSlapOSMixin):
'*args, **kw',
'# Script body\n'
"""portal_workflow = context.portal_workflow
portal_workflow.doActionFor(context, action='edit_action', comment='Visited by HostingSubscription_changeSoftwareRelease') """ )
portal_workflow.doActionFor(context, action='edit_action', comment='Visited by ComputerPartition_changeHostingSubscriptionSoftwareRelease') """ )
transaction.commit()
def _dropHostingSubscription_changeSoftwareRelease(self):
script_name = 'HostingSubscription_changeSoftwareRelease'
def _dropComputerPartition_changeHostingSubscriptionSoftwareRelease(self):
script_name = 'ComputerPartition_changeHostingSubscriptionSoftwareRelease'
if script_name in self.portal.portal_skins.custom.objectIds():
self.portal.portal_skins.custom.manage_delObjects(script_name)
transaction.commit()
def test_SoftwareRelease_requestInstanceUpgrade_with_comp(self):
def test_SoftwareRelease_requestInstanceUpgrade(self):
computer = self._makeComputer(self.new_id)
self._makeComputerPartitions(computer)
old_software_release = self._makeSoftwareRelease(self.new_id)
hosting_subscription = self._makeHostingSubscription(self.new_id)
self._makeSoftwareInstance(hosting_subscription,
old_software_release.getUrlString())
host_sub = self._makeHostingSubscription(self.new_id)
self._makeSoftwareInstance(host_sub, old_software_release.getUrlString())
instance = host_sub.getPredecessorValue()
instance.edit(aggregate_value = computer.partition1.getRelativeUrl())
computer.partition1.markBusy()
self.tic()
comp_reference_list = [computer.getReference()]
self._simulateComputerPartition_changeHostingSubscriptionSoftwareRelease()
try:
old_software_release.SoftwareRelease_requestInstanceUpgrade(
self.generateNewSoftwareReleaseUrl(),
comp_reference_list,
"Test SoftwareRelease_requestInstanceUpgrade %s" % self.new_id
)
self.tic()
finally:
self._dropComputerPartition_changeHostingSubscriptionSoftwareRelease()
self.assertEqual('Visited by ComputerPartition_changeHostingSubscriptionSoftwareRelease',
computer.partition1.workflow_history['edit_workflow'][-1]['comment'])
instance = hosting_subscription.getPredecessorValue()
def test_SoftwareRelease_requestInstanceUpgrade_partition_not_marked(self):
computer = self._makeComputer(self.new_id)
self._makeComputerPartitions(computer)
old_software_release = self._makeSoftwareRelease(self.new_id)
host_sub = self._makeHostingSubscription(self.new_id)
self._makeSoftwareInstance(host_sub, old_software_release.getUrlString())
instance = host_sub.getPredecessorValue()
instance.edit(aggregate_value = computer.partition1.getRelativeUrl())
self.tic()
comp_reference_list = [computer.getReference()]
self._simulateComputerPartition_changeHostingSubscriptionSoftwareRelease()
try:
old_software_release.SoftwareRelease_requestInstanceUpgrade(
self.generateNewSoftwareReleaseUrl(),
comp_reference_list,
"Test SoftwareRelease_requestInstanceUpgrade %s" % self.new_id
)
self.tic()
finally:
self._dropComputerPartition_changeHostingSubscriptionSoftwareRelease()
self.assertNotEqual('Visited by ComputerPartition_changeHostingSubscriptionSoftwareRelease',
computer.partition1.workflow_history['edit_workflow'][-1]['comment'])
def test_SoftwareRelease_requestInstanceUpgrade_no_instance(self):
computer = self._makeComputer(self.new_id)
self._makeComputerPartitions(computer)
old_software_release = self._makeSoftwareRelease(self.new_id)
computer.partition1.markBusy()
self.tic()
comp_reference_list = [computer.getReference()]
self._simulateHostingSubscription_changeSoftwareRelease()
self._simulateComputerPartition_changeHostingSubscriptionSoftwareRelease()
try:
old_software_release.SoftwareRelease_requestInstanceUpgrade(
self.generateNewSoftwareReleaseUrl(),
......@@ -228,10 +277,42 @@ portal_workflow.doActionFor(context, action='edit_action', comment='Visited by H
)
self.tic()
finally:
self._dropHostingSubscription_changeSoftwareRelease()
self._dropComputerPartition_changeHostingSubscriptionSoftwareRelease()
self.assertNotEqual('Visited by ComputerPartition_changeHostingSubscriptionSoftwareRelease',
computer.partition1.workflow_history['edit_workflow'][-1]['comment'])
def test_SoftwareRelease_requestInstanceUpgrade_no_computer(self):
old_software_release = self._makeSoftwareRelease(self.new_id)
self.assertEqual('Visited by HostingSubscription_changeSoftwareRelease',
hosting_subscription.workflow_history['edit_workflow'][-1]['comment'])
comp_reference_list = []
self._simulateComputerPartition_changeHostingSubscriptionSoftwareRelease()
try:
self.assertRaises(ValueError,old_software_release.SoftwareRelease_requestInstanceUpgrade,
self.generateNewSoftwareReleaseUrl(),
comp_reference_list,
"Test SoftwareRelease_requestInstanceUpgrade %s" % self.new_id
)
finally:
self._dropComputerPartition_changeHostingSubscriptionSoftwareRelease()
def test_SoftwareRelease_requestInstanceUpgrade_no_new_sr(self):
old_software_release = self._makeSoftwareRelease(self.new_id)
computer = self._makeComputer(self.new_id)
self.tic()
comp_reference_list = [computer.getReference()]
self._simulateComputerPartition_changeHostingSubscriptionSoftwareRelease()
try:
self.assertRaises(ValueError,old_software_release.SoftwareRelease_requestInstanceUpgrade,
"",
comp_reference_list,
"Test SoftwareRelease_requestInstanceUpgrade %s" % self.new_id
)
finally:
self._dropComputerPartition_changeHostingSubscriptionSoftwareRelease()
def _simulateSoftwareRelease_requestInstanceUpgrade(self):
script_name = 'SoftwareRelease_requestInstanceUpgrade'
......@@ -251,7 +332,7 @@ portal_workflow.doActionFor(context,action='edit_action', comment='Visited by So
self.portal.portal_skins.custom.manage_delObjects(script_name)
transaction.commit()
def test_SoftwareProduct_launchMassUpgrade_with_action_and_comp(self):
def test_SoftwareProduct_launchMassUpgrade(self):
computer = self._makeComputer(self.new_id)
software_product = self._makeSoftwareProduct(self.new_id)
software_release1 = self._makeSoftwareRelease(self.new_id)
......@@ -278,3 +359,57 @@ portal_workflow.doActionFor(context,action='edit_action', comment='Visited by So
self.assertEqual('Visited by SoftwareRelease_requestInstanceUpgrade',
software_release1.workflow_history['edit_workflow'][-1]['comment'])
def test_SoftwareProduct_launchMassUpgrade_no_computer(self):
software_product = self._makeSoftwareProduct(self.new_id)
software_release1 = self._makeSoftwareRelease(self.new_id)
software_release2 = self._makeSoftwareRelease(self.new_id)
self.tic()
selection_name = 'mass_upgrade_selection'
listbox = [
{
'listbox_key':software_release1.getUrl(),
'computer_filter':[],
'workflow_action':software_release2.getUrl()
}
]
self._simulateSoftwareRelease_requestInstanceUpgrade()
try:
software_product.SoftwareProduct_launchMassUpgrade(
listbox,
selection_name
)
self.tic()
finally:
self._dropSoftwareRelease_requestInstanceUpgrade()
self.assertEqual('Visited by SoftwareRelease_requestInstanceUpgrade',
software_release1.workflow_history['edit_workflow'][-1]['comment'])
def test_SoftwareProduct_launchMassUpgrade_no_new_software_release(self):
computer = self._makeComputer(self.new_id)
software_product = self._makeSoftwareProduct(self.new_id)
software_release1 = self._makeSoftwareRelease(self.new_id)
self.tic()
selection_name = 'mass_upgrade_selection'
listbox = [
{
'listbox_key':software_release1.getUrl(),
'computer_filter':[computer.getReference()],
'workflow_action':""
}
]
self._simulateSoftwareRelease_requestInstanceUpgrade()
try:
software_product.SoftwareProduct_launchMassUpgrade(
listbox,
selection_name
)
self.tic()
finally:
self._dropSoftwareRelease_requestInstanceUpgrade()
self.assertNotEqual('Visited by SoftwareRelease_requestInstanceUpgrade',
software_release1.workflow_history['edit_workflow'][-1]['comment'])
8
\ No newline at end of file
9
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment