Commit e7636b86 authored by zhifan huang's avatar zhifan huang Committed by zhifan huang

test: add unit tests for registry, conf, tunnel

registry: add test for RegistryServer and registryClient,
testRegistryServer mainly test methods concerned to http rpc request.
Other methods that call request_dump like getBootStrpPeer is not
include.
testRegistryClient test the rpc call with or not with "cn" parameter.

cli.conf: test each situation call the cli conf.

tunnel: add test for BaseTunnelManager, MultGatewayManager

tools is a util cotain method to make cert and ket files
parent 308eaa7f
__all__ = ["test_unit"]
-----BEGIN DH PARAMETERS-----
MIIBCAKCAQEAoXvAhNiPPi9WTYjDhkrLfSGV7lQdAnKJHohSdsR85SdH8u9whvlb
a4Jt2aEJCFqL1LMziF8Dy3ipcUe/xYbmZJ+w1wNAnuzzeJWH5z57duZy6jPvPxsW
uLTsnjlUn+nYG7vrkmWEqgzQDLY2aV9maPREmqAvxorIdffWXKsh6wyBhVuOghC/
8pqxDY5C+VewBMkqibZnNtQ8IWMw+6SmPKx4bLA44P1VlfpF+3VBNgs3JD26djFX
vJs+Pd4+j0GM2hPlxTSIB12cKSiDix0YXdHrVQtnatsnG3wyzVSTKbvxQRMEFM1X
BnUtiqlB3IlGCg6RWXRGcdEZ50blLQZ0kwIBAg==
-----END DH PARAMETERS-----
{
"verbose": 1,
"ca": "root.crt",
"port": 9090,
"anonymous_prefix_length": null,
"smtp_pwd": null,
"client_count": 10,
"authorized_origin": [
"127.0.0.1",
"::1"
],
"bind6": "::",
"ipv4": null,
"prefix_length": 16,
"min_protocol": 1,
"smtp_starttls": false,
"run": "run",
"bind4": "0.0.0.0",
"dh": "dh2048.pem",
"db": "registry.db",
"mailhost": "miku@miku.com",
"key": "registry.key",
"encrypt": false,
"logfile": "registry.log",
"max_clients": null,
"smtp_user": null,
"same_country": null,
"tunnel_refresh": 300,
"hello": 15
}
\ No newline at end of file
-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEA2yS7SlkxfMc8ydVbFPv8kZFQQp1dyda4Dt41P3+klO01TE0k
JY25pvpBcYFz3tbADB/m6zJlvuA9eVTy7cxodioRaYiUCZjgUbXZ2/BziUIGygDL
WcYbvf0A1aX7qGYPx1cLwLxvwZlHa+tVB/MKgRDC+L/qRAftRc6+uMrdjddUaKbC
B2k8GPxUWPpKeSB4ymXDXeuey0nHZ7KgjHqRcHKVEK24eBR9NR38Epm4bAkp8GAT
wA9qhDLlKJZXqMJCl3VilOaRpwoO9dmTvIPEHx0R9DNFFRUlc4DPq60UIYsfvUkp
467PfpJH7aXLM7PIyvU0aZ5QHtdcKHXnsRc+yQIDAQABAoIBACvgpOd0CGaVdeRr
pbsD4UQ8NjfAToEVTvEbKMo4AnoXLK7EW1JxmBSI0wWpB8w8b2N+F7xL8PdQ6r4a
djGK1fei4K2ivRFW3MM/iAlzkY6P+9ACbLTi57cYq0wb2dGT7eDZ2u6STEYVLKm9
Ct92mEnTU1Z/Bqbsd2Ocy68wX0AA2Ho/ktpL3hGhlBTOm9PwMV/aj/99YjEN+iLD
Q2e/YGNOslHvil4YErC01g6WR2SMujhQ/42ValYpacIVlirajaYgKdQ0PctwB1es
6Miwbwt/BrA8yH8gDCpHdqrfjAnb1J1xr5idu3i09Dt1F3FxHLKfU8DOng2WQD9v
HCNNaIkCgYEA7XllFs490pd7fraEde3IIRUVd/fXu/1qWjjpbijaMirtH9zWOE8b
JSklLrjYDXd6OS1dDft4DeRHbKOj3qe51Ux6kt/Zw1WdTfo3UtNjcK+EETpJuKyG
ctQHQG8HiQtgN36mkVUnc0W5IsURUnjNFO51r9V9U5ItYcBon5bGIUsCgYEA7D1A
3fMoDKbZzVrtDue9h+UmyCPCJgZASTt6iEm98guswbSQ/Qa1C94YbmQXAGO4NPwS
b/b78/JK6VTZpuuPB97m/antq1MlI8iNwomp1QR9m7jKg7WHui+nvSewh1o35B90
GstfIWmm6Pvqw0iu8UgSTEW7y3/SK51nEknPp7sCgYEAy4hiNfuqTRZ8SAxS12hn
QMN7VQldI8h9ILrqhvoImTrlZYu3JyfV0jHDppnSwygF33+b4+IF8ZIYDWrrhmgn
BEO6QqwNTjfQzQaJ6Dk5X1lvTfyxNtDXow9K79S5lqHjY2zvglyDpW660Kwqvo6+
5xPCVmQaOEhvEPsCMNXfFqUCgYEAjztINAm0c49KKNcDOfFJmbZXACumEBXkLkKQ
tUc4kiN/9+X5rl+9r1dWKsAmrgbH7eATca0m764suzHF0Q2rJ9N+67d2sVR1BTAY
uyVqQgw5+AtfReHvS/SO2AHTZw1NK9PiOkiqAgEjwMjUeth7sTDIX1Q8W1LBY85I
au8zpvcCgYEA2ZpA8QsbU6mijBsrc4aOSqnP4VBiqS5aLNqwOcxwlYtSn5/8xTvb
eI1imNQf5Js4l9/7fRTuxSYhZIPM8SLbHWCkesJubiiKc+R+m7uBZ+0h7W6ZtlY3
avFFoiUUtviOdvqBketnWkIiwzL8lzjHOwjYvdRiGv9Fkoty0SKXbp8=
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIID4jCCAsqgAwIBAgIHASABDbgAQjANBgkqhkiG9w0BAQsFADCBhjELMAkGA1UE
BhMCRlIxDjAMBgNVBAgMBUxJTExFMQ4wDAYDVQQHDAVMSUxMRTEhMB8GA1UECgwY
SW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMQwwCgYDVQQDDANIWkYxJjAkBgkqhkiG
9w0BCQEWF3poaWZhbi5odWFuZ0BuZXhlZGkuY29tMB4XDTIyMDIxNDA4NDQxN1oX
DTMyMDIxMjA4NDQxN1owgYYxCzAJBgNVBAYTAkZSMQ4wDAYDVQQIDAVMSUxMRTEO
MAwGA1UEBwwFTElMTEUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0
ZDEMMAoGA1UEAwwDSFpGMSYwJAYJKoZIhvcNAQkBFhd6aGlmYW4uaHVhbmdAbmV4
ZWRpLmNvbTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMvG4CCFLEJ8
vl9KiCXkfqrPhlSwktKs0PrT2kxncIJGwlkMdVlgKY5AG41ayf3+ZhpXKOWA7CDw
D653ugPc6JADh9fxWCAxzeXyvkBVMRBZ1zADDdiLKrTox8pCPu0wVPnxdsq+4TZW
lB3CGkn2wxPNenI4aFiK6J79C35kX6pMupDgbwOUuh4e+Z+fTe2ouAGRlG/s69F5
5ehjChUpNKRrIekKdFzVF76XK5twH9N2x6Iyd+dFfrQ0qiAhDugKWbqVxD1bdPlQ
rtU4LpQ9z1c3cXVGViopL7CMZN3qT4x9bA/j41ISDpdRm9cwJGLuEo+bWTf+nFaM
jzQAoPbyPpMCAwEAAaNTMFEwHQYDVR0OBBYEFOMXLeN2qnczE67jRn6PTAarT/nO
MB8GA1UdIwQYMBaAFOMXLeN2qnczE67jRn6PTAarT/nOMA8GA1UdEwEB/wQFMAMB
Af8wDQYJKoZIhvcNAQELBQADggEBAEfYZF1NijKxcB8dU58mJWUtvx3LNUfOlasB
4ykaEzDsA2zpnJ31msJ86G4VHy5umA4bbX80Eo1fBXT4W7GmfakbYIahb5A05Vrf
b1lggQAGsVEptfAAFgRynaPOCgyBmor55izPBt64jnZOS1Hgx8kmSowDYR+CVqc6
Ur+9e71jmkTv2LQwOl0fRD77vw2QZMV68C7y3SY32ErPb2anGuBzdrlrGHFy4Jam
FdiYcw7uEkdrJX3eXRI9gUBcIuljTiYQv2NZzhmeL+qbWDb/DI0NXNvZ8oe0ZHOd
UKfcEWjnPMyZHpuyPOeV6ywTLOdHXG9GqwTgAfln+vOhoKQ8fkE=
-----END CERTIFICATE-----
"""Re6st unittest module
"""
# contatin the test case
__all__ = ["test_registry",
"test_registry_client",
"test_conf",
"test_tunnel"]
#!/usr/bin/python2
""" unit test for re6st-conf
"""
import os
import sys
import unittest
from shutil import rmtree
from StringIO import StringIO
from mock import patch
from re6st.cli import conf
from re6st.tests.tools import generate_cert, serial2prefix
# gloable value from conf.py
conf_path = 're6stnet.conf'
ca_path = 'ca.crt'
cert_path = 'cert.crt'
key_path = 'cert.key'
# TODO test for is needed
class TestConf(unittest.TestCase):
""" Unit test case for re6st-conf"""
@classmethod
def setUpClass(cls):
# because conf will change directory
cls.origin_dir = os.getcwd()
cls.work_dir = "temp"
if not os.path.exists(cls.work_dir):
os.makedirs(cls.work_dir)
# mocked service cert and pkey
with open("root.crt") as f:
cls.cert = f.read()
with open("registry.key") as f:
cls.pkey = f.read()
cls.command = "re6st-conf --registry http://localhost/" \
" --dir %s" % cls.work_dir
cls.serial = 0
cls.stdout = sys.stdout
cls.null = open(os.devnull, 'w')
sys.stdout = cls.null
@classmethod
def tearDownClass(cls):
# remove work directory
rmtree(cls.work_dir)
cls.null.close()
sys.stdout = cls.stdout
def setUp(self):
patcher = patch("re6st.registry.RegistryClient")
self.addCleanup(patcher.stop)
self.client = patcher.start()()
self.client.getCa.return_value = self.cert
prefix = serial2prefix(self.serial)
self.client.requestCertificate.side_effect = \
lambda _, req: generate_cert(self.cert, self.pkey, req, prefix, self.serial)
self.serial += 1
def tearDown(self):
# go back to original dir
os.chdir(self.origin_dir)
@patch("__builtin__.raw_input")
def test_basic(self, mock_raw_input):
""" go through all the step
getCa, requestToken, requestCertificate
"""
mail = "example@email.com"
token = "a_token"
mock_raw_input.side_effect = [mail, token]
command = self.command \
+ " --fingerprint sha1:a1861330f1299b98b529fa52c3d8e5d1a94dc63a" \
+ " --req L lille"
sys.argv = command.split()
conf.main()
self.client.requestToken.assert_called_once_with(mail)
self.assertEqual(self.client.requestCertificate.call_args[0][0],
token)
# created file part
self.assertTrue(os.path.exists(ca_path))
self.assertTrue(os.path.exists(key_path))
self.assertTrue(os.path.exists(cert_path))
self.assertTrue(os.path.exists(conf_path))
def test_fingerprint_mismatch(self):
""" wrong fingerprint with same size,
"""
command = self.command \
+ " --fingerprint sha1:a1861330f1299b98b529fa52c3d8e5d1a94dc000"
sys.argv = command.split()
with self.assertRaises(SystemExit) as e:
conf.main()
self.assertIn("fingerprint doesn't match", str(e.exception))
def test_ca_only(self):
""" only create ca file and exit
"""
command = self.command + " --ca-only"
sys.argv = command.split()
with self.assertRaises(SystemExit):
conf.main()
self.assertTrue(os.path.exists(ca_path))
def test_anonymous(self):
""" with args anonymous, so script will use '' as token
"""
command = self.command + " --anonymous"
sys.argv = command.split()
conf.main()
self.assertEqual(self.client.requestCertificate.call_args[0][0],
'')
def test_anonymous_failed(self):
""" with args anonymous and token, so script will failed
"""
command = self.command + " --anonymous" \
+ " --token a"
sys.argv = command.split()
text = StringIO()
old_err = sys.stderr
sys.stderr = text
with self.assertRaises(SystemExit):
conf.main()
# check the error message
self.assertIn("anonymous conflicts", text.getvalue())
sys.stderr = old_err
def test_req_reserved(self):
""" with args req, but contain reserved value
"""
command = self.command + " --req CN 1111"
sys.argv = command.split()
with self.assertRaises(SystemExit) as e:
conf.main()
self.assertIn("CN field", str(e.exception))
def test_get_null_cert(self):
""" simulate fake token, and get null cert
"""
command = self.command + " --token a"
sys.argv = command.split()
self.client.requestCertificate.side_effect = "",
with self.assertRaises(SystemExit) as e:
conf.main()
self.assertIn("invalid or expired token", str(e.exception))
if __name__ == "__main__":
unittest.main()
This diff is collapsed.
import sys
import os
import unittest
import hmac
import httplib
import base64
import hashlib
from mock import Mock, patch
from re6st import registry
class TestRegistryClient(unittest.TestCase):
@classmethod
def setUpClass(cls):
server_url = "http://10.0.0.2/"
cls.client = registry.RegistryClient(server_url)
cls.client._conn = Mock()
def test_init(self):
url1 = "https://localhost/example/"
url2 = "http://10.0.0.2/"
client1 = registry.RegistryClient(url1)
client2 = registry.RegistryClient(url2)
self.assertEqual(client1._path, "/example")
self.assertEqual(client1._conn.host, "localhost")
self.assertIsInstance(client1._conn, httplib.HTTPSConnection)
self.assertIsInstance(client2._conn, httplib.HTTPConnection)
def test_rpc_hello(self):
prefix = "0000000011111111"
protocol = "7"
body = "a_hmac_key"
query = "/hello?client_prefix=0000000011111111&protocol=7"
response = fakeResponse(body, httplib.OK)
self.client._conn.getresponse.return_value = response
res = self.client.hello(prefix, protocol)
self.assertEqual(res, body)
conn = self.client._conn
conn.putrequest.assert_called_once_with('GET', query, skip_accept_encoding=1)
conn.close.assert_not_called()
conn.endheaders.assert_called_once()
def test_rpc_with_cn(self):
query = "/getNetworkConfig?cn=0000000011111111"
cn = "0000000011111111"
# hmac part
self.client._hmac = None
self.client.hello = Mock(return_value = "aaabbb")
self.client.cert = Mock()
key = "this_is_a_key"
self.client.cert.decrypt.return_value = key
h = hmac.HMAC(key, query, hashlib.sha1).digest()
key = hashlib.sha1(key).digest()
# response part
body = None
response = fakeResponse(body, httplib.NO_CONTENT)
response.msg = dict(Re6stHMAC=hmac.HMAC(key, body, hashlib.sha1).digest())
self.client._conn.getresponse.return_value = response
res = self.client.getNetworkConfig(cn)
self.client.cert.verify.assert_called_once_with("bbb", "aaa")
self.assertEqual(self.client._hmac, hashlib.sha1(key).digest())
conn = self.client._conn
conn.putheader.assert_called_with("Re6stHMAC", base64.b64encode(h))
conn.close.assert_called_once()
self.assertEqual(res, body)
class fakeResponse:
def __init__(self, body, status, reason = None):
self.body = body
self.status = status
self.reason = reason
def read(self):
return self.body
if __name__ == "__main__":
unittest.main()
__all__ = ["test_multi_gateway_manager", "test_base_tunnel_manager"]
#!/usr/bin/python2
import os
import sys
import unittest
import time
from mock import patch, Mock
from re6st import tunnel
from re6st import x509
from re6st import cache
from re6st.tests import tools
class testBaseTunnelManager(unittest.TestCase):
@classmethod
def setUpClass(cls):
ca_key, ca = tools.create_ca_file("ca.key", "ca.cert")
tools.create_cert_file("node.key", "node.cert", ca, ca_key, "00000001", 1)
cls.cert = x509.Cert("ca.cert", "node.key", "node.cert")
cls.control_socket = "babeld.sock"
def setUp(self):
patcher = patch("re6st.cache.Cache")
pacher_sock = patch("socket.socket")
self.addCleanup(patcher.stop)
self.addCleanup(pacher_sock.stop)
self.cache = patcher.start()()
self.sock = pacher_sock.start()
self.cache.same_country = False
address = [(2, [('10.0.0.2', '1194', 'udp'), ('10.0.0.2', '1194', 'tcp')])]
self.tunnel = tunnel.BaseTunnelManager(self.control_socket,
self.cache, self.cert, None, address)
def tearDown(self):
self.tunnel.close()
del self.tunnel
@patch("re6st.tunnel.BaseTunnelManager._babel_dump_one", create=True)
@patch("re6st.tunnel.BaseTunnelManager._babel_dump_two", create=True)
def test_babel_dump(self, two, one):
""" case two func in requesting_dump"""
self.tunnel._BaseTunnelManager__requesting_dump = set(['one', 'two'])
self.tunnel.babel_dump()
# assert is empty
self.assertFalse(self.tunnel._BaseTunnelManager__requesting_dump)
one.assert_called_once()
two.assert_called_once()
@patch("re6st.ctl.Babel.request_dump")
def test_request_dump_empty(self, request_dump):
"""case when self.__requesting_dump is None or empty"""
reason = "rina"
self.tunnel._BaseTunnelManager__request_dump(reason)
self.assertEqual(self.tunnel._BaseTunnelManager__requesting_dump, set([reason]))
request_dump.assert_called_once()
@patch("re6st.ctl.Babel.request_dump")
def test___request_dump_not_empty(self, request_dump):
"""case when self.__requesting_dump is not empty"""
self.tunnel._BaseTunnelManager__requesting_dump = set(["rina"])
reason = "reason"
self.tunnel._BaseTunnelManager__request_dump(reason)
self.assertEqual(self.tunnel._BaseTunnelManager__requesting_dump, set([reason, "rina"]))
request_dump.assert_not_called()
def test_selectTimeout_add_callback(self):
"""case add new callback"""
self.tunnel._timeouts = [(1, self.tunnel.close)]
callback = self.tunnel.babel_dump
self.tunnel.selectTimeout(10, callback)
self.assertIn((10, callback), self.tunnel._timeouts)
def test_selectTimeout_removing(self):
"""case remove a callback"""
removed = self.tunnel.babel_dump
self.tunnel._timeouts = [(1, self.tunnel.close), (10, removed)]
self.tunnel.selectTimeout(None, removed)
self.assertEqual(self.tunnel._timeouts, [(1, self.tunnel.close)])
def test_selectTimeout_update(self):
"""case update a callback"""
updated = self.tunnel.babel_dump
self.tunnel._timeouts = [(1, self.tunnel.close), (10, updated)]
self.tunnel.selectTimeout(100, updated)
self.assertEqual(self.tunnel._timeouts, [(1, self.tunnel.close), (100, updated)])
@patch("re6st.tunnel.BaseTunnelManager.selectTimeout")
def test_invalidatePeers(self, selectTimeout):
"""normal case, stop_date: p2 < now < p1 < p3
expect:
_peers -> [p1, p3]
next = p1.stoptime
"""
p1 = x509.Peer("00")
p2 = x509.Peer("01")
p3 = x509.Peer("10")
p1.stop_date = time.time() + 1000
p2.stop_date = 1
p3.stop_date = p1.stop_date + 500
self.tunnel._peers = [p1, p2, p3]
self.tunnel.invalidatePeers()
self.assertEqual(self.tunnel._peers, [p1, p3])
selectTimeout.assert_called_once_with(p1.stop_date, self.tunnel.invalidatePeers)
# Because _makeTunnel is defined in sub class of BaseTunnelManager, so i comment
# the follow test
# @patch("re6st.tunnel.BaseTunnelManager._makeTunnel", create=True)
# def test_processPacket_address_with_msg_peer(self, makeTunnel):
# """code is 1, peer and msg not none """
# c = chr(1)
# msg = "address"
# peer = x509.Peer("000001")
# self.tunnel._connecting = {peer}
# self.tunnel._processPacket(c + msg, peer)
# self.cache.addPeer.assert_called_once_with(peer, msg)
# self.assertFalse(self.tunnel._connecting)
# makeTunnel.assert_called_once_with(peer, msg)
def test_processPacket_address(self):
"""code is 1, for address. And peer or msg are none"""
c = chr(1)
self.tunnel._address = {1: "1,1", 2: "2,2"}
res = self.tunnel._processPacket(c)
self.assertEqual(res, "1,1;2,2")
def test_processPacket_address_with_peer(self):
"""code is 1, peer is not none, msg is none
in my opion, this function return address in form address,port,portocl
and each address join by ;
it will truncate address which has more than 3 element
"""
c = chr(1)
peer = x509.Peer("000001")
peer.protocol = 1
self.tunnel._peers.append(peer)
self.tunnel._address = {1: "1,1,1;0,0,0", 2: "2,2,2,2"}
res = self.tunnel._processPacket(c, peer)
self.assertEqual(res, "1,1,1;0,0,0;2,2,2")
@patch("re6st.tunnel.BaseTunnelManager.selectTimeout")
def test_processPacket_version(self, selectTimeout):
"""code is 0, for network version, peer is none"""
c = chr(0)
self.tunnel._processPacket(c)
self.assertEqual(selectTimeout.call_args[0][1], self.tunnel.newVersion)
@patch("re6st.x509.Cert.verifyVersion", Mock(return_value=True))
@patch("re6st.tunnel.BaseTunnelManager.selectTimeout")
def test_processPacket_version(self, selectTimeout):
"""code is 0, for network version, peer is not none
2 case, one modify the version, one not
"""
c = chr(0)
peer = x509.Peer("000001")
version1 = "00003"
version2 = "00007"
self.tunnel._version = "00005"
self.tunnel._peers.append(peer)
res = self.tunnel._processPacket(c + version1, peer)
self.tunnel._processPacket(c + version2, peer)
self.assertEqual(res, "00005")
self.assertEqual(self.tunnel._version, version2)
self.assertEqual(peer.version, version2)
self.assertEqual(selectTimeout.call_args[0][1], self.tunnel.newVersion)
if __name__ == "__main__":
unittest.main()
#!/usr/bin/python2
import os
import sys
import unittest
from mock import patch
from re6st import tunnel
class testMultGatewayManager(unittest.TestCase):
def setUp(self):
self.manager = tunnel.MultiGatewayManager(lambda x:x+x)
patcher = patch("subprocess.check_call")
self.addCleanup(patcher.stop)
self.sub = patcher.start()
@patch("logging.trace", create=True)
def test_add(self, log_trace):
"""add new dest twice"""
dest = "dest"
self.manager.add(dest, True)
self.manager.add(dest, True)
self.assertEqual(self.manager[dest][1], 1)
self.sub.assert_called_once()
cmd = log_trace.call_args[0][1]
self.assertIn(dest+dest, cmd)
self.assertIn("add", cmd)
def test_add_null_route(self):
""" add two dest which don't call ip route"""
dest1 = "dest1"
dest2 = ""
self.manager.add(dest1, False)
self.manager.add(dest2, True)
self.sub.assert_not_called()
@patch("logging.trace", create=True)
def test_remove(self, log_trace):
"remove a dest twice"
dest = "dest"
gw = "gw"
self.manager[dest] = [gw,1]
self.manager.remove(dest)
self.assertEqual(self.manager[dest][1], 0)
self.manager.remove(dest)
self.sub.assert_called_once()
self.assertIsNone(self.manager.get(dest))
cmd = log_trace.call_args[0][1]
self.assertIn(gw, cmd)
self.assertIn("del", cmd)
def test_remove_null_gw(self):
""" remove a dest which don't have gw"""
dest = "dest"
gw = ""
self.manager[dest] = [gw, 0]
self.manager.remove(dest)
self.assertIsNone(self.manager.get(dest))
self.sub.assert_not_called()
if __name__ == "__main__":
unittest.main()
\ No newline at end of file
import sys
import os
import time
import subprocess
from OpenSSL import crypto
from re6st import registry
def generate_csr():
"""generate a certificate request
return:
crypto.Pekey and crypto.X509Req both in pem format
"""
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
req = crypto.X509Req()
req.set_pubkey(key)
req.get_subject().CN = "test ca"
req.sign(key, 'sha256')
csr = crypto.dump_certificate_request(crypto.FILETYPE_PEM, req)
pkey = crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
return pkey, csr
def generate_cert(ca, ca_key, csr, prefix, serial, not_after=None):
"""generate a certificate
return
crypto.X509Cert in pem format
"""
if type(ca) is str:
ca = crypto.load_certificate(crypto.FILETYPE_PEM, ca)
if type(ca_key) is str:
ca_key = crypto.load_privatekey(crypto.FILETYPE_PEM, ca_key)
req = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)
cert = crypto.X509()
cert.gmtime_adj_notBefore(0)
if not_after:
cert.set_notAfter(
time.strftime("%Y%m%d%H%M%SZ", time.gmtime(not_after)))
else:
cert.gmtime_adj_notAfter(registry.RegistryServer.cert_duration)
subject = req.get_subject()
if prefix:
subject.CN = prefix2cn(prefix)
cert.set_subject(req.get_subject())
cert.set_issuer(ca.get_subject())
cert.set_pubkey(req.get_pubkey())
cert.set_serial_number(serial)
cert.sign(ca_key, 'sha512')
cert = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
return cert
def create_cert_file(pkey_file, cert_file, ca, ca_key, prefix, serial):
pkey, csr = generate_csr()
cert = generate_cert(ca, ca_key, csr, prefix, serial)
with open(pkey_file, 'w') as f:
f.write(pkey)
with open(cert_file, 'w') as f:
f.write(cert)
return pkey, cert
def create_ca_file(pkey_file, cert_file):
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
cert = crypto.X509()
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(registry.RegistryServer.cert_duration)
subject= cert.get_subject()
subject.C = "FR"
subject.ST = "Lille"
subject.L = "Lille"
subject.O = "nexedi"
subject.CN = "TEST-CA"
cert.set_issuer(cert.get_subject())
cert.set_serial_number(10000)
cert.set_pubkey(key)
cert.sign(key, "sha512")
with open(pkey_file, 'w') as pkey_file:
pkey_file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key))
with open(cert_file, 'w') as cert_file:
cert_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
return key, cert
def prefix2cn(prefix):
return "%u/%u" % (int(prefix, 2), len(prefix))
def serial2prefix(serial):
return bin(serial)[2:].rjust(16, '0')
# pkey: private key
def decrypt(pkey, incontent):
with open("node.key", 'w') as f:
f.write(pkey)
args = "openssl rsautl -decrypt -inkey node.key".split()
p = subprocess.Popen(
args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
outcontent, err = p.communicate(incontent)
return outcontent
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment