Commit ef067a80 authored by Jérome Perrin's avatar Jérome Perrin

ERP5Security: make plugins log a username

This works only for medusa, using the same approach as CMFCore's
CookieCrumbler

simplified backport of nexedi/erp5!901

/reviewed-on https://lab.nexedi.com/nexedi/erp5-capago/merge_requests/46
parent 4ee0fb8b
......@@ -33,6 +33,7 @@ from Products.PluggableAuthService.interfaces.plugins import IAuthenticationPlug
from DateTime import DateTime
import base64
import StringIO
import mock
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Security.ERP5DumbHTTPExtractionPlugin import ERP5DumbHTTPExtractionPlugin
......@@ -110,12 +111,18 @@ class TestERP5AccessTokenSkins(AccessTokenTestCase):
self.portal.REQUEST["ACTUAL_URL"] = access_url
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = self._getTokenCredential(self.portal.REQUEST)
with mock.patch(
'Products.ERP5Security.ERP5AccessTokenExtractionPlugin._setUserNameForAccessLog'
) as _setUserNameForAccessLog:
result = self._getTokenCredential(self.portal.REQUEST)
self.assertTrue(result)
user_id, login = result
self.assertEqual(user_id, person.Person_getUserId())
# tokens have a login value, for auditing purposes
self.assertEqual(access_token.getRelativeUrl(), login)
# tokens have a login value, for auditing purposes. This is the ID of the plugin
# and the relative URL of the token.
self.assertEqual('erp5_access_token_plugin=%s' % access_token.getRelativeUrl(), login)
# this is also what will appear in Z2.log
_setUserNameForAccessLog.assert_called_once_with(login, self.portal.REQUEST)
def test_bad_token(self):
person = self._createPerson(self.new_id)
......
......@@ -137,11 +137,78 @@ class TestGoogleLogin(ERP5TypeTestCase):
self.assertNotIn("secret_key=", location)
self.assertIn("ERP5Site_receiveGoogleCallback", location)
def test_receive_google_callback(self):
"""
Check if ERP5 set cookie properly after receive code from external service
"""
def test_existing_user(self):
self.login()
person = self.portal.person_module.newContent(
portal_type='Person',
)
person.newContent(
portal_type='Google Login',
reference=getUserId(None)
).validate()
person.newContent(portal_type='Assignment').open()
self.tic()
self.logout()
self.portal.ERP5Site_receiveGoogleCallback(code=CODE)
cookie = self.portal.REQUEST.RESPONSE.cookies.get("__ac_google_hash")
self.assertEqual("b01533abb684a658dc71c81da4e67546", cookie["value"])
request = self.portal.REQUEST
response = request.RESPONSE
with mock.patch(
'erp5.component.extension.GoogleLoginUtility.getAccessTokenFromCode',
side_effect=getAccessTokenFromCode,
) as getAccessTokenFromCode_mock, \
mock.patch(
'erp5.component.extension.GoogleLoginUtility.getUserEntry',
side_effect=getUserEntry
) as getUserEntry_mock:
getAccessTokenFromCode_mock.func_code = getAccessTokenFromCode.func_code
getUserEntry_mock.func_code = getUserEntry.func_code
self.portal.ERP5Site_receiveGoogleCallback(code=CODE)
getAccessTokenFromCode_mock.assert_called_once()
getUserEntry_mock.assert_called_once()
request["__ac_google_hash"] = response.cookies["__ac_google_hash"]["value"]
with mock.patch(
'Products.ERP5Security.ERP5ExternalOauth2ExtractionPlugin._setUserNameForAccessLog'
) as _setUserNameForAccessLog:
credentials = self.portal.acl_users.erp5_google_extraction.extractCredentials(request)
self.assertEqual(
'Google Login',
credentials['login_portal_type'])
self.assertEqual(
getUserId(None),
credentials['external_login'])
# this is what will appear in Z2.log
_setUserNameForAccessLog.assert_called_once_with(
'erp5_google_extraction=%s' % getUserId(None),
request)
user_id, login = self.portal.acl_users.erp5_login_users.authenticateCredentials(credentials)
self.assertEqual(person.getUserId(), user_id)
self.assertEqual(getUserId(None), login)
def test_auth_cookie(self):
request = self.portal.REQUEST
response = request.RESPONSE
# (the secure flag is only set if we accessed through https)
request.setServerURL('https', 'example.com')
with mock.patch(
'erp5.component.extension.GoogleLoginUtility.getAccessTokenFromCode',
side_effect=getAccessTokenFromCode,
) as getAccessTokenFromCode_mock, \
mock.patch(
'erp5.component.extension.GoogleLoginUtility.getUserEntry',
side_effect=getUserEntry
) as getUserEntry_mock:
getAccessTokenFromCode_mock.func_code = getAccessTokenFromCode.func_code
getUserEntry_mock.func_code = getUserEntry.func_code
self.portal.ERP5Site_receiveGoogleCallback(code=CODE)
getAccessTokenFromCode_mock.assert_called_once()
getUserEntry_mock.assert_called_once()
ac_cookie, = [v for (k, v) in response.listHeaders() if k.lower() == 'set-cookie' and '__ac_google_hash=' in v]
self.assertIn('; Secure', ac_cookie)
self.assertIn('; HTTPOnly', ac_cookie)
......@@ -38,6 +38,8 @@ from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.ERP5Type.UnrestrictedMethod import UnrestrictedMethod
from Products.ERP5Security import _setUserNameForAccessLog
class ERP5AccessTokenExtractionPlugin(BasePlugin):
"""
......@@ -68,6 +70,7 @@ class ERP5AccessTokenExtractionPlugin(BasePlugin):
creds['erp5_access_token_id'] = token
creds['remote_host'] = request.get('REMOTE_HOST', '')
creds['remote_address'] = request.getClientAddr()
creds['request'] = request
return creds
#######################
......@@ -86,7 +89,9 @@ class ERP5AccessTokenExtractionPlugin(BasePlugin):
if method is not None:
user_value = method()
if user_value is not None:
return (user_value.getUserId(), token_document.getRelativeUrl())
username = '%s=%s' % (self.getId(), token_document.getRelativeUrl())
_setUserNameForAccessLog(username, credentials['request'])
return (user_value.getUserId(), username)
#Form for new plugin in ZMI
......
......@@ -33,7 +33,8 @@ from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from Products.PluggableAuthService.interfaces import plugins
from Products.PluggableAuthService.utils import classImplements
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products import ERP5Security
from Products.ERP5Security import _setUserNameForAccessLog
from AccessControl.SecurityManagement import getSecurityManager, \
setSecurityManager, newSecurityManager
from Products.ERP5Type.Cache import DEFAULT_CACHE_SCOPE
......@@ -191,6 +192,8 @@ class ERP5ExternalOauth2ExtractionPlugin:
creds['remote_address'] = request.getClientAddr()
except AttributeError:
creds['remote_address'] = request.get('REMOTE_ADDR', '')
_setUserNameForAccessLog('%s=%s' % (self.getId(), creds['external_login']) , request)
return creds
class ERP5FacebookExtractionPlugin(ERP5ExternalOauth2ExtractionPlugin, BasePlugin):
......
......@@ -16,6 +16,7 @@
"""
from copy import deepcopy
from base64 import encodestring
from AccessControl.Permissions import manage_users as ManageUsers
from Products.PluggableAuthService.PluggableAuthService import registerMultiPlugin
......@@ -52,6 +53,23 @@ def mergedLocalRoles(object):
return deepcopy(merged)
def _setUserNameForAccessLog(username, REQUEST):
"""Make the current user look as `username` in Zope's Z2.log
Taken from Products.CMFCore.CookieCrumbler._setAuthHeader
"""
# Set the authorization header in the medusa http request
# so that the username can be logged to the Z2.log
try:
# Put the full-arm latex glove on now...
medusa_headers = REQUEST.RESPONSE.stdout._request._header_cache
except AttributeError:
pass
else:
medusa_headers['authorization'] = 'Basic %s' % encodestring('%s:' % username).rstrip()
def initialize(context):
import ERP5UserManager
import ERP5LoginUserManager
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment