Commit 975ad4d7 authored by Kazuhiko Shiozaki's avatar Kazuhiko Shiozaki

ERP5Security: cleanup. fix indentation and remove unused import.

parent 3751610a
......@@ -28,7 +28,6 @@
#
##############################################################################
from zLOG import LOG, PROBLEM
from Products.ERP5Type.Globals import InitializeClass
from AccessControl import ClassSecurityInfo
......@@ -102,11 +101,11 @@ def addERP5AccessTokenExtractionPlugin(dispatcher, id, title=None, REQUEST=None)
dispatcher._setObject(plugin.getId(), plugin)
if REQUEST is not None:
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5AccessTokenExtractionPlugin+added.'
% dispatcher.absolute_url())
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5AccessTokenExtractionPlugin+added.'
% dispatcher.absolute_url())
#List implementation of class
classImplements(ERP5AccessTokenExtractionPlugin,
......
......@@ -35,8 +35,8 @@ from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.ERP5Security.ERP5UserManager import SUPER_USER
from Products.PluggableAuthService.PluggableAuthService import DumbHTTPExtractor
from AccessControl.SecurityManagement import getSecurityManager,\
setSecurityManager, newSecurityManager
from AccessControl.SecurityManagement import getSecurityManager, \
setSecurityManager, newSecurityManager
#Form for new plugin in ZMI
manage_addERP5BearerExtractionPluginForm = PageTemplateFile(
......@@ -50,11 +50,11 @@ def addERP5BearerExtractionPlugin(dispatcher, id, title=None, REQUEST=None):
dispatcher._setObject(plugin.getId(), plugin)
if REQUEST is not None:
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5BearerExtractionPlugin+added.'
% dispatcher.absolute_url())
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5BearerExtractionPlugin+added.'
% dispatcher.absolute_url())
class ERP5BearerExtractionPlugin(BasePlugin):
"""
......
......@@ -69,11 +69,11 @@ def addERP5DumbHTTPExtractionPlugin(dispatcher, id, title=None, REQUEST=None):
dispatcher._setObject(plugin.getId(), plugin)
if REQUEST is not None:
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5DumbHTTPExtractionPlugin+added.'
% dispatcher.absolute_url())
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5DumbHTTPExtractionPlugin+added.'
% dispatcher.absolute_url())
#List implementation of class
classImplements(ERP5DumbHTTPExtractionPlugin,
......
......@@ -49,11 +49,11 @@ def addERP5ExternalAuthenticationPlugin(dispatcher, id, title=None, user_id_key=
dispatcher._setObject(plugin.getId(), plugin)
if REQUEST is not None:
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5ExternalAuthenticationPlugin+added.'
% dispatcher.absolute_url())
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5ExternalAuthenticationPlugin+added.'
% dispatcher.absolute_url())
class ERP5ExternalAuthenticationPlugin(BasePlugin):
"""
......
......@@ -35,8 +35,8 @@ from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.ERP5Security.ERP5UserManager import SUPER_USER
from Products.PluggableAuthService.PluggableAuthService import DumbHTTPExtractor
from AccessControl.SecurityManagement import getSecurityManager,\
setSecurityManager, newSecurityManager
from AccessControl.SecurityManagement import getSecurityManager, \
setSecurityManager, newSecurityManager
from Products.ERP5Type.Cache import DEFAULT_CACHE_SCOPE
import socket
from Products.ERP5Security.ERP5UserManager import getUserByLogin
......@@ -66,11 +66,11 @@ def addERP5FacebookExtractionPlugin(dispatcher, id, title=None, REQUEST=None):
dispatcher._setObject(plugin.getId(), plugin)
if REQUEST is not None:
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5FacebookExtractionPlugin+added.'
% dispatcher.absolute_url())
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5FacebookExtractionPlugin+added.'
% dispatcher.absolute_url())
#Form for new plugin in ZMI
manage_addERP5GoogleExtractionPluginForm = PageTemplateFile(
......@@ -84,11 +84,11 @@ def addERP5GoogleExtractionPlugin(dispatcher, id, title=None, REQUEST=None):
dispatcher._setObject(plugin.getId(), plugin)
if REQUEST is not None:
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5GoogleExtractionPlugin+added.'
% dispatcher.absolute_url())
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5GoogleExtractionPlugin+added.'
% dispatcher.absolute_url())
class ERP5ExternalOauth2ExtractionPlugin:
......
......@@ -17,8 +17,6 @@
from Products.ERP5Type.Globals import InitializeClass
from AccessControl import ClassSecurityInfo
from AccessControl.SecurityManagement import newSecurityManager,\
getSecurityManager, setSecurityManager
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.utils import classImplements
......@@ -28,7 +26,6 @@ from Products.ERP5Type.ERP5Type \
import ERP5TYPE_SECURITY_GROUP_ID_GENERATION_SCRIPT
from Products.ERP5Type.UnrestrictedMethod import UnrestrictedMethod
from Products.ZSQLCatalog.SQLCatalog import SimpleQuery
from Products.PluggableAuthService.PropertiedUser import PropertiedUser
from ZODB.POSException import ConflictError
import sys
......@@ -130,7 +127,6 @@ class ERP5GroupManager(BasePlugin):
else: # no person is linked to this user login
return ()
person_object = catalog_result[0].getObject()
person_id = person_object.getId()
# Fetch category values from defined scripts
for (method_name, base_category_list) in security_definition_list:
......
......@@ -35,9 +35,6 @@ from Products.ERP5Type.Globals import InitializeClass
from zope.interface import Interface
from AccessControl import ClassSecurityInfo
from AccessControl.SecurityManagement import getSecurityManager,\
newSecurityManager,\
setSecurityManager
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
......@@ -49,8 +46,8 @@ from Products.PluggableAuthService.plugins.CookieAuthHelper import CookieAuthHel
from Products.ERP5Type.Cache import CachingMethod
from Products.ERP5Type.UnrestrictedMethod import UnrestrictedMethod
from Products.ERP5Security.ERP5UserManager import ERP5UserManager,\
SUPER_USER,\
from Products.ERP5Security.ERP5UserManager import ERP5UserManager, \
SUPER_USER, \
_AuthenticationFailure
from Crypto.Cipher import AES
......@@ -136,22 +133,22 @@ manage_addERP5KeyAuthPluginForm = PageTemplateFile(
'www/ERP5Security_addERP5KeyAuthPlugin', globals(),
__name__='manage_addERP5KeyAuthPluginForm')
def addERP5KeyAuthPlugin(dispatcher, id, title=None,\
encryption_key='', cipher='AES', cookie_name='',\
def addERP5KeyAuthPlugin(dispatcher, id, title=None,
encryption_key='', cipher='AES', cookie_name='',
default_cookie_name='',REQUEST=None):
""" Add a ERP5KeyAuthPlugin to a Pluggable Auth Service. """
""" Add a ERP5KeyAuthPlugin to a Pluggable Auth Service. """
plugin = ERP5KeyAuthPlugin(id=id, title=title, encryption_key=encryption_key,
cipher=cipher, cookie_name=cookie_name,
default_cookie_name=default_cookie_name)
dispatcher._setObject(plugin.getId(), plugin)
plugin = ERP5KeyAuthPlugin(id=id, title=title, encryption_key=encryption_key,
cipher=cipher, cookie_name=cookie_name,
default_cookie_name=default_cookie_name)
dispatcher._setObject(plugin.getId(), plugin)
if REQUEST is not None:
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5KeyAuthPlugin+added.'
% dispatcher.absolute_url())
if REQUEST is not None:
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5KeyAuthPlugin+added.'
% dispatcher.absolute_url())
class ERP5KeyAuthPlugin(ERP5UserManager, CookieAuthHelper):
"""
......@@ -277,9 +274,9 @@ class ERP5KeyAuthPlugin(ERP5UserManager, CookieAuthHelper):
if creds:
creds['remote_host'] = request.get('REMOTE_HOST', '')
try:
creds['remote_address'] = request.getClientAddr()
creds['remote_address'] = request.getClientAddr()
except AttributeError:
creds['remote_address'] = request.get('REMOTE_ADDR', '')
creds['remote_address'] = request.get('REMOTE_ADDR', '')
except StandardError, e:
#Log standard error to check error
LOG('ERP5KeyAuthPlugin.extractCredentials', PROBLEM, str(e))
......@@ -373,14 +370,13 @@ class ERP5KeyAuthPlugin(ERP5UserManager, CookieAuthHelper):
id='ERP5KeyAuthPlugin_authenticateCredentials',
cache_factory='erp5_content_short')
try:
return _authenticateCredentials(
login=login)
return _authenticateCredentials(login=login)
except _AuthenticationFailure:
return None
return None
except StandardError, e:
#Log standard error
LOG('ERP5KeyAuthPlugin.authenticateCredentials', PROBLEM, str(e))
return None
#Log standard error
LOG('ERP5KeyAuthPlugin.authenticateCredentials', PROBLEM, str(e))
return None
################################
# Properties for ZMI managment #
......@@ -429,8 +425,8 @@ class ERP5KeyAuthPlugin(ERP5UserManager, CookieAuthHelper):
#Redirect
if RESPONSE is not None:
if error_message != '':
self.REQUEST.form['manage_tabs_message'] = error_message
return self.manage_editERP5KeyAuthPluginForm(RESPONSE)
self.REQUEST.form['manage_tabs_message'] = error_message
return self.manage_editERP5KeyAuthPluginForm(RESPONSE)
else:
message = "Updated"
RESPONSE.redirect( '%s/manage_editERP5KeyAuthPluginForm'
......
......@@ -20,60 +20,60 @@ from AccessControl import ClassSecurityInfo
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.interfaces.plugins import IRolesPlugin,\
IRoleEnumerationPlugin
from Products.PluggableAuthService.interfaces.plugins import IRolesPlugin, \
IRoleEnumerationPlugin
from ERP5UserManager import SUPER_USER
manage_addERP5RoleManagerForm = PageTemplateFile(
'www/ERP5Security_addERP5RoleManager', globals(),
__name__='manage_addERP5RoleManagerForm' )
'www/ERP5Security_addERP5RoleManager', globals(),
__name__='manage_addERP5RoleManagerForm' )
def addERP5RoleManager( dispatcher, id, title=None, REQUEST=None ):
""" Add a ERP5RoleManager to a Pluggable Auth Service. """
""" Add a ERP5RoleManager to a Pluggable Auth Service. """
erm = ERP5RoleManager(id, title)
dispatcher._setObject(erm.getId(), erm)
erm = ERP5RoleManager(id, title)
dispatcher._setObject(erm.getId(), erm)
if REQUEST is not None:
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5RoleManager+added.'
% dispatcher.absolute_url())
if REQUEST is not None:
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5RoleManager+added.'
% dispatcher.absolute_url())
class ERP5RoleManager( BasePlugin ):
""" PAS plugin to add 'Member' as default
Role for every user.
""" PAS plugin to add 'Member' as default
Role for every user.
"""
meta_type = 'ERP5 Role Manager'
security = ClassSecurityInfo()
def __init__(self, id, title=None):
self._id = self.id = id
self.title = title
#
# IRolesPlugin implementation
#
security.declarePrivate( 'getRolesForPrincipal' )
def getRolesForPrincipal( self, principal, request=None ):
""" See IRolesPlugin.
We only ever return Member for every principal
"""
meta_type = 'ERP5 Role Manager'
security = ClassSecurityInfo()
def __init__(self, id, title=None):
self._id = self.id = id
self.title = title
#
# IRolesPlugin implementation
#
security.declarePrivate( 'getRolesForPrincipal' )
def getRolesForPrincipal( self, principal, request=None ):
""" See IRolesPlugin.
We only ever return Member for every principal
"""
if principal.getId() == SUPER_USER:
# If this is the super user, give all the roles present in this system.
# XXX no API to do this in PAS.
rolemakers = self._getPAS().plugins.listPlugins( IRoleEnumerationPlugin )
roles = []
for rolemaker_id, rolemaker in rolemakers:
roles.extend([role['id'] for role in rolemaker.enumerateRoles()])
return tuple(roles)
return ('Member',)
if principal.getId() == SUPER_USER:
# If this is the super user, give all the roles present in this system.
# XXX no API to do this in PAS.
rolemakers = self._getPAS().plugins.listPlugins( IRoleEnumerationPlugin )
roles = []
for rolemaker_id, rolemaker in rolemakers:
roles.extend([role['id'] for role in rolemaker.enumerateRoles()])
return tuple(roles)
return ('Member',)
classImplements( ERP5RoleManager
, IRolesPlugin
......
......@@ -61,7 +61,7 @@ class ERP5User(PropertiedUser):
principal_ids = list( group_ids )
principal_ids.insert( 0, user_id )
local ={}
local = {}
object = aq_inner( object )
while 1:
......@@ -98,102 +98,102 @@ class ERP5User(PropertiedUser):
return list( self.getRoles() ) + local.keys()
def allowed( self, object, object_roles=None ):
""" Check whether the user has access to object.
As for getRolesInContext, we take into account _getAcquireLocalRoles for
ERP5.
"""
if self.getUserName() == SUPER_USER:
# super user is allowed to accesss any object
return 1
if object_roles is _what_not_even_god_should_do:
return 0
""" Check whether the user has access to object.
As for getRolesInContext, we take into account _getAcquireLocalRoles for
ERP5.
"""
if self.getUserName() == SUPER_USER:
# super user is allowed to accesss any object
return 1
if object_roles is _what_not_even_god_should_do:
return 0
# Short-circuit the common case of anonymous access.
if object_roles is None or 'Anonymous' in object_roles:
return 1
# Check for Developer Role, see patches.User for rationale
# XXX-arnau: copy/paste
object_roles = set(object_roles)
if 'Developer' in object_roles:
object_roles.remove('Developer')
product_config = getattr(getConfiguration(), 'product_config', None)
if product_config:
config = product_config.get('erp5')
if config and self.getId() in config.developer_list:
return 1
# Short-circuit the common case of anonymous access.
# Provide short-cut access if object is protected by 'Authenticated'
# role and user is not nobody
if 'Authenticated' in object_roles and (
self.getUserName() != 'Anonymous User'):
return 1
# Check for ancient role data up front, convert if found.
# This should almost never happen, and should probably be
# deprecated at some point.
if 'Shared' in object_roles:
object_roles = self._shared_roles(object)
if object_roles is None or 'Anonymous' in object_roles:
return 1
# Check for Developer Role, see patches.User for rationale
# XXX-arnau: copy/paste
object_roles = set(object_roles)
if 'Developer' in object_roles:
object_roles.remove('Developer')
product_config = getattr(getConfiguration(), 'product_config', None)
if product_config:
config = product_config.get('erp5')
if config and self.getId() in config.developer_list:
return 1
# Provide short-cut access if object is protected by 'Authenticated'
# role and user is not nobody
if 'Authenticated' in object_roles and (
self.getUserName() != 'Anonymous User'):
return 1
# Check for ancient role data up front, convert if found.
# This should almost never happen, and should probably be
# deprecated at some point.
if 'Shared' in object_roles:
object_roles = self._shared_roles(object)
if object_roles is None or 'Anonymous' in object_roles:
# Check for a role match with the normal roles given to
# the user, then with local roles only if necessary. We
# want to avoid as much overhead as possible.
user_roles = self.getRoles()
for role in object_roles:
if role in user_roles:
if self._check_context(object):
return 1
return None
# Still have not found a match, so check local roles. We do
# this manually rather than call getRolesInContext so that
# we can incur only the overhead required to find a match.
inner_obj = aq_inner( object )
user_id = self.getId()
# [ x.getId() for x in self.getGroups() ]
group_ids = self.getGroups()
principal_ids = list( group_ids )
principal_ids.insert( 0, user_id )
while 1:
local_roles = getattr( inner_obj, '__ac_local_roles__', None )
if local_roles:
if callable( local_roles ):
local_roles = local_roles()
dict = local_roles or {}
for principal_id in principal_ids:
local_roles = dict.get( principal_id, [] )
for role in object_roles:
if role in local_roles:
if self._check_context( object ):
return 1
return 0
# patch by Klaus for LocalRole blocking
if getattr(inner_obj, '_getAcquireLocalRoles', None) is not None:
if not inner_obj._getAcquireLocalRoles():
break
inner = aq_inner( inner_obj )
parent = aq_parent( inner )
if parent is not None:
inner_obj = parent
continue
new = getattr( inner_obj, 'im_self', None )
if new is not None:
inner_obj = aq_inner( new )
continue
break
# Check for a role match with the normal roles given to
# the user, then with local roles only if necessary. We
# want to avoid as much overhead as possible.
user_roles = self.getRoles()
for role in object_roles:
if role in user_roles:
if self._check_context(object):
return 1
return None
# Still have not found a match, so check local roles. We do
# this manually rather than call getRolesInContext so that
# we can incur only the overhead required to find a match.
inner_obj = aq_inner( object )
user_id = self.getId()
# [ x.getId() for x in self.getGroups() ]
group_ids = self.getGroups()
principal_ids = list( group_ids )
principal_ids.insert( 0, user_id )
while 1:
local_roles = getattr( inner_obj, '__ac_local_roles__', None )
if local_roles:
if callable( local_roles ):
local_roles = local_roles()
dict = local_roles or {}
for principal_id in principal_ids:
local_roles = dict.get( principal_id, [] )
for role in object_roles:
if role in local_roles:
if self._check_context( object ):
return 1
return 0
# patch by Klaus for LocalRole blocking
if getattr(inner_obj, '_getAcquireLocalRoles', None) is not None:
if not inner_obj._getAcquireLocalRoles():
break
inner = aq_inner( inner_obj )
parent = aq_parent( inner )
if parent is not None:
inner_obj = parent
continue
new = getattr( inner_obj, 'im_self', None )
if new is not None:
inner_obj = aq_inner( new )
continue
break
return None
return None
InitializeClass(ERP5User)
......
......@@ -38,8 +38,8 @@ from zLOG import LOG, PROBLEM
SUPER_USER = '__erp5security-=__'
manage_addERP5UserManagerForm = PageTemplateFile(
'www/ERP5Security_addERP5UserManager', globals(),
__name__='manage_addERP5UserManagerForm' )
'www/ERP5Security_addERP5UserManager', globals(),
__name__='manage_addERP5UserManagerForm' )
def addERP5UserManager(dispatcher, id, title=None, REQUEST=None):
""" Add a ERP5UserManager to a Pluggable Auth Service. """
......@@ -48,11 +48,11 @@ def addERP5UserManager(dispatcher, id, title=None, REQUEST=None):
dispatcher._setObject(eum.getId(), eum)
if REQUEST is not None:
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5UserManager+added.'
% dispatcher.absolute_url())
REQUEST['RESPONSE'].redirect(
'%s/manage_workspace'
'?manage_tabs_message='
'ERP5UserManager+added.'
% dispatcher.absolute_url())
class _AuthenticationFailure(Exception):
"""Raised when authentication failed, to prevent caching the fact that a user
......@@ -72,9 +72,9 @@ def getUserByLogin(portal, login, exact_match=True):
if not (portal.portal_catalog.hasColumn('portal_type') and portal.portal_catalog.hasColumn('reference')):
raise RuntimeError('Catalog does not have column information. Make sure RDB is working and disk is not full.')
result = portal.portal_catalog.unrestrictedSearchResults(
select_expression='reference',
portal_type="Person",
reference=dict(query=login, key=reference_key))
select_expression='reference',
portal_type="Person",
reference=dict(query=login, key=reference_key))
# XXX: Here, we filter catalog result list ALTHOUGH we did pass
# parameters to unrestrictedSearchResults to restrict result set.
# This is done because the following values can match person with
......@@ -95,180 +95,181 @@ def getUserByLogin(portal, login, exact_match=True):
class ERP5UserManager(BasePlugin):
""" PAS plugin for managing users in ERP5
"""
""" PAS plugin for managing users in ERP5
"""
meta_type = 'ERP5 User Manager'
security = ClassSecurityInfo()
def __init__(self, id, title=None):
meta_type = 'ERP5 User Manager'
security = ClassSecurityInfo()
def __init__(self, id, title=None):
self._id = self.id = id
self.title = title
#
# IAuthenticationPlugin implementation
#
security.declarePrivate( 'authenticateCredentials' )
def authenticateCredentials(self, credentials):
""" See IAuthenticationPlugin.
o We expect the credentials to be those returned by
ILoginPasswordExtractionPlugin.
"""
login = credentials.get('login')
ignore_password = False
if not login:
# fallback to support plugins using external tools to extract login
# those are not using login/password pair, they just extract login
# from remote system (eg. SSL certificates)
login = credentials.get('external_login')
ignore_password = True
# Forbidden the usage of the super user.
if login == SUPER_USER:
return None
@UnrestrictedMethod
def _authenticateCredentials(login, password, path,
ignore_password=False):
if not login or not (password or ignore_password):
return None
user_list = self.getUserByLogin(login)
if not user_list:
raise _AuthenticationFailure()
user = user_list[0]
try:
# get assignment
assignment_list = [x for x in user.contentValues(portal_type="Assignment") if x.getValidationState() == "open"]
valid_assignment_list = []
# check dates if exist
login_date = DateTime()
for assignment in assignment_list:
if assignment.getStartDate() is not None and \
assignment.getStartDate() > login_date:
continue
if assignment.hasStopDate() and \
assignment.getStopDate() < login_date:
continue
valid_assignment_list.append(assignment)
if (ignore_password or pw_validate(user.getPassword(), password)) and \
len(valid_assignment_list) and user \
.getValidationState() != 'deleted': #user.getCareerRole() == 'internal':
return login, login # use same for user_id and login
finally:
pass
raise _AuthenticationFailure()
_authenticateCredentials = CachingMethod(_authenticateCredentials,
id='ERP5UserManager_authenticateCredentials',
cache_factory='erp5_content_short')
try:
authentication_result = _authenticateCredentials(
login=login,
password=credentials.get('password'),
path=self.getPhysicalPath(),
ignore_password=ignore_password)
except _AuthenticationFailure:
authentication_result = None
if not self.getPortalObject().portal_preferences.isAuthenticationPolicyEnabled():
# stop here, no authentication policy enabled
# so just return authentication check result
return authentication_result
# authentication policy enabled, we need person object anyway
user_list = self.getUserByLogin(credentials.get('login'))
if not user_list:
# not an ERP5 Person object
return None
user = user_list[0]
if authentication_result is None:
# file a failed authentication attempt
user.notifyLoginFailure()
return None
# check if password is expired
if user.isPasswordExpired():
user.notifyPasswordExpire()
return None
# check if user account is blocked
if user.isLoginBlocked():
return None
return authentication_result
#
# IUserEnumerationPlugin implementation
#
security.declarePrivate( 'enumerateUsers' )
def enumerateUsers(self, id=None, login=None, exact_match=False,
sort_by=None, max_results=None, **kw):
""" See IUserEnumerationPlugin.
"""
if id is None:
id = login
if isinstance(id, str):
id = (id,)
if isinstance(id, list):
id = tuple(id)
user_info = []
plugin_id = self.getId()
id_list = []
for user_id in id:
if SUPER_USER == user_id:
info = { 'id' : SUPER_USER
, 'login' : SUPER_USER
, 'pluginid' : plugin_id
}
user_info.append(info)
else:
id_list.append(user_id)
if id_list:
for user in self.getUserByLogin(tuple(id_list), exact_match=exact_match):
info = { 'id' : user.getReference()
, 'login' : user.getReference()
, 'pluginid' : plugin_id
}
user_info.append(info)
return tuple(user_info)
def getUserByLogin(self, login, exact_match=True):
# Search the Catalog for login and return a list of person objects
# login can be a string or a list of strings
# (no docstring to prevent publishing)
if not login:
return []
if isinstance(login, list):
login = tuple(login)
elif not isinstance(login, tuple):
login = str(login)
try:
return getUserByLogin(self.getPortalObject(), login, exact_match)
except ConflictError:
raise
except:
LOG('ERP5Security', PROBLEM, 'getUserByLogin failed', error=sys.exc_info())
# Here we must raise an exception to prevent callers from caching
# a result of a degraded situation.
# The kind of exception does not matter as long as it's catched by
# PAS and causes a lookup using another plugin or user folder.
# As PAS does not define explicitely such exception, we must use
# the _SWALLOWABLE_PLUGIN_EXCEPTIONS list.
raise _SWALLOWABLE_PLUGIN_EXCEPTIONS[0]
self._id = self.id = id
self.title = title
#
# IAuthenticationPlugin implementation
#
security.declarePrivate( 'authenticateCredentials' )
def authenticateCredentials(self, credentials):
""" See IAuthenticationPlugin.
o We expect the credentials to be those returned by
ILoginPasswordExtractionPlugin.
"""
login = credentials.get('login')
ignore_password = False
if not login:
# fallback to support plugins using external tools to extract login
# those are not using login/password pair, they just extract login
# from remote system (eg. SSL certificates)
login = credentials.get('external_login')
ignore_password = True
# Forbidden the usage of the super user.
if login == SUPER_USER:
return None
@UnrestrictedMethod
def _authenticateCredentials(login, password, path,
ignore_password=False):
if not login or not (password or ignore_password):
return None
user_list = self.getUserByLogin(login)
if not user_list:
raise _AuthenticationFailure()
user = user_list[0]
try:
# get assignment
assignment_list = [x for x in user.contentValues(portal_type="Assignment") if x.getValidationState() == "open"]
valid_assignment_list = []
# check dates if exist
login_date = DateTime()
for assignment in assignment_list:
if assignment.getStartDate() is not None and \
assignment.getStartDate() > login_date:
continue
if assignment.hasStopDate() and \
assignment.getStopDate() < login_date:
continue
valid_assignment_list.append(assignment)
if (ignore_password or pw_validate(user.getPassword(), password)) and \
len(valid_assignment_list) and user \
.getValidationState() != 'deleted': #user.getCareerRole() == 'internal':
return login, login # use same for user_id and login
finally:
pass
raise _AuthenticationFailure()
_authenticateCredentials = CachingMethod(
_authenticateCredentials,
id='ERP5UserManager_authenticateCredentials',
cache_factory='erp5_content_short')
try:
authentication_result = _authenticateCredentials(
login=login,
password=credentials.get('password'),
path=self.getPhysicalPath(),
ignore_password=ignore_password)
except _AuthenticationFailure:
authentication_result = None
if not self.getPortalObject().portal_preferences.isAuthenticationPolicyEnabled():
# stop here, no authentication policy enabled
# so just return authentication check result
return authentication_result
# authentication policy enabled, we need person object anyway
user_list = self.getUserByLogin(credentials.get('login'))
if not user_list:
# not an ERP5 Person object
return None
user = user_list[0]
if authentication_result is None:
# file a failed authentication attempt
user.notifyLoginFailure()
return None
# check if password is expired
if user.isPasswordExpired():
user.notifyPasswordExpire()
return None
# check if user account is blocked
if user.isLoginBlocked():
return None
return authentication_result
#
# IUserEnumerationPlugin implementation
#
security.declarePrivate( 'enumerateUsers' )
def enumerateUsers(self, id=None, login=None, exact_match=False,
sort_by=None, max_results=None, **kw):
""" See IUserEnumerationPlugin.
"""
if id is None:
id = login
if isinstance(id, str):
id = (id,)
if isinstance(id, list):
id = tuple(id)
user_info = []
plugin_id = self.getId()
id_list = []
for user_id in id:
if SUPER_USER == user_id:
info = { 'id' : SUPER_USER
, 'login' : SUPER_USER
, 'pluginid' : plugin_id
}
user_info.append(info)
else:
id_list.append(user_id)
if id_list:
for user in self.getUserByLogin(tuple(id_list), exact_match=exact_match):
info = { 'id' : user.getReference()
, 'login' : user.getReference()
, 'pluginid' : plugin_id
}
user_info.append(info)
return tuple(user_info)
def getUserByLogin(self, login, exact_match=True):
# Search the Catalog for login and return a list of person objects
# login can be a string or a list of strings
# (no docstring to prevent publishing)
if not login:
return []
if isinstance(login, list):
login = tuple(login)
elif not isinstance(login, tuple):
login = str(login)
try:
return getUserByLogin(self.getPortalObject(), login, exact_match)
except ConflictError:
raise
except:
LOG('ERP5Security', PROBLEM, 'getUserByLogin failed', error=sys.exc_info())
# Here we must raise an exception to prevent callers from caching
# a result of a degraded situation.
# The kind of exception does not matter as long as it's catched by
# PAS and causes a lookup using another plugin or user folder.
# As PAS does not define explicitely such exception, we must use
# the _SWALLOWABLE_PLUGIN_EXCEPTIONS list.
raise _SWALLOWABLE_PLUGIN_EXCEPTIONS[0]
classImplements( ERP5UserManager
......
......@@ -74,104 +74,104 @@ registerMultiPlugin(ERP5DumbHTTPExtractionPlugin.ERP5DumbHTTPExtractionPlugin.me
def initialize(context):
context.registerClass( ERP5UserManager.ERP5UserManager
, permission=ManageUsers
, constructors=(
ERP5UserManager.manage_addERP5UserManagerForm,
ERP5UserManager.addERP5UserManager, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5GroupManager.ERP5GroupManager
, permission=ManageGroups
, constructors=(
ERP5GroupManager.manage_addERP5GroupManagerForm,
ERP5GroupManager.addERP5GroupManager, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5RoleManager.ERP5RoleManager
, permission=ManageUsers
, constructors=(
ERP5RoleManager.manage_addERP5RoleManagerForm,
ERP5RoleManager.addERP5RoleManager, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5UserFactory.ERP5UserFactory
, permission=ManageUsers
, constructors=(
ERP5UserFactory.manage_addERP5UserFactoryForm,
ERP5UserFactory.addERP5UserFactory, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5KeyAuthPlugin.ERP5KeyAuthPlugin
, permission=ManageUsers
, constructors=(
ERP5KeyAuthPlugin.manage_addERP5KeyAuthPluginForm,
ERP5KeyAuthPlugin.addERP5KeyAuthPlugin, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5ExternalAuthenticationPlugin.ERP5ExternalAuthenticationPlugin
, permission=ManageUsers
, constructors=(
ERP5ExternalAuthenticationPlugin.manage_addERP5ExternalAuthenticationPluginForm,
ERP5ExternalAuthenticationPlugin.addERP5ExternalAuthenticationPlugin, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5BearerExtractionPlugin.ERP5BearerExtractionPlugin
, permission=ManageUsers
, constructors=(
ERP5BearerExtractionPlugin.manage_addERP5BearerExtractionPluginForm,
ERP5BearerExtractionPlugin.addERP5BearerExtractionPlugin, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5ExternalOauth2ExtractionPlugin.ERP5FacebookExtractionPlugin
, permission=ManageUsers
, constructors=(
ERP5ExternalOauth2ExtractionPlugin.manage_addERP5FacebookExtractionPluginForm,
ERP5ExternalOauth2ExtractionPlugin.addERP5FacebookExtractionPlugin, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5ExternalOauth2ExtractionPlugin.ERP5GoogleExtractionPlugin
, permission=ManageUsers
, constructors=(
ERP5ExternalOauth2ExtractionPlugin.manage_addERP5GoogleExtractionPluginForm,
ERP5ExternalOauth2ExtractionPlugin.addERP5GoogleExtractionPlugin, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5AccessTokenExtractionPlugin.ERP5AccessTokenExtractionPlugin
, permission=ManageUsers
, constructors=(
ERP5AccessTokenExtractionPlugin.manage_addERP5AccessTokenExtractionPluginForm,
ERP5AccessTokenExtractionPlugin.addERP5AccessTokenExtractionPlugin, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5DumbHTTPExtractionPlugin.ERP5DumbHTTPExtractionPlugin
, permission=ManageUsers
, constructors=(
ERP5DumbHTTPExtractionPlugin.manage_addERP5DumbHTTPExtractionPluginForm,
ERP5DumbHTTPExtractionPlugin.addERP5DumbHTTPExtractionPlugin, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5UserManager.ERP5UserManager
, permission=ManageUsers
, constructors=(
ERP5UserManager.manage_addERP5UserManagerForm,
ERP5UserManager.addERP5UserManager, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5GroupManager.ERP5GroupManager
, permission=ManageGroups
, constructors=(
ERP5GroupManager.manage_addERP5GroupManagerForm,
ERP5GroupManager.addERP5GroupManager, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5RoleManager.ERP5RoleManager
, permission=ManageUsers
, constructors=(
ERP5RoleManager.manage_addERP5RoleManagerForm,
ERP5RoleManager.addERP5RoleManager, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5UserFactory.ERP5UserFactory
, permission=ManageUsers
, constructors=(
ERP5UserFactory.manage_addERP5UserFactoryForm,
ERP5UserFactory.addERP5UserFactory, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5KeyAuthPlugin.ERP5KeyAuthPlugin
, permission=ManageUsers
, constructors=(
ERP5KeyAuthPlugin.manage_addERP5KeyAuthPluginForm,
ERP5KeyAuthPlugin.addERP5KeyAuthPlugin, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5ExternalAuthenticationPlugin.ERP5ExternalAuthenticationPlugin
, permission=ManageUsers
, constructors=(
ERP5ExternalAuthenticationPlugin.manage_addERP5ExternalAuthenticationPluginForm,
ERP5ExternalAuthenticationPlugin.addERP5ExternalAuthenticationPlugin, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5BearerExtractionPlugin.ERP5BearerExtractionPlugin
, permission=ManageUsers
, constructors=(
ERP5BearerExtractionPlugin.manage_addERP5BearerExtractionPluginForm,
ERP5BearerExtractionPlugin.addERP5BearerExtractionPlugin, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5ExternalOauth2ExtractionPlugin.ERP5FacebookExtractionPlugin
, permission=ManageUsers
, constructors=(
ERP5ExternalOauth2ExtractionPlugin.manage_addERP5FacebookExtractionPluginForm,
ERP5ExternalOauth2ExtractionPlugin.addERP5FacebookExtractionPlugin, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5ExternalOauth2ExtractionPlugin.ERP5GoogleExtractionPlugin
, permission=ManageUsers
, constructors=(
ERP5ExternalOauth2ExtractionPlugin.manage_addERP5GoogleExtractionPluginForm,
ERP5ExternalOauth2ExtractionPlugin.addERP5GoogleExtractionPlugin, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5AccessTokenExtractionPlugin.ERP5AccessTokenExtractionPlugin
, permission=ManageUsers
, constructors=(
ERP5AccessTokenExtractionPlugin.manage_addERP5AccessTokenExtractionPluginForm,
ERP5AccessTokenExtractionPlugin.addERP5AccessTokenExtractionPlugin, )
, visibility=None
, icon='www/portal.gif'
)
context.registerClass( ERP5DumbHTTPExtractionPlugin.ERP5DumbHTTPExtractionPlugin
, permission=ManageUsers
, constructors=(
ERP5DumbHTTPExtractionPlugin.manage_addERP5DumbHTTPExtractionPluginForm,
ERP5DumbHTTPExtractionPlugin.addERP5DumbHTTPExtractionPlugin, )
, visibility=None
, icon='www/portal.gif'
)
from AccessControl.SecurityInfo import ModuleSecurityInfo
ModuleSecurityInfo('Products.ERP5Security.ERP5UserManager').declarePublic(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment