Commit a6acb627 authored by Łukasz Nowak's avatar Łukasz Nowak

Really sign and verify.

Previously just empty strings were signed and verified.

Squashed commit of the following:

commit b3e1bf51989599a136f2a42f63688aa3048aae12
Author: Łukasz Nowak <luke@nexedi.com>
Date:   Thu Sep 1 15:59:40 2011 +0200

    Really sign the content.

commit c4a6abd652f7ca19fad66188ec7c782956392b68
Author: Łukasz Nowak <luke@nexedi.com>
Date:   Thu Sep 1 15:59:07 2011 +0200

    Show that hacked content is not trusted.

commit 08340918639598b227dd346d4ab6054428296d60
Author: Łukasz Nowak <luke@nexedi.com>
Date:   Thu Sep 1 15:12:09 2011 +0200

    Positive test of signed content.

commit a00a2a5b788542d324561d4d6418b86c1cb9f86d
Author: Łukasz Nowak <luke@nexedi.com>
Date:   Thu Sep 1 15:00:54 2011 +0200

    Skip if no standalone server is available.

commit f31ef55187c46305b2ae8034ee496cc28af12893
Author: Łukasz Nowak <luke@nexedi.com>
Date:   Thu Sep 1 14:32:39 2011 +0200

    Move offline tests to one place.

    Remove non sense tests.

commit d725dcd24b95f5bbbb340a6fe6ce5ea7f8aed4d5
Author: Łukasz Nowak <luke@nexedi.com>
Date:   Thu Sep 1 14:14:34 2011 +0200

    Start to really sign the content.

    Previously empty strings were signed and verified.
parent 0817910e
......@@ -160,8 +160,9 @@ class NetworkcacheClient(object):
if architecture is not None:
kw['architecture'] = architecture
signature = self._getSignatureString()
data = [kw, signature]
sha_entry = json.dumps(kw)
signature = self._getSignatureString(sha_entry)
data = [sha_entry, signature]
if self.shadir_scheme == 'https':
shadir_connection = httplib.HTTPSConnection(self.shadir_host,
......@@ -205,22 +206,26 @@ class NetworkcacheClient(object):
data = urllib2.urlopen(request).read()
# Filtering...
data_list = json.loads(data)
filtered_data_list = []
if self.signature_certificate_list is not None:
data_list = filter(lambda x: self._verifySignatureInCertificateList(
x[1]), data_list)
if not data_list:
for data in data_list:
if self._verifySignatureInCertificateList(data[0], data[1]):
filtered_data_list.append(data)
if not filtered_data_list:
raise DirectoryNotFound('Could not find a trustable entry.')
else:
filtered_data_list = data_list
if len(data_list) > 1:
if len(filtered_data_list) > 1:
raise DirectoryNotFound('Too many entries for a given key %r. ' \
'Entries: %s.' % (key, str(data_list)))
information_dict, signature = data_list[0]
information_json, signature = filtered_data_list[0]
information_dict = json.loads(information_json)
sha512 = information_dict.get('sha512')
return self.download(sha512)
def _getSignatureString(self):
def _getSignatureString(self, content):
"""
Return the signature based on certification file.
"""
......@@ -229,22 +234,23 @@ class NetworkcacheClient(object):
SignEVP = M2Crypto.EVP.load_key(self.signature_private_key_file)
SignEVP.sign_init()
SignEVP.sign_update('')
SignEVP.sign_update(content)
StringSignature = SignEVP.sign_final()
return StringSignature.encode('base64')
signature = StringSignature.encode('base64')
return signature
def _verifySignatureInCertificateList(self, signature_string):
def _verifySignatureInCertificateList(self, content, signature_string):
"""
Returns true if it can find any valid certificate or false if it does not
find any.
"""
if self.signature_certificate_list is not None:
for certificate in self.signature_certificate_list:
if self._verifySignatureCertificate(signature_string, certificate):
if self._verifySignatureCertificate(content, signature_string, certificate):
return True
return False
def _verifySignatureCertificate(self, signature_string, certificate):
def _verifySignatureCertificate(self, content, signature_string, certificate):
""" verify if the signature is valid for a given certificate. """
certificate_file = tempfile.NamedTemporaryFile()
certificate_file.write(certificate)
......@@ -255,7 +261,7 @@ class NetworkcacheClient(object):
VerifyEVP = M2Crypto.EVP.PKey()
VerifyEVP.assign_rsa(PubKey.get_pubkey().get_rsa())
VerifyEVP.verify_init()
VerifyEVP.verify_update('')
VerifyEVP.verify_update(str(content))
return VerifyEVP.verify_final(signature_string.decode('base64'))
finally:
certificate_file.close()
......
......@@ -131,17 +131,50 @@ def _run_nc(tree, host, port):
class OfflineTest(unittest.TestCase):
shacache_url = 'http://localhost:1/shacache'
shadir_url = 'http://localhost:1/shadir'
host = 'localhost'
port = 1
shacache_path = '/shacache'
shadir_path = '/shadir'
def test_download_offline(self):
nc = slapos.libnetworkcache.NetworkcacheClient('http://127.0.0.1:0',
'http://127.0.0.1:0')
nc = slapos.libnetworkcache.NetworkcacheClient(self.shacache_url,
self.shadir_url)
self.assertRaises(IOError, nc.download, 'sha512sum')
def test_upload_offline(self):
content = tempfile.TemporaryFile()
nc = slapos.libnetworkcache.NetworkcacheClient('http://127.0.0.1:0',
'http://127.0.0.1:0')
nc = slapos.libnetworkcache.NetworkcacheClient(self.shacache_url,
self.shadir_url)
self.assertRaises(IOError, nc.upload, content)
def test_init_method_normal_http_url(self):
"""
Check if the init method is setting the attributes correctly.
"""
nc = slapos.libnetworkcache.NetworkcacheClient(shacache=self.shacache_url,
shadir=self.shadir_url)
self.assertEquals({'Content-Type': 'application/json'}, \
nc.shacache_header_dict)
self.assertEquals(self.host, nc.shacache_host)
self.assertEquals(self.shacache_path, nc.shacache_path)
self.assertEquals(self.port, nc.shacache_port)
self.assertEquals(self.shacache_url, nc.shacache_url)
self.assertEquals({'Content-Type': 'application/json'}, \
nc.shadir_header_dict)
self.assertEquals(self.host, nc.shadir_host)
self.assertEquals(self.shadir_path, nc.shadir_path)
self.assertEquals(self.port, nc.shadir_port)
def test_init_backward_compatible(self):
"""Checks that invocation with minimal parameter works fine"""
nc = slapos.libnetworkcache.NetworkcacheClient(shacache=self.shacache_url,
shadir=self.shadir_url)
self.assertEqual(nc.shacache_url, self.shacache_url)
self.assertTrue(nc.shadir_host in self.shadir_url)
def wait(host, port):
addr = host, port
......@@ -381,6 +414,62 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
except urllib2.HTTPError, error:
self.assertEqual(error.code, httplib.NOT_FOUND)
def test_select_signed_content(self):
key = 'somekey' + str(random.random())
urlmd5 = str(random.random())
file_name = 'my file'
test_data = tempfile.TemporaryFile()
test_string = str(random.random())
test_data.write(test_string)
test_data.seek(0)
key_file = tempfile.NamedTemporaryFile()
key_file.write(self.key)
key_file.flush()
key_file.seek(0)
signed_nc = slapos.libnetworkcache.NetworkcacheClient(
self.shacache, self.shadir, key_file.name, [self.certificate])
signed_nc.upload(self.test_data, key, urlmd5=urlmd5, file_name=file_name)
result = signed_nc.select(key)
self.assertEqual(result.read(), self.test_string)
def test_select_signed_content_server_hacked(self):
key = 'somekey' + str(random.random())
urlmd5 = str(random.random())
file_name = 'my file'
test_data = tempfile.TemporaryFile()
test_string = str(random.random())
test_data.write(test_string)
test_data.seek(0)
key_file = tempfile.NamedTemporaryFile()
key_file.write(self.key)
key_file.flush()
key_file.seek(0)
signed_nc = slapos.libnetworkcache.NetworkcacheClient(
self.shacache, self.shadir, key_file.name, [self.certificate])
signed_nc.upload(self.test_data, key, urlmd5=urlmd5, file_name=file_name)
# Hacker entered the server...
f = os.path.join(self.tree, 'shadir', key)
original_data = open(f).read()
original_json = json.loads(original_data)
hacked_entry_json = json.loads(original_json[0][0])
# ...he modifies something...
hacked_entry_json['file'] = 'hacked'
hacked_json = original_json[:]
# ...and stores...
hacked_json[0][0] = json.dumps(hacked_entry_json)
# ...but as he has no access to key, no way to sign data..
# hacked_json[0][1] is still a good key
open(f, 'w').write(json.dumps(hacked_json))
try:
result = signed_nc.select(key)
except slapos.libnetworkcache.DirectoryNotFound, msg:
# hacked content is not trusted
self.assertEqual(str(msg), 'Could not find a trustable entry.')
def test_select_DirectoryNotFound_too_many_for_key(self):
nc = slapos.libnetworkcache.NetworkcacheClient(self.shacache, self.shadir)
key = 'somekey' + str(random.random())
......@@ -466,6 +555,8 @@ class OnlineTest(OnlineMixin, unittest.TestCase):
self.assertEqual(nc.shadir_key_file, key_file)
@unittest.skipUnless(os.environ.get('TEST_SHA_CACHE', '') != '',
"Requires standalone test server")
class OnlineTestSSLServer(OnlineMixin, unittest.TestCase):
schema = 'https'
......@@ -621,100 +712,3 @@ class GenerateSignatureScriptTest(LibNetworkCacheMixin):
'''
self.assertTrue(os.path.exists(self.signature_certificate_file))
self.assertTrue(os.path.exists(self.signature_private_key_file))
class TestNetworkcacheClient(LibNetworkCacheMixin):
"""
Class to test the slapos.libnetworkcache.NetworkcacheClient implementation.
"""
def setUp(self):
""" Setup the test. """
LibNetworkCacheMixin.setUp(self)
self.host = 'localhost'
self.port = 8000
self.shacache_path = '/shacache'
self.shadir_path = '/shadir'
url = 'http://%s:%s' % (self.host, self.port)
self.shacache_url = url + self.shacache_path
self.shadir_url = url + self.shadir_path
def test_init_backward_compatible(self):
"""Checks that invocation with minimal parameter works fine"""
nc = slapos.libnetworkcache.NetworkcacheClient(shacache=self.shacache_url,
shadir=self.shadir_url)
self.assertEqual(nc.shacache_url, self.shacache_url)
self.assertTrue(nc.shadir_host in self.shadir_url)
def test_init_method_normal_http_url(self):
"""
Check if the init method is setting the attributes correctly.
"""
nc = slapos.libnetworkcache.NetworkcacheClient(shacache=self.shacache_url,
shadir=self.shadir_url)
self.assertEquals({'Content-Type': 'application/json'}, \
nc.shacache_header_dict)
self.assertEquals(self.host, nc.shacache_host)
self.assertEquals(self.shacache_path, nc.shacache_path)
self.assertEquals(self.port, nc.shacache_port)
self.assertEquals(self.shacache_url, nc.shacache_url)
self.assertEquals({'Content-Type': 'application/json'}, \
nc.shadir_header_dict)
self.assertEquals(self.host, nc.shadir_host)
self.assertEquals(self.shadir_path, nc.shadir_path)
self.assertEquals(self.port, nc.shadir_port)
def test_signature_creation_without_private_key_file(self):
"""
Without the private key file, it is not possible to create the
signature so it must return an empty string.
"""
nc = slapos.libnetworkcache.NetworkcacheClient(shacache=self.shacache_url,
shadir=self.shadir_url)
self.assertEquals('', nc._getSignatureString())
def test_signature_creation_with_private_key_file(self):
"""
Check if the signature creation does not have any error.
"""
nc = slapos.libnetworkcache.NetworkcacheClient(
shacache=self.shacache_url,
shadir=self.shadir_url,
signature_private_key_file=self.signature_private_key_file)
self.assertNotEquals('', nc._getSignatureString())
def test_verification_without_signature_certificate_list(self):
"""
Without the signature certificate list it is not possible to
verify if the signature if trusted or not.
So, the _verifySignatureInCertificateList should return False.
"""
nc = slapos.libnetworkcache.NetworkcacheClient(
shacache=self.shacache_url,
shadir=self.shadir_url,
signature_private_key_file=self.signature_private_key_file)
signature_string = nc._getSignatureString()
self.assertFalse(nc._verifySignatureInCertificateList(signature_string))
def test_verification_with_signature_certificate_list(self):
"""
With the signature certificate list it is possible to
verify if the signature if trusted or not.
So, the _verifySignatureInCertificateList should return True
if the signature_string is valid and it should return False if the
signature_string is not correct.
"""
nc = slapos.libnetworkcache.NetworkcacheClient(
shacache=self.shacache_url,
shadir=self.shadir_url,
signature_private_key_file=self.signature_private_key_file,
signature_certificate_list=[
open(self.signature_certificate_file).read()])
signature_string = nc._getSignatureString()
self.assertTrue(nc._verifySignatureInCertificateList(signature_string))
wrong_signature_string = 'InvalidSignatureString'.encode('base64')
result_bool = nc._verifySignatureInCertificateList(wrong_signature_string)
self.assertFalse(result_bool)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment