Commit afd84b01 authored by Nicolas Delaby's avatar Nicolas Delaby

s/file_name/filename/

s/source_reference/filename/

update tests
Add a test to check the hackability of ContributionTool (everything can be managed through the IDiscoverable API)


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@40972 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent de2545fc
......@@ -383,7 +383,7 @@ class OOoDocument(OOoDocumentExtensibleTraversableMixin, BaseConvertableFileMixi
temp_image = self.portal_contributions.newContent(
portal_type='Image',
file=cStringIO.StringIO(),
file_name=self.getId(),
filename=self.getId(),
temp_object=1)
temp_image._setData(data)
# we care for first page only but as well for image quality
......@@ -420,23 +420,23 @@ class OOoDocument(OOoDocumentExtensibleTraversableMixin, BaseConvertableFileMixi
else:
must_close = 0
for f in zip_file.infolist():
file_name = f.filename
document = self.get(file_name, None)
filename = f.filename
document = self.get(filename, None)
if document is not None:
self.manage_delObjects([file_name]) # For compatibility with old implementation
if file_name.endswith('html'):
self.manage_delObjects([filename]) # For compatibility with old implementation
if filename.endswith('html'):
mime = 'text/html'
# call portal_transforms to strip HTML in safe mode
portal = self.getPortalObject()
transform_tool = getToolByName(portal, 'portal_transforms')
data = transform_tool.convertToData('text/x-html-safe',
zip_file.read(file_name),
zip_file.read(filename),
object=self, context=self,
mimetype=mime)
else:
mime = guess_content_type(file_name)[0]
data = Pdata(zip_file.read(file_name))
self.setConversion(data, mime=mime, format=EMBEDDED_FORMAT, file_name=file_name)
mime = guess_content_type(filename)[0]
data = Pdata(zip_file.read(filename))
self.setConversion(data, mime=mime, format=EMBEDDED_FORMAT, filename=filename)
if must_close:
zip_file.close()
archive_file.close()
......@@ -450,7 +450,7 @@ class OOoDocument(OOoDocumentExtensibleTraversableMixin, BaseConvertableFileMixi
"""
server_proxy = OOoServerProxy(self)
response_code, response_dict, response_message = server_proxy.run_convert(
self.getSourceReference() or self.getId(),
self.getFilename() or self.getId(),
enc(str(self.getData())),
None,
None,
......@@ -468,9 +468,7 @@ class OOoDocument(OOoDocumentExtensibleTraversableMixin, BaseConvertableFileMixi
"OOoDocument: Error converting document to base format %s:%s:"
% (response_code, response_message))
security.declareProtected(Permissions.AccessContentsInformation,
'getContentInformation')
def getContentInformation(self):
def _getContentInformation(self):
"""
Returns the metadata extracted by the conversion
server.
......
......@@ -74,11 +74,12 @@ import difflib
from AccessControl import Unauthorized
from Products.ERP5Type import Permissions
from Products.ERP5Type.tests.backportUnittest import expectedFailure
from Products.ERP5.Tool.ContributionTool import AlreadyIngestedUrlError
QUIET = 0
TEST_FILES_HOME = os.path.join(os.path.dirname(__file__), 'test_document')
FILE_NAME_REGULAR_EXPRESSION = "(?P<reference>[A-Z]{3,10})-(?P<language>[a-z]{2})-(?P<version>[0-9]{3})"
FILENAME_REGULAR_EXPRESSION = "(?P<reference>[A-Z]{3,10})-(?P<language>[a-z]{2})-(?P<version>[0-9]{3})"
REFERENCE_REGULAR_EXPRESSION = "(?P<reference>[A-Z]{3,10})(-(?P<language>[a-z]{2}))?(-(?P<version>[0-9]{3}))?"
def makeFilePath(name):
......@@ -114,7 +115,7 @@ class TestDocumentMixin(ERP5TypeTestCase):
conversion_dict = _getConversionServerDict()
default_pref.setPreferredOoodocServerAddress(conversion_dict['hostname'])
default_pref.setPreferredOoodocServerPortNumber(conversion_dict['port'])
default_pref.setPreferredDocumentFileNameRegularExpression(FILE_NAME_REGULAR_EXPRESSION)
default_pref.setPreferredDocumentFilenameRegularExpression(FILENAME_REGULAR_EXPRESSION)
default_pref.setPreferredDocumentReferenceRegularExpression(REFERENCE_REGULAR_EXPRESSION)
if self.portal.portal_workflow.isTransitionPossible(default_pref, 'enable'):
default_pref.enable()
......@@ -193,14 +194,14 @@ class TestDocument(TestDocumentMixin):
## helper methods
def createTestDocument(self, file_name=None, portal_type='Text', reference='TEST', version='002', language='en'):
def createTestDocument(self, filename=None, portal_type='Text', reference='TEST', version='002', language='en'):
"""
Creates a text document
"""
dm=self.getPortal().document_module
doctext=dm.newContent(portal_type=portal_type)
if file_name is not None:
f = open(makeFilePath(file_name), 'rb')
if filename is not None:
f = open(makeFilePath(filename), 'rb')
doctext.setTextContent(f.read())
f.close()
doctext.setReference(reference)
......@@ -585,7 +586,7 @@ class TestDocument(TestDocumentMixin):
# tests that owners can download OOo documents, and all headers (including
# filenames) are set correctly
doc = self.portal.document_module.newContent(
source_reference='test.ods',
filename='test.ods',
portal_type='Spreadsheet')
doc.edit(file=makeFileUpload('import_data_list.ods'))
......@@ -608,7 +609,7 @@ class TestDocument(TestDocumentMixin):
# tests that members can download OOo documents in pdf format (at least in
# published state), and all headers (including filenames) are set correctly
doc = self.portal.document_module.newContent(
source_reference='test.ods',
filename='test.ods',
portal_type='Spreadsheet')
doc.edit(file=makeFileUpload('import.file.with.dot.in.filename.ods'))
doc.publish()
......@@ -1276,32 +1277,28 @@ class TestDocument(TestDocumentMixin):
upload_file = makeFileUpload('REF-en-001.pdf')
document = self.portal.document_module.newContent(portal_type='PDF')
# Here we use edit instead of setFile,
# because only edit method set filename as source_reference.
# because only the edit method sets the filename from the uploaded file.
document.edit(file=upload_file)
self.assertEquals('application/pdf', document.getContentType())
def test_Document_getStandardFilename(self):
    """
    Check the names returned by getStandardFilename(), with and
    without an explicit target format.
    """
    pdf_upload = makeFileUpload('metadata.pdf')
    pdf = self.portal.document_module.newContent(portal_type='PDF')
    pdf.edit(file=pdf_upload)
    self.assertEquals(pdf.getStandardFilename(), 'metadata.pdf')
    self.assertEquals(pdf.getStandardFilename(format='png'),
                      'metadata.png')

    # same checks once a version and a language are set
    pdf.setVersion('001')
    pdf.setLanguage('en')
    self.assertEquals(pdf.getStandardFilename(), 'metadata-001-en.pdf')
    self.assertEquals(pdf.getStandardFilename(format='png'),
                      'metadata-001-en.png')

    # check when format contains multiple '.'
    odp_upload = makeFileUpload('TEST-en-003.odp')
    presentation = self.portal.document_module.newContent(
        portal_type='Presentation')
    presentation.edit(file=odp_upload)
    self.assertEquals(presentation.getStandardFilename(), 'TEST-en-003.odp')
    self.assertEquals('TEST-en-003.odg',
                      presentation.getStandardFilename(format='odp.odg'))
def test_CMYKImageTextContent(self):
......@@ -1320,14 +1317,10 @@ class TestDocument(TestDocumentMixin):
self.stepTic()
self.assertEquals('converted', document.getExternalProcessingState())
# Upload different type of file inside which can not be converted to base format
upload_file = makeFileUpload('REF-en-001.pdf')
document.edit(file=upload_file)
# Delete base_data
document.edit(base_data=None)
self.stepTic()
self.assertEquals('application/pdf', document.getContentType())
self.assertEquals('conversion_failed', document.getExternalProcessingState())
# As document is not converted, text convertion is impossible
# But document can still be retrive with portal catalog
# As document is not converted, text conversion is impossible
self.assertRaises(NotConvertedError, document.asText)
self.assertRaises(NotConvertedError, document.getSearchableText)
self.assertEquals('This document is not converted yet.',
......@@ -1646,6 +1639,28 @@ document.write('<sc'+'ript type="text/javascript" src="http://somosite.bg/utb.ph
self.assertTrue('AZERTYY' not in safe_html)
self.assertTrue('#FFAA44' in safe_html)
@expectedFailure
def test_safeHTML_impossible_conversion(self):
    """Some html are not parsable.

    Creates a Web Page holding very dirty HTML and calls
    asStrippedHTML() on it. The test fails explicitly if the HTML
    parser error escapes from the conversion; it is decorated with
    expectedFailure.
    """
    web_page_portal_type = 'Web Page'
    module = self.portal.getDefaultModule(web_page_portal_type)
    web_page = module.newContent(portal_type=web_page_portal_type)
    # very dirty html
    html_content = """
<html>
<body>
<p><a href="http://www.example.com/category/html/" style="font-weight: bold; color: rgb(0, 0, 0); font-size: 90.8777%; text-decoration: none;" title="catégorie how to write valid html d" alt="Diancre pas d" accord="" :="" 6="" articles="">Its french</a></p>
</body>
</html>
"""
    web_page.edit(text_content=html_content)
    # The HTMLParser module names its exception HTMLParseError; it has
    # no ParserError, so importing that name raised ImportError before
    # the conversion was even attempted.
    from HTMLParser import HTMLParseError
    try:
        web_page.asStrippedHTML()
    except HTMLParseError:
        self.fail('Even BeautifulSoup is not able to parse such HTML')
def test_parallel_conversion(self):
"""Check that conversion engine is able to fill in
cache without overwrite previous conversion
......@@ -1768,7 +1783,8 @@ document.write('<sc'+'ript type="text/javascript" src="http://somosite.bg/utb.ph
upload_file = makeFileUpload('TEST-text-iso8859-1.txt')
web_page = module.newContent(portal_type=web_page_portal_type,
file=upload_file)
transaction.commit()
self.tic()
text_content = web_page.getTextContent()
my_utf_eight_token = 'ùééàçèîà'
text_content = text_content.replace('\n', '\n%s\n' % my_utf_eight_token)
......@@ -1798,9 +1814,9 @@ return 1
transaction.commit()
def _test_document_conversion_to_base_format_no_original_format_access(self,
portal_type, file_name):
portal_type, filename):
module = self.portal.getDefaultModule(portal_type)
upload_file = makeFileUpload(file_name)
upload_file = makeFileUpload(filename)
document = module.newContent(portal_type=portal_type,
file=upload_file)
......@@ -1869,48 +1885,6 @@ return 1
self.assertTrue('Continue' in response.getBody())
self.assertTrue('Last page' in response.getBody())
def test_contributeLink(self):
"""
Test contributing a link.
"""
portal = self.portal
kw = {'url':portal.absolute_url()}
web_page_1 = portal.Base_contribute(**kw)
self.stepTic()
self.assertTrue(web_page_1.getRevision()=='2')
web_page_2 = portal.Base_contribute(**kw)
self.stepTic()
self.assertTrue(web_page_1==web_page_2)
self.assertTrue(web_page_2.getRevision()=='3')
web_page_3 = portal.Base_contribute(**kw)
self.stepTic()
self.assertTrue(web_page_2==web_page_3)
self.assertTrue(web_page_3.getRevision()=='4')
# test in synchronous mode
kw['synchronous_metadata_discovery']=True
web_page_4 = portal.Base_contribute(**kw)
self.stepTic()
self.assertTrue(web_page_3==web_page_4)
self.assertTrue(web_page_4.getRevision()=='5')
web_page_5 = portal.Base_contribute(**kw)
self.stepTic()
self.assertTrue(web_page_4==web_page_5)
self.assertTrue(web_page_5.getRevision()=='6')
web_page_6 = portal.Base_contribute(**kw)
self.stepTic()
self.assertTrue(web_page_5==web_page_6)
self.assertTrue(web_page_6.getRevision()=='7')
# test contribute link is a safe html (duplicates parts of test_safeHTML_conversion)
web_page_6_entire_html = web_page_6.asEntireHTML()
self.assertTrue('<script' not in web_page_6_entire_html)
self.assertTrue('<javascript' not in web_page_6_entire_html)
def test_getTargetFormatItemList(self):
"""
Test getting target conversion format item list.
......
......@@ -40,7 +40,7 @@ from Products.ERP5Type.Utils import convertToUpperCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase,\
_getConversionServerDict
from Products.ERP5Type.tests.Sequence import SequenceList
from Products.ERP5Type.tests.utils import FileUpload
from Products.ERP5Type.tests.utils import FileUpload, createZODBPythonScript
from Products.ERP5OOo.Document.OOoDocument import ConversionError
from Products.ERP5OOo.OOoUtils import OOoBuilder
from zLOG import LOG, INFO, ERROR
......@@ -48,7 +48,7 @@ from Products.CMFCore.utils import getToolByName
# test files' home
TEST_FILES_HOME = os.path.join(os.path.dirname(__file__), 'test_document')
FILE_NAME_REGULAR_EXPRESSION = "(?P<reference>[A-Z&é@{]{3,7})-(?P<language>[a-z]{2})-(?P<version>[0-9]{3})"
FILENAME_REGULAR_EXPRESSION = "(?P<reference>[A-Z&é@{]{3,7})-(?P<language>[a-z]{2})-(?P<version>[0-9]{3})"
REFERENCE_REGULAR_EXPRESSION = "(?P<reference>[A-Z&é@{]{3,7})(-(?P<language>[a-z]{2}))?(-(?P<version>[0-9]{3}))?"
......@@ -98,6 +98,17 @@ class TestIngestion(ERP5TypeTestCase):
self.setSimulatedNotificationScript()
def beforeTearDown(self):
# cleanup modules
module_id_list = """web_page_module
document_module
image_module
external_source_module
""".split()
for module_id in module_id_list:
module = self.portal[module_id]
module.manage_delObjects([id for id in module.objectIds()])
transaction.commit()
self.tic()
activity_tool = self.portal.portal_activities
activity_status = set(m.processing_node < -1
for m in activity_tool.getMessageList())
......@@ -106,13 +117,31 @@ class TestIngestion(ERP5TypeTestCase):
else:
assert not activity_status
self.portal.portal_caches.clearAllCache()
# Cleanup portal_skins
script_id_list = ('Document_getPropertyDictFromContent',
'Document_getPropertyDictFromInput',
'Document_getPropertyDictFromFilename',
'Document_getPropertyDictFromUserLogin',
'Document_finishIngestion',
'Document_getPreferredDocumentMetadataDiscoveryOrderList',
'Text_getPropertyDictFromContent',
'Text_getPropertyDictFromInput',
'Text_getPropertyDictFromFilename',
'Text_getPropertyDictFromUserLogin',
'Text_finishIngestion',
'Text_getPreferredDocumentMetadataDiscoveryOrderList',)
skin_tool = self.portal.portal_skins
for script_id in script_id_list:
if script_id in skin_tool.custom.objectIds():
skin_tool.custom._delObject(script_id)
transaction.commit()
def setSystemPreference(self):
default_pref = self.portal.portal_preferences.default_site_preference
conversion_dict = _getConversionServerDict()
default_pref.setPreferredOoodocServerAddress(conversion_dict['hostname'])
default_pref.setPreferredOoodocServerPortNumber(conversion_dict['port'])
default_pref.setPreferredDocumentFileNameRegularExpression(FILE_NAME_REGULAR_EXPRESSION)
default_pref.setPreferredDocumentFilenameRegularExpression(FILENAME_REGULAR_EXPRESSION)
default_pref.setPreferredDocumentReferenceRegularExpression(REFERENCE_REGULAR_EXPRESSION)
if default_pref.getPreferenceState() != 'global':
default_pref.enable()
......@@ -124,10 +153,9 @@ class TestIngestion(ERP5TypeTestCase):
context = self.portal.portal_skins.custom
script_id = 'Document_notifyByEmail'
if not hasattr(context, script_id):
factory = context.manage_addProduct['PythonScripts'].manage_addPythonScript
factory(id=script_id)
script = getattr(context, script_id)
script.ZPythonScript_edit('email_to, event, doc, **kw', 'return')
createZODBPythonScript(context, script_id,
'email_to, event, doc, **kw', 'return')
def createDefaultCategoryList(self):
"""
......@@ -208,47 +236,16 @@ class TestIngestion(ERP5TypeTestCase):
categories.append(category)
return categories
def getDocument(self, id):
"""
Returns a document with given ID in the
document module.
"""
document_module = self.portal.document_module
return getattr(document_module, id)
def checkIsObjectCatalogged(self, portal_type, **kw):
"""
Make sure that a document with given portal type
and kw properties is already present in the catalog.
Typical use of this method consists in providing
an id or reference.
"""
res = self.portal_catalog(portal_type=portal_type, **kw.copy())
self.assertEquals(len(res), 1)
for key, value in kw.items():
self.assertEquals(res[0].getProperty(key), value)
def newEmptyDocument(self, portal_type):
    """
    Create an empty document of the given portal type in the
    default module of that portal type, and return it.
    """
    return self.portal.getDefaultModule(portal_type).newContent(
        portal_type=portal_type)
def ingestFormatList(self, document_id, format_list, portal_type=None):
def ingestFormatList(self, document, format_list):
"""
Upload in document document_id all test files which match
any of the formats in format_list.
......@@ -260,11 +257,6 @@ class TestIngestion(ERP5TypeTestCase):
For every file, this checks is the word "magic"
is present in both SearchableText and asText.
"""
if portal_type is None:
document_module = self.portal.document_module
else:
document_module = self.portal.getDefaultModule(portal_type)
document = getattr(document_module, document_id)
for revision, format in enumerate(format_list):
filename = 'TEST-en-002.%s' %format
f = makeFileUpload(filename)
......@@ -280,13 +272,12 @@ class TestIngestion(ERP5TypeTestCase):
# check if SearchableText() does not raise any exception
document.SearchableText()
def checkDocumentExportList(self, document_id, format, asserted_target_list):
def checkDocumentExportList(self, document, format, asserted_target_list):
"""
Upload document ID document_id with
a test file of given format and assert that the document
can be converted to any of the formats in asserted_target_list
"""
document = self.getDocument(document_id)
filename = 'TEST-en-002.' + format
f = makeFileUpload(filename)
document.edit(file=f)
......@@ -295,7 +286,8 @@ class TestIngestion(ERP5TypeTestCase):
self.getPortal().portal_caches.clearCache()
target_list = document.getTargetFormatList()
for target in asserted_target_list:
self.assert_(target in target_list)
self.assertTrue(target in target_list, 'target:%r not in %r' % (target,
target_list,))
def contributeFileList(self, with_portal_type=False):
"""
......@@ -344,57 +336,57 @@ class TestIngestion(ERP5TypeTestCase):
self.assertEquals(document.getExternalProcessingState(), 'converted')
self.assert_('magic' in document.SearchableText())
def newPythonScript(self, script_id, argument_list, code):
    """
    Creates a python script in portal_skins/custom with the given
    argument_list and source code. A script already stored under
    script_id is deleted first.
    """
    custom_skin = self.portal.portal_skins.custom
    already_there = custom_skin._getOb(script_id, None) is not None
    if already_there:
        custom_skin._delObject(script_id)
    createZODBPythonScript(custom_skin, script_id, argument_list, code)
def setDiscoveryOrder(self, order):
    """
    Creates the script that returns the metadata discovery order
    for Text documents; the script body simply returns `order`.
    """
    self.newPythonScript(
        'Text_getPreferredDocumentMetadataDiscoveryOrderList',
        '',
        "return %s" % str(order))
def discoverMetadata(self, document):
    """
    Runs metadata discovery on the given document with simulated
    user input, the document's own filename and the 'john_doe'
    login, then calls stepTic().
    """
    # simulate user input
    simulated_input = {
        'reference': 'INPUT',
        'language': 'in',
        'version': '004',
        'short_title': 'from_input',
        'contributor': 'person_module/james',
    }
    # pass to discovery filename and user_login
    document.discoverMetadata(filename=document.getFilename(),
                              user_login='john_doe',
                              input_parameter_dict=simulated_input)
    self.stepTic()
def checkMetadataOrder(self, document, expected_metadata):
    """
    Asserts that every property listed in expected_metadata has
    the expected value on the given document.
    """
    for property_id in expected_metadata:
        self.assertEquals(document.getProperty(property_id),
                          expected_metadata[property_id])
def receiveEmail(self, data,
                 portal_type='Document Ingestion Message',
                 container_path='document_ingestion_module',
                 filename='email.emx'):
    """
    Passes the given data to portal_contributions.newContent together
    with the portal type, container path and filename, and returns
    the result.
    """
    contribution_tool = self.portal.portal_contributions
    return contribution_tool.newContent(data=data,
                                        portal_type=portal_type,
                                        container_path=container_path,
                                        filename=filename)
##################################
## Basic steps
......@@ -422,56 +414,63 @@ class TestIngestion(ERP5TypeTestCase):
Create an empty Text document with ID 'one'
This document will be used in most tests.
"""
self.newEmptyCataloggedDocument('Text', 'one')
document = self.newEmptyDocument('Text')
sequence.edit(document_path=document.getPath())
def stepCreateSpreadsheetDocument(self, sequence=None, sequence_list=None, **kw):
    """
    Create an empty Spreadsheet document and store its path in the
    sequence under 'document_path'.
    """
    spreadsheet = self.newEmptyDocument('Spreadsheet')
    spreadsheet_path = spreadsheet.getPath()
    sequence.edit(document_path=spreadsheet_path)
def stepCreatePresentationDocument(self, sequence=None, sequence_list=None, **kw):
    """
    Create an empty Presentation document and store its path in the
    sequence under 'document_path'.
    """
    presentation = self.newEmptyDocument('Presentation')
    presentation_path = presentation.getPath()
    sequence.edit(document_path=presentation_path)
def stepCreateDrawingDocument(self, sequence=None, sequence_list=None, **kw):
    """
    Create an empty Drawing document and store its path in the
    sequence under 'document_path'.
    """
    # The portal type must be 'Drawing' (as in the line this replaces);
    # 'Presentation' was a copy-paste slip from the step above.
    document = self.newEmptyDocument('Drawing')
    sequence.edit(document_path=document.getPath())
def stepCreatePDFDocument(self, sequence=None, sequence_list=None, **kw):
    """
    Create an empty PDF document and store its path in the
    sequence under 'document_path'.
    """
    pdf = self.newEmptyDocument('PDF')
    pdf_path = pdf.getPath()
    sequence.edit(document_path=pdf_path)
def stepCreateImageDocument(self, sequence=None, sequence_list=None, **kw):
    """
    Create an empty Image document and store its path in the
    sequence under 'document_path'.
    """
    image = self.newEmptyDocument('Image')
    image_path = image.getPath()
    sequence.edit(document_path=image_path)
def stepCreateFileDocument(self, sequence=None, sequence_list=None, **kw):
    """
    Create an empty File document and store its path in the
    sequence under 'document_path'.
    """
    file_document = self.newEmptyDocument('File')
    file_document_path = file_document.getPath()
    sequence.edit(document_path=file_document_path)
def stepCheckEmptyState(self, sequence=None, sequence_list=None, **kw):
    """
    Check that the document stored in the sequence is in the "empty"
    processing state (ie. no file upload has been done yet)
    """
    document_path = sequence.get('document_path')
    document = self.portal.restrictedTraverse(document_path)
    processing_state = document.getExternalProcessingState()
    return self.assertEquals(processing_state, 'empty')
def stepCheckUploadedState(self, sequence=None, sequence_list=None, **kw):
......@@ -479,7 +478,7 @@ class TestIngestion(ERP5TypeTestCase):
Check if the document is in "uploaded" processing state
(ie. a file upload has been done)
"""
document = self.getDocument('one')
document = self.portal.restrictedTraverse(sequence.get('document_path'))
return self.assertEquals(document.getExternalProcessingState(), 'uploaded')
def stepCheckConvertingState(self, sequence=None, sequence_list=None, **kw):
......@@ -487,7 +486,7 @@ class TestIngestion(ERP5TypeTestCase):
Check if the document is in "converting" processing state
(ie. a file upload has been done and the document is converting)
"""
document = self.getDocument('one')
document = self.portal.restrictedTraverse(sequence.get('document_path'))
return self.assertEquals(document.getExternalProcessingState(), 'converting')
def stepCheckConvertedState(self, sequence=None, sequence_list=None, **kw):
......@@ -496,23 +495,22 @@ class TestIngestion(ERP5TypeTestCase):
(ie. a file conversion has been done and the document has
been converted)
"""
document = self.getDocument('one')
document = self.portal.restrictedTraverse(sequence.get('document_path'))
return self.assertEquals(document.getExternalProcessingState(), 'converted')
def stepStraightUpload(self, sequence=None, sequence_list=None, **kw):
"""
Upload a file directly from the form
check if it has the data and source_reference
check if it has the data and filename
"""
filename = 'TEST-en-002.doc'
document = self.getDocument('one')
document = self.portal.restrictedTraverse(sequence.get('document_path'))
# First revision is 1 (like web pages)
self.assertEquals(document.getRevision(), '1')
f = makeFileUpload(filename)
document.edit(file=f)
self.assert_(document.hasFile())
# source_reference set to file name ?
self.assertEquals(document.getSourceReference(), filename)
self.assertEquals(document.getFilename(), filename)
# Revision is 2 after upload (revisions are strings)
self.assertEquals(document.getRevision(), '2')
document.reindexObject()
......@@ -522,7 +520,7 @@ class TestIngestion(ERP5TypeTestCase):
"""
Upload a file from view form and make sure this increases the revision
"""
document = self.getDocument('one')
document = self.portal.restrictedTraverse(sequence.get('document_path'))
f = makeFileUpload('TEST-en-002.doc')
revision = document.getRevision()
document.edit(file=f)
......@@ -535,7 +533,8 @@ class TestIngestion(ERP5TypeTestCase):
Upload a file from contribution.
"""
f = makeFileUpload('TEST-en-002.doc')
self.portal.portal_contributions.newContent(id='one', file=f)
document = self.portal.portal_contributions.newContent(file=f)
sequence.edit(document_path=document.getPath())
transaction.commit()
def stepReuploadTextFromContributionTool(self, sequence=None, sequence_list=None, **kw):
......@@ -543,7 +542,7 @@ class TestIngestion(ERP5TypeTestCase):
Upload a file from contribution form and make sure this update existing
document and don't make a new document.
"""
document = self.getDocument('one')
document = self.portal.restrictedTraverse(sequence.get('document_path'))
revision = document.getRevision()
number_of_document = len(self.portal.document_module.objectIds())
self.assert_('This document is modified.' not in document.asText())
......@@ -565,10 +564,10 @@ class TestIngestion(ERP5TypeTestCase):
Upload another file from contribution.
"""
f = makeFileUpload('ANOTHE-en-001.doc')
self.portal.portal_contributions.newContent(id='two', file=f)
document = self.portal.portal_contributions.newContent(id='two', file=f)
sequence.edit(document_path=document.getPath())
self.stepTic()
document = self.getDocument('two')
self.assert_('This is a another very interesting document.' in document.asText())
self.assertTrue('This is a another very interesting document.' in document.asText())
self.assertEquals(document.getReference(), 'ANOTHE')
self.assertEquals(document.getVersion(), '001')
self.assertEquals(document.getLanguage(), 'en')
......@@ -579,10 +578,10 @@ class TestIngestion(ERP5TypeTestCase):
discovery and we should have basic coordinates immediately,
from first stage.
"""
document = self.getDocument('one')
file_name = 'TEST-en-002.doc'
document = self.portal.restrictedTraverse(sequence.get('document_path'))
filename = 'TEST-en-002.doc'
# First make sure the regular expressions work
property_dict = document.getPropertyDictFromFileName(file_name)
property_dict = document.getPropertyDictFromFilename(filename)
self.assertEquals(property_dict['reference'], 'TEST')
self.assertEquals(property_dict['language'], 'en')
self.assertEquals(property_dict['version'], '002')
......@@ -593,12 +592,12 @@ class TestIngestion(ERP5TypeTestCase):
self.assertEquals(property_dict['description'], 'comments')
self.assertEquals(property_dict['subject_list'], ['keywords'])
# Then make sure metadata discovery works
f = makeFileUpload(file_name)
f = makeFileUpload(filename)
document.edit(file=f)
self.assertEquals(document.getReference(), 'TEST')
self.assertEquals(document.getLanguage(), 'en')
self.assertEquals(document.getVersion(), '002')
self.assertEquals(document.getSourceReference(), file_name)
self.assertEquals(document.getFilename(), filename)
def stepCheckConvertedContent(self, sequence=None, sequence_list=None, **kw):
"""
......@@ -607,7 +606,7 @@ class TestIngestion(ERP5TypeTestCase):
the word "magic"
"""
self.tic()
document = self.getDocument('one')
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.assert_(document.hasBaseData())
self.assert_('magic' in document.SearchableText())
self.assert_('magic' in str(document.asText()))
......@@ -617,9 +616,9 @@ class TestIngestion(ERP5TypeTestCase):
Create Text_getPropertyDictFrom[source] scripts
to simulate custom site's configuration
"""
self.newPythonScript('one', 'Text_getPropertyDictFromUserLogin',
self.newPythonScript('Text_getPropertyDictFromUserLogin',
'user_name=None', "return {'contributor':'person_module/john'}")
self.newPythonScript('one', 'Text_getPropertyDictFromContent', '',
self.newPythonScript('Text_getPropertyDictFromContent', '',
"return {'short_title':'short', 'title':'title', 'contributor':'person_module/john',}")
def stepTestMetadataSetting(self, sequence=None, sequence_list=None, **kw):
......@@ -627,9 +626,8 @@ class TestIngestion(ERP5TypeTestCase):
Upload with custom getPropertyDict methods
check that all metadata are correct
"""
document = self.getDocument('one')
f = makeFileUpload('TEST-en-002.doc')
document.edit(file=f)
document = self.portal.portal_contributions.newContent(file=f)
self.stepTic()
# Then make sure content discover works
property_dict = document.getPropertyDictFromUserLogin()
......@@ -647,7 +645,7 @@ class TestIngestion(ERP5TypeTestCase):
"""
we change metadata in a document which has ODF
"""
document = self.getDocument('one')
document = self.portal.restrictedTraverse(sequence.get('document_path'))
kw = dict(title='another title',
subject='another subject',
description='another description')
......@@ -661,7 +659,7 @@ class TestIngestion(ERP5TypeTestCase):
# XXX actually this is an example of how it should be
# implemented in OOoDocument class - we don't really
# need oood for getting/setting metadata...
document = self.getDocument('one')
document = self.portal.restrictedTraverse(sequence.get('document_path'))
newcontent = document.getBaseData()
builder = OOoBuilder(newcontent)
xml_tree = etree.fromstring(builder.extract('meta.xml'))
......@@ -678,23 +676,28 @@ class TestIngestion(ERP5TypeTestCase):
make sure they are converted
"""
format_list = ['rtf', 'doc', 'txt', 'sxw', 'sdw']
self.ingestFormatList('one', format_list)
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.ingestFormatList(document, format_list)
def stepIngestSpreadsheetFormats(self, sequence=None, sequence_list=None,
                                 **kw):
    """
    Ingest the spreadsheet test formats into the document stored
    in the sequence, through ingestFormatList.
    """
    document_path = sequence.get('document_path')
    spreadsheet = self.portal.restrictedTraverse(document_path)
    self.ingestFormatList(spreadsheet, ['xls', 'sxc', 'sdc'])
def stepIngestPresentationFormats(self, sequence=None, sequence_list=None,
                                  **kw):
    """
    Ingest the presentation test formats into the document stored
    in the sequence, through ingestFormatList.
    """
    document_path = sequence.get('document_path')
    presentation = self.portal.restrictedTraverse(document_path)
    self.ingestFormatList(presentation, ['ppt', 'sxi', 'sdd'])
def stepIngestPDFFormats(self, sequence=None, sequence_list=None, **kw):
"""
......@@ -702,7 +705,8 @@ class TestIngestion(ERP5TypeTestCase):
make sure they are converted
"""
format_list = ['pdf']
self.ingestFormatList('five', format_list)
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.ingestFormatList(document, format_list)
def stepIngestDrawingFormats(self, sequence=None, sequence_list=None, **kw):
"""
......@@ -710,7 +714,8 @@ class TestIngestion(ERP5TypeTestCase):
make sure they are converted
"""
format_list = ['sxd',]
self.ingestFormatList('four', format_list)
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.ingestFormatList(document, format_list)
def stepIngestPDFFormats(self, sequence=None, sequence_list=None, **kw):
"""
......@@ -718,39 +723,52 @@ class TestIngestion(ERP5TypeTestCase):
make sure they are converted
"""
format_list = ['pdf']
self.ingestFormatList('five', format_list)
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.ingestFormatList(document, format_list)
def stepIngestImageFormats(self, sequence=None, sequence_list=None, **kw):
"""
ingest all supported image formats
"""
format_list = ['jpg', 'gif', 'bmp', 'png']
self.ingestFormatList('six', format_list, 'Image')
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.ingestFormatList(document, format_list)
def stepIngestFileFormats(self, sequence=None, sequence_list=None, **kw):
"""
ingest all supported file formats
"""
format_list = ['txt', 'rss', 'xml',]
self.ingestFormatList('file', format_list)
def stepCheckTextDocumentExportList(self, sequence=None, sequence_list=None, **kw):
self.checkDocumentExportList('one', 'doc', ['pdf', 'doc', 'rtf', 'writer.html', 'txt'])
def stepCheckSpreadsheetDocumentExportList(self, sequence=None, sequence_list=None, **kw):
self.checkDocumentExportList('two', 'xls', ['csv', 'calc.html', 'xls', 'calc.pdf'])
def stepCheckPresentationDocumentExportList(self, sequence=None, sequence_list=None, **kw):
self.checkDocumentExportList('three', 'ppt', ['impr.pdf', 'ppt'])
def stepCheckDrawingDocumentExportList(self, sequence=None, sequence_list=None, **kw):
self.checkDocumentExportList('four', 'sxd', ['jpg', 'draw.pdf', 'svg'])
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.ingestFormatList(document, format_list)
def stepCheckTextDocumentExportList(self, sequence=None, sequence_list=None,
**kw):
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.checkDocumentExportList(document, 'doc',
['pdf', 'doc', 'rtf', 'writer.html', 'txt'])
def stepCheckSpreadsheetDocumentExportList(self, sequence=None,
sequence_list=None, **kw):
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.checkDocumentExportList(document, 'xls',
['csv', 'calc.html', 'xls', 'calc.pdf'])
def stepCheckPresentationDocumentExportList(self, sequence=None,
sequence_list=None, **kw):
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.checkDocumentExportList(document, 'ppt', ['impr.pdf', 'ppt'])
def stepCheckDrawingDocumentExportList(self, sequence=None,
sequence_list=None, **kw):
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.checkDocumentExportList(document, 'sxd', ['jpg', 'draw.pdf', 'svg'])
def stepExportPDF(self, sequence=None, sequence_list=None, **kw):
"""
Try to export PDF to text and HTML
"""
document = self.getDocument('five')
document = self.portal.restrictedTraverse(sequence.get('document_path'))
f = makeFileUpload('TEST-en-002.pdf')
document.edit(file=f)
mime, text = document.convert('text')
......@@ -764,7 +782,7 @@ class TestIngestion(ERP5TypeTestCase):
"""
Check we are able to resize images
"""
image = self.portal.image_module.six
image = self.portal.restrictedTraverse(sequence.get('document_path'))
f = makeFileUpload('TEST-en-002.jpg')
image.edit(file=f)
self.stepTic()
......@@ -781,7 +799,7 @@ class TestIngestion(ERP5TypeTestCase):
"""
portal = self.getPortal()
for module in (portal.document_module, portal.image_module, portal.document_ingestion_module):
module.manage_delObjects(map(None, module.objectIds()))
module.manage_delObjects(list(module.objectIds()))
def stepContributeFileListWithType(self, sequence=None, sequence_list=None, **kw):
"""
......@@ -790,14 +808,16 @@ class TestIngestion(ERP5TypeTestCase):
"""
self.contributeFileList(with_portal_type=True)
def stepContributeFileListWithNoType(self, sequence=None, sequence_list=None, **kw):
def stepContributeFileListWithNoType(self, sequence=None, sequence_list=None,
**kw):
"""
Contribute all kinds of files
let the system figure out portal type by itself
"""
self.contributeFileList(with_portal_type=False)
def stepSetSimulatedDiscoveryScriptForOrdering(self, sequence=None, sequence_list=None, **kw):
def stepSetSimulatedDiscoveryScriptForOrdering(self, sequence=None,
sequence_list=None, **kw):
"""
set scripts which are supposed to overwrite each other's metadata
desing is the following:
......@@ -808,53 +828,85 @@ class TestIngestion(ERP5TypeTestCase):
contributor john jack james
short_title from_content from_input
"""
self.newPythonScript('one', 'Text_getPropertyDictFromUserLogin', 'user_name=None', "return {'reference':'USER', 'language':'us', 'contributor':'person_module/john'}")
self.newPythonScript('one', 'Text_getPropertyDictFromContent', '', "return {'reference':'CONT', 'version':'003', 'contributor':'person_module/jack', 'short_title':'from_content'}")
def stepCheckMetadataSettingOrderFICU(self, sequence=None, sequence_list=None, **kw):
input_dict = dict(reference='INPUT',
language='in',
version='004',
short_title='from_input',
contributor='person_module/james')
self.newPythonScript('Text_getPropertyDictFromInput',
'inputed_kw', "return %r" % (input_dict,))
self.newPythonScript('Text_getPropertyDictFromUserLogin', 'user_name=None',
"return {'reference':'USER', 'language':'us',"\
" 'contributor':'person_module/john'}")
self.newPythonScript('Text_getPropertyDictFromContent', '',
"return {'reference':'CONT', 'version':'003',"\
" 'contributor':'person_module/jack',"\
" 'short_title':'from_content'}")
def stepCheckMetadataSettingOrderFICU(self, sequence=None,
sequence_list=None, **kw):
"""
This is the default
"""
expected_metadata = dict(reference='TEST', language='en', version='002', short_title='from_input', contributor='person_module/james')
self.setDiscoveryOrder(['file_name', 'input', 'content', 'user_login'])
self.discoverMetadata()
self.checkMetadataOrder(expected_metadata)
expected_metadata = dict(reference='TEST', language='en', version='002',
short_title='from_input',
contributor='person_module/james')
self.setDiscoveryOrder(['filename', 'input', 'content', 'user_login'])
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.discoverMetadata(document)
self.checkMetadataOrder(document, expected_metadata)
def stepCheckMetadataSettingOrderCUFI(self, sequence=None, sequence_list=None, **kw):
def stepCheckMetadataSettingOrderCUFI(self, sequence=None,
sequence_list=None, **kw):
"""
Content - User - Filename - Input
"""
expected_metadata = dict(reference='CONT', language='us', version='003', short_title='from_content', contributor='person_module/jack')
self.setDiscoveryOrder(['content', 'user_login', 'file_name', 'input'])
self.discoverMetadata()
self.checkMetadataOrder(expected_metadata)
expected_metadata = dict(reference='CONT', language='us', version='003',
short_title='from_content',
contributor='person_module/jack')
self.setDiscoveryOrder(['content', 'user_login', 'filename', 'input'])
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.discoverMetadata(document)
self.checkMetadataOrder(document, expected_metadata)
def stepCheckMetadataSettingOrderUIFC(self, sequence=None, sequence_list=None, **kw):
def stepCheckMetadataSettingOrderUIFC(self, sequence=None,
sequence_list=None, **kw):
"""
User - Input - Filename - Content
"""
expected_metadata = dict(reference='USER', language='us', version='004', short_title='from_input', contributor='person_module/john')
self.setDiscoveryOrder(['user_login', 'input', 'file_name', 'content'])
self.discoverMetadata()
self.checkMetadataOrder(expected_metadata)
expected_metadata = dict(reference='USER', language='us', version='004',
short_title='from_input',
contributor='person_module/john')
self.setDiscoveryOrder(['user_login', 'input', 'filename', 'content'])
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.discoverMetadata(document)
self.checkMetadataOrder(document, expected_metadata)
def stepCheckMetadataSettingOrderICUF(self, sequence=None, sequence_list=None, **kw):
def stepCheckMetadataSettingOrderICUF(self, sequence=None,
sequence_list=None, **kw):
"""
Input - Content - User - Filename
"""
expected_metadata = dict(reference='INPUT', language='in', version='004', short_title='from_input', contributor='person_module/james')
self.setDiscoveryOrder(['input', 'content', 'user_login', 'file_name'])
self.discoverMetadata()
self.checkMetadataOrder(expected_metadata)
expected_metadata = dict(reference='INPUT', language='in', version='004',
short_title='from_input',
contributor='person_module/james')
self.setDiscoveryOrder(['input', 'content', 'user_login', 'filename'])
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.discoverMetadata(document)
self.checkMetadataOrder(document, expected_metadata)
def stepCheckMetadataSettingOrderUFCI(self, sequence=None, sequence_list=None, **kw):
def stepCheckMetadataSettingOrderUFCI(self, sequence=None,
sequence_list=None, **kw):
"""
User - Filename - Content - Input
"""
expected_metadata = dict(reference='USER', language='us', version='002', short_title='from_content', contributor='person_module/john')
self.setDiscoveryOrder(['user_login', 'file_name', 'content', 'input'])
self.discoverMetadata()
self.checkMetadataOrder(expected_metadata)
expected_metadata = dict(reference='USER', language='us', version='002',
short_title='from_content',
contributor='person_module/john')
self.setDiscoveryOrder(['user_login', 'filename', 'content', 'input'])
document = self.portal.restrictedTraverse(sequence.get('document_path'))
self.discoverMetadata(document)
self.checkMetadataOrder(document, expected_metadata)
def stepReceiveEmail(self, sequence=None, sequence_list=None, **kw):
"""
......@@ -864,7 +916,8 @@ class TestIngestion(ERP5TypeTestCase):
document = self.receiveEmail(f.read())
self.stepTic()
def stepReceiveMultipleAttachmentsEmail(self, sequence=None, sequence_list=None, **kw):
def stepReceiveMultipleAttachmentsEmail(self, sequence=None,
sequence_list=None, **kw):
"""
Email was sent in by someone to ERP5.
"""
......@@ -958,7 +1011,7 @@ class TestIngestion(ERP5TypeTestCase):
reference='MAIL',
language='en',
version='002')
self.assertEquals('MAIL-en-002.doc', ingested_document.getSourceReference())
self.assertEquals('MAIL-en-002.doc', ingested_document.getFilename())
self.assertEquals('converted', ingested_document.getExternalProcessingState())
self.assertTrue('magic' in ingested_document.asText())
......@@ -978,7 +1031,7 @@ class TestIngestion(ERP5TypeTestCase):
conversion_dict = _getConversionServerDict()
self.assertEquals(preference_tool.getPreferredOoodocServerAddress(), conversion_dict['hostname'])
self.assertEquals(preference_tool.getPreferredOoodocServerPortNumber(), conversion_dict['port'])
self.assertEquals(preference_tool.getPreferredDocumentFileNameRegularExpression(), FILE_NAME_REGULAR_EXPRESSION)
self.assertEquals(preference_tool.getPreferredDocumentFilenameRegularExpression(), FILENAME_REGULAR_EXPRESSION)
self.assertEquals(preference_tool.getPreferredDocumentReferenceRegularExpression(), REFERENCE_REGULAR_EXPRESSION)
def test_02_FileExtensionRegistry(self):
......@@ -1008,8 +1061,8 @@ class TestIngestion(ERP5TypeTestCase):
'xxx' : 'File',
}
for type, portal_type in correct_type_mapping.items():
file_name = 'aaa.' + type
self.assertEquals(reg.findPortalTypeName(file_name, None, None),
filename = 'aaa.' + type
self.assertEquals(reg.findPortalTypeName(filename=filename),
portal_type)
def test_03_TextDoc(self):
......@@ -1300,7 +1353,7 @@ class TestIngestion(ERP5TypeTestCase):
"""
f = makeFileUpload('TEST-en-002.doc', 'T&é@{T-en-002.doc')
document = self.portal.portal_contributions.newContent(file=f)
sequence.edit(document_id=document.getId())
sequence.edit(document_path=document.getPath())
transaction.commit()
def stepDiscoverFromFilenameWithNonASCIIFilename(self,
......@@ -1310,10 +1363,10 @@ class TestIngestion(ERP5TypeTestCase):
discovery and we should have basic coordinates immediately,
from first stage.
"""
context = self.getDocument(sequence.get('document_id'))
file_name = 'T&é@{T-en-002.doc'
context = self.portal.restrictedTraverse(sequence.get('document_path'))
filename = 'T&é@{T-en-002.doc'
# First make sure the regular expressions work
property_dict = context.getPropertyDictFromFileName(file_name)
property_dict = context.getPropertyDictFromFilename(filename)
self.assertEquals(property_dict['reference'], 'T&é@{T')
self.assertEquals(property_dict['language'], 'en')
self.assertEquals(property_dict['version'], '002')
......@@ -1327,7 +1380,7 @@ class TestIngestion(ERP5TypeTestCase):
self.assertEquals(context.getReference(), 'T&é@{T')
self.assertEquals(context.getLanguage(), 'en')
self.assertEquals(context.getVersion(), '002')
self.assertEquals(context.getSourceReference(), file_name)
self.assertEquals(context.getFilename(), filename)
def test_13_UploadTextFromContributionToolWithNonASCIIFilename(self):
"""
......@@ -1363,8 +1416,8 @@ class TestIngestion(ERP5TypeTestCase):
self.assertEquals(1,
len(portal.portal_catalog(path=contribution_tool.getPath())))
def test_15_TestFileNameDiscovery(self):
"""Test that filename is well set in source_reference
def test_15_TestFilenameDiscovery(self):
"""Test that filename is well set in filename
- filename can we discovery from file
- filename can be pass as argument by the user
"""
......@@ -1372,12 +1425,12 @@ class TestIngestion(ERP5TypeTestCase):
contribution_tool = getToolByName(portal, 'portal_contributions')
file_object = makeFileUpload('TEST-en-002.doc')
document = contribution_tool.newContent(file=file_object)
self.assertEquals(document.getSourceReference(), 'TEST-en-002.doc')
self.assertEquals(document.getFilename(), 'TEST-en-002.doc')
my_filename = 'Something.doc'
document = contribution_tool.newContent(file=file_object,
file_name=my_filename)
filename=my_filename)
self.stepTic()
self.assertEquals(document.getSourceReference(), my_filename)
self.assertEquals(document.getFilename(), my_filename)
def test_16_TestMetadataDiscoveryFromUserLogin(self):
"""
......@@ -1395,16 +1448,502 @@ class TestIngestion(ERP5TypeTestCase):
self.stepTic()
file_object = makeFileUpload('TEST-en-002.doc')
document = contribution_tool.newContent(file=file_object)
document.discoverMetadata(document.getSourceReference(), 'contributor1')
document.discoverMetadata(document.getFilename(), 'contributor1')
self.stepTic()
self.assertEquals(document.getSourceReference(), 'TEST-en-002.doc')
self.assertEquals(document.getFilename(), 'TEST-en-002.doc')
self.assertEquals('anybody', document.getGroup())
self.assertEquals('site/arctic/spitsbergen', document.getSite())
# Missing tests
def test_IngestionConfigurationByTypeBasedMethod_usecase1(self):
  """How to configure metadata discovery so that each time a file
  with the same URL is uploaded, a new document is created with the
  same reference but an increased version?

  The Document_getPropertyDictFromContent script installed below uses the
  normalised URL of the document as its reference and takes the version
  from an id generator keyed on ('dms_version_generator', reference).
  """
  input_script_id = 'Document_getPropertyDictFromContent'
  # Source of the discovery script. Block indentation inside the literal
  # is significant: it is Python code, presumably compiled by
  # newPythonScript (helper not visible here).
  python_code = """from Products.CMFCore.utils import getToolByName
portal = context.getPortalObject()
information = context.getContentInformation()
result = {}
property_id_list = context.propertyIds()
for k, v in information.items():
  key = k.lower()
  if v:
    if isinstance(v, unicode):
      v = v.encode('utf-8')
    if key in property_id_list:
      if key == 'reference':
        pass # XXX - We can not trust reference on getContentInformation
      else:
        result[key] = v
    elif key == 'author':
      p = context.portal_catalog.getResultValue(title=v, portal_type='Person')
      if p is not None:
        result['contributor'] = p.getRelativeUrl()
    elif key == 'keywords':
      result['subject_list'] = v.split()
reference = context.asNormalisedURL()
result['reference'] = reference
id_group = ('dms_version_generator', reference)
result['version'] = '%.5d' % (portal.portal_ids.generateNewId(id_group=id_group, default=1))
return result
"""
  self.newPythonScript(input_script_id, '', python_code)

  # Publish a source File; its getData URL is what gets ingested below.
  document_to_ingest = self.portal.portal_contributions.newContent(
                                                        portal_type='File',
                                                        filename='toto.txt',
                                                        data='Hello World!')
  document_to_ingest.publish()
  transaction.commit()
  self.tic()
  url = document_to_ingest.absolute_url() + '/getData'

  # First ingestion of the URL: version 00001 is expected.
  first_doc = self.portal.portal_contributions.newContent(url=url)
  transaction.commit()
  self.tic()
  self.assertEquals(first_doc.getPortalType(), 'Text')
  self.assertEquals(first_doc.getContentType(), 'text/plain')
  self.assertEquals(first_doc.getReference(), first_doc.asNormalisedURL())
  self.assertEquals(first_doc.getVersion(), '00001')
  self.assertEquals(first_doc.asURL(), url)

  # Second ingestion of the same URL: the version is expected to increase.
  second_doc = self.portal.portal_contributions.newContent(url=url)
  transaction.commit()
  self.tic()
  self.assertEquals(second_doc.getPortalType(), 'Text')
  self.assertEquals(second_doc.getContentType(), 'text/plain')
  self.assertEquals(second_doc.getReference(), second_doc.asNormalisedURL())
  self.assertEquals(second_doc.getVersion(), '00002')
  self.assertEquals(second_doc.asURL(), url)

  # Repeat with a second source document: the test expects the version
  # numbering for its URL to start again at 00001.
  document_to_ingest2 = self.portal.portal_contributions.newContent(
                                                        portal_type='File',
                                                        filename='toto.txt',
                                                        data='Hello World!')
  document_to_ingest2.publish()
  transaction.commit()
  self.tic()
  url2 = document_to_ingest2.absolute_url() + '/getData'

  first_doc = self.portal.portal_contributions.newContent(url=url2)
  transaction.commit()
  self.tic()
  self.assertEquals(first_doc.getPortalType(), 'Text')
  self.assertEquals(first_doc.getContentType(), 'text/plain')
  self.assertEquals(first_doc.getReference(), first_doc.asNormalisedURL())
  self.assertEquals(first_doc.getVersion(), '00001')
  self.assertEquals(first_doc.asURL(), url2)

  second_doc = self.portal.portal_contributions.newContent(url=url2)
  transaction.commit()
  self.tic()
  self.assertEquals(second_doc.getPortalType(), 'Text')
  self.assertEquals(second_doc.getContentType(), 'text/plain')
  self.assertEquals(second_doc.getReference(), second_doc.asNormalisedURL())
  self.assertEquals(second_doc.getVersion(), '00002')
  self.assertEquals(second_doc.asURL(), url2)
def test_IngestionConfigurationByTypeBasedMethod_usecase2(self):
  """How to configure metadata discovery so that each time a file
  with the same URL is uploaded, a new document is created
  with the same reference and the same version?

  The Document_getPropertyDictFromContent script installed below uses the
  normalised URL of the document as its reference and does not set any
  version.
  """
  input_script_id = 'Document_getPropertyDictFromContent'
  # Source of the discovery script. Block indentation inside the literal
  # is significant: it is Python code, presumably compiled by
  # newPythonScript (helper not visible here).
  python_code = """from Products.CMFCore.utils import getToolByName
portal = context.getPortalObject()
information = context.getContentInformation()
result = {}
property_id_list = context.propertyIds()
for k, v in information.items():
  key = k.lower()
  if v:
    if isinstance(v, unicode):
      v = v.encode('utf-8')
    if key in property_id_list:
      if key == 'reference':
        pass # XXX - We can not trust reference on getContentInformation
      else:
        result[key] = v
    elif key == 'author':
      p = context.portal_catalog.getResultValue(title=v, portal_type='Person')
      if p is not None:
        result['contributor'] = p.getRelativeUrl()
    elif key == 'keywords':
      result['subject_list'] = v.split()
reference = context.asNormalisedURL()
result['reference'] = reference
return result
"""
  self.newPythonScript(input_script_id, '', python_code)

  # Publish a source File; its getData URL is what gets ingested below.
  document_to_ingest = self.portal.portal_contributions.newContent(
                                                        portal_type='File',
                                                        filename='toto.txt',
                                                        data='Hello World!')
  document_to_ingest.publish()
  transaction.commit()
  self.tic()
  url = document_to_ingest.absolute_url() + '/getData'

  # Both ingestions of the same URL are expected to report version 001.
  first_doc = self.portal.portal_contributions.newContent(url=url)
  transaction.commit()
  self.tic()
  self.assertEquals(first_doc.getPortalType(), 'Text')
  self.assertEquals(first_doc.getContentType(), 'text/plain')
  self.assertEquals(first_doc.getReference(), first_doc.asNormalisedURL())
  self.assertEquals(first_doc.getVersion(), '001')
  self.assertEquals(first_doc.asURL(), url)

  second_doc = self.portal.portal_contributions.newContent(url=url)
  transaction.commit()
  self.tic()
  self.assertEquals(second_doc.getPortalType(), 'Text')
  self.assertEquals(second_doc.getContentType(), 'text/plain')
  self.assertEquals(second_doc.getReference(), second_doc.asNormalisedURL())
  self.assertEquals(second_doc.getVersion(), '001')
  self.assertEquals(second_doc.asURL(), url)

  # Repeat with a second source document and check the same expectations
  # against its own URL.
  document_to_ingest2 = self.portal.portal_contributions.newContent(
                                                        portal_type='File',
                                                        filename='toto.txt',
                                                        data='Hello World!')
  document_to_ingest2.publish()
  transaction.commit()
  self.tic()
  url2 = document_to_ingest2.absolute_url() + '/getData'

  first_doc = self.portal.portal_contributions.newContent(url=url2)
  transaction.commit()
  self.tic()
  self.assertEquals(first_doc.getPortalType(), 'Text')
  self.assertEquals(first_doc.getContentType(), 'text/plain')
  self.assertEquals(first_doc.getReference(), first_doc.asNormalisedURL())
  self.assertEquals(first_doc.getVersion(), '001')
  self.assertEquals(first_doc.asURL(), url2)

  second_doc = self.portal.portal_contributions.newContent(url=url2)
  transaction.commit()
  self.tic()
  self.assertEquals(second_doc.getPortalType(), 'Text')
  self.assertEquals(second_doc.getContentType(), 'text/plain')
  self.assertEquals(second_doc.getReference(), second_doc.asNormalisedURL())
  self.assertEquals(second_doc.getVersion(), '001')
  self.assertEquals(second_doc.asURL(), url2)
def test_IngestionConfigurationByTypeBasedMethod_usecase3(self):
  """How to discover metadata so that every new document receives a
  reference generated automatically from an increasing sequence of
  numbers?

  A Document_finishIngestion script is installed which sets the reference
  from the 'dms_reference_generator3' id group.
  """
  script_body = """from Products.CMFCore.utils import getToolByName
portal = context.getPortalObject()
portal_ids = getToolByName(portal, 'portal_ids')
id_group = 'dms_reference_generator3'
reference = 'I CHOOSED THIS REFERENCE %s' % portal.portal_ids.generateNewId(id_group=id_group)
context.setReference(reference)
"""
  self.newPythonScript('Document_finishIngestion', '', script_body)
  contribution_tool = self.portal.portal_contributions

  # Source document whose getData URL is ingested twice below.
  source_document = contribution_tool.newContent(portal_type='File',
                                                 filename='toto.txt',
                                                 data='Hello World!')
  source_document.publish()
  transaction.commit()
  self.tic()
  source_url = source_document.absolute_url() + '/getData'

  # Every ingestion is expected to consume the next number of the
  # sequence, even when the URL is the same.
  ingested = contribution_tool.newContent(url=source_url)
  transaction.commit()
  self.tic()
  self.assertEquals(ingested.getPortalType(), 'Text')
  self.assertEquals(ingested.getContentType(), 'text/plain')
  self.assertEquals(ingested.getReference(), 'I CHOOSED THIS REFERENCE 1')
  self.assertEquals(ingested.getVersion(), '001')
  self.assertEquals(ingested.asURL(), source_url)

  ingested_again = contribution_tool.newContent(url=source_url)
  transaction.commit()
  self.tic()
  self.assertEquals(ingested_again.getPortalType(), 'Text')
  self.assertEquals(ingested_again.getContentType(), 'text/plain')
  self.assertEquals(ingested_again.getReference(),
                    'I CHOOSED THIS REFERENCE 2')
  self.assertEquals(ingested_again.getVersion(), '001')
  self.assertEquals(ingested_again.asURL(), source_url)

  # A second source document takes a number from the same sequence.
  other_source = contribution_tool.newContent(portal_type='File',
                                              filename='toto.txt',
                                              data='Hello World!')
  other_source.publish()
  transaction.commit()
  self.tic()
  self.assertEquals(other_source.getReference(),
                    'I CHOOSED THIS REFERENCE 3')
  other_url = other_source.absolute_url() + '/getData'

  ingested = contribution_tool.newContent(url=other_url)
  transaction.commit()
  self.tic()
  self.assertEquals(ingested.getPortalType(), 'Text')
  self.assertEquals(ingested.getContentType(), 'text/plain')
  self.assertEquals(ingested.getReference(), 'I CHOOSED THIS REFERENCE 4')
  self.assertEquals(ingested.getVersion(), '001')
  self.assertEquals(ingested.asURL(), other_url)

  ingested_again = contribution_tool.newContent(url=other_url)
  transaction.commit()
  self.tic()
  self.assertEquals(ingested_again.getPortalType(), 'Text')
  self.assertEquals(ingested_again.getContentType(), 'text/plain')
  self.assertEquals(ingested_again.getReference(),
                    'I CHOOSED THIS REFERENCE 5')
  self.assertEquals(ingested_again.getVersion(), '001')
  self.assertEquals(ingested_again.asURL(), other_url)
def test_IngestionConfigurationByTypeBasedMethod_usecase4(self):
  """How to configure metadata discovery so that each time a file
  with the same URL is uploaded, a new document is created
  with the same reference (generated automatically as an
  increasing sequence of numbers) but an increased version?

  The Document_getPropertyDictFromContent script installed below asks
  portal_url_registry for the reference already bound to the normalised
  URL; on KeyError it generates a new one from the
  'dms_reference_generator4' id group. The version comes from an id
  generator keyed on ('dms_version_generator', reference).
  """
  input_script_id = 'Document_getPropertyDictFromContent'
  # Source of the discovery script. `context` only exists inside this
  # script; it is not a name available in the test method itself.
  python_code = """from Products.CMFCore.utils import getToolByName
portal = context.getPortalObject()
information = context.getContentInformation()
result = {}
property_id_list = context.propertyIds()
for k, v in information.items():
  key = k.lower()
  if v:
    if isinstance(v, unicode):
      v = v.encode('utf-8')
    if key in property_id_list:
      if key == 'reference':
        pass # XXX - We can not trust reference on getContentInformation
      else:
        result[key] = v
    elif key == 'author':
      p = context.portal_catalog.getResultValue(title=v, portal_type='Person')
      if p is not None:
        result['contributor'] = p.getRelativeUrl()
    elif key == 'keywords':
      result['subject_list'] = v.split()
url = context.asNormalisedURL()
portal_url_registry = getToolByName(context.getPortalObject(),
                                    'portal_url_registry')
try:
  reference = portal_url_registry.getReferenceFromURL(url)
except KeyError:
  id_group = 'dms_reference_generator4'
  reference = 'I CHOOSED THIS REFERENCE %s' % portal.portal_ids.generateNewId(id_group=id_group)
result['reference'] = reference
id_group = ('dms_version_generator', reference)
result['version'] = '%.5d' % (portal.portal_ids.generateNewId(id_group=id_group, default=1))
return result
"""
  self.newPythonScript(input_script_id, '', python_code)

  # Publish a source File; its getData URL is what gets ingested below.
  document_to_ingest = self.portal.portal_contributions.newContent(
                                                        portal_type='File',
                                                        filename='toto.txt',
                                                        data='Hello World!')
  document_to_ingest.publish()
  transaction.commit()
  self.tic()
  url = document_to_ingest.absolute_url() + '/getData'

  # First ingestion of the URL: new reference, version 00001.
  first_doc = self.portal.portal_contributions.newContent(url=url)
  transaction.commit()
  self.tic()
  self.assertEquals(first_doc.getPortalType(), 'Text')
  self.assertEquals(first_doc.getContentType(), 'text/plain')
  self.assertEquals(first_doc.getReference(), 'I CHOOSED THIS REFERENCE 1')
  self.assertEquals(first_doc.getVersion(), '00001')
  self.assertEquals(first_doc.asURL(), url)

  # Same URL again: same reference expected, version increased.
  second_doc = self.portal.portal_contributions.newContent(url=url)
  transaction.commit()
  self.tic()
  self.assertEquals(second_doc.getPortalType(), 'Text')
  self.assertEquals(second_doc.getContentType(), 'text/plain')
  self.assertEquals(second_doc.getReference(), 'I CHOOSED THIS REFERENCE 1')
  self.assertEquals(second_doc.getVersion(), '00002')
  self.assertEquals(second_doc.asURL(), url)

  # A second source document consumes the next reference itself.
  document_to_ingest2 = self.portal.portal_contributions.newContent(
                                                        portal_type='File',
                                                        filename='toto.txt',
                                                        data='Hello World!')
  document_to_ingest2.publish()
  transaction.commit()
  self.tic()
  self.assertEquals(document_to_ingest2.getReference(),
                    'I CHOOSED THIS REFERENCE 2')
  url2 = document_to_ingest2.absolute_url() + '/getData'

  # Ingesting the second URL: a further reference, shared by both
  # ingestions, with versions counted from 00001 again.
  first_doc = self.portal.portal_contributions.newContent(url=url2)
  transaction.commit()
  self.tic()
  self.assertEquals(first_doc.getPortalType(), 'Text')
  self.assertEquals(first_doc.getContentType(), 'text/plain')
  self.assertEquals(first_doc.getReference(), 'I CHOOSED THIS REFERENCE 3')
  self.assertEquals(first_doc.getVersion(), '00001')
  self.assertEquals(first_doc.asURL(), url2)

  second_doc = self.portal.portal_contributions.newContent(url=url2)
  transaction.commit()
  self.tic()
  self.assertEquals(second_doc.getPortalType(), 'Text')
  self.assertEquals(second_doc.getContentType(), 'text/plain')
  self.assertEquals(second_doc.getReference(), 'I CHOOSED THIS REFERENCE 3')
  self.assertEquals(second_doc.getVersion(), '00002')
  self.assertEquals(second_doc.asURL(), url2)
def test_IngestionConfigurationByTypeBasedMethod_usecase5(self):
  """How to configure metadata discovery so that each time a file
  with the same URL is uploaded, a new document is created
  with the same reference (generated automatically as
  an increasing sequence of numbers) and the same version?

  The Document_getPropertyDictFromContent script installed below asks
  portal_url_registry for the reference already bound to the normalised
  URL; on KeyError it generates a new one from the
  'dms_reference_generator5' id group. It does not set any version.
  """
  input_script_id = 'Document_getPropertyDictFromContent'
  # Source of the discovery script. Block indentation inside the literal
  # is significant: it is Python code, presumably compiled by
  # newPythonScript (helper not visible here).
  python_code = """from Products.CMFCore.utils import getToolByName
portal = context.getPortalObject()
information = context.getContentInformation()
result = {}
property_id_list = context.propertyIds()
for k, v in information.items():
  key = k.lower()
  if v:
    if isinstance(v, unicode):
      v = v.encode('utf-8')
    if key in property_id_list:
      if key == 'reference':
        pass # XXX - We can not trust reference on getContentInformation
      else:
        result[key] = v
    elif key == 'author':
      p = context.portal_catalog.getResultValue(title=v, portal_type='Person')
      if p is not None:
        result['contributor'] = p.getRelativeUrl()
    elif key == 'keywords':
      result['subject_list'] = v.split()
url = context.asNormalisedURL()
portal_url_registry = getToolByName(context.getPortalObject(),
                                    'portal_url_registry')
try:
  reference = portal_url_registry.getReferenceFromURL(url)
except KeyError:
  id_group = 'dms_reference_generator5'
  reference = 'I CHOOSED THIS REFERENCE %s' % portal.portal_ids.generateNewId(id_group=id_group)
result['reference'] = reference
return result
"""
  self.newPythonScript(input_script_id, '', python_code)

  # Publish a source File; its getData URL is what gets ingested below.
  document_to_ingest = self.portal.portal_contributions.newContent(
                                                        portal_type='File',
                                                        filename='toto.txt',
                                                        data='Hello World!')
  document_to_ingest.publish()
  transaction.commit()
  self.tic()
  url = document_to_ingest.absolute_url() + '/getData'

  # Both ingestions of the URL are expected to share reference and
  # version.
  first_doc = self.portal.portal_contributions.newContent(url=url)
  transaction.commit()
  self.tic()
  self.assertEquals(first_doc.getPortalType(), 'Text')
  self.assertEquals(first_doc.getContentType(), 'text/plain')
  self.assertEquals(first_doc.getReference(), 'I CHOOSED THIS REFERENCE 1')
  self.assertEquals(first_doc.getVersion(), '001')
  self.assertEquals(first_doc.asURL(), url)

  second_doc = self.portal.portal_contributions.newContent(url=url)
  transaction.commit()
  self.tic()
  self.assertEquals(second_doc.getPortalType(), 'Text')
  self.assertEquals(second_doc.getContentType(), 'text/plain')
  self.assertEquals(second_doc.getReference(), 'I CHOOSED THIS REFERENCE 1')
  self.assertEquals(second_doc.getVersion(), '001')
  self.assertEquals(second_doc.asURL(), url)

  # A second source document consumes the next reference itself.
  document_to_ingest2 = self.portal.portal_contributions.newContent(
                                                        portal_type='File',
                                                        filename='toto.txt',
                                                        data='Hello World!')
  document_to_ingest2.publish()
  transaction.commit()
  self.tic()
  self.assertEquals(document_to_ingest2.getReference(),
                    'I CHOOSED THIS REFERENCE 2')
  url2 = document_to_ingest2.absolute_url() + '/getData'

  # Ingesting the second URL: a further reference, shared by both
  # ingestions, still at version 001.
  first_doc = self.portal.portal_contributions.newContent(url=url2)
  transaction.commit()
  self.tic()
  self.assertEquals(first_doc.getPortalType(), 'Text')
  self.assertEquals(first_doc.getContentType(), 'text/plain')
  self.assertEquals(first_doc.getReference(), 'I CHOOSED THIS REFERENCE 3')
  self.assertEquals(first_doc.getVersion(), '001')
  self.assertEquals(first_doc.asURL(), url2)

  second_doc = self.portal.portal_contributions.newContent(url=url2)
  transaction.commit()
  self.tic()
  self.assertEquals(second_doc.getPortalType(), 'Text')
  self.assertEquals(second_doc.getContentType(), 'text/plain')
  self.assertEquals(second_doc.getReference(), 'I CHOOSED THIS REFERENCE 3')
  self.assertEquals(second_doc.getVersion(), '001')
  self.assertEquals(second_doc.asURL(), url2)
def test_IngestionConfigurationByTypeBasedMethod_usecase6(self):
  """How to configure metadata discovery so that a spreadsheet received
  as application/octet-stream, without an explicit extension, becomes
  a Spreadsheet?
  """
  path = makeFilePath('import_region_category.xls')
  # The fixture is a binary .xls file: read it in binary mode and close
  # the handle explicitly instead of leaving it to garbage collection.
  xls_file = open(path, 'rb')
  try:
    data = xls_file.read()
  finally:
    xls_file.close()
  # The filename carries no extension on purpose.
  self.portal.portal_contributions.newContent(filename='toto',
                                              data=data,
                                              reference='Custom.Reference')
  transaction.commit()
  # Discover metadata will delete first ingested document
  # then reingest new one with appropriate portal_type
  self.tic()
  result_list = self.portal.portal_catalog(reference='Custom.Reference')
  self.assertEquals(len(result_list), 1)
  self.assertEquals(result_list[0].getPortalType(), 'Spreadsheet')
def test_IngestionConfigurationByTypeBasedMethod_usecase7(self):
  """Reingest a published document through a user action.

  Scenario: some time after publication, the user decides to change the
  portal type of a document (File => Text). The test checks that the
  migrated document keeps its properties, data and validation state,
  then does the reverse migration (Text => File) on a document that was
  contributed from a URL.
  """
  file_document = self.portal.document_module.newContent(
    portal_type='File',
    property_which_doesnot_exists='Foo',
    data='Hello World!',
    filename='toto.txt')
  file_document.publish()
  transaction.commit()
  self.tic()

  file_document.edit(title='One title', reference='EFAA')
  transaction.commit()
  self.tic()

  # First migration: File => Text
  text_document = file_document.migratePortalType('Text')
  transaction.commit()
  self.tic()
  self.assertEquals(text_document.getPortalType(), 'Text')
  self.assertEquals(
    text_document.getProperty('property_which_doesnot_exists'),
    'Foo')
  self.assertEquals(text_document.getTitle(), 'One title')
  self.assertEquals(text_document.getReference(), 'EFAA')
  self.assertEquals(text_document.getValidationState(), 'published')
  self.assertEquals(text_document.getData(), 'Hello World!')

  # Second migration: start from a document that has a url property
  url = text_document.absolute_url() + '/getData'
  contributed_document = self.portal.portal_contributions.newContent(url=url)
  contributed_document.submit()
  transaction.commit()
  self.tic()
  self.assertEquals(contributed_document.getPortalType(), 'Text')

  # Text => File
  migrated_document = contributed_document.migratePortalType('File')
  self.assertEquals(migrated_document.getPortalType(), 'File')
  self.assertEquals(migrated_document.asURL(), url)
  self.assertEquals(migrated_document.getData(), 'Hello World!')
  self.assertEquals(migrated_document.getValidationState(), 'submitted')
def test_suite():
suite = unittest.TestSuite()
......
......@@ -42,11 +42,6 @@ from zLOG import LOG
import os
# Directory holding the test fixture files: the 'test_document' folder
# located next to this module.
TEST_FILES_HOME = os.path.join(os.path.dirname(__file__), 'test_document')
# Pattern with three mandatory, dash-separated named groups:
# reference (3 to 10 uppercase letters), language (2 lowercase letters)
# and version (3 digits).
FILE_NAME_REGULAR_EXPRESSION = "(?P<reference>[A-Z]{3,10})-(?P<language>[a-z]{2})-(?P<version>[0-9]{3})"
# Same named groups as above, but the '-language' and '-version' parts
# are each optional.
REFERENCE_REGULAR_EXPRESSION = "(?P<reference>[A-Z]{3,10})(-(?P<language>[a-z]{2}))?(-(?P<version>[0-9]{3}))?"
def makeFilePath(name):
  """Return the path of fixture `name` inside the 'test_document'
  directory that sits next to this module.
  """
  module_directory = os.path.dirname(__file__)
  fixture_directory = os.path.join(module_directory, 'test_document')
  return os.path.join(fixture_directory, name)
......@@ -291,7 +286,10 @@ class TestDocumentConversionCache(TestDocumentMixin):
filename = 'TEST-en-002.doc'
file = makeFileUpload(filename)
document_id = 'an id with spaces'
document = self.portal.portal_contributions.newContent(id=document_id, file=file)
portal_type = 'Text'
module = self.portal.getDefaultModule(portal_type)
document = module.newContent(id=document_id, file=file,
portal_type=portal_type)
transaction.commit()
self.tic()
document_url = document.getRelativeUrl()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment