Commit 9105fd56 authored by Gabriel Monnerat's avatar Gabriel Monnerat

Commit initial version

git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk/utils@37406 20353a03-c40f-0410-a6d1-a30d3c3de9de
parents
all: install
PYTHON=python2.6
install:
@$(PYTHON) setup.py install
0.6
===
- removed the configurationmanager.py.
0.5
===
- Removed the possibility of infinite loop in getAvailableOO.
- Corrected the problem when the document in file system isn't
found(IllegalArgumentException).
- Implemented in oohandler to when not exist OOo Instance free. If don't exist,
the requisition is locked.
0.4
===
Created class to manipulate the configuration file(ConfigurationManager).
Now, all paths are taken by this object and in configuration file is possible
set the number of OOo instance that you want. The OOFactory object was implemented
to start the max number of OOo instance and get the instance available to use.
0.3
===
Split start.py into openoffice.py and xvfb.py. The classes openoffice and xvfb have
methods to control the process(start,stop and status about the process). Created one class
and one object for it, for easier reload the configuration.
0.2
===
Extends the code to convert a document to pdf. Initially the connection with OO
was implemented in class worker, but should be removed and implemented separately.
0.1
===
Create a prototype using WSGI and XMLRPC. One class calculator was used as test.
Install Dependencies
--------------------
System Dependencies
- Xvfb
Python Dependencies
- zope.interface
- WSGIUtils
- PasteDeploy
- PasteScript
Instalation in Mandriva:
- urpmi xvfb
- urpmi python-setuptools
- easy_install-2.6 zope.interface PasteDeploy PasteScript WSGIUtils
Instalation of Openoffice
Was used for testing the package's official openoffice.Follow these steps to install:
- Download the package from the official site;
- unpack the tar.gz and Install;
The instalation is in /opt
How to use
----------
- Start the Daemon
./cloudooo start
Description of Daemon
---------------------
- XMLRPC + WSGI will be one bridge for easy access OpenOffice.org. This will implement one XMLRPC server into WSGI (Paster).
- PyUno is used to connect to OpenOffice.org stated with open socket. The features will be handled all by pyuno.
- Xvfb is used to run Openoffice.org. This is controlled by Daemon(cloudooo).
- Only a process will have access to OpenOffice.org by time.
- All clients receive the same object(proxy) when connects with XMLRPC Server.
Xvfb and OpenOffice
- configure and start Xvfb;
- Use a single Xvfb;
- the xvfb will be started with the XMLRPC Server;
- When start the Daemon(cloudooo), it configures Xvfb, next opens the openoffice(with pyuno) and start XMLRPC Server;
- control Xvfb;
- start openoffice;
- Pyuno start the openoffice processes and the communication is through sockets;
- Openoffice processes run in brackground and in virtual display;
- control openoffice;
- The socket can't lose the connection, if this occurs should kill the process and submit again the file;
XMLRPC Server - XMLRPC + WSGI
-----------------------------
- Send document to openoffice and return the document converted with metadata;
- XMLRPC receives a file and connects to a openoffice by pyuno;
- The pyuno opens a new openoffice, write, add metadata and returns the document edited or converted to xmlrpc and it return the document to the user;
- When finalize the use of openoffice, should make sure that it was finalized;
- Export to another format;
- Invite document and return metadata only;
- Edit metadata of the document;
- Problems and possible solution
- OpenOffice is stalled;
- finalize the process, start openoffice and submit the document again(without restart the cloudooo);
- Openoffice is crashed;
- finalize the process, verify if all the process was killed, start openoffice and submit the document again(without restart the cloudooo)
- OpenOffice received the document and stalled;
- if openoffice isn't responding, kill the process and start
- The document that was sent is corrupt;
- write in log the error and verify that the process aren't in memory
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from zope.interface import implements
from cloudooo.interfaces.application import IApplication
from cloudooo.utils import logger, socketStatus
from psutil import pid_exists, Process
class Application(object):
"""Base object to create an object that is possible manipulation a
process"""
implements(IApplication)
name = "application"
def start(self):
"""Start Application"""
logger.debug("Process Started %s, Port %s. Pid %s" % (self.name,
self.getAddress()[-1],
self.pid()))
def stop(self):
"""Stop the process"""
if hasattr(self, 'process') and self.status():
process_pid = self.process.pid
logger.debug("Stop Pid - %s" % process_pid)
try:
self.process.terminate()
finally:
if pid_exists(process_pid) or self.status():
Process(process_pid).kill()
def loadSettings(self, hostname, port, path_run_dir, display_id, **kwargs):
"""Define attributes for application instance
Keyword arguments:
hostname -- Host to start the instance.
port -- Expected a int number.
path_run_dir -- Full path to create the enviroment.
display_id -- Display to open the OpenOffice.
"""
self.hostname = hostname
self.port = port
self.path_run_dir = path_run_dir
self.display_id = display_id
self.timeout = kwargs.get('start_timeout', 20)
def restart(self):
"""Start and Stop the process"""
self.stop()
self.start()
def status(self):
"""Check by socket if the openoffice work."""
return socketStatus(self.hostname, self.port)
def getAddress(self):
"""Return port and hostname of OOo Instance."""
return self.hostname, self.port
def pid(self):
"""Returns the pid"""
if not hasattr(self, 'process') or not self.status():
return None
return self.process.pid
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from os import environ, remove
from os.path import exists, join
from subprocess import Popen, PIPE
from threading import Lock
from cloudooo.ooolib import setUpUnoEnvironment
from zope.interface import implements
from application import Application
from cloudooo.interfaces.lockable import ILockable
from cloudooo.utils import logger, waitStartDaemon, removeDirectory, waitStopDaemon
class OpenOffice(Application):
"""Object to control one OOo Instance and all features instance."""
implements(ILockable)
name = "openoffice"
def __init__(self):
"""Creates the variable to save the pid, port and hostname of the object.
The lock is a simple lock python that is used when one requisition is
using the openoffice.
"""
self._bin_soffice = 'soffice.bin'
self._lock = Lock()
self._cleanRequest()
def _testOpenOffice(self, host, port):
"""Test if OpenOffice was started correctly"""
logger.debug("Test OpenOffice %s - Pid %s" % (self.getAddress()[-1], self.pid()))
command = [self.python_path
, self.unoconverter_bin
, "--test"
, "--hostname=%s" % host
, "--port=%s" % port
, "--document_url=%s" % self.document_url
, "--unomimemapper_bin=%s" % self.unomimemapper_bin
, "--python_path=%s" % self.python_path
, "--uno_path=%s" % self.uno_path
, "--office_bin_path=%s" % self.office_bin_path]
logger.debug("Testing Openoffice Instance %s" % port)
stdout, stderr = Popen(" ".join(command), shell=True, stdout=PIPE,
stderr=PIPE).communicate()
if not stdout and stderr != "":
logger.debug(stderr)
return False
else:
url = stdout.replace("\n", "")
logger.debug("Instance %s works" % port)
remove(url)
return True
def _cleanRequest(self):
"""Define request attribute as 0"""
self.request = 0
def loadSettings(self, hostname, port, path_run_dir, display_id,
office_bin_path, uno_path, **kw):
"""Method to load the configuratio to control one OpenOffice Instance
Keyword arguments:
office_path -- Full Path of the OOo executable.
e.g office_bin_path='/opt/openoffice.org3/program'
uno_path -- Full path of the Uno Library
"""
Application.loadSettings(self, hostname, port, path_run_dir, display_id)
setUpUnoEnvironment(uno_path, office_bin_path)
self.office_bin_path = office_bin_path
self.uno_path = uno_path
self.process_name = "soffice.bin"
self.document_url = kw.get('document_url', '')
self.unoconverter_bin = kw.get("unoconverter_bin", "unoconverter")
self.python_path = kw.get('python_path', 'python')
self.unomimemapper_bin = kw.get("unomimemapper_bin")
def _start_process(self, command, env):
"""Start OpenOffice.org process"""
self.process = Popen(' '.join(command),
stdout=PIPE,
shell=True,
close_fds=True,
env=env)
waitStartDaemon(self, self.timeout)
if exists(self.document_url):
return self._testOpenOffice(self.hostname, self.port)
return True
def start(self):
"""Start Instance."""
self.path_user_installation = join(self.path_run_dir, \
"cloudooo_instance_%s" % self.port)
if exists(self.path_user_installation):
removeDirectory(self.path_user_installation)
# Create command with all parameters to start the instance
self.command = [join(self.office_bin_path, self._bin_soffice)
, '-invisible'
, '-nologo'
, '-nodefault'
, '-norestore'
, '-nofirststartwizard'
, '"-accept=socket,host=%s,port=%d;urp;StarOffice.ComponentContext"' % \
(self.hostname, self.port)
, '-env:UserInstallation=file://%s' % self.path_user_installation]
# To run the instance OOo is need a environment. So, the "DISPLAY" of Xvfb
# is passed to env and the environment customized is passed to the process
env = environ.copy()
env["DISPLAY"] = ":%s" % self.display_id
process_started = self._start_process(self.command, env)
if not process_started:
self.stop()
waitStopDaemon(self, self.timeout)
self._start_process(self.command, env)
self._cleanRequest()
Application.start(self)
def stop(self):
"""Stop the instance by pid. By the default
the signal is 15."""
Application.stop(self)
if exists(self.path_user_installation):
removeDirectory(self.path_user_installation)
self._cleanRequest()
def isLocked(self):
"""Verify if OOo instance is being used."""
return self._lock.locked()
def acquire(self):
"""Lock Instance to use."""
self.request += 1
self._lock.acquire()
def release(self):
"""Unlock Instance."""
logger.debug("OpenOffice %s, %s unlocked" % self.getAddress())
self._lock.release()
openoffice = OpenOffice()
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from subprocess import Popen, PIPE
from cloudooo.utils import logger, waitStartDaemon, remove_file
from zope.interface import implements
from application import Application
from cloudooo.interfaces.application import IApplication
from os.path import exists
class Xvfb(Application):
"""Start and control Xvfb. It is used to open/run
instances OpenOffice.
"""
implements(IApplication)
name = "xvfb"
def loadSettings(self, hostname, port, path_run_dir, display_id, **kwargs):
"""Method to load the configuration to run and monitoring the Xvfb.
Keyword Arguments:
virtual_screen -- the display number
"""
Application.loadSettings(self, hostname, port, path_run_dir, display_id)
self.virtual_screen = kwargs.get('virtual_screen', '0')
self.process_name = "Xvfb"
def start(self):
"""Method to start Virtual Frame Buffer."""
self.command = ["Xvfb", "-ac", ":%s" % self.display_id, \
"-screen", self.virtual_screen, "800x600x16", \
"-fbdir", self.path_run_dir]
self.process = Popen(self.command,
stdout=PIPE,
close_fds=True)
waitStartDaemon(self, self.timeout)
Application.start(self)
logger.debug("Xvfb pid - %s" % self.pid())
def stop(self):
"""Stop Xvfb processes and remove lock file in file system"""
Application.stop(self)
lock_filepath = '/tmp/.X%s-lock' % self.display_id
if exists(lock_filepath):
remove_file(lock_filepath)
display_filepath = '/tmp/X11/X%s' % self.display_id
if exists(display_filepath):
remove_file(display_filepath)
xvfb = Xvfb()
This diff is collapsed.
#!/usr/bin/env python
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import sys
from os import environ
from getopt import getopt, GetoptError
from cloudooo.ooolib import getServiceManager
from cloudooo.utils import usage
from types import InstanceType
__doc__ = """
usage: unomimemapper.py [options]
Options:
-h, --help this help screen
--hostname=STRING OpenOffice Instance address
--port=STRING OpenOffice Instance port
--office_bin_path=STRING_URL
Folder path were is the binary openoffice
--uno_path=STRING_URL
Folter path were is the uno library
"""
class UnoMimemapper(object):
""" """
def __init__(self, hostname, port):
""" Receives hostname and port from openoffice and create a service manager"""
self.service_manager = getServiceManager(hostname, port)
def _getElementNameByService(self, uno_service, ignore_name_list=[]):
"""Returns an dict with elements."""
name_list = uno_service.getElementNames()
service_dict = {}
for name in iter(name_list):
element_dict = {}
element_list = uno_service.getByName(name)
for obj in iter(element_list):
if obj.Name in ignore_name_list:
continue
elif type(obj.Value) == InstanceType:
continue
element_dict[obj.Name] = obj.Value
service_dict[name] = element_dict
return service_dict
def getFilterDict(self):
"""Return all filters and your properties"""
filter_service = self.service_manager.createInstance("com.sun.star.document.FilterFactory")
filter_dict = self._getElementNameByService(filter_service, ["UINames", "UserData"])
return filter_dict
def getTypeDict(self):
"""Return all types and your properties"""
type_service = self.service_manager.createInstance("com.sun.star.document.TypeDetection")
type_dict = self._getElementNameByService(type_service, ["UINames", "URLPattern"])
return type_dict
def help():
usage(sys.stderr, __doc__)
sys.exit(1)
def main():
try:
opt_list, arg_list = getopt(sys.argv[1:], "h", ["help",
"uno_path=", "office_bin_path=",
"hostname=", "port="])
except GetoptError, msg:
msg = msg.msg + "\nUse --help or -h"
usage(sys.stderr, msg)
sys.exit(2)
if not opt_list:
help()
for opt, arg in opt_list:
if opt in ("-h", "--help"):
help()
if opt == "--uno_path":
environ['uno_path'] = arg
elif opt == "--office_bin_path":
environ['office_bin_path'] = arg
elif opt == '--hostname':
hostname = arg
elif opt == "--port":
port = arg
mimemapper = UnoMimemapper(hostname, port)
filter_dict = mimemapper.getFilterDict()
type_dict = mimemapper.getTypeDict()
print "filter_dict = %s\ntype_dict = %s" % (filter_dict, type_dict)
if "__main__" == __name__:
main()
#!/bin/sh
lsb_functions="/etc/rc.d/init.d/functions"
if test -f $lsb_functions ; then
. $lsb_functions
else
log_success_msg()
{
echo " SUCCESS! $@"
}
log_failure_msg()
{
echo " ERROR! $@"
}
fi
# complete path of the cloudooo folder
FOLDER=$(cd $(dirname $0); pwd -P)/
# get the pid of the cloudooo
CLOUDOOO_PID=$FOLDER/paster.pid
# complete path of this script
SELF=$FOLDER$(basename $0)
case "$1" in
start)
paster serve ./samples/cloudooo.conf --daemon;;
stop)
kill -1 `cat $CLOUDOOO_PID`;
paster serve ./samples/cloudooo.conf --stop-daemon;;
restart)
$SELF stop;
$SELF start;;
*)
echo "Usage: ./cloudooo {start|stop|restart}";
exit 1;;
esac
exit 0
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from signal import signal, SIGHUP
from application.openoffice import openoffice
from application.xvfb import xvfb
from wsgixmlrpcapplication import WSGIXMLRPCApplication
from utils import convertStringToBool, configureLogger, cleanDirectory
from os import path
import monitor
from mimemapper import mimemapper
import gc
def stopProcesses():
monitor.stop()
xvfb.stop()
openoffice.stop()
def application(global_config, **local_config):
"""Method to load all configuration of cloudooo and start the application.
To start the application a number of params are required:
Keyword arguments:
debug_mode -- Mode as the application prints the messages.
e.g debug_mode=logging.DEBUG
path_dir_run_cloudooo -- Full path to create the environment of the processes.
e.g path_dir_run_cloudooo='/var/run/cloudooo'
virtual_display_port -- Port to start the Xvfb.
virtual_display_id -- Sets the display.
e.g virtual_display_id='99'
application_hostname -- Sets the host to Xvfb and Openoffice.
virtual_screen -- Use to define the screen to Xvfb
e.g virtual_screen='0'
number_instances_openoffice -- Defines a number of OOo Instances.
pool_port_range_start -- Initial number to start all OOo Instances.
e.g pool_port_range_start=4060
office_bin_path -- Full Path of the OOo executable.
e.g office_bin_path='/opt/openoffice.org3/program'
uno_path -- Full path to pyuno library.
e.g uno_path='/opt/openoffice.org/program'
"""
gc.enable()
debug_mode = convertStringToBool(local_config.get('debug_mode'))
configureLogger(debug_mode=debug_mode)
document_name = local_config.get("document_name")
# path of directory to run cloudooo
path_dir_run_cloudooo = local_config.get('path_dir_run_cloudooo')
cleanDirectory(path_dir_run_cloudooo, ignore_list=["tmp", document_name])
# directory to create temporary files
cloudooo_path_tmp_dir = path.join(path_dir_run_cloudooo, 'tmp')
cleanDirectory(cloudooo_path_tmp_dir)
# The Xvfb will run in the same local of the OpenOffice
application_hostname = local_config.get('application_hostname')
openoffice_port = int(local_config.get('openoffice_port'))
# Before start Xvfb, first loads the configuration
xvfb.loadSettings(application_hostname,
int(local_config.get('virtual_display_port')),
path_dir_run_cloudooo,
local_config.get('virtual_display_id'),
virtual_screen=local_config.get('virtual_screen'),
start_timeout=local_config.get('start_timeout'))
xvfb.start()
document_url = path.join(path.dirname(__file__),
"tests/data/%s" % document_name)
# Loading Configuration to start OOo Instance and control it
openoffice.loadSettings(application_hostname,
openoffice_port,
path_dir_run_cloudooo,
local_config.get('virtual_display_id'),
local_config.get('office_bin_path'),
local_config.get('uno_path'),
document_url=document_url,
unoconverter_bin=local_config.get('unoconverter_bin'),
python_path=local_config.get('python_path'),
unomimemapper_bin=local_config.get('unomimemapper_bin'))
openoffice.start()
monitor.load(local_config)
# Signal to stop all processes
signal(SIGHUP, lambda x,y: stopProcesses())
# Load all filters
openoffice.acquire()
mimemapper.loadFilterList(application_hostname,
openoffice_port,
unomimemapper_bin=local_config.get('unomimemapper_bin'),
python_path=local_config.get('python_path'))
openoffice.release()
from manager import Manager
timeout_response = int(local_config.get('timeout_response'))
kw = dict(timeout=timeout_response,
unoconverter_bin=local_config.get('unoconverter_bin'),
python_path=local_config.get('python_path'))
cloudooo_manager = Manager(cloudooo_path_tmp_dir, **kw)
return WSGIXMLRPCApplication(instance=cloudooo_manager)
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import mimetypes
from os.path import join, exists, curdir, abspath
from os import listdir, remove, chdir
from zope.interface import implements
from interfaces.document import IDocument
from zipfile import ZipFile, is_zipfile
from shutil import rmtree
import tempfile
class FileSystemDocument(object):
"""Filesystem Document is used to manipulate one temporary document
stored into the filesystem.
"""
implements(IDocument)
def __init__(self, base_folder_url, data, source_format):
"""Create an document into file system and store the URL.
Keyword arguments:
base_folder_url -- Full path to create a temporary folder
data -- Content of the document
"""
self.base_folder_url = base_folder_url
self.directory_name = self._createDirectory()
self.original_data = data
self.source_format = source_format
self.url = self.load()
def _createDirectory(self):
return tempfile.mkdtemp(dir=self.base_folder_url)
def load(self):
"""Creates one Temporary Document and write data into it.
Return the url for the document.
"""
# creates only the url of the file.
file_path = tempfile.mktemp(suffix=".%s" % self.source_format,
dir=self.directory_name)
# stores the data in temporary file
open(file_path, 'wb').write(self.original_data)
# If is a zipfile is need extract all files from whitin the compressed file
if is_zipfile(file_path):
zipfile = ZipFile(file_path)
if 'mimetype' not in zipfile.namelist() and \
'[Content_Types].xml' not in zipfile.namelist():
zipfile.extractall(path=self.directory_name)
zipfile.close()
remove(file_path)
file_list = listdir(self.directory_name)
if 'index.html' in file_list:
file_path = join(self.directory_name, 'index.html')
else:
extension_list = ['text/html', 'application/xhtml+xml']
for file in file_list:
if mimetypes.guess_type(file)[0] in extension_list:
file_path = join(self.directory_name, file)
break
return file_path
def getContent(self, zip=False):
"""Open the file and returns the content.
Keyword Arguments:
zip -- Boolean attribute. If True"""
if zip:
current_dir_url = abspath(curdir)
chdir(self.directory_name)
zip_path = tempfile.mktemp(suffix=".zip", dir=self.directory_name)
file_list = listdir(self.directory_name)
zipfile = ZipFile(zip_path, 'w')
try:
for file in iter(file_list):
zipfile.write(file)
finally:
zipfile.close()
opened_zip = open(zip_path, 'r').read()
remove(zip_path)
chdir(current_dir_url)
return opened_zip
else:
return open(self.url, 'r').read()
def getUrl(self):
"""Returns full path."""
return self.url
def restoreOriginal(self):
"""Remake the document with the original document"""
self.trash()
self.directory_name = self._createDirectory()
self.url = self.load()
def reload(self, url=None):
"""If the first document is temporary and another document is created. Use
reload to load the new document.
Keyword Arguments:
url -- Full path of the new document(default None)
"""
if self.url != url and url is not None:
remove(self.url)
self.url = url
def trash(self):
"""Remove the file in file system."""
if exists(self.directory_name):
rmtree(self.directory_name)
self.url = None
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from zope.interface import implements
from interfaces.filter import IFilter
class Filter(object):
"""Filter of OOo."""
implements(IFilter)
def __init__(self, extension, filter, mimetype, document_service, **kwargs):
"""Receives extension, filter and mimetype of filter and saves in object.
"""
self._extension = extension
self._filter = filter
self._mimetype = mimetype
self._document_service = document_service
self._preferred = kwargs.get('preferred')
self._sort_index = kwargs.get('sort_index')
self._label = kwargs.get("label")
def getLabel(self):
"""Returns label."""
return self._label
def getSortIndex(self):
"""Returns sort index."""
return self._sort_index
def isPreferred(self):
"""Check if this filter is preferred."""
return self._preferred
def getName(self):
"""Returns name of filter."""
return self._filter
def getDocumentService(self):
"""Returns the type of document that can use this filter."""
return self._document_service
def getExtension(self):
"""Returns extension as string."""
return self._extension
def getMimetype(self):
"""Returns mimetype."""
return self._mimetype
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
# Rafael Monnerat <rafael@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from zope.interface import implements
from cloudooo.interfaces.handler import IHandler
from cloudooo.document import FileSystemDocument
class FFMpegHandler:
"""
For each Document inputed is created on instance of this class to manipulate
the document. This Document must be able to create and remove a temporary
document at FS, load, export and export.
"""
implements(IHandler)
def __init__(self, base_folder_url, data, **kw):
"""Creates document in file system and loads it in OOo."""
self.document = FileSystemDocument(base_folder_url, data)
def _getCommand(self, *args, **kw):
"""Transforms all parameters passed in a command"""
return ""
def convert(self, source_format=None, destination_format=None, **kw):
"""Executes a procedure in accordance with parameters passed."""
return ""
def getMetadata(self, converted_data=False):
"""Returns a dictionary with all metadata of document.
Keywords Arguments:
converted_data -- Boolean variable. if true, the document is also returned
along with the metadata."""
return {}
def setMetadata(self, metadata):
"""Returns a document with new metadata.
Keyword arguments:
metadata -- expected an dictionary with metadata.
"""
# Is it supportable?
return ""
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
# Rafael Monnerat <rafael@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from zope.interface import implements
from cloudooo.interfaces.handler import IHandler
from cloudooo.document import FileSystemDocument
class ImageMagickHandler(FileSystemDocument):
"""
For each Document inputed is created on instance of this class to manipulate
the document. This Document must be able to create and remove a temporary
document at FS, load, export and export.
"""
implements(IHandler)
def __init__(self, base_folder_url, data, **kw):
"""Creates document in file system and loads it in OOo."""
self.document = FileSystemDocument(base_folder_url, data)
def _getCommand(self, *args, **kw):
"""Transforms all parameters passed in a command"""
return ""
def convert(self, source_format=None, destination_format=None, **kw):
"""Executes a procedure in accordance with parameters passed."""
return ""
def getMetadata(self, converted_data=False):
"""Returns a dictionary with all metadata of document.
Keywords Arguments:
converted_data -- Boolean variable. if true, the document is also returned
along with the metadata."""
return {}
def setMetadata(self, metadata):
"""Returns a document with new metadata.
Keyword arguments:
metadata -- expected an dictionary with metadata.
"""
# Is it supportable?
return ""
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import jsonpickle
from os import environ
from subprocess import Popen, PIPE
from cloudooo.application.openoffice import openoffice
from zope.interface import implements
from cloudooo.interfaces.handler import IHandler
from cloudooo.mimemapper import mimemapper
from cloudooo.document import FileSystemDocument
from cloudooo.monitor.timeout import MonitorTimeout
from cloudooo.utils import logger
from psutil import pid_exists
class OOHandler:
"""OOHandler is used to access the one Document and OpenOffice.
For each Document inputed is created on instance of this class to manipulate
the document. This Document must be able to create and remove a temporary
document at FS, load, export and export.
"""
implements(IHandler)
def __init__(self, base_folder_url, data, source_format, **kw):
"""Creates document in file system and loads it in OOo."""
self.document = FileSystemDocument(base_folder_url,
data,
source_format)
self.zip = kw.get('zip', False)
self.uno_path = kw.get("uno_path", None)
self.office_bin_path = kw.get("office_bin_path", None)
self.timeout = kw.get("timeout", 600)
self.unoconverter_bin = kw.get('unoconverter_bin', "unoconverter")
self.source_format = source_format
self.python_path = kw.get('python_path', 'python')
if not self.uno_path:
self.uno_path = environ.get("uno_path")
if not self.office_bin_path:
self.office_bin_path = environ.get("office_bin_path")
def _getCommand(self, *args, **kw):
"""Transforms all parameters passed in a command"""
hostname, port = openoffice.getAddress()
kw['hostname'] = hostname
kw['port'] = port
command_list = [self.python_path
, self.unoconverter_bin
, "--uno_path='%s'" % self.uno_path
, "--office_bin_path='%s'" % self.office_bin_path
, "--document_url='%s'" % self.document.getUrl()]
for arg in args:
command_list.insert(2, "--%s" % arg)
for k, v in kw.iteritems():
command_list.append("--%s='%s'" % (k,v))
return ' '.join(command_list)
def _startTimeout(self):
"""start the Monitor"""
self.monitor = MonitorTimeout(openoffice, self.timeout)
self.monitor.start()
return
def _stopTimeout(self):
"""stop the Monitor"""
self.monitor.terminate()
return
def _subprocess(self, command):
"""Run one procedure"""
try:
self._startTimeout()
process = Popen(command,
shell=True,
stdout=PIPE,
stderr=PIPE,
close_fds=True)
stdout, stderr = process.communicate()
finally:
self._stopTimeout()
if pid_exists(process.pid):
process.terminate()
return stdout, stderr
def _callUnoConverter(self, *feature_list, **kw):
""" """
if not openoffice.status():
openoffice.start()
command = self._getCommand(*feature_list, **kw)
stdout, stderr = self._subprocess(command)
if not stdout and stderr != '':
logger.debug(stderr)
if "NoConnectException" in stderr or \
"RuntimeException" in stderr or \
"DisposedException" in stderr:
self.document.restoreOriginal()
openoffice.restart()
kw['document_url'] = self.document.getUrl()
command = self._getCommand(*feature_list, **kw)
stdout, stderr = self._subprocess(command)
elif "ErrorCodeIOException" in stderr:
raise Exception, stderr + \
"This document can not be converted to this format"
else:
raise Exception, stderr
return stdout, stderr
def convert(self, destination_format=None, **kw):
"""Convert a document to another format supported by the OpenOffice
Keyword Arguments:
destination_format -- extension of document as String
"""
logger.debug("Convert: %s > %s" % (self.source_format, destination_format))
openoffice.acquire()
kw['source_format'] = self.source_format
if destination_format:
kw['destination_format'] = destination_format
kw['mimemapper'] = jsonpickle.encode(mimemapper)
try:
stdout, stderr = self._callUnoConverter(*['convert'], **kw)
finally:
openoffice.release()
if self.monitor.is_alive():
self._stopTimeout()
url = stdout.replace('\n', '')
self.document.reload(url)
content = self.document.getContent(self.zip)
self.document.trash()
return content
def getMetadata(self, base_document=False):
"""Returns a dictionary with all metadata of document.
Keywords Arguments:
base_document -- Boolean variable. if true, the document is also returned
along with the metadata."""
openoffice.acquire()
mimemapper_pickled = jsonpickle.encode(mimemapper)
logger.debug("getMetadata")
kw = dict(mimemapper=mimemapper_pickled)
if base_document:
feature_list = ['getmetadata', 'convert']
else:
feature_list = ['getmetadata']
try:
stdout, stderr = self._callUnoConverter(*feature_list, **kw)
finally:
openoffice.release()
if self.monitor.is_alive():
self._stopTimeout()
metadata={}
exec("metadata=%s" % stdout)
if metadata.get("Data"):
self.document.reload(metadata['Data'])
metadata['Data'] = self.document.getContent()
else:
metadata['Data'] = ''
self.document.trash()
return metadata
def setMetadata(self, metadata):
"""Returns a document with new metadata.
Keyword arguments:
metadata -- expected an dictionary with metadata.
"""
openoffice.acquire()
metadata_pickled = jsonpickle.encode(metadata)
logger.debug("setMetadata")
kw = dict(metadata=metadata_pickled)
try:
stdout, stderr = self._callUnoConverter(*['setmetadata'], **kw)
finally:
openoffice.release()
if self.monitor.is_alive():
self._stopTimeout()
doc_loaded = self.document.getContent()
self.document.trash()
return doc_loaded
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from zope.interface import Interface
class IApplication(Interface):
"""Controls and monitors an application"""
def start():
"""Start the application"""
def stop():
"""Stop the application"""
def pid():
"""Get the process pid"""
def loadSettings(hostname,
port,
virtual_display_id,
path_dir_run_cloudooo,
office_bin_path):
"""Load configuration to control OOo Instances"""
def status():
"""Checks if the application works correctly"""
def restart():
"""Restarts the application with the same settings"""
def getAddress():
"""Returns the hostname and port in tuple"""
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from zope.interface import Interface
from zope.interface import Attribute
class IDocument(Interface):
"""Manipulates documents in file system"""
base_folder_url = Attribute("Url of folder that is used to create temporary files")
directory_name = Attribute("String of directory name")
source_format = Attribute("Document Extension")
url = Attribute("Complete path of document in file system")
original_data = Attribute("Original data")
def load():
"""From the data creates one archive into file system using original data"""
def reload(url):
"""In the case of another file with the same content be created, pass the
url to load the new file"""
def getContent(zip):
"""Returns data of document"""
def getUrl():
"""Get the url of temporary file in file system"""
def trash():
"""Remove the file in file system"""
def restoreOriginal():
"""Restore the document with the original document"""
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from zope.interface import Interface
class IFilter(Interface):
"""This is a type filter"""
def getName():
"""Returns filter as string"""
def getDocumentService():
"""Returns document_service"""
def getSortIndex():
"""Returns the calculated value"""
def isPreferred():
"""Returns boolean variable"""
def getLabel():
"""Returns label of filter"""
def getMimetype():
"""Returns mimetype as string"""
def getExtension():
"""Returns extension as string"""
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from zope.interface import Interface
class IHandler(Interface):
"""Handles connections with the openoffice by socket"""
def convert(destination_format):
"""Convert document to ODF"""
def getMetadata(converted_data):
"""Returns a dictionary with all metadata of document. If converted_data is
True, the document is added in dictionary."""
def setMetadata(metadata_dict):
"""Returns a document with the new metadata"""
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from zope.interface import Interface
from zope.interface import Attribute
class ILockable(Interface):
"""Controls concurrency control"""
_lock = Attribute("A primitive lock is in one of two states, 'locked' or \
'unlocked'.")
def acquire():
"""Acquire the lock."""
def release():
"""Release the lock. If this thread doesn't currently have the lock, an
assertion error is raised."""
def isLocked():
"""Boolean method that checks if the object is acquired(locked) or not"""
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from zope.interface import Interface
class IManager(Interface):
"""Provides method to manipulate documents and metadatas using OOo"""
def convertFile(file, source_format, destination_format, zip):
"""Returns the converted file in the given format.
zip parameter can be specified to return the result of conversion
in the form of a zip archive (which may contain multiple parts).
This can be useful to convert a single ODF file to HTML
and png images.
"""
def getFileMetadataItemList(file, source_format, base_document):
"""Returns a list key, value pairs representing the
metadata values for the document. The structure of this
list is "unpredictable" and follows the convention of each file.
"""
def updateFileMetadata(file, source_format, metadata_dict):
"""Updates the file in the given source_format with provided metadata and
return the resulting new file."""
def getAllowedExtensionList(request_dict):
"""Returns a list extension which can be generated from given extension or
document type."""
class IERP5Compatibility(Interface):
""" Provides compatibility interface with ERP5 Project.
This methods provide same API as OpenOffice.org project.
XXX Unfinished Docstring.
"""
def run_convert(filename, file):
"""Returns the metadata and the ODF in dictionary"""
return (200 or 402, dict(), '')
def run_setmetadata(filename, file, metadata_dict):
"""Adds metadata in ODF and returns a new ODF with metadata in
dictionary"""
return (200 or 402, dict(), '')
def run_getmetadata(self, filename, file):
"""Extracts metadata from ODF and returns in dictionary"""
return (200 or 402, dict(), '')
def run_generate(filename, file, something, format, orig_format):
"""It exports a ODF to given format"""
return (200 or 402, dict(), '')
def getAllowedTargetItemList(self, content_type):
"""List types which can be generated from given content type"""
return (200 or 402, dict(), '')
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from zope.interface import Interface
class IMimemapper(Interface):
"""Provide methods to manipulate filters of OOo."""
def isLoaded():
"""Returns if the filters were loaded."""
def getDocumentTypeDict():
"""Returns document type dict."""
def getFilterName(extension, document_type):
"""Returns the name of filter according to parematers passed."""
def loadFilterList(**kwargs):
"""Load all filters of openoffice."""
def getFilterList(extension, **kwargs):
"""Returns a filter list according to extension or other parameters passed.
"""
def getAllowedExtensionList(document_type, **kwargs):
"""Returns a list with extensions which can be used to export according to
document type passed."""
def getMimetypeByFilterType(filter_type):
"""Returns mimetype according to the filter type passed"""
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from zope.interface import Interface
class IMonitor(Interface):
"""Provides features to monitor processes or python objects"""
def run():
"""Start to monitor"""
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from mimetypes import guess_all_extensions
from base64 import encodestring, decodestring
from zope.interface import implements
from interfaces.manager import IManager, IERP5Compatibility
from handler.oohandler import OOHandler
from mimemapper import mimemapper
from utils import logger
class Manager(object):
"""Manipulates requisitons of client and temporary files in file system."""
implements(IManager, IERP5Compatibility)
def __init__(self, path_tmp_dir, **kw):
"""Need pass the path where the temporary document will be created."""
self._path_tmp_dir = path_tmp_dir
self.kw = kw
def convertFile(self, file, source_format, destination_format, zip=False):
"""Returns the converted file in the given format.
Keywords arguments:
file -- File as string in base64
source_format -- Format of original file as string
destination_format -- Target format as string
zip -- Boolean Attribute. If true, returns the file in the form of a
zip archive
"""
if not mimemapper.getFilterList(destination_format):
raise ValueError, "This format is not supported or is invalid"
self.kw['zip'] = zip
document = OOHandler(self._path_tmp_dir,
decodestring(file),
source_format,
**self.kw)
decode_data = document.convert(destination_format)
return encodestring(decode_data)
def updateFileMetadata(self, file, source_format, metadata_dict):
"""Receives the string of document and a dict with metadatas. The metadata
is added in document.
e.g
self.updateFileMetadata(data = encodestring(data), metadata = \
{"title":"abc","description":...})
return encodestring(document_with_metadata)
"""
document = OOHandler(self._path_tmp_dir,
decodestring(file),
source_format,
**self.kw)
metadata_dict = dict([(key.capitalize(), value) \
for key, value in metadata_dict.iteritems()])
decode_data = document.setMetadata(metadata_dict)
return encodestring(decode_data)
def getFileMetadataItemList(self, file, source_format, base_document=False):
"""Receives the string of document as encodestring and returns a dict with
metadatas.
e.g.
self.getFileMetadataItemList(data = encodestring(data))
return {'Title': 'abc','Description': 'comments', 'Data': None}
If converted_data is True, the ODF of data is added in dictionary.
e.g
self.getFileMetadataItemList(data = encodestring(data), True)
return {'Title': 'abc','Description': 'comments', 'Data': string_ODF}
Note that all keys of the dictionary have the first word in uppercase.
"""
document = OOHandler(self._path_tmp_dir,
decodestring(file),
source_format,
**self.kw)
metadata_dict = document.getMetadata(base_document)
metadata_dict['Data'] = encodestring(metadata_dict['Data'])
return metadata_dict
def getAllowedExtensionList(self, request_dict={}):
"""List types which can be generated from given type
Type can be given as:
- filename extension
- document type ('text', 'spreadsheet', 'presentation' or 'drawing')
e.g
self.getAllowedMimetypeList(dict(document_type="text"))
return extension_list
"""
mimetype = request_dict.get('mimetype')
extension = request_dict.get('extension')
document_type = request_dict.get('document_type')
if mimetype:
allowed_extension_list = []
for ext in guess_all_extensions(mimetype):
ext = ext.replace('.', '')
extension_list = mimemapper.getAllowedExtensionList(extension=ext,
document_type=document_type)
for extension in extension_list:
if extension not in allowed_extension_list:
allowed_extension_list.append(extension)
return allowed_extension_list
elif extension:
extension = extension.replace('.', '')
return mimemapper.getAllowedExtensionList(extension=extension,
document_type=document_type)
elif document_type:
return mimemapper.getAllowedExtensionList(document_type=document_type)
else:
return [('','')]
def run_convert(self, filename, file):
"""Method to support the old API. Wrapper getFileMetadataItemList but
returns a dict.
This is a Backwards compatibility provided for ERP5 Project, in order to
keep compatibility with OpenOffice.org Daemon.
"""
orig_format = filename.split('.')[-1]
try:
response_dict = {}
response_dict['meta'] = self.getFileMetadataItemList(file, orig_format, True)
response_dict['data'] = response_dict['meta']['Data']
del response_dict['meta']['Data']
return (200, response_dict, "")
except Exception, e:
logger.error(e)
return (402, {}, e.args[0])
def run_setmetadata(self, filename, file, meta):
"""Wrapper updateFileMetadata but returns a dict.
This is a Backwards compatibility provided for ERP5 Project, in order to
keep compatibility with OpenOffice.org Daemon.
"""
orig_format = filename.split('.')[-1]
response_dict = {}
try:
response_dict['data'] = self.updateFileMetadata(file, orig_format, meta)
return (200, response_dict, '')
except Exception, e:
logger.error(e)
return (402, {}, e.args[0])
def run_getmetadata(self, filename, file):
"""Wrapper for getFileMetadataItemList.
This is a Backwards compatibility provided for ERP5 Project, in order to
keep compatibility with OpenOffice.org Daemon.
"""
orig_format = filename.split('.')[-1]
response_dict = {}
try:
response_dict['meta'] = self.getFileMetadataItemList(file, orig_format)
return (200, response_dict, '')
except Exception, e:
logger.error(e)
return (402, {}, e.args[0])
def run_generate(self, filename, file, something, format, orig_format):
"""Wrapper convertFile but returns a dict which includes mimetype.
This is a Backwards compatibility provided for ERP5 Project, in order to keep
compatibility with OpenOffice.org Daemon.
"""
orig_format = orig_format.split('.')[-1]
if "htm" in format:
zip = True
else:
zip = False
try:
response_dict = {}
response_dict['data'] = self.convertFile(file, orig_format, format, zip)
response_dict['mime'] = self.getFileMetadataItemList(response_dict['data'],
format)['MIMEType']
return (200, response_dict, "")
except Exception, e:
logger.error(e)
return (402, {}, e.args[0])
def getAllowedTargetItemList(self, content_type):
"""Wrapper getAllowedExtensionList but returns a dict.
This is a Backwards compatibility provided for ERP5 Project, in order to
keep compatibility with OpenOffice.org Daemon.
"""
response_dict = {}
try:
extension_list = self.getAllowedExtensionList({"mimetype": content_type})
response_dict['response_data'] = extension_list
return (200, response_dict, '')
except Exception, e:
logger.error(e)
return (402, {}, e.args[0])
This diff is collapsed.
from request import MonitorRequest
from memory import MonitorMemory
from cloudooo.application.openoffice import openoffice
monitor_request = None
monitor_memory = None
def load(local_config):
"""Start the monitors"""
global monitor_request
monitor_request = MonitorRequest(openoffice,
int(local_config.get('monitor_interval')),
int(local_config.get('limit_number_request')))
monitor_request.start()
if bool(local_config.get('enable_memory_monitor')):
global monitor_memory
monitor_memory = MonitorMemory(openoffice,
int(local_config.get('monitor_interval')),
int(local_config.get('limit_memory_used')))
monitor_memory.start()
return
def stop():
"""Stop all monitors"""
if monitor_request:
monitor_request.terminate()
if monitor_memory:
monitor_memory.terminate()
clear()
def clear():
global monitor_request, monitor_memory
monitor_request = None
monitor_memory = None
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from cloudooo.monitor.monitor import Monitor
from threading import Thread
from psutil import Process
from cloudooo.utils import logger
from time import sleep
class MonitorMemory(Monitor, Thread):
"""Usefull to control the memory and does not allow use it unnecessarily"""
def __init__(self, openoffice, interval, limit_memory_usage):
"""Expects to receive an object that implements the interfaces IApplication
and ILockable, the limit of memory usage that the openoffice can use and the
interval to check the object."""
Monitor.__init__(self, openoffice, interval)
Thread.__init__(self)
self.limit = limit_memory_usage
def create_process(self):
self.process = Process(int(self.openoffice.pid()))
def get_memory_usage(self):
try:
if not hasattr(self, 'process'):
self.create_process()
elif self.process.pid != int(self.openoffice.pid()):
self.create_process()
except TypeError:
logger.debug("OpenOffice is stopped")
return 0
# convert bytes to GB
return sum(self.process.get_memory_info())/(1024*1024)
def run(self):
"""Is called by start function. this function is responsible for
controlling the amount of memory used, and if the process exceeds the limit
it is stopped forcibly
"""
self.status_flag = True
logger.debug("Start MonitorMemory")
while self.status_flag:
if self.get_memory_usage() > self.limit:
logger.debug("Stopping OpenOffice")
self.openoffice.stop()
sleep(self.interval)
logger.debug("Stop MonitorMemory")
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from zope.interface import implements
from cloudooo.interfaces.monitor import IMonitor
class Monitor(object):
""" """
implements(IMonitor)
def __init__(self, openoffice, interval):
"""Expects an openoffice object and the interval"""
self.status_flag = False
self.openoffice = openoffice
self.interval = interval
def terminate(self):
"""Set False in monitor flag"""
self.status_flag = False
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from cloudooo.monitor.monitor import Monitor
from threading import Thread
from cloudooo.utils import logger
from time import sleep
class MonitorRequest(Monitor, Thread):
"""Usefull to control the number of request in Object"""
def __init__(self, openoffice, interval, request_limit):
"""Expects to receive an object that implements the interfaces IApplication
and ILockable, the limit of request that the openoffice can receive and the
interval to check the object."""
Monitor.__init__(self, openoffice, interval)
Thread.__init__(self)
self.request_limit = request_limit
def run(self):
"""Is called by start function"""
self.status_flag = True
logger.debug("Start MonitorRequest")
while self.status_flag:
if self.openoffice.request > self.request_limit:
self.openoffice.acquire()
logger.debug("Openoffice: %s, %s will be restarted" % \
self.openoffice.getAddress())
self.openoffice.restart()
self.openoffice.release()
sleep(self.interval)
logger.debug("Stop MonitorRequest ")
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from cloudooo.monitor.monitor import Monitor
from multiprocessing import Process
from time import sleep
from cloudooo.utils import logger
class MonitorTimeout(Monitor, Process):
"""Monitors and controls the time of use of an object"""
def __init__(self, openoffice, interval):
"""Expects to receive an object that implements the interfaces IApplication
and ILockable. And the interval to check the object."""
Monitor.__init__(self, openoffice, interval)
Process.__init__(self)
def run(self):
"""Start the process"""
port = self.openoffice.getAddress()[-1]
pid = self.openoffice.pid()
logger.debug("Monitoring OpenOffice: Port %s, Pid: %s" % (port, pid))
self.status_flag = True
sleep(self.interval)
if self.openoffice.isLocked():
logger.debug("Stop OpenOffice - Port %s - Pid %s" % (port, pid))
self.openoffice.stop()
def terminate(self):
"""Stop the process"""
Monitor.terminate(self)
Process.terminate(self)
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from os import environ, putenv
from sys import path
from os.path import exists
def setUpUnoEnvironment(uno_path=None, office_bin_path=None):
"""Set up the environment to use the uno library and connect with the
openoffice by socket"""
if uno_path is not None:
environ['uno_path'] = uno_path
else:
uno_path = environ.get('uno_path')
if office_bin_path is not None:
environ['office_bin_path'] = office_bin_path
else:
office_bin_path = environ.get('office_bin_path')
# Add in sys.path the path of pyuno
if uno_path not in path:
path.append(uno_path)
fundamentalrc_file = '%s/fundamentalrc' % office_bin_path
if exists(fundamentalrc_file) and \
not environ.has_key('URE_BOOTSTRAP'):
putenv('URE_BOOTSTRAP','vnd.sun.star.pathname:%s' % fundamentalrc_file)
def createProperty(name, value):
"""Create property"""
setUpUnoEnvironment()
from com.sun.star.beans import PropertyValue
property = PropertyValue()
property.Name = name
property.Value = value
return property
def getServiceManager(host, port):
"""Get the ServiceManager from the running OpenOffice.org."""
setUpUnoEnvironment()
import uno
# Get the uno component context from the PyUNO runtime
uno_context = uno.getComponentContext()
# Create the UnoUrlResolver on the Python side.
url_resolver = "com.sun.star.bridge.UnoUrlResolver"
resolver = uno_context.ServiceManager.createInstanceWithContext(url_resolver,
uno_context)
# Connect to the running OpenOffice.org and get its
# context.
uno_connection = resolver.resolve("uno:socket,host=%s,port=%s;urp;StarOffice.ComponentContext" % (host, port))
# Get the ServiceManager object
return uno_connection.ServiceManager
def systemPathToFileUrl(path):
"""Returns a path in uno library patterns"""
setUpUnoEnvironment()
from unohelper import systemPathToFileUrl
return systemPathToFileUrl(path)
[app:main]
use = egg:cloudooo
#
## System config
#
debug_mode = True
# Folder where all cloudooo core files are installed
cloudooo_home = /usr/share/cloudooo
# Folder where pid files, lock files and virtual frame buffer mappings
# are stored. In this folder is necessary create a folder tmp, because this
# folder is used to create all temporary documents.
path_dir_run_cloudooo = /var/run/cloudooo
# Folder where OpenOffice Uno interpreter is installed
uno_path = /opt/openoffice.org3/basis-link/program
# Folder where OpenOffice Binarie is installed
office_bin_path = /opt/openoffice.org3/program
#
## Monitor Settings
#
# Limit to use the Openoffice Instance. if pass of the limit, the instance is
# stopped and another is started.
limit_number_request = 100
# Interval to check the factory
monitor_interval = 10
timeout_response = 180
enable_memory_monitor = True
# Set the limit in MB
# e.g 1000 = 1 GB, 100 = 100 MB
limit_memory_used = 1000
# Filename that will be used to test the connection with openoffice.
document_name = test.odt
#
## OOFactory Settings
#
# The pool consist of several OpenOffice.org instances
openoffice_hostname = localhost
# OpenOffice Port
openoffice_port = 4062
#
## Xvfb Settings
#
# Default port to xvfb
virtual_display_port = 6097
# ID of the virtual display where OOo instances are launched
virtual_display_id = 97
virtual_screen = 0
# Put where is the conversors-bin
unoconverter_bin = /usr/bin/unoconverter.py
unomimemapper_bin = /usr/bin/unomimemapper.py
# Python path e.g /usr/local/bin/python
python_path = /usr/bin/python
[server:main]
use = egg:PasteScript#wsgiutils
host = 0.0.0.0
port = 8011
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SARL and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
require 'xmlrpc/client'
input_filename = '../tests/data/test.doc'
output_filename = '../tests/output/ruby_test.odt'
in_data = File.read input_filename
enc_data = XMLRPC::Base64.encode in_data
server = XMLRPC::Client.new2 'http://localhost:8008'
result = server.call('convert', enc_data)
out_data = XMLRPC::Base64.decode result
out_file = File.open(output_filename, 'w')
out_file.print out_data
out_file.close
data_length = out_data.length
if data_length == 8124 || data_length == 8101
puts data_length
puts "OK"
end
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import sys
from multiprocessing import Process
from os import listdir
from cloudooo.utils import usage
from xmlrpclib import ServerProxy
from os.path import join
from getopt import getopt, GetoptError
from time import ctime, time
from base64 import encodestring
__doc__ = """
usage: python HighTestLoad.py [options]
Options:
-h, --help this help screen
-f folder Full url of folder with documents
-c concurrency Number of clients
-n requests Number of requests
-s server Server Address e.g http://ip:port/
-l filter log Folder to store logs
"""
class Log(object):
"""Object to manipulate the log file"""
def __init__(self, log_path, mode='a'):
"""Open the log file in filesystem"""
self._log = open(log_path, mode)
def msg(self, message):
"""Write the message in file"""
self._log.write(message)
def close(self):
"""Close the file"""
self._log.close()
def flush(self):
"""Flush the internal I/O buffer."""
self._log.flush()
class Client(Process):
"""Represents a client that sends requests to the server. The log file by
default is created in current path, but can be created in another path.
In log are stored:
- Date and time that the client initiated the test;
- Duration of each request;
- Total time of all requets;
- Data and time that the client finished the test;
"""
def __init__(self, address, number_request, folder, **kw):
"""Client constructor
address -- Complete address as string
e.g http://localhost:8008
number_request -- number of request that client will send to the server
folder -- Full path to folder.
e.g '/home/user/documents'
"""
Process.__init__(self)
self.address = address
self.number_of_request = number_request
self.folder = folder
log_filename = kw['log_filename'] or "%s.log" % self.name
log_folder_path = kw.get("log_folder_url", '')
log_path = join(log_folder_path, log_filename)
self.log = Log(log_path, 'w')
def run(self):
""" """
time_list = []
self.log.msg("Test Start: %s\n" % ctime())
proxy = ServerProxy(self.address)
while self.number_of_request:
folder_list = listdir(self.folder)[:self.number_of_request]
for filename in folder_list:
file_path = join(self.folder, filename)
data = encodestring(open(file_path).read())
self.log.msg("Input: %s\n" % file_path)
try:
now = time()
proxy.convertFile(data, 'odt', 'doc')
response_duration = time() - now
self.log.msg("Duration: %s\n" % response_duration)
time_list.append(response_duration)
self.log.flush()
except Exception, err:
self.log.msg("%s\n" % str(err))
self.number_of_request -= 1
self.log.msg("Test Stop: %s\n" % ctime())
self.log.msg("Total Duration: %s" % sum(time_list))
self.log.close()
def main():
help_msg = "\nUse --help or -h"
try:
opt_list, arg_list = getopt(sys.argv[1:], "hc:n:f:s:l:", ["help"])
except GetoptError, msg:
msg = msg.msg + help_msg
usage(sys.stderr, msg)
sys.exit(2)
kw = {}
for opt, arg in opt_list:
if opt in ('-h', '--help'):
usage(sys.stdout, __doc__)
sys.exit(2)
elif opt == '-c':
number_client = int(arg)
elif opt == '-n':
number_request = int(arg)
elif opt == '-f':
document_folder = arg
elif opt == '-s':
server_address = arg
elif opt == '-l':
kw['log_folder_url'] = arg
client_list = []
for num in range(number_client):
kw['log_filename'] = "client%s.log" % num
client = Client(server_address, number_request, document_folder, **kw)
client_list.append(client)
client.start()
for client in client_list:
client.join()
if __name__ == "__main__":
main()
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import unittest
from sys import path
from ConfigParser import ConfigParser
from os.path import join, exists, dirname
from os import environ, putenv
from cloudooo.application.xvfb import xvfb
from cloudooo.application.openoffice import openoffice
from cloudooo.utils import waitStartDaemon
from cloudooo.mimemapper import mimemapper
config = ConfigParser()
testcase_path = dirname(__file__)
def make_suite(test_case):
"""Function is used to run all tests together"""
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(test_case))
return suite
def loadConfig(path):
config.read(path)
def startFakeEnvironment(start_openoffice=True, conf_path=None):
"""Create a fake environment"""
server_conf_path = conf_path or join(testcase_path, "..", "samples/cloudooo.conf")
loadConfig(server_conf_path)
uno_path = config.get("app:main", "uno_path")
path_dir_run_cloudooo = config.get("app:main", "path_dir_run_cloudooo")
virtual_display_id = int(config.get("app:main", "virtual_display_id"))
virtual_display_port_int = int(config.get("app:main", "virtual_display_port"))
hostname = config.get("server:main", "host")
openoffice_port = int(config.get("app:main", "openoffice_port"))
office_bin_path = config.get("app:main", "office_bin_path")
if not environ.get('uno_path'):
environ['uno_path'] = uno_path
office_bin_path = config.get("app:main", "office_bin_path")
if not environ.get('office_bin_path'):
environ['office_bin_path'] = office_bin_path
if not uno_path in path:
path.append(uno_path)
fundamentalrc_file = '%s/fundamentalrc' % office_bin_path
if exists(fundamentalrc_file) and \
not environ.has_key('URE_BOOTSTRAP'):
putenv('URE_BOOTSTRAP','vnd.sun.star.pathname:%s' % fundamentalrc_file)
xvfb.loadSettings(hostname,
virtual_display_port_int,
path_dir_run_cloudooo,
virtual_display_id,
virtual_screen='1')
xvfb.start()
waitStartDaemon(xvfb, 10)
if start_openoffice:
openoffice.loadSettings(hostname,
openoffice_port,
path_dir_run_cloudooo,
virtual_display_id,
office_bin_path,
uno_path)
openoffice.start()
openoffice.acquire()
hostname, port = openoffice.getAddress()
kw = dict(python_path=config.get("app:main", "python_path"),
unomimemapper_bin=config.get("app:main", "unomimemapper_bin"),
uno_path=config.get("app:main", "uno_path"),
office_bin_path=config.get("app:main", "office_bin_path"))
if not mimemapper.isLoaded():
mimemapper.loadFilterList(hostname, port, **kw)
openoffice.release()
return openoffice, xvfb
return xvfb
def stopFakeEnvironment(stop_openoffice=True):
"""Stop Openoffice and Xvfb """
if stop_openoffice:
openoffice.stop()
xvfb.stop()
return True
class cloudoooTestCase(unittest.TestCase):
"""Test Case to load cloudooo conf."""
def setUp(self):
"""Creates a environment to run the tests. Is called always before the
tests."""
self.hostname = config.get("server:main", "host")
self.cloudooo_port = config.get("server:main", "port")
self.openoffice_port = config.get("app:main", "openoffice_port")
self.office_bin_path = config.get("app:main", "office_bin_path")
self.path_dir_run_cloudooo = config.get("app:main", "path_dir_run_cloudooo")
self.tmp_url = join(self.path_dir_run_cloudooo, "tmp")
self.uno_path = config.get("app:main", "uno_path")
self.unomimemapper_bin = config.get("app:main", "unomimemapper_bin")
self.unoconverter_bin = config.get("app:main", "unoconverter_bin")
self.python_path = config.get("app:main", "python_path")
self.virtual_display_id = config.get("app:main", "virtual_display_id")
self.virtual_display_port_int = config.get("app:main", "virtual_display_port")
self.afterSetUp()
def afterSetUp(self):
""" """
This diff is collapsed.
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import os
import unittest
from cloudoooTestCase import startFakeEnvironment, stopFakeEnvironment
TestRunner = unittest.TextTestRunner
suite = unittest.TestSuite()
tests = os.listdir(os.curdir)
tests = [n[:-3] for n in tests if n.startswith('test') and n.endswith('.py')]
for test in tests:
m = __import__(test)
if hasattr(m, 'test_suite'):
suite.addTest(m.test_suite())
if __name__ == '__main__':
startFakeEnvironment()
TestRunner(verbosity=2).run(suite)
stopFakeEnvironment()
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import unittest
from xmlrpclib import ServerProxy
from subprocess import Popen, PIPE
from base64 import encodestring, decodestring
from cloudoooTestCase import cloudoooTestCase, make_suite
class TestAllFormats(cloudoooTestCase):
"""Test XmlRpc Server. Needs cloudooo server started"""
def afterSetUp(self):
"""Create connection with cloudooo server"""
self.proxy = ServerProxy("http://%s:%s/RPC2" % (self.hostname,
self.cloudooo_port))
def testTextFormats(self):
"""Test all text formats"""
self.runTestForType('odt', 'text', 'data/test.odt')
def testPresentationFormats(self):
"""Test all presentation formats"""
self.runTestForType('odp', 'presentation', 'data/test.odp')
def testDrawingFormats(self):
"""Test all drawing formats"""
self.runTestForType('odg', 'drawing', 'data/test.odg')
def testSpreadSheetFormats(self):
"""Test all spreadsheet formats"""
self.runTestForType('ods', 'spreadsheet', 'data/test.ods')
def testWebFormats(self):
"""Test all web formats"""
self.runTestForType('html', 'web', 'data/test.html')
def testGlobalFormats(self):
"""Test all global formats"""
self.runTestForType('sxg', 'global', 'data/test.sxg')
def runTestForType(self, source_format, document_type, filename):
"""Generic test"""
data = open(filename,'r').read()
request = {'document_type': document_type}
extension_list = self.proxy.getAllowedExtensionList(request)
for extension in extension_list:
data_output = self.proxy.convertFile(encodestring(data),
source_format,
extension[0])
output_file_url = 'output/test_%s.%s' % (document_type, extension[0])
open(output_file_url, 'w').write(decodestring(data_output))
stdout, stderr = Popen("file %s" % output_file_url,
shell=True,
stdout=PIPE,
stderr=PIPE).communicate()
self.assertEquals(stdout.endswith(": empty"), False, stdout)
def test_suite():
return make_suite(TestAllFormats)
if __name__ == "__main__":
suite = unittest.TestLoader().loadTestsFromTestCase(TestAllFormats)
unittest.TextTestRunner(verbosity=2).run(suite)
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import unittest
from cloudooo.application.application import Application
from os.path import exists
from cloudoooTestCase import make_suite
class TestApplication(unittest.TestCase):
def setUp(self):
"""Instantiate one application object and load settings on object"""
self.application = Application()
self.application.loadSettings('localhost', 9999, '/tmp/', '99')
def testLoadSettings(self):
"""Test if settings are defined correctly"""
self.assertEquals(self.application.hostname, 'localhost')
self.assertEquals(self.application.port, 9999)
self.assertEquals(self.application.path_run_dir, '/tmp/')
self.assertEquals(self.application.display_id, '99')
self.assertEquals(self.application.pid_filepath, "/tmp/application.pid")
def testStartTimeout(self):
"""Test if the attribute timeout is defined correctly"""
self.assertEquals(self.application.timeout, 20)
application = Application()
application.loadSettings('localhost', 9999, '/', '99', start_timeout=25)
self.assertEquals(application.timeout, 25)
def testgetAddress(self):
"""Test if getAddress() returns tuple with address correctly """
self.assertEquals(self.application.getAddress(), ('localhost', 9999))
def testPid(self):
"""As the application do not have the pid() should return None"""
self.assertEquals(self.application.pid(), None)
def testStart(self):
""" """
self.application.start()
self.assertEquals(exists(self.application.pid_filepath), True)
self.application.stop()
self.assertEquals(exists(self.application.pid_filepath), False)
def test_suite():
return make_suite(TestApplication)
if __name__ == "__main__":
suite = unittest.TestLoader().loadTestsFromTestCase(TestApplication)
unittest.TextTestRunner(verbosity=2).run(suite)
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import unittest
from subprocess import Popen, PIPE
from base64 import decodestring
from os.path import exists, join
from os import remove
from zipfile import ZipFile, is_zipfile
from cloudooo.document import FileSystemDocument
from cloudoooTestCase import make_suite
class TestFileSystemDocument(unittest.TestCase):
"""Test to class FileSystemDocument"""
def setUp(self):
"""Create data to tests and instantiated a FileSystemDocument"""
self.tmp_url = '/tmp'
self.data = decodestring("cloudooo Test")
self.fsdocument = FileSystemDocument(self.tmp_url, self.data, 'txt')
def tearDown(self):
"""Remove the file in file system"""
if self.fsdocument.getUrl() is not None:
self.fsdocument.trash()
def testRestoreOriginal(self):
"""Test if changing the document and call remake, the document back to
original state"""
old_document_url = self.fsdocument.getUrl()
document_filename = "document"
document_test_url = join(self.fsdocument.directory_name, document_filename)
open(document_test_url,'wb').write(decodestring("Test Document"))
self.fsdocument.reload(document_test_url)
self.assertEquals(exists(old_document_url), False)
self.assertNotEquals(self.fsdocument.original_data,
self.fsdocument.getContent())
old_document_url = self.fsdocument.getUrl()
self.fsdocument.restoreOriginal()
self.assertEquals(exists(old_document_url), False)
self.assertNotEquals(old_document_url, self.fsdocument.getUrl())
self.assertEquals(exists(self.fsdocument.getUrl()), True)
self.assertEquals(self.fsdocument.getContent(), self.data)
def testgetContent(self):
"""Test if returns the data correctly"""
self.assertEquals(self.fsdocument.getContent(), self.data)
def testgetUrl(self):
"""Check if the url is correct"""
url = self.fsdocument.getUrl()
self.assertEquals(exists(url), True)
def testLoadDocumentFile(self):
"""Test if the document is created correctly"""
url = self.fsdocument.getUrl()
tmp_document = open(url,'r').read()
self.assertEquals(self.data, tmp_document)
self.fsdocument.trash()
self.assertEquals(exists(url), False)
def testReload(self):
"""Change url and check if occurs correctly"""
old_document_url = self.fsdocument.getUrl()
document_filename = "document"
document_test_url = join(self.fsdocument.directory_name, document_filename)
open(document_test_url,'wb').write(self.data)
self.fsdocument.reload(document_test_url)
url = self.fsdocument.getUrl()
self.assertEquals(exists(old_document_url), False)
self.assertEquals(self.fsdocument.getContent(), self.data)
self.fsdocument.trash()
self.assertEquals(exists(url), False)
def testZipDocumentList(self):
"""Tests if the zip file is returned correctly"""
zip_output_url = 'output/ziptest.zip'
open(join(self.fsdocument.directory_name, 'document2'), 'w').write('test')
zip_file = self.fsdocument.getContent(True)
open(zip_output_url, 'w').write(zip_file)
stdout, stderr = Popen("file %s" % zip_output_url,
shell=True, stdout=PIPE).communicate()
self.assertEquals(stdout, 'output/ziptest.zip: Zip archive data, at least v2.0 to extract\n')
ziptest = ZipFile(zip_output_url, 'r')
self.assertEquals(len(ziptest.filelist), 2)
for file in ziptest.filelist:
if file.filename.endswith("document2"):
self.assertEquals(file.file_size, 4)
else:
self.assertEquals(file.file_size, 9)
remove(zip_output_url)
def testSendZipFile(self):
"""Tests if the htm is extrated from zipfile"""
zip_input_url = 'data/test.zip'
zip_output_url = 'output/zipdocument.zip'
data = open(zip_input_url).read()
zipdocument = FileSystemDocument(self.tmp_url, data, 'zip')
open(zip_output_url, 'w').write(zipdocument.getContent(True))
self.assertEquals(is_zipfile(zip_output_url), True)
zipfile = ZipFile(zip_output_url)
self.assertEquals(sorted(zipfile.namelist()),
sorted(['logo.gif','test.htm']))
def test_suite():
return make_suite(TestFileSystemDocument)
if "__main__" == __name__:
suite = unittest.TestLoader().loadTestsFromTestCase(TestFileSystemDocument)
unittest.TextTestRunner(verbosity=2).run(suite)
##############################################################################
#
# Copyright (c) 2002-2010 Nexedi SA and Contributors. All Rights Reserved.
# Gabriel M. Monnerat <gmonnerat@iff.edu.br>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import unittest
from cloudooo.filter import Filter
from cloudoooTestCase import make_suite
class TestFilter(unittest.TestCase):
"""Test filter and your interface"""
def setUp(self):
"""Instatiated Filter with properties"""
extension = 'pdf'
filter = 'writer_pdf_Export'
mimetype = 'application/pdf'
document_type = "text"
preferred = True
sort_index = 1000
self.filter = Filter(extension, filter, mimetype, document_type,
preferred=preferred, sort_index=sort_index)
def testFilter(self):
"""Tests filter gets"""
self.assertEquals(self.filter.getExtension(), 'pdf')
self.assertEquals(self.filter.getName(), 'writer_pdf_Export')
self.assertEquals(self.filter.getMimetype(), 'application/pdf')
self.assertEquals(self.filter.getSortIndex(), 1000)
self.assertEquals(self.filter.isPreferred(), True)
def test_suite():
return make_suite(TestFilter)
if __name__ == "__main__":
suite = unittest.TestLoader().loadTestsFromTestCase(TestFilter)
unittest.TextTestRunner(verbosity=2).run(suite)
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment