diff --git a/caucase/wsgi.py b/caucase/wsgi.py index 79d205fce0d6789dae471a916db811e72cac8c50..1bc08e3f31a7dba849010d9ae02a29b5e83bc3c2 100644 --- a/caucase/wsgi.py +++ b/caucase/wsgi.py @@ -98,6 +98,24 @@ class Application(object): """ self._cau = cau self._cas = cas + self._context_dict = { + 'cau': cau, + 'cas': cas, + } + self._routing_dict = { + 'crl': { + 'GET': self.getCRL, + }, + 'csr': { + 'GET': self.getCSR, + 'PUT': self.putCSR, + 'DELETE': self.deleteCSR, + }, + 'crt': { + 'GET': self.getCRT, + 'PUT': self.putCRT, + }, + } def __call__(self, environ, start_response): """ @@ -107,22 +125,26 @@ class Application(object): try: # Convert non-wsgi exceptions into WSGI exceptions path_item_list = [x for x in environ['PATH_INFO'].split('/') if x] try: - context_id, method_id = path_item_list[:2] + context_id, base_path = path_item_list[:2] except ValueError: raise NotFound - if context_id == 'cau': - context = self._cau - elif context_id == 'cas': - context = self._cas - else: - raise NotFound - if method_id.startswith('_'): - raise NotFound try: - method = getattr(self, method_id) - except AttributeError: + context = self._context_dict[context_id] + method_dict = self._routing_dict[base_path] + except KeyError: raise NotFound - status, header_list, result = method(context, environ, path_item_list[2:]) + request_method = environ['REQUEST_METHOD'] + if request_method == 'OPTIONS': + status = STATUS_NO_CONTENT + header_list = [ + ] + result = [] + else: + try: + method = method_dict[request_method] + except KeyError: + raise BadMethod + status, header_list, result = method(context, environ, path_item_list[2:]) except ApplicationError: raise except exceptions.NotFound: @@ -208,14 +230,12 @@ class Application(object): raise BadRequest('Invalid json') @staticmethod - def crl(context, environ, subpath): + def getCRL(context, environ, subpath): """ - Handle /{context}/crl urls. + Handle GET /{context}/crl . """ if subpath: raise NotFound - if environ['REQUEST_METHOD'] != 'GET': - raise BadMethod data = context.getCertificateRevocationList() return ( STATUS_OK, @@ -226,145 +246,153 @@ class Application(object): [data], ) - def csr(self, context, environ, subpath): + def getCSR(self, context, environ, subpath): """ - Handle /{context}/csr urls. + Handle GET /{context}/csr/{csr_id} and GET /{context}/csr. """ - http_method = environ['REQUEST_METHOD'] - if http_method == 'GET': - if subpath: - try: - csr_id, = subpath - except ValueError: - raise NotFound - try: - csr_id = int(csr_id) - except ValueError: - raise BadRequest('Invalid integer') - data = context.getCertificateSigningRequest(csr_id) - content_type = 'application/pkcs10' - else: - self._authenticate(environ) - data = json.dumps(context.getCertificateRequestList()) - content_type = 'application/json' - return ( - STATUS_OK, - [ - ('Content-Type', content_type), - ('Content-Length', str(len(data))), - ], - [data], - ) - elif http_method == 'PUT': - if subpath: - raise NotFound - csr_id = context.appendCertificateSigningRequest(self._read(environ)) - return ( - STATUS_CREATED, - [ - ('Location', str(csr_id)), - ], - [], - ) - elif http_method == 'DELETE': + if subpath: try: csr_id, = subpath except ValueError: raise NotFound - self._authenticate(environ) try: - context.deletePendingCertificateSigningRequest(csr_id) - except exceptions.NotFound: - raise NotFound - return (STATUS_NO_CONTENT, [], []) + csr_id = int(csr_id) + except ValueError: + raise BadRequest('Invalid integer') + data = context.getCertificateSigningRequest(csr_id) + content_type = 'application/pkcs10' + else: + self._authenticate(environ) + data = json.dumps(context.getCertificateRequestList()) + content_type = 'application/json' + return ( + STATUS_OK, + [ + ('Content-Type', content_type), + ('Content-Length', str(len(data))), + ], + [data], + ) + + def putCSR(self, context, environ, subpath): + """ + Handle PUT /{context}/csr . + """ + if subpath: + raise NotFound + csr_id = context.appendCertificateSigningRequest(self._read(environ)) + return ( + STATUS_CREATED, + [ + ('Location', str(csr_id)), + ], + [], + ) + + def deleteCSR(self, context, environ, subpath): + """ + Handle DELETE /{context}/csr/{csr_id} . + """ + try: + csr_id, = subpath + except ValueError: + raise NotFound + self._authenticate(environ) + try: + context.deletePendingCertificateSigningRequest(csr_id) + except exceptions.NotFound: + raise NotFound + return (STATUS_NO_CONTENT, [], []) + + def getCRT(self, context, environ, subpath): + """ + Handle GET /{context}/crt/{crt_id} urls. + """ + try: + crt_id, = subpath + except ValueError: + raise NotFound + if crt_id == 'ca.crt.pem': + data = context.getCACertificate() + content_type = 'application/pkix-cert' + elif crt_id == 'ca.crt.json': + data = json.dumps(context.getValidCACertificateChain()) + content_type = 'application/json' else: - raise BadMethod + try: + crt_id = int(crt_id) + except ValueError: + raise BadRequest('Invalid integer') + data = context.getCertificate(crt_id) + content_type = 'application/pkix-cert' + return ( + STATUS_OK, + [ + ('Content-Type', content_type), + ('Content-Length', str(len(data))), + ], + [data], + ) - def crt(self, context, environ, subpath): + def putCRT(self, context, environ, subpath): """ - Handle /{context}/crt urls. + Handle PUT /{context}/crt/{crt_id} urls. """ - http_method = environ['REQUEST_METHOD'] try: crt_id, = subpath except ValueError: raise NotFound - if http_method == 'GET': - if crt_id == 'ca.crt.pem': - data = context.getCACertificate() - content_type = 'application/pkix-cert' - elif crt_id == 'ca.crt.json': - data = json.dumps(context.getValidCACertificateChain()) - content_type = 'application/json' - else: - try: - crt_id = int(crt_id) - except ValueError: - raise BadRequest('Invalid integer') - data = context.getCertificate(crt_id) - content_type = 'application/pkix-cert' + if crt_id == 'renew': + payload = utils.unwrap( + self._readJSON(environ), + lambda x: x['crt_pem'], + context.digest_list, + ) + data = context.renew( + crt_pem=payload['crt_pem'].encode('ascii'), + csr_pem=payload['renew_csr_pem'].encode('ascii'), + ) return ( STATUS_OK, [ - ('Content-Type', content_type), + ('Content-Type', 'application/pkix-cert'), ('Content-Length', str(len(data))), ], [data], ) - elif http_method == 'PUT': - if crt_id == 'renew': + elif crt_id == 'revoke': + data = self._readJSON(environ) + if data['digest'] is None: + self._authenticate(environ) + payload = utils.nullUnwrap(data) + if 'revoke_crt_pem' not in payload: + context.revokeSerial(payload['revoke_serial']) + return (STATUS_NO_CONTENT, [], []) + else: payload = utils.unwrap( - self._readJSON(environ), - lambda x: x['crt_pem'], + data, + lambda x: x['revoke_crt_pem'], context.digest_list, ) - data = context.renew( - crt_pem=payload['crt_pem'].encode('ascii'), - csr_pem=payload['renew_csr_pem'].encode('ascii'), - ) - return ( - STATUS_OK, - [ - ('Content-Type', 'application/pkix-cert'), - ('Content-Length', str(len(data))), - ], - [data], - ) - elif crt_id == 'revoke': - data = self._readJSON(environ) - if data['digest'] is None: - self._authenticate(environ) - payload = utils.nullUnwrap(data) - if 'revoke_crt_pem' not in payload: - context.revokeSerial(payload['revoke_serial']) - return (STATUS_NO_CONTENT, [], []) - else: - payload = utils.unwrap( - data, - lambda x: x['revoke_crt_pem'], - context.digest_list, - ) - context.revoke( - crt_pem=payload['revoke_crt_pem'].encode('ascii'), - ) - return (STATUS_NO_CONTENT, [], []) - else: - try: - crt_id = int(crt_id) - except ValueError: - raise BadRequest('Invalid integer') - body = self._read(environ) - if not body: - template_csr = None - elif environ.get('CONTENT_TYPE') == 'application/pkcs10': - template_csr = utils.load_certificate_request(body) - else: - raise BadRequest('Bad Content-Type') - self._authenticate(environ) - context.createCertificate( - csr_id=crt_id, - template_csr=template_csr, - ) - return (STATUS_NO_CONTENT, [], []) + context.revoke( + crt_pem=payload['revoke_crt_pem'].encode('ascii'), + ) + return (STATUS_NO_CONTENT, [], []) else: - raise BadMethod + try: + crt_id = int(crt_id) + except ValueError: + raise BadRequest('Invalid integer') + body = self._read(environ) + if not body: + template_csr = None + elif environ.get('CONTENT_TYPE') == 'application/pkcs10': + template_csr = utils.load_certificate_request(body) + else: + raise BadRequest('Bad Content-Type') + self._authenticate(environ) + context.createCertificate( + csr_id=crt_id, + template_csr=template_csr, + ) + return (STATUS_NO_CONTENT, [], [])