Commit 4089919c authored by Jérome Perrin's avatar Jérome Perrin

software/erp5/test: use type annotations instead of type comments

using https://github.com/ilevkivskyi/com2ann
parent 3a2f1c1a
......@@ -34,8 +34,7 @@ class EchoHTTPServer(ManagedHTTPServer):
encoded in json.
"""
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
# type: () -> None
def do_GET(self) -> None:
self.send_response(200)
self.send_header("Content-Type", "application/json")
response = json.dumps(
......@@ -57,8 +56,7 @@ class EchoHTTP11Server(ManagedHTTPServer):
"""
class RequestHandler(BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1'
def do_GET(self):
# type: () -> None
def do_GET(self) -> None:
self.send_response(200)
self.send_header("Content-Type", "application/json")
response = json.dumps(
......@@ -78,12 +76,11 @@ class EchoHTTP11Server(ManagedHTTPServer):
class CaucaseService(ManagedResource):
"""A caucase service.
"""
url = None # type: str
directory = None # type: str
_caucased_process = None # type: subprocess.Popen
url: str = None
directory: str = None
_caucased_process: subprocess.Popen = None
def open(self):
# type: () -> None
def open(self) -> None:
# start a caucased and server certificate.
software_release_root_path = os.path.join(
self._cls.slap._software_root,
......@@ -121,8 +118,7 @@ class CaucaseService(ManagedResource):
else:
raise RuntimeError('caucased failed to start.')
def close(self):
# type: () -> None
def close(self) -> None:
self._caucased_process.terminate()
self._caucased_process.wait()
self._caucased_process.stdout.close()
......@@ -143,8 +139,7 @@ class BalancerTestCase(ERP5InstanceTestCase):
return 'balancer'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> dict
def _getInstanceParameterDict(cls) -> dict:
return {
'tcpv4-port': 8000,
'computer-memory-percent-threshold': 100,
......@@ -179,12 +174,10 @@ class BalancerTestCase(ERP5InstanceTestCase):
}
@classmethod
def getInstanceParameterDict(cls):
# type: () -> dict
def getInstanceParameterDict(cls) -> dict:
return {'_': json.dumps(cls._getInstanceParameterDict())}
def setUp(self):
# type: () -> None
def setUp(self) -> None:
self.default_balancer_url = json.loads(
self.computer_partition.getConnectionParameterDict()['_'])['default']
......@@ -195,8 +188,7 @@ class SlowHTTPServer(ManagedHTTPServer):
Timeout is 2 seconds by default, and can be specified in the path of the URL
"""
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
# type: () -> None
def do_GET(self) -> None:
self.send_response(200)
self.send_header("Content-Type", "text/plain")
timeout = 2
......@@ -214,8 +206,7 @@ class SlowHTTPServer(ManagedHTTPServer):
class TestTimeout(BalancerTestCase, CrontabMixin):
__partition_reference__ = 't'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> dict
def _getInstanceParameterDict(cls) -> dict:
parameter_dict = super()._getInstanceParameterDict()
# use a slow server instead
parameter_dict['dummy_http_server'] = [[cls.getManagedResource("slow_web_server", SlowHTTPServer).netloc, 1, False]]
......@@ -223,8 +214,7 @@ class TestTimeout(BalancerTestCase, CrontabMixin):
parameter_dict['timeout-dict'] = {'default': 1}
return parameter_dict
def test_timeout(self):
# type: () -> None
def test_timeout(self) -> None:
self.assertEqual(
requests.get(
urllib.parse.urljoin(self.default_balancer_url, '/1'),
......@@ -242,15 +232,13 @@ class TestLog(BalancerTestCase, CrontabMixin):
"""
__partition_reference__ = 'l'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> dict
def _getInstanceParameterDict(cls) -> dict:
parameter_dict = super()._getInstanceParameterDict()
# use a slow server instead
parameter_dict['dummy_http_server'] = [[cls.getManagedResource("slow_web_server", SlowHTTPServer).netloc, 1, False]]
return parameter_dict
def test_access_log_format(self):
# type: () -> None
def test_access_log_format(self) -> None:
requests.get(
urllib.parse.urljoin(self.default_balancer_url, '/url_path'),
verify=False,
......@@ -274,8 +262,7 @@ class TestLog(BalancerTestCase, CrontabMixin):
self.assertGreater(request_time, 2 * 1000)
self.assertLess(request_time, 20 * 1000)
def test_access_log_apachedex_report(self):
# type: () -> None
def test_access_log_apachedex_report(self) -> None:
# make a request so that we have something in the logs
requests.get(self.default_balancer_url, verify=False)
......@@ -297,8 +284,7 @@ class TestLog(BalancerTestCase, CrontabMixin):
# having this table means that apachedex could parse some lines.
self.assertIn('<h2>Hits per status code</h2>', report_text)
def test_access_log_rotation(self):
# type: () -> None
def test_access_log_rotation(self) -> None:
# run logrotate a first time so that it create state files
self._executeCrontabAtDate('logrotate', '2000-01-01')
......@@ -324,8 +310,7 @@ class TestLog(BalancerTestCase, CrontabMixin):
self.assertTrue(os.path.exists(rotated_log_file + '.xz'))
self.assertFalse(os.path.exists(rotated_log_file))
def test_error_log(self):
# type: () -> None
def test_error_log(self) -> None:
# stop backend server
backend_server = self.getManagedResource("slow_web_server", SlowHTTPServer)
self.addCleanup(backend_server.open)
......@@ -356,8 +341,7 @@ class BalancerCookieHTTPServer(ManagedHTTPServer):
def RequestHandler(self):
server = self
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
# type: () -> None
def do_GET(self) -> None:
self.send_response(200)
self.send_header("Content-Type", "text/plain")
if self.path == '/set_cookie':
......@@ -378,8 +362,7 @@ class TestBalancer(BalancerTestCase):
"""
__partition_reference__ = 'b'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> dict
def _getInstanceParameterDict(cls) -> dict:
parameter_dict = super()._getInstanceParameterDict()
# use two backend servers
......@@ -389,16 +372,14 @@ class TestBalancer(BalancerTestCase):
]
return parameter_dict
def test_balancer_round_robin(self):
# type: () -> None
def test_balancer_round_robin(self) -> None:
# requests are by default balanced to both servers
self.assertEqual(
{requests.get(self.default_balancer_url, verify=False).text for _ in range(10)},
{'backend_web_server1', 'backend_web_server2'}
)
def test_balancer_server_down(self):
# type: () -> None
def test_balancer_server_down(self) -> None:
# if one backend is down, it is excluded from balancer
self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).close()
self.addCleanup(self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).open)
......@@ -407,8 +388,7 @@ class TestBalancer(BalancerTestCase):
{'backend_web_server1',}
)
def test_balancer_set_cookie(self):
# type: () -> None
def test_balancer_set_cookie(self) -> None:
# if backend provides a "SERVERID" cookie, balancer will overwrite it with the
# backend selected by balancing algorithm
self.assertIn(
......@@ -416,8 +396,7 @@ class TestBalancer(BalancerTestCase):
('default-0', 'default-1'),
)
def test_balancer_respects_sticky_cookie(self):
# type: () -> None
def test_balancer_respects_sticky_cookie(self) -> None:
# if request is made with the sticky cookie, the client stick on one balancer
cookies = dict(SERVERID='default-1')
self.assertEqual(
......@@ -432,8 +411,7 @@ class TestBalancer(BalancerTestCase):
requests.get(self.default_balancer_url, verify=False, cookies=cookies).text,
'backend_web_server1')
def test_balancer_stats_socket(self):
# type: () -> None
def test_balancer_stats_socket(self) -> None:
# real time statistics can be obtained by using the stats socket and there
# is a wrapper which makes this a bit easier.
socat_process = subprocess.Popen(
......@@ -458,8 +436,7 @@ class TestTestRunnerEntryPoints(BalancerTestCase):
"""
__partition_reference__ = 't'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> dict
def _getInstanceParameterDict(cls) -> dict:
parameter_dict = super()._getInstanceParameterDict()
parameter_dict['dummy_http_server-test-runner-address-list'] = [
......@@ -478,8 +455,7 @@ class TestTestRunnerEntryPoints(BalancerTestCase):
]
return parameter_dict
def test_use_proper_backend(self):
# type: () -> None
def test_use_proper_backend(self) -> None:
# requests are directed to proper backend based on URL path
test_runner_url_list = self.getRootPartitionConnectionParameterDict(
)['default-test-runner-url-list']
......@@ -532,8 +508,7 @@ class TestHTTP(BalancerTestCase):
"""Check HTTP protocol with a HTTP/1.1 backend
"""
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> dict
def _getInstanceParameterDict(cls) -> dict:
parameter_dict = super()._getInstanceParameterDict()
# use a HTTP/1.1 server instead
parameter_dict['dummy_http_server'] = [[cls.getManagedResource("HTTP/1.1 Server", EchoHTTP11Server).netloc, 1, False]]
......@@ -541,8 +516,7 @@ class TestHTTP(BalancerTestCase):
__partition_reference__ = 'h'
def test_http_version(self):
# type: () -> None
def test_http_version(self) -> None:
self.assertEqual(
subprocess.check_output([
'curl',
......@@ -558,8 +532,7 @@ class TestHTTP(BalancerTestCase):
b'2',
)
def test_keep_alive(self):
# type: () -> None
def test_keep_alive(self) -> None:
# when doing two requests, connection is established only once
with requests.Session() as session:
session.verify = False
......@@ -594,8 +567,7 @@ class ContentTypeHTTPServer(ManagedHTTPServer):
"""
class RequestHandler(BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1'
def do_GET(self):
# type: () -> None
def do_GET(self) -> None:
self.send_response(200)
if self.path == '/':
self.send_header("Content-Length", '0')
......@@ -615,16 +587,14 @@ class TestContentEncoding(BalancerTestCase):
"""
__partition_reference__ = 'ce'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> dict
def _getInstanceParameterDict(cls) -> dict:
parameter_dict = super()._getInstanceParameterDict()
parameter_dict['dummy_http_server'] = [
[cls.getManagedResource("content_type_server", ContentTypeHTTPServer).netloc, 1, False],
]
return parameter_dict
def test_gzip_encoding(self):
# type: () -> None
def test_gzip_encoding(self) -> None:
for content_type in (
'text/cache-manifest',
'text/html',
......@@ -652,8 +622,7 @@ class TestContentEncoding(BalancerTestCase):
'{} uses wrong encoding: {}'.format(content_type, resp.headers.get('Content-Encoding')))
self.assertEqual(resp.text, 'OK')
def test_no_gzip_encoding(self):
# type: () -> None
def test_no_gzip_encoding(self) -> None:
resp = requests.get(urllib.parse.urljoin(self.default_balancer_url, '/image/png'), verify=False)
self.assertNotIn('Content-Encoding', resp.headers)
self.assertEqual(resp.text, 'OK')
......@@ -663,14 +632,13 @@ class CaucaseCertificate(ManagedResource):
"""A certificate signed by a caucase service.
"""
ca_crt_file = None # type: str
crl_file = None # type: str
csr_file = None # type: str
cert_file = None # type: str
key_file = None # type: str
ca_crt_file: str = None
crl_file: str = None
csr_file: str = None
cert_file: str = None
key_file: str = None
def open(self):
# type: () -> None
def open(self) -> None:
self.tmpdir = tempfile.mkdtemp()
self.ca_crt_file = os.path.join(self.tmpdir, 'ca-crt.pem')
self.crl_file = os.path.join(self.tmpdir, 'ca-crl.pem')
......@@ -678,13 +646,11 @@ class CaucaseCertificate(ManagedResource):
self.cert_file = os.path.join(self.tmpdir, 'crt.pem')
self.key_file = os.path.join(self.tmpdir, 'key.pem')
def close(self):
# type: () -> None
def close(self) -> None:
shutil.rmtree(self.tmpdir)
@property
def _caucase_path(self):
# type: () -> str
def _caucase_path(self) -> str:
"""path of caucase executable.
"""
software_release_root_path = os.path.join(
......@@ -693,8 +659,7 @@ class CaucaseCertificate(ManagedResource):
)
return os.path.join(software_release_root_path, 'bin', 'caucase')
def request(self, common_name, caucase):
# type: (str, CaucaseService) -> None
def request(self, common_name: str, caucase: CaucaseService) -> None:
"""Generate certificate and request signature to the caucase service.
This overwrite any previously requested certificate for this instance.
......@@ -756,8 +721,7 @@ class CaucaseCertificate(ManagedResource):
with open(self.cert_file) as cert_file:
assert 'BEGIN CERTIFICATE' in cert_file.read()
def revoke(self, caucase):
# type: (CaucaseService) -> None
def revoke(self, caucase: CaucaseService) -> None:
"""Revoke the client certificate on this caucase instance.
"""
subprocess.check_call([
......@@ -773,8 +737,7 @@ class TestFrontendXForwardedFor(BalancerTestCase):
__partition_reference__ = 'xff'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> dict
def _getInstanceParameterDict(cls) -> dict:
frontend_caucase = cls.getManagedResource('frontend_caucase', CaucaseService)
certificate = cls.getManagedResource('client_certificate', CaucaseCertificate)
certificate.request('shared frontend', frontend_caucase)
......@@ -791,8 +754,7 @@ class TestFrontendXForwardedFor(BalancerTestCase):
parameter_dict['ssl']['frontend-caucase-url-list'] = [frontend_caucase.url]
return parameter_dict
def test_x_forwarded_for_added_when_verified_connection(self):
# type: () -> None
def test_x_forwarded_for_added_when_verified_connection(self) -> None:
client_certificate = self.getManagedResource('client_certificate', CaucaseCertificate)
for backend in ('default', 'default-auth'):
......@@ -805,8 +767,7 @@ class TestFrontendXForwardedFor(BalancerTestCase):
).json()
self.assertEqual(result['Incoming Headers'].get('x-forwarded-for', '').split(', ')[0], '1.2.3.4')
def test_x_forwarded_for_stripped_when_no_certificate(self):
# type: () -> None
def test_x_forwarded_for_stripped_when_no_certificate(self) -> None:
balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])['default']
result = requests.get(
balancer_url,
......@@ -822,8 +783,7 @@ class TestFrontendXForwardedFor(BalancerTestCase):
verify=False,
)
def test_x_forwarded_for_stripped_when_not_verified_certificate(self):
# type: () -> None
def test_x_forwarded_for_stripped_when_not_verified_certificate(self) -> None:
balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])['default']
# certificate from an unknown CA
......@@ -855,8 +815,7 @@ class TestServerTLSProvidedCertificate(BalancerTestCase):
__partition_reference__ = 's'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> dict
def _getInstanceParameterDict(cls) -> dict:
server_caucase = cls.getManagedResource('server_caucase', CaucaseService)
server_certificate = cls.getManagedResource('server_certificate', CaucaseCertificate)
server_certificate.request(cls._ipv4_address, server_caucase)
......@@ -867,8 +826,7 @@ class TestServerTLSProvidedCertificate(BalancerTestCase):
parameter_dict['ssl']['key'] = f.read()
return parameter_dict
def test_certificate_validates_with_provided_ca(self):
# type: () -> None
def test_certificate_validates_with_provided_ca(self) -> None:
server_certificate = self.getManagedResource("server_certificate", CaucaseCertificate)
requests.get(self.default_balancer_url, verify=server_certificate.ca_crt_file)
......@@ -877,8 +835,7 @@ class TestClientTLS(BalancerTestCase):
__partition_reference__ = 'c'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> dict
def _getInstanceParameterDict(cls) -> dict:
frontend_caucase1 = cls.getManagedResource('frontend_caucase1', CaucaseService)
certificate1 = cls.getManagedResource('client_certificate1', CaucaseCertificate)
certificate1.request('client_certificate1', frontend_caucase1)
......@@ -897,8 +854,7 @@ class TestClientTLS(BalancerTestCase):
]
return parameter_dict
def test_refresh_crl(self):
# type: () -> None
def test_refresh_crl(self) -> None:
logger = self.logger
class DebugLogFile:
......@@ -916,8 +872,7 @@ class TestClientTLS(BalancerTestCase):
# when client certificate can be authenticated, backend receive the CN of
# the client certificate in "remote-user" header
def _make_request():
# type: () -> dict
def _make_request() -> dict:
return requests.get(
self.default_balancer_url,
cert=(client_certificate.cert_file, client_certificate.key_file),
......@@ -976,8 +931,7 @@ class TestPathBasedRouting(BalancerTestCase):
__partition_reference__ = 'pbr'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> dict
def _getInstanceParameterDict(cls) -> dict:
parameter_dict = super()._getInstanceParameterDict()
parameter_dict['zope-family-dict'][
'second'
......@@ -1003,8 +957,7 @@ class TestPathBasedRouting(BalancerTestCase):
]
return parameter_dict
def test_routing(self):
# type: () -> None
def test_routing(self) -> None:
published_dict = json.loads(self.computer_partition.getConnectionParameterDict()['_'])
scheme = 'scheme'
netloc = 'example.com:8080'
......@@ -1015,8 +968,7 @@ class TestPathBasedRouting(BalancerTestCase):
# For easier reading of test data, visually separating the virtual host
# base from the virtual host root
vhr = '/VirtualHostRoot'
def assertRoutingEqual(family, path, expected_path):
# type: (str, str, str) -> None
def assertRoutingEqual(family: str, path: str, expected_path: str) -> None:
# sanity check: unlike the rules, this test is sensitive to outermost
# slashes, and paths must be absolute-ish for code simplicity.
assert path.startswith('/')
......
......@@ -634,8 +634,7 @@ class ZopeTestMixin(ZopeSkinsMixin, CrontabMixin):
)):
os.unlink(logfile)
def _getCrontabCommand(self, crontab_name):
# type: (str) -> str
def _getCrontabCommand(self, crontab_name: str) -> str:
"""Read a crontab and return the command that is executed.
overloaded to use crontab from zope partition
......@@ -1046,8 +1045,7 @@ class TestNEO(ZopeSkinsMixin, CrontabMixin, ERP5InstanceTestCase):
__partition_reference__ = 'n'
__test_matrix__ = matrix((neo,))
def _getCrontabCommand(self, crontab_name):
# type: (str) -> str
def _getCrontabCommand(self, crontab_name: str) -> str:
"""Read a crontab and return the command that is executed.
overloaded to use crontab from neo partition
......@@ -1127,7 +1125,7 @@ class TestUnsetWithMaxRlimitNofileParameter(ERP5InstanceTestCase, TestPublishedU
"""
__partition_reference__ = 'unset-with-max-rlimit-nofile'
def test_unset_with_max_rlimit_nofile(self):
def test_unset_with_max_rlimit_nofile(self) -> None:
with self.slap.instance_supervisor_rpc as supervisor:
all_process_info = supervisor.getAllProcessInfo()
limit = resource.getrlimit(resource.RLIMIT_NOFILE)
......
......@@ -60,8 +60,7 @@ class MariaDBTestCase(ERP5InstanceTestCase):
return "mariadb"
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> dict
def _getInstanceParameterDict(cls) -> dict:
return {
'tcpv4-port': 3306,
'max-connection-count': 5,
......@@ -76,12 +75,10 @@ class MariaDBTestCase(ERP5InstanceTestCase):
}
@classmethod
def getInstanceParameterDict(cls):
# type: () -> dict
def getInstanceParameterDict(cls) -> dict:
return {'_': json.dumps(cls._getInstanceParameterDict())}
def getDatabaseConnection(self):
# type: () -> MySQLdb.connections.Connection
def getDatabaseConnection(self) -> MySQLdb.connections.Connection:
connection_parameter_dict = json.loads(
self.computer_partition.getConnectionParameterDict()['_'])
db_url = urllib.parse.urlparse(connection_parameter_dict['database-list'][0])
......@@ -106,8 +103,7 @@ class TestCrontabs(MariaDBTestCase, CrontabMixin):
'*/srv/backup/*',
)
def test_full_backup(self):
# type: () -> None
def test_full_backup(self) -> None:
self._executeCrontabAtDate('mariadb-backup', '2050-01-01')
full_backup_file, = glob.glob(
os.path.join(
......@@ -121,8 +117,7 @@ class TestCrontabs(MariaDBTestCase, CrontabMixin):
with gzip.open(full_backup_file, 'rt') as dump:
self.assertIn('CREATE TABLE', dump.read())
def test_logrotate_and_slow_query_digest(self):
# type: () -> None
def test_logrotate_and_slow_query_digest(self) -> None:
# slow query digest needs to run after logrotate, since it operates on the rotated
# file, so this tests both logrotate and slow query digest.
......@@ -193,8 +188,7 @@ class TestCrontabs(MariaDBTestCase, CrontabMixin):
class TestMariaDB(MariaDBTestCase):
def test_utf8_collation(self):
# type: () -> None
def test_utf8_collation(self) -> None:
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query(
......@@ -219,8 +213,7 @@ class TestMariaDB(MariaDBTestCase):
class TestMroonga(MariaDBTestCase):
def test_mroonga_plugin_loaded(self):
# type: () -> None
def test_mroonga_plugin_loaded(self) -> None:
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
cnx.query("show plugins")
......@@ -229,8 +222,7 @@ class TestMroonga(MariaDBTestCase):
('Mroonga', 'ACTIVE', 'STORAGE ENGINE', 'ha_mroonga.so', 'GPL'),
plugins)
def test_mroonga_normalize_udf(self):
# type: () -> None
def test_mroonga_normalize_udf(self) -> None:
# example from https://mroonga.org/docs/reference/udf/mroonga_normalize.html#usage
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
......@@ -255,8 +247,7 @@ class TestMroonga(MariaDBTestCase):
self.assertEqual((('ABCDあぃうぇ㍑'.encode(),),),
cnx.store_result().fetch_row(maxrows=2))
def test_mroonga_full_text_normalizer(self):
# type: () -> None
def test_mroonga_full_text_normalizer(self) -> None:
# example from https://mroonga.org//docs/tutorial/storage.html#how-to-specify-the-normalizer
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
......@@ -293,8 +284,7 @@ class TestMroonga(MariaDBTestCase):
cnx.store_result().fetch_row(maxrows=2),
)
def test_mroonga_full_text_normalizer_TokenBigramSplitSymbolAlphaDigit(self):
# type: () -> None
def test_mroonga_full_text_normalizer_TokenBigramSplitSymbolAlphaDigit(self) -> None:
# Similar to as ERP5's testI18NSearch with erp5_full_text_mroonga_catalog
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
......@@ -337,8 +327,7 @@ class TestMroonga(MariaDBTestCase):
""")
self.assertEqual(((1,),), cnx.store_result().fetch_row(maxrows=2))
def test_mroonga_full_text_stem(self):
# type: () -> None
def test_mroonga_full_text_stem(self) -> None:
# example from https://mroonga.org//docs/tutorial/storage.html#how-to-specify-the-token-filters
cnx = self.getDatabaseConnection()
with contextlib.closing(cnx):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment