Commit ee05c9c4 authored by Jérome Perrin's avatar Jérome Perrin

Cleanup google and facebook login and cookie management

First,  cleanup `testERPSecurity`, moving related tests in sub-classes instead of big classes with lots of tests (diff is big, because methods are moved around)

Change google and facebook login to reuse `portal.setAuthCookie`, which is the central point to set a cookie securely so that it can be used for authentication.

Refactor management of oauth keys to fix [#20181121-1A36AE2](https://nexedi.erp5.net/bug_module/20181121-1A36AE2).

Some minor fixes in business template definition.

/reviewed-on !803
parents bab963e4 05deeb26
import facebook
from ZTUtils import make_query
from Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin import getFacebookUserEntry
def _getFacebookClientIdAndSecretKey(portal, reference="default"):
"""Returns facebook client id and secret key.
Internal function.
"""
result_list = portal.portal_catalog.unrestrictedSearchResults(
portal_type="Facebook Connector",
reference=reference,
validation_state="validated",
limit=2,
)
assert result_list, "Facebook Connector not found"
if len(result_list) == 2:
raise ValueError("Impossible to select one Facebook Connector")
facebook_connector = result_list[0]
return facebook_connector.getClientId(), facebook_connector.getSecretKey()
def redirectToFacebookLoginPage(self, came_from=None):
client_id, _ = _getFacebookClientIdAndSecretKey(self.getPortalObject())
query = make_query({
# Call at he context of the appropriate web_service.
'client_id': client_id,
'redirect_uri': "{0}/ERP5Site_callbackFacebookLogin".format(came_from or self.absolute_url()),
'scope': 'email'
})
return self.REQUEST.RESPONSE.redirect("https://www.facebook.com/v2.10/dialog/oauth?{}".format(query))
def getAccessTokenFromCode(self, code, redirect_uri):
client_id, secret_key = self.ERP5Site_getFacebookClientIdAndSecretKey()
client_id, secret_key = _getFacebookClientIdAndSecretKey(self.getPortalObject())
return facebook.GraphAPI(version="2.7").get_access_token_from_code(
code=code, redirect_uri=redirect_uri,
app_id=client_id, app_secret=secret_key)
......
import time
request = container.REQUEST
response = request.RESPONSE
def handleError(error):
context.Base_redirect(
'login_form',
......@@ -21,7 +24,7 @@ elif code is not None:
access_token = response_dict['access_token'].encode('utf-8')
hash_str = context.Base_getHMAC(access_token, access_token)
context.REQUEST.RESPONSE.setCookie('__ac_facebook_hash', hash_str, path='/')
context.setAuthCookie(response, '__ac_facebook_hash', hash_str)
# store timestamp in second since the epoch in UTC is enough
response_dict["response_timestamp"] = time.time()
......@@ -45,7 +48,7 @@ elif code is not None:
# https://developers.facebook.com/support/bugs/318390728250352/?disable_redirect=0
# https://stackoverflow.com/questions/7131909/facebook-callback-appends-to-return-url/33257076#33257076
# https://lab.nexedi.com/nexedi/erp5/merge_requests/417#note_64365
came_from = context.REQUEST.get("came_from", portal.absolute_url() + "#")
return context.REQUEST.RESPONSE.redirect(came_from)
came_from = request.get("came_from", portal.absolute_url() + "#")
return response.redirect(came_from)
return handleError('')
if REQUEST is not None:
raise ValueError("This script can't be called in the URL")
result_list = context.getPortalObject().portal_catalog(
portal_type="Facebook Connector",
reference=reference,
validation_state="validated",
limit=2,
)
assert result_list, "Facebook Connector not found"
if len(result_list) == 2:
raise ValueError("Impossible to select one Facebook Connector")
facebook_connector = result_list[0]
return facebook_connector.getClientId(), facebook_connector.getSecretKey()
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>reference="default", REQUEST=None</string> </value>
</item>
<item>
<key> <string>_proxy_roles</string> </key>
<value>
<tuple>
<string>Manager</string>
</tuple>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_getFacebookClientIdAndSecretKey</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
from ZTUtils import make_query
client_id, _ = context.ERP5Site_getFacebookClientIdAndSecretKey()
query = make_query({
# Call at he context of the appropriate web_service.
'client_id': client_id,
'redirect_uri': "{0}/ERP5Site_callbackFacebookLogin".format(came_from or context.absolute_url()),
'scope': 'email'
})
login_url = "https://www.facebook.com/v2.10/dialog/oauth"
if "?" not in login_url:
login_url += "?"
return context.REQUEST.RESPONSE.redirect("{0}{1}".format(login_url, query))
......@@ -2,68 +2,26 @@
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
<global name="ExternalMethod" module="Products.ExternalMethod.ExternalMethod"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
<key> <string>_function</string> </key>
<value> <string>redirectToFacebookLoginPage</string> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>came_from=None</string> </value>
</item>
<item>
<key> <string>_proxy_roles</string> </key>
<value>
<tuple>
<string>Auditor</string>
</tuple>
</value>
<key> <string>_module</string> </key>
<value> <string>FacebookLoginUtility</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_redirectToFacebookLoginPage</string> </value>
</item>
<item>
<key> <string>title</string> </key>
<value> <string></string> </value>
</item>
</dictionary>
</pickle>
</record>
......
......@@ -74,7 +74,6 @@ class TestFacebookLogin(ERP5TypeTestCase):
FacebookLoginUtility.getUserEntry = getUserEntry
self.dummy_connector_id = "test_facebook_connector"
person_module = self.portal.person_module
portal_catalog = self.portal.portal_catalog
for obj in portal_catalog(portal_type=["Facebook Login", "Person"],
reference=getUserId(None),
......@@ -112,6 +111,17 @@ class TestFacebookLogin(ERP5TypeTestCase):
self.assertNotIn("secret_key=", location)
self.assertIn("ERP5Site_callbackFacebookLogin", location)
def test_auth_cookie(self):
request = self.portal.REQUEST
response = request.RESPONSE
# (the secure flag is only set if we accessed through https)
request.setServerURL('https', 'example.com')
self.portal.ERP5Site_callbackFacebookLogin(code=CODE)
ac_cookie, = [v for (k, v) in response.listHeaders() if k.lower() == 'set-cookie' and '__ac_facebook_hash=' in v]
self.assertIn('; Secure', ac_cookie)
self.assertIn('; HTTPOnly', ac_cookie)
def test_create_user_in_ERP5Site_createFacebookUserToOAuth(self):
"""
Check if ERP5 set cookie properly after receive code from external service
......
......@@ -45,9 +45,7 @@
<item>
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple>
<string>W: 77, 4: Unused variable \'person_module\' (unused-variable)</string>
</tuple>
<tuple/>
</value>
</item>
<item>
......
......@@ -5,8 +5,27 @@ from Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin import getGoogleUs
SCOPE_LIST = ['https://www.googleapis.com/auth/userinfo.profile',
'https://www.googleapis.com/auth/userinfo.email']
def _getGoogleClientIdAndSecretKey(portal, reference="default"):
"""Returns google client id and secret key.
Internal function.
"""
result_list = portal.portal_catalog.unrestrictedSearchResults(
portal_type="Google Connector",
reference=reference,
validation_state="validated",
limit=2,
)
assert result_list, "Google Connector not found"
if len(result_list) == 2:
raise ValueError("Impossible to select one Google Connector")
google_connector = result_list[0].getObject()
return google_connector.getClientId(), google_connector.getSecretKey()
def redirectToGoogleLoginPage(self):
client_id, secret_key = self.ERP5Site_getGoogleClientIdAndSecretKey()
client_id, secret_key = _getGoogleClientIdAndSecretKey(self.getPortalObject())
flow = oauth2client.client.OAuth2WebServerFlow(
client_id=client_id,
client_secret=secret_key,
......@@ -18,8 +37,7 @@ def redirectToGoogleLoginPage(self):
self.REQUEST.RESPONSE.redirect(flow.step1_get_authorize_url())
def getAccessTokenFromCode(self, code, redirect_uri):
portal = self.getPortalObject()
client_id, secret_key = portal.ERP5Site_getGoogleClientIdAndSecretKey()
client_id, secret_key = _getGoogleClientIdAndSecretKey(self.getPortalObject())
flow = oauth2client.client.OAuth2WebServerFlow(
client_id=client_id,
client_secret=secret_key,
......
if REQUEST is not None:
raise ValueError("This script can't be called in the URL")
result_list = context.getPortalObject().portal_catalog(
portal_type="Google Connector",
reference=reference,
validation_state="validated",
limit=2,
)
assert result_list, "Google Connector not found"
if len(result_list) == 2:
raise ValueError("Impossible to select one Google Connector")
google_connector = result_list[0].getObject()
return google_connector.getClientId(), google_connector.getSecretKey()
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>reference="default", REQUEST=None</string> </value>
</item>
<item>
<key> <string>_proxy_roles</string> </key>
<value>
<tuple>
<string>Manager</string>
</tuple>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>ERP5Site_getGoogleClientIdAndSecretKey</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
import time
request = container.REQUEST
response = request.RESPONSE
def handleError(error):
context.Base_redirect(
'login_form',
......@@ -19,7 +22,7 @@ elif code is not None:
if response_dict is not None:
access_token = response_dict['access_token'].encode('utf-8')
hash_str = context.Base_getHMAC(access_token, access_token)
context.REQUEST.RESPONSE.setCookie('__ac_google_hash', hash_str, path='/')
context.setAuthCookie(response, '__ac_google_hash', hash_str)
# store timestamp in second since the epoch in UTC is enough
response_dict["response_timestamp"] = time.time()
context.Base_setBearerToken(hash_str,
......@@ -33,7 +36,6 @@ elif code is not None:
method = getattr(context, "ERP5Site_createGoogleUserToOAuth", None)
if method is not None:
method(user_reference, user_dict)
return context.REQUEST.RESPONSE.redirect(
context.REQUEST.get("came_from") or context.absolute_url())
return response.redirect(request.get("came_from") or context.absolute_url())
return handleError('')
......@@ -111,7 +111,6 @@ class TestGoogleLogin(ERP5TypeTestCase):
GoogleLoginUtility.getUserEntry = getUserEntry
self.dummy_connector_id = "test_google_connector"
person_module = self.portal.person_module
portal_catalog = self.portal.portal_catalog
for obj in portal_catalog(portal_type=["Google Login", "Person"],
reference=getUserId(None),
......@@ -149,6 +148,17 @@ class TestGoogleLogin(ERP5TypeTestCase):
self.assertNotIn("secret_key=", location)
self.assertIn("ERP5Site_receiveGoogleCallback", location)
def test_auth_cookie(self):
request = self.portal.REQUEST
response = request.RESPONSE
# (the secure flag is only set if we accessed through https)
request.setServerURL('https', 'example.com')
self.portal.ERP5Site_receiveGoogleCallback(code=CODE)
ac_cookie, = [v for (k, v) in response.listHeaders() if k.lower() == 'set-cookie' and '__ac_google_hash=' in v]
self.assertIn('; Secure', ac_cookie)
self.assertIn('; HTTPOnly', ac_cookie)
def test_create_user_in_ERP5Site_createGoogleUserToOAuth(self):
"""
Check if ERP5 set cookie properly after receive code from external service
......
TemplateToolERP5GoogleExtractionPluginConstraint
\ No newline at end of file
......@@ -48,14 +48,13 @@ from Products.DCWorkflow.DCWorkflow import ValidationFailed
AUTO_LOGIN = object()
class TestUserManagement(ERP5TypeTestCase):
"""Tests User Management in ERP5Security.
class UserManagementTestCase(ERP5TypeTestCase):
"""TestCase for user manement, with utilities to create users and helpers
assertion methods.
"""
_login_generator = itertools.count().next
def getTitle(self):
"""Title of the test."""
return "ERP5Security: User Management"
def getBusinessTemplateList(self):
"""List of BT to install. """
......@@ -68,37 +67,10 @@ class TestUserManagement(ERP5TypeTestCase):
self.getPersonModule().objectIds()])
self.tic()
def login(self):
uf = self.getUserFolder()
uf._doAddUser('alex', '', ['Manager', 'Assignee', 'Assignor',
'Associate', 'Auditor', 'Author'], [])
user = uf.getUserById('alex').__of__(uf)
newSecurityManager(None, user)
def getUserFolder(self):
"""Returns the acl_users. """
return self.getPortal().acl_users
def test_GroupManagerInterfaces(self):
"""Tests group manager plugin respects interfaces."""
# XXX move to GroupManager test class
from Products.PluggableAuthService.interfaces.plugins import IGroupsPlugin
from Products.ERP5Security.ERP5GroupManager import ERP5GroupManager
verifyClass(IGroupsPlugin, ERP5GroupManager)
def test_UserManagerInterfaces(self):
"""Tests user manager plugin respects interfaces."""
from Products.PluggableAuthService.interfaces.plugins import\
IAuthenticationPlugin, IUserEnumerationPlugin
from Products.ERP5Security.ERP5UserManager import ERP5UserManager
verifyClass(IAuthenticationPlugin, ERP5UserManager)
verifyClass(IUserEnumerationPlugin, ERP5UserManager)
def test_UserFolder(self):
"""Tests user folder has correct meta type."""
self.assertTrue(isinstance(self.getUserFolder(),
PluggableAuthService.PluggableAuthService))
def loginAsUser(self, username):
uf = self.portal.acl_users
user = uf.getUserById(username).__of__(uf)
......@@ -203,6 +175,62 @@ class TestUserManagement(ERP5TypeTestCase):
self.tic()
return dummy_document
class RoleManagementTestCase(UserManagementTestCase):
"""Test case with required configuration to test role definitions.
"""
def afterSetUp(self):
"""Initialize requirements of security configuration.
"""
super(RoleManagementTestCase, self).afterSetUp()
# create a security configuration script
skin_folder = self.portal.portal_skins.custom
if 'ERP5Type_getSecurityCategoryMapping' not in skin_folder.objectIds():
createZODBPythonScript(
skin_folder, 'ERP5Type_getSecurityCategoryMapping', '',
"""return ((
'ERP5Type_getSecurityCategoryFromAssignment',
context.getPortalObject().getPortalAssignmentBaseCategoryList()
),)
""")
# configure group, site, function categories
category_tool = self.getCategoryTool()
for bc in ['group', 'site', 'function']:
base_cat = category_tool[bc]
code = bc[0].upper()
if base_cat.get('subcat', None) is not None:
continue
base_cat.newContent(portal_type='Category',
id='subcat',
codification="%s1" % code)
base_cat.newContent(portal_type='Category',
id='another_subcat',
codification="%s2" % code)
self.defined_category = "group/subcat\n"\
"site/subcat\n"\
"function/subcat"
def beforeTearDown(self):
"""Clean up after test.
"""
# clear base categories
for bc in ['group', 'site', 'function']:
base_cat = self.getCategoryTool()[bc]
base_cat.manage_delObjects(list(base_cat.objectIds()))
# clear role definitions
for ti in self.getTypesTool().objectValues():
ti.manage_delObjects([x.id for x in ti.getRoleInformationList()])
# clear modules
for module in self.portal.objectValues():
if module.getId().endswith('_module'):
module.manage_delObjects(list(module.objectIds()))
# commit this
self.tic()
class TestUserManagement(UserManagementTestCase):
"""Tests User Management in ERP5Security.
"""
def test_PersonWithLoginPasswordAreUsers(self):
"""Tests a person with a login & password is a valid user."""
_, login, password = self._makePerson()
......@@ -311,46 +339,44 @@ class TestUserManagement(ERP5TypeTestCase):
"""Tests one cannot use the "system user" special login."""
self._testUserNameExistsButCannotLoginAndCannotCreate(SpecialUsers.system.getUserName())
def test_searchUserId(self):
substring = 'person_id'
user_id_set = {substring + '1', '1' + substring}
for user_id in user_id_set:
self._makePerson(user_id=user_id)
self.assertEqual(
user_id_set,
{x['userid'] for x in self.portal.acl_users.searchUsers(id=substring, exact_match=False)},
)
def test_DeletedPersonIsNotUser(self):
user_id, login, password = self._makePerson()
self._assertUserExists(login, password)
acl_user, = self.portal.acl_users.searchUsers(id=user_id, exact_match=True)
self.portal.restrictedTraverse(acl_user['path']).delete()
self.commit()
self._assertUserDoesNotExists(login, password)
def test_searchLogin(self):
substring = 'person_login'
login_set = {substring + '1', '1' + substring}
for login in login_set:
self._makePerson(login=login)
self.assertEqual(
login_set,
{x['login'] for x in self.portal.acl_users.searchUsers(login=substring, exact_match=False)},
)
def test_ReallyDeletedPersonIsNotUser(self):
user_id, login, password = self._makePerson()
acl_user, = self.portal.acl_users.searchUsers(id=user_id, exact_match=True)
p = self.portal.restrictedTraverse(acl_user['path'])
self._assertUserExists(login, password)
p.getParentValue().deleteContent(p.getId())
self.commit()
self._assertUserDoesNotExists(login, password)
def test_searchUsersIdExactMatch(self):
substring = 'person2_id'
self._makePerson(user_id=substring)
self._makePerson(user_id=substring + '1')
self._makePerson(user_id='1' + substring)
self.assertEqual(
[substring],
[x['userid'] for x in self.portal.acl_users.searchUsers(id=substring, exact_match=True)],
)
def test_InvalidatedPersonIsUser(self):
user_id, login, password = self._makePerson()
acl_user, = self.portal.acl_users.searchUsers(id=user_id, exact_match=True)
p = self.portal.restrictedTraverse(acl_user['path'])
self._assertUserExists(login, password)
p.validate()
p.invalidate()
self.commit()
self._assertUserExists(login, password)
def test_UserIdIsPossibleToUnset(self):
"""Make sure that it is possible to remove user id"""
user_id, login, password = self._makePerson()
acl_user, = self.portal.acl_users.searchUsers(id=user_id, exact_match=True)
person = self.portal.restrictedTraverse(acl_user['path'])
person.setUserId(None)
self.tic()
self.assertEqual(None, person.Person_getUserId())
def test_searchUsersLoginExactMatch(self):
substring = 'person2_login'
self._makePerson(login=substring)
self._makePerson(login=substring + '1')
self._makePerson(login='1' + substring)
self.assertEqual(
[substring],
[x['login'] for x in self.portal.acl_users.searchUsers(login=substring, exact_match=True)],
)
class DuplicatePrevention(UserManagementTestCase):
def test_MultipleUsers(self):
"""Tests that it's refused to create two Persons with same user id."""
user_id, login, _ = self._makePerson()
......@@ -422,6 +448,50 @@ class TestUserManagement(ERP5TypeTestCase):
user_id,
)
def test_duplicateLoginReference(self):
_, login1, _ = self._makePerson()
_, login2, _ = self._makePerson()
pas_user2, = self.portal.acl_users.searchUsers(login=login2, exact_match=True)
pas_login2, = pas_user2['login_list']
login2_value = self.portal.restrictedTraverse(pas_login2['path'])
login2_value.invalidate()
login2_value.setReference(login1)
self.commit()
self.assertRaises(ValidationFailed, login2_value.validate)
self.assertRaises(ValidationFailed, self.portal.portal_workflow.doActionFor, login2_value, 'validate_action')
def _duplicateLoginReference(self, commit):
_, login1, _ = self._makePerson(tic=False)
user_id2, login2, _ = self._makePerson(tic=False)
if commit:
self.commit()
# Note: cannot rely on catalog, on purpose.
person_value, = [
x for x in self.portal.person_module.objectValues()
if x.Person_getUserId() == user_id2
]
login_value, = [
x for x in person_value.objectValues(portal_type='ERP5 Login')
if x.getReference() == login2
]
login_value.invalidate()
login_value.setReference(login1)
self.portal.portal_workflow.doActionFor(login_value, 'validate_action')
result = [x for x in self.portal.portal_catalog(portal_type='ERP5 Login') if x.checkConsistency()]
self.assertEqual(result, [])
self.tic()
result = [x for x in self.portal.portal_catalog(portal_type='ERP5 Login') if x.checkConsistency()]
self.assertEqual(len(result), 2)
def test_duplicateLoginReferenceInSameTransaction(self):
self._duplicateLoginReference(False)
def test_duplicateLoginReferenceInAnotherTransaction(self):
self._duplicateLoginReference(True)
class TestPreferences(UserManagementTestCase):
def test_Preference_created_for_new_user_on_getActiveUserPreference(self):
# Creating a user will create a preference on the first time `getActiveUserPreference`
# is called
......@@ -482,8 +552,93 @@ class TestUserManagement(ERP5TypeTestCase):
# password is not stored in plain text
self.assertNotEquals(new_password, self.portal.restrictedTraverse(pas_user['path']).getPassword())
class TestAssignmentAndRoles(UserManagementTestCase):
def test_AssignmentWithDate(self):
"""Tests a person with an assignment with correct date is a valid user."""
date = DateTime()
_, login, password = self._makePerson(
assignment_start_date=date - 5,
assignment_stop_date=date + 5,
)
self._assertUserExists(login, password)
def test_AssignmentWithBadStartDate(self):
"""Tests a person with an assignment with bad start date is not a valid user."""
date = DateTime()
_, login, password = self._makePerson(
assignment_start_date=date + 1,
assignment_stop_date=date + 5,
)
self._assertUserDoesNotExists(login, password)
def test_AssignmentWithBadStopDate(self):
"""Tests a person with an assignment with bad stop date is not a valid user."""
date = DateTime()
_, login, password = self._makePerson(
assignment_start_date=date - 5,
assignment_stop_date=date - 1,
)
self._assertUserDoesNotExists(login, password)
def test_securityGroupAssignmentCorrectDate(self):
"""
Tests a person with an assignment with correct date
gets correctly assigned to security groups.
"""
date = DateTime()
user_id, login, password = self._makePerson(
assignment_start_date=date - 5,
assignment_stop_date=date + 5,
group_value=self._getOrCreateGroupValue()
)
self.assertIn(
'Assignee',
self.portal.acl_users.getUserById(user_id).\
getRolesInContext(self._createDummyDocument())
)
def test_securityGroupAssignmentBadStartDate(self):
"""
Tests a person with an assignment with bad (future) start date
does not get assigned to security groups.
"""
date = DateTime()
user_id, login, password = self._makePerson(
assignment_start_date=date + 1,
assignment_stop_date=date + 5,
group_value=self._getOrCreateGroupValue()
)
self.assertNotIn(
'Assignee',
self.portal.acl_users.getUserById(user_id).\
getRolesInContext(self._createDummyDocument())
)
def test_securityGroupAssignmentBadStopDate(self):
"""
Tests a person with an assignment with bad (past) stop date
does not get assigned to security groups.
"""
date = DateTime()
user_id, login, password = self._makePerson(
assignment_start_date=date - 5,
assignment_stop_date=date - 1,
group_value=self._getOrCreateGroupValue()
)
self.assertNotIn(
'Assignee',
self.portal.acl_users.getUserById(user_id).\
getRolesInContext(self._createDummyDocument())
)
class TestPermissionCache(UserManagementTestCase):
def test_OpenningAssignmentClearCache(self):
"""Openning an assignment for a person clear the cache automatically."""
"""Openning an assignment for a person clear the cache automatically.
XXX this works only on a single zope and not on a ZEO cluster.
"""
user_id, login, password = self._makePerson(open_assignment=0)
self._assertUserDoesNotExists(login, password)
user, = self.portal.acl_users.searchUsers(id=user_id, exact_match=True)
......@@ -496,22 +651,89 @@ class TestUserManagement(ERP5TypeTestCase):
self.commit()
self._assertUserDoesNotExists(login, password)
def test_PersonNotIndexedNotCached(self):
user_id, login, password = self._makePerson(tic=False)
# not indexed yet
self._assertUserDoesNotExists(login, password)
self.tic()
self._assertUserExists(login, password)
def test_PersonNotIndexedNotCached(self):
user_id, login, password = self._makePerson(tic=False)
# not indexed yet
self._assertUserDoesNotExists(login, password)
self.tic()
self._assertUserExists(login, password)
def test_PersonNotValidNotCached(self):
user_id, login, password = self._makePerson()
password += '2'
pas_user, = self.portal.acl_users.searchUsers(login=login, exact_match=True)
pas_login, = pas_user['login_list']
self._assertUserDoesNotExists(login, password)
self.portal.restrictedTraverse(pas_login['path']).setPassword(password)
self._assertUserExists(login, password)
class TestPASAPI(UserManagementTestCase):
"""Test low level PAS API works as expected.
"""
def test_GroupManagerInterfaces(self):
"""Tests group manager plugin respects interfaces."""
from Products.PluggableAuthService.interfaces.plugins import IGroupsPlugin
from Products.ERP5Security.ERP5GroupManager import ERP5GroupManager
verifyClass(IGroupsPlugin, ERP5GroupManager)
def test_UserManagerInterfaces(self):
"""Tests user manager plugin respects interfaces."""
from Products.PluggableAuthService.interfaces.plugins import\
IAuthenticationPlugin, IUserEnumerationPlugin
from Products.ERP5Security.ERP5UserManager import ERP5UserManager
verifyClass(IAuthenticationPlugin, ERP5UserManager)
verifyClass(IUserEnumerationPlugin, ERP5UserManager)
def test_UserFolder(self):
"""Tests user folder has correct meta type."""
self.assertTrue(isinstance(self.getUserFolder(),
PluggableAuthService.PluggableAuthService))
def test_searchUserId(self):
substring = 'person_id'
user_id_set = {substring + '1', '1' + substring}
for user_id in user_id_set:
self._makePerson(user_id=user_id)
self.assertEqual(
user_id_set,
{x['userid'] for x in self.portal.acl_users.searchUsers(id=substring, exact_match=False)},
)
def test_searchLogin(self):
substring = 'person_login'
login_set = {substring + '1', '1' + substring}
for login in login_set:
self._makePerson(login=login)
self.assertEqual(
login_set,
{x['login'] for x in self.portal.acl_users.searchUsers(login=substring, exact_match=False)},
)
def test_searchUsersIdExactMatch(self):
substring = 'person2_id'
self._makePerson(user_id=substring)
self._makePerson(user_id=substring + '1')
self._makePerson(user_id='1' + substring)
self.assertEqual(
[substring],
[x['userid'] for x in self.portal.acl_users.searchUsers(id=substring, exact_match=True)],
)
def test_searchUsersLoginExactMatch(self):
substring = 'person2_login'
self._makePerson(login=substring)
self._makePerson(login=substring + '1')
self._makePerson(login='1' + substring)
self.assertEqual(
[substring],
[x['login'] for x in self.portal.acl_users.searchUsers(login=substring, exact_match=True)],
)
def test_PersonNotValidNotCached(self):
user_id, login, password = self._makePerson()
password += '2'
pas_user, = self.portal.acl_users.searchUsers(login=login, exact_match=True)
pas_login, = pas_user['login_list']
self._assertUserDoesNotExists(login, password)
self.portal.restrictedTraverse(pas_login['path']).setPassword(password)
self._assertUserExists(login, password)
class TestMigration(UserManagementTestCase):
"""Tests migration to from login on the person to ERP5 Login documents.
"""
def test_PersonLoginMigration(self):
if 'erp5_users' not in self.portal.acl_users:
self.portal.acl_users.manage_addProduct['ERP5Security'].addERP5UserManager('erp5_users')
......@@ -562,164 +784,6 @@ class TestUserManagement(ERP5TypeTestCase):
self.assertTrue('erp5_login_users' in \
[x[0] for x in self.portal.acl_users.plugins.listPlugins(IUserEnumerationPlugin)])
def test_AssignmentWithDate(self):
"""Tests a person with an assignment with correct date is a valid user."""
date = DateTime()
_, login, password = self._makePerson(
assignment_start_date=date - 5,
assignment_stop_date=date + 5,
)
self._assertUserExists(login, password)
def test_AssignmentWithBadStartDate(self):
"""Tests a person with an assignment with bad start date is not a valid user."""
date = DateTime()
_, login, password = self._makePerson(
assignment_start_date=date + 1,
assignment_stop_date=date + 5,
)
self._assertUserDoesNotExists(login, password)
def test_AssignmentWithBadStopDate(self):
"""Tests a person with an assignment with bad stop date is not a valid user."""
date = DateTime()
_, login, password = self._makePerson(
assignment_start_date=date - 5,
assignment_stop_date=date - 1,
)
self._assertUserDoesNotExists(login, password)
def test_securityGroupAssignmentCorrectDate(self):
"""
Tests a person with an assignment with correct date
gets correctly assigned to security groups.
"""
date = DateTime()
user_id, login, password = self._makePerson(
assignment_start_date=date - 5,
assignment_stop_date=date + 5,
group_value=self._getOrCreateGroupValue()
)
self.assertIn(
'Assignee',
self.portal.acl_users.getUserById(user_id).\
getRolesInContext(self._createDummyDocument())
)
def test_securityGroupAssignmentBadStartDate(self):
"""
Tests a person with an assignment with bad (future) start date
does not get assigned to security groups.
"""
date = DateTime()
user_id, login, password = self._makePerson(
assignment_start_date=date + 1,
assignment_stop_date=date + 5,
group_value=self._getOrCreateGroupValue()
)
self.assertNotIn(
'Assignee',
self.portal.acl_users.getUserById(user_id).\
getRolesInContext(self._createDummyDocument())
)
def test_securityGroupAssignmentBadStopDate(self):
"""
Tests a person with an assignment with bad (past) stop date
does not get assigned to security groups.
"""
date = DateTime()
user_id, login, password = self._makePerson(
assignment_start_date=date - 5,
assignment_stop_date=date - 1,
group_value=self._getOrCreateGroupValue()
)
self.assertNotIn(
'Assignee',
self.portal.acl_users.getUserById(user_id).\
getRolesInContext(self._createDummyDocument())
)
def test_DeletedPersonIsNotUser(self):
user_id, login, password = self._makePerson()
self._assertUserExists(login, password)
acl_user, = self.portal.acl_users.searchUsers(id=user_id, exact_match=True)
self.portal.restrictedTraverse(acl_user['path']).delete()
self.commit()
self._assertUserDoesNotExists(login, password)
def test_ReallyDeletedPersonIsNotUser(self):
user_id, login, password = self._makePerson()
acl_user, = self.portal.acl_users.searchUsers(id=user_id, exact_match=True)
p = self.portal.restrictedTraverse(acl_user['path'])
self._assertUserExists(login, password)
p.getParentValue().deleteContent(p.getId())
self.commit()
self._assertUserDoesNotExists(login, password)
def test_InvalidatedPersonIsUser(self):
user_id, login, password = self._makePerson()
acl_user, = self.portal.acl_users.searchUsers(id=user_id, exact_match=True)
p = self.portal.restrictedTraverse(acl_user['path'])
self._assertUserExists(login, password)
p.validate()
p.invalidate()
self.commit()
self._assertUserExists(login, password)
def test_UserIdIsPossibleToUnset(self):
"""Make sure that it is possible to remove user id"""
user_id, login, password = self._makePerson()
acl_user, = self.portal.acl_users.searchUsers(id=user_id, exact_match=True)
person = self.portal.restrictedTraverse(acl_user['path'])
person.setUserId(None)
self.tic()
self.assertEqual(None, person.Person_getUserId())
def test_duplicatePersonUserId(self):
user_id, _, _ = self._makePerson()
self.assertRaises(ValidationFailed, self._makePerson, user_id=user_id)
def test_duplicateLoginReference(self):
_, login1, _ = self._makePerson()
_, login2, _ = self._makePerson()
pas_user2, = self.portal.acl_users.searchUsers(login=login2, exact_match=True)
pas_login2, = pas_user2['login_list']
login2_value = self.portal.restrictedTraverse(pas_login2['path'])
login2_value.invalidate()
login2_value.setReference(login1)
self.commit()
self.assertRaises(ValidationFailed, login2_value.validate)
self.assertRaises(ValidationFailed, self.portal.portal_workflow.doActionFor, login2_value, 'validate_action')
def _duplicateLoginReference(self, commit):
_, login1, _ = self._makePerson(tic=False)
user_id2, login2, _ = self._makePerson(tic=False)
if commit:
self.commit()
# Note: cannot rely on catalog, on purpose.
person_value, = [
x for x in self.portal.person_module.objectValues()
if x.Person_getUserId() == user_id2
]
login_value, = [
x for x in person_value.objectValues(portal_type='ERP5 Login')
if x.getReference() == login2
]
login_value.invalidate()
login_value.setReference(login1)
self.portal.portal_workflow.doActionFor(login_value, 'validate_action')
result = [x for x in self.portal.portal_catalog(portal_type='ERP5 Login') if x.checkConsistency()]
self.assertEqual(result, [])
self.tic()
result = [x for x in self.portal.portal_catalog(portal_type='ERP5 Login') if x.checkConsistency()]
self.assertEqual(len(result), 2)
def test_duplicateLoginReferenceInSameTransaction(self):
self._duplicateLoginReference(False)
def test_duplicateLoginReferenceInAnotherTransaction(self):
self._duplicateLoginReference(True)
class TestUserManagementExternalAuthentication(TestUserManagement):
def getTitle(self):
......@@ -767,10 +831,9 @@ class TestUserManagementExternalAuthentication(TestUserManagement):
self.assertTrue(reference in response.getBody())
class TestLocalRoleManagement(ERP5TypeTestCase):
class TestLocalRoleManagement(RoleManagementTestCase):
"""Tests Local Role Management with ERP5Security.
This test should probably part of ERP5Type ?
"""
def getTitle(self):
return "ERP5Security: User Role Management"
......@@ -778,33 +841,8 @@ class TestLocalRoleManagement(ERP5TypeTestCase):
def afterSetUp(self):
"""Called after setup completed.
"""
self.portal = self.getPortal()
# create a security configuration script
skin_folder = self.portal.portal_skins.custom
if 'ERP5Type_getSecurityCategoryMapping' not in skin_folder.objectIds():
createZODBPythonScript(
skin_folder, 'ERP5Type_getSecurityCategoryMapping', '',
"""return ((
'ERP5Type_getSecurityCategoryFromAssignment',
context.getPortalObject().getPortalAssignmentBaseCategoryList()
),)
""")
# configure group, site, function categories
category_tool = self.getCategoryTool()
for bc in ['group', 'site', 'function']:
base_cat = category_tool[bc]
code = bc[0].upper()
if base_cat.get('subcat', None) is not None:
continue
base_cat.newContent(portal_type='Category',
id='subcat',
codification="%s1" % code)
base_cat.newContent(portal_type='Category',
id='another_subcat',
codification="%s2" % code)
self.defined_category = "group/subcat\n"\
"site/subcat\n"\
"function/subcat"
super(TestLocalRoleManagement, self).afterSetUp()
# any member can add organisations
self.portal.organisation_module.manage_permission(
'Add portal content', roles=['Member', 'Manager'], acquire=1)
......@@ -824,23 +862,6 @@ class TestLocalRoleManagement(ERP5TypeTestCase):
self.person = pers
self.tic()
def beforeTearDown(self):
"""Called before teardown."""
# clear base categories
self.person.getParentValue().manage_delObjects([self.person.getId()])
for bc in ['group', 'site', 'function']:
base_cat = self.getCategoryTool()[bc]
base_cat.manage_delObjects(list(base_cat.objectIds()))
# clear role definitions
for ti in self.getTypesTool().objectValues():
ti.manage_delObjects([x.id for x in ti.getRoleInformationList()])
# clear modules
for module in self.portal.objectValues():
if module.getId().endswith('_module'):
module.manage_delObjects(list(module.objectIds()))
# commit this
self.tic()
def loginAsUser(self, username):
uf = self.portal.acl_users
user = uf.getUserById(username).__of__(uf)
......@@ -1058,6 +1079,15 @@ class TestLocalRoleManagement(ERP5TypeTestCase):
basic='guest:guest')
self.assertEqual(response.getStatus(), 401)
class TestKeyAuthentication(RoleManagementTestCase):
def getBusinessTemplateList(self):
"""This test also uses web and dms
"""
return super(TestKeyAuthentication, self).getBusinessTemplateList() + (
'erp5_base', 'erp5_web', 'erp5_ingestion', 'erp5_dms', 'erp5_administration')
def testKeyAuthentication(self):
"""
Make sure that we can grant security using a key.
......@@ -1157,6 +1187,8 @@ class TestLocalRoleManagement(ERP5TypeTestCase):
base_url, web_page.getReference(), 'ERP5TypeTestCase', ''))
self.assertEqual(response.getStatus(), 200)
class TestOwnerRole(UserManagementTestCase):
def _createZodbUser(self, login, role_list=None):
if role_list is None:
role_list = ['Member', 'Assignee', 'Assignor', 'Author', 'Auditor',
......@@ -1240,6 +1272,53 @@ class TestLocalRoleManagement(ERP5TypeTestCase):
(((cloning_owner_id), ('Owner',)),)
)
class TestAuthenticationCookie(UserManagementTestCase):
"""Test the authentication cookie.
Most of this functionality is already tested in testCookieiCrumbler, this
test uses a fully setup ERP5 site.
"""
def testCookieAttributes(self):
"""ERP5 sets some cookie attributes
"""
_, login, password = self._makePerson()
self.tic()
request = self.portal.REQUEST
request.form['__ac_name'] = login
request.form['__ac_password'] = password
request['PARENTS'] = [self.portal]
# (the secure flag is only set if we accessed through https)
request.setServerURL('https', 'example.com')
request.traverse('/')
response = request.RESPONSE
ac_cookie, = [v for (k, v) in response.listHeaders() if k.lower() == 'set-cookie' and '__ac=' in v]
# Secure flag so that cookie is sent only on https
self.assertIn('; Secure', ac_cookie)
# HTTPOnly flag so that javascript cannot access cookie
self.assertIn('; HTTPOnly', ac_cookie)
class TestReindexObjectSecurity(UserManagementTestCase):
def afterSetUp(self):
super(TestReindexObjectSecurity, self).afterSetUp()
self.username = 'usérn@me'
# create a user and open an assignement
pers = self.getPersonModule().newContent(portal_type='Person',
user_id=self.username)
assignment = pers.newContent( portal_type='Assignment',
group='subcat',
site='subcat',
function='subcat' )
assignment.open()
pers.newContent(portal_type='ERP5 Login',
reference=self.username,
password=self.username).validate()
self.tic()
def _checkMessageMethodIdList(self, expected_method_id_list):
actual_method_id_list = sorted([
message.method_id
......@@ -1274,9 +1353,3 @@ class TestLocalRoleManagement(ERP5TypeTestCase):
check(['immediateReindexObject'] * (len(person) + 1))
self.tic()
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestUserManagement))
suite.addTest(unittest.makeSuite(TestUserManagementExternalAuthentication))
suite.addTest(unittest.makeSuite(TestLocalRoleManagement))
return suite
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment