validate-schema 4.53 KB
#!/usr/bin/env python
import argparse
import json
import os.path
import urllib
from urlparse import urlparse, urlunparse, ParseResult
import jsonschema

# Adapted from slapos.core.git/slapos/slap/util.py
from lxml import etree
def xml2dict(infile):
    """Flatten the <parameter> elements of an XML document into a dict.

    Each <parameter> element found anywhere in the document contributes its
    text under the key given by its "id" attribute. When the same id occurs
    more than once, the texts are joined with a single space, in document
    order.

    infile -- whatever etree.parse accepts as its source.

    Returns a dict mapping each parameter id to its (possibly joined) text.
    A parameter whose only occurrences have no text maps to None.
    """
    result_dict = {}
    for element in etree.parse(infile).iter(tag='parameter'):
        key = element.get('id')
        text = element.text
        previous = result_dict.get(key)
        if previous is None:
            # First occurrence (or only empty ones so far): store as-is.
            result_dict[key] = text
        elif text is not None:
            result_dict[key] = previous + ' ' + text
        # An empty repeated parameter has text None; concatenating it would
        # raise TypeError, so the value collected so far is kept untouched.
    return result_dict

def jsonInXML2dict(infile):
    """Load parameters stored as JSON inside an XML parameter document.

    The document is first read with xml2dict. If it holds a parameter whose
    id is '_', that parameter's text is decoded as JSON and the decoded value
    is returned; otherwise the plain dict produced by xml2dict is returned.
    """
    parameter_dict = xml2dict(infile)
    try:
        serialised = parameter_dict['_']
    except KeyError:
        # No JSON wrapper parameter: hand back the XML parameters unchanged.
        return parameter_dict
    return json.loads(serialised)

class ValidatingRefResolver(jsonschema.RefResolver):
    """RefResolver which also schema-checks every remote document it loads."""

    def resolve_remote(self, *args, **kw):
        """Resolve a remote reference and check the result is a valid schema.

        Arguments are passed through untouched to the parent implementation.
        The resolved document is run through Draft4Validator.check_schema
        before being returned, so whatever that check raises propagates to
        the caller.
        """
        parent = super(ValidatingRefResolver, self)
        remote_schema = parent.resolve_remote(*args, **kw)
        jsonschema.Draft4Validator.check_schema(remote_schema)
        return remote_schema

# Path of the 'schema.json' file located in the same directory as this
# script; main() falls back to it when --slapos-schema is not given.
DEFAULT_SLAPOS_SCHEMA_PATH = os.path.join(os.path.dirname(__file__), 'schema.json')

def main():
    """Command-line entry point.

    Steps performed:
    - load the SlapOS base schema (--slapos-schema, or
      DEFAULT_SLAPOS_SCHEMA_PATH when the option is absent) and check it is
      a valid Draft 4 schema;
    - fetch the software release descriptor, located at the software
      release URL with '.json' appended, and validate it against the base
      schema;
    - for every software type declared in the descriptor, fetch its
      'request' and 'response' schemas and check them. The file given with
      --request / --response is validated against the schema of the
      software type named by --software-type; every other schema is
      exercised against an empty dict.

    A software release argument with neither scheme nor network location is
    treated as a local path and turned into a "file" URL.

    Exits through parser.error when --request or --response is given
    without --software-type, or when --software-type names a software type
    the descriptor does not declare. Validation failures propagate as the
    exceptions raised by jsonschema.
    """
    # Local import: only this function needs it, and it keeps the fix
    # self-contained. closing() is used because the objects returned by
    # urllib.urlopen are not guaranteed to be context managers themselves.
    from contextlib import closing

    parser = argparse.ArgumentParser(
        description='Validates SlapOS Software Release descriptor schemas, '
        'and optionally validates specific requests and/or responses with '
        'these schemas.',
    )
    parser.add_argument('--software-type', type=str, help='Software type given request/response is intended for')
    parser.add_argument('--request', type=argparse.FileType('r'), help='File containing request parameters')
    parser.add_argument('--response', type=argparse.FileType('r'), help='File containing published values')
    parser.add_argument('--slapos-schema', type=argparse.FileType('r'), help='SlapOS base schema. Default: %s' % (DEFAULT_SLAPOS_SCHEMA_PATH, ))
    parser.add_argument('software_release', nargs=1, help='URL of the software release')
    args = parser.parse_args()

    if args.software_type is None and (
            args.request is not None or
            args.response is not None
        ):
        parser.error(
            '--software-type is required if --request or --response is provided',
        )

    slapos_schema_file = args.slapos_schema
    if slapos_schema_file is None:
        slapos_schema_file = open(DEFAULT_SLAPOS_SCHEMA_PATH)
    # Close the schema file as soon as it is parsed, whichever way it was
    # opened (by argparse or by the fallback just above).
    with closing(slapos_schema_file):
        slapos_schema = json.load(slapos_schema_file)
    jsonschema.Draft4Validator.check_schema(slapos_schema)

    software_release_url = urlparse(args.software_release[0])
    if software_release_url.scheme == software_release_url.netloc == '':
        # Assume software_release_url.path is a relative path for "file" scheme
        software_release_url = ParseResult(
            'file',
            '',
            os.path.abspath(software_release_url.path),
            software_release_url.params,
            software_release_url.query,
            software_release_url.fragment,
        )
    software_release_base_path, software_release_file = software_release_url.path.rsplit('/', 1)

    def loadurl(relative_url):
        """Fetch and JSON-decode a document located next to the software release.

        relative_url is appended to the directory part of the software
        release URL. Returns a (absolute url, decoded document) pair.
        """
        url = urlunparse((
            software_release_url.scheme,
            software_release_url.netloc,
            software_release_base_path + '/' + relative_url,
            software_release_url.params,
            software_release_url.query,
            software_release_url.fragment,
        ))
        # Release the connection / file handle once the body has been read.
        with closing(urllib.urlopen(url)) as response:
            return url, json.loads(response.read())

    _, software_release_descriptor = loadurl(software_release_file + '.json')
    jsonschema.Draft4Validator.check_schema(software_release_descriptor)
    jsonschema.validate(software_release_descriptor, slapos_schema)
    load = {
        'xml': xml2dict,
        'json-in-xml': jsonInXML2dict,
    }[software_release_descriptor['serialisation']]
    software_type_dict = software_release_descriptor['software-type']
    if args.software_type is not None and args.software_type not in software_type_dict:
        # Without this check the given request/response would silently never
        # be validated and the tool would report success.
        parser.error(
            'unknown software type %r, expected one of: %s' % (
                args.software_type,
                ', '.join(sorted(software_type_dict)),
            ),
        )
    for software_type_name, software_type in software_type_dict.iteritems():
        for key in ('request', 'response'):
            relative_url = software_type[key]
            url, software_release_schema = loadurl(relative_url)
            jsonschema.Draft4Validator.check_schema(software_release_schema)
            validator = jsonschema.Draft4Validator(
                schema=software_release_schema,
                resolver=ValidatingRefResolver(
                    base_uri=url,
                    referrer=software_release_schema,
                ),
            )
            infile = getattr(args, key)
            if infile is not None and software_type_name == args.software_type:
                # Each input file is consumed at most once (software type
                # names are unique dict keys), so it can be closed right
                # after it has been parsed.
                with closing(infile):
                    parameter_dict = load(infile)
                validator.validate(parameter_dict)
            else:
                # No instance supplied for this schema: still run the
                # validator once, against an empty dict.
                validator.validate({})

if __name__ == '__main__':
    main()