Commit 5f6857b7 authored by Alain Takoudjou's avatar Alain Takoudjou

factorise cli_flask code, add more comments to functions, add admin revoke crt by serial

    Split cli_flask functions used to renew, sign and revoke certificate.
    Allow to revoke a certificate by serial PUT /crt/revoke/serial, this
    method required admin authentication. Also add GET /crt/serial/<string:serial>
parent 2d6b3bed
......@@ -158,6 +158,8 @@ class CertificateAuthority(object):
def getPendingCertificateRequest(self, csr_id):
"""
Retrieve the content of a pending signing request.
@param csr_id: The id of CSR returned by the storage
"""
return self._storage.getPendingCertificateRequest(csr_id)
......@@ -165,6 +167,8 @@ class CertificateAuthority(object):
"""
Sanity-check CSR, stores it and generates a unique signing request
identifier (crt_id).
@param csr: CSR string in PEM format
"""
# Check number of already-pending signing requests
# Check if csr is self-signed
......@@ -191,12 +195,18 @@ class CertificateAuthority(object):
def deletePendingCertificateRequest(self, csr_id):
"""
Reject a pending certificate signing request.
@param csr_id: The id of CSR returned by the storage
"""
self._storage.deletePendingCertificateRequest(csr_id)
def getPendingCertificateRequestList(self, limit=0, with_data=False):
"""
Return list of signed certificate
Return list of pending certificate signature request
@param limit: number of element to fetch, 0 is not limit (int)
@param with_data: True or False, say if return csr PEM string associated
to others informations (bool).
"""
return self._storage.getPendingCertificateRequestList(limit, with_data)
......@@ -204,6 +214,11 @@ class CertificateAuthority(object):
"""
Generate new signed certificate. `ca_key_pair` is the CA key_pair to use
if None, use the latest CA key_pair
@param csr_id: CSR ID returned by storage, csr should be linked to the
new certificate (string).
@param ca_key_pair: The CA key_pair to used for signature. If None, the
latest key_pair is used.
"""
# Apply extensions (ex: "not a certificate", ...)
# Generate a certificate from the CSR
......@@ -220,11 +235,33 @@ class CertificateAuthority(object):
return crt_id
def getCertificate(self, crt_id):
"""
Return a Certificate string in PEM format
@param crt_id: Certificate ID returned by storage during certificate creation
"""
return self._storage.getCertificate(crt_id)
def getCertificateFromSerial(self, serial):
"""
Return a Certificate string in PEM format
@param serial: serial of the certificate (string)
"""
cert = self._storage.getCertificateFromSerial(serial)
if not cert.content:
raise NotFound('Content certificate with serial %r is not found.' % (
serial,
))
return cert.content
def getSignedCertificateList(self, limit=0, with_data=False):
"""
Return list of signed certificate
@param limit: number of element to fetch, 0 is not limit (int)
@param with_data: True or False, say if return cert PEM string associated
to others informations (bool).
"""
return self._storage.getSignedCertificateList(limit, with_data)
......@@ -236,7 +273,7 @@ class CertificateAuthority(object):
def getValidCACertificateChain(self):
"""
Return the ca certificate chain for all valid certificates
Return the ca certificate chain for all valid certificates with key
"""
result = []
iter_key_pair = iter(self._ca_key_pairs_list)
......@@ -251,6 +288,8 @@ class CertificateAuthority(object):
def getCAKeypairForCertificate(self, cert):
"""
Return the nearest CA key_pair to the next extension date of the cert
@param cert: X509 certificate
"""
cert_valid_date = datetime.strptime(cert.get_notAfter(), '%Y%m%d%H%M%SZ')
next_valid_date = datetime.utcnow() + timedelta(0, self.crt_life_time)
......@@ -274,6 +313,20 @@ class CertificateAuthority(object):
return selected_keypair
def revokeCertificate(self, wrapped_crt):
"""
Revoke a certificate
@param wrapped_crt: The revoke request dict containing certificate to
revoke and signature algorithm used to sign the request.
{
"signature": "signature string for payload",
"digest": "Signature algorithm (ex: SHA256"),
"payload": dict of data: {
"revoke_crt": "Certificate to revoke",
"reason": "Revoke reason"
}
}
"""
payload = utils.unwrap(wrapped_crt, lambda x: x['revoke_crt'], self.digest_list)
try:
......@@ -287,18 +340,36 @@ class CertificateAuthority(object):
"The CA couldn't reconize the certificate to revoke.")
crt = self._loadCertificate(payload['revoke_crt'])
expiration_date = datetime.strptime(crt.get_notAfter(), '%Y%m%d%H%M%SZ')
expire_in = expiration_date - datetime.now()
if crt.has_expired():
raise ExpiredCertificate("Could not revoke a certificate which has expired" \
"since %r days." % -1*expire_in.days)
reason = payload['reason']
return self._storage.revokeCertificate(
utils.getSerialToInt(crt),
expiration_date,
reason)
def revokeCertificateFromSerial(self, serial):
"""
Directly revoke a certificate from serial
@param serial: The serial of the certificate (int)
"""
return self._storage.revokeCertificate(
serial,
reason="")
def renew(self, wrapped_csr):
"""
Renew a certificate
@param wrapped_csr: The revoke request dict containing certificate to
revoke and signature algorithm used to sign the request.
{
"signature": "signature string for payload",
"digest": "Signature algorithm (ex: SHA256"),
"payload": dict of data: {
"crt": "Old certificate to renew",
"renew_csr": "New CSR to sign"
}
}
"""
payload = utils.unwrap(wrapped_csr, lambda x: x['crt'], self.digest_list)
csr = payload['renew_csr']
......@@ -377,8 +448,8 @@ class CertificateAuthority(object):
# Here comes the actual certificate
serial = self._storage.getNextCertificateSerialNumber()
cert = crypto.X509()
# 3 = v3
cert.set_version(3)
# version v3
cert.set_version(2)
cert.set_serial_number(serial)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(self.crt_life_time)
......
......@@ -175,7 +175,8 @@ def main():
parser.error('`threshold` parameter is required with renew. Use --threshold VALUE')
parser.print_help()
exit(1)
backup_dir = os.path.join('.', 'old-%s' % datetime.now().strftime('%Y%m%d%H%M%S'))
backup_dir = os.path.join('.',
'backup-%s' % datetime.now().strftime('%Y-%m-%d-%H%M%S'))
# cleanup
if os.path.exists(CSR_KEY_FILE):
......@@ -192,7 +193,7 @@ def main():
class CertificateAuthorityRequest(object):
def __init__(self, key, certificate, cacertificate, ca_url,
max_retry=10, digest="sha256",
max_retry=10, digest="sha256", sleep_time=5,
verify_certificate=False, logger=None):
self.key = key
......@@ -200,7 +201,10 @@ class CertificateAuthorityRequest(object):
self.cacertificate = cacertificate
self.ca_url = ca_url
self.logger = logger
# maximum retry number of post/put request
self.max_retry = max_retry
# time to sleep before retry failed request
self.sleep_time = sleep_time
self.digest = digest
self.extension_manager = utils.X509Extension()
self.ca_certificate_list = []
......@@ -218,7 +222,7 @@ class CertificateAuthorityRequest(object):
if self.logger is None:
self.logger = logging.getLogger('Certificate Request')
self.logger = logging.getLogger('Caucase Request')
self.logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
......@@ -229,7 +233,7 @@ class CertificateAuthorityRequest(object):
self.generatePrivatekey(self.key)
def _request(self, method, url, data={}):
def _request(self, method, url, data=None):
try:
req = getattr(requests, method)
kw = {}
......@@ -260,9 +264,88 @@ class CertificateAuthorityRequest(object):
finally:
os.close(fd)
def _sendCertificateSigningRequest(self, csr_string):
request_url = '%s/csr' % self.ca_url
data = {'csr': csr_string}
retry = 0
response = self._request('put', request_url, data=data)
while (not response or response.status_code != 201) and retry < self.max_retry:
self.logger.error("%s: Failed to sent CSR. \n%s" % (
response.status_code, response.text))
self.logger.info("will retry in %s seconds..." % self.sleep_time)
time.sleep(self.sleep_time)
retry += 1
response = self._request('put', request_url, data=data)
if not response or response.status_code != 201:
raise Exception("ERROR: failed to send CSR after %s retry." % retry)
self.logger.info("CSR succefully sent.")
# Get csr Location from request header: http://xxx.com/csr/key
self.logger.debug("CSR location is: %s" % response.headers['Location'])
csr_key = response.headers['Location'].split('/')[-1]
with open(CSR_KEY_FILE, 'w') as fkey:
fkey.write(csr_key)
return csr_key
def _sendCertificateRenewal(self, cert, csr):
payload = dict(renew_csr=csr, crt=cert)
pkey = open(self.key).read()
wrapped = utils.wrap(payload, pkey, [self.digest])
request_url = '%s/crt/renew' % self.ca_url
data = {'payload': json.dumps(wrapped)}
self.logger.info("Sending Certificate Renewal request...")
response = self._request('put', request_url, data=data)
break_code = [201, 404, 500, 404]
retry = 1
while response is None or response.status_code not in break_code:
self.logger.error("%s: Failed to send renewal request. \n%s" % (
response.status_code, response.text))
self.logger.info("will retry in %s seconds..." % self.sleep_time)
time.sleep(self.sleep_time)
response = self._request('put', request_url, data=data)
retry += 1
if retry > self.max_retry:
break
if not response or response.status_code != 201:
raise Exception("ERROR: failed to send certificate renewal request "\
"after %s retry.\n%s" % (
retry, response.text))
csr_key = response.headers['Location'].split('/')[-1]
with open(RENEW_CSR_KEY_FILE, 'w') as fkey:
fkey.write(csr_key)
return csr_key
def _getSignedCertificate(self, crt_id):
reply_url = '%s/crt/%s' % (self.ca_url, crt_id)
response = self._request('get', reply_url)
while not response or response.status_code != 200:
time.sleep(self.sleep_time)
response = self._request('get', reply_url)
return response.text
def generateCertificateRequest(self, key_file, cn,
country='', state='', locality='', email='', organization='',
organization_unit='', csr_file=None):
"""
Generate certificate Signature request.
Parameter `cn` is mandatory
"""
with open(key_file) as fkey:
key = crypto.load_privatekey(crypto.FILETYPE_PEM, fkey.read())
......@@ -291,14 +374,15 @@ class CertificateAuthorityRequest(object):
if csr_file is not None:
with open(csr_file, 'w') as req_file:
req_file.write(csr)
os.chmod(csr_file, 0640)
return csr
def generatePrivatekey(self, output_file, size=2048):
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, size)
"""
Generate private key into `output_file`
"""
try:
key_fd = os.open(output_file,
......@@ -308,10 +392,15 @@ class CertificateAuthorityRequest(object):
if e.errno != errno.EEXIST:
raise
else:
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, size)
os.write(key_fd, crypto.dump_privatekey(crypto.FILETYPE_PEM, key))
os.close(key_fd)
def checkCertificateValidity(self, cert):
"""
validate the certificate PEM string with the CA Certificate and private key
"""
cert_pem = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
pkey = open(self.key).read()
key_pem = crypto.load_privatekey(crypto.FILETYPE_PEM, pkey)
......@@ -336,7 +425,17 @@ class CertificateAuthorityRequest(object):
return True
return False
def getValidCACertificateChain(self):
def updateCACertificateChain(self):
"""
Request to CA all valid certificates an update in to cacertificate file
@note: if the CA has more that one valid certificate, the cacertificate
file will be updated contain concatenated cert them like:
CA_1
CA_2
...
CA_N
"""
ca_cert_url = '%s/crt/ca.crt.json' % self.ca_url
self.logger.info("Updating CA certificate file from %s" % ca_cert_url)
cert_list = response_json = []
......@@ -367,14 +466,15 @@ class CertificateAuthorityRequest(object):
"that a year." % self.cacertificate)
# if not old_x509.has_expired():
# XXX - TODO: check if expired old_x509 can break certificate validation
cert_list.append(old_x509)
cert_list.append(
crypto.load_certificate(crypto.FILETYPE_PEM, payload['new'])
)
cert_list_chain = "%s\n%s" % (payload['old'], payload['new'])
for next_itmen in iter_ca_cert:
payload = utils.unwrap(next_itmen, lambda x: x['old'], [self.digest])
for next_item in iter_ca_cert:
payload = utils.unwrap(next_item, lambda x: x['old'], [self.digest])
old_x509 = crypto.load_certificate(crypto.FILETYPE_PEM, payload['old'])
if self._checkCertEquals(cert_list[-1], old_x509):
cert_list.append(
......@@ -382,26 +482,31 @@ class CertificateAuthorityRequest(object):
)
cert_list_chain += "\n%s" % payload['new']
else:
raise CertificateVerificationError("Get updated CA Certificate " \
"retourned %s but validation of data failed" % response_json)
raise CertificateVerificationError("Get CA Certificate chain " \
"retourned %s \n\nbut validation of data failed" % response_json)
# dump into file
if not cert_list_chain or not cert_list:
if not cert_list:
# Nothing to do...
return
self.ca_certificate_list = cert_list
fd = os.open(self.cacertificate, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, 0640)
fd = os.open(self.cacertificate, os.O_CREAT|os.O_WRONLY, 0640)
try:
for cert in cert_list:
os.write(fd, cert_list_chain)
os.write(fd, cert_list_chain)
finally:
os.close(fd)
def getCACertificateChain(self):
"""
Get CA certificate file.
If it's the first download, get the latest valid certificate at ca.crt.pem
else, update current cacertificate with list of valid ca certificat chain
"""
# If cert file exists exist
if os.path.exists(self.cacertificate) and os.stat(self.cacertificate).st_size > 0:
# Get all valids CA certificate
return self.getValidCACertificateChain()
return self.updateCACertificateChain()
ca_cert_url = '%s/crt/ca.crt.pem' % self.ca_url
self.logger.info("getting CA certificate file %s" % ca_cert_url)
......@@ -412,6 +517,7 @@ class CertificateAuthorityRequest(object):
try:
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, response.text)
except crypto.Error, e:
# XXX - we got a bad certificate, break here ?
traceback.print_exc()
response = None
else:
......@@ -428,16 +534,17 @@ class CertificateAuthorityRequest(object):
os.close(fd)
def signCertificate(self, csr):
"""
Send certificate signature request and wait until the certificate is
signed.
csr parameter is string in PEM format
"""
if os.path.exists(self.certificate) and os.stat(self.certificate).st_size > 0:
# exit because the certificate exists
return
data = {'csr': csr}
retry = 0
sleep_time = 10
request_url = '%s/csr' % self.ca_url
csr_key = ""
self.logger.info("Request signed certificate from CA...")
if os.path.exists(CSR_KEY_FILE):
with open(CSR_KEY_FILE) as fkey:
......@@ -446,40 +553,15 @@ class CertificateAuthorityRequest(object):
if csr_key:
self.logger.info("Csr was already sent to CA, using csr : %s" % csr_key)
else:
response = self._request('put', request_url, data=data)
while (not response or response.status_code != 201) and retry < self.max_retry:
self.logger.error("%s: Failed to sent CSR. \n%s" % (
response.status_code, response.text))
self.logger.info("will retry in %s seconds..." % sleep_time)
time.sleep(sleep_time)
retry += 1
response = self._request('put', request_url, data=data)
if response.status_code != 201:
raise Exception("ERROR: failed to put CSR after % retry. Exiting..." % retry)
csr_key = self._sendCertificateSigningRequest(csr)
self.logger.info("CSR succefully sent.")
# Get csr Location from request header: http://xxx.com/csr/key
self.logger.debug("Csr location is: %s" % response.headers['Location'])
csr_key = response.headers['Location'].split('/')[-1]
with open(CSR_KEY_FILE, 'w') as fkey:
fkey.write(csr_key)
# csr is xxx.csr.pem so cert is xxx.cert.pem
self.logger.info("Waiting for signed certificate...")
reply_url = '%s/crt/%s.crt.pem' % (self.ca_url, csr_key[:-8])
response = self._request('get', reply_url)
while not response or response.status_code != 200:
time.sleep(sleep_time)
response = self._request('get', reply_url)
# csr is xxx.csr.pem so cert is xxx.cert.pem
certificate = self._getSignedCertificate('%s.crt.pem' % csr_key[:-8])
self.logger.info("Validating signed certificate...")
if not self.checkCertificateValidity(response.text):
if not self.checkCertificateValidity(certificate):
# certificate verification failed, should raise ?
self.logger.warn("Certificate validation failed.\n" \
"Please double check the signed certificate before use. Also consider" \
......@@ -488,16 +570,15 @@ class CertificateAuthorityRequest(object):
fd = os.open(self.certificate,
os.O_CREAT | os.O_EXCL | os.O_WRONLY | os.O_TRUNC, 0644)
try:
os.write(fd, response.text)
os.write(fd, certificate)
finally:
os.close(fd)
self.logger.info("Certificate correctly saved at %s." % self.certificate)
def revokeCertificate(self, message=""):
"""
Send a revocation request for the givent certificate to the master.
Revoke the current certificate on CA.
"""
sleep_time = 10
retry = 1
pkey = open(self.key).read()
......@@ -512,7 +593,7 @@ class CertificateAuthorityRequest(object):
request_url = '%s/crt/revoke' % self.ca_url
data = {'payload': json.dumps(wrapped)}
self.logger.info("Sent Certificate revocation request for CN: %s." % (
self.logger.info("Sending Certificate revocation request of CN: %s." % (
cert_pem.get_subject().CN))
response = self._request('put', request_url, data=data)
......@@ -522,15 +603,15 @@ class CertificateAuthorityRequest(object):
self.logger.error("%s: Failed to send revoke request. \n%s" % (
response.status_code, response.text))
self.logger.info("will retry in %s seconds..." % sleep_time)
time.sleep(sleep_time)
self.logger.info("will retry in %s seconds..." % self.sleep_time)
time.sleep(self.sleep_time)
response = self._request('put', request_url, data=data)
retry += 1
if retry < self.max_retry:
if retry > self.max_retry:
break
if response.status_code != 201:
if not response or response.status_code != 201:
raise Exception("ERROR: failed to put revoke certificate after %s retry. Exiting..." % retry)
self.logger.info("Certificate %s was successfully revoked." % (
......@@ -541,8 +622,7 @@ class CertificateAuthorityRequest(object):
"""
Renew the current certificate. Regenerate private key if renew_key is `True`
"""
sleep_time = 10
retry = 1
new_key_path = '%s.renew' % self.key
new_cert_path = '%s.renew' % self.certificate
key_file = self.key
......@@ -570,53 +650,15 @@ class CertificateAuthorityRequest(object):
else:
csr = open(csr_file).read()
payload = dict(
renew_csr=csr,
crt=cert)
pkey = open(self.key).read()
wrapped = utils.wrap(payload, pkey, [self.digest])
request_url = '%s/crt/renew' % self.ca_url
data = {'payload': json.dumps(wrapped)}
self.logger.info("Send Certificate Renewal request for CN: %s." % (
cert_pem.get_subject().CN))
response = self._request('put', request_url, data=data)
break_code = [201, 404, 500, 404]
while response is None or response.status_code not in break_code:
self.logger.error("%s: Failed to send renewal request. \n%s" % (
response.status_code, response.text))
self.logger.info("will retry in %s seconds..." % sleep_time)
time.sleep(sleep_time)
response = self._request('put', request_url, data=data)
retry += 1
if retry < self.max_retry:
break
if response.status_code != 201:
raise Exception("ERROR: failed to put certificate renewal request after %s retry. Exiting...\n%s" % (
retry, response.text))
csr_key = response.headers['Location'].split('/')[-1]
with open(RENEW_CSR_KEY_FILE, 'w') as fkey:
fkey.write(csr_key)
csr_key = self._sendCertificateRenewal(cert, csr)
self.logger.info("Waiting for signed certificate...")
reply_url = '%s/crt/%s.crt.pem' % (self.ca_url, csr_key[:-8])
response = self._request('get', reply_url)
while not response or response.status_code != 200:
time.sleep(sleep_time)
response = self._request('get', reply_url)
new_cert = self._getSignedCertificate('%s.crt.pem' % csr_key[:-8])
if not os.path.exists(backup_dir):
os.mkdir(backup_dir)
self._writeNewFile(new_cert_path, response.text)
self._writeNewFile(new_cert_path, new_cert)
# change location of files
if renew_key:
os.rename(self.key,
......@@ -630,7 +672,7 @@ class CertificateAuthorityRequest(object):
self.logger.info("Validating signed certificate...")
if not self.checkCertificateValidity(response.text):
if not self.checkCertificateValidity(new_cert):
# certificate verification failed, should raise ?
self.logger.warn("Certificate validation failed.\n" \
"Please double check the signed certificate before use. Also consider" \
......@@ -652,4 +694,3 @@ class CertificateAuthorityRequest(object):
if os.path.exists(path):
os.unlink(path)
......@@ -24,7 +24,7 @@ from datetime import datetime, timedelta
from OpenSSL import crypto
from caucase import db
from caucase import utils
from caucase.exceptions import (NoStorage, NotFound, Found)
from caucase.exceptions import (NoStorage, NotFound, Found, ExpiredCertificate)
from flask_user import UserMixin
STATUS_VALIDATED = 'validated'
......@@ -326,7 +326,7 @@ class Storage(object):
return data_list
def revokeCertificate(self, serial, not_after_date, reason=''):
def revokeCertificate(self, serial, reason=''):
"""
Add serial to the list of revoked certificates.
Associated certificate must expire at (or before) not_after_date, so
......@@ -341,11 +341,16 @@ class Storage(object):
if not cert:
raise NotFound('No certficate with serial %r' % (serial, ))
expire_in = cert.expire_after - datetime.utcnow()
if expire_in.days < 0:
raise ExpiredCertificate("Certificate with serial %r has expired" \
" since %r day(s)." % (serial, -1*expire_in.days))
revoke = Revocation(
serial=serial,
creation_date=datetime.utcnow(),
reason=reason,
crt_expire_after=not_after_date
crt_expire_after=cert.expire_after
)
# Set latest CRL as expired, it will be regenerated
crl = CertificateRevocationList.query.filter(
......
......@@ -27,7 +27,6 @@ from caucase.web import parseArguments, configure_flask
from OpenSSL import crypto, SSL
from caucase.exceptions import (NoStorage, NotFound, Found)
from caucase import utils
from caucase import db, app
from flask_testing import TestCase
from flask import url_for
......@@ -38,14 +37,16 @@ class CertificateAuthorityWebTest(TestCase):
configure_flask(parseArguments(['--ca-dir', self.ca_dir, '-s', '/CN=CA Auth Test/emailAddress=xx@example.com']))
def tearDown(self):
db.session.remove()
db.drop_all()
self.db.session.remove()
self.db.drop_all()
if os.path.exists(self.ca_dir):
shutil.rmtree(self.ca_dir)
def create_app(self):
from caucase import db, app
app.config['TESTING'] = True
app.config['LIVESERVER_PORT'] = 0
self.db = db
return app
def generateCSR(self, cn="toto.example.com"):
......
......@@ -38,8 +38,7 @@ from caucase.exceptions import (NoStorage, NotFound, Found, BadSignature,
CertificateVerificationError,
ExpiredCertificate)
from functools import wraps
from caucase import utils
from caucase import app, db
from caucase import utils, app, db
class DisabledStringField(StringField):
def __call__(self, *args, **kwargs):
......@@ -191,7 +190,6 @@ def configure_flask(options):
app.logger.addHandler(logger)
# Instanciate storage
# XXX - check loaded_crt_life_time here
from caucase.storage import Storage
storage = Storage(db,
max_csr_amount=options.max_request_amount,
......@@ -221,7 +219,7 @@ def configure_flask(options):
def check_authentication(username, password):
user = app.config.storage.findUser(username)
if user:
return app.user_manager.hash_password(password) == user.password
return app.user_manager.verify_password(password, user)
else:
return False
......@@ -234,7 +232,7 @@ def authenticated_method(func):
auth = request.authorization
if not auth:
return abort(401)
elif not Users.check_authentication(auth.username, auth.password):
elif not check_authentication(auth.username, auth.password):
return abort(401)
# Call the actual view
return func(*args, **kwargs)
......@@ -360,12 +358,18 @@ def before_request():
@app.route('/crl', methods=['GET'])
def get_crl():
"""
Get the lastest CRL (certificate revocation list)
"""
crl_content = app.config.ca.getCertificateRevocationList()
return send_file_content(crl_content, 'ca.crl.pem')
@app.route('/csr/<string:csr_id>', methods=['GET'])
def get_csr(csr_id):
"""
Get a CSR string in PEM format from identified by `csr_id`.
"""
try:
csr_content = app.config.ca.getPendingCertificateRequest(csr_id)
......@@ -377,6 +381,9 @@ def get_csr(csr_id):
@app.route('/csr', methods=['PUT'])
def request_cert():
"""
Store certificate signature request (csr) in PEM format
"""
csr_content = request.form.get('csr', '').encode('utf-8')
if not csr_content:
raise FlaskException("'csr' parameter is mandatory",
......@@ -399,6 +406,9 @@ def request_cert():
@app.route('/csr/<string:csr_id>', methods=['DELETE'])
@authenticated_method
def remove_csr(csr_id):
"""
Delete a Certificate signature request. Authentication required
"""
try:
app.config.ca.deletePendingCertificateRequest(csr_id)
......@@ -412,6 +422,9 @@ def remove_csr(csr_id):
@app.route('/crt/<string:cert_id>', methods=['GET'])
def get_crt(cert_id):
"""
Get a certificate by the id `cert_id`
"""
try:
cert_content = app.config.ca.getCertificate(cert_id)
......@@ -421,8 +434,24 @@ def get_crt(cert_id):
return send_file_content(cert_content, cert_id)
@app.route('/crt/serial/<string:serial>', methods=['GET'])
def crt_fromserial(serial):
"""
Get a certificate by the serial
"""
try:
cert_content = app.config.ca.getCertificateFromSerial(serial)
except NotFound, e:
raise FlaskException("%s" % str(e),
status_code=404, payload={"name": "FileNotFound", "code": 1})
return send_file_content(cert_content, '%s.crt.pem' % serial)
@app.route('/crt/ca.crt.pem', methods=['GET'])
def get_cacert():
"""
Get CA Certificate in PEM format string.
"""
ca_cert = app.config.ca.getCACertificate()
......@@ -430,7 +459,11 @@ def get_cacert():
@app.route('/crt/ca.crt.json', methods=['GET'])
def get_cacert_json():
"""
Return CA certificate chain list, if the CA certificate is being renewed
the list will contain the next certificate and the old certificate which
will expire soon.
"""
ca_chain_list = app.config.ca.getValidCACertificateChain()
return jsonify(ca_chain_list)
......@@ -460,6 +493,9 @@ def signcert(csr_key, redirect_to=''):
@app.route('/crt', methods=['PUT'])
@authenticated_method
def sign_cert():
"""
Sign a certificate, require authentication
"""
key = request.form.get('csr_id', '').encode('utf-8')
if not key:
raise FlaskException("'csr_id' parameter is a mandatory parameter",
......@@ -570,9 +606,30 @@ def request_revoke_crt():
response = Response("", status=201, )
return response
@app.route('/crt/revoke/serial', methods=['PUT'])
@authenticated_method
def revoke_crt():
"""
Directly revoke a certificate from his serial
"""
try:
serial = request.form.get('serial', '')
app.config.ca.revokeCertificateFromSerial(serial)
except ValueError, e:
traceback.print_exc()
raise FlaskException(str(e),
payload={"name": "FileFormat", "code": 3})
except ExpiredCertificate, e:
raise FlaskException(str(e),
payload={"name": "FileFormat", "code": 3})
except NotFound, e:
raise FlaskException(str(e),
status_code=404, payload={"name": "FileNotFound", "code": 1})
response = Response("", status=201)
return response
#Manage routes (Authentication required) - Flask APP
......
......@@ -18,7 +18,8 @@
import os
from caucase.web import parseArguments, configure_flask, app
from caucase import app
from caucase.web import parseArguments, configure_flask
from werkzeug.contrib.fixers import ProxyFix
def readConfigFromFile(config_file):
......@@ -52,4 +53,4 @@ def start_wsgi():
app.logger.info("Certificate Authority server ready...")
if __name__ == 'caucase.wsgi':
start_wsgi()
\ No newline at end of file
start_wsgi()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment