Commit f5df17ce authored by Jérome Perrin's avatar Jérome Perrin

modernize testERP5Catalog

parent 3438bad5
...@@ -96,8 +96,6 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -96,8 +96,6 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
return ('erp5_full_text_myisam_catalog', 'erp5_base',) return ('erp5_full_text_myisam_catalog', 'erp5_base',)
# Different variables used for this test # Different variables used for this test
run_all_test = 1
quiet = 0
username = 'seb' username = 'seb'
new_erp5_sql_connection = 'erp5_sql_connection2' new_erp5_sql_connection = 'erp5_sql_connection2'
new_erp5_deferred_sql_connection = 'erp5_sql_deferred_connection2' new_erp5_deferred_sql_connection = 'erp5_sql_deferred_connection2'
...@@ -173,33 +171,20 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -173,33 +171,20 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.failUnless(path not in path_list) self.failUnless(path not in path_list)
LOG('checkRelativeUrlInSQLPathList not found path:',0,path) LOG('checkRelativeUrlInSQLPathList not found path:',0,path)
def test_01_HasEverything(self, quiet=quiet, run=run_all_test): def test_01_HasEverything(self):
if not run: return self.assertNotEquals(self.getCategoryTool(), None)
if not quiet: self.assertNotEquals(self.getSimulationTool(), None)
ZopeTestCase._print('\nTest Has Everything ') self.assertNotEquals(self.getTypeTool(), None)
LOG('Testing... ',0,'testHasEverything') self.assertNotEquals(self.getSQLConnection(), None)
self.failUnless(self.getCategoryTool()!=None) self.assertNotEquals(self.getCatalogTool(), None)
self.failUnless(self.getSimulationTool()!=None)
self.failUnless(self.getTypeTool()!=None) def test_02_EverythingCatalogued(self):
self.failUnless(self.getSQLConnection()!=None)
self.failUnless(self.getCatalogTool()!=None)
def test_02_EverythingCatalogued(self, quiet=quiet, run=run_all_test):
if not run: return
if not quiet:
ZopeTestCase._print('\nTest Everything Catalogued')
LOG('Testing... ',0,'testEverythingCatalogued')
portal_catalog = self.getCatalogTool() portal_catalog = self.getCatalogTool()
self.tic() self.tic()
organisation_module_list = portal_catalog(portal_type='Organisation Module') organisation_module_list = portal_catalog(portal_type='Organisation Module')
self.assertEquals(len(organisation_module_list),1) self.assertEquals(len(organisation_module_list),1)
def test_03_CreateAndDeleteObject(self, quiet=quiet, run=run_all_test): def test_03_CreateAndDeleteObject(self):
if not run: return
if not quiet:
message = 'Test Create And Delete Objects'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
portal_catalog = self.getCatalogTool() portal_catalog = self.getCatalogTool()
person_module = self.getPersonModule() person_module = self.getPersonModule()
person = person_module.newContent(id='1',portal_type='Person') person = person_module.newContent(id='1',portal_type='Person')
...@@ -233,12 +218,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -233,12 +218,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.tic() self.tic()
self.checkRelativeUrlNotInSQLPathList(path_list) self.checkRelativeUrlNotInSQLPathList(path_list)
def test_04_SearchFolderWithDeletedObjects(self, quiet=quiet, run=run_all_test): def test_04_SearchFolderWithDeletedObjects(self):
if not run: return
if not quiet:
message = 'Search Folder With Deleted Objects'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
person_module = self.getPersonModule() person_module = self.getPersonModule()
# Now we will try the same thing as previous test and look at searchFolder # Now we will try the same thing as previous test and look at searchFolder
folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()] folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()]
...@@ -253,13 +233,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -253,13 +233,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()] folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()]
self.assertEquals([],folder_object_list) self.assertEquals([],folder_object_list)
def test_05_SearchFolderWithImmediateReindexObject(self, quiet=quiet, run=run_all_test): def test_05_SearchFolderWithImmediateReindexObject(self):
if not run: return
if not quiet:
message = 'Search Folder With Immediate Reindex Object'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
person_module = self.getPersonModule() person_module = self.getPersonModule()
# Now we will try the same thing as previous test and look at searchFolder # Now we will try the same thing as previous test and look at searchFolder
...@@ -270,20 +244,13 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -270,20 +244,13 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
person.immediateReindexObject() person.immediateReindexObject()
folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()] folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()]
self.assertEquals(['4'],folder_object_list) self.assertEquals(['4'],folder_object_list)
person_module.manage_delObjects('4') person_module.manage_delObjects('4')
self.tic() self.tic()
folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()] folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()]
self.assertEquals([],folder_object_list) self.assertEquals([],folder_object_list)
def test_06_SearchFolderWithRecursiveImmediateReindexObject(self, def test_06_SearchFolderWithRecursiveImmediateReindexObject(self):
quiet=quiet, run=run_all_test):
if not run: return
if not quiet:
message = 'Search Folder With Recursive Immediate Reindex Object'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
person_module = self.getPersonModule() person_module = self.getPersonModule()
# Now we will try the same thing as previous test and look at searchFolder # Now we will try the same thing as previous test and look at searchFolder
...@@ -294,19 +261,13 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -294,19 +261,13 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
person_module.recursiveImmediateReindexObject() person_module.recursiveImmediateReindexObject()
folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()] folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()]
self.assertEquals(['4'],folder_object_list) self.assertEquals(['4'],folder_object_list)
person_module.manage_delObjects('4') person_module.manage_delObjects('4')
self.tic() self.tic()
folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()] folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()]
self.assertEquals([],folder_object_list) self.assertEquals([],folder_object_list)
def test_07_ClearCatalogAndTestNewContent(self, quiet=quiet, run=run_all_test): def test_07_ClearCatalogAndTestNewContent(self):
if not run: return
if not quiet:
message = 'Clear Catalog And Test New Content'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
person_module = self.getPersonModule() person_module = self.getPersonModule()
# Clear catalog # Clear catalog
...@@ -318,14 +279,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -318,14 +279,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()] folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()]
self.assertEquals(['4'],folder_object_list) self.assertEquals(['4'],folder_object_list)
def test_08_ClearCatalogAndTestRecursiveImmediateReindexObject(self, def test_08_ClearCatalogAndTestRecursiveImmediateReindexObject(self):
quiet=quiet, run=run_all_test):
if not run: return
if not quiet:
message = 'Clear Catalog And Test Recursive Immediate Reindex Object'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
person_module = self.getPersonModule() person_module = self.getPersonModule()
# Clear catalog # Clear catalog
...@@ -337,14 +291,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -337,14 +291,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()] folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()]
self.assertEquals(['4'],folder_object_list) self.assertEquals(['4'],folder_object_list)
def test_09_ClearCatalogAndTestImmediateReindexObject(self, quiet=quiet, def test_09_ClearCatalogAndTestImmediateReindexObject(self):
run=run_all_test):
if not run: return
if not quiet:
message = 'Clear Catalog And Test Immediate Reindex Object'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
person_module = self.getPersonModule() person_module = self.getPersonModule()
# Clear catalog # Clear catalog
...@@ -356,13 +303,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -356,13 +303,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()] folder_object_list = [x.getObject().getId() for x in person_module.searchFolder()]
self.assertEquals(['4'],folder_object_list) self.assertEquals(['4'],folder_object_list)
def test_10_OrderedSearchFolder(self, quiet=quiet, run=run_all_test): def test_10_OrderedSearchFolder(self):
if not run: return
if not quiet:
message = 'Ordered Search Folder'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
person_module = self.getPersonModule() person_module = self.getPersonModule()
# Clear catalog # Clear catalog
...@@ -387,13 +328,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -387,13 +328,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
sort_on=[('title','ascending'),('description','descending')])] sort_on=[('title','ascending'),('description','descending')])]
self.assertEquals(['a','b','c'],folder_object_list) self.assertEquals(['a','b','c'],folder_object_list)
def test_11_CastStringAsInt(self, quiet=quiet, run=run_all_test): def test_11_CastStringAsInt(self):
if not run: return
if not quiet:
message = 'Cast String As Int With Order By'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
person_module = self.getPersonModule() person_module = self.getPersonModule()
# Clear catalog # Clear catalog
...@@ -411,13 +346,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -411,13 +346,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
folder_object_list = [x.getObject().getTitle() for x in person_module.searchFolder(sort_on=[('title','ascending','int')])] folder_object_list = [x.getObject().getTitle() for x in person_module.searchFolder(sort_on=[('title','ascending','int')])]
self.assertEquals(['1','2','12'],folder_object_list) self.assertEquals(['1','2','12'],folder_object_list)
def test_12_TransactionalUidBuffer(self, quiet=quiet, run=run_all_test): def test_12_TransactionalUidBuffer(self):
if not run: return
if not quiet:
message = 'Transactional Uid Buffer'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
portal_catalog = self.getCatalogTool() portal_catalog = self.getCatalogTool()
catalog = portal_catalog.getSQLCatalog() catalog = portal_catalog.getSQLCatalog()
self.failUnless(catalog is not None) self.failUnless(catalog is not None)
...@@ -445,12 +374,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -445,12 +374,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
uid_buffer = getUIDBuffer() uid_buffer = getUIDBuffer()
self.failUnless(len(uid_buffer) == 0) self.failUnless(len(uid_buffer) == 0)
def test_13_ERP5Site_reindexAll(self, quiet=quiet, run=run_all_test): def test_13_ERP5Site_reindexAll(self):
if not run: return
if not quiet:
message = 'ERP5Site_reindexAll'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
# Flush message queue # Flush message queue
self.tic() self.tic()
# Create some objects # Create some objects
...@@ -487,17 +411,13 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -487,17 +411,13 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
new_path_list = self.getSQLPathList() new_path_list = self.getSQLPathList()
self.assertEquals(set(original_path_list) - set(new_path_list), set()) self.assertEquals(set(original_path_list) - set(new_path_list), set())
def test_14_ReindexWithBrokenCategory(self, quiet=quiet, run=run_all_test): def test_14_ReindexWithBrokenCategory(self):
if not run: return """Reindexing an object with 1 broken category must not affect other valid
if not quiet: categories"""
message = 'Reindexing an object with 1 broken category must not'\
' affect other valid categories '
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ', 0, message)
# Flush message queue # Flush message queue
self.tic() self.tic()
# Create some objects # Create some objects
portal = self.getPortal() portal = self.portal
portal_category = self.getCategoryTool() portal_category = self.getCategoryTool()
group_nexedi_category = portal_category.group\ group_nexedi_category = portal_category.group\
.newContent( id = 'nexedi', ) .newContent( id = 'nexedi', )
...@@ -548,41 +468,30 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -548,41 +468,30 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.assertEquals(theorical_count, cataloged_obj_count, self.assertEquals(theorical_count, cataloged_obj_count,
'category %s is not cataloged correctly' % base_cat) 'category %s is not cataloged correctly' % base_cat)
def test_15_getObject(self, quiet=quiet, run=run_all_test): def test_15_getObject(self,):
if not run: return
if not quiet:
message = 'getObject'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
# portal_catalog.getObject raises a ValueError if UID parameter is a string # portal_catalog.getObject raises a ValueError if UID parameter is a string
portal_catalog = self.getCatalogTool() portal_catalog = self.getCatalogTool()
self.assertRaises(ValueError, portal_catalog.getObject, "StringUID") self.assertRaises(ValueError, portal_catalog.getObject, "StringUID")
obj = self._makeOrganisation() obj = self._makeOrganisation()
# otherwise it returns the object # otherwise it returns the object
self.assertEquals(obj, portal_catalog.getObject(obj.getUid()).getObject()) self.assertEquals(obj, portal_catalog.getObject(obj.getUid()).getObject())
# but raises KeyError if object is not in catalog # but raises KeyError if object is not in catalog
self.assertRaises(KeyError, portal_catalog.getObject, sys.maxint) self.assertRaises(KeyError, portal_catalog.getObject, sys.maxint)
def test_getRecordForUid(self): def test_getRecordForUid(self):
portal_catalog = self.getCatalogTool() portal_catalog = self.getCatalogTool()
obj = self._makeOrganisation() obj = self._makeOrganisation()
self.assertEquals(obj, self.assertEquals(obj,
portal_catalog.getSQLCatalog().getRecordForUid(obj.getUid()).getObject()) portal_catalog.getSQLCatalog().getRecordForUid(obj.getUid()).getObject())
def test_path(self): def test_path(self):
portal_catalog = self.getCatalogTool() portal_catalog = self.getCatalogTool()
obj = self._makeOrganisation() obj = self._makeOrganisation()
self.assertEquals(obj.getPath(), portal_catalog.getpath(obj.getUid())) self.assertEquals(obj.getPath(), portal_catalog.getpath(obj.getUid()))
self.assertRaises(KeyError, portal_catalog.getpath, sys.maxint) self.assertRaises(KeyError, portal_catalog.getpath, sys.maxint)
def test_16_newUid(self):
def test_16_newUid(self, quiet=quiet, run=run_all_test):
if not run: return
if not quiet:
message = 'newUid'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
# newUid should not assign the same uid # newUid should not assign the same uid
portal_catalog = self.getCatalogTool() portal_catalog = self.getCatalogTool()
from Products.ZSQLCatalog.SQLCatalog import UID_BUFFER_SIZE from Products.ZSQLCatalog.SQLCatalog import UID_BUFFER_SIZE
...@@ -592,17 +501,12 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -592,17 +501,12 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.failUnless(isinstance(uid, long)) self.failUnless(isinstance(uid, long))
self.failIf(uid in uid_dict) self.failIf(uid in uid_dict)
uid_dict[uid] = None uid_dict[uid] = None
def test_17_CreationDate_ModificationDate(self, quiet=quiet, run=run_all_test): def test_17_CreationDate_ModificationDate(self):
if not run: return
if not quiet:
message = 'getCreationDate, getModificationDate'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
portal_catalog = self.getCatalogTool() portal_catalog = self.getCatalogTool()
portal = self.getPortal() portal = self.getPortal()
sql_connection = self.getSQLConnection() sql_connection = self.getSQLConnection()
module = portal.getDefaultModule('Organisation') module = portal.getDefaultModule('Organisation')
organisation = module.newContent(portal_type='Organisation',) organisation = module.newContent(portal_type='Organisation',)
creation_date = organisation.getCreationDate().toZone('UTC').ISO() creation_date = organisation.getCreationDate().toZone('UTC').ISO()
...@@ -619,7 +523,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -619,7 +523,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
result[0]['modification_date'].ISO()) result[0]['modification_date'].ISO())
self.assertEquals(creation_date, self.assertEquals(creation_date,
result[0]['modification_date'].ISO()) result[0]['modification_date'].ISO())
import time; time.sleep(3) import time; time.sleep(3)
organisation.edit(title='edited') organisation.edit(title='edited')
self.tic() self.tic()
...@@ -635,16 +539,12 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -635,16 +539,12 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
result[0]['modification_date'].ISO()) result[0]['modification_date'].ISO())
self.assertTrue(organisation.getModificationDate()>now) self.assertTrue(organisation.getModificationDate()>now)
self.assertTrue(result[0]['creation_date']<result[0]['modification_date']) self.assertTrue(result[0]['creation_date']<result[0]['modification_date'])
# TODO: this test is disabled (and maybe not complete), because this feature # TODO: this test is disabled (and maybe not complete), because this feature
# is not implemented # is not implemented
def test_18_buildSQLQueryAnotherTable(self, quiet=quiet, run=0): @todo_erp5
def test_18_buildSQLQueryAnotherTable(self):
"""Tests that buildSQLQuery works with another query_table than 'catalog'""" """Tests that buildSQLQuery works with another query_table than 'catalog'"""
if not run: return
if not quiet:
message = 'buildSQLQuery with query_table'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
portal = self.getPortal() portal = self.getPortal()
portal_catalog = self.getCatalogTool() portal_catalog = self.getCatalogTool()
# clear catalog # clear catalog
...@@ -749,15 +649,8 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -749,15 +649,8 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.assertEquals( brains[0]['uid'], self.assertEquals( brains[0]['uid'],
source_organisation.getUid(), source_organisation.getUid(),
testMethod(src__=1, **kw) ) testMethod(src__=1, **kw) )
def test_19_SearchFolderWithNonAsciiCharacter(self,
quiet=quiet, run=run_all_test):
if not run: return
if not quiet:
message = 'Search Folder With Non Ascii Character'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
def test_19_SearchFolderWithNonAsciiCharacter(self):
person_module = self.getPersonModule() person_module = self.getPersonModule()
# Now we will try the same thing as previous test and look at searchFolder # Now we will try the same thing as previous test and look at searchFolder
...@@ -769,16 +662,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -769,16 +662,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
folder_object_list = [x.getObject().getId() for x in folder_object_list = [x.getObject().getId() for x in
person_module.searchFolder(title=title)] person_module.searchFolder(title=title)]
self.assertEquals(['5'],folder_object_list) self.assertEquals(['5'],folder_object_list)
def test_20_SearchFolderWithDynamicRelatedKey(self,
quiet=quiet, run=run_all_test):
if not run: return
if not quiet:
message = 'Search Folder With Dynamic Related Key'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
def test_20_SearchFolderWithDynamicRelatedKey(self):
# Create some objects # Create some objects
portal = self.getPortal() portal = self.getPortal()
portal_category = self.getCategoryTool() portal_category = self.getCategoryTool()
...@@ -840,14 +726,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -840,14 +726,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.assertEquals(organisation_list,[organisation2]) self.assertEquals(organisation_list,[organisation2])
def test_21_SearchFolderWithDynamicStrictRelatedKey(self, def test_21_SearchFolderWithDynamicStrictRelatedKey(self):
quiet=quiet, run=run_all_test):
if not run: return
if not quiet:
message = 'Search Folder With Strict Dynamic Related Key'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
# Create some objects # Create some objects
portal = self.getPortal() portal = self.getPortal()
portal_category = self.getCategoryTool() portal_category = self.getCategoryTool()
...@@ -883,25 +762,14 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -883,25 +762,14 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
module.searchFolder(strict_group_description='b')] module.searchFolder(strict_group_description='b')]
self.assertEquals(organisation_list,[organisation]) self.assertEquals(organisation_list,[organisation])
def test_22_SearchingWithUnicode(self, quiet=quiet, run=run_all_test): def test_22_SearchingWithUnicode(self):
if not run: return
if not quiet:
message = 'Test searching with unicode'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
person_module = self.getPersonModule() person_module = self.getPersonModule()
person_module.newContent(portal_type='Person', title='A Person') person_module.newContent(portal_type='Person', title='A Person')
self.tic() self.tic()
self.assertNotEquals([], self.getCatalogTool().searchResults( self.assertNotEquals([], self.getCatalogTool().searchResults(
portal_type='Person', title=u'A Person')) portal_type='Person', title=u'A Person'))
def test_23_DeleteObjectRaiseErrorWhenQueryFail(self, quiet=quiet, run=run_all_test): def test_23_DeleteObjectRaiseErrorWhenQueryFail(self):
if not run: return
if not quiet:
message = 'Test That Delete Object Raise Error When the Query Fail'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
portal_catalog = self.getCatalogTool() portal_catalog = self.getCatalogTool()
person_module = self.getPersonModule() person_module = self.getPersonModule()
# Now we will ask to immediatly reindex # Now we will ask to immediatly reindex
...@@ -920,51 +788,31 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -920,51 +788,31 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.assertRaises(AttributeError,unindex,person,uid=person.getUid()) self.assertRaises(AttributeError,unindex,person,uid=person.getUid())
self.abort() self.abort()
def test_24_SortOn(self, quiet=quiet, run=run_all_test): def test_24_SortOn(self):
if not run: return
if not quiet:
message = 'Test Sort On'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
self.assertTrue( self.assertTrue(
self.getCatalogTool().buildSQLQuery( self.getCatalogTool().buildSQLQuery(
sort_on=(('catalog.title', 'ascending'),))['order_by_expression'] in \ sort_on=(('catalog.title', 'ascending'),))['order_by_expression'] in \
('catalog.title', '`catalog`.`title` ASC', 'catalog.title ASC')) ('catalog.title', '`catalog`.`title` ASC', 'catalog.title ASC'))
def test_25_SortOnDescending(self, quiet=quiet, run=run_all_test): def test_25_SortOnDescending(self):
if not run: return
if not quiet:
message = 'Test Sort On Descending'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
self.assertTrue( self.assertTrue(
self.getCatalogTool().buildSQLQuery( self.getCatalogTool().buildSQLQuery(
sort_on=(('catalog.title', 'descending'),))['order_by_expression'] in \ sort_on=(('catalog.title', 'descending'),))['order_by_expression'] in \
('catalog.title DESC', '`catalog`.`title` DESC')) ('catalog.title DESC', '`catalog`.`title` DESC'))
def test_26_SortOnUnknownKeys(self, quiet=quiet, run=run_all_test): def test_26_SortOnUnknownKeys(self):
if not run: return
if not quiet:
message = 'Test Sort On Unknow Keys'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
self.assertEquals('', self.assertEquals('',
self.getCatalogTool().buildSQLQuery(select_list=('uid', 'path'), self.getCatalogTool().buildSQLQuery(select_list=('uid', 'path'),
sort_on=(('ignored', 'ascending'),))['order_by_expression']) sort_on=(('ignored', 'ascending'),))['order_by_expression'])
def test_27_SortOnAmbigousKeys(self, quiet=quiet, run=run_all_test): def test_27_SortOnAmbigousKeys(self):
if not run: return
if not quiet:
message = 'Test Sort On Ambigous Keys'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
# if the sort key is found on the catalog table, it will use that catalog # if the sort key is found on the catalog table, it will use that catalog
# table. # table.
self.assertTrue( self.assertTrue(
self.getCatalogTool().buildSQLQuery( self.getCatalogTool().buildSQLQuery(
sort_on=(('title', 'ascending'),))['order_by_expression'] in \ sort_on=(('title', 'ascending'),))['order_by_expression'] in \
('catalog.title', '`catalog`.`title` ASC')) ('catalog.title', '`catalog`.`title` ASC'))
# if not found on catalog, it won't do any filtering # if not found on catalog, it won't do any filtering
# in the above, start_date exists both in delivery and movement table. # in the above, start_date exists both in delivery and movement table.
self.assertRaises(ValueError, self.getCatalogTool().buildSQLQuery, self.assertRaises(ValueError, self.getCatalogTool().buildSQLQuery,
...@@ -976,13 +824,8 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -976,13 +824,8 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
sort_on=(('delivery.start_date', 'ascending'), sort_on=(('delivery.start_date', 'ascending'),
))['order_by_expression'] in \ ))['order_by_expression'] in \
('delivery.start_date', 'delivery.start_date ASC')) ('delivery.start_date', 'delivery.start_date ASC'))
def test_28_SortOnMultipleKeys(self, quiet=quiet, run=run_all_test): def test_28_SortOnMultipleKeys(self):
if not run: return
if not quiet:
message = 'Test Sort On Multiple Keys'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
self.assertTrue( self.assertTrue(
self.getCatalogTool().buildSQLQuery( self.getCatalogTool().buildSQLQuery(
sort_on=(('catalog.title', 'ascending'), sort_on=(('catalog.title', 'ascending'),
...@@ -990,14 +833,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -990,14 +833,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
['order_by_expression'].replace(' ', '') in \ ['order_by_expression'].replace(' ', '') in \
('catalog.title,catalog.id', '`catalog`.`title`ASC,`catalog`.`id`ASC', 'catalog.titleASC,catalog.idASC')) ('catalog.title,catalog.id', '`catalog`.`title`ASC,`catalog`.`id`ASC', 'catalog.titleASC,catalog.idASC'))
def test_29_SortOnRelatedKey(self, quiet=quiet, run=run_all_test): def test_29_SortOnRelatedKey(self):
"""Sort-on parameter and related key. (Assumes that region_title is a \ """Sort-on parameter and related key. (Assumes that region_title is a \
valid related key)""" valid related key)"""
if not run: return
if not quiet:
message = 'Test Sort On Related Keys'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
self.assertTrue( self.assertTrue(
self.getCatalogTool().buildSQLQuery(region_title='foo', self.getCatalogTool().buildSQLQuery(region_title='foo',
sort_on=(('region_title', 'ascending'),))['order_by_expression'].endswith('.`title` ASC')) sort_on=(('region_title', 'ascending'),))['order_by_expression'].endswith('.`title` ASC'))
...@@ -1030,29 +868,19 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1030,29 +868,19 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
organisation.edit(**kw) organisation.edit(**kw)
self.tic() self.tic()
return organisation return organisation
def test_30_SimpleQueryDict(self, quiet=quiet, run=run_all_test): def test_30_SimpleQueryDict(self):
"""use a dict as a keyword parameter. """use a dict as a keyword parameter.
""" """
if not run: return
if not quiet:
message = 'Test Simple Query Dict'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
organisation_title = 'Nexedi Organisation' organisation_title = 'Nexedi Organisation'
organisation = self._makeOrganisation(title=organisation_title) organisation = self._makeOrganisation(title=organisation_title)
self.assertEquals([organisation.getPath()], self.assertEquals([organisation.getPath()],
[x.path for x in self.getCatalogTool()( [x.path for x in self.getCatalogTool()(
title={'query': organisation_title})]) title={'query': organisation_title})])
def test_31_RelatedKeySimpleQueryDict(self, quiet=quiet, run=run_all_test): def test_31_RelatedKeySimpleQueryDict(self):
"""use a dict as a keyword parameter, but using a related key """use a dict as a keyword parameter, but using a related key
""" """
if not run: return
if not quiet:
message = 'Test Related Key Simple Query Dict'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
organisation = self._makeOrganisation() organisation = self._makeOrganisation()
self.assertEquals([organisation.getPath()], self.assertEquals([organisation.getPath()],
[x.path for x in self.getCatalogTool()( [x.path for x in self.getCatalogTool()(
...@@ -1061,15 +889,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1061,15 +889,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
# also member of itself # also member of itself
portal_type=organisation.getPortalTypeName())]) portal_type=organisation.getPortalTypeName())])
def test_32_SimpleQueryDictWithOrOperator(self, quiet=quiet, def test_32_SimpleQueryDictWithOrOperator(self):
run=run_all_test):
"""use a dict as a keyword parameter, with OR operator. """use a dict as a keyword parameter, with OR operator.
""" """
if not run: return
if not quiet:
message = 'Test Query Dict With Or Operator'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
organisation_title = 'Nexedi Organisation' organisation_title = 'Nexedi Organisation'
organisation = self._makeOrganisation(title=organisation_title) organisation = self._makeOrganisation(title=organisation_title)
...@@ -1078,15 +900,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1078,15 +900,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
title={'query': (organisation_title, 'something else'), title={'query': (organisation_title, 'something else'),
'operator': 'or'})]) 'operator': 'or'})])
def test_33_SimpleQueryDictWithAndOperator(self, quiet=quiet, def test_33_SimpleQueryDictWithAndOperator(self):
run=run_all_test):
"""use a dict as a keyword parameter, with AND operator. """use a dict as a keyword parameter, with AND operator.
""" """
if not run: return
if not quiet:
message = 'Test Query Dict With And Operator'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
organisation_title = 'Nexedi Organisation' organisation_title = 'Nexedi Organisation'
organisation = self._makeOrganisation(title=organisation_title) organisation = self._makeOrganisation(title=organisation_title)
...@@ -1096,92 +912,62 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1096,92 +912,62 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
title={'query': (organisation_title, organisation_title), title={'query': (organisation_title, organisation_title),
'operator': 'and'})]) 'operator': 'and'})])
def test_34_SimpleQueryDictWithMaxRangeParameter(self, quiet=quiet, def test_34_SimpleQueryDictWithMaxRangeParameter(self):
run=run_all_test):
"""use a dict as a keyword parameter, with max range parameter ( < ) """use a dict as a keyword parameter, with max range parameter ( < )
""" """
if not run: return
if not quiet:
message = 'Test Query Dict With Max Range Operator'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
org_a = self._makeOrganisation(title='A') org_a = self._makeOrganisation(title='A')
org_b = self._makeOrganisation(title='B') org_b = self._makeOrganisation(title='B')
org_c = self._makeOrganisation(title='C') org_c = self._makeOrganisation(title='C')
self.assertEquals([org_a.getPath()], self.assertEquals([org_a.getPath()],
[x.path for x in self.getCatalogTool()( [x.path for x in self.getCatalogTool()(
portal_type='Organisation', portal_type='Organisation',
title={'query': 'B', 'range': 'max'})]) title={'query': 'B', 'range': 'max'})])
def test_35_SimpleQueryDictWithMinRangeParameter(self, quiet=quiet, def test_35_SimpleQueryDictWithMinRangeParameter(self):
run=run_all_test):
"""use a dict as a keyword parameter, with min range parameter ( >= ) """use a dict as a keyword parameter, with min range parameter ( >= )
""" """
if not run: return
if not quiet:
message = 'Test Query Dict With Min Range Operator'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
org_a = self._makeOrganisation(title='A') org_a = self._makeOrganisation(title='A')
org_b = self._makeOrganisation(title='B') org_b = self._makeOrganisation(title='B')
org_c = self._makeOrganisation(title='C') org_c = self._makeOrganisation(title='C')
self.failIfDifferentSet([org_b.getPath(), org_c.getPath()], self.failIfDifferentSet([org_b.getPath(), org_c.getPath()],
[x.path for x in self.getCatalogTool()( [x.path for x in self.getCatalogTool()(
portal_type='Organisation', portal_type='Organisation',
title={'query': 'B', 'range': 'min'})]) title={'query': 'B', 'range': 'min'})])
def test_36_SimpleQueryDictWithNgtRangeParameter(self, quiet=quiet, def test_36_SimpleQueryDictWithNgtRangeParameter(self):
run=run_all_test):
"""use a dict as a keyword parameter, with ngt range parameter ( <= ) """use a dict as a keyword parameter, with ngt range parameter ( <= )
""" """
if not run: return
if not quiet:
message = 'Test Query Dict With Ngt Range Operator'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
org_a = self._makeOrganisation(title='A') org_a = self._makeOrganisation(title='A')
org_b = self._makeOrganisation(title='B') org_b = self._makeOrganisation(title='B')
org_c = self._makeOrganisation(title='C') org_c = self._makeOrganisation(title='C')
self.failIfDifferentSet([org_a.getPath(), org_b.getPath()], self.failIfDifferentSet([org_a.getPath(), org_b.getPath()],
[x.path for x in self.getCatalogTool()( [x.path for x in self.getCatalogTool()(
portal_type='Organisation', portal_type='Organisation',
title={'query': 'B', 'range': 'ngt'})]) title={'query': 'B', 'range': 'ngt'})])
def test_37_SimpleQueryDictWithMinMaxRangeParameter(self, quiet=quiet, def test_37_SimpleQueryDictWithMinMaxRangeParameter(self):
run=run_all_test):
"""use a dict as a keyword parameter, with minmax range parameter ( >= < ) """use a dict as a keyword parameter, with minmax range parameter ( >= < )
""" """
if not run: return
if not quiet:
message = 'Test Query Dict With Min Max Range Operator'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
org_a = self._makeOrganisation(title='A') org_a = self._makeOrganisation(title='A')
org_b = self._makeOrganisation(title='B') org_b = self._makeOrganisation(title='B')
org_c = self._makeOrganisation(title='C') org_c = self._makeOrganisation(title='C')
self.assertEquals([org_b.getPath()], self.assertEquals([org_b.getPath()],
[x.path for x in self.getCatalogTool()( [x.path for x in self.getCatalogTool()(
portal_type='Organisation', portal_type='Organisation',
title={'query': ('B', 'C'), 'range': 'minmax'})]) title={'query': ('B', 'C'), 'range': 'minmax'})])
def test_38_SimpleQueryDictWithMinNgtRangeParameter(self, quiet=quiet, def test_38_SimpleQueryDictWithMinNgtRangeParameter(self):
run=run_all_test):
"""use a dict as a keyword parameter, with minngt range parameter ( >= <= ) """use a dict as a keyword parameter, with minngt range parameter ( >= <= )
""" """
if not run: return
if not quiet:
message = 'Test Query Dict With Min Ngt Range Operator'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
org_a = self._makeOrganisation(title='A') org_a = self._makeOrganisation(title='A')
org_b = self._makeOrganisation(title='B') org_b = self._makeOrganisation(title='B')
org_c = self._makeOrganisation(title='C') org_c = self._makeOrganisation(title='C')
self.failIfDifferentSet([org_b.getPath(), org_c.getPath()], self.failIfDifferentSet([org_b.getPath(), org_c.getPath()],
[x.path for x in self.getCatalogTool()( [x.path for x in self.getCatalogTool()(
portal_type='Organisation', portal_type='Organisation',
...@@ -1193,7 +979,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1193,7 +979,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
org_a = self._makeOrganisation(title='A') org_a = self._makeOrganisation(title='A')
org_b = self._makeOrganisation(title='B') org_b = self._makeOrganisation(title='B')
org_c = self._makeOrganisation(title='C') org_c = self._makeOrganisation(title='C')
query_dict = {'query': ('B', 'C'), 'range': 'minngt'} query_dict = {'query': ('B', 'C'), 'range': 'minngt'}
from ZPublisher.HTTPRequest import record from ZPublisher.HTTPRequest import record
query_record = record() query_record = record()
...@@ -1205,14 +991,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1205,14 +991,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
portal_type='Organisation', portal_type='Organisation',
title=query_record)])) title=query_record)]))
def test_39_DeferredConnection(self, quiet=quiet, run=run_all_test): def test_39_DeferredConnection(self):
"""ERP5Catalog uses a deferred connection for full text indexing. """ERP5Catalog uses a deferred connection for full text indexing.
""" """
if not run: return
if not quiet:
message = 'Test Deferred Connection'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
erp5_sql_deferred_connection = getattr(self.getPortal(), erp5_sql_deferred_connection = getattr(self.getPortal(),
'erp5_sql_deferred_connection', 'erp5_sql_deferred_connection',
None) None)
...@@ -1225,14 +1006,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1225,14 +1006,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
getattr(self.getCatalogTool().getSQLCatalog(), getattr(self.getCatalogTool().getSQLCatalog(),
method).connection_id) method).connection_id)
def test_40_DeleteObject(self, quiet=quiet, run=run_all_test): def test_40_DeleteObject(self):
"""Simple test to exercise object deletion """Simple test to exercise object deletion
""" """
if not run: return
if not quiet:
message = 'Test Delete Object'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
folder = self.getOrganisationModule() folder = self.getOrganisationModule()
ob = folder.newContent() ob = folder.newContent()
self.tic() self.tic()
...@@ -1240,14 +1016,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1240,14 +1016,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.tic() self.tic()
self.assertEquals(0, len(folder.searchFolder())) self.assertEquals(0, len(folder.searchFolder()))
def test_41_ProxyRolesInRestrictedPython(self, quiet=quiet, run=run_all_test): def test_41_ProxyRolesInRestrictedPython(self):
"""test that proxy roles apply to catalog queries within python scripts """test that proxy roles apply to catalog queries within python scripts
""" """
if not run: return
if not quiet:
message = 'Proxy Roles In Restricted Python'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
perm = 'View' perm = 'View'
uf = self.getPortal().acl_users uf = self.getPortal().acl_users
...@@ -1295,14 +1066,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1295,14 +1066,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.login('bob') self.login('bob')
self.assertEquals(0, folder.catalog_test_script()) self.assertEquals(0, folder.catalog_test_script())
def test_42_SearchableText(self, quiet=quiet, run=run_all_test): def test_42_SearchableText(self):
"""Tests SearchableText is working in ERP5Catalog """Tests SearchableText is working in ERP5Catalog
""" """
if not run: return
if not quiet:
message = 'Searchable Text'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
folder = self.getOrganisationModule() folder = self.getOrganisationModule()
ob = folder.newContent() ob = folder.newContent()
ob.setTitle('The title of this object') ob.setTitle('The title of this object')
...@@ -1338,13 +1104,8 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1338,13 +1104,8 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
# Tritonn # Tritonn
self.assertEquals(10, self.getCatalogTool().countResults( self.assertEquals(10, self.getCatalogTool().countResults(
portal_type='Organisation', SearchableText='different')[0][0]) portal_type='Organisation', SearchableText='different')[0][0])
def test_43_ManagePasteObject(self, quiet=quiet, run=run_all_test): def test_43_ManagePasteObject(self):
if not run: return
if not quiet:
message = 'Manage Paste Objects'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
portal_catalog = self.getCatalogTool() portal_catalog = self.getCatalogTool()
person_module = self.getPersonModule() person_module = self.getPersonModule()
person = person_module.newContent(id='1',portal_type='Person') person = person_module.newContent(id='1',portal_type='Person')
...@@ -1356,12 +1117,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1356,12 +1117,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
path_list = [new_person.getRelativeUrl()] path_list = [new_person.getRelativeUrl()]
self.checkRelativeUrlInSQLPathList(path_list) self.checkRelativeUrlInSQLPathList(path_list)
def test_44_ParentRelatedKeys(self, quiet=quiet, run=run_all_test): def test_44_ParentRelatedKeys(self):
if not run: return
if not quiet:
message = 'Parent related keys'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
portal_catalog = self.getCatalogTool() portal_catalog = self.getCatalogTool()
person_module = self.getPersonModule() person_module = self.getPersonModule()
person_module.reindexObject() person_module.reindexObject()
...@@ -1370,15 +1126,10 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1370,15 +1126,10 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.assertEquals([person], self.assertEquals([person],
[x.getObject() for x in self.getCatalogTool()( [x.getObject() for x in self.getCatalogTool()(
parent_title=person_module.getTitle())]) parent_title=person_module.getTitle())])
def test_45_QueryAndComplexQuery(self,quiet=quiet, run=run_all_test): def test_45_QueryAndComplexQuery(self):
""" """
""" """
if not run: return
if not quiet:
message = 'Query And Complex Query'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
org_a = self._makeOrganisation(title='abc',description='abc') org_a = self._makeOrganisation(title='abc',description='abc')
org_b = self._makeOrganisation(title='bcd',description='bcd') org_b = self._makeOrganisation(title='bcd',description='bcd')
org_c = self._makeOrganisation(title='efg',description='efg') org_c = self._makeOrganisation(title='efg',description='efg')
...@@ -1421,16 +1172,8 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1421,16 +1172,8 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.failIfDifferentSet([org_a.getPath(), org_f.getPath()], self.failIfDifferentSet([org_a.getPath(), org_f.getPath()],
[x.path for x in self.getCatalogTool()( [x.path for x in self.getCatalogTool()(
portal_type='Organisation',**catalog_kw)]) portal_type='Organisation',**catalog_kw)])
def test_46_TestLimit(self,quiet=quiet, run=run_all_test):
"""
"""
if not run: return
if not quiet:
message = 'Test Limit'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
def test_46_TestLimit(self):
ctool = self.getCatalogTool() ctool = self.getCatalogTool()
old_default_result_limit = ctool.default_result_limit old_default_result_limit = ctool.default_result_limit
max_ = ctool.default_result_limit = 3 max_ = ctool.default_result_limit = 3
...@@ -1458,20 +1201,13 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1458,20 +1201,13 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
pass pass
self.commit() self.commit()
def test_48_ERP5Site_hotReindexAll(self, quiet=quiet, run=run_all_test): def test_48_ERP5Site_hotReindexAll(self):
""" """
test the hot reindexing of catalog -> catalog2 test the hot reindexing of catalog -> catalog2
then a hot reindexing detailed catalog2 -> catalog then a hot reindexing detailed catalog2 -> catalog
this test use the variable environment: extra_sql_connection_string_list this test use the variable environment: extra_sql_connection_string_list
""" """
if not run: return portal = self.portal
if not quiet:
message = 'Hot Reindex All'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
portal = self.getPortal()
self.original_connection_id = 'erp5_sql_connection' self.original_connection_id = 'erp5_sql_connection'
self.original_deferred_connection_id = self.new_erp5_deferred_sql_connection self.original_deferred_connection_id = self.new_erp5_deferred_sql_connection
self.new_connection_id = self.new_erp5_sql_connection self.new_connection_id = self.new_erp5_sql_connection
...@@ -1634,11 +1370,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1634,11 +1370,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
Check that cached values are invalidated due to Check that cached values are invalidated due to
catalog migration catalog migration
""" """
message = 'Hot Reindex All: cache invalidation' portal = self.portal
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ', 0, message)
portal = self.getPortal()
original_connection_id = 'erp5_sql_connection' original_connection_id = 'erp5_sql_connection'
original_deferred_connection_id = 'erp5_sql_deferred_connection' original_deferred_connection_id = 'erp5_sql_deferred_connection'
new_connection_string = getExtraSqlConnectionStringList()[0] new_connection_string = getExtraSqlConnectionStringList()[0]
...@@ -1751,15 +1483,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1751,15 +1483,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
folder=portal.portal_skins, folder=portal.portal_skins,
sql_connection_id_dict = sql_connection_id_dict) sql_connection_id_dict = sql_connection_id_dict)
def test_47_Unrestricted(self, quiet=quiet, run=run_all_test): def test_47_Unrestricted(self):
"""test unrestricted search/count results. """test unrestricted search/count results.
""" """
if not run: return
if not quiet:
message = 'Unrestricted queries'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
uf = self.getPortal().acl_users uf = self.getPortal().acl_users
uf._doAddUser('alice', '', ['Member', 'Manager', 'Assignor'], []) uf._doAddUser('alice', '', ['Member', 'Manager', 'Assignor'], [])
uf._doAddUser('bob', '', ['Member'], []) uf._doAddUser('bob', '', ['Member'], [])
...@@ -1770,27 +1496,21 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1770,27 +1496,21 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
ob = folder.newContent(title='Object Title') ob = folder.newContent(title='Object Title')
ob.manage_permission('View', ['Manager'], 0) ob.manage_permission('View', ['Manager'], 0)
self.tic() self.tic()
# bob cannot see the document # bob cannot see the document
self.login('bob') self.login('bob')
ctool = self.getCatalogTool() ctool = self.getCatalogTool()
self.assertEquals(0, len(ctool.searchResults(title='Object Title'))) self.assertEquals(0, len(ctool.searchResults(title='Object Title')))
self.assertEquals(0, ctool.countResults(title='Object Title')[0][0]) self.assertEquals(0, ctool.countResults(title='Object Title')[0][0])
# unless using unrestricted searches # unless using unrestricted searches
self.assertEquals(1, self.assertEquals(1,
len(ctool.unrestrictedSearchResults(title='Object Title'))) len(ctool.unrestrictedSearchResults(title='Object Title')))
self.assertEquals(1, self.assertEquals(1,
ctool.unrestrictedCountResults(title='Object Title')[0][0]) ctool.unrestrictedCountResults(title='Object Title')[0][0])
@todo_erp5
def test_49_IndexInOrderedSearchFolder(self, quiet=quiet, run=run_all_test):
if not run: return
if not quiet:
message = 'Index In Ordered Search Folder'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
@todo_erp5
def test_49_IndexInOrderedSearchFolder(self):
person_module = self.getPersonModule() person_module = self.getPersonModule()
# Clear catalog # Clear catalog
...@@ -1828,15 +1548,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1828,15 +1548,9 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
sql = person_module.searchFolder(src__=1, sort_on=[('title','ascending')]) sql = person_module.searchFolder(src__=1, sort_on=[('title','ascending')])
self.failUnless('use index' in sql) self.failUnless('use index' in sql)
def test_50_LocalRolesArgument(self, quiet=quiet, run=run_all_test): def test_50_LocalRolesArgument(self):
"""test local_roles= argument """test local_roles= argument
""" """
if not run: return
if not quiet:
message = 'local_roles= argument'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
uf = self.getPortal().acl_users uf = self.getPortal().acl_users
uf._doAddUser('bob', '', ['Member'], []) uf._doAddUser('bob', '', ['Member'], [])
...@@ -1848,7 +1562,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1848,7 +1562,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
ob2_id = ob2.getId() ob2_id = ob2.getId()
ob2.manage_addLocalRoles('bob', ['Assignee']) ob2.manage_addLocalRoles('bob', ['Assignee'])
self.tic() self.tic()
# by default bob can see those 2 documents # by default bob can see those 2 documents
self.login('bob') self.login('bob')
ctool = self.getCatalogTool() ctool = self.getCatalogTool()
...@@ -1914,13 +1628,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1914,13 +1628,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.assertEquals(1, folder.countFolder(title='Object Title', self.assertEquals(1, folder.countFolder(title='Object Title',
local_roles='Assignee')[0][0]) local_roles='Assignee')[0][0])
def test_51_SearchWithKeyWords(self, quiet=quiet, run=run_all_test): def test_51_SearchWithKeyWords(self):
if not run: return
if not quiet:
message = 'Test searching with SQL keywords'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
person_module = self.getPersonModule() person_module = self.getPersonModule()
and_ = person_module.newContent(portal_type='Person', title='AND') and_ = person_module.newContent(portal_type='Person', title='AND')
or_ = person_module.newContent(portal_type='Person', title='OR') or_ = person_module.newContent(portal_type='Person', title='OR')
...@@ -1941,18 +1649,12 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1941,18 +1649,12 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.assertEquals([select_], [x.getObject() for x in self.assertEquals([select_], [x.getObject() for x in
ctool(portal_type='Person', title='SELECT')]) ctool(portal_type='Person', title='SELECT')])
def test_52_QueryAndTableAlias(self,quiet=quiet, run=run_all_test): def test_52_QueryAndTableAlias(self):
""" """
Make sure we can use aliases for tables wich will Make sure we can use aliases for tables wich will
be used by related keys. This allow in some particular be used by related keys. This allow in some particular
cases to decrease a lot the number of aliases cases to decrease a lot the number of aliases
""" """
if not run: return
if not quiet:
message = 'Query And Table Alias'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
org_a = self._makeOrganisation(title='abc',default_address_city='abc') org_a = self._makeOrganisation(title='abc',default_address_city='abc')
module = self.getOrganisationModule() module = self.getOrganisationModule()
module.immediateReindexObject() module.immediateReindexObject()
...@@ -1978,13 +1680,8 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -1978,13 +1680,8 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
("parent","catalog"), ("parent","catalog"),
("grand_parent","catalog")), ("grand_parent","catalog")),
table_alias_list) table_alias_list)
def test_53_DateFormat(self, quiet=quiet, run=run_all_test): def test_53_DateFormat(self):
if not run: return
if not quiet:
message = 'Date Format'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
org_a = self._makeOrganisation(title='org_a') org_a = self._makeOrganisation(title='org_a')
org_b = self._makeOrganisation(title='org_b') org_b = self._makeOrganisation(title='org_b')
sql_connection = self.getSQLConnection() sql_connection = self.getSQLConnection()
...@@ -2037,12 +1734,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -2037,12 +1734,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
[x.path for x in self.getCatalogTool()( [x.path for x in self.getCatalogTool()(
portal_type='Organisation',**catalog_kw)]) portal_type='Organisation',**catalog_kw)])
def test_54_FixIntUid(self, quiet=quiet, run=run_all_test): def test_54_FixIntUid(self):
if not run: return
if not quiet:
message = 'Test if portal_catalog ensures that uid is long'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
portal_catalog = self.getCatalogTool() portal_catalog = self.getCatalogTool()
portal = self.getPortal() portal = self.getPortal()
...@@ -2065,13 +1757,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -2065,13 +1757,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.failUnless(isinstance(uid, long)) self.failUnless(isinstance(uid, long))
self.assertEquals(organisation.uid, uid) self.assertEquals(organisation.uid, uid)
def test_55_FloatFormat(self, quiet=quiet, run=run_all_test): def test_55_FloatFormat(self):
if not run: return
if not quiet:
message = 'Float Format'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
catalog_kw = {'uid': {'query': '2 567.54', catalog_kw = {'uid': {'query': '2 567.54',
'format': '1 234.12', 'format': '1 234.12',
'type': 'float'}} 'type': 'float'}}
...@@ -2079,7 +1765,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -2079,7 +1765,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.assertTrue("TRUNCATE(catalog.uid,2) = '2567.54'" in sql_src or \ self.assertTrue("TRUNCATE(catalog.uid,2) = '2567.54'" in sql_src or \
'TRUNCATE(`catalog`.`uid`, 2) = 2567.54' in sql_src, sql_src) 'TRUNCATE(`catalog`.`uid`, 2) = 2567.54' in sql_src, sql_src)
def test_56_CreateUidDuringClearCatalog(self, quiet=quiet,run=run_all_test): def test_56_CreateUidDuringClearCatalog(self):
""" """
Create a script in the catalog to generate a uid list Create a script in the catalog to generate a uid list
Check the creation some objects, or activities, during a clear Check the creation some objects, or activities, during a clear
...@@ -2099,8 +1785,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -2099,8 +1785,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
# before the recreate tables of catalog # before the recreate tables of catalog
catalog.manage_catalogClear() catalog.manage_catalogClear()
def test_SearchOnOwner(self, quiet=quiet, run=run_all_test): def test_SearchOnOwner(self):
if not run: return
# owner= can be used a search key in the catalog to have all documents for # owner= can be used a search key in the catalog to have all documents for
# a specific owner and on which he have the View permission. # a specific owner and on which he have the View permission.
obj = self._makeOrganisation(title='The Document') obj = self._makeOrganisation(title='The Document')
...@@ -2117,8 +1802,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -2117,8 +1802,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
owner='somebody else')]) owner='somebody else')])
def test_SubDocumentsSecurityIndexing(self, quiet=quiet, run=run_all_test): def test_SubDocumentsSecurityIndexing(self):
if not run: return
# make sure indexing of security on sub-documents works as expected # make sure indexing of security on sub-documents works as expected
uf = self.getPortal().acl_users uf = self.getPortal().acl_users
uf._doAddUser('bob', '', ['Member'], []) uf._doAddUser('bob', '', ['Member'], [])
...@@ -2140,9 +1824,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -2140,9 +1824,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
obj.searchFolder(portal_type='Bank Account')]) obj.searchFolder(portal_type='Bank Account')])
@todo_erp5 @todo_erp5
def test_SubDocumentsWithAcquireLocalRoleSecurityIndexing( def test_SubDocumentsWithAcquireLocalRoleSecurityIndexing(self):
self, quiet=quiet, run=run_all_test):
if not run: return
# Check that sub object indexation is compatible with ZODB settings # Check that sub object indexation is compatible with ZODB settings
# when the sub object acquires the parent local roles # when the sub object acquires the parent local roles
perm = 'View' perm = 'View'
...@@ -2204,10 +1886,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -2204,10 +1886,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
local_roles='Auditor') local_roles='Auditor')
self.assertSameSet([], [x.getObject() for x in result]) self.assertSameSet([], [x.getObject() for x in result])
def test_60_ViewableOwnerIndexing(self, quiet=quiet, run=run_all_test): def test_60_ViewableOwnerIndexing(self):
if not run:
return
logout = self.logout logout = self.logout
uf = self.getPortal().acl_users uf = self.getPortal().acl_users
uf._doAddUser('super_owner', '', ['Member', 'Author', 'Assignee'], []) uf._doAddUser('super_owner', '', ['Member', 'Author', 'Assignee'], [])
...@@ -2344,8 +2023,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -2344,8 +2023,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
result = sql_connection.manage_test(sql % sub_obj.getUid()) result = sql_connection.manage_test(sql % sub_obj.getUid())
self.assertSameSet(['little_owner'], [x.owner for x in result]) self.assertSameSet(['little_owner'], [x.owner for x in result])
def test_ExactMatchSearch(self, quiet=quiet, run=run_all_test): def test_ExactMatchSearch(self):
if not run: return
# test exact match search with queries # test exact match search with queries
doc = self._makeOrganisation(title='Foo%') doc = self._makeOrganisation(title='Foo%')
other_doc = self._makeOrganisation(title='FooBar') other_doc = self._makeOrganisation(title='FooBar')
...@@ -2359,8 +2037,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -2359,8 +2037,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
ctool(portal_type='Organisation', title=dict(query='Foo%', ctool(portal_type='Organisation', title=dict(query='Foo%',
key='ExactMatch'))]) key='ExactMatch'))])
def test_KeywordSearch(self, quiet=quiet, run=run_all_test): def test_KeywordSearch(self):
if not run: return
# test keyword search with queries # test keyword search with queries
doc = self._makeOrganisation(description='Foo') doc = self._makeOrganisation(description='Foo')
other_doc = self._makeOrganisation(description='Foobar') other_doc = self._makeOrganisation(description='Foobar')
...@@ -2375,8 +2052,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -2375,8 +2052,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
key='Keyword'))])) key='Keyword'))]))
def test_ignore_empty_string(self, quiet=quiet, run=run_all_test): def test_ignore_empty_string(self):
if not run: return
# ERP5Catalog ignore empty strings by default # ERP5Catalog ignore empty strings by default
doc_with_description = self._makeOrganisation(description='X') doc_with_description = self._makeOrganisation(description='X')
doc_with_empty_description = self._makeOrganisation(description='') doc_with_empty_description = self._makeOrganisation(description='')
...@@ -2409,10 +2085,8 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -2409,10 +2085,8 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.assertEquals(set([doc_with_empty_region_description]), self.assertEquals(set([doc_with_empty_region_description]),
searchResults(ignore_empty_string=0, region_description='')) searchResults(ignore_empty_string=0, region_description=''))
def test_complex_query(self, quiet=quiet, run=run_all_test): def test_complex_query(self):
# Make sure that complex query works on real environment. # Make sure that complex query works on real environment.
if not run: return
catalog = self.getCatalogTool() catalog = self.getCatalogTool()
person_module = self.getPersonModule() person_module = self.getPersonModule()
...@@ -2468,7 +2142,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -2468,7 +2142,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.assertRaises(NotImplementedError, ComplexQuery, query_find_european, query_find_name_erp5, operator='OR') self.assertRaises(NotImplementedError, ComplexQuery, query_find_european, query_find_name_erp5, operator='OR')
def test_check_security_table_content(self, quiet=quiet, run=run_all_test): def test_check_security_table_content(self):
sql_connection = self.getSQLConnection() sql_connection = self.getSQLConnection()
portal = self.getPortalObject() portal = self.getPortalObject()
portal_types = portal.portal_types portal_types = portal.portal_types
...@@ -2482,7 +2156,6 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -2482,7 +2156,6 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.getPortal().ERP5Site_reindexAll() self.getPortal().ERP5Site_reindexAll()
self.tic() self.tic()
# Person stuff # Person stuff
person_module = portal.person_module person_module = portal.person_module
person = 'Person' person = 'Person'
...@@ -2595,10 +2268,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor): ...@@ -2595,10 +2268,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
result = query('SELECT roles_and_users.uid, roles_and_users.allowedRolesAndUsers FROM roles_and_users, catalog WHERE roles_and_users.uid = catalog.security_uid AND catalog.uid = %i AND roles_and_users.allowedRolesAndUsers LIKE "user:bar%%"' % (object.uid, )) result = query('SELECT roles_and_users.uid, roles_and_users.allowedRolesAndUsers FROM roles_and_users, catalog WHERE roles_and_users.uid = catalog.security_uid AND catalog.uid = %i AND roles_and_users.allowedRolesAndUsers LIKE "user:bar%%"' % (object.uid, ))
self.assertEqual(len(result), 0, '%r: len(%r) != 0' % (getObjectDictKey(), result)) self.assertEqual(len(result), 0, '%r: len(%r) != 0' % (getObjectDictKey(), result))
def test_RealOwnerIndexing(self, quiet=quiet, run=run_all_test): def test_RealOwnerIndexing(self):
if not run:
return
logout = self.logout logout = self.logout
user1 = 'local_foo' user1 = 'local_foo'
user2 = 'local_bar' user2 = 'local_bar'
...@@ -2780,10 +2450,7 @@ VALUES ...@@ -2780,10 +2450,7 @@ VALUES
sql_catalog.sql_search_tables = current_sql_search_tables sql_catalog.sql_search_tables = current_sql_search_tables
self.commit() self.commit()
def test_MonoValueAssigneeIndexing(self, quiet=quiet, run=run_all_test): def test_MonoValueAssigneeIndexing(self):
if not run:
return
logout = self.logout logout = self.logout
user1 = 'local_foo' user1 = 'local_foo'
user2 = 'local_bar' user2 = 'local_bar'
...@@ -2978,10 +2645,7 @@ VALUES ...@@ -2978,10 +2645,7 @@ VALUES
sql_catalog.sql_search_tables = current_sql_search_tables sql_catalog.sql_search_tables = current_sql_search_tables
self.commit() self.commit()
def test_UserOrGroupRoleIndexing(self, quiet=quiet, run=run_all_test): def test_UserOrGroupRoleIndexing(self):
if not run:
return
logout = self.logout logout = self.logout
user1 = 'a_great_user_name' user1 = 'a_great_user_name'
user1_group = 'a_great_user_group' user1_group = 'a_great_user_group'
...@@ -3235,10 +2899,7 @@ VALUES ...@@ -3235,10 +2899,7 @@ VALUES
sql_catalog.sql_search_tables = current_sql_search_tables sql_catalog.sql_search_tables = current_sql_search_tables
self.commit() self.commit()
def test_UserOrGroupLocalRoleIndexing(self, quiet=quiet, run=run_all_test): def test_UserOrGroupLocalRoleIndexing(self):
if not run:
return
logout = self.logout logout = self.logout
user1 = 'another_great_user_name' user1 = 'another_great_user_name'
user1_group = 'another_great_user_group' user1_group = 'another_great_user_group'
...@@ -3599,11 +3260,8 @@ VALUES ...@@ -3599,11 +3260,8 @@ VALUES
sql_catalog.sql_search_tables = current_sql_search_tables sql_catalog.sql_search_tables = current_sql_search_tables
self.commit() self.commit()
def test_ObjectReindexationConcurency(self, quiet=quiet, run=run_all_test): def test_ObjectReindexationConcurency(self):
if not run: portal = self.portal
return
portal = self.getPortalObject()
portal_activities = getattr(portal, 'portal_activities', None) portal_activities = getattr(portal, 'portal_activities', None)
if portal_activities is None: if portal_activities is None:
...@@ -3698,13 +3356,10 @@ VALUES ...@@ -3698,13 +3356,10 @@ VALUES
self.assertEqual(len([x for x in portal_activities.getMessageList() if x.processing_node == 0]), 2) self.assertEqual(len([x for x in portal_activities.getMessageList() if x.processing_node == 0]), 2)
self.tic() self.tic()
def test_PercentCharacter(self, quiet=quiet, run=run_all_test): def test_PercentCharacter(self):
""" """
Check expected behaviour of % character for simple query Check expected behaviour of % character for simple query
""" """
if not run:
return
portal_type = 'Organisation' portal_type = 'Organisation'
folder = self.getOrganisationModule() folder = self.getOrganisationModule()
folder.newContent(portal_type=portal_type, title='foo_organisation_1') folder.newContent(portal_type=portal_type, title='foo_organisation_1')
...@@ -3721,13 +3376,10 @@ VALUES ...@@ -3721,13 +3376,10 @@ VALUES
self.assertEquals(1, len(folder.portal_catalog(portal_type=portal_type, self.assertEquals(1, len(folder.portal_catalog(portal_type=portal_type,
title='foo_org%ion_1'))) title='foo_org%ion_1')))
def test_SearchedStringIsNotStripped(self, quiet=quiet, run=run_all_test): def test_SearchedStringIsNotStripped(self):
""" """
Check that extra spaces in lookup values are preserved Check that extra spaces in lookup values are preserved
""" """
if not run:
return
portal_type = 'Organisation' portal_type = 'Organisation'
folder = self.getOrganisationModule() folder = self.getOrganisationModule()
first_doc = folder.newContent(portal_type=portal_type, reference="foo") first_doc = folder.newContent(portal_type=portal_type, reference="foo")
...@@ -3745,13 +3397,10 @@ VALUES ...@@ -3745,13 +3397,10 @@ VALUES
#compareSet('foo ', []) #compareSet('foo ', [])
#compareSet(' foo ', []) #compareSet(' foo ', [])
def test_WildcardMatchesUnsetValue(self, quiet=quiet, run=run_all_test): def test_WildcardMatchesUnsetValue(self):
""" """
Check that the "%" wildcard matches unset values. Check that the "%" wildcard matches unset values.
""" """
if not run:
return
portal_type = 'Organisation' portal_type = 'Organisation'
folder = self.getOrganisationModule() folder = self.getOrganisationModule()
first_doc = folder.newContent(portal_type=portal_type, reference="doc 1") first_doc = folder.newContent(portal_type=portal_type, reference="doc 1")
...@@ -3761,14 +3410,11 @@ VALUES ...@@ -3761,14 +3410,11 @@ VALUES
self.assertEqual(len(result), 2) self.assertEqual(len(result), 2)
@todo_erp5 @todo_erp5
def test_sortOnRelatedKeyWithUnsetRelation(self, quiet=quiet, run=run_all_test): def test_sortOnRelatedKeyWithUnsetRelation(self):
""" """
Check that sorting on a related key does not filter out objects for Check that sorting on a related key does not filter out objects for
which the relation is not set. which the relation is not set.
""" """
if not run:
return
portal = self.getPortalObject() portal = self.getPortalObject()
organisation = portal.organisation_module.\ organisation = portal.organisation_module.\
newContent(portal_type="Organisation") newContent(portal_type="Organisation")
...@@ -3780,7 +3426,7 @@ VALUES ...@@ -3780,7 +3426,7 @@ VALUES
self.assertEqual(len(person_module.searchFolder()), self.assertEqual(len(person_module.searchFolder()),
len(person_module.searchFolder(sort_on=[('subordination_title', 'ascending')]))) len(person_module.searchFolder(sort_on=[('subordination_title', 'ascending')])))
def test_multipleRelatedKeyDoMultipleJoins(self, quiet=quiet, run=run_all_test): def test_multipleRelatedKeyDoMultipleJoins(self):
""" """
Check that when multiple related keys are present in the same query, Check that when multiple related keys are present in the same query,
each one does a separate join. each one does a separate join.
...@@ -3791,10 +3437,7 @@ VALUES ...@@ -3791,10 +3437,7 @@ VALUES
- objects whose site_title is foo - objects whose site_title is foo
- objects whose site_description is bar - objects whose site_description is bar
""" """
if not run: portal = self.portal
return
portal = self.getPortalObject()
def _create(**kw): def _create(**kw):
return portal.organisation_module.newContent(portal_type='Organisation', **kw) return portal.organisation_module.newContent(portal_type='Organisation', **kw)
def create(id, related_obect_list): def create(id, related_obect_list):
...@@ -3853,16 +3496,9 @@ VALUES ...@@ -3853,16 +3496,9 @@ VALUES
Query(site_title='=foo2'), Query(site_title='=foo2'),
operator='AND')) operator='AND'))
def test_SearchFolderWithRelatedDynamicRelatedKey(self, def test_SearchFolderWithRelatedDynamicRelatedKey(self):
quiet=quiet, run=run_all_test):
if not run: return
if not quiet:
message = 'Search Folder With Related Dynamic Related Key'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
# Create some objects # Create some objects
portal = self.getPortal() portal = self.portal
portal_category = self.getCategoryTool() portal_category = self.getCategoryTool()
portal_category.group.manage_delObjects([x for x in portal_category.group.manage_delObjects([x for x in
portal_category.group.objectIds()]) portal_category.group.objectIds()])
...@@ -3932,16 +3568,9 @@ VALUES ...@@ -3932,16 +3568,9 @@ VALUES
title='Storever')] title='Storever')]
self.assertEquals(category_list,[group_nexedi_category2]) self.assertEquals(category_list,[group_nexedi_category2])
def test_SearchFolderWithRelatedDynamicStrictRelatedKey(self, def test_SearchFolderWithRelatedDynamicStrictRelatedKey(self):
quiet=quiet, run=run_all_test):
if not run: return
if not quiet:
message = 'Search Folder With Related Strict Dynamic Related Key'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
# Create some objects # Create some objects
portal = self.getPortal() portal = self.portal
portal_category = self.getCategoryTool() portal_category = self.getCategoryTool()
portal_category.group.manage_delObjects([x for x in portal_category.group.manage_delObjects([x for x in
portal_category.group.objectIds()]) portal_category.group.objectIds()])
...@@ -3989,14 +3618,7 @@ VALUES ...@@ -3989,14 +3618,7 @@ VALUES
strict_group_related_description='c')] strict_group_related_description='c')]
self.assertEquals(category_list,[sub_group_nexedi]) self.assertEquals(category_list,[sub_group_nexedi])
def test_EscapingLoginInSescurityQuery(self, def test_EscapingLoginInSescurityQuery(self):
quiet=quiet, run=run_all_test):
if not run: return
if not quiet:
message = 'Test that login are escaped when call security_query'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
# Create some objects # Create some objects
reference = "aaa.o'connor@fake.ie" reference = "aaa.o'connor@fake.ie"
portal = self.getPortal() portal = self.getPortal()
...@@ -4006,13 +3628,7 @@ VALUES ...@@ -4006,13 +3628,7 @@ VALUES
newSecurityManager(None, user) newSecurityManager(None, user)
portal.view() portal.view()
def test_IndexationContextIndependence(self, quiet=quiet, run=run_all_test): def test_IndexationContextIndependence(self):
if not run: return
if not quiet:
message = 'Test that indexation is context-independent'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
def doCatalog(catalog, document): def doCatalog(catalog, document):
catalog.catalogObjectList([document], check_uid=0) catalog.catalogObjectList([document], check_uid=0)
result = catalog(select_expression='reference', uid=document.getUid()) result = catalog(select_expression='reference', uid=document.getUid())
...@@ -4056,8 +3672,7 @@ VALUES ...@@ -4056,8 +3672,7 @@ VALUES
delattr(portal, 'foo') delattr(portal, 'foo')
delattr(portal, 'bar') delattr(portal, 'bar')
def test_distinct_select_expression(self, quiet=quiet, run=run_all_test): def test_distinct_select_expression(self):
if not run: return
person = self.portal.person_module.newContent(portal_type='Person') person = self.portal.person_module.newContent(portal_type='Person')
self.tic() self.tic()
portal_catalog = self.getCatalogTool() portal_catalog = self.getCatalogTool()
...@@ -4067,20 +3682,15 @@ VALUES ...@@ -4067,20 +3682,15 @@ VALUES
portal_type='Person', portal_type='Person',
) )
self.assertEquals(1, len(res)) self.assertEquals(1, len(res))
self.assertEquals(person, res[0].getObject())
def test_CatalogUidDuplicates(self, quiet=quiet, run=run_all_test): def test_CatalogUidDuplicates(self):
""" """
Initially, the catalog was changing uids when a duplicate was found. Initially, the catalog was changing uids when a duplicate was found.
This operation was really too dangerous, so now we raise errors in this This operation was really too dangerous, so now we raise errors in this
case. Here we now check that the error is raised case. Here we now check that the error is raised
""" """
if not run: return
if not quiet:
message = 'Catalog Uid Duplicates'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
# Create an object just to allocate a new valid uid. # Create an object just to allocate a new valid uid.
person_module = self.getPersonModule() person_module = self.getPersonModule()
person = person_module.newContent(portal_type='Person') person = person_module.newContent(portal_type='Person')
...@@ -4108,12 +3718,7 @@ VALUES ...@@ -4108,12 +3718,7 @@ VALUES
person1.is_indexable = person2.is_indexable = True person1.is_indexable = person2.is_indexable = True
self.assertRaises(ValueError, portal_catalog.catalogObjectList,[person1, person2]) self.assertRaises(ValueError, portal_catalog.catalogObjectList,[person1, person2])
def test_SearchFolderWithParenthesis(self, quiet=quiet): def test_SearchFolderWithParenthesis(self):
if not quiet:
message = 'Search Folder With Parenthesis'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
person_module = self.getPersonModule() person_module = self.getPersonModule()
# Make sure that the catalog will not split it with such research : # Make sure that the catalog will not split it with such research :
...@@ -4127,13 +3732,8 @@ VALUES ...@@ -4127,13 +3732,8 @@ VALUES
folder_object_list = [x.getObject().getId() for x in folder_object_list = [x.getObject().getId() for x in
person_module.searchFolder(title=title)] person_module.searchFolder(title=title)]
self.assertEquals([person_id],folder_object_list) self.assertEquals([person_id],folder_object_list)
def test_SearchFolderWithMultipleSpaces(self, quiet=quiet):
if not quiet:
message = 'Search Folder With Multiple Spaces'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
def test_SearchFolderWithMultipleSpaces(self):
person_module = self.getPersonModule() person_module = self.getPersonModule()
# Make sure that the catalog will not split it with such research : # Make sure that the catalog will not split it with such research :
...@@ -4149,13 +3749,8 @@ VALUES ...@@ -4149,13 +3749,8 @@ VALUES
folder_object_list = [x.getObject().getId() for x in folder_object_list = [x.getObject().getId() for x in
person_module.searchFolder(title=title)] person_module.searchFolder(title=title)]
self.assertEquals([person_id],folder_object_list) self.assertEquals([person_id],folder_object_list)
def test_SearchFolderWithSingleQuote(self, quiet=quiet):
if not quiet:
message = 'Search Folder With Single Quote'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
def test_SearchFolderWithSingleQuote(self):
person_module = self.getPersonModule() person_module = self.getPersonModule()
# Make sure that the catalog will not split it with such research : # Make sure that the catalog will not split it with such research :
...@@ -4170,17 +3765,12 @@ VALUES ...@@ -4170,17 +3765,12 @@ VALUES
person_module.searchFolder(title=title)] person_module.searchFolder(title=title)]
self.assertEquals([person_id],folder_object_list) self.assertEquals([person_id],folder_object_list)
def test_ParameterSelectDict(self, quiet=quiet): def test_ParameterSelectDict(self):
if not quiet:
message = 'Check Parameter Select Dict'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
person_module = self.getPersonModule() person_module = self.getPersonModule()
# Make sure that we are able to retrieve data directly from mysql # Make sure that we are able to retrieve data directly from mysql
# without retrieving real objects # without retrieving real objects
title="foo" title = "foo"
description = "foobar" description = "foobar"
person = person_module.newContent(portal_type='Person',title=title, person = person_module.newContent(portal_type='Person',title=title,
description=description) description=description)
...@@ -4204,12 +3794,7 @@ VALUES ...@@ -4204,12 +3794,7 @@ VALUES
self.assertEquals([x.getTitle() for x in self.assertEquals([x.getTitle() for x in
folder_object_list], real_title_list) folder_object_list], real_title_list)
def test_countResultsUsesFromExpression(self, quiet=quiet): def test_countResultsUsesFromExpression(self):
if not quiet:
message = 'countResults uses from_expression'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ',0,message)
person_module = self.getPersonModule() person_module = self.getPersonModule()
module_len = len(person_module) module_len = len(person_module)
if module_len == 0: if module_len == 0:
...@@ -4234,7 +3819,7 @@ VALUES ...@@ -4234,7 +3819,7 @@ VALUES
count = catalog.countResults(from_expression=from_expression)[0][0] count = catalog.countResults(from_expression=from_expression)[0][0]
self.assertEqual(count, module_len) self.assertEqual(count, module_len)
def test_getParentUid(self, quiet=quiet): def test_getParentUid(self):
from Products.ERP5.Document.Assignment import Assignment from Products.ERP5.Document.Assignment import Assignment
import erp5.portal_type import erp5.portal_type
person_module = self.getPersonModule() person_module = self.getPersonModule()
...@@ -4272,7 +3857,7 @@ VALUES ...@@ -4272,7 +3857,7 @@ VALUES
self.assertTrue(int(person.uid)) self.assertTrue(int(person.uid))
self.assertEqual(person.uid, assignment.getParentUid()) self.assertEqual(person.uid, assignment.getParentUid())
def test_queriesEndingWithSemicolon(self, quiet=quiet): def test_queriesEndingWithSemicolon(self):
connector = self.getPortal().erp5_sql_connection connector = self.getPortal().erp5_sql_connection
result = connector.manage_test('select 1 as foo;') result = connector.manage_test('select 1 as foo;')
self.assertEquals(1, result[0].foo) self.assertEquals(1, result[0].foo)
...@@ -4290,13 +3875,7 @@ VALUES ...@@ -4290,13 +3875,7 @@ VALUES
group = group_category.newContent(id=group_id) group = group_category.newContent(id=group_id)
group.edit(title=title, description=description) group.edit(title=title, description=description)
def test_SelectDictWithDynamicRelatedKey(self, quiet=quiet, run=run_all_test): def test_SelectDictWithDynamicRelatedKey(self):
if not run: return
if not quiet:
message = 'Select Dict With Dynamic Related Key'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ', 0, message)
self._createSomeGroupCategories() self._createSomeGroupCategories()
# Create some orgs associated with varying association with the # Create some orgs associated with varying association with the
...@@ -4378,14 +3957,8 @@ VALUES ...@@ -4378,14 +3957,8 @@ VALUES
module.searchFolder(**search_kw)] module.searchFolder(**search_kw)]
self.assertEquals(organisation_list, [org1, org2, org3, org4]) self.assertEquals(organisation_list, [org1, org2, org3, org4])
def test_BackwardCompatibilityWithOldMethods(self, quiet=quiet, def test_BackwardCompatibilityWithOldMethods(self):
run=run_all_test): 'Dealing with RelatedKey methods missing the proper separator'
if not run: return
if not quiet:
message = 'Dealing with RelatedKey methods missing the proper separator'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ', 0, message)
module = self.getOrganisationModule() module = self.getOrganisationModule()
org_a = self._makeOrganisation(title='abc',default_address_city='abc') org_a = self._makeOrganisation(title='abc',default_address_city='abc')
org_a.setReference(org_a.getId()) org_a.setReference(org_a.getId())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment