Commit cc03d4fa authored by Jérome Perrin's avatar Jérome Perrin

ERP5Security: make plugins log a username

This works only for medusa, using the same approach as CMFCore's
CookieCrumbler
parent 9c7d18be
...@@ -33,6 +33,7 @@ from Products.PluggableAuthService.interfaces.plugins import IAuthenticationPlug ...@@ -33,6 +33,7 @@ from Products.PluggableAuthService.interfaces.plugins import IAuthenticationPlug
from DateTime import DateTime from DateTime import DateTime
import base64 import base64
import StringIO import StringIO
import mock
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Security.ERP5DumbHTTPExtractionPlugin import ERP5DumbHTTPExtractionPlugin from Products.ERP5Security.ERP5DumbHTTPExtractionPlugin import ERP5DumbHTTPExtractionPlugin
...@@ -110,12 +111,18 @@ class TestERP5AccessTokenSkins(AccessTokenTestCase): ...@@ -110,12 +111,18 @@ class TestERP5AccessTokenSkins(AccessTokenTestCase):
self.portal.REQUEST["ACTUAL_URL"] = access_url self.portal.REQUEST["ACTUAL_URL"] = access_url
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference() self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
with mock.patch(
'Products.ERP5Security.ERP5AccessTokenExtractionPlugin._setUserNameForAccessLog'
) as _setUserNameForAccessLog:
result = self._getTokenCredential(self.portal.REQUEST) result = self._getTokenCredential(self.portal.REQUEST)
self.assertTrue(result) self.assertTrue(result)
user_id, login = result user_id, login = result
self.assertEqual(user_id, person.Person_getUserId()) self.assertEqual(user_id, person.Person_getUserId())
# tokens have a login value, for auditing purposes # tokens have a login value, for auditing purposes. This is the ID of the plugin
self.assertEqual(access_token.getRelativeUrl(), login) # and the relative URL of the token.
self.assertEqual('erp5_access_token_plugin=%s' % access_token.getRelativeUrl(), login)
# this is also what will appear in Z2.log
_setUserNameForAccessLog.assert_called_once_with(login, self.portal.REQUEST)
def test_bad_token(self): def test_bad_token(self):
person = self._createPerson(self.new_id) person = self._createPerson(self.new_id)
......
...@@ -129,6 +129,9 @@ class TestFacebookLogin(ERP5TypeTestCase): ...@@ -129,6 +129,9 @@ class TestFacebookLogin(ERP5TypeTestCase):
request["__ac_facebook_hash"] = response.cookies["__ac_facebook_hash"]["value"] request["__ac_facebook_hash"] = response.cookies["__ac_facebook_hash"]["value"]
with mock.patch(
'Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin._setUserNameForAccessLog'
) as _setUserNameForAccessLog:
credentials = self.portal.acl_users.erp5_facebook_extraction.extractCredentials(request) credentials = self.portal.acl_users.erp5_facebook_extraction.extractCredentials(request)
self.assertEqual( self.assertEqual(
'Facebook Login', 'Facebook Login',
...@@ -136,6 +139,10 @@ class TestFacebookLogin(ERP5TypeTestCase): ...@@ -136,6 +139,10 @@ class TestFacebookLogin(ERP5TypeTestCase):
self.assertEqual( self.assertEqual(
getUserId(None), getUserId(None),
credentials['external_login']) credentials['external_login'])
# this is what will appear in Z2.log
_setUserNameForAccessLog.assert_called_once_with(
'erp5_facebook_extraction=%s' % getUserId(None),
request)
user_id, login = self.portal.acl_users.erp5_login_users.authenticateCredentials(credentials) user_id, login = self.portal.acl_users.erp5_login_users.authenticateCredentials(credentials)
self.assertEqual(person.getUserId(), user_id) self.assertEqual(person.getUserId(), user_id)
......
...@@ -167,6 +167,9 @@ class TestGoogleLogin(ERP5TypeTestCase): ...@@ -167,6 +167,9 @@ class TestGoogleLogin(ERP5TypeTestCase):
request["__ac_google_hash"] = response.cookies["__ac_google_hash"]["value"] request["__ac_google_hash"] = response.cookies["__ac_google_hash"]["value"]
with mock.patch(
'Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin._setUserNameForAccessLog'
) as _setUserNameForAccessLog:
credentials = self.portal.acl_users.erp5_google_extraction.extractCredentials(request) credentials = self.portal.acl_users.erp5_google_extraction.extractCredentials(request)
self.assertEqual( self.assertEqual(
'Google Login', 'Google Login',
...@@ -174,6 +177,10 @@ class TestGoogleLogin(ERP5TypeTestCase): ...@@ -174,6 +177,10 @@ class TestGoogleLogin(ERP5TypeTestCase):
self.assertEqual( self.assertEqual(
getUserId(None), getUserId(None),
credentials['external_login']) credentials['external_login'])
# this is what will appear in Z2.log
_setUserNameForAccessLog.assert_called_once_with(
'erp5_google_extraction=%s' % getUserId(None),
request)
user_id, login = self.portal.acl_users.erp5_login_users.authenticateCredentials(credentials) user_id, login = self.portal.acl_users.erp5_login_users.authenticateCredentials(credentials)
self.assertEqual(person.getUserId(), user_id) self.assertEqual(person.getUserId(), user_id)
......
...@@ -38,6 +38,8 @@ from Products.PluggableAuthService.utils import classImplements ...@@ -38,6 +38,8 @@ from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.ERP5Type.UnrestrictedMethod import UnrestrictedMethod from Products.ERP5Type.UnrestrictedMethod import UnrestrictedMethod
from Products.ERP5Security import _setUserNameForAccessLog
class ERP5AccessTokenExtractionPlugin(BasePlugin): class ERP5AccessTokenExtractionPlugin(BasePlugin):
""" """
...@@ -68,6 +70,7 @@ class ERP5AccessTokenExtractionPlugin(BasePlugin): ...@@ -68,6 +70,7 @@ class ERP5AccessTokenExtractionPlugin(BasePlugin):
creds['erp5_access_token_id'] = token creds['erp5_access_token_id'] = token
creds['remote_host'] = request.get('REMOTE_HOST', '') creds['remote_host'] = request.get('REMOTE_HOST', '')
creds['remote_address'] = request.getClientAddr() creds['remote_address'] = request.getClientAddr()
creds['request'] = request
return creds return creds
####################### #######################
...@@ -86,7 +89,9 @@ class ERP5AccessTokenExtractionPlugin(BasePlugin): ...@@ -86,7 +89,9 @@ class ERP5AccessTokenExtractionPlugin(BasePlugin):
if method is not None: if method is not None:
user_value = method() user_value = method()
if user_value is not None: if user_value is not None:
return (user_value.getUserId(), token_document.getRelativeUrl()) username = '%s=%s' % (self.getId(), token_document.getRelativeUrl())
_setUserNameForAccessLog(username, credentials['request'])
return (user_value.getUserId(), username)
#Form for new plugin in ZMI #Form for new plugin in ZMI
......
...@@ -33,7 +33,8 @@ from Products.PageTemplates.PageTemplateFile import PageTemplateFile ...@@ -33,7 +33,8 @@ from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.PluggableAuthService.interfaces import plugins from Products.PluggableAuthService.interfaces import plugins
from Products.PluggableAuthService.utils import classImplements from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products import ERP5Security from Products.ERP5Security import _setUserNameForAccessLog
from AccessControl.SecurityManagement import getSecurityManager, \ from AccessControl.SecurityManagement import getSecurityManager, \
setSecurityManager, newSecurityManager setSecurityManager, newSecurityManager
from Products.ERP5Type.Cache import DEFAULT_CACHE_SCOPE from Products.ERP5Type.Cache import DEFAULT_CACHE_SCOPE
...@@ -224,6 +225,8 @@ class ERP5ExternalOauth2ExtractionPlugin: ...@@ -224,6 +225,8 @@ class ERP5ExternalOauth2ExtractionPlugin:
creds['remote_address'] = request.getClientAddr() creds['remote_address'] = request.getClientAddr()
except AttributeError: except AttributeError:
creds['remote_address'] = request.get('REMOTE_ADDR', '') creds['remote_address'] = request.get('REMOTE_ADDR', '')
_setUserNameForAccessLog('%s=%s' % (self.getId(), creds['external_login']) , request)
return creds return creds
def getFacebookUserEntry(token): def getFacebookUserEntry(token):
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
from copy import deepcopy from copy import deepcopy
from collections import defaultdict from collections import defaultdict
from base64 import encodestring
from Acquisition import aq_inner, aq_parent from Acquisition import aq_inner, aq_parent
from AccessControl.Permissions import manage_users as ManageUsers from AccessControl.Permissions import manage_users as ManageUsers
...@@ -53,6 +54,23 @@ def mergedLocalRoles(object): ...@@ -53,6 +54,23 @@ def mergedLocalRoles(object):
break break
return deepcopy(merged) return deepcopy(merged)
def _setUserNameForAccessLog(username, REQUEST):
"""Make the current user look as `username` in Zope's Z2.log
Taken from Products.CMFCore.CookieCrumbler._setAuthHeader
"""
# Set the authorization header in the medusa http request
# so that the username can be logged to the Z2.log
try:
# Put the full-arm latex glove on now...
medusa_headers = REQUEST.RESPONSE.stdout._request._header_cache
except AttributeError:
pass
else:
medusa_headers['authorization'] = 'Basic %s' % encodestring('%s:' % username).rstrip()
def initialize(context): def initialize(context):
import ERP5UserManager import ERP5UserManager
import ERP5LoginUserManager import ERP5LoginUserManager
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment