Commit 19d94258 authored by Roque's avatar Roque

erp5_wendelin_data_lake_ingestion: security tests

parent 54faa13d
from Products.ERP5Type.tests.SecurityTestCase import SecurityTestCase
from pprint import pformat
from AccessControl.SecurityManagement import getSecurityManager
from AccessControl.SecurityManagement import setSecurityManager
from Products.ERP5Type import Permissions
from zExceptions import Unauthorized
import string
import random
import csv
......@@ -9,6 +14,15 @@ import base64
class TestDataIngestion(SecurityTestCase):
def getBusinessTemplateList(self):
return (
'erp5_wendelin_data_lake_ingestion',
'erp5_credential',
'erp5_ui_test_core',
'erp5_accounting',
'erp5_test_result',
)
REFERENCE_SEPARATOR = "/"
PART_1 = REFERENCE_SEPARATOR + "001"
PART_2 = REFERENCE_SEPARATOR + "002"
......@@ -22,6 +36,8 @@ class TestDataIngestion(SecurityTestCase):
REF_PREFIX = "fake-supplier" + REFERENCE_SEPARATOR
REF_SUPPLIER_PREFIX = "fake-supplier" + REFERENCE_SEPARATOR
INVALID = "_invalid"
DOWNLOADER_USER_ID = 'test_downloader'
CONTRIBUTOR_USER_ID = 'test_contributor'
def getTitle(self):
return "DataIngestionTest"
......@@ -135,6 +151,66 @@ class TestDataIngestion(SecurityTestCase):
return data_set, [data_stream]
def createUserCredentials(self, user_id, category_list):
module = self.portal.getDefaultModule(portal_type='Credential Request')
if self.portal.CredentialRequest_checkLoginAvailability(user_id):
credential_request = module.newContent(
portal_type = "Credential Request",
first_name = "test_user",
last_name = user_id,
reference = user_id,
password = "test_password",
default_email_text = user_id + "@lake_security_test.com"
)
self.tic()
credential_request.setCategoryList(category_list)
tag = 'set_login_%s' % user_id.encode('hex')
credential_request.reindexObject(activate_kw={'tag': tag})
credential_request.submit("Automatic submit")
self.tic()
if category_list[0] == 'function/downloader':
# as credential creates contributor assigments by default, change it for downloader user
person = self.portal.portal_catalog.getResultValue(
portal_type = 'Person',
default_email_text = user_id + "@lake_security_test.com")
if person is not None:
assignment = self.portal.restrictedTraverse('person_module/%s/1' % person.getId())
assignment.setFunction(['downloader'])
def failUnlessUserHavePermissionOnDocument(self, permission_name, username, document):
sm = getSecurityManager()
try:
self.loginByUserName(username)
user = getSecurityManager().getUser()
if not user.has_permission(permission_name, document):
groups = []
if hasattr(user, 'getGroups'):
groups = user.getGroups()
raise AssertionError(
'User %s does NOT have %s permission on %s %s (user roles: [%s], '
'roles needed: [%s], existing local roles:\n%s\n'
'your user groups: [%s])' %
(username, permission_name, document.getPortalTypeName(),
document, ', '.join(user.getRolesInContext(document)),
', '.join([x['name'] for x in
document.rolesOfPermission(permission_name)
if x['selected']]),
pformat(document.get_local_roles()),
', '.join(groups)))
finally:
setSecurityManager(sm)
def failUnlessUserCanCreateViewAndValidate(self, user_id, container_portal_type, workflow_transition=None):
reference_base = ''.join(random.choice(string.digits) for x in range(8))
self.loginByUserName(user_id)
new_container = self.portal.getDefaultModule(container_portal_type
).newContent(portal_type=container_portal_type,
title=self.id() + reference_base)
self.failUnlessUserHavePermissionOnDocument(Permissions.View, user_id, new_container)
self.failUnlessUserHavePermissionOnDocument(Permissions.ModifyPortalContent, user_id, new_container)
def test_01_DefaultEbulkIngestion(self):
"""
Test default ingestion with ebulk too.
......@@ -218,9 +294,9 @@ class TestDataIngestion(SecurityTestCase):
self.assertNotEqual(None,
getattr(self.portal.data_supply_module, "embulk", None))
def test_04_DefaultModelSecurityModel(self):
def test_04_DatasetAndDatastreamsConsistency(self):
"""
Test default security model : 'All can download, only contributors can upload.'
Test that data set state transition also changes its data streams states
"""
data_set, data_stream_list = self.stepIngest(self.CSV, ",", randomize_ingestion_reference=True)
self.tic()
......@@ -281,4 +357,68 @@ class TestDataIngestion(SecurityTestCase):
self.assertEqual(first_file_stream.getValidationState(), 'published')
self.assertEqual(second_file_stream.getValidationState(), 'published')
def test_06_DefaultModelSecurityModel(self):
"""
Test default security model : 'All can download, only contributors can upload.'
"""
#ingest to generate some documents
self.login()
data_set, data_stream_list = self.stepIngest(self.CSV, ",", randomize_ingestion_reference=True)
self.tic()
data_stream = data_stream_list[0]
data_ingestion = self.getDataIngestion(data_stream.getReference())
checkPermission = self.portal.portal_membership.checkPermission
#create users
self.createUserCredentials(self.DOWNLOADER_USER_ID, ['function/downloader'])
self.createUserCredentials(self.CONTRIBUTOR_USER_ID, ['function/contributor'])
#anonymous can't access modules or not published data
self.logout()
self.assertFalse(checkPermission(Permissions.View, self.portal.data_set_module))
self.assertFalse(checkPermission(Permissions.View, self.portal.data_stream_module))
self.assertFalse(checkPermission(Permissions.View, self.portal.data_ingestion_module))
self.assertFalse(checkPermission(Permissions.View, data_set))
self.assertFalse(checkPermission(Permissions.View, data_stream))
self.assertFalse(checkPermission(Permissions.View, data_ingestion))
#publish dataset
self.login()
data_set.publish()
self.tic()
#anonymous can access published data set and data stream
self.logout()
self.assertTrue(checkPermission(Permissions.View, data_set))
self.assertTrue(checkPermission(Permissions.View, data_stream))
#anonymous can't ingest
try:
self.stepIngest(self.CSV, ",", randomize_ingestion_reference=True)
self.tic()
raise AssertionError("Anonymous user should not be able to ingest")
except Unauthorized:
pass
#users can access data
for user in [self.DOWNLOADER_USER_ID, self.CONTRIBUTOR_USER_ID]:
self.failUnlessUserHavePermissionOnDocument(Permissions.View, user, self.portal.data_set_module)
self.failUnlessUserHavePermissionOnDocument(Permissions.View, user, self.portal.data_stream_module)
self.failUnlessUserHavePermissionOnDocument(Permissions.View, user, self.portal.data_ingestion_module)
self.failUnlessUserHavePermissionOnDocument(Permissions.View, user, data_set)
self.failUnlessUserHavePermissionOnDocument(Permissions.View, user, data_stream)
self.failUnlessUserHavePermissionOnDocument(Permissions.View, user, data_ingestion)
#downloader can't ingest
self.loginByUserName(self.DOWNLOADER_USER_ID)
try:
self.stepIngest(self.CSV, ",", randomize_ingestion_reference=True)
self.tic()
raise AssertionError("Downloader user should not be able to ingest")
except Unauthorized:
pass
#contributor can ingest
self.loginByUserName(self.CONTRIBUTOR_USER_ID)
self.failUnlessUserCanCreateViewAndValidate(self.CONTRIBUTOR_USER_ID, 'Data Ingestion', 'validate_action')
self.stepIngest(self.CSV, ",", randomize_ingestion_reference=True)
self.tic()
# XXX: new test which simulates download / upload of Data Set and increase DS version
\ No newline at end of file
......@@ -46,8 +46,8 @@
<key> <string>text_content_warning_message</string> </key>
<value>
<tuple>
<string>W:105, 34: Unused variable \'i\' (unused-variable)</string>
<string>W:105, 76: Unused variable \'j\' (unused-variable)</string>
<string>W:121, 34: Unused variable \'i\' (unused-variable)</string>
<string>W:121, 76: Unused variable \'j\' (unused-variable)</string>
</tuple>
</value>
</item>
......
erp5_credential
erp5_wendelin
\ No newline at end of file
erp5_credential
erp5_wendelin_data_lake_ingestion_default_security_model
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment