Commit 17325dc0 authored by Vincent Pelletier's avatar Vincent Pelletier Committed by Vincent Pelletier

all: Make caucased https certificate independent from CAS.

This is a step in the direction of being browser-friendly: if caucased
https certificate is issued by CAS CA, then for a browser to trust that
certificate it would have to trust all certificates emitted by CAS CA
certificate. This would be very dangerous, as CAS CA does not constrain
the certificates it may sign, so it exposes users of that caucased to
rogue certificates.
Alone, this step is insufficient, as the new internal "http_cas" does not
constrain certificates yet. This will happen in a separate commit, to
ease review and regression testing.
As a consequence of this step, by default client will not check server
certificate in https. This is consistent with how trust is bootstrapped
with plain http: maybe client is accessing an unexpected/malicious
caucased, but in such case issued certificates will be worthless to a
party which could access the correct caucased. Also, the client
certificate presented to caucased does not allow that caucased to fake
being that user, so there is no privilege escalation possible for
server.
parent bcaebfe7
......@@ -118,7 +118,13 @@ class CaucaseClient(object):
return True
return False
def __init__(self, ca_url, ca_crt_pem_list=None, user_key=None):
def __init__(
self,
ca_url,
ca_crt_pem_list=None,
user_key=None,
http_ca_crt_pem_list=None,
):
# XXX: set timeout to HTTP connections ?
http_url = urlparse(ca_url)
port = http_url.port or 80
......@@ -129,23 +135,25 @@ class CaucaseClient(object):
)
self._ca_crt_pem_list = ca_crt_pem_list
self._path = http_url.path
if ca_crt_pem_list:
ssl_context = ssl.create_default_context(
# unicode object needed as we use PEM, otherwise create_default_context
# expects DER.
cadata=''.join(ca_crt_pem_list).decode('ascii'),
)
if user_key:
try:
ssl_context.load_cert_chain(user_key)
except ssl.SSLError as exc:
raise ValueError('Failed to load user key: %r' % (exc, ))
self._https_connection = self.HTTPSConnection(
http_url.hostname,
443 if port == 80 else port + 1,
#timeout=,
context=ssl_context,
)
ssl_context = ssl.create_default_context(
# unicode object needed as we use PEM, otherwise create_default_context
# expects DER.
cadata=''.join(http_ca_crt_pem_list).decode('ascii') if http_ca_crt_pem_list else None,
)
if not http_ca_crt_pem_list:
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
if user_key:
try:
ssl_context.load_cert_chain(user_key)
except ssl.SSLError as exc:
raise ValueError('Failed to load user key: %r' % (exc, ))
self._https_connection = self.HTTPSConnection(
http_url.hostname,
443 if port == 80 else port + 1,
#timeout=,
context=ssl_context,
)
def _request(self, connection, method, url, body=None, headers=None):
path = self._path + url
......
......@@ -32,7 +32,7 @@ import ssl
import sys
import tempfile
from threading import Thread
from urlparse import urlparse
from urlparse import urlparse, urlunsplit
from wsgiref.simple_server import make_server, WSGIServer
from cryptography import x509
from cryptography.hazmat.backends import default_backend
......@@ -203,7 +203,7 @@ def getSSLContext(
server_key_path,
hostname,
cau,
cas,
http_cas,
renew=False, # Force renewal when True - used in tests
):
"""
......@@ -235,17 +235,26 @@ def getSSLContext(
ssl_context.load_verify_locations(
cadata=cau.getCACertificate().decode('ascii'),
)
cas_certificate_list = cas.getCACertificateList()
http_cas_certificate_list = http_cas.getCACertificateList()
threshold_delta = datetime.timedelta(threshold, 0)
if os.path.exists(server_key_path):
old_crt_pem = utils.getCert(server_key_path)
old_crt = utils.load_certificate(old_crt_pem, cas_certificate_list, None)
exists = os.path.exists(server_key_path)
if exists:
try:
old_crt_pem = utils.getLeafCertificate(server_key_path)
old_crt = utils.load_certificate(
old_crt_pem,
http_cas_certificate_list,
None,
)
except Exception: # pylint: disable=broad-except
exists = False
if exists:
if renew or (
old_crt.not_valid_after - threshold_delta < datetime.datetime.utcnow()
):
new_key = utils.generatePrivateKey(key_len)
new_key_pem = utils.dump_privatekey(new_key)
new_crt_pem = cas.renew(
new_crt_pem = http_cas.renew(
crt_pem=old_crt_pem,
csr_pem=utils.dump_certificate_request(
x509.CertificateSigningRequestBuilder(
......@@ -260,12 +269,14 @@ def getSSLContext(
),
),
)
new_ca_crt_pem = http_cas.getCACertificate()
with _createKey(server_key_path) as crt_file:
crt_file.write(new_key_pem)
crt_file.write(new_crt_pem)
crt_file.write(new_ca_crt_pem)
else:
new_key = utils.generatePrivateKey(key_len)
csr_id = cas.appendCertificateSigningRequest(
csr_id = http_cas.appendCertificateSigningRequest(
csr_pem=utils.dump_certificate_request(
x509.CertificateSigningRequestBuilder(
subject_name=x509.Name([
......@@ -306,18 +317,20 @@ def getSSLContext(
),
override_limits=True,
)
cas.createCertificate(csr_id)
new_crt_pem = cas.getCertificate(csr_id)
http_cas.createCertificate(csr_id)
new_crt_pem = http_cas.getCertificate(csr_id)
new_key_pem = utils.dump_privatekey(new_key)
new_ca_crt_pem = http_cas.getCACertificate()
with _createKey(server_key_path) as crt_file:
crt_file.write(new_key_pem)
crt_file.write(new_crt_pem)
crt_file.write(new_ca_crt_pem)
ssl_context.load_cert_chain(server_key_path)
return (
ssl_context,
utils.load_certificate(
utils.getCert(server_key_path),
cas_certificate_list,
utils.getLeafCertificate(server_key_path),
http_cas_certificate_list,
None,
).not_valid_after - threshold_delta,
)
......@@ -479,6 +492,8 @@ def main(argv=None, until=utils.until):
)
https_port = 443 if http_port == 80 else http_port + 1
cau_crt_life_time = args.user_crt_validity
# Certificate Authority for Users: emitted certificate are trusted by this
# service.
cau = UserCertificateAuthority(
storage=SQLite3Storage(
db_path=args.db,
......@@ -499,6 +514,8 @@ def main(argv=None, until=utils.until):
auto_sign_csr_amount=args.user_auto_approve_count,
lock_auto_sign_csr_amount=args.lock_auto_approve_count,
)
# Certificate Authority for Services: server and client certificates, the
# final produce of caucase.
cas = CertificateAuthority(
storage=SQLite3Storage(
db_path=args.db,
......@@ -516,6 +533,37 @@ def main(argv=None, until=utils.until):
auto_sign_csr_amount=args.service_auto_approve_count,
lock_auto_sign_csr_amount=args.lock_auto_approve_count,
)
# Certificate Authority for caucased https service. Distinct from CAS to be
# able to restrict the validity scope of produced CA certificate, so that it
# can be trusted by genral-purpose https clients without introducing the risk
# of producing rogue certificates.
# This Certificate Authority is only internal to this service, and not exposed
# to http(s), as it can and must only be used to caucased https certificate
# signature. Only the CA certificate is exposed, to allow verification.
https_base_url = urlunsplit((
'https',
'[' + hostname + ']:' + str(https_port),
'/',
None,
None,
))
http_cas = CertificateAuthority(
storage=SQLite3Storage(
db_path=args.db,
table_prefix='http_cas',
),
ca_subject_dict={
'CN': u'Caucased CA at ' + https_base_url,
},
ca_key_size=args.key_len,
# This CA certificate will be installed in browser key stores, where
# automated renewal will be unlikely to happen. As this CA certificate
# will only sign caucased https certificates for this process, assume
# very little will leak from the private key with each signed certificate.
# So it should be safe and more practical to give it a long life.
ca_life_period=40, # approx. 10 years
crt_life_time=args.service_crt_validity,
)
application = Application(cau=cau, cas=cas)
http_list = []
https_list = []
......@@ -568,7 +616,7 @@ def main(argv=None, until=utils.until):
server_key_path=args.server_key,
hostname=hostname,
cau=cau,
cas=cas,
http_cas=http_cas,
)
next_deadline = next_ssl_update
for https in https_list:
......@@ -611,7 +659,7 @@ def main(argv=None, until=utils.until):
server_key_path=args.server_key,
hostname=hostname,
cau=cau,
cas=cas,
http_cas=http_cas,
renew=True,
)
for https in https_list:
......
......@@ -1242,33 +1242,34 @@ class CaucaseTest(unittest.TestCase):
'--renew-crt', user_key_path, '',
)
# Server certificate will expire in 20 days, the key must be renewed
# (but we have to peek at CAS private key, cover your eyes)
(cas_key, ), = sqlite3.connect(
# (but we have to peek at HTTP CAS private key, cover your eyes)
(http_cas_key, ), = sqlite3.connect(
self._server_db,
).cursor().execute(
'SELECT key FROM casca',
'SELECT key FROM http_casca',
).fetchall()
self._stopServer()
crt_pem, key_pem, _ = utils.getKeyPair(
crt_pem, key_pem, ca_crt_pem = utils.getCertKeyAndCACert(
self._server_key,
crl=None,
)
with open(self._server_key, 'w') as server_key_file:
server_key_file.write(key_pem)
server_key_file.write(utils.dump_certificate(
self._setCertificateRemainingLifeTime(
key=utils.load_privatekey(cas_key.encode('ascii')),
key=utils.load_privatekey(http_cas_key.encode('ascii')),
crt=utils.load_certificate(
crt_pem,
[
utils.load_ca_certificate(x)
for x in utils.getCertList(self._client_ca_crt)
utils.load_ca_certificate(ca_crt_pem),
],
None,
),
delta=datetime.timedelta(20, 0)
)
))
server_key_file.write(ca_crt_pem)
reference_server_key = open(self._server_key).read()
self._startServer()
if not retry(
......
......@@ -118,6 +118,53 @@ def getCert(crt_path):
crt, = type_dict.get(pem.Certificate)
return crt.as_bytes()
def getCertKeyAndCACert(crt_path, crl):
"""
Return a certificate with its private key and the certificate which signed
it.
Raises if there is anything else than two certificates and one key, or if
their relationship cannot be validated.
"""
type_dict = _getPEMTypeDict(crt_path)
key, = type_dict[pem.Key]
crt_a, crt_b = type_dict[pem.Certificate]
key = key.as_bytes()
crt_a = crt_a.as_bytes()
crt_b = crt_b.as_bytes()
for crt, ca_crt in (
(crt_a, crt_b),
(crt_b, crt_a),
):
try:
validateCertAndKey(crt, key)
except ValueError:
continue
# key and crt match, check signatures
load_certificate(crt, [load_ca_certificate(ca_crt)], crl)
return crt, key, ca_crt
# Latest error comes from validateCertAndKey
raise # pylint: disable=misplaced-bare-raise
def getLeafCertificate(crt_path):
"""
Return a regular (non-CA) certificate from a file which may contain a CA
certificate and a key.
Raises if there is more or less than one regular certificate.
"""
type_dict = _getPEMTypeDict(crt_path)
result_list = []
for crt in type_dict.get(pem.Certificate, ()):
crt_bytes = crt.as_bytes()
if not x509.load_pem_x509_certificate(
crt_bytes,
_cryptography_backend,
).extensions.get_extension_for_class(
x509.BasicConstraints,
).value.ca:
result_list.append(crt_bytes)
result, = result_list # pylint: disable=unbalanced-tuple-unpacking
return result
def hasOneCert(crt_path):
"""
Returns whether crt_path contains a certificate.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment