Commit 015bc1c1 authored by Jérome Perrin's avatar Jérome Perrin

ingestion: review publication_state argument

Changing state directly in Base_contribute was only functional for the case
where metadata was discovered asynchronously. In the case of synchronous
discovery, the state was first changed state, and Document_convertToBaseFormatAndDiscoverMetadata
was executed - but this this was causing Unauthorized like this:

      Module script, line 10, in Document_convertToBaseFormatAndDiscoverMetadata
      - <PythonScript at /erp5/Document_convertToBaseFormatAndDiscoverMetadata used for /erp5/document_module/163>
      - Line 10
        return context.discoverMetadata(filename=filename,
    Unauthorized: You are not allowed to access 'discoverMetadata' in this context

because once we have already changed state, regular user no longer have
permission to access discoverMetadata, because that method needs ModifyPortalContent
permission.

Instead, of handling publication_state only in Base_contribute, treat it
like others user input parameter and change state during discovery.

Tests were also re-organised to move Base_contribute related test in testIngestion
and also to run Base_contribute tests as a non-manager user.
parent e3e2c240
...@@ -49,10 +49,8 @@ import unittest ...@@ -49,10 +49,8 @@ import unittest
import time import time
import StringIO import StringIO
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
from cgi import FieldStorage
from unittest import expectedFailure from unittest import expectedFailure
import ZPublisher.HTTPRequest
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import FileUpload from Products.ERP5Type.tests.utils import FileUpload
from Products.ERP5Type.tests.utils import DummyLocalizer from Products.ERP5Type.tests.utils import DummyLocalizer
...@@ -1487,143 +1485,6 @@ class TestDocument(TestDocumentMixin): ...@@ -1487,143 +1485,6 @@ class TestDocument(TestDocumentMixin):
self.tic() self.tic()
self.assertEqual('converted', document.getExternalProcessingState()) self.assertEqual('converted', document.getExternalProcessingState())
def test_Base_contribute(self):
"""
Test contributing a file and attaching it to context.
"""
person = self.portal.person_module.newContent(portal_type='Person')
contributed_document = person.Base_contribute(
portal_type=None,
title=None,
reference=None,
short_title=None,
language=None,
version=None,
description=None,
attach_document_to_context=True,
file=makeFileUpload('TEST-en-002.odt'))
self.assertEqual('Text', contributed_document.getPortalType())
self.tic()
document_list = person.getFollowUpRelatedValueList()
self.assertEqual(1, len(document_list))
document = document_list[0]
self.assertEqual('converted', document.getExternalProcessingState())
self.assertEqual('Text', document.getPortalType())
self.assertEqual('title', document.getTitle())
self.assertEqual(contributed_document, document)
def test_Base_contribute_empty(self):
"""
Test contributing an empty file and attaching it to context.
"""
person = self.portal.person_module.newContent(portal_type='Person')
empty_file_upload = ZPublisher.HTTPRequest.FileUpload(FieldStorage(
fp=StringIO.StringIO(),
environ=dict(REQUEST_METHOD='PUT'),
headers={"content-disposition":
"attachment; filename=empty;"}))
contributed_document = person.Base_contribute(
portal_type=None,
title=None,
reference=None,
short_title=None,
language=None,
version=None,
description=None,
attach_document_to_context=True,
file=empty_file_upload)
self.tic()
document_list = person.getFollowUpRelatedValueList()
self.assertEqual(1, len(document_list))
document = document_list[0]
self.assertEqual('File', document.getPortalType())
self.assertEqual(contributed_document, document)
def test_Base_contribute_forced_type(self):
"""Test contributing while forcing the portal type.
"""
person = self.portal.person_module.newContent(portal_type='Person')
contributed_document = person.Base_contribute(
portal_type='PDF',
file=makeFileUpload('TEST-en-002.odt'))
self.assertEqual('PDF', contributed_document.getPortalType())
def test_Base_contribute_input_parameter_dict(self):
"""Test contributing while entering input parameters.
"""
person = self.portal.person_module.newContent(portal_type='Person')
contributed_document = person.Base_contribute(
title='user supplied title',
file=makeFileUpload('TEST-en-002.pdf'))
self.tic()
self.assertEqual('user supplied title', contributed_document.getTitle())
def test_Base_contribute_publication_state(self):
"""Test contributing and choosing the publication state
"""
person = self.portal.person_module.newContent(portal_type='Person')
contributed_document = person.Base_contribute(
publication_state=None,
# we use as_name, to prevent regular expression from detecting a
# reference during ingestion, so that we can upload multiple documents
# in one test.
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'draft')
contributed_document = person.Base_contribute(
publication_state='shared',
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'shared')
contributed_document = person.Base_contribute(
publication_state='released',
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'released')
def test_Base_contribute_publication_state_vs_finishIngestion_script(self):
"""Contribute dialog allow choosing a publication state, but there's
also a "finishIngestion" type based script that can be configured to
force change the state. If user selects a publication_state, the state is
changed before the finishIngestion can operate.
"""
createZODBPythonScript(
self.portal.portal_skins.custom,
'PDF_finishIngestion',
'',
'if context.getValidationState() == "draft":\n'
' context.publish()')
try:
person = self.portal.person_module.newContent(portal_type='Person')
contributed_document = person.Base_contribute(
publication_state='shared',
synchronous_metadata_discovery=True,
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'shared')
contributed_document.setReference(None)
self.tic()
contributed_document = person.Base_contribute(
publication_state='shared',
synchronous_metadata_discovery=False,
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'shared')
contributed_document.setReference(None)
contributed_document = person.Base_contribute(
publication_state=None,
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'published')
finally:
self.portal.portal_skins.custom.manage_delObjects(ids=['PDF_finishIngestion'])
self.commit()
def test_HTML_to_ODT_conversion_keep_enconding(self): def test_HTML_to_ODT_conversion_keep_enconding(self):
"""This test perform an PDF conversion of HTML content """This test perform an PDF conversion of HTML content
then to plain text. then to plain text.
......
...@@ -25,6 +25,7 @@ if user_login is None: ...@@ -25,6 +25,7 @@ if user_login is None:
document_kw = {'user_login': user_login, document_kw = {'user_login': user_login,
'group': group, 'group': group,
'publication_section': publication_section, 'publication_section': publication_section,
'publication_state': publication_state,
} }
if use_context_for_container: if use_context_for_container:
...@@ -70,11 +71,6 @@ else: ...@@ -70,11 +71,6 @@ else:
document_kw.update({'file': file}) document_kw.update({'file': file})
document = portal_contributions.newContent(**document_kw) document = portal_contributions.newContent(**document_kw)
if document.getValidationState() == 'draft':
if publication_state == "shared":
document.share()
elif publication_state == "released":
document.release()
is_existing_document_updated = False is_existing_document_updated = False
if synchronous_metadata_discovery: if synchronous_metadata_discovery:
......
...@@ -137,6 +137,8 @@ class DiscoverableMixin(CachedConvertableMixin): ...@@ -137,6 +137,8 @@ class DiscoverableMixin(CachedConvertableMixin):
currently logged in, then we'll get him from session currently logged in, then we'll get him from session
input_parameter_dict - arguments provided to Create this content by user. input_parameter_dict - arguments provided to Create this content by user.
""" """
if input_parameter_dict is None:
input_parameter_dict = {}
# Preference is made of a sequence of 'user_login', 'content', 'filename', 'input' # Preference is made of a sequence of 'user_login', 'content', 'filename', 'input'
method = self._getTypeBasedMethod('getPreferredDocumentMetadataDiscoveryOrderList') method = self._getTypeBasedMethod('getPreferredDocumentMetadataDiscoveryOrderList')
order_list = list(method()) order_list = list(method())
...@@ -166,8 +168,6 @@ class DiscoverableMixin(CachedConvertableMixin): ...@@ -166,8 +168,6 @@ class DiscoverableMixin(CachedConvertableMixin):
if value not in (None, ''): if value not in (None, ''):
kw[key]=value kw[key]=value
# Prepare the content edit parameters # Prepare the content edit parameters
portal_type = None
if input_parameter_dict is not None:
# User decision take precedence, never try to change this value # User decision take precedence, never try to change this value
portal_type = input_parameter_dict.get('portal_type') portal_type = input_parameter_dict.get('portal_type')
if not portal_type: if not portal_type:
...@@ -187,6 +187,16 @@ class DiscoverableMixin(CachedConvertableMixin): ...@@ -187,6 +187,16 @@ class DiscoverableMixin(CachedConvertableMixin):
portal_type = registry.findPortalTypeName(context=self) portal_type = registry.findPortalTypeName(context=self)
if portal_type != self.getPortalType(): if portal_type != self.getPortalType():
return self.migratePortalType(portal_type) return self.migratePortalType(portal_type)
def maybeChangeState(document):
publication_state = input_parameter_dict.get('publication_state')
if publication_state and document.getValidationState() == 'draft':
if publication_state == "shared":
document.share()
elif publication_state == "released":
document.release()
  • XXX this seem to rely on the fact that workflow method has security definition, which is not something 100% agreed ( !1035 )

  • ( it's more than 90% agreed from me, I wanted mostly to make a cross reference link )

Please register or sign in to reply
maybeChangeState(self)
# Finish ingestion by calling method # Finish ingestion by calling method
self.finishIngestion() # XXX - is this really the right place ? self.finishIngestion() # XXX - is this really the right place ?
self.reindexObject() # XXX - is this really the right place ? self.reindexObject() # XXX - is this really the right place ?
...@@ -194,6 +204,7 @@ class DiscoverableMixin(CachedConvertableMixin): ...@@ -194,6 +204,7 @@ class DiscoverableMixin(CachedConvertableMixin):
# to metadata discovery - refer to the documentation of mergeRevision method # to metadata discovery - refer to the documentation of mergeRevision method
merged_doc = self.mergeRevision() # XXX - is this really the right place ? merged_doc = self.mergeRevision() # XXX - is this really the right place ?
merged_doc.reindexObject() # XXX - is this really the right place ? merged_doc.reindexObject() # XXX - is this really the right place ?
maybeChangeState(merged_doc)
return merged_doc # XXX - is this really the right place ? return merged_doc # XXX - is this really the right place ?
security.declareProtected(Permissions.ModifyPortalContent, 'finishIngestion') security.declareProtected(Permissions.ModifyPortalContent, 'finishIngestion')
......
...@@ -31,7 +31,10 @@ ...@@ -31,7 +31,10 @@
import unittest import unittest
import os import os
import StringIO
from cgi import FieldStorage
from lxml import etree from lxml import etree
from AccessControl.SecurityManagement import newSecurityManager
from DateTime import DateTime from DateTime import DateTime
from Products.ERP5Type.Utils import convertToUpperCase from Products.ERP5Type.Utils import convertToUpperCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ( from Products.ERP5Type.tests.ERP5TypeTestCase import (
...@@ -41,6 +44,7 @@ from Products.ERP5Type.tests.utils import FileUpload, createZODBPythonScript ...@@ -41,6 +44,7 @@ from Products.ERP5Type.tests.utils import FileUpload, createZODBPythonScript
from Products.ERP5OOo.OOoUtils import OOoBuilder from Products.ERP5OOo.OOoUtils import OOoBuilder
from Products.CMFCore.utils import getToolByName from Products.CMFCore.utils import getToolByName
from zExceptions import BadRequest from zExceptions import BadRequest
import ZPublisher.HTTPRequest
from unittest import expectedFailure from unittest import expectedFailure
import urllib import urllib
import urllib2 import urllib2
...@@ -63,21 +67,8 @@ def makeFileUpload(name, as_name=None): ...@@ -63,21 +67,8 @@ def makeFileUpload(name, as_name=None):
path = makeFilePath(name) path = makeFilePath(name)
return FileUpload(path, as_name) return FileUpload(path, as_name)
class TestIngestion(ERP5TypeTestCase):
"""
ERP5 Document Management System - test file ingestion mechanism
"""
##################################
## ZopeTestCase Skeleton
##################################
def getTitle(self):
"""
Return the title of the current test set.
"""
return "ERP5 DMS - Ingestion"
class IngestionTestCase(ERP5TypeTestCase):
def getBusinessTemplateList(self): def getBusinessTemplateList(self):
""" """
Return the list of required business templates. Return the list of required business templates.
...@@ -86,19 +77,6 @@ class TestIngestion(ERP5TypeTestCase): ...@@ -86,19 +77,6 @@ class TestIngestion(ERP5TypeTestCase):
'erp5_ingestion', 'erp5_ingestion_mysql_innodb_catalog', 'erp5_ingestion', 'erp5_ingestion_mysql_innodb_catalog',
'erp5_web', 'erp5_crm', 'erp5_dms') 'erp5_web', 'erp5_crm', 'erp5_dms')
def afterSetUp(self):
"""
Initialize the ERP5 site.
"""
self.login()
self.datetime = DateTime()
self.portal = self.getPortal()
self.portal_categories = self.getCategoryTool()
self.portal_catalog = self.getCatalogTool()
self.createDefaultCategoryList()
self.setSystemPreference()
self.setSimulatedNotificationScript()
def beforeTearDown(self): def beforeTearDown(self):
# cleanup modules # cleanup modules
module_id_list = """web_page_module module_id_list = """web_page_module
...@@ -124,6 +102,7 @@ class TestIngestion(ERP5TypeTestCase): ...@@ -124,6 +102,7 @@ class TestIngestion(ERP5TypeTestCase):
'Document_getPropertyDictFromFilename', 'Document_getPropertyDictFromFilename',
'Document_getPropertyDictFromUserLogin', 'Document_getPropertyDictFromUserLogin',
'Document_finishIngestion', 'Document_finishIngestion',
'PDF_finishIngestion',
'Document_getPreferredDocumentMetadataDiscoveryOrderList', 'Document_getPreferredDocumentMetadataDiscoveryOrderList',
'Text_getPropertyDictFromContent', 'Text_getPropertyDictFromContent',
'Text_getPropertyDictFromInput', 'Text_getPropertyDictFromInput',
...@@ -137,6 +116,28 @@ class TestIngestion(ERP5TypeTestCase): ...@@ -137,6 +116,28 @@ class TestIngestion(ERP5TypeTestCase):
skin_tool.custom._delObject(script_id) skin_tool.custom._delObject(script_id)
self.commit() self.commit()
class TestIngestion(IngestionTestCase):
"""
ERP5 Document Management System - test file ingestion mechanism
"""
##################################
## ZopeTestCase Skeleton
##################################
def afterSetUp(self):
"""
Initialize the ERP5 site.
"""
self.login()
self.datetime = DateTime()
self.portal = self.getPortal()
self.portal_categories = self.getCategoryTool()
self.portal_catalog = self.getCatalogTool()
self.createDefaultCategoryList()
self.setSystemPreference()
self.setSimulatedNotificationScript()
def setSystemPreference(self): def setSystemPreference(self):
default_pref = self.getDefaultSystemPreference() default_pref = self.getDefaultSystemPreference()
default_pref.setPreferredDocumentFilenameRegularExpression(FILENAME_REGULAR_EXPRESSION) default_pref.setPreferredDocumentFilenameRegularExpression(FILENAME_REGULAR_EXPRESSION)
...@@ -2024,7 +2025,178 @@ return result ...@@ -2024,7 +2025,178 @@ return result
self.assertTrue(document is not None) self.assertTrue(document is not None)
self.assertEqual(document.getData(), data) self.assertEqual(document.getData(), data)
def test_suite():
suite = unittest.TestSuite() class Base_contributeMixin:
suite.addTest(unittest.makeSuite(TestIngestion)) """Tests for Base_contribute script.
return suite """
def test_Base_contribute(self):
"""
Test contributing a file and attaching it to context.
"""
person = self.portal.person_module.newContent(portal_type='Person')
contributed_document = person.Base_contribute(
portal_type=None,
title=None,
reference=None,
short_title=None,
language=None,
version=None,
description=None,
attach_document_to_context=True,
file=makeFileUpload('TEST-en-002.odt'))
self.assertEqual('Text', contributed_document.getPortalType())
self.tic()
document_list = person.getFollowUpRelatedValueList()
self.assertEqual(1, len(document_list))
document = document_list[0]
self.assertEqual('converted', document.getExternalProcessingState())
self.assertEqual('Text', document.getPortalType())
self.assertEqual('title', document.getTitle())
self.assertEqual(contributed_document, document)
def test_Base_contribute_empty(self):
"""
Test contributing an empty file and attaching it to context.
"""
person = self.portal.person_module.newContent(portal_type='Person')
empty_file_upload = ZPublisher.HTTPRequest.FileUpload(FieldStorage(
fp=StringIO.StringIO(),
environ=dict(REQUEST_METHOD='PUT'),
headers={"content-disposition":
"attachment; filename=empty;"}))
contributed_document = person.Base_contribute(
portal_type=None,
title=None,
reference=None,
short_title=None,
language=None,
version=None,
description=None,
attach_document_to_context=True,
file=empty_file_upload)
self.tic()
document_list = person.getFollowUpRelatedValueList()
self.assertEqual(1, len(document_list))
document = document_list[0]
self.assertEqual('File', document.getPortalType())
self.assertEqual(contributed_document, document)
def test_Base_contribute_forced_type(self):
"""Test contributing while forcing the portal type.
"""
person = self.portal.person_module.newContent(portal_type='Person')
contributed_document = person.Base_contribute(
portal_type='PDF',
file=makeFileUpload('TEST-en-002.odt'))
self.assertEqual('PDF', contributed_document.getPortalType())
def test_Base_contribute_input_parameter_dict(self):
"""Test contributing while entering input parameters.
"""
person = self.portal.person_module.newContent(portal_type='Person')
contributed_document = person.Base_contribute(
title='user supplied title',
file=makeFileUpload('TEST-en-002.pdf'))
self.tic()
self.assertEqual('user supplied title', contributed_document.getTitle())
def test_Base_contribute_publication_state(self):
"""Test contributing and choosing the publication state
"""
person = self.portal.person_module.newContent(portal_type='Person')
contributed_document = person.Base_contribute(
publication_state=None,
# we use as_name, to prevent regular expression from detecting a
# reference during ingestion, so that we can upload multiple documents
# in one test.
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'draft')
contributed_document.setReference(None)
self.tic()
contributed_document = person.Base_contribute(
publication_state='shared',
synchronous_metadata_discovery=False,
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'shared')
contributed_document.setReference(None)
self.tic()
contributed_document = person.Base_contribute(
publication_state='shared',
synchronous_metadata_discovery=True,
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'shared')
contributed_document.setReference(None)
self.tic()
contributed_document = person.Base_contribute(
publication_state='released',
synchronous_metadata_discovery=False,
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'released')
contributed_document.setReference(None)
self.tic()
contributed_document = person.Base_contribute(
publication_state='released',
synchronous_metadata_discovery=True,
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'released')
def test_Base_contribute_publication_state_vs_finishIngestion_script(self):
"""Contribute dialog allow choosing a publication state, but there's
also a "finishIngestion" type based script that can be configured to
force change the state. If user selects a publication_state, the state is
changed before the finishIngestion can operate.
"""
createZODBPythonScript(
self.portal.portal_skins.custom,
'PDF_finishIngestion',
'',
'if context.getValidationState() == "draft":\n'
' context.publish()')
person = self.portal.person_module.newContent(portal_type='Person')
contributed_document = person.Base_contribute(
publication_state='shared',
synchronous_metadata_discovery=True,
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'shared')
contributed_document.setReference(None)
self.tic()
contributed_document = person.Base_contribute(
publication_state='shared',
synchronous_metadata_discovery=False,
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'shared')
contributed_document.setReference(None)
contributed_document = person.Base_contribute(
publication_state=None,
file=makeFileUpload('TEST-en-002.pdf', as_name='doc.pdf'))
self.tic()
self.assertEqual(contributed_document.getValidationState(), 'published')
class TestBase_contribute(IngestionTestCase, Base_contributeMixin):
"""Base_contribute tests as Manager (ie. without security restrictions)
"""
class TestBase_contributeWithSecurity(IngestionTestCase, Base_contributeMixin):
"""Base_contribute tests with security.
"""
def login(self, *args, **kw):
uf = self.portal.acl_users
uf._doAddUser(self.id(), self.newPassword(), ['Associate', 'Assignor', 'Author'], [])
user = uf.getUserById(self.id()).__of__(uf)
newSecurityManager(None, user)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment