Commit fe27a9c3 authored by Vincent Pelletier's avatar Vincent Pelletier

Reduce reliance on Person.reference as a user id.

To prepare for moving user id to a different property.
Mostly replacing getReference on Persons with Person_getUserId, and
catalog searches with PAS API when it is meant to search for a user and
not really a person by reference.
parent eaa72c7c
......@@ -14,7 +14,7 @@ if access_token_document.getValidationState() == 'validated':
agent_document = access_token_document.getAgentValue()
if agent_document is not None:
result = agent_document.getReference(None)
result = agent_document.Person_getUserId()
comment = "Token usage accepted"
access_token_document.invalidate(comment=comment)
......
......@@ -22,6 +22,6 @@ if access_token_document.getValidationState() == 'validated':
agent_document = access_token_document.getAgentValue()
if agent_document is not None:
result = agent_document.getReference(None)
result = agent_document.Person_getUserId()
return result
......@@ -91,7 +91,7 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = self._getTokenCredential(self.portal.REQUEST)
self.assertEqual(result.get('external_login'), person.getReference())
self.assertEqual(result.get('external_login'), person.Person_getUserId())
def test_bad_token(self):
person = self.person = self._createPerson(self.new_id)
......@@ -129,7 +129,7 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
result = access_token.RestrictedAccessToken_getExternalLogin()
self.assertEqual(result, person.getReference())
self.assertEqual(result, person.Person_getUserId())
self.assertEqual(access_token.getValidationState(), 'validated')
def test_RestrictedAccessToken_getExternalLogin_access_token_secret(self):
......@@ -156,7 +156,7 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
result = access_token.RestrictedAccessToken_getExternalLogin()
self.assertEqual(result, person.getReference())
self.assertEqual(result, person.Person_getUserId())
self.assertEqual(access_token.getValidationState(), 'validated')
def test_RestrictedAccessToken_getExternalLogin_no_agent(self):
......@@ -226,7 +226,7 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
result = access_token.OneTimeRestrictedAccessToken_getExternalLogin()
self.assertEqual(result, person.getReference())
self.assertEqual(result, person.Person_getUserId())
self.assertEqual(access_token.getValidationState(), 'invalidated')
def test_OneTimeRestrictedAccessToken_getExternalLogin_wrong_values(self):
......
......@@ -22,7 +22,7 @@ context.activate().AccountingTransactionModule_viewFrenchAccountingTransactionFi
at_date,
simulation_state,
ledger,
user_name=person_value.getReference(),
user_name=person_value.Person_getUserId(),
tag=tag,
aggregate_tag=aggregate_tag)
......
......@@ -35,6 +35,6 @@ for person, failure_list in all_blocked_user_login_dict.items():
person.getTitle(),
**{'title': person.getTitle(),
'count':len(failure_list),
'reference': person.getReference(),
'reference': person.Person_getUserId(),
'url': person.absolute_url()}))
return blocked_user_login_list
......@@ -9,7 +9,7 @@ if not portal.Base_checkPermission('portal_preferences', 'Add portal content'):
try:
preference = portal.portal_preferences.createPreferenceForUser(
context.getReference(),
context.Person_getUserId(),
enable=True,
)
except ValueError:
......
......@@ -46,7 +46,7 @@ select_expression = {'date' : 'DATE_FORMAT(creation_date, "%s")'%sql_format, 'po
group_by = ['DATE_FORMAT(creation_date, "%s")' % sql_format, 'portal_type']
# count number of object created by the user for each type of document
reference = kw.get('person_reference_list', context.getReference())
reference = kw.get('person_reference_list', context.Person_getUserId())
result_list = context.portal_catalog.countResults(select_expression=select_expression,
portal_type=portal_type_list,limit=None,
owner=reference,query=query,
......
......@@ -24,7 +24,7 @@ if context.getPortalObject().hasObject('event_module'):
form_id='Person_viewPersonDetailedEventList'))
# contributions list
if context.getReference() not in (None, ""):
if context.Person_getUserId() not in (None, ""):
# list only if user has a login defined
aggregation_level = context.REQUEST.get('aggregation_level')
from_date = context.REQUEST.get('from_date')
......
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string></string> </value>
</item>
<item>
<key> <string>_proxy_roles</string> </key>
<value>
<tuple>
<string>Manager</string>
</tuple>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>Person_getUserId</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
......@@ -48,7 +48,7 @@ group_by = ['DATE_FORMAT(creation_date, "%s")' % sql_format,]
# count number of object created by the user for each type of document
result_list = context.portal_catalog.countResults(select_expression=select_expression,
portal_type=portal_type_list,limit=None,
owner=context.getReference(),query=query,
owner=context.Person_getUserId(),query=query,
group_by_expression=group_by)
# build result dict per portal_type then period
......
# XXX-Luke: Seb pointed out that this is very bad idea to clear cache.
document = state_change['object']
if document.getReference() is not None:
if document.Person_getUserId() is not None:
cache_tool = document.getPortalObject().portal_caches
cache_tool.clearCache(cache_factory_list=('erp5_content_short', ),
before_commit=True)
person = sci['object']
if sci['object'].getReference():
if person.Person_getUserId():
person.activate(after_path_and_method_id=(person.getPath(),
('immediateReindexObject',
'recursiveImmediateReindexObject'))).Person_createUserPreference()
......@@ -8,13 +8,12 @@ catalog.
from Products.ERP5Type.Log import log
object = sci['object']
portal = sci.getPortal()
portal = object.getPortalObject()
translateString = portal.Base_translateString
portal_catalog = portal.portal_catalog
# Get the owner
owner = object.getViewPermissionOwner()
owner_value = portal_catalog.getResultValue(portal_type='Person', reference=owner)
owner_value = portal.Base_getUserValueByUserId(owner)
# Get the authenticated user
user_value = portal.portal_membership.getAuthenticatedMember().getUserValue()
......
......@@ -9,7 +9,7 @@ if not key:
token = {
'expiration_timestamp': addToDate(DateTime(), to_add={'hour': 1}).timeTime(),
'reference': context.getReference(),
'reference': context.Person_getUserId(),
'user-agent': context.REQUEST.getHeader('User-Agent'),
'remote-addr': context.REQUEST.get('REMOTE_ADDR')
}
......
......@@ -68,7 +68,7 @@ class TestERP5BearerToken(ERP5TypeTestCase):
token, expiration_time = self.person.Person_getBearerToken()
self.portal.REQUEST._auth = 'Bearer %s' % token
reference = self.getTokenCredential(self.portal.REQUEST)
self.assertEqual(reference, self.person.getReference())
self.assertEqual(reference, self.person.Person_getUserId())
def test_different_user_agent(self):
token, expiration_time = self.person.Person_getBearerToken()
......@@ -103,7 +103,7 @@ class TestERP5BearerToken(ERP5TypeTestCase):
# they are not allowing to pass arguments, so lets hack in test
token = {
'expiration_timestamp': DateTime()-1,
'reference': self.person.getReference(),
'reference': self.person.Person_getUserId(),
'user-agent': self.portal.REQUEST.getHeader('User-Agent'),
'remote-addr': self.portal.REQUEST.get('REMOTE_ADDR')
}
......
......@@ -12,19 +12,19 @@ class Person(ERP5Person):
# in ERP5 user has no SetOwnPassword permission on Person document
# referring himself, so implement "security" by checking that currently
# logged in user is trying to get/revoke his own certificate
reference = self.getReference()
if not reference:
user_id = self.Person_getUserId()
if not user_id:
raise
if getSecurityManager().getUser().getId() != reference:
if getSecurityManager().getUser().getId() != user_id:
raise
def _getCertificate(self):
return self.getPortalObject().portal_certificate_authority\
.getNewCertificate(self.getReference())
.getNewCertificate(self.Person_getUserId())
def _revokeCertificate(self):
return self.getPortalObject().portal_certificate_authority\
.revokeCertificateByCommonName(self.getReference())
.revokeCertificateByCommonName(self.Person_getUserId())
def getCertificate(self):
"""Returns new SSL certificate"""
......
......@@ -78,7 +78,7 @@ for gadget in context.portal_gadgets.objectValues():
# Add a tab and a gadget for everyone
portal = context.getPortalObject()
for person in context.person_module.objectValues():
user_name = person.getReference()
user_name = person.Person_getUserId()
tag = '%s_%s_%s' %(user_name,
'erp5_front',
None)
......
......@@ -263,7 +263,7 @@ class TestRunMyDocsConfiguratorWorkflowFranceLanguage(TestRunMyDocsConfiguratorW
self._stepSetupMultipleUserAccountThree(sequence, user_list)
def stepCheckKnowledgePadRole(self, sequence=None, sequence_list=None, **kw):
self.login("french_creator")
self.loginByUserName("french_creator")
self._stepCheckKnowledgePadRole()
......@@ -312,5 +312,5 @@ class TestRunMyDocsConfiguratorWorkflowBrazilLanguage(TestRunMyDocsConfiguratorW
self._stepSetupMultipleUserAccountThree(sequence, user_list)
def stepCheckKnowledgePadRole(self, sequence=None, sequence_list=None, **kw):
self.login("person_creator")
self.loginByUserName("person_creator")
self._stepCheckKnowledgePadRole()
......@@ -765,9 +765,9 @@ class StandardConfigurationMixin(TestLiveConfiguratorWorkflowMixin):
self.assertEqual('2008', period.getShortTitle())
# security on this period has been initialised
for username in self.accountant_username_list:
for user_id in self._getUserIdList(self.accountant_username_list):
self.failUnlessUserCanPassWorkflowTransition(
username, 'cancel_action', period)
user_id, 'cancel_action', period)
def stepCheckSaleTradeCondition(self, sequence=None, sequence_list=None, **kw):
"""
......
......@@ -390,15 +390,15 @@ class TestUNGConfiguratorWorkflowFranceLanguage(TestUNGConfiguratorWorkflowMixin
def stepCheckWebSiteRoles(self, sequence=None, sequence_list=None, **kw):
""" Check permission of Web Site with normal user """
self.login("french_assignor")
self.loginByUserName("french_assignor")
self._stepCheckWebSiteRoles()
def stepCheckKnowledgePadRole(self, sequence=None, sequence_list=None, **kw):
self.login("french_creator")
self.loginByUserName("french_creator")
self._stepCheckKnowledgePadRole()
def stepCheckCreateNewEvent(self, sequence=None, sequence_list=None, **kw):
self.login("french_assignee")
self.loginByUserName("french_assignee")
self._stepCheckCreateNewEvent()
......@@ -485,13 +485,13 @@ class TestUNGConfiguratorWorkflowBrazilLanguage(TestUNGConfiguratorWorkflowMixin
def stepCheckWebSiteRoles(self, sequence=None, sequence_list=None, **kw):
""" Check permission of Web Site with normal user """
self.login("person_assignor")
self.loginByUserName("person_assignor")
self._stepCheckWebSiteRoles()
def stepCheckKnowledgePadRole(self, sequence=None, sequence_list=None, **kw):
self.login("person_creator")
self.loginByUserName("person_creator")
self._stepCheckKnowledgePadRole()
def stepCheckCreateNewEvent(self, sequence=None, sequence_list=None, **kw):
self.login("person_assignee")
self.loginByUserName("person_assignee")
self._stepCheckCreateNewEvent()
......@@ -13,8 +13,9 @@ portal_preferences = context.portal_preferences
person = context.getDestinationDecisionValue(portal_type="Person")
# Create user of the person only if not exist
if person.hasReference() and person.getPassword():
return person.getReference(), None
user_id = person.Person_getUserId()
if user_id and person.hasPassword():
return user_id, None
# Set login
login = context.getReference()
......
......@@ -3,29 +3,29 @@
Proxy : this required a manager proxy role to be able to search in all persons
'''
portal = context.getPortalObject()
person_module = portal.getDefaultModule('Person')
request = context.REQUEST
web_site = context.getWebSiteValue()
if web_site:
request.set("came_from", web_site.absolute_url())
if choice == "password":
request.set('reference', reference)
portal_preferences = context.portal_preferences
result = person_module.searchFolder(reference={'query': reference, 'key': 'ExactMatch'})
if len(result) != 1:
user_id = portal.Base_getUserIdByUserName(reference)
if user_id is None:
person = None
else:
person = portal.Base_getUserValueByUserId(user_id)
if person is None:
portal_status_message = context.Base_translateString("Could not find your user account.")
if web_site:
return web_site.Base_redirect('login_form', keep_items = dict(portal_status_message=portal_status_message ))
return portal.Base_redirect('login_form', keep_items = dict(portal_status_message=portal_status_message ))
person = result[0]
#If any question, we can create directly the credential recovery
question_free_text = person.getDefaultCredentialQuestionQuestionFreeText()
question_title = person.getDefaultCredentialQuestionQuestionTitle()
if not (question_free_text or question_title) or \
not portal_preferences.isPreferredAskCredentialQuestion():
not portal.portal_preferences.isPreferredAskCredentialQuestion():
return context.ERP5Site_newCredentialRecovery(reference=reference)
web_section = context.getWebSectionValue()
......
......@@ -45,7 +45,11 @@ else:
username = person.getReference()
if password and username == str(portal.portal_membership.getAuthenticatedMember()):
credential_update.accept()
portal.cookie_authentication.credentialsChanged(username, username, password)
portal.cookie_authentication.credentialsChanged(
person.Person_getUserId(),
username,
password,
)
portal_status_message = "Password changed."
portal_status_message = context.Base_translateString(portal_status_message)
......
##############################################################################
#
# Copyright (c) 2006-2007 Nexedi SA and Contributors. All Rights Reserved.
# Copyright (c) 2006-2007,2016 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
......@@ -30,8 +30,7 @@ def getPersonRoleList(self, person, object):
"""
Get list of local roles for user.
"""
acl_users = self.getPortalObject().acl_users
if person.getReference() is not None:
user = acl_users.getUserById(person.getReference()).__of__(acl_users)
user_role_list = user.getRolesInContext(object)
return user_role_list
user_id = person.Person_getUserId()
if user_id is not None:
acl_users = self.getPortalObject().acl_users
return acl_users.getUserById(user_id).__of__(acl_users).getRolesInContext(object)
......@@ -13,22 +13,12 @@ from Products.ERP5Type.Log import log
category_list = []
person_module = context.portal_url.getPortalObject().getDefaultModule('Person')
# It is better to keep getObject(), in this script this
# prevent a very strange bug, sometimes without getObject the
# assignment is not found
person_object_list = [x.getObject() for x in person_module.searchFolder(portal_type='Person', reference=user_name)]
if len(person_object_list) != 1:
if len(person_object_list) > 1:
raise ConsistencyError, "Error: There is more than one Person with reference '%s'" % user_name
else:
# if a person_object was not found in the module, we do nothing more
# this happens for example when a manager with no associated person object
# creates a person_object for a new user
return []
person_object = person_object_list[0]
person_object = context.Base_getUserValueByUserId(user_name)
if person_object is None:
# if a person_object was not found in the module, we do nothing more
# this happens for example when a manager with no associated person object
# creates a person_object for a new user
return []
# We look for valid assignments of this user
for assignment in person_object.contentValues(filter={'portal_type': 'Assignment'}):
......
......@@ -5,21 +5,12 @@ XXX I'm not sure it is used anywhere at the moment.
category_list = []
person_module = context.portal_url.getPortalObject().getDefaultModule('Person')
# It is better to keep getObject(), in this script this
# prevent a very strange bug, sometimes without getObject the
# assignment is not found
person_object_list = [x.getObject() for x in person_module.searchFolder(portal_type='Person', reference=user_name)]
if len(person_object_list) != 1:
if len(person_object_list) > 1:
raise ConsistencyError, "Error: There is more than one Person with reference '%s'" % user_name
else:
# if a person_object was not found in the module, we do nothing more
# this happens for example when a manager with no associated person object
# creates a person_object for a new user
return []
person_object = person_object_list[0]
person_object = context.Base_getUserValueByUserId(user_name)
if person_object is None:
# if a person_object was not found in the module, we do nothing more
# this happens for example when a manager with no associated person object
# creates a person_object for a new user
return []
category_dict = {}
for base_category in base_category_list:
......
......@@ -6,6 +6,8 @@ class TestDiscussionThread(SecurityTestCase):
A Sample Test Class
"""
user_id_dict = {}
def getTitle(self):
return "TestDiscussionThread"
......@@ -27,7 +29,9 @@ class TestDiscussionThread(SecurityTestCase):
]
# now we create the users
for user in user_list:
self.createSimpleUser(**user)
if not self.portal.acl_users.searchUsers(login=user['reference'], exact_match=True):
self.user_id_dict[user['reference']] = \
self.createSimpleUser(**user).Person_getUserId()
self.commit()
self.tic()
......@@ -52,19 +56,19 @@ class TestDiscussionThread(SecurityTestCase):
- that user can reply to his thread
"""
# forum_user should be able to access/view the forum module
self.assertUserCanAccessDocument('forum_user', self.forum_module)
self.assertUserCanViewDocument('forum_user', self.forum_module)
self.assertUserCanAddDocument('forum_user', self.forum_module)
self.assertUserCanAccessDocument(self.user_id_dict['forum_user'], self.forum_module)
self.assertUserCanViewDocument(self.user_id_dict['forum_user'], self.forum_module)
self.assertUserCanAddDocument(self.user_id_dict['forum_user'], self.forum_module)
self.login('forum_user')
self.login(self.user_id_dict['forum_user'])
thread_content='Hey, lets create a new thread!'
thread = self._newThread(content=thread_content)
# user should be able to access/view the created thread
self.assertUserCanViewDocument('forum_user', thread)
self.assertUserCanAccessDocument('forum_user', thread)
self.assertUserCanAddDocument('forum_user', thread)
self.assertUserCanViewDocument(self.user_id_dict['forum_user'], thread)
self.assertUserCanAccessDocument(self.user_id_dict['forum_user'], thread)
self.assertUserCanAddDocument(self.user_id_dict['forum_user'], thread)
# get thread posts
thread_posts = thread.objectValues()
......@@ -89,8 +93,8 @@ class TestDiscussionThread(SecurityTestCase):
batch_mode=True,
)
self.assertUserCanViewDocument('forum_user', post)
self.assertUserCanAccessDocument('forum_user', post)
self.assertUserCanViewDocument(self.user_id_dict['forum_user'], post)
self.assertUserCanAccessDocument(self.user_id_dict['forum_user'], post)
self.tic()
......@@ -110,22 +114,22 @@ class TestDiscussionThread(SecurityTestCase):
- outsiders can't read the thread
- visitor can read the thread
"""
self.login('forum_user')
self.login(self.user_id_dict['forum_user'])
thread = self._newThread()
self.failIfUserCanViewDocument('spy', thread)
self.failIfUserCanAccessDocument('spy', thread)
self.failIfUserCanViewDocument(self.user_id_dict['spy'], thread)
self.failIfUserCanAccessDocument(self.user_id_dict['spy'], thread)
self.assertUserCanViewDocument('visitor', thread)
self.assertUserCanAccessDocument('visitor', thread)
self.assertUserCanViewDocument(self.user_id_dict['visitor'], thread)
self.assertUserCanAccessDocument(self.user_id_dict['visitor'], thread)
# Check that visitor has permissions on related objects
# for example, if visitor has no permissions on the Person
# module, the above checks will pass, but the view
# will not work, because Person.getTitle() will fail
self.assertUserCanViewDocument('visitor', self.portal.person_module)
self.assertUserCanAccessDocument('visitor', self.portal.person_module)
self.assertUserCanViewDocument(self.user_id_dict['visitor'], self.portal.person_module)
self.assertUserCanAccessDocument(self.user_id_dict['visitor'], self.portal.person_module)
response = self.publish('/%s/%s' % \
(self.portal.getId(), thread.getRelativeUrl()),
......@@ -140,13 +144,13 @@ class TestDiscussionThread(SecurityTestCase):
- visitor cannot reply
- visitor cannot post a new thread
"""
self.login('forum_user')
self.login(self.user_id_dict['forum_user'])
thread = self._newThread()
# visitor cannot reply to a thread
self.failIfUserCanAddDocument('visitor', thread)
self.failIfUserCanAddDocument(self.user_id_dict['visitor'], thread)
# visitor cannot create a new thread
self.failIfUserCanAddDocument('visitor', self.forum_module)
self.failIfUserCanAddDocument(self.user_id_dict['visitor'], self.forum_module)
def testAdminCanModerate(self):
"""
......@@ -157,16 +161,16 @@ class TestDiscussionThread(SecurityTestCase):
- admin can display it
- admin reopens it
"""
self.login('admin')
self.login(self.user_id_dict['admin'])
thread = self._newThread()
self.assertUserCanPassWorkflowTransition('admin', 'close_action', thread)
self.assertUserCanPassWorkflowTransition(self.user_id_dict['admin'], 'close_action', thread)
thread.close()
self.commit()
self.assertUserCanViewDocument('admin', thread)
self.assertUserCanAccessDocument('admin', thread)
self.assertUserCanPassWorkflowTransition('admin', 'unclose_action', thread)
self.assertUserCanViewDocument(self.user_id_dict['admin'], thread)
self.assertUserCanAccessDocument(self.user_id_dict['admin'], thread)
self.assertUserCanPassWorkflowTransition(self.user_id_dict['admin'], 'unclose_action', thread)
def testUserCannotModerate(self):
"""
......@@ -174,11 +178,11 @@ class TestDiscussionThread(SecurityTestCase):
- user creates thread
- user cannot close it
"""
self.login('forum_user')
self.login(self.user_id_dict['forum_user'])
thread = self._newThread()
self.assertUserCanPassWorkflowTransition('forum_user', 'close_action', thread)
self.failIfUserCanPassWorkflowTransition('another_forum_user', 'close_action', thread)
self.assertUserCanPassWorkflowTransition(self.user_id_dict['forum_user'], 'close_action', thread)
self.failIfUserCanPassWorkflowTransition(self.user_id_dict['another_forum_user'], 'close_action', thread)
def testCanPostIfNotOwner(self):
......@@ -188,14 +192,14 @@ class TestDiscussionThread(SecurityTestCase):
- another_forum_user displays it
- another_forum_user replies
"""
self.login('forum_user')
self.login(self.user_id_dict['forum_user'])
thread = self._newThread()
# other user (not thread owner) can access and view the thread
self.assertUserCanViewDocument('another_forum_user', thread)
self.assertUserCanAccessDocument('another_forum_user', thread)
self.assertUserCanViewDocument(self.user_id_dict['another_forum_user'], thread)
self.assertUserCanAccessDocument(self.user_id_dict['another_forum_user'], thread)
# ... and can reply to thread even if he did not start it
self.assertUserCanAddDocument('another_forum_user', thread)
self.assertUserCanAddDocument(self.user_id_dict['another_forum_user'], thread)
response = self.publish('/%s/%s' % \
(self.portal.getId(), thread.getRelativeUrl()),
......
context.getWebSiteValue().Base_redirect(form_id='ERP5Site_viewSearchResult',
keep_items=dict(reset=1, portal_type=list(context.getPortalDocumentTypeList()),
owner=context.getReference()))
owner=context.Person_getUserId()))
......@@ -7,7 +7,7 @@
"""
if context.getPortalType() == 'Person':
# If context is a person, get the user
user = context.getReference()
user = context.Person_getUserId()
if user is None:
# no way to determine documents if we have no reference
return [[0]]
......
......@@ -7,7 +7,7 @@
"""
if context.getPortalType() == 'Person':
# If context is a person, get the user
user = context.getReference()
user = context.Person_getUserId()
if user is None:
# no way to determine documents if we have no reference
return []
......
......@@ -40,9 +40,9 @@ if (
source_person.getDefaultEmailText() and # XXX Add unit test: check if task confirmation works if assignee has no mail
destination_decision_person is not None and
destination_decision_person.getDefaultEmailText() and
destination_decision_person.getReference()
destination_decision_person.Person_getUserId()
):
if portal.acl_users.searchUsers(id=source_person.getReference(), exact_match=True):
if portal.acl_users.searchUsers(id=source_person.Person_getUserId(), exact_match=True):
message = """A new task has been assigned to you by %(assignor)s.
This task is named: %(title)s
......
......@@ -111,7 +111,7 @@
<dictionary>
<item>
<key> <string>_text</string> </key>
<value> <string>python:[(x.getTitle(), x.getRelativeUrl()) for x in here.getSourceTradeValueList()] + [(context.portal_catalog.getResultValue(portal_type=\'Person\', reference=context.portal_membership.getAuthenticatedMember()).getTitle(), context.portal_catalog.getResultValue(portal_type=\'Person\', reference=context.portal_membership.getAuthenticatedMember()).getRelativeUrl())] + [(\'\', \'\')]</string> </value>
<value> <string>python:[(x.getTitle(), x.getRelativeUrl()) for x in here.getSourceTradeValueList() + [context.portal_membership.getAuthenticatedMember().getUserValue()]] + [(\'\', \'\')]</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -12,8 +12,8 @@ if destination_decision_person is None:
if source_person is not None \
and destination_decision_person is not None \
and destination_decision_person.getDefaultEmailText() \
and destination_decision_person.getReference():
if portal.acl_users.searchUsers(id=source_person.getReference(), exact_match=True):
and destination_decision_person.Person_getUserId():
if portal.acl_users.searchUsers(id=source_person.Person_getUserId(), exact_match=True):
message = """
%s has finished the task report titled with %s.
Please look at this URL:
......
......@@ -24,8 +24,8 @@ if destination_decision_person is None:
if source_person is not None \
and destination_decision_person is not None\
and source_person.getDefaultEmailText() \
and source_person.getReference():
if portal.acl_users.searchUsers(id=source_person.getReference(), exact_match=True):
and source_person.Person_getUserId():
if portal.acl_users.searchUsers(id=source_person.Person_getUserId(), exact_match=True):
message = """
A question from task has been assigned to you by %(assignor)s.
......
......@@ -12,8 +12,8 @@ if destination_decision_person is None:
if source_person is not None \
and destination_decision_person is not None\
and source_person.getDefaultEmailText() \
and source_person.getReference():
if portal.acl_users.searchUsers(id=source_person.getReference(), exact_match=True):
and source_person.Person_getUserId():
if portal.acl_users.searchUsers(id=source_person.Person_getUserId(), exact_match=True):
message = """
Restarted task has been assigned to you by %(assignor)s.
......
......@@ -11,8 +11,7 @@ if not person_list:
person_list = context.portal_selections.getSelectionValueList(selection_name)
# Find authenticated user
user = context.portal_membership.getAuthenticatedMember()
user_person = context.portal_catalog.getResultValue(portal_type='Person', reference=user)
user_value = context.portal_membership.getAuthenticatedMember().getUserValue()
# For every person, create an event
if not single_event:
......@@ -28,12 +27,12 @@ if not single_event:
# Trigger appropriate workflow action
if direction == 'incoming':
event.setSourceValue(person)
event.setDestinationValue(user_person)
event.setDestinationValue(user_value)
event.receive()
else:
event.plan()
event.setDestinationValue(person)
event.setSourceValue(user_person)
event.setSourceValue(user_value)
else:
if direction == 'incoming' and len(person_list) > 1:
# This case is not possible
......@@ -48,7 +47,7 @@ else:
text_content=text_content) # text_format is set by Event_init
event.plan()
event.setDestinationValueList(person_list)
event.setSourceValue(user_person)
event.setSourceValue(user_value)
count = 1
# Redirect to the event module (but is this the best place to go since events are not yet indexed ?)
......
......@@ -6,8 +6,8 @@ for key in kw.keys():
query_dict[key] = dict(query=kw[key], key='ExactMatch')
result_list = context.portal_catalog(**query_dict)
owner_id = context.portal_membership.getAuthenticatedMember().getId()
functional_test_username = context.Zuite_getHowToInfo()['functional_test_username']
functional_another_test_username = context.Zuite_getHowToInfo()['functional_another_test_username']
functional_test_username = context.Base_getUserIdByUserName(context.Zuite_getHowToInfo()['functional_test_username'])
functional_another_test_username = context.Base_getUserIdByUserName(context.Zuite_getHowToInfo()['functional_another_test_username'])
for result in result_list:
object = result.getObject()
......
......@@ -25,6 +25,6 @@ if person is None:
# XXX (lucas): These tests must be able to run on an instance without security.
for role in ('Assignee', 'Assignor', 'Associate', 'Auditor', 'Owner'):
portal.acl_users.zodb_roles.assignRoleToPrincipal(role, functional_test_username)
portal.acl_users.zodb_roles.assignRoleToPrincipal(role, person.Person_getUserId())
return 'Done.'
......@@ -25,6 +25,6 @@ if person is None:
# XXX (lucas): These tests must be able to run on an instance without security.
for role in ('Assignee', 'Assignor', 'Associate', 'Auditor', 'Owner'):
portal.acl_users.zodb_roles.assignRoleToPrincipal(role, functional_test_username)
portal.acl_users.zodb_roles.assignRoleToPrincipal(role, person.Person_getUserId())
return 'Done.'
......@@ -13,13 +13,9 @@
in erp5_base ?
"""
from zExceptions import Unauthorized
owner_value_list = []
getUserValueByUserId = context.Base_getUserValueByUserId
try:
owner_id_list = [i[0] for i in context.get_local_roles() if 'Owner' in i[1]]
owner_id_list = [getUserValueByUserId(i[0]) for i in context.get_local_roles() if 'Owner' in i[1]]
except Unauthorized:
owner_id_list = []
if len(owner_id_list):
return context.portal_catalog(portal_type='Person', reference=owner_id_list)
else:
return []
return [x for x in owner_id_list if x is not None]
......@@ -3,8 +3,8 @@ import json
form = context.REQUEST.form
portal = context.getPortalObject()
if len(portal.portal_catalog(portal_type="Person",
reference=form.get("login_name"))):
login = form.get("login_name")
if context.acl_users.searchUsers(login=login, exact_match=True):
return json.dumps(None)
person = portal.person_module.newContent(portal_type="Person")
......@@ -12,7 +12,8 @@ person.edit(first_name=form.get("firstname"),
last_name=form.get("lastname"),
email_text=form.get("email"),
password=form.get("password"),
reference=form.get("login_name"))
reference=login,
)
assignment = person.newContent(portal_type='Assignment')
assignment.setFunction("ung_user")
......
......@@ -8,8 +8,8 @@ if REQUEST is not None:
portal = context.getPortalObject()
if not context.getReference():
# noop in case if invoked on non loggable object
if not context.Person_getUserId():
# noop in case if invoked on non-user object
return
from Products.ERP5Type.Message import translateString
......
......@@ -92,7 +92,7 @@
<dictionary>
<item>
<key> <string>text</string> </key>
<value> <string>python: not here.getReference() and context.portal_wizard.WizardTool_isUserSynchronizationAllowed()</string> </value>
<value> <string>python: not here.Person_getUserId() and context.portal_wizard.WizardTool_isUserSynchronizationAllowed()</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -40,18 +40,15 @@ class ShaSecurityMixin(object):
"""
Initialize the ERP5 site.
"""
self.lucas_user = 'lucas'
self.createUser(self.lucas_user, self.lucas_user)
self.lucas_user = self.createUser('lucas', 'lucas').Person_getUserId()
self.toto_user = 'toto'
self.createUser(self.toto_user, self.toto_user)
self.toto_user = self.createUser('toto', 'toto').Person_getUserId()
def createUser(self, reference, password):
"""
Create a user with basic information
"""
person = self.portal.portal_catalog.getResultValue(portal_type='Person',
reference=reference)
person = self.portal.portal_catalog.getResultValue(portal_type='Person', reference=reference)
if person is None:
person = self.portal.person_module.newContent(portal_type='Person')
person.edit(first_name=reference,
......@@ -71,6 +68,8 @@ class ShaSecurityMixin(object):
assignment.open()
self.tic()
return person
def changeUser(self, user_id):
"""
Change the current user to user_id
......
......@@ -59,7 +59,7 @@ class TestShaCacheExternal(ShaCacheMixin, ShaSecurityMixin, ERP5TypeTestCase):
# Define POST headers with Authentication
self.content_type = 'application/json'
authentication_string = '%s:%s' % (self.lucas_user, self.lucas_user)
authentication_string = 'lucas:lucas'
base64string = base64.encodestring(authentication_string).strip()
self.header_dict = {'Authorization': 'Basic %s' % base64string,
'Content-Type': self.content_type}
......
......@@ -60,7 +60,7 @@ class TestShaDirExternal(ShaDirMixin, ShaSecurityMixin, ERP5TypeTestCase):
# Define POST headers with Authentication
self.content_type = 'application/json'
authentication_string = '%s:%s' % (self.lucas_user, self.lucas_user)
authentication_string = 'lucas:lucas'
base64string = base64.encodestring(authentication_string).strip()
self.header_dict = {'Authorization': 'Basic %s' % base64string,
'Content-Type': self.content_type}
......
# Proxy roles: Manager to access searchUsers
if REQUEST is not None:
return
user_id_set = {x['id'] for x in context.acl_users.searchUsers(
login=user_name,
exact_match=True,
)}
if len(user_id_set) == 1:
user_id, = user_id_set
return user_id
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>user_name, REQUEST=None</string> </value>
</item>
<item>
<key> <string>_proxy_roles</string> </key>
<value>
<tuple>
<string>Manager</string>
</tuple>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>Base_getUserIdByUserName</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
# Proxy roles: Manager to access searchUsers
if REQUEST is not None:
return
user_path_set = {x['path'] for x in context.acl_users.searchUsers(
id=user_id,
exact_match=True,
) if 'path' in x}
if user_path_set:
user_path, = user_path_set
return context.getPortalObject().restrictedTraverse(user_path)
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>user_id, REQUEST=None</string> </value>
</item>
<item>
<key> <string>_proxy_roles</string> </key>
<value>
<tuple>
<string>Manager</string>
</tuple>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>Base_getUserValueByUserId</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
......@@ -23,7 +23,7 @@ def getActorName(actor):
return actor_name_cache[actor]
except KeyError:
actor_name_cache[actor] = actor
person = portal_object.portal_catalog.getResultValue(portal_type='Person', reference=actor)
person = portal_object.Base_getUserValueByUserId(actor)
if person is not None:
actor_name_cache[actor] = person.getTitle()
return actor_name_cache[actor]
......
......@@ -72,7 +72,7 @@ for base_category in category_order:
portal_type = category_object.getPortalType()
if portal_type == 'Person':
# We define a person here
user_name = category_object.getReference()
user_name = category_object.Person_getUserId()
if user_name is not None:
user_list.append(user_name)
else:
......
......@@ -22,24 +22,12 @@ NOTE: for now, this script requires proxy manager
category_list = []
# Get the Person module
person_module = context.portal_url.getPortalObject().getDefaultModule('Person')
# It is better to keep getObject(), in this script this
# prevent a very strange bug, sometimes without getObject the
# assignment is not found
person_object_list = [x.getObject() for x in person_module.searchFolder(portal_type='Person', reference=user_name)]
if len(person_object_list) != 1:
if len(person_object_list) > 1:
raise ConsistencyError, "Error: There is more than one Person with reference '%s'" % user_name
else:
# if a person_object was not found in the module, we do nothing more
# this happens for example when a manager with no associated person object
# creates a person_object for a new user
return []
person_object = person_object_list[0]
person_object = context.Base_getUserValueByUserId(user_name)
if person_object is None:
# if a person_object was not found in the module, we do nothing more
# this happens for example when a manager with no associated person object
# creates a person_object for a new user
return []
# We look for every valid assignments of this user
for assignment in person_object.contentValues(filter={'portal_type': 'Assignment'}):
if assignment.getValidationState() == 'open':
......
......@@ -22,24 +22,12 @@ NOTE: for now, this script requires proxy manager
category_list = []
# Get the Person module
person_module = context.portal_url.getPortalObject().getDefaultModule('Person')
# It is better to keep getObject(), in this script this
# prevent a very strange bug, sometimes without getObject the
# assignment is not found
person_object_list = [x.getObject() for x in person_module.searchFolder(portal_type='Person', reference=user_name)]
if len(person_object_list) != 1:
if len(person_object_list) > 1:
raise ConsistencyError, "Error: There is more than one Person with reference '%s'" % user_name
else:
# if a person_object was not found in the module, we do nothing more
# this happens for example when a manager with no associated person object
# creates a person_object for a new user
return []
person_object = person_object_list[0]
person_object = context.Base_getUserValueByUserId(user_name)
if person_object is None:
# if a person_object was not found in the module, we do nothing more
# this happens for example when a manager with no associated person object
# creates a person_object for a new user
return []
# We look for every valid assignments of this user
for assignment in person_object.contentValues(filter={'portal_type': 'Assignment'}):
if assignment.getValidationState() == 'open':
......
......@@ -85,7 +85,7 @@ class TestAccounting_l10n_fr(AccountingTestCase):
self.tic()
uf = self.portal.acl_users
uf.zodb_roles.assignRoleToPrincipal('Assignor', self.username)
uf.zodb_roles.assignRoleToPrincipal('Assignor', person.Person_getUserId())
user = uf.getUser(self.username).__of__(uf)
newSecurityManager(None, user)
......
......@@ -63,7 +63,7 @@ class TestAcknowledgementTool(ERP5TypeTestCase):
self.tic()
acknowledgement_tool_kw = {}
acknowledgement_tool_kw['user_name'] = 'seb'
acknowledgement_tool_kw['user_name'] = person.Person_getUserId()
acknowledgement_tool_kw['portal_type'] = event_type
# draft document must be not be part of acknowledgements
document_url_list = portal.portal_acknowledgements\
......@@ -98,7 +98,7 @@ class TestAcknowledgementTool(ERP5TypeTestCase):
# We now acknowledge the event
acknowledgement = portal.portal_acknowledgements.acknowledge(
path=event.getRelativeUrl(),
user_name='seb')
user_name=person.Person_getUserId())
# Make sure that we have a new acknowledge document which is a proxy of
# the event
self.assertEqual(acknowledgement.getPortalType(), 'Acknowledgement')
......
......@@ -1206,7 +1206,7 @@ class TestWorkflow(SecurityTestCase):
self.assertEqual(sale_invoice.getSimulationState(), 'auto_planned')
# other as anonymous
username = self.other.getReference()
username = self.other.Person_getUserId()
self.failIfUserCanAccessDocument(username, sale_invoice)
self.failIfUserCanAddDocument(username, sale_invoice)
self.failIfUserCanDeleteDocument(username, sale_invoice)
......@@ -1214,7 +1214,7 @@ class TestWorkflow(SecurityTestCase):
self.failIfUserCanViewDocument(username, sale_invoice)
# assignee
username = self.assignee.getReference()
username = self.assignee.Person_getUserId()
self.assertUserCanAccessDocument(username, sale_invoice)
self.assertUserCanAddDocument(username, sale_invoice)
self.assertUserCanDeleteDocument(username, sale_invoice)
......@@ -1222,7 +1222,7 @@ class TestWorkflow(SecurityTestCase):
self.assertUserCanViewDocument(username, sale_invoice)
# assignor
username = self.assignor.getReference()
username = self.assignor.Person_getUserId()
self.assertUserCanAccessDocument(username, sale_invoice)
self.assertUserCanAddDocument(username, sale_invoice)
self.assertUserCanDeleteDocument(username, sale_invoice)
......@@ -1230,7 +1230,7 @@ class TestWorkflow(SecurityTestCase):
self.assertUserCanViewDocument(username, sale_invoice)
# associate
username = self.associate.getReference()
username = self.associate.Person_getUserId()
self.assertUserCanAccessDocument(username, sale_invoice)
self.assertUserCanAddDocument(username, sale_invoice)
self.assertUserCanDeleteDocument(username, sale_invoice)
......@@ -1238,7 +1238,7 @@ class TestWorkflow(SecurityTestCase):
self.assertUserCanViewDocument(username, sale_invoice)
# auditor
username = self.auditor.getReference()
username = self.auditor.Person_getUserId()
self.assertUserCanAccessDocument(username, sale_invoice)
self.failIfUserCanAddDocument(username, sale_invoice)
self.failIfUserCanDeleteDocument(username, sale_invoice)
......@@ -1246,7 +1246,7 @@ class TestWorkflow(SecurityTestCase):
self.assertUserCanViewDocument(username, sale_invoice)
# author
username = self.author.getReference()
username = self.author.Person_getUserId()
self.failIfUserCanAccessDocument(username, sale_invoice)
self.failIfUserCanAddDocument(username, sale_invoice)
self.failIfUserCanDeleteDocument(username, sale_invoice)
......@@ -1254,7 +1254,7 @@ class TestWorkflow(SecurityTestCase):
self.failIfUserCanViewDocument(username, sale_invoice)
# manager
username = self.manager.getReference()
username = self.manager.Person_getUserId()
self.assertUserCanAccessDocument(username, sale_invoice)
self.assertUserCanAddDocument(username, sale_invoice)
self.assertUserCanDeleteDocument(username, sale_invoice)
......
......@@ -231,7 +231,7 @@ class TestCommerce(ERP5TypeTestCase):
#XXX: Security hack (lucas)
self.portal.acl_users.zodb_roles.assignRoleToPrincipal('Manager',
reference)
person.Person_getUserId())
def getDefaultProduct(self, id='1'):
"""
......
......@@ -597,7 +597,7 @@ class TestERP5Credential(ERP5TypeTestCase):
self._assertUserExists('barney', 'secret')
self.login('barney')
from AccessControl import getSecurityManager
self.assertEqual(getSecurityManager().getUser().getIdOrUserName(), 'barney')
self.assertEqual(getSecurityManager().getUser().getIdOrUserName(), person.Person_getUserId())
self.login()
# create a credential recovery
......@@ -833,14 +833,14 @@ class TestERP5Credential(ERP5TypeTestCase):
def stepSetAssigneeRoleToCurrentPersonInCredentialUpdateModule(self,
sequence=None, sequence_list=None, **kw):
person_reference = sequence["person_reference"]
self.portal.credential_update_module.manage_setLocalRoles(person_reference,
user, = self.portal.acl_users.searchUsers(login=sequence['person_reference'], exact_match=True)
self.portal.credential_update_module.manage_setLocalRoles(user['id'],
['Assignor',])
def stepSetAssigneeRoleToCurrentPersonInCredentialRecoveryModule(self,
sequence=None, sequence_list=None, **kw):
person_reference = sequence["person_reference"]
self.portal.credential_recovery_module.manage_setLocalRoles(person_reference,
user, = self.portal.acl_users.searchUsers(login=sequence['person_reference'], exact_match=True)
self.portal.credential_recovery_module.manage_setLocalRoles(user['id'],
['Assignor',])
def stepLogin(self, sequence):
......@@ -861,9 +861,8 @@ class TestERP5Credential(ERP5TypeTestCase):
sequence_list=None, **kw):
person_reference = sequence["person_reference"]
self.login()
person = self.portal.portal_catalog.getResultValue(portal_type="Person",
reference=person_reference)
person.manage_setLocalRoles(person_reference, ["Auditor"])
person = self.portal.acl_users.getUser(person_reference).getUserValue()
person.manage_setLocalRoles(person.Person_getUserId(), ["Auditor"])
self.logout()
def stepCheckPersonAfterUpdatePerson(self, sequence=None,
......
......@@ -1674,6 +1674,7 @@ class TestERP5WebWithSimpleSecurity(ERP5TypeTestCase):
section = site.newContent(portal_type='Web Section', id='section')
person = portal.person_module.newContent(portal_type = 'Person',
reference = person_reference)
person_user_id = person.Person_getUserId()
# add Role Definition for site and section
site_role_definition = site.newContent(portal_type = 'Role Definition',
role_name = 'Assignee',
......@@ -1684,9 +1685,9 @@ class TestERP5WebWithSimpleSecurity(ERP5TypeTestCase):
self.tic()
# check if Role Definition have create local roles
self.assertSameSet(('Assignee',),
site.get_local_roles_for_userid(person_reference))
site.get_local_roles_for_userid(person_user_id))
self.assertSameSet(('Associate',),
section.get_local_roles_for_userid(person_reference))
section.get_local_roles_for_userid(person_user_id))
self.assertRaises(Unauthorized, site_role_definition.edit,
role_name='Manager')
......@@ -1695,9 +1696,9 @@ class TestERP5WebWithSimpleSecurity(ERP5TypeTestCase):
section.manage_delObjects(section_role_definition.getId())
self.tic()
self.assertSameSet((),
site.get_local_roles_for_userid(person_reference))
site.get_local_roles_for_userid(person_user_id))
self.assertSameSet((),
section.get_local_roles_for_userid(person_reference))
section.get_local_roles_for_userid(person_user_id))
def test_03_WebSection_getDocumentValueListSecurity(self):
""" Test WebSection_getDocumentValueList behaviour and security"""
......
......@@ -160,7 +160,7 @@ class TestImmobilisationMixin(ERP5TypeTestCase):
, site = user_data[4]
)
# In the case of PAS, if we want global roles on user, we have to do it manually.
self.assignPASRolesToUser(user_login, user_roles)
self.assignPASRolesToUser(person.Person_getUserId(), user_roles)
assignment.open()
person.validate()
......
......@@ -150,6 +150,7 @@ class TestNotificationTool(ERP5TypeTestCase):
assignment = person.newContent(portal_type='Assignment')
assignment.open()
self.changeToPreviousUser()
sequence['user_a_id'] = person.Person_getUserId()
def stepAddUserB(self, sequence=None, sequence_list=None, **kw):
"""
......@@ -162,6 +163,7 @@ class TestNotificationTool(ERP5TypeTestCase):
assignment = person.newContent(portal_type='Assignment')
assignment.open()
self.changeToPreviousUser()
sequence['user_b_id'] = person.Person_getUserId()
def stepAddUserWithoutEmail(self, sequence=None, sequence_list=None, **kw):
"""
......@@ -174,6 +176,7 @@ class TestNotificationTool(ERP5TypeTestCase):
assignment = person.newContent(portal_type='Assignment')
assignment.open()
self.changeToPreviousUser()
sequence['user_without_email_id'] = person.Person_getUserId()
def test_01_defaultBehaviour(self):
self.assertRaises(
......@@ -191,7 +194,7 @@ class TestNotificationTool(ERP5TypeTestCase):
Check that notification works without sender
"""
self.portal.portal_notifications.sendMessage(
recipient='userA', subject='Subject', message='Message')
recipient=sequence['user_a_id'], subject='Subject', message='Message')
last_message = self.portal.MailHost._last_message
self.assertNotEquals((), last_message)
mfrom, mto, messageText = last_message
......@@ -216,7 +219,7 @@ class TestNotificationTool(ERP5TypeTestCase):
self.assertRaises(
TypeError,
self.portal.portal_notifications.sendMessage,
recipient='userA', message='Message'
recipient=sequence['user_a_id'], message='Message'
)
def test_03_noSubject(self):
......@@ -244,7 +247,7 @@ class TestNotificationTool(ERP5TypeTestCase):
Check that notification is send when no message is passed
"""
self.portal.portal_notifications.sendMessage(
recipient='userA', subject='Subject', )
recipient=sequence['user_a_id'], subject='Subject', )
last_message = self.portal.MailHost._last_message
self.assertNotEquals((), last_message)
mfrom, mto, messageText = last_message
......@@ -267,7 +270,7 @@ class TestNotificationTool(ERP5TypeTestCase):
Check that notification is send in standard use case
"""
self.portal.portal_notifications.sendMessage(
recipient='userA', subject='Subject', message='Message')
recipient=sequence['user_a_id'], subject='Subject', message='Message')
last_message = self.portal.MailHost._last_message
self.assertNotEquals((), last_message)
mfrom, mto, messageText = last_message
......@@ -295,7 +298,7 @@ class TestNotificationTool(ERP5TypeTestCase):
Check attachment
"""
self.portal.portal_notifications.sendMessage(
recipient='userA', subject='Subject', message='Message',
recipient=sequence['user_a_id'], subject='Subject', message='Message',
attachment_list=[
{
'name': 'Attachment 1',
......@@ -339,7 +342,7 @@ class TestNotificationTool(ERP5TypeTestCase):
Check that notification can be send to multiple recipient
"""
self.portal.portal_notifications.sendMessage(
recipient=['userA', 'userB'], subject='Subject', message='Message')
recipient=[sequence['user_a_id'], sequence['user_b_id']], subject='Subject', message='Message')
last_message = self.portal.MailHost._last_message
self.assertNotEquals((), last_message)
......@@ -371,7 +374,7 @@ class TestNotificationTool(ERP5TypeTestCase):
"""
with self.assertRaisesRegexp(ValueError, "email must be set"):
self.portal.portal_notifications.sendMessage(
recipient='userWithoutEmail', subject='Subject', message='Message')
recipient=sequence['user_without_email_id'], subject='Subject', message='Message')
def test_08_PersonWithoutEmail(self):
sequence_list = SequenceList()
......@@ -393,7 +396,7 @@ class TestNotificationTool(ERP5TypeTestCase):
"""
Check that notification is send when recipient is a Person
"""
person = self.portal.portal_catalog(reference='userA', portal_type='Person')[0]
person = self.portal.Base_getUserValueByUserId(sequence['user_a_id'])
self.portal.portal_notifications.sendMessage(
recipient=person.getObject(), subject='Subject', message='Message')
last_message = self.portal.MailHost._last_message
......@@ -428,7 +431,7 @@ class TestNotificationTool(ERP5TypeTestCase):
Yes, I will go."""
self.portal.portal_notifications.sendMessage(
recipient='userA', subject='Subject',
recipient=sequence['user_a_id'], subject='Subject',
message_text_format='text/plain', message=message)
last_message = self.portal.MailHost._last_message
self.assertNotEquals((), last_message)
......@@ -460,7 +463,7 @@ Yes, I will go."""
message = """<a href="http://www.erp5.com/">Click Here!!</a>"""
self.portal.portal_notifications.sendMessage(
recipient='userA', subject='Subject',
recipient=sequence['user_a_id'], subject='Subject',
message_text_format='text/html', message=message)
last_message, = self.portal.MailHost._message_list
mfrom, mto, messageText = last_message
......
......@@ -137,7 +137,7 @@ class TestERP5BankingMixin(ERP5TypeTestCase):
)
if self.PAS_installed and len(user_roles) > 0:
# In the case of PAS, if we want global roles on user, we have to do it manually.
self.assignPASRolesToUser(user_login, user_roles)
self.assignPASRolesToUser(person.Person_getUserId(), user_roles)
elif not self.PAS_installed:
# The user_folder counterpart of the erp5 user must be
# created manually in the case of NuxUserGroup.
......
......@@ -369,10 +369,10 @@ class TestArchive(InventoryAPITestCase):
person = self.portal.person_module.newContent(reference=login)
try:
self.tic()
PortalTestCase.login(self, login)
PortalTestCase.login(self, person.Person_getUserId())
self.assertEqual(['green'], getSecurityManager().getUser().getGroups())
self.portal.portal_caches.clearAllCache()
PortalTestCase.login(self, login)
PortalTestCase.login(self, person.Person_getUserId())
unittest.expectedFailure(self.assertEqual)(
['green'], getSecurityManager().getUser().getGroups())
finally:
......
......@@ -126,24 +126,26 @@ CREATE TABLE alternate_roles_and_users (
# create two persons and users
user1 = self.portal.person_module.newContent(portal_type='Person',
reference='user1')
user1_id = user1.Person_getUserId()
user1.newContent(portal_type='Assignment', group='g1').open()
user1.updateLocalRolesOnSecurityGroups()
self.assertEqual(user1.__ac_local_roles__.get('user1'), ['Auditor'])
self.assertEqual(user1.__ac_local_roles__.get(user1_id), ['Auditor'])
self.assertEqual(user1.__ac_local_roles__.get('GROUP1'), ['Unknown'])
user2 = self.portal.person_module.newContent(portal_type='Person',
reference='user2')
user2_id = user2.Person_getUserId()
user2.newContent(portal_type='Assignment', group='g1').open()
user2.updateLocalRolesOnSecurityGroups()
self.assertEqual(user2.__ac_local_roles__.get('user2'), ['Auditor'])
self.assertEqual(user2.__ac_local_roles__.get(user2_id), ['Auditor'])
self.assertEqual(user2.__ac_local_roles__.get('GROUP1'), ['Unknown'])
self.tic()
# security_uid_dict in catalog contains entries for user1 and user2:
user1_alternate_security_uid = sql_catalog.security_uid_dict[
('Alternate', ('user:user1', 'user:user1:Auditor'))]
('Alternate', ('user:' + user1_id, 'user:' + user1_id + ':Auditor'))]
user2_alternate_security_uid = sql_catalog.security_uid_dict[
('Alternate', ('user:user2', 'user:user2:Auditor'))]
('Alternate', ('user:' + user2_id, 'user:' + user2_id + ':Auditor'))]
# those entries are in alternate security table
alternate_roles_and_users = sql_connection.manage_test(
......
......@@ -62,10 +62,9 @@ class PersonConfiguratorItem(XMLObject, ConfiguratorItemMixin):
def _checkConsistency(self, fixit=False, filter=None, **kw):
error_list = []
person = self.portal_catalog.getResultValue(reference=self.getReference(),
portal_type="Person")
if person is None:
error_list.append("Person %s should be created" % self.getReference())
person_list = self.acl_users.searchUsers(id=self.Person_getUserId(), exact_match=True)
if not person_list:
error_list.append("Person %s should be created" % self.Person_getUserId())
if fixit:
person_module = self.getPortalObject().person_module
person = person_module.newContent(portal_type="Person")
......
......@@ -1130,7 +1130,7 @@ class TestDocument(TestDocumentMixin):
# create test Person objects and add pseudo local security
person1 = self.createUser(reference='user1')
person1.setTitle('Another Contributor')
portal.document_module.manage_setLocalRoles('user1', ['Assignor',])
portal.document_module.manage_setLocalRoles(person1.Person_getUserId(), ['Assignor',])
self.tic()
# login as another user
......@@ -1663,9 +1663,9 @@ class TestDocument(TestDocumentMixin):
# create Person objects and add pseudo local security
person1 = self.createUser(reference='contributor1')
document_module.manage_setLocalRoles('contributor1', ['Assignor',])
document_module.manage_setLocalRoles(person1.Person_getUserId(), ['Assignor',])
person2 = self.createUser(reference='contributor2')
document_module.manage_setLocalRoles('contributor2', ['Assignor',])
document_module.manage_setLocalRoles(person2.Person_getUserId(), ['Assignor',])
self.tic()
# login as first one
......
......@@ -1460,11 +1460,11 @@ class TestIngestion(ERP5TypeTestCase):
dict(group='anybody',
function='musician/wind/saxophone',
site='arctic/spitsbergen'))
portal.document_module.manage_setLocalRoles('contributor1', ['Assignor',])
portal.document_module.manage_setLocalRoles(user.Person_getUserId(), ['Assignor',])
self.tic()
file_object = makeFileUpload('TEST-en-002.doc')
document = contribution_tool.newContent(file=file_object)
document.discoverMetadata(document.getFilename(), 'contributor1')
document.discoverMetadata(document.getFilename(), user.Person_getUserId())
self.tic()
self.assertEqual(document.getFilename(), 'TEST-en-002.doc')
self.assertEqual('anybody', document.getGroup())
......@@ -1484,18 +1484,18 @@ class TestIngestion(ERP5TypeTestCase):
self.createUserAssignment(other_user, dict(group='anybody/a1',))
self.createUserAssignment(other_user, dict(group='anybody/a2',))
portal.document_module.manage_setLocalRoles('contributor2', ['Assignor',])
portal.document_module.manage_setLocalRoles(other_user.Person_getUserId(), ['Assignor',])
self.tic()
file_object = makeFileUpload('TEST-en-002.doc')
document = contribution_tool.newContent(file=file_object)
# We only consider the higher group of assignments
document.discoverMetadata(document.getFilename(), user.getReference())
document.discoverMetadata(document.getFilename(), user.Person_getUserId())
self.tic()
self.assertEqual(document.getFilename(), 'TEST-en-002.doc')
self.assertEqual(['anybody'], document.getGroupList())
document.discoverMetadata(document.getFilename(), other_user.getReference())
document.discoverMetadata(document.getFilename(), other_user.Person_getUserId())
self.assertEqual(['anybody/a1', 'anybody/a2'], document.getGroupList())
def test_IngestionConfigurationByTypeBasedMethod_usecase1(self):
......
......@@ -43,7 +43,7 @@ class TestPackaging(testTioSafeMixin):
zodb_roles = self.portal.acl_users.zodb_roles
for role in user_roles:
if role != 'Member':
zodb_roles.assignRoleToPrincipal(role, user_name)
zodb_roles.assignRoleToPrincipal(role, person.Person_getUserId())
def loginAsUser(self, user_id):
"""Login with a given user_id """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment