Commit ce467e26 authored by Rafael Monnerat's avatar Rafael Monnerat

slapos_cloud: use cryptography library to extract certificate information

   The string (certificate) dont contains metadata text anymore only the certificate itself.
parent e79d358e
...@@ -24,7 +24,8 @@ from time import sleep ...@@ -24,7 +24,8 @@ from time import sleep
from zExceptions import Unauthorized from zExceptions import Unauthorized
from unittest import expectedFailure from unittest import expectedFailure
from Products.ERP5Type.Errors import UnsupportedWorkflowMethod from Products.ERP5Type.Errors import UnsupportedWorkflowMethod
from cryptography import x509
from cryptography.x509.oid import NameOID
class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin): class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
...@@ -60,10 +61,15 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin): ...@@ -60,10 +61,15 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
certificate_login = certificate_login_list[0] certificate_login = certificate_login_list[0]
self.assertEqual(certificate_login.getValidationState(), 'validated') self.assertEqual(certificate_login.getValidationState(), 'validated')
self.assertNotEqual(certificate_login.getReference(), None) self.assertNotEqual(certificate_login.getReference(), None)
self.assertNotEqual(certificate_login.getDestinationReference(), None) self.assertNotEqual(certificate_login.getSourceReference(), None)
serial = '0x%x' % int(certificate_login.getDestinationReference(), 16)
self.assertIn(serial, compute_node_certificate) ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertIn(certificate_login.getReference(), compute_node_certificate.decode('string_escape')) self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn)
# TODO: Should we check for csr_id
#self.assertTrue(certificate_login.getSourceReference() in compute_node_certificate)
def test_generateCertificate_twice(self): def test_generateCertificate_twice(self):
self.login(self.compute_node.getUserId()) self.login(self.compute_node.getUserId())
...@@ -72,18 +78,23 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin): ...@@ -72,18 +78,23 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
compute_node_certificate = self.portal.REQUEST.get('compute_node_certificate') compute_node_certificate = self.portal.REQUEST.get('compute_node_certificate')
self.assertNotEqual(None, compute_node_key) self.assertNotEqual(None, compute_node_key)
self.assertNotEqual(None, compute_node_certificate) self.assertNotEqual(None, compute_node_certificate)
self.assertEqual(None, self.compute_node.getDestinationReference()) self.assertEqual(None, self.compute_node.getSourceReference())
certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login") certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login")
self.assertEqual(len(certificate_login_list), 1) self.assertEqual(len(certificate_login_list), 1)
certificate_login = certificate_login_list[0] certificate_login = certificate_login_list[0]
self.assertEqual(certificate_login.getValidationState(), 'validated') self.assertEqual(certificate_login.getValidationState(), 'validated')
self.assertNotEqual(certificate_login.getReference(), None) self.assertNotEqual(certificate_login.getReference(), None)
self.assertNotEqual(certificate_login.getDestinationReference(), None) self.assertNotEqual(certificate_login.getSourceReference(), None)
serial = '0x%x' % int(certificate_login.getDestinationReference(), 16)
self.assertIn(serial, compute_node_certificate) ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertIn(certificate_login.getReference(), compute_node_certificate.decode('string_escape')) self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn)
# TODO: Should we check for csr_id
#self.assertTrue(certificate_login.getSourceReference() in compute_node_certificate)
self.assertRaises(ValueError, self.compute_node.generateCertificate) self.assertRaises(ValueError, self.compute_node.generateCertificate)
self.assertEqual(None, self.portal.REQUEST.get('compute_node_key')) self.assertEqual(None, self.portal.REQUEST.get('compute_node_key'))
self.assertEqual(None, self.portal.REQUEST.get('compute_node_certificate')) self.assertEqual(None, self.portal.REQUEST.get('compute_node_certificate'))
...@@ -272,10 +283,16 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin): ...@@ -272,10 +283,16 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
certificate_login = certificate_login_list[0] certificate_login = certificate_login_list[0]
self.assertEqual(certificate_login.getValidationState(), 'validated') self.assertEqual(certificate_login.getValidationState(), 'validated')
self.assertNotEqual(certificate_login.getReference(), None) self.assertNotEqual(certificate_login.getReference(), None)
self.assertNotEqual(certificate_login.getDestinationReference(), None) self.assertNotEqual(certificate_login.getSourceReference(), None)
serial = '0x%x' % int(certificate_login.getDestinationReference(), 16)
self.assertIn(serial, compute_node_certificate) ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertIn(certificate_login.getReference(), compute_node_certificate.decode('string_escape')) self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn)
# TODO: Should we check for csr_id
#self.assertTrue(certificate_login.getSourceReference() in compute_node_certificate)
self.assertNotEqual(certificate_login.getReference(), self.assertNotEqual(certificate_login.getReference(),
self.compute_node.getReference()) self.compute_node.getReference())
...@@ -290,7 +307,7 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin): ...@@ -290,7 +307,7 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
self.assertRaises(ValueError, self.compute_node.revokeCertificate) self.assertRaises(ValueError, self.compute_node.revokeCertificate)
self.assertEqual(None, self.portal.REQUEST.get('compute_node_key')) self.assertEqual(None, self.portal.REQUEST.get('compute_node_key'))
self.assertEqual(None, self.portal.REQUEST.get('compute_node_certificate')) self.assertEqual(None, self.portal.REQUEST.get('compute_node_certificate'))
self.assertEqual(None, self.compute_node.getDestinationReference()) self.assertEqual(None, self.compute_node.getSourceReference())
certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login") certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login")
self.assertEqual(len(certificate_login_list), 0) self.assertEqual(len(certificate_login_list), 0)
...@@ -306,13 +323,18 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin): ...@@ -306,13 +323,18 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
certificate_login = certificate_login_list[0] certificate_login = certificate_login_list[0]
self.assertEqual(certificate_login.getValidationState(), 'validated') self.assertEqual(certificate_login.getValidationState(), 'validated')
self.assertNotEqual(certificate_login.getReference(), None) self.assertNotEqual(certificate_login.getReference(), None)
self.assertNotEqual(certificate_login.getDestinationReference(), None) self.assertNotEqual(certificate_login.getSourceReference(), None)
serial = '0x%x' % int(certificate_login.getDestinationReference(), 16)
self.assertIn(serial, compute_node_certificate)
self.assertIn(certificate_login.getReference(), compute_node_certificate.decode('string_escape'))
self.assertNotEqual(certificate_login.getReference(), self.assertNotEqual(certificate_login.getReference(),
self.compute_node.getReference()) self.compute_node.getReference())
ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn)
# TODO: Should we check for csr_id
#self.assertTrue(certificate_login.getSourceReference() in compute_node_certificate)
self.compute_node.revokeCertificate() self.compute_node.revokeCertificate()
self.assertEqual(None, self.portal.REQUEST.get('compute_node_key')) self.assertEqual(None, self.portal.REQUEST.get('compute_node_key'))
self.assertEqual(None, self.portal.REQUEST.get('compute_node_certificate')) self.assertEqual(None, self.portal.REQUEST.get('compute_node_certificate'))
...@@ -337,17 +359,22 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin): ...@@ -337,17 +359,22 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login") certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login")
self.assertEqual(len(certificate_login_list), 1) self.assertEqual(len(certificate_login_list), 1)
certificate_login = certificate_login_list[0] certificate_login = certificate_login_list[0]
destination_reference = certificate_login.getDestinationReference() source_reference = certificate_login.getSourceReference()
self.assertEqual(certificate_login.getValidationState(), 'validated') self.assertEqual(certificate_login.getValidationState(), 'validated')
self.assertNotEqual(certificate_login.getReference(), None) self.assertNotEqual(certificate_login.getReference(), None)
self.assertNotEqual(certificate_login.getDestinationReference(), None) self.assertNotEqual(certificate_login.getSourceReference(), None)
serial = '0x%x' % int(certificate_login.getDestinationReference(), 16)
self.assertIn(serial, compute_node_certificate)
self.assertIn(certificate_login.getReference(), compute_node_certificate.decode('string_escape'))
self.assertNotEqual(certificate_login.getReference(), self.assertNotEqual(certificate_login.getReference(),
self.compute_node.getReference()) self.compute_node.getReference())
self.assertNotEqual(None, destination_reference)
ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn)
# TODO: Should we check for csr_id
#self.assertTrue(certificate_login.getSourceReference() in compute_node_certificate)
self.assertNotEqual(None, source_reference)
self.compute_node.revokeCertificate() self.compute_node.revokeCertificate()
self.compute_node.generateCertificate() self.compute_node.generateCertificate()
...@@ -358,7 +385,7 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin): ...@@ -358,7 +385,7 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
self.assertNotEqual(compute_node_certificate, self.portal.REQUEST.get('compute_node_certificate')) self.assertNotEqual(compute_node_certificate, self.portal.REQUEST.get('compute_node_certificate'))
self.assertEqual(certificate_login.getValidationState(), 'invalidated') self.assertEqual(certificate_login.getValidationState(), 'invalidated')
self.assertEqual(certificate_login.getDestinationReference(), destination_reference) self.assertEqual(certificate_login.getSourceReference(), source_reference)
self.assertNotEqual(certificate_login.getReference(), None) self.assertNotEqual(certificate_login.getReference(), None)
certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login") certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login")
...@@ -366,21 +393,25 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin): ...@@ -366,21 +393,25 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
new_certificate_login = [i for i in certificate_login_list \ new_certificate_login = [i for i in certificate_login_list \
if i.getId() != certificate_login.getId()][0] if i.getId() != certificate_login.getId()][0]
destination_reference = certificate_login.getDestinationReference() source_reference = certificate_login.getSourceReference()
self.assertEqual(new_certificate_login.getValidationState(), 'validated') self.assertEqual(new_certificate_login.getValidationState(), 'validated')
self.assertNotEqual(new_certificate_login.getReference(), None) self.assertNotEqual(new_certificate_login.getReference(), None)
self.assertNotEqual(new_certificate_login.getReference(), self.assertNotEqual(new_certificate_login.getReference(),
certificate_login.getReference()) certificate_login.getReference())
self.assertNotEqual(new_certificate_login.getDestinationReference(), None) self.assertNotEqual(new_certificate_login.getSourceReference(), None)
self.assertNotEqual(new_certificate_login.getDestinationReference(), self.assertNotEqual(new_certificate_login.getSourceReference(),
certificate_login.getDestinationReference()) certificate_login.getSourceReference())
serial = '0x%x' % int(new_certificate_login.getDestinationReference(), 16)
compute_node_certificate = self.portal.REQUEST.get('compute_node_certificate') compute_node_certificate = self.portal.REQUEST.get('compute_node_certificate')
self.assertIn(serial, compute_node_certificate)
self.assertIn(new_certificate_login.getReference(), compute_node_certificate.decode('string_escape')) ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertNotIn(certificate_login.getReference(), compute_node_certificate.decode('string_escape')) self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(new_certificate_login.getReference().decode("UTF-8"), cn)
self.assertNotEqual(certificate_login.getReference().decode("UTF-8"), cn)
# TODO: Should we check for csr_id
#self.assertIn(certificate_login.getSourceReference(), compute_node_certificate)
self.assertNotEqual(certificate_login.getReference(), self.assertNotEqual(certificate_login.getReference(),
self.compute_node.getReference()) self.compute_node.getReference())
...@@ -396,17 +427,19 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin): ...@@ -396,17 +427,19 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login") certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login")
self.assertEqual(len(certificate_login_list), 1) self.assertEqual(len(certificate_login_list), 1)
certificate_login = certificate_login_list[0] certificate_login = certificate_login_list[0]
destination_reference = certificate_login.getDestinationReference() source_reference = certificate_login.getSourceReference()
self.assertEqual(certificate_login.getValidationState(), 'validated') self.assertEqual(certificate_login.getValidationState(), 'validated')
self.assertNotEqual(certificate_login.getReference(), None) self.assertNotEqual(certificate_login.getReference(), None)
self.assertNotEqual(certificate_login.getDestinationReference(), None) self.assertNotEqual(certificate_login.getSourceReference(), None)
serial = '0x%x' % int(certificate_login.getDestinationReference(), 16)
self.assertIn(serial, compute_node_certificate) ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertIn(certificate_login.getReference(), compute_node_certificate.decode('string_escape')) self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn)
self.assertNotEqual(certificate_login.getReference(), self.assertNotEqual(certificate_login.getReference(),
self.compute_node.getReference()) self.compute_node.getReference())
self.assertNotEqual(None, destination_reference) self.assertNotEqual(None, source_reference)
self.compute_node.revokeCertificate() self.compute_node.revokeCertificate()
self.compute_node.generateCertificate() self.compute_node.generateCertificate()
...@@ -417,7 +450,7 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin): ...@@ -417,7 +450,7 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
self.assertNotEqual(compute_node_certificate, self.portal.REQUEST.get('compute_node_certificate')) self.assertNotEqual(compute_node_certificate, self.portal.REQUEST.get('compute_node_certificate'))
self.assertEqual(certificate_login.getValidationState(), 'invalidated') self.assertEqual(certificate_login.getValidationState(), 'invalidated')
self.assertEqual(certificate_login.getDestinationReference(), destination_reference) self.assertEqual(certificate_login.getSourceReference(), source_reference)
self.assertNotEqual(certificate_login.getReference(), None) self.assertNotEqual(certificate_login.getReference(), None)
certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login") certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login")
...@@ -425,22 +458,22 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin): ...@@ -425,22 +458,22 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
new_certificate_login = [i for i in certificate_login_list \ new_certificate_login = [i for i in certificate_login_list \
if i.getId() != certificate_login.getId()][0] if i.getId() != certificate_login.getId()][0]
destination_reference = certificate_login.getDestinationReference() source_reference = certificate_login.getSourceReference()
self.assertEqual(new_certificate_login.getValidationState(), 'validated') self.assertEqual(new_certificate_login.getValidationState(), 'validated')
self.assertNotEqual(new_certificate_login.getReference(), None) self.assertNotEqual(new_certificate_login.getReference(), None)
self.assertNotEqual(new_certificate_login.getReference(), self.assertNotEqual(new_certificate_login.getReference(),
certificate_login.getReference()) certificate_login.getReference())
self.assertNotEqual(new_certificate_login.getDestinationReference(), None) self.assertNotEqual(new_certificate_login.getSourceReference(), None)
self.assertNotEqual(new_certificate_login.getDestinationReference(), self.assertNotEqual(new_certificate_login.getSourceReference(),
certificate_login.getDestinationReference()) certificate_login.getSourceReference())
serial = '0x%x' % int(new_certificate_login.getDestinationReference(), 16)
compute_node_certificate = self.portal.REQUEST.get('compute_node_certificate') compute_node_certificate = self.portal.REQUEST.get('compute_node_certificate')
self.assertIn(serial, compute_node_certificate)
self.assertIn(new_certificate_login.getReference(), compute_node_certificate.decode('string_escape')) ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertNotIn(certificate_login.getReference(), compute_node_certificate.decode('string_escape')) self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(new_certificate_login.getReference().decode("UTF-8"), cn)
self.assertNotEqual(certificate_login.getReference().decode("UTF-8"), cn)
self.assertNotEqual(certificate_login.getReference(), self.assertNotEqual(certificate_login.getReference(),
self.compute_node.getReference()) self.compute_node.getReference())
...@@ -453,7 +486,7 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin): ...@@ -453,7 +486,7 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
self.assertNotEqual(compute_node_certificate, self.portal.REQUEST.get('compute_node_certificate')) self.assertNotEqual(compute_node_certificate, self.portal.REQUEST.get('compute_node_certificate'))
self.assertEqual(new_certificate_login.getValidationState(), 'invalidated') self.assertEqual(new_certificate_login.getValidationState(), 'invalidated')
self.assertNotEqual(new_certificate_login.getDestinationReference(), destination_reference) self.assertNotEqual(new_certificate_login.getSourceReference(), source_reference)
self.assertNotEqual(new_certificate_login.getReference(), None) self.assertNotEqual(new_certificate_login.getReference(), None)
certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login") certificate_login_list = self.compute_node.objectValues(portal_type="Certificate Login")
...@@ -462,22 +495,23 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin): ...@@ -462,22 +495,23 @@ class TestSlapOSCoreComputeNodeSlapInterfaceWorkflow(SlapOSTestCaseMixin):
third_certificate_login = [i for i in certificate_login_list \ third_certificate_login = [i for i in certificate_login_list \
if i.getId() not in [certificate_login.getId(), new_certificate_login.getId()]][0] if i.getId() not in [certificate_login.getId(), new_certificate_login.getId()]][0]
destination_reference = new_certificate_login.getDestinationReference() source_reference = new_certificate_login.getSourceReference()
self.assertEqual(third_certificate_login.getValidationState(), 'validated') self.assertEqual(third_certificate_login.getValidationState(), 'validated')
self.assertNotEqual(third_certificate_login.getReference(), None) self.assertNotEqual(third_certificate_login.getReference(), None)
self.assertNotEqual(third_certificate_login.getReference(), self.assertNotEqual(third_certificate_login.getReference(),
certificate_login.getReference()) certificate_login.getReference())
self.assertNotEqual(third_certificate_login.getDestinationReference(), None) self.assertNotEqual(third_certificate_login.getSourceReference(), None)
self.assertNotEqual(third_certificate_login.getDestinationReference(), self.assertNotEqual(third_certificate_login.getSourceReference(),
new_certificate_login.getDestinationReference()) new_certificate_login.getSourceReference())
serial = '0x%x' % int(third_certificate_login.getDestinationReference(), 16)
compute_node_certificate = self.portal.REQUEST.get('compute_node_certificate') compute_node_certificate = self.portal.REQUEST.get('compute_node_certificate')
self.assertIn(serial, compute_node_certificate)
self.assertIn(third_certificate_login.getReference(), compute_node_certificate.decode('string_escape')) ssl_certificate = x509.load_pem_x509_certificate(compute_node_certificate)
self.assertNotIn(new_certificate_login.getReference(), compute_node_certificate.decode('string_escape')) self.assertEqual(len(ssl_certificate.subject), 2)
cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(third_certificate_login.getReference().decode("UTF-8"), cn)
self.assertNotEqual(new_certificate_login.getReference().decode("UTF-8"), cn)
self.assertNotEqual(third_certificate_login.getReference(), self.assertNotEqual(third_certificate_login.getReference(),
self.compute_node.getReference()) self.compute_node.getReference())
......
...@@ -24,10 +24,12 @@ from erp5.component.document.SoftwareInstance import SoftwareInstance, \ ...@@ -24,10 +24,12 @@ from erp5.component.document.SoftwareInstance import SoftwareInstance, \
import transaction import transaction
from time import sleep from time import sleep
from zExceptions import Unauthorized from zExceptions import Unauthorized
from cryptography import x509
from cryptography.x509.oid import NameOID
class TestSlapOSCoreInstanceSlapInterfaceWorkflow(SlapOSTestCaseMixin): class TestSlapOSCoreInstanceSlapInterfaceWorkflow(SlapOSTestCaseMixin):
"""Tests instance.requestInstance""" """Tests instance.requestInstance"""
launch_caucase = 1 launch_caucase = 1
def afterSetUp(self): def afterSetUp(self):
...@@ -1330,7 +1332,6 @@ class TestSlapOSCoreInstanceSlapInterfaceWorkflowTransfer(SlapOSTestCaseMixin): ...@@ -1330,7 +1332,6 @@ class TestSlapOSCoreInstanceSlapInterfaceWorkflowTransfer(SlapOSTestCaseMixin):
self.software_instance.generateCertificate() self.software_instance.generateCertificate()
self.assertNotEqual(self.software_instance.getSslKey(), None) self.assertNotEqual(self.software_instance.getSslKey(), None)
self.assertNotEqual(self.software_instance.getSslCertificate(), None) self.assertNotEqual(self.software_instance.getSslCertificate(), None)
self.assertEqual(self.software_instance.getDestinationReference(), None)
certificate_login_list = self.software_instance.objectValues(portal_type="Certificate Login") certificate_login_list = self.software_instance.objectValues(portal_type="Certificate Login")
self.assertEqual(len(certificate_login_list), 1) self.assertEqual(len(certificate_login_list), 1)
...@@ -1338,11 +1339,12 @@ class TestSlapOSCoreInstanceSlapInterfaceWorkflowTransfer(SlapOSTestCaseMixin): ...@@ -1338,11 +1339,12 @@ class TestSlapOSCoreInstanceSlapInterfaceWorkflowTransfer(SlapOSTestCaseMixin):
self.assertEqual(certificate_login.getValidationState(), 'validated') self.assertEqual(certificate_login.getValidationState(), 'validated')
self.assertNotEqual(certificate_login.getReference(), None) self.assertNotEqual(certificate_login.getReference(), None)
self.assertNotEqual(certificate_login.getDestinationReference(), None)
serial = '0x%x' % int(certificate_login.getDestinationReference(), 16) self.assertNotEqual(certificate_login.getSourceReference(), None)
self.assertIn(serial, self.software_instance.getSslCertificate()) ssl_certificate = x509.load_pem_x509_certificate(self.software_instance.getSslCertificate())
self.assertIn(certificate_login.getReference(), \ self.assertEqual(len(ssl_certificate.subject), 2)
self.software_instance.getSslCertificate().decode('string_escape')) cn = [i.value for i in ssl_certificate.subject if i.oid == NameOID.COMMON_NAME][0]
self.assertEqual(certificate_login.getReference().decode("UTF-8"), cn)
self.assertRaises(ValueError, self.software_instance.generateCertificate) self.assertRaises(ValueError, self.software_instance.generateCertificate)
def test_revokeCertificate(self): def test_revokeCertificate(self):
...@@ -1371,7 +1373,7 @@ class TestSlapOSCoreInstanceSlapInterfaceWorkflowTransfer(SlapOSTestCaseMixin): ...@@ -1371,7 +1373,7 @@ class TestSlapOSCoreInstanceSlapInterfaceWorkflowTransfer(SlapOSTestCaseMixin):
certificate_login = certificate_login_list[0] certificate_login = certificate_login_list[0]
self.assertEqual(certificate_login.getValidationState(), 'validated') self.assertEqual(certificate_login.getValidationState(), 'validated')
self.assertNotEqual(certificate_login.getReference(), None) self.assertNotEqual(certificate_login.getReference(), None)
self.assertNotEqual(certificate_login.getDestinationReference(), None) self.assertNotEqual(certificate_login.getSourceReference(), None)
self.assertNotEqual(self.software_instance.getSslKey(), self.assertNotEqual(self.software_instance.getSslKey(),
ssl_key) ssl_key)
...@@ -1398,11 +1400,11 @@ class TestSlapOSCoreInstanceSlapInterfaceWorkflowTransfer(SlapOSTestCaseMixin): ...@@ -1398,11 +1400,11 @@ class TestSlapOSCoreInstanceSlapInterfaceWorkflowTransfer(SlapOSTestCaseMixin):
self.assertEqual(another_certificate_login.getValidationState(), 'validated') self.assertEqual(another_certificate_login.getValidationState(), 'validated')
self.assertNotEqual(another_certificate_login.getReference(), None) self.assertNotEqual(another_certificate_login.getReference(), None)
self.assertNotEqual(another_certificate_login.getDestinationReference(), None) self.assertNotEqual(another_certificate_login.getSourceReference(), None)
self.assertEqual(certificate_login.getValidationState(), 'invalidated') self.assertEqual(certificate_login.getValidationState(), 'invalidated')
self.assertNotEqual(certificate_login.getReference(), self.assertNotEqual(certificate_login.getReference(),
another_certificate_login.getReference()) another_certificate_login.getReference())
self.assertNotEqual(certificate_login.getDestinationReference(), self.assertNotEqual(certificate_login.getSourceReference(),
another_certificate_login.getDestinationReference()) another_certificate_login.getSourceReference())
...@@ -618,7 +618,7 @@ class TestPerson_get_Certificate(TestSlapOSHalJsonStyleMixin): ...@@ -618,7 +618,7 @@ class TestPerson_get_Certificate(TestSlapOSHalJsonStyleMixin):
self.assertSameSet(response_dict.keys(), ["common_name", "certificate", "id", "key"]) self.assertSameSet(response_dict.keys(), ["common_name", "certificate", "id", "key"])
self.assertEqual(response_dict["id"], login.getDestinationReference()) self.assertEqual(response_dict["id"], login.getSourceReference())
self.assertEqual(json.dumps(response_dict["common_name"]), json.dumps(login.getReference())) self.assertEqual(json.dumps(response_dict["common_name"]), json.dumps(login.getReference()))
self.assertEqual(self.portal.REQUEST.RESPONSE.getStatus(), 200) self.assertEqual(self.portal.REQUEST.RESPONSE.getStatus(), 200)
...@@ -632,7 +632,7 @@ class TestPerson_get_Certificate(TestSlapOSHalJsonStyleMixin): ...@@ -632,7 +632,7 @@ class TestPerson_get_Certificate(TestSlapOSHalJsonStyleMixin):
self.assertEqual("validated" , login.getValidationState()) self.assertEqual("validated" , login.getValidationState())
self.assertEqual("validated" , new_login.getValidationState()) self.assertEqual("validated" , new_login.getValidationState())
self.assertNotEqual(login.getReference(), new_login.getReference()) self.assertNotEqual(login.getReference(), new_login.getReference())
self.assertNotEqual(login.getDestinationReference(), new_login.getDestinationReference()) self.assertNotEqual(login.getSourceReference(), new_login.getSourceReference())
self.assertSameSet(new_response_dict.keys(), ["common_name", "certificate", "id", "key"]) self.assertSameSet(new_response_dict.keys(), ["common_name", "certificate", "id", "key"])
self.assertEqual(json.dumps(new_response_dict["common_name"]), json.dumps(new_login.getReference())) self.assertEqual(json.dumps(new_response_dict["common_name"]), json.dumps(new_login.getReference()))
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment