Commit 601e46e9 authored by Jérome Perrin's avatar Jérome Perrin

Fix AccessToken login with ERP5 Login

Since the introduction of ERP5 Login, authentication by Access Token is broken, and it is only working if `erp5_login.getReference() == person.getUserId()`

The scriptable part of access token changed, now scripts must return a user object - on which the plugin will call `getUserId` (it was not clear what they should return before, maybe login, but they should return a user id, not a login, as the token plays the same role as a login). To make it clear and to intentionally break compatibility as this is now something different, these scripts have been renamed to be `getUserValue` type based methods.

/reviewed-on !838
parents d8999426 71aac680
......@@ -6,4 +6,7 @@
<item>Reference</item>
<item>Url</item>
</portal_type>
<portal_type id="Template Tool">
<item>TemplateToolERP5AccessTokenExtractionPluginConstraint</item>
</portal_type>
</property_sheet_list>
\ No newline at end of file
......@@ -2,122 +2,65 @@
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Test Component" module="erp5.portal_type"/>
<global name="Property Sheet" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_recorded_property_dict</string> </key>
<key> <string>_count</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>testERP5DumbHTTPExtractionPlugin</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<value>
<none/>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>test.erp5.testERP5DumbHTTPExtractionPlugin</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Test Component</string> </value>
</item>
<item>
<key> <string>sid</string> </key>
<key> <string>_mt_index</string> </key>
<value>
<none/>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
<item>
<key> <string>text_content_error_message</string> </key>
<key> <string>_tree</string> </key>
<value>
<tuple/>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
<item>
<key> <string>text_content_warning_message</string> </key>
<key> <string>description</string> </key>
<value>
<tuple/>
<none/>
</value>
</item>
<item>
<key> <string>version</string> </key>
<value> <string>erp5</string> </value>
<key> <string>id</string> </key>
<value> <string>TemplateToolERP5AccessTokenExtractionPluginConstraint</string> </value>
</item>
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
<key> <string>portal_type</string> </key>
<value> <string>Property Sheet</string> </value>
</item>
</dictionary>
</pickle>
</record>
<record id="2" aka="AAAAAAAAAAI=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary/>
</value>
</item>
</dictionary>
<global name="Length" module="BTrees.Length"/>
</pickle>
<pickle> <int>0</int> </pickle>
</record>
<record id="3" aka="AAAAAAAAAAM=">
<pickle>
<global name="PersistentMapping" module="Persistence.mapping"/>
<global name="OOBTree" module="BTrees.OOBTree"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>data</string> </key>
<value>
<dictionary>
<item>
<key> <string>component_validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
</dictionary>
</value>
</item>
</dictionary>
<none/>
</pickle>
</record>
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.patches.WorkflowTool"/>
<global name="OOBTree" module="BTrees.OOBTree"/>
</pickle>
<pickle>
<tuple>
<none/>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>validate</string> </value>
</item>
<item>
<key> <string>validation_state</string> </key>
<value> <string>validated</string> </value>
</item>
</dictionary>
</list>
</tuple>
</pickle>
</record>
</ZopeData>
......@@ -2,61 +2,47 @@
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="Test Component" module="erp5.portal_type"/>
<global name="Script Constraint" module="erp5.portal_type"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>_recorded_property_dict</string> </key>
<key> <string>_identity_criterion</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAI=</string> </persistent>
</value>
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>testERP5AccessTokenAlarm</string> </value>
</item>
<item>
<key> <string>description</string> </key>
<key> <string>_range_criterion</string> </key>
<value>
<none/>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>test.erp5.testERP5AccessTokenAlarm</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
<value> <string>Test Component</string> </value>
</item>
<item>
<key> <string>sid</string> </key>
<key> <string>categories</string> </key>
<value>
<none/>
<tuple>
<string>constraint_type/post_upgrade</string>
</tuple>
</value>
</item>
<item>
<key> <string>text_content_error_message</string> </key>
<key> <string>description</string> </key>
<value>
<tuple/>
<none/>
</value>
</item>
<item>
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple/>
</value>
<key> <string>id</string> </key>
<value> <string>ERP5AccessTokenExtractionPlugin_existence_constraint</string> </value>
</item>
<item>
<key> <string>version</string> </key>
<value> <string>erp5</string> </value>
<key> <string>portal_type</string> </key>
<value> <string>Script Constraint</string> </value>
</item>
<item>
<key> <string>workflow_history</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAM=</string> </persistent>
</value>
<key> <string>script_id</string> </key>
<value> <string>TemplateTool_checkERP5AccessTokenExtractionPluginExistenceConsistency</string> </value>
</item>
</dictionary>
</pickle>
......@@ -85,39 +71,10 @@
<item>
<key> <string>data</string> </key>
<value>
<dictionary>
<item>
<key> <string>component_validation_workflow</string> </key>
<value>
<persistent> <string encoding="base64">AAAAAAAAAAQ=</string> </persistent>
</value>
</item>
</dictionary>
<dictionary/>
</value>
</item>
</dictionary>
</pickle>
</record>
<record id="4" aka="AAAAAAAAAAQ=">
<pickle>
<global name="WorkflowHistoryList" module="Products.ERP5Type.patches.WorkflowTool"/>
</pickle>
<pickle>
<tuple>
<none/>
<list>
<dictionary>
<item>
<key> <string>action</string> </key>
<value> <string>validate</string> </value>
</item>
<item>
<key> <string>validation_state</string> </key>
<value> <string>validated</string> </value>
</item>
</dictionary>
</list>
</tuple>
</pickle>
</record>
</ZopeData>
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>AccessToken_getExternalLogin</string> </value>
<value> <string>AccessToken_getUserValue</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -14,7 +14,7 @@ if access_token_document.getValidationState() == 'validated':
agent_document = access_token_document.getAgentValue()
if agent_document is not None:
result = agent_document.Person_getUserId()
result = agent_document
comment = "Token usage accepted"
access_token_document.invalidate(comment=comment)
......
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>OneTimeRestrictedAccessToken_getExternalLogin</string> </value>
<value> <string>OneTimeRestrictedAccessToken_getUserValue</string> </value>
</item>
</dictionary>
</pickle>
......
......@@ -22,6 +22,29 @@ if access_token_document.getValidationState() == 'validated':
agent_document = access_token_document.getAgentValue()
if agent_document is not None:
result = agent_document.Person_getUserId()
if agent_document.getPortalType() == 'Person':
# if this is a token for a person, only make accept if person has valid
# assignments and a validated login (for compatibility with login/password
# authentication)
if agent_document.getValidationState() == 'deleted':
return None
now = DateTime()
for assignment in agent_document.contentValues(portal_type='Assignment'):
if assignment.getValidationState() == "open" and (
not assignment.hasStartDate() or assignment.getStartDate() <= now
) and (
not assignment.hasStopDate() or assignment.getStopDate() >= now
):
break
else:
return None
user, = context.getPortalObject().acl_users.searchUsers(
exact_match=True,
id=agent_document.Person_getUserId())
if not user['login_list']:
return None
result = agent_document
return result
......@@ -54,7 +54,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>RestrictedAccessToken_getExternalLogin</string> </value>
<value> <string>RestrictedAccessToken_getUserValue</string> </value>
</item>
</dictionary>
</pickle>
......
alpha = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
random_id = ''
for i in range(0,128):
for _ in range(0, 128):
random_id += random.choice(alpha)
# Define Reference from ID provided by portal_ids
......
acl_users = context.getPortalObject().acl_users
token_extraction_id = "erp5_access_token_plugin"
access_token_plugin_list = [
plugin for plugin in acl_users.objectValues()
if plugin.meta_type == 'ERP5 Access Token Extraction Plugin']
if len(access_token_plugin_list) > 1:
return ["More than one plugin found: %s" % access_token_plugin_list]
error_list = []
if not access_token_plugin_list:
# A dumb http extraction plugin is required as fallback if we use an access token
# since https://github.com/Nexedi/erp5/commit/0bee523da0075c6efe3c06296dddd01d9dd5045a
# we enable it automatically at site creation, but for compatibility with old instances
# make sure it is created if needed
if 'erp5_dumb_http_extraction' not in acl_users.objectIds():
error_list.append("erp5_dumb_http_extraction is missing")
if fixit:
dispacher = acl_users.manage_addProduct['ERP5Security']
dispacher.addERP5DumbHTTPExtractionPlugin('erp5_dumb_http_extraction')
acl_users.erp5_dumb_http_extraction.manage_activateInterfaces(('IExtractionPlugin', ))
error_list.append("erp5_access_token_plugin is missing")
if fixit:
dispacher = acl_users.manage_addProduct['ERP5Security']
dispacher.addERP5AccessTokenExtractionPlugin(token_extraction_id)
access_token_plugin_list = [getattr(acl_users, token_extraction_id)]
if access_token_plugin_list:
access_token_plugin, = access_token_plugin_list
# We only check that our plugin is enabled for IAuthenticationPlugin, this covers both
# cases where plugin was not enabled at all or was enabled only for IExtractionPlugin
IAuthenticationPlugin = [
# Products.PluggableAuthService.interfaces.plugins.IAuthenticationPlugin cannot
# be imported in restricted python but we can get it this way.
x for x in acl_users.plugins.listPluginTypeInfo()
if x['id'] == 'IAuthenticationPlugin'][0]['interface']
if (access_token_plugin.getId()
not in acl_users.plugins.listPluginIds(IAuthenticationPlugin)):
error_list.append("erp5_access_token_plugin is not activated")
if fixit:
access_token_plugin.manage_activateInterfaces((
'IExtractionPlugin',
'IAuthenticationPlugin',))
return error_list
<?xml version="1.0"?>
<ZopeData>
<record id="1" aka="AAAAAAAAAAE=">
<pickle>
<global name="PythonScript" module="Products.PythonScripts.PythonScript"/>
</pickle>
<pickle>
<dictionary>
<item>
<key> <string>Script_magic</string> </key>
<value> <int>3</int> </value>
</item>
<item>
<key> <string>_bind_names</string> </key>
<value>
<object>
<klass>
<global name="NameAssignments" module="Shared.DC.Scripts.Bindings"/>
</klass>
<tuple/>
<state>
<dictionary>
<item>
<key> <string>_asgns</string> </key>
<value>
<dictionary>
<item>
<key> <string>name_container</string> </key>
<value> <string>container</string> </value>
</item>
<item>
<key> <string>name_context</string> </key>
<value> <string>context</string> </value>
</item>
<item>
<key> <string>name_m_self</string> </key>
<value> <string>script</string> </value>
</item>
<item>
<key> <string>name_subpath</string> </key>
<value> <string>traverse_subpath</string> </value>
</item>
</dictionary>
</value>
</item>
</dictionary>
</state>
</object>
</value>
</item>
<item>
<key> <string>_params</string> </key>
<value> <string>fixit=False</string> </value>
</item>
<item>
<key> <string>id</string> </key>
<value> <string>TemplateTool_checkERP5AccessTokenExtractionPluginExistenceConsistency</string> </value>
</item>
</dictionary>
</pickle>
</record>
</ZopeData>
# Copyright (c) 2002-2013 Nexedi SA and Contributors. All Rights Reserved.
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2002-2019 Nexedi SA and Contributors. All Rights Reserved.
# Tristan Cavelier <tristan.cavelier@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.HTTPResponse import HTTPResponse
from Products.PluggableAuthService.interfaces.plugins import IAuthenticationPlugin
from DateTime import DateTime
import base64
import StringIO
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
import transaction
class TestERP5AccessTokenSkins(ERP5TypeTestCase):
from Products.ERP5Security.ERP5DumbHTTPExtractionPlugin import ERP5DumbHTTPExtractionPlugin
test_token_extraction_id = 'test_erp5_access_token_extraction'
class AccessTokenTestCase(ERP5TypeTestCase):
def getBusinessTemplateList(self):
return ('erp5_base',
'erp5_access_token')
def _createPerson(self, new_id):
"""Creates a person in person module, and returns the object, after
indexing is done. """
person_module = self.getPersonModule()
person = person_module.newContent(portal_type='Person',
reference='TESTP-' + new_id)
person.newContent(portal_type='Assignment').open()
person.newContent(portal_type='ERP5 Login', reference=new_id).validate()
self.tic()
return person
class TestERP5AccessTokenSkins(AccessTokenTestCase):
def generateNewId(self):
return str(self.portal.portal_ids.generateNewId(
id_group=('erp5_access_token_test_id')))
......@@ -19,40 +64,15 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
"""
This is ran before anything, used to set the environment
"""
self.portal = self.getPortalObject()
self.new_id = self.generateNewId()
self._setupAccessTokenExtraction()
transaction.commit()
self.portal.portal_templates.TemplateTool_checkERP5AccessTokenExtractionPluginExistenceConsistency(
fixit=True)
self.tic()
def _setupAccessTokenExtraction(self):
pas = self.portal.acl_users
access_extraction_list = [q for q in pas.objectValues() \
if q.meta_type == 'ERP5 Access Token Extraction Plugin']
if len(access_extraction_list) == 0:
dispacher = pas.manage_addProduct['ERP5Security']
dispacher.addERP5AccessTokenExtractionPlugin(self.test_token_extraction_id)
getattr(pas, self.test_token_extraction_id).manage_activateInterfaces(
('IExtractionPlugin',))
elif len(access_extraction_list) == 1:
self.test_token_extraction_id = access_extraction_list[0].getId()
elif len(access_extraction_list) > 1:
raise ValueError
transaction.commit()
def _createPerson(self, new_id):
"""Creates a person in person module, and returns the object, after
indexing is done. """
person_module = self.getPersonModule()
person = person_module.newContent(portal_type='Person',
user_id='TESTP-' + new_id)
person.newContent(portal_type = 'Assignment').open()
transaction.commit()
return person
def _getTokenCredential(self, request):
plugin = getattr(self.portal.acl_users, self.test_token_extraction_id)
return plugin.extractCredentials(request)
"""Authenticate the request and return (user_id, login) or None if not authorized."""
plugin = self.portal.acl_users.erp5_access_token_plugin
return plugin.authenticateCredentials(plugin.extractCredentials(request))
def _createRestrictedAccessToken(self, new_id, person, method, url_string):
access_token = self.portal.access_token_module.newContent(
......@@ -75,7 +95,7 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
return access_token
def test_working_token(self):
person = self.person = self._createPerson(self.new_id)
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
......@@ -91,10 +111,14 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = self._getTokenCredential(self.portal.REQUEST)
self.assertEqual(result.get('external_login'), person.Person_getUserId())
self.assertTrue(result)
user_id, login = result
self.assertEqual(user_id, person.Person_getUserId())
# tokens have a login value, for auditing purposes
self.assertEqual(access_token.getRelativeUrl(), login)
def test_bad_token(self):
person = self.person = self._createPerson(self.new_id)
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
......@@ -110,10 +134,13 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = self._getTokenCredential(self.portal.REQUEST)
self.assertEqual(result, {})
self.assertFalse(result)
def test_RestrictedAccessToken_getExternalLogin(self):
person = self.person = self._createPerson(self.new_id)
def test_token_without_assignment(self):
# Token does not work when person has no open assignment
person = self._createPerson(self.new_id)
for assignment in person.contentValues(portal_type='Assignment'):
assignment.close()
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
......@@ -123,17 +150,58 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
access_token.validate()
self.tic()
self.portal.REQUEST.form["access_token"] = access_token.getId()
self.portal.REQUEST["REQUEST_METHOD"] = access_method
self.portal.REQUEST["ACTUAL_URL"] = access_url
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = self._getTokenCredential(self.portal.REQUEST)
self.assertFalse(result)
def test_token_without_login(self):
# Token does not work when person has no validated login
person = self._createPerson(self.new_id)
for login in person.contentValues(portal_type='ERP5 Login'):
login.invalidate()
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
person,
access_method,
access_url)
access_token.validate()
self.tic()
self.portal.REQUEST.form["access_token"] = access_token.getId()
self.portal.REQUEST["REQUEST_METHOD"] = access_method
self.portal.REQUEST["ACTUAL_URL"] = access_url
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = access_token.RestrictedAccessToken_getExternalLogin()
result = self._getTokenCredential(self.portal.REQUEST)
self.assertFalse(result)
self.assertEqual(result, person.Person_getUserId())
def test_RestrictedAccessToken_getUserValue(self):
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
person,
access_method,
access_url)
access_token.validate()
self.tic()
self.portal.REQUEST["REQUEST_METHOD"] = access_method
self.portal.REQUEST["ACTUAL_URL"] = access_url
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = access_token.RestrictedAccessToken_getUserValue()
self.assertEqual(result, person)
self.assertEqual(access_token.getValidationState(), 'validated')
def test_RestrictedAccessToken_getExternalLogin_access_token_secret(self):
person = self.person = self._createPerson(self.new_id)
def test_RestrictedAccessToken_getUserValue_access_token_secret(self):
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
......@@ -146,7 +214,7 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST["REQUEST_METHOD"] = access_method
self.portal.REQUEST["ACTUAL_URL"] = access_url
result = access_token.RestrictedAccessToken_getExternalLogin()
result = access_token.RestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
self.portal.REQUEST.form["access_token_secret"] = "XYXYXYXY"
......@@ -154,12 +222,12 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = access_token.RestrictedAccessToken_getExternalLogin()
result = access_token.RestrictedAccessToken_getUserValue()
self.assertEqual(result, person.Person_getUserId())
self.assertEqual(result, person)
self.assertEqual(access_token.getValidationState(), 'validated')
def test_RestrictedAccessToken_getExternalLogin_no_agent(self):
def test_RestrictedAccessToken_getUserValue_no_agent(self):
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
......@@ -173,11 +241,11 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST["ACTUAL_URL"] = access_url
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = access_token.RestrictedAccessToken_getExternalLogin()
result = access_token.RestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
def test_RestrictedAccessToken_getExternalLogin_wrong_values(self):
person = self.person = self._createPerson(self.new_id)
def test_RestrictedAccessToken_getUserValue_wrong_values(self):
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createRestrictedAccessToken(self.new_id,
......@@ -185,7 +253,7 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
access_method,
access_url)
self.tic()
result = access_token.RestrictedAccessToken_getExternalLogin()
result = access_token.RestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
access_token.validate()
......@@ -195,23 +263,23 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST["ACTUAL_URL"] = access_url
self.portal.REQUEST.form["access_token_secret"] = access_token.getReference()
result = access_token.RestrictedAccessToken_getExternalLogin()
result = access_token.RestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
self.portal.REQUEST["ACTUAL_URL"] = "http://exemple.com/foo.bar"
result = access_token.RestrictedAccessToken_getExternalLogin()
result = access_token.RestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
access_token.invalidate()
self.tic()
result = access_token.RestrictedAccessToken_getExternalLogin()
result = access_token.RestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
def test_OneTimeRestrictedAccessToken_getExternalLogin(self):
person = self.person = self._createPerson(self.new_id)
def test_OneTimeRestrictedAccessToken_getUserValue(self):
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "GET"
access_token = self._createOneTimeRestrictedAccessToken(self.new_id,
......@@ -224,13 +292,13 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST["REQUEST_METHOD"] = access_method
self.portal.REQUEST["ACTUAL_URL"] = access_url
result = access_token.OneTimeRestrictedAccessToken_getExternalLogin()
result = access_token.OneTimeRestrictedAccessToken_getUserValue()
self.assertEqual(result, person.Person_getUserId())
self.assertEqual(result, person)
self.assertEqual(access_token.getValidationState(), 'invalidated')
def test_OneTimeRestrictedAccessToken_getExternalLogin_wrong_values(self):
person = self.person = self._createPerson(self.new_id)
def test_OneTimeRestrictedAccessToken_getUserValue_wrong_values(self):
person = self._createPerson(self.new_id)
access_url = "http://exemple.com/foo"
access_method = "POST"
access_token = self._createOneTimeRestrictedAccessToken(self.new_id,
......@@ -238,7 +306,7 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
access_method,
access_url)
self.tic()
result = access_token.OneTimeRestrictedAccessToken_getExternalLogin()
result = access_token.OneTimeRestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
access_token.validate()
......@@ -247,10 +315,123 @@ class TestERP5AccessTokenSkins(ERP5TypeTestCase):
self.portal.REQUEST["REQUEST_METHOD"] = "GET"
self.portal.REQUEST["ACTUAL_URL"] = access_url
result = access_token.OneTimeRestrictedAccessToken_getExternalLogin()
result = access_token.OneTimeRestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
self.portal.REQUEST["ACTUAL_URL"] = "http://exemple.com/foo.bar"
result = access_token.OneTimeRestrictedAccessToken_getExternalLogin()
result = access_token.OneTimeRestrictedAccessToken_getUserValue()
self.assertEqual(result, None)
class TestERP5AccessTokenAlarm(AccessTokenTestCase):
def test_alarm_old_validated_restricted_access_token(self):
access_token = self.portal.access_token_module.newContent(
portal_type="One Time Restricted Access Token",
)
access_token.workflow_history['edit_workflow'] = [{
'comment':'Fake history',
'error_message': '',
'actor': 'ERP5TypeTestCase',
'state': 'current',
'time': DateTime('2012/11/15 11:11'),
'action': 'foo_action'
}]
self.portal.portal_workflow._jumpToStateFor(access_token, 'validated')
self.tic()
self.portal.portal_alarms.\
erp5_garbage_collect_one_time_restricted_access_token.activeSense()
self.tic()
self.assertEqual('invalidated', access_token.getValidationState())
self.assertEqual(
'Unused for 1 day.',
access_token.workflow_history['validation_workflow'][-1]['comment'])
def test_alarm_recent_validated_restricted_access_token(self):
access_token = self.portal.access_token_module.newContent(
portal_type="One Time Restricted Access Token",
)
self.portal.portal_workflow._jumpToStateFor(access_token, 'validated')
self.tic()
self.portal.portal_alarms.\
erp5_garbage_collect_one_time_restricted_access_token.activeSense()
self.tic()
self.assertEqual('validated', access_token.getValidationState())
def test_alarm_old_non_validated_restricted_access_token(self):
access_token = self.portal.access_token_module.newContent(
portal_type="One Time Restricted Access Token",
)
access_token.workflow_history['edit_workflow'] = [{
'comment':'Fake history',
'error_message': '',
'actor': 'ERP5TypeTestCase',
'state': 'current',
'time': DateTime('2012/11/15 11:11'),
'action': 'foo_action'
}]
self.tic()
self.portal.portal_alarms.\
erp5_garbage_collect_one_time_restricted_access_token.activeSense()
self.tic()
self.assertEqual('draft', access_token.getValidationState())
class TestERP5DumbHTTPExtractionPlugin(AccessTokenTestCase):
def do_fake_request(self, request_method, headers=None):
if headers is None:
headers = {}
__version__ = "0.1"
env={}
env['SERVER_NAME']='bobo.server'
env['SERVER_PORT']='80'
env['REQUEST_METHOD']=request_method
env['REMOTE_ADDR']='204.183.226.81 '
env['REMOTE_HOST']='bobo.remote.host'
env['HTTP_USER_AGENT']='Bobo/%s' % __version__
env['HTTP_HOST']='127.0.0.1'
env['SERVER_SOFTWARE']='Bobo/%s' % __version__
env['SERVER_PROTOCOL']='HTTP/1.0 '
env['HTTP_ACCEPT']='image/gif, image/x-xbitmap, image/jpeg, */* '
env['SERVER_HOSTNAME']='bobo.server.host'
env['GATEWAY_INTERFACE']='CGI/1.1 '
env['SCRIPT_NAME']='Main'
env.update(headers)
return HTTPRequest(StringIO.StringIO(), env, HTTPResponse())
def test_working_authentication(self):
request = self.do_fake_request("GET", {"HTTP_AUTHORIZATION": "Basic " + base64.b64encode("login:password")})
ret = ERP5DumbHTTPExtractionPlugin("default_extraction").extractCredentials(request)
self.assertEqual(ret, {'login': 'login', 'password': 'password', 'remote_host': 'bobo.remote.host', 'remote_address': '204.183.226.81 '})
class TestERP5AccessTokenUpgraderEnablePlugin(AccessTokenTestCase):
def afterSetUp(self):
# disable plugin if it had been enabled by another test.
acl_users = self.portal.acl_users
acl_users.manage_delObjects(ids=[
x.getId() for x in
acl_users.objectValues(spec=('ERP5 Access Token Extraction Plugin',))])
self.commit()
def test_post_upgrade_constraint_enable_plugin(self):
consistency_list = self.portal.portal_templates.checkConsistency(
filter={"constraint_type": "post_upgrade"})
self.assertIn(
'erp5_access_token_plugin is missing',
[x.message for x in consistency_list])
self.portal.portal_templates.checkConsistency(
fixit=True,
filter={"constraint_type": "post_upgrade"})
self.commit()
self.assertIn(
'erp5_access_token_plugin',
self.portal.acl_users.plugins.listPluginIds(IAuthenticationPlugin))
\ No newline at end of file
......@@ -14,7 +14,7 @@
</item>
<item>
<key> <string>default_reference</string> </key>
<value> <string>testERP5AccessTokenSkins</string> </value>
<value> <string>testERP5AccessToken</string> </value>
</item>
<item>
<key> <string>description</string> </key>
......@@ -24,7 +24,7 @@
</item>
<item>
<key> <string>id</string> </key>
<value> <string>test.erp5.testERP5AccessTokenSkins</string> </value>
<value> <string>test.erp5.testERP5AccessToken</string> </value>
</item>
<item>
<key> <string>portal_type</string> </key>
......
# Copyright (c) 2002-2013 Nexedi SA and Contributors. All Rights Reserved.
from DateTime import DateTime
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
class TestERP5AccessTokenAlarm(ERP5TypeTestCase):
def getBusinessTemplateList(self):
return ('erp5_base',
'erp5_access_token')
def test_alarm_old_validated_restricted_access_token(self):
access_token = self.portal.access_token_module.newContent(
portal_type="One Time Restricted Access Token",
)
access_token.workflow_history['edit_workflow'] = [{
'comment':'Fake history',
'error_message': '',
'actor': 'ERP5TypeTestCase',
'state': 'current',
'time': DateTime('2012/11/15 11:11'),
'action': 'foo_action'
}]
self.portal.portal_workflow._jumpToStateFor(access_token, 'validated')
self.tic()
self.portal.portal_alarms.\
erp5_garbage_collect_one_time_restricted_access_token.activeSense()
self.tic()
self.assertEqual('invalidated', access_token.getValidationState())
self.assertEqual(
'Unused for 1 day.',
access_token.workflow_history['validation_workflow'][-1]['comment'])
def test_alarm_recent_validated_restricted_access_token(self):
access_token = self.portal.access_token_module.newContent(
portal_type="One Time Restricted Access Token",
)
self.portal.portal_workflow._jumpToStateFor(access_token, 'validated')
self.tic()
self.portal.portal_alarms.\
erp5_garbage_collect_one_time_restricted_access_token.activeSense()
self.tic()
self.assertEqual('validated', access_token.getValidationState())
def test_alarm_old_non_validated_restricted_access_token(self):
access_token = self.portal.access_token_module.newContent(
portal_type="One Time Restricted Access Token",
)
access_token.workflow_history['edit_workflow'] = [{
'comment':'Fake history',
'error_message': '',
'actor': 'ERP5TypeTestCase',
'state': 'current',
'time': DateTime('2012/11/15 11:11'),
'action': 'foo_action'
}]
self.tic()
self.portal.portal_alarms.\
erp5_garbage_collect_one_time_restricted_access_token.activeSense()
self.tic()
self.assertEqual('draft', access_token.getValidationState())
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2015 Nexedi SA and Contributors. All Rights Reserved.
# Tristan Cavelier <tristan.cavelier@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.HTTPResponse import HTTPResponse
from Products.ERP5Security.ERP5DumbHTTPExtractionPlugin import ERP5DumbHTTPExtractionPlugin
import base64
import transaction
import StringIO
class TestERP5DumbHTTPExtractionPlugin(ERP5TypeTestCase):
test_id = 'test_erp5_dumb_http_extraction'
def getBusinessTemplateList(self):
return ('erp5_base',)
def generateNewId(self):
return str(self.portal.portal_ids.generateNewId(
id_group=('erp5_dumb_http_test_id')))
def afterSetUp(self):
"""
This is ran before anything, used to set the environment
"""
self.portal = self.getPortalObject()
self.new_id = self.generateNewId()
self._setupDumbHTTPExtraction()
transaction.commit()
self.tic()
def do_fake_request(self, request_method, headers={}):
__version__ = "0.1"
env={}
env['SERVER_NAME']='bobo.server'
env['SERVER_PORT']='80'
env['REQUEST_METHOD']=request_method
env['REMOTE_ADDR']='204.183.226.81 '
env['REMOTE_HOST']='bobo.remote.host'
env['HTTP_USER_AGENT']='Bobo/%s' % __version__
env['HTTP_HOST']='127.0.0.1'
env['SERVER_SOFTWARE']='Bobo/%s' % __version__
env['SERVER_PROTOCOL']='HTTP/1.0 '
env['HTTP_ACCEPT']='image/gif, image/x-xbitmap, image/jpeg, */* '
env['SERVER_HOSTNAME']='bobo.server.host'
env['GATEWAY_INTERFACE']='CGI/1.1 '
env['SCRIPT_NAME']='Main'
env.update(headers)
return HTTPRequest(StringIO.StringIO(), env, HTTPResponse())
def _setupDumbHTTPExtraction(self):
pas = self.portal.acl_users
access_extraction_list = [q for q in pas.objectValues() \
if q.meta_type == 'ERP5 Dumb HTTP Extraction Plugin']
if len(access_extraction_list) == 0:
dispacher = pas.manage_addProduct['ERP5Security']
dispacher.addERP5DumbHTTPExtractionPlugin(self.test_id)
getattr(pas, self.test_id).manage_activateInterfaces(
('IExtractionPlugin',))
elif len(access_extraction_list) == 1:
self.test_id = access_extraction_list[0].getId()
elif len(access_extraction_list) > 1:
raise ValueError
transaction.commit()
def _createPerson(self, new_id, password=None):
"""Creates a person in person module, and returns the object, after
indexing is done. """
person_module = self.getPersonModule()
person = person_module.newContent(portal_type='Person',
reference='TESTP-' + new_id)
if password:
person.setPassword(password)
person.newContent(portal_type = 'Assignment').open()
transaction.commit()
return person
def test_working_authentication(self):
person = self.person = self._createPerson(self.new_id, "test")
request = self.do_fake_request("GET", {"HTTP_AUTHORIZATION": "Basic " + base64.b64encode("%s:test" % self.new_id)})
ret = ERP5DumbHTTPExtractionPlugin("default_extraction").extractCredentials(request)
self.assertEquals(ret, {'login': self.new_id, 'password': 'test', 'remote_host': 'bobo.remote.host', 'remote_address': '204.183.226.81 '})
One Time Restricted Access Token | Url
Restricted Access Token | Reference
Restricted Access Token | Url
Template Tool | TemplateToolERP5AccessTokenExtractionPluginConstraint
\ No newline at end of file
TemplateToolERP5AccessTokenExtractionPluginConstraint
\ No newline at end of file
test.erp5.testERP5AccessTokenAlarm
test.erp5.testERP5AccessTokenSkins
test.erp5.testERP5DumbHTTPExtractionPlugin
\ No newline at end of file
test.erp5.testERP5AccessToken
\ No newline at end of file
......@@ -77,7 +77,7 @@ class SupportRequestTestCase(ERP5TypeTestCase, object):
self.user.newContent(
id='erp5_login',
portal_type='ERP5 Login',
reference=self.user.getUserId(), # XXX workaround until https://lab.nexedi.com/nexedi/erp5/merge_requests/478
reference=self.id(),
password=self.user_password
).validate()
self.user.validate()
......@@ -380,18 +380,10 @@ class TestSupportRequestCommentOnExistingSupportRequest(SupportRequestTestCase):
class SupportRequestRSSTestCase(SupportRequestTestCase):
# XXX token PAS plugin is not set up automatically when installing erp5_access_token
# so we set it up the same way test.erp5.testERP5AccessTokenSkins is setting it up
def _setupAccessTokenExtraction(self):
pas = self.portal.acl_users
access_extraction_list = [q for q in pas.objectValues() \
if q.meta_type == 'ERP5 Access Token Extraction Plugin']
if len(access_extraction_list) == 0:
dispacher = pas.manage_addProduct['ERP5Security']
dispacher.addERP5AccessTokenExtractionPlugin('token_login')
pas.token_login.manage_activateInterfaces(('IExtractionPlugin',))
elif len(access_extraction_list) > 1:
raise ValueError("Too many token extraction plugins")
# post upgrade step of erp5_access_token
self.portal.portal_templates.TemplateTool_checkERP5AccessTokenExtractionPluginExistenceConsistency(
fixit=True)
self.tic()
def afterSetUp(self):
......
......@@ -59,35 +59,35 @@ class ERP5AccessTokenExtractionPlugin(BasePlugin):
#ILoginPasswordHostExtractionPlugin#
####################################
security.declarePrivate('extractCredentials')
@UnrestrictedMethod
def extractCredentials(self, request):
""" Extract CookieHash credentials from the request header. """
""" Extract credentials from the request header. """
creds = {}
# XXX Extract from HTTP Header, URL parameter are hardcoded.
# More flexible way would be to configure on the portal type level
token = request.getHeader("X-ACCESS-TOKEN", None)
if token is None:
token = request.form.get("access_token", None)
if token is not None:
# Extract token from HTTP Header
token = request.getHeader("X-ACCESS-TOKEN", request.form.get("access_token", None))
if token:
creds['erp5_access_token_id'] = token
creds['remote_host'] = request.get('REMOTE_HOST', '')
creds['remote_address'] = request.getClientAddr()
return creds
#######################
#IAuthenticationPlugin#
#######################
security.declarePrivate('authenticateCredentials')
@UnrestrictedMethod
def authenticateCredentials(self, credentials):
""" Map credentials to a user ID. """
if 'erp5_access_token_id' in credentials:
erp5_access_token_id = credentials['erp5_access_token_id']
token_document = self.getPortalObject().access_token_module.\
_getOb(token, None)
# Access Token should be validated
# Check restricted access of URL
# Extract login information
_getOb(erp5_access_token_id, None)
if token_document is not None:
external_login = None
method = token_document._getTypeBasedMethod('getExternalLogin')
method = token_document._getTypeBasedMethod('getUserValue')
if method is not None:
external_login = method()
user_value = method()
if user_value is not None:
return (user_value.getUserId(), token_document.getRelativeUrl())
if external_login is not None:
creds['external_login'] = external_login
creds['remote_host'] = request.get('REMOTE_HOST', '')
try:
creds['remote_address'] = request.getClientAddr()
except AttributeError:
creds['remote_address'] = request.get('REMOTE_ADDR', '')
return creds
#Form for new plugin in ZMI
manage_addERP5AccessTokenExtractionPluginForm = PageTemplateFile(
......@@ -109,6 +109,7 @@ def addERP5AccessTokenExtractionPlugin(dispatcher, id, title=None, REQUEST=None)
#List implementation of class
classImplements(ERP5AccessTokenExtractionPlugin,
plugins.ILoginPasswordHostExtractionPlugin
plugins.ILoginPasswordHostExtractionPlugin,
plugins.IAuthenticationPlugin,
)
InitializeClass(ERP5AccessTokenExtractionPlugin)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment