Commit b3b4ebf1 authored by Vincent Pelletier's avatar Vincent Pelletier

client: Promote updateCAFile and updateCRLFile to class methods.

So that they do not hardcode the class to instanciate.
This prepares further class tweaking.
Also, update users.
parent e72324c6
......@@ -34,8 +34,6 @@ from .client import (
CaucaseError,
CaucaseClient,
HTTPSOnlyCaucaseClient,
updateCAFile,
updateCRLFile,
)
_cryptography_backend = default_backend()
......@@ -492,10 +490,10 @@ def main(argv=None):
try:
# Get a working, up-to-date CAS CA certificate file.
updated |= updateCAFile(cas_url, args.ca_crt)
updated |= CaucaseClient.updateCAFile(cas_url, args.ca_crt)
# --update-user, CA part
if args.update_user or args.mode == MODE_USER:
updated |= updateCAFile(cau_url, args.user_ca_crt)
updated |= CaucaseClient.updateCAFile(cau_url, args.user_ca_crt)
client = CLICaucaseClient(
client=CaucaseClient(
......@@ -538,13 +536,13 @@ def main(argv=None):
if args.list_csr:
client.listCSR(args.mode)
# update our CRL after all revocations we were requested
updated |= updateCRLFile(cas_url, args.crl, [
updated |= CaucaseClient.updateCRLFile(cas_url, args.crl, [
utils.load_ca_certificate(x)
for x in utils.getCertList(args.ca_crt)
])
# --update-user, CRL part
if args.update_user:
updated |= updateCRLFile(cau_url, args.user_crl, [
updated |= CaucaseClient.updateCRLFile(cau_url, args.user_crl, [
utils.load_ca_certificate(x)
for x in utils.getCertList(args.user_ca_crt)
])
......@@ -695,7 +693,10 @@ def updater(argv=None, until=utils.until):
}[args.mode]
threshold = datetime.timedelta(args.threshold, 0)
max_sleep = datetime.timedelta(args.max_sleep, 0)
updated = updateCAFile(cas_url, args.cas_ca) and args.cas_ca == args.ca
updated = CaucaseClient.updateCAFile(
cas_url,
args.cas_ca,
) and args.cas_ca == args.ca
client = CaucaseClient(
ca_url=ca_url,
ca_crt_pem_list=utils.getCertList(args.cas_ca)
......@@ -732,12 +733,15 @@ def updater(argv=None, until=utils.until):
)
now = until(next_deadline)
next_deadline = now + max_sleep
if args.cas_ca != args.ca and updateCAFile(cas_url, args.cas_ca):
if args.cas_ca != args.ca and CaucaseClient.updateCAFile(
cas_url,
args.cas_ca,
):
client = CaucaseClient(
ca_url=ca_url,
ca_crt_pem_list=utils.getCertList(args.cas_ca)
)
if updateCAFile(ca_url, args.ca):
if CaucaseClient.updateCAFile(ca_url, args.ca):
print 'Got new CA'
updated = True
# Note: CRL expiration should happen several time during CA renewal
......@@ -747,7 +751,7 @@ def updater(argv=None, until=utils.until):
utils.load_ca_certificate(x)
for x in utils.getCertList(args.ca)
]
if updateCRLFile(ca_url, args.crl, ca_crt_list):
if CaucaseClient.updateCRLFile(ca_url, args.crl, ca_crt_list):
print 'Got new CRL'
updated = True
next_deadline = min(
......
......@@ -35,8 +35,6 @@ __all__ = (
'CaucaseError',
'CaucaseClient',
'HTTPSOnlyCaucaseClient',
'updateCAFile',
'updateCRLFile',
)
_cryptography_backend = default_backend()
......@@ -47,81 +45,77 @@ class CaucaseError(Exception):
"""
pass
def updateCAFile(url, ca_crt_path):
class CaucaseClient(object):
"""
Bootstrap anf maintain a CA file up-to-date.
url (str)
URL to caucase, ending in eithr /cas or /cau.
ca_crt_path (str)
Path to the CA certificate file, which may not exist.
Caucase HTTP(S) client.
Return whether an update happened (including whether an already-known
certificate expired and was discarded).
Expose caucase REST API as pythonic methods.
"""
if not os.path.exists(ca_crt_path):
ca_pem = CaucaseClient(
ca_url=url,
).getCACertificate()
with open(ca_crt_path, 'w') as ca_crt_file:
ca_crt_file.write(ca_pem)
updated = True
else:
updated = False
now = datetime.datetime.utcnow()
loaded_ca_pem_list = utils.getCertList(ca_crt_path)
ca_pem_list = [
x
for x in loaded_ca_pem_list
if utils.load_ca_certificate(x).not_valid_after > now
]
ca_pem_list.extend(
CaucaseClient(
ca_url=url,
ca_crt_pem_list=ca_pem_list,
).getCACertificateChain(),
)
if ca_pem_list != loaded_ca_pem_list:
data = ''.join(ca_pem_list)
with open(ca_crt_path, 'w') as ca_crt_file:
ca_crt_file.write(data)
updated = True
return updated
def updateCRLFile(url, crl_path, ca_list):
"""
Bootstrap anf maintain a CRL file up-to-date.
@classmethod
def updateCAFile(cls, url, ca_crt_path):
"""
Bootstrap anf maintain a CA file up-to-date.
url (str)
URL to caucase, ending in eithr /cas or /cau.
crl_path (str)
Path to the CRL file, which may not exist.
ca_list (list of cryptography.x509.Certificate instances)
One of these CA certificates must have signed the CRL for it to be
accepted.
url (str)
URL to caucase, ending in eithr /cas or /cau.
ca_crt_path (str)
Path to the CA certificate file, which may not exist.
Return whether an update happened.
"""
if os.path.exists(crl_path):
my_crl = utils.load_crl(open(crl_path).read(), ca_list)
else:
my_crl = None
latest_crl_pem = CaucaseClient(
ca_url=url,
).getCertificateRevocationList()
latest_crl = utils.load_crl(latest_crl_pem, ca_list)
if my_crl is None or latest_crl.signature != my_crl.signature:
with open(crl_path, 'w') as crl_file:
crl_file.write(latest_crl_pem)
return True
return False
Return whether an update happened (including whether an already-known
certificate expired and was discarded).
"""
if not os.path.exists(ca_crt_path):
ca_pem = cls(ca_url=url).getCACertificate()
with open(ca_crt_path, 'w') as ca_crt_file:
ca_crt_file.write(ca_pem)
updated = True
else:
updated = False
now = datetime.datetime.utcnow()
loaded_ca_pem_list = utils.getCertList(ca_crt_path)
ca_pem_list = [
x
for x in loaded_ca_pem_list
if utils.load_ca_certificate(x).not_valid_after > now
]
ca_pem_list.extend(
cls(ca_url=url, ca_crt_pem_list=ca_pem_list).getCACertificateChain(),
)
if ca_pem_list != loaded_ca_pem_list:
data = ''.join(ca_pem_list)
with open(ca_crt_path, 'w') as ca_crt_file:
ca_crt_file.write(data)
updated = True
return updated
class CaucaseClient(object):
"""
Caucase HTTP(S) client.
@classmethod
def updateCRLFile(cls, url, crl_path, ca_list):
"""
Bootstrap anf maintain a CRL file up-to-date.
url (str)
URL to caucase, ending in eithr /cas or /cau.
crl_path (str)
Path to the CRL file, which may not exist.
ca_list (list of cryptography.x509.Certificate instances)
One of these CA certificates must have signed the CRL for it to be
accepted.
Return whether an update happened.
"""
if os.path.exists(crl_path):
my_crl = utils.load_crl(open(crl_path).read(), ca_list)
else:
my_crl = None
latest_crl_pem = cls(ca_url=url).getCertificateRevocationList()
latest_crl = utils.load_crl(latest_crl_pem, ca_list)
if my_crl is None or latest_crl.signature != my_crl.signature:
with open(crl_path, 'w') as crl_file:
crl_file.write(latest_crl_pem)
return True
return False
Expose caucase REST API as pythonic methods.
"""
def __init__(self, ca_url, ca_crt_pem_list=None, user_key=None):
# XXX: set timeout to HTTP connections ?
http_url = urlparse(ca_url)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment