Commit 2636acbb authored by Łukasz Nowak's avatar Łukasz Nowak

promise/plugin: check_url_available implement verify

Also correctly cover ca-cert-file
parent 2640aeba
......@@ -66,6 +66,7 @@ setup(name=name,
},
tests_require = [
'mock',
'cryptography',
],
zip_safe=False, # proxy depends on Flask, which has issues with
# accessing templates
......
......@@ -25,9 +25,12 @@ class RunPromise(GenericPromise):
ca_cert_file = self.getConfig('ca-cert-file')
cert_file = self.getConfig('cert-file')
key_file = self.getConfig('key-file')
verify = int(self.getConfig('verify', 0))
if ca_cert_file:
verify = ca_cert_file
elif verify:
verify = True
else:
verify = False
......@@ -39,6 +42,14 @@ class RunPromise(GenericPromise):
try:
result = requests.get(
url, verify=verify, allow_redirects=True, timeout=timeout, cert=cert)
except requests.exceptions.SSLError as e:
if 'certificate verify failed' in str(e.message):
self.logger.error(
"ERROR SSL verify failed while accessing %r" % (url,))
else:
self.logger.error(
"ERROR Unknown SSL error %r while accessing %r" % (e, url))
return
except requests.ConnectionError as e:
self.logger.error(
"ERROR connection not possible while accessing %r" % (url, ))
......
......@@ -28,15 +28,103 @@
from slapos.grid.promise import PromiseError
from slapos.test.promise.plugin import TestPromisePluginMixin
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509.oid import NameOID
import BaseHTTPServer
import datetime
import ipaddress
import json
import multiprocessing
import ssl
import tempfile
import time
import unittest
SLAPOS_TEST_IPV4 = '127.0.0.1'
SLAPOS_TEST_IPV4_PORT = 57965
HTTPS_ENDPOINT = "http://%s:%s/" % (SLAPOS_TEST_IPV4, SLAPOS_TEST_IPV4_PORT)
HTTPS_ENDPOINT = "https://%s:%s/" % (SLAPOS_TEST_IPV4, SLAPOS_TEST_IPV4_PORT)
def createKey():
key = rsa.generate_private_key(
public_exponent=65537, key_size=2048, backend=default_backend())
key_pem = key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
)
return key, key_pem
def createCSR(common_name, ip=None):
key, key_pem = createKey()
subject_alternative_name_list = []
if ip is not None:
subject_alternative_name_list.append(
x509.IPAddress(ipaddress.ip_address(unicode(ip)))
)
csr = x509.CertificateSigningRequestBuilder().subject_name(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, unicode(common_name)),
]))
if len(subject_alternative_name_list):
csr = csr.add_extension(
x509.SubjectAlternativeName(subject_alternative_name_list),
critical=False
)
csr = csr.sign(key, hashes.SHA256(), default_backend())
csr_pem = csr.public_bytes(serialization.Encoding.PEM)
return key, key_pem, csr, csr_pem
class CertificateAuthority(object):
def __init__(self, common_name):
self.key, self.key_pem = createKey()
public_key = self.key.public_key()
builder = x509.CertificateBuilder()
builder = builder.subject_name(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, unicode(common_name)),
]))
builder = builder.issuer_name(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, unicode(common_name)),
]))
builder = builder.not_valid_before(
datetime.datetime.utcnow() - datetime.timedelta(days=2))
builder = builder.not_valid_after(
datetime.datetime.utcnow() + datetime.timedelta(days=30))
builder = builder.serial_number(x509.random_serial_number())
builder = builder.public_key(public_key)
builder = builder.add_extension(
x509.BasicConstraints(ca=True, path_length=None), critical=True,
)
self.certificate = builder.sign(
private_key=self.key, algorithm=hashes.SHA256(),
backend=default_backend()
)
self.certificate_pem = self.certificate.public_bytes(
serialization.Encoding.PEM)
def signCSR(self, csr):
builder = x509.CertificateBuilder(
subject_name=csr.subject,
extensions=csr.extensions,
issuer_name=self.certificate.subject,
not_valid_before=datetime.datetime.utcnow() - datetime.timedelta(days=1),
not_valid_after=datetime.datetime.utcnow() + datetime.timedelta(days=30),
serial_number=x509.random_serial_number(),
public_key=csr.public_key(),
)
certificate = builder.sign(
private_key=self.key,
algorithm=hashes.SHA256(),
backend=default_backend()
)
return certificate, certificate.public_bytes(serialization.Encoding.PEM)
class TestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
......@@ -64,10 +152,37 @@ class TestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
class CheckUrlAvailableMixin(TestPromisePluginMixin):
@classmethod
def setUpClass(cls):
cls.another_server_ca = CertificateAuthority("Another Server Root CA")
cls.test_server_ca = CertificateAuthority("Test Server Root CA")
key, key_pem, csr, csr_pem = createCSR(
"testserver.example.com", SLAPOS_TEST_IPV4)
_, cls.test_server_certificate_pem = cls.test_server_ca.signCSR(csr)
cls.test_server_certificate_file = tempfile.NamedTemporaryFile(
delete=False
)
cls.test_server_certificate_file.write(
cls.test_server_certificate_pem + key_pem
)
cls.test_server_certificate_file.close()
cls.test_server_ca_certificate_file = tempfile.NamedTemporaryFile(
delete=False
)
cls.test_server_ca_certificate_file.write(
cls.test_server_ca.certificate_pem)
cls.test_server_ca_certificate_file.close()
server = BaseHTTPServer.HTTPServer(
(SLAPOS_TEST_IPV4, SLAPOS_TEST_IPV4_PORT),
TestHandler)
server.socket = ssl.wrap_socket(
server.socket,
certfile=cls.test_server_certificate_file.name,
server_side=True)
cls.server_process = multiprocessing.Process(
target=server.serve_forever)
cls.server_process.start()
......@@ -89,6 +204,28 @@ extra_config_dict = {
'check-secure': %(check_secure)s,
'ignore-code': %(ignore_code)s,
}
"""
self.base_content_verify = """from slapos.promise.plugin.check_url_available import RunPromise
extra_config_dict = {
'url': '%(url)s',
'timeout': %(timeout)s,
'check-secure': %(check_secure)s,
'ignore-code': %(ignore_code)s,
'verify': %(verify)s,
}
"""
self.base_content_ca_cert = """from slapos.promise.plugin.check_url_available import RunPromise
extra_config_dict = {
'url': '%(url)s',
'timeout': %(timeout)s,
'check-secure': %(check_secure)s,
'ignore-code': %(ignore_code)s,
'ca-cert-file': %(ca_cert_file)r,
}
"""
self.base_content_http_code = """from slapos.promise.plugin.check_url_available import RunPromise
......@@ -181,6 +318,75 @@ class TestCheckUrlAvailable(CheckUrlAvailableMixin):
"%r is available" % (url,)
)
def test_check_200_verify(self):
url = HTTPS_ENDPOINT + '200'
content = self.base_content_verify % {
'url': url,
'timeout': 10,
'check_secure': 0,
'ignore_code': 0,
'verify': 1,
}
try:
old = os.environ.get('REQUESTS_CA_BUNDLE')
# simulate system provided CA bundle
os.environ[
'REQUESTS_CA_BUNDLE'] = self.test_server_ca_certificate_file.name
self.writePromise(self.promise_name, content)
self.configureLauncher()
self.launcher.run()
finally:
if old is None:
del os.environ['REQUESTS_CA_BUNDLE']
else:
os.environ['REQUESTS_CA_BUNDLE'] = old
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertEqual(
result['result']['message'],
"%r is available" % (url,)
)
def test_check_200_verify_fail(self):
url = HTTPS_ENDPOINT + '200'
content = self.base_content_verify % {
'url': url,
'timeout': 10,
'check_secure': 0,
'ignore_code': 0,
'verify': 1,
}
self.writePromise(self.promise_name, content)
self.configureLauncher()
with self.assertRaises(PromiseError):
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], True)
self.assertEqual(
result['result']['message'],
"ERROR SSL verify failed while accessing %r" % (url,)
)
def test_check_200_verify_own(self):
url = HTTPS_ENDPOINT + '200'
content = self.base_content_ca_cert % {
'url': url,
'timeout': 10,
'check_secure': 0,
'ignore_code': 0,
'ca_cert_file': self.test_server_ca_certificate_file.name
}
self.writePromise(self.promise_name, content)
self.configureLauncher()
self.launcher.run()
result = self.getPromiseResult(self.promise_name)
self.assertEqual(result['result']['failed'], False)
self.assertEqual(
result['result']['message'],
"%r is available" % (url,)
)
def test_check_401(self):
url = HTTPS_ENDPOINT + '401'
content = self.base_content % {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment