Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
caucase
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Vincent Pelletier
caucase
Commits
809c04cd
Commit
809c04cd
authored
Jun 28, 2017
by
Alain Takoudjou
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
allow to set custom subject when signing certificate
parent
52d85d1e
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
131 additions
and
25 deletions
+131
-25
caucase/ca.py
caucase/ca.py
+44
-4
caucase/test/testca.py
caucase/test/testca.py
+69
-12
caucase/test/teststorage.py
caucase/test/teststorage.py
+2
-5
caucase/web.py
caucase/web.py
+16
-4
No files found.
caucase/ca.py
View file @
809c04cd
...
...
@@ -42,6 +42,22 @@ MIN_CA_RENEW_PERIOD = 2
DEFAULT_DIGEST_LIST
=
[
'sha256'
,
'sha384'
,
'sha512'
]
SUBJECT_KEY_LIST
=
[
'C'
,
'ST'
,
'L'
,
'OU'
,
'O'
,
'CN'
,
'emailAddress'
]
def
x509_name
(
**
attrs
):
"""
Return a new X509Name with the given attributes.
"""
# XXX There's no other way to get a new X509Name.
name
=
crypto
.
X509
().
get_subject
()
attrs
=
list
(
attrs
.
items
())
# Make the order stable - order matters!
def
key
(
attr
):
return
attr
[
1
]
attrs
.
sort
(
key
=
key
)
for
k
,
v
in
attrs
:
setattr
(
name
,
k
,
v
)
return
name
class
CertificateAuthority
(
object
):
def
__init__
(
self
,
storage
,
ca_life_period
,
ca_renew_period
,
crt_life_time
,
crl_renew_period
,
digest_list
=
None
,
...
...
@@ -210,7 +226,7 @@ class CertificateAuthority(object):
"""
return
self
.
_storage
.
getPendingCertificateRequestList
(
limit
,
with_data
)
def
createCertificate
(
self
,
csr_id
,
ca_key_pair
=
None
):
def
createCertificate
(
self
,
csr_id
,
ca_key_pair
=
None
,
subject_dict
=
None
):
"""
Generate new signed certificate. `ca_key_pair` is the CA key_pair to use
if None, use the latest CA key_pair
...
...
@@ -219,6 +235,8 @@ class CertificateAuthority(object):
new certificate (string).
@param ca_key_pair: The CA key_pair to used for signature. If None, the
latest key_pair is used.
@param subject_dict: dict of subject attributes to use in x509 subject,
if None, csr subject is used (dict).
"""
# Apply extensions (ex: "not a certificate", ...)
# Generate a certificate from the CSR
...
...
@@ -229,9 +247,22 @@ class CertificateAuthority(object):
# Certificate serial is the csr_id without extension .csr.pem
serial
=
int
(
csr_id
[:
-
8
],
16
)
subject
=
None
if
ca_key_pair
is
None
:
ca_key_pair
=
self
.
_ca_key_pairs_list
[
-
1
]
cert_pem
=
self
.
_generateCertificateObjects
(
ca_key_pair
,
csr_pem
,
serial
)
if
subject_dict
:
for
attr
in
subject_dict
.
keys
():
if
not
attr
in
SUBJECT_KEY_LIST
:
raise
ValueError
(
"Subject key %r is not allowed. Certificate subject "
\
"key should be one of %r"
%
(
attr
,
SUBJECT_KEY_LIST
))
if
subject_dict
.
has_key
(
'C'
)
and
len
(
subject_dict
[
'C'
])
!=
2
:
# Country code size is 2
raise
ValueError
(
"Country Code size in subject should be equal to 2."
)
subject
=
x509_name
(
**
subject_dict
)
cert_pem
=
self
.
_generateCertificateObjects
(
ca_key_pair
,
csr_pem
,
serial
,
subject
=
subject
)
crt_id
=
self
.
_storage
.
storeCertificate
(
csr_id
,
cert_pem
)
return
crt_id
...
...
@@ -442,12 +473,21 @@ class CertificateAuthority(object):
"""
return
crypto
.
dump_privatekey
(
crypto
.
FILETYPE_PEM
,
pkey_object
)
def
_generateCertificateObjects
(
self
,
ca_key_pair
,
req
,
serial
):
def
_generateCertificateObjects
(
self
,
ca_key_pair
,
req
,
serial
,
subject
=
None
):
"""
Generate certificate from CSR PEM Object.
This method set default certificate extensions, later will allow to set custom extensions
ca_key_pair: ca_key_pair which should be used to sign certificate
req: csr object to sign
serial: serial to apply to the new signed certificate
subject: give a dict containing new subject to apply on signed certificate
if subject is None, req.get_subject() is used.
"""
if
subject
is
None
:
subject
=
req
.
get_subject
()
# Here comes the actual certificate
cert
=
crypto
.
X509
()
# version v3
...
...
@@ -456,7 +496,7 @@ class CertificateAuthority(object):
cert
.
gmtime_adj_notBefore
(
0
)
cert
.
gmtime_adj_notAfter
(
self
.
crt_life_time
)
cert
.
set_issuer
(
ca_key_pair
[
'crt'
].
get_subject
())
cert
.
set_subject
(
req
.
get_subject
()
)
cert
.
set_subject
(
subject
)
cert
.
set_pubkey
(
req
.
get_pubkey
())
self
.
extension_manager
.
setDefaultExtensions
(
cert
,
...
...
caucase/test/testca.py
View file @
809c04cd
...
...
@@ -66,7 +66,7 @@ class CertificateAuthorityTest(unittest.TestCase):
auto_sign_csr
=
auto_sign_csr
)
def
generateCSR
(
self
,
cn
=
"toto.example.com"
):
def
generateCSR
(
self
,
cn
=
"toto.example.com"
,
email
=
"toto@example.com"
):
key
=
crypto
.
PKey
()
key
.
generate_key
(
crypto
.
TYPE_RSA
,
2048
)
...
...
@@ -78,7 +78,7 @@ class CertificateAuthorityTest(unittest.TestCase):
subject
.
L
=
"LOU"
subject
.
O
=
"OOU"
subject
.
OU
=
"OU"
subject
.
emailAddress
=
"toto@example.com"
subject
.
emailAddress
=
email
req
.
set_pubkey
(
key
)
utils
.
X509Extension
().
setDefaultCsrExtensions
(
req
)
req
.
sign
(
key
,
self
.
default_digest
)
...
...
@@ -226,6 +226,63 @@ m4DpuP4nL0ixQJWZuV+qrx6Tow==
cert
=
ca
.
getCertificate
(
cert_id
)
x509
=
crypto
.
load_certificate
(
crypto
.
FILETYPE_PEM
,
cert
)
self
.
assertTrue
(
utils
.
validateCertAndKey
(
x509
,
key
))
subj_dict
=
{
'CN'
:
'toto.example.com'
,
'C'
:
'CC'
,
'ST'
:
'ST'
,
'L'
:
'LOU'
,
'O'
:
'OOU'
,
'OU'
:
'OU'
,
'emailAddress'
:
'toto@example.com'
}
for
attr
in
[
'C'
,
'ST'
,
'L'
,
'OU'
,
'O'
,
'CN'
,
'emailAddress'
]:
self
.
assertEqual
(
getattr
(
x509
.
get_subject
(),
attr
),
subj_dict
[
attr
])
with
self
.
assertRaises
(
NotFound
):
ca
.
getPendingCertificateRequest
(
csr_id
)
def
test_createCertificate_custom_subject
(
self
):
ca
=
self
.
make_ca
(
190
)
csr
,
key
=
self
.
generateCSR
(
cn
=
"test certificate"
,
email
=
"some@test.com"
)
csr_id
=
ca
.
createCertificateSigningRequest
(
self
.
csr_tostring
(
csr
))
# sign certificate with default ca keypair
subject_dict
=
dict
(
CN
=
"real cn"
,
emailAddress
=
"caucase@email.com"
)
# sign certificate but change subject
cert_id
=
ca
.
createCertificate
(
csr_id
,
subject_dict
=
subject_dict
)
cert
=
ca
.
getCertificate
(
cert_id
)
x509
=
crypto
.
load_certificate
(
crypto
.
FILETYPE_PEM
,
cert
)
self
.
assertTrue
(
utils
.
validateCertAndKey
(
x509
,
key
))
self
.
assertEqual
(
x509
.
get_subject
().
CN
,
subject_dict
[
'CN'
])
self
.
assertEqual
(
x509
.
get_subject
().
emailAddress
,
subject_dict
[
'emailAddress'
])
# Others attributes are empty
for
attr
in
[
'C'
,
'ST'
,
'L'
,
'OU'
,
'O'
]:
self
.
assertEqual
(
getattr
(
x509
.
get_subject
(),
attr
),
None
)
with
self
.
assertRaises
(
NotFound
):
ca
.
getPendingCertificateRequest
(
csr_id
)
def
test_createCertificate_custom_subject2
(
self
):
ca
=
self
.
make_ca
(
190
)
csr
,
key
=
self
.
generateCSR
(
cn
=
"test certificate"
,
email
=
"some@test.com"
)
csr_id
=
ca
.
createCertificateSigningRequest
(
self
.
csr_tostring
(
csr
))
subject_dict
=
{
'CN'
:
'some.site.com'
,
'C'
:
'FR'
,
'ST'
:
'State'
,
'L'
:
'Localisation'
,
'O'
:
'My Organisation'
,
'OU'
:
'Organisation U'
,
'emailAddress'
:
'toto@example.com'
}
# sign certificate but change subject
cert_id
=
ca
.
createCertificate
(
csr_id
,
subject_dict
=
subject_dict
)
cert
=
ca
.
getCertificate
(
cert_id
)
x509
=
crypto
.
load_certificate
(
crypto
.
FILETYPE_PEM
,
cert
)
# certificate is still valid
self
.
assertTrue
(
utils
.
validateCertAndKey
(
x509
,
key
))
# check that all attributes are set
for
attr
in
[
'C'
,
'ST'
,
'L'
,
'OU'
,
'O'
]:
self
.
assertEqual
(
getattr
(
x509
.
get_subject
(),
attr
),
subject_dict
[
attr
])
with
self
.
assertRaises
(
NotFound
):
ca
.
getPendingCertificateRequest
(
csr_id
)
...
...
@@ -482,8 +539,8 @@ m4DpuP4nL0ixQJWZuV+qrx6Tow==
crl2_string
=
ca
.
getCertificateRevocationList
()
crl2
=
crypto
.
load_crl
(
crypto
.
FILETYPE_PEM
,
crl2_string
)
self
.
assertEquals
(
len
(
crl2
.
get_revoked
()),
1
)
serial
=
'0%s'
%
cert_2
.
get_serial_number
(
)
self
.
assertEquals
(
crl2
.
get_revoked
()[
0
].
get_serial
(),
serial
)
serial
=
utils
.
getSerialToInt
(
cert_2
)
self
.
assertEquals
(
crl2
.
get_revoked
()[
0
].
get_serial
(),
serial
.
upper
()
)
payload
=
dict
(
reason
=
""
,
...
...
@@ -498,9 +555,9 @@ m4DpuP4nL0ixQJWZuV+qrx6Tow==
self
.
assertEquals
(
len
(
crl3
.
get_revoked
()),
2
)
matches
=
0
for
revoked
in
crl3
.
get_revoked
():
if
revoked
.
get_serial
()
==
'0%s'
%
cert_3
.
get_serial_numb
er
():
if
revoked
.
get_serial
()
==
utils
.
getSerialToInt
(
cert_3
).
upp
er
():
matches
+=
1
elif
revoked
.
get_serial
()
==
'0%s'
%
cert_2
.
get_serial_numb
er
():
elif
revoked
.
get_serial
()
==
utils
.
getSerialToInt
(
cert_2
).
upp
er
():
matches
+=
1
self
.
assertEquals
(
matches
,
2
)
...
...
@@ -539,8 +596,8 @@ m4DpuP4nL0ixQJWZuV+qrx6Tow==
crl2_string
=
ca
.
getCertificateRevocationList
()
crl2
=
crypto
.
load_crl
(
crypto
.
FILETYPE_PEM
,
crl2_string
)
self
.
assertEquals
(
len
(
crl2
.
get_revoked
()),
1
)
serial
=
'0%s'
%
cert_2
.
get_serial_number
(
)
self
.
assertEquals
(
crl2
.
get_revoked
()[
0
].
get_serial
(),
serial
)
serial
=
utils
.
getSerialToInt
(
cert_2
)
self
.
assertEquals
(
crl2
.
get_revoked
()[
0
].
get_serial
(),
serial
.
upper
()
)
# wait until cert_2 expire
time
.
sleep
(
3
)
...
...
@@ -561,8 +618,8 @@ m4DpuP4nL0ixQJWZuV+qrx6Tow==
# cert_2 is not longer into crl (expired)
self
.
assertEquals
(
len
(
crl3
.
get_revoked
()),
1
)
serial
=
'0%s'
%
cert_3
.
get_serial_number
(
)
self
.
assertEquals
(
crl3
.
get_revoked
()[
0
].
get_serial
(),
serial
)
serial
=
utils
.
getSerialToInt
(
cert_3
)
self
.
assertEquals
(
crl3
.
get_revoked
()[
0
].
get_serial
(),
serial
.
upper
()
)
def
test_getCertificateRevocationList_with_validation
(
self
):
ca
=
self
.
make_ca
(
158
)
...
...
@@ -592,8 +649,8 @@ m4DpuP4nL0ixQJWZuV+qrx6Tow==
crl_string
=
ca
.
getCertificateRevocationList
()
crl
=
crypto
.
load_crl
(
crypto
.
FILETYPE_PEM
,
crl_string
)
self
.
assertEquals
(
len
(
crl
.
get_revoked
()),
1
)
serial
=
'0%s'
%
cert_2
.
get_serial_number
(
)
self
.
assertEquals
(
crl
.
get_revoked
()[
0
].
get_serial
(),
serial
)
serial
=
utils
.
getSerialToInt
(
cert_2
)
self
.
assertEquals
(
crl
.
get_revoked
()[
0
].
get_serial
(),
serial
.
upper
()
)
with
self
.
assertRaises
(
CertificateVerificationError
):
utils
.
verifyCertificateChain
(
cert_2
,
...
...
caucase/test/teststorage.py
View file @
809c04cd
...
...
@@ -29,6 +29,7 @@ from OpenSSL import crypto, SSL
from
caucase.exceptions
import
(
NoStorage
,
NotFound
,
Found
)
from
sqlite3
import
IntegrityError
from
caucase
import
utils
import
uuid
class
StorageTest
(
unittest
.
TestCase
):
...
...
@@ -113,7 +114,7 @@ class StorageTest(unittest.TestCase):
return
(
req
,
key
)
def
createCertificate
(
self
,
ca_key_pair
,
req
,
expire_sec
=
180
):
serial
=
self
.
_storage
.
getNextCertificateSerialNumber
()
serial
=
uuid
.
uuid1
().
int
cert
=
crypto
.
X509
()
# 3 = v3
cert
.
set_version
(
3
)
...
...
@@ -301,10 +302,6 @@ class StorageTest(unittest.TestCase):
# there is only on csr in the list
self
.
assertEquals
(
len
(
csr_list
),
1
)
def
test_getNextCertificateSerialNumber_empty
(
self
):
serial
=
self
.
_storage
.
getNextCertificateSerialNumber
()
self
.
assertEquals
(
serial
,
1
)
def
test_storeCertificate
(
self
):
keypair
=
self
.
createCAKeyPair
()
self
.
_storage
.
storeCAKeyPair
(
keypair
)
...
...
caucase/web.py
View file @
809c04cd
...
...
@@ -468,10 +468,10 @@ def get_cacert_json():
return
jsonify
(
ca_chain_list
)
def
signcert
(
csr_key
,
redirect_to
=
''
):
def
signcert
(
csr_key
,
subject_dict
=
None
,
redirect_to
=
''
):
try
:
cert_id
=
app
.
config
.
ca
.
createCertificate
(
csr_key
)
cert_id
=
app
.
config
.
ca
.
createCertificate
(
csr_key
,
subject_dict
=
subject_dict
)
except
NotFound
,
e
:
raise
FlaskException
(
"%s"
%
str
(
e
),
status_code
=
404
,
payload
=
{
"name"
:
"FileNotFound"
,
"code"
:
1
})
...
...
@@ -501,7 +501,15 @@ def sign_cert():
raise
FlaskException
(
"'csr_id' parameter is a mandatory parameter"
,
payload
=
{
"name"
:
"MissingParameter"
,
"code"
:
2
})
return
signcert
(
key
)
try
:
subject
=
request
.
form
.
get
(
'subject'
,
''
).
encode
(
'utf-8'
)
subject_dict
=
None
if
subject
:
subject_dict
=
json
.
loads
(
subject
)
return
signcert
(
key
,
subject_dict
=
subject_dict
)
except
ValueError
,
e
:
raise
FlaskException
(
str
(
e
),
payload
=
{
"name"
:
"FileFormat"
,
"code"
:
3
})
@
app
.
route
(
'/crt/renew'
,
methods
=
[
'PUT'
])
def
renew_cert
():
...
...
@@ -693,7 +701,11 @@ def do_signcert_web():
if
not
csr_id
:
raise
FlaskException
(
"'csr_id' parameter is a mandatory parameter"
,
payload
=
{
"name"
:
"MissingParameter"
,
"code"
:
2
})
return
signcert
(
csr_id
,
'manage_csr'
)
try
:
return
signcert
(
csr_id
,
subject_dict
=
None
,
redirect_to
=
'manage_csr'
)
except
ValueError
,
e
:
raise
FlaskException
(
str
(
e
),
payload
=
{
"name"
:
"FileFormat"
,
"code"
:
3
})
@
app
.
route
(
'/admin/deletecsr'
,
methods
=
[
'GET'
])
@
login_required
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment