Commit 6dd253a0 authored by Roque's avatar Roque

(WIP) erp5_wendelin_data_lake_ingestion: security tests

parent 722b3759
from Products.ERP5Type.tests.SecurityTestCase import SecurityTestCase
from pprint import pformat
from AccessControl.SecurityManagement import getSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
from Products.ERP5Type import Permissions
from zExceptions import Unauthorized
import string
import random
import csv
......@@ -22,6 +27,8 @@ class TestDataIngestion(SecurityTestCase):
REF_PREFIX = "fake-supplier" + REFERENCE_SEPARATOR
REF_SUPPLIER_PREFIX = "fake-supplier" + REFERENCE_SEPARATOR
INVALID = "_invalid"
DOWNLOADER_USER_ID = 'test_downloader_B'
CONTRIBUTOR_USER_ID = 'test_contributor'
def getTitle(self):
return "DataIngestionTest"
......@@ -135,6 +142,90 @@ class TestDataIngestion(SecurityTestCase):
return data_set, [data_stream]
def createUserCredentials(self, user_id, category_list):
module = self.portal.getDefaultModule(portal_type='Credential Request')
if self.portal.CredentialRequest_checkLoginAvailability(user_id):
credential_request = module.newContent(
portal_type = "Credential Request",
first_name = "test_user",
last_name = user_id,
reference = user_id,
password = "test_password",
default_email_text = user_id + "@lake_security_test.com"
)
self.tic()
credential_request.setCategoryList(category_list)
tag = 'set_login_%s' % user_id.encode('hex')
credential_request.reindexObject(activate_kw={'tag': tag})
credential_request.submit("Automatic submit")
self.tic()
# call explicitly alarm to accept credential requests
self.portal.log("ROQUE CALLING ALARM")
self.portal.log("ROQUE portal preferences getPreferredCredentialRequestAutomaticApproval:")
self.portal.log(self.portal.portal_preferences.getPreferredCredentialRequestAutomaticApproval())
#if self.portal.portal_preferences.getPreferredCredentialRequestAutomaticApproval():
self.portal.log("ROQUE SIMULATING ALARM")
searchAndActivate_ = self.portal.portal_catalog.searchAndActivate
def searchAndActivate(**kw):
searchAndActivate_('Credential_accept', **kw)
portal_type_list = ['Credential Request']
self.portal.log("ROQUE calling searchAndActivate...")
searchAndActivate(
portal_type=portal_type_list,
validation_state='submitted',
)
self.portal.log("ROQUE searchAndActivate DONE")
self.portal.portal_alarms.accept_submitted_credentials.Alarm_acceptSubmittedCredentialList()
self.tic()
if category_list[0] == 'function/downloader':
# as credential creates contributor assigments by default, change it for downloader user
person = self.portal.portal_catalog.getResultValue(
portal_type = 'Person',
default_email_text = user_id + "@lake_security_test.com")
self.portal.log("CREATED CREDENTIAL: " + str(credential_request))
self.portal.log("CREATED CREDENTIAL PERSON: " + str(person))
if person is not None:
assignment = self.portal.restrictedTraverse('person_module/%s/1' % person.getId())
assignment.setFunction(['downloader'])
self.portal.log("CREATED CREDENTIAL PERSON ASSIGMENT: " + str(assignment.getFunction()))
def failUnlessUserHavePermissionOnDocument(self, permission_name, username, document):
sm = getSecurityManager()
try:
self.loginByUserName(username)
user = getSecurityManager().getUser()
if not user.has_permission(permission_name, document):
groups = []
if hasattr(user, 'getGroups'):
groups = user.getGroups()
raise AssertionError(
'User %s does NOT have %s permission on %s %s (user roles: [%s], '
'roles needed: [%s], existing local roles:\n%s\n'
'your user groups: [%s])' %
(username, permission_name, document.getPortalTypeName(),
document, ', '.join(user.getRolesInContext(document)),
', '.join([x['name'] for x in
document.rolesOfPermission(permission_name)
if x['selected']]),
pformat(document.get_local_roles()),
', '.join(groups)))
finally:
setSecurityManager(sm)
def failUnlessUserCanCreateViewAndValidate(self, user_id, container_portal_type, workflow_transition=None):
reference_base = ''.join(random.choice(string.digits) for x in range(8))
self.loginByUserName(user_id)
new_container = self.portal.getDefaultModule(container_portal_type
).newContent(portal_type=container_portal_type,
title=self.id() + reference_base)
self.failUnlessUserHavePermissionOnDocument(Permissions.View, user_id, new_container)
self.failUnlessUserHavePermissionOnDocument(Permissions.ModifyPortalContent, user_id, new_container)
def test_01_DefaultEbulkIngestion(self):
"""
Test default ingestion with ebulk too.
......@@ -281,4 +372,68 @@ class TestDataIngestion(SecurityTestCase):
self.assertEqual(first_file_stream.getValidationState(), 'published')
self.assertEqual(second_file_stream.getValidationState(), 'published')
def test_06_DefaultModelSecurityModel(self):
"""
Test default security model : 'All can download, only contributors can upload.'
"""
#ingest to generate some documents
self.login()
data_set, data_stream_list = self.stepIngest(self.CSV, ",", randomize_ingestion_reference=True)
self.tic()
data_stream = data_stream_list[0]
data_ingestion = self.getDataIngestion(data_stream.getReference())
checkPermission = self.portal.portal_membership.checkPermission
#create users
self.createUserCredentials(self.DOWNLOADER_USER_ID, ['function/downloader'])
self.createUserCredentials(self.CONTRIBUTOR_USER_ID, ['function/contributor'])
#anonymous can't access modules or not published data
self.logout()
self.assertFalse(checkPermission(Permissions.View, self.portal.data_set_module))
self.assertFalse(checkPermission(Permissions.View, self.portal.data_stream_module))
self.assertFalse(checkPermission(Permissions.View, self.portal.data_ingestion_module))
self.assertFalse(checkPermission(Permissions.View, data_set))
self.assertFalse(checkPermission(Permissions.View, data_stream))
self.assertFalse(checkPermission(Permissions.View, data_ingestion))
#publish dataset
self.login()
data_set.publish()
self.tic()
#anonymous can access published data set and data stream
self.logout()
self.assertTrue(checkPermission(Permissions.View, data_set))
self.assertTrue(checkPermission(Permissions.View, data_stream))
#anonymous can't ingest
try:
self.stepIngest(self.CSV, ",", randomize_ingestion_reference=True)
self.tic()
raise AssertionError("Anonymous user should not be able to ingest")
except Unauthorized:
pass
#users can access data
for user in [self.DOWNLOADER_USER_ID, self.CONTRIBUTOR_USER_ID]:
self.failUnlessUserHavePermissionOnDocument(Permissions.View, user, self.portal.data_set_module)
self.failUnlessUserHavePermissionOnDocument(Permissions.View, user, self.portal.data_stream_module)
self.failUnlessUserHavePermissionOnDocument(Permissions.View, user, self.portal.data_ingestion_module)
self.failUnlessUserHavePermissionOnDocument(Permissions.View, user, data_set)
self.failUnlessUserHavePermissionOnDocument(Permissions.View, user, data_stream)
self.failUnlessUserHavePermissionOnDocument(Permissions.View, user, data_ingestion)
#downloader can't ingest
self.loginByUserName(self.DOWNLOADER_USER_ID)
try:
self.stepIngest(self.CSV, ",", randomize_ingestion_reference=True)
self.tic()
raise AssertionError("Downloader user should not be able to ingest")
except Unauthorized:
pass
#contributor can ingest
self.loginByUserName(self.CONTRIBUTOR_USER_ID)
self.failUnlessUserCanCreateViewAndValidate(self.CONTRIBUTOR_USER_ID, 'Data Ingestion', 'validate_action')
self.stepIngest(self.CSV, ",", randomize_ingestion_reference=True)
self.tic()
# XXX: new test which simulates download / upload of Data Set and increase DS version
\ No newline at end of file
......@@ -46,8 +46,8 @@
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple>
<string>W:105, 34: Unused variable \'i\' (unused-variable)</string>
<string>W:105, 76: Unused variable \'j\' (unused-variable)</string>
<string>W:112, 34: Unused variable \'i\' (unused-variable)</string>
<string>W:112, 76: Unused variable \'j\' (unused-variable)</string>
</tuple>
</value>
</item>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment