diff --git a/product/ERP5Catalog/tests/testERP5Catalog.py b/product/ERP5Catalog/tests/testERP5Catalog.py index 2825456ebfdb1c0f2f460247e7be882e744f11cd..63edbc233cdadab15c7296559824f60820feabf3 100644 --- a/product/ERP5Catalog/tests/testERP5Catalog.py +++ b/product/ERP5Catalog/tests/testERP5Catalog.py @@ -39,13 +39,14 @@ from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from AccessControl.SecurityManagement import newSecurityManager from zLOG import LOG from DateTime import DateTime +from Products.CMFCore.tests.base.testcase import LogInterceptor try: from transaction import get as get_transaction except ImportError: pass -class TestERP5Catalog(ERP5TypeTestCase): +class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): """ Tests for ERP5 Catalog. """ @@ -62,10 +63,15 @@ class TestERP5Catalog(ERP5TypeTestCase): def afterSetUp(self, quiet=1, run=1): self.login() - portal = self.getPortal() - catalog_tool = self.getCatalogTool() - # XXX This does not works - #catalog_tool.reindexObject(portal) + + def beforeTearDown(self): + for module in [ self.getPersonModule(), + self.getOrganisationModule(), + self.getCategoryTool().region, + self.getCategoryTool().group ]: + module.manage_delObjects(list(module.objectIds())) + self.getPortal().portal_activities.manageClearActivities() + get_transaction().commit() def login(self, quiet=0, run=run_all_test): uf = self.getPortal().acl_users @@ -740,3 +746,70 @@ class TestERP5Catalog(ERP5TypeTestCase): self.assertNotEquals([], self.getCatalogTool().searchResults( portal_type='Person', title=u'A Person')) + def test_SortOn(self, quiet=quiet, run=run_all_test): + if not run: return + self.assertEquals('catalog.title', + self.getCatalogTool().buildSQLQuery( + sort_on=(('catalog.title', 'ascending'),))['order_by_expression']) + + def test_SortOnDescending(self, quiet=quiet, run=run_all_test): + if not run: return + self.assertEquals('catalog.title DESC', + self.getCatalogTool().buildSQLQuery( + sort_on=(('catalog.title', 'descending'),))['order_by_expression']) + + def test_SortOnUnknownKeys(self, quiet=quiet, run=run_all_test): + if not run: return + self.assertEquals('', + self.getCatalogTool().buildSQLQuery( + sort_on=(('ignored', 'ascending'),))['order_by_expression']) + + def test_SortOnAmbigousKeys(self, quiet=quiet, run=run_all_test): + if not run: return + # if the sort key is found on the catalog table, it will use that catalog + # table. + self.assertEquals('catalog.title', + self.getCatalogTool().buildSQLQuery( + sort_on=(('title', 'ascending'),))['order_by_expression']) + + # if not found on catalog, it won't do any filtering + # in the above, start_date exists both in delivery and movement table. + self._catch_log_errors(ignored_level = sys.maxint) + self.assertEquals('', + self.getCatalogTool().buildSQLQuery( + sort_on=(('start_date', 'ascending'),))['order_by_expression']) + self._ignore_log_errors() + # buildSQLQuery will simply put a warning in the error log and ignore + # this key + logged_errors = [ logrecord for logrecord in self.logged + if logrecord[0] == 'SQLCatalog' ] + self.failUnless( + 'could not build the new sort index' in logged_errors[0][2]) + + # of course, in that case, it's possible to prefix with table name + self.assertEquals('delivery.start_date', + self.getCatalogTool().buildSQLQuery( + sort_on=(('delivery.start_date', 'ascending'), + ))['order_by_expression']) + + def test_SortOnMultipleKeys(self, quiet=quiet, run=run_all_test): + if not run: return + self.assertEquals('catalog.title,catalog.id', + self.getCatalogTool().buildSQLQuery( + sort_on=(('catalog.title', 'ascending'), + ('catalog.id', 'asc'))) + ['order_by_expression'].replace(' ', '')) + + def test_SortOnRelatedKey(self, quiet=quiet, run=run_all_test): + """Sort-on parameter and related key. (Assumes that region_title is a \ + valid related key)""" + if not run: return + self.assertNotEquals('', + self.getCatalogTool().buildSQLQuery(region_title='', + sort_on=(('region_title', 'ascending'),))['order_by_expression']) + self.assertNotEquals('', + self.getCatalogTool().buildSQLQuery( + sort_on=(('region_title', 'ascending'),))['order_by_expression'], + 'sort_on parameter must be taken into account even if related key ' + 'is not a parameter of the current query') + diff --git a/product/ZSQLCatalog/SQLCatalog.py b/product/ZSQLCatalog/SQLCatalog.py index 64c389099a671806e0ecdfc912fe720f833b5e4c..1f8194d737abf38d36f454af3f5bf3e739a71784 100644 --- a/product/ZSQLCatalog/SQLCatalog.py +++ b/product/ZSQLCatalog/SQLCatalog.py @@ -1380,7 +1380,7 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): return full_list def buildSQLQuery(self, query_table='catalog', REQUEST=None, - ignore_empty_string=1,**kw): + ignore_empty_string=1, **kw): """ Builds a complex SQL query to simulate ZCalatog behaviour """ # Get search arguments: if REQUEST is None and (kw is None or kw == {}): @@ -1401,10 +1401,45 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): topic_search_keys = self.sql_catalog_topic_search_keys multivalue_keys = self.sql_catalog_multivalue_keys + + # Compute "sort_index", which is a sort index, or none: + if kw.has_key('sort-on'): + sort_index=kw['sort-on'] + elif hasattr(self, 'sort-on'): + sort_index=getattr(self, 'sort-on') + elif kw.has_key('sort_on'): + sort_index=kw['sort_on'] + else: sort_index=None + + # Compute the sort order + if kw.has_key('sort-order'): + so=kw['sort-order'] + elif hasattr(self, 'sort-order'): + so=getattr(self, 'sort-order') + elif kw.has_key('sort_order'): + so=kw['sort_order'] + else: so=None + + # We must now turn sort_index into + # a dict with keys as sort keys and values as sort order + if isinstance(sort_index, basestring): + sort_index = [(sort_index, so)] + elif not isinstance(sort_index, (list, tuple)): + sort_index = None + + # if we have a sort index, we must take it into account to get related + # keys. + if sort_index: + related_key_kw = dict(kw) + for sort_info in sort_index: + related_key_kw.setdefault(sort_info[0], '') + related_tuples = self.getSqlCatalogRelatedKeyList(**related_key_kw) + else: + related_tuples = self.getSqlCatalogRelatedKeyList(**kw) + # Define related maps - # each tuple has the form (key, 'table1,table2,table3/column/where_expression') - related_tuples = self.getSqlCatalogRelatedKeyList(**kw) - #LOG('related_tuples', 0, str(related_tuples)) + # each tuple from `related_tuples` has the form (key, + # 'table1,table2,table3/column/where_expression') related_keys = [] related_method = {} related_table_map = {} @@ -1451,35 +1486,6 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): where_expression = [] from_table_dict = {'catalog' : 'catalog'} # Always include catalog table - - # Compute "sort_index", which is a sort index, or none: - if kw.has_key('sort-on'): - sort_index=kw['sort-on'] - elif hasattr(self, 'sort-on'): - sort_index=getattr(self, 'sort-on') - elif kw.has_key('sort_on'): - sort_index=kw['sort_on'] - else: sort_index=None - - # Compute the sort order - if kw.has_key('sort-order'): - so=kw['sort-order'] - elif hasattr(self, 'sort-order'): - so=getattr(self, 'sort-order') - elif kw.has_key('sort_order'): - so=kw['sort_order'] - else: so=None - - # We must now turn sort_index into - # a dict with keys as sort keys and values as sort order - if isinstance(sort_index, basestring): - sort_index = [(sort_index, so)] - elif not isinstance(sort_index, (list, tuple)): - sort_index = None - - - # If sort_index is a dictionnary - # then parse it and change it sort_on = None if sort_index is not None: new_sort_index = [] @@ -1528,7 +1534,7 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): raise except: LOG('SQLCatalog', WARNING, 'buildSQLQuery could not build the new sort index', error=sys.exc_info()) - pass + sort_on = '' # Rebuild keywords to behave as new style query (_usage='toto:titi' becomes {'toto':'titi'}) new_kw = {} @@ -1698,7 +1704,7 @@ class Catalog(Folder, Persistent, Acquisition.Implicit, ExtensionClass.Base): if k != query_table: where_expression.append('%s.uid = %s.uid' % (query_table, tid)) # Calculate extra where_expressions based on related definition - for (table_list,method_id) in related_methods.keys(): + for (table_list, method_id) in related_methods.keys(): related_method = getattr(self, method_id, None) if related_method is not None: table_id = {'src__' : 1} # Return query source, do not evaluate