Commit e7e2df78 authored by Alain Takoudjou's avatar Alain Takoudjou

cli renew now takes option to check if renew is required and on-renew script

parent a9a134d0
......@@ -25,13 +25,25 @@ import argparse
import traceback
import pem
import json
import subprocess
from OpenSSL import crypto
from caucase import utils
from datetime import datetime
from datetime import datetime, timedelta
CSR_KEY_FILE = 'csr.key.txt'
RENEW_CSR_KEY_FILE = 'renew_csr.key.txt'
def popenCommunicate(command_list):
subprocess_kw = dict(stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
popen = subprocess.Popen(command_list, **subprocess_kw)
result = popen.communicate()[0]
if popen.returncode is None:
popen.kill()
if popen.returncode != 0:
raise ValueError('Issue during calling %r, result was:\n%s' % (
command_list, result))
return result
def parseArguments():
"""
Parse arguments for Certificate Authority Request.
......@@ -55,19 +67,29 @@ def parseArguments():
parser.add_argument('--digest',
default="sha256",
help='Digest used to sign data. default: %(default)s')
parser.add_argument('--cn',
help='Common name to use for request new certificate')
help='Common name to use when request new certificate.')
parser.add_argument('--threshold',
help='The minimum remaining certificate validity time in' \
' seconds after which renew of certificate can be triggered.',
type=int)
parser.add_argument('--on-renew',
help='Path of an executable file to call after certificate'\
' renewal.')
parser.add_argument('--no-check-certificate',
action='store_false', default=True, dest='verify_certificate',
help='When connecting to CA on HTTPS, disable certificate verification.')
group = parser.add_mutually_exclusive_group()
group.add_argument('--request', action='store_true',
help='Request a new Certificate')
help='Request a new Certificate.')
group.add_argument('--revoke', action='store_true',
help='Revoke existing certificate')
help='Revoke existing certificate.')
group.add_argument('--renew', action='store_true',
help='Renew current certificate and and replace with existing files')
help='Renew current certificate and and replace with existing files.')
return parser
......@@ -120,7 +142,10 @@ def renewCertificate(config, backup_dir):
# download or update ca crt file
ca_renew.getCACertificateChain()
ca_renew.renewCertificate(config.csr_file, backup_dir)
ca_renew.renewCertificate(config.csr_file,
backup_dir,
config.threshold,
after_script=config.on_renew)
def main():
parser = parseArguments()
......@@ -129,32 +154,38 @@ def main():
base_dir = os.path.dirname(config.crt_file)
os.chdir(os.path.abspath(base_dir))
if not config.ca_url:
parser.error('`ca-url` parameter is required. Use --ca-url URL')
parser.print_help()
exit(1)
if config.request:
if not config.cn or not config.ca_url:
if not config.cn:
parser.error('Option --cn is required for request.')
parser.print_help()
exit(1)
requestCertificate(config)
elif config.revoke:
if not config.ca_url:
parser.print_help()
exit(1)
revokeCertificate(config)
elif config.renew:
if not config.threshold:
parser.error('`threshold` parameter is required with renew. Use --threshold VALUE')
parser.print_help()
exit(1)
backup_dir = os.path.join('.', 'old-%s' % datetime.now().strftime('%Y%m%d%H%M%S'))
os.mkdir(backup_dir)
# cleanup
if os.path.exists(CSR_KEY_FILE):
os.rename(CSR_KEY_FILE, os.path.join(backup_dir, CSR_KEY_FILE))
os.unlink(CSR_KEY_FILE)
if os.path.exists(config.csr_file):
base_name = os.path.basename(config.csr_file)
os.rename(config.csr_file, os.path.join(backup_dir, base_name))
os.unlink(config.csr_file)
renewCertificate(config, backup_dir)
else:
parser.error('Please set one of options: --request | --revoke | --renew.')
parser.print_help()
exit(1)
......@@ -290,6 +321,21 @@ class CertificateAuthorityRequest(object):
cert_pem,
key_pem)
def isCertExpirationDateValid(self, x509, threshold):
"""
Return True if remaning certificate valid time is second is lower than
the threshold value
"""
expiration_date = datetime.strptime(
x509.get_notAfter(), '%Y%m%d%H%M%SZ'
)
now_date = datetime.utcnow()
limit_date = now_date + timedelta(0, threshold)
expire_in = expiration_date - limit_date
if expire_in.days > 0.0:
return True
return False
def getValidCACertificateChain(self):
ca_cert_url = '%s/crt/ca.crt.json' % self.ca_url
self.logger.info("Updating CA certificate file from %s" % ca_cert_url)
......@@ -490,9 +536,10 @@ class CertificateAuthorityRequest(object):
self.logger.info("Certificate %s was successfully revoked." % (
self.certificate))
def renewCertificate(self, csr_file, backup_dir, backup_key=True):
def renewCertificate(self, csr_file, backup_dir, threshold, renew_key=True,
after_script=''):
"""
Renew the current certificate. Regenerate private key if backup_key is `True`
Renew the current certificate. Regenerate private key if renew_key is `True`
"""
sleep_time = 10
retry = 1
......@@ -503,22 +550,26 @@ class CertificateAuthorityRequest(object):
cert_pem = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
csr_key = ""
if self.isCertExpirationDateValid(cert_pem, threshold):
self.logger.info("Nothing to do, no need to renew the certificate.")
return
try:
if backup_key:
if renew_key:
self.generatePrivatekey(new_key_path)
key_file = new_key_path
if not os.path.exists(csr_file):
csr = self.generateCertificateRequest(key_file,
cn=cert_pem.get_subject().CN,
csr_file=csr_file)
else:
csr = open(csr_file).read()
if os.path.exists(RENEW_CSR_KEY_FILE):
csr_key = open(RENEW_CSR_KEY_FILE).read()
if not csr_key:
if not os.path.exists(csr_file):
csr = self.generateCertificateRequest(key_file,
cn=cert_pem.get_subject().CN,
csr_file=csr_file)
else:
csr = open(csr_file).read()
payload = dict(
renew_csr=csr,
crt=cert)
......@@ -562,11 +613,12 @@ class CertificateAuthorityRequest(object):
time.sleep(sleep_time)
response = self._request('get', reply_url)
self.logger.info("Validating signed certificate...")
if not os.path.exists(backup_dir):
os.mkdir(backup_dir)
self._writeNewFile(new_cert_path, response.text)
# change location of files
if backup_key:
if renew_key:
os.rename(self.key,
os.path.join(backup_dir, os.path.basename(self.key)))
os.rename(new_key_path, self.key)
......@@ -576,6 +628,7 @@ class CertificateAuthorityRequest(object):
os.path.join(backup_dir, os.path.basename(self.certificate)))
os.rename(new_cert_path, self.certificate)
self.logger.info("Validating signed certificate...")
if not self.checkCertificateValidity(response.text):
# certificate verification failed, should raise ?
......@@ -590,8 +643,13 @@ class CertificateAuthorityRequest(object):
for path in [csr_file, RENEW_CSR_KEY_FILE]:
if os.path.exists(path):
os.unlink(path)
if after_script:
output = popenCommunicate([os.path.realpath(after_script)])
self.logger.info("Successfully executed script '%s' with output:\n%s" % (
after_script, output))
finally:
for path in [new_cert_path, new_key_path]:
if os.path.exists(path):
os.unlink(path)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment