Commit 701ca50d authored by Łukasz Nowak's avatar Łukasz Nowak

- simplify usage of subprocess

 - kill only when returncode is none
 - to not play with hiding exceptions during runtime, so
   CertificateGenerationError, as simple ValueError is acceptable for
   simplicity
 - during revoking generate hashed links to the newest CRL


git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@42381 20353a03-c40f-0410-a6d1-a30d3c3de9de
parent d8a95452
......@@ -32,15 +32,22 @@ from Products.ERP5Type.Globals import InitializeClass
from Products.ERP5Type.Tool.BaseTool import BaseTool
from Products.ERP5Type import Permissions
from Products.PageTemplates.PageTemplateFile import PageTemplateFile
from zLOG import LOG, INFO, ERROR
from zLOG import LOG, INFO
import os
import subprocess
import base64
class CertificateGenerationError(Exception):
"""Exception raised when certificate authority failed to work"""
pass
def popenCommunicate(command_list, input=None, **kwargs):
kwargs.update(stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
popen = subprocess.Popen(command_list, **kwargs)
result = popen.communicate(input)[0]
if popen.returncode is None:
popen.kill()
if popen.returncode != 0:
raise ValueError('Issue during calling %r, result was: %r' % (command_list,
result))
return result
class CertificateAuthorityBusy(Exception):
"""Exception raised when certificate authority is busy"""
......@@ -183,23 +190,11 @@ class CertificateAuthorityTool(BaseTool):
csr = os.path.join(self.certificate_authority_path, new_id + '.csr')
cert = os.path.join(self.certificate_authority_path, 'certs', new_id + '.crt')
try:
keygen = subprocess.Popen([self.openssl_binary, 'req', '-nodes', '-config',
popenCommunicate([self.openssl_binary, 'req', '-nodes', '-config',
self.openssl_config, '-new', '-keyout', key, '-out', csr, '-days',
'3650'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
stdin=subprocess.PIPE)
result = keygen.communicate('%s\n' % cn)[0]
if keygen.returncode is None or keygen.returncode != 0:
LOG('CertificateAuthorityTool', ERROR, 'Issue during key generation, result was:%r' % result)
keygen.kill()
raise CertificateGenerationError
keysign = subprocess.Popen([self.openssl_binary, 'ca', '-batch', '-config',
self.openssl_config, '-out', cert, '-infiles', csr], stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
result = keysign.communicate()[0]
if keysign.returncode is None or keysign.returncode != 0:
LOG('CertificateAuthorityTool', ERROR, 'Issue during key signing, result was:%r' % result)
keygen.kill()
raise CertificateGenerationError
'3650'], '%s\n' % cn, stdin=subprocess.PIPE)
popenCommunicate([self.openssl_binary, 'ca', '-batch', '-config',
self.openssl_config, '-out', cert, '-infiles', csr])
os.unlink(csr)
return dict(
key=open(key).read(),
......@@ -224,27 +219,20 @@ class CertificateAuthorityTool(BaseTool):
self._lockCertificateAuthority()
try:
new_id = open(self.crl, 'r').read().strip().lower()
crl = os.path.join(self.certificate_authority_path, 'crl', new_id + '.crl')
crl_path = os.path.join(self.certificate_authority_path, 'crl')
crl = os.path.join(crl_path, new_id + '.crl')
cert = os.path.join(self.certificate_authority_path, 'certs', serial + '.crt')
if not os.path.exists(cert):
raise ValueError('Certificate with serial %r does not exists' % serial)
try:
crl_update = subprocess.Popen([self.openssl_binary, 'ca', '-config',
self.openssl_config, '-revoke', cert], stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
result = crl_update.communicate()[0]
if crl_update.returncode is None or crl_update.returncode != 0:
LOG('CertificateAuthorityTool', ERROR, 'Issue during CRL update, result was:%r' % result)
crl_update.kill()
raise CertificateGenerationError
crl_gen = subprocess.Popen([self.openssl_binary, 'ca', '-config',
self.openssl_config, '-gencrl', '-out', crl], stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
result = crl_gen.communicate()[0]
if crl_gen.returncode is None or crl_gen.returncode != 0:
LOG('CertificateAuthorityTool', ERROR, 'Issue during CRL generation, result was:%r' % result)
crl_gen.kill()
raise CertificateGenerationError
popenCommunicate([self.openssl_binary, 'ca', '-config',
self.openssl_config, '-revoke', cert])
popenCommunicate([self.openssl_binary, 'ca', '-config',
self.openssl_config, '-gencrl', '-out', crl])
hash = popenCommunicate([self.openssl_binary, 'crl', '-noout',
'-hash', '-in', crl]).strip()
previous_id = int(len([q for q in os.listdir(crl_path) if hash in q]))
os.symlink(crl, os.path.join(crl_path, '%s.%s' % (hash, previous_id)))
return dict(crl=open(crl).read())
except:
try:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment