Commit 2ec3bfbb authored by Vincent Pelletier's avatar Vincent Pelletier

Implement optimisation of getInventoryList.

  This optimisation prevents specific query paterns on stock table from
  reading a number of lines which is proportionnal to the number of operations
  present for a given criterion set.
  This optimisation requires:
   - new ZSQLMethod from erp5_core
   - 2 new catalog tables from erp5_mysql_innodb_catalog
   - inventories with new "full_inventory" property set to true
       full_inventory means that the user which did the inventory acknowledges
       there is no remaining resource besides what is present in the inventory.
Add test for this optimisation, disabled for now (required business templates
are not updated yet).


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@16103 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 19962b5a
......@@ -48,6 +48,8 @@ from Products.PythonScripts.Utility import allow_class
from Products.ZSQLCatalog.SQLCatalog import Query, ComplexQuery, QueryMixin
from Shared.DC.ZRDB.Results import Results
class SimulationTool(BaseTool):
"""
The SimulationTool implements the ERP5
......@@ -848,7 +850,8 @@ class SimulationTool(BaseTool):
security.declareProtected(Permissions.AccessContentsInformation,
'getInventoryList')
def getInventoryList(self, src__=0, ignore_variation=0, standardise=0,
def getInventoryList(self, src__=0, optimisation__=True,
ignore_variation=0, standardise=0,
omit_simulation=0,
selection_domain=None, selection_report=None,
statistic=0, inventory_list=1,
......@@ -861,17 +864,285 @@ class SimulationTool(BaseTool):
NOTE: we may want to define a parameter so that we can select
the kind of inventory statistics we want to display (ex. sum,
average, cost, etc.)
Optimisation queries.
Optimisation of a stock lookup is done to avoid a table scan
of all lines corresponding to a given node, section or payment,
because they grow with time and query time should not.
First query: Fetch fitting full inventory dates.
For each node, section or payment, find the first anterior full
inventory.
Second query: Fetch full inventory amounts.
Fetch values of inventory identified in the first query.
Tird query: Classic stock table read.
Fetch all rows in stock table which are posterior to the inventory.
Final result
Add results of the second and third queries, and return it.
Missing optimisations:
- In a getInventory case where everything is specified for the
resource, it's not required for the inventory to be full, it
just need to be done for the right resource.
If the resource isn't completely defined, we require inventory
to be full, which is implemented.
- Querying multiple nodes/categories/payments in one call prevents
from using optimisation, it should be equivalent to multiple calls
on individual nodes/categories/payments.
-
"""
if src__:
sql_source_list = []
# If no group at all, give a default sort group by
kw.update(self._getDefaultGroupByParameters(**kw))
sql_kw = self._generateSQLKeywordDict(**kw)
return self.Resource_zGetInventoryList(
sql_kw, new_kw = self._generateKeywordDict(**kw)
stock_sql_kw = self._generateSQLKeywordDictFromKeywordDict(
table='stock', sql_kw=sql_kw, new_kw=new_kw)
Resource_zGetFullInventoryDate = \
getattr(self, 'Resource_zGetFullInventoryDate', None)
optimisation_success = optimisation__ and \
Resource_zGetFullInventoryDate is not None
# Generate first query parameter dict
if optimisation_success:
def getFirstQueryParameterDict(query_generator_kw):
AVAILABLE_CRITERIONS_IN_INVENTORY_TABLE = ['node_uid',
'section_uid',
'payment_uid']
group_by_list = query_generator_kw.get('group_by', [])
column_value_dict = query_generator_kw.get('column_value_dict', {})
new_group_by_list = []
new_column_value_dict = {}
for criterion_id in AVAILABLE_CRITERIONS_IN_INVENTORY_TABLE:
criterion_value_list = column_value_dict.get(criterion_id, [])
if not isinstance(criterion_value_list, (list, tuple)):
criterion_value_list = [criterion_value_list]
if len(criterion_value_list) > 0:
if len(criterion_value_list) > 1:
# Impossible to optimise if there is more than one possible
# value per criterion.
optimisation_success = False
break
new_column_value_dict[criterion_id] = criterion_value_list
new_group_by_list.append(criterion_id)
elif criterion_id in group_by_list:
new_group_by_list.append(criterion_id)
group_by_expression = ', '.join(new_group_by_list)
column_id_list = new_column_value_dict.keys()
column_value_list_list = new_column_value_dict.values()
date_value_list = column_value_dict.get('date', {}).get('query', [])
if len(date_value_list) > 0:
date = min(date_value_list)
if isinstance(date, DateTime):
date = date.ISO()
else:
date = None
return {'group_by_expression': group_by_expression,
'column_id_list': column_id_list,
'column_value_list_list': column_value_list_list,
'date': date}
first_query_param_dict = getFirstQueryParameterDict(new_kw)
if optimisation_success:
if len(first_query_param_dict['column_id_list']):
inventory_date_line_list = self.Resource_zGetFullInventoryDate(
**first_query_param_dict)
if src__:
sql_source_list.append(
self.Resource_zGetFullInventoryDate(src__=src__,
**first_query_param_dict))
# Check that all expected uids have been found, otherwise a full
# inventory of a node/section/payment might be missing.
if len(inventory_date_line_list) >= max([len(x) for x in \
first_query_param_dict['column_value_list_list']]):
# Generate a where expression which filters on dates retrieved
# in the first query to be used in the second query.
# Also, generate a where expression to use in the third query,
# since it is based on the same data.
# XXX: uggly duplicated query generation code
# XXX: duplicates SQL variable formatting present in
# ERP5Type/patches/sqlvar.py about datetime SQL columns.
equal_date_query_list = []
greater_than_date_query_list = []
EQUAL_DATE_TABLE_ID = 'inventory_stock'
GREATER_THAN_DATE_TABLE_ID = 'stock'
for inventory_date_line_dict in \
inventory_date_line_list.dictionaries():
date = inventory_date_line_dict['date']
non_date_value_dict = dict([(k, v) for k, v \
in inventory_date_line_dict.iteritems() if k != 'date'])
equal_date_query_list.append(
ComplexQuery(
ComplexQuery(operator='AND',
*[Query(**{'%s.%s' % (EQUAL_DATE_TABLE_ID, k): v}) \
for k, v in non_date_value_dict.iteritems()]),
Query(**{'%s.date' % (EQUAL_DATE_TABLE_ID, ): date}),
operator='AND'))
greater_than_date_query_list.append(
ComplexQuery(
ComplexQuery(operator='AND',
*[Query(**{'%s.%s' % (GREATER_THAN_DATE_TABLE_ID, k): \
v}) \
for k, v in non_date_value_dict.iteritems()]),
Query(**{'%s.date' % (GREATER_THAN_DATE_TABLE_ID, ): \
'>%s' % (date.ISO(), )}),
operator='AND'))
assert len(equal_date_query_list) == \
len(greater_than_date_query_list)
assert len(equal_date_query_list) > 0
equal_date_query = ComplexQuery(operator='OR',
*equal_date_query_list).asSQLExpression()\
['where_expression']
greater_than_date_query = ComplexQuery(operator='OR',
*greater_than_date_query_list).asSQLExpression()\
['where_expression']
inventory_stock_sql_kw = \
self._generateSQLKeywordDictFromKeywordDict(
table=EQUAL_DATE_TABLE_ID, sql_kw=sql_kw, new_kw=new_kw)
inventory_stock_where_query = \
inventory_stock_sql_kw.get('where_expression', '(1)')
assert isinstance(inventory_stock_where_query, basestring) \
and len(inventory_stock_where_query)
inventory_stock_sql_kw['where_expression'] = '(%s) AND (%s)' % \
(inventory_stock_where_query, equal_date_query)
where_query = stock_sql_kw.get('where_expression', '(1)')
assert isinstance(where_query, basestring) and len(where_query)
stock_sql_kw['where_expression'] = '(%s) AND (%s)' % \
(where_query, greater_than_date_query)
# Get initial inventory amount
initial_inventory_line_list = self.Resource_zGetInventoryList(
stock_table_id='inventory_stock',
src__=src__, ignore_variation=ignore_variation,
standardise=standardise, omit_simulation=omit_simulation,
selection_domain=selection_domain,
selection_report=selection_report, precision=precision,
inventory_list=inventory_list,
statistic=statistic, **sql_kw)
statistic=statistic, **inventory_stock_sql_kw)
# Get delta inventory
delta_inventory_line_list = self.Resource_zGetInventoryList(
stock_table_id='stock',
src__=src__, ignore_variation=ignore_variation,
standardise=standardise, omit_simulation=omit_simulation,
selection_domain=selection_domain,
selection_report=selection_report, precision=precision,
inventory_list=inventory_list,
statistic=statistic, **stock_sql_kw)
# Match & add initial and delta inventories
if src__:
sql_source_list.extend((initial_inventory_line_list,
delta_inventory_line_list))
else:
if 'group_by' in new_kw:
group_by_id_list = []
group_by_id_list_append = group_by_id_list.append
for group_by_id in new_kw['group_by']:
if group_by_id == 'uid':
group_by_id_list_append('stock_uid')
else:
group_by_id_list_append(group_by_id)
def getInventoryListKey(line):
"""
Generate a key based on values used in SQL group_by
"""
return tuple([line[x] for x in group_by_id_list])
else:
def getInventoryListKey(line):
"""
No group by criterion, regroup everything.
"""
return 'dummy_key'
result_column_id_dict = {}
result_column_id_dict['inventory'] = None
result_column_id_dict['total_quantity'] = None
result_column_id_dict['total_price'] = None
def addLineValues(line_a=None, line_b=None):
"""
Addition columns of 2 lines and return a line with same
schema. If one of the parameters is None, returns the
other parameters.
Arythmetic modifications on additions:
None + x = x
None + None = None
"""
if line_a is None:
return line_b
if line_b is None:
return line_a
result = {}
for key, value in line_a.iteritems():
if key in result_column_id_dict:
value_b = line_b[key]
if None not in (value, value_b):
result[key] = value + value_b
elif value is not None:
result[key] = value
else:
result[key] = value_b
elif line_a[key] == line_b[key]:
result[key] = line_a[key]
else:
LOG('InventoryTool.getInventoryList.addLineValues', 0,
'missmatch for %s column: %s and %s' % \
(key, line_a[key], line_b[key]))
return result
inventory_list_dict = {}
for line_list in (initial_inventory_line_list,
delta_inventory_line_list):
for line in line_list.dictionaries():
line_key = getInventoryListKey(line)
line_a = inventory_list_dict.get(line_key)
inventory_list_dict[line_key] = addLineValues(line_a,
line)
## XXX: Returns a dict instead of an <r> instance
## As long as they are accessed like dicts it's ok, but...
#result = inventory_list_dict.values()
sorted_inventory_list = inventory_list_dict.values()
sort_on = new_kw.get('sort_on', tuple())
if len(sort_on) != 0:
def cmp_inventory_line(line_a, line_b):
"""
Compare 2 inventory lines and sort them according to
sort_on parameter.
"""
result = 0
for key, sort_direction in sort_on:
if not(key in line_a and key in line_b):
raise Exception, "Impossible to sort result since " \
"columns sort happens on are not available in " \
"result."
result = cmp(line_a[key], line_b[key])
if result != 0:
if len(sort_direction[0]) and \
sort_direction[0].upper() != 'A':
# Default sort is ascending, if a sort is given and
# it does not start with an 'A' then reverse sort.
# Tedious syntax checking is MySQL's job, and
# happened when queries were executed.
result *= -1
break
return result
sorted_inventory_list.sort(cmp_inventory_line)
result = Results((delta_inventory_line_list.\
_searchable_result_columns(),
tuple(sorted_inventory_list)))
else:
# Not all required full inventories are found
optimisation_success = False
else:
# Not enough criterions to trigger optimisation
optimisation_success = False
if not optimisation_success:
result = self.Resource_zGetInventoryList(
stock_table_id='stock',
src__=src__, ignore_variation=ignore_variation,
standardise=standardise, omit_simulation=omit_simulation,
selection_domain=selection_domain,
selection_report=selection_report, precision=precision,
inventory_list=inventory_list,
statistic=statistic, **stock_sql_kw)
if src__:
sql_source_list.append(result)
if src__:
result = ';\n-- NEXT QUERY\n'.join(sql_source_list)
return result
security.declareProtected(Permissions.AccessContentsInformation,
'getCurrentInventoryList')
......
......@@ -1441,6 +1441,393 @@ class TestTrackingList(InventoryAPITestCase):
self.assertEquals(len(result),2)
self.failIfDifferentSet([x.uid for x in result], [item_uid, other_item_uid])
class TestInventoryDocument(InventoryAPITestCase):
""" Test impact of creating full inventories of stock points on inventory
lookup. This is an optimisation to regular inventory system to avoid
reading all stock entries since a node/section/payment is used when
gathering its amounts of resources.
"""
def _createAutomaticInventoryAtDate(self, date, override_inventory=None,
full_inventory=False):
"""
getInventoryList is tested to work in another unit test.
If full_inventory is false, only inventoriate the first resource
found.
"""
self.tic() # Tic so that grabbed inventory is up to date.
getInventoryList = self.getSimulationTool().getInventoryList
portal = self.getPortal()
inventory_module = portal.getDefaultModule(portal_type='Inventory')
inventory = inventory_module.newContent(portal_type='Inventory')
inventory.edit(destination_value=self.node,
destination_section_value=self.section,
start_date=date,
full_inventory=full_inventory)
inventory_list = getInventoryList(node_uid=self.node.getUid(),
at_date=date,
omit_output=1)
if full_inventory:
inventory_list = [inventory_list[0]]
# TODO: Define a secon resource which will only be present in full
# inventories. This will allow testing getInentoryList.
#else:
# inventory_list.append({'resource_relative_url': '','total_quantity': 50,'variation_text': ''})
for inventory_line in inventory_list:
line = inventory.newContent(portal_type='Inventory Line')
if override_inventory is None:
total_quantity = inventory_line['total_quantity']
else:
total_quantity = override_inventory
line.edit(resource=inventory_line['resource_relative_url'],
inventory=total_quantity,
variation_text=inventory_line['variation_text'])
# TODO: pass more properties through from calcuated inventory to
# inventory lines if needed.
inventory.deliver()
return inventory
def _populateInventoryModule(self):
"""
Create 3 inventories:
Type Deviation Date (see stepCreateInitialMovements)
- partial 1000
- full 10000
- full 100000
"""
self.BASE_QUANTITY = BASE_QUANTITY = 1
# TODO: It would be better to strip numbers below seconds instead of below
# days.
self.MAX_DATE = MAX_DATE = DateTime(DateTime().Date()) - 1
self.INVENTORY_DATE_3 = INVENTORY_DATE_3 = MAX_DATE - 10 # Newest
self.INVENTORY_QUANTITY_3 = INVENTORY_QUANTITY_3 = 100000
self.INVENTORY_DATE_2 = INVENTORY_DATE_2 = INVENTORY_DATE_3 - 10
self.INVENTORY_QUANTITY_2 = INVENTORY_QUANTITY_2 = 10000
self.INVENTORY_DATE_1 = INVENTORY_DATE_1 = INVENTORY_DATE_2 - 10 # Oldest
self.INVENTORY_QUANTITY_1 = INVENTORY_QUANTITY_1 = 1000
self.movement_uid_list = movement_uid_list = []
movement = self._makeMovement(quantity=BASE_QUANTITY,
start_date=INVENTORY_DATE_1 - 1)
movement_uid_list.append(movement.getUid())
partial_inventory = self._createAutomaticInventoryAtDate(
date=INVENTORY_DATE_1, override_inventory=INVENTORY_QUANTITY_1)
movement = self._makeMovement(quantity=BASE_QUANTITY,
start_date=INVENTORY_DATE_2 - 1)
movement_uid_list.append(movement.getUid())
self._createAutomaticInventoryAtDate(date=INVENTORY_DATE_2,
override_inventory=INVENTORY_QUANTITY_2,
full_inventory=True)
movement = self._makeMovement(quantity=BASE_QUANTITY,
start_date=INVENTORY_DATE_3 - 1)
movement_uid_list.append(movement.getUid())
self._createAutomaticInventoryAtDate(date=INVENTORY_DATE_3,
override_inventory=INVENTORY_QUANTITY_3,
full_inventory=True)
movement = self._makeMovement(quantity=BASE_QUANTITY,
start_date=INVENTORY_DATE_3 + 1)
movement_uid_list.append(movement.getUid())
self.tic()
manage_test = self.getPortal().erp5_sql_transactionless_connection.manage_test
def executeSQL(query):
manage_test("BEGIN\x00%s\x00COMMIT" % (query, ))
# Make stock table inconsistent with inventory_stock to make sure
# inventory_stock is actually tested.
executeSQL("UPDATE stock SET quantity=quantity*2 WHERE uid IN (%s)" %
(', '.join([str(x) for x in movement_uid_list]), ))
self.BASE_QUANTITY *= 2
# Make inventory_stock table inconsistent with stock to make sure
# inventory_stock is actually not used when checking that partial
# inventory is not taken into account.
executeSQL("UPDATE inventory_stock SET quantity=quantity*2 WHERE "\
"uid IN (%s)" % (', '.join([str(x.getUid()) for x in \
partial_inventory.objectValues()]),
))
def afterSetUp(self):
InventoryAPITestCase.afterSetUp(self)
self._populateInventoryModule()
simulation_tool = self.getSimulationTool()
self.getInventory = simulation_tool.getInventory
self.getInventoryList = simulation_tool.getInventoryList
self.node_uid = self.node.getUid()
def _doesInventoryLineMatch(self, criterion_dict, inventory_line):
"""
True: all values from criterion_dict match given inventory_line.
False otherwise.
"""
for criterion_id, criterion_value in criterion_dict.iteritems():
if criterion_id not in inventory_line \
or criterion_value != inventory_line[criterion_id]:
return False
return True
def _checkInventoryList(self, inventory_list, criterion_dict_list,
ordered_check=False):
"""
Check that:
- inventory_list matches length of criterion_dict_list
- inventory_list contains criterions mentionned in
criterion_dict_list, line per line.
If ordered_check is true, chek that lines match in the order they are
provided.
Warning: If a criterion can match multiple line, the first encountered
line is accepted and will not be available for other checks. Sort
inventory & criterions prior to checking if there is no other way - but
it's most probable that your test is wrong if such case happens.
Given inventory must have usable methods:
__contains__ : to know if a column is present in the inventory
__getitem__ : to get the value of an inventory column
"""
if getattr(inventory_list, 'dictionaries', None) is not None:
inventory_list = inventory_list.dictionaries()
else:
inventory_list = inventory_list[:] # That list is modified in this method
self.assertEquals(len(inventory_list), len(criterion_dict_list))
for criterion_dict in criterion_dict_list:
success = False
for inventory_position in xrange(len(inventory_list)):
if self._doesInventoryLineMatch(criterion_dict,
inventory_list[inventory_position]):
del inventory_list[inventory_position]
success = True
break
if ordered_check:
# We only reach this test if first line of inventory_list didn't
# match current criterion_dict, which means lines at same initial
# position do not match.
break
# Avoid rendering assertion error messages when no error happened.
# This is because error messages might causes errors to be thrown if
# they are rendered in cases where no assertion error would happen...
# Leads to rasing exception instead of calling self.assert[...] method.
if not success:
if ordered_check:
raise AssertionError, 'Line %r do not match %r' % \
(inventory_list[inventory_position],
criterion_dict)
else:
raise AssertionError, 'No line in %r match %r' % \
(inventory_list, criterion_dict)
def getInventoryEquals(self, value, inventory_kw):
"""
Check that optimised getInventory call is equal to given value
and that unoptimised call is *not* equal to thi value.
"""
self.assertEquals(value, self.getInventory(**inventory_kw))
self.assertNotEquals(value,
self.getInventory(optimisation__=False,
**inventory_kw))
def test_01_CurrentInventoryWithFullInventory(self):
"""
Check that inventory optimisation is executed when querying current
amount (there is a usable full inventory which is the latest).
"""
self.getInventoryEquals(value=self.INVENTORY_QUANTITY_3 + \
self.BASE_QUANTITY,
inventory_kw={'node_uid': self.node_uid})
def test_02_InventoryAtLatestFullInventoryDate(self):
"""
Check that inventory optimisation is executed when querying an amount
at the exact time of latest usable full inventory.
"""
self.getInventoryEquals(value=self.INVENTORY_QUANTITY_3,
inventory_kw={'node_uid': self.node_uid,
'at_date': self.INVENTORY_DATE_3})
def test_03_InventoryAtEarlierFullInventoryDate(self):
"""
Check that inventory optimisation is executed when querying past
amount (there is a usable full inventory which is not the latest).
"""
self.getInventoryEquals(value=self.INVENTORY_QUANTITY_2 + \
self.BASE_QUANTITY,
inventory_kw={'node_uid': self.node_uid,
'at_date': self.INVENTORY_DATE_3 - \
1})
def test_04_InventoryBeforeFullInventoryAfterPartialInventory(self):
"""
Check that optimisation is not executed when querying past amount
with no usable full inventory.
If optimisation was executed,
self.INVENTORY_QUANTITY_1 * 2 + self.BASE_QUANTITY * 2
would be found.
"""
self.assertEquals(self.INVENTORY_QUANTITY_1 + self.BASE_QUANTITY * 2,
self.getInventory(node_uid=self.node_uid,
at_date=self.INVENTORY_DATE_2 - 1))
def test_05_InventoryListWithFullInventory(self):
"""
Check that inventory optimisation is executed when querying current
amount list (there is a usable full inventory which is the latest).
"""
inventory = self.getInventoryList(node_uid=self.node_uid)
reference_inventory = [
{'date': self.INVENTORY_DATE_3,
'inventory': self.INVENTORY_QUANTITY_3,
'node_uid': self.node_uid},
{'date': self.INVENTORY_DATE_3 + 1,
'inventory': self.BASE_QUANTITY,
'node_uid': self.node_uid}
]
self._checkInventoryList(inventory, reference_inventory)
def test_06_InventoryListAtLatestFullInventoryDate(self):
"""
Check that inventory optimisation is executed when querying past
amount list (there is a usable full inventory which is not the latest).
"""
inventory = self.getInventoryList(node_uid=self.node_uid,
at_date=self.INVENTORY_DATE_3)
reference_inventory = [
{'date': self.INVENTORY_DATE_3,
'inventory': self.INVENTORY_QUANTITY_3,
'node_uid': self.node_uid}
]
self._checkInventoryList(inventory, reference_inventory)
def test_07_InventoryListAtEarlierFullInventoryDate(self):
"""
Check that inventory optimisation is executed when querying past
amount list (there is a usable full inventory which is not the latest).
"""
inventory = self.getInventoryList(node_uid=self.node_uid,
at_date=self.INVENTORY_DATE_3 - 1)
reference_inventory = [
{'date': self.INVENTORY_DATE_2,
'inventory': self.INVENTORY_QUANTITY_2,
'node_uid': self.node_uid},
{'date': self.INVENTORY_DATE_3 - 1,
'inventory': self.BASE_QUANTITY,
'node_uid': self.node_uid}
]
self._checkInventoryList(inventory, reference_inventory)
def test_08_InventoryListBeforeFullInventoryAfterPartialInventory(self):
"""
Check that optimisation is not executed when querying past amount list
with no usable full inventory.
"""
inventory = self.getInventoryList(node_uid=self.node_uid,
at_date=self.INVENTORY_DATE_2 - 1)
reference_inventory = [
{'date': self.INVENTORY_DATE_1 - 1,
'inventory': self.BASE_QUANTITY,
'node_uid': self.node_uid},
{'date': self.INVENTORY_DATE_1,
'inventory': self.INVENTORY_QUANTITY_1,
'node_uid': self.node_uid},
{'date': self.INVENTORY_DATE_2 - 1,
'inventory': self.BASE_QUANTITY,
'node_uid': self.node_uid}
]
self._checkInventoryList(inventory, reference_inventory)
def test_09_InventoryListGroupedByResource(self):
"""
Group inventory list by resource explicitely, used inventory is the
latest.
"""
inventory = self.getInventoryList(node_uid=self.node_uid,
group_by_resource=1)
reference_inventory = [
{'inventory': self.INVENTORY_QUANTITY_3 + self.BASE_QUANTITY,
'resource_uid': self.resource.getUid(),
'node_uid': self.node_uid}
]
self._checkInventoryList(inventory, reference_inventory)
def test_10_InventoryListGroupedByResourceBeforeLatestFullInventoryDate(self):
"""
Group inventory list by resource explicitely, used inventory is not the
latest.
"""
inventory = self.getInventoryList(node_uid=self.node_uid,
group_by_resource=1,
at_date=self.INVENTORY_DATE_3 - 1)
reference_inventory = [
{'inventory': self.INVENTORY_QUANTITY_2 + self.BASE_QUANTITY,
'resource_uid': self.resource.getUid(),
'node_uid': self.node_uid}
]
self._checkInventoryList(inventory, reference_inventory)
def test_11_InventoryListAroundLatestInventoryDate(self):
"""
Test getInventoryList with a min and a max date around a full
inventory. A full inventory is used and is not the latest.
"""
inventory = self.getInventoryList(node_uid=self.node_uid,
from_date=self.INVENTORY_DATE_3 - 1,
at_date=self.INVENTORY_DATE_3 + 1)
reference_inventory = [
{'inventory': self.BASE_QUANTITY,
'resource_uid': self.resource.getUid(),
'node_uid': self.node_uid,
'date': self.INVENTORY_DATE_3 - 1},
{'inventory': self.INVENTORY_QUANTITY_3,
'resource_uid': self.resource.getUid(),
'node_uid': self.node_uid,
'date': self.INVENTORY_DATE_3},
{'inventory': -self.INVENTORY_QUANTITY_2,
'resource_uid': self.resource.getUid(),
'node_uid': self.node_uid,
'date': self.INVENTORY_DATE_3},
{'inventory': self.BASE_QUANTITY,
'resource_uid': self.resource.getUid(),
'node_uid': self.node_uid,
'date': self.INVENTORY_DATE_3 + 1}
]
self._checkInventoryList(inventory, reference_inventory)
def test_12_InventoryListWithOrderByDate(self):
"""
Test order_by is preserved by optimisation on date column.
Also sort on total_quantity column because there are inventory lines
which are on the same date but with distinct quantities.
"""
inventory = self.getInventoryList(node_uid=self.node_uid,
from_date=self.INVENTORY_DATE_3 - 1,
at_date=self.INVENTORY_DATE_3 + 1,
sort_on=(('date', 'ASC'),
('total_quantity', 'DESC')))
reference_inventory = [
{'inventory': self.BASE_QUANTITY,
'resource_uid': self.resource.getUid(),
'node_uid': self.node_uid,
'date': self.INVENTORY_DATE_3 - 1},
{'inventory': self.INVENTORY_QUANTITY_3,
'resource_uid': self.resource.getUid(),
'node_uid': self.node_uid,
'date': self.INVENTORY_DATE_3},
{'inventory': -self.INVENTORY_QUANTITY_2,
'resource_uid': self.resource.getUid(),
'node_uid': self.node_uid,
'date': self.INVENTORY_DATE_3},
{'inventory': self.BASE_QUANTITY,
'resource_uid': self.resource.getUid(),
'node_uid': self.node_uid,
'date': self.INVENTORY_DATE_3 + 1}
]
self._checkInventoryList(inventory, reference_inventory,
ordered_check=True)
inventory = self.getInventoryList(node_uid=self.node_uid,
from_date=self.INVENTORY_DATE_3 - 1,
at_date=self.INVENTORY_DATE_3 + 1,
sort_on=(('date', 'DESC'),
('total_quantity', 'ASC')))
reference_inventory.reverse()
self._checkInventoryList(inventory, reference_inventory,
ordered_check=True)
if __name__ == '__main__':
framework()
......@@ -1455,6 +1842,7 @@ else:
suite.addTest(unittest.makeSuite(TestNextNegativeInventoryDate))
suite.addTest(unittest.makeSuite(TestInventoryStat))
suite.addTest(unittest.makeSuite(TestTrackingList))
#suite.addTest(unittest.makeSuite(TestInventoryDocument))
return suite
# vim: foldmethod=marker
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment