Commit 45baf54f authored by Vincent Pelletier's avatar Vincent Pelletier

http.manage: Add support for importing revocations.

Export is already provided by the regular protocol.
parent 47acf71f
......@@ -37,6 +37,7 @@ from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import pem
from . import exceptions
from . import utils
from .wsgi import Application
from .ca import CertificateAuthority, UserCertificateAuthority
......@@ -588,6 +589,16 @@ def manage(argv=None):
'use this unless you are certain you need it and it is safe for your '
'use-case.',
)
parser.add_argument(
'--import-crl',
default=[],
metavar='PEM_FILE',
action='append',
type=argparse.FileType('r'),
help='Import service revocation list. Corresponding CA certificate must '
'be already present in the database (including added in the same run '
'using --import-ca).',
)
parser.add_argument(
'--export-ca',
metavar='PEM_FILE',
......@@ -723,6 +734,32 @@ def manage(argv=None):
if not imported:
raise ValueError('No CA certificate imported')
print('Imported %i CA certificates' % imported)
if args.import_crl:
db = SQLite3Storage(db_path, table_prefix='cas')
trusted_ca_crt_set = [
utils.load_ca_certificate(x['crt_pem'])
for x in db.getCAKeyPairList()
]
latest_ca_not_after = max(
x.not_valid_after
for x in trusted_ca_crt_set
)
already_revoked_count = revoked_count = 0
for crl_file in args.import_crl:
for revoked in utils.load_crl(crl_file.read(), trusted_ca_crt_set):
try:
db.revoke(
revoked.serial_number,
latest_ca_not_after,
)
except exceptions.Found:
already_revoked_count += 1
else:
revoked_count += 1
print('Revoked %i certificates (%i were already revoked)' % (
revoked_count,
already_revoked_count,
))
if args.export_ca is not None:
encryption_algorithm = serialization.BestAvailableEncryption(
getBytePass('CA export passphrase: ')
......
......@@ -1625,9 +1625,23 @@ class CaucaseTest(unittest.TestCase):
Exercise CA export and import code.
"""
exported_ca = os.path.join(self._server_dir, 'exported.ca.pem')
user_key_path = self._createFirstUser()
# Create a service certificate...
service_key = self._createAndApproveCertificate(
user_key_path,
'service',
)
# ...and revoke it
# Note: Fetches CRL right after revoking certificate
self._runClient(
'--revoke-crt', service_key, service_key,
)
self._runClient()
getBytePass_orig = http.getBytePass
http.getBytePass = lambda x: 'test'
orig_stdout = sys.stdout
try:
http.getBytePass = lambda x: 'test'
sys.stdout = stdout = StringIO()
self.assertFalse(os.path.exists(exported_ca), exported_ca)
http.manage(
argv=(
......@@ -1642,10 +1656,19 @@ class CaucaseTest(unittest.TestCase):
argv=(
'--db', server_db2,
'--import-ca', exported_ca,
'--import-crl', self._client_crl,
),
)
self.assertTrue(os.path.exists(server_db2), server_db2)
self.assertEqual(
[
'Imported 1 CA certificates',
'Revoked 1 certificates (0 were already revoked)',
],
stdout.getvalue().splitlines(),
)
finally:
sys.stdout = orig_stdout
http.getBytePass = getBytePass_orig
def testWSGIBase(self):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment