Commit 023c4913 authored by Alain Takoudjou's avatar Alain Takoudjou Committed by Vincent Pelletier

Add tests script for ca.py, storage and web.py

parent 26015ada
# This file is part of caucase
# Copyright (C) 2017 Nexedi
# Alain Takoudjou <alain.takoudjou@nexedi.com>
# Vincent Pelletier <vincent@nexedi.com>
#
# caucase is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# caucase is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with caucase. If not, see <http://www.gnu.org/licenses/>.
# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
try:
__import__('pkg_resources').declare_namespace(__name__)
except ImportError:
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
# This file is part of caucase
# Copyright (C) 2017 Nexedi
# Alain Takoudjou <alain.takoudjou@nexedi.com>
# Vincent Pelletier <vincent@nexedi.com>
#
# caucase is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# caucase is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with caucase. If not, see <http://www.gnu.org/licenses/>.
import os, time
import shutil
import tempfile
import unittest
import json
from datetime import datetime
from caucase.web import app, db
from OpenSSL import crypto, SSL
from caucase.exceptions import (NoStorage, NotFound, Found, BadSignature,
BadCertificateSigningRequest,
BadCertificate,
CertificateVerificationError,
ExpiredCertificate)
from caucase.ca import CertificateAuthority, DEFAULT_DIGEST_LIST, MIN_CA_RENEW_PERIOD
from caucase import utils
class CertificateAuthorityTest(unittest.TestCase):
def setUp(self):
self.ca_dir = tempfile.mkdtemp()
self.db_file = os.path.join(self.ca_dir, 'ca.db')
self.max_request_amount = 3
self.default_digest = "sha256"
self.crt_keep_time = 0
app.config.update(
DEBUG=True,
CSRF_ENABLED=True,
SECRET_KEY = 'This is an UNSECURE Secret. Please CHANGE THIS for production environments.',
TESTING=True,
SQLALCHEMY_DATABASE_URI='sqlite:///%s' % self.db_file
)
from caucase.storage import Storage
self._storage = Storage(db)
def make_ca(self, crt_life_time, auto_sign_csr=False):
return CertificateAuthority(
storage=self._storage,
ca_life_period=4,
ca_renew_period=2,
crt_life_time=crt_life_time,
crl_renew_period=0.1,
crl_base_url='http://crl.url.com',
ca_subject='/C=XX/ST=State/L=City/OU=OUnit/O=Company/CN=CAAuth/emailAddress=xx@example.com',
max_csr_amount=self.max_request_amount,
crt_keep_time=self.crt_keep_time,
auto_sign_csr=auto_sign_csr
)
def generateCSR(self, cn="toto.example.com"):
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
req = crypto.X509Req()
subject = req.get_subject()
subject.CN = cn
subject.C = "CC"
subject.ST = "ST"
subject.L = "LOU"
subject.O = "OOU"
subject.OU = "OU"
subject.emailAddress = "toto@example.com"
req.set_pubkey(key)
utils.X509Extension().setDefaultCsrExtensions(req)
req.sign(key, self.default_digest)
return (req, key)
def csr_tostring(self, csr):
return crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr)
def get_fake_cert_key(self):
cert_string = """-----BEGIN CERTIFICATE-----
MIID6DCCAtCgAwIBAwIBBDANBgkqhkiG9w0BAQsFADCBljEiMCAGA1UEAwwZVGhl
IENlcnRpZmljYXRlIEF1dGhvcml0eTELMAkGA1UEBhMCWFgxDjAMBgNVBAgMBVN0
YXRlMREwDwYDVQQKDAhDb21wYWdueTENMAsGA1UECwwEVW5pdDENMAsGA1UEBwwE
Q2l0eTEiMCAGCSqGSIb3DQEJARYTY2EuYXV0aEBleGFtcGxlLmNvbTAeFw0xNzA0
MTYyMTQyNDVaFw0xODA0MTYyMTQyNDVaMBsxGTAXBgNVBAMMEGRhbmRAZXhhbXBs
ZS5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDrJNil62fkNm1j
UgEZ33hg5qYiLeNOEUgKaINCqhfQDuH7tTho+nRNxY2FSpv7ooyMckLYojNm+XEl
lUREE8hgIBiBWgiXazvoaxKFW2BIm7kpfxSC0t4pxwjehftm0Ny/nvms6SiE0ruL
3u+oUI9VVvgofra7mvhX3OZuZNb2QADaPnmyfB8VYGfzYAl0QgyFkrWzPflb/UXD
LdIP/niTpUHMgDTChQ+jf3tHm0pbsRZTxXISJY2+O5qpEgumW+Qcw5sKpjdfMYvH
hoM0IzMofBSfmQCZOjJRcisVLTASj5qN5+GFJi6Pz7oih203Ur8elq+iA0hRisA7
g37LNGXTAgMBAAGjgbowgbcwCQYDVR0TBAIwADAsBglghkgBhvhCAQ0EHxYdT3Bl
blNTTCBHZW5lcmF0ZWQgQ2VydGlmaWNhdGUwHQYDVR0OBBYEFPOBwIfuP8mulzmk
MgC1VGLCkwEDMB8GA1UdIwQYMBaAFGXLZo/7QINvcowfmpUEllOAJ6I5MDwGA1Ud
HwQ1MDMwMaAvoC2GK2h0dHBzOi8vWzIwMDE6NjdjOjEyNTQ6ZTpjNDo6Yzc0OF06
ODAwOS9jcmwwDQYJKoZIhvcNAQELBQADggEBAKarSr7WKXznFjbLfbedrci9mtwo
TYVpOUt/nt6lCiJ2wTGQea/e4KQ3WRwlUUHCX/K+G4QEV8OeDIA4uXnx24fclj35
hCYQCaJfIM96Z+elYIisOX3eFZ9cuo4fkODnry+vQNkYuOn/mFe0sVxoBK+oqSl5
/tN7pTFB2CaSBnRrNHquEc6YFoglCjQW4fXzHdQCdx5B7oOg/yloIst2WagXbyvE
zQvWKm6jjB5/xdT5mpxHB/lanSZMGXFnITh8qXrlTxd/tSa3ic6+k1WC++5brUvE
MtldUnSV++fZh9C4xsXyi26ytr2KkwcJcXQbU2PF4iuV6L0eYO2xOgpDCoQ=
-----END CERTIFICATE-----"""
key_string = """-----BEGIN PRIVATE KEY-----
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDrJNil62fkNm1j
UgEZ33hg5qYiLeNOEUgKaINCqhfQDuH7tTho+nRNxY2FSpv7ooyMckLYojNm+XEl
lUREE8hgIBiBWgiXazvoaxKFW2BIm7kpfxSC0t4pxwjehftm0Ny/nvms6SiE0ruL
3u+oUI9VVvgofra7mvhX3OZuZNb2QADaPnmyfB8VYGfzYAl0QgyFkrWzPflb/UXD
LdIP/niTpUHMgDTChQ+jf3tHm0pbsRZTxXISJY2+O5qpEgumW+Qcw5sKpjdfMYvH
hoM0IzMofBSfmQCZOjJRcisVLTASj5qN5+GFJi6Pz7oih203Ur8elq+iA0hRisA7
g37LNGXTAgMBAAECggEASMRxSv9LekMhnN/OuWv/e7VE6kTbF9ifO6FWJXYvwlIo
utU87Le88ChXgE0zci6+YeQmLZYcZByDWEcWBh89Hgowqy7qg7lKo8UmySAa7r1K
Er5h4Y5R9AnFA9/gidPOzHns+AZ7ZIc2RLWr4qFzicxNJXL5J5twiPgyUy1fnHpg
Ktk3ccgtIe8IJNYS9hGW40X6DZfbiNqnUlGxS0Nsk7RQhEowcuAob7sBf2k6tSAx
qaaB3PYBwGsfgF6Zkq81/ZmzZPoD0vSLURAlglTWgGljqXOqXVnqUFbcyaZeKwqX
4b5MlQMZ09puOz5CGG0HhnTFmt+N1rU3Vsx74G5hWQKBgQD4wHt9WL6DQPQ2/i0l
o6afSd0+DV5HldHGCRt4aF3//bGU3OugNvHVK2ijXEUIDHcpvEW1vwjbqmkFI3Sn
wANCr8YAmu/51uYYyeUP4V35SKBtBBdhUvFOGE3MThJQusAdEYg65T9STQvmpRnJ
Yv5QRlX/jawEtS2H9pZo+WvZpQKBgQDx/tuiU2isBfOrT5MAVxtu/VEZvafwRcHe
0EmRCyW6+rHSA3o2/3f43t6x63qvvk6NYY9rSz/0TZkZ/Y3QihPNqGwGuuzSvsG+
yDfnv6YmtcnBPv2kXHwEeIsd9DdjqT4D3MIHGHo05cu9Ta7oTHVAo8OSQbEqkGwj
oYpuQTz4FwKBgQCxuo1A9OxByWHz/M1zDCdbviHGWTTYftH/5bfr4t3urmt4ChSM
R1WoUjiUJ7Pm2Uk2158TCSgiEvKwSjHqPUXXGtGk0w7M+l8yrOXt378OAnclDPxL
fECO5MyJQerSJWxoGIO2WN9SRVxQcfwnqIQ+BNMjIS0bu/uJHoU/AZ6uRQKBgQC5
FT9Oa5TG3NZ806OOwxCMVtpMYa2sKu4YSB27/VaiJ1MRWO+EWOedRHf2hC+VcmwJ
3fAfE7KaWy8Znb91G+YBiSr2CslOde8gx2laqk2dlbP1RQQhTUrc8IUWJ86lPq/b
rGAJpULyaj7lTiDUMoYLJjVSC0RBVawfpFGH+gVziQKBgQDI+dgmgFv3ULUUTBtP
iPwYrGFWmzcKwevPaIQqrsjJ0TY2INmQZi6pn4ogelUtcRMFjzpBGHrsxJM0RkSy
3A855c5gfp60XOQB3ab0OS0X5/gzDZHThN7wvspUFnZ9i6LhSEOHMEAxwSklCtPq
m4DpuP4nL0ixQJWZuV+qrx6Tow==
-----END PRIVATE KEY-----"""
return (cert_string, key_string)
def check_cert_equal(self, first, second):
if isinstance(first, crypto.X509):
first_string = crypto.dump_certificate(crypto.FILETYPE_PEM, first)
else:
first_string = first
second_string = crypto.dump_certificate(crypto.FILETYPE_PEM, second)
return first_string == second_string
def check_key_equal(self, first, second):
if isinstance(first, crypto.PKey):
first_string = crypto.dump_privatekey(crypto.FILETYPE_PEM, first)
else:
first_string = first
second_string = crypto.dump_privatekey(crypto.FILETYPE_PEM, second)
return first_string == second_string
def check_csr_equal(self, first, second):
if isinstance(first, crypto.X509Req):
first_string = crypto.dump_certificate_request(crypto.FILETYPE_PEM, first)
else:
first_string = first
second_string = crypto.dump_certificate_request(crypto.FILETYPE_PEM, second)
return first_string == second_string
def tearDown(self):
db.session.remove()
db.drop_all()
if os.path.exists(self.ca_dir):
shutil.rmtree(self.ca_dir)
def test_createCAKeyPair(self):
# initials keypair are generated when instanciating ca
ca = self.make_ca(160)
self.assertEquals(len(ca._ca_key_pairs_list), 1)
def renewCAKetPair(self):
ca = self.make_ca(2)
self.assertEquals(len(ca._ca_key_pairs_list), 1)
first = ca._ca_key_pairs_list
# No renew possible
self.assertFalse(ca.renewCAKeyPair())
time.sleep(5)
# ca Certificate should be renewed
self.assertTrue(ca.renewCAKeyPair())
self.assertEquals(len(ca._ca_key_pairs_list), 2)
self.assertTrue(self.check_cert_equal(ca._ca_key_pairs_list[0]['crt'], first['crt']))
self.assertTrue(self.check_key_equal(ca._ca_key_pairs_list[0]['key'], first['key']))
def test_getPendingCertificateRequest(self):
ca = self.make_ca(190)
csr, key = self.generateCSR()
csr_string = self.csr_tostring(csr)
csr_id = ca.createCertificateSigningRequest(csr_string)
stored = ca.getPendingCertificateRequest(csr_id)
self.assertEquals(csr_string, stored)
def test_deletePendingCertificateRequest(self):
ca = self.make_ca(190)
csr, key = self.generateCSR()
csr_string = self.csr_tostring(csr)
csr_id = ca.createCertificateSigningRequest(csr_string)
stored = ca.getPendingCertificateRequest(csr_id)
self.assertEquals(csr_string, stored)
ca.deletePendingCertificateRequest(csr_id)
with self.assertRaises(NotFound):
ca.getPendingCertificateRequest(csr_id)
def test_createCertificate(self):
ca = self.make_ca(190)
csr, key = self.generateCSR()
csr_id = ca.createCertificateSigningRequest(self.csr_tostring(csr))
# sign certificate with default ca keypair
cert_id = ca.createCertificate(csr_id)
cert = ca.getCertificate(cert_id)
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
self.assertTrue(utils.validateCertAndKey(x509, key))
with self.assertRaises(NotFound):
ca.getPendingCertificateRequest(csr_id)
def test_getCAKeypairForCertificate(self):
csr, key = self.generateCSR()
ca = self.make_ca(3)
csr_id = ca.createCertificateSigningRequest(self.csr_tostring(csr))
# sign certificate with default ca keypair
cert_id = ca.createCertificate(csr_id)
cert = ca.getCertificate(cert_id)
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
self.assertTrue(utils.validateCertAndKey(x509, key))
cert_keypair = ca._ca_key_pairs_list[0]
# 2 crt_life_time cycles + 1s
time.sleep(7)
# Create new CA keypair
self.assertTrue(ca.renewCAKeyPair())
# get keypair which should be used to renew this cert
calculated = ca.getCAKeypairForCertificate(x509)
self.assertTrue(self.check_cert_equal(cert_keypair['crt'], calculated['crt']))
self.assertTrue(self.check_key_equal(cert_keypair['key'], calculated['key']))
self.assertEquals(len(ca._ca_key_pairs_list), 2)
new_keypair = ca._ca_key_pairs_list[-1]
time.sleep(3)
# the first ca keypair cannot be used to renew cert (7+3 -12 = 2 < crt_life_time)
calculated = ca.getCAKeypairForCertificate(x509)
self.assertTrue(self.check_cert_equal(new_keypair['crt'], calculated['crt']))
self.assertTrue(self.check_key_equal(new_keypair['key'], calculated['key']))
def test_renewCertificate(self):
ca = self.make_ca(158)
csr, key = self.generateCSR()
csr_string = self.csr_tostring(csr)
csr_id = ca.createCertificateSigningRequest(csr_string)
# sign certificate with default ca keypair
cert_id = ca.createCertificate(csr_id)
cert = ca.getCertificate(cert_id)
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
self.assertTrue(utils.validateCertAndKey(x509, key))
payload = dict(
renew_csr=csr_string,
crt=cert
)
wrapped = utils.wrap(payload, crypto.dump_privatekey(crypto.FILETYPE_PEM, key), [self.default_digest])
new_csr_id = ca.renew(wrapped)
new_cert_id = '%s.crt.pem' % new_csr_id[:-8]
new_cert = ca.getCertificate(new_cert_id)
new_x509 = crypto.load_certificate(crypto.FILETYPE_PEM, new_cert)
self.assertTrue(utils.validateCertAndKey(new_x509, key))
self.assertEquals(new_x509.get_subject().CN, x509.get_subject().CN)
# current certificate is not revoked
self.assertTrue(utils.validateCertAndKey(x509, key))
def test_renewCertificate_bad_cert(self):
ca = self.make_ca(158)
csr, key = self.generateCSR()
csr_string = self.csr_tostring(csr)
csr_id = ca.createCertificateSigningRequest(csr_string)
# sign certificate with default ca keypair
cert_id = ca.createCertificate(csr_id)
cert = ca.getCertificate(cert_id)
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
self.assertTrue(utils.validateCertAndKey(x509, key))
# second certificate
csr2, key2 = self.generateCSR()
csr_id2 = ca.createCertificateSigningRequest(self.csr_tostring(csr2))
cert_id2 = ca.createCertificate(csr_id2)
cert2 = ca.getCertificate(cert_id2)
x509_2 = crypto.load_certificate(crypto.FILETYPE_PEM, cert2)
self.assertTrue(utils.validateCertAndKey(x509_2, key2))
payload = dict(
renew_csr=csr_string,
crt=cert
)
# sign with bad key
wrapped = utils.wrap(payload, crypto.dump_privatekey(crypto.FILETYPE_PEM, key2), [self.default_digest])
with self.assertRaises(BadSignature):
ca.renew(wrapped)
# payload with invalid PEM certificate
payload = dict(
renew_csr=csr_string,
crt="BAD PEM CERTIFICATE"
)
wrapped = utils.wrap(payload, crypto.dump_privatekey(crypto.FILETYPE_PEM, key), [self.default_digest])
with self.assertRaises(BadSignature):
ca.renew(wrapped)
# payload with invalid PEM certificate request content
payload = dict(
renew_csr="BAD PEM CERTIFICATE REQUEST",
crt=cert
)
wrapped = utils.wrap(payload, crypto.dump_privatekey(crypto.FILETYPE_PEM, key), [self.default_digest])
with self.assertRaises(BadCertificateSigningRequest):
ca.renew(wrapped)
# payload with Fake certificate
fcert, fkey = self.get_fake_cert_key()
payload = dict(
renew_csr=csr_string,
crt=cert
)
wrapped = utils.wrap(payload, fkey, [self.default_digest])
with self.assertRaises(BadSignature):
ca.renew(wrapped)
payload = dict(
renew_csr=csr_string,
crt=fcert
)
wrapped = utils.wrap(payload, fkey, [self.default_digest])
with self.assertRaises(BadCertificateSigningRequest):
# Fake certificate and renew_csr has not the same csr
ca.renew(wrapped)
def test_revokeCertificate(self):
ca = self.make_ca(158)
csr, key = self.generateCSR()
csr_id = ca.createCertificateSigningRequest(self.csr_tostring(csr))
# sign certificate with default ca keypair
cert_id = ca.createCertificate(csr_id)
cert = ca.getCertificate(cert_id)
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
self.assertTrue(utils.validateCertAndKey(x509, key))
payload = dict(
reason='',
revoke_crt=cert
)
wrapped = utils.wrap(payload, crypto.dump_privatekey(crypto.FILETYPE_PEM, key), [self.default_digest])
ca.revokeCertificate(wrapped)
with self.assertRaises(NotFound):
ca.getCertificate(cert_id)
revocation_list = self._storage.getRevocationList()
self.assertEquals(len(revocation_list), 1)
self.assertEquals(revocation_list[0].serial, utils.getSerialToInt(x509))
def test_revokeCertificate_expired(self):
ca = self.make_ca(2)
csr, key = self.generateCSR()
csr_id = ca.createCertificateSigningRequest(self.csr_tostring(csr))
# sign certificate with default ca keypair
cert_id = ca.createCertificate(csr_id)
cert = ca.getCertificate(cert_id)
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
self.assertTrue(utils.validateCertAndKey(x509, key))
# wait until certificate expire
time.sleep(3)
payload = dict(
reason='',
revoke_crt=cert
)
wrapped = utils.wrap(payload, crypto.dump_privatekey(crypto.FILETYPE_PEM, key), [self.default_digest])
with self.assertRaises(CertificateVerificationError):
# if certificate expire, verification fail
ca.revokeCertificate(wrapped)
def test_revokeCertificate_bad_cert(self):
ca = self.make_ca(158)
csr, key = self.generateCSR()
csr_id = ca.createCertificateSigningRequest(self.csr_tostring(csr))
# sign certificate with default ca keypair
cert_id = ca.createCertificate(csr_id)
cert = ca.getCertificate(cert_id)
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
self.assertTrue(utils.validateCertAndKey(x509, key))
# second certificate
csr2, key2 = self.generateCSR()
csr_id2 = ca.createCertificateSigningRequest(self.csr_tostring(csr2))
cert_id2 = ca.createCertificate(csr_id2)
cert2 = ca.getCertificate(cert_id2)
x509_2 = crypto.load_certificate(crypto.FILETYPE_PEM, cert2)
self.assertTrue(utils.validateCertAndKey(x509_2, key2))
payload = dict(
reason="",
revoke_crt=cert
)
# sign with bad key
wrapped = utils.wrap(payload, crypto.dump_privatekey(crypto.FILETYPE_PEM, key2), [self.default_digest])
with self.assertRaises(BadSignature):
ca.revokeCertificate(wrapped)
# payload with invalid PEM certificate
payload = dict(
reason="",
revoke_crt="BAD PEM CERTIFICATE"
)
wrapped = utils.wrap(payload, crypto.dump_privatekey(crypto.FILETYPE_PEM, key), [self.default_digest])
with self.assertRaises(BadSignature):
ca.revokeCertificate(wrapped)
# payload with Fake certificate
fcert, fkey = self.get_fake_cert_key()
payload = dict(
reason="",
revoke_crt=cert
)
wrapped = utils.wrap(payload, fkey, [self.default_digest])
with self.assertRaises(BadSignature):
ca.revokeCertificate(wrapped)
payload = dict(
reason="",
revoke_crt=fcert
)
wrapped = utils.wrap(payload, fkey, [self.default_digest])
with self.assertRaises(CertificateVerificationError):
ca.revokeCertificate(wrapped)
def test_getCertificateRevocationList(self):
ca = self.make_ca(158)
def signcert():
csr, key = self.generateCSR()
csr_id = ca.createCertificateSigningRequest(self.csr_tostring(csr))
# sign certificate with default ca keypair
cert_id = ca.createCertificate(csr_id)
cert = ca.getCertificate(cert_id)
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
self.assertTrue(utils.validateCertAndKey(x509, key))
return x509, key
cert_1, key_1 = signcert()
cert_2, key_2 = signcert()
cert2_string = crypto.dump_certificate(crypto.FILETYPE_PEM, cert_2)
cert_3, key_3 = signcert()
cert3_string = crypto.dump_certificate(crypto.FILETYPE_PEM, cert_3)
cert_4, key_4 = signcert()
crl = ca.getCertificateRevocationList()
crl_obj = crypto.load_crl(crypto.FILETYPE_PEM, crl)
self.assertEquals(crl_obj.get_revoked(), None)
payload = dict(
reason="",
revoke_crt=cert2_string
)
# sign with bad key
wrapped = utils.wrap(payload, crypto.dump_privatekey(crypto.FILETYPE_PEM, key_2), [self.default_digest])
ca.revokeCertificate(wrapped)
crl2_string = ca.getCertificateRevocationList()
crl2 = crypto.load_crl(crypto.FILETYPE_PEM, crl2_string)
self.assertEquals(len(crl2.get_revoked()), 1)
serial = '0%s' % cert_2.get_serial_number()
self.assertEquals(crl2.get_revoked()[0].get_serial(), serial)
payload = dict(
reason="",
revoke_crt=cert3_string
)
# sign with bad key
wrapped = utils.wrap(payload, crypto.dump_privatekey(crypto.FILETYPE_PEM, key_3), [self.default_digest])
ca.revokeCertificate(wrapped)
crl3_string = ca.getCertificateRevocationList()
crl3 = crypto.load_crl(crypto.FILETYPE_PEM, crl3_string)
self.assertEquals(len(crl3.get_revoked()), 2)
matches = 0
for revoked in crl3.get_revoked():
if revoked.get_serial() == '0%s' % cert_3.get_serial_number():
matches += 1
elif revoked.get_serial() == '0%s' % cert_2.get_serial_number():
matches += 1
self.assertEquals(matches, 2)
crl4_string = ca.getCertificateRevocationList()
# nothing changed, crl not expired
self.assertEquals(crl3_string, crl4_string)
def test_getCertificateRevocationList_with_expire(self):
ca = self.make_ca(2)
def signcert():
csr, key = self.generateCSR()
csr_id = ca.createCertificateSigningRequest(self.csr_tostring(csr))
# sign certificate with default ca keypair
cert_id = ca.createCertificate(csr_id)
cert = ca.getCertificate(cert_id)
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
self.assertTrue(utils.validateCertAndKey(x509, key))
return x509, key
cert_1, key_1 = signcert()
cert_2, key_2 = signcert()
cert2_string = crypto.dump_certificate(crypto.FILETYPE_PEM, cert_2)
crl_string = ca.getCertificateRevocationList()
crl = crypto.load_crl(crypto.FILETYPE_PEM, crl_string)
self.assertEquals(crl.get_revoked(), None)
payload = dict(
reason="",
revoke_crt=cert2_string
)
# sign with bad key
wrapped = utils.wrap(payload, crypto.dump_privatekey(crypto.FILETYPE_PEM, key_2), [self.default_digest])
ca.revokeCertificate(wrapped)
crl2_string = ca.getCertificateRevocationList()
crl2 = crypto.load_crl(crypto.FILETYPE_PEM, crl2_string)
self.assertEquals(len(crl2.get_revoked()), 1)
serial = '0%s' % cert_2.get_serial_number()
self.assertEquals(crl2.get_revoked()[0].get_serial(), serial)
# wait until cert_2 expire
time.sleep(3)
cert_3, key_3 = signcert()
cert3_string = crypto.dump_certificate(crypto.FILETYPE_PEM, cert_3)
payload = dict(
reason="",
revoke_crt=cert3_string
)
# sign with bad key
wrapped = utils.wrap(payload, crypto.dump_privatekey(crypto.FILETYPE_PEM, key_3), [self.default_digest])
ca.revokeCertificate(wrapped)
crl3_string = ca.getCertificateRevocationList()
crl3 = crypto.load_crl(crypto.FILETYPE_PEM, crl3_string)
# cert_2 is not longer into crl (expired)
self.assertEquals(len(crl3.get_revoked()), 1)
serial = '0%s' % cert_3.get_serial_number()
self.assertEquals(crl3.get_revoked()[0].get_serial(), serial)
def test_getCertificateRevocationList_with_validation(self):
ca = self.make_ca(158)
def signcert():
csr, key = self.generateCSR()
csr_id = ca.createCertificateSigningRequest(self.csr_tostring(csr))
# sign certificate with default ca keypair
cert_id = ca.createCertificate(csr_id)
cert = ca.getCertificate(cert_id)
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
self.assertTrue(utils.validateCertAndKey(x509, key))
return x509, key
cert_1, key_1 = signcert()
cert_2, key_2 = signcert()
cert2_string = crypto.dump_certificate(crypto.FILETYPE_PEM, cert_2)
cert_3, key_3 = signcert()
payload = dict(
reason="",
revoke_crt=cert2_string
)
# sign with bad key
wrapped = utils.wrap(payload, crypto.dump_privatekey(crypto.FILETYPE_PEM, key_2), [self.default_digest])
ca.revokeCertificate(wrapped)
crl_string = ca.getCertificateRevocationList()
crl = crypto.load_crl(crypto.FILETYPE_PEM, crl_string)
self.assertEquals(len(crl.get_revoked()), 1)
serial = '0%s' % cert_2.get_serial_number()
self.assertEquals(crl.get_revoked()[0].get_serial(), serial)
with self.assertRaises(CertificateVerificationError):
utils.verifyCertificateChain(cert_2,
[x['crt'] for x in ca._ca_key_pairs_list], crl)
utils.verifyCertificateChain(cert_3,
[x['crt'] for x in ca._ca_key_pairs_list], crl)
utils.verifyCertificateChain(cert_1,
[x['crt'] for x in ca._ca_key_pairs_list], crl)
# This file is part of caucase
# Copyright (C) 2017 Nexedi
# Alain Takoudjou <alain.takoudjou@nexedi.com>
# Vincent Pelletier <vincent@nexedi.com>
#
# caucase is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# caucase is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with caucase. If not, see <http://www.gnu.org/licenses/>.
import os, time
import urllib2
import shutil
import tempfile
import json
from datetime import datetime
from caucase.web import parseArguments, configure_flask
from OpenSSL import crypto, SSL
from caucase.exceptions import (NoStorage, NotFound, Found)
from caucase import utils
from flask_testing import TestCase
from flask import url_for
class CertificateAuthorityWebTest(TestCase):
def setUp(self):
self.ca_dir = tempfile.mkdtemp()
configure_flask(parseArguments(['--ca-dir', self.ca_dir, '-s', '/CN=CA Auth Test/emailAddress=xx@example.com']))
def tearDown(self):
from caucase.web import db
db.session.remove()
db.drop_all()
if os.path.exists(self.ca_dir):
shutil.rmtree(self.ca_dir)
def create_app(self):
from caucase.web import app
app.config['TESTING'] = True
app.config['LIVESERVER_PORT'] = 0
return app
def generateCSR(self, cn="toto.example.com"):
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
req = crypto.X509Req()
subject = req.get_subject()
subject.CN = cn
subject.C = "CC"
subject.ST = "ST"
subject.L = "LO"
subject.O = "ORG"
subject.OU = "OU"
subject.emailAddress = "toto@example.com"
req.set_pubkey(key)
utils.X509Extension().setDefaultCsrExtensions(req)
req.sign(key, 'sha256')
return (req, key)
def _init_password(self, password):
response = self.client.post('/admin/setpassword', data=dict(password=password),
follow_redirects=True)
self.assert200(response)
def test_get_crl(self):
response = self.client.get('/crl')
self.assert200(response)
crypto.load_crl(crypto.FILETYPE_PEM, response.data)
def test_put_csr(self):
csr, _ = self.generateCSR()
csr_string = crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr)
data = dict(csr=csr_string)
response = self.client.put(url_for('request_cert'), data=data)
self.assertEquals(response.status_code, 201)
csr_key = response.headers['Location'].split('/')[-1]
# the first csr is signed and csr is no longer available
response2 = self.client.get('/csr/%s' % csr_key)
self.assert404(response2)
cert_url = '/crt/%s.crt.pem' % csr_key[:-8]
response3 = self.client.get(cert_url)
self.assert200(response3)
crypto.load_certificate(crypto.FILETYPE_PEM, response3.data)
# put another csr
csr2, _ = self.generateCSR()
csr2_string = crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr)
response4 = self.client.put(url_for('request_cert'), data=dict(csr=csr2_string))
self.assertEquals(response4.status_code, 201)
csr_key = response4.headers['Location'].split('/')[-1]
# get csr will return the csr
response5 = self.client.get('/csr/%s' % csr_key)
self.assert200(response5)
# the certificate is not signed
cert_url = '/crt/%s.crt.pem' % csr_key[:-8]
response6 = self.client.get(cert_url)
self.assert404(response6)
self.assertEquals(response5.data, csr2_string)
def test_get_csr_notfound(self):
response = self.client.get('/csr/1234')
self.assert404(response)
# self.assertEquals(response.json['name'], 'FileNotFound'))
def test_get_cacert(self):
response = self.client.get('/crt/ca.crt.pem')
self.assert200(response)
crypto.load_certificate(crypto.FILETYPE_PEM, response.data)
# This file is part of caucase
# Copyright (C) 2017 Nexedi
# Alain Takoudjou <alain.takoudjou@nexedi.com>
# Vincent Pelletier <vincent@nexedi.com>
#
# caucase is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# caucase is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with caucase. If not, see <http://www.gnu.org/licenses/>.
import os, time
import shutil
import tempfile
import unittest
import json
from datetime import datetime, timedelta
from caucase.web import app, db
from OpenSSL import crypto, SSL
from caucase.exceptions import (NoStorage, NotFound, Found)
from sqlite3 import IntegrityError
from caucase import utils
class StorageTest(unittest.TestCase):
def setUp(self):
self.ca_dir = tempfile.mkdtemp()
self.db_file = os.path.join(self.ca_dir, 'ca.db')
#os.mkdir(self.ca_dir)
self.max_request_amount = 3
self.default_digest = "sha256"
app.config.update(
DEBUG=True,
CSRF_ENABLED=True,
SECRET_KEY = 'This is an UNSECURE Secret. Please CHANGE THIS for production environments.',
TESTING=True,
SQLALCHEMY_DATABASE_URI='sqlite:///%s' % self.db_file
)
from caucase.storage import Storage
self._storage = Storage(db)
def setConfig(self, key, value):
entry = self._storage._getConfig(key)
from caucase.storage import Config
if not entry:
entry = Config(key=key, value='%s' % value)
db.session.add(entry)
else:
# update value
entry.value = value
db.session.commit()
def tearDown(self):
db.session.remove()
db.drop_all()
if os.path.exists(self.ca_dir):
shutil.rmtree(self.ca_dir)
def createCAKeyPair(self, life_time=60):
key_pair = {}
key = crypto.PKey()
# Use 2048 bits key size
key.generate_key(crypto.TYPE_RSA, 2048)
key_pair['key'] = key
ca = crypto.X509()
ca.set_version(3)
ca.set_serial_number(int(time.time()))
ca.get_subject().CN = "CA Cert %s" % int(time.time())
ca.get_subject().C = "FR"
ca.get_subject().ST = "XX"
ca.get_subject().O = "ORG"
ca.get_subject().OU = "OU"
ca.get_subject().L = "LL"
ca.get_subject().emailAddress = "xxx@exemple.com"
ca.gmtime_adj_notBefore(0)
ca.gmtime_adj_notAfter(life_time)
ca.set_issuer(ca.get_subject())
ca.set_pubkey(key)
utils.X509Extension().setCaExtensions(ca)
ca.sign(key, self.default_digest)
key_pair['crt'] = ca
return key_pair
def generateCSR(self, cn="toto.example.com"):
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
req = crypto.X509Req()
subject = req.get_subject()
subject.CN = cn
subject.C = "CC"
subject.ST = "ST"
subject.L = "LL"
subject.O = "ORG"
subject.OU = "OU"
subject.emailAddress = "toto@example.com"
req.set_pubkey(key)
utils.X509Extension().setDefaultCsrExtensions(req)
req.sign(key, self.default_digest)
return (req, key)
def createCertificate(self, ca_key_pair, req, expire_sec=180):
serial = self._storage.getNextCertificateSerialNumber()
cert = crypto.X509()
# 3 = v3
cert.set_version(3)
cert.set_serial_number(serial)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(expire_sec)
cert.set_issuer(ca_key_pair['crt'].get_subject())
cert.set_subject(req.get_subject())
cert.set_pubkey(req.get_pubkey())
utils.X509Extension().setDefaultExtensions(
cert,
subject=cert,
issuer=ca_key_pair['crt'],
crl_url="http://ca.crl.com")
cert.sign(ca_key_pair['key'], self.default_digest)
return cert
def generateCRL(self, ca_key_pair, revocation_list, version_number):
now = datetime.now()
crl = crypto.CRL()
num_crl_days = 1
for revocation in revocation_list:
revoked = crypto.Revoked()
revoked.set_rev_date(
revocation.creation_date.strftime("%Y%m%d%H%M%SZ").encode("ascii")
)
revoked.set_serial(revocation.serial.encode("ascii"))
revoked.set_reason(None) #b'%s' % revocation.reason)
crl.add_revoked(revoked)
crl.set_version(version_number)
# XXX - set how to get the cacert here
cert = ca_key_pair['crt']
key = ca_key_pair['key']
return crypto.load_crl(
crypto.FILETYPE_PEM,
crl.export(
cert,
key,
type=crypto.FILETYPE_PEM,
days=num_crl_days,
digest=self.default_digest)
)
def check_cert_equal(self, first, second):
if isinstance(first, crypto.X509):
first_string = crypto.dump_certificate(crypto.FILETYPE_PEM, first)
else:
first_string = first
second_string = crypto.dump_certificate(crypto.FILETYPE_PEM, second)
return first_string == second_string
def check_key_equal(self, first, second):
if isinstance(first, crypto.PKey):
first_string = crypto.dump_privatekey(crypto.FILETYPE_PEM, first)
else:
first_string = first
second_string = crypto.dump_privatekey(crypto.FILETYPE_PEM, second)
return first_string == second_string
def check_csr_equal(self, first, second):
if isinstance(first, crypto.X509Req):
first_string = crypto.dump_certificate_request(crypto.FILETYPE_PEM, first)
else:
first_string = first
second_string = crypto.dump_certificate_request(crypto.FILETYPE_PEM, second)
return first_string == second_string
def test_db_exists(self):
self.assertTrue(os.path.exists(self.db_file))
def test_config(self):
# no config exists
self.assertEquals(self._storage.getConfig('config-sample'), None)
# with default value
self.assertEquals(self._storage.getConfig('config-sample', "tototo"), "tototo")
# set config (stored as string)
self.setConfig('config-test', 1458)
self.setConfig('test-dict', dict(first=23, second="sec"))
# values are string
self.assertEquals(self._storage.getConfig('config-test'), "1458")
self.assertEquals(self._storage.getConfig('test-dict'), str(dict(first=23, second="sec")))
def test_store_CAKeypair(self):
keypair = self.createCAKeyPair()
self._storage.storeCAKeyPair(keypair)
# check stored keypair
stored = self._storage.getCAKeyPairList()
self.assertEquals(len(stored), 1)
self.assertTrue(self.check_cert_equal(stored[0]['crt'], keypair['crt']))
self.assertTrue(self.check_key_equal(stored[0]['key'], keypair['key']))
def test_store_same_keypair(self):
keypair = self.createCAKeyPair()
self._storage.storeCAKeyPair(keypair)
# check stored keypair
stored = self._storage.getCAKeyPairList()
self.assertEquals(len(stored), 1)
self.assertTrue(self.check_cert_equal(stored[0]['crt'], keypair['crt']))
self.assertTrue(self.check_key_equal(stored[0]['key'], keypair['key']))
# store again the same keypair
with self.assertRaises(Found):
self._storage.storeCAKeyPair(keypair)
def test_store_keypair_multiple(self):
keypair = self.createCAKeyPair()
self._storage.storeCAKeyPair(keypair)
# check stored keypair
stored = self._storage.getCAKeyPairList()
self.assertEquals(len(stored), 1)
self.assertTrue(self.check_cert_equal(stored[0]['crt'], keypair['crt']))
self.assertTrue(self.check_key_equal(stored[0]['key'], keypair['key']))
time.sleep(1)
keypair2 = self.createCAKeyPair()
self._storage.storeCAKeyPair(keypair2)
stored2 = self._storage.getCAKeyPairList()
self.assertEquals(len(stored2), 2)
# check that order of keypair is good
first = stored2[0]
self.assertTrue(self.check_cert_equal(first['crt'], keypair['crt']))
self.assertTrue(self.check_key_equal(first['key'], keypair['key']))
second = stored2[1]
self.assertTrue(self.check_cert_equal(second['crt'], keypair2['crt']))
self.assertTrue(self.check_key_equal(second['key'], keypair2['key']))
def test_storeCertificateSigningRequest(self):
csr, key = self.generateCSR()
csr_id = self._storage.storeCertificateSigningRequest(csr)
stored = self._storage.getPendingCertificateRequest(csr_id)
self.assertTrue(self.check_csr_equal(stored, csr))
def test_deletePendingCertificateRequest(self):
csr, key = self.generateCSR()
csr_id = self._storage.storeCertificateSigningRequest(csr)
stored = self._storage.getPendingCertificateRequest(csr_id)
self.assertTrue(self.check_csr_equal(stored, csr))
self._storage.deletePendingCertificateRequest(csr_id)
with self.assertRaises(NotFound):
self._storage.getPendingCertificateRequest(csr_id)
def test_storeCertificateSigningRequest_no_storage(self):
self.setConfig('max-csr-amount', self.max_request_amount)
self.assertEquals(int(self._storage.getConfig('max-csr-amount')), self.max_request_amount)
for i in range(0, self.max_request_amount):
csr, _ = self.generateCSR()
self._storage.storeCertificateSigningRequest(csr)
# will raise NoStorage now
csr, _ = self.generateCSR()
with self.assertRaises(NoStorage):
self._storage.storeCertificateSigningRequest(csr)
csr_list = self._storage.getPendingCertificateRequestList()
self.assertEquals(len(csr_list), self.max_request_amount)
def store_storeCertificateSigningRequest_same(self):
csr, key = self.generateCSR()
csr_id = self._storage.storeCertificateSigningRequest(csr)
stored = self._storage.getPendingCertificateRequest(csr_id)
self.assertTrue(self.check_csr_equal(stored, csr))
# store a second time the same csr
csr2_id = self._storage.storeCertificateSigningRequest(csr)
self.assertEquals(csr2_id, csr_id)
stored2 = self._storage.getPendingCertificateRequest(csr2_id)
self.assertEquals(stored2, stored)
csr_list = self._storage.getPendingCertificateRequestList()
# there is only on csr in the list
self.assertEquals(len(csr_list), 1)
def test_getNextCertificateSerialNumber_empty(self):
serial = self._storage.getNextCertificateSerialNumber()
self.assertEquals(serial, 1)
def test_storeCertificate(self):
keypair = self.createCAKeyPair()
self._storage.storeCAKeyPair(keypair)
csr, _ = self.generateCSR()
csr_id = self._storage.storeCertificateSigningRequest(csr)
cert = self.createCertificate(keypair, csr)
cert_id = self._storage.storeCertificate(csr_id, cert)
stored = self._storage.getCertificate(cert_id)
self.assertTrue(self.check_cert_equal(stored, cert))
def test_storeCertificate_wrong_csr(self):
keypair = self.createCAKeyPair()
self._storage.storeCAKeyPair(keypair)
csr, _ = self.generateCSR()
cert = self.createCertificate(keypair, csr)
csr_id="1234"
# csr_id not exists
with self.assertRaises(NotFound):
self._storage.storeCertificate(csr_id, cert)
csr_id = self._storage.storeCertificateSigningRequest(csr)
self._storage.deletePendingCertificateRequest(csr_id)
# csr was deleted
with self.assertRaises(NotFound):
self._storage.storeCertificate(csr_id, cert)
def test_storeCertificate_same(self):
keypair = self.createCAKeyPair()
self._storage.storeCAKeyPair(keypair)
csr, _ = self.generateCSR()
csr_id = self._storage.storeCertificateSigningRequest(csr)
cert = self.createCertificate(keypair, csr)
cert_id = self._storage.storeCertificate(csr_id, cert)
stored = self._storage.getCertificate(cert_id)
self.assertTrue(self.check_cert_equal(stored, cert))
with self.assertRaises(NotFound):
# CSR not found
self._storage.storeCertificate(csr_id, cert)
def test_storeCertificate_same_csr(self):
keypair = self.createCAKeyPair()
self._storage.storeCAKeyPair(keypair)
csr, _ = self.generateCSR()
csr_id = self._storage.storeCertificateSigningRequest(csr)
cert = self.createCertificate(keypair, csr)
cert_id = self._storage.storeCertificate(csr_id, cert)
stored = self._storage.getCertificate(cert_id)
self.assertTrue(self.check_cert_equal(stored, cert))
with self.assertRaises(NotFound):
# CSR not found
self._storage.storeCertificate(csr_id, cert)
csr2_id = self._storage.storeCertificateSigningRequest(csr)
self.assertNotEquals(csr2_id, csr_id)
#with self.assertRaises(IntegrityError):
# cannot store 2 certificate with same serial
# self._storage.storeCertificate(csr2_id, cert)
# can store new certificate, using a different csr2_id from csr
cert2 = self.createCertificate(keypair, csr)
cert2_id = self._storage.storeCertificate(csr2_id, cert2)
self.assertNotEquals(cert2_id, cert_id)
def test_revokeCertificate(self):
keypair = self.createCAKeyPair()
self._storage.storeCAKeyPair(keypair)
csr, _ = self.generateCSR()
csr_id = self._storage.storeCertificateSigningRequest(csr)
cert = self.createCertificate(keypair, csr)
cert_id = self._storage.storeCertificate(csr_id, cert)
expiration_date = datetime.strptime(cert.get_notAfter(), '%Y%m%d%H%M%SZ')
self._storage.revokeCertificate(utils.getSerialToInt(cert), expiration_date)
with self.assertRaises(NotFound):
# certificate was revoked
self._storage.getCertificate(cert_id)
revocation_list = self._storage.getRevocationList()
self.assertEquals(len(revocation_list), 1)
self.assertEquals(revocation_list[0].serial, utils.getSerialToInt(cert))
self.assertEquals(revocation_list[0].crt_expire_after, expiration_date)
def test_revokeCertificate_not_exists(self):
keypair = self.createCAKeyPair()
self._storage.storeCAKeyPair(keypair)
csr, _ = self.generateCSR()
csr_id = self._storage.storeCertificateSigningRequest(csr)
cert = self.createCertificate(keypair, csr)
cert_id = self._storage.storeCertificate(csr_id, cert)
expiration_date = datetime.strptime(cert.get_notAfter(), '%Y%m%d%H%M%SZ')
revocation_list = self._storage.getRevocationList()
self.assertEquals(revocation_list, [])
self._storage.revokeCertificate(utils.getSerialToInt(cert), expiration_date)
with self.assertRaises(NotFound):
# certificate was revoked
self._storage.revokeCertificate(utils.getSerialToInt(cert), expiration_date)
def test_storeCertificateRevocation_with_expired(self):
keypair = self.createCAKeyPair()
self._storage.storeCAKeyPair(keypair)
def store_cert(expire_sec):
csr, _ = self.generateCSR()
csr_id = self._storage.storeCertificateSigningRequest(csr)
cert = self.createCertificate(keypair, csr, expire_sec=expire_sec)
cert_id = self._storage.storeCertificate(csr_id, cert)
return cert_id, cert
id1, cert1 = store_cert(5)
id2, cert2 = store_cert(260)
id3, cert3 = store_cert(180)
expiration_date1 = datetime.strptime(cert1.get_notAfter(), '%Y%m%d%H%M%SZ')
expiration_date3 = datetime.strptime(cert3.get_notAfter(), '%Y%m%d%H%M%SZ')
self._storage.revokeCertificate(utils.getSerialToInt(cert1), expiration_date1)
self._storage.revokeCertificate(utils.getSerialToInt(cert3), expiration_date3)
revocation_list = self._storage.getRevocationList()
self.assertEquals(len(revocation_list), 2)
stored_cert = self._storage.getCertificate(id2)
self.assertTrue(self.check_cert_equal(stored_cert, cert2))
for i in [0, 1]:
if revocation_list[0].serial == utils.getSerialToInt(cert1):
self.assertEquals(revocation_list[0].crt_expire_after, expiration_date1)
revocation_list.pop(0)
elif revocation_list[0].serial == utils.getSerialToInt(cert3):
self.assertEquals(revocation_list[0].crt_expire_after, expiration_date3)
revocation_list.pop(0)
# All items was removed with pop(0)
self.assertEquals(revocation_list, [])
time.sleep(5)
revocation_list = self._storage.getRevocationList()
# revoked cert1 certificate has expired notAfter
self.assertEquals(len(revocation_list), 1)
self.assertEquals(revocation_list[0].serial, utils.getSerialToInt(cert3))
with self.assertRaises(NotFound):
self._storage.getCertificate(id1)
def test_getNextCRLVersionNumber(self):
serial = self._storage.getNextCRLVersionNumber()
self.assertEquals(serial, 1)
def test_storeCertificateRevocationList(self):
keypair = self.createCAKeyPair()
self._storage.storeCAKeyPair(keypair)
def store_cert(expire_sec):
csr, _ = self.generateCSR()
csr_id = self._storage.storeCertificateSigningRequest(csr)
cert = self.createCertificate(keypair, csr, expire_sec=expire_sec)
cert_id = self._storage.storeCertificate(csr_id, cert)
return cert_id, cert
id1, cert1 = store_cert(60)
id2, cert2 = store_cert(260)
id3, cert3 = store_cert(180)
expiration_date1 = datetime.strptime(cert1.get_notAfter(), '%Y%m%d%H%M%SZ')
expiration_date3 = datetime.strptime(cert3.get_notAfter(), '%Y%m%d%H%M%SZ')
self._storage.revokeCertificate(utils.getSerialToInt(cert1), expiration_date1)
self._storage.revokeCertificate(utils.getSerialToInt(cert3), expiration_date3)
revocation_list = self._storage.getRevocationList()
self.assertEquals(len(revocation_list), 2)
crl = self.generateCRL(keypair, revocation_list, 1)
self._storage.storeCertificateRevocationList(
crl,
expiration_date=(datetime.utcnow() + timedelta(0, 160))
)
stored = self._storage.getCertificateRevocationList()
self.assertNotEqual(stored, None)
crl_string = crypto.dump_crl(crypto.FILETYPE_PEM, crl)
self.assertEquals(crl_string, stored)
def test_getCertificateRevocationList_empty(self):
keypair = self.createCAKeyPair()
crl = self._storage.getCertificateRevocationList()
self.assertEqual(crl, None)
crl = self.generateCRL(keypair, [], 1)
self._storage.storeCertificateRevocationList(
crl,
expiration_date=(datetime.utcnow() + timedelta(0, 3))
)
stored = self._storage.getCertificateRevocationList()
self.assertNotEqual(stored, None)
crl_string = crypto.dump_crl(crypto.FILETYPE_PEM, crl)
self.assertEquals(crl_string, stored)
time.sleep(4)
stored = self._storage.getCertificateRevocationList()
# crl is not valid anymore
self.assertEqual(stored, None)
def test_housekeep(self):
from caucase.storage import CertificateRequest, Certificate, CAKeypair, CertificateRevocationList, Revocation
self.setConfig('crt-keep-time', 5)
self.setConfig('csr-keep-time', 5)
# ca cert expire after 4 seconds
keypair = self.createCAKeyPair(5)
self._storage.storeCAKeyPair(keypair)
time.sleep(1)
keypair2 = self.createCAKeyPair(15)
self._storage.storeCAKeyPair(keypair2)
self.assertEquals(len(self._storage.getCAKeyPairList()), 2)
def store_cert(expire_sec):
csr, _ = self.generateCSR()
csr_id = self._storage.storeCertificateSigningRequest(csr)
cert = self.createCertificate(keypair, csr, expire_sec=expire_sec)
cert_id = self._storage.storeCertificate(csr_id, cert)
return cert_id, cert
id1, cert1 = store_cert(4)
id2, cert2 = store_cert(4)
id3, cert3 = store_cert(8)
expiration_date1 = datetime.strptime(cert2.get_notAfter(), '%Y%m%d%H%M%SZ')
self._storage.revokeCertificate(utils.getSerialToInt(cert2), expiration_date1)
revocation_list = self._storage.getRevocationList()
self.assertEquals(len(revocation_list), 1)
crl = self.generateCRL(keypair, revocation_list, 2)
self._storage.storeCertificateRevocationList(
crl,
expiration_date=(datetime.utcnow() + timedelta(0, 4))
)
time.sleep(2)
id_4, cert4 = store_cert(10)
id_5, cert5 = store_cert(10)
expiration_date3 = datetime.strptime(cert4.get_notAfter(), '%Y%m%d%H%M%SZ')
self._storage.revokeCertificate(utils.getSerialToInt(cert4), expiration_date3)
revocation_list = self._storage.getRevocationList()
self.assertEquals(len(revocation_list), 2)
crl2 = self.generateCRL(keypair, revocation_list, 3)
self._storage.storeCertificateRevocationList(
crl,
expiration_date=(datetime.utcnow() + timedelta(0, 10))
)
time.sleep(3)
crl_list = CertificateRevocationList.query.filter().all()
self.assertEquals(len(crl_list), 2)
revocation_list = Revocation.query.filter().all()
self.assertEquals(len(revocation_list), 2)
self._storage.housekeep()
# One key_pair is expired
stored = self._storage.getCAKeyPairList()
self.assertEquals(len(stored), 1)
self.assertTrue(self.check_cert_equal(stored[0]['crt'], keypair2['crt']))
self.assertTrue(self.check_key_equal(stored[0]['key'], keypair2['key']))
csr_count = CertificateRequest.query.filter().count()
# 3 csr should be removed
self.assertEquals(csr_count, 2)
with self.assertRaises(NotFound):
self._storage.getCertificate(id1)
self._storage.getCertificate(id2)
self._storage.getCertificate(id3)
with self.assertRaises(NotFound):
# this certificate is revoked
self._storage.getCertificate(id_4)
# content still exists (keep-time not expired)
self._storage.getCertificate(id_5)
# certificate informations are not remove
cert_count = Certificate.query.filter().count()
self.assertEquals(cert_count, 5)
revocation_list = Revocation.query.filter().all()
self.assertEquals(len(revocation_list), 1)
self.assertTrue(revocation_list[0].serial == utils.getSerialToInt(cert4))
# one crl is removed
crl_list = CertificateRevocationList.query.filter().all()
self.assertEquals(len(crl_list), 1)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment