Commit 46e300cb authored by Romain Courteaud's avatar Romain Courteaud

Instead of generating security query which hardcoded column names, use new

configuration parameter defined on the catalog tool.


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@19414 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 6bb58b82
......@@ -197,7 +197,6 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
manage_options = ({ 'label' : 'Overview', 'action' : 'manage_overview' },
) + ZCatalog.manage_options
def __init__(self):
ZCatalog.__init__(self, self.getId())
......@@ -447,12 +446,16 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
user_is_superuser = (user_str == SUPER_USER)
allowedRolesAndUsers = self._listAllowedRolesAndUsers(user)
role_column_dict = {}
column_map = self.getSQLCatalog(sql_catalog_id).getColumnMap()
local_role_column_dict = {}
catalog = self.getSQLCatalog(sql_catalog_id)
column_map = catalog.getColumnMap()
# Patch for ERP5 by JP Smets in order
# to implement worklists and search of local roles
if kw.has_key('local_roles'):
local_roles = kw['local_roles']
local_role_dict = dict(catalog.getSQLCatalogLocalRoleKeysList())
role_dict = dict(catalog.getSQLCatalogRoleKeysList())
# XXX user is not enough - we should also include groups of the user
# Only consider local_roles if it is not empty
if local_roles not in (None, '', []): # XXX: Maybe "if local_roles:" is enough.
......@@ -460,27 +463,42 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
# Turn it into a list if necessary according to ';' separator
if isinstance(local_roles, str):
local_roles = local_roles.split(';')
local_roles = [x.lower() for x in local_roles]
# Local roles now has precedence (since it comes from a WorkList)
for user_or_group in allowedRolesAndUsers:
for role in local_roles:
# Performance optimisation
if role in column_map:
if local_role_dict.has_key(role):
# XXX This should be a list
# If a given role exists as a column in the catalog,
# then it is considered as single valued and indexed
# through the catalog.
if not user_is_superuser:
role_column_dict[role] = user_str # XXX This should be a list
# which also includes all user groups
# XXX This should be a list
# which also includes all user groups
column_id = local_role_dict[role]
local_role_column_dict[column_id] = user_str
if role_dict.has_key(role):
# XXX This should be a list
# If a given role exists as a column in the catalog,
# then it is considered as single valued and indexed
# through the catalog.
if not user_is_superuser:
# XXX This should be a list
# which also includes all user groups
column_id = role_dict[role]
role_column_dict[column_id] = user_str
else:
# Else, we use the standard approach
new_allowedRolesAndUsers.append('%s:%s' % (user_or_group, role))
allowedRolesAndUsers = new_allowedRolesAndUsers
if local_role_column_dict == {}:
allowedRolesAndUsers = new_allowedRolesAndUsers
else:
# We only consider here the Owner role (since it was not indexed)
# since some objects may only be visible by their owner
# which was not indexed
if 'owner' in column_map:
for role, column_id in catalog.getSQLCatalogRoleKeysList():
# XXX This should be a list
if not user_is_superuser:
try:
# if called by an executable with proxy roles, we don't use
......@@ -488,11 +506,11 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
eo = getSecurityManager()._context.stack[-1]
proxy_roles = getattr(eo, '_proxy_roles', None)
if not proxy_roles:
role_column_dict['owner'] = user_str
role_column_dict[column_id] = user_str
except IndexError:
role_column_dict['owner'] = user_str
role_column_dict[column_id] = user_str
return allowedRolesAndUsers, role_column_dict
return allowedRolesAndUsers, role_column_dict, local_role_column_dict
def getSecurityUidListAndRoleColumnDict(self, sql_catalog_id=None, **kw):
"""
......@@ -503,7 +521,8 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
site as long as security uids are considered consistent among all
catalogs.
"""
allowedRolesAndUsers, role_column_dict = self.getAllowedRolesAndUsers(**kw)
allowedRolesAndUsers, role_column_dict, local_role_column_dict = \
self.getAllowedRolesAndUsers(**kw)
catalog = self.getSQLCatalog(sql_catalog_id)
method = getattr(catalog, catalog.sql_search_security, None)
if allowedRolesAndUsers:
......@@ -534,7 +553,7 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
security_uid_cache[cache_key] = security_uid_list
else:
security_uid_list = []
return security_uid_list, role_column_dict
return security_uid_list, role_column_dict, local_role_column_dict
security.declarePublic('getSecurityQuery')
def getSecurityQuery(self, query=None, sql_catalog_id=None, **kw):
......@@ -544,7 +563,9 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
catalogued with columns.
"""
original_query = query
security_uid_list, role_column_dict = self.getSecurityUidListAndRoleColumnDict(sql_catalog_id=sql_catalog_id, **kw)
security_uid_list, role_column_dict, local_role_column_dict = \
self.getSecurityUidListAndRoleColumnDict(
sql_catalog_id=sql_catalog_id, **kw)
if role_column_dict:
query_list = []
for key, value in role_column_dict.items():
......@@ -560,6 +581,16 @@ class CatalogTool (UniqueObject, ZCatalog, CMFCoreCatalogTool, ActiveObject):
query, operator='OR')
else:
query = Query(security_uid=security_uid_list, operator='IN')
if local_role_column_dict:
query_list = []
for key, value in local_role_column_dict.items():
new_query = Query(**{key : value})
query_list.append(new_query)
operator_kw = {'operator': 'AND'}
local_role_query = ComplexQuery(*query_list, **operator_kw)
query = ComplexQuery(query, local_role_query, operator='AND')
if original_query is not None:
query = ComplexQuery(query, original_query, operator='AND')
return query
......
......@@ -1896,7 +1896,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
self.assertEquals([], [x.getObject() for x in
obj.searchFolder(portal_type='Bank Account')])
def test_60_OwnerIndexing(self, quiet=quiet, run=run_all_test):
def test_60_ViewableOwnerIndexing(self, quiet=quiet, run=run_all_test):
if not run:
return
......@@ -1913,7 +1913,7 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
sub_portal_type = self.getPortal().portal_types._getOb(sub_portal_type_id)
sql_connection = self.getSQLConnection()
sql = 'select owner from catalog where uid=%s'
sql = 'select viewable_owner as owner from catalog where uid=%s'
login(self, 'super_owner')
......@@ -2292,6 +2292,198 @@ class TestERP5Catalog(ERP5TypeTestCase, LogInterceptor):
result = query('SELECT roles_and_users.uid, roles_and_users.allowedRolesAndUsers FROM roles_and_users, catalog WHERE roles_and_users.uid = catalog.security_uid AND catalog.uid = %i AND roles_and_users.allowedRolesAndUsers LIKE "user:bar%%"' % (object.uid, ))
self.assertEqual(len(result), 0, '%r: len(%r) != 0' % (getObjectDictKey(), result))
def test_RealOwnerIndexing(self, quiet=quiet, run=run_all_test):
if not run:
return
login = PortalTestCase.login
logout = self.logout
user1 = 'local_foo'
user2 = 'local_bar'
uf = self.getPortal().acl_users
uf._doAddUser(user1, user1, ['Member', ], [])
uf._doAddUser(user2, user2, ['Member', ], [])
perm = 'View'
folder = self.getOrganisationModule()
folder.manage_setLocalRoles(user1, ['Author', 'Auditor'])
folder.manage_setLocalRoles(user2, ['Author', 'Auditor'])
portal_type = 'Organisation'
sql_connection = self.getSQLConnection()
login(self, user2)
obj2 = folder.newContent(portal_type=portal_type)
obj2.manage_setLocalRoles(user1, ['Auditor'])
obj2.manage_permission(perm, ['Owner', 'Auditor'], 0)
login(self, user1)
obj = folder.newContent(portal_type=portal_type)
obj.manage_setLocalRoles(user1, ['Owner', 'Auditor'])
# Check that nothing is returned when user can not view the object
obj.manage_permission(perm, [], 0)
obj.reindexObject()
get_transaction().commit()
self.tic()
result = obj.portal_catalog(portal_type=portal_type)
self.assertSameSet([obj2, ], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
self.assertSameSet([], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
self.assertSameSet([obj2, ], [x.getObject() for x in result])
# Check that object is returned when he can view the object
obj.manage_permission(perm, ['Auditor'], 0)
obj.reindexObject()
get_transaction().commit()
self.tic()
result = obj.portal_catalog(portal_type=portal_type)
self.assertSameSet([obj2, obj], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
self.assertSameSet([], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
self.assertSameSet([obj2, obj], [x.getObject() for x in result])
# Check that object is returned when he can view the object
obj.manage_permission(perm, ['Owner'], 0)
obj.reindexObject()
get_transaction().commit()
self.tic()
result = obj.portal_catalog(portal_type=portal_type)
self.assertSameSet([obj2, obj], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
self.assertSameSet([obj], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
self.assertSameSet([obj2, ], [x.getObject() for x in result])
# Add a new table to the catalog
sql_catalog = self.portal.portal_catalog.getSQLCatalog()
local_roles_table = "test_local_roles"
create_local_role_table_sql = """
CREATE TABLE `%s` (
`uid` BIGINT UNSIGNED NOT NULL,
`owner_reference` varchar(32) NOT NULL default '',
PRIMARY KEY (`uid`),
KEY `version` (`owner_reference`)
) TYPE=InnoDB;
""" % local_roles_table
sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
id = 'z_create_%s' % local_roles_table,
title = '',
arguments = "",
connection_id = 'erp5_sql_connection',
template = create_local_role_table_sql)
drop_local_role_table_sql = """
DROP TABLE IF EXISTS %s
""" % local_roles_table
sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
id = 'z0_drop_%s' % local_roles_table,
title = '',
arguments = "",
connection_id = 'erp5_sql_connection',
template = drop_local_role_table_sql)
catalog_local_role_sql = """
REPLACE INTO
%s
VALUES
<dtml-in prefix="loop" expr="_.range(_.len(uid))">
(
<dtml-sqlvar expr="uid[loop_item]" type="int">,
<dtml-sqlvar expr="Base_getOwnerId[loop_item]" type="string" optional>
)
<dtml-if sequence-end>
<dtml-else>
,
</dtml-if>
</dtml-in>
""" % local_roles_table
sql_catalog.manage_addProduct['ZSQLMethods'].manage_addZSQLMethod(
id = 'z_catalog_%s_list' % local_roles_table,
title = '',
connection_id = 'erp5_sql_connection',
arguments = "\n".join(['uid',
'Base_getOwnerId']),
template = catalog_local_role_sql)
get_transaction().commit()
current_sql_catalog_object_list = sql_catalog.sql_catalog_object_list
sql_catalog.sql_catalog_object_list = \
current_sql_catalog_object_list + \
('z_catalog_%s_list' % local_roles_table,)
current_sql_clear_catalog = sql_catalog.sql_clear_catalog
sql_catalog.sql_clear_catalog = \
current_sql_clear_catalog + \
('z0_drop_%s' % local_roles_table, 'z_create_%s' % local_roles_table)
current_sql_catalog_local_role_keys = \
sql_catalog.sql_catalog_local_role_keys
sql_catalog.sql_catalog_local_role_keys = ('Owner | %s.owner_reference' % \
local_roles_table,)
current_sql_search_tables = sql_catalog.sql_search_tables
sql_catalog.sql_search_tables = sql_catalog.sql_search_tables + \
[local_roles_table]
get_transaction().commit()
try:
# Clear catalog
portal_catalog = self.getCatalogTool()
portal_catalog.manage_catalogClear()
get_transaction().commit()
self.portal.portal_caches.clearAllCache()
get_transaction().commit()
obj2.reindexObject()
# Check that nothing is returned when user can not view the object
obj.manage_permission(perm, [], 0)
obj.reindexObject()
get_transaction().commit()
self.tic()
result = obj.portal_catalog(portal_type=portal_type)
self.assertSameSet([obj2, ], [x.getObject() for x in result])
method = obj.portal_catalog
result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
self.assertSameSet([], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
self.assertSameSet([obj2, ], [x.getObject() for x in result])
# Check that object is returned when he can view the object
obj.manage_permission(perm, ['Auditor'], 0)
obj.reindexObject()
get_transaction().commit()
self.tic()
result = obj.portal_catalog(portal_type=portal_type)
self.assertSameSet([obj2, obj], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
self.assertSameSet([obj], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
self.assertSameSet([obj2, obj], [x.getObject() for x in result])
# Check that object is returned when he can view the object
obj.manage_permission(perm, ['Owner'], 0)
obj.reindexObject()
get_transaction().commit()
self.tic()
result = obj.portal_catalog(portal_type=portal_type)
self.assertSameSet([obj2, obj], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Owner')
self.assertSameSet([obj], [x.getObject() for x in result])
result = obj.portal_catalog(portal_type=portal_type, local_roles='Auditor')
self.assertSameSet([obj2, ], [x.getObject() for x in result])
finally:
sql_catalog.sql_catalog_object_list = \
current_sql_catalog_object_list
sql_catalog.sql_clear_catalog = \
current_sql_clear_catalog
sql_catalog.sql_catalog_local_role_keys = \
current_sql_catalog_local_role_keys
sql_catalog.sql_search_tables = current_sql_search_tables
get_transaction().commit()
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestERP5Catalog))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment