Commit bf6a336a authored by Vincent Pelletier's avatar Vincent Pelletier

{ca,test}: Extend backup tests.

Test backup chunk boundaries.
Test absence of a backup before the first user is created.
parent 534a6657
......@@ -62,6 +62,8 @@ _SUBJECT_OID_DICT = {
_BACKUP_MAGIC = b'caucase\0'
_CONFIG_NAME_AUTO_SIGN_CSR_AMOUNT = 'auto_sign_csr_amount'
DEFAULT_BACKUP_SYMETRIC_CIPHER = 'aes256_cbc_pkcs7_hmac_10M_sha256'
def Extension(value, critical):
"""
Avoid oid redundant parameter when creating an extension.
......@@ -776,6 +778,12 @@ class UserCertificateAuthority(CertificateAuthority):
See backup schema in documentation.
"""
# Note to developers: dict structure can and will change
__backup_format_dict = {
'aes256_cbc_pkcs7_hmac_10M_sha256': 10 * 1024 * 1024,
# For tests only, to exercise block boundaries
'TESTS_INTERNAL_USE_ONLY': 32,
}
def doBackup(self, write):
"""
......@@ -800,7 +808,8 @@ class UserCertificateAuthority(CertificateAuthority):
hashes.SHA256(),
backend=_cryptography_backend,
)
HMAC_PAYLOAD_SIZE = 10 * 1024 * 1024
symetric_cipher_name = DEFAULT_BACKUP_SYMETRIC_CIPHER
HMAC_PAYLOAD_SIZE = self.__backup_format_dict[symetric_cipher_name]
key_list = []
for crt_pem in self._storage.iterCertificates():
try:
......@@ -829,7 +838,7 @@ class UserCertificateAuthority(CertificateAuthority):
return False
header = utils.toBytes(json.dumps({
'cipher': {
'name': 'aes256_cbc_pkcs7_hmac_10M_sha256',
'name': symetric_cipher_name,
'parameter': utils.toUnicode(hexlify(iv)),
},
'key_list': key_list,
......@@ -884,7 +893,10 @@ class UserCertificateAuthority(CertificateAuthority):
read(struct.calcsize('<I')),
)
header = json.loads(read(header_len))
if header['cipher']['name'] != 'aes256_cbc_pkcs7_hmac_10M_sha256':
symetric_cipher_name = header['cipher']['name']
try:
HMAC_PAYLOAD_SIZE = cls.__backup_format_dict[symetric_cipher_name]
except KeyError:
raise ValueError('Unrecognised symetric cipher')
private_key = utils.load_privatekey(key_pem)
key_id = hexlify(x509.SubjectKeyIdentifier.from_public_key(
......@@ -921,7 +933,6 @@ class UserCertificateAuthority(CertificateAuthority):
hashes.SHA256(),
backend=_cryptography_backend,
)
HMAC_PAYLOAD_SIZE = 10 * 1024 * 1024
# Each block has its signature
HMAC_SIGNED_SIZE = HMAC_PAYLOAD_SIZE + 32
CHUNK_SIZE = HMAC_SIGNED_SIZE
......
......@@ -53,6 +53,7 @@ from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from caucase import cli
import caucase.ca
from caucase.ca import Extension
from caucase.client import CaucaseError, CaucaseClient
from caucase.exceptions import CertificateVerificationError
......@@ -2243,6 +2244,14 @@ class CaucaseTest(unittest.TestCase):
Exercise backup generation and restoration.
"""
backup_glob = os.path.join(self._server_backup_path, '*.sql.caucased')
self._stopServer()
self._startServer('--backup-directory', self._server_backup_path)
# XXX: waiting for nothing to happen
time.sleep(1)
# No backup must have been created, as no user exists.
self.assertFalse(glob.glob(backup_glob))
user_key_path = self._createFirstUser()
user2_key_path = self._createAndApproveCertificate(
user_key_path,
......@@ -2372,7 +2381,16 @@ class CaucaseTest(unittest.TestCase):
)
self.assertItemsEqual(before_backup, after_restore)
self._startServer('--backup-directory', self._server_backup_path)
DEFAULT_BACKUP_SYMETRIC_CIPHER_orig = (
caucase.ca.DEFAULT_BACKUP_SYMETRIC_CIPHER
)
try:
caucase.ca.DEFAULT_BACKUP_SYMETRIC_CIPHER = 'TESTS_INTERNAL_USE_ONLY'
self._startServer('--backup-directory', self._server_backup_path)
finally:
caucase.ca.DEFAULT_BACKUP_SYMETRIC_CIPHER = (
DEFAULT_BACKUP_SYMETRIC_CIPHER_orig
)
# user2 got a new certificate matching their new key
utils.getKeyPair(user2_new_key_path)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment