Commit 325666f9 authored by Alain Takoudjou's avatar Alain Takoudjou

certificate_authority: reimplement with new api

parent 3cdc9427
......@@ -198,6 +198,19 @@ class CertificateBase(object):
else:
return False
def checkCertificateValidity(self, ca_cert_file, cert_file, key_file=None):
with open(ca_cert_file) as f_ca:
ca_cert = f_ca.read()
with open(cert_file) as f_cert:
cert = f_cert.read()
# XXX Considering only one trusted certificate here
if not self.verifyCertificateChain(cert, [ca_cert]):
return False
if key_file:
return self.validateCertAndKey(cert_file, key_file)
return True
def generatePrivatekey(self, output_file, size=2048):
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, size)
......@@ -215,7 +228,7 @@ class CertificateBase(object):
def generateCertificateRequest(self, key_file, output_file, cn,
country, state, locality='', email='', organization='',
organization_unit='', digest="sha1"):
organization_unit='', digest="sha256"):
with open(key_file) as fkey:
key = crypto.load_privatekey(crypto.FILETYPE_PEM, fkey.read())
......@@ -238,20 +251,38 @@ class CertificateBase(object):
os.chmod(output_file, 0644)
def checkCertificateValidity(self, ca_cert_file, cert_file, key_file=None):
with open(ca_cert_file) as f_ca:
ca_cert = f_ca.read()
with open(cert_file) as f_cert:
cert = f_cert.read()
def signData(self, key_file, data, digest="sha256", output_file=None):
"""
Sign a data using digest and return signature. If output_file is provided
the signature will be written into the file.
"""
with open(key_file) as fkey:
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, fkey.read())
sign = crypto.sign(pkey, data, digest)
# data_base64 = base64.b64encode(sign)
if output_file is None:
return sign
fd = os.open(output_file,
os.O_CREAT|os.O_WRONLY|os.O_EXCL|os.O_TRUNC,
0644)
try:
os.write(fd, sign)
finally:
os.close(fd)
return sign
# XXX Considering only one trusted certificate here
if not self.verifyCertificateChain(cert, [ca_cert]):
return False
return False
if key_file:
return self.validateCertAndKey(cert_file, key_file)
return True
def verifyData(self, cert_string, signature, data, digest="sha256"):
"""
Verify the signature for a data string.
cert_string: is the certificate content as string
signature: is generate using 'signData' from the data to verify
data: content to verify
digest: by default is sha256, set the correct value
"""
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
return crypto.verify(x509, signature, data, digest)
def readCertificateRequest(self, csr):
req = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)
......@@ -415,6 +446,11 @@ class CertificateAuthorityRequest(CertificateBase):
self.ca_url = ca_url
self.logger = logger
self.max_retry = max_retry
self.X509Extension = X509Extension()
while self.ca_url.endswith('/'):
# remove all / at end or ca_url
self.ca_url = self.ca_url[:-1]
if self.logger is None:
self.logger = logging.getLogger('Certificate Request')
......@@ -446,7 +482,7 @@ class CertificateAuthorityRequest(CertificateBase):
if os.path.exists(self.cacertificate) and os.stat(self.cacertificate).st_size > 0:
return
ca_cert_url = '%s/get/cacert.pem' % self.ca_url
ca_cert_url = '%s/crt/cacert.pem' % self.ca_url
self.logger.info("getting CA certificate file %s" % ca_cert_url)
response = None
while not response or response.status_code != 200:
......@@ -479,7 +515,7 @@ class CertificateAuthorityRequest(CertificateBase):
data = {'csr': csr}
retry = 0
sleep_time = 10
request_url = '%s/request' % self.ca_url
request_url = '%s/csr' % self.ca_url
# Save Cert in tmp to check later
cert_temp = '%s.tmp' % self.certificate
csr_key_file = '%s.key' % csr_file
......@@ -491,31 +527,34 @@ class CertificateAuthorityRequest(CertificateBase):
csr_key = fkey.read()
if csr_key:
self.logger.info("Csr was already sent to CA, key is: %s" % csr_key)
self.logger.info("Csr was already sent to CA, using csr : %s" % csr_key)
else:
response = self._request('post', request_url, data=data)
response = self._request('put', request_url, data=data)
while (not response or response.status_code != 200) and retry < self.max_retry:
while (not response or response.status_code != 201) and retry < self.max_retry:
self.logger.error("%s: Failed to send CSR. \n%s" % (
self.logger.error("%s: Failed to sent CSR. \n%s" % (
response.status_code, response.text))
self.logger.info("will retry in %s seconds..." % sleep_time)
time.sleep(sleep_time)
retry += 1
response = self._request('post', request_url, data=data)
response = self._request('put', request_url, data=data)
if response.status_code != 200:
raise Exception("ERROR: failed to post CSR after % retry. Exiting..." % retry)
if response.status_code != 201:
raise Exception("ERROR: failed to put CSR after % retry. Exiting..." % retry)
self.logger.info("CSR succefully sent.")
self.logger.debug("Server reponse with csr key is %s" % response.text)
csr_key = response.text
# Get csr Location from request header: http://xxx.com/csr/key
self.logger.debug("Csr location is: %s" % response.headers['Location'])
csr_key = response.headers['Location'].split('/')[-1]
with open(csr_key_file, 'w') as fkey:
fkey.write(response.text)
# csr is xxx.csr.pem so cert is xxx.cert.pem
self.logger.info("Waiting for signed certificate...")
reply_url = '%s/get/%s.cert.pem' % (self.ca_url, csr_key)
reply_url = '%s/crt/%s.cert.pem' % (self.ca_url, csr_key[:-8])
response = self._request('get', reply_url)
while not response or response.status_code != 200:
......@@ -535,7 +574,7 @@ class CertificateAuthorityRequest(CertificateBase):
os.close(fd)
os.unlink(cert_temp)
else:
if auto_revoke:
"""if auto_revoke:
self.logger.error("Certificate validation failed. " \
"The signed certificate is going to be revoked...")
self.revokeCertificateRequest(cert_temp,
......@@ -547,38 +586,46 @@ class CertificateAuthorityRequest(CertificateBase):
except OSError, e:
if e.errno != errno.ENOENT:
# raise
pass
pass"""
raise Exception("Error: Certificate validation failed. " \
"This signed certificate should be revoked!")
self.logger.info("Certificate correctly saved at %s." % self.certificate)
def revokeCertificateRequest(self, cert_file, key_name, message=""):
def revokeCertificateRequest(self, cert_file, message=""):
"""
Send a revocation request for the givent certificate to the master.
"""
sleep_time = 10
retry = 0
cert = self.freadX509(cert_file)
serial = '{0:x}'.format(int(cert.get_serial_number()))
request_url = '%s/requestrevoke' % self.ca_url
data = {'serial': serial, 'name': key_name, 'reason': message}
self.logger.info("Sent Certificate revocation request for %s, serial=%s." % (
key_name, serial))
response = self._request('post', request_url, data=data)
with open(cert_file) as f:
cert_string = f.read()
cert = self.readX509(cert_string)
digest = "sha256"
payload = json.dumps(dict(
reason=message,
cert=cert_string))
signature = self.signData(self.key, payload, digest)
request_url = '%s/crt/revoke' % self.ca_url
data = {'digest': digest, 'payload': payload, 'signature': signature}
self.logger.info("Sent Certificate revocation request for CN: %s." % (
cert.get_subject().CN))
response = self._request('put', request_url, data=data)
while (not response or response.status_code != 200) and retry < self.max_retry:
while (not response or response.status_code != 201) and retry < self.max_retry:
self.logger.error("%s: Failed to send Rovocation request. \n%s" % (
response.status_code, response.text))
self.logger.info("will retry in %s seconds..." % sleep_time)
time.sleep(sleep_time)
retry += 1
response = self._request('post', request_url, data=data)
response = self._request('put', request_url, data=data)
if response.status_code != 200:
if response.status_code != 201:
raise Exception("ERROR: failed to post revoke request after %s retry. Exiting..." % retry)
self.logger.info("Certificate revocation request for %s.cert.pem successfully sent." % (
key_name))
self.logger.info("Certificate revocation request for %s successfully sent." % (
cert_file))
......@@ -50,9 +50,12 @@ def parseArguments():
parser.add_argument('--organization_unit',
default='Company Unit',
help='The Organisation Unit Name')
parser.add_argument('--auto_revoke',
default=True, action="store_true",
help='Request Revoke Certificate if validation fail')
parser.add_argument('--revoke',
default=False, action="store_true",
help='Revoke the current certificate')
parser.add_argument('--revoke_reason',
help='Say why the certificat should be revoked')
return parser
......@@ -84,7 +87,7 @@ def requestCertificateWeb():
cn=config.cn, country=config.country, state=config.state,
locality=config.locality, email=config.email,
organization=config.organization,
organization_unit=config.organization_unit, digest="sha1")
organization_unit=config.organization_unit, digest="sha256")
ca.signCertificateWeb(config.csr_file, auto_revoke=config.auto_revoke)
......@@ -13,7 +13,6 @@ import traceback
from flask_user import UserManager, SQLAlchemyAdapter
from flask_mail import Mail
from slapos.certificate_authority.web.views import app
from slapos.certificate_authority.web.start_web import app, db, init_app
from slapos.certificate_authority.certificate_authority import CertificateAuthority
def parseArguments():
......@@ -53,6 +52,9 @@ def parseArguments():
help='Path for log output')
parser.add_argument('--db_file',
help='Path of file to use to store User Account information. Default: $ca_dir/ca.db')
#parser.add_argument('--external_url',
# default='',
# help='The HTTP URL used to connect to CA server')
parser.add_argument('--trusted_host',
default=[],
action='append', dest='trusted_host_list',
......@@ -89,13 +91,16 @@ def start():
"""
start certificate authority service
"""
flask.config.Config.__getattr__ = getConfig
options = parseArguments()
if not options.ca_dir:
options.ca_dir = os.getcwd()
else:
options.ca_dir = os.path.abspath(options.ca_dir)
os.environ['CA_INSTANCE_PATH'] = options.ca_dir
from slapos.certificate_authority.web.start_web import app, db, init_app
flask.config.Config.__getattr__ = getConfig
if not options.config_file:
options.config_file = os.path.join(options.ca_dir, 'openssl.cnf')
if not options.db_file:
......@@ -116,11 +121,15 @@ def start():
os.chdir(options.ca_dir)
logger = getLogger(options.debug, options.log_file)
app.logger.addHandler(logger)
ca = CertificateAuthority(options.openssl_bin,
openssl_configuration=options.config_file, certificate=options.cert_file,
key=options.key_file, crl=options.crl_file, ca_directory=options.ca_dir)
app.config.from_object('slapos.certificate_authority.web.settings')
if options.debug:
app.config.from_object('slapos.certificate_authority.web.settings.Development')
else:
app.config.from_object('slapos.certificate_authority.web.settings.Production')
app.config.update(
ca_dir=options.ca_dir,
trusted_host_list=options.trusted_host_list,
......@@ -134,8 +143,12 @@ def start():
SQLALCHEMY_DATABASE_URI='sqlite:///%s' % options.db_file,
ca=ca,
log_file=options.log_file,
# base_url=(options.external_url or 'http://%s:%s' % (options.host, options.port))
)
if os.path.exists(os.path.join(options.ca_dir, 'local.setting.py')):
app.config.from_pyfile('local.setting.py', silent=True)
for key in ['csr', 'req', 'cert', 'crl', 'key', 'newcert']:
try:
path = app.config['%s_dir' % key]
......@@ -154,7 +167,6 @@ def start():
# Initialize Flask extensions
init_app()
app.logger.addHandler(logger)
app.logger.info("Certificate Authority server started on http://%s:%s" % (
options.host, options.port))
app.run(
......
......@@ -49,22 +49,7 @@ class Certificate(db.Model):
def __repr__(self):
return '<CertificateMap %r>' % (self.serial)
class Revoke(db.Model):
"""
This table contains information about certificate revocation
"""
__tablename__ = 'revoke'
id = db.Column(db.Integer, primary_key=True)
comment = db.Column(db.Text())
serial = db.Column(db.String(50), unique=True)
revoke_date = db.Column(db.DateTime)
# link to revoke request if was requested by users
revoke_request_id = db.Column(db.Integer, server_default='')
def __repr__(self):
return '<CertificateMap %r>' % (self.serial)
class RevokeRequest(db.Model):
class Revocation(db.Model):
"""
This table store certificate revocation request from users
"""
......
import os
# Application settings
APP_NAME = "Certificate Authority web app"
# DO NOT use "DEBUG = True" in production environments
DEBUG = True
class BaseConfig(object):
# DO NOT use Unsecure Secrets in production environments
# Generate a safe one with:
# python -c "import os; print repr(os.urandom(24));"
SECRET_KEY = 'This is an UNSECURE Secret. CHANGE THIS for production environments.'
# Application settings
APP_NAME = "Certificate Authority web app"
# DO NOT use Unsecure Secrets in production environments
# Generate a safe one with:
# python -c "import os; print repr(os.urandom(24));"
SECRET_KEY = 'This is an UNSECURE Secret. CHANGE THIS for production environments.'
# SQLAlchemy settings
SQLALCHEMY_DATABASE_URI = 'sqlite:///ca.db'
SQLALCHEMY_TRACK_MODIFICATIONS = False
CSRF_ENABLED = True
# Flask-Mail settings
# For smtp.gmail.com to work, you MUST set "Allow less secure apps" to ON in Google Accounts.
# Change it in https://myaccount.google.com/security#connectedapps (near the bottom).
MAIL_SERVER = 'smtp.gmail.com'
MAIL_PORT = 587
MAIL_USE_SSL = False
MAIL_USE_TLS = True
MAIL_USERNAME = 'yourname@gmail.com'
MAIL_PASSWORD = 'password'
MAIL_DEFAULT_SENDER = '"Your Name" <yourname@gmail.com>'
# Used by email templates
USER_APP_NAME = "Certificate Authority"
# Internal view application
USER_AFTER_LOGIN_ENDPOINT = ''
USER_AFTER_LOGOUT_ENDPOINT = ''
USER_ENABLE_USERNAME = True
USER_ENABLE_EMAIL = False
USER_ENABLE_REGISTRATION = False
USER_ENABLE_CHANGE_USERNAME = False
# Allowed digest for signature
CA_DIGEST_LIST = ['sha256', 'sha384', 'sha512']
# SQLAlchemy settings
SQLALCHEMY_DATABASE_URI = 'sqlite:///ca.db'
SQLALCHEMY_TRACK_MODIFICATIONS = False
CSRF_ENABLED = True
# Flask-Mail settings
# For smtp.gmail.com to work, you MUST set "Allow less secure apps" to ON in Google Accounts.
# Change it in https://myaccount.google.com/security#connectedapps (near the bottom).
MAIL_SERVER = 'smtp.gmail.com'
MAIL_PORT = 587
MAIL_USE_SSL = False
MAIL_USE_TLS = True
MAIL_USERNAME = 'yourname@gmail.com'
MAIL_PASSWORD = 'password'
MAIL_DEFAULT_SENDER = '"Your Name" <yourname@gmail.com>'
class Development(BaseConfig):
DEBUG = True
TESTING = True
# Used by email templates
USER_APP_NAME = "Certificate Authority"
# Internal application
USER_AFTER_LOGIN_ENDPOINT = ''
USER_AFTER_LOGOUT_ENDPOINT = ''
USER_ENABLE_USERNAME = True
USER_ENABLE_EMAIL = False
USER_ENABLE_REGISTRATION = False
USER_ENABLE_CHANGE_USERNAME = False
\ No newline at end of file
class Production(BaseConfig):
# DO NOT use "DEBUG = True" in production environments
DEBUG = False
TESTING = False
\ No newline at end of file
......@@ -2,9 +2,16 @@ from flask_sqlalchemy import SQLAlchemy
from flask_user import UserManager, SQLAlchemyAdapter
from flask import Flask
from flask_mail import Mail
from slapos.certificate_authority.web.settings import BaseConfig
import os
app = Flask(__name__)
# CA_INSTANCE_PATH is the base directory of application, send to environ
app = Flask(__name__,
instance_path=config_name = os.getenv('CA_INSTANCE_PATH', os.getcwd()),
instance_relative_config=True)
# Use default value so SQLALCHEMY will not warn because there is not db_uri
app.config['SQLALCHEMY_DATABASE_URI'] = BaseConfig.SQLALCHEMY_DATABASE_URI
db = SQLAlchemy(app)
def init_app():
......
......@@ -5,45 +5,20 @@ import time
import urllib
from datetime import datetime
from slapos.certificate_authority.web.start_web import app, db
from slapos.certificate_authority.web.models import (User, Certificate,
from slapos.certificate_authority.web.models import (Certificate,
RevokeRequest, Revoke, CERT_STATUS_VALIDATED, CERT_STATUS_REVOKED,
CERT_STATUS_PENDING, CERT_STATUS_REJECTED)
from slapos.certificate_authority.certificate_authority import CertificateBase
def find_or_create_user(first_name, last_name, email, username, password):
""" Find existing user or create new user """
class CertificateTools:
user = User.query.filter(User.username == username).first()
if not user:
user = User(email=email,
first_name=first_name,
last_name=last_name,
username=username,
password=app.user_manager.hash_password(password),
active=True,
confirmed_at=datetime.utcnow()
)
db.session.add(user)
db.session.commit()
return user
def find_user(username):
return User.query.filter(User.username == username).first()
def get_string_num(number):
if number < 10:
return '0%s' % number
return str(number)
class CertificateTools(object):
def signCertificate(self, cert_id, req_file):
def signCertificate(self, req_file, cert_id):
"""
Sign a certificate, cert_id is the name used by the user to download the cert
"""
csr_dest = os.path.join(app.config.csr_dir, '%s.csr.pem' % cert_id)
cert_name = '%s.cert.pem' % cert_id
try:
# Avoid signing two certificate at the same time (for unique serial)
app.config.ca._lock()
......@@ -58,7 +33,7 @@ class CertificateTools(object):
app.config.ca.signCertificateRequest(req_file, output)
cert = app.config.ca.freadX509(output)
cert_db = Certificate(
name='%s.cert.pem' % cert_id,
name=cert_name,
serial=next_serial,
filename='%s.cert.pem' % next_serial,
common_name=cert.get_subject().CN,
......@@ -89,17 +64,17 @@ class CertificateTools(object):
finally:
app.config.ca._unlock()
def addRevokeRequest(self, serial, hash_name, message):
cert_path = os.path.join(app.config.cert_dir, '%s.cert.pem' % serial)
if not os.path.exists(cert_path):
# This check is fast but 'serial'.cert.pem should the the cert filename in db
return False
return cert_name
def addRevokeRequest(self, cert, message):
x509 = app.config.ca.readX509(cert)
serial = self.getSerialToInt(x509)
cert = Certificate.query.filter(
Certificate.status == CERT_STATUS_VALIDATED
).filter(Certificate.serial == serial).first()
).filter(Certificate.serial == get_string_num(serial)).first()
if not cert or cert.name != '%s.cert.pem' % hash_name:
# This certificate not found or not match
# This certificate not found or not match or was revoked
return False
# Create Request
......@@ -161,8 +136,8 @@ class CertificateTools(object):
return ""
def getCertificateList(self, with_cacerts=True):
ca_cert = app.config.ca.freadX509(app.config.ca.certificate)
if with_cacerts:
ca_cert = app.config.ca.freadX509(app.config.ca.certificate)
data_list = [
{
'index': 1,
......@@ -170,6 +145,7 @@ class CertificateTools(object):
'name': os.path.basename(app.config.ca.certificate),
'cn': ca_cert.get_subject().CN,
'expiration_date': datetime.strptime(ca_cert.get_notAfter(),"%Y%m%d%H%M%SZ"),
'start_date': datetime.strptime(ca_cert.get_notBefore(), "%Y%m%d%H%M%SZ")
},
{
'index': 2,
......@@ -177,6 +153,7 @@ class CertificateTools(object):
'name': os.path.basename(app.config.ca.ca_crl),
'cn': "Certificate Revocation List",
'expiration_date': '---',
'start_date': '---',
}
]
index = 3
......@@ -197,7 +174,7 @@ class CertificateTools(object):
'start_date': signed_cert.start_before,
})
index += 1
return data_list
def getRevokedCertificateList(self):
......@@ -217,7 +194,7 @@ class CertificateTools(object):
'start_date': revoked_cert.start_before,
})
index += 1
return data_list
def getRevocationRequestList(self):
......
import os
class Error():
MISSING_PARAM = {
code: 1,
name: "MissingParameter",
message: "Parameter(s) required is missing or empty."
}
CSR_FORMAT = {
code: 2,
name: "FileFormat",
message: "Not a valid PEM certificate signing request"
}
CSR_INVALID_CN = {
code: 3,
name: "CertificateSigningRequestContent",
message: "Request does not contain a Common Name"
}
CERT_FORMAT = {
code: 4,
name: "FileFormat",
message: "Not a valid PEM certificate"
}
SIGNATURE_VERIFICATION = {
code: 5,
name: "SignatureMismatch",
message: "Signature verification failed. Request was not signed with the correct key"
}
BAD_SIGNATURE_DIGEST = {
code: 6,
name: "SignatureMismatch",
message: "Hash algorithm not supported"
}
CSR_CONTENT_MISMATCH = {
code: 7,
name: "CertificateSigningRequestContent",
message: "Request content does not match replaced certificate"
}
JSON_FORMAT = {
code: 8,
name: "JsonFormat",
message: "Not a valid Json content submitted"
}
PAYLOAD_CONTENT = {
code: 9,
name: "PayloadContentInvalid",
message: "Submitted payload parameter is not valid"
}
INVALID_DIGEST = {
code: 4,
name: "IvalidORNotAllowedDigest",
message: "The Digest submitted is not accepted by CA or is invalid"
}
# -*- coding: utf-8 -*-
import os
def get_string_num(number):
if number < 10:
return '0%s' % number
return str(number)
\ No newline at end of file
# -*- coding: utf-8 -*-
import os
from datetime import datetime
from slapos.certificate_authority.web.start_web import app, db
from slapos.certificate_authority.web.models import User
def check_authentication(username, password):
user = self.find_user(username):
if user:
return app.user_manager.hash_password(password) == user.password
else:
return False
def find_or_create_user(first_name, last_name, email, username, password):
""" Find existing user or create new user """
user = User.query.filter(User.username == username).first()
if not user:
user = User(email=email,
first_name=first_name,
last_name=last_name,
username=username,
password=app.user_manager.hash_password(password),
active=True,
confirmed_at=datetime.utcnow()
)
db.session.add(user)
db.session.commit()
return user
def find_user(username):
return User.query.filter(User.username == username).first()
\ No newline at end of file
......@@ -8,50 +8,107 @@ from OpenSSL import crypto
import uuid
from datetime import datetime
from flask import (Flask, session, request, redirect, url_for, render_template,
jsonify, session, abort, send_file, flash, g)
jsonify, session, abort, send_file, flash, g, Response)
from slapos.certificate_authority.web.start_web import app, db
from slapos.certificate_authority.web.tools import (find_user, find_or_create_user,
get_string_num, CertificateTools)
from slapos.certificate_authority.web.tools.users import find_user, find_or_create_user
from slapos.certificate_authority.web.tools.tools get_string_num
from slapos.certificate_authority.web.tools.error import Error as error
from slapos.certificate_authority.web.tools.certificate import CertificateTools
from flask_user import login_required, current_user
from flask_login import logout_user #, login_user, current_user, login_required
from slapos.certificate_authority.web.models import (UserProfileForm, Certificate)
from slapos.runner.utils import tail
from functools import wraps
cert_tools = CertificateTools()
def authenticated_method(func):
""" This decorator ensures that the current user is logged in before calling the actual view.
Abort with 401 when the user is not logged in."""
@wraps(func)
def decorated_view(*args, **kwargs):
# User must be authenticated
auth = request.authorization
if not auth:
return abort(401)
elif not Users.check_authentication(auth.username, auth.password):
return abort(401)
# Call the actual view
return func(*args, **kwargs)
return decorated_view
@app.errorhandler(401)
def error401(error):
if error.description is None:
message = {
'code': 401,
'name': 'Unauthorized',
'message': "Authenticate."
}
else:
message = error.description
response = jsonify(message)
response.status_code = 401
response.headers['WWW-Authenticate'] = 'Basic realm="Login Required"'
return response
@app.errorhandler(403)
def error403(res):
return "403: Forbidden. Your are not allowed to access %s" % res, 403
def error403(error):
if error.description is None:
message = {
'code': 404,
'name': 'Forbidden',
'message': 'Forbidden. Your are not allowed to access %s' % request.url,
}
else:
message = error.description
response = jsonify(message)
response.status_code = 404
return response
@app.errorhandler(404)
def error404(msg=""):
return "404: Resource not found.\n%s\n" % msg, 404
@app.errorhandler(501)
def error501(msg=""):
return "501: Internal Error. %s\n" % msg, 501
def error404(error):
if error.description is None:
message = {
'code': 404,
'name': 'NotFound',
'message': 'Resource not found: ' + request.url,
}
else:
message = error.description
response = jsonify(message)
response.status_code = 404
@app.errorhandler(412)
def error412(msg):
return "412: Precondition Failed: %s.\n" % msg, 412
return response
@app.errorhandler(400)
def error400(msg):
return "400: Bad Request", 400
def error400(error):
if error.description is None:
message = {
'code': 400,
'name': 'BadRequest',
'message': 'The request could not be understood by the server, you probably provided wrong parameters.'
}
else:
message = error.description
response = jsonify(message)
response.status_code = 400
return response
def writefile(path, content, mode=0640):
with open(path, 'w') as fd:
fd.write(content)
os.chmod(path, mode)
@app.before_request
def before_request():
if request.path.startswith('/static/') \
or request.path.startswith == '/get/':
if not request.path.startswith('/admin/'):
return
if request.path == '/configure' or request.path == '/setpassword':
if request.path == '/admin/configure' or request.path == '/admin/setpassword':
# check if password file exists, if yes go to index
if find_user('admin'):
return redirect(url_for('home'))
......@@ -61,171 +118,265 @@ def before_request():
return redirect(url_for('configure'))
g.user = current_user
@app.route('/')
def index():
# page to list certificates, also connection link
data_list = cert_tools.getCertificateList()
return render_template("index.html", data_list=data_list)
@app.route('/get/<string:name>', methods=['GET'])
def getfile(name):
ca_name = os.path.basename(app.config.ca.certificate)
crl_name = os.path.basename(app.config.ca.ca_crl)
if name == ca_name:
return send_file(app.config.ca.certificate,
attachment_filename=ca_name,
as_attachment=True)
elif name == crl_name:
return send_file(app.config.ca.ca_crl,
attachment_filename=crl_name,
as_attachment=True)
else:
# name is like f9c4-4a04-8bad-70bb45c6406e.cert.pem
cert_filename = cert_tools.getFilenameFromHash(name)
cert_file = os.path.join(app.config.cert_dir, cert_filename)
if os.path.exists(cert_file) and os.path.isfile(cert_file):
filename = os.path.basename(cert_file)
return send_file(cert_file,
attachment_filename=filename,
as_attachment=True)
return abort(404, name)
@app.route('/logout')
def logout():
logout_user()
return redirect(url_for('home'))
@app.route('/configure', methods=['GET'])
def configure():
return render_template("configure.html")
@app.route('/setpassword', methods=['POST'])
def setpassword():
username = 'admin'
password = request.form.get('password', '').encode('utf-8')
if not password:
return abort(412, "password not set or empty")
find_or_create_user(
"Admin",
"admin",
"admin@example.com",
username,
password)
logout_user()
return redirect(url_for('home'))
# Routes for certificate Authority
@app.route('/crl', methods=['GET'])
def get_crl():
if os.path.exists(app.config.ca.ca_crl):
return send_file(app.config.ca.ca_crl,
attachment_filename=os.path.basename(app.config.ca.ca_crl),
as_attachment=True)
else:
return abort(404)
# Routes for certificate Authority
def signcert(key, redirect_to=''):
req_file = os.path.join(app.config.req_dir, '%s.csr.pem' % key)
if os.path.exists(req_file) and os.path.isfile(req_file):
cert_id = key.split('+')[0]
try:
cert_tools.signCertificate(cert_id, req_file)
except Exception, e:
# XXX - need to check this
raise
# return abort(501, str(e))
@app.route('/csr/<string:csr_id>', methods=['GET'])
def get_csr(csr_id):
csr_file = os.path.join(app.config.req_dir, csr_id)
if os.path.exists(csr_file) and os.path.isfile(csr_file):
return send_file(csr_file,
attachment_filename=os.path.basename(csr_file),
as_attachment=True)
else:
return abort(403)
flash('Certificate is signed!', 'success')
if redirect_to:
return redirect(url_for(redirect_to))
return "Certificate is signed"
return abort(404)
@app.route('/request', methods=['POST'])
def do_request():
@app.route('/csr', methods=['PUT'])
def request_cert():
csr_content = request.form.get('csr', '').encode('utf-8')
if not csr_content:
return abort(400)
return abort(400, error.MISSING_PARAM)
try:
# check if a valid csr content
req = app.config.ca.readCertificateRequest(csr_content)
# XXX - Maybe more check if the csr is valid with CA
except crypto.Error, e:
return abort(412, str(e))
return abort(400, error.CSR_FORMAT)
cert_id = str(uuid.uuid1())
csr_keyfile = '%s+%s.csr.pem' % (cert_id, str(uuid.uuid4()))
request_file = os.path.join(app.config.req_dir, csr_keyfile)
return do_requestcert(csr_content)
if os.path.exists(request_file):
# The request already exist
raise Exception("Certificate Signature Request file should be unique")
try:
writefile(request_file, csr_content)
except OSError, e:
raise
@app.route('/csr/<string:csr_id>', methods=['DELETE'])
@authenticated_method
def remove_csr(csr_id):
req_file = os.path.join(app.config.req_dir, csr_id)
if os.path.exists(req_file) and os.path.isfile(req_file):
os.unlink(req_file)
else:
return abort(404)
return cert_id
@app.route('/crt/<string:cert_id>', methods=['GET'])
def get_crt(cert_id):
cacert_id = os.path.basename(app.config.ca.certificate)
if cert_id == cacert_id:
cert_file = app.config.ca.certificate
else:
cert_filename = cert_tools.getFilenameFromHash(name)
cert_file = os.path.join(app.config.cert_dir, cert_filename)
@app.route('/signcert', methods=['POST'])
def do_signcert():
"""
This method should be called by a list of host allowed, can be used to sign with command line
For security, it's good to only allow local ip (127.0.0.1) or ::1
"""
if os.path.exists(cert_file) and os.path.isfile(cert_file):
return send_file(cert_file,
attachment_filename=os.path.basename(cert_file),
as_attachment=True)
else:
return abort(404, cert_id)
@app.route('/crt', methods=['PUT'])
@authenticated_method
def sign_cert():
key = request.form.get('key', '').encode('utf-8')
if not key:
return abort(400)
remote_client = request.remote_addr
x_forwarded_for_list = request.headers.getlist("X-Forwarded-For")
if remote_client not in app.config.trusted_host_list or \
(x_forwarded_for_list and x_forwarded_for_list[0] not in app.config.trusted_host_list):
return abort(403) # Forbidden
return signcert(key)
@app.route('/renewcert', methods=['POST'])
def renewcert(serial):
@app.route('/crt/renew', methods=['PUT'])
def renew_cert(serial):
"""
this method is used to renew expired certificate.
should recieve csr and old cert serial.
"""
csr_content = request.form.get('csr', '').encode('utf-8')
serial = request.form.get('serial', '').encode('utf-8')
return abort(200, "Done. The method is not implemented yet!")
digest = request.form.get('digest', '')
signature = request.form.get('signature', '')
# payload: a Json string containing certificate to revoke and reason msg
payload_string = request.form.get('payload', '')
if not digest or not signature or not payload_string:
# Bad parameters
return abort(400)
try:
payload = json.loads(payload_string)
except ValueError, exc:
return abort(400, error.JSON_FORMAT, str(exc))
if not payload.has_key('cert') and payload.has_key('csr'):
return abort(400)
@app.route('/requestrevoke', methods=['POST'])
def do_revoke():
if not digest in app.config.CA_DIGEST_LIST:
return abort(400, error.INVALID_DIGEST, '%r not in allowed signature hash set: %s' % (digest, app.config.CA_DIGEST_LIST))
try:
req = app.config.ca.readCertificateRequest(payload['csr'])
except crypto.Error, e:
return abort(400, error.CSR_FORMAT)
try:
cert = app.config.ca.readX509(payload['cert'])
except crypto.Error, e:
return abort(400, error.CERT_FORMAT)
if not app.config.ca.verifyData(payload['cert'], signature, payload, digest):
return abort(400, Error.SIGNATURE_VERIFICATION)
if req.get_subject().CN != cert.get_subject().CN:
return abort(400, error.CSR_CONTENT_MISMATCH)
return do_requestcert(payload['csr'])
@app.route('/crt/revoke', methods=['PUT'])
def request_revoke_crt():
"""
Revoke method required certificat: serial(Hex), name and reason
'name' is used here to add more verification
'reason' is a message that say why this certificate should be revoked
"""
serial = request.form.get('serial', '').upper()
name = request.form.get('name', '')
# A msg explaining why the certificate should be revoked
reason = request.form.get('reason', '')
if not serial or not name or not reason:
digest = request.form.get('digest', '')
signature = request.form.get('signature', '')
# payload: a Json string containing certificate to revoke and reason msg
payload_string = request.form.get('payload', '')
if not digest or not signature or not payload_string:
# Bad parameters
return abort(400)
if len(serial) == 1:
serial = '0%s' % serial
try:
payload = json.loads(payload_string)
except ValueError:
return abort(400)
if not payload.has_key('cert') and payload.has_key('reason'):
return abort(400)
if not digest in app.config.CA_DIGEST_LIST:
return abort(400)
if cert_tools.addRevokeRequest(serial, name, reason):
return "Certificate revocation created"
try:
cert = app.config.ca.readX509(payload['cert'])
except crypto.Error, e:
return abort(400, error.CERT_FORMAT)
if not app.config.ca.verifyData(payload['cert'], signature, payload, digest):
return abort(400, Error.SIGNATURE_VERIFICATION)
if cert_tools.addRevokeRequest(payload['cert'], name, payload['reason']):
message = {
"code": 201,
"name": "Created",
"message": "Content created - Certificate was revoked"
}
response = Response(message,
status=201,
mimetype='application/json')
return response
else:
return abort(412, "Parameters not valid for the request")
return abort(404)
def do_requestcert(csr):
key = str(uuid.uuid1())
csr_filename = '%s.csr.pem' % key
csr_file = os.path.join(app.config.req_dir, csr_filename)
fd = os.open(csr_file,
os.O_CREAT|os.O_WRONLY|os.O_EXCL|os.O_TRUNC,
0640)
try:
os.write(fd, csr)
finally:
os.close(fd)
message = {
"code": 201,
"name": "Created",
"message": "Content created - Certificate Signing Request was accepted"
}
response = Response(message,
status=201,
mimetype='application/json')
response.headers['Location'] = url_for('get_csr', _external=True, cert_id=csr_filename)
return response
#Manage routes (access admin)
def signcert(csr_key, redirect_to=''):
req_file = os.path.join(app.config.req_dir, csr_key)
if os.path.exists(req_file) and os.path.isfile(req_file):
cert_id = csr_key.split('.')[0]
# cert_id = csr_key[:-8]
try:
cert_name = cert_tools.signCertificate(req_file, cert_id)
except Exception, e:
# XXX - need to check this
raise
# return abort(501, str(e))
else:
return abort(404)
# XXX - to remove
flash('Certificate is signed!', 'success')
if redirect_to:
return redirect(url_for(redirect_to))
message = {
"code": 201,
"name": "Created",
"message": "Content created - Certificate was signed"
}
response = Response(message,
status=201,
mimetype='application/json')
response.headers['Location'] = url_for('get_crt', _external=True, cert_id=cert_name)
return response
#Manage routes (Authentication required) - Flask APP
@app.route('/')
def index():
# page to list certificates, also connection link
data_list = cert_tools.getCertificateList()
return render_template("index.html", data_list=data_list)
@app.route('/admin/logout')
def logout():
logout_user()
return redirect(url_for('home'))
@app.route('/admin/configure', methods=['GET'])
def configure():
return render_template("configure.html")
@app.route('/admin/setpassword', methods=['POST'])
def setpassword():
username = 'admin'
password = request.form.get('password', '').encode('utf-8')
if not password:
return abort(412, "password not set or empty")
find_or_create_user(
"Admin",
"admin",
"admin@example.com",
username,
password)
logout_user()
return redirect(url_for('home'))
@app.route('/manage', methods=['GET'])
@app.route('/admin/manage', methods=['GET'])
@login_required
def manage_home():
data_list = cert_tools.getCertificateRequestList()
......@@ -237,7 +388,7 @@ def manage_home():
)
return render_template('manage_page.html', data_list=data_list, counter=c)
@app.route('/profile', methods=['GET', 'POST'])
@app.route('/admin/profile', methods=['GET', 'POST'])
@login_required
def user_profile():
form = UserProfileForm(request.form, obj=current_user)
......@@ -253,7 +404,7 @@ def user_profile():
return render_template('user_profile.html',
form=form)
@app.route('/viewcert', methods=['GET'])
@app.route('/admin/viewcert', methods=['GET'])
@login_required
def viewcert():
type_dict = {
......@@ -282,7 +433,7 @@ def viewcert():
return render_template('view_cert.html', content=content, name=name, cert_type=cert_type)
@app.route('/signcert_web', methods=['GET'])
@app.route('/admin/signcert_web', methods=['GET'])
@login_required
def do_signcert_web():
filename = request.args.get('name', '').encode('utf-8')
......@@ -290,7 +441,7 @@ def do_signcert_web():
return abort(412)
return signcert(filename[:-8], 'manage')
@app.route('/deletecsr', methods=['GET'])
@app.route('/admin/deletecsr', methods=['GET'])
@login_required
def deletecsr():
"""
......@@ -307,21 +458,21 @@ def deletecsr():
else:
return abort(404)
@app.route('/signed_certs', methods=['GET'])
@app.route('/admin/signed_certs', methods=['GET'])
@login_required
def signed_certificate_list():
# page to list certificates, also connection link
data_list = cert_tools.getCertificateList(False)
return render_template("signed_certs.html", data_list=data_list)
@app.route('/revoked_certs', methods=['GET'])
@app.route('/admin/revoked_certs', methods=['GET'])
@login_required
def revoked_certificate_list():
# page to list certificates, also connection link
data_list = cert_tools.getRevokedCertificateList()
return render_template("revoked_certs.html", data_list=data_list)
@app.route('/revoke_cert', methods=['GET', 'POST'])
@app.route('/admin/revoke_cert', methods=['GET', 'POST'])
@login_required
def revoke_cert():
redirect_url = 'signed_certs'
......@@ -338,13 +489,13 @@ def revoke_cert():
return abort(404)
@app.route('/revocation_requests', methods=['GET'])
@app.route('/admin/revocation_requests', methods=['GET'])
@login_required
def revocation_request_list():
data_list = cert_tools.getRevocationRequestList()
return render_template("revocation_requests.html", data_list=data_list)
@app.route('/reject_revocation_request', methods=['GET'])
@app.route('/admin/reject_revocation_request', methods=['GET'])
@login_required
def reject_revocation_request():
req_id = request.args.get('request_id', '')
......@@ -357,7 +508,7 @@ def reject_revocation_request():
return redirect(url_for('revocation_requests'))
@app.route('/view_logs', methods=['GET'])
@app.route('/admin/view_logs', methods=['GET'])
@login_required
def view_logs():
content = ""
......@@ -370,7 +521,7 @@ def view_logs():
content = tail(f, 500)
return render_template("view_logs.html", content=content, filename=app.config.log_file)
@app.route('/generate_crl', methods=['GET'])
@app.route('/admin/generate_crl', methods=['GET'])
@login_required
def generate_crl():
app.config.ca.genCertificateRevocationList()
......@@ -378,6 +529,6 @@ def generate_crl():
app.add_url_rule('/', 'home', index)
app.add_url_rule('/manage', 'manage', manage_home)
app.add_url_rule('/signed_certs', 'signed_certs', signed_certificate_list)
app.add_url_rule('/revocation_requests', 'revocation_requests', revocation_request_list)
\ No newline at end of file
app.add_url_rule('/admin/manage', 'manage', manage_home)
app.add_url_rule('/admin/signed_certs', 'signed_certs', signed_certificate_list)
app.add_url_rule('/admin/revocation_requests', 'revocation_requests', revocation_request_list)
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment