Commit af7a0208 authored by Jérome Perrin's avatar Jérome Perrin

ERP5: Test balancer partition and use caucase certificate for balancer

Revert f8f72a17 ([erp5] don't use caucase generated certificate for now, 2019-03-12) since nothing prevents us drom using caucase certificate now.
 
Use [managed resources](nexedi/slapos.core!259) to simplify existing tests and introduce tests for:

## Access Log

 - [x] balancer partition should produce logs in apache "combined" log format with microsecond timing of requests.
 - [x] these logs should be rotated daily
 - [x] an [apachedex](https://lab.nexedi.com/nexedi/apachedex) report is ran on these logs daily.

## Balancing

 - [x] requests are balanced to multiple backends using round-robin algorithm
 - [x] if backend is down it is excluded
 - [x] a "sticky cookie" is used so that clients are associated to the same backend
    - [x] the cookie is set by balancer
    - [x] when client comes with a cookie it "sticks" on the associated backend
    - [x] if "sticked" backend is down, another backend will be used

## Content-Encoding

 - [x] balancer encodes responses in gzip for some configured content types.

## HTTP

 - [x] Server uses HTTP/1.1 or more and keep connection with clients

## TLS (server certificate)

In this MR we also change apache to use a caucase managed certificate and add test coverage for:

 - [x] balancer listen on https with a certificate that can be verified using the CA from caucase.
 - [x] balancer uses the new certificate when its own certificate is renewed.

But we don't add support for:
 -  ~~balancer can be instantiated with a certificate and key passed as SlapOS request parameters (code [here](https://lab.nexedi.com/nexedi/slapos/blob/757c1a4ddee93659d5e2649e4252d87bf9494566/stack/erp5/instance-balancer.cfg.in#L208-213))~~ this use case is the job of caucase, so we no longer support this.

## TLS (client certificate)
 - [x] balancer verifies frontend certificates from frontend caucases ( also tested in "Forwarded-For" section )
 - [x] if frontend provided a verified certificate, balancer set `remote-user` header
 - [x] balancer updates CRL from caucases ( `caucase-updater-housekeeper` )
 - (NOT TESTED) balancer updates CA certificate from caucase ( `caucase-updater-housekeeper` ). Since this is would be complex to test and basic functionality of `caucase-updater-housekeeper` for frontend caucases is covered by CRL test, we don't test this for simplicity.

## "Forwarded-For" header

This was also covered by existing tests:  

 - [x] balancer set `X-Forwarded-For` header when frontend certificate can be verified
 - [x] balancer strips existing `X-Forwarded-For`

## Integration with the rest of ERP5 software release

This was also covered by existing tests:  

- [x] The https URL of each Zope family is published and replies properly
- [x] Some https URLs are generated for `runUnitTest`, so that test run with an https certificate. This is also covered by regular ERP5 functional tests.

See merge request nexedi/slapos!840
parents babec085 f1173ea5
Pipeline #12115 failed with stage
in 0 seconds
......@@ -33,7 +33,7 @@ with open("README.md") as f:
setup(name=name,
version=version,
description="Test for SlapOS' ERP5 software releae",
description="Test for SlapOS' ERP5 software release",
long_description=long_description,
long_description_content_type='text/markdown',
maintainer="Nexedi",
......@@ -50,8 +50,9 @@ setup(name=name,
'mysqlclient',
'backports.lzma',
'cryptography',
'pexpect',
'pyOpenSSL',
],
zip_safe=True,
'typing; python_version<"3"',
],
test_suite='test',
)
from . import ERP5InstanceTestCase
from . import setUpModule
from slapos.testing.utils import findFreeTCPPort
from BaseHTTPServer import HTTPServer
from BaseHTTPServer import BaseHTTPRequestHandler
import OpenSSL.SSL
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography import x509
from cryptography.x509.oid import NameOID
import glob
import hashlib
import json
import multiprocessing
import logging
import os
import requests
import re
import shutil
import socket
import subprocess
import tempfile
import time
import urlparse
from BaseHTTPServer import BaseHTTPRequestHandler
from typing import Any, Dict, Optional
import idna
import mock
import OpenSSL.SSL
import pexpect
import psutil
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
from slapos.testing.testcase import ManagedResource
from slapos.testing.utils import (CrontabMixin, ManagedHTTPServer,
findFreeTCPPort)
from . import ERP5InstanceTestCase, setUpModule
setUpModule # pyflakes
class TestHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header("Content-Type", "application/json")
response = {
'Path': self.path,
'Incoming Headers': self.headers.dict
}
response = json.dumps(response, indent=2)
self.end_headers()
self.wfile.write(response)
class TestFrontendXForwardedFor(ERP5InstanceTestCase):
__partition_reference__ = 'xff'
http_server_process = None
frontend_caucase_dir = None
frontend_caucased_process = None
backend_caucase_dir = None
backend_caucased_process = None
class EchoHTTPServer(ManagedHTTPServer):
"""An HTTP Server responding with the request path and incoming headers,
encoded in json.
"""
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
# type: () -> None
self.send_response(200)
self.send_header("Content-Type", "application/json")
response = json.dumps(
{
'Path': self.path,
'Incoming Headers': self.headers.dict
},
indent=2,
)
self.end_headers()
self.wfile.write(response)
@classmethod
def getInstanceSoftwareType(cls):
return 'balancer'
log_message = logging.getLogger(__name__ + '.HeaderEchoHandler').info
@classmethod
def setUpClass(cls):
# start a dummy web server echoing headers.
http_server_port = findFreeTCPPort(cls._ipv4_address)
server = HTTPServer(
(cls._ipv4_address, http_server_port),
TestHandler)
cls.http_server_process = multiprocessing.Process(
target=server.serve_forever, name='HTTPServer')
cls.http_server_process.start()
cls.http_server_netloc = '%s:%s' % (cls._ipv4_address, http_server_port)
# start a caucased and generate a valid client certificate.
cls.computer_partition_root_path = os.path.abspath(os.curdir)
cls.frontend_caucase_dir = tempfile.mkdtemp()
frontend_caucased_dir = os.path.join(cls.frontend_caucase_dir, 'caucased')
os.mkdir(frontend_caucased_dir)
frontend_user_dir = os.path.join(cls.frontend_caucase_dir, 'user')
os.mkdir(frontend_user_dir)
frontend_service_dir = os.path.join(cls.frontend_caucase_dir, 'service')
os.mkdir(frontend_service_dir)
frontend_caucased_netloc = '%s:%s' % (cls._ipv4_address, findFreeTCPPort(cls._ipv4_address))
cls.frontend_caucased_url = 'http://' + frontend_caucased_netloc
cls.user_certificate = frontend_user_key = os.path.join(frontend_user_dir, 'client.key.pem')
frontend_user_csr = os.path.join(frontend_user_dir, 'client.csr.pem')
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
with open(frontend_user_key, 'wb') as f:
f.write(key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
))
csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, u'user'),
])).sign(key, hashes.SHA256(), default_backend())
with open(frontend_user_csr, 'wb') as f:
f.write(csr.public_bytes(serialization.Encoding.PEM))
class CaucaseService(ManagedResource):
"""A caucase service.
"""
url = None # type: str
directory = None # type: str
_caucased_process = None # type: subprocess.Popen
cls.software_release_root_path = os.path.join(
cls.slap._software_root,
hashlib.md5(cls.getSoftwareURL()).hexdigest(),
def open(self):
# type: () -> None
# start a caucased and server certificate.
software_release_root_path = os.path.join(
self._cls.slap._software_root,
hashlib.md5(self._cls.getSoftwareURL().encode()).hexdigest(),
)
caucased_path = os.path.join(cls.software_release_root_path, 'bin', 'caucased')
caucase_path = os.path.join(cls.software_release_root_path, 'bin', 'caucase')
cls.frontend_caucased_process = subprocess.Popen(
[
caucased_path,
'--db', os.path.join(frontend_caucased_dir, 'caucase.sqlite'),
'--server-key', os.path.join(frontend_caucased_dir, 'server.key.pem'),
'--netloc', frontend_caucased_netloc,
'--service-auto-approve-count', '1',
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
caucased_path = os.path.join(software_release_root_path, 'bin', 'caucased')
self.directory = tempfile.mkdtemp()
caucased_dir = os.path.join(self.directory, 'caucased')
os.mkdir(caucased_dir)
os.mkdir(os.path.join(caucased_dir, 'user'))
os.mkdir(os.path.join(caucased_dir, 'service'))
backend_caucased_netloc = '%s:%s' % (self._cls._ipv4_address, findFreeTCPPort(self._cls._ipv4_address))
self.url = 'http://' + backend_caucased_netloc
self._caucased_process = subprocess.Popen(
[
caucased_path,
'--db', os.path.join(caucased_dir, 'caucase.sqlite'),
'--server-key', os.path.join(caucased_dir, 'server.key.pem'),
'--netloc', backend_caucased_netloc,
'--service-auto-approve-count', '1',
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
for _ in range(10):
for _ in range(30):
try:
if requests.get(cls.frontend_caucased_url).status_code == 200:
if requests.get(self.url).status_code == 200:
break
except Exception:
pass
......@@ -118,173 +100,591 @@ class TestFrontendXForwardedFor(ERP5InstanceTestCase):
else:
raise RuntimeError('caucased failed to start.')
cau_args = [
caucase_path,
'--ca-url', cls.frontend_caucased_url,
'--ca-crt', os.path.join(frontend_user_dir, 'service-ca-crt.pem'),
'--crl', os.path.join(frontend_user_dir, 'service.crl'),
'--user-ca-crt', os.path.join(frontend_user_dir, 'user-ca-crt.pem'),
'--user-crl', os.path.join(frontend_user_dir, 'user.crl'),
]
def close(self):
# type: () -> None
self._caucased_process.terminate()
self._caucased_process.wait()
shutil.rmtree(self.directory)
cas_args = [
caucase_path,
'--ca-url', cls.frontend_caucased_url,
'--ca-crt', os.path.join(frontend_service_dir, 'service-ca-crt.pem'),
'--crl', os.path.join(frontend_service_dir, 'service.crl'),
'--user-ca-crt', os.path.join(frontend_service_dir, 'user-ca-crt.pem'),
'--user-crl', os.path.join(frontend_service_dir, 'user.crl'),
@property
def ca_crt_path(self):
# type: () -> str
"""Path of the CA certificate from this caucase.
"""
ca_crt_path = os.path.join(self.directory, 'ca.crt.pem')
if not os.path.exists(ca_crt_path):
with open(ca_crt_path, 'w') as f:
f.write(
requests.get(urlparse.urljoin(
self.url,
'/cas/crt/ca.crt.pem',
)).text)
return ca_crt_path
class BalancerTestCase(ERP5InstanceTestCase):
@classmethod
def getInstanceSoftwareType(cls):
return 'balancer'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
return {
'tcpv4-port': 8000,
'computer-memory-percent-threshold': 100,
# XXX what is this ? should probably not be needed here
'name': cls.__name__,
'monitor-passwd': 'secret',
'apachedex-configuration': '--erp5-base +erp5 .*/VirtualHostRoot/erp5(/|\\?|$) --base +other / --skip-user-agent Zabbix --error-detail --js-embed --quiet',
'apachedex-promise-threshold': 100,
'haproxy-server-check-path': '/',
'zope-family-dict': {
'default': ['dummy_http_server'],
},
'dummy_http_server': [[cls.getManagedResource("backend_web_server", EchoHTTPServer).netloc, 1, False]],
'backend-path-dict': {
'default': '',
},
'ssl-authentication-dict': {},
'ssl': {
'caucase-url': cls.getManagedResource("caucase", CaucaseService).url,
}
}
@classmethod
def getInstanceParameterDict(cls):
# type: () -> Dict
return {'_': json.dumps(cls._getInstanceParameterDict())}
def setUp(self):
self.default_balancer_url = json.loads(
self.computer_partition.getConnectionParameterDict()['_'])['default']
class SlowHTTPServer(ManagedHTTPServer):
"""An HTTP Server which reply after 3 seconds.
"""
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
# type: () -> None
self.send_response(200)
self.send_header("Content-Type", "text/plain")
time.sleep(3)
self.end_headers()
self.wfile.write("OK\n")
log_message = logging.getLogger(__name__ + '.SlowHandler').info
class TestAccessLog(BalancerTestCase, CrontabMixin):
"""Check access logs emitted by balancer
"""
__partition_reference__ = 'l'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
parameter_dict = super(TestAccessLog, cls)._getInstanceParameterDict()
# use a slow server instead
parameter_dict['dummy_http_server'] = [[cls.getManagedResource("slow_web_server", SlowHTTPServer).netloc, 1, False]]
return parameter_dict
def test_access_log_format(self):
# type: () -> None
requests.get(
urlparse.urljoin(self.default_balancer_url, '/url_path'),
verify=False,
)
with open(os.path.join(self.computer_partition_root_path, 'var', 'log', 'apache-access.log')) as access_log_file:
access_line = access_log_file.read()
self.assertIn('/url_path', access_line)
# last \d is the request time in micro seconds, since this SlowHTTPServer
# sleeps for 3 seconds, it should take between 3 and 4 seconds to process
# the request - but our test machines can be slow sometimes, so we tolerate
# it can take up to 20 seconds.
match = re.match(
r'([(\d\.)]+) - - \[(.*?)\] "(.*?)" (\d+) (\d+) "(.*?)" "(.*?)" (\d+)',
access_line
)
self.assertTrue(match)
assert match
request_time = int(match.groups()[-1])
self.assertGreater(request_time, 3 * 1000 * 1000)
self.assertLess(request_time, 20 * 1000 * 1000)
def test_access_log_apachedex_report(self):
# type: () -> None
# make a request so that we have something in the logs
requests.get(self.default_balancer_url, verify=False)
# crontab for apachedex is executed
self._executeCrontabAtDate('generate-apachedex-report', '23:59')
# it creates a report for the day
apachedex_report, = glob.glob(
os.path.join(
self.computer_partition_root_path,
'srv',
'monitor',
'private',
'apachedex',
'ApacheDex-*.html',
))
with open(apachedex_report, 'r') as f:
report_text = f.read()
self.assertIn('APacheDEX', report_text)
# having this table means that apachedex could parse some lines.
self.assertIn('<h2>Hits per status code</h2>', report_text)
def test_access_log_rotation(self):
# type: () -> None
# run logrotate a first time so that it create state files
self._executeCrontabAtDate('logrotate', '2000-01-01')
# make a request so that we have something in the logs
requests.get(self.default_balancer_url, verify=False).raise_for_status()
# slow query crontab depends on crontab for log rotation
# to be executed first.
self._executeCrontabAtDate('logrotate', '2050-01-01')
# this logrotate leaves the log for the day as non compressed
rotated_log_file = os.path.join(
self.computer_partition_root_path,
'srv',
'backup',
'logrotate',
'apache-access.log-20500101',
)
self.assertTrue(os.path.exists(rotated_log_file))
requests.get(self.default_balancer_url, verify=False).raise_for_status()
# on next day execution of logrotate, log files are compressed
self._executeCrontabAtDate('logrotate', '2050-01-02')
self.assertTrue(os.path.exists(rotated_log_file + '.xz'))
self.assertFalse(os.path.exists(rotated_log_file))
class BalancerCookieHTTPServer(ManagedHTTPServer):
"""An HTTP Server which can set balancer cookie.
This server set cookie when requested /set-cookie path.
The reply body is the name used when registering this resource
using getManagedResource. This way we can assert which
backend replied.
"""
@property
def RequestHandler(self):
server = self
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
# type: () -> None
self.send_response(200)
self.send_header("Content-Type", "text/plain")
if self.path == '/set_cookie':
# the balancer tells the backend what's the name of the balancer cookie with
# the X-Balancer-Current-Cookie header.
self.send_header('Set-Cookie', '%s=anything' % self.headers['X-Balancer-Current-Cookie'])
# The name of this cookie is SERVERID
assert self.headers['X-Balancer-Current-Cookie'] == 'SERVERID'
self.end_headers()
self.wfile.write(server._name)
log_message = logging.getLogger(__name__ + '.BalancerCookieHTTPServer').info
return RequestHandler
class TestBalancer(BalancerTestCase):
"""Check balancing capabilities
"""
__partition_reference__ = 'b'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
parameter_dict = super(TestBalancer, cls)._getInstanceParameterDict()
# use two backend servers
parameter_dict['dummy_http_server'] = [
[cls.getManagedResource("backend_web_server1", BalancerCookieHTTPServer).netloc, 1, False],
[cls.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).netloc, 1, False],
]
return parameter_dict
caucase_process = subprocess.Popen(
cau_args + [
'--mode', 'user',
'--send-csr', frontend_user_csr,
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
def test_balancer_round_robin(self):
# requests are by default balanced to both servers
self.assertEqual(
{requests.get(self.default_balancer_url, verify=False).text for _ in range(10)},
{'backend_web_server1', 'backend_web_server2'}
)
result = caucase_process.communicate()
csr_id = result[0].split()[0]
subprocess.check_call(
cau_args + [
'--mode', 'user',
'--get-crt', csr_id, frontend_user_key,
],
def test_balancer_server_down(self):
# if one backend is down, it is excluded from balancer
self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).close()
self.addCleanup(self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).open)
self.assertEqual(
{requests.get(self.default_balancer_url, verify=False).text for _ in range(10)},
{'backend_web_server1',}
)
def test_balancer_set_cookie(self):
# if backend provides a "SERVERID" cookie, balancer will overwrite it with the
# backend selected by balancing algorithm
self.assertIn(
requests.get(urlparse.urljoin(self.default_balancer_url, '/set_cookie'), verify=False).cookies['SERVERID'],
('default-0', 'default-1'),
)
def test_balancer_respects_sticky_cookie(self):
# if request is made with the sticky cookie, the client stick on one balancer
cookies = dict(SERVERID='default-1')
self.assertEqual(
{requests.get(self.default_balancer_url, verify=False, cookies=cookies).text for _ in range(10)},
{'backend_web_server2',}
)
# if that backend becomes down, requests are balanced to another server
self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).close()
self.addCleanup(self.getManagedResource("backend_web_server2", BalancerCookieHTTPServer).open)
self.assertEqual(
requests.get(self.default_balancer_url, verify=False, cookies=cookies).text,
'backend_web_server1')
class TestHTTP(BalancerTestCase):
"""Check HTTP protocol
"""
__partition_reference__ = 'h'
def test_http_version(self):
# type: () -> None
# https://stackoverflow.com/questions/37012486/python-3-x-how-to-get-http-version-using-requests-library/37012810
self.assertEqual(
requests.get(self.default_balancer_url, verify=False).raw.version, 11)
def test_keep_alive(self):
# type: () -> None
# when doing two requests, connection is established only once
session = requests.Session()
session.verify = False
# do a first request, which establish a first connection
session.get(self.default_balancer_url).raise_for_status()
# "break" new connection method and check we can make another request
with mock.patch(
"requests.packages.urllib3.connectionpool.HTTPSConnectionPool._new_conn",
) as new_conn:
session.get(self.default_balancer_url).raise_for_status()
new_conn.assert_not_called()
parsed_url = urlparse.urlparse(self.default_balancer_url)
# check that we have an open file for the ip connection
self.assertTrue([
c for c in psutil.Process(os.getpid()).connections()
if c.status == 'ESTABLISHED' and c.raddr.ip == parsed_url.hostname
and c.raddr.port == parsed_url.port
])
class TestTLS(BalancerTestCase):
"""Check TLS
"""
__partition_reference__ = 's'
def _getServerCertificate(self, hostname, port):
# type: (Optional[str], Optional[int]) -> Any
hostname_idna = idna.encode(hostname)
sock = socket.socket()
sock.connect((hostname, port))
ctx = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
ctx.check_hostname = False
ctx.verify_mode = OpenSSL.SSL.VERIFY_NONE
sock_ssl = OpenSSL.SSL.Connection(ctx, sock)
sock_ssl.set_connect_state()
sock_ssl.set_tlsext_host_name(hostname_idna)
sock_ssl.do_handshake()
cert = sock_ssl.get_peer_certificate()
crypto_cert = cert.to_cryptography()
sock_ssl.close()
sock.close()
return crypto_cert
def test_certificate_validates_with_caucase_ca(self):
# type: () -> None
caucase = self.getManagedResource("caucase", CaucaseService)
requests.get(self.default_balancer_url, verify=caucase.ca_crt_path)
def test_certificate_renewal(self):
# type: () -> None
caucase = self.getManagedResource("caucase", CaucaseService)
balancer_parsed_url = urlparse.urlparse(self.default_balancer_url)
certificate_before_renewal = self._getServerCertificate(
balancer_parsed_url.hostname,
balancer_parsed_url.port)
# run caucase updater 90 days in the future, so that certificate is
# renewed.
caucase_updater = os.path.join(
self.computer_partition_root_path,
'etc',
'service',
'caucase-updater',
)
process = pexpect.spawnu(
"faketime +90days %s" % caucase_updater,
env=dict(os.environ, PYTHONPATH=''),
)
logger = self.logger
class DebugLogFile:
def write(self, msg):
logger.info("output from caucase_updater: %s", msg)
def flush(self):
pass
process.logfile = DebugLogFile()
process.expect(u"Renewing .*\nNext wake-up.*")
process.terminate()
process.wait()
cls.client_certificate = frontend_service_key = os.path.join(frontend_service_dir, 'crt.pem')
frontend_service_csr = os.path.join(frontend_service_dir, 'csr.pem')
# wait for server to use new certificate
for _ in range(30):
certificate_after_renewal = self._getServerCertificate(
balancer_parsed_url.hostname,
balancer_parsed_url.port)
if certificate_after_renewal.not_valid_before > certificate_before_renewal.not_valid_before:
break
time.sleep(.5)
self.assertGreater(
certificate_after_renewal.not_valid_before,
certificate_before_renewal.not_valid_before,
)
# requests are served properly after cert renewal
requests.get(self.default_balancer_url, verify=caucase.ca_crt_path).raise_for_status()
class ContentTypeHTTPServer(ManagedHTTPServer):
"""An HTTP Server which reply with content type from path.
For example when requested http://host/text/plain it will reply
with Content-Type: text/plain header.
The body is always "OK"
"""
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
# type: () -> None
self.send_response(200)
if self.path == '/':
return self.end_headers()
content_type = self.path[1:]
self.send_header("Content-Type", content_type)
self.end_headers()
self.wfile.write("OK")
log_message = logging.getLogger(__name__ + '.ContentTypeHTTPServer').info
class TestContentEncoding(BalancerTestCase):
"""Test how responses are gzip encoded or not depending on content type header.
"""
__partition_reference__ = 'ce'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
parameter_dict = super(TestContentEncoding, cls)._getInstanceParameterDict()
parameter_dict['dummy_http_server'] = [
[cls.getManagedResource("content_type_server", ContentTypeHTTPServer).netloc, 1, False],
]
return parameter_dict
def test_gzip_encoding(self):
# type: () -> None
for content_type in (
'text/cache-manifest',
'text/html',
'text/plain',
'text/css',
'application/hal+json',
'application/json',
'application/x-javascript',
'text/xml',
'application/xml',
'application/rss+xml',
'text/javascript',
'application/javascript',
'image/svg+xml',
'application/x-font-ttf',
'application/font-woff',
'application/font-woff2',
'application/x-font-opentype',
'application/wasm',):
resp = requests.get(urlparse.urljoin(self.default_balancer_url, content_type), verify=False)
self.assertEqual(resp.headers['Content-Type'], content_type)
self.assertEqual(
resp.headers['Content-Encoding'],
'gzip',
'%s uses wrong encoding: %s' % (content_type, resp.headers['Content-Encoding']))
self.assertEqual(resp.text, 'OK')
def test_no_gzip_encoding(self):
# type: () -> None
resp = requests.get(urlparse.urljoin(self.default_balancer_url, '/image/png'), verify=False)
self.assertNotIn('Content-Encoding', resp.headers)
self.assertEqual(resp.text, 'OK')
class CaucaseClientCertificate(ManagedResource):
"""A client certificate issued by a caucase services.
"""
ca_crt_file = None # type: str
crl_file = None # type: str
csr_file = None # type: str
cert_file = None # type: str
key_file = None # type: str
def open(self):
# type: () -> None
self.tmpdir = tempfile.mkdtemp()
self.ca_crt_file = os.path.join(self.tmpdir, 'ca-crt.pem')
self.crl_file = os.path.join(self.tmpdir, 'ca-crl.pem')
self.csr_file = os.path.join(self.tmpdir, 'csr.pem')
self.cert_file = os.path.join(self.tmpdir, 'crt.pem')
self.key_file = os.path.join(self.tmpdir, 'key.pem')
def close(self):
# type: () -> None
shutil.rmtree(self.tmpdir)
@property
def _caucase_path(self):
# type: () -> str
"""path of caucase executable.
"""
software_release_root_path = os.path.join(
self._cls.slap._software_root,
hashlib.md5(self._cls.getSoftwareURL().encode()).hexdigest(),
)
return os.path.join(software_release_root_path, 'bin', 'caucase')
def request(self, common_name, caucase):
# type: (str, CaucaseService) -> None
"""Generate certificate and request signature to the caucase service.
This overwrite any previously requested certificate for this instance.
"""
cas_args = [
self._caucase_path,
'--ca-url', caucase.url,
'--ca-crt', self.ca_crt_file,
'--crl', self.crl_file,
]
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
with open(frontend_service_key, 'wb') as f:
f.write(key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
))
csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, u'service'),
])).sign(key, hashes.SHA256(), default_backend())
with open(frontend_service_csr, 'wb') as f:
with open(self.key_file, 'wb') as f:
f.write(
key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
))
csr = x509.CertificateSigningRequestBuilder().subject_name(
x509.Name([
x509.NameAttribute(
NameOID.COMMON_NAME,
common_name,
),
])).sign(
key,
hashes.SHA256(),
default_backend(),
)
with open(self.csr_file, 'wb') as f:
f.write(csr.public_bytes(serialization.Encoding.PEM))
caucase_process = subprocess.Popen(
csr_id = subprocess.check_output(
cas_args + [
'--send-csr', frontend_service_csr,
'--send-csr', self.csr_file,
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
result = caucase_process.communicate()
csr_id = result[0].split()[0]
).split()[0]
assert csr_id
for _ in range(10):
for _ in range(30):
if not subprocess.call(
cas_args + [
'--get-crt', csr_id, frontend_service_key,
'--get-crt', csr_id, self.cert_file,
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) == 0:
break
else:
time.sleep(1)
else:
raise RuntimeError('getting service certificate failed.')
with open(self.cert_file) as f:
assert 'BEGIN CERTIFICATE' in f.read()
# start a caucased and server certificate.
cls.backend_caucase_dir = tempfile.mkdtemp()
backend_caucased_dir = os.path.join(cls.backend_caucase_dir, 'caucased')
os.mkdir(backend_caucased_dir)
backend_caucased_netloc = '%s:%s' % (cls._ipv4_address, findFreeTCPPort(cls._ipv4_address))
cls.backend_caucased_url = 'http://' + backend_caucased_netloc
cls.backend_caucased_process = subprocess.Popen(
[
caucased_path,
'--db', os.path.join(backend_caucased_dir, 'caucase.sqlite'),
'--server-key', os.path.join(backend_caucased_dir, 'server.key.pem'),
'--netloc', backend_caucased_netloc,
'--service-auto-approve-count', '1',
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
for _ in range(10):
try:
if requests.get(cls.backend_caucased_url).status_code == 200:
break
except Exception:
pass
time.sleep(1)
else:
raise RuntimeError('caucased failed to start.')
def revoke(self, caucase):
# type: (str, CaucaseService) -> None
"""Revoke the client certificate on this caucase instance.
"""
subprocess.check_call([
self._caucase_path,
'--ca-url', caucase.url,
'--ca-crt', self.ca_crt_file,
'--crl', self.crl_file,
'--revoke-crt', self.cert_file, self.key_file,
])
super(TestFrontendXForwardedFor, cls).setUpClass()
@classmethod
def getInstanceParameterDict(cls):
return {
'_': json.dumps({
'tcpv4-port': 3306,
'computer-memory-percent-threshold': 100,
# XXX what is this ? should probably not be needed here
'name': cls.__name__,
'monitor-passwd': 'secret',
'apachedex-configuration': '',
'apachedex-promise-threshold': 100,
'haproxy-server-check-path': '/',
'zope-family-dict': {
'default': ['dummy_http_server'],
'default-auth': ['dummy_http_server'],
},
'dummy_http_server': [[cls.http_server_netloc, 1, False]],
'backend-path-dict': {
'default': '/',
'default-auth': '/',
},
'ssl-authentication-dict': {
'default': False,
'default-auth': True,
},
'ssl': {
'caucase-url': cls.backend_caucased_url,
'frontend-caucase-url-list': [cls.frontend_caucased_url],
},
})
}
class TestFrontendXForwardedFor(BalancerTestCase):
__partition_reference__ = 'xff'
@classmethod
def _cleanup(cls, snapshot_name):
if cls.http_server_process:
cls.http_server_process.terminate()
if cls.frontend_caucased_process:
cls.frontend_caucased_process.terminate()
if cls.frontend_caucase_dir:
shutil.rmtree(cls.frontend_caucase_dir)
if cls.backend_caucased_process:
cls.backend_caucased_process.terminate()
if cls.backend_caucase_dir:
shutil.rmtree(cls.backend_caucase_dir)
super(TestFrontendXForwardedFor, cls)._cleanup(snapshot_name)
def _getInstanceParameterDict(cls):
# type: () -> Dict
frontend_caucase = cls.getManagedResource('frontend_caucase', CaucaseService)
certificate = cls.getManagedResource('client_certificate', CaucaseClientCertificate)
certificate.request(u'shared frontend', frontend_caucase)
parameter_dict = super(TestFrontendXForwardedFor, cls)._getInstanceParameterDict()
# add another "-auth" backend, that will have ssl-authentication enabled
parameter_dict['zope-family-dict']['default-auth'] = ['dummy_http_server']
parameter_dict['backend-path-dict']['default-auth'] = '/'
parameter_dict['ssl-authentication-dict'] = {
'default': False,
'default-auth': True,
}
parameter_dict['ssl']['frontend-caucase-url-list'] = [frontend_caucase.url]
return parameter_dict
def test_x_forwarded_for_added_when_verified_connection(self):
# type: () -> None
client_certificate = self.getManagedResource('client_certificate', CaucaseClientCertificate)
for backend in ('default', 'default-auth'):
balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])[backend]
result = requests.get(
balancer_url,
headers={'X-Forwarded-For': '1.2.3.4'},
cert=self.client_certificate,
cert=(client_certificate.cert_file, client_certificate.key_file),
verify=False,
).json()
self.assertEqual(result['Incoming Headers'].get('x-forwarded-for').split(', ')[0], '1.2.3.4')
def test_x_forwarded_for_stripped_when_not_verified_connection(self):
# type: () -> None
balancer_url = json.loads(self.computer_partition.getConnectionParameterDict()['_'])['default']
result = requests.get(
balancer_url,
......@@ -299,3 +699,103 @@ class TestFrontendXForwardedFor(ERP5InstanceTestCase):
headers={'X-Forwarded-For': '1.2.3.4'},
verify=False,
)
class TestClientTLS(BalancerTestCase):
__partition_reference__ = 'c'
@classmethod
def _getInstanceParameterDict(cls):
# type: () -> Dict
frontend_caucase1 = cls.getManagedResource('frontend_caucase1', CaucaseService)
certificate1 = cls.getManagedResource('client_certificate1', CaucaseClientCertificate)
certificate1.request(u'client_certificate1', frontend_caucase1)
frontend_caucase2 = cls.getManagedResource('frontend_caucase2', CaucaseService)
certificate2 = cls.getManagedResource('client_certificate2', CaucaseClientCertificate)
certificate2.request(u'client_certificate2', frontend_caucase2)
parameter_dict = super(TestClientTLS, cls)._getInstanceParameterDict()
parameter_dict['ssl-authentication-dict'] = {
'default': True,
}
parameter_dict['ssl']['frontend-caucase-url-list'] = [
frontend_caucase1.url,
frontend_caucase2.url,
]
return parameter_dict
def test_refresh_crl(self):
# type: () -> None
logger = self.logger
class DebugLogFile:
def write(self, msg):
logger.info("output from caucase_updater: %s", msg)
def flush(self):
pass
for client_certificate_name, caucase_name in (
('client_certificate1', 'frontend_caucase1'),
('client_certificate2', 'frontend_caucase2'),
):
client_certificate = self.getManagedResource(client_certificate_name,
CaucaseClientCertificate)
# when client certificate can be authenticated, backend receive the CN of
# the client certificate in "remote-user" header
def _make_request():
return requests.get(
self.default_balancer_url,
cert=(client_certificate.cert_file, client_certificate.key_file),
verify=False,
).json()
self.assertEqual(_make_request()['Incoming Headers'].get('remote-user'),
client_certificate_name)
# when certificate is revoked, updater service should update the CRL
# used by balancer from the caucase service used for client certificates
# (ie. the one used by frontend).
caucase = self.getManagedResource(caucase_name, CaucaseService)
client_certificate.revoke(caucase)
# until the CRL is updated, the client certificate is still accepted.
self.assertEqual(_make_request()['Incoming Headers'].get('remote-user'),
client_certificate_name)
# We have two services, in charge of updating CRL and CA certificates for
# each frontend CA
caucase_updater_list = glob.glob(
os.path.join(
self.computer_partition_root_path,
'etc',
'service',
'caucase-updater-*',
))
self.assertEqual(len(caucase_updater_list), 2)
# find the one corresponding to this caucase
for caucase_updater_candidate in caucase_updater_list:
with open(caucase_updater_candidate) as f:
if caucase.url in f.read():
caucase_updater = caucase_updater_candidate
break
else:
self.fail("Could not find caucase updater script for %s" % caucase.url)
# simulate running updater service in the future, to confirm that it fetches
# the new CRL and make sure balancer uses that new CRL.
process = pexpect.spawnu(
"faketime +1day %s" % caucase_updater,
env=dict(os.environ, PYTHONPATH=''),
)
process.logfile = DebugLogFile()
process.expect(u"Got new CRL.*Next wake-up at.*")
process.terminate()
process.wait()
with self.assertRaisesRegexp(Exception, 'certificate revoked'):
_make_request()
......@@ -31,6 +31,7 @@ import glob
import urlparse
import socket
import time
import tempfile
import psutil
import requests
......@@ -43,7 +44,7 @@ setUpModule # pyflakes
class TestPublishedURLIsReachableMixin(object):
"""Mixin that checks that default page of ERP5 is reachable.
"""
def _checkERP5IsReachable(self, url):
def _checkERP5IsReachable(self, url, verify):
# What happens is that instanciation just create the services, but does not
# wait for ERP5 to be initialized. When this test run ERP5 instance is
# instanciated, but zope is still busy creating the site and haproxy replies
......@@ -51,7 +52,7 @@ class TestPublishedURLIsReachableMixin(object):
# erp5 site is not created, with 500 when mysql is not yet reachable, so we
# retry in a loop until we get a succesful response.
for i in range(1, 60):
r = requests.get(url, verify=False) # XXX can we get CA from caucase already ?
r = requests.get(url, verify=verify)
if r.status_code != requests.codes.ok:
delay = i * 2
self.logger.warn("ERP5 was not available, sleeping for %ds and retrying", delay)
......@@ -62,19 +63,36 @@ class TestPublishedURLIsReachableMixin(object):
self.assertIn("ERP5", r.text)
def _getCaucaseServiceCACertificate(self):
ca_cert = tempfile.NamedTemporaryFile(
prefix="ca.crt.pem",
mode="w",
delete=False,
)
ca_cert.write(
requests.get(
urlparse.urljoin(
self.getRootPartitionConnectionParameterDict()['caucase-http-url'],
'/cas/crt/ca.crt.pem',
)).text)
self.addCleanup(os.unlink, ca_cert.name)
return ca_cert.name
def test_published_family_default_v6_is_reachable(self):
"""Tests the IPv6 URL published by the root partition is reachable.
"""
param_dict = self.getRootPartitionConnectionParameterDict()
self._checkERP5IsReachable(
urlparse.urljoin(param_dict['family-default-v6'], param_dict['site-id']))
urlparse.urljoin(param_dict['family-default-v6'], param_dict['site-id']),
self._getCaucaseServiceCACertificate())
def test_published_family_default_v4_is_reachable(self):
"""Tests the IPv4 URL published by the root partition is reachable.
"""
param_dict = self.getRootPartitionConnectionParameterDict()
self._checkERP5IsReachable(
urlparse.urljoin(param_dict['family-default'], param_dict['site-id']))
urlparse.urljoin(param_dict['family-default'], param_dict['site-id']),
self._getCaucaseServiceCACertificate())
class TestDefaultParameters(ERP5InstanceTestCase, TestPublishedURLIsReachableMixin):
......
......@@ -34,8 +34,10 @@ import unittest
import urlparse
import base64
import hashlib
import logging
import contextlib
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from BaseHTTPServer import BaseHTTPRequestHandler
from io import BytesIO
import paramiko
......@@ -48,81 +50,69 @@ from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from slapos.testing.testcase import makeModuleSetUpAndTestCaseClass
from slapos.testing.utils import findFreeTCPPort, ImageComparisonTestCase
from slapos.testing.utils import findFreeTCPPort, ImageComparisonTestCase, ManagedHTTPServer
setUpModule, SeleniumServerTestCase = makeModuleSetUpAndTestCaseClass(
os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', 'software.cfg')))
class WebServer(ManagedHTTPServer):
class RequestHandler(BaseHTTPRequestHandler):
"""Request handler for our test server.
The implemented server is:
- submit q and you'll get a page with q as title
- upload a file and the file content will be displayed in div.uploadedfile
"""
def do_GET(self):
self.send_response(200)
self.end_headers()
self.wfile.write(
'''
<html>
<title>Test page</title>
<body>
<style> p { font-family: Arial; } </style>
<form action="/" method="POST" enctype="multipart/form-data">
<input name="q" type="text"></input>
<input name="f" type="file" ></input>
<input type="submit" value="I'm feeling lucky"></input>
</form>
<p>the quick brown fox jumps over the lazy dog</p>
</body>
</html>''')
def do_POST(self):
form = cgi.FieldStorage(
fp=self.rfile,
headers=self.headers,
environ={
'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': self.headers['Content-Type'],
})
self.send_response(200)
self.end_headers()
file_data = 'no file'
if form.has_key('f'):
file_data = form['f'].file.read()
self.wfile.write(
'''
<html>
<title>%s</title>
<div>%s</div>
</html>
''' % (form['q'].value, file_data))
log_message = logging.getLogger(__name__ + '.WebServer').info
class WebServerMixin(object):
"""Mixin class which provides a simple web server reachable at self.server_url
"""
def setUp(self):
"""Start a minimal web server.
"""
class TestHandler(BaseHTTPRequestHandler):
"""Request handler for our test server.
The implemented server is:
- submit q and you'll get a page with q as title
- upload a file and the file content will be displayed in div.uploadedfile
"""
def log_message(self, *args, **kw):
if SeleniumServerTestCase._debug:
BaseHTTPRequestHandler.log_message(self, *args, **kw)
def do_GET(self):
self.send_response(200)
self.end_headers()
self.wfile.write(
'''
<html>
<title>Test page</title>
<body>
<style> p { font-family: Arial; } </style>
<form action="/" method="POST" enctype="multipart/form-data">
<input name="q" type="text"></input>
<input name="f" type="file" ></input>
<input type="submit" value="I'm feeling lucky"></input>
</form>
<p>the quick brown fox jumps over the lazy dog</p>
</body>
</html>''')
def do_POST(self):
form = cgi.FieldStorage(
fp=self.rfile,
headers=self.headers,
environ={
'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': self.headers['Content-Type'],
})
self.send_response(200)
self.end_headers()
file_data = 'no file'
if form.has_key('f'):
file_data = form['f'].file.read()
self.wfile.write(
'''
<html>
<title>%s</title>
<div>%s</div>
</html>
''' % (form['q'].value, file_data))
super(WebServerMixin, self).setUp()
ip = os.environ.get('SLAPOS_TEST_IPV4', '127.0.1.1')
port = findFreeTCPPort(ip)
server = HTTPServer((ip, port), TestHandler)
self.server_process = multiprocessing.Process(target=server.serve_forever)
self.server_process.start()
self.server_url = 'http://%s:%s/' % (ip, port)
def tearDown(self):
self.server_process.terminate()
self.server_process.join()
super(WebServerMixin, self).tearDown()
self.server_url = self.getManagedResource('web_server', WebServer).url
class BrowserCompatibilityMixin(WebServerMixin):
......
......@@ -316,3 +316,4 @@ funcsigs = 1.0.2
mysqlclient = 1.3.12
pexpect = 4.8.0
ptyprocess = 0.6.0
typing = 3.7.4.3
......@@ -90,7 +90,7 @@ md5sum = 2f3ddd328ac1c375e483ecb2ef5ffb57
[template-balancer]
filename = instance-balancer.cfg.in
md5sum = bb9a953ce22f7d5188385f0171b6198e
md5sum = ecf119142e6b5cd85a2ba397552d2142
[template-haproxy-cfg]
filename = haproxy.cfg.in
......
......@@ -18,19 +18,52 @@ per partition. No more (undefined result), no less (IndexError).
recipe = slapos.recipe.template:jinja2
mode = 644
[balancer-csr-request-config]
< = jinja2-template-base
template = inline:
[req]
prompt = no
req_extensions = req_ext
distinguished_name = dn
[ dn ]
CN = example.com
[ req_ext ]
subjectAltName = @alt_names
[ alt_names ]
IP.1 = {{ ipv4 }}
{% if ipv6_set -%}
IP.2 = {{ ipv6 }}
{% endif %}
rendered = ${buildout:parts-directory}/${:_buildout_section_name_}/${:_buildout_section_name_}.txt
[balancer-csr-request]
recipe = plone.recipe.command
command = {{ parameter_dict["openssl"] }}/bin/openssl req \
-newkey rsa:2048 \
-batch \
-new \
-nodes \
-keyout '${apache-conf-ssl:key}' \
-config '${balancer-csr-request-config:rendered}' \
-out '${:csr}'
stop-on-error = true
csr = ${directory:etc}/${:_buildout_section_name_}.csr.pem
{{ caucase.updater(
prefix='caucase-updater',
buildout_bin_directory=parameter_dict['bin-directory'],
updater_path='${directory:services-on-watch}/caucase-updater',
url=ssl_parameter_dict['caucase-url'],
data_dir='${directory:srv}/caucase-updater',
crt_path='${apache-conf-ssl:caucase-cert}',
crt_path='${apache-conf-ssl:cert}',
ca_path='${directory:srv}/caucase-updater/ca.crt',
crl_path='${directory:srv}/caucase-updater/crl.pem',
key_path='${apache-conf-ssl:caucase-key}',
key_path='${apache-conf-ssl:key}',
on_renew='${apache-graceful:output}',
max_sleep=ssl_parameter_dict.get('max-crl-update-delay', 1.0),
template_csr_pem=ssl_parameter_dict.get('csr'),
template_csr=None if ssl_parameter_dict.get('csr') else '${balancer-csr-request:csr}',
openssl=parameter_dict['openssl'] ~ '/bin/openssl',
)}}
{% do section('caucase-updater') -%}
......@@ -176,9 +209,6 @@ hash-files = ${haproxy-cfg:rendered}
[apache-conf-ssl]
cert = ${directory:apache-conf}/apache.crt
key = ${directory:apache-conf}/apache.pem
# XXX caucase certificate is not supported by caddy for now
caucase-cert = ${directory:apache-conf}/apache-caucase.crt
caucase-key = ${directory:apache-conf}/apache-caucase.pem
{% if frontend_caucase_url_list -%}
depends = ${caucase-updater-housekeeper-run:recipe}
ca-cert-dir = ${directory:apache-ca-cert-dir}
......@@ -201,19 +231,6 @@ context = key content {{content_section_name}}:content
mode = {{ mode }}
{%- endmacro %}
[apache-ssl]
{% if ssl_parameter_dict.get('key') -%}
key = ${apache-ssl-key:rendered}
cert = ${apache-ssl-cert:rendered}
{{ simplefile('apache-ssl-key', '${apache-conf-ssl:key}', ssl_parameter_dict['key']) }}
{{ simplefile('apache-ssl-cert', '${apache-conf-ssl:cert}', ssl_parameter_dict['cert']) }}
{% else %}
recipe = plone.recipe.command
command = "{{ parameter_dict['openssl'] }}/bin/openssl" req -newkey rsa -batch -new -x509 -days 3650 -nodes -keyout "${:key}" -out "${:cert}"
key = ${apache-conf-ssl:key}
cert = ${apache-conf-ssl:cert}
{%- endif %}
[apache-conf-parameter-dict]
backend-list = {{ dumps(apache_dict.values()) }}
zope-virtualhost-monster-backend-dict = {{ dumps(zope_virtualhost_monster_backend_dict) }}
......@@ -225,8 +242,8 @@ access-log = ${directory:log}/apache-access.log
# Apache 2.4's default value (60 seconds) can be a bit too short
timeout = 300
# Basic SSL server configuration
cert = ${apache-ssl:cert}
key = ${apache-ssl:key}
cert = ${apache-conf-ssl:cert}
key = ${apache-conf-ssl:key}
cipher =
ssl-session-cache = ${directory:log}/apache-ssl-session-cache
{% if frontend_caucase_url_list -%}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment