Commit 26015ada authored by Alain Takoudjou's avatar Alain Takoudjou Committed by Vincent Pelletier

initial implementation of certificate authority

The certificate authority is used to generate and sign certificate, there is 3 parts:
- web: which contains API to submit certificate signature request and to download signed certificate
- cliweb: which is a command line tool used to quickly generate private key and send certificate signature request, he will
also downlaod automatically the signed certificate as well as ca certificate.
- cli: is used to garbage collect certificate authority, all expired certificate, csr, crl and revocation will be trashed using this tool.

The first csr can be automatically signed, the rest will be signed by the adminitrator, first connection to /admin/ will ask to set password
the admin can see all csr (pending) then sign them. As soon as csr is signed, the client will download (cliweb) the certificate.

client can also renew or revoke his certificate using CA API. Renew and revoke are immediate, there is no admin approval.

on server side, the storage storage.py use sqlite to store all informations (certificat, csr, crl and revocations), there is no use of openssl here.
ca.py will invoke the storage to store or to get certificates.

the client store certificate directly on filesystem, so it can be read by apache, nginx, etc.
parent d2f8ede9
...@@ -15,3 +15,11 @@ ...@@ -15,3 +15,11 @@
# #
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with caucase. If not, see <http://www.gnu.org/licenses/>. # along with caucase. If not, see <http://www.gnu.org/licenses/>.
# See http://peak.telecommunity.com/DevCenter/setuptools#namespace-packages
try:
__import__('pkg_resources').declare_namespace(__name__)
except ImportError:
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
# -*- coding: utf-8 -*-
# This file is part of caucase # This file is part of caucase
# Copyright (C) 2017 Nexedi # Copyright (C) 2017 Nexedi
# Alain Takoudjou <alain.takoudjou@nexedi.com> # Alain Takoudjou <alain.takoudjou@nexedi.com>
...@@ -15,33 +16,150 @@ ...@@ -15,33 +16,150 @@
# #
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with caucase. If not, see <http://www.gnu.org/licenses/>. # along with caucase. If not, see <http://www.gnu.org/licenses/>.
import json import json
import os
import sys
import subprocess
import re
import time
import uuid
import errno
import tempfile
from OpenSSL import crypto, SSL
import traceback
from pyasn1.codec.der import encoder as der_encoder
from pyasn1.type import tag
from pyasn1_modules import rfc2459
from datetime import datetime, timedelta
from caucase.exceptions import (ExpiredCertificate, NotFound,
BadCertificateSigningRequest, CertificateVerificationError)
from caucase import utils
MIN_CA_RENEW_PERIOD = 2
DEFAULT_DIGEST_LIST = ['sha256', 'sha384', 'sha512']
SUBJECT_KEY_LIST = ['C', 'ST', 'L', 'OU', 'O', 'CN', 'emailAddress']
class CertificateAuthority(object): class CertificateAuthority(object):
def __init__(self, storage, ca_life_time, digest_list, crt_constraints...): def __init__(self, storage, ca_life_period, ca_renew_period,
crt_life_time, crl_renew_period, digest_list=None,
crl_base_url=None, ca_subject='',
max_csr_amount=50, crt_keep_time=0,
auto_sign_csr=True):
self._storage = storage self._storage = storage
self.reload() self.ca_life_period = ca_life_period
self.digest_list = digest_list
self.crt_life_time = crt_life_time
self.crl_renew_period = crl_renew_period
self.ca_renew_period = ca_renew_period
self.default_digest = 'sha256'
self.crl_base_url = crl_base_url
self.auto_sign_csr = auto_sign_csr
self.extension_manager = utils.X509Extension()
self.mandatory_subject_key_list = ['CN']
self.ca_subject_dict = self._getCASubjectDict(ca_subject)
# XXX - ERR_SSL_SERVER_CERT_BAD_FORMAT on browser
# Because if two certificate has the same serial from a CA with the same CN
# self.ca_subject_dict['CN'] = '%s %s' % (self.ca_subject_dict['CN'], int(time.time()))
if not self.digest_list:
self.digest_list = DEFAULT_DIGEST_LIST
if self.ca_life_period < MIN_CA_RENEW_PERIOD:
raise ValueError("'ca_life_period' value should be upper than %s" % MIN_CA_RENEW_PERIOD)
if self.crl_renew_period > 1:
raise ValueError("'crl_renew_period' is too high and should be less than a certificate life time.")
self.crl_life_time = int(self.crt_life_time * self.crl_renew_period)
self.ca_life_time = int(self.crt_life_time * self.ca_life_period)
self.ca_renew_time = int(self.crt_life_time * self.ca_renew_period)
self._ca_key_pairs_list = self._storage.getCAKeyPairList()
if not self._ca_key_pairs_list:
self.createCAKeyPair()
def reload(self): def _getCASubjectDict(self, ca_subject):
"""
Parse CA Subject from provided sting format
Ex: /C=XX/ST=State/L=City/OU=OUnit/O=Company/CN=CA Auth/emailAddress=xx@example.com
"""
ca_subject_dict = {}
regex = r"\/([C|ST|L|O|OU|CN|emailAddress]+)=([\w\s\@\.\d\-_\(\)\,\+:']+)"
matches = re.finditer(regex, ca_subject)
for match in matches:
key = match.group(1)
if not key in SUBJECT_KEY_LIST:
raise ValueError("Item %r is not a valid CA Subject key, please" \
"Check that the provided key is in %s" % (key,
SUBJECT_KEY_LIST))
ca_subject_dict[key] = match.group(2)
for key in self.mandatory_subject_key_list:
if key not in ca_subject_dict:
raise ValueError("The subject key '%r' is mandatory." % key)
return ca_subject_dict
def renewCAKeyPair(self):
""" """
Refresh instance's knowledge of database content Refresh instance's knowledge of database content
(as storage house-keeping may/will happen outside our control) (as storage house-keeping may/will happen outside our control)
""" """
self._ca_key_pairs_list = storage.getCAKeyPairList()
if not self._ca_key_pairs_list: cert = self._ca_key_pairs_list[-1]['crt']
self.createCAKeyPair() expire_date = datetime.strptime(cert.get_notAfter(), '%Y%m%d%H%M%SZ')
self._ca_key_pairs_list = storage.getCAKeyPairList() renew_date = expire_date - timedelta(0, self.ca_renew_time)
assert self._ca_key_pairs_list
if renew_date > datetime.now():
# The ca certificat should not be renewed now
return False
self.createCAKeyPair()
return True
def createCAKeyPair(self): def createCAKeyPair(self):
""" """
Create a new ca key + certificate pair Create a new ca key + certificate pair
""" """
# generate CA key pair key_pair = {}
key = crypto.PKey()
# Use 2048 bits key size
key.generate_key(crypto.TYPE_RSA, 2048)
key_pair['key'] = key
ca = crypto.X509()
# 3 = v3
ca.set_version(3)
ca.set_serial_number(int(time.time()))
subject = ca.get_subject()
for name, value in self.ca_subject_dict.items():
setattr(subject, name, value)
ca.gmtime_adj_notBefore(0)
ca.gmtime_adj_notAfter(self.ca_life_time)
ca.set_issuer(ca.get_subject())
ca.set_pubkey(key)
self.extension_manager.setCaExtensions(ca)
ca.sign(key, self.default_digest)
key_pair['crt'] = ca
self._storage.storeCAKeyPair(key_pair) self._storage.storeCAKeyPair(key_pair)
self._ca_key_pairs_list = self._storage.getCAKeyPairList()
assert self._ca_key_pairs_list
def getPendingCertificateRequestList(self): def getPendingCertificateRequest(self, csr_id):
return self._storage.getPendingCertificateRequestList() """
Retrieve the content of a pending signing request.
"""
return self._storage.getPendingCertificateRequest(csr_id)
def createCertificateSigningRequest(self, csr): def createCertificateSigningRequest(self, csr):
""" """
...@@ -53,62 +171,276 @@ class CertificateAuthority(object): ...@@ -53,62 +171,276 @@ class CertificateAuthority(object):
# Check it has a CN (?) # Check it has a CN (?)
# Check its extensions # Check its extensions
# more ? # more ?
return self._storage.storeCertificateSigningRequest(csr) try:
csr_pem = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)
except crypto.Error, e:
raise BadCertificateSigningRequest(str(e))
if not hasattr(csr_pem.get_subject(), 'CN') or not csr_pem.get_subject().CN:
raise BadCertificateSigningRequest("CSR has no common name set")
def deletePendingCertificateRequest(self, crt_id): # XXX check extensions
csr_id = self._storage.storeCertificateSigningRequest(csr_pem)
if self._storage.getCertificateSigningRequestAmount() == 1 \
and self.auto_sign_csr:
# if this is the first csr, sign immediately
self.createCertificate(csr_id)
return csr_id
def deletePendingCertificateRequest(self, csr_id):
""" """
Reject a pending certificate signing request. Reject a pending certificate signing request.
""" """
self._storage.deletePendingCertificateRequest(crt_id) self._storage.deletePendingCertificateRequest(csr_id)
def getCertificateSigningRequest(self, crt_id): def getPendingCertificateRequestList(self, limit=0, with_data=False):
""" """
Retrieve the content of a pending signing request. Return list of signed certificate
""" """
return self._storage.getCertificateSigningRequest(crt_id) return self._storage.getPendingCertificateRequestList(limit, with_data)
def createCertificate(self, crt_id): def createCertificate(self, csr_id, ca_key_pair=None):
csr = self.getCertificateSigningRequest(crt_id) """
Generate new signed certificate. `ca_key_pair` is the CA key_pair to use
if None, use the latest CA key_pair
"""
# Apply extensions (ex: "not a certificate", ...) # Apply extensions (ex: "not a certificate", ...)
# Generate a certificate from the CSR # Generate a certificate from the CSR
# Sign the certificate with the current CA key # Sign the certificate with the current CA key
self._storage.storeCertificate(crt_id, crt)
return crt csr_pem = crypto.load_certificate_request(crypto.FILETYPE_PEM,
self._storage.getPendingCertificateRequest(csr_id))
if ca_key_pair is None:
ca_key_pair = self._ca_key_pairs_list[-1]
cert_pem = self._generateCertificateObjects(ca_key_pair, csr_pem)
crt_id = self._storage.storeCertificate(csr_id, cert_pem)
return crt_id
def getCertificate(self, crt_id): def getCertificate(self, crt_id):
return self._database.getCertificate(crt_id) return self._storage.getCertificate(crt_id)
def getSignedCertificateList(self, limit=0, with_data=False):
"""
Return list of signed certificate
"""
return self._storage.getSignedCertificateList(limit, with_data)
def getCACertificate(self): def getCACertificate(self):
""" """
Return current CA certificate Return current CA certificate
""" """
return self._ca_key_pairs_list[-1].crt return self._dumpCertificate(self._ca_key_pairs_list[-1]['crt'])
def getValidCACertificateChain(self): def getValidCACertificateChain(self):
""" """
Return the ca certificate chain for all valid certificates Return the ca certificate chain for all valid certificates
""" """
result = [] result = []
iter_key_pair = iter(self._ca_key_paid_list) iter_key_pair = iter(self._ca_key_pairs_list)
previous_key_pair = iter_key_pair.next() previous_key_pair = iter_key_pair.next()
for key_pair in iter_key_pair: for key_pair in iter_key_pair:
result.append(utils.wrap({ result.append(utils.wrap({
'old': previous_key_pair.crt, 'old': self._dumpCertificate(previous_key_pair['crt']),
'new': key_pair.crt, 'new': self._dumpCertificate(key_pair.crt),
}, previous_key_pair.key, self.digest_list)) }, self._dumpPrivatekey(previous_key_pair['key']), self.digest_list))
return result return result
def getCAKeypairForCertificate(self, cert):
"""
Return the nearest CA key_pair to the next extension date of the cert
"""
cert_valid_date = datetime.strptime(cert.get_notAfter(), '%Y%m%d%H%M%SZ')
next_valid_date = datetime.utcnow() + timedelta(0, self.crt_life_time)
# check which ca certificate should be used to renew the cert
selected_keypair = None
selected_date = None
for key_pair in self._ca_key_pairs_list:
expiration_date = datetime.strptime(key_pair['crt'].get_notAfter(), '%Y%m%d%H%M%SZ')
if expiration_date < next_valid_date:
continue
if selected_date and selected_date < expiration_date:
# Only get the lowest expiration_date which cover the renew notbefore date
continue
selected_keypair = key_pair
selected_date = expiration_date
if selected_keypair is None:
raise ValueError("No valid CA key_pair found with validity date upper than %r certificate lifetime" %
next_valid_date)
return selected_keypair
def revokeCertificate(self, wrapped_crt): def revokeCertificate(self, wrapped_crt):
payload = utils.unwrap(wrapped_crt, lambda x: x['crt'], self.digest_list) payload = utils.unwrap(wrapped_crt, lambda x: x['revoke_crt'], self.digest_list)
crt = payload['revoke_crt']
self._storage.revokeCertificate(crt.getSerial(), crt.getNotAfterDate()) try:
x509 = self._loadCertificate(payload['revoke_crt'])
except crypto.Error, e:
raise BadCertificate(str(e))
if not utils.verifyCertificateChain(x509,
[x['crt'] for x in self._ca_key_pairs_list]):
raise CertificateVerificationError("Certificate verification failed:" \
"The CA couldn't reconize the certificate to revoke.")
crt = self._loadCertificate(payload['revoke_crt'])
expiration_date = datetime.strptime(crt.get_notAfter(), '%Y%m%d%H%M%SZ')
expire_in = expiration_date - datetime.now()
if crt.has_expired():
raise ExpiredCertificate("Could not revoke a certificate which has expired" \
"since %r days." % -1*expire_in.days)
reason = payload['reason']
return self._storage.revokeCertificate(
utils.getSerialToInt(crt),
expiration_date,
reason)
def renew(self, wrapped_csr): def renew(self, wrapped_csr):
payload = utils.unwrap(wrapped_crt, lambda x: x['crt'], self.digest_list) payload = utils.unwrap(wrapped_csr, lambda x: x['crt'], self.digest_list)
csr = payload['renew_csr'] csr = payload['renew_csr']
crt_id = self.createCertificateSigningRequest(csr)
self.createCertificate(crt_id) try:
return crt_id x509 = self._loadCertificate(payload['crt'])
except crypto.Error, e:
raise BadCertificate(str(e))
try:
csr_pem = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)
except crypto.Error, e:
raise BadCertificateSigningRequest(str(e))
if csr_pem.get_subject().CN != x509.get_subject().CN:
raise BadCertificateSigningRequest(
"Request common name does not match replaced certificate.")
if not self._storage.getCertificateFromSerial(utils.getSerialToInt(x509)):
raise NotFound('No Certificate with serial %r and Common Name %r found.' % (
x509.get_serial_number(),
x509.get_subject().CN,
))
if not utils.verifyCertificateChain(x509,
[x['crt'] for x in self._ca_key_pairs_list]):
raise CertificateVerificationError("Certificate verification failed:" \
"The CA couldn't reconize signed certificate.")
csr_id = self.createCertificateSigningRequest(csr)
# sign the new certificate using a specific ca key_pair
ca_key_pair = self.getCAKeypairForCertificate(x509)
self.createCertificate(csr_id, ca_key_pair)
return csr_id
def getCertificateRevocationList(self): def getCertificateRevocationList(self):
self._storage.getCertificateRevocationList() """
Generate certificate revocation list PEM
"""
crl = self._storage.getCertificateRevocationList()
if not crl:
# Certificate revocation list needs to be regenerated
return self._createCertificateRevocationList()
return crl
def _loadCertificate(self, cert_string):
"""
Load certificate in PEM format
"""
return crypto.load_certificate(crypto.FILETYPE_PEM, cert_string)
def _dumpCertificate(self, cert_object):
"""
Dump certificate in PEM format
"""
return crypto.dump_certificate(crypto.FILETYPE_PEM, cert_object)
def _loadPrivatekey(self, pkey):
"""
Load private key in PEM format
"""
return crypto.load_privatekey(crypto.FILETYPE_PEM, pkey)
def _dumpPrivatekey(self, pkey_object):
"""
Load private key in PEM format
"""
return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey_object)
def _generateCertificateObjects(self, ca_key_pair, req):
"""
Generate certificate from CSR PEM Object.
This method set default certificate extensions, later will allow to set custom extensions
"""
# Here comes the actual certificate
serial = self._storage.getNextCertificateSerialNumber()
cert = crypto.X509()
# 3 = v3
cert.set_version(3)
cert.set_serial_number(serial)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(self.crt_life_time)
cert.set_issuer(ca_key_pair['crt'].get_subject())
cert.set_subject(req.get_subject())
cert.set_pubkey(req.get_pubkey())
self.extension_manager.setDefaultExtensions(
cert,
subject=cert,
issuer=ca_key_pair['crt'],
crl_url=self.crl_base_url)
cert.sign(ca_key_pair['key'], self.default_digest)
return cert
def _createCertificateRevocationList(self):
"""
Create CRL from certification revocation_list and return a PEM string content
"""
revocation_list = self._storage.getRevocationList()
now = datetime.utcnow()
crl = crypto.CRL()
# XXX - set_nextUpdate() doesn't update Next Update in generated CRL,
# So we have used export() which takes the number of days in param
#
# next_date = now + timedelta(0, 864000) #self.crl_life_time)
# crl.set_lastUpdate(now.strftime("%Y%m%d%H%M%SZ").encode("ascii"))
# crl.set_nextUpdate(next_date.strftime("%Y%m%d%H%M%SZ").encode("ascii"))
num_crl_days = int(round(self.crl_life_time/(24.0*60*60), 0))
if num_crl_days == 0:
# At least one day
num_crl_days = 1
for revocation in revocation_list:
revoked = crypto.Revoked()
revoked.set_rev_date(
revocation.creation_date.strftime("%Y%m%d%H%M%SZ").encode("ascii")
)
revoked.set_serial(revocation.serial.encode("ascii"))
revoked.set_reason(None) #b'%s' % revocation.reason)
crl.add_revoked(revoked)
version_number = self._storage.getNextCRLVersionNumber()
crl.set_version(version_number)
# XXX - set how to get the cacert here
cert = self._ca_key_pairs_list[-1]['crt']
key = self._ca_key_pairs_list[-1]['key']
#crl.sign(cert, key, self.default_digest)
dumped_crl = crl.export(
cert,
key,
type=crypto.FILETYPE_PEM,
days=num_crl_days,
digest=self.default_digest)
return self._storage.storeCertificateRevocationList(
crypto.load_crl(crypto.FILETYPE_PEM, dumped_crl),
expiration_date=(now + timedelta(num_crl_days, 0))
)
...@@ -15,3 +15,35 @@ ...@@ -15,3 +15,35 @@
# #
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with caucase. If not, see <http://www.gnu.org/licenses/>. # along with caucase. If not, see <http://www.gnu.org/licenses/>.
import os
import argparse
from caucase.web import app, db
def parseArguments():
"""
Parse arguments for Certificate Authority cli instance.
"""
parser = argparse.ArgumentParser()
parser.add_argument('-f', '--db-file', required=True,
help='Certificate authority data base file path')
return parser
def housekeeping(config):
"""
Start Storage housekeep method to cleanup garbages
"""
app.config.update(
DEBUG=False,
CSRF_ENABLED=True,
TESTING=False,
SQLALCHEMY_DATABASE_URI='sqlite:///%s' % config.db_file
)
from caucase.storage import Storage
storage = Storage(db)
storage.housekeep()
def main():
parser = parseArguments()
config = parser.parse_args()
housekeeping(config)
\ No newline at end of file
...@@ -15,3 +15,583 @@ ...@@ -15,3 +15,583 @@
# #
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with caucase. If not, see <http://www.gnu.org/licenses/>. # along with caucase. If not, see <http://www.gnu.org/licenses/>.
import os, errno
import time
import ConfigParser
import logging
import requests
import argparse
import traceback
import pem
import json
from OpenSSL import crypto
from caucase import utils
from datetime import datetime
CSR_KEY_FILE = 'csr.key.txt'
RENEW_CSR_KEY_FILE = 'renew_csr.key.txt'
def parseArguments():
"""
Parse arguments for Certificate Authority Request.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--ca-url',
required=True,
help='Certificate Authority URL')
parser.add_argument('-c', '--ca-crt-file',
default='ca.crt.pem',
help='Path for CA Cert file. default: %(default)s')
parser.add_argument('-x', '--crt-file',
default='crt.pem',
help='Path for Certificate file. default: %(default)s')
parser.add_argument('-k', '--key-file',
default='key.pem',
help='Path of key file. default: %(default)s')
parser.add_argument('-s', '--csr-file',
default='csr.pem',
help='Path where to store csr file. default: %(default)s')
parser.add_argument('--digest',
default="sha256",
help='Digest used to sign data. default: %(default)s')
parser.add_argument('--cn',
help='Common name to use for request new certificate')
parser.add_argument('--no-check-certificate',
action='store_false', default=True, dest='verify_certificate',
help='When connecting to CA on HTTPS, disable certificate verification.')
group = parser.add_mutually_exclusive_group()
group.add_argument('--request', action='store_true',
help='Request a new Certificate')
group.add_argument('--revoke', action='store_true',
help='Revoke existing certificate')
group.add_argument('--renew', action='store_true',
help='Renew current certificate and and replace with existing files')
return parser
def requestCertificate(config):
ca_request = CertificateAuthorityRequest(config.key_file, config.crt_file,
config.ca_crt_file, config.ca_url, digest=config.digest,
verify_certificate=config.verify_certificate)
# download or update ca crt file
ca_request.getCACertificateChain()
if os.path.exists(config.crt_file):
return
if not os.path.exists(config.csr_file):
csr = ca_request.generateCertificateRequest(config.key_file,
cn=config.cn, csr_file=config.csr_file)
else:
csr = open(config.csr_file).read()
ca_request.signCertificate(csr)
def revokeCertificate(config):
os.close(os.open(config.key_file, os.O_RDONLY))
os.close(os.open(config.crt_file, os.O_RDONLY))
ca_revoke = CertificateAuthorityRequest(config.key_file, config.crt_file,
config.ca_crt_file, config.ca_url, digest=config.digest,
verify_certificate=config.verify_certificate)
# download or update ca crt file
ca_revoke.getCACertificateChain()
ca_revoke.revokeCertificate()
def renewCertificate(config, backup_dir):
os.close(os.open(config.key_file, os.O_RDONLY))
os.close(os.open(config.crt_file, os.O_RDONLY))
ca_renew = CertificateAuthorityRequest(config.key_file, config.crt_file,
config.ca_crt_file, config.ca_url, digest=config.digest,
verify_certificate=config.verify_certificate)
# download or update ca crt file
ca_renew.getCACertificateChain()
ca_renew.renewCertificate(config.csr_file, backup_dir)
def main():
parser = parseArguments()
config = parser.parse_args()
base_dir = os.path.dirname(config.crt_file)
os.chdir(os.path.abspath(base_dir))
if config.request:
if not config.cn or not config.ca_url:
parser.print_help()
exit(1)
requestCertificate(config)
elif config.revoke:
if not config.ca_url:
parser.print_help()
exit(1)
revokeCertificate(config)
elif config.renew:
backup_dir = os.path.join('.', 'old-%s' % datetime.now().strftime('%Y%m%d%H%M%S'))
os.mkdir(backup_dir)
if os.path.exists(CSR_KEY_FILE):
os.rename(CSR_KEY_FILE, os.path.join(backup_dir, CSR_KEY_FILE))
if os.path.exists(config.csr_file):
base_name = os.path.basename(config.csr_file)
os.rename(config.csr_file, os.path.join(backup_dir, base_name))
renewCertificate(config, backup_dir)
else:
parser.print_help()
exit(1)
class CertificateAuthorityRequest(object):
def __init__(self, key, certificate, cacertificate, ca_url,
max_retry=10, digest="sha256",
verify_certificate=False, logger=None):
self.key = key
self.certificate = certificate
self.cacertificate = cacertificate
self.ca_url = ca_url
self.logger = logger
self.max_retry = max_retry
self.digest = digest
self.extension_manager = utils.X509Extension()
self.ca_certificate_list = []
self.verify_certificate = verify_certificate
while self.ca_url.endswith('/'):
# remove all / at end or ca_url
self.ca_url = self.ca_url[:-1]
if os.path.exists(self.cacertificate):
self.ca_certificate_list = [
crypto.load_certificate(crypto.FILETYPE_PEM, x._pem_bytes) for x in
pem.parse_file(self.cacertificate)
]
if self.logger is None:
self.logger = logging.getLogger('Certificate Request')
self.logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.generatePrivatekey(self.key)
def _request(self, method, url, data={}):
try:
req = getattr(requests, method)
kw = {}
if data:
kw['data'] = data
kw['verify'] = self.verify_certificate
return req(url, **kw)
except requests.ConnectionError, e:
self.logger.error("Got ConnectionError while sending request to CA. Url is %s\n%s" % (
url, str(e)))
return None
def _checkCertEquals(self, first_cert, second_cert):
"""
Say if two certificate PEM object are the same
XXX - more checks should be done ?
"""
return first_cert.set_subject().CN == second_cert.get_subject().CN and \
first_cert.get_serial_number() == second_cert.get_serial_number()
def _writeNewFile(self, file_path, content, mode=0644):
fd = os.open(file_path,
os.O_CREAT | os.O_EXCL | os.O_WRONLY | os.O_TRUNC, mode)
try:
os.write(fd, content)
finally:
os.close(fd)
def generateCertificateRequest(self, key_file, cn,
country='', state='', locality='', email='', organization='',
organization_unit='', csr_file=None):
with open(key_file) as fkey:
key = crypto.load_privatekey(crypto.FILETYPE_PEM, fkey.read())
req = crypto.X509Req()
subject = req.get_subject()
subject.CN = cn
if country:
subject.C = country
if state:
subject.ST = state
if locality:
subject.L = locality
if organization:
subject.O = organization
if organization_unit:
subject.OU = organization_unit
if email:
subject.emailAddress = email
req.set_pubkey(key)
self.extension_manager.setDefaultCsrExtensions(req)
req.sign(key, self.digest)
csr = crypto.dump_certificate_request(crypto.FILETYPE_PEM, req)
if csr_file is not None:
with open(csr_file, 'w') as req_file:
req_file.write(csr)
os.chmod(csr_file, 0640)
return csr
def generatePrivatekey(self, output_file, size=2048):
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, size)
try:
key_fd = os.open(output_file,
os.O_CREAT|os.O_WRONLY|os.O_EXCL|os.O_TRUNC,
0600)
except OSError, e:
if e.errno != errno.EEXIST:
raise
else:
os.write(key_fd, crypto.dump_privatekey(crypto.FILETYPE_PEM, key))
os.close(key_fd)
def checkCertificateValidity(self, cert):
cert_pem = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
pkey = open(self.key).read()
key_pem = crypto.load_privatekey(crypto.FILETYPE_PEM, pkey)
return utils.checkCertificateValidity(
self.ca_certificate_list,
cert_pem,
key_pem)
def getValidCACertificateChain(self):
ca_cert_url = '%s/crt/ca.crt.json' % self.ca_url
self.logger.info("Updating CA certificate file from %s" % ca_cert_url)
cert_list = response_json = []
cert_list_chain = ""
response = self._request('get', ca_cert_url)
while not response or response.status_code != 200:
# sleep a bit then try again until ca cert is ready
time.sleep(10)
response = self._request('get', ca_cert_url)
response_json = json.loads(response.text)
if len(response_json) > 0:
iter_ca_cert = iter(response_json)
is_valid = False
payload = utils.unwrap(iter_ca_cert.next(), lambda x: x['old'], [self.digest])
# check that old certificate is known
old_x509 = crypto.load_certificate(crypto.FILETYPE_PEM, payload['old'])
for x509 in self.ca_certificate_list:
if self._checkCertEquals(x509, old_x509):
is_valid = True
if not is_valid:
# no local certificate matches
raise CertificateVerificationError("Updated CA Certificate chain could " \
"not be validated using local CA Certificate at %r. \nYou can " \
"try removing your local ca file if it was not updated for more " \
"that a year." % self.cacertificate)
# if not old_x509.has_expired():
cert_list.append(old_x509)
cert_list.append(
crypto.load_certificate(crypto.FILETYPE_PEM, payload['new'])
)
cert_list_chain = "%s\n%s" % (payload['old'], payload['new'])
for next_itmen in iter_ca_cert:
payload = utils.unwrap(next_itmen, lambda x: x['old'], [self.digest])
old_x509 = crypto.load_certificate(crypto.FILETYPE_PEM, payload['old'])
if self._checkCertEquals(cert_list[-1], old_x509):
cert_list.append(
crypto.load_certificate(crypto.FILETYPE_PEM, payload['new'])
)
cert_list_chain += "\n%s" % payload['new']
else:
raise CertificateVerificationError("Get updated CA Certificate " \
"retourned %s but validation of data failed" % response_json)
# dump into file
if not cert_list_chain or not cert_list:
return
self.ca_certificate_list = cert_list
fd = os.open(self.cacertificate, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, 0640)
try:
for cert in cert_list:
os.write(fd, cert_list_chain)
finally:
os.close(fd)
def getCACertificateChain(self):
# If cert file exists exist
if os.path.exists(self.cacertificate) and os.stat(self.cacertificate).st_size > 0:
# Get all valids CA certificate
return self.getValidCACertificateChain()
ca_cert_url = '%s/crt/ca.crt.pem' % self.ca_url
self.logger.info("getting CA certificate file %s" % ca_cert_url)
response = None
while not response or response.status_code != 200:
response = self._request('get', ca_cert_url)
if response is not None:
try:
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, response.text)
except crypto.Error, e:
traceback.print_exc()
response = None
else:
self.ca_certificate_list = [x509]
break
# sleep a bit then try again until ca cert is ready
time.sleep(10)
fd = os.open(self.cacertificate,
os.O_CREAT | os.O_EXCL | os.O_WRONLY | os.O_TRUNC, 0640)
try:
os.write(fd, response.text)
finally:
os.close(fd)
def signCertificate(self, csr):
if os.path.exists(self.certificate) and os.stat(self.certificate).st_size > 0:
return
data = {'csr': csr}
retry = 0
sleep_time = 10
request_url = '%s/csr' % self.ca_url
csr_key = ""
self.logger.info("Request signed certificate from CA...")
if os.path.exists(CSR_KEY_FILE):
with open(CSR_KEY_FILE) as fkey:
csr_key = fkey.read()
if csr_key:
self.logger.info("Csr was already sent to CA, using csr : %s" % csr_key)
else:
response = self._request('put', request_url, data=data)
while (not response or response.status_code != 201) and retry < self.max_retry:
self.logger.error("%s: Failed to sent CSR. \n%s" % (
response.status_code, response.text))
self.logger.info("will retry in %s seconds..." % sleep_time)
time.sleep(sleep_time)
retry += 1
response = self._request('put', request_url, data=data)
if response.status_code != 201:
raise Exception("ERROR: failed to put CSR after % retry. Exiting..." % retry)
self.logger.info("CSR succefully sent.")
# Get csr Location from request header: http://xxx.com/csr/key
self.logger.debug("Csr location is: %s" % response.headers['Location'])
csr_key = response.headers['Location'].split('/')[-1]
with open(CSR_KEY_FILE, 'w') as fkey:
fkey.write(csr_key)
# csr is xxx.csr.pem so cert is xxx.cert.pem
self.logger.info("Waiting for signed certificate...")
reply_url = '%s/crt/%s.crt.pem' % (self.ca_url, csr_key[:-8])
response = self._request('get', reply_url)
while not response or response.status_code != 200:
time.sleep(sleep_time)
response = self._request('get', reply_url)
self.logger.info("Validating signed certificate...")
if not self.checkCertificateValidity(response.text):
# certificate verification failed, should raise ?
self.logger.warn("Certificate validation failed.\n" \
"Please double check the signed certificate before use. Also consider" \
"revoke it and request a new signed certificate.")
fd = os.open(self.certificate,
os.O_CREAT | os.O_EXCL | os.O_WRONLY | os.O_TRUNC, 0644)
try:
os.write(fd, response.text)
finally:
os.close(fd)
self.logger.info("Certificate correctly saved at %s." % self.certificate)
def revokeCertificate(self, message=""):
"""
Send a revocation request for the givent certificate to the master.
"""
sleep_time = 10
retry = 1
pkey = open(self.key).read()
cert = open(self.certificate).read()
cert_pem = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
payload = dict(
reason=message,
revoke_crt=cert)
wrapped = utils.wrap(payload, pkey, [self.digest])
request_url = '%s/crt/revoke' % self.ca_url
data = {'payload': json.dumps(wrapped)}
self.logger.info("Sent Certificate revocation request for CN: %s." % (
cert_pem.get_subject().CN))
response = self._request('put', request_url, data=data)
break_code = [201, 404, 500, 404]
while response is None or response.status_code not in break_code:
self.logger.error("%s: Failed to send revoke request. \n%s" % (
response.status_code, response.text))
self.logger.info("will retry in %s seconds..." % sleep_time)
time.sleep(sleep_time)
response = self._request('put', request_url, data=data)
retry += 1
if retry < self.max_retry:
break
if response.status_code != 201:
raise Exception("ERROR: failed to put revoke certificate after %s retry. Exiting..." % retry)
self.logger.info("Certificate %s was successfully revoked." % (
self.certificate))
def renewCertificate(self, csr_file, backup_dir, backup_key=True):
"""
Renew the current certificate. Regenerate private key if backup_key is `True`
"""
sleep_time = 10
retry = 1
new_key_path = '%s.renew' % self.key
new_cert_path = '%s.renew' % self.certificate
key_file = self.key
cert = open(self.certificate).read()
cert_pem = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
csr_key = ""
try:
if backup_key:
self.generatePrivatekey(new_key_path)
key_file = new_key_path
if not os.path.exists(csr_file):
csr = self.generateCertificateRequest(key_file,
cn=cert_pem.get_subject().CN,
csr_file=csr_file)
else:
csr = open(csr_file).read()
if os.path.exists(RENEW_CSR_KEY_FILE):
csr_key = open(RENEW_CSR_KEY_FILE).read()
if not csr_key:
payload = dict(
renew_csr=csr,
crt=cert)
pkey = open(self.key).read()
wrapped = utils.wrap(payload, pkey, [self.digest])
request_url = '%s/crt/renew' % self.ca_url
data = {'payload': json.dumps(wrapped)}
self.logger.info("Send Certificate Renewal request for CN: %s." % (
cert_pem.get_subject().CN))
response = self._request('put', request_url, data=data)
break_code = [201, 404, 500, 404]
while response is None or response.status_code not in break_code:
self.logger.error("%s: Failed to send renewal request. \n%s" % (
response.status_code, response.text))
self.logger.info("will retry in %s seconds..." % sleep_time)
time.sleep(sleep_time)
response = self._request('put', request_url, data=data)
retry += 1
if retry < self.max_retry:
break
if response.status_code != 201:
raise Exception("ERROR: failed to put certificate renewal request after %s retry. Exiting...\n%s" % (
retry, response.text))
csr_key = response.headers['Location'].split('/')[-1]
with open(RENEW_CSR_KEY_FILE, 'w') as fkey:
fkey.write(csr_key)
self.logger.info("Waiting for signed certificate...")
reply_url = '%s/crt/%s.crt.pem' % (self.ca_url, csr_key[:-8])
response = self._request('get', reply_url)
while not response or response.status_code != 200:
time.sleep(sleep_time)
response = self._request('get', reply_url)
self.logger.info("Validating signed certificate...")
self._writeNewFile(new_cert_path, response.text)
# change location of files
if backup_key:
os.rename(self.key,
os.path.join(backup_dir, os.path.basename(self.key)))
os.rename(new_key_path, self.key)
self.logger.info("Private correctly renewed at %s." % self.key)
os.rename(self.certificate,
os.path.join(backup_dir, os.path.basename(self.certificate)))
os.rename(new_cert_path, self.certificate)
if not self.checkCertificateValidity(response.text):
# certificate verification failed, should raise ?
self.logger.warn("Certificate validation failed.\n" \
"Please double check the signed certificate before use. Also consider" \
"revoke it and request a new signed certificate.")
else:
self.logger.info("Certificate correctly renewed at %s." % self.certificate)
except:
raise
else:
for path in [csr_file, RENEW_CSR_KEY_FILE]:
if os.path.exists(path):
os.unlink(path)
finally:
for path in [new_cert_path, new_key_path]:
if os.path.exists(path):
os.unlink(path)
...@@ -33,3 +33,19 @@ class Found(CertificateAuthorityException): ...@@ -33,3 +33,19 @@ class Found(CertificateAuthorityException):
class BadSignature(CertificateAuthorityException): class BadSignature(CertificateAuthorityException):
"""Non-x509 signature check failed""" """Non-x509 signature check failed"""
class BadCertificateSigningRequest(CertificateAuthorityException):
"""CSR content doesn't contain all required elements"""
pass
class BadCertificate(CertificateAuthorityException):
"""Certificate is not a valid PEM content"""
pass
class CertificateVerificationError(CertificateAuthorityException):
"""Certificate is not valid, it was not signed by CA"""
pass
class ExpiredCertificate(CertificateAuthorityException):
"""Certificate has expired and could not be used"""
pass
\ No newline at end of file
# This file is part of caucase
# Copyright (C) 2017 Nexedi
# Alain Takoudjou <alain.takoudjou@nexedi.com>
# Vincent Pelletier <vincent@nexedi.com>
#
# caucase is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# caucase is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with caucase. If not, see <http://www.gnu.org/licenses/>.
.ui-table th, .ui-table td {
line-height: 1.5em;
text-align: left;
padding: .4em .5em;
vertical-align: middle;
}
.ui-overlay-a, .ui-page-theme-a, .ui-page-theme-a .ui-panel-wrapper {
background-color: #fff !important;
}
table .ui-table th, table .ui-table td {
vertical-align: middle;
}
.noshadow buton, .noshadow a, .noshadow input, .noshadow select {
-webkit-box-shadow: none !important;
-moz-box-shadow: none !important;
box-shadow: none !important;
}
.ui-error, html .ui-content .ui-error a, .ui-content a.ui-error {
color: red;
font-weight: bold;
}
html body {
overflow-x: hidden;
background: #fbfbfb;
}
.form-signin {
max-width: 330px;
padding: 15px;
margin: 0 auto;
}
.form-signin .form-signin-heading,
.form-signin .checkbox {
margin-bottom: 10px;
}
.form-signin .checkbox {
font-weight: normal;
}
.form-signin .form-control {
position: relative;
height: auto;
-webkit-box-sizing: border-box;
-moz-box-sizing: border-box;
box-sizing: border-box;
padding: 10px;
font-size: 16px;
}
.form-signin .form-control:focus {
z-index: 2;
}
.form-signin input[type="email"] {
margin-bottom: -1px;
border-bottom-right-radius: 0;
border-bottom-left-radius: 0;
}
.form-signin input[type="password"] {
margin-bottom: 10px;
border-top-left-radius: 0;
border-top-right-radius: 0;
}
/* Toggle Styles */
#wrapper {
padding-left: 0;
-webkit-transition: all 0.6s ease;
-moz-transition: all 0.6s ease;
-o-transition: all 0.6s ease;
transition: all 0.6s ease;
}
#wrapper.toggled {
padding-left: 200px;
}
#sidebar-wrapper {
z-index: 1000;
position: fixed;
left: 250px;
width: 0;
height: 100%;
margin-left: -250px;
overflow-y: auto;
background-color:#312A25 !Important;
-webkit-transition: all 0.2s ease;
-moz-transition: all 0.2s ease;
-o-transition: all 0.2s ease;
transition: all 0.2s ease;
}
#wrapper.toggled #sidebar-wrapper {
width: 0;
}
#page-content-wrapper {
width: 100%;
position: absolute;
padding: 10px;
}
#wrapper.toggled #page-content-wrapper {
position: absolute;
margin-left:-250px;
}
/* Sidebar Styles */
.nav-side-menu {
overflow: auto;
font-family: verdana;
font-size: 12px;
font-weight: 200;
background-color: #2a2f35; /* #313130 */
position: fixed;
top: 0px;
width: 300px;
height: 100%;
color: #e1ffff;
}
.nav-side-menu .brand {
background-color: #404040;
line-height: 50px;
display: block;
text-align: center;
font-size: 14px;
}
.nav-side-menu .toggle-btn {
display: none;
}
.nav-side-menu ul,
.nav-side-menu li {
list-style: none;
padding: 0px;
margin: 0px;
line-height: 35px;
cursor: pointer;
/*
.collapsed{
.arrow:before{
font-family: FontAwesome;
content: "\f053";
display: inline-block;
padding-left:10px;
padding-right: 10px;
vertical-align: middle;
float:right;
}
}
*/
}
.nav-side-menu ul :not(collapsed) .arrow:before,
.nav-side-menu li :not(collapsed) .arrow:before {
font-family: FontAwesome;
content: "\f078";
display: inline-block;
padding-left: 10px;
padding-right: 10px;
vertical-align: middle;
float: right;
}
.nav-side-menu ul .active,
.nav-side-menu li .active {
border-left: 3px solid #d19b3d;
background-color: #4f5b69;
}
.nav-side-menu ul .sub-menu li.active,
.nav-side-menu li .sub-menu li.active {
color: #d19b3d;
}
.nav-side-menu ul .sub-menu li.active a,
.nav-side-menu li .sub-menu li.active a {
color: #d19b3d;
}
.nav-side-menu ul .sub-menu li,
.nav-side-menu li .sub-menu li {
background-color: #181c20;
border: none;
line-height: 28px;
border-bottom: 1px solid #23282e;
margin-left: 0px;
}
.nav-side-menu ul .sub-menu li:hover,
.nav-side-menu li .sub-menu li:hover {
background-color: #020203;
}
.nav-side-menu ul .sub-menu li:before,
.nav-side-menu li .sub-menu li:before {
font-family: FontAwesome;
content: "\f105";
display: inline-block;
padding-left: 10px;
padding-right: 10px;
vertical-align: middle;
}
.nav-side-menu li {
padding-left: 0px;
border-left: 3px solid #2e353d;
border-bottom: 1px solid #23282e;
}
.nav-side-menu li a {
text-decoration: none;
color: #e1ffff;
display: block;
}
.nav-side-menu li a i {
padding-left: 10px;
width: 20px;
padding-right: 25px;
font-size: 18px;
}
.nav-side-menu li:hover {
border-left: 3px solid #d19b3d;
background-color: #4f5b69;
-webkit-transition: all 1s ease;
-moz-transition: all 1s ease;
-o-transition: all 1s ease;
-ms-transition: all 1s ease;
transition: all 1s ease;
}
@media (max-width: 767px) {
.nav-side-menu {
position: relative;
width: 100%;
margin-bottom: 10px;
}
.nav-side-menu .toggle-btn {
display: block;
cursor: pointer;
position: absolute;
right: 10px;
top: 10px;
z-index: 10 !important;
padding: 3px;
background-color: #ffffff;
color: #000;
width: 40px;
text-align: center;
}
.brand {
text-align: left !important;
font-size: 22px;
padding-left: 20px;
line-height: 50px !important;
}
}
@media (min-width: 767px) {
.nav-side-menu .menu-list .menu-content {
display: block;
}
#main {
width:calc(100% - 300px);
float: right;
}
}
body {
margin: 0px;
padding: 0px;
}
pre {
max-height: 600px;
}
.col-centered {
float: none;
margin: 0 auto;
}
.clickable{
cursor: pointer;
}
.table .panel-heading div {
margin-top: -18px;
font-size: 15px;
}
.table .panel-heading div span{
margin-left:5px;
}
.table .panel-body{
display: none;
}
.container .table>tbody>tr>td, .table>tbody>tr>th, .table>tfoot>tr>td, .table>tfoot>tr>th, .table>thead>tr>td, .table>thead>tr>th {
vertical-align: middle;
}
.margin-top-40 {
margin-top:40px;
}
.margin-lr-20 {
margin: 0 20px;
}
.flashes-messages div:first-child {
margin-top: 30px;
}
/* Dashboard boxes */
.dash-panel {
text-align: center;
padding: 1px 0;
}
html body a:hover > .dash-panel h4 {
text-decoration: none;
}
.dash-panel:hover {
background-color: #e6e6e6;
border-color: #adadad;
cursor: pointer;
}
.dash {
position: relative;
text-align: center;
width: 120px;
height: 55px;
margin: 10px auto 10px auto;
}
#dash-blue .number {
color: #30a5ff;
}
#dash-orange .number {
color: #ffb53e;
}
#dash-teal .number {
color: #1ebfae;
}
#dash-red .number {
color: #ef4040;
}
#dash-darkred .number {
color: #bd0849;
}
.dash .number {
display: block;
position: absolute;
font-size: 46px;
width: 120px;
}
.alert-error {
color: #a94442;
background-color: #f2dede;
border-color: #ebccd1;
}
$( document ).ready(function() {
$("#menu-toggle").click(function(e) {
e.preventDefault();
$("#wrapper").toggleClass("toggled");
});
});
(function(){
'use strict';
var $ = jQuery;
$.fn.extend({
filterTable: function(){
return this.each(function(){
$(this).on('keyup', function(e){
$('.filterTable_no_results').remove();
var $this = $(this),
search = $this.val().toLowerCase(),
target = $this.attr('data-filters'),
$target = $(target),
$rows = $target.find('tbody tr');
if(search === '') {
$rows.show();
} else {
$rows.each(function(){
var $this = $(this);
$this.text().toLowerCase().indexOf(search) === -1 ? $this.hide() : $this.show();
});
if($target.find('tbody tr:visible').size() === 0) {
var col_count = $target.find('tr').first().find('td').size();
var no_results = $('<tr class="filterTable_no_results"><td colspan="'+col_count+'">No results found</td></tr>');
$target.find('tbody').append(no_results);
}
}
});
});
}
});
$('[data-action="filter"]').filterTable();
})(jQuery);
$(function(){
// attach table filter plugin to inputs
$('[data-action="filter"]').filterTable();
$('.container').on('click', '.panel-heading span.filter', function(e){
var $this = $(this),
$panel = $this.parents('.panel');
$panel.find('.panel-body').slideToggle();
if($this.css('display') != 'none') {
$panel.find('.panel-body input').focus();
}
});
$('[data-toggle="tooltip"]').tooltip();
});
\ No newline at end of file
...@@ -16,54 +16,421 @@ ...@@ -16,54 +16,421 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with caucase. If not, see <http://www.gnu.org/licenses/>. # along with caucase. If not, see <http://www.gnu.org/licenses/>.
import os
import errno
import uuid
import hashlib
from datetime import datetime, timedelta
from OpenSSL import crypto
from caucase.web import db
from caucase import utils
from caucase.exceptions import (NoStorage, NotFound, Found)
from flask_user import UserMixin
STATUS_VALIDATED = 'validated'
STATUS_REVOKED = 'invalidated'
STATUS_REJECTED = 'rejected'
STATUS_PENDING = 'pending'
class Storage(object): class Storage(object):
def __init__(self, crt_life_time, loaded_crt_life_time):
def __init__(self, db_instance, max_csr_amount=None,
crt_keep_time=None, csr_keep_time=None):
self.db = db_instance
# initialise tables
self.db.create_all()
# store some config in storage
if max_csr_amount:
self.__setConfig('max-csr-amount', max_csr_amount)
if crt_keep_time is not None:
self.__setConfig('crt-keep-time', crt_keep_time) # 0 mean always keep in storage
if csr_keep_time is not None:
self.__setConfig('csr-keep-time', csr_keep_time) # 0 mean always keep non pending csr in storage
def _getConfig(self, key):
return Config.query.filter(Config.key == key).first()
def __setConfig(self, key, value):
"""
Add new config to storage
"""
entry = self._getConfig(key)
if not entry:
entry = Config(key=key, value='%s' % value)
self.db.session.add(entry)
else:
# update value
entry.value = value
self.db.session.commit()
def getConfig(self, key, default=None):
"""
Return a config value or default
"""
entry = self._getConfig(key)
if not entry:
return default
return entry.value
def _getMaxCsrCount(self):
return int(self.getConfig('max-csr-amount', 50))
def getNextCertificateSerialNumber(self):
last_cert = Certificate.query.order_by(
Certificate.id.desc()
).first()
if last_cert:
return last_cert.id + 1
else:
return 1
def getCAKeyPairList(self): def getCAKeyPairList(self):
""" """
Return the chronologically sorted (oldest in [0], newest in [-1]) certificate authority Return the chronologically sorted (oldest in [0], newest in [-1]) certificate authority
key pairs. key pairs.
""" """
item_list = CAKeypair.query.filter(
CAKeypair.active == True
).order_by(
CAKeypair.creation_date.asc()
).all()
if not item_list:
return []
keypair_list = []
for keypair in item_list:
keypair_list.append({
'crt': crypto.load_certificate(crypto.FILETYPE_PEM, keypair.certificate),
'key': crypto.load_privatekey(crypto.FILETYPE_PEM, keypair.key)
})
return keypair_list
def storeCAKeyPair(self, key_pair): def storeCAKeyPair(self, key_pair):
""" """
Store a certificate authority key pair. Store a certificate authority key pair.
""" """
serial = utils.getSerialToInt(key_pair['crt'])
crt_string = crypto.dump_certificate(crypto.FILETYPE_PEM, key_pair['crt'])
key_string = crypto.dump_privatekey(crypto.FILETYPE_PEM, key_pair['key'])
# check that keypair is not stored
keypair = CAKeypair.query.filter(
CAKeypair.active == True
).filter(
CAKeypair.serial == serial
).first()
if keypair:
raise Found('Another CA certificate exists with serial %r' % (serial, ))
saved_pair = CAKeypair(
serial=serial,
common_name=key_pair['crt'].get_subject().CN,
expire_after=datetime.strptime(
key_pair['crt'].get_notAfter(), '%Y%m%d%H%M%SZ'
),
start_before=datetime.strptime(
key_pair['crt'].get_notBefore(), '%Y%m%d%H%M%SZ'
),
key=key_string,
certificate=crt_string,
active=True,
creation_date=datetime.utcnow()
)
self.db.session.add(saved_pair)
self.db.session.commit()
def storeCertificateSigningRequest(self, csr): def storeCertificateSigningRequest(self, csr):
""" """
Store acertificate signing request and generate a unique ID for it. Store acertificate signing request and generate a unique ID for it.
""" """
raise NoStorage('Too many pending CSRs') csr_amount = self.countPendingCertificateSiningRequest()
if csr_amount >= self._getMaxCsrCount():
raise NoStorage('Too many pending CSRs')
content = crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr)
checksum = hashlib.md5(content).hexdigest()
check_csr = CertificateRequest.query.filter(
CertificateRequest.status == STATUS_PENDING
).filter(
CertificateRequest.checksum == checksum
).first()
if check_csr:
# this only prevent client loop sending the same csr until csr_amount is reached
return check_csr.csr_id
def deletePendingCertificateRequest(self, crt_id): key = str(uuid.uuid1())
raise NotFound('No pending CSR with id %r' % (crt_id, )) csr_id = '%s.csr.pem' % key
crt_id = '%s.crt.pem' % key
req = CertificateRequest(
content=content,
creation_date=datetime.utcnow(),
common_name=csr.get_subject().CN,
checksum=checksum,
csr_id=csr_id,
crt_id=crt_id)
def getCertificateSigningRequest(self, crt_id): request_amount = self._getConfig('csr-requested-amount')
raise NotFound('No pending CSR with id %r' % (crt_id, )) if not request_amount:
request_amount = Config(key='csr-requested-amount', value='1')
self.db.session.add(request_amount)
else:
request_amount.value = '%s' % (int(request_amount.value) + 1,)
def storeCertificate(self, crt_id, crt): self.db.session.add(req)
self.db.session.commit()
return csr_id
def deletePendingCertificateRequest(self, csr_id):
csr = CertificateRequest.query.filter(
CertificateRequest.status == STATUS_PENDING
).filter(
CertificateRequest.csr_id == csr_id
).first()
if csr:
self.db.session.delete(csr)
self.db.session.commit()
else:
raise NotFound('No pending CSR with id %r' % (csr_id, ))
def getPendingCertificateRequest(self, csr_id):
csr = CertificateRequest.query.filter(
CertificateRequest.status == STATUS_PENDING
).filter(
CertificateRequest.csr_id == csr_id
).first()
if csr:
return csr.content
raise NotFound('No pending CSR with id %r' % (csr_id, ))
def getPendingCertificateRequestList(self, limit=0, with_data=False):
""" """
Store certificate as crt_id. Return list of all CSR
""" """
raise Found('CRT already exists') data_list = []
index = 1
query = CertificateRequest.query.filter(
CertificateRequest.status == STATUS_PENDING
)
if limit > 0:
query.limit(limit)
csr_list = query.all()
for request_csr in csr_list:
csr = {
'index': index,
'csr_id': request_csr.csr_id,
'crt_id': request_csr.crt_id,
'common_name': request_csr.common_name,
'creation_date': request_csr.creation_date
}
if with_data:
certificate['content'] = request_csr.content
data_list.append(csr)
index += 1
return data_list
def storeCertificate(self, csr_id, crt):
"""
Store certificate as crt_id. crt is a certificate PEM object.
"""
csr = csr = CertificateRequest.query.filter(
CertificateRequest.status == STATUS_PENDING
).filter(
CertificateRequest.csr_id == csr_id
).first()
if not csr:
raise NotFound('No pending CSR with id %r' % (csr_id, ))
cert = Certificate.query.filter(
Certificate.status == STATUS_VALIDATED
).filter(
Certificate.crt_id == csr.crt_id
).first()
if cert:
raise Found('CRT already exists')
cert_db = Certificate(
crt_id=csr.crt_id,
serial=utils.getSerialToInt(crt),
common_name=crt.get_subject().CN,
expire_after=datetime.strptime(crt.get_notAfter(), '%Y%m%d%H%M%SZ'),
start_before=datetime.strptime(crt.get_notBefore(), '%Y%m%d%H%M%SZ'),
creation_date=datetime.utcnow(),
content=crypto.dump_certificate(crypto.FILETYPE_PEM, crt)
)
# Change Csr status as 'validated', so it can be trashed
csr.status = STATUS_VALIDATED
self.db.session.add(cert_db)
self.db.session.commit()
return csr.crt_id
def getCertificateFromSerial(self, serial):
cert = Certificate.query.filter(
Certificate.status == STATUS_VALIDATED
).filter(
Certificate.serial == '%s' % serial
).first()
if cert:
return cert
raise NotFound('No certficate with serial %r' % (serial, ))
# schedule certificate removal
def getCertificate(self, crt_id): def getCertificate(self, crt_id):
cert = Certificate.query.filter(
Certificate.status == STATUS_VALIDATED
).filter(
Certificate.crt_id == crt_id
).first()
if cert and cert.content:
# if content is emtpy, maybe the certificate content was stripped ?
return cert.content
raise NotFound('No certficate with id %r' % (crt_id, )) raise NotFound('No certficate with id %r' % (crt_id, ))
# schedule certificate removal # schedule certificate removal
def revokeCertificate(self, serial, not_after_date): def getSignedCertificateList(self, limit=0, with_data=False):
data_list = []
index = 1
query = Certificate.query.filter(
Certificate.status == STATUS_VALIDATED
)
if limit > 0:
query.limit(limit)
signed_cert_list = query.all()
for signed_cert in signed_cert_list:
certificate = {
'index': index,
'serial': signed_cert.serial,
'crt_id': signed_cert.crt_id,
'common_name': signed_cert.common_name,
'expire_after': signed_cert.expire_after,
'start_before': signed_cert.start_before,
}
if with_data:
certificate['content'] = signed_cert.content
data_list.append(certificate)
index += 1
return data_list
def revokeCertificate(self, serial, not_after_date, reason=''):
""" """
Add serial to the list of revoked certificates. Add serial to the list of revoked certificates.
Associated certificate must expire at (or before) not_after_date, so Associated certificate must expire at (or before) not_after_date, so
revocation can be pruned. revocation can be pruned.
""" """
# Store & make visible (ie, commit if applicable) cert = Certificate.query.filter(
# Flush cached CRL (forcing re-generation). Certificate.status == STATUS_VALIDATED
).filter(
Certificate.serial == serial
).first()
if not cert:
raise NotFound('No certficate with serial %r' % (serial, ))
revoke = Revocation(
serial=serial,
creation_date=datetime.utcnow(),
reason=reason,
crt_expire_after=not_after_date
)
# Set latest CRL as expired, it will be regenerated
crl = CertificateRevocationList.query.filter(
CertificateRevocationList.active == True
).first()
if crl:
crl.active = False
# this certificate is not valid anymore
cert.status = STATUS_REVOKED
self.db.session.add(revoke)
self.db.session.commit()
def getCertificateRevocationList(self): def getCertificateRevocationList(self):
"""
Get Certificate Rovocation List of None if there is no valid CRL
"""
last_revocation = CertificateRevocationList.query.order_by(
CertificateRevocationList.id.desc()
).first()
if last_revocation and last_revocation.active:
if (last_revocation.crl_expire_after - datetime.utcnow()).days >= 0:
return last_revocation.content
return None
def getNextCRLVersionNumber(self):
last_revocation = CertificateRevocationList.query.order_by(
CertificateRevocationList.id.desc()
).first()
if last_revocation:
return last_revocation.id + 1
else:
return 1
def storeCertificateRevocationList(self, crl, expiration_date):
"""
Store Certificate Revocation List, return stored crl string
XXX - send expiration_date because crypto.crl has no method to read this from crl object
"""
# Fetch cached CRL (or re-generate and store if not cached). # Fetch cached CRL (or re-generate and store if not cached).
return crl dumped_crl = crypto.dump_crl(crypto.FILETYPE_PEM, crl)
revocation_list = CertificateRevocationList(
creation_date=datetime.utcnow(),
crl_expire_after=expiration_date,
content=dumped_crl,
active=True
)
self.db.session.add(revocation_list)
self.db.session.commit()
return dumped_crl
def getRevocationList(self):
"""
Get the list of all revoked certificate which are not expired
"""
return Revocation.query.filter(
Revocation.crt_expire_after >= datetime.utcnow()
).all()
def getCertificateSigningRequestAmount(self):
"""
Return number of CSR which was requested until now
"""
return int(self.getConfig('csr-requested-amount', 0))
def countValidatedCertificate(self):
return Certificate.query.filter(
Certificate.status == STATUS_VALIDATED
).count()
def countPendingCertificateSiningRequest(self):
return CertificateRequest.query.filter(
CertificateRequest.status == STATUS_PENDING
).count()
def countRevokedCertificate(self):
return Certificate.query.filter(
Certificate.status == STATUS_REVOKED
).count()
def countCertificateRevocation(self):
return Revocation.query.count()
def housekeep(self): def housekeep(self):
""" """
...@@ -71,3 +438,163 @@ class Storage(object): ...@@ -71,3 +438,163 @@ class Storage(object):
ca certificates (because they exceeded their "not valid after" date), ca certificates (because they exceeded their "not valid after" date),
revocation of anway-expired certificates. revocation of anway-expired certificates.
""" """
crt_keep_time = int(self.getConfig('crt-keep-time', 0))
csr_keep_time = int(self.getConfig('csr-keep-time', 0))
expired_keypair_list = CAKeypair.query.filter(
CAKeypair.expire_after < datetime.utcnow()
).all()
for key_pair in expired_keypair_list:
# Desactivate this ca certificate
key_pair.active = False
# wipe certificate content
if crt_keep_time > 0:
check_date = datetime.utcnow() - timedelta(0, crt_keep_time)
cert_list = Certificate.query.filter(
Certificate.creation_date <= check_date
)
for cert in cert_list:
# clear x509 certificate information
cert.content = ""
# delete certificate request
if csr_keep_time > 0:
check_date = datetime.utcnow() - timedelta(0, csr_keep_time)
csr_list = CertificateRequest.query.filter(
CertificateRequest.status != STATUS_PENDING
).filter(
CertificateRequest.creation_date <= check_date
)
for csr in csr_list:
self.db.session.delete(csr)
# delete all expired Certificate Rovocation
revocation_list = Revocation.query.filter(
Revocation.crt_expire_after < datetime.utcnow()
).all()
for revocation in revocation_list:
self.db.session.delete(revocation)
# Delete all expired Certificate Rovocation List (CRL)
crl_list = CertificateRevocationList.query.filter(
CertificateRevocationList.crl_expire_after < datetime.utcnow()
).all()
for crl in crl_list:
self.db.session.delete(crl)
self.db.session.commit()
def findOrCreateUser(self, first_name, last_name, email, username, hash_password):
""" Find existing user or create new user """
user = User.query.filter(User.username == username).first()
if not user:
user = User(email=email,
first_name=first_name,
last_name=last_name,
username=username,
password=hash_password,
active=True,
confirmed_at=datetime.utcnow()
)
db.session.add(user)
db.session.commit()
return user
def findUser(self, username):
return User.query.filter(User.username == username).first()
class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
# User authentication information
username = db.Column(db.String(50), nullable=False, unique=True)
password = db.Column(db.String(255), nullable=False, server_default='')
# User email information
email = db.Column(db.String(255), nullable=False, unique=True)
confirmed_at = db.Column(db.DateTime())
# User information
active = db.Column('is_active', db.Boolean(), nullable=False, server_default='0')
first_name = db.Column(db.String(100), nullable=False, server_default='')
last_name = db.Column(db.String(100), nullable=False, server_default='')
class Config(db.Model):
"""
This table store some configs and information
"""
__tablename__ = 'config'
key = db.Column(db.String(50), primary_key=True)
value = db.Column(db.Text)
class CAKeypair(db.Model):
"""
This table is used ca certificate key pair
"""
__tablename__ = 'ca_keypair'
id = db.Column(db.Integer, primary_key=True)
serial = db.Column(db.String(50), unique=True)
expire_after = db.Column(db.DateTime)
start_before = db.Column(db.DateTime)
common_name = db.Column(db.String(50), unique=False)
active = db.Column(db.Boolean(), nullable=False, server_default='1')
certificate = db.Column(db.Text)
key = db.Column(db.Text)
creation_date = db.Column(db.DateTime)
class CertificateRequest(db.Model):
"""
This table is used to store certificate signature request
"""
__tablename__ = 'csr'
id = db.Column(db.Integer, primary_key=True)
csr_id=db.Column(db.String(80), unique=True)
crt_id = db.Column(db.String(80), unique=True)
common_name = db.Column(db.String(50), unique=False)
content = db.Column(db.Text)
creation_date = db.Column(db.DateTime)
status = db.Column(db.String(20), unique=False, server_default=STATUS_PENDING)
# checksum prevent to store twice the same csr
checksum = db.Column(db.String(50))
class Certificate(db.Model):
"""
This table is used to store some informations about certificate
"""
__tablename__ = 'certificate'
id = db.Column(db.Integer, primary_key=True)
crt_id = db.Column(db.String(80), unique=True)
serial = db.Column(db.String(50), unique=True)
common_name = db.Column(db.String(50), unique=False)
expire_after = db.Column(db.DateTime)
start_before = db.Column(db.DateTime)
creation_date = db.Column(db.DateTime)
# status = validated or revoked
status = db.Column(db.String(20), unique=False, server_default=STATUS_VALIDATED)
content = db.Column(db.Text)
class Revocation(db.Model):
"""
This table store certificate revocation request from users
"""
__tablename__ = 'revoked'
id = db.Column(db.Integer, primary_key=True)
serial = db.Column(db.String(50), unique=False)
crt_expire_after = db.Column(db.DateTime)
reason = db.Column(db.String(200), unique=False)
creation_date = db.Column(db.DateTime)
class CertificateRevocationList(db.Model):
"""
This table store certificate revocation list content
"""
__tablename__ = 'crl'
id = db.Column(db.Integer, primary_key=True)
active = db.Column(db.Boolean(), nullable=False, server_default='1')
creation_date = db.Column(db.DateTime)
crl_expire_after = db.Column(db.DateTime)
content = db.Column(db.Text)
{% extends "layout.html" %}
{% block content %}
<form class="form-signin" method="POST" action="/admin/setpassword">
<h2 class="form-signin-heading" style="margin-bottom: 30px">Set admin password</h2>
<label for="pw" class="sr-only">Password</label>
<input type="inputPassword" name="password" id="pw" class="form-control" placeholder="password" required autofocus>
<label for="pw2" class="sr-only">Confirm Password:</label>
<input type="inputPassword" name="password2" id="pw2" class="form-control" placeholder="Confirm password" required autofocus>
<br/>
<button class="btn btn-lg btn-primary btn-block" type="submit">configure</button>
</form>
{% endblock %}
{% extends "layout.html" %}
{% block pre_content %}
<div class="row">
<div class="col-sm-7 col-md-6 col-lg-5 col-centered">
{% endblock %}
{% block post_content %}
</div>
</div>
{% endblock %}
\ No newline at end of file
{% extends 'flask_user/flask_user_base.html' %}
\ No newline at end of file
{% extends 'flask_user/flask_user_base.html' %}
\ No newline at end of file
<!-- Placed at the end of the document so the pages load faster -->
<script src="https://ajax.googleapis.com/ajax/libs/jquery/1.12.4/jquery.min.js"></script>
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/js/bootstrap.min.js" integrity="sha384-Tc5IQib027qvyjSMfHjOMaLkfuWVxZxUPnCJA7l2mCWNIpG9mGCD8wGNIcPD7Txa" crossorigin="anonymous"></script>
<script type=application/javascript src="{{ url_for('static', filename='scripts/index.js') }}"></script>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Certificate Authority web</title>
<link rel=stylesheet type=text/css href="{{ url_for('static', filename='css/styles.css') }}">
<link href="//netdna.bootstrapcdn.com/font-awesome/4.2.0/css/font-awesome.min.css" rel="stylesheet" type="text/css" />
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css" integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">
{% extends "layout.html" %}
{% block content %}
<div class="page-header">
<h1>Certificate authority<small> Signed Certificates</small></h1>
</div>
<div class="row">
<div class="col-md-12">
<div class="panel panel-success margin-top-40 table">
<div class="panel-heading">
<h3 class="panel-title">List of signed Certificates</h3>
<div class="pull-right">
<span class="clickable filter" data-toggle="tooltip" title="Toggle table filter" data-container="body">
<i class="glyphicon glyphicon-filter"></i>
</span>
</div>
</div>
<div class="panel-body">
<input type="text" class="form-control" id="cacert-table-filter" data-action="filter" data-filters="#cacert-table" placeholder="Filter Columns" />
</div>
<table class="table table-hover" id="cacert-table">
<thead>
<tr>
<th>#</th>
<th>Name</th>
<th>Common Name</th>
<th>Signature Date</th>
<th>Expiration Date</th>
<th></th>
</tr>
</thead>
<tbody>
{% for cert in data_list -%}
<tr>
<td>{{ cert['index'] }}</td>
<td><a href="/crt/{{ cert['crt_id'] }}">{{ cert['crt_id'] }}</a></td>
<td>{{ cert['common_name'] }}</td>
<td>{{ cert['start_before'] }}</td>
<td>{{ cert['expire_after'] }}</td>
<td><a class="btn btn-default" href="/crt/{{ cert['crt_id'] }}" role="button" title="Download file"><i class="fa fa-download" aria-hidden="true"></i></a></td>
</tr>
{% endfor -%}
</tbody>
</table>
</div>
</div>
</div>
{% endblock %}
\ No newline at end of file
<!DOCTYPE html>
<html>
<head>
{% include "head.html" %}
</head>
<body>
{% block body %}
<div class="container-fluid">
<div class="row">
<div class="nav-side-menu">
<div class="brand">Certificate Authority</div>
<i class="fa fa-bars fa-2x toggle-btn" data-toggle="collapse" data-target="#menu-content"></i>
<div class="menu-list">
<ul id="menu-content" class="menu-content collapse out">
{% if not session.user_id %}
<li>
<a href="/">
<i class="fa fa-home" aria-hidden="true"></i> Public Home
</a>
</li>
<li>
<a href="/user/sign-in">
<i class="fa fa-cog" aria-hidden="true"></i> Manage Certificates
</a>
</li>
{% else -%}
<li>
<a href="/admin/">
<i class="fa fa-home" aria-hidden="true"></i> Signed Certificates
</a>
</li>
<li><a href="/admin/csr_requests"><i class="fa fa-tachometer" aria-hidden="true"></i> Manage CSR
<span style="font-weight: bold; margin-left: 5px; margin-right: 5px; color: #56d8ce;">[ {{ session.count_csr }} ] </span></a></li>
<!--<li><a href="/signed_certs"><i class="fa fa-check-square" aria-hidden="true"></i> Signed Certificates</a></li>
<li><a href="/revoked_certs"><i class="fa fa-minus-square" aria-hidden="true"></i> Revoked Certificates</a></li>-->
<li><a href="/admin/profile"><i class="fa fa-user" aria-hidden="true"></i> User Profile</a></li>
<!--<li><a href="/admin/logs"><i class="fa fa-book" aria-hidden="true"></i> Certificate Authority Logs</a></li>-->
<li><a href="/admin/logout"><i class="fa fa-sign-out" aria-hidden="true"></i> Logout</a></li>
{% endif -%}
</ul>
</div>
</div>
<div class="container" id="main">
<div class="flashes-messages">
{% with messages = get_flashed_messages(with_categories=true) %}
<!-- Categories: success (green), info (blue), warning (yellow), danger (red) -->
{% if messages %}
{% for category, message in messages %}
<div class="alert alert-{{ category }} alert-dismissible" role="alert">
<button type="button" class="close" data-dismiss="alert" aria-label="Close"><span aria-hidden="true">&times;</span></button>
{{ message|safe }}
</div>
{% endfor %}
{% endif %}
{% endwith %}
</div>
{% block pre_content %}{% endblock %}
{% block content %}{% endblock %}
{% block post_content %}{% endblock %}
</div>
</div>
</div>
{% endblock %}
{% include "footer.html" %}
</body>
</html>
\ No newline at end of file
{% macro render_field(field, label=None, label_visible=true, right_url=None, right_label=None, readonly=false) -%}
<div class="form-group {% if field.errors %}has-error{% endif %} {{ kwargs.pop('class_', '') }}">
{% if field.type != 'HiddenField' and label_visible %}
{% if not label %}{% set label=field.label.text %}{% endif %}
<label for="{{ field.id }}" class="control-label">{{ label|safe }}</label>
{% endif %}
{{ field(class_='form-control', readonly=readonly, **kwargs) }}
{% if field.errors %}
{% for e in field.errors %}
<p class="help-block">{{ e }}</p>
{% endfor %}
{% endif %}
</div>
{%- endmacro %}
{% macro render_checkbox_field(field, label=None) -%}
{% if not label %}{% set label=field.label.text %}{% endif %}
<div class="checkbox">
<label>
{{ field(type='checkbox', **kwargs) }} {{ label }}
</label>
</div>
{%- endmacro %}
{% macro render_radio_field(field) -%}
{% for value, label, checked in field.iter_choices() %}
<div class="radio">
<label>
<input type="radio" name="{{ field.id }}" id="{{ field.id }}" value="{{ value }}"{% if checked %} checked{% endif %}>
{{ label }}
</label>
</div>
{% endfor %}
{%- endmacro %}
{% macro render_submit_field(field, label=None, tabindex=None) -%}
{% if not label %}{% set label=field.label.text %}{% endif %}
{#<button type="submit" class="form-control btn btn-lg btn-primary btn-block">{{label}}</button>#}
<input type="submit" class="btn btn-lg btn-primary btn-block" value="{{label}}"
{% if tabindex %}tabindex="{{ tabindex }}"{% endif %}
>
{%- endmacro %}
\ No newline at end of file
{% extends "layout.html" %}
{% block content %}
<div class="page-header">
<h1>Administration <small>Certificate authority web</small></h1>
</div>
<div class="row">
<div class="col-md-12">
<div class="panel panel-primary margin-top-40 table">
<div class="panel-heading">
<h3 class="panel-title">Certificates Signature request list</h3>
<div class="pull-right">
<span class="clickable filter" data-toggle="tooltip" title="Toggle table filter" data-container="body">
<i class="glyphicon glyphicon-filter"></i>
</span>
</div>
</div>
<div class="panel-body">
<input type="text" class="form-control" id="cacert-table-filter" data-action="filter" data-filters="#cacert-table" placeholder="Filter Columns" />
</div>
<table class="table table-hover" id="cacert-table">
<thead>
<tr>
<th class="hidden-xs">#</th>
<th class="hidden-xs">CSR ID</th>
<th class="hidden-xs">CRT ID</th>
<th>Common Name</th>
<th class="hidden-xs">Request Date</th>
<th></th>
</tr>
</thead>
<tbody>
{% for csr in data_list -%}
<tr>
<td class="hidden-xs">{{ csr['index'] }}</td>
<td class="hidden-xs">{{ csr['csr_id'] }}</td>
<td class="hidden-xs">{{ csr['crt_id'] }}</td>
<td>{{ csr['common_name'] }}</td>
<td class="hidden-xs">{{ csr['creation_date'] }}</td>
<td>
<div class="btn-group">
<button type="button" class="btn btn-default dropdown-toggle" data-toggle="dropdown" aria-haspopup="true" aria-expanded="false">
Action <span class="caret"></span>
</button>
<ul class="dropdown-menu dropdown-menu-right">
<li><a href="/admin/signcert?csr_id={{ csr['csr_id'] }}"><i class="fa fa-check-square" aria-hidden="true"></i> Sign</a></li>
<li><a href="/csr/{{ csr['csr_id'] }}"><i class="fa fa-eye" aria-hidden="true"></i> Download</a></li>
<li role="separator" class="divider"></li>
<li><a href="/admin/deletecsr?csr_id={{ csr['csr_id'] }}"><i class="fa fa-times" aria-hidden="true"></i> Delete</a></li>
</ul>
</div>
</td>
</tr>
{% endfor -%}
</tbody>
</table>
</div>
</div>
</div>
{% endblock %}
\ No newline at end of file
{% extends "layout.html" %}
{% block content %}
<div class="page-header">
<h1>User account information</h1>
</div>
<div style="padding: 15px">
<p>
<a class="btn btn-default" href="{{ url_for('user.change_password') }}" role="button" title="Download file">
<i class="fa fa-pencil-square-o" aria-hidden="true"></i> Change password</a>
</p>
{% from "macros.html" import render_field, render_submit_field %}
<form action="" method="POST" class="form" role="form">
<div class="row">
<div class="col-sm-6 col-md-5 col-lg-4">
{{ form.hidden_tag() }}
{{ render_field(form.username, tabindex=230, readonly=true) }}
{{ render_field(form.first_name, tabindex=240) }}
{{ render_field(form.last_name, tabindex=250) }}
{{ render_field(form.email, tabindex=260) }}
{{ render_submit_field(form.submit, tabindex=280) }}
</div>
</div>
</form>
</div>
{% endblock %}
\ No newline at end of file
{% extends "layout.html" %}
{% block content %}
<div class="page-header">
<h1>View {{ cert_type }} Details <small></small></h1>
</div>
<div class="row">
<div class="col-md-12">
<div class="panel panel-default">
<div class="panel-heading">
<h3 class="panel-title">{{ name |safe }}</h3>
</div>
<div class="panel-body">
<pre>{{ content }}</pre>
</div>
</div>
</div>
</div>
{% endblock %}
\ No newline at end of file
{% extends "layout.html" %}
{% block content %}
<div class="page-header">
<h1>Certificate Authority <small>view logs content</small></h1>
</div>
<div class="row">
<div class="col-md-12">
<div class="panel panel-default">
<div class="panel-heading">
<h3 class="panel-title">{{ filename }}</h3>
</div>
<div class="panel-body">
<p>
<a class="btn btn-default" href="/admin/view_logs?full=true" role="button" title="Download file">
<i class="fa fa-file-text" aria-hidden="true"></i> Full log</a>
<a class="btn btn-default" href="/admin/view_logs" role="button" title="Download file">
<i class="fa fa-file-text" aria-hidden="true"></i> Tail log</a>
</p>
<pre style="max-height: 600px;">{{ content }}</pre>
</div>
</div>
</div>
</div>
{% endblock %}
\ No newline at end of file
# This file is part of caucase
# Copyright (C) 2017 Nexedi
# Alain Takoudjou <alain.takoudjou@nexedi.com>
# Vincent Pelletier <vincent@nexedi.com>
#
# caucase is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# caucase is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with caucase. If not, see <http://www.gnu.org/licenses/>.
import json
from caucase.exceptions import BadSignature, CertificateVerificationError
from OpenSSL import crypto, SSL
from pyasn1.codec.der import encoder as der_encoder
from pyasn1.type import tag
from pyasn1_modules import rfc2459
class GeneralNames(rfc2459.GeneralNames):
"""
rfc2459 has wrong tagset.
"""
tagSet = tag.TagSet(
(),
tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0),
)
class DistributionPointName(rfc2459.DistributionPointName):
"""
rfc2459 has wrong tagset.
"""
tagSet = tag.TagSet(
(),
tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0),
)
class ExtensionType():
CRL_DIST_POINTS = "crlDistributionPoints"
BASIC_CONSTRAINTS = "basicConstraints"
KEY_USAGE = "keyUsage"
NS_CERT_TYPE = "nsCertType"
NS_COMMENT = "nsComment"
SUBJECT_KEY_ID = "subjectKeyIdentifier"
AUTH_KEY_ID = "authorityKeyIdentifier"
class X509Extension(object):
known_extension_list = [name for (attr, name) in vars(ExtensionType).items()
if attr.isupper()]
def setX509Extension(self, ext_type, critical, value, subject=None, issuer=None):
if not ext_type in self.known_extension_list:
raise ValueError('Extension type is not known from ExtensionType class')
if ext_type == ExtensionType.CRL_DIST_POINTS:
cdp = self._getCrlDistPointExt(value)
return crypto.X509Extension(
b'%s' % ext_type,
critical,
'DER:' + cdp.encode('hex'),
subject=subject,
issuer=issuer,
)
else:
return crypto.X509Extension(
ext_type,
critical,
value,
subject=subject,
issuer=issuer,
)
def _getCrlDistPointExt(self, cdp_list):
cdp = rfc2459.CRLDistPointsSyntax()
position = 0
for cdp_type, cdp_value in cdp_list:
cdp_entry = rfc2459.DistributionPoint()
general_name = rfc2459.GeneralName()
if not cdp_type in ['dNSName', 'directoryName', 'uniformResourceIdentifier']:
raise ValueError("crlDistributionPoints GeneralName '%s' is not valid" % cdp_type)
general_name.setComponentByName(cdp_type, cdp_value)
general_names = GeneralNames()
general_names.setComponentByPosition(0, general_name)
name = DistributionPointName()
name.setComponentByName('fullName', general_names)
cdp_entry.setComponentByName('distributionPoint', name)
cdp.setComponentByPosition(position, cdp_entry)
position += 1
return der_encoder.encode(cdp)
def setCaExtensions(self, cert_obj):
"""
extensions for default certificate
"""
cert_obj.add_extensions([
self.setX509Extension(ExtensionType.BASIC_CONSTRAINTS, True,
"CA:TRUE, pathlen:0"),
self.setX509Extension(ExtensionType.NS_COMMENT,
False, "OpenSSL CA Certificate"),
self.setX509Extension(ExtensionType.KEY_USAGE,
True, "keyCertSign, cRLSign"),
self.setX509Extension(ExtensionType.SUBJECT_KEY_ID,
False, "hash", subject=cert_obj),
])
cert_obj.add_extensions([
self.setX509Extension(ExtensionType.AUTH_KEY_ID,
False, "keyid:always,issuer", issuer=cert_obj)
])
def setDefaultExtensions(self, cert_obj, subject=None, issuer=None, crl_url=None):
"""
extensions for default certificate
"""
cert_obj.add_extensions([
self.setX509Extension(ExtensionType.BASIC_CONSTRAINTS, False, "CA:FALSE"),
self.setX509Extension(ExtensionType.NS_COMMENT,
False, "OpenSSL Generated Certificate"),
self.setX509Extension(ExtensionType.SUBJECT_KEY_ID,
False, "hash", subject=subject),
])
cert_obj.add_extensions([
self.setX509Extension(ExtensionType.AUTH_KEY_ID,
False, "keyid,issuer", issuer=issuer)
])
if crl_url:
cert_obj.add_extensions([
self.setX509Extension(ExtensionType.CRL_DIST_POINTS,
False, [("uniformResourceIdentifier", crl_url)])
])
def setDefaultCsrExtensions(self, cert_obj, subject=None, issuer=None):
"""
extensions for certificate signature request
"""
cert_obj.add_extensions([
self.setX509Extension(ExtensionType.BASIC_CONSTRAINTS, False, "CA:FALSE"),
self.setX509Extension(ExtensionType.KEY_USAGE,
False, "nonRepudiation, digitalSignature, keyEncipherment"),
])
def getSerialToInt(x509):
return '{0:x}'.format(int(x509.get_serial_number()))
def validateCertAndKey(cert_pem, key_pem):
ctx = SSL.Context(SSL.TLSv1_METHOD)
ctx.use_privatekey(key_pem)
ctx.use_certificate(cert_pem)
try:
ctx.check_privatekey()
except SSL.Error:
return False
else:
return True
def verifyCertificateChain(cert_pem, trusted_cert_list, crl=None):
# Create and fill a X509Sore with trusted certs
store = crypto.X509Store()
for trusted_cert in trusted_cert_list:
store.add_cert(trusted_cert)
if crl:
store.add_crl(crl)
store.set_flags(crypto.X509StoreFlags.CRL_CHECK)
store_ctx = crypto.X509StoreContext(store, cert_pem)
# Returns None if certificate can be validated
try:
result = store_ctx.verify_certificate()
except crypto.X509StoreContextError, e:
raise CertificateVerificationError('Certificate verification error: %s' % str(e))
except crypto.Error, e:
raise CertificateVerificationError('Certificate verification error: %s' % str(e))
if result is None:
return True
else:
return False
def checkCertificateValidity(ca_cert_list, cert_pem, key_pem=None):
if not verifyCertificateChain(cert_pem, ca_cert_list):
return False
if key_pem:
return validateCertAndKey(cert_pem, key_pem)
return True
def sign(data, key, digest="sha256"):
"""
Sign a data using digest and return signature.
"""
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
sign = crypto.sign(pkey, data, digest)
#data_base64 = base64.b64encode(sign)
return sign
def verify(data, cert_string, signature, digest="sha256"):
"""
Verify the signature for a data string.
cert_string: is the certificate content as string
signature: is generate using 'signData' from the data to verify
data: content to verify
digest: by default is sha256, set the correct value
"""
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert_string)
return crypto.verify(x509, signature, data, digest.encode("ascii", 'ignore'))
def wrap(payload, key, digest_list):
"""
Sign payload (json-serialised) with key, using one of the given digests.
"""
# Choose a digest between the ones supported
# how to choose the default digest ?
digest = digest_list[0]
payload = json.dumps(payload)
return {
"payload": payload,
"digest": digest,
"signature": sign(payload + digest + ' ', key, digest).encode('base64'),
}
def unwrap(wrapped, getCertificate, digest_list):
"""
Raise if signature does not match payload. Returns payload.
"""
# Check whether given digest is allowed
if wrapped['digest'] not in digest_list:
raise BadSignature('Given digest is not supported')
payload = json.loads(wrapped['payload'])
crt = getCertificate(payload)
try:
verify(wrapped['payload'] + wrapped['digest'] + ' ', crt, wrapped['signature'].decode('base64'), wrapped['digest'])
except crypto.Error, e:
raise BadSignature('Signature mismatch: %s' % str(e))
return payload
def tail(path, lines=100):
"""
Returns the last `lines` lines of file `path`.
"""
f = open(path).read()
BUFSIZ = 1024
f.seek(0, 2)
bytes = f.tell()
size = lines + 1
block = -1
data = []
while size > 0 and bytes > 0:
if bytes - BUFSIZ > 0:
f.seek(block * BUFSIZ, 2)
data.insert(0, f.read(BUFSIZ))
else:
f.seek(0, 0)
data.insert(0, f.read(bytes))
linesFound = data[0].count('\n')
size -= linesFound
bytes -= BUFSIZ
block -= 1
return '\n'.join(''.join(data).splitlines()[-lines:])
# This file is part of caucase
# Copyright (C) 2017 Nexedi
# Alain Takoudjou <alain.takoudjou@nexedi.com>
# Vincent Pelletier <vincent@nexedi.com>
#
# caucase is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# caucase is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with caucase. If not, see <http://www.gnu.org/licenses/>.
import logging
import os, errno
import argparse
import traceback
import json
import flask
from flask import (Flask, session, request, redirect, url_for, render_template,
jsonify, session, abort, send_file, flash, g, Response)
from flask_sqlalchemy import SQLAlchemy
from flask_user import UserManager, SQLAlchemyAdapter
from wtforms import StringField, SubmitField, validators
from flask_wtf import FlaskForm
from flask_mail import Mail
from flask_user import login_required, current_user
from flask_login import logout_user #, login_user, current_user, login_required
from caucase.ca import CertificateAuthority, DEFAULT_DIGEST_LIST, MIN_CA_RENEW_PERIOD
from caucase.exceptions import (NoStorage, NotFound, Found, BadSignature,
BadCertificateSigningRequest,
BadCertificate,
CertificateVerificationError,
ExpiredCertificate)
from functools import wraps
from caucase import utils
app = Flask(__name__)
# Use default value so SQLALCHEMY will not warn because there is not db_uri
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///ca.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
class DisabledStringField(StringField):
def __call__(self, *args, **kwargs):
kwargs.setdefault('disabled', True)
return super(ReadonlyStringField, self).__call__(*args, **kwargs)
# Define the User profile form
class UserProfileForm(FlaskForm):
username = StringField('Username')
first_name = StringField('First name', validators=[
validators.DataRequired('First name is required')])
last_name = StringField('Last name', validators=[
validators.DataRequired('Last name is required')])
email = StringField('Email', validators=[
validators.DataRequired('Email is required')])
submit = SubmitField('Save')
def parseArguments(argument_list=[]):
"""
Parse arguments for Certificate Authority instance.
"""
parser = argparse.ArgumentParser()
parser.add_argument('--ca-dir',
help='Certificate authority base directory')
parser.add_argument('-H', '--host',
default='127.0.0.1',
help='Host or IP of ca server. Default: %(default)s')
parser.add_argument('-P', '--port',
default='9086', type=int,
help='Port for ca server. Default: %(default)s')
parser.add_argument('-d', '--debug',
action="store_true", dest="debug", default=False,
help='Enable debug mode.')
parser.add_argument('-l', '--log-file',
help='Path for log output')
parser.add_argument('--crt-life-time',
default=365*24*60*60, type=int,
help='The time in seconds before a generated certificate will expire. Default: 365*24*60*60 seconds (1 year)')
parser.add_argument('-s', '--subject',
default='',
help='Formatted subject string to put into generated CA Certificate file. Ex: /C=XX/ST=State/L=City/OU=OUnit/O=Company/CN=CAAuth/emailAddress=xx@example.com')
parser.add_argument('--ca-life-period',
default=10, type=float,
help='Number of individual certificate validity periods during which the CA certificate is valid. Default: %(default)s')
parser.add_argument('--ca-renew-period',
default=MIN_CA_RENEW_PERIOD, type=float,
help='Number of individual certificate validity periods during which both the existing and the new CA Certificates are valid. Default: %(default)s')
parser.add_argument('--crl-life-period',
default=1/50., type=float,
help='Number of individual certificate validity periods during which the CRL is valid. Default: %(default)s')
parser.add_argument('-D', '--digest',
action='append', dest='digest_list', default=DEFAULT_DIGEST_LIST,
help='Allowed digest for all signature. Default: %(default)s')
parser.add_argument('--max-request-amount',
default=50,
help='Maximun pending certificate signature request amount. Default: %(default)s')
parser.add_argument('--crt-keep-time',
default=30*24*60*60, type=int,
help='The time in seconds before a generated certificate will be deleted on CA server. Set 0 to never delete. Default: 30*24*60*60 seconds (30 days)')
parser.add_argument('--external-url',
help="The HTTP URL at which this tool's \"/\" path is reachable by all certificates users in order to retrieve latest CRL.")
parser.add_argument('--no-auto-sign-csr', action='store_true',
help='Say if the first csr must be signed automatically. Has no effect if there is more that one submitted CSR')
if argument_list:
return parser.parse_args(argument_list)
return parser.parse_args()
def getLogger(debug=False, log_file=None):
logger = logging.getLogger("CertificateAuthority")
logger.setLevel(logging.INFO)
if not log_file:
logger.addHandler(logging.StreamHandler())
else:
handler = logging.FileHandler(log_file)
handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
logger.addHandler(handler)
logger.info('Configured logging to file %r' % log_file)
if debug:
logger.setLevel(logging.DEBUG)
return logger
def getConfig(self, key):
if key in self.keys():
temp_dict = dict()
temp_dict.update(self)
return temp_dict[key]
else:
raise KeyError
def start():
"""
Start Web Flask application server
"""
options = parseArguments()
configure_flask(options)
app.logger.info("Certificate Authority server started on http://%s:%s" % (
options.host, options.port))
app.run(
host=options.host,
port=int(options.port)
)
def configure_flask(options):
"""
Configure certificate authority service
"""
if not options.ca_dir:
options.ca_dir = os.getcwd()
else:
options.ca_dir = os.path.abspath(options.ca_dir)
if not options.external_url:
options.external_url = 'http://[%s]:%s' % (options.host, options.port)
db_file = "sqlite:///%s"% os.path.join(options.ca_dir, 'ca.db')
# work in ca directory
os.chdir(options.ca_dir)
# init Flask app
app.config.update(
DEBUG=options.debug,
CSRF_ENABLED=True,
USER_AFTER_LOGIN_ENDPOINT='manage_csr',
USER_AFTER_LOGOUT_ENDPOINT='index',
USER_ENABLE_USERNAME=True,
USER_ENABLE_EMAIL=False,
USER_ENABLE_REGISTRATION=False,
USER_ENABLE_CHANGE_USERNAME=False,
SECRET_KEY = 'This is an UNSECURE Secret. Please CHANGE THIS for production environments.',
SQLALCHEMY_DATABASE_URI=db_file
)
flask.config.Config.__getattr__ = getConfig
mail = Mail(app)
# Setup Flask-User
# XXX - User table Will go away when switching to CA for Users
if not app.config['TESTING']:
from caucase.storage import User
db_adapter = SQLAlchemyAdapter(db, User) # Register the User model
user_manager = UserManager(db_adapter, app) # Initialize Flask-User
logger = getLogger(options.debug, options.log_file)
app.logger.addHandler(logger)
# Instanciate storage
# XXX - check loaded_crt_life_time here
from caucase.storage import Storage
storage = Storage(db,
max_csr_amount=options.max_request_amount,
crt_keep_time=options.crt_keep_time,
csr_keep_time=options.crt_keep_time)
ca = CertificateAuthority(
storage=storage,
ca_life_period=options.ca_life_period,
ca_renew_period=options.ca_renew_period,
crt_life_time=options.crt_life_time,
crl_renew_period=options.crl_life_period,
digest_list=options.digest_list,
crl_base_url='%s/crl' % options.external_url,
ca_subject=options.subject,
auto_sign_csr=(not options.no_auto_sign_csr)
)
# XXX - Storage argument Will go away when switching to CA for Users
app.config.update(
storage=storage,
ca=ca,
log_file=options.log_file,
)
def check_authentication(username, password):
user = app.config.storage.findUser(username)
if user:
return app.user_manager.hash_password(password) == user.password
else:
return False
def authenticated_method(func):
""" This decorator ensures that the current user is logged in before calling the actual view.
Abort with 401 when the user is not logged in."""
@wraps(func)
def decorated_view(*args, **kwargs):
# User must be authenticated
auth = request.authorization
if not auth:
return abort(401)
elif not Users.check_authentication(auth.username, auth.password):
return abort(401)
# Call the actual view
return func(*args, **kwargs)
return decorated_view
class FlaskException(Exception):
status_code = 400
code = 400
def __init__(self, message, status_code=None, payload=None):
Exception.__init__(self)
self.message = message
if status_code is not None:
self.status_code = status_code
self.payload = payload
def to_dict(self):
rv = dict(self.payload or ())
# rv['code'] = self.code
rv['message'] = self.message
return rv
@app.errorhandler(FlaskException)
def handle_invalid_usage(error):
response = jsonify(error.to_dict())
response.status_code = error.status_code
return response
@app.errorhandler(401)
def error401(error):
if error.description is None:
message = {
'code': 401,
'name': 'Unauthorized',
'message': "Authenticate."
}
else:
message = error.description
response = jsonify(message)
response.status_code = 401
response.headers['WWW-Authenticate'] = 'Basic realm="Login Required"'
return response
@app.errorhandler(403)
def error403(error):
if error.description is None:
message = {
'code': 404,
'name': 'Forbidden',
'message': 'Forbidden. Your are not allowed to access %s' % request.url,
}
else:
message = error.description
response = jsonify(message)
response.status_code = 404
return response
@app.errorhandler(404)
def error404(error):
if error.description is None:
message = {
'code': 404,
'name': 'NotFound',
'message': 'Resource not found: ' + request.url,
}
else:
message = error.description
response = jsonify(message)
response.status_code = 404
return response
@app.errorhandler(400)
def error400(error):
if error.description is None:
message = {
'code': 400,
'name': 'BadRequest',
'message': 'The request could not be understood by the server, you probably provided wrong parameters.'
}
else:
message = error.description
response = jsonify(message)
response.status_code = 400
return response
def send_file_content(content, filename, mimetype='text/plain'):
return Response(content,
mimetype=mimetype,
headers={"Content-Disposition":
"attachment;filename=%s" % filename})
@app.before_request
def before_request():
# XXX - This function Will be modified or removed when switching to CA for Users
is_admin_path = request.path.startswith('/admin') or request.path.startswith('/user')
if not is_admin_path and not request.path.startswith('/certificates'):
return
if is_admin_path:
csr_count = app.config.storage.countPendingCertificateSiningRequest()
if csr_count < 10:
csr_count = '0%s' % csr_count
session['count_csr'] = csr_count
if request.path == '/admin/configure' or request.path == '/admin/setpassword':
# check if password file exists, if yes go to index
if app.config.storage.findUser('admin'):
return redirect(url_for('admin'))
return
# XXX - using hard username
if not app.config.storage.findUser('admin'):
return redirect(url_for('configure'))
g.user = current_user
# Routes for certificate Authority
@app.route('/crl', methods=['GET'])
def get_crl():
crl_content = app.config.ca.getCertificateRevocationList()
return send_file_content(crl_content, 'ca.crl.pem')
@app.route('/csr/<string:csr_id>', methods=['GET'])
def get_csr(csr_id):
try:
csr_content = app.config.ca.getPendingCertificateRequest(csr_id)
except NotFound, e:
raise FlaskException(str(e),
status_code=404, payload={"name": "FileNotFound", "code": 1})
return send_file_content(csr_content, csr_id)
@app.route('/csr', methods=['PUT'])
def request_cert():
csr_content = request.form.get('csr', '').encode('utf-8')
if not csr_content:
raise FlaskException("'csr' parameter is mandatory",
payload={"name": "MissingParameter", "code": 2})
try:
csr_id = app.config.ca.createCertificateSigningRequest(csr_content)
except BadCertificateSigningRequest, e:
raise FlaskException(str(e),
payload={"name": "FileFormat", "code": 3})
except NoStorage, e:
raise FlaskException(str(e),
payload={"name": "NoStorage", "code": 4})
response = Response("", status=201)
response.headers['Location'] = url_for('get_csr', _external=True, csr_id=csr_id)
return response
@app.route('/csr/<string:csr_id>', methods=['DELETE'])
@authenticated_method
def remove_csr(csr_id):
try:
app.config.ca.deletePendingCertificateRequest(csr_id)
except NotFound, e:
raise FlaskException("%s" % str(e),
status_code=404, payload={"name": "FileNotFound", "code": 1})
response = Response("", status=200)
return response
@app.route('/crt/<string:cert_id>', methods=['GET'])
def get_crt(cert_id):
try:
cert_content = app.config.ca.getCertificate(cert_id)
except NotFound, e:
raise FlaskException("%s" % str(e),
status_code=404, payload={"name": "FileNotFound", "code": 1})
return send_file_content(cert_content, cert_id)
@app.route('/crt/ca.crt.pem', methods=['GET'])
def get_cacert():
ca_cert = app.config.ca.getCACertificate()
return send_file_content(ca_cert, 'ca.crt.pem')
@app.route('/crt/ca.crt.json', methods=['GET'])
def get_cacert_json():
ca_chain_list = app.config.ca.getValidCACertificateChain()
return jsonify(ca_chain_list)
def signcert(csr_key, redirect_to=''):
try:
cert_id = app.config.ca.createCertificate(csr_key)
except NotFound, e:
raise FlaskException("%s" % str(e),
status_code=404, payload={"name": "FileNotFound", "code": 1})
except Found, e:
# Certificate is found
raise FlaskException("%s" % str(e),
payload={"name": "FileFound", "code": 5})
# XXX - to remove (flask UI)
flash('Certificate is signed!', 'success')
if redirect_to:
return redirect(url_for(redirect_to))
response = Response("", status=201)
response.headers['Location'] = url_for('get_crt', _external=True, cert_id=cert_id)
return response
@app.route('/crt', methods=['PUT'])
@authenticated_method
def sign_cert():
key = request.form.get('csr_id', '').encode('utf-8')
if not key:
raise FlaskException("'csr_id' parameter is a mandatory parameter",
payload={"name": "MissingParameter", "code": 2})
return signcert(key)
@app.route('/crt/renew', methods=['PUT'])
def renew_cert():
"""
this method is used to renew expired certificate.
"""
payload = request.form.get('payload', '')
if not payload:
# Bad parameters
raise FlaskException("'payload' parameter is mandatory",
payload={"name": "MissingParameter", "code": 2})
try:
wrapped = json.loads(payload)
except ValueError, e:
raise FlaskException("payload parameter is not a valid Json string: %s" % str(e),
payload={"name": "FileFormat", "code": 3})
try:
cert_id = app.config.ca.renew(wrapped)
except ValueError, e:
traceback.print_exc()
raise FlaskException(str(e),
payload={"name": "FileFormat", "code": 3})
except KeyError, e:
traceback.print_exc()
raise FlaskException(str(e),
payload={"name": "FileFormat", "code": 3})
except BadSignature, e:
raise FlaskException(str(e),
payload={"name": "SignatureMismatch", "code": 6})
except BadCertificateSigningRequest, e:
raise FlaskException(str(e),
payload={"name": "FileFormat", "code": 3})
except BadCertificate, e:
raise FlaskException(str(e),
payload={"name": "FileFormat", "code": 3})
except CertificateVerificationError, e:
raise FlaskException(str(e),
payload={"name": "SignatureMismatch", "code": 6})
except NoStorage, e:
raise FlaskException(str(e),
payload={"name": "NoStorage", "code": 4})
except NotFound, e:
raise FlaskException(str(e),
status_code=404, payload={"name": "FileNotFound", "code": 1})
except Found, e:
# Certificate is found
raise FlaskException(str(e),
payload={"name": "FileFound", "code": 5})
response = Response("", status=201)
response.headers['Location'] = url_for('get_crt', _external=True, cert_id=cert_id)
return response
@app.route('/crt/revoke', methods=['PUT'])
def request_revoke_crt():
"""
Revoke method existing and valid certificate
"""
payload = request.form.get('payload', '')
if not payload:
# Bad parameters
raise FlaskException("'payload' parameter is mandatory",
payload={"name": "MissingParameter", "code": 2})
try:
wrapped = json.loads(payload)
except ValueError, e:
raise FlaskException("payload parameter is not a valid Json string: %s" % str(e),
payload={"name": "FileFormat", "code": 3})
try:
app.config.ca.revokeCertificate(wrapped)
except ValueError, e:
traceback.print_exc()
raise FlaskException(str(e),
payload={"name": "FileFormat", "code": 3})
except KeyError, e:
traceback.print_exc()
raise FlaskException(str(e),
payload={"name": "FileFormat", "code": 3})
except BadSignature, e:
raise FlaskException(str(e),
payload={"name": "SignatureMismatch", "code": 6})
except CertificateVerificationError, e:
raise FlaskException(str(e),
payload={"name": "SignatureMismatch", "code": 6})
except BadCertificate, e:
raise FlaskException(str(e),
payload={"name": "FileFormat", "code": 3})
except ExpiredCertificate, e:
raise FlaskException(str(e),
payload={"name": "FileFormat", "code": 3})
except NotFound, e:
raise FlaskException(str(e),
status_code=404, payload={"name": "FileNotFound", "code": 1})
response = Response("", status=201, )
return response
#Manage routes (Authentication required) - Flask APP
# XXX - this routes will be updated or removed after implement ca_user
@app.route('/')
def home():
return redirect(url_for('index'))
@app.route('/certificates')
def index():
# page to list certificates, also connection link
data_list = app.config.ca.getSignedCertificateList()
return render_template("index.html", data_list=data_list)
@app.route('/admin/logout')
def logout():
logout_user()
return redirect(url_for('index'))
@app.route('/admin/')
@login_required
def admin():
return index()
@app.route('/admin/csr_requests', methods=['GET'])
@login_required
def manage_csr():
data_list = app.config.ca.getPendingCertificateRequestList()
return render_template('manage_page.html', data_list=data_list)
@app.route('/admin/configure', methods=['GET'])
def configure():
return render_template("configure.html")
@app.route('/admin/setpassword', methods=['POST'])
def setpassword():
username = 'admin'
password = request.form.get('password', '').encode('utf-8')
if not password:
raise FlaskException("'password' parameter is mandatory",
payload={"name": "MissingParameter", "code": 2})
app.config.storage.findOrCreateUser(
"Admin",
"admin",
"admin@example.com",
username,
app.user_manager.hash_password(password))
logout_user()
return redirect(url_for('manage_csr'))
@app.route('/admin/signcert', methods=['GET'])
@login_required
def do_signcert_web():
csr_id = request.args.get('csr_id', '').encode('utf-8')
if not csr_id:
raise FlaskException("'csr_id' parameter is a mandatory parameter",
payload={"name": "MissingParameter", "code": 2})
return signcert(csr_id, 'manage_csr')
@app.route('/admin/deletecsr', methods=['GET'])
@login_required
def deletecsr():
"""
Delete certificate signature request file
"""
csr_id = request.args.get('csr_id', '').encode('utf-8')
if not csr_id:
raise FlaskException("'csr_id' parameter is a mandatory parameter",
payload={"name": "MissingParameter", "code": 2})
try:
app.config.ca.deletePendingCertificateRequest(csr_id)
except NotFound, e:
raise FlaskException("%s" % str(e),
status_code=404, payload={"name": "FileNotFound", "code": 1})
return redirect(url_for('manage_csr'))
@app.route('/admin/profile', methods=['GET', 'POST'])
@login_required
def user_profile():
form = UserProfileForm(request.form, obj=current_user)
if request.method == 'POST' and form.validate():
# Copy form fields to user_profile fields
del form.username # revove username from recieved form
form.populate_obj(current_user)
db.session.commit()
# Redirect to home page
return redirect(url_for('index'))
return render_template('user_profile.html',
form=form)
@app.route('/admin/logs', methods=['GET'])
@login_required
def view_logs():
content = ""
full_log = request.args.get('full', '')
if app.config.log_file and os.path.exists(app.config.log_file):
if full_log == 'true':
with open(app.config.log_file) as f:
content = f.read()
else:
content = utils.tail(app.config.log_file, 500)
return render_template("view_logs.html", content=content, filename=app.config.log_file)
...@@ -15,30 +15,41 @@ ...@@ -15,30 +15,41 @@
# #
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with caucase. If not, see <http://www.gnu.org/licenses/>. # along with caucase. If not, see <http://www.gnu.org/licenses/>.
import json
from .exceptions import BadSignature
def wrap(payload, key, digest_list): import os
""" from caucase.web import parseArguments, configure_flask, app
Sign payload (json-serialised) with key, using one of the given digests. from werkzeug.contrib.fixers import ProxyFix
"""
# Choose a digest between the ones supported def readConfigFromFile(config_file):
payload = json.dumps(payload) config_list = []
return { with open(config_file) as f:
"payload": payload, for line in f.readlines():
"digest": digest, if not line or line.startswith('#'):
"signature": sign(payload + digest + ' ', key, digest).encode('base64'), continue
} line_list = line.strip().split(' ')
if len(line_list) == 1:
def unwrap(wrapped, getCertificate, digest_list): config_list.append('--%s' % line_list[0].strip())
elif len(line_list) > 1:
config_list.append('--%s' % line_list[0].strip())
config_list.append(' '.join(line_list[1:]))
return parseArguments(config_list)
def start_wsgi():
""" """
Raise if signature does not match payload. Returns payload. Start entry for wsgi, do not run app.run, read config from file
""" """
# Check whether given digest is allowed if os.environ.has_key('CA_CONFIGURATION_FILE'):
raise BadSignature('Given digest is not supported') config_file = os.environ['CA_CONFIGURATION_FILE']
payload = json.loads(wrapped['payload']) else:
crt = getCertificate(payload) config_file = 'ca.conf'
if not check(wrapped['payload'] + wrapped['digest'] + ' ', crt, wrapped['signature'].decode('base64'), wrapped['digest']):
raise BadSignature('Signature mismatch') configure_flask(readConfigFromFile(config_file))
return payload
app.wsgi_app = ProxyFix(app.wsgi_app)
app.logger.info("Certificate Authority server ready...")
if __name__ == 'caucase.wsgi':
start_wsgi()
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment