Commit 89a6f1c2 authored by Łukasz Nowak's avatar Łukasz Nowak

Merge tests per functionality.

parent a35da019
# Copyright (c) 2002-2012 Nexedi SA and Contributors. All Rights Reserved.
from Products.SlapOS.tests.testSlapOSMixin import \
testSlapOSMixin
class TestSlapOSCoreComputerPartitionSlapInterfaceWorkflow(testSlapOSMixin):
def afterSetUp(self):
super(TestSlapOSCoreComputerPartitionSlapInterfaceWorkflow, self).afterSetUp()
# Clone computer document
self.computer = self.portal.computer_module.template_computer\
.Base_createCloneDocument(batch_mode=1)
new_id = self.generateNewId()
self.computer.edit(
title="computer %s" % (new_id, ),
reference="TESTCOMP-%s" % (new_id, ),
allocation_scope='open/personal',
capacity_scope='open',
)
self.computer.validate()
# install an software release
self.software_installation = self.portal.software_installation_module\
.newContent(portal_type='Software Installation',
url_string=self.generateNewSoftwareReleaseUrl(),
aggregate=self.computer.getRelativeUrl())
self.software_installation.validate()
self.software_installation.requestStart()
self.tic()
def test_markFree(self):
self.login(self.computer.getReference())
partition = self.computer.newContent(portal_type='Computer Partition',
reference='PART-%s' % self.generateNewId())
partition.validate()
partition.markFree()
self.tic()
self.assertEqual(1, self.portal.portal_catalog.countResults(
parent_uid=self.computer.getUid(), free_for_request=1)[0][0])
def test_markFree_markBusy(self):
self.login(self.computer.getReference())
partition = self.computer.newContent(portal_type='Computer Partition',
reference='PART-%s' % self.generateNewId())
partition.validate()
partition.markFree()
self.tic()
self.assertEqual(1, self.portal.portal_catalog.countResults(
parent_uid=self.computer.getUid(), free_for_request=1)[0][0])
partition.markBusy()
self.tic()
self.assertEqual(0, self.portal.portal_catalog.countResults(
parent_uid=self.computer.getUid(), free_for_request=1)[0][0])
def test_markFree_markBusy_markFree(self):
self.login(self.computer.getReference())
partition = self.computer.newContent(portal_type='Computer Partition',
reference='PART-%s' % self.generateNewId())
partition.validate()
partition.markFree()
self.tic()
self.assertEqual(1, self.portal.portal_catalog.countResults(
parent_uid=self.computer.getUid(), free_for_request=1)[0][0])
partition.markBusy()
self.tic()
self.assertEqual(0, self.portal.portal_catalog.countResults(
parent_uid=self.computer.getUid(), free_for_request=1)[0][0])
partition.markFree()
self.tic()
self.assertEqual(1, self.portal.portal_catalog.countResults(
parent_uid=self.computer.getUid(), free_for_request=1)[0][0])
def test_markInactive(self):
self.login(self.computer.getReference())
partition = self.computer.newContent(portal_type='Computer Partition',
reference='PART-%s' % self.generateNewId())
partition.validate()
partition.markInactive()
self.tic()
self.assertEqual(0, self.portal.portal_catalog.countResults(
parent_uid=self.computer.getUid(), free_for_request=1)[0][0])
def test_markInactive_markFree(self):
self.login(self.computer.getReference())
partition = self.computer.newContent(portal_type='Computer Partition',
reference='PART-%s' % self.generateNewId())
partition.validate()
partition.markInactive()
self.tic()
self.assertEqual(0, self.portal.portal_catalog.countResults(
parent_uid=self.computer.getUid(), free_for_request=1)[0][0])
partition.markFree()
self.tic()
self.assertEqual(1, self.portal.portal_catalog.countResults(
parent_uid=self.computer.getUid(), free_for_request=1)[0][0])
# Copyright (c) 2002-2012 Nexedi SA and Contributors. All Rights Reserved.
import transaction
from Products.SlapOS.tests.testSlapOSMixin import \
testSlapOSMixin
from Products.ERP5Type.tests.utils import createZODBPythonScript
class TestSlapOSCorePromiseSlapOSModuleIdGeneratorAlarm(testSlapOSMixin):
def test_Module_assertIdGenerator(self):
module = self.portal.newContent(portal_type='Person Module',
id=str(self.generateNewId()),
id_generator='bad_id_generator')
self.assertEqual('bad_id_generator', module.getIdGenerator())
# check positive response
self.assertTrue(module.Module_assertIdGenerator('bad_id_generator', False))
self.assertEqual('bad_id_generator', module.getIdGenerator())
self.assertTrue(module.Module_assertIdGenerator('bad_id_generator', True))
self.assertEqual('bad_id_generator', module.getIdGenerator())
# check negative response and that no-op run does not modify
self.assertFalse(module.Module_assertIdGenerator('good_id_generator', False))
self.assertEqual('bad_id_generator', module.getIdGenerator())
# check negative response with fixit request
self.assertFalse(module.Module_assertIdGenerator('good_id_generator', True))
self.assertEqual('good_id_generator', module.getIdGenerator())
self.assertTrue(module.Module_assertIdGenerator('good_id_generator', False))
self.assertEqual('good_id_generator', module.getIdGenerator())
transaction.abort()
def _simulateModule_assertIdGenerator(self):
script_name = 'Module_assertIdGenerator'
if script_name in self.portal.portal_skins.custom.objectIds():
raise ValueError('Precondition failed: %s exists in custom' % script_name)
createZODBPythonScript(self.portal.portal_skins.custom,
script_name,
'id_generator, fixit, active_process=None',
'# Script body\n'
"""from Products.CMFActivity.ActiveResult import ActiveResult
active_result = ActiveResult()
active_result.edit(
summary='Module_assertIdGenerator simulation',
severity=0,
detail=context.getRelativeUrl())
active_process.postResult(active_result)
""" )
transaction.commit()
def _dropModule_assertIdGenerator(self):
script_name = 'Module_assertIdGenerator'
if script_name in self.portal.portal_skins.custom.objectIds():
self.portal.portal_skins.custom.manage_delObjects(script_name)
transaction.commit()
def test_alarm(self):
alarm = self.portal.portal_alarms.promise_slapos_module_id_generator
previous_active_process = self.portal.portal_catalog.getResultValue(
portal_type='Active Process',
causality_uid=alarm.getUid(),
sort_on=(('creation_date', 'DESC'),)
)
self._simulateModule_assertIdGenerator()
try:
alarm.activeSense()
self.tic()
finally:
self._dropModule_assertIdGenerator()
active_process = self.portal.portal_catalog.getResultValue(
portal_type='Active Process',
causality_uid=alarm.getUid(),
sort_on=(('creation_date', 'DESC'),)
)
self.assertNotEqual(previous_active_process.getPath(),
active_process.getPath())
visited_list = sorted([q.detail for q in active_process.getResultList() \
if q.summary == 'Module_assertIdGenerator simulation'])
expected_list = sorted([
'account_module',
'accounting_module',
'bug_module',
'business_configuration_module',
'business_process_module',
'campaign_module',
'component_module',
'computer_model_module',
'computer_module',
'computer_network_module',
'credential_recovery_module',
'credential_request_module',
'credential_update_module',
'currency_module',
'data_set_module',
'document_ingestion_module',
'document_module',
'event_module',
'external_source_module',
'glossary_module',
'hosting_subscription_module',
'image_module',
'internal_order_module',
'internal_packing_list_module',
'internal_supply_module',
'internal_trade_condition_module',
'inventory_module',
'item_module',
'knowledge_pad_module',
'meeting_module',
'notification_message_module',
'open_internal_order_module',
'open_purchase_order_module',
'open_sale_order_module',
'organisation_module',
'person_module',
'portal_activities',
'portal_simulation',
'product_module',
'purchase_order_module',
'purchase_packing_list_module',
'purchase_supply_module',
'purchase_trade_condition_module',
'quantity_unit_conversion_module',
'query_module',
'returned_purchase_packing_list_module',
'returned_sale_packing_list_module',
'sale_opportunity_module',
'sale_order_module',
'sale_packing_list_module',
'sale_supply_module',
'sale_trade_condition_module',
'service_module',
'service_report_module',
'software_installation_module',
'software_instance_module',
'software_licence_module',
'software_product_module',
'software_publication_module',
'software_release_module',
'support_request_module',
'transformation_module',
'web_page_module',
'web_site_module',
'workflow_module',
])
self.assertSameSet(expected_list, visited_list)
# Copyright (c) 2002-2012 Nexedi SA and Contributors. All Rights Reserved.
from Products.SlapOS.tests.testSlapOSMixin import \
testSlapOSMixin
import transaction
from Products.ERP5Type.tests.utils import createZODBPythonScript
class TestSlapOSCoreSlapOSAssertHostingSubscriptionPredecessorAlarm(
testSlapOSMixin):
def afterSetUp(self):
super(TestSlapOSCoreSlapOSAssertHostingSubscriptionPredecessorAlarm,
self).afterSetUp()
self._makeTree()
def test_HostingSubscription_assertPredecessor(self):
self.software_instance.rename(new_name=self.generateNewSoftwareTitle())
self.tic()
# check that no interaction has recreated the instance
self.assertFalse(self.hosting_subscription.getTitle() in
self.hosting_subscription.getPredecessorTitleList())
self.hosting_subscription.HostingSubscription_assertPredecessor()
self.assertTrue(self.hosting_subscription.getTitle() in
self.hosting_subscription.getPredecessorTitleList())
def test_HostingSubscription_assertPredecessor_stop_requested(self):
self.software_instance.rename(new_name=self.generateNewSoftwareTitle())
self.portal.portal_workflow._jumpToStateFor(self.hosting_subscription,
'stop_requested')
self.tic()
# check that no interaction has recreated the instance
self.assertFalse(self.hosting_subscription.getTitle() in
self.hosting_subscription.getPredecessorTitleList())
self.hosting_subscription.HostingSubscription_assertPredecessor()
self.assertTrue(self.hosting_subscription.getTitle() in
self.hosting_subscription.getPredecessorTitleList())
def test_HostingSubscription_assertPredecessor_destroy_requested(self):
self.software_instance.rename(new_name=self.generateNewSoftwareTitle())
self.portal.portal_workflow._jumpToStateFor(self.hosting_subscription,
'destroy_requested')
self.tic()
# check that no interaction has recreated the instance
self.assertFalse(self.hosting_subscription.getTitle() in
self.hosting_subscription.getPredecessorTitleList())
self.hosting_subscription.HostingSubscription_assertPredecessor()
self.assertFalse(self.hosting_subscription.getTitle() in
self.hosting_subscription.getPredecessorTitleList())
def test_HostingSubscription_assertPredecessor_archived(self):
self.software_instance.rename(new_name=self.generateNewSoftwareTitle())
self.hosting_subscription.archive()
self.tic()
# check that no interaction has recreated the instance
self.assertFalse(self.hosting_subscription.getTitle() in
self.hosting_subscription.getPredecessorTitleList())
self.hosting_subscription.HostingSubscription_assertPredecessor()
self.assertFalse(self.hosting_subscription.getTitle() in
self.hosting_subscription.getPredecessorTitleList())
def _simulateHostingSubscription_assertPredecessor(self):
script_name = 'HostingSubscription_assertPredecessor'
if script_name in self.portal.portal_skins.custom.objectIds():
raise ValueError('Precondition failed: %s exists in custom' % script_name)
createZODBPythonScript(self.portal.portal_skins.custom,
script_name,
'*args, **kwargs',
'# Script body\n'
"""portal_workflow = context.portal_workflow
portal_workflow.doActionFor(context, action='edit_action', comment='Visited by HostingSubscription_assertPredecessor') """ )
transaction.commit()
def _dropHostingSubscription_assertPredecessor(self):
script_name = 'HostingSubscription_assertPredecessor'
if script_name in self.portal.portal_skins.custom.objectIds():
self.portal.portal_skins.custom.manage_delObjects(script_name)
transaction.commit()
def test_alarm_renamed(self):
self.software_instance.edit(title=self.generateNewSoftwareTitle())
self.tic()
self._simulateHostingSubscription_assertPredecessor()
try:
self.portal.portal_alarms.slapos_assert_hosting_subscription_predecessor.activeSense()
self.tic()
finally:
self._dropHostingSubscription_assertPredecessor()
self.assertEqual(
'Visited by HostingSubscription_assertPredecessor',
self.hosting_subscription.workflow_history['edit_workflow'][-1]['comment'])
def test_alarm_not_renamed(self):
self._simulateHostingSubscription_assertPredecessor()
try:
self.portal.portal_alarms.slapos_assert_hosting_subscription_predecessor.activeSense()
self.tic()
finally:
self._dropHostingSubscription_assertPredecessor()
self.assertNotEqual(
'Visited by HostingSubscription_assertPredecessor',
self.hosting_subscription.workflow_history['edit_workflow'][-1]['comment'])
# Copyright (c) 2002-2012 Nexedi SA and Contributors. All Rights Reserved.
from Products.SlapOS.tests.testSlapOSMixin import \
testSlapOSMixin
import transaction
from Products.ERP5Type.tests.utils import createZODBPythonScript
class TestSlapOSFreeComputerPartitionAlarm(testSlapOSMixin):
def afterSetUp(self):
super(TestSlapOSFreeComputerPartitionAlarm, self).afterSetUp()
self._makeTree()
def test_Instance_tryToUnallocatePartition(self):
self._makeComputer()
self.software_instance.setAggregate(self.partition.getRelativeUrl())
self.partition.markBusy()
self.portal.portal_workflow._jumpToStateFor(self.software_instance,
'destroy_requested')
self.tic()
self.software_instance.Instance_tryToUnallocatePartition()
self.tic()
self.assertEqual(None, self.software_instance.getAggregate())
self.assertEqual('free', self.partition.getSlapState())
def test_Instance_tryToUnallocatePartition_concurrency(self):
self._makeComputer()
self.software_instance.setAggregate(self.partition.getRelativeUrl())
self.partition.markBusy()
self.portal.portal_workflow._jumpToStateFor(self.software_instance,
'destroy_requested')
self.tic()
self.partition.activate(tag="allocate_%s" % self.partition.getRelativeUrl()\
).getId()
transaction.commit()
self.software_instance.Instance_tryToUnallocatePartition()
self.tic()
self.assertEqual(self.partition.getRelativeUrl(),
self.software_instance.getAggregate())
self.assertEqual('busy', self.partition.getSlapState())
def test_Instance_tryToUnallocatePartition_twoInstances(self):
software_instance = self.portal.software_instance_module\
.template_software_instance.Base_createCloneDocument(batch_mode=1)
self._makeComputer()
self.software_instance.setAggregate(self.partition.getRelativeUrl())
software_instance.setAggregate(self.partition.getRelativeUrl())
self.partition.markBusy()
self.portal.portal_workflow._jumpToStateFor(self.software_instance,
'destroy_requested')
self.tic()
self.software_instance.Instance_tryToUnallocatePartition()
self.tic()
self.assertEqual(None, self.software_instance.getAggregate())
self.assertEqual('busy', self.partition.getSlapState())
self.assertEqual(self.partition.getRelativeUrl(), software_instance.getAggregate())
def _simulateInstance_tryToUnallocatePartition(self):
script_name = 'Instance_tryToUnallocatePartition'
if script_name in self.portal.portal_skins.custom.objectIds():
raise ValueError('Precondition failed: %s exists in custom' % script_name)
createZODBPythonScript(self.portal.portal_skins.custom,
script_name,
'*args, **kwargs',
'# Script body\n'
"""portal_workflow = context.portal_workflow
portal_workflow.doActionFor(context, action='edit_action', comment='Visited by Instance_tryToUnallocatePartition') """ )
transaction.commit()
def _dropInstance_tryToUnallocatePartition(self):
script_name = 'Instance_tryToUnallocatePartition'
if script_name in self.portal.portal_skins.custom.objectIds():
self.portal.portal_skins.custom.manage_delObjects(script_name)
transaction.commit()
def test_alarm_allocated(self):
self._makeComputer()
self.software_instance.setAggregate(self.partition.getRelativeUrl())
self.partition.markBusy()
self.portal.portal_workflow._jumpToStateFor(self.software_instance,
'destroy_requested')
self.software_instance.invalidate()
self.tic()
self._simulateInstance_tryToUnallocatePartition()
try:
self.portal.portal_alarms.slapos_free_computer_partition.activeSense()
self.tic()
finally:
self._dropInstance_tryToUnallocatePartition()
self.assertEqual(
'Visited by Instance_tryToUnallocatePartition',
self.software_instance.workflow_history['edit_workflow'][-1]['comment'])
def test_alarm_unallocated(self):
self._makeComputer()
self.partition.markBusy()
self.portal.portal_workflow._jumpToStateFor(self.software_instance,
'destroy_requested')
self.software_instance.invalidate()
self.tic()
self._simulateInstance_tryToUnallocatePartition()
try:
self.portal.portal_alarms.slapos_free_computer_partition.activeSense()
self.tic()
finally:
self._dropInstance_tryToUnallocatePartition()
self.assertNotEqual(
'Visited by Instance_tryToUnallocatePartition',
self.software_instance.workflow_history['edit_workflow'][-1]['comment'])
def test_alarm_validated(self):
self._makeComputer()
self.software_instance.setAggregate(self.partition.getRelativeUrl())
self.partition.markBusy()
self.portal.portal_workflow._jumpToStateFor(self.software_instance,
'destroy_requested')
self.tic()
self._simulateInstance_tryToUnallocatePartition()
try:
self.portal.portal_alarms.slapos_free_computer_partition.activeSense()
self.tic()
finally:
self._dropInstance_tryToUnallocatePartition()
self.assertNotEqual(
'Visited by Instance_tryToUnallocatePartition',
self.software_instance.workflow_history['edit_workflow'][-1]['comment'])
def test_alarm_start_requested(self):
self._makeComputer()
self.software_instance.setAggregate(self.partition.getRelativeUrl())
self.partition.markBusy()
self.tic()
self._simulateInstance_tryToUnallocatePartition()
try:
self.portal.portal_alarms.slapos_free_computer_partition.activeSense()
self.tic()
finally:
self._dropInstance_tryToUnallocatePartition()
self.assertNotEqual(
'Visited by Instance_tryToUnallocatePartition',
self.software_instance.workflow_history['edit_workflow'][-1]['comment'])
class TestSlapOSFreeComputerPartitionAlarmWithSlave(testSlapOSMixin):
def afterSetUp(self):
super(TestSlapOSFreeComputerPartitionAlarmWithSlave, self).afterSetUp()
self._makeTree(requested_template_id='template_slave_instance')
def test_Instance_tryToUnallocatePartition(self):
self._makeComputer()
self.software_instance.setAggregate(self.partition.getRelativeUrl())
self.partition.markBusy()
self.portal.portal_workflow._jumpToStateFor(self.software_instance,
'destroy_requested')
self.tic()
self.software_instance.Instance_tryToUnallocatePartition()
self.tic()
self.assertEqual(None, self.software_instance.getAggregate())
self.assertEqual('free', self.partition.getSlapState())
def test_Instance_tryToUnallocatePartition_nonDestroyed(self):
self._makeComputer()
self.software_instance.setAggregate(self.partition.getRelativeUrl())
self.partition.markBusy()
self.tic()
self.software_instance.Instance_tryToUnallocatePartition()
self.tic()
self.assertEqual(self.partition.getRelativeUrl(),
self.software_instance.getAggregate())
self.assertEqual('busy', self.partition.getSlapState())
# Copyright (c) 2002-2012 Nexedi SA and Contributors. All Rights Reserved.
from Products.SlapOS.tests.testSlapOSMixin import \
testSlapOSMixin
import transaction
from Products.ERP5Type.tests.utils import createZODBPythonScript
class TestSlapOSGarbageCollectDestroyedRootTreeAlarm(testSlapOSMixin):
def afterSetUp(self):
super(TestSlapOSGarbageCollectDestroyedRootTreeAlarm, self).afterSetUp()
self._makeTree()
def test_Instance_tryToGarbageCollect(self):
self.hosting_subscription.archive()
self.portal.portal_workflow._jumpToStateFor(self.software_instance,
'destroy_requested')
self.portal.portal_workflow._jumpToStateFor(self.software_instance,
'destroy_requested')
self.tic()
self.requested_software_instance.Instance_tryToGarbageCollect()
self.tic()
self.assertEqual('destroy_requested',
self.requested_software_instance.getSlapState())
self.assertEqual('validated',
self.requested_software_instance.getValidationState())
def test_Instance_tryToGarbageCollect_not_destroy_requested(self):
self.requested_software_instance.Instance_tryToGarbageCollect()
self.tic()
self.assertEqual('start_requested',
self.requested_software_instance.getSlapState())
self.assertEqual('validated',
self.requested_software_instance.getValidationState())
def test_Instance_tryToGarbageCollect_not_archived(self):
self.portal.portal_workflow._jumpToStateFor(self.software_instance,
'destroy_requested')
self.portal.portal_workflow._jumpToStateFor(self.software_instance,
'destroy_requested')
self.tic()
self.requested_software_instance.Instance_tryToGarbageCollect()
self.tic()
self.assertEqual('start_requested',
self.requested_software_instance.getSlapState())
self.assertEqual('validated',
self.requested_software_instance.getValidationState())
def test_Instance_tryToGarbageCollect_only_instance_destroy_requested(self):
self.portal.portal_workflow._jumpToStateFor(self.software_instance,
'destroy_requested')
self.tic()
self.requested_software_instance.Instance_tryToGarbageCollect()
self.tic()
self.assertEqual('start_requested',
self.requested_software_instance.getSlapState())
self.assertEqual('validated',
self.requested_software_instance.getValidationState())
def _simulateInstance_tryToGarbageCollect(self):
script_name = 'Instance_tryToGarbageCollect'
if script_name in self.portal.portal_skins.custom.objectIds():
raise ValueError('Precondition failed: %s exists in custom' % script_name)
createZODBPythonScript(self.portal.portal_skins.custom,
script_name,
'*args, **kwargs',
'# Script body\n'
"""portal_workflow = context.portal_workflow
portal_workflow.doActionFor(context, action='edit_action', comment='Visited by Instance_tryToGarbageCollect') """ )
transaction.commit()
def _dropInstance_tryToGarbageCollect(self):
script_name = 'Instance_tryToGarbageCollect'
if script_name in self.portal.portal_skins.custom.objectIds():
self.portal.portal_skins.custom.manage_delObjects(script_name)
transaction.commit()
def test_alarm(self):
self.hosting_subscription.archive()
self.tic()
self._simulateInstance_tryToGarbageCollect()
try:
self.portal.portal_alarms.slapos_garbage_collect_destroyed_root_tree.activeSense()
self.tic()
finally:
self._dropInstance_tryToGarbageCollect()
self.assertEqual(
'Visited by Instance_tryToGarbageCollect',
self.software_instance.workflow_history['edit_workflow'][-1]['comment'])
def test_alarm_invalidated(self):
self.hosting_subscription.archive()
self.software_instance.invalidate()
self.tic()
self._simulateInstance_tryToGarbageCollect()
try:
self.portal.portal_alarms.slapos_garbage_collect_destroyed_root_tree.activeSense()
self.tic()
finally:
self._dropInstance_tryToGarbageCollect()
self.assertNotEqual(
'Visited by Instance_tryToGarbageCollect',
self.software_instance.workflow_history['edit_workflow'][-1]['comment'])
def test_alarm_not_archived(self):
self.tic()
self._simulateInstance_tryToGarbageCollect()
try:
self.portal.portal_alarms.slapos_garbage_collect_destroyed_root_tree.activeSense()
self.tic()
finally:
self._dropInstance_tryToGarbageCollect()
self.assertNotEqual(
'Visited by Instance_tryToGarbageCollect',
self.software_instance.workflow_history['edit_workflow'][-1]['comment'])
# Copyright (c) 2002-2012 Nexedi SA and Contributors. All Rights Reserved.
from Products.SlapOS.tests.testSlapOSMixin import \
testSlapOSMixin
import transaction
from Products.ERP5Type.tests.utils import createZODBPythonScript
import json
class TestSlapOSGarbageCollectDestroyedRootTreeAlarm(testSlapOSMixin):
def afterSetUp(self):
super(TestSlapOSGarbageCollectDestroyedRootTreeAlarm, self).afterSetUp()
self.computer = self.portal.computer_module.template_computer\
.Base_createCloneDocument(batch_mode=1)
self.computer.edit(
allocation_scope='open/public',
capacity_scope='open',
reference='TESTC-%s' % self.generateNewId(),
)
self.computer.validate()
memcached_dict = self.portal.portal_memcached.getMemcachedDict(
key_prefix='slap_tool',
plugin_path='portal_memcached/default_memcached_plugin')
memcached_dict[self.computer.getReference()] = json.dumps({
'text': '#access ok'
})
transaction.commit()
def test_Computer_checkAndUpdateCapacityScope(self):
self.computer.Computer_checkAndUpdateCapacityScope()
self.assertEqual('open', self.computer.getCapacityScope())
def test_Computer_checkAndUpdateCapacityScope_no_capacity_quantity(self):
self._makeTree()
self.computer.edit(capacity_quantity=1)
partition = self.computer.newContent(portal_type='Computer Partition',
reference='part1')
partition.markFree()
partition.markBusy()
partition.validate()
self.software_instance.setAggregate(partition.getRelativeUrl())
self.tic()
self.computer.Computer_checkAndUpdateCapacityScope()
self.assertEqual('close', self.computer.getCapacityScope())
self.assertEqual('Computer capacity limit exceeded',
self.computer.workflow_history['edit_workflow'][-1]['comment'])
def test_Computer_checkAndUpdateCapacityScope_no_access(self):
self.computer.edit(reference='TESTC-%s' % self.generateNewId())
self.computer.Computer_checkAndUpdateCapacityScope()
self.assertEqual('close', self.computer.getCapacityScope())
self.assertEqual("Computer didn't contact the server",
self.computer.workflow_history['edit_workflow'][-1]['comment'])
def test_Computer_checkAndUpdateCapacityScope_close(self):
self.computer.edit(capacity_scope='close')
self.computer.Computer_checkAndUpdateCapacityScope()
self.assertEqual('open', self.computer.getCapacityScope())
def test_Computer_checkAndUpdateCapacityScope_with_error(self):
memcached_dict = self.portal.portal_memcached.getMemcachedDict(
key_prefix='slap_tool',
plugin_path='portal_memcached/default_memcached_plugin')
memcached_dict[self.computer.getReference()] = json.dumps({
'text': '#error not ok'
})
self.computer.Computer_checkAndUpdateCapacityScope()
self.assertEqual('close', self.computer.getCapacityScope())
self.assertEqual("Computer reported an error",
self.computer.workflow_history['edit_workflow'][-1]['comment'])
def test_Computer_checkAndUpdateCapacityScope_with_error_non_public(self):
memcached_dict = self.portal.portal_memcached.getMemcachedDict(
key_prefix='slap_tool',
plugin_path='portal_memcached/default_memcached_plugin')
memcached_dict[self.computer.getReference()] = json.dumps({
'text': '#error not ok'
})
self.computer.edit(allocation_scope='open/personal')
self.computer.Computer_checkAndUpdateCapacityScope()
self.assertEqual('open', self.computer.getCapacityScope())
def _simulateComputer_checkAndUpdateCapacityScope(self):
script_name = 'Computer_checkAndUpdateCapacityScope'
if script_name in self.portal.portal_skins.custom.objectIds():
raise ValueError('Precondition failed: %s exists in custom' % script_name)
createZODBPythonScript(self.portal.portal_skins.custom,
script_name,
'*args, **kwargs',
'# Script body\n'
"""portal_workflow = context.portal_workflow
portal_workflow.doActionFor(context, action='edit_action', comment='Visited by Computer_checkAndUpdateCapacityScope') """ )
transaction.commit()
def _dropComputer_checkAndUpdateCapacityScope(self):
script_name = 'Computer_checkAndUpdateCapacityScope'
if script_name in self.portal.portal_skins.custom.objectIds():
self.portal.portal_skins.custom.manage_delObjects(script_name)
transaction.commit()
def test_alarm(self):
self._simulateComputer_checkAndUpdateCapacityScope()
try:
self.portal.portal_alarms.slapos_update_computer_capacity_scope.activeSense()
self.tic()
finally:
self._dropComputer_checkAndUpdateCapacityScope()
self.assertEqual(
'Visited by Computer_checkAndUpdateCapacityScope',
self.computer.workflow_history['edit_workflow'][-1]['comment'])
def test_alarm_non_public(self):
self.computer.edit(allocation_scope='open/personal')
self.tic()
self._simulateComputer_checkAndUpdateCapacityScope()
try:
self.portal.portal_alarms.slapos_update_computer_capacity_scope.activeSense()
self.tic()
finally:
self._dropComputer_checkAndUpdateCapacityScope()
self.assertNotEqual(
'Visited by Computer_checkAndUpdateCapacityScope',
self.computer.workflow_history['edit_workflow'][-1]['comment'])
def test_alarm_invalidated(self):
self.computer.invalidate()
self.tic()
self._simulateComputer_checkAndUpdateCapacityScope()
try:
self.portal.portal_alarms.slapos_update_computer_capacity_scope.activeSense()
self.tic()
finally:
self._dropComputer_checkAndUpdateCapacityScope()
self.assertNotEqual(
'Visited by Computer_checkAndUpdateCapacityScope',
self.computer.workflow_history['edit_workflow'][-1]['comment'])
246 247
\ No newline at end of file \ No newline at end of file
testSlapOSCloudComputerPartitionSlapInterfaceWorkflow testSlapOSCloudAlarm
testSlapOSCloudComputerSlapInterfaceWorkflow
testSlapOSCloudConstraint testSlapOSCloudConstraint
testSlapOSCloudInstanceSlapInterfaceWorkflow
testSlapOSCloudPersonSlapInterfaceWorkflow
testSlapOSCloudPromiseSlapOSModuleIdGeneratorAlarm
testSlapOSCloudSlapOSAllocateInstanceAlarm
testSlapOSCloudSlapOSAssertHostingSubscriptionPredecessorAlarm
testSlapOSCloudSlapOSCloudInteractionWorkflow
testSlapOSCloudSlapOSFreeComputerPartitionAlarm
testSlapOSCloudSlapOSGarbageCollectDestroyedRootTreeAlarm
testSlapOSCloudSlapOSUpdateComputerCapacityScopeAlarm
testSlapOSCloudSecurityGroup testSlapOSCloudSecurityGroup
testSlapOSCloudShadow testSlapOSCloudShadow
testSlapOSCloudWorkflow
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment