Commit a57e0e2f authored by Rafael Monnerat's avatar Rafael Monnerat

ERP5Security: Allow user to login with a user created on the same transaction

See merge request nexedi/erp5!1759
parents bf89c93a 7827c262
Pipeline #27382 failed with stage
in 0 seconds
...@@ -35,6 +35,7 @@ from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin ...@@ -35,6 +35,7 @@ from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.utils import classImplements from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.interfaces.plugins import IAuthenticationPlugin from Products.PluggableAuthService.interfaces.plugins import IAuthenticationPlugin
from Products.PluggableAuthService.interfaces.plugins import IUserEnumerationPlugin from Products.PluggableAuthService.interfaces.plugins import IUserEnumerationPlugin
from Products.ERP5Type.TransactionalVariable import getTransactionalVariable
from DateTime import DateTime from DateTime import DateTime
from Products import ERP5Security from Products import ERP5Security
from AccessControl import SpecialUsers from AccessControl import SpecialUsers
...@@ -110,21 +111,9 @@ class ERP5LoginUserManager(BasePlugin): ...@@ -110,21 +111,9 @@ class ERP5LoginUserManager(BasePlugin):
if login_value is None: if login_value is None:
return return
user_value = login_value.getParentValue() user_value = login_value.getParentValue()
if not user_value.hasUserId(): if not self._isUserValueValid(user_value):
return return
if user_value.getValidationState() == 'deleted':
return
if user_value.getPortalType() in ('Person', ):
now = DateTime()
for assignment in user_value.contentValues(portal_type="Assignment"):
if assignment.getValidationState() == "open" and (
not assignment.hasStartDate() or assignment.getStartDate() <= now
) and (
not assignment.hasStopDate() or assignment.getStopDate() >= now
):
break
else:
return
is_authentication_policy_enabled = self.getPortalObject().portal_preferences.isAuthenticationPolicyEnabled() is_authentication_policy_enabled = self.getPortalObject().portal_preferences.isAuthenticationPolicyEnabled()
if check_password: if check_password:
password = credentials.get('password') password = credentials.get('password')
...@@ -147,6 +136,27 @@ class ERP5LoginUserManager(BasePlugin): ...@@ -147,6 +136,27 @@ class ERP5LoginUserManager(BasePlugin):
return return
return (user_value.getUserId(), login_value.getReference()) return (user_value.getUserId(), login_value.getReference())
def _isUserValueValid(self, user_value):
if not user_value.hasUserId():
return
if user_value.getValidationState() == 'deleted':
return
if user_value.getPortalType() in ('Person', ):
now = DateTime()
for assignment in user_value.contentValues(portal_type="Assignment"):
if assignment.getValidationState() == "open" and (
not assignment.hasStartDate() or assignment.getStartDate() <= now
) and (
not assignment.hasStopDate() or assignment.getStopDate() >= now
):
return True
else:
return
return True
def _getLoginValueFromLogin(self, login, login_portal_type=None): def _getLoginValueFromLogin(self, login, login_portal_type=None):
try: try:
user_list = self.enumerateUsers( user_list = self.enumerateUsers(
...@@ -283,6 +293,34 @@ class ERP5LoginUserManager(BasePlugin): ...@@ -283,6 +293,34 @@ class ERP5LoginUserManager(BasePlugin):
} }
for user in user_list if user['user_id'] for user in user_list if user['user_id']
] ]
tv = getTransactionalVariable()
user_value = tv.get("transactional_user", None)
if user_value is not None and self._isUserValueValid(user_value):
login_value_list = [l for l in user_value.objectValues(login_portal_type)
if l.getValidationState() == 'validated' and l.getPassword() is not None]
if (login is not None and login in [(i.getReference(),) for i in login_value_list]) or \
(id is not None and user_value.getUserId() == id[0] and login_value_list):
result.append({
'id': user_value.getUserId(),
# Note: PAS forbids us from returning more than one entry per given id,
# so take any available login.
'login': login_value_list[0].getReference(),
'pluginid': plugin_id,
# Extra properties, specific to ERP5
'path': user_value.getPath(),
'uid': user_value.getUid(),
'login_list': [
{
'reference': login_value.getReference(),
'path': login_value.getRelativeUrl(),
'uid': login_value.getPath(),
} for login_value in login_value_list
],
})
for special_user_name in special_user_name_set: for special_user_name in special_user_name_set:
# Note: special users are a bastard design in Zope: they are expected to # Note: special users are a bastard design in Zope: they are expected to
# have a user name (aka, a login), but no id (aka, they do not exist as # have a user name (aka, a login), but no id (aka, they do not exist as
......
...@@ -47,6 +47,8 @@ from zope.interface.verify import verifyClass ...@@ -47,6 +47,8 @@ from zope.interface.verify import verifyClass
from DateTime import DateTime from DateTime import DateTime
from Products import ERP5Security from Products import ERP5Security
from Products.ERP5Type.Core.Workflow import ValidationFailed from Products.ERP5Type.Core.Workflow import ValidationFailed
from Products.ERP5Type.TransactionalVariable import getTransactionalVariable
AUTO_LOGIN = object() AUTO_LOGIN = object()
...@@ -76,15 +78,16 @@ class UserManagementTestCase(ERP5TypeTestCase): ...@@ -76,15 +78,16 @@ class UserManagementTestCase(ERP5TypeTestCase):
def getUserFolder(self): def getUserFolder(self):
"""Returns the acl_users. """ """Returns the acl_users. """
return self.getPortal().acl_users return self.portal.acl_users
def loginAsUser(self, username): def loginAsUser(self, username):
uf = self.portal.acl_users uf = self.getUserFolder()
user = uf.getUserById(username).__of__(uf) user = uf.getUserById(username).__of__(uf)
newSecurityManager(None, user) newSecurityManager(None, user)
def _makePerson(self, login=AUTO_LOGIN, open_assignment=1, assignment_start_date=None, def _makePerson(self, login=AUTO_LOGIN, open_assignment=1, assignment_start_date=None,
assignment_stop_date=None, tic=True, password='secret', group_value=None, **kw): assignment_stop_date=None, tic=True, password='secret', group_value=None,
set_transactional_user=False, **kw):
"""Creates a person in person module, and returns the object, after """Creates a person in person module, and returns the object, after
indexing is done. """ indexing is done. """
person_module = self.getPersonModule() person_module = self.getPersonModule()
...@@ -104,6 +107,8 @@ class UserManagementTestCase(ERP5TypeTestCase): ...@@ -104,6 +107,8 @@ class UserManagementTestCase(ERP5TypeTestCase):
reference=login, reference=login,
password=password, password=password,
).validate() ).validate()
if set_transactional_user:
getTransactionalVariable()["transactional_user"] = new_person
if tic: if tic:
self.tic() self.tic()
return new_person.Person_getUserId(), login, password return new_person.Person_getUserId(), login, password
...@@ -456,6 +461,72 @@ class TestUserManagement(UserManagementTestCase): ...@@ -456,6 +461,72 @@ class TestUserManagement(UserManagementTestCase):
self.tic() self.tic()
self.assertEqual(None, person.Person_getUserId()) self.assertEqual(None, person.Person_getUserId())
def test_UnindexedPersonIsNotUser(self):
user_id, login, password = self._makePerson(tic=False)
self._assertUserDoesNotExists(login, password)
self.tic()
self._assertUserExists(login, password)
def test_TransactionalPersonWithLoginPasswordAreUsers(self):
"""Tests a person created on same transaction with a login & password
is a valid user if you set transactional variable."""
_, login, password = self._makePerson(tic=0, set_transactional_user=True)
self._assertUserExists(login, password)
def test_TransactionalPersonLoginCaseSensitive(self):
"""Login/password are case sensitive."""
login = 'case_test_user'
_, _, password = self._makePerson(login=login, tic=0, set_transactional_user=True)
self._assertUserExists(login, password)
self._assertUserDoesNotExists('case_test_User', password)
def test_TransactionalPersonLoginNonAscii(self):
"""Login can contain non ascii chars."""
login = 'j\xc3\xa9'
_, _, password = self._makePerson(login=login, tic=0, set_transactional_user=True)
self._assertUserExists(login, password)
def test_TransactionalPersonWithLoginWithNonePasswordAreNotUsers(self):
"""Tests a person created on same transaction with a login but None as
a password is not a valid user."""
# check password set to None at creation
_, login, _ = self._makePerson(password=None, tic=0, set_transactional_user=True)
self._assertUserDoesNotExists(login, None)
self._assertUserDoesNotExists(login, 'None')
self._assertUserDoesNotExists(login, '')
def test_TransactionalPersonWithLoginWithEmptyStringPasswordAreNotUsers(self):
"""Tests a person created on samea transaction with a login but no password
is not a valid user."""
_, login, _ = self._makePerson(password='', tic=0, set_transactional_user=True)
self._assertUserDoesNotExists(login, '')
self._assertUserDoesNotExists(login, 'None')
def test_TransactionalPersonWithLoginWithoutPasswordAreNotUsers(self):
"""Tests a person created on same transaction with a login but
no password set is not a valid user."""
# similar to _makePerson, but not passing password= to newContent
login = 'login_%s' % self._login_generator()
new_person = self.portal.person_module.newContent(portal_type='Person')
new_person.newContent(portal_type='Assignment').open()
new_person.newContent(
portal_type='ERP5 Login',
reference=login,
).validate()
getTransactionalVariable()['transactional_user'] = new_person
self._assertUserDoesNotExists(login, '')
self._assertUserDoesNotExists(login, 'None')
def test_TransactionalOrganisationAreNotUsers(self):
"""Tests a organisation as transactional user fails to login."""
# similar to _makePerson, but not passing password= to newContent
login = 'login_%s' % self._login_generator()
organisation = self.portal.organisation_module.newContent(
portal_type='Organisation', reference=login)
getTransactionalVariable()['transactional_user'] = organisation
# Just to check that fails
self.assertRaises(AttributeError, self._assertUserDoesNotExists, login, '')
class DuplicatePrevention(UserManagementTestCase): class DuplicatePrevention(UserManagementTestCase):
def test_MultipleUsers(self): def test_MultipleUsers(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment