Commit 9c772060 authored by Vincent Pelletier's avatar Vincent Pelletier

utils: Genericise getCertList and saveCertList.

So they can be reused for more PEM-encoded types.
parent 05ca7a95
......@@ -2917,13 +2917,17 @@ class CaucaseTest(unittest.TestCase):
self.assertTrue(os.path.exists(self._client_ca_crt))
self.assertTrue(os.path.isfile(self._client_ca_crt))
self.assertItemsEqual(utils.getCertList(self._client_ca_crt), [crt0_pem])
# Invalid file gets deleted
dummy_file_path = os.path.join(self._client_ca_dir, 'not_a_pem')
with open(dummy_file_path, 'wb') as dummy:
# Invalid file gets deleted only if it has expected extension (.ca.pem)
kept_file_path = os.path.join(self._client_ca_dir, 'not_a_ca.pem')
deleted_file_path = os.path.join(self._client_ca_dir, 'foo.ca.pem')
with open(kept_file_path, 'wb'), open(deleted_file_path, 'wb'):
pass
self.assertTrue(os.path.exists(dummy_file_path))
self.assertTrue(os.path.exists(kept_file_path))
self.assertTrue(os.path.exists(deleted_file_path))
utils.saveCertList(self._client_ca_dir, [crt0_pem])
self.assertFalse(os.path.exists(dummy_file_path))
self.assertTrue(os.path.exists(kept_file_path))
self.assertFalse(os.path.exists(deleted_file_path))
os.unlink(kept_file_path)
# Storing and loading multiple certificates
utils.saveCertList(self._client_ca_dir, [crt0_pem, crt1_pem])
crta, crtb = os.listdir(self._client_ca_dir)
......
......@@ -24,7 +24,7 @@ Caucase - Certificate Authority for Users, Certificate Authority for SErvices
Small-ish functions needed in many places.
"""
from __future__ import absolute_import, print_function
from binascii import a2b_base64, b2a_base64
from binascii import a2b_base64, b2a_base64, hexlify
import calendar
import codecs
from collections import defaultdict
......@@ -124,26 +124,25 @@ def _getPEMTypeDict(path, result=None):
def getCertList(crt_path):
"""
Return a list of certificates.
Raises if there is anything else than a certificate.
"""
if not os.path.exists(crt_path):
return _getPEMListFromPath(crt_path, pem.Certificate)
def _getPEMListFromPath(path, pem_type):
if not os.path.exists(path):
return []
if os.path.isdir(crt_path):
file_list = [os.path.join(crt_path, x) for x in os.listdir(crt_path)]
else:
file_list = [crt_path]
result = []
for file_name in file_list:
type_dict = _getPEMTypeDict(file_name)
crt_list = type_dict.pop(pem.Certificate)
if type_dict:
raise ValueError('%s contains more than just certificates' % (file_name, ))
result.extend(x.as_bytes() for x in crt_list)
return result
return [
pem_object.as_bytes()
for file_name in (
[os.path.join(path, x) for x in os.listdir(path)]
if os.path.isdir(path) else
[path]
)
for pem_object in _getPEMTypeDict(file_name).get(pem_type, ())
]
def saveCertList(crt_path, cert_pem_list):
"""
Store given list of PEm-encoded certificates in given path.
Store given list of PEM-encoded certificates in given path.
crt_path (str)
May point to a directory a file, or nothing.
......@@ -154,61 +153,70 @@ def saveCertList(crt_path, cert_pem_list):
cert_pem_list (list of bytes)
"""
if os.path.exists(crt_path):
if os.path.isfile(crt_path):
saveCertListTo = _saveCertListToFile
elif os.path.isdir(crt_path):
saveCertListTo = _saveCertListToDirectory
_savePEMList(crt_path, cert_pem_list, load_ca_certificate, '.ca.pem')
def _savePEMList(path, pem_list, pem_loader, extension):
if os.path.exists(path):
if os.path.isfile(path):
savePEMList = _savePEMListToFile
elif os.path.isdir(path):
savePEMList = _savePEMListToDirectory
else:
raise TypeError('%s exist and is neither a directory nor a file' % (
crt_path,
path,
))
else:
saveCertListTo = (
_saveCertListToFile
if os.path.splitext(crt_path)[1] else
_saveCertListToDirectory
savePEMList = (
_savePEMListToFile
if os.path.splitext(path)[1] else
_savePEMListToDirectory
)
saveCertListTo(crt_path, cert_pem_list)
def _saveCertListToFile(ca_crt_path, cert_pem_list):
with open(ca_crt_path, 'wb') as ca_crt_file:
ca_crt_file.write(b''.join(cert_pem_list))
def _saveCertListToDirectory(crt_dir, cert_pem_list):
if not os.path.exists(crt_dir):
os.mkdir(crt_dir)
ca_cert_dict = {
'%x.pem' % (load_ca_certificate(x).serial_number, ): x
for x in cert_pem_list
savePEMList(path, pem_list, pem_loader, extension)
def _savePEMListToFile(file_path, pem_list, pem_loader, extension):
_ = pem_loader # Silence pylint
_ = extension # Silence pylint
with open(file_path, 'wb') as pem_file:
for pem_chunk in pem_list:
pem_file.write(pem_chunk)
def _savePEMListToDirectory(dir_path, pem_list, pem_loader, extension):
if not os.path.exists(dir_path):
os.mkdir(dir_path)
pem_dict = {
hexlify(
pem_loader(x).extensions.get_extension_for_class(
x509.AuthorityKeyIdentifier,
).value.key_identifier,
).decode('ascii') + extension: x
for x in pem_list
}
for cert_filename in os.listdir(crt_dir):
ca_crt_path = os.path.join(crt_dir, cert_filename)
if not os.path.isfile(ca_crt_path):
# Not a file and not a symlink to a file, ignore
for filename in os.listdir(dir_path):
filepath = os.path.join(dir_path, filename)
if not filepath.endswith(extension) or not os.path.isfile(filepath):
# Not a managed file name and not a symlink to a file, ignore
continue
if not os.path.islink(ca_crt_path) and cert_filename in ca_cert_dict:
if not os.path.islink(filepath) and filename in pem_dict:
try:
# pylint: disable=unbalanced-tuple-unpacking
cert, = getCertList(ca_crt_path)
file_pem_item, = _getPEMTypeDict(filepath).itervalues()
# pylint: enable=unbalanced-tuple-unpacking
# pylint: disable=broad-except
except Exception:
# pylint: enable=broad-except
# Inconsistent content (multiple certificates, not CA certificates,
# ...): overwrite file
# File contains multiple PEM items: overwrite
pass
else:
if cert == ca_cert_dict[cert_filename]:
if file_pem_item == pem_dict[filename]:
# Already consistent, do not edit.
del ca_cert_dict[cert_filename]
del pem_dict[filename]
else:
# Unknown file (ex: expired certificate), or a symlink to a file: delete
os.unlink(ca_crt_path)
for cert_filename, cert_pem in ca_cert_dict.items():
ca_crt_path = os.path.join(crt_dir, cert_filename)
with open(ca_crt_path, 'wb') as ca_crt_file:
ca_crt_file.write(cert_pem)
os.unlink(filepath)
for filename, pem_item in pem_dict.iteritems():
filepath = os.path.join(dir_path, filename)
with open(filepath, 'wb') as pem_file:
pem_file.write(pem_item)
def getCert(crt_path):
"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment