diff --git a/product/ERP5Catalog/tests/testArchive.py b/product/ERP5Catalog/tests/testArchive.py index 306f6beedf51a82a4f7c64dbf525610a81faa546..7a4a886f7e78e83d3f2d10ac48e1e186bcde2080 100644 --- a/product/ERP5Catalog/tests/testArchive.py +++ b/product/ERP5Catalog/tests/testArchive.py @@ -27,15 +27,17 @@ # ############################################################################## +import time import unittest -from Testing import ZopeTestCase -from AccessControl.SecurityManagement import newSecurityManager +from Testing.ZopeTestCase import _print, PortalTestCase +from AccessControl.SecurityManagement import \ + getSecurityManager, newSecurityManager from zLOG import LOG from DateTime import DateTime from Products.ERP5Type.tests.utils import getExtraSqlConnectionStringList from Products.ERP5.tests.testInventoryAPI import InventoryAPITestCase -from Products.ERP5Type.tests.utils import reindex +from Products.ERP5Type.tests.utils import reindex, createZODBPythonScript class TestArchive(InventoryAPITestCase): @@ -133,7 +135,7 @@ class TestArchive(InventoryAPITestCase): if not run: return if not quiet: message = 'Archive' - ZopeTestCase._print('\n%s ' % message) + _print('\n%s ' % message) LOG('Testing... ',0,message) portal = self.getPortal() @@ -342,9 +344,42 @@ class TestArchive(InventoryAPITestCase): # check the current archive self.assertEqual(portal_archive.getCurrentArchive(), dest) + def test_MaximumRecursionDepthExceededWithComplexSecurity(self): + skin = self.portal.portal_skins.custom + colour = self.portal.portal_categories.colour + colour.hasObject('green') or colour.newContent('green') + login = str(time.time()) + script_id = ["ERP5Type_getSecurityCategoryMapping", + "ERP5Type_getSecurityCategory"] + createZODBPythonScript(skin, script_id[0], "", + "return ((%r, ('colour',)),)" % script_id[1]) + createZODBPythonScript(skin, script_id[1], + "base_category_list, user_name, object, portal_type, depth=[]", """if 1: + # This should not be called recursively, or at least if should not fail. + # Because RuntimeError is catched by 'except:' clauses, we detect it + # with a static variable. + depth.append(None) + assert not portal_type, portal_type + # the following line calls Base_zSearchRelatedObjectsByCategoryList + object.getSourceDecisionRelatedValueList() + bc, = base_category_list + depth.pop() + return [] if depth else [{bc: 'green'}] + """) + person = self.portal.person_module.newContent(reference=login) + try: + self.tic() + PortalTestCase.login(self, login) + self.assertEqual(['green'], getSecurityManager().getUser().getGroups()) + self.portal.portal_caches.clearAllCache() + PortalTestCase.login(self, login) + unittest.expectedFailure(self.assertEqual)( + ['green'], getSecurityManager().getUser().getGroups()) + finally: + skin.manage_delObjects(script_id) + self.commit() def test_suite(): suite = unittest.TestSuite() suite.addTest(unittest.makeSuite(TestArchive)) return suite - diff --git a/product/ERP5Type/patches/DA.py b/product/ERP5Type/patches/DA.py index e401eca21cc9c33bad31b47aa564c4b0f4d5cd8f..3b42902785045d12da0ca02f47ae78bc07f1ce3a 100644 --- a/product/ERP5Type/patches/DA.py +++ b/product/ERP5Type/patches/DA.py @@ -162,20 +162,17 @@ def DA__call__(self, REQUEST=None, __ick__=None, src__=0, test__=0, **kw): # Patch to implement dynamic connection id # Connection id is retrieve from user preference if c is None: - physical_path = self.getPhysicalPath() # XXX cleaner solution will be needed - if 'portal_catalog' not in physical_path and\ - 'cmf_activity' not in self.connection_id and\ - 'transactionless' not in self.connection_id: - try: - archive_id = self.portal_preferences.getPreferredArchive() - except AttributeError: - pass - else: - if archive_id not in (None, ''): + if not (self.connection_id in ('cmf_activity_sql_connection', + 'erp5_sql_transactionless_connection') + or 'portal_catalog' in self.getPhysicalPath()): + portal = self.getPortalObject() + if 'portal_archives' in portal.__dict__: + archive_id = portal.portal_preferences.getPreferredArchive() + if archive_id: archive_id = archive_id.split('/')[-1] #LOG("DA__call__, archive_id 2", 300, archive_id) - archive = self.portal_archives._getOb(archive_id, None) + archive = portal.portal_archives._getOb(archive_id, None) if archive is not None: c = archive.getConnectionId() #LOG("DA call", INFO, "retrieved connection %s from preference" %(c,))