Commit 429a7ab9 authored by Alain Takoudjou's avatar Alain Takoudjou

Restore testSlapOSPDMSkins after BusinessTemplate Update

parent 776e433d
......@@ -27,10 +27,29 @@
##############################################################################
import transaction
from functools import wraps
from Products.SlapOS.tests.testSlapOSMixin import testSlapOSMixin
from Products.ERP5Type.tests.utils import createZODBPythonScript
from DateTime import DateTime
def simulate(script_id, params_string, code_string):
def upperWrap(f):
@wraps(f)
def decorated(self, *args, **kw):
if script_id in self.portal.portal_skins.custom.objectIds():
raise ValueError('Precondition failed: %s exists in custom' % script_id)
createZODBPythonScript(self.portal.portal_skins.custom,
script_id, params_string, code_string)
try:
result = f(self, *args, **kw)
finally:
if script_id in self.portal.portal_skins.custom.objectIds():
self.portal.portal_skins.custom.manage_delObjects(script_id)
transaction.commit()
return result
return decorated
return upperWrap
class TestSlapOSPDMSkins(testSlapOSMixin):
def afterSetUp(self):
super(TestSlapOSPDMSkins, self).afterSetUp()
......@@ -43,6 +62,14 @@ class TestSlapOSPDMSkins(testSlapOSMixin):
shared=False,
state="started"
)
def beforeTearDown(self):
id_list = []
for upgrade_decision in self.portal.portal_catalog(
portal_type="Upgrade Decision", reference="UD-TEST%"):
id_list.append(upgrade_decision.getId())
self.portal.upgrade_decision_module.manage_delObjects(id_list)
self.tic()
def generateNewId(self):
return "%sTEST" % self.portal.portal_ids.generateNewId(
......@@ -250,7 +277,6 @@ class TestSlapOSPDMSkins(testSlapOSMixin):
self.assertEquals(computer.getRelativeUrl(),
found_computer.getRelativeUrl())
def testUpgradeDecision_getComputer_2_computer(self):
computer = self._makeComputer(self.new_id)
upgrade_decision = self._makeUpgradeDecision()
......@@ -270,17 +296,6 @@ class TestSlapOSPDMSkins(testSlapOSMixin):
found_computer = upgrade_decision.UpgradeDecision_getComputer()
self.assertEquals(None, found_computer)
def testUpgradeDecision_getSoftwareRelease(self):
software_release = self._makeSoftwareRelease(self.new_id)
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision_line = self._makeUpgradeDecisionLine(upgrade_decision)
upgrade_decision_line.setAggregateValue(software_release)
found_software_release = upgrade_decision.UpgradeDecision_getSoftwareRelease()
self.assertEquals(software_release.getRelativeUrl(),
found_software_release.getRelativeUrl())
def testUpgradeDecision_getHostingSubscription(self):
hosting_subscription = self._makeHostingSubscription(self.new_id)
......@@ -408,6 +423,43 @@ class TestSlapOSPDMSkins(testSlapOSMixin):
self.assertEqual(slap_state, hosting_subscription.getSlapState())
self.assertEqual('stopped', upgrade_decision.getSimulationState())
def testUpgradeDecision_processUpgradeeHostingSubscription(self):
person = self._makePerson(self.new_id)
hosting_subscription = self._makeHostingSubscription(self.new_id)
hosting_subscription.edit(
destination_section_value = person.getRelativeUrl())
self._makeSoftwareInstance(hosting_subscription,
hosting_subscription.getUrlString())
software_release = self._makeSoftwareRelease(self.new_id)
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision_line = self._makeUpgradeDecisionLine(upgrade_decision)
upgrade_decision_line.setAggregateValueList(
[software_release, hosting_subscription])
self.tic()
slap_state = hosting_subscription.getSlapState()
self.assertFalse(upgrade_decision.UpgradeDecision_processUpgrade())
self.assertNotEqual(software_release.getUrlString(),
hosting_subscription.getUrlString())
upgrade_decision.confirm()
upgrade_decision.start()
# Check that url_string change, but slap state doesn't
self.assertNotEqual(software_release.getUrlString(),
hosting_subscription.getUrlString())
self.assertTrue(upgrade_decision.UpgradeDecision_processUpgrade())
self.assertEqual(software_release.getUrlString(),
hosting_subscription.getUrlString())
self.assertEqual(slap_state, hosting_subscription.getSlapState())
self.assertEqual('stopped', upgrade_decision.getSimulationState())
def testUpgradeDecision_upgradeHostingSubscription_no_software_release(self):
person = self._makePerson(self.new_id)
......@@ -503,6 +555,33 @@ class TestSlapOSPDMSkins(testSlapOSMixin):
self.assertEqual('stopped', upgrade_decision.getSimulationState())
def testUpgradeDecision_processUpgradeComputer(self):
person = self._makePerson(self.new_id)
computer = self._makeComputer(self.new_id)
software_release = self._makeSoftwareRelease(self.new_id)
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision_line = self._makeUpgradeDecisionLine(upgrade_decision)
upgrade_decision_line.setAggregateValueList([software_release, computer])
url = software_release.getUrlString()
self.tic()
self.assertFalse(upgrade_decision.UpgradeDecision_processUpgrade())
upgrade_decision.confirm()
upgrade_decision.start()
self.assertTrue(upgrade_decision.UpgradeDecision_processUpgrade())
self.tic()
software_installation = computer.getAggregateRelatedValue(
portal_type='Software Installation')
self.assertEqual('start_requested', software_installation.getSlapState())
self.assertEqual(url, software_installation.getUrlString())
self.assertEqual('validated', software_installation.getValidationState())
self.assertEqual('stopped', upgrade_decision.getSimulationState())
def testSoftwareRelease_createUpgradeDecision_computer(self):
person = self._makePerson(self.new_id)
computer = self._makeComputer(self.new_id)
......@@ -910,5 +989,344 @@ class TestSlapOSPDMSkins(testSlapOSMixin):
self.assertEqual(release.getUrlString(),
software_release3.getUrlString())
def testBase_acceptUpgradeDecision_no_reference(self):
upgrade_decision = self._makeUpgradeDecision()
self.assertRaises(ValueError, self.portal.Base_acceptUpgradeDecision, None)
def testBase_acceptUpgradeDecision_duplicated_reference(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTBADREFERENCE")
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTBADREFERENCE")
self.tic()
self.assertRaises(ValueError, self.portal.Base_acceptUpgradeDecision, None)
def testBase_acceptUpgradeDecision_no_upgrade_decision(self):
redirect_url = self.portal.Base_acceptUpgradeDecision('UD-UNEXISTING')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=Unable%20to%20find%20the%20Upgrade%20Decision."),
"%s contains the wrong message" % redirect_url)
def testBase_acceptUpgradeDecision_draft_upgrade_decision(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTDRAFT")
self.tic()
redirect_url = self.portal.Base_acceptUpgradeDecision('UD-TESTDRAFT')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=Sorry%2C%20the%20upgrade%20is%20not%20possible%20yet%21"),
"%s contains the wrong message" % redirect_url)
def testBase_acceptUpgradeDecision_planned_upgrade_decision(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTPLANNED")
upgrade_decision.plan()
self.tic()
redirect_url = self.portal.Base_acceptUpgradeDecision('UD-TESTPLANNED')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=Sorry%2C%20the%20upgrade%20is%20not%20possible%20yet%21"),
"%s contains the wrong message" % redirect_url)
def testBase_acceptUpgradeDecision_confirmed_upgrade_decision(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTCONFIRMED")
upgrade_decision.confirm()
self.tic()
redirect_url = self.portal.Base_acceptUpgradeDecision('UD-TESTCONFIRMED')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=This%20Upgrade%20Decision%20has%20been%20"\
"requested%2C%20it%20will%20be%20processed%20in%20few%20minutes."),
"%s contains the wrong message" % redirect_url)
self.assertEquals(upgrade_decision.getSimulationState(), 'started')
def testBase_acceptUpgradeDecision_started_decision(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTSTARTED")
upgrade_decision.start()
self.tic()
redirect_url = self.portal.Base_acceptUpgradeDecision('UD-TESTSTARTED')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=This%20Upgrade%20Decision%20is%20already%20Started."),
"%s contains the wrong message" % redirect_url)
def testBase_acceptUpgradeDecision_stop_decision(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTSTOP")
upgrade_decision.start()
upgrade_decision.stop()
self.tic()
redirect_url = self.portal.Base_acceptUpgradeDecision('UD-TESTSTOP')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=This%20Upgrade%20Decision%20has%20been%20already%20processed."),
"%s contains the wrong message" % redirect_url)
def testBase_acceptUpgradeDecision_delivered_decision(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTDELIVERED")
upgrade_decision.start()
upgrade_decision.stop()
upgrade_decision.deliver()
self.tic()
redirect_url = self.portal.Base_acceptUpgradeDecision('UD-TESTDELIVERED')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=This%20Upgrade%20Decision%20has%20been%20already%20processed."),
"%s contains the wrong message" % redirect_url)
def testBase_acceptUpgradeDecision_cancelled_decision(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTCANCELLED")
upgrade_decision.cancel()
self.tic()
redirect_url = self.portal.Base_acceptUpgradeDecision('UD-TESTCANCELLED')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=Sorry%2C%20the%20upgrade%20is%20not%20possble%2C%20Upgrade%20Decision%20was%20Canceled%20or%20Rejected%21"),
"%s contains the wrong message" % redirect_url)
def testBase_acceptUpgradeDecision_rejected_decision(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTREJECT")
upgrade_decision.cancel()
self.tic()
redirect_url = self.portal.Base_acceptUpgradeDecision('UD-TESTREJECT')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=Sorry%2C%20the%20upgrade%20is%20not%20possble%2C%20Upgrade%20Decision%20was%20Canceled%20or%20Rejected%21"),
"%s contains the wrong message" % redirect_url)
def testBase_rejectUpgradeDecision_no_reference(self):
upgrade_decision = self._makeUpgradeDecision()
self.assertRaises(ValueError, self.portal.Base_rejectUpgradeDecision, None)
def testBase_rejectUpgradeDecision_duplicated_reference(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTBADREFERENCE")
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTBADREFERENCE")
self.tic()
self.assertRaises(ValueError, self.portal.Base_acceptUpgradeDecision, None)
def testBase_rejectUpgradeDecision_no_upgrade_decision(self):
redirect_url = self.portal.Base_rejectUpgradeDecision('UD-UNEXISTING')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=Unable%20to%20find%20the%20Upgrade%20Decision."),
"%s contains the wrong message" % redirect_url)
def testBase_rejectUpgradeDecision_draft_upgrade_decision(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTDRAFT")
self.tic()
redirect_url = self.portal.Base_rejectUpgradeDecision('UD-TESTDRAFT')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=Thanks%20Upgrade%20Decision%20has%20been"\
"%20rejected%20Successfully%20%28You%20cannot%20use%20it%20anymore%29."),
"%s contains the wrong message" % redirect_url)
self.assertEquals(upgrade_decision.getSimulationState(), 'rejected')
def testBase_rejectUpgradeDecision_planned_upgrade_decision(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTPLANNED")
upgrade_decision.plan()
self.tic()
redirect_url = self.portal.Base_rejectUpgradeDecision('UD-TESTPLANNED')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=Thanks%20Upgrade%20Decision%20has%20been"\
"%20rejected%20Successfully%20%28You%20cannot%20use%20it%20anymore%29."),
"%s contains the wrong message" % redirect_url)
self.assertEquals(upgrade_decision.getSimulationState(), 'rejected')
def testBase_rejectUpgradeDecision_confirmed_upgrade_decision(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTCONFIRMED")
upgrade_decision.confirm()
self.tic()
redirect_url = self.portal.Base_rejectUpgradeDecision('UD-TESTCONFIRMED')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=Thanks%20Upgrade%20Decision%20has%20been"\
"%20rejected%20Successfully%20%28You%20cannot%20use%20it%20anymore%29."),
"%s contains the wrong message" % redirect_url)
self.assertEquals(upgrade_decision.getSimulationState(), 'rejected')
def testBase_rejectUpgradeDecision_started_decision(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTSTARTED")
upgrade_decision.start()
self.tic()
redirect_url = self.portal.Base_rejectUpgradeDecision('UD-TESTSTARTED')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=Sorry%2C%20This%20Upgrade%20Decision%20is%20"\
"already%20Started%2C%20you%20cannot%20reject%20it%20anymore."),
"%s contains the wrong message" % redirect_url)
def testBase_rejectUpgradeDecision_stop_decision(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTSTOP")
upgrade_decision.start()
upgrade_decision.stop()
self.tic()
redirect_url = self.portal.Base_rejectUpgradeDecision('UD-TESTSTOP')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=Sorry%2C%20this%20Upgrade%20Decision%20has%20been%20already%20processed."),
"%s contains the wrong message" % redirect_url)
def testBase_rejectUpgradeDecision_delivered_decision(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTDELIVERED")
upgrade_decision.start()
upgrade_decision.stop()
upgrade_decision.deliver()
self.tic()
redirect_url = self.portal.Base_rejectUpgradeDecision('UD-TESTDELIVERED')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=Sorry%2C%20this%20Upgrade%20Decision%20has%20been%20already%20processed."),
"%s contains the wrong message" % redirect_url)
def testBase_rejectUpgradeDecision_cancelled_decision(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTCANCELLED")
upgrade_decision.cancel()
self.tic()
redirect_url = self.portal.Base_rejectUpgradeDecision('UD-TESTCANCELLED')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=Upgrade%20Decision%20is%20already%20Rejected%21"),
"%s contains the wrong message" % redirect_url)
def testBase_rejectUpgradeDecision_reject_decision(self):
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.setReference("UD-TESTREJECT")
upgrade_decision.reject()
self.tic()
redirect_url = self.portal.Base_rejectUpgradeDecision('UD-TESTREJECT')
self.assertTrue(redirect_url.endswith(
"?portal_status_message=Upgrade%20Decision%20is%20already%20Rejected%21"),
"%s contains the wrong message" % redirect_url)
@simulate('NotificationTool_getDocumentValue',
'reference=None',
'assert reference == "slapos-upgrade-computer.notification"\n' \
'return context.restrictedTraverse(' \
'context.REQUEST["testUpgradeDecision_notify_computer"])')
def testUpgradeDecision_notify_computer(self):
person = self._makePerson(self.new_id)
computer = self._makeComputer(self.new_id)
software_release = self._makeSoftwareRelease(self.new_id)
software_product = self._makeSoftwareProduct(self.new_id)
software_release.setAggregateValue(software_product)
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.edit(destination_decision_value=person)
upgrade_decision_line = self._makeUpgradeDecisionLine(upgrade_decision)
upgrade_decision_line.setAggregateValueList([software_release, computer])
notification_message = self.portal.notification_message_module.newContent(
portal_type="Notification Message",
title='Test NM title %s' % self.new_id,
text_content_substitution_mapping_method_id=
"NotificationMessage_getSubstitutionMappingDictFromArgument",
text_content="""${software_product_title}
${computer_title}
${computer_reference}
${software_release_name}
${software_release_reference}
${upgrade_accept_link}
${upgrade_reject_link}
${new_software_release_url}""",
content_type='text/html',
)
self.portal.REQUEST\
['testUpgradeDecision_notify_computer'] = \
notification_message.getRelativeUrl()
self.tic()
self.assertEquals(None, upgrade_decision.UpgradeDecision_notify())
upgrade_decision.plan()
self.tic()
self.assertEquals(None, upgrade_decision.UpgradeDecision_notify())
self.tic()
self.assertEquals(upgrade_decision.getSimulationState(), 'confirmed')
self.assertEquals(len(upgrade_decision.getFollowUpRelatedValueList()), 1)
event = upgrade_decision.getFollowUpRelatedValue()
self.assertEquals(event.getTitle(),
"New Software available for Installation at %s" % computer.getTitle())
self.assertEqual(event.getTextContent().splitlines(),
[software_product.getTitle(), computer.getTitle(), computer.getReference(),
software_release.getTitle(), software_release.getReference(),
'Base_acceptUpgradeDecision?reference=%s' % upgrade_decision.getReference(),
'Base_rejectUpgradeDecision?reference=%s' % upgrade_decision.getReference(),
software_release.getUrlString()])
self.assertEquals(event.getSimulationState(), "delivered")
@simulate('NotificationTool_getDocumentValue',
'reference=None',
'assert reference == "slapos-upgrade-hosting-subscription.notification"\n' \
'return context.restrictedTraverse(' \
'context.REQUEST["testUpgradeDecision_notify_hosting_subscription"])')
def testUpgradeDecision_notify_hosting_subscription(self):
person = self._makePerson(self.new_id)
hosting_subscription = self._makeHostingSubscription(self.new_id)
software_release = self._makeSoftwareRelease(self.new_id)
software_product = self._makeSoftwareProduct(self.new_id)
software_release.setAggregateValue(software_product)
upgrade_decision = self._makeUpgradeDecision()
upgrade_decision.edit(destination_decision_value=person)
upgrade_decision_line = self._makeUpgradeDecisionLine(upgrade_decision)
upgrade_decision_line.setAggregateValueList([software_release,
hosting_subscription])
old_url = hosting_subscription.getUrlString()
notification_message = self.portal.notification_message_module.newContent(
portal_type="Notification Message",
title='Test NM title %s' % self.new_id,
text_content_substitution_mapping_method_id=
"NotificationMessage_getSubstitutionMappingDictFromArgument",
text_content="""${software_product_title}
${hosting_subscription_title}
${old_software_release_url}
${software_release_name}
${software_release_reference}
${upgrade_accept_link}
${upgrade_reject_link}
${new_software_release_url}""",
content_type='text/html',
)
self.portal.REQUEST\
['testUpgradeDecision_notify_hosting_subscription'] = \
notification_message.getRelativeUrl()
self.tic()
self.assertEquals(None, upgrade_decision.UpgradeDecision_notify())
upgrade_decision.plan()
self.tic()
self.assertEquals(None, upgrade_decision.UpgradeDecision_notify())
self.tic()
self.assertEquals(upgrade_decision.getSimulationState(), 'confirmed')
self.assertEquals(len(upgrade_decision.getFollowUpRelatedValueList()), 1)
event = upgrade_decision.getFollowUpRelatedValue()
self.assertEquals(event.getTitle(),
"New Upgrade available for %s" % hosting_subscription.getTitle())
self.assertEqual(event.getTextContent().splitlines(),
[software_product.getTitle(), hosting_subscription.getTitle(),
old_url, software_release.getTitle(), software_release.getReference(),
'Base_acceptUpgradeDecision?reference=%s' % upgrade_decision.getReference(),
'Base_rejectUpgradeDecision?reference=%s' % upgrade_decision.getReference(),
software_release.getUrlString()])
self.assertEquals(event.getSimulationState(), "delivered")
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment