Commit a35da019 authored by Łukasz Nowak's avatar Łukasz Nowak

Merge tests per functionality.

parent 6b4bb981
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012 Nexedi SA and Contributors. All Rights Reserved.
#
##############################################################################
from Products.SlapOS.tests.testSlapOSMixin import \
testSlapOSMixin
from DateTime import DateTime
import transaction
import functools
def withAbort(func):
@functools.wraps(func)
def wrapped(self, *args, **kwargs):
try:
func(self, *args, **kwargs)
finally:
transaction.abort()
return wrapped
class TestDefaultInvoiceTransactionRule(testSlapOSMixin):
@withAbort
def test_simulation(self):
from Products.ERP5.Document.SimulationMovement import SimulationMovement
SimulationMovement.original_getSimulationState = SimulationMovement\
.getSimulationState
try:
def getSimulationStatePlanned(self, *args, **kwargs):
return 'planned'
SimulationMovement.getSimulationState = getSimulationStatePlanned
source = self.portal.person_module.template_member\
.Base_createCloneDocument(batch_mode=1)
destination = self.portal.person_module.template_member\
.Base_createCloneDocument(batch_mode=1)
aggregate = self.portal.hosting_subscription_module\
.template_hosting_subscription.Base_createCloneDocument(batch_mode=1)
resource = self.portal.service_module.slapos_instance_subscription
start_date = DateTime('2011/02/16')
stop_date = DateTime('2011/03/16')
root_applied_rule = self.portal.portal_simulation.newContent(
specialise_value=self.portal.portal_rules\
.slapos_invoice_simulation_rule,
portal_type='Applied Rule')
root_simulation_movement = root_applied_rule.newContent(
id='root_simulation_movement',
portal_type='Simulation Movement',
price=2.4,
quantity=4.6,
source_value=source,
source_section_value=source,
destination_value=destination,
destination_section_value=destination,
resource_value=resource,
aggregate_value=aggregate,
start_date=start_date,
stop_date=stop_date,
base_contribution_list=['base_amount/invoicing/discounted',
'base_amount/invoicing/taxable'],
price_currency='currency_module/EUR',
use='trade/sale',
trade_phase='slapos/invoicing',
quantity_unit='unit/piece',
specialise='sale_trade_condition_module/slapos_trade_condition',
causality_list=['business_process_module/slapos_sale_business_p'
'rocess/invoice_path', 'business_process_module/slapos_sale_b'
'usiness_process/invoice'],
delivery_value=self.portal.accounting_module.newContent(
portal_type='Sale Invoice Transaction').newContent(
portal_type='Invoice Line')
)
self.assertEqual('planned',
root_simulation_movement.getSimulationState())
root_simulation_movement.expand(expand_policy='immediate')
applied_rule_list = [q for q in root_simulation_movement.contentValues(
portal_type='Applied Rule') if q.getSpecialiseReference() == \
'default_invoice_transaction_rule']
# movement is in final state, it shall be expanded
self.assertEqual(1, len(applied_rule_list))
applied_rule = applied_rule_list[0]
simulation_movement_list = applied_rule.contentValues(
portal_type='Simulation Movement')
self.assertEqual(2, len(simulation_movement_list))
debit_movement_list = [q for q in simulation_movement_list if \
q.getCausality() == 'business_process_module/slapos_sale_busines'
's_process/account_debit_path']
credit_movement_list = [q for q in simulation_movement_list if \
q.getCausality() == 'business_process_module/slapos_sale_busines'
's_process/account_credit_path']
self.assertEqual(1, len(debit_movement_list))
self.assertEqual(1, len(credit_movement_list))
debit_movement = debit_movement_list[0]
credit_movement = credit_movement_list[0]
def checkSimulationMovement(simulation_movement, source, destination,
quantity_sign, child_rule_reference_list):
self.assertEqual('planned', simulation_movement.getSimulationState())
self.assertEqual(source, simulation_movement.getSource())
self.assertEqual(root_simulation_movement.getSourceSection(),
simulation_movement.getSourceSection())
self.assertEqual(destination, simulation_movement.getDestination())
self.assertEqual(root_simulation_movement.getDestinationSection(),
simulation_movement.getDestinationSection())
self.assertEqual(1, simulation_movement.getPrice())
self.assertEqual(root_simulation_movement.getTotalPrice() *\
quantity_sign, simulation_movement.getQuantity())
self.assertEqual(root_simulation_movement.getPriceCurrency(),
simulation_movement.getResource())
self.assertEqual([], simulation_movement.getAggregateList())
self.assertEqual(root_simulation_movement.getStopDate(),
simulation_movement.getStartDate())
self.assertEqual(root_simulation_movement.getStopDate(),
simulation_movement.getStopDate())
self.assertEqual([], simulation_movement.getBaseContributionList())
self.assertEqual(None, simulation_movement.getPriceCurrency())
self.assertEqual(None, simulation_movement.getUse())
self.assertEqual('slapos/accounting',
simulation_movement.getTradePhase())
self.assertEqual(root_simulation_movement.getQuantityUnit(),
simulation_movement.getQuantityUnit())
self.assertEqual(root_simulation_movement.getSpecialise(),
simulation_movement.getSpecialise())
self.assertSameSet(child_rule_reference_list,
[q.getSpecialiseReference() for q in simulation_movement\
.contentValues(portal_type='Applied Rule')])
checkSimulationMovement(debit_movement, 'account_module/receivable',
'account_module/payable', -1, ['default_payment_rule'])
checkSimulationMovement(credit_movement, 'account_module/sales',
'account_module/purchase', 1, [])
finally:
SimulationMovement.getSimulationState = SimulationMovement\
.original_getSimulationState
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012 Nexedi SA and Contributors. All Rights Reserved.
#
##############################################################################
from Products.SlapOS.tests.testSlapOSMixin import \
testSlapOSMixin
from DateTime import DateTime
import transaction
import functools
def withAbort(func):
@functools.wraps(func)
def wrapped(self, *args, **kwargs):
try:
func(self, *args, **kwargs)
finally:
transaction.abort()
return wrapped
class TestDefaultInvoicingRule(testSlapOSMixin):
@withAbort
def test_simulation(self):
from Products.ERP5.Document.SimulationMovement import SimulationMovement
SimulationMovement.original_getSimulationState = SimulationMovement\
.getSimulationState
try:
def getSimulationStatePlanned(self, *args, **kwargs):
return 'planned'
SimulationMovement.getSimulationState = getSimulationStatePlanned
source = self.portal.person_module.template_member\
.Base_createCloneDocument(batch_mode=1)
destination = self.portal.person_module.template_member\
.Base_createCloneDocument(batch_mode=1)
aggregate = self.portal.hosting_subscription_module\
.template_hosting_subscription.Base_createCloneDocument(batch_mode=1)
resource = self.portal.service_module.slapos_instance_subscription
start_date = DateTime('2011/02/16')
stop_date = DateTime('2011/03/16')
root_applied_rule = self.portal.portal_simulation.newContent(
specialise_value=self.portal.portal_rules\
.slapos_subscription_item_rule,
portal_type='Applied Rule')
root_simulation_movement = root_applied_rule.newContent(
id='root_simulation_movement',
portal_type='Simulation Movement',
price=2.4,
quantity=4.6,
source_value=source,
source_section_value=source,
destination_value=destination,
destination_section_value=destination,
resource_value=resource,
aggregate_value=aggregate,
start_date=start_date,
stop_date=stop_date,
base_contribution_list=['base_amount/invoicing/discounted',
'base_amount/invoicing/taxable'],
price_currency='currency_module/EUR',
use='trade/sale',
trade_phase='slapos/delivery',
quantity_unit='unit/piece',
specialise='sale_trade_condition_module/slapos_trade_condition',
causality_list=['business_process_module/slapos_sale_business_p'
'rocess/delivery_path', 'business_process_module/slapos_sale_'
'business_process/deliver'])
self.assertEqual('planned',
root_simulation_movement.getSimulationState())
root_simulation_movement.expand(expand_policy='immediate')
applied_rule_list = root_simulation_movement.contentValues(
portal_type='Applied Rule')
# movement is in final state, it shall be expanded
self.assertEqual(1, len(applied_rule_list))
applied_rule = applied_rule_list[0]
self.assertEqual('default_invoicing_rule',
applied_rule.getSpecialiseReference())
simulation_movement_list = applied_rule.contentValues(
portal_type='Simulation Movement')
self.assertEqual(1, len(simulation_movement_list))
simulation_movement = simulation_movement_list[0]
self.assertEqual('planned', simulation_movement.getSimulationState())
self.assertEqual(root_simulation_movement.getSource(),
simulation_movement.getSource())
self.assertEqual(root_simulation_movement.getSourceSection(),
simulation_movement.getSourceSection())
self.assertEqual(root_simulation_movement.getDestination(),
simulation_movement.getDestination())
self.assertEqual(root_simulation_movement.getDestinationSection(),
simulation_movement.getDestinationSection())
self.assertEqual(root_simulation_movement.getPrice(),
simulation_movement.getPrice())
self.assertEqual(root_simulation_movement.getQuantity(),
simulation_movement.getQuantity())
self.assertEqual(root_simulation_movement.getResource(),
simulation_movement.getResource())
self.assertEqual(root_simulation_movement.getAggregateList(),
simulation_movement.getAggregateList())
self.assertEqual(root_simulation_movement.getStartDate(),
simulation_movement.getStartDate())
self.assertEqual(root_simulation_movement.getStopDate(),
simulation_movement.getStopDate())
self.assertEqual(root_simulation_movement.getBaseContributionList(),
simulation_movement.getBaseContributionList())
self.assertEqual(root_simulation_movement.getPriceCurrency(),
simulation_movement.getPriceCurrency())
self.assertEqual(root_simulation_movement.getUse(),
simulation_movement.getUse())
self.assertEqual('slapos/invoicing',
simulation_movement.getTradePhase())
self.assertEqual(root_simulation_movement.getQuantityUnit(),
simulation_movement.getQuantityUnit())
self.assertEqual(root_simulation_movement.getSpecialise(),
simulation_movement.getSpecialise())
self.assertEqual(['business_process_module/slapos_sale_business_p'
'rocess/invoice_path', 'business_process_module/slapos_sale_b'
'usiness_process/invoice'], simulation_movement.getCausalityList())
# check children rules' type
child_applied_rule_type_list = [q.getSpecialiseReference() for q in \
simulation_movement.contentValues(portal_type='Applied Rule')]
self.assertSameSet(
['default_invoice_transaction_rule', 'default_trade_model_rule'],
child_applied_rule_type_list)
finally:
SimulationMovement.getSimulationState = SimulationMovement\
.original_getSimulationState
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012 Nexedi SA and Contributors. All Rights Reserved.
#
##############################################################################
from Products.SlapOS.tests.testSlapOSMixin import \
testSlapOSMixin
from DateTime import DateTime
import transaction
import functools
def withAbort(func):
@functools.wraps(func)
def wrapped(self, *args, **kwargs):
try:
func(self, *args, **kwargs)
finally:
transaction.abort()
return wrapped
class TestDefaultInvoiceTransactionRule(testSlapOSMixin):
@withAbort
def test_simulation(self):
from Products.ERP5.Document.SimulationMovement import SimulationMovement
SimulationMovement.original_getSimulationState = SimulationMovement\
.getSimulationState
try:
def getSimulationStatePlanned(self, *args, **kwargs):
return 'planned'
SimulationMovement.getSimulationState = getSimulationStatePlanned
source = self.portal.person_module.template_member\
.Base_createCloneDocument(batch_mode=1)
destination = self.portal.person_module.template_member\
.Base_createCloneDocument(batch_mode=1)
resource = self.portal.currency_module.EUR
start_date = DateTime('2011/02/16')
stop_date = DateTime('2011/03/16')
root_applied_rule = self.portal.portal_simulation.newContent(
specialise_value=self.portal.portal_rules\
.slapos_invoice_transaction_simulation_rule,
portal_type='Applied Rule')
root_simulation_movement = root_applied_rule.newContent(
id='root_simulation_movement',
portal_type='Simulation Movement',
price=1,
quantity=-4.6,
source='account_module/receivable',
source_section_value=source,
destination='account_module/payable',
destination_section_value=destination,
resource_value=resource,
start_date=start_date,
stop_date=stop_date,
use='trade/sale',
trade_phase='slapos/accounting',
quantity_unit='unit/piece',
specialise='sale_trade_condition_module/slapos_trade_condition',
causality_list=['business_process_module/slapos_sale_busines'
's_process/pay'],
delivery_value=self.portal.accounting_module.newContent(
portal_type='Sale Invoice Transaction').newContent(
portal_type='Invoice Line')
)
self.assertEqual('planned',
root_simulation_movement.getSimulationState())
root_simulation_movement.expand(expand_policy='immediate')
applied_rule_list = root_simulation_movement.contentValues(
portal_type='Applied Rule')
# movement is in final state, it shall be expanded
self.assertEqual(1, len(applied_rule_list))
applied_rule = applied_rule_list[0]
self.assertEqual('default_payment_rule',
applied_rule.getSpecialiseReference())
simulation_movement_list = applied_rule.contentValues(
portal_type='Simulation Movement')
self.assertEqual(2, len(simulation_movement_list))
debit_movement_list = [q for q in simulation_movement_list if \
q.getCausality() == 'business_process_module/slapos_sale_busines'
's_process/payment_debit_path']
credit_movement_list = [q for q in simulation_movement_list if \
q.getCausality() == 'business_process_module/slapos_sale_busines'
's_process/payment_credit_path']
self.assertEqual(1, len(debit_movement_list))
self.assertEqual(1, len(credit_movement_list))
debit_movement = debit_movement_list[0]
credit_movement = credit_movement_list[0]
def checkSimulationMovement(simulation_movement, source, destination,
quantity_sign, child_rule_reference_list):
self.assertEqual('planned', simulation_movement.getSimulationState())
self.assertEqual(source, simulation_movement.getSource())
self.assertEqual(root_simulation_movement.getSourceSection(),
simulation_movement.getSourceSection())
self.assertEqual(destination, simulation_movement.getDestination())
self.assertEqual(root_simulation_movement.getDestinationSection(),
simulation_movement.getDestinationSection())
self.assertEqual(1, simulation_movement.getPrice())
self.assertEqual(root_simulation_movement.getTotalPrice() *\
quantity_sign, simulation_movement.getQuantity())
self.assertEqual(root_simulation_movement.getResource(),
simulation_movement.getResource())
self.assertEqual([], simulation_movement.getAggregateList())
self.assertEqual(root_simulation_movement.getStartDate(),
simulation_movement.getStartDate())
self.assertEqual(root_simulation_movement.getStopDate(),
simulation_movement.getStopDate())
self.assertEqual([], simulation_movement.getBaseContributionList())
self.assertEqual(None, simulation_movement.getPriceCurrency())
self.assertEqual(None, simulation_movement.getUse())
self.assertEqual('slapos/payment',
simulation_movement.getTradePhase())
self.assertEqual(root_simulation_movement.getQuantityUnit(),
simulation_movement.getQuantityUnit())
self.assertEqual(root_simulation_movement.getSpecialise(),
simulation_movement.getSpecialise())
self.assertEqual(0, len(simulation_movement.contentValues(
portal_type='Applied Rule')))
checkSimulationMovement(debit_movement, 'account_module/bank',
'account_module/bank', 1, ['default_payment_rule'])
checkSimulationMovement(credit_movement, 'account_module/receivable',
'account_module/payable', -1, [])
finally:
SimulationMovement.getSimulationState = SimulationMovement\
.original_getSimulationState
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012 Nexedi SA and Contributors. All Rights Reserved.
#
##############################################################################
from Products.SlapOS.tests.testSlapOSMixin import \
testSlapOSMixin
from DateTime import DateTime
import transaction
import functools
def withAbort(func):
@functools.wraps(func)
def wrapped(self, *args, **kwargs):
try:
func(self, *args, **kwargs)
finally:
transaction.abort()
return wrapped
class TestDefaultInvoiceTransactionRule(testSlapOSMixin):
@withAbort
def test_simulation(self):
from Products.ERP5.Document.SimulationMovement import SimulationMovement
SimulationMovement.original_getSimulationState = SimulationMovement\
.getSimulationState
try:
def getSimulationStatePlanned(self, *args, **kwargs):
return 'planned'
SimulationMovement.getSimulationState = getSimulationStatePlanned
source = self.portal.person_module.template_member\
.Base_createCloneDocument(batch_mode=1)
destination = self.portal.person_module.template_member\
.Base_createCloneDocument(batch_mode=1)
aggregate = self.portal.hosting_subscription_module\
.template_hosting_subscription.Base_createCloneDocument(batch_mode=1)
resource = self.portal.service_module.slapos_instance_subscription
start_date = DateTime('2011/02/16')
stop_date = DateTime('2011/03/16')
root_applied_rule = self.portal.portal_simulation.newContent(
specialise_value=self.portal.portal_rules\
.slapos_invoice_simulation_rule,
portal_type='Applied Rule')
root_simulation_movement = root_applied_rule.newContent(
id='root_simulation_movement',
portal_type='Simulation Movement',
price=2.4,
quantity=4.6,
source_value=source,
source_section_value=source,
destination_value=destination,
destination_section_value=destination,
resource_value=resource,
aggregate_value=aggregate,
start_date=start_date,
stop_date=stop_date,
base_contribution_list=['base_amount/invoicing/discounted',
'base_amount/invoicing/taxable'],
price_currency='currency_module/EUR',
use='trade/sale',
trade_phase='slapos/invoicing',
quantity_unit='unit/piece',
specialise='sale_trade_condition_module/slapos_trade_condition',
causality_list=['business_process_module/slapos_sale_business_p'
'rocess/invoice_path', 'business_process_module/slapos_sale_b'
'usiness_process/invoice'],
delivery_value=self.portal.accounting_module.newContent(
portal_type='Sale Invoice Transaction').newContent(
portal_type='Invoice Line')
)
self.assertEqual('planned',
root_simulation_movement.getSimulationState())
root_simulation_movement.expand(expand_policy='immediate')
applied_rule_list = [q for q in root_simulation_movement.contentValues(
portal_type='Applied Rule') if q.getSpecialiseReference() == \
'default_trade_model_rule']
# movement is in final state, it shall be expanded
self.assertEqual(1, len(applied_rule_list))
applied_rule = applied_rule_list[0]
simulation_movement_list = applied_rule.contentValues(
portal_type='Simulation Movement')
self.assertEqual(1, len(simulation_movement_list))
simulation_movement = simulation_movement_list[0]
self.assertEqual('planned', simulation_movement.getSimulationState())
self.assertEqual(root_simulation_movement.getSource(),
simulation_movement.getSource())
self.assertEqual(root_simulation_movement.getSourceSection(),
simulation_movement.getSourceSection())
self.assertEqual(root_simulation_movement.getDestination(),
simulation_movement.getDestination())
self.assertEqual(root_simulation_movement.getDestinationSection(),
simulation_movement.getDestinationSection())
self.assertEqual(root_simulation_movement.getTotalPrice(),
simulation_movement.getQuantity())
self.assertEqual(root_simulation_movement.getPriceCurrency(),
simulation_movement.getPriceCurrency())
self.assertEqual([], simulation_movement.getAggregateList())
self.assertEqual(root_simulation_movement.getStopDate(),
simulation_movement.getStartDate())
self.assertEqual(root_simulation_movement.getStopDate(),
simulation_movement.getStopDate())
self.assertEqual(None, simulation_movement.getUse())
trade_model_line_list = simulation_movement.getCausalityValueList(
portal_type='Trade Model Line')
self.assertEqual(1, len(trade_model_line_list))
trade_model_line = trade_model_line_list[0]
self.assertEqual(trade_model_line.getPrice(),
simulation_movement.getPrice())
self.assertEqual(trade_model_line.getResource(),
simulation_movement.getResource())
self.assertEqual([], simulation_movement.getBaseContributionList())
self.assertEqual('slapos/tax',
simulation_movement.getTradePhase())
self.assertEqual(root_simulation_movement.getQuantityUnit(),
simulation_movement.getQuantityUnit())
self.assertEqual(root_simulation_movement.getSpecialise(),
simulation_movement.getSpecialise())
finally:
SimulationMovement.getSimulationState = SimulationMovement\
.original_getSimulationState
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012 Nexedi SA and Contributors. All Rights Reserved.
#
##############################################################################
import transaction
import functools
import os
import tempfile
from Products.ERP5Type.tests.utils import createZODBPythonScript
from Products.SlapOS.tests.testSlapOSMixin import \
testSlapOSMixin
def withAbort(func):
@functools.wraps(func)
def wrapped(self, *args, **kwargs):
try:
func(self, *args, **kwargs)
finally:
transaction.abort()
return wrapped
class Simulator:
def __init__(self, outfile, method, to_return=None):
self.outfile = outfile
open(self.outfile, 'w').write(repr([]))
self.method = method
self.to_return = to_return
def __call__(self, *args, **kwargs):
"""Simulation Method"""
old = open(self.outfile, 'r').read()
if old:
l = eval(old)
else:
l = []
l.append({'recmethod': self.method,
'recargs': args,
'reckwargs': kwargs})
open(self.outfile, 'w').write(repr(l))
return self.to_return
def simulateDelivery_manageBuildingCalculatingDelivery(func):
@functools.wraps(func)
def wrapped(self, *args, **kwargs):
script_name = 'Delivery_manageBuildingCalculatingDelivery'
if script_name in self.portal.portal_skins.custom.objectIds():
raise ValueError('Precondition failed: %s exists in custom' % script_name)
createZODBPythonScript(self.portal.portal_skins.custom,
script_name,
'*args, **kwargs',
'# Script body\n'
"""portal_workflow = context.portal_workflow
if context.getTitle() == 'Not visited by Delivery_manageBuildingCalculatingDelivery':
context.setTitle('Visited by Delivery_manageBuildingCalculatingDelivery')
""" )
transaction.commit()
try:
func(self, *args, **kwargs)
finally:
if script_name in self.portal.portal_skins.custom.objectIds():
self.portal.portal_skins.custom.manage_delObjects(script_name)
transaction.commit()
return wrapped
class TestAlarm(testSlapOSMixin):
@simulateDelivery_manageBuildingCalculatingDelivery
def _test(self, state, message):
delivery = self.portal.sale_packing_list_module.newContent(
title='Not visited by Delivery_manageBuildingCalculatingDelivery',
portal_type='Sale Packing List')
self.portal.portal_workflow._jumpToStateFor(delivery, state)
self.tic()
self.portal.portal_alarms.slapos_manage_building_calculating_delivery\
.activeSense()
self.tic()
self.assertEqual(message, delivery.getTitle())
def test_building(self):
self._test('building', 'Visited by Delivery_manageBuildingCalculatingDelivery')
def test_calculating(self):
self._test('calculating', 'Visited by Delivery_manageBuildingCalculatingDelivery')
def test_diverged(self):
self._test('diverged', 'Not visited by Delivery_manageBuildingCalculatingDelivery')
def test_solved(self):
self._test('solved', 'Not visited by Delivery_manageBuildingCalculatingDelivery')
@withAbort
def _test_Delivery_manageBuildingCalculatingDelivery(self, state, empty=False):
delivery = self.portal.sale_packing_list_module.newContent(
title='Not visited by Delivery_manageBuildingCalculatingDelivery',
portal_type='Sale Packing List')
self.portal.portal_workflow._jumpToStateFor(delivery, state)
updateCausalityState_simulator = tempfile.mkstemp()[1]
updateSimulation_simulator = tempfile.mkstemp()[1]
try:
from Products.ERP5.Document.Delivery import Delivery
Delivery.original_updateCausalityState = Delivery\
.updateCausalityState
Delivery.updateCausalityState = Simulator(
updateCausalityState_simulator, 'updateCausalityState')
Delivery.updateSimulation = Simulator(
updateSimulation_simulator, 'updateSimulation')
delivery.Delivery_manageBuildingCalculatingDelivery()
updateCausalityState_value = eval(open(updateCausalityState_simulator).read())
updateSimulation_value = eval(open(updateSimulation_simulator).read())
if empty:
self.assertEqual([], updateCausalityState_value)
self.assertEqual([], updateSimulation_value)
else:
self.assertEqual([{
'recmethod': 'updateCausalityState',
'recargs': (),
'reckwargs': {'solve_automatically': False}}],
updateCausalityState_value
)
self.assertEqual([{
'recmethod': 'updateSimulation',
'recargs': (),
'reckwargs': {'expand_root': 1, 'expand_related': 1}}],
updateSimulation_value
)
finally:
Delivery.updateCausalityState = Delivery.original_updateCausalityState
delattr(Delivery, 'original_updateCausalityState')
if os.path.exists(updateCausalityState_simulator):
os.unlink(updateCausalityState_simulator)
if os.path.exists(updateSimulation_simulator):
os.unlink(updateSimulation_simulator)
def test_Delivery_manageBuildingCalculatingDelivery_calculating(self):
self._test_Delivery_manageBuildingCalculatingDelivery('calculating')
def test_Delivery_manageBuildingCalculatingDelivery_building(self):
self._test_Delivery_manageBuildingCalculatingDelivery('building')
def test_Delivery_manageBuildingCalculatingDelivery_solved(self):
self._test_Delivery_manageBuildingCalculatingDelivery('solved', True)
def test_Delivery_manageBuildingCalculatingDelivery_diverged(self):
self._test_Delivery_manageBuildingCalculatingDelivery('diverged', True)
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012 Nexedi SA and Contributors. All Rights Reserved.
#
##############################################################################
import transaction
import functools
import os
import tempfile
from Products.ERP5Type.tests.utils import createZODBPythonScript
from Products.SlapOS.tests.testSlapOSMixin import \
testSlapOSMixin
def withAbort(func):
@functools.wraps(func)
def wrapped(self, *args, **kwargs):
try:
func(self, *args, **kwargs)
finally:
transaction.abort()
return wrapped
class Simulator:
def __init__(self, outfile, method, to_return=None):
self.outfile = outfile
open(self.outfile, 'w').write(repr([]))
self.method = method
self.to_return = to_return
def __call__(self, *args, **kwargs):
"""Simulation Method"""
old = open(self.outfile, 'r').read()
if old:
l = eval(old)
else:
l = []
l.append({'recmethod': self.method,
'recargs': args,
'reckwargs': kwargs})
open(self.outfile, 'w').write(repr(l))
return self.to_return
def simulateSimulationMovement_buildSlapOS(func):
@functools.wraps(func)
def wrapped(self, *args, **kwargs):
script_name = 'SimulationMovement_buildSlapOS'
if script_name in self.portal.portal_skins.custom.objectIds():
raise ValueError('Precondition failed: %s exists in custom' % script_name)
createZODBPythonScript(self.portal.portal_skins.custom,
script_name,
'*args, **kwargs',
'# Script body\n'
"""portal_workflow = context.portal_workflow
if context.getTitle() == 'Not visited by SimulationMovement_buildSlapOS':
context.setTitle('Visited by SimulationMovement_buildSlapOS')
""" )
transaction.commit()
try:
func(self, *args, **kwargs)
finally:
if script_name in self.portal.portal_skins.custom.objectIds():
self.portal.portal_skins.custom.manage_delObjects(script_name)
transaction.commit()
return wrapped
class TestAlarm(testSlapOSMixin):
@simulateSimulationMovement_buildSlapOS
def test_SimulationMovement_withoutDelivery(self):
applied_rule = self.portal.portal_simulation.newContent(
portal_type='Applied Rule')
simulation_movement = applied_rule.newContent(
portal_type='Simulation Movement',
title='Not visited by SimulationMovement_buildSlapOS')
self.tic()
self.portal.portal_alarms.slapos_trigger_build.activeSense()
self.tic()
self.assertEqual(
'Visited by SimulationMovement_buildSlapOS',
simulation_movement.getTitle())
@simulateSimulationMovement_buildSlapOS
def test_SimulationMovement_withDelivery(self):
delivery = self.portal.sale_packing_list_module.newContent(
portal_type='Sale Packing List')
delivery_line = delivery.newContent(portal_type='Sale Packing List Line')
applied_rule = self.portal.portal_simulation.newContent(
portal_type='Applied Rule')
simulation_movement = applied_rule.newContent(
portal_type='Simulation Movement',
delivery=delivery_line.getRelativeUrl(),
title='Shall be visited by SimulationMovement_buildSlapOS')
self.tic()
self.portal.portal_alarms.slapos_trigger_build.activeSense()
self.tic()
self.assertNotEqual(
'Not visited by SimulationMovement_buildSlapOS',
simulation_movement.getTitle())
@withAbort
def test_SimulationMovement_buildSlapOS(self):
business_process = self.portal.business_process_module.newContent(
portal_type='Business Process')
root_business_link = business_process.newContent(
portal_type='Business Link')
business_link = business_process.newContent(portal_type='Business Link')
root_applied_rule = self.portal.portal_simulation.newContent(
portal_type='Applied Rule')
simulation_movement = root_applied_rule.newContent(
causality=root_business_link.getRelativeUrl(),
portal_type='Simulation Movement')
applied_rule = simulation_movement.newContent(portal_type='Applied Rule')
lower_simulation_movement = applied_rule.newContent(
causality=business_link.getRelativeUrl(),
portal_type='Simulation Movement')
build_simulator = tempfile.mkstemp()[1]
activate_simulator = tempfile.mkstemp()[1]
try:
from Products.CMFActivity.ActiveObject import ActiveObject
ActiveObject.original_activate = ActiveObject.activate
ActiveObject.activate = Simulator(activate_simulator, 'activate',
root_applied_rule)
from Products.ERP5.Document.BusinessLink import BusinessLink
BusinessLink.original_build = BusinessLink.build
BusinessLink.build = Simulator(build_simulator, 'build')
simulation_movement.SimulationMovement_buildSlapOS(tag='root_tag')
build_value = eval(open(build_simulator).read())
activate_value = eval(open(activate_simulator).read())
self.assertEqual([{
'recmethod': 'build',
'recargs': (),
'reckwargs': {'path': '%s/%%' % root_applied_rule.getPath(),
'activate_kw': {'tag': 'root_tag'}}}],
build_value
)
self.assertEqual([{
'recmethod': 'activate',
'recargs': (),
'reckwargs': {'tag': 'build_in_progress_%s_%s' % (
root_business_link.getUid(), root_applied_rule.getUid()),
'after_tag': 'root_tag', 'activity': 'SQLQueue'}}],
activate_value)
open(build_simulator, 'w').truncate()
open(activate_simulator, 'w').truncate()
lower_simulation_movement.SimulationMovement_buildSlapOS(tag='lower_tag')
build_value = eval(open(build_simulator).read())
activate_value = eval(open(activate_simulator).read())
self.assertEqual([{
'recmethod': 'build',
'recargs': (),
'reckwargs': {'path': '%s/%%' % root_applied_rule.getPath(),
'activate_kw': {'tag': 'lower_tag'}}}],
build_value
)
self.assertEqual([{
'recmethod': 'activate',
'recargs': (),
'reckwargs': {'tag': 'build_in_progress_%s_%s' % (
business_link.getUid(), root_applied_rule.getUid()),
'after_tag': 'lower_tag', 'activity': 'SQLQueue'}}],
activate_value)
finally:
ActiveObject.activate = ActiveObject.original_activate
delattr(ActiveObject, 'original_activate')
BusinessLink.build = BusinessLink.original_build
delattr(BusinessLink, 'original_build')
if os.path.exists(build_simulator):
os.unlink(build_simulator)
if os.path.exists(activate_simulator):
os.unlink(activate_simulator)
@withAbort
def test_SimulationMovement_buildSlapOS_withDelivery(self):
delivery = self.portal.sale_packing_list_module.newContent(
portal_type='Sale Packing List')
delivery_line = delivery.newContent(portal_type='Sale Packing List Line')
business_process = self.portal.business_process_module.newContent(
portal_type='Business Process')
root_business_link = business_process.newContent(
portal_type='Business Link')
business_link = business_process.newContent(portal_type='Business Link')
root_applied_rule = self.portal.portal_simulation.newContent(
portal_type='Applied Rule')
simulation_movement = root_applied_rule.newContent(
causality=root_business_link.getRelativeUrl(),
delivery=delivery_line.getRelativeUrl(),
portal_type='Simulation Movement')
applied_rule = simulation_movement.newContent(portal_type='Applied Rule')
lower_simulation_movement = applied_rule.newContent(
causality=business_link.getRelativeUrl(),
delivery=delivery_line.getRelativeUrl(),
portal_type='Simulation Movement')
build_simulator = tempfile.mkstemp()[1]
activate_simulator = tempfile.mkstemp()[1]
try:
from Products.CMFActivity.ActiveObject import ActiveObject
ActiveObject.original_activate = ActiveObject.activate
ActiveObject.activate = Simulator(activate_simulator, 'activate',
root_applied_rule)
from Products.ERP5.Document.BusinessLink import BusinessLink
BusinessLink.original_build = BusinessLink.build
BusinessLink.build = Simulator(build_simulator, 'build')
simulation_movement.SimulationMovement_buildSlapOS(tag='root_tag')
build_value = eval(open(build_simulator).read())
activate_value = eval(open(activate_simulator).read())
self.assertEqual([], build_value)
self.assertEqual([], activate_value)
open(build_simulator, 'w').write(repr([]))
open(activate_simulator, 'w').write(repr([]))
lower_simulation_movement.SimulationMovement_buildSlapOS(tag='lower_tag')
build_value = eval(open(build_simulator).read())
activate_value = eval(open(activate_simulator).read())
self.assertEqual([], build_value)
self.assertEqual([], activate_value)
finally:
ActiveObject.activate = ActiveObject.original_activate
delattr(ActiveObject, 'original_activate')
BusinessLink.build = BusinessLink.original_build
delattr(BusinessLink, 'original_build')
if os.path.exists(build_simulator):
os.unlink(build_simulator)
if os.path.exists(activate_simulator):
os.unlink(activate_simulator)
91
\ No newline at end of file
92
\ No newline at end of file
testSlapOSAccountingAlarm
testSlapOSAccountingConstraint
testSlapOSAccountingDefaultInvoiceTransactionRule
testSlapOSAccountingDefaultInvoicingRule
testSlapOSAccountingDefaultPaymentRule
testSlapOSAccountingDefaultSubscriptionItemRule
testSlapOSAccountingDefaultTradeModelRule
testSlapOSAccountingInstanceInvoicingAlarm
testSlapOSAccountingInteractionWorkflow
testSlapOSAccountingSlapOSManageBuildingCalculatingDelivery
testSlapOSAccountingSlapOSRequestUpdateHostingSubscriptionOpenSaleOrderAlarm
testSlapOSAccountingSlapOSTriggerBuildAlarm
\ No newline at end of file
testSlapOSAccountingRule
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment