Commit b007f41a authored by Julien Muchembled's avatar Julien Muchembled

Reduce number of created temporary files

parent 2bd3bf5b
......@@ -343,80 +343,47 @@ class NetworkcacheClient(object):
filtered_data_list.append(data)
return filtered_data_list
def _openssl(self, input, *args):
p = subprocess.Popen((self.openssl,) + args,
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
output = p.communicate(input)[0]
if p.returncode:
raise subprocess.CalledProcessError(p.returncode, self.openssl, output)
return output
def _getSignatureString(self, content):
"""
Return the signature based on certification file.
"""
if self.signature_private_key_file is None:
return ''
content_file = tempfile.NamedTemporaryFile()
content_file.write(content)
content_file.flush()
content_file.seek(0)
try:
signature = subprocess.check_output([self.openssl, "dgst", "-sha1",
"-sign", self.signature_private_key_file, content_file.name])
return signature.encode('base64')
finally:
content_file.close()
return self._openssl(content, "dgst", "-sha1", "-sign",
self.signature_private_key_file).encode('base64')
def _verifySignatureInCertificateList(self, content, signature_string):
"""
Returns true if it can find any valid certificate or false if it does not
find any.
"""
if self.signature_certificate_list is not None:
for certificate in self.signature_certificate_list:
if self._verifySignatureCertificate(content, signature_string,
certificate):
return True
if self.signature_certificate_list:
with tempfile.NamedTemporaryFile() as signature_file:
signature_file.write(signature_string.decode('base64'))
signature_file.flush()
for certificate in self.signature_certificate_list:
try:
pubkey = self._openssl(certificate, "x509", "-pubkey", "-noout")
with tempfile.NamedTemporaryFile() as pubkey_file:
pubkey_file.write(pubkey)
pubkey_file.flush()
if self._openssl(content, "dgst", "-sha1", "-verify",
pubkey_file.name, "-signature", signature_file.name
).startswith('Verified OK'):
return True
except Exception:
# in case of failure, emit *anything*, but swallow all what possible
traceback.print_exc()
return False
def _verifySignatureCertificate(self, content, signature_string,
certificate):
""" verify if the signature is valid for a given certificate. """
certificate_file = tempfile.NamedTemporaryFile()
certificate_file.write(certificate)
certificate_file.flush()
certificate_file.seek(0)
signature_file = tempfile.NamedTemporaryFile()
signature_file.write(signature_string.decode('base64'))
signature_file.flush()
signature_file.seek(0)
content_file = tempfile.NamedTemporaryFile()
content_file.write(content)
content_file.flush()
content_file.seek(0)
pubkey_file = tempfile.NamedTemporaryFile()
try:
last_output = ''
try:
last_output = subprocess.check_output([self.openssl, "x509", "-pubkey",
"-noout", "-in", certificate_file.name])
pubkey_file.write(last_output)
pubkey_file.flush()
pubkey_file.seek(0)
try:
last_output = subprocess.check_output([self.openssl, "dgst", "-sha1",
"-verify", pubkey_file.name, "-signature", signature_file.name,
content_file.name])
except subprocess.CalledProcessError, e:
# in case if verification failed
last_output = e.output
if last_output.startswith('Verified OK'):
return True
except Exception:
# in case of failure, emit *anything*, but swallow all what possible
print last_output
print traceback.format_exc()
return False
finally:
certificate_file.close()
signature_file.close()
content_file.close()
pubkey_file.close()
class DirectoryNotFound(Exception):
pass
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment