Commit f7b5c4a4 authored by Guillaume Michon's avatar Guillaume Michon

Amortisation system generisation


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@2879 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 9aa34428
......@@ -27,12 +27,15 @@
##############################################################################
from AccessControl import ClassSecurityInfo
from Products.ERP5Type import Permissions, PropertySheet, Constraint, Interface
from Products.ERP5.Document.Rule import Rule
from DateTime import DateTime
from copy import deepcopy
from string import lower
from string import lower, capitalize
from Products.ERP5Type.DateUtils import centis, getClosestDate, addToDate
from Products.ERP5Type.DateUtils import getDecimalNumberOfYearsBetween
from Products.ERP5Type import Permissions, PropertySheet, Constraint, Interface
from Products.ERP5.Document.Rule import Rule
from Products.CMFCore.utils import getToolByName
from zLOG import LOG
......@@ -112,6 +115,20 @@ An ERP5 Rule..."""
)
}
movement_name_dict = { 'immobilisation': { 'immo': 'start_immo',
'amo': 'start_amo',
'vat': 'start_vat',
'in_out':'start_in_out' },
'unimmobilisation': { 'immo': 'stop_immo',
'amo': 'stop_amo',
'vat': 'stop_vat',
'in_out':'stop_in_out' },
'annuity': { 'depr': 'annuity_depr',
'amo': 'annuity_amo' },
'correction': 'correction'
}
def test(self, movement):
"""
Tests if the rule (still) applies
......@@ -132,96 +149,647 @@ An ERP5 Rule..."""
An applied rule can be expanded only if its parent movement
is expanded.
"""
valid_state_list = ['delivered']
class CachedValues:
"""
This empty class is used to pass an object through the heavy price calculation,
in order to cache already calculated results, so the calculation is shorter
"""
pass
def updateSimulationMovementProperties(simulation_movement, calculated_movement, set_ratio=0):
"""
Update the properties of the given simulation movement according
to the given calculated_movement.
WARNING : This method does not check if the state of the Amortisation
Transaction corresponding to the Simulation Movement makes it uneditable
set_ratio is used to force the delivery_ratio property update
"""
for (key, value) in calculated_movement.items():
if value != None and key not in ('name','status','id','divergent'):
setter_name = 'set%s' % ''.join([capitalize(o) for o in key.split('_')])
setter = getattr(simulation_movement, setter_name)
setter(value)
simulation_movement.setStartDate(simulation_movement.getStopDate())
if set_ratio:
simulation_movement.setDefaultDeliveryProperties()
simulation_movement.immediateReindexObject()
def updateSimulationMovement(aggregated_movement, calculated_movement,
correction_number, aggregated_period_number,
correction_movement_dict):
"""
Update the Simulation Movement corresponding to aggregated_movement.
Modify it to respect calculated_movement values.
If the corresponding Amortisation Transaction is already validated,
create a corrective Simulation Movement, since a validated Transaction
must not be modified.
If a correction movement already exists, the new movement takes care of it.
correction_number is the id number for new movements.
Return the number of new Simulation Movements created
"""
def createMovement(property_dict, correction_number):
new_id = '%s_%i_%i' % (self.movement_name_dict['correction'], aggregated_period_number, correction_number)
simulation_movement = applied_rule.newContent(portal_type=delivery_line_type, id=new_id)
updateSimulationMovementProperties(simulation_movement = simulation_movement,
calculated_movement = property_dict)
if aggregated_movement['status'] in valid_state_list:
# The Simulation Movement corresponds to an Amortisation Transaction Line
# whose Amortisation Transaction is in a valid state, so we cannot modify
# the Simulation Movement. Some new Simulation Movements are so created
# to correct the Simulation state.
same_path = 1
for property in ("source", "destination",
"source_section_value",
"destination_section_value",
"resource_value", "stop_date", "start_date"):
if aggregated_movement[property] != calculated_movement[property]:
same_path = 0
# Determine the list of correction movement for this aggregated movement.
# It is done only for a validated aggregated movement, since a non-validated
# one should have been modified, rather than corrected by a correction movement
path_tuple = (aggregated_movement['source'],
aggregated_movement['destination'],
aggregated_movement['source_section_value'],
aggregated_movement['destination_section_value'],
aggregated_movement['resource_value'],
aggregated_movement['stop_date'],
aggregated_movement['start_date'])
correction_movement_list = correction_movement_dict.get(path_tuple, [])
already_corrected_quantity = 0
for correction_movement in correction_movement_list:
already_corrected_quantity += correction_movement['quantity']
if len(correction_movement_list) != 0:
del correction_movement_dict[path_tuple]
if same_path:
# We only need to create a new Simulation Movement to correct the amount
correction_quantity = calculated_movement['quantity'] - aggregated_movement['quantity']
correction_quantity -= already_corrected_quantity
property_dict = dict(aggregated_movement)
if correction_quantity != 0:
property_dict['quantity'] = correction_quantity
createMovement(property_dict, correction_number)
return 1
else:
# We need to create two new Simulation Movements : one to annulate the
# aggregated amount, and one to correct the value according to the calculated movements
property_dict = dict(aggregated_movement)
correction_quantity = - property_dict['quantity']
correction_quantity -= already_corrected_quantity
if correction_quantity != 0:
property_dict['quantity'] = correction_quantity
createMovement(property_dict, correction_number)
correction_number += 1
createMovement(calculated_movement, correction_number)
return 2
else:
# The Simulation Movement corresponds to an Amortisation Transaction Line
# whose Amortisation Transaction is not in a valid state, so we can
# modify the Simulation Movement. It introduces an inconsistency the user
# will have to solve.
simulation_movement = getattr(applied_rule, aggregated_movement['id'], None)
updateSimulationMovementProperties(simulation_movement = simulation_movement,
calculated_movement = calculated_movement)
return 0
def updateSimulationMovementToZero(aggregated_movement,
correction_number,
aggregated_period_number,
correction_movement_dict):
"""
Set the quantity value of the given aggregated movement to 0.
This method takes care of the validated aggregated movements
Return the number of new movements created
"""
property_list = dict(aggregated_movement)
if aggregated_movement['quantity'] != 0:
property_list['quantity'] = 0
return updateSimulationMovement(aggregated_movement = aggregated_movement,
calculated_movement = property_list,
correction_number = correction_number,
aggregated_period_number = aggregated_period_number,
correction_movement_dict = correction_movement_dict)
return 0
def setRemainingAggregatedMovementsToZero(aggregated_movement_dict,
correction_number,
aggregated_period_number,
correction_movement_dict):
"""
The remaining aggregation movements in aggregated_movement_dict
are set to quantity 0, taking care of their validation state and
the already made correction
"""
method_movements_created = 0
for (type, aggregated_movement_list) in aggregated_movement_dict.items():
if type != self.movement_name_dict['correction']:
for aggregated_movement in aggregated_movement_list:
movements_created = updateSimulationMovementToZero(aggregated_movement = aggregated_movement,
correction_number = correction_number,
aggregated_period_number = aggregated_period_number,
correction_movement_dict = correction_movement_dict)
correction_number += movements_created
method_movements_created += movements_created
# Some correction movements may still be unused, we need to set them to 0
unused_correction_list = []
for correction_movement_list_list in correction_movement_dict.values():
for correction_movement_list in correction_movement_list:
for correction_movement in correction_movement_list:
unused_correction_list.append(correction_movement)
correction_movement_list = aggregated_movement_dict.get( self.movement_name_dict['correction'], [] )
for correction_movement in correction_movement_list:
if correction_movement in unused_correction_list:
movements_created = updateSimulationMovementToZero(aggregated_movement = correction_movement,
correction_number = correction_number,
aggregated_period_number = aggregated_period_number,
correction_movement_dict = {})
correction_number += movements_created
method_movements_created += movements_created
return method_movements_created
### Start of expand() ###
delivery_line_type = 'Simulation Movement'
# Get the item we come from
my_item = applied_rule.getDefaultCausalityValue()
my_item = applied_rule.getCausalityValue()
# Only expand if my_item is not None
if my_item is not None:
if my_item is None:
return
### First, plan the theorical accounting movements
cached_values = CachedValues()
accounting_movement_list = []
immobilisation_movement_list = my_item.getImmobilisationMovementValueList()
immobilisation_movement_list = my_item.getImmobilisationMovementValueList(cached_data=cached_values)
period_number = 0
current_immo_movement = None
for mvt_number in range(len(immobilisation_movement_list)):
# Update previous, current and next movement variables
prev_immo_movement = current_immo_movement
current_immo_movement = immobilisation_movement_list[mvt_number]
if current_immo_movement.getImmobilisation():
period_number += 1
next_immo_movement = None
if mvt_number < len(immobilisation_movement_list) - 1:
next_immo_movement = immobilisation_movement_list[mvt_number + 1]
# Calculate the accounting movements
accounting_movements = self._getAccountingMovement(current_immo_movement=current_immo_movement,
next_immo_movement=next_immo_movement,
previous_immo_movement=prev_immo_movement)
previous_immo_movement=prev_immo_movement,
period_number = period_number,
cached_data = cached_values)
accounting_movement_list.extend(accounting_movements)
### The next step is to create the simulation movements
# First, we delete all of the simulation movements which are children
# of the applied rule : the entire simulation for this item has been
# re-calculated, so old values are necessary wrong
# However, the simulation movements already used to make accounting
# are not deleted.
movement_id_list = []
movement_last_id_dict = {}
# of the applied rule, but which have not been aggregated.
to_delete_id_list = []
aggregated_period_dict = {}
portal_workflow = getToolByName(self, 'portal_workflow')
for movement in applied_rule.contentValues():
movement_id = movement.getId()
movement_id_name = '_'.join( movement_id.split('_')[:-1] )
movement_id_number = int(movement_id.split('_')[-1])
if movement.getDeliveryValue() is None:
movement_id_name = '_'.join( movement_id.split('_')[:-2] )
movement_id_period_number = int(movement_id.split('_')[-2])
delivery_value = movement.getDeliveryValue()
if delivery_value is None:
# This movement is not already used by the accounting module,
# we can add it to the list to delete
movement_id_list.append(movement_id)
to_delete_id_list.append(movement_id)
else:
# This movement is already used by the accounting module,
# we store the greater id number for its id name, to avoid
# overwriting it later
if movement_last_id_dict.get( movement_id_name, None) is None \
or movement_id_number > movement_last_id_dict[movement_id_name]:
movement_last_id_dict[movement_id_name] = movement_id_number
applied_rule.deleteContent(movement_id_list)
# Simulated movements creation : only if their value (quantity) is != 0
ids = {}
for accounting_movement in accounting_movement_list:
if accounting_movement['quantity'] != 0:
# Determine the new id
my_type = accounting_movement['type']
if ids.get(my_type) is None:
ids[my_type] = movement_last_id_dict.get(my_type, -1)
ids[my_type] = ids[my_type] + 1
new_id = my_type + '_' + str(ids[my_type])
# we store it according to the state of the corresponding
# Amortisation Transaction. We also make a data structure
# to make easier the future work of correspondance
accounting_status = portal_workflow.getStatusOf('amortisation_transaction_workflow', delivery_value.getParent())
accounting_status = accounting_status['amortisation_transaction_state']
movement_dict = { 'stop_date': movement.getStopDate(),
'start_date': movement.getStartDate(),
'quantity': movement.getQuantity(),
'source_section_value': movement.getSourceSectionValue(),
'destination_section_value':movement.getDestinationSectionValue(),
'source': movement.getSource(),
'destination': movement.getDestination(),
'resource_value': movement.getResourceValue(),
'id': movement.getId(),
'status': accounting_status,
'divergent': movement.isDivergent() }
self._placeMovementInStructure(aggregated_period_dict, movement_dict, movement_id_period_number, movement_id_name)
# Add the delivery to the list to be notified (since each aggregated movement will be modified)
parent = delivery_value.getParent()
if parent:
path = parent.getPhysicalPath()
if not path in self._v_notify_dict.keys():
self._v_notify_dict[path] = None
# Deletion of non-aggregated movements
applied_rule.deleteContent(to_delete_id_list)
# Re-handle data of calculated movements to make easier the future
# work of correspondance
calculated_period_dict = {}
for movement in accounting_movement_list:
# Round date
stop_date = accounting_movement['stop_date']
stop_date = movement['stop_date']
if stop_date.latestTime() - stop_date < centis:
stop_date = stop_date + 1
stop_date = DateTime('%s/%s/%s' % (repr(stop_date.year()), repr(stop_date.month()), repr(stop_date.day())))
# Create the simulated movement and set its properties
accounting_movement['stop_date'] = stop_date
stop_date = DateTime(stop_date.Date())
movement['stop_date'] = stop_date
movement['start_date'] = stop_date
splitted_name = movement['name'].split('_')
movement_name = '_'.join( splitted_name[:-2] )
movement_period = int(splitted_name[-2])
if movement['quantity'] != 0:
self._placeMovementInStructure(calculated_period_dict, movement, movement_period, movement_name)
# Then, we need to make a correspondance between aggregated movements and calculated ones
for current_dict in (aggregated_period_dict, calculated_period_dict):
for type_dict in current_dict.values():
for movement_list in type_dict.values():
movement_list.sort( lambda a,b: cmp(a['stop_date'], b['stop_date']) )
matched_dict = self._matchAmortisationPeriods(calculated_period_dict, aggregated_period_dict)
# We can now apply the calculated movements on the applied rule
try:
new_period = max(aggregated_period_dict.keys()) + 1
except:
new_period = 0
for (c_period_number, calculated_dict) in calculated_period_dict.items():
# First, look for a potential found match
match = matched_dict.get(c_period_number, None)
if match is None:
# We did not find any match for this calculated period, so we
# simply add the Simulation Movements into the Simulation
for (type, movement_list) in calculated_dict.items():
for movement_number in range(len(movement_list)):
movement = movement_list[movement_number]
if movement['quantity'] != 0:
new_id = '%s_%i_%i' % (type, new_period, movement_number)
simulation_movement = applied_rule.newContent(portal_type=delivery_line_type, id=new_id)
simulation_movement.setStartDate(stop_date)
simulation_movement.setTargetStartDate(stop_date)
simulation_movement.setTargetStopDate(stop_date)
for (key, value) in accounting_movement.items():
if key != 'type' and value != None:
setter_name = 'set'
tokens = key.split('_')
for i in range(len(tokens)):
setter_name += tokens[i].capitalize()
setter = getattr(simulation_movement, setter_name)
setter(value)
# Set the properties
updateSimulationMovementProperties(simulation_movement = simulation_movement,
calculated_movement = movement)
new_period += 1
else:
# A match has been found between this calculated period, and
# an already aggregated one. In this case, there can be orphaned
# calculated movements, and orphaned aggregated movements.
relocate = match['relocate']
aggregated_period_number = match['aggregated']
aggregated_movement_dict = aggregated_period_dict[aggregated_period_number]
correction_data = self._getCorrectionMovementData(aggregated_movement_dict)
correction_number = correction_data['correction_number']
correction_movement_dict = correction_data['correction_movement_dict']
for (type, calculated_movement_list) in calculated_dict.items():
aggregated_movement_list = aggregated_movement_dict.get(type, [])
new_aggregated_number = 0
for aggregated_movement in aggregated_movement_list:
movement_id = int( aggregated_movement['id'].split('_')[-1] )
if movement_id + 1 > new_aggregated_number:
new_aggregated_number = movement_id + 1
if type in self.movement_name_dict['annuity'].values():
# Annuity movement
# We use relocate to match the movements.
to_delete_from_aggregated = []
for i in range(len(calculated_movement_list)):
calculated_movement = calculated_movement_list[i]
if not (i + relocate < 0 or i + relocate > len(aggregated_movement_list) - 1):
# We have two annuities to match
aggregated_movement = aggregated_movement_list[i + relocate]
movements_created = updateSimulationMovement(aggregated_movement = aggregated_movement,
calculated_movement = calculated_movement,
correction_number = correction_number,
aggregated_period_number = aggregated_period_number,
correction_movement_dict = correction_movement_dict)
correction_number += movements_created
to_delete_from_aggregated.append(aggregated_movement)
else:
# No matching found. We simply create the annuity
new_id = '%s_%i_%i' % (type, aggregated_period_number, new_aggregated_number)
simulation_movement = applied_rule.newContent(portal_type=delivery_line_type, id=new_id)
updateSimulationMovementProperties(simulation_movement = simulation_movement,
calculated_movement = calculated_movement)
new_aggregated_number += 1
# There is no calculated movement left. We set the remaining aggregated movements to zero
for movement in to_delete_from_aggregated:
aggregated_movement_list.remove(movement)
for aggregated_movement in aggregated_movement_list:
movements_created = updateSimulationMovementToZero(aggregated_movement = aggregated_movement,
correction_number = correction_number,
aggregated_period_number = aggregated_period_number,
correction_movement_dict = correction_movement_dict)
correction_number += movements_created
else:
# Immobilisation or unimmobilisation movement
# If there are more than one of such movements (this should
# occur quite rarely), the matching process has found
# the most matching ones
non_annuity_match = match['non-annuity'].get(type, None)
if non_annuity_match is not None:
aggregated_movement = aggregated_movement_list[non_annuity_match[1]]
calculated_movement = calculated_movement_list[non_annuity_match[0]]
movements_created = updateSimulationMovement(aggregated_movement = aggregated_movement,
calculated_movement = calculated_movement,
correction_number = correction_number,
aggregated_period_number = aggregated_period_number,
correction_movement_dict = correction_movement_dict)
correction_number += movements_created
aggregated_movement_list.remove(aggregated_movement)
calculated_movement_list.remove(calculated_movement)
# Then the remaining movements are arbitratry matched
for calculated_movement in calculated_movement_list:
if len(aggregated_movement_list) > 0:
aggregated_movement = aggregated_movement_list[0]
movements_created = updateSimulationMovement(aggregated_movement = aggregated_movement,
calculated_movement = calculated_movement,
correction_number = correction_number,
aggregated_period_number = aggregated_period_number,
correction_movement_dict = correction_movement_dict)
correction_number += movements_created
aggregated_movement_list.remove(aggregated_movement)
else:
# There is no aggregated movement left. We simply create the remaining calculated movements
new_id = '%s_%i_%i' % (type, aggregated_period_number, new_aggregated_number)
simulation_movement = applied_rule.newContent(portal_type=delivery_line_type, id=new_id)
updateSimulationMovementProperties(simulation_movement = simulation_movement,
calculated_movement = calculated_movement)
new_aggregated_number += 1
for aggregated_movement in aggregated_movement_list:
# There is no calculated movement left. We set the remaining aggregated movements to zero.
movements_created = updateSimulationMovementToZero(aggregated_movement = aggregated_movement,
correction_number = correction_number,
aggregated_period_number = aggregated_period_number,
correction_movement_dict = correction_movement_dict)
correction_number += movements_created
# We delete this movement type from aggregation, in order to determine
# the types which have not been matched later
try:
del aggregated_movement_dict[type]
except:
pass
movements_created = setRemainingAggregatedMovementsToZero(aggregated_movement_dict = aggregated_movement_dict,
correction_number = correction_number,
aggregated_period_number = aggregated_period_number,
correction_movement_dict = correction_movement_dict)
correction_number += movements_created
# This aggregated period handling is finished. We delete it from the dictionary
# in order to determine the non-matched aggregated periods later.
del aggregated_period_dict[aggregated_period_number]
# The matching process is finished. Now we set to 0 each remaining aggregated movement
for (aggregated_period_number, aggregated_movement_dict) in aggregated_period_dict.items():
correction_data = self._getCorrectionMovementData(aggregated_movement_dict)
correction_number = correction_data['correction_number']
correction_movement_dict = correction_data['correction_movement_dict']
movements_created = setRemainingAggregatedMovementsToZero(aggregated_movement_dict = aggregated_movement_dict,
correction_number = correction_number,
aggregated_period_number = aggregated_period_number,
correction_movement_dict = correction_movement_dict)
correction_number += movements_created
def _getCorrectionMovementData(self, aggregated_movement_dict):
"""
Return a dictionary containing the first id number for a new correction movement,
and a re-handled structure containing the correction movements, in order to make
easier their search
It is needed to reduce the number of correction movements. If we can notice that
an aggregated movement is already corrected by a correction movement, we do not
have to correct it again
"""
correction_movement_list = aggregated_movement_dict.get(self.movement_name_dict['correction'], [])[:]
correction_number = 0
for correction_movement in correction_movement_list:
movement_id = int( correction_movement['id'].split('_')[-1] )
if movement_id + 1 > correction_number:
correction_number = movement_id + 1
correction_movement_dict = {}
for correction_movement in correction_movement_list:
path_tuple = (correction_movement['source'],
correction_movement['destination'],
correction_movement['source_section_value'],
correction_movement['destination_section_value'],
correction_movement['resource_value'],
correction_movement['stop_date'],
correction_movement['start_date'])
if correction_movement_dict.get(path_tuple, None) is None:
correction_movement_dict[path_tuple] = []
correction_movement_dict[path_tuple].append(correction_movement)
return { 'correction_number':correction_number, 'correction_movement_dict':correction_movement_dict }
def _matchAmortisationPeriods(self, calculated_period_dict, aggregated_period_dict):
"""
Try to match each period in calculated_period_dict with a period in
aggregated_period_dict.
It is done by using a "matching ratio" : when two movements of both dictionaries
have a identical property (source, destination, quantity, resource, ...), the
matching ratio is incremented for the correspondance between the both corresponding
periods.
Then, periods are matched in order of priority of the matching ratio.
"""
def calculateMovementMatch(movement_a, movement_b, parameter_list = ['source_section_value',
'destination_section_value', 'source', 'destination', 'resource_value', 'quantity'],
compare_dates=0 ):
if compare_dates:
parameter_list.append('stop_date')
matching = { 'max':0, 'score':0 }
for matching_parameter in parameter_list:
matching['max'] = matching['max'] + 1
if movement_a[matching_parameter] == movement_b[matching_parameter]:
matching['score'] = matching['score'] + 1
return matching
matching_ratio_list = []
for (calculated_period_number,calculated_dict) in calculated_period_dict.items():
calculated_immobilisation = calculated_dict.get(self.movement_name_dict['immobilisation']['immo'], [])
for (aggregated_period_number, aggregated_dict) in aggregated_period_dict.items():
# We first compare the dates of immobilisation, so we can compare the annuity suit
# first directly, and then by relocating in time
relocate_list = [0, 1, -1]
aggregated_immobilisation = calculated_dict.get(self.movement_name_dict['immobilisation']['immo'], [])
if len(calculated_immobilisation) != 0 and len(aggregated_immobilisation) != 0:
c_immobilisation_movement = calculated_immobilisation[-1]
a_immobilisation_movement = aggregated_immobilisation[-1]
c_date = c_immobilisation_movement['stop_date']
a_date = a_immobilisation_movement['stop_date']
if a_date < c_date:
date_difference = int(getDecimalNumberOfYearsBetween(a_date, c_date))
else:
date_difference = int(- getDecimalNumberOfYearsBetween(c_date, a_date))
if abs(date_difference) >= 1:
relocate_list.extend(date_difference-1, date_difference, date_difference+1)
for o in relocate_list[:]:
while relocate_list.count(o) > 1:
relocate_list.remove(o)
# Then we try to effectively match some data in these two periods, by relocating in time
# Annuities
current_matching = {'score':0, 'max':0, 'relocate':0, 'non-annuity':{}}
for relocate in relocate_list:
relocate_matching = {'score':0, 'max':0, 'relocate':relocate, 'non-annuity':{}}
a_annuity_list = aggregated_dict.get(self.movement_name_dict['annuity']['amo'], [])
c_annuity_list = calculated_dict.get(self.movement_name_dict['annuity']['amo'], [])
for i in range(len(a_annuity_list)):
if not (i + relocate < 0 or i + relocate > len(c_annuity_list) - 1):
a_annuity = a_annuity_list[i]
c_annuity = c_annuity_list[i + relocate]
this_matching = calculateMovementMatch(a_annuity, c_annuity)
relocate_matching['score'] = relocate_matching['score'] + this_matching['score']
relocate_matching['max'] = relocate_matching['max'] + this_matching['max']
# Compare the current relocated matching with the best relocated matching found until now
if current_matching['max'] == 0:
current_matching_ratio = 0
else:
current_matching_ratio = current_matching['score'] / (current_matching['max']+0.)
if relocate_matching['max'] == 0: relocate_matching['max'] = 1
relocate_matching_ratio = relocate_matching['score'] / (relocate_matching['max']+0.)
if relocate_matching_ratio >= current_matching_ratio:
if relocate_matching_ratio > current_matching_ratio or abs(relocate) < abs(current_matching['relocate']):
current_matching = relocate_matching
# Immobilisation and unimmobilisation ; normally, there should only be one or
# two movements of each type here, so we can compare each movement with all
# of the others without losing much time
for movement_type in ('immobilisation', 'unimmobilisation'):
for immobilisation_type in self.movement_name_dict['immobilisation'].values():
a_movement_list = aggregated_dict.get(immobilisation_type, [])
c_movement_list = calculated_dict.get(immobilisation_type, [])
local_best_matching = {'score':0, 'max':0, 'non-annuity':{} }
local_current_matching = {'score':0, 'max':0}
for a_number in range(len(a_movement_list)):
a_movement = a_movement_list[a_number]
for c_number in range(len(c_movement_list)):
c_movement = c_movement_list[c_number]
local_current_matching = calculateMovementMatch(a_movement, c_movement, compare_dates=1)
if local_best_matching['max'] == 0: local_best_matching['max'] = 1
local_best_ratio = local_best_matching['score'] / (local_best_matching['max']+0.)
if local_current_matching['max'] == 0: local_current_matching['max'] = 1
local_current_ratio = local_current_matching['score'] / (local_current_matching['max']+0.)
if local_current_ratio > local_best_ratio:
local_best_matching = local_current_matching
local_best_matching['non-annuity'] = { immobilisation_type: [a_number, c_number] }
# Add the best found matching to the current matching score
current_matching['score'] = current_matching['score'] + local_best_matching['score']
current_matching['max'] = current_matching['max'] + local_best_matching['max']
current_matching['non-annuity'].update( local_best_matching['non-annuity'] )
# We found a matching ratio for this aggregated-calculated periods pair, with a particular
# relocating. We add the ratio in the list in order to be able to retrieve it later
if current_matching['max'] == 0:
ratio = 0
else:
ratio = current_matching['score'] / (current_matching['max']+0.)
matching_ratio_list.append( { 'calculated_period' : calculated_period_number,
'aggregated_period' : aggregated_period_number,
'ratio' : ratio,
'relocate' : current_matching['relocate'],
'non-annuity' : current_matching['non-annuity'] } )
# We have each matching ratio. Now we need to match each amortisation period
# according to these ratio : the highest ratio gets the priority, then the next
# highest is taken into account if corresponding resources are free, and so on
matching_ratio_list.sort(lambda a, b: - cmp(a['ratio'], b['ratio']))
calculated_to_match = calculated_period_dict.keys()
aggregated_to_match = aggregated_period_dict.keys()
match_dict = {}
for matching_ratio in matching_ratio_list:
calculated = matching_ratio['calculated_period']
aggregated = matching_ratio['aggregated_period']
relocate = matching_ratio['relocate']
non_annuity = matching_ratio['non-annuity']
if calculated in calculated_to_match and aggregated in aggregated_to_match:
match_dict[calculated] = { 'aggregated':aggregated, 'relocate':relocate, 'non-annuity':non_annuity }
calculated_to_match.remove(calculated)
aggregated_to_match.remove(aggregated)
return match_dict
def _placeMovementInStructure(self, structure, movement_dict, period_number, name):
"""
Used to sort aggregated and calculated movements in a structure
to make easier the correspondance work
"""
period_dict = structure.get(period_number, None)
if period_dict is None:
structure[period_number] = {}
period_dict = structure[period_number]
movement_list = period_dict.get(name, None)
if movement_list is None:
period_dict[name] = []
movement_list = period_dict[name]
movement_list.append( movement_dict )
security.declareProtected(Permissions.View, '_getAccountingMovement')
def _getAccountingMovement(self,current_immo_movement,next_immo_movement=None, previous_immo_movement=None):
def _getAccountingMovement(self,current_immo_movement,next_immo_movement=None, previous_immo_movement=None,
period_number=0, **kw):
"""
Calculates the value of accounting movements during the period
between the two given immobilisation movements.
If next_immo_movement is None, accounting movements are made at infinite.
"""
# These methods are used to create dictionaries containing data to return
def buildImmobilisationCalculatedMovementList(date, period, source_section, destination_section,
currency, movement_list=[]):
return buildSpecificCalculatedMovementList(date, period, 0, source_section, destination_section,
currency, movement_list, 'immobilisation')
def buildUnimmobilisationCalculatedMovementList(date, period, source_section, destination_section,
currency, movement_list=[]):
return buildSpecificCalculatedMovementList(date, period, 0, source_section, destination_section,
currency, movement_list, 'unimmobilisation')
def buildAnnuityCalculatedMovementList(date, period, annuity, source_section, destination_section,
currency, movement_list=[]):
return buildSpecificCalculatedMovementList(date, period, annuity, source_section, destination_section,
currency, movement_list, 'annuity')
def buildSpecificCalculatedMovementList(date, period, annuity, source_section, destination_section,
currency, movement_list, name):
for movement in movement_list:
movement['name'] = self.movement_name_dict[name][movement['name']]
return buildCalculatedMovementList(date, period, annuity, source_section,
destination_section, currency, movement_list)
def buildCalculatedMovementList(date, period, annuity, source_section,
destination_section, currency, movement_list = []):
return_list = []
for movement in movement_list:
return_list.append(
{ 'stop_date' : date,
'name' : '%s_%i_%i' % (movement['name'], period, annuity),
'quantity' : movement['quantity'],
'source' : movement['source'],
'destination' : movement['destination'],
'source_section_value' : source_section,
'destination_section_value' : destination_section,
'resource_value' : currency } )
return return_list
item = current_immo_movement.getParent()
if item is not None:
# Get some variables
begin_value = current_immo_movement.getAmortisationOrDefaultAmortisationPrice()
begin_remaining = current_immo_movement.getAmortisationOrDefaultAmortisationDuration()
disposal_price = current_immo_movement.getDisposalPrice()
begin_price = current_immo_movement.getAmortisationOrDefaultAmortisationPrice(**kw)
begin_remaining = current_immo_movement.getAmortisationOrDefaultAmortisationDuration(**kw)
section = current_immo_movement.getSectionValue()
currency = current_immo_movement.getPriceCurrency()
if currency is not None:
......@@ -238,54 +806,46 @@ An ERP5 Rule..."""
replace = 0 # replace is used to know if we need to reverse an one-side movement
# in order to have a one-side movement whose destination side is unset
if immobilised_before and previous_immo_movement is not None:
immo_begin_value = previous_immo_movement.getAmortisationOrDefaultAmortisationPrice()
immo_end_value = current_immo_movement.getDefaultAmortisationPrice() # We use this method in order
immo_begin_price = previous_immo_movement.getAmortisationOrDefaultAmortisationPrice(**kw)
immo_end_price = current_immo_movement.getDefaultAmortisationPrice(**kw) # We use this method in order
# to get the calculated value of the item, and not the
# value entered later by the user
if immo_end_value is not None:
if immo_end_price is not None:
# Set "end of amortisation period" data
amortisation_price = immo_begin_value - immo_end_value
end_vat = previous_immo_movement.getVat() * immo_end_value / immo_begin_value
immo_end_value_vat = immo_end_value + end_vat
returned_list.extend([{ 'stop_date' : start_date,
'type' : 'immo',
'quantity' : -immo_begin_value,
amortisation_price = immo_begin_price - immo_end_price
end_vat = previous_immo_movement.getVat() * immo_end_price / immo_begin_price
immo_end_price_vat = immo_end_price + end_vat
returned_list.extend(
buildUnimmobilisationCalculatedMovementList(date = start_date,
period = period_number - 1,
source_section = None,
destination_section = previous_immo_movement.getSectionValue(),
currency = currency,
movement_list=[
{ 'name' : 'immo',
'quantity' : -immo_begin_price,
'source' : None,
'destination' : previous_immo_movement.getImmobilisationAccount(),
'source_section_value' : None,
'destination_section_value' : previous_immo_movement.getSectionValue(),
'resource_value' : currency },
{ 'stop_date' : start_date,
'type' : 'vat',
'destination' : previous_immo_movement.getImmobilisationAccount() },
{ 'name' : 'vat',
'quantity' : -end_vat,
'source' : None,
'destination' : previous_immo_movement.getVatAccount(),
'source_section_value' : None,
'destination_section_value' : previous_immo_movement.getSectionValue(),
'resource_value' : currency },
{ 'stop_date' : start_date,
'type' : 'amo',
'destination' : previous_immo_movement.getVatAccount() },
{ 'name' : 'amo',
'quantity' : amortisation_price,
'source' : None,
'destination' : previous_immo_movement.getAmortisationAccount(),
'source_section_value' : None,
'destination_section_value' : previous_immo_movement.getSectionValue(),
'resource_value' : currency },
{ 'stop_date' : start_date,
'type' : 'in_out',
'quantity' : immo_end_value_vat,
'destination' : previous_immo_movement.getAmortisationAccount() },
{ 'name' : 'in_out',
'quantity' : immo_end_price_vat,
'source' : None,
'destination' : previous_immo_movement.getOutputAccount(),
'source_section_value' : None,
'destination_section_value' : previous_immo_movement.getSectionValue(),
'resource_value' : currency } ] )
'destination' : previous_immo_movement.getOutputAccount() }
] ) )
replace = 1
if immobilised_after:
# Set "begin of amortisation" data
immo_begin_value = begin_value
immo_begin_price = begin_price
begin_vat = current_immo_movement.getVat()
if len(returned_list) > 0 and round(immo_begin_value,2) == round(immo_end_value,2) and round(begin_vat,2) == round(end_vat,2):
if len(returned_list) > 0 and round(immo_begin_price,2) == round(immo_end_price,2) and round(begin_vat,2) == round(end_vat,2):
# Gather data into a single movement
returned_list[0]['source'] = current_immo_movement.getImmobilisationAccount()
returned_list[1]['source'] = current_immo_movement.getVatAccount()
......@@ -296,39 +856,30 @@ An ERP5 Rule..."""
replace = 0
else:
# Create another movement
returned_list.extend([{ 'stop_date' : start_date,
'type' : 'immo',
'quantity' : - immo_begin_value,
returned_list.extend(
buildImmobilisationCalculatedMovementList(date = start_date,
period = period_number,
source_section = section,
destination_section = None,
currency = currency,
movement_list=[
{ 'name' : 'immo',
'quantity' : - immo_begin_price,
'source' : current_immo_movement.getImmobilisationAccount(),
'destination' : None,
'source_section_value' : section,
'destination_section_value' : None,
'resource_value' : currency },
{ 'stop_date' : start_date,
'type' : 'vat',
'destination' : None },
{ 'name' : 'vat',
'quantity' : - begin_vat,
'source' : current_immo_movement.getVatAccount(),
'destination' : None,
'source_section_value' : section,
'destination_section_value' : None,
'resource_value' : currency },
{ 'stop_date' : start_date,
'type' : 'amo',
'destination' : None },
{ 'name' : 'amo',
'quantity' : 0,
'source' : current_immo_movement.getAmortisationAccount(),
'destination' : None,
'source_section_value' : section,
'destination_section_value' : None,
'resource_value' : currency },
{ 'stop_date' : start_date,
'type' : 'in_out',
'quantity' : immo_begin_value + begin_vat,
'destination' : None },
{ 'name' : 'in_out',
'quantity' : immo_begin_price + begin_vat,
'source' : current_immo_movement.getInputAccount(),
'destination' : None,
'source_section_value' : section,
'destination_section_value' : None,
'resource_value' : currency } ] )
'destination' : None }
] ) )
if replace:
# Replace destination by source on the immobilisation-ending writings
for i in range(4):
......@@ -339,62 +890,102 @@ An ERP5 Rule..."""
returned_list[i]['quantity'] = - returned_list[i]['quantity']
# Calculate the annuities
current_value = begin_value
current_price = begin_price
if immobilised_after:
# Search for the first financial end date after the first immobilisation movement
end_date = getClosestDate(target_date=start_date,
date=section.getFinancialYearStopDate(),
precision='year',
before=0)
while (stop_date is None and current_value > 0) or (stop_date is not None and end_date - stop_date < 0):
annuity_end_value = item.getAmortisationPrice(at_date=end_date)
if annuity_end_value is not None:
annuity_value = current_value - annuity_end_value
if annuity_value != 0:
returned_list.extend([{ 'stop_date' : end_date,
'type' : 'annuity_depr',
'quantity' : (- annuity_value),
annuity_number = 0
while (stop_date is None and current_price > disposal_price) or \
(stop_date is not None and end_date - stop_date < 0):
annuity_end_price = item.getAmortisationPrice(at_date=end_date, **kw)
if annuity_end_price is None:
break
if annuity_end_price is not None:
annuity_price = current_price - annuity_end_price
if annuity_price < 0:
break
if annuity_price != 0:
returned_list.extend(
buildAnnuityCalculatedMovementList(date = end_date,
period = period_number,
annuity = annuity_number,
source_section = section,
destination_section = None,
currency = currency,
movement_list=[
{ 'name' : 'depr',
'quantity' : - annuity_price,
'source' : current_immo_movement.getDepreciationAccount(),
'destination' : None,
'source_section_value' : section,
'destination_section_value': None,
'resource_value' : currency },
{ 'stop_date' : end_date,
'type' : 'annuity_amo',
'quantity' : annuity_value,
'destination' : None },
{ 'name' : 'amo',
'quantity' : annuity_price,
'source' : current_immo_movement.getAmortisationAccount(),
'destination' : None,
'source_section_value' : section,
'destination_section_value': None,
'resource_value' : currency } ] )
current_value -= annuity_value
'destination' : None }
] ) )
current_price -= annuity_price
end_date = addToDate(end_date, {'year':1})
annuity_number += 1
# Proceed the last annuity (incomplete, from financial year end date to stop_date)
if stop_date is not None:
# We use getDefaultAmortisationPrice in order to get the calculated value of the item,
# and not the value entered later by the user for the next immobilisation period
annuity_end_value = next_immo_movement.getDefaultAmortisationPrice()
if annuity_end_value is not None:
annuity_value = current_value - annuity_end_value
if annuity_value != 0:
returned_list.extend([{ 'stop_date' : end_date,
'type' : 'annuity_depr',
'quantity' : (- annuity_value),
annuity_end_price = next_immo_movement.getDefaultAmortisationPrice(**kw)
if annuity_end_price is not None and annuity_end_price < current_price:
annuity_price = current_price - annuity_end_price
if annuity_price != 0:
returned_list.extend(
buildAnnuityCalculatedMovementList(date = end_date,
period = period_number,
annuity = annuity_number,
source_section = section,
destination_section = None,
currency = currency,
movement_list=[
{ 'name' : 'depr',
'quantity' : - annuity_price,
'source' : current_immo_movement.getDepreciationAccount(),
'destination' : None,
'source_section_value' : section,
'destination_section_value': None,
'resource_value' : currency },
{ 'stop_date' : end_date,
'type' : 'annuity_amo',
'quantity' : annuity_value,
'destination' : None },
{ 'name' : 'amo',
'quantity' : annuity_price,
'source' : current_immo_movement.getAmortisationAccount(),
'destination' : None,
'source_section_value' : section,
'destination_section_value': None,
'resource_value' : currency } ] )
'destination' : None }
] ) )
# Construct an unimmobilisation set of movements if the disposal value
# is greater than 0
if stop_date is None and disposal_price and current_price <= disposal_price:
end_date = addToDate(end_date, year=-1)
amortisation_price = begin_price - current_price
end_vat = current_immo_movement.getVat() * current_price / begin_price
immo_end_price_vat = current_price + end_vat
returned_list.extend(
buildUnimmobilisationCalculatedMovementList(date = end_date,
period = period_number,
source_section = current_immo_movement.getSectionValue(),
destination_section = None,
currency = currency,
movement_list=[
{ 'name' : 'immo',
'quantity' : begin_price,
'source' : current_immo_movement.getImmobilisationAccount(),
'destination' : None },
{ 'name' : 'vat',
'quantity' : end_vat,
'source' : current_immo_movement.getVatAccount(),
'destination' : None },
{ 'name' : 'amo',
'quantity' : - amortisation_price,
'source' : current_immo_movement.getAmortisationAccount(),
'destination' : None },
{ 'name' : 'in_out',
'quantity' : - immo_end_price_vat,
'source' : current_immo_movement.getOutputAccount(),
'destination' : None }
] ) )
return returned_list
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment