Commit 772a0e93 authored by Gabriel Monnerat's avatar Gabriel Monnerat

continuation of r40432. clean up the code to follow pep08, ignoring 4 spaces to identation

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk/utils@40438 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent 37bad98b
......@@ -26,6 +26,8 @@
#
##############################################################################
import gc
import monitor
from signal import signal, SIGHUP
from application.openoffice import openoffice
from application.xvfb import xvfb
......@@ -33,17 +35,17 @@ from wsgixmlrpcapplication import WSGIXMLRPCApplication
from utils import convertStringToBool, configureLogger
from os import path, mkdir
from mimemapper import mimemapper
import monitor, gc
def stopProcesses():
monitor.stop()
xvfb.stop()
openoffice.stop()
def application(global_config, **local_config):
"""Method to load all configuration of cloudooo and start the application.
To start the application a number of params are required:
Keyword arguments:
debug_mode -- Mode as the application prints the messages.
e.g debug_mode=logging.DEBUG
......@@ -76,31 +78,31 @@ def application(global_config, **local_config):
openoffice_port = int(local_config.get('openoffice_port'))
# Before start Xvfb, first loads the configuration
xvfb.loadSettings(application_hostname,
int(local_config.get('virtual_display_port')),
int(local_config.get('virtual_display_port')),
working_path,
local_config.get('virtual_display_id'),
local_config.get('virtual_display_id'),
virtual_screen=local_config.get('virtual_screen'),
start_timeout=local_config.get('start_timeout'))
xvfb.start()
# Loading Configuration to start OOo Instance and control it
openoffice.loadSettings(application_hostname,
openoffice.loadSettings(application_hostname,
openoffice_port,
working_path,
local_config.get('virtual_display_id'),
local_config.get('office_binary_path'),
local_config.get('office_binary_path'),
local_config.get('uno_path'))
openoffice.start()
monitor.load(local_config)
timeout_response = int(local_config.get('timeout_response'))
kw = dict(uno_path=local_config.get('uno_path'),
kw = dict(uno_path=local_config.get('uno_path'),
office_binary_path=local_config.get('office_binary_path'),
timeout=timeout_response)
# Signal to stop all processes
signal(SIGHUP, lambda x,y: stopProcesses())
signal(SIGHUP, lambda x, y: stopProcesses())
# Load all filters
openoffice.acquire()
mimemapper.loadFilterList(application_hostname,
......
......@@ -35,6 +35,7 @@ from zipfile import ZipFile, is_zipfile
from shutil import rmtree
import tempfile
class FileSystemDocument(object):
"""Filesystem Document is used to manipulate one temporary document
stored into the filesystem.
......@@ -43,7 +44,6 @@ class FileSystemDocument(object):
def __init__(self, base_folder_url, data, source_format):
"""Create an document into file system and store the URL.
Keyword arguments:
base_folder_url -- Full path to create a temporary folder
data -- Content of the document
......@@ -53,12 +53,12 @@ class FileSystemDocument(object):
self.original_data = data
self.source_format = source_format
self.url = self.load()
def _createDirectory(self):
return tempfile.mkdtemp(dir=self.base_folder_url)
def load(self):
"""Creates one Temporary Document and write data into it.
"""Creates one Temporary Document and write data into it.
Return the url for the document.
"""
# creates only the url of the file.
......@@ -87,7 +87,6 @@ class FileSystemDocument(object):
def getContent(self, zip=False):
"""Open the file and returns the content.
Keyword Arguments:
zip -- Boolean attribute. If True"""
if zip:
......
......@@ -29,6 +29,7 @@
from zope.interface import implements
from interfaces.filter import IFilter
class Filter(object):
"""Filter of OOo."""
......@@ -44,7 +45,7 @@ class Filter(object):
self._preferred = kwargs.get('preferred')
self._sort_index = kwargs.get('sort_index')
self._label = kwargs.get("label")
def getLabel(self):
"""Returns label."""
return self._label
......
......@@ -74,4 +74,3 @@ class OOGranulate(object):
def getChapterItem(self, file, chapter_id):
"""Return the chapter in the form of (title, level)."""
raise NotImplementedError
......@@ -146,7 +146,8 @@ class OOHandler:
if service_type in mimemapper._doc_type_list_by_extension[extension]:
filter_list.append((extension,
service_type,
mimemapper.getFilterName(extension, service_type)))
mimemapper.getFilterName(extension,
service_type)))
return jsonpickle.encode(dict(doc_type_list_by_extension=mimemapper._doc_type_list_by_extension,
filter_list=filter_list,
......@@ -199,7 +200,7 @@ class OOHandler:
metadata['Data'] = self.document.getContent()
else:
metadata['Data'] = ''
self.document.trash()
self.document.trash()
return metadata
def setMetadata(self, metadata):
......
......@@ -16,8 +16,7 @@ def test_openoffice(hostname, port):
def main():
try:
opt_list, arg_list = getopt(sys.argv[1:], "",
/bin/bash: q: comando não encontrado
opt_list, arg_list = getopt(sys.argv[1:], "",
except GetoptError, e:
print >> sys.stderr, "%s \nUse --port and --hostname" % e
sys.exit(2)
......
......@@ -53,6 +53,6 @@ class IApplication(Interface):
def restart():
"""Restarts the application with the same settings"""
def getAddress():
"""Returns the hostname and port in tuple"""
......@@ -43,9 +43,9 @@ class IDocument(Interface):
"""From the data creates one archive into file system using original data"""
def reload(url):
"""In the case of another file with the same content be created, pass the
"""In the case of another file with the same content be created, pass the
url to load the new file"""
def getContent(zip):
"""Returns data of document"""
......@@ -54,6 +54,6 @@ class IDocument(Interface):
def trash():
"""Remove the file in file system"""
def restoreOriginal():
"""Restore the document with the original document"""
......@@ -28,6 +28,7 @@
from zope.interface import Interface
class ITableGranulator(Interface):
"""Provides methods to granulate a document into tables."""
......
......@@ -28,6 +28,7 @@
from zope.interface import Interface
class IHandler(Interface):
"""Handles connections with the openoffice by socket"""
......
......@@ -29,9 +29,10 @@
from zope.interface import Interface
from zope.interface import Attribute
class ILockable(Interface):
"""Controls concurrency control"""
_lock = Attribute("A primitive lock is in one of two states, 'locked' or \
'unlocked'.")
......
......@@ -28,9 +28,10 @@
from zope.interface import Interface
class IManager(Interface):
"""Provides method to manipulate documents and metadatas using OOo"""
def convertFile(file, source_format, destination_format, zip):
"""Returns the converted file in the given format.
......@@ -56,7 +57,7 @@ class IManager(Interface):
class IERP5Compatibility(Interface):
""" Provides compatibility interface with ERP5 Project.
""" Provides compatibility interface with ERP5 Project.
This methods provide same API as OpenOffice.org project.
XXX Unfinished Docstring.
"""
......
......@@ -28,12 +28,13 @@
from zope.interface import Interface
class IMimemapper(Interface):
"""Provide methods to manipulate filters of OOo."""
def isLoaded():
"""Returns if the filters were loaded."""
def getDocumentTypeDict():
"""Returns document type dict."""
......
......@@ -28,6 +28,7 @@
from zope.interface import Interface
class IMonitor(Interface):
"""Provides features to monitor processes or python objects"""
......
......@@ -35,6 +35,7 @@ from handler.oohandler import OOHandler
from mimemapper import mimemapper
from utils import logger
class Manager(object):
"""Manipulates requisitons of client and temporary files in file system."""
implements(IManager, IERP5Compatibility)
......@@ -43,10 +44,9 @@ class Manager(object):
"""Need pass the path where the temporary document will be created."""
self._path_tmp_dir = path_tmp_dir
self.kw = kw
def convertFile(self, file, source_format, destination_format, zip=False):
"""Returns the converted file in the given format.
Keywords arguments:
file -- File as string in base64
source_format -- Format of original file as string
......@@ -84,7 +84,7 @@ class Manager(object):
def getFileMetadataItemList(self, file, source_format, base_document=False):
"""Receives the string of document as encodestring and returns a dict with
metadatas.
e.g.
e.g.
self.getFileMetadataItemList(data = encodestring(data))
return {'Title': 'abc','Description': 'comments', 'Data': None}
......@@ -102,7 +102,7 @@ class Manager(object):
metadata_dict = document.getMetadata(base_document)
metadata_dict['Data'] = encodestring(metadata_dict['Data'])
return metadata_dict
def getAllowedExtensionList(self, request_dict={}):
"""List types which can be generated from given type
Type can be given as:
......@@ -132,25 +132,29 @@ class Manager(object):
elif document_type:
return mimemapper.getAllowedExtensionList(document_type=document_type)
else:
return [('','')]
return [('', '')]
def run_convert(self, filename='', data=None, meta=None, extension=None,
orig_format=None):
"""Method to support the old API. Wrapper getFileMetadataItemList but
"""Method to support the old API. Wrapper getFileMetadataItemList but
returns a dict.
This is a Backwards compatibility provided for ERP5 Project, in order to
keep compatibility with OpenOffice.org Daemon.
keep compatibility with OpenOffice.org Daemon.
"""
if not extension:
extension = filename.split('.')[-1]
try:
response_dict = {}
response_dict['meta'] = self.getFileMetadataItemList(data, extension, True)
response_dict['meta']['MIMEType'] = self.getFileMetadataItemList(response_dict['meta']['Data'],
extension)['MIMEType']
# XXX - Backward compatibility: Previous API expects 'mime' now we use 'MIMEType'"
response_dict['meta'] = self.getFileMetadataItemList(data,
extension,
True)
mimetype = self.getFileMetadataItemList(response_dict['meta']['Data'],
extension)['MIMEType']
response_dict['meta']['MIMEType'] = mimetype
# XXX - Backward compatibility: Previous API expects 'mime' now we
# use 'MIMEType'
response_dict['meta']['mime'] = response_dict['meta']['MIMEType']
response_dict['data'] = response_dict['meta']['Data']
response_dict['data'] = response_dict['meta']['Data']
response_dict['mime'] = response_dict['meta']['MIMEType']
del response_dict['meta']['Data']
return (200, response_dict, "")
......@@ -185,7 +189,8 @@ class Manager(object):
response_dict = {}
try:
response_dict['meta'] = self.getFileMetadataItemList(data, extension)
# XXX - Backward compatibility: Previous API expects 'title' now we use 'Title'"
# XXX - Backward compatibility: Previous API expects 'title' now
# we use 'Title'"
response_dict['meta']['title'] = response_dict['meta']['Title']
return (200, response_dict, '')
except Exception, e:
......@@ -194,9 +199,9 @@ class Manager(object):
def run_generate(self, filename='', data=None, meta=None, extension=None,
orig_format=''):
"""Wrapper convertFile but returns a dict which includes mimetype.
This is a Backwards compatibility provided for ERP5 Project, in order to keep
compatibility with OpenOffice.org Daemon.
"""Wrapper convertFile but returns a dict which includes mimetype.
This is a Backwards compatibility provided for ERP5 Project, in order
to keep compatibility with OpenOffice.org Daemon.
"""
# XXX - ugly way to remove "/" and "."
orig_format = orig_format.split('.')[-1]
......@@ -210,9 +215,12 @@ class Manager(object):
# XXX - use html format instead of xhtml
if orig_format == "presentation" and extension == "xhtml":
extension = 'html'
elif orig_format in ("spreadsheet", 'text') and extension in ("html", "xhtml"):
elif orig_format in ("spreadsheet", 'text') and extension in ("html",
"xhtml"):
extension = "htm"
response_dict['data'] = self.convertFile(data, orig_format, extension,
response_dict['data'] = self.convertFile(data,
orig_format,
extension,
zip)
# FIXME: Fast solution to obtain the html or pdf mimetypes
if zip:
......
......@@ -36,6 +36,7 @@ from interfaces.mimemapper import IMimemapper
from types import InstanceType
from utils import getCleanPythonEnvironment
class MimeMapper(object):
"""Load all filters from OOo. You can get the filter you want or all
filters of the specific extension.
......@@ -55,7 +56,7 @@ class MimeMapper(object):
self.extension_list = []
self._mimetype_by_filter_type = {}
self._document_type_dict = {}
def _addFilter(self, filter):
"""Add filter in mimemapper catalog."""
extension = filter.getExtension()
......@@ -92,7 +93,6 @@ class MimeMapper(object):
def loadFilterList(self, hostname, port, **kw):
"""Load all filters of openoffice.
Keyword arguments:
hostname -- host of OpenOffice
port -- port to connects by socket
......@@ -100,24 +100,25 @@ class MimeMapper(object):
uno_path -- full path to uno library
office_binary_path -- full path to openoffice binary
"""
# Filters that has flag in bad_flag_list is ignored.
# Filters that has flag in bad_flag_list is ignored.
# XXX - Is not good way to remove unnecessary filters
# XXX - try find a good way to remove filters that are not used for export
bad_flag_list = [65, 94217, 536641, 1572929, 268959937, 524373, 85, 524353]
uno_path = kw.get("uno_path", environ.get('uno_path'))
office_binary_path = kw.get("office_binary_path",
environ.get('office_binary_path'))
command = [path.join(office_binary_path, "python")
, pkg_resources.resource_filename(__name__,
path.join("helper","unomimemapper.py"))
, "'--uno_path=%s'" % uno_path
, "'--office_binary_path=%s'" % office_binary_path
, "'--hostname=%s'" % hostname
, "--port=%s" % port]
command = [path.join(office_binary_path, "python"),
pkg_resources.resource_filename(__name__,
path.join("helper", "unomimemapper.py")),
"--uno_path='%s'" % uno_path,
"--office_binary_path='%s'" % office_binary_path,
"--hostname='%s'" % hostname,
"--port=%s" % port]
stdout, stderr = Popen(' '.join(command),
stdout=PIPE,
close_fds=True,
shell=True, env=getCleanPythonEnvironment()).communicate()
shell=True,
env=getCleanPythonEnvironment()).communicate()
exec(stdout)
for key, value in filter_dict.iteritems():
filter_name = key
......@@ -135,7 +136,7 @@ class MimeMapper(object):
continue
preferred = filter_type_dict.get("Preferred")
document_service_str = value.get('DocumentService')
sort_index = flag
sort_index = flag
doc_type = document_service_str.split('.')[-1]
split_type_list = findall(r'[A-Z][a-z]+', doc_type)
......@@ -153,10 +154,10 @@ class MimeMapper(object):
# e.g: {'com.sun.star.text.TextDocument': [] }
if not self._extension_list_by_type.has_key(document_service_str):
self._extension_list_by_type[document_service_str] = []
for ext in iter(filter_extension_list):
# All mimetypes that starts with "application/vnd.oasis.opendocument" are
# ODF.
# All mimetypes that starts with "application/vnd.oasis.opendocument"
# are ODF.
if ext not in self.extension_list:
self.extension_list.append(ext)
if mimetype.startswith("application/vnd.oasis.opendocument"):
......@@ -186,10 +187,8 @@ class MimeMapper(object):
def getMimetypeByFilterType(self, filter_type):
"""Get Mimetype according to the filter type
Keyword arguments:
filter_type -- string of OOo filter
e.g
>>> mimemapper.getMimetypeByFilterType("writer8")
'application/vnd.oasis.opendocument.text'
......@@ -198,7 +197,6 @@ class MimeMapper(object):
def getFilterName(self, extension, document_service):
"""Get filter name according to the parameters passed.
Keyword arguments:
extension -- expected a string of the file format.
document_type -- expected a string of the document type.
......@@ -227,13 +225,12 @@ class MimeMapper(object):
def getFilterList(self, extension):
"""Search filter by extension, and return the filter as string.
Keyword arguments:
extension -- expected a string of the file format.
e.g
>>> mimemapper.getFilterList("doc")
[<filter.Filter object at 0x9a2602c>,
<filter.Filter object at 0x9a21d6c>,
[<filter.Filter object at 0x9a2602c>,
<filter.Filter object at 0x9a21d6c>,
<filter.Filter object at 0x9a261ec>]
"""
return self._filter_by_extension_dict.get(extension, [])
......@@ -243,11 +240,10 @@ class MimeMapper(object):
document type passed.
e.g
>>> mimemapper.getAllowedExtensionList({"extension":"doc"})
or
or
>>> mimemapper.getAllowedExtensionList({"document_type":"text"})
(('rtf', 'Rich Text Format'),
('htm', 'HTML Document'),)
If both params are passed, document_type is discarded.
"""
allowed_extension_list = []
......@@ -265,7 +261,6 @@ class MimeMapper(object):
for ext in iter(extension_list):
if not ext in allowed_extension_list:
allowed_extension_list.append(ext)
return tuple(allowed_extension_list)
mimemapper = MimeMapper()
......@@ -5,6 +5,7 @@ from cloudooo.application.openoffice import openoffice
monitor_request = None
monitor_memory = None
def load(local_config):
"""Start the monitors"""
global monitor_request
......@@ -12,24 +13,25 @@ def load(local_config):
int(local_config.get('monitor_interval')),
int(local_config.get('limit_number_request')))
monitor_request.start()
if bool(local_config.get('enable_memory_monitor')):
global monitor_memory
monitor_memory = MonitorMemory(openoffice,
int(local_config.get('monitor_interval')),
int(local_config.get('limit_memory_used')))
monitor_memory.start()
return
def stop():
"""Stop all monitors"""
if monitor_request:
monitor_request.terminate()
if monitor_memory:
if monitor_memory:
monitor_memory.terminate()
clear()
def clear():
global monitor_request, monitor_memory
monitor_request = None
......
......@@ -32,13 +32,14 @@ import psutil
from cloudooo.utils import logger
from time import sleep
class MonitorMemory(Monitor, Process):
"""Usefull to control the memory and does not allow use it unnecessarily"""
def __init__(self, openoffice, interval, limit_memory_usage):
"""Expects to receive an object that implements the interfaces IApplication
and ILockable, the limit of memory usage that the openoffice can use and the
interval to check the object."""
and ILockable, the limit of memory usage that the openoffice can use and
the interval to check the object."""
Monitor.__init__(self, openoffice, interval)
Process.__init__(self)
self.limit = limit_memory_usage
......
......@@ -29,6 +29,7 @@
from zope.interface import implements
from cloudooo.interfaces.monitor import IMonitor
class Monitor(object):
""" """
......
......@@ -31,6 +31,7 @@ from threading import Thread
from cloudooo.utils import logger
from time import sleep
class MonitorRequest(Monitor, Thread):
"""Usefull to control the number of request in Object"""
......@@ -45,7 +46,7 @@ class MonitorRequest(Monitor, Thread):
def start(self):
self.status_flag = True
Thread.start(self)
def run(self):
"""Is called by start function"""
logger.debug("Start MonitorRequest")
......
......@@ -31,9 +31,10 @@ from multiprocessing import Process
from time import sleep
from cloudooo.utils import logger
class MonitorTimeout(Monitor, Process):
"""Monitors and controls the time of use of an object"""
def __init__(self, openoffice, interval):
"""Expects to receive an object that implements the interfaces IApplication
and ILockable. And the interval to check the object."""
......
......@@ -49,6 +49,7 @@ Options:
-l filter log Folder to store logs
"""
class Log(object):
"""Object to manipulate the log file"""
......@@ -68,6 +69,7 @@ class Log(object):
"""Flush the internal I/O buffer."""
self._log.flush()
class Client(Process):
"""Represents a client that sends requests to the server. The log file by
default is created in current path, but can be created in another path.
......@@ -123,6 +125,7 @@ class Client(Process):
self.log.msg("Total Duration: %s" % sum(time_list))
self.log.close()
def main():
help_msg = "\nUse --help or -h"
......
......@@ -46,15 +46,18 @@ def check_folder(working_path, tmp_dir_path):
if not path.exists(tmp_dir_path):
mkdir(tmp_dir_path)
def make_suite(test_case):
"""Function is used to run all tests together"""
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(test_case))
return suite
def loadConfig(path):
config.read(path)
def startFakeEnvironment(start_openoffice=True, conf_path=None):
"""Create a fake environment"""
if not conf_path and len(sys.argv) >= 1:
......@@ -63,7 +66,8 @@ def startFakeEnvironment(start_openoffice=True, conf_path=None):
uno_path = config.get("app:main", "uno_path")
working_path = config.get("app:main", "working_path")
virtual_display_id = int(config.get("app:main", "virtual_display_id"))
virtual_display_port_int = int(config.get("app:main", "virtual_display_port"))
virtual_display_port_int = int(config.get("app:main",
"virtual_display_port"))
hostname = config.get("server:main", "host")
openoffice_port = int(config.get("app:main", "openoffice_port"))
office_binary_path = config.get("app:main", "office_binary_path")
......@@ -71,18 +75,17 @@ def startFakeEnvironment(start_openoffice=True, conf_path=None):
check_folder(working_path, tmp_dir)
if not environ.get('uno_path'):
environ['uno_path'] = uno_path
office_binary_path = config.get("app:main", "office_binary_path")
if not environ.get('office_binary_path'):
environ['office_binary_path'] = office_binary_path
if uno_path not in sys.path:
sys.path.append(uno_path)
fundamentalrc_file = '%s/fundamentalrc' % office_binary_path
if path.exists(fundamentalrc_file) and \
not environ.has_key('URE_BOOTSTRAP'):
putenv('URE_BOOTSTRAP','vnd.sun.star.pathname:%s' % fundamentalrc_file)
putenv('URE_BOOTSTRAP', 'vnd.sun.star.pathname:%s' % fundamentalrc_file)
xvfb.loadSettings(hostname,
virtual_display_port_int,
......@@ -90,8 +93,8 @@ def startFakeEnvironment(start_openoffice=True, conf_path=None):
virtual_display_id,
virtual_screen='1')
xvfb.start()
waitStartDaemon(xvfb, 10)
waitStartDaemon(xvfb, 10)
if start_openoffice:
openoffice.loadSettings(hostname,
openoffice_port,
......@@ -111,6 +114,7 @@ def startFakeEnvironment(start_openoffice=True, conf_path=None):
return xvfb
def stopFakeEnvironment(stop_openoffice=True):
"""Stop Openoffice and Xvfb """
if stop_openoffice:
......@@ -118,6 +122,7 @@ def stopFakeEnvironment(stop_openoffice=True):
xvfb.stop()
return True
class cloudoooTestCase(unittest.TestCase):
"""Test Case to load cloudooo conf."""
......
......@@ -11,22 +11,26 @@ from subprocess import Popen
ENVIRONMENT_PATH = path.abspath(path.dirname(__file__))
def wait_liberate_port(hostname, port, timeout_limit=10):
for n in range(timeout_limit):
if not socketStatus(hostname, port):
break
sleep(1)
def wait_use_port(hostname, port, timeout_limit=10):
for n in range(timeout_limit):
if socketStatus(hostname, port):
return
sleep(1)
def exit(msg):
sys.stderr.write(msg)
sys.exit(0)
def run_test(test_name):
module = __import__(test_name)
if not hasattr(module, "test_suite"):
......@@ -36,6 +40,7 @@ def run_test(test_name):
suite.addTest(module.test_suite())
TestRunner(verbosity=2).run(suite)
def run():
DAEMON = OPENOFFICE = XVFB = False
test_name = sys.argv[-1]
......
......@@ -32,6 +32,7 @@ from subprocess import Popen, PIPE
from base64 import encodestring, decodestring
from cloudoooTestCase import cloudoooTestCase, make_suite
class TestAllFormats(cloudoooTestCase):
"""Test XmlRpc Server. Needs cloudooo server started"""
......@@ -66,7 +67,7 @@ class TestAllFormats(cloudoooTestCase):
def runTestForType(self, source_format, document_type, filename):
"""Generic test"""
data = open(filename,'r').read()
data = open(filename, 'r').read()
request = {'document_type': document_type}
extension_list = self.proxy.getAllowedExtensionList(request)
fault_list = []
......@@ -90,6 +91,7 @@ class TestAllFormats(cloudoooTestCase):
if fault_list != []:
raise Fault(1, "\n".join(fault_list))
def test_suite():
return make_suite(TestAllFormats)
......
......@@ -30,6 +30,7 @@ import unittest
from cloudooo.application.application import Application
from cloudoooTestCase import make_suite
class TestApplication(unittest.TestCase):
def setUp(self):
......@@ -59,6 +60,7 @@ class TestApplication(unittest.TestCase):
"""As the application do not have the pid() should return None"""
self.assertEquals(self.application.pid(), None)
def test_suite():
return make_suite(TestApplication)
......
......@@ -35,6 +35,7 @@ from zipfile import ZipFile, is_zipfile
from cloudooo.document import FileSystemDocument
from cloudoooTestCase import make_suite
class TestFileSystemDocument(unittest.TestCase):
"""Test to class FileSystemDocument"""
......@@ -54,8 +55,9 @@ class TestFileSystemDocument(unittest.TestCase):
original state"""
old_document_url = self.fsdocument.getUrl()
document_filename = "document"
document_test_url = path.join(self.fsdocument.directory_name, document_filename)
open(document_test_url,'wb').write(decodestring("Test Document"))
document_test_url = path.join(self.fsdocument.directory_name,
document_filename)
open(document_test_url, 'wb').write(decodestring("Test Document"))
self.fsdocument.reload(document_test_url)
self.assertEquals(path.exists(old_document_url), False)
self.assertNotEquals(self.fsdocument.original_data,
......@@ -79,7 +81,7 @@ class TestFileSystemDocument(unittest.TestCase):
def testLoadDocumentFile(self):
"""Test if the document is created correctly"""
url = self.fsdocument.getUrl()
tmp_document = open(url,'r').read()
tmp_document = open(url, 'r').read()
self.assertEquals(self.data, tmp_document)
self.fsdocument.trash()
self.assertEquals(path.exists(url), False)
......@@ -88,8 +90,9 @@ class TestFileSystemDocument(unittest.TestCase):
"""Change url and check if occurs correctly"""
old_document_url = self.fsdocument.getUrl()
document_filename = "document"
document_test_url = path.join(self.fsdocument.directory_name, document_filename)
open(document_test_url,'wb').write(self.data)
document_test_url = path.join(self.fsdocument.directory_name,
document_filename)
open(document_test_url, 'wb').write(self.data)
self.fsdocument.reload(document_test_url)
url = self.fsdocument.getUrl()
self.assertEquals(path.exists(old_document_url), False)
......@@ -126,11 +129,12 @@ class TestFileSystemDocument(unittest.TestCase):
self.assertEquals(is_zipfile(zip_output_url), True)
zipfile = ZipFile(zip_output_url)
self.assertEquals(sorted(zipfile.namelist()),
sorted(['logo.gif','test.htm']))
sorted(['logo.gif', 'test.htm']))
finally:
if path.exists(zip_output_url):
remove(zip_output_url)
def test_suite():
return make_suite(TestFileSystemDocument)
......
......@@ -30,6 +30,7 @@ import unittest
from cloudooo.filter import Filter
from cloudoooTestCase import make_suite
class TestFilter(unittest.TestCase):
"""Test filter and your interface"""
......@@ -52,6 +53,7 @@ class TestFilter(unittest.TestCase):
self.assertEquals(self.filter.getSortIndex(), 1000)
self.assertEquals(self.filter.isPreferred(), True)
def test_suite():
return make_suite(TestFilter)
......
......@@ -34,6 +34,7 @@ from base64 import encodestring, decodestring
from multiprocessing import Process
from cloudoooTestCase import cloudoooTestCase, make_suite
class TestHighLoad(cloudoooTestCase):
"""Test with many simultaneous connection"""
......@@ -45,7 +46,7 @@ class TestHighLoad(cloudoooTestCase):
"""Test to use method generate of server"""
document = self.proxy.convertFile(data, source_format, destination_format)
document_output_url = os.path.join(self.tmp_url, "%s.%s" % (id, destination_format))
open(document_output_url,'wb').write(decodestring(document))
open(document_output_url, 'wb').write(decodestring(document))
stdout, stderr = subprocess.Popen("file -b %s" % document_output_url,
shell=True, stdout=subprocess.PIPE).communicate()
self.assertEquals(stdout, 'PDF document, version 1.4\n')
......@@ -56,7 +57,7 @@ class TestHighLoad(cloudoooTestCase):
def testGenerateHighLoad(self):
"""Sends many request to Server. Calling generate method"""
process_list = []
data = open("data/test.doc",'r').read()
data = open("data/test.doc", 'r').read()
for id in range(50):
process = Process(target=self.basicTestToGenerate, args=(id,
encodestring(data), 'doc', 'pdf'))
......@@ -67,6 +68,7 @@ class TestHighLoad(cloudoooTestCase):
proc.join()
del proc
def test_suite():
return make_suite(TestHighLoad)
......
......@@ -49,6 +49,7 @@ from cloudooo.interfaces.granulate import ITableGranulator, \
ITextGranulator
from cloudoooTestCase import make_suite
class TestInterface(unittest.TestCase):
"""Test All Interfaces"""
......@@ -79,7 +80,7 @@ class TestInterface(unittest.TestCase):
def testIFilter(self):
"""Test if Filter implements IDocument"""
self.assertEquals(IFilter.implementedBy(Filter),True)
self.assertEquals(IFilter.implementedBy(Filter), True)
self.assertEquals(IFilter.names(), ['getLabel', 'getName', 'getSortIndex',
'isPreferred', 'getDocumentService', 'getExtension', 'getMimetype'])
......@@ -100,9 +101,9 @@ class TestInterface(unittest.TestCase):
self.assertEquals(IManager.get('getAllowedExtensionList').required,
('request_dict',))
self.assertEquals(IManager.get('getFileMetadataItemList').required,
('file','source_format', 'base_document'))
('file', 'source_format', 'base_document'))
self.assertEquals(IManager.get('updateFileMetadata').required,
('file','source_format', 'metadata_dict'))
('file', 'source_format', 'metadata_dict'))
def testIMimeMapper(self):
"""Test if Mimemapper implements IMimemapper."""
......@@ -112,10 +113,10 @@ class TestInterface(unittest.TestCase):
for method in method_list:
self.assertEquals(method in IMimemapper.names(), True)
self.assertEquals(IMimemapper.implementedBy(MimeMapper),True)
self.assertEquals(len(method_list),len(IMimemapper.names()))
self.assertEquals(IMimemapper.get('getFilterName').required, ('extension',
'document_type'))
self.assertEquals(IMimemapper.implementedBy(MimeMapper), True)
self.assertEquals(len(method_list), len(IMimemapper.names()))
self.assertEquals(IMimemapper.get('getFilterName').required,
('extension', 'document_type'))
self.assertEquals(IMimemapper.get('loadFilterList').required, ())
self.assertEquals(IMimemapper.get('getFilterList').required, ('extension',))
self.assertEquals(IMimemapper.get('getDocumentTypeDict').required, ())
......@@ -166,4 +167,3 @@ def test_suite():
if __name__ == "__main__":
suite = unittest.TestLoader().loadTestsFromTestCase(TestInterface)
unittest.TextTestRunner(verbosity=2).run(suite)
......@@ -170,6 +170,7 @@ chart_expected_tuple = (('sds', 'StarChart 3.0'),
('sxs', 'OpenOffice.org 1.0 Chart'),
('odc', 'ODF Chart'))
class TestMimeMapper(cloudoooTestCase):
"""Test if object load filters correctly of OOo."""
......@@ -227,11 +228,11 @@ class TestMimeMapper(cloudoooTestCase):
def testGetFilterByExt(self):
"""Test if passing the extension the filter returns corretcly."""
pdf_filter_list = self.mimemapper.getFilterList('pdf')
self.assertEquals(len(pdf_filter_list),7)
self.assertEquals(len(pdf_filter_list), 7)
xls_filter_list = self.mimemapper.getFilterList('xls')
self.assertEquals(len(xls_filter_list),5)
self.assertEquals(len(xls_filter_list), 5)
doc_filter_list = self.mimemapper.getFilterList('doc')
self.assertEquals(len(doc_filter_list),3)
self.assertEquals(len(doc_filter_list), 3)
def testGetDocumentTypeDict(self):
"""Test if dictonary document type returns type correctly."""
......@@ -337,11 +338,14 @@ class TestMimeMapper(cloudoooTestCase):
def testGetFilterName(self):
"""Test if passing extension and document_type, the filter is correct."""
filtername = self.mimemapper.getFilterName("pdf", 'com.sun.star.text.TextDocument')
filtername = self.mimemapper.getFilterName("pdf",
'com.sun.star.text.TextDocument')
self.assertEquals(filtername, "writer_pdf_Export")
filtername = self.mimemapper.getFilterName('ppt', 'com.sun.star.presentation.PresentationDocument')
self.assertEquals(filtername,"MS PowerPoint 97")
filtername = self.mimemapper.getFilterName("html", 'com.sun.star.presentation.PresentationDocument')
filtername = self.mimemapper.getFilterName('ppt',
'com.sun.star.presentation.PresentationDocument')
self.assertEquals(filtername, "MS PowerPoint 97")
filtername = self.mimemapper.getFilterName("html",
'com.sun.star.presentation.PresentationDocument')
self.assertEquals(filtername, "impress_html_Export")
def testGetMimetype(self):
......@@ -353,6 +357,7 @@ class TestMimeMapper(cloudoooTestCase):
self.assertEquals(self.mimemapper.getMimetypeByFilterType("writer_MS_Word_97"),\
'application/msword')
def test_suite():
return make_suite(TestMimeMapper)
......
......@@ -32,6 +32,7 @@ from cloudoooTestCase import cloudoooTestCase, make_suite
from cloudooo.monitor.request import MonitorRequest
from cloudooo.monitor.memory import MonitorMemory
class TestMonitorInit(cloudoooTestCase):
"""Test Case to test if the monitors are controlled correctly"""
......@@ -69,6 +70,7 @@ class TestMonitorInit(cloudoooTestCase):
MonitorMemory),
True)
def test_suite():
return make_suite(TestMonitorInit)
......
......@@ -34,6 +34,7 @@ from psutil import Process
from types import IntType
from cloudoooTestCase import make_suite
class TestMonitorMemory(unittest.TestCase):
"""Test case to see if the MonitorMemory is properly managing the
openoffice."""
......@@ -103,6 +104,7 @@ class TestMonitorMemory(unittest.TestCase):
memory_usage_int = self.monitor.get_memory_usage()
self.assertEquals(type(memory_usage_int), IntType)
def test_suite():
return make_suite(TestMonitorMemory)
......
......@@ -32,6 +32,7 @@ from cloudooo.monitor.request import MonitorRequest
from cloudoooTestCase import cloudoooTestCase, make_suite
from cloudooo.application.openoffice import openoffice
class TestMonitorRequest(cloudoooTestCase):
"""Test all features of a monitor following the interface"""
......@@ -57,6 +58,7 @@ class TestMonitorRequest(cloudoooTestCase):
self.assertEquals(openoffice.request, 0)
monitor_request.terminate()
def test_suite():
return make_suite(TestMonitorRequest)
......
......@@ -32,6 +32,7 @@ from cloudooo.application.openoffice import openoffice
from cloudooo.monitor.timeout import MonitorTimeout
from cloudoooTestCase import make_suite
class TestMonitorTimeout(unittest.TestCase):
"""Test all features of a monitor following the interface"""
......@@ -87,6 +88,7 @@ class TestMonitorTimeout(unittest.TestCase):
monitor_timeout.terminate()
openoffice.release()
def test_suite():
return make_suite(TestMonitorTimeout)
......
......@@ -68,4 +68,3 @@ def test_suite():
if __name__ == "__main__":
suite = unittest.TestLoader().loadTestsFromTestCase(TestOOGranulate)
unittest.TextTestRunner(verbosity=2).run(suite)
......@@ -35,6 +35,7 @@ from cloudooo.handler.oohandler import OOHandler
from cloudooo.application.openoffice import openoffice
from cloudoooTestCase import make_suite
class TestOOHandler(cloudoooTestCase):
"""Test OOHandler and manipulation of OOo Instance"""
......@@ -151,6 +152,7 @@ class TestOOHandler(cloudoooTestCase):
metadata = new_handler.getMetadata()
self.assertEquals(metadata.get('Title'), "cloudooo Test -")
def test_suite():
return make_suite(TestOOHandler)
......
......@@ -32,6 +32,7 @@ from cloudooo.application.openoffice import OpenOffice
from cloudoooTestCase import make_suite
from cloudooo.utils import waitStopDaemon
class TestOpenOffice(cloudoooTestCase):
"""Test OpenOffice object and manipulation of OOo Instance"""
......@@ -85,6 +86,7 @@ class TestOpenOffice(cloudoooTestCase):
self.openoffice.release()
self.assertEquals(self.openoffice.isLocked(), False)
def test_suite():
return make_suite(TestOpenOffice)
......
......@@ -36,6 +36,7 @@ from cloudoooTestCase import cloudoooTestCase, make_suite
from zipfile import ZipFile, is_zipfile
from types import DictType
class TestServer(cloudoooTestCase):
"""Test XmlRpc Server. Needs cloudooo server started"""
......@@ -115,13 +116,13 @@ class TestServer(cloudoooTestCase):
def testGetAllowedExtensionListByType(self):
"""Call getAllowedExtensionList and verify if the returns is a list with
extension and ui_name. The request is by document type"""
text_request = {'document_type':"text"}
text_request = {'document_type': "text"}
text_allowed_list = self.proxy.getAllowedExtensionList(text_request)
text_allowed_list.sort()
for arg in text_allowed_list:
self.assertTrue(arg in self.text_expected_list,
"%s not in %s" % (arg, self.text_expected_list))
request_dict = {'document_type':"presentation"}
request_dict = {'document_type': "presentation"}
presentation_allowed_list = self.proxy.getAllowedExtensionList(request_dict)
presentation_allowed_list.sort()
for arg in presentation_allowed_list:
......@@ -131,7 +132,7 @@ class TestServer(cloudoooTestCase):
def testGetAllowedExtensionListByExtension(self):
"""Call getAllowedExtensionList and verify if the returns is a list with
extension and ui_name. The request is by extension"""
doc_allowed_list = self.proxy.getAllowedExtensionList({'extension':"doc"})
doc_allowed_list = self.proxy.getAllowedExtensionList({'extension': "doc"})
doc_allowed_list.sort()
for arg in doc_allowed_list:
self.assertTrue(arg in self.text_expected_list,
......@@ -140,7 +141,7 @@ class TestServer(cloudoooTestCase):
def testGetAllowedExtensionListByMimetype(self):
"""Call getAllowedExtensionList and verify if the returns is a list with
extension and ui_name. The request is by mimetype"""
request_dict = {"mimetype":"application/msword"}
request_dict = {"mimetype": "application/msword"}
msword_allowed_list = self.proxy.getAllowedExtensionList(request_dict)
msword_allowed_list.sort()
for arg in msword_allowed_list:
......@@ -523,6 +524,7 @@ class TestServer(cloudoooTestCase):
self.assertEquals(len(response_dict['response_data']), 31)
self.assertTrue(['htm', 'HTML Document'] in response_dict['response_data'])
def test_suite():
return make_suite(TestServer)
......
......@@ -35,6 +35,7 @@ from cloudoooTestCase import cloudoooTestCase, make_suite
from cloudooo.application.openoffice import openoffice
from cloudooo.document import FileSystemDocument
class TestUnoConverter(cloudoooTestCase):
"""Test case to test all features of the unoconverter script"""
......@@ -89,6 +90,7 @@ class TestUnoConverter(cloudoooTestCase):
self.document.trash()
self.assertEquals(exists(output_url), False)
def test_suite():
return make_suite(TestUnoConverter)
......
......@@ -33,6 +33,7 @@ from subprocess import Popen, PIPE
from os import environ, path
from cloudoooTestCase import cloudoooTestCase, make_suite
class TestUnoMimeMapper(cloudoooTestCase):
"""Test Case to test all features of script unomimemapper"""
......@@ -107,6 +108,7 @@ class TestUnoMimeMapper(cloudoooTestCase):
self.assertEquals(stderr.endswith(error_msg), True)
openoffice.start()
def test_suite():
return make_suite(TestUnoMimeMapper)
......
......@@ -31,6 +31,7 @@ import logging
from cloudooo.utils import logger, configureLogger, convertStringToBool
from cloudoooTestCase import make_suite
class TestUtils(unittest.TestCase):
"""Test Utils"""
......@@ -51,6 +52,7 @@ class TestUtils(unittest.TestCase):
self.assertEquals(convertStringToBool('faLse'), False)
self.assertEquals(convertStringToBool(''), None)
def test_suite():
return make_suite(TestUtils)
......
......@@ -31,6 +31,7 @@ from cloudoooTestCase import cloudoooTestCase, make_suite
from cloudooo.application.xvfb import Xvfb
from cloudooo.utils import waitStopDaemon
class TestXvfb(cloudoooTestCase):
def afterSetUp(self):
......@@ -64,6 +65,7 @@ class TestXvfb(cloudoooTestCase):
waitStopDaemon(self.xvfb)
self.assertEquals(self.xvfb.status(), False)
def test_suite():
return make_suite(TestXvfb)
......
......@@ -50,6 +50,7 @@ PYTHON_ENVIRONMENT = [
'PYTHONVERBOSE'
]
def getCleanPythonEnvironment():
env = environ.copy()
# Clean python related environment variables
......@@ -57,6 +58,7 @@ def getCleanPythonEnvironment():
env.pop(k, None)
return env
def removeDirectory(path):
"""Remove directory"""
try:
......@@ -64,6 +66,7 @@ def removeDirectory(path):
except OSError, msg:
logger.error(msg)
def socketStatus(hostname, port):
"""Verify if the address is busy."""
try:
......@@ -75,6 +78,7 @@ def socketStatus(hostname, port):
# True if the isn't free
return True
def waitStartDaemon(daemon, attempts):
"""Wait a certain time to start the daemon."""
for num in range(attempts):
......@@ -82,6 +86,7 @@ def waitStartDaemon(daemon, attempts):
if daemon.status():
return
def waitStopDaemon(daemon, attempts=5):
"""Wait a certain time to stop the daemon."""
for num in range(attempts):
......@@ -89,25 +94,25 @@ def waitStopDaemon(daemon, attempts=5):
if not daemon.status():
break
def configureLogger(level=None, debug_mode=False):
"""Configure logger.
Keyword arguments:
level -- Level to prints the log messages
"""
if level is None:
level = logging.INFO
if debug_mode:
level = logging.DEBUG
handler_list = logger.handlers
handler_list = logger.handlers
if handler_list:
for handler in iter(handler_list):
logger.removeHandler(handler)
# The propagate value indicates whether or not parents of this loggers will
# be traversed when looking for handlers. It doesn't really make sense in the
# root logger - it's just there because a root logger is almost like any
# root logger - it's just there because a root logger is almost like any
# other logger.
logger.propagate = 0
logger.setLevel(level)
......@@ -121,15 +126,16 @@ def configureLogger(level=None, debug_mode=False):
# add ch to logger
logger.addHandler(ch)
def remove_file(filepath):
try:
remove(filepath)
except OSError, msg:
print msg.strerror
def convertStringToBool(string):
"""This function is used to convert string 'true' and 'false' only.
Keyword arguments:
string -- string to convert to boolean
"""
......
......@@ -12,14 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from SimpleXMLRPCServer import SimpleXMLRPCDispatcher
from SimpleXMLRPCServer import SimpleXMLRPCDispatcher
class WSGIXMLRPCApplication(object):
"""Application to handle requests to the XMLRPC service"""
def __init__(self, instance=None, methods=[]):
"""Create windmill xmlrpc dispatcher"""
self.dispatcher = SimpleXMLRPCDispatcher(allow_none=True, encoding=None)
self.dispatcher = SimpleXMLRPCDispatcher(allow_none=True,
encoding=None)
if instance is not None:
self.dispatcher.register_instance(instance)
for method in methods:
......@@ -31,16 +33,17 @@ class WSGIXMLRPCApplication(object):
if environ['REQUEST_METHOD'] == 'POST':
return self.handle_POST(environ, start_response)
else:
start_response("400 Bad request", [('Content-Type','text/plain')])
start_response("400 Bad request", [('Content-Type', 'text/plain')])
return ['']
def handle_POST(self, environ, start_response):
"""Handles the HTTP POST request.
Attempts to interpret all HTTP POST requests as XML-RPC calls,
which are forwarded to the server's _dispatch method for handling.
Most code taken from SimpleXMLRPCServer with modifications for wsgi and my custom dispatcher.
Most code taken from SimpleXMLRPCServer with modifications for wsgi and
my custom dispatcher.
"""
try:
# Get arguments by reading body of request.
......@@ -48,28 +51,28 @@ class WSGIXMLRPCApplication(object):
length = int(environ['CONTENT_LENGTH'])
data = environ['wsgi.input'].read(length)
max_chunk_size = 10*1024*1024
max_chunk_size = 10 * 1024 * 1024
size_remaining = length
# In previous versions of SimpleXMLRPCServer, _dispatch
# could be overridden in this class, instead of in
# SimpleXMLRPCDispatcher. To maintain backwards compatibility,
# check to see if a subclass implements _dispatch and
# check to see if a subclass implements _dispatch and
# using that method if present.
response = self.dispatcher._marshaled_dispatch(
data, getattr(self.dispatcher, '_dispatch', None)
)
response += '\n'
except: # This should only happen if the module is buggy
except: # This should only happen if the module is buggy
# internal error, report as HTTP server error
start_response("500 Server error", [('Content-Type', 'text/plain')])
start_response("500 Server error", [('Content-Type',
'text/plain')])
return []
else:
# got a valid XML RPC response
start_response("200 OK", [('Content-Type','text/xml'), ('Content-Length', str(len(response)),)])
start_response("200 OK", [('Content-Type', 'text/xml'),
('Content-Length', str(len(response)),)])
return [response]
def __call__(self, environ, start_response):
return self.handler(environ, start_response)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment