Commit 5cd7a01c authored by Kazuhiko Shiozaki's avatar Kazuhiko Shiozaki

* Movement Group becomes ERP5 Document. See...

* Movement Group becomes ERP5 Document. See http://www.erp5.org/HowToConfigureBuilder for the detail.
* add _duplicate() in CopySupport.py. This method is similar to copy and paste, but it preserves workflow history.
* add a new target solver CopyAndPropagate.py that updates values according to passed divergence list and propagate them.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@23647 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 56831a07
No related merge requests found
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from Products.ERP5.Document.MovementGroup import MovementGroup
class BaseVariantMovementGroup(MovementGroup):
"""
The purpose of MovementGroup is to define how movements are grouped,
and how values are updated from simulation movements.
"""
meta_type = 'ERP5 Movement Group'
portal_type = 'Base Variant Movement Group'
def _getPropertyDict(self, movement, **kw):
property_dict = {}
category_list = movement.getVariationBaseCategoryList()
if category_list is None:
category_list = []
category_list.sort()
property_dict['_base_category_list'] = category_list
return property_dict
def test(self, object, property_dict):
category_list = object.getVariationBaseCategoryList()
if category_list is None:
category_list = []
category_list.sort()
return property_dict['_base_category_list'] == category_list
def testToUpdate(self, object, property_dict, **kw):
# This movement group does not affect updating.
return True, {}
......@@ -28,7 +28,7 @@
from AccessControl import ClassSecurityInfo
from Products.ERP5Type.ObjectMessage import ObjectMessage
from Products.ERP5Type.DivergenceMessage import DivergenceMessage
from Products.ERP5Type import Permissions, PropertySheet, Interface
from Products.ERP5.Document.PropertyDivergenceTester import \
PropertyDivergenceTester
......@@ -69,12 +69,6 @@ class CategoryDivergenceTester(PropertyDivergenceTester):
divergence_message_list = []
tested_property = self.getTestedPropertyList()
# Get the solver script list
solver_script_list = self.getSolverScriptList()
if solver_script_list is None:
solver_script_list = []
solver_script_list = self._splitStringList(solver_script_list)
delivery_mvt = simulation_movement.getDeliveryValue()
for tested_property_id, tested_property_title in \
self._splitStringList(tested_property):
......@@ -107,17 +101,16 @@ class CategoryDivergenceTester(PropertyDivergenceTester):
else:
simulation_category_title_list.append(category_value.getTitle())
delivery_mvt_property = ' , '.join(delivery_mvt_category_title_list)
simulation_mvt_property = ' , '.join(simulation_category_title_list)
message = ObjectMessage(
message = DivergenceMessage(
divergence_scope='category',
object_relative_url=delivery_mvt.getRelativeUrl(),
simulation_movement=simulation_movement,
decision_value=delivery_mvt_property ,
prevision_value=simulation_mvt_property,
decision_value=delivery_mvt_category_list,
prevision_value=simulation_category_list,
decision_title=', '.join(delivery_mvt_category_title_list),
prevision_title=', '.join(simulation_category_title_list),
tested_property=tested_property_id,
message=tested_property_title,
solver_script_list=solver_script_list
)
divergence_message_list.append(message)
......
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from Products.ERP5.Document.PropertyMovementGroup import PropertyMovementGroup
class CategoryMovementGroup(PropertyMovementGroup):
"""
The purpose of MovementGroup is to define how movements are grouped,
and how values are updated from simulation movements.
This movement group is used to group movements that have the same
categories (eg. source, destination, etc.).
"""
meta_type = 'ERP5 Movement Group'
portal_type = 'Category Movement Group'
def _getPropertyDict(self, movement, **kw):
property_dict = {}
for prop in self.getTestedPropertyList():
property_dict['%s_list' % prop] = movement.getPropertyList(prop, None)
return property_dict
def test(self, object, property_dict, property_list=None, **kw):
if property_list not in (None, []):
target_property_list = [x for x in self.getTestedPropertyList() \
if x in property_list]
else:
target_property_list = self.getTestedPropertyList()
for prop in target_property_list:
if sorted(property_dict['%s_list' % prop]) != \
sorted(object.getPropertyList(prop, None)):
return False
return True
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from Products.ERP5.Document.MovementGroup import MovementGroup
class CausalityAssignmentMovementGroup(MovementGroup):
"""
The purpose of MovementGroup is to define how movements are grouped,
and how values are updated from simulation movements.
This movement group is used in order to define the causality on lines
and cells.
"""
meta_type = 'ERP5 Movement Group'
portal_type = 'Causality Assignment Movement Group'
def _getPropertyDict(self, movement, **kw):
return self._addCausalityToEdit(movement)
def _separate(self, movement_list):
property_dict = {}
for movement in movement_list:
self._addCausalityToEdit(movement, property_dict)
return [[movement_list, property_dict]]
def testToUpdate(self, movement, property_dict, **kw):
# We can always update.
return True, property_dict
def _addCausalityToEdit(self, movement, property_dict=None):
if property_dict is None:
property_dict = {}
parent = movement
# Go upper into the simulation tree in order to find an order link
while parent.getOrderValue() is None and not(parent.isRootAppliedRule()):
parent = parent.getParentValue()
order_movement = parent.getOrderValue()
if order_movement is not None:
causality = property_dict.get('causality_list', [])
order_movement_url = order_movement.getRelativeUrl()
if order_movement_url not in causality:
causality.append(order_movement_url)
property_dict['causality_list'] = causality
return property_dict
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from Products.ERP5.Document.MovementGroup import MovementGroup
class CausalityMovementGroup(MovementGroup):
"""
The purpose of MovementGroup is to define how movements are grouped,
and how values are updated from simulation movements.
"""
meta_type = 'ERP5 Movement Group'
portal_type = 'Causality Movement Group'
def _getPropertyDict(self, movement, **kw):
property_dict = {}
explanation_relative_url = self._getExplanationRelativeUrl(movement)
property_dict['_explanation'] = explanation_relative_url
return property_dict
def testToUpdate(self, movement, property_dict, **kw):
# we don't care the difference of explanation url when updating
#return True, property_dict
return True, {}
def _getExplanationRelativeUrl(self, movement):
""" Get the order value for a movement """
if hasattr(movement, 'getParentValue'):
# This is a simulation movement
if movement.getParentValue() != movement.getRootAppliedRule() :
# get the explanation of parent movement if we have not been
# created by the root applied rule.
movement = movement.getParentValue().getParentValue()
explanation_value = movement.getExplanationValue()
if explanation_value is None:
raise ValueError, 'No explanation for movement %s' % movement.getPath()
else:
# This is a temp movement
explanation_value = None
if explanation_value is None:
explanation_relative_url = None
else:
# get the enclosing delivery for this cell or line
if hasattr(explanation_value, 'getExplanationValue') :
explanation_value = explanation_value.getExplanationValue()
explanation_relative_url = explanation_value.getRelativeUrl()
return explanation_relative_url
......@@ -856,3 +856,15 @@ class Delivery(XMLObject, ImmobilisationDelivery):
"""
pass
def getBuilderList(self):
"""Returns appropriate builder list."""
return self._getTypeBasedMethod('getBuilderList')()
def getRuleReference(self):
"""Returns an appropriate rule reference."""
method = self._getTypeBasedMethod('getRuleReference')
if method is not None:
return method()
else:
raise 'SimulationError', '%s_getRuleReference script is missing.' \
% self.getPortalType()
......@@ -27,16 +27,10 @@
##############################################################################
from AccessControl import ClassSecurityInfo
from Products.CMFCore.utils import getToolByName
from Products.ERP5Type import Permissions, PropertySheet, Constraint, Interface
from Acquisition import aq_base, aq_parent, aq_inner, aq_acquire
from Products.ERP5 import MovementGroup
from Products.ERP5Type.Utils import convertToUpperCase
from Products.ERP5Type import Permissions, PropertySheet
from Products.ERP5.Document.OrderBuilder import OrderBuilder
from Products.ERP5Type.UnrestrictedMethod import UnrestrictedMethod
from zLOG import LOG
class SelectMethodError(Exception): pass
class SelectMovementError(Exception): pass
......@@ -149,7 +143,8 @@ class DeliveryBuilder(OrderBuilder):
def _setDeliveryMovementProperties(self, delivery_movement,
simulation_movement, property_dict,
update_existing_movement=0):
update_existing_movement=0,
force_update=0):
"""
Initialize or update delivery movement properties.
Set delivery ratio on simulation movement.
......@@ -159,116 +154,168 @@ class DeliveryBuilder(OrderBuilder):
OrderBuilder._setDeliveryMovementProperties(
self, delivery_movement,
simulation_movement, property_dict,
update_existing_movement=update_existing_movement)
# Check if simulation movement is not already linked to a existing
# movement
if simulation_movement.getDeliveryValue() is not None:
raise SelectMovementError,\
"simulation_movement '%s' must not be selected !" %\
simulation_movement.getRelativeUrl()
# Update simulation movement
update_existing_movement=update_existing_movement,
force_update=force_update)
simulation_movement.edit(delivery_value=delivery_movement)
# Simulation consistency propagation
security.declareProtected(Permissions.ModifyPortalContent,
'updateFromSimulation')
def updateFromSimulation(self, delivery_relative_url, create_new_delivery=1):
def updateFromSimulation(self, delivery_relative_url, **kw):
"""
Update all lines of this transaction based on movements in the
simulation related to this transaction.
"""
updateFromSimulation = UnrestrictedMethod(self._updateFromSimulation)
return updateFromSimulation(delivery_relative_url,
create_new_delivery=create_new_delivery)
# We have to get a delivery, else, raise a Error
delivery = self.getPortalObject().restrictedTraverse(delivery_relative_url)
divergence_to_adopt_list = delivery.getDivergenceList()
return self.solveDivergence(
delivery_relative_url,
divergence_to_adopt_list=divergence_to_adopt_list)
def solveDeliveryGroupDivergence(self, *args, **kw):
"""
solve each divergence according to users decision (accept, adopt
or do nothing).
"""
solveDeliveryGroupDivergence = UnrestrictedMethod(self._solveDeliveryGroupDivergence)
return solveDeliveryGroupDivergence(*args, **kw)
def _solveDeliveryGroupDivergence(self, delivery_relative_url,
property_dict=None, comment=None):
if property_dict in (None, {}):
return
delivery = self.getPortalObject().restrictedTraverse(delivery_relative_url)
delivery.edit(comment=comment, **property_dict)
# Try to remove existing properties/categories from Movements that
# should exist on Deliveries.
for movement in delivery.getMovementList():
for prop in property_dict.keys():
# XXX The following should be implemented in better way.
if movement.hasProperty(prop):
try:
# for Property
movement._delProperty(prop)
except AttributeError:
# for Category
movement.setProperty(prop, None)
divergence_to_accept_list = []
for divergence in delivery.getDivergenceList():
if divergence.getProperty('tested_property') not in property_dict.keys():
continue
divergence_to_accept_list.append(divergence)
self._solveDivergence(delivery_relative_url,
divergence_to_accept_list=divergence_to_accept_list)
def solveDivergence(self, *args, **kw):
"""
solve each divergence according to users decision (accept, adopt
or do nothing).
"""
solveDivergence = UnrestrictedMethod(self._solveDivergence)
return solveDivergence(*args, **kw)
def _updateFromSimulation(self, delivery_relative_url, create_new_delivery=1):
def _solveDivergence(self, delivery_relative_url,
divergence_to_accept_list=None,
divergence_to_adopt_list=None,
**kw):
# We have to get a delivery, else, raise a Error
delivery = self.getPortalObject().restrictedTraverse(delivery_relative_url)
delivery_uid = delivery.getUid()
if divergence_to_accept_list is None:
divergence_to_accept_list = []
if divergence_to_adopt_list is None:
divergence_to_adopt_list = []
if not len(divergence_to_accept_list) and \
not len(divergence_to_adopt_list):
return
# First, we update simulation movements according to
# divergence_to_accept_list.
if len(divergence_to_accept_list):
solver_script = delivery._getTypeBasedMethod('acceptDecision',
'Delivery_acceptDecision')
solver_script(divergence_to_accept_list)
# Then, we update delivery/line/cell from simulation movements
# according to divergence_to_adopt_list.
if not len(divergence_to_adopt_list):
return
# Select
movement_list = delivery.getMovementList()
simulation_movement_list = []
for movement in delivery.getMovementList():
for movement in movement_list:
movement.edit(quantity=0)
for simulation_movement in movement.getDeliveryRelatedValueList(
portal_type="Simulation Movement"):
simulation_movement.setDelivery(None)
simulation_movement_list.append(simulation_movement)
# Collect
root_group = self.collectMovement(simulation_movement_list)
# Update delivery
rejected_movement_list = self._deliveryUpdateGroupProcessing(
delivery,
root_group)
for sim_mvt in root_group.getMovementList():
sim_mvt.immediateReindexObject()
# Store the good quantity value on delivery line
for movement in delivery.getMovementList():
total_quantity = 0
sim_mvt_list = movement.getDeliveryRelatedValueList(
portal_type="Simulation Movement")
for simulation_movement in sim_mvt_list:
total_quantity += simulation_movement.getQuantity()
if total_quantity != 0:
for simulation_movement in sim_mvt_list:
quantity = simulation_movement.getQuantity()
#simulation_movement.setDeliveryRatio(quantity/total_quantity)
simulation_movement.edit(delivery_ratio=quantity/total_quantity)
else:
if len(sim_mvt_list) != 0:
# Distribute equally ratio to all movement
mvt_ratio = 1 / len(sim_mvt_list)
for simulation_movement in sim_mvt_list:
#simulation_movement.setDeliveryRatio(mvt_ratio)
simulation_movement.edit(delivery_ratio=mvt_ratio)
movement.edit(quantity=total_quantity)
# To update the divergence status, the simulation movements
# must be reindexed, and then the delivery must be touched
path_list = []
# Launch delivery creation
if (create_new_delivery == 1) and\
(rejected_movement_list != []):
movement_relative_url_list = []
for movement in rejected_movement_list:
# XXX FIXME Not very generic...
if movement.__class__.__name__ == "FakeMovement":
movement_relative_url_list.extend(
[x.getRelativeUrl() for x in movement.getMovementList()])
# Build
portal = self.getPortalObject()
delivery_module = getattr(portal, self.getDeliveryModule())
delivery_to_update_list = [delivery]
self._resetUpdated()
delivery_list = self._deliveryGroupProcessing(
delivery_module,
root_group,
self.getDeliveryMovementGroupList(),
delivery_to_update_list=delivery_to_update_list,
divergence_list=divergence_to_adopt_list,
force_update=1)
# Then, we should re-apply quantity divergence according to 'Do
# nothing' quanity divergence list because all quantity are already
# calculated in adopt prevision phase.
quantity_dict = {}
for divergence in delivery.getDivergenceList():
if divergence.getProperty('divergence_scope') != 'quantity' or \
divergence in divergence_to_accept_list or \
divergence in divergence_to_adopt_list:
continue
s_m = divergence.getProperty('simulation_movement')
delivery_movement = s_m.getDeliveryValue()
quantity_gap = divergence.getProperty('decision_value') - \
divergence.getProperty('prevision_value')
delivery_movement.setQuantity(delivery_movement.getQuantity() + \
quantity_gap)
quantity_dict[s_m] = \
divergence.getProperty('decision_value')
# Finally, recalculate delivery_ratio
#
# Here, created/updated movements are not indexed yet. So we try to
# gather delivery relations from simulation movements.
delivery_dict = {}
for s_m in simulation_movement_list:
delivery_path = s_m.getDelivery()
delivery_dict[delivery_path] = \
delivery_dict.get(delivery_path, []) + \
[s_m]
for s_m_list_per_movement in delivery_dict.values():
total_quantity = reduce(lambda x, y: \
x + quantity_dict.get(y, y.getQuantity()),
s_m_list_per_movement, 0)
if total_quantity != 0.0:
for s_m in s_m_list_per_movement:
delivery_ratio = quantity_dict.get(s_m, s_m.getQuantity()) \
/ total_quantity
s_m.edit(delivery_ratio=delivery_ratio)
else:
movement_relative_url_list.append(movement.getRelativeUrl())
self.activate(activity="SQLQueue").build(
movement_relative_url_list=movement_relative_url_list)
for s_m in s_m_list_per_movement:
delivery_ratio = 1.0 / len(s_m_list_per_movement)
s_m.edit(delivery_ratio=delivery_ratio)
def _deliveryUpdateGroupProcessing(self, delivery, movement_group):
"""
Update delivery movement
"""
rejected_movement_list = []
property_dict = {}
for collect_order in self.getDeliveryCollectOrderList():
for group in movement_group.getGroupList()[1:]:
rejected_movement_list.extend(group.getMovementList())
movement_group = movement_group.getGroupList()[0]
property_dict.update(movement_group.getGroupEditDict())
# Put properties on delivery
delivery.edit(**property_dict)
# Then, reconnect simulation to delivery line
for group in movement_group.getGroupList():
self._deliveryLineGroupProcessing(
delivery,
group,
self.getDeliveryLineCollectOrderList()[1:],
{},
update_requested=1)
return rejected_movement_list
# Call afterscript if new deliveries are created
new_delivery_list = [x for x in delivery_list if x != delivery]
self.callAfterBuildingScript(new_delivery_list, simulation_movement_list)
return delivery_list
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from Products.ERP5Type.XMLObject import XMLObject
from AccessControl import ClassSecurityInfo
from Products.ERP5Type import Permissions, PropertySheet
class MovementGroup(XMLObject):
"""
The purpose of MovementGroup is to define how movements are grouped,
and how values are updated from simulation movements.
"""
meta_type = 'ERP5 Movement Group'
portal_type = 'Movement Group'
# Declarative security
security = ClassSecurityInfo()
security.declareObjectProtected(Permissions.AccessContentsInformation)
# Default Properties
property_sheets = (
PropertySheet.Base,
PropertySheet.SimpleItem,
PropertySheet.CategoryCore,
PropertySheet.MovementGroup,
PropertySheet.SortIndex,
)
def _getPropertyDict(self, movement, **kw):
# This method should be defined in sub classes.
raise NotImplementedError
def _separate(self, movement_list):
# By default, we separate movements by _getPropertyDict() values.
# You can override this method in each MovementGroup class.
tmp_dict = {}
for movement in movement_list:
property_dict = self._getPropertyDict(movement)
# XXX it can be wrong. we need a good way to get hash value, or
# we should compare for all pairs.
key = repr(property_dict)
if tmp_dict.has_key(key):
tmp_dict[key][0].append(movement)
else:
tmp_dict[key] = [[movement], property_dict]
return tmp_dict.values()
def separate(self, movement_list):
# We sort group of simulation movements by their IDs.
# DO NOT OVERRIDE THIS METHOD. Override _separate() instead.
return sorted([[sorted(x[0], lambda a,b:cmp(a.getId(), b.getId())), x[1]] \
for x in self._separate(movement_list)],
lambda a,b: cmp(a[0][0].getId(), b[0][0].getId()))
def test(self, object, property_dict, **kw):
raise NotImplementedError
def testToUpdate(self, object, property_dict, **kw):
if self.test(object, property_dict, **kw):
return True, property_dict
else:
return False, property_dict
......@@ -31,10 +31,11 @@ from Products.ERP5Type import Permissions, PropertySheet
from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5.Document.Predicate import Predicate
from Products.ERP5.Document.Amount import Amount
from Products.ERP5 import MovementGroup
from Products.ERP5Type.Utils import convertToUpperCase
from Products.ERP5.MovementGroup import MovementGroupNode
from Products.ERP5Type.TransactionalVariable import getTransactionalVariable
from Products.ERP5Type.CopySupport import CopyError, tryMethodCallWithTemporaryPermission
from DateTime import DateTime
from zLOG import LOG
from Acquisition import aq_parent, aq_inner
class CollectError(Exception): pass
class MatrixError(Exception): pass
......@@ -213,55 +214,64 @@ class OrderBuilder(XMLObject, Amount, Predicate):
movement_list.append(movement)
return movement_list
def getCollectOrderList(self):
"""
Simply method to get the 3 collect order lists define on a
DeliveryBuilder
"""
return self.getDeliveryCollectOrderList()+\
self.getDeliveryLineCollectOrderList()+\
self.getDeliveryCellCollectOrderList()
def collectMovement(self, movement_list):
"""
group movements in the way we want. Thanks to this method, we are able
to retrieve movement classed by order, resource, criterion,....
movement_list : the list of movement wich we want to group
check_list : the list of classes used to group movements. The order
class_list : the list of classes used to group movements. The order
of the list is important and determines by what we will
group movement first
Typically, check_list is :
[DateMovementGroup,PathMovementGroup,...]
"""
class_list = [getattr(MovementGroup, x) \
for x in self.getCollectOrderList()]
last_line_class_name = self.getDeliveryCollectOrderList()[-1]
movement_group_list = self.getMovementGroupList()
last_line_movement_group = self.getDeliveryMovementGroupList()[-1]
separate_method_name_list = self.getDeliveryCellSeparateOrderList([])
my_root_group = MovementGroup.RootMovementGroup(
class_list,
last_line_class_name=last_line_class_name,
separate_method_name_list=separate_method_name_list)
for movement in movement_list:
my_root_group.append(movement)
my_root_group = MovementGroupNode(
separate_method_name_list=separate_method_name_list,
movement_group_list=movement_group_list,
last_line_movement_group=last_line_movement_group)
my_root_group.append(movement_list)
return my_root_group
def testObjectProperties(self, instance, property_dict):
"""
Test instance properties.
"""
result = 1
for key in property_dict:
getter_name = 'get%s' % convertToUpperCase(key)
getter = getattr(instance, getter_name, None)
if getter is not None:
value = getter()
if value != property_dict[key]:
result = 0
break
def _testToUpdate(self, instance, movement_group_list,
divergence_list):
result = True
new_property_dict = {}
for movement_group in movement_group_list:
tmp_result, tmp_property_dict = movement_group.testToUpdate(
instance, divergence_list)
if tmp_result == False:
result = tmp_result
new_property_dict.update(tmp_property_dict)
return result, new_property_dict
def _findUpdatableObject(self, instance_list, movement_group_list,
divergence_list):
instance = None
property_dict = {}
if not len(instance_list):
for movement_group in movement_group_list:
property_dict.update(movement_group.getGroupEditDict())
else:
result = 0
# we want to check the original first.
# the original is the delivery of the last (bottom) movement group.
try:
original = movement_group_list[-1].getMovementList()[0].getDeliveryValue()
except AttributeError:
original = None
if original is not None:
original_id = original.getId()
instance_list.sort(lambda a,b:cmp(b.getId()==original_id,
a.getId()==original_id))
for instance_to_update in instance_list:
result, property_dict = self._testToUpdate(
instance_to_update, movement_group_list, divergence_list)
if result == True:
instance = instance_to_update
break
return result
return instance, property_dict
def buildDeliveryList(self, movement_group, delivery_relative_url_list=None,
movement_list=None,**kw):
......@@ -286,60 +296,98 @@ class OrderBuilder(XMLObject, Amount, Predicate):
delivery_to_update_list.extend([sql_delivery.getObject() \
for sql_delivery \
in to_update_delivery_sql_list])
# We do not want to update the same object more than twice in one
# _deliveryGroupProcessing().
self._resetUpdated()
delivery_list = self._deliveryGroupProcessing(
delivery_module,
movement_group,
self.getDeliveryCollectOrderList(),
{},
self.getDeliveryMovementGroupList(),
delivery_to_update_list=delivery_to_update_list,
**kw)
return delivery_list
def _deliveryGroupProcessing(self, delivery_module, movement_group,
collect_order_list, property_dict,
collect_order_list, movement_group_list=None,
delivery_to_update_list=None,
activate_kw=None,**kw):
divergence_list=None,
activate_kw=None, force_update=0, **kw):
"""
Build empty delivery from a list of movement
"""
if movement_group_list is None:
movement_group_list = []
if divergence_list is None:
divergence_list = []
# do not use 'append' or '+=' because they are destructive.
movement_group_list = movement_group_list + [movement_group]
# Parameter initialization
if delivery_to_update_list is None:
delivery_to_update_list = []
delivery_list = []
# Get current properties from current movement group
# And fill property_dict
property_dict.update(movement_group.getGroupEditDict())
if collect_order_list != []:
if len(collect_order_list):
# Get sorted movement for each delivery
for group in movement_group.getGroupList():
new_delivery_list = self._deliveryGroupProcessing(
delivery_module,
group,
collect_order_list[1:],
property_dict.copy(),
movement_group_list=movement_group_list,
delivery_to_update_list=delivery_to_update_list,
activate_kw=activate_kw)
divergence_list=divergence_list,
activate_kw=activate_kw,
force_update=force_update, **kw)
delivery_list.extend(new_delivery_list)
force_update = 0
else:
# Test if we can update a existing delivery, or if we need to create
# a new one
delivery = None
for delivery_to_update in delivery_to_update_list:
if self.testObjectProperties(delivery_to_update, property_dict):
# Check if delivery has the correct portal_type
if delivery_to_update.getPortalType() ==\
self.getDeliveryPortalType():
delivery = delivery_to_update
break
delivery_to_update_list = [
x for x in delivery_to_update_list \
if x.getPortalType() == self.getDeliveryPortalType() and \
not self._isUpdated(x, 'delivery')]
delivery, property_dict = self._findUpdatableObject(
delivery_to_update_list, movement_group_list,
divergence_list)
# if all deliveries are rejected in case of update, we update the
# first one.
if force_update and delivery is None and len(delivery_to_update_list):
delivery = delivery_to_update_list[0]
if delivery is None:
# Create delivery
try:
old_delivery = movement_group.getMovementList()[0].getDeliveryValue()
except AttributeError:
old_delivery = None
if old_delivery is None:
# from scratch
new_delivery_id = str(delivery_module.generateNewId())
delivery = delivery_module.newContent(
portal_type=self.getDeliveryPortalType(),
id=new_delivery_id,
created_by_builder=1,
activate_kw=activate_kw,**kw)
else:
# from duplicated original delivery
old_delivery = old_delivery.getExplanationValue()
cp = tryMethodCallWithTemporaryPermission(
delivery_module, 'Copy or Move',
lambda parent, *ids:
parent._duplicate(parent.manage_copyObjects(ids=ids))[0],
(delivery_module, old_delivery.getId()), {}, CopyError)
delivery = delivery_module[cp['new_id']]
# delete non-split movements
keep_id_list = [y.getDeliveryValue().getId() for y in \
movement_group.getMovementList()]
delete_id_list = [x.getId() for x in delivery.contentValues() \
if x.getId() not in keep_id_list]
delivery.deleteContent(delete_id_list)
# Put properties on delivery
self._setUpdated(delivery, 'delivery')
if property_dict:
delivery.edit(**property_dict)
# Then, create delivery line
......@@ -347,48 +395,82 @@ class OrderBuilder(XMLObject, Amount, Predicate):
self._deliveryLineGroupProcessing(
delivery,
group,
self.getDeliveryLineCollectOrderList()[1:],
{},
activate_kw=activate_kw,**kw)
self.getDeliveryLineMovementGroupList()[1:],
divergence_list=divergence_list,
activate_kw=activate_kw,
force_update=force_update)
delivery_list.append(delivery)
return delivery_list
def _deliveryLineGroupProcessing(self, delivery, movement_group,
collect_order_list, property_dict,
activate_kw=None,**kw):
collect_order_list, movement_group_list=None,
divergence_list=None,
activate_kw=None, force_update=0, **kw):
"""
Build delivery line from a list of movement on a delivery
"""
# Get current properties from current movement group
# And fill property_dict
property_dict.update(movement_group.getGroupEditDict())
if collect_order_list != []:
if movement_group_list is None:
movement_group_list = []
if divergence_list is None:
divergence_list = []
# do not use 'append' or '+=' because they are destructive.
movement_group_list = movement_group_list + [movement_group]
if len(collect_order_list):
# Get sorted movement for each delivery line
for group in movement_group.getGroupList():
self._deliveryLineGroupProcessing(
delivery, group, collect_order_list[1:], property_dict.copy(),
activate_kw=activate_kw)
delivery, group, collect_order_list[1:],
movement_group_list=movement_group_list,
divergence_list=divergence_list,
activate_kw=activate_kw, force_update=force_update)
else:
# Test if we can update an existing line, or if we need to create a new
# one
delivery_line = None
update_existing_line = 0
for delivery_line_to_update in delivery.contentValues(
filter={'portal_type':self.getDeliveryLinePortalType()}):
if self.testObjectProperties(delivery_line_to_update, property_dict):
delivery_line = delivery_line_to_update
delivery_line_to_update_list = [x for x in delivery.contentValues(
portal_type=self.getDeliveryLinePortalType()) if \
not self._isUpdated(x, 'line')]
delivery_line, property_dict = self._findUpdatableObject(
delivery_line_to_update_list, movement_group_list,
divergence_list)
if delivery_line is not None:
update_existing_line = 1
break
if delivery_line is None:
else:
# Create delivery line
update_existing_line = 0
try:
old_delivery_line = self._searchUpByPortalType(
movement_group.getMovementList()[0].getDeliveryValue(),
self.getDeliveryLinePortalType())
except AttributeError:
old_delivery_line = None
if old_delivery_line is None:
# from scratch
new_delivery_line_id = str(delivery.generateNewId())
delivery_line = delivery.newContent(
portal_type=self.getDeliveryLinePortalType(),
id=new_delivery_line_id,
variation_category_list=[],
activate_kw=activate_kw)
else:
# from duplicated original line
cp = tryMethodCallWithTemporaryPermission(
delivery, 'Copy or Move',
lambda parent, *ids:
parent._duplicate(parent.manage_copyObjects(ids=ids))[0],
(delivery, old_delivery_line.getId()), {}, CopyError)
delivery_line = delivery[cp['new_id']]
# delete non-split movements
keep_id_list = [y.getDeliveryValue().getId() for y in \
movement_group.getMovementList()]
delete_id_list = [x.getId() for x in delivery_line.contentValues() \
if x.getId() not in keep_id_list]
delivery_line.deleteContent(delete_id_list)
# Put properties on delivery line
self._setUpdated(delivery_line, 'line')
if property_dict:
delivery_line.edit(**property_dict)
# Update variation category list on line
line_variation_category_list = delivery_line.getVariationCategoryList()
for movement in movement_group.getMovementList():
......@@ -410,40 +492,50 @@ class OrderBuilder(XMLObject, Amount, Predicate):
self._deliveryCellGroupProcessing(
delivery_line,
group,
self.getDeliveryCellCollectOrderList()[1:],
{},
self.getDeliveryCellMovementGroupList()[1:],
update_existing_line=update_existing_line,
activate_kw=activate_kw)
divergence_list=divergence_list,
activate_kw=activate_kw,
force_update=force_update)
else:
self._deliveryCellGroupProcessing(
delivery_line,
movement_group,
[],
{},
update_existing_line=update_existing_line,
activate_kw=activate_kw)
divergence_list=divergence_list,
activate_kw=activate_kw,
force_update=force_update)
def _deliveryCellGroupProcessing(self, delivery_line, movement_group,
collect_order_list, property_dict,
update_existing_line=0,activate_kw=None):
collect_order_list, movement_group_list=None,
update_existing_line=0,
divergence_list=None,
activate_kw=None, force_update=0):
"""
Build delivery cell from a list of movement on a delivery line
or complete delivery line
"""
# Get current properties from current movement group
# And fill property_dict
property_dict.update(movement_group.getGroupEditDict())
if collect_order_list != []:
if movement_group_list is None:
movement_group_list = []
if divergence_list is None:
divergence_list = []
# do not use 'append' or '+=' because they are destructive.
movement_group_list = movement_group_list + [movement_group]
if len(collect_order_list):
# Get sorted movement for each delivery line
for group in movement_group.getGroupList():
self._deliveryCellGroupProcessing(
delivery_line,
group,
collect_order_list[1:],
property_dict.copy(),
movement_group_list=movement_group_list,
update_existing_line=update_existing_line,
activate_kw=activate_kw)
divergence_list=divergence_list,
activate_kw=activate_kw,
force_update=force_update)
else:
movement_list = movement_group.getMovementList()
if len(movement_list) != 1:
......@@ -460,32 +552,58 @@ class OrderBuilder(XMLObject, Amount, Predicate):
# Decision can only be made with line matrix range:
# because matrix range can be empty even if line variation category
# list is not empty
property_dict = {}
if list(delivery_line.getCellKeyList(base_id=base_id)) == []:
# update line
object_to_update = delivery_line
if self.testObjectProperties(delivery_line, property_dict):
if update_existing_line == 1:
# We update a initialized line
if self._isUpdated(delivery_line, 'cell'):
object_to_update_list = []
else:
object_to_update_list = [delivery_line]
object_to_update, property_dict = self._findUpdatableObject(
object_to_update_list, movement_group_list,
divergence_list)
if object_to_update is not None:
update_existing_movement = 1
else:
for cell_key in delivery_line.getCellKeyList(base_id=base_id):
if delivery_line.hasCell(base_id=base_id, *cell_key):
cell = delivery_line.getCell(base_id=base_id, *cell_key)
if self.testObjectProperties(cell, property_dict):
object_to_update = delivery_line
else:
object_to_update_list = [
delivery_line.getCell(base_id=base_id, *cell_key) for cell_key in \
delivery_line.getCellKeyList(base_id=base_id) \
if delivery_line.hasCell(base_id=base_id, *cell_key)]
object_to_update, property_dict = self._findUpdatableObject(
object_to_update_list, movement_group_list,
divergence_list)
if object_to_update is not None:
# We update a existing cell
# delivery_ratio of new related movement to this cell
# must be updated to 0.
update_existing_movement = 1
object_to_update = cell
break
if object_to_update is None:
# create a new cell
cell_key = movement.getVariationCategoryList(
omit_optional_variation=1)
if not delivery_line.hasCell(base_id=base_id, *cell_key):
try:
old_cell = movement_group.getMovementList()[0].getDeliveryValue()
except AttributeError:
old_cell = None
if old_cell is None:
# from scratch
cell = delivery_line.newCell(base_id=base_id, \
portal_type=self.getDeliveryCellPortalType(),
activate_kw=activate_kw,*cell_key)
else:
# from duplicated original line
cp = tryMethodCallWithTemporaryPermission(
delivery_line, 'Copy or Move',
lambda parent, *ids:
parent._duplicate(parent.manage_copyObjects(ids=ids))[0],
(delivery_line, old_cell.getId()), {}, CopyError)
cell = delivery_line[cp['new_id']]
vcl = movement.getVariationCategoryList()
cell._edit(category_list=vcl,
# XXX hardcoded value
......@@ -497,25 +615,27 @@ class OrderBuilder(XMLObject, Amount, Predicate):
else:
raise MatrixError, 'Cell: %s already exists on %s' % \
(str(cell_key), str(delivery_line))
self._setUpdated(object_to_update, 'cell')
self._setDeliveryMovementProperties(
object_to_update, movement, property_dict,
update_existing_movement=update_existing_movement)
update_existing_movement=update_existing_movement,
force_update=force_update)
def _setDeliveryMovementProperties(self, delivery_movement,
simulation_movement, property_dict,
update_existing_movement=0):
update_existing_movement=0,
force_update=0):
"""
Initialize or update delivery movement properties.
Set delivery ratio on simulation movement.
"""
if update_existing_movement == 1:
if update_existing_movement == 1 and not force_update:
# Important.
# Attributes of object_to_update must not be modified here.
# Because we can not change values that user modified.
# Delivery will probably diverge now, but this is not the job of
# DeliveryBuilder to resolve such problem.
# Use Solver instead.
#simulation_movement.setDeliveryRatio(0)
simulation_movement.edit(delivery_ratio=0)
else:
# Now, only 1 movement is possible, so copy from this movement
......@@ -524,13 +644,14 @@ class OrderBuilder(XMLObject, Amount, Predicate):
property_dict['price'] = simulation_movement.getPrice()
# Update properties on object (quantity, price...)
delivery_movement._edit(force_update=1, **property_dict)
#simulation_movement.setDeliveryRatio(1)
simulation_movement.edit(delivery_ratio=1)
def callAfterBuildingScript(self, delivery_list, movement_list=None, **kw):
"""
Call script on each delivery built
"""
if not len(delivery_list):
return
# Parameter initialization
if movement_list is None:
movement_list = []
......@@ -561,3 +682,64 @@ class OrderBuilder(XMLObject, Amount, Predicate):
script(related_simulation_movement_path_list=related_simulation_movement_path_list)
else:
script()
security.declareProtected(Permissions.AccessContentsInformation,
'getMovementGroupList')
def getMovementGroupList(self, portal_type=None, collect_order_group=None,
**kw):
"""
Return a list of movement groups sorted by collect order group and index.
"""
category_index_dict = {}
for i in self.getPortalObject().portal_categories.collect_order_group.contentValues():
category_index_dict[i.getId()] = i.getIntIndex()
def sort_movement_group(a, b):
return cmp(category_index_dict.get(a.getCollectOrderGroup()),
category_index_dict.get(b.getCollectOrderGroup())) or \
cmp(a.getIntIndex(), b.getIntIndex())
if portal_type is None:
portal_type = self.getPortalMovementGroupTypeList()
movement_group_list = [x for x in self.contentValues(filter={'portal_type': portal_type}) \
if collect_order_group is None or collect_order_group == x.getCollectOrderGroup()]
return sorted(movement_group_list, sort_movement_group)
# XXX category name is hardcoded.
def getDeliveryMovementGroupList(self, **kw):
return self.getMovementGroupList(collect_order_group='delivery')
# XXX category name is hardcoded.
def getDeliveryLineMovementGroupList(self, **kw):
return self.getMovementGroupList(collect_order_group='line')
# XXX category name is hardcoded.
def getDeliveryCellMovementGroupList(self, **kw):
return self.getMovementGroupList(collect_order_group='cell')
def _searchUpByPortalType(self, obj, portal_type):
limit_portal_type = self.getPortalObject().getPortalType()
while obj is not None:
obj_portal_type = obj.getPortalType()
if obj_portal_type == portal_type:
break
elif obj_portal_type == limit_portal_type:
obj = None
break
else:
obj = aq_parent(aq_inner(obj))
return obj
def _isUpdated(self, obj, level):
tv = getTransactionalVariable(self)
return level in tv['builder_processed_list'].get(obj, [])
def _setUpdated(self, obj, level):
tv = getTransactionalVariable(self)
if tv.get('builder_processed_list', None) is None:
self._resetUpdated()
tv['builder_processed_list'][obj] = \
tv['builder_processed_list'].get(obj, []) + [level]
def _resetUpdated(self):
tv = getTransactionalVariable(self)
tv['builder_processed_list'] = {}
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from Products.ERP5.Document.MovementGroup import MovementGroup
class OrderMovementGroup(MovementGroup):
"""
The purpose of MovementGroup is to define how movements are grouped,
and how values are updated from simulation movements.
This movement group is used to group movements that come from the same
Order.
"""
meta_type = 'ERP5 Movement Group'
portal_type = 'Order Movement Group'
def _getPropertyDict(self, movement, **kw):
property_dict = {}
order_relative_url = self._getOrderRelativeUrl(movement)
property_dict['causality'] = order_relative_url
return property_dict
def testToUpdate(self, movement, property_dict, **kw):
if movement.getCausality() == property_dict['causality']:
return True, property_dict
else:
return False, property_dict
def _getOrderRelativeUrl(self, movement):
if hasattr(movement, 'getRootAppliedRule'):
# This is a simulation movement
order_relative_url = movement.getRootAppliedRule().getCausality(
portal_type=movement.getPortalOrderTypeList())
if order_relative_url is None:
# In some cases (ex. DeliveryRule), there is no order
# we may consider a PackingList as the order in the OrderGroup
order_relative_url = movement.getRootAppliedRule().getCausality(
portal_type=movement.getPortalDeliveryTypeList())
else:
# This is a temp movement
order_relative_url = None
return order_relative_url
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from Products.ERP5.Document.MovementGroup import MovementGroup
class ParentExplanationMovementGroup(MovementGroup):
"""
The purpose of MovementGroup is to define how movements are grouped,
and how values are updated from simulation movements.
This movement group was first created for building Sale invoices
transactions lines. We need to put accounting lines in the same
invoices than invoice lines.
"""
meta_type = 'ERP5 Movement Group'
portal_type = 'Parent Explanation Movement Group'
def _getPropertyDict(self, movement, **kw):
property_dict = {}
parent_explanation_value = movement.getParentExplanationValue()
property_dict['parent_explanation_value'] = parent_explanation_value
return property_dict
def test(self, object, property_dict, property_list=None, **kw):
return object.getParentExplanationValue() == \
property_dict['parent_explanation_value']
......@@ -30,7 +30,7 @@ from AccessControl import ClassSecurityInfo
from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5Type.ObjectMessage import ObjectMessage
from Products.ERP5Type.DivergenceMessage import DivergenceMessage
from Products.ERP5Type import Permissions, PropertySheet, Interface
class PropertyDivergenceTester(XMLObject):
......@@ -75,26 +75,20 @@ class PropertyDivergenceTester(XMLObject):
divergence_message_list = []
tested_property = self.getTestedPropertyList()
# Get the list of solvers callable in this case
solver_script_list = self.getSolverScriptList()
if solver_script_list is None:
solver_script_list = []
solver_script_list = self._splitStringList(solver_script_list)
delivery_mvt = simulation_movement.getDeliveryValue()
for tested_property_id, tested_property_title in \
self._splitStringList(tested_property):
delivery_mvt_property = delivery_mvt.getProperty(tested_property_id)
simulation_mvt_property = simulation_movement.getProperty(tested_property_id)
if delivery_mvt_property != simulation_mvt_property:
message = ObjectMessage(
message = DivergenceMessage(
divergence_scope='property',
object_relative_url=delivery_mvt.getRelativeUrl(),
simulation_movement=simulation_movement,
decision_value=delivery_mvt_property ,
prevision_value=simulation_mvt_property,
tested_property=tested_property_id,
message=tested_property_title,
solver_script_list=solver_script_list
)
divergence_message_list.append(message)
......
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from Products.ERP5.Document.MovementGroup import MovementGroup
class PropertyMovementGroup(MovementGroup):
"""
The purpose of MovementGroup is to define how movements are grouped,
and how values are updated from simulation movements.
This movement group is used to group movements that have the same
properties (eg. start_date, stop_date, etc.).
"""
meta_type = 'ERP5 Movement Group'
portal_type = 'Property Movement Group'
def _getPropertyDict(self, movement, **kw):
property_dict = {}
for prop in self.getTestedPropertyList():
property_dict[prop] = movement.getProperty(prop, None)
return property_dict
def test(self, object, property_dict, property_list=None, **kw):
if property_list not in (None, []):
target_property_list = [x for x in self.getTestedPropertyList() \
if x in property_list]
else:
target_property_list = self.getTestedPropertyList()
for prop in target_property_list:
if property_dict[prop] != object.getProperty(prop, None):
return False
return True
......@@ -28,7 +28,7 @@
from AccessControl import ClassSecurityInfo
from Products.ERP5Type.ObjectMessage import ObjectMessage
from Products.ERP5Type.DivergenceMessage import DivergenceMessage
from Products.ERP5Type import Permissions, PropertySheet, Constraint, Interface
from Products.ERP5.Document.PropertyDivergenceTester import \
PropertyDivergenceTester
......@@ -73,17 +73,13 @@ class QuantityDivergenceTester(PropertyDivergenceTester):
quantity = simulation_movement.getCorrectedQuantity()
d_error = simulation_movement.getDeliveryError()
solver_script_list = self.getSolverScriptList()
if solver_script_list is None:
solver_script_list = []
message = ObjectMessage(object_relative_url= delivery.getRelativeUrl(),
message = DivergenceMessage(object_relative_url= delivery.getRelativeUrl(),
divergence_scope='quantity',
simulation_movement = simulation_movement,
decision_value = d_quantity ,
prevision_value = quantity,
tested_property='quantity',
message='Quantity',
solver_script_list=self._splitStringList(solver_script_list)
)
......
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from Products.ERP5.Document.MovementGroup import MovementGroup
class QuantitySignMovementGroup(MovementGroup):
"""
The purpose of MovementGroup is to define how movements are grouped,
and how values are updated from simulation movements.
"""
meta_type = 'ERP5 Movement Group'
portal_type = 'Quantity Sign Movement Group'
def _getPropertyDict(self, movement, **kw):
property_dict = {}
quantity = movement.getQuantity()
property_dict['quantity_sign'] = cmp(quantity, 0)
return property_dict
def _separate(self, movement_list):
tmp_list = [[], [], []] # -1:minus, 0:zero, 1:plus
for movement in movement_list:
tmp_list[cmp(movement.getQuantity(), 0)].append(movement)
if len(tmp_list[1]):
if len(tmp_list[-1]):
return[
[tmp_list[1]+tmp_list[0], {'quantity_sign':1}],
[tmp_list[-1], {'quantity_sign':-1}],
]
else:
return[
[tmp_list[1]+tmp_list[0], {'quantity_sign':1}],
]
elif len(tmp_list[-1]):
return[
[tmp_list[-1]+tmp_list[0], {'quantity_sign':-1}],
]
else:
return [
[tmp_list[0], {'quantity_sign':0}],
]
def test(self, object, property_dict, **kw):
quantity = object.getQuantity()
sign1 = property_dict['quantity_sign']
sign2 = cmp(quantity, 0)
if sign2 == 0:
return 1
if sign1 == 0:
property_dict['quantity_sign'] = sign2
return 1
return sign1 == sign2
def testToUpdate(self, object, property_dict, **kw):
if object.getQuantitySign() == property_dict['quantity_sign']:
return True, property_dict
else:
return False, property_dict
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from Products.ERP5.Document.MovementGroup import MovementGroup
class RequirementMovementGroup(MovementGroup):
"""
The purpose of MovementGroup is to define how movements are grouped,
and how values are updated from simulation movements.
"""
meta_type = 'ERP5 Movement Group'
portal_type = 'Requirement Movement Group'
def _getPropertyDict(self, movement, **kw):
return {'requirement':self._getRequirementList(movement)}
def testToUpdate(self, movement, property_dict, **kw):
# We can always update
return True, property_dict
def _getRequirementList(self, movement):
order_value = movement.getOrderValue()
requirement_list = []
if order_value is not None:
if 'Line' in order_value.getPortalType():
requirement_list = order_value.getRequirementList()
return requirement_list
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from Products.ERP5.Document.MovementGroup import MovementGroup
class RootAppliedRuleCausalityMovementGroup(MovementGroup):
"""
The purpose of MovementGroup is to define how movements are grouped,
and how values are updated from simulation movements.
This movement group is used to group movements whose root apply rule
has the same causality.
"""
meta_type = 'ERP5 Movement Group'
portal_type = 'Root Applied Rule Causality Movement Group'
def _getPropertyDict(self, movement, **kw):
property_dict = {}
root_causality_value = self._getRootCausalityValue(movement)
property_dict['root_causality_value_list'] = [root_causality_value]
return property_dict
def testToUpdate(self, movement, property_dict, **kw):
# We can always update
return True, property_dict
def _getRootCausalityValue(self, movement):
""" Get the causality value of the root applied rule for a movement """
return movement.getRootAppliedRule().getCausalityValue()
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from Products.ERP5.Document.MovementGroup import MovementGroup
class SplitMovementGroup(MovementGroup):
"""
The purpose of MovementGroup is to define how movements are grouped,
and how values are updated from simulation movements.
"""
meta_type = 'ERP5 Movement Group'
portal_type = 'Split Movement Group'
def _getPropertyDict(self, movement, **kw):
return {}
def testToUpdate(self, object, property_dict, **kw):
return True, property_dict
def _separate(self, movement_list):
return [[[movement], {}] for movement in movement_list]
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from Products.ERP5.Document.MovementGroup import MovementGroup
class TitleMovementGroup(MovementGroup):
"""
The purpose of MovementGroup is to define how movements are grouped,
and how values are updated from simulation movements.
"""
meta_type = 'ERP5 Movement Group'
portal_type = 'Title Movement Group'
def _getPropertyDict(self, movement, **kw):
property_dict = {}
property_dict['title'] = self._getTitle(movement)
return property_dict
def test(self, object, property_dict, **kw):
return self._getTitle(object) == property_dict['title']
def testToUpdate(self, object, property_dict, **kw):
# If title is different, we want to update existing object instead
# of creating a new one.
return True, property_dict
def _getTitle(self,movement):
order_value = movement.getOrderValue()
title = ''
if order_value is not None:
if "Line" in order_value.getPortalType():
title = order_value.getTitle()
elif "Cell" in order_value.getPortalType():
title = order_value.getParentValue().getTitle()
return title
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from Products.ERP5.Document.MovementGroup import MovementGroup
class VariantMovementGroup(MovementGroup):
"""
The purpose of MovementGroup is to define how movements are grouped,
and how values are updated from simulation movements.
"""
meta_type = 'ERP5 Movement Group'
portal_type = 'Variant Movement Group'
def _getPropertyDict(self, movement, **kw):
property_dict = {}
category_list = movement.getVariationCategoryList()
if category_list is None:
category_list = []
category_list.sort()
property_dict['variation_category_list'] = category_list
return property_dict
def test(self, object, property_dict, **kw):
category_list = object.getVariationCategoryList()
if category_list is None:
category_list = []
category_list.sort()
return property_dict['variation_category_list'] == category_list
......@@ -1013,6 +1013,14 @@ class ERP5Site(FolderMixIn, CMFSite):
"""
return self._getPortalGroupedTypeList('inventory_movement')
security.declareProtected(Permissions.AccessContentsInformation,
'getPortalMovementGroupTypeList')
def getPortalMovementGroupTypeList(self):
"""
Return movement group types.
"""
return self._getPortalGroupedTypeList('movement_group')
security.declareProtected(Permissions.AccessContentsInformation,
'getDefaultModuleId')
def getDefaultModuleId(self, portal_type, default=MARKER):
......
......@@ -28,10 +28,6 @@
#
##############################################################################
"""
Define in this file all classes intended to group every kind of movement
"""
from warnings import warn
from Products.PythonScripts.Utility import allow_class
......@@ -39,72 +35,54 @@ class MovementRejected(Exception) : pass
class FakeMovementError(Exception) : pass
class MovementGroupError(Exception) : pass
class RootMovementGroup:
def __init__(self, class_list, movement=None, last_line_class_name=None,
separate_method_name_list=[]):
self._nested_class = None
self.setNestedClass(class_list=class_list)
class MovementGroupNode:
def __init__(self, movement_group_list=None, movement_list=None,
last_line_movement_group=None,
separate_method_name_list=[], movement_group=None):
self._movement_list = []
self._group_list = []
self._class_list = class_list
self._last_line_class_name = last_line_class_name
self._movement_group = movement_group
self._movement_group_list = movement_group_list
self._last_line_movement_group = last_line_movement_group
self._separate_method_name_list = separate_method_name_list
if movement is not None :
self.append(movement)
def getNestedClass(self, class_list):
if len(class_list)>0:
return class_list[0]
return None
def setNestedClass(self,class_list):
"""
This sets an appropriate nested class.
"""
self._nested_class = self.getNestedClass(class_list)
def _appendGroup(self, movement):
nested_instance = self._nested_class(
movement=movement,
class_list=self._class_list[1:],
last_line_class_name=self._last_line_class_name,
if movement_list is not None :
self.append(movement_list)
def _appendGroup(self, movement_list, property_dict):
nested_instance = MovementGroupNode(
movement_group=self._movement_group_list[0],
movement_group_list=self._movement_group_list[1:],
last_line_movement_group=self._last_line_movement_group,
separate_method_name_list=self._separate_method_name_list)
nested_instance.setGroupEdit(**property_dict)
split_movement_list = nested_instance.append(movement_list)
self._group_list.append(nested_instance)
def append(self, movement):
is_movement_in_group = 0
for group in self.getGroupList():
if group.test(movement) :
try:
group.append(movement)
is_movement_in_group = 1
break
except MovementRejected:
if self.__class__.__name__ == self._last_line_class_name:
pass
return split_movement_list
def append(self, movement_list):
all_split_movement_list = []
if len(self._movement_group_list):
for separate_movement_list, property_dict in \
self._movement_group_list[0].separate(movement_list):
split_movement_list = self._appendGroup(separate_movement_list,
property_dict)
if len(split_movement_list):
if self._movement_group == self._last_line_movement_group:
self.append(split_movement_list)
else:
raise MovementRejected
if is_movement_in_group == 0 :
if self._nested_class is not None:
self._appendGroup(movement)
all_split_movement_list.extend(split_movement_list)
else:
# We are on a node group
movement_list = self.getMovementList()
if len(movement_list) > 0:
self._movement_list.append(movement_list[0])
for movement in movement_list[1:]:
# We have a conflict here, because it is forbidden to have
# 2 movements on the same node group
tmp_result = self._separate(movement)
self._movement_list, split_movement_list = tmp_result
if split_movement_list != [None]:
self._movement_list, split_movement = tmp_result
if split_movement is not None:
# We rejected a movement, we need to put it on another line
# Or to create a new one
raise MovementRejected
else:
# No movement on this node, we can add it
self._movement_list.append(movement)
all_split_movement_list.append(split_movement)
return all_split_movement_list
def getGroupList(self):
return self._group_list
......@@ -125,7 +103,11 @@ class RootMovementGroup:
"""
Get property dict for the futur created object
"""
return getattr(self, '_property_dict', {})
property_dict = getattr(self, '_property_dict', {}).copy()
for key in property_dict.keys():
if key.startswith('_'):
del(property_dict[key])
return property_dict
def getMovementList(self):
"""
......@@ -140,6 +122,59 @@ class RootMovementGroup:
movement_list.extend(group.getMovementList())
return movement_list
def getMovement(self):
"""
Return first movement of the movement list in the current group
"""
movement = self.getMovementList()[0]
if movement.__class__.__name__ == 'FakeMovement':
return movement.getMovementList()[0]
else:
return movement
def testToUpdate(self, movement, divergence_list):
# Try to check if movement is updatable or not.
#
# 1. if Divergence has no scope: update anyway.
# 2. if Divergence has a scope: update in related Movement Group only.
#
# return value is:
# [updatable? (True/False), property dict for update]
if self._movement_group is not None:
property_list = []
if len(divergence_list):
divergence_scope = self._movement_group.getDivergenceScope()
if divergence_scope is None:
# Update anyway (eg. CausalityAssignmentMovementGroup etc.)
pass
else:
related_divergence_list = [
x for x in divergence_list \
if divergence_scope == x.divergence_scope and \
self.hasSimulationMovement(x.simulation_movement)]
if not len(related_divergence_list):
return True, {}
property_list = [x.tested_property for x in related_divergence_list]
return self._movement_group.testToUpdate(movement, self._property_dict,
property_list=property_list)
else:
return True, {}
def getDivergenceScope(self):
if self._movement_group is not None:
return self._movement_group.getDivergenceScope()
else:
return None
def hasSimulationMovement(self, simulation_movement):
for movement in self.getMovementList():
if movement.__class__.__name__ == "FakeMovement":
if simulation_movement in movement.getMovementList():
return True
elif simulation_movement == movement:
return True
return False
def _separate(self, movement):
"""
Separate 2 movements on a node group
......@@ -165,7 +200,7 @@ class RootMovementGroup:
else:
break
return [new_stored_movement], [rejected_movement]
return [new_stored_movement], rejected_movement
########################################################
# Separate methods
......@@ -210,6 +245,7 @@ class RootMovementGroup:
def __repr__(self):
repr_str = '<%s object at 0x%x\n' % (self.__class__.__name__, id(self))
repr_str += ' _movement_group = %r,\n' % self._movement_group
if getattr(self, '_property_dict', None) is not None:
repr_str += ' _property_dict = %r,\n' % self._property_dict
if self._movement_list:
......@@ -218,558 +254,11 @@ class RootMovementGroup:
repr_str += ' _group_list = [\n%s]>' % (
'\n'.join([' %s' % x for x in (',\n'.join([repr(i) for i in self._group_list])).split('\n')]))
else:
repr_str += ' _last_line_class_name = %r,\n' % self._last_line_class_name
repr_str += ' _separate_method_name_list = %r,\n' % self._separate_method_name_list
repr_str += ' _group_list = []>'
repr_str += ' _last_line_movement_group = %r,\n' % self._last_line_movement_group
repr_str += ' _separate_method_name_list = %r>' % self._separate_method_name_list
return repr_str
allow_class(RootMovementGroup)
class OrderMovementGroup(RootMovementGroup):
""" Group movements that comes from the same Order. """
def __init__(self,movement, **kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
order_value = self._getOrderValue(movement)
if order_value is None:
order_relative_url = None
else:
# get the id of the enclosing delivery
# for this cell or line
order_relative_url = order_value.getRelativeUrl()
self.order = order_relative_url
self.setGroupEdit(causality_value=order_value)
def _getOrderValue(self, movement):
if hasattr(movement, 'getRootAppliedRule'):
# This is a simulation movement
order_value = movement.getRootAppliedRule().getCausalityValue(
portal_type=movement.getPortalOrderTypeList())
if order_value is None:
# In some cases (ex. DeliveryRule), there is no order
# we may consider a PackingList as the order in the OrderGroup
order_value = movement.getRootAppliedRule().getCausalityValue(
portal_type=movement.getPortalDeliveryTypeList())
else:
# This is a temp movement
order_value = None
return order_value
def test(self,movement):
order_value = self._getOrderValue(movement)
if order_value is None:
order_relative_url = None
else:
# get the id of the enclosing delivery
# for this cell or line
order_relative_url = order_value.getRelativeUrl()
return order_relative_url == self.order
allow_class(OrderMovementGroup)
class CausalityAssignmentMovementGroup(RootMovementGroup):
"""
This movement group is used in order to define the causality
on lines and cells.
"""
def addCausalityToEdit(self, movement):
parent = movement
# Go upper into the simulation tree in order to find an order link
while parent.getOrderValue() is None and not(parent.isRootAppliedRule()):
parent = parent.getParentValue()
order_movement = parent.getOrderValue()
if order_movement is not None:
causality = self.getGroupEditDict().get('causality_list', [])
order_movement_url = order_movement.getRelativeUrl()
if order_movement_url not in causality:
causality.append(order_movement_url)
self.setGroupEdit(causality_list=causality)
def __init__(self, movement, **kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
self.addCausalityToEdit(movement)
def test(self, movement):
self.addCausalityToEdit(movement)
return 1
allow_class(CausalityAssignmentMovementGroup)
class CausalityMovementGroup(RootMovementGroup):
""" TODO: docstring """
def __init__(self, movement, **kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
explanation_relative_url = self._getExplanationRelativeUrl(movement)
self.explanation = explanation_relative_url
def _getExplanationRelativeUrl(self, movement):
""" Get the order value for a movement """
if hasattr(movement, 'getParentValue'):
# This is a simulation movement
if movement.getParentValue() != movement.getRootAppliedRule() :
# get the explanation of parent movement if we have not been
# created by the root applied rule.
movement = movement.getParentValue().getParentValue()
explanation_value = movement.getExplanationValue()
if explanation_value is None:
raise ValueError, 'No explanation for movement %s' % movement.getPath()
else:
# This is a temp movement
explanation_value = None
if explanation_value is None:
explanation_relative_url = None
else:
# get the enclosing delivery for this cell or line
if hasattr(explanation_value, 'getExplanationValue') :
explanation_value = explanation_value.getExplanationValue()
explanation_relative_url = explanation_value.getRelativeUrl()
return explanation_relative_url
def test(self,movement):
return self._getExplanationRelativeUrl(movement) == self.explanation
allow_class(CausalityMovementGroup)
class RootAppliedRuleCausalityMovementGroup(RootMovementGroup):
""" Group movement whose root apply rules have the same causality."""
def __init__(self, movement, **kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
explanation_relative_url = self._getExplanationRelativeUrl(movement)
self.explanation = explanation_relative_url
explanation_value = movement.getPortalObject().unrestrictedTraverse(
explanation_relative_url)
self.setGroupEdit(
root_causality_value_list = [explanation_value]
)
def _getExplanationRelativeUrl(self, movement):
""" TODO: docstring; method name is bad; variables names are bad """
root_applied_rule = movement.getRootAppliedRule()
explanation_value = root_applied_rule.getCausalityValue()
explanation_relative_url = None
if explanation_value is not None:
explanation_relative_url = explanation_value.getRelativeUrl()
return explanation_relative_url
def test(self,movement):
return self._getExplanationRelativeUrl(movement) == self.explanation
allow_class(RootAppliedRuleCausalityMovementGroup)
# Again add a strange movement group, it was first created for building
# Sale invoices transactions lines. We need to put accounting lines
# in the same invoices than invoice lines.
class ParentExplanationMovementGroup(RootMovementGroup):
""" TODO: docstring """
def __init__(self, movement, **kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
explanation_value = self._getParentExplanationValue(movement)
self.explanation_value = explanation_value
self.setGroupEdit(
parent_explanation_value = explanation_value
)
def _getParentExplanationValue(self, movement):
""" Get the order value for a movement """
return movement.getParentValue().getExplanationValue()
def test(self,movement):
return self._getParentExplanationValue(movement) == self.explanation_value
allow_class(ParentExplanationMovementGroup)
class PathMovementGroup(RootMovementGroup):
""" Group movements that have the same source and the same destination."""
def __init__(self, movement, **kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
source_list = movement.getSourceList()
destination_list = movement.getDestinationList()
source_list.sort()
destination_list.sort()
self.source_list = source_list
self.destination_list = destination_list
self.setGroupEdit(
source_list=source_list,
destination_list=destination_list
)
def test(self, movement):
source_list = movement.getSourceList()
destination_list = movement.getDestinationList()
source_list.sort()
destination_list.sort()
return source_list == self.source_list and \
destination_list == self.destination_list
allow_class(PathMovementGroup)
class SectionPathMovementGroup(RootMovementGroup):
""" Groups movement that have the same source_section and
destination_section."""
def __init__(self, movement, **kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
source_section_list = movement.getSourceSectionList()
destination_section_list = movement.getDestinationSectionList()
source_section_list.sort()
destination_section_list.sort()
self.source_section_list = source_section_list
self.destination_section_list = destination_section_list
self.setGroupEdit(
source_section_list=source_section_list,
destination_section_list=destination_section_list
)
def test(self, movement):
source_section_list = movement.getSourceSectionList()
destination_section_list = movement.getDestinationSectionList()
source_section_list.sort()
destination_section_list.sort()
return source_section_list == self.source_section_list and \
destination_section_list == self.destination_section_list
allow_class(SectionPathMovementGroup)
class PaymentPathMovementGroup(RootMovementGroup):
""" Groups movement that have the same source_payment and
destination_payment"""
def __init__(self, movement, **kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
source_payment_list = movement.getSourcePaymentList()
destination_payment_list = movement.getDestinationPaymentList()
source_payment_list.sort()
destination_payment_list.sort()
self.source_payment_list = source_payment_list
self.destination_payment_list = destination_payment_list
self.setGroupEdit(
source_payment_list=source_payment_list,
destination_payment_list=destination_payment_list
)
def test(self, movement):
source_payment_list = movement.getSourcePaymentList()
destination_payment_list = movement.getDestinationPaymentList()
source_payment_list.sort()
destination_payment_list.sort()
return source_payment_list == self.source_payment_list and \
destination_payment_list == self.destination_payment_list
class AdministrationPathMovementGroup(RootMovementGroup):
""" Groups movement that have the same source_administration and
destination_administration."""
def __init__(self, movement, **kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
source_administration_list = movement.getSourceAdministrationList()
destination_administration_list = movement.getDestinationAdministrationList()
source_administration_list.sort()
destination_administration_list.sort()
self.source_administration_list = source_administration_list
self.destination_administration_list = destination_administration_list
self.setGroupEdit(
source_administration_list=source_administration_list,
destination_administration_list=destination_administration_list
)
def test(self, movement):
source_administration_list = movement.getSourceAdministrationList()
destination_administration_list = movement.getDestinationAdministrationList()
source_administration_list.sort()
destination_administration_list.sort()
return source_administration_list == self.source_administration_list and \
destination_administration_list == self.destination_administration_list
class DecisionPathMovementGroup(RootMovementGroup):
""" Groups movement that have the same source_decision and
destination_decision."""
def __init__(self, movement, **kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
source_decision_list = movement.getSourceDecisionList()
destination_decision_list = movement.getDestinationDecisionList()
source_decision_list.sort()
destination_decision_list.sort()
self.source_decision_list = source_decision_list
self.destination_decision_list = destination_decision_list
self.setGroupEdit(
source_decision_list=source_decision_list,
destination_decision_list=destination_decision_list
)
def test(self, movement):
source_decision_list = movement.getSourceDecisionList()
destination_decision_list = movement.getDestinationDecisionList()
source_decision_list.sort()
destination_decision_list.sort()
return source_decision_list == self.source_decision_list and \
destination_decision_list == self.destination_decision_list
class TradePathMovementGroup(RootMovementGroup):
"""
Group movements that have the same source_trade and the same
destination_trade.
"""
def __init__(self, movement, **kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
source_trade_list = movement.getSourceTradeList()
destination_trade_list = movement.getDestinationTradeList()
source_trade_list.sort()
destination_trade_list.sort()
self.source_trade_list = source_trade_list
self.destination_trade_list = destination_trade_list
self.setGroupEdit(
source_trade_list=source_trade_list,
destination_trade_list=destination_trade_list
)
def test(self, movement):
source_trade_list = movement.getSourceTradeList()
destination_trade_list = movement.getDestinationTradeList()
source_trade_list.sort()
destination_trade_list.sort()
return source_trade_list == self.source_trade_list and \
destination_trade_list == self.destination_trade_list
allow_class(TradePathMovementGroup)
# XXX Naming issue ?
class QuantitySignMovementGroup(RootMovementGroup):
def __init__(self, movement, **kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
quantity = movement.getQuantity()
self.sign = cmp(quantity, 0)
self.setGroupEdit(quantity_sign=self.sign)
def test(self, movement):
quantity = movement.getQuantity()
sign = cmp(quantity, 0)
if sign == 0:
return 1
if self.sign == 0:
self.sign = sign
self.setGroupEdit(quantity_sign=self.sign)
return 1
return self.sign == sign
allow_class(QuantitySignMovementGroup)
class DateMovementGroup(RootMovementGroup):
""" Group movements that have exactly the same dates. """
def __init__(self,movement,**kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
self.start_date = movement.getStartDate()
self.stop_date = movement.getStopDate()
self.setGroupEdit(
start_date=movement.getStartDate(),
stop_date=movement.getStopDate()
)
def test(self,movement):
if movement.getStartDate() == self.start_date and \
movement.getStopDate() == self.stop_date :
return 1
else :
return 0
allow_class(DateMovementGroup)
class CriterionMovementGroup(RootMovementGroup):
def __init__(self,movement,**kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
if hasattr(movement, 'getGroupCriterion'):
self.criterion = movement.getGroupCriterion()
else:
self.criterion = None
def test(self,movement):
# we must have the same criterion
if hasattr(movement, 'getGroupCriterion'):
criterion = movement.getGroupCriterion()
else:
criterion = None
return self.criterion == criterion
allow_class(CriterionMovementGroup)
class PropertyMovementGroup(RootMovementGroup):
"""Abstract movement group for movement that share the same value for
a property.
"""
_property = [] # Subclasses must override this
def __init__(self, movement, **kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
if self._property is PropertyMovementGroup._property :
raise ValueError, 'PropertyMovementGroup: property not defined'
assert isinstance(self._property, str)
prop_value = movement.getProperty(self._property)
self._property_dict = { self._property : prop_value }
self.setGroupEdit(
**self._property_dict
)
def test(self, movement) :
return self._property_dict[self._property] == \
movement.getProperty(self._property)
class ResourceMovementGroup(RootMovementGroup):
""" Group movements that have the same resource. """
def __init__(self, movement, **kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
self.resource = movement.getResource()
self.setGroupEdit(
resource_value=movement.getResourceValue()
)
def test(self, movement):
return movement.getResource() == self.resource
allow_class(ResourceMovementGroup)
class SplitResourceMovementGroup(RootMovementGroup):
def __init__(self, movement, **kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
self.resource = movement.getResource()
def test(self, movement):
return movement.getResource() == self.resource
allow_class(SplitResourceMovementGroup)
class BaseVariantMovementGroup(RootMovementGroup):
def __init__(self,movement,**kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
self.base_category_list = movement.getVariationBaseCategoryList()
if self.base_category_list is None:
self.base_category_list = []
self.base_category_list.sort()
def test(self,movement):
movement_base_category_list = movement.getVariationBaseCategoryList()
if movement_base_category_list is None:
movement_base_category_list = []
movement_base_category_list.sort()
return movement_base_category_list == self.base_category_list
allow_class(BaseVariantMovementGroup)
class VariantMovementGroup(RootMovementGroup):
def __init__(self,movement,**kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
self.category_list = movement.getVariationCategoryList()
if self.category_list is None:
self.category_list = []
self.category_list.sort()
self.setGroupEdit(
variation_category_list=self.category_list
)
def test(self,movement):
movement_category_list = movement.getVariationCategoryList()
if movement_category_list is None:
movement_category_list = []
movement_category_list.sort()
return movement_category_list == self.category_list
allow_class(VariantMovementGroup)
class CategoryMovementGroup(RootMovementGroup):
"""
This seems to be a useless class
"""
def __init__(self,movement,**kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
self.category_list = list(movement.getCategoryList())
if self.category_list is None:
self.category_list = []
self.category_list.sort()
def test(self,movement):
movement_category_list = list(movement.getCategoryList())
if movement_category_list is None:
movement_category_list = []
movement_category_list.sort()
return movement_category_list == self.category_list
allow_class(CategoryMovementGroup)
class OptionMovementGroup(RootMovementGroup):
def __init__(self,movement,**kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
option_base_category_list = movement.getPortalOptionBaseCategoryList()
self.option_category_list = movement.getVariationCategoryList(
base_category_list=option_base_category_list)
if self.option_category_list is None:
self.option_category_list = []
self.option_category_list.sort()
# XXX This is very bad, but no choice today.
self.setGroupEdit(industrial_phase_list = self.option_category_list)
def test(self,movement):
option_base_category_list = movement.getPortalOptionBaseCategoryList()
movement_option_category_list = movement.getVariationCategoryList(
base_category_list=option_base_category_list)
if movement_option_category_list is None:
movement_option_category_list = []
movement_option_category_list.sort()
return movement_option_category_list == self.option_category_list
allow_class(OptionMovementGroup)
class VariationPropertyMovementGroup(RootMovementGroup):
"""
Compare variation property dict of movement.
"""
def __init__(self, movement, **kw):
"""
Store variation property dict of the first movement.
"""
RootMovementGroup.__init__(self, movement=movement, **kw)
self.property_dict = movement.getVariationPropertyDict()
self.setGroupEdit(
variation_property_dict = self.property_dict
)
def test(self, movement):
"""
Check if the movement can be inserted in the group.
"""
identity = 0
variation_property_dict = movement.getVariationPropertyDict()
variation_property_list = variation_property_dict.keys()
if len(variation_property_list) == len(self.property_dict):
# Same number of property. Good point.
for variation_property in variation_property_list:
try:
if variation_property_dict[variation_property] != \
self.property_dict[variation_property]:
# Value is not the same for both movements
break
except KeyError:
# Key is not common to both movements
break
else:
identity = 1
return identity
allow_class(VariationPropertyMovementGroup)
allow_class(MovementGroupNode)
class FakeMovement:
"""
......@@ -795,20 +284,11 @@ class FakeMovement:
# But, if DeliveryBuilder is well configured, this can never append ;)
reference_variation_category_list = movement_list[0].\
getVariationCategoryList()
error_raising_needed = 0
reference_variation_category_list.sort()
for movement in movement_list[1:]:
variation_category_list = movement.getVariationCategoryList()
if len(variation_category_list) !=\
len(reference_variation_category_list):
error_raising_needed = 1
break
for variation_category in variation_category_list:
if variation_category not in reference_variation_category_list:
error_raising_needed = 1
break
if error_raising_needed == 1:
variation_category_list.sort()
if variation_category_list != reference_variation_category_list:
raise ValueError, "FakeMovement not well used."
def append(self, movement):
......@@ -1000,28 +480,108 @@ class FakeMovement:
raise FakeMovementError,\
"Could not call edit on Fakemovement with parameters: %r" % key
class TitleMovementGroup(RootMovementGroup):
def __repr__(self):
repr_str = '<%s object at 0x%x for %r' % (self.__class__.__name__,
id(self),
self.getMovementList())
return repr_str
def getTitle(self,movement):
order_value = movement.getOrderValue()
title = ''
if order_value is not None:
if "Line" in order_value.getPortalType():
title = order_value.getTitle()
elif "Cell" in order_value.getPortalType():
title = order_value.getParentValue().getTitle()
return title
# The following classes are not ported to Document/XxxxMovementGroup.py yet.
class RootMovementGroup(MovementGroupNode):
pass
class CriterionMovementGroup(RootMovementGroup):
def __init__(self,movement,**kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
title = self.getTitle(movement)
self.title = title
if hasattr(movement, 'getGroupCriterion'):
self.criterion = movement.getGroupCriterion()
else:
self.criterion = None
def test(self,movement):
# we must have the same criterion
if hasattr(movement, 'getGroupCriterion'):
criterion = movement.getGroupCriterion()
else:
criterion = None
return self.criterion == criterion
allow_class(CriterionMovementGroup)
class SplitResourceMovementGroup(RootMovementGroup):
def __init__(self, movement, **kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
self.resource = movement.getResource()
def test(self, movement):
return movement.getResource() == self.resource
allow_class(SplitResourceMovementGroup)
class OptionMovementGroup(RootMovementGroup):
def __init__(self,movement,**kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
option_base_category_list = movement.getPortalOptionBaseCategoryList()
self.option_category_list = movement.getVariationCategoryList(
base_category_list=option_base_category_list)
if self.option_category_list is None:
self.option_category_list = []
self.option_category_list.sort()
# XXX This is very bad, but no choice today.
self.setGroupEdit(industrial_phase_list = self.option_category_list)
def test(self,movement):
option_base_category_list = movement.getPortalOptionBaseCategoryList()
movement_option_category_list = movement.getVariationCategoryList(
base_category_list=option_base_category_list)
if movement_option_category_list is None:
movement_option_category_list = []
movement_option_category_list.sort()
return movement_option_category_list == self.option_category_list
allow_class(OptionMovementGroup)
class VariationPropertyMovementGroup(RootMovementGroup):
"""
Compare variation property dict of movement.
"""
def __init__(self, movement, **kw):
"""
Store variation property dict of the first movement.
"""
RootMovementGroup.__init__(self, movement=movement, **kw)
self.property_dict = movement.getVariationPropertyDict()
self.setGroupEdit(
title=title
variation_property_dict = self.property_dict
)
def test(self,movement):
return self.getTitle(movement) == self.title
def test(self, movement):
"""
Check if the movement can be inserted in the group.
"""
identity = 0
variation_property_dict = movement.getVariationPropertyDict()
variation_property_list = variation_property_dict.keys()
if len(variation_property_list) == len(self.property_dict):
# Same number of property. Good point.
for variation_property in variation_property_list:
try:
if variation_property_dict[variation_property] != \
self.property_dict[variation_property]:
# Value is not the same for both movements
break
except KeyError:
# Key is not common to both movements
break
else:
identity = 1
return identity
allow_class(VariationPropertyMovementGroup)
# XXX This should not be here
# I (seb) have commited this because movement groups are not
......@@ -1051,52 +611,6 @@ class IntIndexMovementGroup(RootMovementGroup):
allow_class(IntIndexMovementGroup)
# XXX This should not be here
# I (seb) have commited this because movement groups are not
# yet configurable through the zope web interface
class DecisionMovementGroup(PropertyMovementGroup):
_property = 'decision'
allow_class(DecisionMovementGroup)
# XXX This should not be here
# I (seb) have commited this because movement groups are not
# yet configurable through the zope web interface
class BrandMovementGroup(PropertyMovementGroup):
_property = 'brand'
allow_class(BrandMovementGroup)
class AggregateMovementGroup(RootMovementGroup):
def getAggregateList(self,movement):
aggregate_list = movement.getAggregateList()
aggregate_list.sort()
return aggregate_list
def __init__(self,movement,**kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
aggregate = self.getAggregateList(movement)
self.aggregate_list = aggregate
self.setGroupEdit(
aggregate_list=aggregate
)
def test(self,movement):
if self.getAggregateList(movement) == self.aggregate_list:
return 1
else :
return 0
allow_class(AggregateMovementGroup)
class SplitMovementGroup(RootMovementGroup):
def __init__(self,movement, **kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
def test(self, movement):
return 0
allow_class(SplitMovementGroup)
class TransformationAppliedRuleCausalityMovementGroup(RootMovementGroup):
"""
Groups movement that comes from simulation movement that shares the
......@@ -1126,6 +640,8 @@ class TransformationAppliedRuleCausalityMovementGroup(RootMovementGroup):
allow_class(TransformationAppliedRuleCausalityMovementGroup)
class ParentExplanationMovementGroup(RootMovementGroup): pass
class ParentExplanationCausalityMovementGroup(ParentExplanationMovementGroup):
"""
Like ParentExplanationMovementGroup, and set the causality.
......@@ -1137,71 +653,3 @@ class ParentExplanationCausalityMovementGroup(ParentExplanationMovementGroup):
)
allow_class(ParentExplanationCausalityMovementGroup)
class SourceMovementGroup(PropertyMovementGroup):
"""Group movements having the same source."""
_property = 'source'
allow_class(SourceMovementGroup)
class DestinationMovementGroup(PropertyMovementGroup):
"""Group movements having the same destination."""
_property = 'destination'
allow_class(DestinationMovementGroup)
class DestinationDecisionMovementGroup(PropertyMovementGroup):
"""Group movements having the same destination decision."""
_property = 'destination_decision'
allow_class(DestinationDecisionMovementGroup)
class SourceProjectMovementGroup(PropertyMovementGroup):
""" Group movements that have the same source project ."""
_property = 'source_project'
allow_class(SourceProjectMovementGroup)
class RequirementMovementGroup(RootMovementGroup):
"""
Group movements that have same Requirement.
"""
def getRequirementList(self,movement):
order_value = movement.getOrderValue()
requirement_list = []
if order_value is not None:
if "Line" in order_value.getPortalType():
requirement_list = order_value.getRequirementList()
return requirement_list
def __init__(self,movement,**kw):
RootMovementGroup.__init__(self, movement=movement, **kw)
requirement_list = self.getRequirementList(movement)
self.requirement_list = requirement_list
self.setGroupEdit(
requirement=requirement_list
)
def test(self,movement):
return self.getRequirementList(movement) == self.requirement_list
class BaseContributionMovementGroup(PropertyMovementGroup):
""" Group movements that have the same base contributions."""
_property = 'base_contribution_list'
class BaseApplicationMovementGroup(PropertyMovementGroup):
""" Group movements that have the same base applications."""
_property = 'base_application_list'
class PriceCurrencyMovementGroup(PropertyMovementGroup):
""" Group movements that have the same price currency."""
_property = 'price_currency'
class QuantityUnitMovementGroup(PropertyMovementGroup):
""" Group movements that have the same quantity unit."""
_property = 'quantity_unit'
allow_class(QuantityUnitMovementGroup)
class PaymentModeMovementGroup(PropertyMovementGroup):
""" Group movements that have the same payment mode."""
_property = 'payment_mode'
class ColourMovementGroup(PropertyMovementGroup):
""" Group movements that have the same colour category."""
_property = 'colour'
......@@ -38,9 +38,4 @@ class DivergenceTester:
'type' : 'lines',
'default' : (),
'mode' : 'w' },
{ 'id' : 'solver_script',
'description' : 'List of scripts used to call the solvers',
'type' : 'string',
'multivalued' : 1,
'mode' : 'w' },
)
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
class MovementGroup:
_properties = (
{ 'id' : 'tested_property',
'description' : 'Property used to Test',
'type' : 'lines',
'default' : (),
'mode' : 'w' },
)
_categories = ('collect_order_group', 'divergence_scope',)
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
##############################################################################
from TargetSolver import TargetSolver
class CopyAndPropagate(TargetSolver):
"""
This solver will copy properties and calculate quantities for
specified divergence list.
"""
def solveMovement(self, movement):
"""
Solves a movement.
"""
movement_relative_url = movement.getRelativeUrl()
for divergence in self.divergence_list:
if movement_relative_url == divergence.getProperty('object_relative_url'):
self._acceptDecision(divergence)
def _acceptDecision(self, divergence):
"""
Accept decision according to movement group
"""
scope = divergence.getProperty('divergence_scope')
simulation_movement = divergence.getProperty('simulation_movement')
delivery = simulation_movement.getDeliveryValue()
value_dict = {}
quantity_ratio = None
if scope == 'quantity':
new_quantity = simulation_movement.getDeliveryQuantity() * \
simulation_movement.getDeliveryRatio()
old_quantity = simulation_movement.getQuantity()
quantity_ratio = 0
if old_quantity not in (None, 0.0):
quantity_ratio = new_quantity / old_quantity
elif scope == 'category':
property_id = divergence.getProperty('tested_property')
new_value_list = delivery.getPropertyList(property_id)
value_dict[property_id] = new_value_list
else: # otherwise we assume that scope is 'property'
property_id = divergence.getProperty('tested_property')
new_value = delivery.getProperty(property_id)
value_dict[property_id] = new_value
self._solveRecursively(simulation_movement,
quantity_ratio=quantity_ratio,
value_dict=value_dict)
def _solveRecursively(self, simulation_movement, is_last_movement=1,
quantity_ratio=None, value_dict=None):
"""
Update value of the current simulation movement, and update
his parent movement.
"""
delivery = simulation_movement.getDeliveryValue()
if is_last_movement and quantity_ratio is not None:
delivery_quantity = delivery.getQuantity()
delivery_ratio = simulation_movement.getDeliveryRatio()
quantity = simulation_movement.getQuantity() * quantity_ratio
quantity_error = delivery_quantity * delivery_ratio - quantity
value_dict['delivery_error'] = quantity_error
value_dict['quantity'] = quantity
simulation_movement.edit(**value_dict)
applied_rule = simulation_movement.getParentValue()
parent_movement = applied_rule.getParentValue()
if parent_movement.getPortalType() == 'Simulation Movement' and \
not parent_movement.isFrozen():
# backtrack to the parent movement while it is not frozen
self._solveRecursively(parent_movement, is_last_movement=0,
quantity_ratio=quantity_ratio,
value_dict=value_dict)
......@@ -1436,10 +1436,23 @@ class TestSaleInvoiceMixin(TestInvoiceMixin,
"""
portal = self.getPortal()
builder = portal.portal_deliveries.sale_invoice_transaction_builder
previous_list = builder.getDeliveryCollectOrderList()
new_list = [x for x in previous_list if x != 'DateMovementGroup']
new_list.append('ParentExplanationMovementGroup')
builder.setDeliveryCollectOrderList(new_list)
delivery_movement_group_list = builder.getDeliveryMovementGroupList()
uf = self.getPortal().acl_users
uf._doAddUser('admin', '', ['Manager'], [])
user = uf.getUserById('admin').__of__(uf)
newSecurityManager(None, user)
for movement_group in delivery_movement_group_list:
if movement_group.getPortalType() == 'Property Movement Group':
# it contains 'start_date' and 'stop_date' only, so we remove
# movement group itself.
builder.deleteContent(movement_group.getId())
builder.newContent(
portal_type = 'Parent Explanation Movement Group',
collect_order_group='delivery',
int_index=len(delivery_movement_group_list)+1
)
user = uf.getUserById('test_invoice_user').__of__(uf)
newSecurityManager(None, user)
def stepEditInvoice(self, sequence=None, sequence_list=None, **kw):
"""Edit the current invoice, to trigger updateAppliedRule."""
......@@ -1582,8 +1595,27 @@ class TestSaleInvoiceMixin(TestInvoiceMixin,
split and defer at the invoice level
"""
invoice = sequence.get('invoice')
invoice.portal_workflow.doActionFor(invoice,'split_prevision_action',
start_date=self.datetime + 15, stop_date=self.datetime + 25)
kw = {'listbox':[
{'listbox_key':line.getRelativeUrl(),
'choice':'SplitAndDefer'} for line in invoice.getMovementList()]}
self.portal.portal_workflow.doActionFor(
invoice,
'split_and_defer_action',
start_date=self.datetime + 15,
stop_date=self.datetime + 25,
**kw)
pass
def stepUnifyStartDateWithDecisionInvoice(self, sequence=None,
sequence_list=None):
invoice = sequence.get('invoice')
self._solveDeliveryGroupDivergence(invoice, 'start_date',
invoice.getRelativeUrl())
def stepAcceptDecisionQuantityInvoice(self,sequence=None, sequence_list=None):
invoice = sequence.get('invoice')
print repr(invoice.getDivergenceList())
self._solveDivergence(invoice, 'quantity', 'accept')
def stepAcceptDecisionInvoice(self, sequence=None, sequence_list=None,
**kw):
......@@ -1772,7 +1804,7 @@ class TestSaleInvoice(TestSaleInvoiceMixin, ERP5TypeTestCase):
"""Tests for sale invoice.
"""
RUN_ALL_TESTS = 1
quiet = 1
quiet = 0
# fix inheritance
login = TestInvoiceMixin.login
......@@ -2038,7 +2070,7 @@ class TestSaleInvoice(TestSaleInvoiceMixin, ERP5TypeTestCase):
stepCheckInvoiceIsCalculating
stepTic
stepCheckInvoiceIsDiverged
stepAcceptDecisionInvoice
stepUnifyStartDateWithDecisionInvoice
stepTic
stepCheckInvoiceNotSplitted
......@@ -2079,7 +2111,7 @@ class TestSaleInvoice(TestSaleInvoiceMixin, ERP5TypeTestCase):
stepCheckInvoiceIsCalculating
stepTic
stepCheckInvoiceIsDiverged
stepAcceptDecisionInvoice
stepUnifyStartDateWithDecisionInvoice
stepTic
stepCheckInvoiceNotSplitted
......@@ -2111,7 +2143,7 @@ class TestSaleInvoice(TestSaleInvoiceMixin, ERP5TypeTestCase):
stepCheckPackingListIsCalculating
stepTic
stepCheckPackingListIsDiverged
stepAcceptDecisionPackingList
stepAcceptDecisionQuantity
stepTic
stepCheckPackingListIsSolved
stepCheckPackingListNotSplitted
......@@ -2181,7 +2213,7 @@ class TestSaleInvoice(TestSaleInvoiceMixin, ERP5TypeTestCase):
stepCheckPackingListIsCalculating
stepTic
stepCheckPackingListIsDiverged
stepAcceptDecisionPackingList
stepAcceptDecisionQuantity
stepTic
stepCheckPackingListIsSolved
stepCheckPackingListNotSplitted
......@@ -2205,8 +2237,10 @@ class TestSaleInvoice(TestSaleInvoiceMixin, ERP5TypeTestCase):
end_sequence = \
"""
stepCheckInvoiceIsDiverged
stepAcceptDecisionInvoice
stepAcceptDecisionQuantityInvoice
stepTic
stepCheckInvoiceIsNotDivergent
stepCheckInvoiceIsSolved
stepStartInvoice
stepTic
stepStopInvoice
......@@ -2300,7 +2334,7 @@ class TestSaleInvoice(TestSaleInvoiceMixin, ERP5TypeTestCase):
stepCheckInvoiceIsCalculating
stepTic
stepCheckInvoiceIsDiverged
stepAcceptDecisionInvoice
stepAcceptDecisionQuantityInvoice
stepTic
stepStartInvoice
stepTic
......@@ -2340,7 +2374,7 @@ class TestSaleInvoice(TestSaleInvoiceMixin, ERP5TypeTestCase):
stepCheckInvoiceIsCalculating
stepTic
stepCheckInvoiceIsDiverged
stepAcceptDecisionInvoice
stepAcceptDecisionQuantityInvoice
stepTic
stepStartInvoice
stepTic
......@@ -2464,7 +2498,7 @@ class TestSaleInvoice(TestSaleInvoiceMixin, ERP5TypeTestCase):
stepCheckInvoiceIsCalculating
stepTic
stepCheckInvoiceIsDiverged
stepAcceptDecisionInvoice
stepAcceptDecisionQuantityInvoice
stepTic
stepStartInvoice
stepTic
......@@ -2624,4 +2658,3 @@ def test_suite():
suite.addTest(unittest.makeSuite(TestSaleInvoice))
suite.addTest(unittest.makeSuite(TestPurchaseInvoice))
return suite
......@@ -57,6 +57,9 @@ class TestOrderMixin:
def getBusinessTemplateList(self):
"""
"""
return ('erp5_base','erp5_pdm', 'erp5_trade', 'erp5_apparel',
'erp5_accounting', 'erp5_invoicing',
'erp5_project', 'erp5_payroll', 'erp5_mrp')
return ('erp5_base','erp5_pdm', 'erp5_trade', 'erp5_apparel',)
def login(self, quiet=0, run=1):
......@@ -122,7 +125,7 @@ class TestOrderMixin:
resource_module = portal.getDefaultModule(self.resource_portal_type)
resource = resource_module.newContent(portal_type=self.resource_portal_type)
resource.edit(
title = "NotVariatedResource",
title = "NotVariatedResource%s" % resource.getId(),
industrial_phase_list=["phase1", "phase2"],
product_line = 'apparel'
)
......@@ -141,7 +144,7 @@ class TestOrderMixin:
resource_module = portal.getDefaultModule(self.resource_portal_type)
resource = resource_module.newContent(portal_type=self.resource_portal_type)
resource.edit(
title = "VariatedResource",
title = "VariatedResource%s" % resource.getId(),
industrial_phase_list=["phase1", "phase2"],
product_line = 'apparel'
)
......@@ -169,7 +172,7 @@ class TestOrderMixin:
sequence.edit( resource_list = resource_list )
def stepCreateOrganisation(self, sequence=None, sequence_list=None,
title='organisation', **kw):
title=None, **kw):
"""
Create a empty organisation
"""
......@@ -180,10 +183,11 @@ class TestOrderMixin:
portal_type=organisation_portal_type)
organisation = organisation_module.newContent( \
portal_type=organisation_portal_type)
organisation.edit(
title=title,
)
#sequence.edit(organisation=organisation)
if title is None:
organisation.edit(title='organisation%s' % organisation.getId())
sequence.edit(organisation=organisation)
else:
organisation.edit(title=title)
sequence.edit(**{title:organisation})
def stepCreateOrder(self,sequence=None, sequence_list=None, **kw):
......@@ -196,7 +200,7 @@ class TestOrderMixin:
order_module = portal.getDefaultModule(portal_type=self.order_portal_type)
order = order_module.newContent(portal_type=self.order_portal_type)
order.edit(
title = "Order",
title = "Order%s" % order.getId(),
start_date = self.datetime + 10,
stop_date = self.datetime + 20,
)
......
......@@ -51,12 +51,6 @@ class TestOrderBuilderMixin(TestOrderMixin):
packing_list_line_portal_type = 'Internal Packing List Line'
packing_list_cell_portal_type = 'Internal Packing List Cell'
delivery_cell_collect_order_tuple = ('VariantMovementGroup',)
delivery_collect_order_tuple = ('PathMovementGroup',
'SectionPathMovementGroup', 'DateMovementGroup')
delivery_line_collect_order_tuple = ('ResourceMovementGroup',
'BaseVariantMovementGroup')
# hardcoded values
order_builder_hardcoded_time_diff = 10.0
......@@ -109,15 +103,43 @@ class TestOrderBuilderMixin(TestOrderMixin):
delivery_line_portal_type = self.order_line_portal_type,
delivery_cell_portal_type = self.order_cell_portal_type,
delivery_collect_order = self.delivery_collect_order_tuple,
delivery_line_collect_order = self.delivery_line_collect_order_tuple,
delivery_cell_collect_order = self.delivery_cell_collect_order_tuple,
destination_value = organisation,
resource_portal_type = self.resource_portal_type,
)
order_builder.newContent(
portal_type = 'Category Movement Group',
collect_order_group='delivery',
tested_property=['source', 'destination',
'source_section', 'destination_section'],
int_index=1
)
order_builder.newContent(
portal_type = 'Property Movement Group',
collect_order_group='delivery',
tested_property=['start_date', 'stop_date'],
int_index=2
)
order_builder.newContent(
portal_type = 'Category Movement Group',
collect_order_group='line',
tested_property=['resource'],
int_index=1
)
order_builder.newContent(
portal_type = 'Base Variant Movement Group',
collect_order_group='line',
int_index=2
)
order_builder.newContent(
portal_type = 'Variant Movement Group',
collect_order_group='cell',
int_index=1
)
def stepCheckGeneratedDocumentListVariated(self, sequence=None, sequence_list=None,
**kw):
"""
......
......@@ -250,9 +250,16 @@ class TestPackingListMixin(TestOrderMixin):
Do the split and defer action
"""
packing_list = sequence.get('packing_list')
packing_list.portal_workflow.doActionFor(packing_list,'split_prevision_action',
kw = {'listbox':[
{'listbox_key':line.getRelativeUrl(),
'choice':'SplitAndDefer'} for line in packing_list.getMovementList() \
if line.isDivergent()]}
self.portal.portal_workflow.doActionFor(
packing_list,
'split_and_defer_action',
start_date=self.datetime + 15,
stop_date=self.datetime + 25)
stop_date=self.datetime + 25,
**kw)
def stepCheckPackingListSplitted(self, sequence=None, sequence_list=None, **kw):
"""
......@@ -472,7 +479,7 @@ class TestPackingListMixin(TestOrderMixin):
"""
applied_rule = sequence.get('applied_rule')
simulation_line_list = applied_rule.objectValues()
self.assertEquals(len(simulation_line_list),1)
# self.assertEquals(len(simulation_line_list),1)
for simulation_line in simulation_line_list:
simulation_line.edit(quantity=self.default_quantity-1)
......@@ -493,34 +500,110 @@ class TestPackingListMixin(TestOrderMixin):
applied_rule = sequence.get('applied_rule')
simulation_line_list = applied_rule.objectValues()
resource_list = sequence.get('resource_list')
self.assertEquals(len(simulation_line_list),len(resource_list))
for simulation_line in simulation_line_list:
simulation_line.edit(start_date=self.datetime+15)
def stepAdoptPrevision(self,sequence=None, sequence_list=None,
packing_list=None,**kw):
def stepModifyOneSimulationLineStartDate(self,sequence=None, sequence_list=None, **kw):
"""
Check if simulation movement are disconnected
"""
if packing_list is None:
packing_list = sequence.get('packing_list')
LOG('packing_list.getSimulationState()',0,packing_list.getSimulationState())
LOG('packing_list.getCausalityState()',0,packing_list.getCausalityState())
packing_list.portal_workflow.doActionFor(packing_list,'adopt_prevision_action')
applied_rule = sequence.get('applied_rule')
simulation_line_list = applied_rule.objectValues()
resource_list = sequence.get('resource_list')
self.assertEquals(len(simulation_line_list),len(resource_list))
simulation_line_list[-1].edit(start_date=self.datetime+15)
def stepModifySimulationLineResource(self,sequence=None, sequence_list=None, **kw):
"""
Check if simulation movement are disconnected
"""
applied_rule = sequence.get('applied_rule')
simulation_line_list = applied_rule.objectValues()
resource_list = sequence.get('resource_list')
for simulation_line in simulation_line_list:
simulation_line.edit(resource_value=resource_list[-1])
def stepModifyOneSimulationLineResource(self,sequence=None, sequence_list=None, **kw):
"""
Check if simulation movement are disconnected
"""
applied_rule = sequence.get('applied_rule')
simulation_line_list = applied_rule.objectValues()
resource_list = sequence.get('resource_list')
simulation_line_list[-1].edit(resource_value=resource_list[-1])
def stepNewPackingListAdoptPrevision(self,sequence=None, sequence_list=None, **kw):
def stepNewPackingListAdoptPrevisionQuantity(self,sequence=None, sequence_list=None, **kw):
"""
Check if simulation movement are disconnected
"""
packing_list = sequence.get('new_packing_list')
self.stepAdoptPrevision(sequence=sequence,packing_list=packing_list)
self._solveDivergence(packing_list, 'quantity', 'adopt')
def stepUnifyDestinationWithDecision(self,sequence=None, sequence_list=None, **kw):
"""
Check if simulation movement are disconnected
"""
packing_list = sequence.get('packing_list')
self._solveDeliveryGroupDivergence(packing_list, 'destination',
packing_list.getRelativeUrl())
def stepAcceptDecisionPackingList(self,sequence=None, sequence_list=None, **kw):
def stepUnifyStartDateWithDecision(self,sequence=None, sequence_list=None, **kw):
"""
Check if simulation movement are disconnected
"""
packing_list = sequence.get('packing_list')
packing_list.portal_workflow.doActionFor(packing_list,'accept_decision_action')
self._solveDeliveryGroupDivergence(packing_list, 'start_date',
packing_list.getRelativeUrl())
def stepUnifyStartDateWithPrevision(self,sequence=None, sequence_list=None, **kw):
"""
Check if simulation movement are disconnected
"""
packing_list = sequence.get('packing_list')
applied_rule = sequence.get('applied_rule')
simulation_line_list = applied_rule.objectValues()
self._solveDeliveryGroupDivergence(packing_list, 'start_date',
simulation_line_list[-1].getRelativeUrl())
def _solveDeliveryGroupDivergence(self, obj, property, target_url):
kw = {'delivery_group_listbox':
{property:{'choice':target_url}}}
self.portal.portal_workflow.doActionFor(
obj,
'solve_divergence_action',
**kw)
def stepAcceptDecisionResource(self,sequence=None, sequence_list=None, **kw):
packing_list = sequence.get('packing_list')
self._solveDivergence(packing_list, 'resource', 'accept')
def stepAcceptDecisionQuantity(self,sequence=None, sequence_list=None, **kw):
packing_list = sequence.get('packing_list')
self._solveDivergence(packing_list, 'quantity', 'accept')
def stepAdoptPrevisionResource(self,sequence=None, sequence_list=None, **kw):
packing_list = sequence.get('packing_list')
self._solveDivergence(packing_list, 'resource', 'adopt')
def stepAdoptPrevisionQuantity(self,sequence=None, sequence_list=None, **kw):
packing_list = sequence.get('packing_list')
self._solveDivergence(packing_list, 'quantity', 'adopt')
def _solveDivergence(self, obj, property, decision, group='line'):
"""
Check if simulation movement are disconnected
"""
kw = {'%s_group_listbox' % group:{}}
for divergence in obj.getDivergenceList():
if divergence.getProperty('tested_property') != property:
continue
sm_url = divergence.getProperty('simulation_movement').getRelativeUrl()
kw['line_group_listbox']['%s&%s' % (sm_url, property)] = {
'choice':decision}
self.portal.portal_workflow.doActionFor(
obj,
'solve_divergence_action',
**kw)
def stepCheckPackingListLineWithNewQuantityPrevision(self,sequence=None, sequence_list=None, **kw):
"""
......@@ -536,6 +619,42 @@ class TestPackingListMixin(TestOrderMixin):
packing_list_line = sequence.get('packing_list_line')
self.assertEquals(packing_list_line.getQuantity(),(self.default_quantity-1)*2)
def stepCheckPackingListLineWithDifferentResource(self,sequence=None, sequence_list=None, **kw):
"""
Look if the packing list has new previsions
"""
packing_list_line = sequence.get('packing_list_line')
new_resource = sequence.get('resource')
self.assertEquals(packing_list_line.getQuantity(), self.default_quantity)
self.assertNotEquals(packing_list_line.getResourceValue(), new_resource)
simulation_line_list = packing_list_line.getDeliveryRelatedValueList()
order_line_list = [x.getOrder() for x in simulation_line_list]
self.assertEquals(sorted(packing_list_line.getCausalityList()),
sorted(order_line_list))
new_packing_list_line = packing_list_line.aq_parent[str(int(packing_list_line.getId())+1)]
self.assertEquals(new_packing_list_line.getQuantity(), self.default_quantity)
self.assertEquals(new_packing_list_line.getResourceValue(), new_resource)
simulation_line_list = new_packing_list_line.getDeliveryRelatedValueList()
order_line_list = [x.getOrder() for x in simulation_line_list]
self.assertEquals(sorted(new_packing_list_line.getCausalityList()),
sorted(order_line_list))
def stepCheckPackingListLineWithSameResource(self,sequence=None, sequence_list=None, **kw):
"""
Look if the packing list has new previsions
"""
old_packing_list_line = sequence.get('packing_list_line')
packing_list_line = old_packing_list_line.aq_parent[str(int(old_packing_list_line.getId())-1)]
resource = sequence.get('resource')
self.assertEquals(old_packing_list_line.getQuantity(), 0)
self.assertNotEquals(old_packing_list_line.getResourceValue(), resource)
self.assertEquals(packing_list_line.getQuantity(), self.default_quantity * 2)
self.assertEquals(packing_list_line.getResourceValue(), resource)
simulation_line_list = packing_list_line.getDeliveryRelatedValueList()
order_line_list = [x.getOrder() for x in simulation_line_list]
self.assertEquals(sorted(packing_list_line.getCausalityList()),
sorted(order_line_list))
def stepCheckNewPackingListAfterStartDateAdopt(self,sequence=None, sequence_list=None, **kw):
"""
Check if simulation movement are disconnected
......@@ -557,6 +676,34 @@ class TestPackingListMixin(TestOrderMixin):
if delivery_value not in delivery_value_list:
delivery_value_list.append(delivery_value_list)
# new_packing_list = delivery_value.getParent()
# self.assertNotEquals(new_packing_list.getUid(),packing_list.getUid())
self.assertEquals(len(delivery_value_list),len(resource_list))
def stepCheckNewSplitPackingListAfterStartDateAdopt(self,sequence=None, sequence_list=None, **kw):
"""
Check if simulation movement are disconnected
"""
applied_rule = sequence.get('applied_rule')
packing_list = sequence.get('packing_list')
packing_list_line = [x for x in packing_list.getMovementList() \
if x.getQuantity()][0]
new_packing_list = self.portal.sale_packing_list_module[str(int(packing_list.getId())-1)]
new_packing_list_line = [x for x in new_packing_list.getMovementList() \
if x.getQuantity()][0]
self.assertEquals(packing_list_line.getQuantity(),self.default_quantity)
self.assertEquals(packing_list.getStartDate(),self.datetime+10)
self.assertEquals(new_packing_list_line.getQuantity(),self.default_quantity)
self.assertEquals(new_packing_list.getStartDate(),self.datetime+15)
simulation_line_list = applied_rule.objectValues()
resource_list = sequence.get('resource_list')
self.assertEquals(len(simulation_line_list),len(resource_list))
delivery_value_list = []
for simulation_line in simulation_line_list:
# self.assertNotEquals(simulation_line.getDeliveryValue(),None)
delivery_value = simulation_line.getDeliveryValue()
if delivery_value not in delivery_value_list:
delivery_value_list.append(delivery_value_list)
# new_packing_list = delivery_value.getParent()
# self.assertNotEquals(new_packing_list.getUid(),packing_list.getUid())
self.assertEquals(len(delivery_value_list),len(resource_list))
......@@ -743,7 +890,7 @@ class TestPackingList(TestPackingListMixin, ERP5TypeTestCase) :
stepCheckPackingListIsCalculating \
stepTic \
stepCheckPackingListIsDiverged \
stepAcceptDecisionPackingList \
stepUnifyDestinationWithDecision \
stepTic \
stepCheckPackingListIsSolved \
stepCheckPackingListIsNotDivergent \
......@@ -763,7 +910,7 @@ class TestPackingList(TestPackingListMixin, ERP5TypeTestCase) :
stepCheckPackingListIsCalculating \
stepTic \
stepCheckPackingListIsDiverged \
stepAcceptDecisionPackingList \
stepUnifyStartDateWithDecision \
stepTic \
stepCheckPackingListIsSolved \
stepCheckPackingListIsNotDivergent \
......@@ -798,7 +945,7 @@ class TestPackingList(TestPackingListMixin, ERP5TypeTestCase) :
stepModifySimulationLineQuantity \
stepTic \
stepCheckPackingListIsDiverged \
stepAdoptPrevision \
stepAdoptPrevisionQuantity \
stepTic \
stepCheckPackingListIsNotDivergent \
stepCheckPackingListIsSolved \
......@@ -817,7 +964,7 @@ class TestPackingList(TestPackingListMixin, ERP5TypeTestCase) :
stepModifySimulationLineQuantity \
stepTic \
stepCheckPackingListIsDiverged \
stepAcceptDecisionPackingList \
stepAcceptDecisionQuantity \
stepTic \
stepCheckPackingListIsNotDivergent \
stepCheckPackingListIsSolved \
......@@ -836,7 +983,7 @@ class TestPackingList(TestPackingListMixin, ERP5TypeTestCase) :
stepModifySimulationLineQuantityForMergedLine \
stepTic \
stepCheckPackingListIsDiverged \
stepAdoptPrevision \
stepAdoptPrevisionQuantity \
stepTic \
stepCheckPackingListIsNotDivergent \
stepCheckPackingListIsSolved \
......@@ -855,7 +1002,74 @@ class TestPackingList(TestPackingListMixin, ERP5TypeTestCase) :
stepModifySimulationLineQuantityForMergedLine \
stepTic \
stepCheckPackingListIsDiverged \
stepAcceptDecisionPackingList \
stepAcceptDecisionQuantity \
stepTic \
stepCheckPackingListIsNotDivergent \
stepCheckPackingListIsSolved \
stepCheckSimulationQuantityUpdatedForMergedLine \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self, quiet=quiet)
def test_05d_SimulationChangeResourceOnOneSimulationMovementForMergedLine(self, quiet=quiet, run=run_all_test):
if not run: return
sequence_list = SequenceList()
# Test with a simply order without cell
sequence_string = self.default_sequence_with_duplicated_lines + '\
stepCreateNotVariatedResource \
stepModifyOneSimulationLineResource \
stepTic \
stepCheckPackingListIsDiverged \
stepAdoptPrevisionResource \
stepTic \
stepCheckPackingListIsNotDivergent \
stepCheckPackingListIsSolved \
stepCheckPackingListLineWithDifferentResource \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self, quiet=quiet)
def test_05e_SimulationUnifyResourceOnSimulationMovementsForNonMergedLines(self, quiet=quiet, run=run_all_test):
if not run: return
sequence_list = SequenceList()
# Test with a simply order without cell
sequence_string = self.default_sequence_with_two_lines + '\
stepModifySimulationLineResource \
stepTic \
stepCheckPackingListIsDiverged \
stepAdoptPrevisionResource \
stepTic \
stepCheckPackingListIsNotDivergent \
stepCheckPackingListIsSolved \
stepCheckPackingListLineWithSameResource \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self, quiet=quiet)
def test_05f_SimulationChangeAndPartialAcceptDecision(self, quiet=quiet, run=run_all_test):
if not run: return
sequence_list = SequenceList()
# Test with a simply order without cell
sequence_string = self.default_sequence_with_duplicated_lines + '\
stepCreateNotVariatedResource \
stepModifySimulationLineQuantityForMergedLine \
stepModifyOneSimulationLineResource \
stepModifySimulationLineStartDate \
stepTic \
stepCheckPackingListIsDiverged \
stepAcceptDecisionQuantity \
stepTic \
stepCheckPackingListIsDiverged \
stepAcceptDecisionResource \
stepTic \
stepCheckPackingListIsDiverged \
stepUnifyStartDateWithDecision \
stepTic \
stepCheckPackingListIsNotDivergent \
stepCheckPackingListIsSolved \
......@@ -874,7 +1088,7 @@ class TestPackingList(TestPackingListMixin, ERP5TypeTestCase) :
stepModifySimulationLineStartDate \
stepTic \
stepCheckPackingListIsDiverged \
stepAdoptPrevision \
stepUnifyStartDateWithPrevision \
stepTic \
stepCheckPackingListIsSolved \
stepCheckNewPackingListAfterStartDateAdopt \
......@@ -894,7 +1108,28 @@ class TestPackingList(TestPackingListMixin, ERP5TypeTestCase) :
stepTic \
stepCheckPackingListIsDiverged \
stepCheckPackingListIsDivergent \
stepAdoptPrevision \
stepUnifyStartDateWithPrevision \
stepTic \
stepCheckPackingListIsNotDivergent \
stepCheckPackingListIsSolved \
stepCheckNewPackingListAfterStartDateAdopt \
'
# XXX Check if there is a new packing list created
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self, quiet=quiet)
def test_07a_SimulationChangeStartDateWithTwoOrderLine(self, quiet=quiet, run=run_all_test):
if not run: return
sequence_list = SequenceList()
# Test with a simply order without cell
sequence_string = self.default_sequence_with_two_lines + '\
stepModifyOneSimulationLineStartDate \
stepTic \
stepCheckPackingListIsDiverged \
stepCheckPackingListIsDivergent \
stepUnifyStartDateWithPrevision \
stepTic \
stepCheckPackingListIsNotDivergent \
stepCheckPackingListIsSolved \
......@@ -937,6 +1172,12 @@ class TestPackingList(TestPackingListMixin, ERP5TypeTestCase) :
stepSetContainerFullQuantity \
stepTic \
stepCheckPackingListIsPacked \
stepModifySimulationLineStartDate \
stepTic \
stepCheckPackingListIsDiverged \
stepCheckPackingListIsDivergent \
stepUnifyStartDateWithPrevision \
stepTic \
'
# XXX Check if there is a new packing list created
sequence_list.addSequenceString(sequence_string)
......@@ -1000,7 +1241,7 @@ class TestPackingList(TestPackingListMixin, ERP5TypeTestCase) :
stepSplitAndDeferPackingList \
stepTic \
stepCheckNewPackingListIsDivergent \
stepNewPackingListAdoptPrevision \
stepNewPackingListAdoptPrevisionQuantity \
stepTic \
stepCheckPackingListIsSolved \
stepCheckPackingListSplittedTwoTimes \
......@@ -1110,4 +1351,3 @@ def test_suite():
suite.addTest(unittest.makeSuite(TestPackingList))
suite.addTest(unittest.makeSuite(TestPurchasePackingList))
return suite
......@@ -509,6 +509,7 @@ class TestPayrollMixin(ERP5ReportTestCase):
class TestPayroll(TestPayrollMixin):
quiet = 0
def test_01_modelCreation(self):
'''
......
......@@ -49,27 +49,27 @@ class TestProductionPackingReportListMixin(TestProductionOrderMixin, TestPacking
def stepAcceptDecisionSupplyDeliveryPackingList(self, sequence=None, sequence_list=None, **kw):
packing_list = sequence.get('supply_delivery_packing_list')
self.modifyPackingListState('accept_decision_action', sequence=sequence, packing_list=packing_list)
self._solveDivergence(packing_list, 'quantity', 'accept')
def stepAcceptDecisionProducedDeliveryPackingList(self, sequence=None, sequence_list=None, **kw):
packing_list = sequence.get('produced_delivery_packing_list')
self.modifyPackingListState('accept_decision_action', sequence=sequence, packing_list=packing_list)
self._solveDivergence(packing_list, 'quantity', 'accept')
def stepAdoptPrevisionSupplyDeliveryPackingList(self, sequence=None, sequence_list=None, **kw):
packing_list = sequence.get('supply_delivery_packing_list')
self.modifyPackingListState('adopt_prevision_action', sequence=sequence, packing_list=packing_list)
self._solveDivergence(packing_list, 'quantity', 'adopt')
def stepAdoptPrevisionProducedDeliveryPackingList(self, sequence=None, sequence_list=None, **kw):
packing_list = sequence.get('produced_delivery_packing_list')
self.modifyPackingListState('adopt_prevision_action', sequence=sequence, packing_list=packing_list)
self._solveDivergence(packing_list, 'quantity', 'adopt')
def stepAdoptPrevisionProducedReport(self, sequence=None, sequence_list=None, **kw):
packing_list = sequence.get('produced_report')
self.modifyPackingListState('adopt_prevision_action', sequence=sequence, packing_list=packing_list)
self._solveDivergence(packing_list, 'quantity', 'adopt')
def stepAdoptPrevisionConsumedReport(self, sequence=None, sequence_list=None, **kw):
packing_list = sequence.get('consumed_report')
self.modifyPackingListState('adopt_prevision_action', sequence=sequence, packing_list=packing_list)
self._solveDivergence(packing_list, 'quantity', 'adopt')
def stepSetReadyProducedDeliveryPackingList(self, sequence=None, sequence_list=None, **kw):
packing_list = sequence.get('produced_delivery_packing_list')
......
......@@ -20,8 +20,9 @@ from OFS.CopySupport import CopyContainer as OriginalCopyContainer
from OFS.CopySupport import CopyError
from OFS.CopySupport import eNotSupported, eNoItemsSpecified
from OFS.CopySupport import _cb_encode, _cb_decode, cookie_path
from OFS.CopySupport import sanity_check
from Products.ERP5Type import Permissions
from Acquisition import aq_base
from Acquisition import aq_base, aq_inner, aq_parent
from Products.CMFCore.utils import getToolByName
from Globals import PersistentMapping, MessageDialog
from Products.ERP5Type.Utils import get_request
......@@ -357,6 +358,8 @@ class CopyContainer:
catalog.beforeUnindexObject(None,path=path,uid=uid)
# Then start activity in order to remove lines in catalog,
# sql wich generate locks
if path is None:
path = self.getPath()
# - serialization_tag is used in order to prevent unindexation to
# happen before/in parallel with reindexations of the same object.
catalog.activate(activity='SQLQueue',
......@@ -427,6 +430,92 @@ class CopyContainer:
path_item_list=previous_path,
new_id=self.id)
def _duplicate(self, cp):
try: cp = _cb_decode(cp)
except: raise CopyError, 'Clipboard Error'
oblist=[]
op=cp[0]
app = self.getPhysicalRoot()
result = []
for mdata in cp[1]:
m = Moniker.loadMoniker(mdata)
try: ob = m.bind(app)
except: raise CopyError, 'Not Found'
self._verifyObjectPaste(ob, validate_src=1)
oblist.append(ob)
if op==0:
for ob in oblist:
if not ob.cb_isCopyable():
raise CopyError, 'Not Supported'
try: ob._notifyOfCopyTo(self, op=0)
except: raise CopyError, 'Copy Error'
ob = ob._getCopy(self)
orig_id = ob.getId()
id = self._get_id(ob.getId())
result.append({'id':orig_id, 'new_id':id})
ob._setId(id)
self._setObject(id, ob)
ob = self._getOb(id)
ob._postCopy(self, op=0)
ob._postDuplicate()
ob.wl_clearLocks()
if op==1:
# Move operation
for ob in oblist:
id = ob.getId()
if not ob.cb_isMoveable():
raise CopyError, 'Not Supported'
try: ob._notifyOfCopyTo(self, op=1)
except: raise CopyError, 'Move Error'
if not sanity_check(self, ob):
raise CopyError, 'This object cannot be pasted into itself'
# try to make ownership explicit so that it gets carried
# along to the new location if needed.
ob.manage_changeOwnershipType(explicit=1)
aq_parent(aq_inner(ob))._delObject(id)
ob = aq_base(ob)
orig_id = id
id = self._get_id(id)
result.append({'id':orig_id, 'new_id':id })
ob._setId(id)
self._setObject(id, ob, set_owner=0)
ob = self._getOb(id)
ob._postCopy(self, op=1)
# try to make ownership implicit if possible
ob.manage_changeOwnershipType(explicit=0)
return result
def _postDuplicate(self):
self_base = aq_base(self)
portal_catalog = getToolByName(self, 'portal_catalog')
self_base.uid = portal_catalog.newUid()
# Give the Owner local role to the current user, zope only does this if no
# local role has been defined on the object, which breaks ERP5Security
if getattr(self_base, '__ac_local_roles__', None) is not None:
user = getSecurityManager().getUser()
if user is not None:
userid = user.getId()
if userid is not None:
#remove previous owners
dict = self.__ac_local_roles__
for key, value in dict.items():
if 'Owner' in value:
value.remove('Owner')
#add new owner
l = dict.setdefault(userid, [])
l.append('Owner')
self.__recurse('_postDuplicate')
#### Helper methods
def tryMethodCallWithTemporaryPermission(context, permission, method,
......@@ -455,4 +544,3 @@ def tryMethodCallWithTemporaryPermission(context, permission, method,
result = method(*method_argv, **method_kw)
p.setRoles(old_role_list)
return result
##############################################################################
#
# Copyright (c) 2008 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street - Fifth Floor, Boston, MA 02110-1301, USA.
#
##############################################################################
from Products.PythonScripts.Utility import allow_class
from Products.ERP5Type.ObjectMessage import ObjectMessage
from zLOG import LOG, PROBLEM, INFO
class DivergenceMessage(ObjectMessage):
"""
Divergence Message is used for notifications to user about divergences.
"""
def getMovementGroup(self):
divergence_scope = getattr(self, 'divergence_scope', None)
if divergence_scope is None:
return []
tested_property = getattr(self, 'tested_property', None)
movement_group_list = []
delivery = self.simulation_movement.getDeliveryValue().getParentValue()
for builder in delivery.getBuilderList():
for movement_group in builder.getMovementGroupList():
if movement_group.getDivergenceScope() == divergence_scope:
if tested_property is None or \
tested_property in movement_group.getTestedPropertyList():
return movement_group
def getCollectOrderGroup(self):
movement_group = self.getMovementGroup()
if movement_group is not None:
return movement_group.getCollectOrderGroup()
elif getattr(self, 'divergence_scope', None) == 'quantity':
# We have no MovementGroup for quantity, so try to guess from the
# portal_type.
portal_type = self.getObject().getPortalType()
if 'Line' in portal_type:
return 'line'
elif 'Cell' in portal_type:
return 'cell'
return None
allow_class(DivergenceMessage)
......@@ -193,6 +193,8 @@ class ERP5TypeInformationMixIn( FactoryTypeInformation,
'project',
# Module
'module',
# Movement Group
'movement_group',
)
group_list = ()
......
......@@ -84,7 +84,7 @@ class ObjectMessage:
if request is not None:
for item in request:
if item.meta_type == 'ERP5 Site':
return item.restrictedTraverse(self.object_relative_url)
return item.unrestrictedTraverse(self.object_relative_url)
return None
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment