Commit 403ad307 authored by Kazuhiko Shiozaki's avatar Kazuhiko Shiozaki

[by Vincent] Get rid of all module-global callables. Use PAS API instead.

Use all non-PAS API public methods. Use PAS API instead.
Implement IAuthenticationPlugin API better, so PAS API becomes usable for
ERP5-based authentication.
parent 28f07df3
...@@ -19,18 +19,11 @@ from Products.ERP5Type.Globals import InitializeClass ...@@ -19,18 +19,11 @@ from Products.ERP5Type.Globals import InitializeClass
from AccessControl import ClassSecurityInfo from AccessControl import ClassSecurityInfo
from AccessControl.AuthEncoding import pw_validate from AccessControl.AuthEncoding import pw_validate
from Products.PageTemplates.PageTemplateFile import PageTemplateFile from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.PluggableAuthService.PluggableAuthService import \
_SWALLOWABLE_PLUGIN_EXCEPTIONS
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.utils import classImplements from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.interfaces.plugins import IAuthenticationPlugin from Products.PluggableAuthService.interfaces.plugins import IAuthenticationPlugin
from Products.PluggableAuthService.interfaces.plugins import IUserEnumerationPlugin from Products.PluggableAuthService.interfaces.plugins import IUserEnumerationPlugin
from Products.ERP5Type.Cache import CachingMethod, transactional_cached
from Products.ERP5Type.UnrestrictedMethod import UnrestrictedMethod
from ZODB.POSException import ConflictError
import sys
from DateTime import DateTime from DateTime import DateTime
from zLOG import LOG, PROBLEM
# This user is used to bypass all security checks. # This user is used to bypass all security checks.
SUPER_USER = '__erp5security-=__' SUPER_USER = '__erp5security-=__'
...@@ -39,358 +32,198 @@ manage_addERP5UserManagerForm = PageTemplateFile( ...@@ -39,358 +32,198 @@ manage_addERP5UserManagerForm = PageTemplateFile(
'www/ERP5Security_addERP5UserManager', globals(), 'www/ERP5Security_addERP5UserManager', globals(),
__name__='manage_addERP5UserManagerForm' ) __name__='manage_addERP5UserManagerForm' )
def addERP5UserManager(dispatcher, id, title=None, REQUEST=None): def addERP5UserManager(dispatcher, id, title=None, RESPONSE=None):
""" Add a ERP5UserManager to a Pluggable Auth Service. """ """ Add a ERP5UserManager to a Pluggable Auth Service. """
eum = ERP5UserManager(id, title) eum = ERP5UserManager(id, title)
dispatcher._setObject(eum.getId(), eum) dispatcher._setObject(eum.getId(), eum)
if RESPONSE is not None:
if REQUEST is not None: RESPONSE.redirect(eum.absolute_url() + '/manage_main')
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5UserManager+added.'
% dispatcher.absolute_url())
class _AuthenticationFailure(Exception):
"""Raised when authentication failed, to prevent caching the fact that a user
does not exist (yet), which happens when someone try to login before the user
account is ready (like when the indexing not finished, an assignment not open
etc...)
"""
@transactional_cached(lambda portal, *args: args)
def getUserByLogin(portal, login, exact_match=True):
if isinstance(login, basestring):
login = login,
if exact_match:
reference_key = 'ExactMatch'
else:
reference_key = 'Keyword'
if not (portal.portal_catalog.hasColumn('portal_type') and portal.portal_catalog.hasColumn('reference')):
raise RuntimeError('Catalog does not have column information. Make sure RDB is working and disk is not full.')
result = portal.portal_catalog.unrestrictedSearchResults(
select_list=('reference, portal_type'),
portal_type=("ERP5 Login"),
reference=dict(query=login, key=reference_key))
# XXX: Here, we filter catalog result list ALTHOUGH we did pass
# parameters to unrestrictedSearchResults to restrict result set.
# This is done because the following values can match person with
# reference "foo":
# "foo " because of MySQL (feature, PADSPACE collation):
# mysql> SELECT reference as r FROM catalog
# -> WHERE reference="foo ";
# +-----+
# | r |
# +-----+
# | foo |
# +-----+
# 1 row in set (0.01 sec)
# "bar OR foo" because of ZSQLCatalog tokenizing searched strings
# by default (feature).
result_list = [x.getObject().getParentValue()
for x in result if not exact_match
or x['reference'] in login]
if result_list:
return result_list
result = portal.portal_catalog.unrestrictedSearchResults(
select_list=('reference, portal_type'),
portal_type=("Person"),
reference=dict(query=login, key=reference_key))
return [x.getObject() for x in result if not exact_match
or x['reference'] in login]
@transactional_cached(lambda portal, *args: args)
def getValidAssignmentList(user):
"""Returns list of valid assignments."""
assignment_list = [x for x in user.contentValues(portal_type="Assignment") if x.getValidationState() == "open"]
valid_assignment_list = []
# check dates if exist
login_date = DateTime()
for assignment in assignment_list:
if assignment.getStartDate() is not None and \
assignment.getStartDate() > login_date:
continue
if assignment.getStopDate() is not None and \
assignment.getStopDate() < login_date:
continue
valid_assignment_list.append(assignment)
return valid_assignment_list
class ERP5UserManager(BasePlugin): class ERP5UserManager(BasePlugin):
""" PAS plugin for managing users in ERP5 """ PAS plugin for managing users in ERP5
""" """
meta_type = 'ERP5 User Manager' meta_type = 'ERP5 User Manager'
login_portal_type = 'ERP5 Login' login_portal_type = 'ERP5 Login'
security = ClassSecurityInfo() security = ClassSecurityInfo()
def __init__(self, id, title=None): def __init__(self, id, title=None):
self._id = self.id = id self._id = self.id = id
self.title = title self.title = title
def getLoginPortalType(self):
return self.login_portal_type
def getPersonByReference(self, reference):
def _getPersonRelativeUrlFromReference(reference):
person_url = self.REQUEST.get('_person_cache', {}).get(reference)
portal = self.getPortalObject()
if person_url is not None:
return person_url
else:
person_list = portal.portal_catalog.unrestrictedSearchResults(
select_list=('relative_url', 'reference'),
portal_type='Person',
reference={'query': reference, 'key': 'ExactMatch'},
limit=2
)
l = len(person_list)
if l > 1:
raise RuntimeError, 'More than one Person have login %r' % \
(reference,)
elif l == 1:
self.REQUEST.set('_person_cache', {})
self.REQUEST['_person_cache'][person_list[0]['reference']] = \
person_list[0]['relative_url']
return person_list[0]['relative_url']
person_relative_url = _getPersonRelativeUrlFromReference(reference)
if person_relative_url is not None:
return self.getPortalObject().unrestrictedTraverse(
person_relative_url)
def checkPersonValidity(self, person):
if person.getValidationState() in ('deleted',):
return False
now = DateTime()
for assignment in person.contentValues(portal_type="Assignment"):
if assignment.getValidationState() != "open":
continue
if assignment.hasStartDate() and \
assignment.getStartDate() > now:
continue
if assignment.hasStopDate() and \
assignment.getStopDate() < now:
continue
return True
return False
# #
# IAuthenticationPlugin implementation # IAuthenticationPlugin implementation
# #
security.declarePrivate( 'authenticateCredentials' ) security.declarePrivate('authenticateCredentials')
def authenticateCredentials(self, credentials): def authenticateCredentials(self, credentials):
""" See IAuthenticationPlugin. login_portal_type = credentials.get(
'login_portal_type',
self.login_portal_type,
)
if 'external_login' in credentials:
check_password = False
login_value = self._getLoginValueFromLogin(
credentials.get('external_login'),
login_portal_type=login_portal_type,
)
elif 'login_relative_url' in credentials:
check_password = False
login_value = self.getPortalObject().unrestrictedTraverse(
credentials.get("login_relative_url"),
)
else:
check_password = True
login_value = self._getLoginValueFromLogin(
credentials.get('login'),
login_portal_type=login_portal_type,
)
if login_value is None:
return
# XXX: need better api on login object !
user_value = login_value.getParentValue()
if user_value.getValidationState() == 'deleted':
return
now = DateTime()
for assignment in user_value.contentValues(portal_type="Assignment"):
if assignment.getValidationState() == "open" and (
not assignment.hasStartDate() or assignment.getStartDate() <= now
) and (
not assignment.hasStopDate() or assignment.getStopDate() >= now
):
break
else:
return
is_authentication_policy_enabled = self.getPortalObject().portal_preferences.isAuthenticationPolicyEnabled()
if check_password and not pw_validate(
login_value.getPassword(),
credentials.get('password'),
):
if is_authentication_policy_enabled:
login_value.notifyLoginFailure()
return
if is_authentication_policy_enabled:
if login_value.isPasswordExpired():
login_value.notifyPasswordExpire()
return
if login_value.isLoginBlocked():
return
return (user_value.getReference(), login_value.getReference())
o We expect the credentials to be those returned by def _getLoginValueFromLogin(self, login, login_portal_type=None):
ILoginPasswordExtractionPlugin.
"""
login = credentials.get('login')
ignore_password = False
if not login:
# fallback to support plugins using external tools to extract login
# those are not using login/password pair, they just extract login
# from remote system (eg. SSL certificates)
login = credentials.get('external_login')
ignore_password = True
# Forbidden the usage of the super user. # Forbidden the usage of the super user.
if login == SUPER_USER: if login == SUPER_USER:
return None return
user_list = self.enumerateUsers(
@UnrestrictedMethod
def _authenticateCredentials(login, password, portal_type,
ignore_password=False):
if not login or not (password or ignore_password):
return None, None
login_object = self.getLoginObject(login, portal_type)
if not login_object:
raise _AuthenticationFailure(None)
if login_object.getPortalType() == 'Person':
# BBB
user = login_object
else:
user = login_object.getParentValue()
try:
if self.checkPersonValidity(user) and \
(ignore_password or self._validatePassword(login_object, password)):
return user.getReference(), login_object.getRelativeUrl()
finally:
pass
raise _AuthenticationFailure(login_object.getRelativeUrl())
_authenticateCredentials = CachingMethod(
_authenticateCredentials,
id=self.__class__.__name__ + '_authenticateCredentials',
cache_factory='erp5_content_short')
try:
user_reference, login_url = _authenticateCredentials(
login=login, login=login,
password=credentials.get('password'), exact_match=True,
portal_type=credentials.get('login_portal_type', login_portal_type=login_portal_type,
self.login_portal_type), )
ignore_password=ignore_password) if not user_list:
except _AuthenticationFailure, exception: return
user_reference = None single_user, = user_list
login_url = exception.message or None single_login, = single_user['login_list']
return self.getPortalObject().unrestrictedTraverse(
if user_reference and '_login_cache' not in self.REQUEST: single_login['path'],
self.REQUEST.set('_login_cache', {}) )
self.REQUEST['_login_cache'][user_reference] = login_url
if not self.getPortalObject().portal_preferences.isAuthenticationPolicyEnabled():
# stop here, no authentication policy enabled
# so just return authentication check result
if user_reference:
return (user_reference, user_reference)
else:
return None
if login_url is None:
return None
# authentication policy enabled, we need person object anyway
login = self.getPortalObject().unrestrictedTraverse(login_url)
if login and '_person_cache' not in self.REQUEST:
self.REQUEST.set('_person_cache', {})
self.REQUEST['_person_cache'][user_reference] = login.getParentValue().getRelativeUrl()
if user_reference is None:
# file a failed authentication attempt
login.notifyLoginFailure()
return None
# check if password is expired
if login.isPasswordExpired():
login.notifyPasswordExpire()
return None
# check if login is blocked
if login.isLoginBlocked():
return None
return (user_reference, user_reference)
def _validatePassword(self, login_object, password):
return pw_validate(login_object.getPassword(), password)
# #
# IUserEnumerationPlugin implementation # IUserEnumerationPlugin implementation
# #
security.declarePrivate( 'enumerateUsers' ) security.declarePrivate('enumerateUsers')
def enumerateUsers(self, id=None, login=None, exact_match=False, def enumerateUsers(self, id=None, login=None, exact_match=False,
sort_by=None, max_results=None, **kw): sort_by=None, max_results=None, login_portal_type=None, **kw):
""" See IUserEnumerationPlugin. """ See IUserEnumerationPlugin.
""" """
if not id: unrestrictedSearchResults = self.getPortalObject(
id = login ).portal_catalog.unrestrictedSearchResults
searchUser = lambda **kw: unrestrictedSearchResults(
select_list=('reference', ),
portal_type='Person',
**kw
).dictionaries()
searchLogin = lambda **kw: unrestrictedSearchResults(
select_list=('parent_uid', 'reference'),
portal_type=login_portal_type,
validation_state='validated',
).dictionaries()
if login is None:
# Only search by id if login is not given. Same logic as in
# PluggableAuthService.searchUsers.
if isinstance(id, str): if isinstance(id, str):
id = (id,) id = (id, )
if isinstance(id, list):
id = tuple(id)
user_info = []
plugin_id = self.getId()
id_list = [] id_list = []
has_super_user = False
for user_id in id: for user_id in id:
if SUPER_USER == user_id: if user_id == SUPER_USER:
info = {'id' : SUPER_USER, has_super_user = True
'login' : SUPER_USER, elif user_id:
'pluginid' : plugin_id,
}
user_info.append(info)
else:
id_list.append(user_id) id_list.append(user_id)
if id_list: if id_list:
if exact_match: if exact_match:
for reference in id_list: requested = set(id_list).__contains__
user = self.getPersonByReference(reference)
if user is not None:
info = {'id': reference,
'login' : reference,
'pluginid': plugin_id,
}
user_info.append(info)
else: else:
for user in self.getPortalObject().portal_catalog.unrestrictedSearchResults( requested = lambda x: True
select_list=('reference',), user_list = [
portal_type='Person', x for x in searchUser(
reference={'query': id_list, 'key': 'Keyword'}, reference={
): 'query': id_list,
info = {'id': user['reference'], 'key': 'ExactMatch' if exact_match else 'Keyword',
'login' : user['reference'], },
'pluginid' : plugin_id, limit=max_results,
}
user_info.append(info)
return tuple(user_info)
@transactional_cached(lambda self, *args: args)
def getLoginObject(self, login, portal_type):
try:
if not login:
return
catalog_result = self.getPortalObject().portal_catalog.unrestrictedSearchResults(
select_list=('portal_type', 'reference', 'validation_state'),
portal_type=(portal_type, 'Person'),
reference=dict(query=login, key='ExactMatch'),
sort_on=(('portal_type',),),
) )
for x in catalog_result: if requested(x['reference'])
if x['portal_type'] != 'Person' and x['validation_state'] != 'validated': ]
continue else:
if x['reference'] != login: user_list = []
continue login_dict = {}
x = x.getObject() if user_list:
if x.objectIds(spec='ERP5 Login'): for login in searchLogin(parent_uid=[x['uid'] for x in user_list]):
continue # Already migrated. login_dict.setdefault(login.parent_uid).append(login)
return x if has_super_user:
except ConflictError: user_list.append({'uid': None, 'reference': SUPER_USER})
raise else:
except: if isinstance(login, str):
LOG('ERP5Security', PROBLEM, 'getLoginObject failed', error=sys.exc_info()) login = (login, )
# Here we must raise an exception to prevent callers from caching login_dict = {}
# a result of a degraded situation. if exact_match:
# The kind of exception does not matter as long as it's catched by requested = set(login).__contains__
# PAS and causes a lookup using another plugin or user folder. else:
# As PAS does not define explicitely such exception, we must use requested = lambda x: True
# the _SWALLOWABLE_PLUGIN_EXCEPTIONS list. if login:
raise _SWALLOWABLE_PLUGIN_EXCEPTIONS[0] for login in searchLogin(
reference={
def getUserByLogin(self, login, exact_match=True): 'query': login,
# Search the Catalog for login and return a list of person objects 'key': 'ExactMatch' if exact_match else 'Keyword',
# login can be a string or a list of strings }
# (no docstring to prevent publishing) limit=max_results,
if not login: ):
return [] if requested(login['refernce']):
if isinstance(login, list): login_dict.setdefault(login['parent_uid']).append(x)
login = tuple(login) if login_dict:
elif not isinstance(login, tuple): user_list = searchUser(uid=list(login_dict))
login = str(login) else:
try: user_list = []
return getUserByLogin(self.getPortalObject(), login, exact_match) plugin_id = self.getId()
except ConflictError: return tuple([
raise {
except: 'id': user['reference'],
LOG('ERP5Security', PROBLEM, 'getUserByLogin failed', error=sys.exc_info()) # Note: PAS forbids us from returning more than one entry per given id,
# Here we must raise an exception to prevent callers from caching # so take any available login.
# a result of a degraded situation. 'login': login_dict.get(user['uid'], [None])[0],
# The kind of exception does not matter as long as it's catched by 'pluginid': plugin_id,
# PAS and causes a lookup using another plugin or user folder.
# As PAS does not define explicitely such exception, we must use
# the _SWALLOWABLE_PLUGIN_EXCEPTIONS list.
raise _SWALLOWABLE_PLUGIN_EXCEPTIONS[0]
classImplements( ERP5UserManager # Extra properties, specific to ERP5
, IAuthenticationPlugin 'path': user['path'],
, IUserEnumerationPlugin 'login_list': [
) {
'reference': login['reference'],
'path': login['path'],
'uid': login['uid'],
}
for login in login_dict.get(user['uid'], [])
],
}
for user in user_list
])
classImplements(ERP5UserManager, IAuthenticationPlugin, IUserEnumerationPlugin)
InitializeClass(ERP5UserManager) InitializeClass(ERP5UserManager)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment