Commit 8ec13c2a authored by Gabriel Monnerat's avatar Gabriel Monnerat

clean up the code according to PEP08

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk/utils@43503 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 60c08537
......@@ -30,7 +30,7 @@
from zope.interface import implements
from cloudooo.interfaces.handler import IHandler
from cloudooo.file import File
from subprocess import Popen,PIPE
from subprocess import Popen, PIPE
class FFMPEGHandler(object):
......@@ -38,7 +38,7 @@ class FFMPEGHandler(object):
implements(IHandler)
def __init__(self, base_folder_url, data, source_format,**kw):
def __init__(self, base_folder_url, data, source_format, **kw):
"""
base_folder_url(string)
The requested url for data base folder
......@@ -53,12 +53,19 @@ class FFMPEGHandler(object):
def convert(self, destination_format, **kw):
""" Convert the inputed video to output as format that were informed """
# XXX This implementation could use ffmpeg -i pipe:0, but
# XXX seems super unreliable currently and it generates currupted files in the end
# XXX seems super unreliable currently and it generates currupted files in
# the end
output = File(self.base_folder_url, '', destination_format)
try:
command = [self.ffmpeg_bin, "-i",self.input.getUrl(), "-y", output.getUrl()]
stdout, stderr = Popen(command, stdout=PIPE,
stderr=PIPE, close_fds=True).communicate()
command = [self.ffmpeg_bin,
"-i",
self.input.getUrl(),
"-y",
output.getUrl()]
stdout, stderr = Popen(command,
stdout=PIPE,
stderr=PIPE,
close_fds=True).communicate()
output.reload()
return output.getContent()
finally:
......
......@@ -26,7 +26,7 @@
#
##############################################################################
import unittest
import unittest
import md5
from cloudooo.handler.ffmpeg.handler import FFMPEGHandler
......@@ -35,7 +35,8 @@ class TestFFMPEGHandler(unittest.TestCase):
def testConvertVideo(self):
"""Test coversion of diferents formats of video"""
input_data = FFMPEGHandler("tests/data", open("tests/data/test.3gp").read())
input_data = FFMPEGHandler("tests/data",
open("tests/data/test.3gp").read())
hash_input = input_data.digest()
output_data = handler.convert("ogv")
hash_output = output_data.digest()
......
......@@ -86,7 +86,8 @@ class Application(object):
process = Process(pid)
try:
for connection in process.get_connections():
if connection.status == 'LISTEN' and connection.local_address[1] == self.port:
if connection.status == 'LISTEN' and \
connection.local_address[1] == self.port:
return True
except AccessDenied:
return False
......
......@@ -39,6 +39,7 @@ from cloudooo.handler.ooo.utils.utils import waitStartDaemon, \
removeDirectory, waitStopDaemon, \
socketStatus
class OpenOffice(Application):
"""Object to control one OOo Instance and all features instance."""
......
......@@ -37,6 +37,7 @@ from cloudooo.file import File
class FileSystemDocument(File):
pass
class OdfDocument(object):
"""Manipulates odf documents in memory"""
......
......@@ -51,12 +51,14 @@ DRAW_XPATH_QUERY = './/draw:image'
TABLE_XPATH_QUERY = './/table:table'
IMAGE_TITLE_XPATH_QUERY = './/../../text() | .//../../*/text()'
def getTemplatePath(format):
""" Get the path of template file. This should goes to
some utils library.
"""
return path.join(path.dirname(__file__), 'template.%s' % format)
class OOGranulator(object):
"""Granulate an OpenOffice document into tables, images, chapters and
paragraphs."""
......@@ -105,11 +107,12 @@ class OOGranulator(object):
if len(table_list) == 0:
return None
table = table_list[0]
# Next line do this <office:content><office:body><office:text><table:table>
# Next line do this
# <office:content><office:body><office:text><table:table>
content_xml[-1][0].append(table)
# XXX: Next line replace the <office:automatic-styles> tag. This include a
# lot of unused style tags. Will be better detect the used styles and
# include only those.
# XXX: Next line replace the <office:automatic-styles> tag. This include
# a lot of unused style tags. Will be better detect the used styles and
# include only those.
content_xml.replace(content_xml[-2],
self.document.parsed_content[-2])
......@@ -139,7 +142,6 @@ class OOGranulator(object):
matrix.append(matrix_row)
return matrix
def getColumnItemList(self, file, table_id):
"""Return the list of columns in the form of (id, title)."""
raise NotImplementedError
......@@ -194,7 +196,8 @@ class OOGranulator(object):
try:
paragraph = relevant_paragraph_list[paragraph_id]
except IndexError:
logger.error("Unable to find paragraph %s at paragraph list." % paragraph_id)
msg = "Unable to find paragraph %s at paragraph list." % paragraph_id
logger.error(msg)
return None
text = ''.join(paragraph.itertext())
......
......@@ -126,11 +126,13 @@ class OOHandler:
command = self._getCommand(*feature_list, **kw)
stdout, stderr = self._subprocess(command)
if stderr != "":
raise Exception, stderr
raise Exception(stderr)
return stdout, stderr
def _serializeMimemapper(self, source_extension=None, destination_extension=None):
def _serializeMimemapper(self,
source_extension=None,
destination_extension=None):
"""Serialize parts of mimemapper"""
if destination_extension is None:
return json.dumps(dict(mimetype_by_filter_type=mimemapper._mimetype_by_filter_type))
......
......@@ -17,7 +17,7 @@ def test_openoffice(hostname, port):
def main():
try:
opt_list, arg_list = getopt(sys.argv[1:], "",
["port=","hostname=","uno_path="])
["port=", "hostname=", "uno_path="])
except GetoptError, e:
print >> sys.stderr, "%s \nUse --port and --hostname" % e
sys.exit(2)
......
......@@ -100,8 +100,8 @@ class UnoConverter(object):
sys.path.append(uno_path)
fundamentalrc_file = '%s/fundamentalrc' % office_binary_path
if exists(fundamentalrc_file) and \
not environ.has_key('URE_BOOTSTRAP'):
putenv('URE_BOOTSTRAP','vnd.sun.star.pathname:%s' % fundamentalrc_file)
'URE_BOOTSTRAP' not in environ:
putenv('URE_BOOTSTRAP', 'vnd.sun.star.pathname:%s' % fundamentalrc_file)
def _createProperty(self, name, value):
"""Create property"""
......@@ -133,7 +133,7 @@ class UnoConverter(object):
else:
return []
return [property,]
return [property, ]
def _getFilterName(self, destination_format, type):
for filter_tuple in mimemapper["filter_list"]:
......@@ -164,7 +164,7 @@ class UnoConverter(object):
uno_url = self.systemPathToFileUrl(self.document_url)
uno_document = desktop.loadComponentFromURL(uno_url, "_blank", 0, ())
if not uno_document:
raise AttributeError, "This document can not be loaded or is empty"
raise AttributeError("This document can not be loaded or is empty")
if self.refresh:
# Before converting to expected format, refresh dynamic
# value inside document.
......
......@@ -98,8 +98,8 @@ class UnoMimemapper(object):
sys.path.append(uno_path)
fundamentalrc_file = '%s/fundamentalrc' % office_binary_path
if path.exists(fundamentalrc_file) and \
not environ.has_key('URE_BOOTSTRAP'):
putenv('URE_BOOTSTRAP','vnd.sun.star.pathname:%s' % fundamentalrc_file)
'URE_BOOTSTRAP' not in environ:
putenv('URE_BOOTSTRAP', 'vnd.sun.star.pathname:%s' % fundamentalrc_file)
def getFilterDict(self):
"""Return all filters and your properties"""
......
......@@ -57,7 +57,7 @@ class MimeMapper(object):
def _addFilter(self, filter):
"""Add filter in mimemapper catalog."""
extension = filter.getExtension()
if not self._filter_by_extension_dict.has_key(extension):
if extension not in self._filter_by_extension_dict:
self._filter_by_extension_dict[extension] = []
self._filter_by_extension_dict.get(extension).append(filter)
......@@ -154,7 +154,7 @@ class MimeMapper(object):
# for Export filters
if flag & 0x02:
if not self._mimetype_by_filter_type.has_key(filter_type):
if filter_type not in self._mimetype_by_filter_type:
self._mimetype_by_filter_type[filter_type] = mimetype
# for export filters, one extension is enough.
for ext in filter_extension_list[:1]:
......@@ -181,15 +181,15 @@ class MimeMapper(object):
# hardcode 'extension -> document type' mappings according to
# soffice behaviour for extensions having several candidates.
self._doc_type_list_by_extension.update({
'rtf':['com.sun.star.text.TextDocument'],
'sxd':['com.sun.star.drawing.DrawingDocument'],
'txt':['com.sun.star.text.TextDocument'],
'odg':['com.sun.star.drawing.DrawingDocument'],
'html':['com.sun.star.text.WebDocument'],
'sda':['com.sun.star.drawing.DrawingDocument'],
'sdd':['com.sun.star.drawing.DrawingDocument'],
'pdf':['com.sun.star.drawing.DrawingDocument'],
'xls':['com.sun.star.sheet.SpreadsheetDocument'],
'rtf': ['com.sun.star.text.TextDocument'],
'sxd': ['com.sun.star.drawing.DrawingDocument'],
'txt': ['com.sun.star.text.TextDocument'],
'odg': ['com.sun.star.drawing.DrawingDocument'],
'html': ['com.sun.star.text.WebDocument'],
'sda': ['com.sun.star.drawing.DrawingDocument'],
'sdd': ['com.sun.star.drawing.DrawingDocument'],
'pdf': ['com.sun.star.drawing.DrawingDocument'],
'xls': ['com.sun.star.sheet.SpreadsheetDocument'],
})
self.document_service_list = self._extension_list_by_type.keys()
self.extension_list_by_doc_type =\
......
......@@ -7,6 +7,7 @@ monitor_request = None
monitor_memory = None
monitor_sleeping_time = None
def load(local_config):
"""Start the monitors"""
monitor_interval = int(local_config.get('monitor_interval'))
......
......@@ -56,7 +56,7 @@ class MonitorMemory(Monitor, Process):
except TypeError:
logger.debug("OpenOffice is stopped")
return 0
except psutil.NoSuchProcess, e:
except psutil.NoSuchProcess:
# Exception raised when a process with a certain PID doesn't or no longer
# exists (zombie).
return 0
......
......@@ -28,7 +28,6 @@
from monitor import Monitor
from threading import Thread
import psutil
from cloudooo.utils.utils import logger
from time import sleep, time
......@@ -72,4 +71,3 @@ class MonitorSpleepingTime(Monitor, Thread):
self.openoffice.release()
sleep(self.interval)
logger.debug("Stop MonitorSpleepingTime")
......@@ -28,10 +28,9 @@
import unittest
import sys
from os import path, mkdir
from os import path
from os import environ, putenv
from cloudooo.handler.ooo.application.openoffice import openoffice
from cloudooo.handler.ooo.utils.utils import waitStartDaemon
from cloudooo.handler.ooo.mimemapper import mimemapper
from cloudooo.handler.tests.handlerTestCase import config, check_folder
......@@ -67,7 +66,7 @@ def startFakeEnvironment(start_openoffice=True, conf_path=None):
fundamentalrc_file = '%s/fundamentalrc' % office_binary_path
if path.exists(fundamentalrc_file) and \
not environ.has_key('URE_BOOTSTRAP'):
'URE_BOOTSTRAP' not in environ:
putenv('URE_BOOTSTRAP', 'vnd.sun.star.pathname:%s' % fundamentalrc_file)
if start_openoffice:
......@@ -90,6 +89,7 @@ def startFakeEnvironment(start_openoffice=True, conf_path=None):
openoffice.release()
return openoffice
def stopFakeEnvironment(stop_openoffice=True):
"""Stop Openoffice """
if stop_openoffice:
......
......@@ -88,7 +88,7 @@ def run():
process.wait()
elif OPENOFFICE:
chdir(ENVIRONMENT_PATH)
openoffice = startFakeEnvironment(conf_path=server_cloudooo_conf)
startFakeEnvironment(conf_path=server_cloudooo_conf)
try:
TestRunner(verbosity=2).run(suite)
finally:
......
......@@ -35,6 +35,7 @@ import magic
file_detector = magic.Magic()
DAEMON = True
class TestAllFormats(HandlerTestCase):
"""Test XmlRpc Server. Needs cloudooo server started"""
......@@ -92,4 +93,3 @@ class TestAllFormats(HandlerTestCase):
def test_suite():
return make_suite(TestAllFormats)
......@@ -35,6 +35,7 @@ import magic
file_detector = magic.Magic()
DAEMON = True
class TestAllFormatsERP5Compatibility(HandlerTestCase):
"""
Test XmlRpc Server using ERP5 compatibility API.
......@@ -86,5 +87,6 @@ class TestAllFormatsERP5Compatibility(HandlerTestCase):
message = '\n'.join([template_message % fault for fault in fault_list])
self.fail('Failed Conversions:\n' + message)
def test_suite():
return make_suite(TestAllFormatsERP5Compatibility)
......@@ -63,4 +63,3 @@ class TestApplication(unittest.TestCase):
def test_suite():
return make_suite(TestApplication)
......@@ -56,4 +56,3 @@ class TestFilter(unittest.TestCase):
def test_suite():
return make_suite(TestFilter)
......@@ -36,6 +36,7 @@ import magic
DAEMON = True
mime_decoder = magic.Magic(mime=True)
def basicTestToGenerate(id, proxy, data, source_format, destination_format,
result_list):
"""Test to use method generate of server"""
......@@ -44,6 +45,7 @@ def basicTestToGenerate(id, proxy, data, source_format, destination_format,
assert mimetype == 'application/pdf'
result_list[id] = True
class TestHighLoad(HandlerTestCase):
"""Test with many simultaneous connection"""
......@@ -56,7 +58,7 @@ class TestHighLoad(HandlerTestCase):
process_list = []
data = encodestring(open("data/test.doc", 'r').read())
LOOP = 50
result_list = Array('i', [False]*LOOP)
result_list = Array('i', [False] * LOOP)
for id in range(LOOP):
process = Process(target=basicTestToGenerate, args=(id, self.proxy, data,
'doc', 'pdf',
......@@ -72,4 +74,3 @@ class TestHighLoad(HandlerTestCase):
def test_suite():
return make_suite(TestHighLoad)
......@@ -164,6 +164,7 @@ chart_expected_tuple = (
OPENOFFICE = True
class TestMimeMapper(HandlerTestCase):
"""Test if object load filters correctly of OOo."""
......@@ -332,5 +333,6 @@ class TestMimeMapper(HandlerTestCase):
'com.sun.star.presentation.PresentationDocument')
self.assertEquals(filtername, "impress_html_Export")
def test_suite():
return make_suite(TestMimeMapper)
......@@ -34,6 +34,7 @@ from cloudooo.handler.ooo.monitor.memory import MonitorMemory
OPENOFFICE = True
class TestMonitorInit(HandlerTestCase):
"""Test Case to test if the monitors are controlled correctly"""
......@@ -74,4 +75,3 @@ class TestMonitorInit(HandlerTestCase):
def test_suite():
return make_suite(TestMonitorInit)
......@@ -36,6 +36,7 @@ from cloudoooTestCase import make_suite
OPENOFFICE = True
class TestMonitorMemory(unittest.TestCase):
"""Test case to see if the MonitorMemory is properly managing the
openoffice."""
......@@ -108,4 +109,3 @@ class TestMonitorMemory(unittest.TestCase):
def test_suite():
return make_suite(TestMonitorMemory)
......@@ -34,6 +34,7 @@ from cloudooo.handler.ooo.application.openoffice import openoffice
OPENOFFICE = True
class TestMonitorRequest(HandlerTestCase):
"""Test all features of a monitor following the interface"""
......
......@@ -34,6 +34,7 @@ from cloudoooTestCase import make_suite
OPENOFFICE = True
class TestMonitorTimeout(unittest.TestCase):
"""Test all features of a monitor following the interface"""
......
......@@ -100,7 +100,6 @@ class TestOOGranulator(HandlerTestCase):
self.assertEquals(None, oogranulator.getTableMatrix('Non existent'))
def testGetColumnItemList(self):
"""Test if getColumnItemList() returns the right table columns list"""
self.assertRaises(NotImplementedError, self.oogranulator.getColumnItemList,
......
......@@ -39,6 +39,7 @@ from zipfile import ZipFile
OPENOFFICE = True
class TestOOHandler(HandlerTestCase):
"""Test OOHandler and manipulation of OOo Instance"""
......@@ -93,7 +94,7 @@ class TestOOHandler(HandlerTestCase):
'odt')
metadata = handler.getMetadata()
self.assertEquals(metadata.get('Data'), '')
self.assertTrue(metadata.has_key('Data'))
self.assertTrue('Data' in metadata)
self.assertEquals(metadata.get('MIMEType'),
'application/vnd.oasis.opendocument.text')
handler.document.restoreOriginal()
......@@ -189,6 +190,6 @@ class TestOOHandler(HandlerTestCase):
self.assertTrue(content_tree.xpath('//text:variable-get[text() = "DISPLAY ME"]',
namespaces=content_tree.nsmap))
def test_suite():
return make_suite(TestOOHandler)
......@@ -32,6 +32,7 @@ from cloudoooTestCase import make_suite
from cloudooo.handler.tests.handlerTestCase import HandlerTestCase
from cloudooo.handler.ooo.document import OdfDocument
class TestOdfDocument(HandlerTestCase):
def setUp(self):
......@@ -67,4 +68,3 @@ class TestOdfDocument(HandlerTestCase):
def test_suite():
return make_suite(TestOdfDocument)
......@@ -33,6 +33,7 @@ from cloudooo.handler.ooo.utils.utils import waitStopDaemon
OPENOFFICE = True
class TestOpenOffice(HandlerTestCase):
"""Test OpenOffice object and manipulation of OOo Instance"""
......
......@@ -38,6 +38,7 @@ import magic
DAEMON = True
class TestServer(HandlerTestCase):
"""Test XmlRpc Server. Needs cloudooo server started"""
......@@ -115,7 +116,6 @@ class TestServer(HandlerTestCase):
content_type = self._getFileType(output_url)
self.assertEquals(content_type, stdout_msg)
def _getFileType(self, output_url):
mime = magic.Magic(mime=True)
return mime.from_file(output_url)
......@@ -174,7 +174,7 @@ class TestServer(HandlerTestCase):
def testgetFileMetadataItemListWithoutData(self):
"""Test server using method getFileMetadataItemList. Without data
converted"""
data = open(join('data','testMetadata.odt'),'r').read()
data = open(join('data', 'testMetadata.odt'), 'r').read()
metadata_dict = self.proxy.getFileMetadataItemList(encodestring(data),
'odt')
self.assertEquals(metadata_dict.get("Data"), '')
......@@ -188,7 +188,7 @@ class TestServer(HandlerTestCase):
def testgetFileMetadataItemListWithData(self):
"""Test server using method getFileMetadataItemList. With data converted"""
document_output_url = join(self.tmp_url, "testGetMetadata.odt")
data = open(join('data','testMetadata.odt'),'r').read()
data = open(join('data', 'testMetadata.odt'), 'r').read()
metadata_dict = self.proxy.getFileMetadataItemList(encodestring(data),
"odt",
True)
......@@ -206,9 +206,9 @@ class TestServer(HandlerTestCase):
def testupdateFileMetadata(self):
"""Test server using method updateFileMetadata"""
document_output_url = join(self.tmp_url, "testSetMetadata.odt")
data = open(join('data','testMetadata.odt'),'r').read()
data = open(join('data', 'testMetadata.odt'), 'r').read()
odf_data = self.proxy.updateFileMetadata(encodestring(data), 'odt',
{"Title":"testSetMetadata"})
{"Title": "testSetMetadata"})
open(document_output_url, 'w').write(decodestring(odf_data))
content_type = self._getFileType(document_output_url)
self.assertEquals(content_type, 'application/vnd.oasis.opendocument.text')
......@@ -220,11 +220,11 @@ class TestServer(HandlerTestCase):
def testupdateFileMetadataWithUserMetadata(self):
"""Test server using method updateFileMetadata with unsual metadata"""
document_output_url = join(self.tmp_url, "testSetMetadata.odt")
data = open(join('data','testMetadata.odt'),'r').read()
data = open(join('data', 'testMetadata.odt'), 'r').read()
odf_data = self.proxy.updateFileMetadata(encodestring(data),
'odt',
{"Reference":"testSetMetadata"})
'odt',
{"Reference": "testSetMetadata"})
open(document_output_url, 'w').write(decodestring(odf_data))
content_type = self._getFileType(document_output_url)
self.assertEquals(content_type, 'application/vnd.oasis.opendocument.text')
......@@ -235,11 +235,11 @@ class TestServer(HandlerTestCase):
"""Test server using method updateFileMetadata when the same metadata is
updated"""
document_output_url = join(self.tmp_url, "testSetMetadata.odt")
data = open(join('data','testMetadata.odt'),'r').read()
data = open(join('data', 'testMetadata.odt'), 'r').read()
odf_data = self.proxy.updateFileMetadata(encodestring(data), 'odt',
{"Reference":"testSetMetadata", "Something":"ABC"})
{"Reference": "testSetMetadata", "Something": "ABC"})
new_odf_data = self.proxy.updateFileMetadata(odf_data, 'odt',
{"Reference":"new value", "Something": "ABC"})
{"Reference": "new value", "Something": "ABC"})
open(document_output_url, 'w').write(decodestring(new_odf_data))
content_type = self._getFileType(document_output_url)
self.assertEquals(content_type, 'application/vnd.oasis.opendocument.text')
......@@ -250,13 +250,13 @@ class TestServer(HandlerTestCase):
def testupdateFileMetadataAlreadyHasMetadata(self):
"""Test document that already has metadata. Check if the metadata is not
deleted"""
data = open(join('data','testMetadata.odt'),'r').read()
data = open(join('data', 'testMetadata.odt'), 'r').read()
metadata_dict = self.proxy.getFileMetadataItemList(encodestring(data), 'odt')
self.assertEquals(metadata_dict["Description"], "cloudooo Comments")
self.assertEquals(metadata_dict["Keywords"], "Keywords Test")
self.assertEquals(metadata_dict["Title"], "cloudooo Test")
odf_data = self.proxy.updateFileMetadata(encodestring(data), 'odt',
{"Title":"cloudooo Title"})
{"Title": "cloudooo Title"})
odf_metadata_dict = self.proxy.getFileMetadataItemList(odf_data, 'odt')
self.assertEquals(odf_metadata_dict["Description"], "cloudooo Comments")
self.assertEquals(odf_metadata_dict["Keywords"], "Keywords Test")
......@@ -328,17 +328,17 @@ class TestServer(HandlerTestCase):
def testConvertDocumentToInvalidFormat(self):
"""Try convert one document for a invalid format"""
data = open(join('data','test.doc'),'r').read()
data = open(join('data', 'test.doc'), 'r').read()
self.assertRaises(Fault, self.proxy.convertFile, (data, 'doc', 'xyz'))
def testConvertDocumentToImpossibleFormat(self):
"""Try convert one document to format not possible"""
data = open(join('data','test.odp'),'r').read()
data = open(join('data', 'test.odp'), 'r').read()
self.assertRaises(Fault, self.proxy.convertFile, (data, 'odp', 'doc'))
def testRunConvertMethod(self):
"""Test run_convert method"""
data = open(join('data','test.doc'),'r').read()
data = open(join('data', 'test.doc'), 'r').read()
response_code, response_dict, response_message = \
self.proxy.run_convert('test.doc', encodestring(data))
self.assertEquals(response_code, 200)
......@@ -352,7 +352,7 @@ class TestServer(HandlerTestCase):
def testRunConvertFailResponse(self):
"""Test run_convert method with invalid file"""
data = open(join('data', 'test.doc'),'r').read()[:30]
data = open(join('data', 'test.doc'), 'r').read()[:30]
response_code, response_dict, response_message = \
self.proxy.run_convert('test.doc', encodestring(data))
self.assertEquals(response_code, 402)
......@@ -364,7 +364,7 @@ class TestServer(HandlerTestCase):
def testRunGenerateMethod(self):
"""Test run_generate method"""
data = open(join('data', 'test.odt'),'r').read()
data = open(join('data', 'test.odt'), 'r').read()
generate_result = self.proxy.run_generate('test.odt',
encodestring(data),
None, 'pdf',
......@@ -378,7 +378,7 @@ class TestServer(HandlerTestCase):
def testRunGenerateMethodConvertOdsToHTML(self):
"""Test run_generate method. This test is to validate a bug convertions to
html"""
data = open(join('data', 'test.ods'),'r').read()
data = open(join('data', 'test.ods'), 'r').read()
generate_result = self.proxy.run_generate('test.ods',
encodestring(data),
None, 'html',
......@@ -402,7 +402,7 @@ class TestServer(HandlerTestCase):
def testPNGFileToConvertOdpToHTML(self):
"""Test run_generate method. This test if returns good png files"""
data = open(join('data', 'test_png.odp'),'r').read()
data = open(join('data', 'test_png.odp'), 'r').read()
generate_result = self.proxy.run_generate('test_png.odp',
encodestring(data),
None, 'html',
......@@ -431,7 +431,7 @@ class TestServer(HandlerTestCase):
def testRunGenerateMethodConvertOdpToHTML(self):
"""Test run_generate method. This test is to validate a bug convertions to
html"""
data = open(join('data','test.odp'),'r').read()
data = open(join('data', 'test.odp'), 'r').read()
generate_result = self.proxy.run_generate('test.odp',
encodestring(data),
None, 'html',
......@@ -455,7 +455,7 @@ class TestServer(HandlerTestCase):
# document.
def _testRunGenerateMethodFailResponse(self):
"""Test run_generate method with invalid document"""
data = open(join('data','test.odt'), 'r').read()[:100]
data = open(join('data', 'test.odt'), 'r').read()[:100]
generate_result = self.proxy.run_generate('test.odt',
encodestring(data),
None, 'pdf', 'application/vnd.oasis.opendocument.text')
......@@ -467,10 +467,10 @@ class TestServer(HandlerTestCase):
def testRunSetMetadata(self):
"""Test run_setmetadata method"""
data = open(join('data','testMetadata.odt'),'r').read()
data = open(join('data', 'testMetadata.odt'), 'r').read()
setmetadata_result = self.proxy.run_setmetadata('testMetadata.odt',
encodestring(data),
{"Title":"testSetMetadata", "Description": "Music"})
{"Title": "testSetMetadata", "Description": "Music"})
response_code, response_dict, response_message = setmetadata_result
self.assertEquals(response_code, 200)
self.assertNotEquals(response_dict['data'], '')
......@@ -483,7 +483,7 @@ class TestServer(HandlerTestCase):
self.assertEquals(response_dict['meta']['Description'], "Music")
setmetadata_result = self.proxy.run_setmetadata('testMetadata.odt',
encodestring(data),
{"Title":"Namie's working record",
{"Title": "Namie's working record",
"Description": "Music"})
response_code, response_dict, response_message = setmetadata_result
getmetadata_result = self.proxy.run_getmetadata('testMetadata.odt',
......@@ -495,10 +495,10 @@ class TestServer(HandlerTestCase):
def testRunSetMetadataFailResponse(self):
"""Test run_setmetadata method with invalid document"""
data = open(join('data','testMetadata.odt'),'r').read()[:100]
data = open(join('data', 'testMetadata.odt'), 'r').read()[:100]
setmetadata_result = self.proxy.run_setmetadata('testMetadata.odt',
encodestring(data),
{"Title":"testSetMetadata", "Description": "Music"})
{"Title": "testSetMetadata", "Description": "Music"})
response_code, response_dict, response_message = setmetadata_result
self.assertEquals(response_code, 402)
self.assertEquals(response_dict, {})
......@@ -517,4 +517,3 @@ class TestServer(HandlerTestCase):
def test_suite():
return make_suite(TestServer)
......@@ -38,6 +38,7 @@ from cloudooo.handler.ooo.document import FileSystemDocument
OPENOFFICE = True
class TestUnoConverter(HandlerTestCase):
"""Test case to test all features of the unoconverter script"""
......
......@@ -36,6 +36,7 @@ from cloudooo.handler.tests.handlerTestCase import HandlerTestCase
OPENOFFICE = True
class TestUnoMimeMapper(HandlerTestCase):
"""Test Case to test all features of script unomimemapper"""
......@@ -119,4 +120,3 @@ class TestUnoMimeMapper(HandlerTestCase):
def test_suite():
return make_suite(TestUnoMimeMapper)
......@@ -29,7 +29,7 @@
from socket import socket, error
from errno import EADDRINUSE
from time import sleep
from os import remove, environ
from os import remove
from shutil import rmtree
from cloudooo.utils.utils import logger
......
......@@ -31,7 +31,6 @@ from cloudooo.interfaces.handler import IHandler
from cloudooo.file import File
from subprocess import Popen, PIPE
from tempfile import mktemp
from os import path
class PDFHandler(object):
......@@ -43,6 +42,7 @@ class PDFHandler(object):
""" Load pdf document """
self.base_folder_url = base_folder_url
self.document = File(base_folder_url, data, source_format)
self.environment = kw.get("env", {})
def convert(self, destination_format=None, **kw):
""" Convert a pdf document """
......@@ -52,7 +52,8 @@ class PDFHandler(object):
command = ["pdftotext", self.document.getUrl(), output_url]
stdout, stderr = Popen(command,
stdout=PIPE,
stderr=PIPE).communicate()
stderr=PIPE,
env=self.environment).communicate()
try:
return open(output_url).read()
finally:
......@@ -66,7 +67,8 @@ class PDFHandler(object):
command = ["pdfinfo", self.document.getUrl()]
stdout, stderr = Popen(command,
stdout=PIPE,
stderr=PIPE).communicate()
stderr=PIPE,
env=self.environment).communicate()
info_list = filter(None, stdout.split("\n"))
metadata = {}
for info in iter(info_list):
......
......@@ -10,7 +10,8 @@ ENVIRONMENT_PATH = path.abspath(path.dirname(__file__))
def exit(msg):
sys.stderr.write(msg)
sys.exit(0)
sys.exit(0)
# XXX - Duplicated function. This function must be generic to be used by all handlers
def run():
......@@ -32,7 +33,7 @@ def run():
python_extension = '.py'
if test_name[-3:] == python_extension:
test_name = test_name[:-3]
if not path.exists(path.join(ENVIRONMENT_PATH,
if not path.exists(path.join(ENVIRONMENT_PATH,
'%s%s' % (test_name, python_extension))):
exit("%s not exists\n" % test_name)
......
......@@ -45,7 +45,8 @@ class TestPDFHandler(HandlerTestCase):
def testgetMetadata(self):
"""Test if the metadata are extracted correctly"""
pdf_document = open("data/test.pdf").read()
handler = PDFHandler(self.tmp_url, pdf_document, "pdf")
kw = dict(env=dict(PATH="/hd/cloudooo_handler_ooo/software/parts/xpdf/bin"))
handler = PDFHandler(self.tmp_url, pdf_document, "pdf", **kw)
metadata = handler.getMetadata()
self.assertEquals(type(metadata), DictType)
self.assertNotEquals(metadata, {})
......@@ -53,6 +54,6 @@ class TestPDFHandler(HandlerTestCase):
def test_suite():
suite = unittest.TestSuite()
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(TestPDFHandler))
return suite
......@@ -34,12 +34,14 @@ import sys
config = ConfigParser()
def check_folder(working_path, tmp_dir_path):
if not path.exists(working_path):
mkdir(working_path)
if not path.exists(tmp_dir_path):
mkdir(tmp_dir_path)
class HandlerTestCase(unittest.TestCase):
"""Test Case to load cloudooo conf."""
......
......@@ -41,7 +41,8 @@ class IFile(Interface):
original_data = Attribute("Original data")
def load():
"""From the data creates one archive into file system using original data"""
"""From the data creates one archive into file system using original data
"""
def reload(url):
"""In the case of another file with the same content be created, pass the
......
......@@ -62,8 +62,8 @@ class ITextGranulator(Interface):
"""Provides methods to granulate a document into chapters and paragraphs."""
def getParagraphItemList():
"""Returns the list of paragraphs in the form of (id, class) where class may
have special meaning to define TOC/TOI."""
"""Returns the list of paragraphs in the form of (id, class) where class
may have special meaning to define TOC/TOI."""
def getParagraphItem(paragraph_id):
"""Returns the paragraph in the form of (text, class)."""
......
......@@ -56,7 +56,8 @@ class Manager(object):
zip archive
"""
if not mimemapper.getFilterList(destination_format):
raise ValueError, "This format (%s) is not supported or is invalid" % destination_format
raise ValueError("This format (%s) is not supported " +
"or is invalid" % destination_format)
self.kw['zip'] = zip
self.kw['refresh'] = refresh
document = OOHandler(self._path_tmp_dir,
......@@ -243,7 +244,10 @@ class Manager(object):
try:
response_dict = {}
# XXX - use html format instead of xhtml
if orig_format in ("presentation", "graphics", "spreadsheet", 'text') and extension == "xhtml":
if orig_format in ("presentation",
"graphics",
"spreadsheet",
'text') and extension == "xhtml":
extension = 'html'
response_dict['data'] = self.convertFile(data,
original_extension,
......
......@@ -27,20 +27,21 @@
##############################################################################
import gc
from signal import signal, SIGTERM, SIGINT, SIGQUIT, SIGHUP
from signal import signal, SIGINT, SIGQUIT, SIGHUP
from os import path, mkdir
import os
import cloudooo.handler.ooo.monitor as monitor
from cloudooo.handler.ooo.application.openoffice import openoffice
from cloudooo.wsgixmlrpcapplication import WSGIXMLRPCApplication
from cloudooo.utils.utils import convertStringToBool, configureLogger
from cloudooo.handler.ooo.mimemapper import mimemapper
def stopProcesses(signum, frame):
monitor.stop()
openoffice.stop()
def application(global_config, **local_config):
"""Method to load all configuration of cloudooo and start the application.
To start the application a number of params are required:
......@@ -83,14 +84,14 @@ def application(global_config, **local_config):
# Loading Configuration to start OOo Instance and control it
openoffice.loadSettings(application_hostname,
openoffice_port,
working_path,
local_config.get('office_binary_path'),
local_config.get('uno_path'),
local_config.get('openoffice_user_interface_language',
'en'),
environment_dict=environment_dict,
)
openoffice_port,
working_path,
local_config.get('office_binary_path'),
local_config.get('uno_path'),
local_config.get('openoffice_user_interface_language',
'en'),
environment_dict=environment_dict,
)
openoffice.start()
monitor.load(local_config)
......
......@@ -26,7 +26,6 @@
#
##############################################################################
from os import environ
import logging
logger = logging.getLogger('Cloudooo')
......@@ -72,7 +71,8 @@ def configureLogger(level=None, debug_mode=False):
ch = logging.StreamHandler()
ch.setLevel(level)
# create formatter
formatter = logging.Formatter("%(asctime).19s - %(name)s - %(levelname)s - %(message)s")
format = "%(asctime).19s - %(name)s - %(levelname)s - %(message)s"
formatter = logging.Formatter(format)
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment