Commit eec5a29e authored by Carlos Ramos Carreño's avatar Carlos Ramos Carreño

Refactor CloudOoo tests.

- Each class now correspond to an instance with different configuration,
and possibly several test methods.
- Upgraded to more recent pypdf version.
- The code has been reviewed to reduce complexity and repetition.
- Adds documentation to all public functions, in Google style.
- Some other cosmetic changes have been made for
consistency (consistent parentheses indentation, consistent string
literal quotes), courtesy of ruff.

See merge request nexedi/slapos!1628
parent 1ad0bc4a
Pipeline #36591 failed with stage
in 0 seconds
...@@ -26,27 +26,28 @@ ...@@ -26,27 +26,28 @@
############################################################################## ##############################################################################
from setuptools import setup, find_packages from setuptools import setup, find_packages
version = '0.0.1.dev0' version = "0.0.1.dev0"
name = 'slapos.test.cloudooo' name = "slapos.test.cloudooo"
with open("README.md") as f: with open("README.md") as f:
long_description = f.read() long_description = f.read()
setup(name=name, setup(
version=version, name=name,
description="Test for SlapOS' cloudooo", version=version,
long_description=long_description, description="Test for SlapOS' CloudOoo",
long_description_content_type='text/markdown', long_description=long_description,
maintainer="Nexedi", long_description_content_type="text/markdown",
maintainer_email="info@nexedi.com", maintainer="Nexedi",
url="https://lab.nexedi.com/nexedi/slapos", maintainer_email="info@nexedi.com",
packages=find_packages(), url="https://lab.nexedi.com/nexedi/slapos",
install_requires=[ packages=find_packages(),
'slapos.core', install_requires=[
'slapos.cookbook', "slapos.core",
'slapos.libnetworkcache', "slapos.cookbook",
'requests', "slapos.libnetworkcache",
'PyPDF2', "requests",
], "pypdf",
zip_safe=True, ],
test_suite='test', zip_safe=True,
) test_suite="test",
)
...@@ -24,394 +24,550 @@ ...@@ -24,394 +24,550 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# #
############################################################################## ##############################################################################
from __future__ import annotations import base64
import codecs import codecs
import csv import csv
import multiprocessing import io
import os
import json import json
import xmlrpc.client as xmlrpclib import multiprocessing
import urllib.parse as urllib_parse
import ssl import ssl
import base64
import io
import textwrap import textwrap
from typing import Mapping import urllib.parse as urllib_parse
import xmlrpc.client as xmlrpclib
from functools import partial
from pathlib import Path
from typing import AbstractSet, Callable, ClassVar, Dict, Iterable, Mapping
import requests
import PIL.Image import PIL.Image
import PyPDF2 import pypdf
import requests
from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass
from slapos.testing.utils import ImageComparisonTestCase from slapos.testing.utils import ImageComparisonTestCase
setUpModule, _CloudOooTestCase = makeModuleSetUpAndTestCaseClass( setUpModule, _CloudOooTestCase = makeModuleSetUpAndTestCaseClass(
os.path.abspath( str((Path(__file__).parent.parent / "software.cfg").absolute())
os.path.join(os.path.dirname(__file__), '..', 'software.cfg'))) )
def open_cloudooo_connection(url):
"""
Open a RPC connection with Cloudooo.
Args:
url: The URL of the CloudOoo server.
Returns:
A object that manages communication with CloudOoo via XML-RCP.
"""
# XXX ignore certificate errors
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
return xmlrpclib.ServerProxy(
url,
context=ssl_context,
allow_none=True,
)
def convert_file(
file,
source_format,
destination_format,
*,
zip=False,
refresh=False,
conversion_kw={},
server,
):
"""
Converts a file using CloudOoo.
This is a helper function that does the necessary encoding/decoding,
providing type-safety and keyword arguments.
Args:
file: The file contents to send to CloudOoo.
source_format: Format of the input file.
destination_format: Format of the output file.
zip: Whether a zip file should be returned.
refresh: Whether dynamic properties of document should be replaced
before conversion.
conversion_kw: Additional arguments for the conversion.
server: The server used to send requests to Cloudooo.
Returns:
Contents of the converted file.
"""
converted_file = server.convertFile(
base64.encodebytes(file).decode(),
source_format,
destination_format,
zip,
refresh,
conversion_kw,
)
assert isinstance(converted_file, str)
return base64.decodebytes(converted_file.encode())
class CloudOooTestCase(_CloudOooTestCase): class CloudOooTestCase(_CloudOooTestCase):
"""
Parent class for all CloudOoo tests.
This class sets some attributes in the `setUp` method that are necessary
for testing CloudOoo.
Attributes:
url: The URL of the CloudOoo server.
server: A object that manages communication with CloudOoo via XML-RCP.
"""
# Cloudooo needs a lot of time before being available. # Cloudooo needs a lot of time before being available.
instance_max_retry = 30 instance_max_retry = 30
def setUp(self): def setUp(self):
self.url = json.loads( self.url = json.loads(self.computer_partition.getConnectionParameterDict()["_"])[
self.computer_partition.getConnectionParameterDict()["_"])['cloudooo'] "cloudooo"
# XXX ignore certificate errors ]
ssl_context = ssl.create_default_context() self.server = open_cloudooo_connection(self.url)
ssl_context.check_hostname = False self.addCleanup(self.server("close"))
ssl_context.verify_mode = ssl.CERT_NONE
self.server = xmlrpclib.ServerProxy( def convert_file(
self.url, self,
context=ssl_context, file,
allow_none=True, source_format,
destination_format,
*,
zip=False,
refresh=False,
conversion_kw={},
):
"""
Converts a file using CloudOoo.
This is a helper method that does the necessary encoding/decoding,
providing type-safety and keyword arguments.
Args:
file: The file contents to send to CloudOoo.
source_format: Format of the input file.
destination_format: Format of the output file.
zip: Whether a zip file should be returned.
refresh: Whether dynamic properties of document should be replaced
before conversion.
conversion_kw: Additional arguments for the conversion.
Returns:
Contents of the converted file.
"""
return convert_file(
file=file,
source_format=source_format,
destination_format=destination_format,
zip=zip,
refresh=refresh,
conversion_kw=conversion_kw,
server=self.server,
) )
self.addCleanup(self.server('close'))
def script_test_basic(self):
"""
Tries to execute a hello world script.
Returns:
The file contents in base64. If the script is executed
properly, it should contain the string ``"Hello World"``,
preceded by the UTF-8 BOM and with a trailing newline.
"""
script = textwrap.dedent(
"""\
# Get the XText interface
text = Document.Text
# Create an XTextRange at the end of the document
tRange = text.End
def script_test_basic(server: xmlrpclib.ServerProxy) -> bytes: # Set the string
tRange.String = "Hello World"
""",
)
return self.convert_file(
b"<html></html>",
"html",
"txt",
conversion_kw={"script": script},
)
QT_FONT_MAPPING = {
"Arial": "LiberationSans",
"Arial Black": "LiberationSans",
"Avant Garde": "LiberationSans",
"Bookman": "LiberationSans",
"Carlito": "Carlito",
"Comic Sans MS": "LiberationSans",
"Courier New": "LiberationSans",
"DejaVu Sans": "DejaVuSans",
"DejaVu Sans Condensed": "LiberationSans",
"DejaVu Sans Mono": "DejaVuSansMono",
"DejaVu Serif": "DejaVuSerif",
"DejaVu Serif Condensed": "LiberationSans",
"Garamond": "LiberationSans",
"Gentium Basic": "GentiumBasic",
"Gentium Book Basic": "GentiumBookBasic",
"Georgia": "LiberationSans",
"Helvetica": "LiberationSans",
"IPAex Gothic": "LiberationSans",
"IPAex Mincho": "LiberationSans",
"Impact": "LiberationSans",
"Liberation Mono": "LiberationMono",
"Liberation Sans": "LiberationSans",
"Liberation Sans Narrow": "LiberationSansNarrow",
"Liberation Serif": "LiberationSerif",
"Linux LibertineG": "LiberationSans",
"OpenSymbol": {"NotoSans-Regular", "OpenSymbol"},
"Palatino": "LiberationSans",
"Roboto Black": "LiberationSans",
"Roboto Condensed Light": "LiberationSans",
"Roboto Condensed": "RobotoCondensed-Regular",
"Roboto Light": "LiberationSans",
"Roboto Medium": "LiberationSans",
"Roboto Thin": "LiberationSans",
"Times New Roman": "LiberationSans",
"Trebuchet MS": "LiberationSans",
"Verdana": "LiberationSans",
"ZZZdefault fonts when no match": "LiberationSans",
}
LIBREOFFICE_FONT_MAPPING = {
"Arial": "LiberationSans",
"Arial Black": "NotoSans-Regular",
"Avant Garde": "NotoSans-Regular",
"Bookman": "NotoSans-Regular",
"Carlito": "Carlito",
"Comic Sans MS": "NotoSans-Regular",
"Courier New": "LiberationMono",
"DejaVu Sans": "DejaVuSans",
"DejaVu Sans Condensed": "DejaVuSansCondensed",
"DejaVu Sans Mono": "DejaVuSansMono",
"DejaVu Serif": "DejaVuSerif",
"DejaVu Serif Condensed": "DejaVuSerifCondensed",
"Garamond": "NotoSerif-Regular",
"Gentium Basic": "GentiumBasic",
"Gentium Book Basic": "GentiumBookBasic",
"Georgia": "NotoSerif-Regular",
"Helvetica": "LiberationSans",
"IPAex Gothic": "IPAexGothic",
"IPAex Mincho": "IPAexMincho",
"Impact": "NotoSans-Regular",
"Liberation Mono": "LiberationMono",
"Liberation Sans": "LiberationSans",
"Liberation Sans Narrow": "LiberationSansNarrow",
"Liberation Serif": "LiberationSerif",
"Linux LibertineG": "LinuxLibertineG",
"OpenSymbol": {"OpenSymbol", "IPAMincho"},
"Palatino": "NotoSerif-Regular",
"Roboto Black": "Roboto-Black",
"Roboto Condensed Light": "RobotoCondensed-Light",
"Roboto Condensed": "RobotoCondensed-Regular",
"Roboto Light": "Roboto-Light",
"Roboto Medium": "Roboto-Medium",
"Roboto Thin": "Roboto-Thin",
"Times New Roman": "LiberationSerif",
"Trebuchet MS": "NotoSans-Regular",
"Verdana": "NotoSans-Regular",
"ZZZdefault fonts when no match": "NotoSans-Regular",
}
def normalize_font_name(font_name):
""" """
Tries to execute a hello world script. Normalize a font name.
As with other PostScript markup, font names are written with a leading
slash symbol ("/"), which has to be stripped to obtain the conventional font
name.
Moreover, the standard allows also to define "font subsets", for which a tag
followed by a plus sign ("+") precedes the actual font name:
https://opensource.adobe.com/dc-acrobat-sdk-docs/pdfstandards/PDF32000_2008.pdf#page=266
This tag is also removed by this function.
Args: Args:
server: The server used to send requests to Cloudooo. font_name: The font name to normalize.
Returns: Returns:
The file contents in base64. If the script is executed Normalized font name.
properly, it should contain the string ``"Hello World"``,
preceded by the UTF-8 BOM and with a trailing newline.
"""
script = textwrap.dedent(
"""\
# Get the XText interface
text = Document.Text
# Create an XTextRange at the end of the document """
tRange = text.End if "+" in font_name:
return font_name.split("+")[1]
# Set the string if font_name.startswith("/"):
tRange.String = "Hello World" return font_name[1:]
""",
)
file = server.convertFile( raise ValueError("Invalid font name")
base64.encodebytes(b"<html></html>").decode(),
"html",
"txt",
False, # zip
False, # refresh
{"script": script},
)
assert isinstance(file, str)
return file.encode() def get_referenced_fonts(
pdf_file_reader,
):
"""
Return fonts referenced in a pdf.
Returns a set with all font names (normalized) present in a PDF.
def normalizeFontName(font_name): Args:
if '+' in font_name: pdf_file_reader: PDF reader.
return font_name.split('+')[1]
if font_name.startswith('/'):
return font_name[1:]
Returns:
Set of font names present in the PDF.
def getReferencedFonts(pdf_file_reader):
"""Return fonts referenced in this pdf
""" """
fonts = set()
def collectFonts(obj): def fonts_in_obj(obj):
"""Recursively visit PDF objects and collect referenced fonts in `fonts`
""" """
if hasattr(obj, 'keys'): Yield fonts from a PDF object.
if '/BaseFont' in obj:
fonts.add(obj['/BaseFont'])
for k in obj.keys():
collectFonts(obj[k])
for page in pdf_file_reader.pages: Recursively visit PDF objects and yield referenced fonts in `fonts`.
collectFonts(page.getObject()['/Resources'])
return {normalizeFontName(font) for font in fonts} Args:
obj: A reference to a PDF object.
Yields:
The font names in the object.
class HTMLtoPDFConversionFontTestMixin: """
"""Mix-In class to test how fonts are selected during if hasattr(obj, "keys"):
HTML to PDF conversions. if "/BaseFont" in obj:
yield obj["/BaseFont"]
for k in obj.keys():
yield from fonts_in_obj(obj[k])
This needs to be mixed with a test case defining: return {
normalize_font_name(font)
for page in pdf_file_reader.pages
for font in fonts_in_obj(page.get_object()["/Resources"])
}
* pdf_producer : the name of /Producer in PDF metadata
* expected_font_mapping : a mapping of resulting font name in pdf,
keyed by font-family in the input html
* _convert_html_to_pdf: a method to to convert html to pdf
"""
def _convert_html_to_pdf(self, src_html):
# type: (str) -> bytes
"""Convert the HTML source to pdf bytes.
"""
def test(self): class TestDefaultInstance(CloudOooTestCase, ImageComparisonTestCase):
"""Tests for CloudOoo instance with default configuration."""
__partition_reference__ = "co_default"
def assert_pdf_conversion_metadata(
self,
convert_html_to_pdf,
*,
expected_producer,
expected_font_mapping,
):
actual_font_mapping_mapping = {} actual_font_mapping_mapping = {}
for font in self.expected_font_mapping:
src_html = f''' for font in expected_font_mapping:
<style> src_html = f"""
p {{ font-family: "{font}"; font-size: 20pt; }} <style>
</style> p {{ font-family: "{font}"; font-size: 20pt; }}
<p>the quick brown fox jumps over the lazy dog.</p> </style>
<p>THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.</p> <p>the quick brown fox jumps over the lazy dog.</p>
''' <p>THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.</p>
"""
pdf_data = self._convert_html_to_pdf(src_html)
pdf_reader = PyPDF2.PdfFileReader(io.BytesIO(pdf_data)) pdf_data = convert_html_to_pdf(src_html.encode())
pdf_reader = pypdf.PdfReader(io.BytesIO(pdf_data))
metadata = pdf_reader.metadata
assert metadata
self.assertEqual( self.assertEqual(
self.pdf_producer, metadata.producer,
pdf_reader.getDocumentInfo()['/Producer']) expected_producer,
fonts_in_pdf = getReferencedFonts(pdf_reader) )
fonts_in_pdf = get_referenced_fonts(pdf_reader)
font_or_set = fonts_in_pdf
if len(fonts_in_pdf) == 1: if len(fonts_in_pdf) == 1:
actual_font_mapping_mapping[font] = fonts_in_pdf.pop() # Tuple unpacking
else: (font_or_set,) = fonts_in_pdf
actual_font_mapping_mapping[font] = fonts_in_pdf
self.maxDiff = None
self.assertEqual(self.expected_font_mapping, actual_font_mapping_mapping)
class TestWkhtmlToPDF(HTMLtoPDFConversionFontTestMixin, CloudOooTestCase):
__partition_reference__ = 'wk'
pdf_producer = 'Qt 4.8.7'
expected_font_mapping = {
'Arial': 'LiberationSans',
'Arial Black': 'LiberationSans',
'Avant Garde': 'LiberationSans',
'Bookman': 'LiberationSans',
'Carlito': 'Carlito',
'Comic Sans MS': 'LiberationSans',
'Courier New': 'LiberationSans',
'DejaVu Sans': 'DejaVuSans',
'DejaVu Sans Condensed': 'LiberationSans',
'DejaVu Sans Mono': 'DejaVuSansMono',
'DejaVu Serif': 'DejaVuSerif',
'DejaVu Serif Condensed': 'LiberationSans',
'Garamond': 'LiberationSans',
'Gentium Basic': 'GentiumBasic',
'Gentium Book Basic': 'GentiumBookBasic',
'Georgia': 'LiberationSans',
'Helvetica': 'LiberationSans',
'IPAex Gothic': 'LiberationSans',
'IPAex Mincho': 'LiberationSans',
'Impact': 'LiberationSans',
'Liberation Mono': 'LiberationMono',
'Liberation Sans': 'LiberationSans',
'Liberation Sans Narrow': 'LiberationSansNarrow',
'Liberation Serif': 'LiberationSerif',
'Linux LibertineG': 'LiberationSans',
'OpenSymbol': {'NotoSans-Regular', 'OpenSymbol'},
'Palatino': 'LiberationSans',
'Roboto Black': 'LiberationSans',
'Roboto Condensed Light': 'LiberationSans',
'Roboto Condensed': 'RobotoCondensed-Regular',
'Roboto Light': 'LiberationSans',
'Roboto Medium': 'LiberationSans',
'Roboto Thin': 'LiberationSans',
'Times New Roman': 'LiberationSans',
'Trebuchet MS': 'LiberationSans',
'Verdana': 'LiberationSans',
'ZZZdefault fonts when no match': 'LiberationSans'
}
def _convert_html_to_pdf(self, src_html): actual_font_mapping_mapping[font] = font_or_set
return base64.decodebytes(
self.server.convertFile( self.assertEqual(actual_font_mapping_mapping, expected_font_mapping)
base64.encodebytes(src_html.encode()).decode(),
'html', def html_to_pdf_wkhtmltopdf_convert(self, src_html):
'pdf', """HTML to PDF conversion using wkhtmltopdf."""
False, return self.convert_file(
False, src_html,
{ "html",
'encoding': 'utf-8' "pdf",
}, conversion_kw={"encoding": "utf-8"},
).encode()) )
def test_html_to_pdf_wkhtmltopdf(self):
class TestLibreoffice(HTMLtoPDFConversionFontTestMixin, CloudOooTestCase): """Test HTML to PDF conversion using wkhtmltopdf."""
__partition_reference__ = 'lo' self.assert_pdf_conversion_metadata(
pdf_producer = 'LibreOffice 7.5' self.html_to_pdf_wkhtmltopdf_convert,
expected_font_mapping = { expected_producer="Qt 4.8.7",
'Arial': 'LiberationSans', expected_font_mapping=QT_FONT_MAPPING,
'Arial Black': 'NotoSans-Regular', )
'Avant Garde': 'NotoSans-Regular',
'Bookman': 'NotoSans-Regular', def html_to_pdf_libreoffice_convert(self, src_html):
'Carlito': 'Carlito', """HTML to PDF conversion using LibreOffice."""
'Comic Sans MS': 'NotoSans-Regular', return self.convert_file(
'Courier New': 'LiberationMono', src_html,
'DejaVu Sans': 'DejaVuSans', "html",
'DejaVu Sans Condensed': 'DejaVuSansCondensed', "pdf",
'DejaVu Sans Mono': 'DejaVuSansMono', )
'DejaVu Serif': 'DejaVuSerif',
'DejaVu Serif Condensed': 'DejaVuSerifCondensed', def test_html_to_pdf_libreoffice_convert(self):
'Garamond': 'NotoSerif-Regular', """Test HTML to PDF conversion using wkhtmltopdf."""
'Gentium Basic': 'GentiumBasic', self.assert_pdf_conversion_metadata(
'Gentium Book Basic': 'GentiumBookBasic', self.html_to_pdf_libreoffice_convert,
'Georgia': 'NotoSerif-Regular', expected_producer="LibreOffice 7.5",
'Helvetica': 'LiberationSans', expected_font_mapping=LIBREOFFICE_FONT_MAPPING,
'IPAex Gothic': 'IPAexGothic', )
'IPAex Mincho': 'IPAexMincho',
'Impact': 'NotoSans-Regular',
'Liberation Mono': 'LiberationMono',
'Liberation Sans': 'LiberationSans',
'Liberation Sans Narrow': 'LiberationSansNarrow',
'Liberation Serif': 'LiberationSerif',
'Linux LibertineG': 'LinuxLibertineG',
'OpenSymbol': {'OpenSymbol', 'IPAMincho'},
'Palatino': 'NotoSerif-Regular',
'Roboto Black': 'Roboto-Black',
'Roboto Condensed Light': 'RobotoCondensed-Light',
'Roboto Condensed': 'RobotoCondensed-Regular',
'Roboto Light': 'Roboto-Light',
'Roboto Medium': 'Roboto-Medium',
'Roboto Thin': 'Roboto-Thin',
'Times New Roman': 'LiberationSerif',
'Trebuchet MS': 'NotoSans-Regular',
'Verdana': 'NotoSans-Regular',
'ZZZdefault fonts when no match': 'NotoSans-Regular'
}
def _convert_html_to_pdf(self, src_html): def test_draw_to_png(self):
return base64.decodebytes( """Test Draw's ODG to PNG conversion."""
self.server.convertFile( reference_png = PIL.Image.open("data/test_draw_to_png.png")
base64.encodebytes(src_html.encode()).decode(), with open("data/test_draw_to_png.odg", "rb") as f:
'html', actual_png_data = self.convert_file(
'pdf', f.read(),
).encode()) "odg",
"png",
)
class TestLibreofficeDrawToPNGConversion(CloudOooTestCase, ImageComparisonTestCase):
__partition_reference__ = 'l'
def test(self):
reference_png = PIL.Image.open(os.path.join('data', f'{self.id()}.png'))
with open(os.path.join('data', f'{self.id()}.odg'), 'rb') as f:
actual_png_data = base64.decodebytes(
self.server.convertFile(
base64.encodebytes(f.read()).decode(),
'odg',
'png',
).encode())
actual_png = PIL.Image.open(io.BytesIO(actual_png_data)) actual_png = PIL.Image.open(io.BytesIO(actual_png_data))
# save a snapshot # Save a snapshot
with open(os.path.join(self.computer_partition_root_path, self.id() + '.png'), 'wb') as f: with open(
Path(self.computer_partition_root_path) / "test_draw_to_png.png",
"wb",
) as f:
f.write(actual_png_data) f.write(actual_png_data)
self.assertImagesSame(actual_png, reference_png) self.assertImagesSame(actual_png, reference_png)
class TestLibreOfficeTextConversion(CloudOooTestCase):
__partition_reference__ = 'txt'
def test_html_to_text(self): def test_html_to_text(self):
"""Test HTML to TXT conversion."""
file_content = self.convert_file(
"<html>héhé</html>".encode(),
"html",
"txt",
)
self.assertEqual( self.assertEqual(
base64.decodebytes( file_content,
self.server.convertFile( codecs.BOM_UTF8 + b"h\xc3\xa9h\xc3\xa9\n",
base64.encodebytes( )
'<html>héhé</html>'.encode()).decode(),
'html', def test_scripting_disabled(self):
'txt', """Test that the basic script raises when scripting is disabled."""
).encode()), with self.assertRaisesRegex(Exception, "ooo: scripting is disabled"):
codecs.BOM_UTF8 + b'h\xc3\xa9h\xc3\xa9\n', self.script_test_basic()
def _convert_html_to_text(
src_html,
*,
url,
):
"""
Convert HTML to TXT.
This is a helper method for using with map.
Args:
src_html: HTML to convert.
url: URL of the CloudOoo server.
Returns:
Converted file contents.
"""
with open_cloudooo_connection(url) as server:
return convert_file(
src_html,
"html",
"txt",
server=server,
) )
class TestLibreOfficeCluster(CloudOooTestCase): class TestLibreOfficeCluster(CloudOooTestCase):
__partition_reference__ = 'lc' """Class for testing a cluster with multiple backends."""
__partition_reference__ = "co_cluster"
@classmethod @classmethod
def getInstanceParameterDict(cls): def getInstanceParameterDict(cls):
return {'backend-count': 4} return {"backend-count": 4}
def test_multiple_conversions(self): def test_multiple_conversions(self):
# make this function global so that it can be picked and used by multiprocessing """Test that concurrent requests are distributed in the cluster."""
global _convert_html_to_text
def _convert_html_to_text(src_html):
return base64.decodebytes(
self.server.convertFile(
base64.encodebytes(src_html.encode()).decode(),
'html',
'txt',
).encode())
pool = multiprocessing.Pool(5) pool = multiprocessing.Pool(5)
with pool: with pool:
converted = pool.map( converted = pool.map(
_convert_html_to_text, partial(_convert_html_to_text, url=self.url),
['<html><body>hello</body></html>'] * 100) [b"<html><body>hello</body></html>"] * 100,
)
self.assertEqual(converted, [codecs.BOM_UTF8 + b'hello\n'] * 100) self.assertEqual(converted, [codecs.BOM_UTF8 + b"hello\n"] * 100)
# haproxy stats are exposed # Haproxy stats are exposed
res = requests.get( res = requests.get(
urllib_parse.urljoin(self.url, '/haproxy;csv'), urllib_parse.urljoin(self.url, "/haproxy;csv"),
verify=False, verify=False,
) )
reader = csv.DictReader(io.StringIO(res.text)) reader = csv.DictReader(io.StringIO(res.text))
line_list = list(reader) line_list = list(reader)
# requests have been balanced # Requests have been balanced
total_hrsp_2xx = { total_hrsp_2xx = {line["svname"]: int(line["hrsp_2xx"]) for line in line_list}
line['svname']: int(line['hrsp_2xx']) self.assertEqual(total_hrsp_2xx["FRONTEND"], 100)
for line in line_list self.assertEqual(total_hrsp_2xx["BACKEND"], 100)
} for backend in "cloudooo_1", "cloudooo_2", "cloudooo_3", "cloudooo_4":
self.assertEqual(total_hrsp_2xx['FRONTEND'], 100) # Ideally there should be 25% of requests on each backend, because we use
self.assertEqual(total_hrsp_2xx['BACKEND'], 100)
for backend in 'cloudooo_1', 'cloudooo_2', 'cloudooo_3', 'cloudooo_4':
# ideally there should be 25% of requests on each backend, because we use
# round robin scheduling, but it can happen that some backend take longer # round robin scheduling, but it can happen that some backend take longer
# to start, so we are tolerant here and just check that each backend # to start, so we are tolerant here and just check that each backend
# process at least one request. # process at least one request.
self.assertGreater(total_hrsp_2xx[backend], 0) self.assertGreater(total_hrsp_2xx[backend], 0)
# no errors # No errors
total_eresp = { total_eresp = {line["svname"]: int(line["eresp"] or 0) for line in line_list}
line['svname']: int(line['eresp'] or 0)
for line in line_list
}
self.assertEqual( self.assertEqual(
total_eresp, { total_eresp,
'FRONTEND': 0, {
'cloudooo_1': 0, "FRONTEND": 0,
'cloudooo_2': 0, "cloudooo_1": 0,
'cloudooo_3': 0, "cloudooo_2": 0,
'cloudooo_4': 0, "cloudooo_3": 0,
'BACKEND': 0, "cloudooo_4": 0,
}) "BACKEND": 0,
},
)
class TestLibreOfficeScripting(CloudOooTestCase): class TestLibreOfficeScripting(CloudOooTestCase):
"""Class with scripting enabled, to try that functionality.""" """Class with scripting enabled, to try that functionality."""
__partition_reference__ = "se"
__partition_reference__ = "co_script"
@classmethod @classmethod
def getInstanceParameterDict(cls) -> Mapping[str, object]: def getInstanceParameterDict(cls):
"""Enable scripting for this instance.""" """Enable scripting for this instance."""
return {"enable-scripting": True} return {"enable-scripting": True}
def test_scripting_basic(self) -> None: def test_scripting_basic(self):
"""Test that the basic script works.""" """Test that the basic script works."""
file = script_test_basic(self.server) file = self.script_test_basic()
self.assertEqual( self.assertEqual(
base64.decodebytes(file), file,
codecs.BOM_UTF8 + b"Hello World\n", codecs.BOM_UTF8 + b"Hello World\n",
) )
class TestScriptingDisabled(CloudOooTestCase):
"""Class with scripting disabled (the default), to test that."""
__partition_reference__ = "sd"
def test_scripting_disabled(self) -> None:
"""Test that the basic script raises when scripting is disabled."""
with self.assertRaisesRegex(Exception, "ooo: scripting is disabled"):
script_test_basic(self.server)
...@@ -410,11 +410,6 @@ eggs += ...@@ -410,11 +410,6 @@ eggs +=
# custom eggs pre-installed, not our special python interpreter. # custom eggs pre-installed, not our special python interpreter.
interpreter = python_for_test interpreter = python_for_test
# patches for eggs
patch-binary = ${patch:location}/bin/patch
PyPDF2-patches = ${:_profile_base_location_}/../../component/egg-patch/PyPDF2/0001-Custom-implementation-of-warnings.formatwarning-remo.patch#d25bb0f5dde7f3337a0a50c2f986f5c8
PyPDF2-patch-options = -p1
[eggs/scripts] [eggs/scripts]
recipe = zc.recipe.egg recipe = zc.recipe.egg
eggs = ${python-interpreter:eggs} eggs = ${python-interpreter:eggs}
...@@ -514,6 +509,7 @@ Pillow = 10.2.0+SlapOSPatched001 ...@@ -514,6 +509,7 @@ Pillow = 10.2.0+SlapOSPatched001
forcediphttpsadapter = 1.0.1 forcediphttpsadapter = 1.0.1
image = 1.5.25 image = 1.5.25
plantuml = 0.3.0:whl plantuml = 0.3.0:whl
pypdf = 3.6.0:whl
pysftp = 0.2.9 pysftp = 0.2.9
requests-toolbelt = 0.8.0 requests-toolbelt = 0.8.0
testfixtures = 6.11.0 testfixtures = 6.11.0
...@@ -521,6 +517,3 @@ mysqlclient = 2.1.1 ...@@ -521,6 +517,3 @@ mysqlclient = 2.1.1
paho-mqtt = 1.5.0 paho-mqtt = 1.5.0
pcpp = 1.30 pcpp = 1.30
xmltodict = 0.13.0 xmltodict = 0.13.0
# Patched eggs
PyPDF2 = 1.26.0+SlapOSPatched001
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment