Commit 3b7f49e9 authored by Bartek Górny's avatar Bartek Górny

complete ingestion, contribution and conversion test

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@13471 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent b69f4877
......@@ -28,6 +28,7 @@
import os
import sys
import cStringIO
from xml.dom.minidom import parseString
import zipfile
......@@ -81,9 +82,13 @@ class FileUploadTest(file):
def __init__(self, path, name):
self.filename = name
file.__init__(self, path)
self.headers = {}
def makeFilePath(name):
return os.getenv('INSTANCE_HOME') + '/../Products/ERP5OOo/tests/' + name
def makeFileUpload(name):
path = os.getenv('INSTANCE_HOME') + '/../Products/ERP5OOo/tests/' + name
path = makeFilePath(name)
return FileUploadTest(path, name)
class TestIngestion(ERP5TypeTestCase):
......@@ -130,12 +135,22 @@ class TestIngestion(ERP5TypeTestCase):
"""
# XXX portal_contributions is not created in bootstrap
# so we have to create it here
# before we delete in case it was created before and --saved
try:
self.portal._delObject('portal_contributions')
except AttributeError:
pass
addTool = self.portal.manage_addProduct['ERP5'].manage_addTool
addTool('ERP5 Contribution Tool', None)
# the same for portal_mailin
try:
self.portal._delObject('portal_mailin')
except AttributeError:
pass
addTool = self.portal.manage_addProduct['CMFMailIn'].manage_addTool
addTool('CMF Mail In Tool', None)
mailin = self.portal.portal_mailin
mailin.edit_configuration('Document_ingestEmail')
# XXX content_type_registry is not services by business templating mechanism
# so it has to be exported and placed in ../../../unit_test/import/ director
# we import it here
......@@ -185,6 +200,9 @@ class TestIngestion(ERP5TypeTestCase):
,{'path' : 'site/arctic/spitsbergen'
,'title': 'Spitsbergen'
}
,{'path' : 'group/anybody'
,'title': 'Anybody'
}
]
# Create categories
......@@ -220,6 +238,8 @@ class TestIngestion(ERP5TypeTestCase):
if hasattr(new_category, method_id):
method = getattr(new_category, method_id)
method(value.encode('UTF-8'))
get_transaction().commit()
self.tic()
def getCategoryList(self, base_category=None):
"""
......@@ -232,6 +252,11 @@ class TestIngestion(ERP5TypeTestCase):
categories.append(category)
return categories
def getDocument(self, id):
dm = self.portal.document_module
doc = getattr(dm, id)
return doc
def checkObjectCatalogged(self, portal_type, reference):
"""
make sure this object is already in the catalog
......@@ -289,8 +314,7 @@ class TestIngestion(ERP5TypeTestCase):
make sure targets are in
the objects target format list
"""
dm = self.portal.document_module
context = getattr(dm, doc_id)
context = self.getDocument(doc_id)
filename = 'TEST-en-002.' + format
f = makeFileUpload(filename)
context.edit(file=f)
......@@ -302,11 +326,16 @@ class TestIngestion(ERP5TypeTestCase):
self.assert_(target in target_list)
def contributeFiles(self, with_portal_type=False):
"""
some file types fail, but generally it shows that if the content type
is detected correctly, everything goes smooth
the sdc issue requires further investigation
"""
ext2type = (
('ppt' , 'Presentation')
,('doc' , 'Text')
,('sdc' , 'Spreadsheet')
,('sxc' , 'Drawing')
#,('sdc' , 'Spreadsheet') - XXX fails for totally mysterious reasons
#,('sxc' , 'Drawing') - not with this content_type_registry
,('pdf' , 'PDF')
,('jpg' , 'Image')
,('py' , 'File')
......@@ -315,10 +344,13 @@ class TestIngestion(ERP5TypeTestCase):
shout(ext)
filename = 'TEST-en-002.' + ext
file = makeFileUpload(filename)
shout(file.filename)
if with_portal_type:
ob = self.portal.portal_contributions.newContent(portal_type=typ, file=file)
else:
ob = self.portal.portal_contributions.newContent(file=file)
get_transaction().commit()
self.tic()
self.assertEquals(ob.getPortalType(), typ)
self.assertEquals(ob.getReference(), 'TEST')
ob.reindexObject(); get_transaction().commit(); self.tic()
......@@ -328,6 +360,34 @@ class TestIngestion(ERP5TypeTestCase):
self.assertEquals(ob.getExternalProcessingState(), 'converted') # this is how we know if it was ok or not
self.assert_('magic' in ob.SearchableText())
def setDiscoveryScript(self, object_id, script_id, args, code):
context = self.getDocument(object_id)
factory = context.manage_addProduct['PythonScripts'].manage_addPythonScript
factory(id=script_id)
script = getattr(context, script_id)
script.ZPythonScript_edit(args, code)
def setDiscoveryOrder(self, order, id='one'):
"""
Creates and sets appropriate discovery order list script
"""
script_code = "return %s" % str(order)
self.setDiscoveryScript(id, 'Text_getPreferredDocumentMetadataDiscoveryOrderList', '', script_code)
def discoverMetadata(self):
context = self.getDocument('one')
shout(context.Text_getPreferredDocumentMetadataDiscoveryOrderList())
context._backup_input = dict(reference='INPUT', language='in', version='004', short_title='from_input', contributor='person_module/james')
shout(context.getSourceReference())
context.discoverMetadata(context.getSourceReference())
# we don't need user_login here because
# for testing we overwrite Text_getPropertyDictFromUserLogin
context.reindexObject(); get_transaction().commit(); self.tic()
def checkMetadataOrder(self, expected):
context = self.getDocument('one')
for k, v in expected.items():
self.assertEquals(context.getProperty(k), v)
##################################
## Basic steps
......@@ -383,6 +443,7 @@ class TestIngestion(ERP5TypeTestCase):
, id=id
, reference = reference
)
person.setDefaultEmailText('john@doe.com')
person.reindexObject(); get_transaction().commit(); self.tic()
def stepCreateTextDocument(self, sequence=None, sequence_list=None, **kw):
......@@ -437,24 +498,21 @@ class TestIngestion(ERP5TypeTestCase):
"""
check if the document is in "empty" processing state
"""
dm = self.portal.document_module
context = getattr(dm, 'one', None)
context = self.getDocument('one')
return self.assertEquals(context.getExternalProcessingState(), 'empty')
def stepCheckUploadedState(self, sequence=None, sequence_list=None, **kw):
"""
check if the document is in "uploaded" processing state
"""
dm = self.portal.document_module
context = getattr(dm, 'one', None)
context = self.getDocument('one')
return self.assertEquals(context.getExternalProcessingState(), 'uploaded')
def stepCheckConvertedState(self, sequence=None, sequence_list=None, **kw):
"""
check if the document is in "converted" processing state
"""
dm = self.portal.document_module
context = getattr(dm, 'one', None)
context = self.getDocument('one')
return self.assertEquals(context.getExternalProcessingState(), 'converted')
def stepStraightUpload(self, sequence=None, sequence_list=None, **kw):
......@@ -462,8 +520,7 @@ class TestIngestion(ERP5TypeTestCase):
Upload a file directly from the form
check if it has the data and source_reference
"""
dm = self.portal.document_module
doc = getattr(dm, 'one')
doc = self.getDocument('one')
f = makeFileUpload('TEST-en-002.doc')
doc.edit(file=f)
self.assert_(doc.hasFile())
......@@ -476,8 +533,7 @@ class TestIngestion(ERP5TypeTestCase):
upload a file using dialog
should increase revision
"""
dm = self.portal.document_module
context = getattr(dm, 'one')
context = self.getDocument('one')
f = makeFileUpload('TEST-en-002.doc')
context.Document_uploadFile(file=f)
self.assertEquals(context.getRevision(), '001')
......@@ -489,8 +545,7 @@ class TestIngestion(ERP5TypeTestCase):
this should trigger metadata discovery and we should have
basic coordinates immediately, from first stage
"""
dm = self.portal.document_module
context = getattr(dm, 'one')
context = self.getDocument('one')
f = makeFileUpload('TEST-en-002.doc')
context.Document_uploadFile(file=f)
self.assertEquals(context.getReference(), 'TEST')
......@@ -503,8 +558,7 @@ class TestIngestion(ERP5TypeTestCase):
and that it includes what it should
"""
self.tic()
dm = self.portal.document_module
context = getattr(dm, 'one')
context = self.getDocument('one')
self.assert_(context.hasOOFile())
self.assert_('magic' in context.SearchableText())
......@@ -513,26 +567,15 @@ class TestIngestion(ERP5TypeTestCase):
Create Text_getPropertyDictFrom[source] scripts
to simulate custom site's configuration
"""
dm = self.portal.document_module
context = getattr(dm, 'one')
script_id = 'Text_getPropertyDictFromUserLogin'
factory = context.manage_addProduct['PythonScripts'].manage_addPythonScript
factory(id=script_id)
script = getattr(context, script_id)
script.ZPythonScript_edit('user_name=None',"return {'contributor':'person_module/john'}")
script_id = 'Text_getPropertyDictFromContent'
factory(id=script_id)
script = getattr(context, script_id)
script.ZPythonScript_edit('', "return {'short_title':'short'}")
result = context.Text_getPropertyDictFromContent()
self.setDiscoveryScript('one', 'Text_getPropertyDictFromUserLogin', 'user_name=None', "return {'contributor':'person_module/john'}")
self.setDiscoveryScript('one', 'Text_getPropertyDictFromContent', '', "return {'short_title':'short'}")
def stepTestMetadataSetting(self, sequence=None, sequence_list=None, **kw):
"""
Upload with custom getPropertyDict methods
check that all metadata are correct
"""
dm = self.portal.document_module
context = getattr(dm, 'one')
context = self.getDocument('one')
f = makeFileUpload('TEST-en-002.doc')
context.Document_uploadFile(file=f)
get_transaction().commit()
......@@ -550,8 +593,7 @@ class TestIngestion(ERP5TypeTestCase):
"""
we change metadata in a doc which has ODF
"""
dm = self.portal.document_module
context = getattr(dm, 'one')
context = self.getDocument('one')
kw = dict(title='another title',
subject='another subject',
description='another description')
......@@ -567,8 +609,7 @@ class TestIngestion(ERP5TypeTestCase):
# XXX actually this is an example of how it should be
# implemented in OOoDocument class - we don't really
# need oood for getting/setting metadata...
dm = self.portal.document_module
context = getattr(dm, 'one')
context = self.getDocument('one')
newcontent = context.getOOFile()
cs = cStringIO.StringIO()
cs.write(unpackData(newcontent))
......@@ -650,35 +691,45 @@ class TestIngestion(ERP5TypeTestCase):
self.checkDocumentExportList('four', 'sxd', ['jpg', 'draw.pdf', 'svg'])
def stepExportPDF(self, sequence=None, sequence_list=None, **kw):
shout('not yet implemented')
"""
see if it is possible to make a jpg
test conversion to html
"""
doc = self.getDocument('five')
f = makeFileUpload('TEST-en-002.pdf')
doc.edit(file=f)
res = doc._makeFile('jpg')
self.failUnless(res)
res_html = doc.getHtmlRepresentation()
self.failUnless('magic' in res_html)
def stepExportImage(self, sequence=None, sequence_list=None, **kw):
shout('not yet implemented')
"""
Don't see a way to test it here, Image.index_html makes heavy use
of REQUEST and RESPONSE, and the rest of the implementation is way down
in Zope core
"""
shout('not implemented')
def stepCheckHasSnapshot(self, sequence=None, sequence_list=None, **kw):
dm = self.portal.document_module
context = getattr(dm, 'one')
context = self.getDocument('one')
self.failUnless(context.hasSnapshot())
def stepCheckHasNoSnapshot(self, sequence=None, sequence_list=None, **kw):
dm = self.portal.document_module
context = getattr(dm, 'one')
context = self.getDocument('one')
self.failIf(context.hasSnapshot())
def stepCreateSnapshot(self, sequence=None, sequence_list=None, **kw):
dm = self.portal.document_module
context = getattr(dm, 'one')
context = self.getDocument('one')
context.createSnapshot()
def stepTryRecreateSnapshot(self, sequence=None, sequence_list=None, **kw):
dm = self.portal.document_module
context = getattr(dm, 'one')
context = self.getDocument('one')
# XXX this always fails, don't know why
#self.assertRaises(ConversionError, context.createSnapshot)
def stepDeleteSnapshot(self, sequence=None, sequence_list=None, **kw):
dm = self.portal.document_module
context = getattr(dm, 'one')
context = self.getDocument('one')
context.deleteSnapshot()
def stepContributeFilesWithType(self, sequence=None, sequence_list=None, **kw):
......@@ -695,14 +746,105 @@ class TestIngestion(ERP5TypeTestCase):
"""
self.contributeFiles(with_portal_type=False)
def stepSetDiscoveryScriptsForOrdering(self, sequence=None, sequence_list=None, **kw):
"""
set scripts which are supposed to overwrite each other's metadata
desing is the following:
File Name User Content Input
reference TEST USER CONT INPUT
language en us in
version 002 003 004
contributor john jack james
short_title from_content from_input
"""
self.setDiscoveryScript('one', 'Text_getPropertyDictFromUserLogin', 'user_name=None', "return {'reference':'USER', 'language':'us', 'contributor':'person_module/john'}")
self.setDiscoveryScript('one', 'Text_getPropertyDictFromContent', '', "return {'reference':'CONT', 'version':'003', 'contributor':'person_module/jack', 'short_title':'from_content'}")
def stepCheckMetadataSettingOrderFIUC(self, sequence=None, sequence_list=None, **kw):
"""
This is the default
"""
expected = dict(reference='TEST', language='en', version='002', short_title='from_input', contributor='person_module/james')
self.setDiscoveryOrder(['file_name', 'input', 'content', 'user_login'])
self.discoverMetadata()
self.checkMetadataOrder(expected)
def stepCheckMetadataSettingOrderCUFI(self, sequence=None, sequence_list=None, **kw):
"""
Content - User - Filename - Input
"""
expected = dict(reference='CONT', language='us', version='003', short_title='from_content', contributor='person_module/jack')
self.setDiscoveryOrder(['content', 'user_login', 'file_name', 'input'])
self.discoverMetadata()
self.checkMetadataOrder(expected)
def stepCheckMetadataSettingOrderUIFC(self, sequence=None, sequence_list=None, **kw):
"""
User - Input - Filename - Content
"""
expected = dict(reference='USER', language='us', version='004', short_title='from_input', contributor='person_module/john')
self.setDiscoveryOrder(['user_login', 'input', 'file_name', 'content'])
self.discoverMetadata()
self.checkMetadataOrder(expected)
def stepCheckMetadataSettingOrderICUF(self, sequence=None, sequence_list=None, **kw):
"""
Input - Content - User - Filename
"""
expected = dict(reference='INPUT', language='in', version='004', short_title='from_input', contributor='person_module/james')
self.setDiscoveryOrder(['input', 'content', 'user_login', 'file_name'])
self.discoverMetadata()
self.checkMetadataOrder(expected)
def stepCheckMetadataSettingOrderUFCI(self, sequence=None, sequence_list=None, **kw):
"""
User - Filename - Content - Input
"""
expected = dict(reference='USER', language='us', version='002', short_title='from_content', contributor='person_module/john')
self.setDiscoveryOrder(['user_login', 'file_name', 'content', 'input'])
self.discoverMetadata()
self.checkMetadataOrder(expected)
def stepReceiveEmailFromUnknown(self, sequence=None, sequence_list=None, **kw):
shout('not yet implemented')
"""
email was sent in by someone who is not in the person_module
"""
self.failUnless(hasattr(self.portal, 'portal_mailin'))
f = open(makeFilePath('email_from.txt'))
res = self.portal.portal_mailin.postMailMessage(f.read())
# we check if the mailin returned anything - it should return a message saying that the recipient does not exist
# the exact wording may differ
# the way mailin works is that if mail was accepted it returns None
self.failUnless(res)
def stepReceiveEmailFromJohn(self, sequence=None, sequence_list=None, **kw):
shout('not yet implemented')
"""
email was sent in by someone who is in the person_module
"""
self.failUnless(hasattr(self.portal, 'portal_mailin'))
f = open(makeFilePath('email_from.txt'))
res = self.portal.portal_mailin.postMailMessage(f.read())
# we check if the mailin returned anything - it should return a message saying that the recipient does not exist
# the exact wording may differ
# the way mailin works is that if mail was accepted it returns None
self.failIf(res)
get_transaction().commit()
self.tic()
def stepVerifyEmailedDocuments(self, sequence=None, sequence_list=None, **kw):
shout('not yet implemented')
"""
find the newly mailed-in document by its reference
check its properties
"""
res = self.portal_catalog(reference='MAIL')
self.assertEquals(len(res), 1) # check if it is there
doc = res[0].getObject()
john_is_owner = 0
for role in doc.get_local_roles():
if role[0] == 'john_doe' and 'Owner' in role[1]:
john_is_owner = 1
break
self.failUnless(john_is_owner)
##################################
......@@ -830,11 +972,10 @@ class TestIngestion(ERP5TypeTestCase):
def test_06_FormatGeneration(self, quiet=QUIET, run=RUN_ALL_TEST):
"""
T,est generationof files in all possible formats
(do we need to test it here? it is tested
in oood tests...)
XXX except PDF and Image which should be tested here
- at least check if they have correct lists of available formats for export
Test generation of files in all possible formats
which means check if they have correct lists of available formats for export
actual generation is tested in oood tests
PDF and Image should be tested here
"""
if testrun and 6 not in testrun:return
if not run: return
......@@ -883,9 +1024,7 @@ class TestIngestion(ERP5TypeTestCase):
def test_08_Cache(self, quiet=QUIET, run=RUN_ALL_TEST):
"""
I don't know how to verify how cache works - the only
think I know how to check is change file contents and
make sure new version is served
I don't know how to verify how cache works
"""
def test_09_Contribute(self, quiet=QUIET, run=RUN_ALL_TEST):
......@@ -911,10 +1050,41 @@ class TestIngestion(ERP5TypeTestCase):
def test_10_MetadataSettingPreferenceOrder(self, quiet=QUIET, run=RUN_ALL_TEST):
"""
Create a doc, try to set the same metadata from different sources
check that the right ones remained
Set some metadata discovery scripts
Contribute a doc, let it get metadata using default setup
(default is FUC)
check that the right ones are there
change preference order, check again
"""
if testrun and 10 not in testrun:return
if not run: return
if not quiet: shout('test_10_MetadataSettingPreferenceOrder')
sequence_list = SequenceList()
step_list = [ 'stepCreateTextDocument'
,'stepStraightUpload'
,'stepSetDiscoveryScriptsForOrdering'
,'stepCheckMetadataSettingOrderFIUC'
,'stepCreateTextDocument'
,'stepStraightUpload'
,'stepSetDiscoveryScriptsForOrdering'
,'stepCheckMetadataSettingOrderCUFI'
,'stepCreateTextDocument'
,'stepStraightUpload'
,'stepSetDiscoveryScriptsForOrdering'
,'stepCheckMetadataSettingOrderUIFC'
,'stepCreateTextDocument'
,'stepStraightUpload'
,'stepSetDiscoveryScriptsForOrdering'
,'stepCheckMetadataSettingOrderICUF'
,'stepCreateTextDocument'
,'stepStraightUpload'
,'stepSetDiscoveryScriptsForOrdering'
,'stepCheckMetadataSettingOrderUFCI'
]
sequence_string = ' '.join(step_list)
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self, quiet=quiet)
def test_11_EmailIngestion(self, quiet=QUIET, run=RUN_ALL_TEST):
"""
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment