Commit f6399d7c authored by Jérome Perrin's avatar Jérome Perrin

software/erp5/test: add python3 support

Also fix a few typos and type annotations
parent c735a507
......@@ -24,6 +24,7 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
from __future__ import absolute_import
from setuptools import setup, find_packages
version = '0.0.1.dev0'
......@@ -52,7 +53,6 @@ setup(name=name,
'cryptography',
'pexpect',
'pyOpenSSL',
'typing; python_version<"3"',
],
test_suite='test',
)
......@@ -25,6 +25,7 @@
#
##############################################################################
from __future__ import absolute_import
import json
import os
......
from __future__ import absolute_import
import glob
import hashlib
import json
......@@ -8,10 +9,8 @@ import shutil
import subprocess
import tempfile
import time
import urllib
import urlparse
from BaseHTTPServer import BaseHTTPRequestHandler
from typing import Dict
import six.moves.urllib.request, six.moves.urllib.parse, six.moves.urllib.error
from six.moves.BaseHTTPServer import BaseHTTPRequestHandler
import mock
import OpenSSL.SSL
......@@ -28,6 +27,7 @@ from slapos.testing.utils import (CrontabMixin, ManagedHTTPServer,
findFreeTCPPort)
from . import ERP5InstanceTestCase, setUpModule
from six.moves import range
setUpModule # pyflakes
......@@ -44,10 +44,10 @@ class EchoHTTPServer(ManagedHTTPServer):
response = json.dumps(
{
'Path': self.path,
'Incoming Headers': self.headers.dict
'Incoming Headers': dict(self.headers.items()),
},
indent=2,
)
).encode('utf-8')
self.end_headers()
self.wfile.write(response)
......@@ -67,11 +67,11 @@ class EchoHTTP11Server(ManagedHTTPServer):
response = json.dumps(
{
'Path': self.path,
'Incoming Headers': self.headers.dict
'Incoming Headers': dict(self.headers.items()),
},
indent=2,
)
self.send_header("Content-Length", len(response))
).encode('utf-8')
self.send_header("Content-Length", str(len(response)))
self.end_headers()
self.wfile.write(response)
......@@ -139,7 +139,7 @@ class BalancerTestCase(ERP5InstanceTestCase):
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
# type: () -> dict
return {
'tcpv4-port': 8000,
'computer-memory-percent-threshold': 100,
......@@ -174,10 +174,11 @@ class BalancerTestCase(ERP5InstanceTestCase):
@classmethod
def getInstanceParameterDict(cls):
# type: () -> Dict
# type: () -> dict
return {'_': json.dumps(cls._getInstanceParameterDict())}
def setUp(self):
# type: () -> None
self.default_balancer_url = json.loads(
self.computer_partition.getConnectionParameterDict()['_'])['default']
......@@ -192,7 +193,7 @@ class SlowHTTPServer(ManagedHTTPServer):
self.send_header("Content-Type", "text/plain")
time.sleep(2)
self.end_headers()
self.wfile.write("OK\n")
self.wfile.write(b"OK\n")
log_message = logging.getLogger(__name__ + '.SlowHandler').info
......@@ -203,7 +204,7 @@ class TestLog(BalancerTestCase, CrontabMixin):
__partition_reference__ = 'l'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
# type: () -> dict
parameter_dict = super(TestLog, cls)._getInstanceParameterDict()
# use a slow server instead
parameter_dict['dummy_http_server'] = [[cls.getManagedResource("slow_web_server", SlowHTTPServer).netloc, 1, False]]
......@@ -212,7 +213,7 @@ class TestLog(BalancerTestCase, CrontabMixin):
def test_access_log_format(self):
# type: () -> None
requests.get(
urlparse.urljoin(self.default_balancer_url, '/url_path'),
six.moves.urllib.parse.urljoin(self.default_balancer_url, '/url_path'),
verify=False,
)
time.sleep(.5) # wait a bit more until access is logged
......@@ -285,6 +286,7 @@ class TestLog(BalancerTestCase, CrontabMixin):
self.assertFalse(os.path.exists(rotated_log_file))
def test_error_log(self):
# type: () -> None
# stop backend server
backend_server = self.getManagedResource("slow_web_server", SlowHTTPServer)
self.addCleanup(backend_server.open)
......@@ -326,7 +328,7 @@ class BalancerCookieHTTPServer(ManagedHTTPServer):
# The name of this cookie is SERVERID
assert self.headers['X-Balancer-Current-Cookie'] == 'SERVERID'
self.end_headers()
self.wfile.write(server._name)
self.wfile.write(server._name.encode('utf-8'))
log_message = logging.getLogger(__name__ + '.BalancerCookieHTTPServer').info
return RequestHandler
......@@ -338,7 +340,7 @@ class TestBalancer(BalancerTestCase):
__partition_reference__ = 'b'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
# type: () -> dict
parameter_dict = super(TestBalancer, cls)._getInstanceParameterDict()
# use two backend servers
......@@ -349,6 +351,7 @@ class TestBalancer(BalancerTestCase):
return parameter_dict
def test_balancer_round_robin(self):
# type: () -> None
# requests are by default balanced to both servers
self.assertEqual(
{requests.get(self.default_balancer_url, verify=False).text for _ in range(10)},
......@@ -356,6 +359,7 @@ class TestBalancer(BalancerTestCase):
)
def test_balancer_server_down(self):
# type: () -> None
# if one backend is down, it is excluded from balancer
self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).close()
self.addCleanup(self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).open)
......@@ -365,14 +369,16 @@ class TestBalancer(BalancerTestCase):
)
def test_balancer_set_cookie(self):
# type: () -> None
# if backend provides a "SERVERID" cookie, balancer will overwrite it with the
# backend selected by balancing algorithm
self.assertIn(
requests.get(urlparse.urljoin(self.default_balancer_url, '/set_cookie'), verify=False).cookies['SERVERID'],
requests.get(six.moves.urllib.parse.urljoin(self.default_balancer_url, '/set_cookie'), verify=False).cookies['SERVERID'],
('default-0', 'default-1'),
)
def test_balancer_respects_sticky_cookie(self):
# type: () -> None
# if request is made with the sticky cookie, the client stick on one balancer
cookies = dict(SERVERID='default-1')
self.assertEqual(
......@@ -388,6 +394,7 @@ class TestBalancer(BalancerTestCase):
'backend_web_server1')
def test_balancer_stats_socket(self):
# type: () -> None
# real time statistics can be obtained by using the stats socket and there
# is a wrapper which makes this a bit easier.
socat_process = subprocess.Popen(
......@@ -397,14 +404,14 @@ class TestBalancer(BalancerTestCase):
stderr=subprocess.STDOUT
)
try:
output, _ = socat_process.communicate("show stat\n")
output, _ = socat_process.communicate(b"show stat\n")
except:
socat_process.kill()
socat_process.wait()
raise
self.assertEqual(socat_process.poll(), 0)
# output is a csv
self.assertIn('family_default,FRONTEND,', output)
self.assertIn(b'family_default,FRONTEND,', output)
class TestTestRunnerEntryPoints(BalancerTestCase):
......@@ -413,7 +420,7 @@ class TestTestRunnerEntryPoints(BalancerTestCase):
__partition_reference__ = 't'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
# type: () -> dict
parameter_dict = super(
TestTestRunnerEntryPoints,
cls,
......@@ -436,23 +443,24 @@ class TestTestRunnerEntryPoints(BalancerTestCase):
return parameter_dict
def test_use_proper_backend(self):
# type: () -> None
# requests are directed to proper backend based on URL path
test_runner_url_list = self.getRootPartitionConnectionParameterDict(
)['default-test-runner-url-list']
url_0, url_1, url_2 = test_runner_url_list
self.assertEqual(
urlparse.urlparse(url_0).netloc,
urlparse.urlparse(url_1).netloc)
six.moves.urllib.parse.urlparse(url_0).netloc,
six.moves.urllib.parse.urlparse(url_1).netloc)
self.assertEqual(
urlparse.urlparse(url_0).netloc,
urlparse.urlparse(url_2).netloc)
six.moves.urllib.parse.urlparse(url_0).netloc,
six.moves.urllib.parse.urlparse(url_2).netloc)
path_0 = '/VirtualHostBase/https/{netloc}/VirtualHostRoot/_vh_unit_test_0/something'.format(
netloc=urlparse.urlparse(url_0).netloc)
netloc=six.moves.urllib.parse.urlparse(url_0).netloc)
path_1 = '/VirtualHostBase/https/{netloc}/VirtualHostRoot/_vh_unit_test_1/something'.format(
netloc=urlparse.urlparse(url_0).netloc)
netloc=six.moves.urllib.parse.urlparse(url_0).netloc)
path_2 = '/VirtualHostBase/https/{netloc}/VirtualHostRoot/_vh_unit_test_2/something'.format(
netloc=urlparse.urlparse(url_0).netloc)
netloc=six.moves.urllib.parse.urlparse(url_0).netloc)
self.assertEqual(
{
......@@ -489,7 +497,7 @@ class TestHTTP(BalancerTestCase):
"""
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
# type: () -> dict
parameter_dict = super(TestHTTP, cls)._getInstanceParameterDict()
# use a HTTP/1.1 server instead
parameter_dict['dummy_http_server'] = [[cls.getManagedResource("HTTP/1.1 Server", EchoHTTP11Server).netloc, 1, False]]
......@@ -511,7 +519,7 @@ class TestHTTP(BalancerTestCase):
'%{http_version}',
self.default_balancer_url,
]),
'2',
b'2',
)
def test_keep_alive(self):
......@@ -530,7 +538,7 @@ class TestHTTP(BalancerTestCase):
session.get(self.default_balancer_url).raise_for_status()
new_conn.assert_not_called()
parsed_url = urlparse.urlparse(self.default_balancer_url)
parsed_url = six.moves.urllib.parse.urlparse(self.default_balancer_url)
# check that we have an open file for the ip connection
self.assertTrue([
c for c in psutil.Process(os.getpid()).connections()
......@@ -553,12 +561,12 @@ class ContentTypeHTTPServer(ManagedHTTPServer):
# type: () -> None
self.send_response(200)
if self.path == '/':
self.send_header("Content-Length", 0)
self.send_header("Content-Length", '0')
return self.end_headers()
content_type = self.path[1:]
body = "OK"
body = b"OK"
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", len(body))
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
......@@ -571,7 +579,7 @@ class TestContentEncoding(BalancerTestCase):
__partition_reference__ = 'ce'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
# type: () -> dict
parameter_dict = super(TestContentEncoding, cls)._getInstanceParameterDict()
parameter_dict['dummy_http_server'] = [
[cls.getManagedResource("content_type_server", ContentTypeHTTPServer).netloc, 1, False],
......@@ -599,7 +607,7 @@ class TestContentEncoding(BalancerTestCase):
'application/font-woff2',
'application/x-font-opentype',
'application/wasm',):
resp = requests.get(urlparse.urljoin(self.default_balancer_url, content_type), verify=False)
resp = requests.get(six.moves.urllib.parse.urljoin(self.default_balancer_url, content_type), verify=False)
self.assertEqual(resp.headers['Content-Type'], content_type)
self.assertEqual(
resp.headers.get('Content-Encoding'),
......@@ -609,7 +617,7 @@ class TestContentEncoding(BalancerTestCase):
def test_no_gzip_encoding(self):
# type: () -> None
resp = requests.get(urlparse.urljoin(self.default_balancer_url, '/image/png'), verify=False)
resp = requests.get(six.moves.urllib.parse.urljoin(self.default_balancer_url, '/image/png'), verify=False)
self.assertNotIn('Content-Encoding', resp.headers)
self.assertEqual(resp.text, 'OK')
......@@ -692,7 +700,7 @@ class CaucaseCertificate(ManagedResource):
cas_args + [
'--send-csr', self.csr_file,
],
).split()[0]
).split()[0].decode()
assert csr_id
for _ in range(30):
......@@ -708,11 +716,11 @@ class CaucaseCertificate(ManagedResource):
time.sleep(1)
else:
raise RuntimeError('getting service certificate failed.')
with open(self.cert_file) as f:
assert 'BEGIN CERTIFICATE' in f.read()
with open(self.cert_file) as cert_file:
assert 'BEGIN CERTIFICATE' in cert_file.read()
def revoke(self, caucase):
# type: (str, CaucaseService) -> None
# type: (CaucaseService) -> None
"""Revoke the client certificate on this caucase instance.
"""
subprocess.check_call([
......@@ -729,7 +737,7 @@ class TestFrontendXForwardedFor(BalancerTestCase):
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
# type: () -> dict
frontend_caucase = cls.getManagedResource('frontend_caucase', CaucaseService)
certificate = cls.getManagedResource('client_certificate', CaucaseCertificate)
certificate.request(u'shared frontend', frontend_caucase)
......@@ -784,10 +792,10 @@ class TestServerTLSProvidedCertificate(BalancerTestCase):
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
# type: () -> dict
server_caucase = cls.getManagedResource('server_caucase', CaucaseService)
server_certificate = cls.getManagedResource('server_certificate', CaucaseCertificate)
server_certificate.request(cls._ipv4_address.decode(), server_caucase)
server_certificate.request(six.ensure_text(cls._ipv4_address), server_caucase)
parameter_dict = super(TestServerTLSProvidedCertificate, cls)._getInstanceParameterDict()
with open(server_certificate.cert_file) as f:
parameter_dict['ssl']['cert'] = f.read()
......@@ -806,7 +814,7 @@ class TestClientTLS(BalancerTestCase):
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
# type: () -> dict
frontend_caucase1 = cls.getManagedResource('frontend_caucase1', CaucaseService)
certificate1 = cls.getManagedResource('client_certificate1', CaucaseCertificate)
certificate1.request(u'client_certificate1', frontend_caucase1)
......@@ -827,7 +835,6 @@ class TestClientTLS(BalancerTestCase):
def test_refresh_crl(self):
# type: () -> None
logger = self.logger
class DebugLogFile:
......@@ -846,6 +853,7 @@ class TestClientTLS(BalancerTestCase):
# when client certificate can be authenticated, backend receive the CN of
# the client certificate in "remote-user" header
def _make_request():
# type: () -> dict
return requests.get(
self.default_balancer_url,
cert=(client_certificate.cert_file, client_certificate.key_file),
......@@ -897,6 +905,7 @@ class TestClientTLS(BalancerTestCase):
with self.assertRaisesRegexp(Exception, 'certificate revoked'):
_make_request()
class TestPathBasedRouting(BalancerTestCase):
"""Check path-based routing rewrites URLs as expected.
"""
......@@ -904,7 +913,7 @@ class TestPathBasedRouting(BalancerTestCase):
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
# type: () -> dict
parameter_dict = super(
TestPathBasedRouting,
cls,
......@@ -936,14 +945,15 @@ class TestPathBasedRouting(BalancerTestCase):
published_dict = json.loads(self.computer_partition.getConnectionParameterDict()['_'])
scheme = 'scheme'
netloc = 'example.com:8080'
prefix = '/VirtualHostBase/' + scheme + '//' + urllib.quote(
prefix = '/VirtualHostBase/' + scheme + '//' + six.moves.urllib.parse.quote(
netloc,
safe='',
)
# For easier reading of test data, visualy separating the virtual host
# For easier reading of test data, visually separating the virtual host
# base from the virtual host root
vhr = '/VirtualHostRoot'
def assertRoutingEqual(family, path, expected_path):
# type: (str, str, str) -> None
# sanity check: unlike the rules, this test is sensitive to outermost
# slashes, and paths must be absolute-ish for code simplicity.
assert path.startswith('/')
......@@ -959,7 +969,7 @@ class TestPathBasedRouting(BalancerTestCase):
# test will need to be updated accordingly.
self.assertEqual(
requests.get(
urlparse.urljoin(published_dict[family], prefix + vhr + path),
six.moves.urllib.parse.urljoin(published_dict[family], prefix + vhr + path),
verify=False,
).json()['Path'],
expected_path,
......@@ -978,7 +988,7 @@ class TestPathBasedRouting(BalancerTestCase):
# Rule precedence: family rules applied before general rules.
assertRoutingEqual('default', '/next', prefix + '/erp5/web_site_module/another_next_website' + vhr + '/_vh_next')
# Fallback on general rules when no family-specific rule matches
# Note: the root is special in that there is aways a trailing slash in the
# Note: the root is special in that there is always a trailing slash in the
# produced URL.
assertRoutingEqual('default', '/', prefix + '/erp5/web_site_module/123' + vhr + '/')
# Rule-less family reach general rules.
......
......@@ -25,10 +25,11 @@
#
##############################################################################
from __future__ import absolute_import
import os
import json
import glob
import urlparse
import six.moves.urllib.parse
import socket
import time
......@@ -37,6 +38,9 @@ import requests
from . import ERP5InstanceTestCase
from . import setUpModule
import six
from six.moves import map
from six.moves import range
setUpModule # pyflakes
......@@ -48,7 +52,7 @@ class TestPublishedURLIsReachableMixin(object):
# We access ERP5 trough a "virtual host", which should make
# ERP5 produce URLs using https://virtual-host-name:1234/virtual_host_root
# as base.
virtual_host_url = urlparse.urljoin(
virtual_host_url = six.moves.urllib.parse.urljoin(
base_url,
'/VirtualHostBase/https/virtual-host-name:1234/{}/VirtualHostRoot/_vh_virtual_host_root/'
.format(site_id))
......@@ -76,7 +80,7 @@ class TestPublishedURLIsReachableMixin(object):
# login page can be rendered and contain the text "ERP5"
r = session.get(
urlparse.urljoin(base_url, '{}/login_form'.format(site_id)),
six.moves.urllib.parse.urljoin(base_url, '{}/login_form'.format(site_id)),
verify=verify,
allow_redirects=False,
)
......@@ -119,6 +123,7 @@ class TestMedusa(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
def getInstanceParameterDict(cls):
return {'_': json.dumps({'wsgi': False})}
class TestJupyter(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
"""Test ERP5 Jupyter notebook
"""
......@@ -143,6 +148,7 @@ class TestJupyter(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
[result.status_code, result.is_redirect, result.headers['Location']]
)
class TestBalancerPorts(ERP5InstanceTestCase):
"""Instantiate with two zope families, this should create for each family:
- a balancer entry point with corresponding haproxy
......@@ -169,7 +175,7 @@ class TestBalancerPorts(ERP5InstanceTestCase):
}
def checkValidHTTPSURL(self, url):
parsed = urlparse.urlparse(url)
parsed = six.moves.urllib.parse.urlparse(url)
self.assertEqual(parsed.scheme, 'https')
self.assertTrue(parsed.hostname)
self.assertTrue(parsed.port)
......@@ -254,7 +260,7 @@ class TestSeleniumTestRunner(ERP5InstanceTestCase, TestPublishedURLIsReachableMi
with open(config_file.strip()) as f:
self.assertEqual(
f.read(),
json.dumps(json.loads(self.getInstanceParameterDict()['_'])['test-runner']))
json.dumps(json.loads(self.getInstanceParameterDict()['_'])['test-runner'], sort_keys=True))
class TestDisableTestRunner(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
......@@ -270,8 +276,8 @@ class TestDisableTestRunner(ERP5InstanceTestCase, TestPublishedURLIsReachableMix
"""
# self.computer_partition_root_path is the path of root partition.
# we want to assert that no scripts exist in any partition.
bin_programs = map(os.path.basename,
glob.glob(self.computer_partition_root_path + "/../*/bin/*"))
bin_programs = list(map(os.path.basename,
glob.glob(self.computer_partition_root_path + "/../*/bin/*")))
self.assertTrue(bin_programs) # just to check the glob was correct.
self.assertNotIn('runUnitTest', bin_programs)
......@@ -352,7 +358,7 @@ class TestZopeNodeParameterOverride(ERP5InstanceTestCase, TestPublishedURLIsReac
storage["storage"] = "root"
storage["server"] = zeo_addr
with open('%s/etc/zope-%s.conf' % (partition, zope)) as f:
conf = map(str.strip, f.readlines())
conf = list(map(str.strip, f.readlines()))
i = conf.index("<zodb_db root>") + 1
conf = iter(conf[i:conf.index("</zodb_db>", i)])
for line in conf:
......@@ -361,23 +367,23 @@ class TestZopeNodeParameterOverride(ERP5InstanceTestCase, TestPublishedURLIsReac
if line == '</zeoclient>':
break
checkParameter(line, storage)
for k, v in storage.iteritems():
for k, v in six.iteritems(storage):
self.assertIsNone(v, k)
del storage
else:
checkParameter(line, zodb)
for k, v in zodb.iteritems():
for k, v in six.iteritems(zodb):
self.assertIsNone(v, k)
partition = self.getComputerPartitionPath('zope-a')
for zope in xrange(3):
for zope in range(3):
checkConf({
"cache-size-bytes": "20MB",
}, {
"cache-size": "50MB",
})
partition = self.getComputerPartitionPath('zope-bb')
for zope in xrange(5):
for zope in range(5):
checkConf({
"cache-size-bytes": "500MB" if zope else 1<<20,
}, {
......
......@@ -26,10 +26,11 @@
#
##############################################################################
from __future__ import absolute_import
import os
import json
import glob
import urlparse
import six.moves.urllib.parse
import socket
import sys
import time
......@@ -60,6 +61,7 @@ class MariaDBTestCase(ERP5InstanceTestCase):
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> dict
return {
'tcpv4-port': 3306,
'max-connection-count': 5,
......@@ -75,12 +77,14 @@ class MariaDBTestCase(ERP5InstanceTestCase):
@classmethod
def getInstanceParameterDict(cls):
# type: () -> dict
return {'_': json.dumps(cls._getInstanceParameterDict())}
def getDatabaseConnection(self):
# type: () -> MySQLdb.connections.Connection
connection_parameter_dict = json.loads(
self.computer_partition.getConnectionParameterDict()['_'])
db_url = urlparse.urlparse(connection_parameter_dict['database-list'][0])
db_url = six.moves.urllib.parse.urlparse(connection_parameter_dict['database-list'][0])
self.assertEqual('mysql', db_url.scheme)
self.assertTrue(db_url.path.startswith('/'))
......@@ -91,12 +95,15 @@ class MariaDBTestCase(ERP5InstanceTestCase):
host=db_url.hostname,
port=db_url.port,
db=database_name,
use_unicode=True,
charset='utf8mb4'
)
class TestCrontabs(MariaDBTestCase, CrontabMixin):
def test_full_backup(self):
# type: () -> None
self._executeCrontabAtDate('mariadb-backup', '2050-01-01')
with gzip.open(
os.path.join(
......@@ -106,10 +113,11 @@ class TestCrontabs(MariaDBTestCase, CrontabMixin):
'mariadb-full',
'20500101000000.sql.gz',
),
'r') as dump:
'rt') as dump:
self.assertIn('CREATE TABLE', dump.read())
def test_logrotate_and_slow_query_digest(self):
# type: () -> None
# slow query digest needs to run after logrotate, since it operates on the rotated
# file, so this tests both logrotate and slow query digest.
......@@ -148,7 +156,7 @@ class TestCrontabs(MariaDBTestCase, CrontabMixin):
'slowquery_digest',
'slowquery_digest.txt-2050-01-01.xz',
)
with lzma.open(slow_query_report, 'r') as f:
with lzma.open(slow_query_report, 'rt') as f:
# this is the hash for our "select sleep(n)" slow query
self.assertIn("ID 0xF9A57DD5A41825CA", f.read())
......@@ -170,7 +178,7 @@ class TestCrontabs(MariaDBTestCase, CrontabMixin):
subprocess.check_output('faketime 2050-01-01 %s' % check_slow_query_promise_plugin['command'], shell=True)
self.assertEqual(
error_context.exception.output,
"""\
b"""\
Threshold is lower than expected:
Expected total queries : 1.0 and current is: 2
Expected slowest query : 0.1 and current is: 3
......@@ -179,6 +187,7 @@ Expected slowest query : 0.1 and current is: 3
class TestMariaDB(MariaDBTestCase):
def test_utf8_collation(self):
# type: () -> None
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query(
......@@ -199,11 +208,12 @@ class TestMariaDB(MariaDBTestCase):
"""
select * from test_utf8_collation where col1 = "a"
""")
self.assertEqual((('à',),), cnx.store_result().fetch_row(maxrows=2))
self.assertEqual(((u'à',),), cnx.store_result().fetch_row(maxrows=2))
class TestMroonga(MariaDBTestCase):
def test_mroonga_plugin_loaded(self):
# type: () -> None
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query("show plugins")
......@@ -213,6 +223,7 @@ class TestMroonga(MariaDBTestCase):
plugins)
def test_mroonga_normalize_udf(self):
# type: () -> None
# example from https://mroonga.org/docs/reference/udf/mroonga_normalize.html#usage
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
......@@ -220,7 +231,8 @@ class TestMroonga(MariaDBTestCase):
"""
SELECT mroonga_normalize("ABCDあぃうぇ㍑")
""")
self.assertEqual((('abcdあぃうぇリットル',),),
# XXX this is returned as bytes by mroonga/mariadb (this might be a bug)
self.assertEqual(((u'abcdあぃうぇリットル'.encode('utf-8'),),),
cnx.store_result().fetch_row(maxrows=2))
if 0:
......@@ -233,10 +245,11 @@ class TestMroonga(MariaDBTestCase):
"""
SELECT mroonga_normalize("aBcDあぃウェ㍑", "NormalizerMySQLUnicodeCIExceptKanaCIKanaWithVoicedSoundMark")
""")
self.assertEqual((('ABCDあぃうぇ㍑',),),
self.assertEqual(((u'ABCDあぃうぇ㍑'.encode('utf-8'),),),
cnx.store_result().fetch_row(maxrows=2))
def test_mroonga_full_text_normalizer(self):
# type: () -> None
# example from https://mroonga.org//docs/tutorial/storage.html#how-to-specify-the-normalizer
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
......@@ -269,11 +282,12 @@ class TestMroonga(MariaDBTestCase):
WHERE MATCH (content) AGAINST ("+ブラック" IN BOOLEAN MODE)
""")
self.assertEqual(
((datetime.date(2013, 4, 23), 'ブラックコーヒーを飲んだ。'),),
((datetime.date(2013, 4, 23), u'ブラックコーヒーを飲んだ。'),),
cnx.store_result().fetch_row(maxrows=2),
)
def test_mroonga_full_text_normalizer_TokenBigramSplitSymbolAlphaDigit(self):
# type: () -> None
# Similar to as ERP5's testI18NSearch with erp5_full_text_mroonga_catalog
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
......@@ -317,11 +331,12 @@ class TestMroonga(MariaDBTestCase):
self.assertEqual(((1,),), cnx.store_result().fetch_row(maxrows=2))
def test_mroonga_full_text_stem(self):
# type: () -> None
# example from https://mroonga.org//docs/tutorial/storage.html#how-to-specify-the-token-filters
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query("SELECT mroonga_command('register token_filters/stem')")
self.assertEqual((('true',),), cnx.store_result().fetch_row(maxrows=2))
self.assertEqual(((b'true',),), cnx.store_result().fetch_row(maxrows=2))
cnx.query(
"""
CREATE TABLE memos (
......
......@@ -16,6 +16,7 @@
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
from __future__ import absolute_import
import json
import os.path
import unittest
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment