Commit deba110c authored by Jérome Perrin's avatar Jérome Perrin

accounting: better strategy for grouping reference of internal invoices

Make sure we don't set grouping reference on lines that have a valid
group only for source (or destination) but not for the other side.
parent 297150fc
Pipeline #25308 failed with stage
......@@ -4,17 +4,28 @@ If accounting_transaction_line_uid_list is not passed, this script assumes that
it's called on the context of an accounting transaction and it guess the grouping
of related accounting transactions using causality.
"""
import collections
from Products.ERP5Type.Utils import int2letter
# this dict associates (node, section, mirror_section, extra_grouping_parameter) to a list of
# accounting lines info (total_price, date and path)
lines_per_node = {}
# this counter associates the lines to the count of entities that are
# impacted by this accounting line.
# Typically it is 1 if the line is only for source/destination like in
# "normal" invoices and it can be 2 for Internal Invoices.
# When we found a match that can be grouped, we decrement this counter for
# this line. If all lines of the group have a 0 count, we can set grouping
# reference. This is a way to set grouping reference when the group is only
# valid for one side, it has to be valid for both side.
non_grouped_section_count = collections.Counter()
portal = context.getPortalObject()
allow_grouping_with_different_quantity = portal.portal_preferences.getPreference(
'preferred_grouping_with_different_quantities', 0)
if portal.portal_preferences.getPreference(
'preferred_grouping_with_different_quantities', False):
return
accounting_transaction_line_value_list = []
if accounting_transaction_line_uid_list is None:
......@@ -35,18 +46,22 @@ else:
uid=accounting_transaction_line_uid_list)
]
for line in accounting_transaction_line_value_list:
# source
section_relative_url = None
source_section = line.getSourceSectionValue(portal_type='Organisation')
if source_section is not None:
source_section = \
source_section.Organisation_getMappingRelatedOrganisation()
section_relative_url = source_section.getRelativeUrl()
lines_per_node.setdefault(
node_relative_url = line.getSource(portal_type='Account')
if node_relative_url:
section_relative_url = None
source_section = line.getSourceSectionValue(portal_type='Organisation')
if source_section is not None:
source_section = \
source_section.Organisation_getMappingRelatedOrganisation()
section_relative_url = source_section.getRelativeUrl()
line_relative_url = line.getRelativeUrl()
non_grouped_section_count.update({line_relative_url: 1})
lines_per_node.setdefault(
(
line.getSource(portal_type='Account'),
node_relative_url,
section_relative_url,
line.getDestinationSection(),
line.AccountingTransactionLine_getGroupingExtraParameterList(
......@@ -55,49 +70,68 @@ for line in accounting_transaction_line_value_list:
dict(
total_price=line.getSourceInventoriatedTotalAssetPrice() or 0,
date=line.getStartDate(),
path=line.getRelativeUrl()))
path=line_relative_url))
# destination
section_relative_url = None
destination_section = line.getDestinationSectionValue(
portal_type='Organisation')
if destination_section is not None:
destination_section = \
destination_section.Organisation_getMappingRelatedOrganisation()
section_relative_url = destination_section.getRelativeUrl()
lines_per_node.setdefault(
(
line.getDestination(portal_type='Account'),
section_relative_url,
line.getSourceSection(),
line.AccountingTransactionLine_getGroupingExtraParameterList(
source=False),
), []).append(
dict(
total_price=line.getDestinationInventoriatedTotalAssetPrice() or 0,
date=line.getStopDate(),
path=line.getRelativeUrl()))
node_relative_url = line.getDestination(portal_type='Account')
if node_relative_url:
section_relative_url = None
destination_section = line.getDestinationSectionValue(
portal_type='Organisation')
if destination_section is not None:
destination_section = \
destination_section.Organisation_getMappingRelatedOrganisation()
section_relative_url = destination_section.getRelativeUrl()
line_relative_url = line.getRelativeUrl()
non_grouped_section_count.update({line_relative_url: 1})
lines_per_node.setdefault(
(
node_relative_url,
section_relative_url,
line.getSourceSection(),
line.AccountingTransactionLine_getGroupingExtraParameterList(
source=False),
), []).append(
dict(
total_price=line.getDestinationInventoriatedTotalAssetPrice() or 0,
date=line.getStopDate(),
path=line_relative_url))
changed_line_set = set()
# We do two passes, a first pass to check if lines are groupable from both
# sides in case of internal invoices - then a second path to actually set
# grouping references on groups that were groupable from both sides.
for (
node,
section,
mirror_section,
_,
), line_info_list in lines_per_node.items():
if node is None:
continue
), line_info_list in list(lines_per_node.items()):
# get the currency rounding for this section, with a fallback that something that would
# allow grouping in case precision is not defined.
currency_precision = 5
if section:
default_currency = portal.restrictedTraverse(section).getPriceCurrencyValue()
default_currency = portal.restrictedTraverse(
section).getPriceCurrencyValue()
if default_currency is not None:
currency_precision = default_currency.getQuantityPrecision()
total_price = round(sum([l['total_price'] for l in line_info_list]), currency_precision)
if total_price == 0 or allow_grouping_with_different_quantity:
total_price = round(
sum([l['total_price'] for l in line_info_list]), currency_precision)
if total_price == 0:
for line in line_info_list:
non_grouped_section_count.subtract({line['path']: 1})
else:
# this group is not valid, remove it for second path
del lines_per_node[node, section, mirror_section, _]
changed_line_set = set()
for (
node,
section,
mirror_section,
_,
), line_info_list in lines_per_node.items():
if sum(non_grouped_section_count[line['path']] for line in line_info_list) == 0:
# we should include mirror node in the id_group, but this would reset
# id generators and generate grouping references that were already used.
id_group = ('grouping_reference', node, section, mirror_section)
......
......@@ -6304,12 +6304,13 @@ class TestInternalInvoiceTransaction(AccountingTestCase):
)
internal_invoice1.start()
internal_invoice1.newContent(
id='not_grouped',
id='line1',
source_value=self.portal.account_module.receivable,
destination_value=self.portal.account_module.payable,
source_debit=10,
)
internal_invoice1.newContent(
id='line2',
source_value=self.portal.account_module.goods_sales,
destination_value=self.portal.account_module.goods_purchase,
source_credit=10,
......@@ -6328,20 +6329,74 @@ class TestInternalInvoiceTransaction(AccountingTestCase):
)
internal_invoice2.start()
internal_invoice2.newContent(
id='line1',
source_value=self.portal.account_module.goods_sales,
destination_value=self.portal.account_module.payable,
source_debit=10,
)
internal_invoice2.newContent(
id='not_grouped',
id='line2',
source_value=self.portal.account_module.receivable,
destination_value=self.portal.account_module.goods_purchase,
source_debit=10,
)
internal_invoice2.stop()
self.tic()
self.assertFalse(internal_invoice1.not_grouped.getGroupingReference())
self.assertFalse(internal_invoice1.not_grouped.getGroupingReference())
self.assertFalse(internal_invoice1.line1.getGroupingReference())
self.assertFalse(internal_invoice1.line2.getGroupingReference())
self.assertFalse(internal_invoice2.line1.getGroupingReference())
self.assertFalse(internal_invoice2.line2.getGroupingReference())
def test_grouping_reference_both_sides_with_line_for_0(self):
# Lines for 0 are automatically grouped, but this takes care that the
# amount is also 0 for the mirror side.
# | Source Account | Debit | Credit | Grouping | Destination Account | Debit | Credit | Grouping |
# |----------------|-------|--------|----------|---------------------|-------|--------|----------|
# | receivable | 0 | | A | | | | |
# | sales | | 0 | | purchase | 10 | | |
# | | | | | payable | | 10 | |
# | | | | | receivable | | 0 | A |
#
internal_invoice1 = self.portal.accounting_module.newContent(
portal_type='Internal Invoice Transaction',
title='internal_invoice1',
source_section_value=self.section,
destination_section_value=self.main_section,
start_date=DateTime(2015, 1, 1),
created_by_builder=True,
)
# start before creating lines, because we don't want our lines to
# be initialized with mirror accounts.
internal_invoice1.start()
internal_invoice1.newContent(
id='line_1',
source_value=self.portal.account_module.receivable,
source_debit=0,
)
internal_invoice1.newContent(
id='line_2',
source_value=self.portal.account_module.goods_sales,
destination_value=self.portal.account_module.goods_purchase,
source_credit=0,
destination_asset_debit=10,
)
internal_invoice1.newContent(
id='line_3',
destination_value=self.portal.account_module.payable,
destination_asset_credit=10,
)
internal_invoice1.newContent(
id='line_4',
destination_value=self.portal.account_module.receivable,
destination_credit=0,
)
internal_invoice1.stop()
self.tic()
self.assertTrue(internal_invoice1.line_1.getGroupingReference())
self.assertFalse(internal_invoice1.line_2.getGroupingReference())
self.assertFalse(internal_invoice1.line_3.getGroupingReference())
self.assertTrue(internal_invoice1.line_4.getGroupingReference())
class TestAccountingAlarms(AccountingTestCase):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment