Commit ce262b93 authored by Titouan Soulard's avatar Titouan Soulard

erp5_json_form: SPLIT also validate data to output schema

ALSO: add security on methods
parent 8233b311
......@@ -30,6 +30,8 @@ import jsonschema
from collections import OrderedDict
from zExceptions import InternalError
from erp5.component.document.JSONType import JSONType
from erp5.component.document.TextDocument import TextDocument
......@@ -64,26 +66,33 @@ class JSONForm(JSONType, TextDocument):
def __call__(self, json_data, list_error=False): #pylint:disable=arguments-differ
data_dict = json.loads(json_data)
validation_result = self.validateJSON(data_dict, list_error)
validation_result = self._validateJSON(data_dict, self.getInputJSONSchema(), list_error)
if validation_result is not True:
if not list_error:
raise jsonschema.exceptions.ValidationError(validation_result.message)
else:
raise ValueError(json.dumps(validation_result))
if self.getAfterMethodId():
after_method = getattr(getattr(self, "aq_parent", None), self.getAfterMethodId())
mapped_data_dict = self._mapArguments(data_dict, "input")
# XXX: argument name is wrong
mapped_data_dict["form_reference"] = self
output_dict = after_method(**mapped_data_dict)
if not isinstance(output_dict, dict):
output_dict = {}
mapped_output_dict = self._mapArguments(output_dict, "output")
return json.dumps(mapped_output_dict)
if not self.getAfterMethodId():
raise InternalError("No after method defined in JSON Form")
after_method = getattr(getattr(self, "aq_parent", None), self.getAfterMethodId())
mapped_data_dict = self._mapArguments(data_dict, "input")
# XXX: argument name is wrong
mapped_data_dict["form_reference"] = self
output_dict = after_method(**mapped_data_dict)
if not isinstance(output_dict, dict):
output_dict = {}
mapped_output_dict = self._mapArguments(output_dict, "output")
raise NotImplementedError("No after method")
# Also check output to match expected output schema if defined
validation_result = self._validateJSON(mapped_output_dict, self.getOutputJSONSchema())
if validation_result is not True:
raise InternalError("Returned data do not match expected output schema")
return json.dumps(mapped_output_dict)
security.declarePrivate("_mapArguments")
def _mapArguments(self, arguments, mapping_type):
mappings = {x.getSource(): x.getDestination() for x in self.objectValues(portal_type="Argument Mapping") if x.getMappingType() == mapping_type}
mapped_arguments = {}
......@@ -96,11 +105,12 @@ class JSONForm(JSONType, TextDocument):
return mapped_arguments
def validateJSON(self, json_data, list_error=False):
security.declarePrivate("_validateJSON")
def _validateJSON(self, json_data, schema, list_error=False):
"""
Validate contained JSON with the Schema defined in the Portal Type.
"""
defined_schema = json.loads(self.getInputJSONSchema() or "")
defined_schema = json.loads(schema)
try:
jsonschema.validate(json_data, defined_schema, format_checker=jsonschema.FormatChecker())
except jsonschema.exceptions.ValidationError as err:
......@@ -114,7 +124,12 @@ class JSONForm(JSONType, TextDocument):
return err
return True
def returnSchema(self, schema, path, REQUEST):
security.declarePrivate("_returnSchema")
def _returnSchema(self, schema, path, REQUEST):
# Handle empty schemas (especially output)
if not schema:
schema = "{}"
schema = json.loads(schema, object_pairs_hook=OrderedDict)
# Replace user URL by absolute URL
if "$id" in schema:
......@@ -135,7 +150,7 @@ class JSONForm(JSONType, TextDocument):
"""
Method to retrieve the expected JSON Schema for JSON input
"""
return self.returnSchema(
return self._returnSchema(
self.getOutputSchema(),
"/getOutputJSONSchema",
self.REQUEST
......@@ -147,7 +162,7 @@ class JSONForm(JSONType, TextDocument):
"""
Method to retrieve the expected JSON Schema for JSON output
"""
return self.returnSchema(
return self._returnSchema(
self.getTextContent(),
"/getInputJSONSchema",
self.REQUEST
......
......@@ -29,6 +29,8 @@ import json
import re
from DateTime import DateTime
from zExceptions import InternalError
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.utils import createZODBPythonScript
......@@ -71,6 +73,7 @@ class TestJSONForm(ERP5TypeTestCase):
json_form.edit(
text_content=text_content,
after_method_id=after_method_id,
output_schema=""
)
if self.portal.portal_workflow.isTransitionPossible(json_form, 'validate'):
json_form.validate()
......@@ -123,7 +126,7 @@ class TestJSONForm(ERP5TypeTestCase):
json_form = self.fixJSONForm(method_id, schema, "")
self.tic()
self.assertRaisesRegexp(NotImplementedError, "No after method", json_form, json.dumps(data))
self.assertRaisesRegexp(InternalError, "No after method", json_form, json.dumps(data))
def test_call_invalid_json_list_errors(self):
"""
......@@ -232,6 +235,40 @@ class TestJSONForm(ERP5TypeTestCase):
error[json_form.absolute_url() + "/getInputJSONSchema"] = [['Validation Error', u"u'title' is a required property"]]
self.assertRaisesRegexp(ValueError, re.escape(json.dumps(error)), json_form, json.dumps(data), list_error=True)
def test_raises_on_invalid_output(self):
"""
Raises when returned dictionary does not match output schema.
"""
input_schema = """{
"$schema": "https://json-schema.org/draft/2019-09/schema",
"$id": "my-schema.json",
"properties":{
"title": {
"type": "string"
}
}
}"""
output_schema = """{
"$schema": "https://json-schema.org/draft/2019-09/schema",
"$id": "my-schema.json",
"properties":{
"invalid": {
"type": "string"
}
},
"required": ["invalid"]
}"""
data = {
"title": "foo"
}
method_id = "test_ERP5Site_processSimpleStringAsJSON"
after_method = self.createBasicScriptreturnJSONWithTimestamp()
json_form = self.fixJSONForm(method_id, input_schema, after_method)
json_form.setOutputSchema(output_schema)
self.tic()
self.assertRaisesRegexp(InternalError, "do not match", json_form, json.dumps(data))
def test_supports_argument_mappings(self):
"""
Ensures arguments mappings can be used properly.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment