Commit 53108963 authored by Tomáš Peterka's avatar Tomáš Peterka Committed by Tomáš Peterka

[hal_json_style] Improve ListBox value resolution on search results

/reviewed-on !521
parent a17ce292
"""Hello. This will be long because this goodness script does almost everything.
In general it always returns a JSON reponse in HATEOAS format specification.
:param REQUEST: HttpRequest holding GET and/or POST data
:param response:
:param view: either "view" or absolute URL of an ERP5 Action
:param mode: {str} help to decide what user wants from us "form" | "search" ...
:param relative_url: an URL of `traversed_document` to operate on (it must have an object_view)
Only in mode == 'search'
:param query: string-serialized Query
:param select_list: list of strings to select from search result object
:param limit: tuple(start_index, num_records) which is further passed to list_method BUT not every list_method takes it into account
:param form_relative_url: {str} relative URL of a form FIELD issuing the search (listbox/relation field...)
it can be None in case of special listboxes like List of Modules
or relative path like "portal_skins/erp5_ui_test/FooModule_viewFooList/listbox"
Only in mode == 'form'
:param form:
Only in mode == 'traverse'
# Form
When handling form, we can expect field values to be stored in REQUEST.form in two forms
- raw string value under key "field_" + <field.id>
- python-object parsed from raw values under <field.id>
"""
from ZTUtils import make_query
import json
from base64 import urlsafe_b64encode, urlsafe_b64decode
......@@ -8,6 +38,11 @@ import time
from email.Utils import formatdate
import re
from zExceptions import Unauthorized
from Products.ERP5Type.Utils import UpperCase
from Products.ZSQLCatalog.SQLCatalog import Query, ComplexQuery
from Products.ERP5Type.Log import log
MARKER = []
if REQUEST is None:
REQUEST = context.REQUEST
......@@ -27,6 +62,13 @@ def byteify(string):
return string
def getProtectedProperty(document, select):
"""getProtectedProperty is a security-aware substitution for builtin `getattr`
It resolves Properties on Products (visible via Zope Formulator), which are
accessible as ordinary attributes as well, by following security rules.
See https://lab.nexedi.com/nexedi/erp5/blob/master/product/ERP5Form/ListBox.py#L2293
"""
try:
#see https://lab.nexedi.com/nexedi/erp5/blob/master/product/ERP5Form/ListBox.py#L2293
try:
......@@ -40,6 +82,193 @@ def getProtectedProperty(document, select):
except:
return None
def selectKwargsForCallable(func, initial_kwargs, kwargs_dict):
"""Create a copy of `kwargs_dict` with only items suitable for `func`.
In case the function cannot state required arguments it throws an AttributeError.
"""
if not hasattr(func, 'params'):
return initial_kwargs
func_param_list = [func_param.strip() for func_param in func.params().split(",")]
func_param_name_list = [func_param if '=' not in func_param else func_param.split('=')[0]
for func_param in func_param_list if '*' not in func_param]
for func_param_name in func_param_name_list:
if func_param_name in kwargs_dict and func_param_name not in initial_kwargs:
initial_kwargs[func_param_name] = kwargs_dict.get(func_param_name)
# MIDDLE-DANGEROUS!
# In case of reports (later even exports) substitute None for unknown
# parameters. We suppose Python syntax for parameters!
# What we do here is literally putting every form field from `kwargs_dict`
# into search method parameters - this is later put back into `kwargs_dict`
# this way we can mimic synchronous rendering when all form field values
# were available in `kwargs_dict`. It is obviously wrong behaviour.
for func_param in func_param_list:
if "*" in func_param:
continue
if "=" in func_param:
continue
# now we have only mandatory parameters
func_param = func_param.strip()
if func_param not in initial_kwargs:
initial_kwargs[func_param] = None
return initial_kwargs
def getUidAndAccessorForAnything(search_result, result_index, traversed_document):
"""Return unique ID, unique URL, getter and hasser for any combination of `search_result` and `index`.
You want to use this method when you need a unique reference to random object in iterable (for example
result of list_method or stat_method). This will give you UID and URL for identification within JIO and
accessors to test/access object's properties.
Usage::
for i, random_object in enumerate(unknown_iterable):
uid, url, getter, hasser = object_ids_and_access(random_object, i)
if hasser(random_object, "linkable"):
result[uid] = {'url': portal.abolute_url() + url}
value = getter(random_object, "value")
"""
if hasattr(search_result, "getObject"):
# search_result = search_result.getObject()
contents_uid = search_result.uid
# every document indexed in catalog has to have relativeUrl
contents_relative_url = getRealRelativeUrl(search_result)
# get property in secure way from documents
search_property_getter = getProtectedProperty
def search_property_hasser (doc, attr):
"""Brains cannot access Properties - they use permissioned getters."""
try:
return doc.hasProperty(attr)
except (AttributeError, Unauthorized) as e:
log('Cannot state ownership of property "{}" on {!s} because of "{!s}"'.format(
attr, doc, e))
return False
elif hasattr(search_result, "aq_self"):
# Zope products have at least ID thus we work with that
contents_uid = search_result.uid
# either we got a document with relativeUrl or we got product and use ID
contents_relative_url = getRealRelativeUrl(search_result) or search_result.getId()
# documents and products have the same way of accessing properties
search_property_getter = getProtectedProperty
search_property_hasser = lambda doc, attr: doc.hasProperty(attr)
else:
# In case of reports the `search_result` can be list of
# PythonScripts.standard._Object - a reimplementation of plain dictionary
# means we are iterating over plain objects
# list_method must be defined because POPOs can return only that
contents_uid = "{}#{:d}".format(list_method, result_index)
# JIO requires every item to have _links.self.href so it can construct
# links to the document. Here we have a object in RAM (which should
# never happen!) thus we provide temporary UID
contents_relative_url = "{}/{}".format(traversed_document.getRelativeUrl(), contents_uid)
# property getter must be simple __getattr__ implementation
search_property_getter = lambda obj, attr: getattr(obj, attr, None)
search_property_hasser = lambda obj, attr: hasattr(obj, attr)
return contents_uid, contents_relative_url, search_property_getter, search_property_hasser
def getAttrFromAnything(search_result, select, search_property_getter, search_property_hasser, kwargs):
"""Given `search_result` extract value named `select` using helper getter/hasser.
:param search_result: any dict-like object (usually dict or Brain or Document)
:param select: field name (can represent actual attributes, Properties or even Scripts)
:param kwargs: available arguments for possible callables hidden under `select`
"""
# if the variable does not have a field template we need to find its
# value by resolving value in the correct order. The code is copy&pasted
# from ListBoxRendererLine.getValueList because it is universal
contents_value = None
if not isinstance(select, (str, unicode)) or len(select) == 0:
log('There is an invalid column name "{!s}"!'.format(select), level=200)
return None
if "." in select:
select = select[select.rindex('.') + 1:]
# 1. resolve attribute on a raw object (all wrappers removed) using
# lowest-level secure getattr method given object type
raw_search_result = search_result
if hasattr(search_result, 'aq_base'):
raw_search_result = search_result.aq_base
if search_property_hasser(raw_search_result, select):
contents_value = search_property_getter(raw_search_result, select)
# 2. use the fact that wrappers (brain or acquisition wrapper) use
# permissioned getters
unwrapped_search_result = search_result
if hasattr(search_result, 'aq_self'):
unwrapped_search_result = search_result.aq_self
if contents_value is None:
if not select.startswith('get') and select[0] not in string.ascii_uppercase:
# maybe a hidden getter (variable accessible by a getter)
accessor_name = 'get' + UpperCase(select)
else:
# or obvious getter (starts with "get" or Capital letter - Script)
accessor_name = select
# again we check on a unwrapped object to avoid acquisition resolution
# which would certainly find something which we don't want
try:
if hasattr(raw_search_result, accessor_name) and callable(getattr(search_result, accessor_name)):
# test on raw object but get the actual accessor using wrapper and acquisition
# do not call it here - it will be done later in generic call part
contents_value = getattr(search_result, accessor_name)
except (AttributeError, KeyError, Unauthorized) as error:
log("Could not evaluate {} nor {} on {} with error {!s}".format(
select, accessor_name, search_result, error), level=100) # WARNING
if contents_value is None and search_property_hasser(search_result, select):
# maybe it is just a attribute
contents_value = search_property_getter(search_result, select)
if contents_value is None:
try:
contents_value = getattr(search_result, select, None)
except (Unauthorized, AttributeError, KeyError) as error:
log("Cannot resolve {} on {!s} because {!s}".format(
select, raw_search_result, error), level=100)
if callable(contents_value):
has_mandatory_param = False
has_brain_param = False
if hasattr(contents_value, "params"):
has_mandatory_param = any(map(lambda param: '=' not in param and '*' not in param,
contents_value.params().split(","))) \
if contents_value.params() \
else False # because any([]) == True
has_brain_param = "brain" in contents_value.params()
try:
if has_mandatory_param:
contents_value = contents_value(search_result)
elif has_brain_param:
contents_value = contents_value(brain=search_result)
else:
contents_value = contents_value()
except (AttributeError, KeyError, Unauthorized) as error:
log("Could not evaluate {} on {} with error {!s}".format(
contents_value, search_result, error), level=100) # WARNING
# make resulting value JSON serializable
if contents_value is not None:
if same_type(contents_value, DateTime()):
# Serialize DateTime
contents_value = contents_value.rfc822()
# XXX Kato: what exactly should the later mean?
elif isinstance(contents_value, datetime.date):
contents_value = formatdate(time.mktime(contents_value.timetuple()))
elif hasattr(contents_value, 'translate'):
contents_value = "%s" % contents_value
return contents_value
url_template_dict = {
"form_action": "%(traversed_document_url)s/%(action_id)s",
"traverse_generator": "%(root_url)s/%(script_id)s?mode=traverse" + \
......@@ -98,6 +327,13 @@ def getFieldDefault(traversed_document, field, key, value=None):
def renderField(traversed_document, field, form, value=None, meta_type=None, key=None, key_prefix=None, selection_params=None):
"""Extract important field's attributes into `result` dictionary."""
if selection_params is None:
selection_params = {}
# some TALES expressions are using Base_getRelatedObjectParameter which requires that
previous_request_field = REQUEST.other.pop('field_id', None)
REQUEST.other['field_id'] = field.id
if meta_type is None:
meta_type = field.meta_type
if key is None:
......@@ -237,11 +473,16 @@ def renderField(traversed_document, field, form, value=None, meta_type=None, key
for (listbox_path, listbox_name) in listbox_ids:
(listbox_form_name, listbox_field_name) = listbox_path.split('/', 2)
form = getattr(context, listbox_form_name)
# do not override "global" `form`
rel_form = getattr(context, listbox_form_name)
# find listbox field
listbox_form_field = filter(lambda f: f.getId() == listbox_field_name, form.get_fields())[0]
listbox_form_field = filter(lambda f: f.getId() == listbox_field_name, rel_form.get_fields())[0]
rel_cache = {'form_id': REQUEST.get('form_id', MARKER), 'field_id': REQUEST.get('field_id', MARKER)}
REQUEST.set('form_id', rel_form.id)
REQUEST.set('field_id', listbox_form_field.id)
# get original definition
subfield = renderField(context, listbox_form_field, form)
subfield = renderField(context, listbox_form_field, rel_form)
# overwrite, like Base_getRelatedObjectParameter does
if subfield["portal_type"] == []:
subfield["portal_type"] = field.get_value('portal_type')
......@@ -263,6 +504,11 @@ def renderField(traversed_document, field, form, value=None, meta_type=None, key
subfield["column_list"].append((tmp_column[0], Base_translateString(tmp_column[1])))
listbox[Base_translateString(listbox_name)] = subfield
for key in rel_cache:
if rel_cache[key] is not MARKER:
REQUEST.set(key, rel_cache[key])
result.update({
"url": relative_url,
"translated_portal_types": translated_portal_type,
......@@ -341,6 +587,33 @@ def renderField(traversed_document, field, form, value=None, meta_type=None, key
)
list_method_custom = None
# Search for non-editable documents - all reports goes here
# Reports have custom search scripts which wants parameters from the form
# thus we introspect such parameters and try to find them in REQUEST
list_method = None
if list_method_name and list_method_name not in ("portal_catalog", "searchFolder", "objectValues"):
# we avoid accessing known protected objects and builtin functions above
try:
list_method = getattr(traversed_document, list_method_name)
except (Unauthorized, AttributeError, ValueError) as error:
# we are touching some specially protected (usually builtin) methods
# which we will not introspect
log('ListBox {!s} list_method {} is unavailable because of "{!s}"'.format(
field, list_method_name, error), level=100)
# Put all ListBox's search method params from REQUEST to `default_param_json`
# because old code expects synchronous render thus having all form's values
# still in the request which is not our case because we do asynchronous rendering
if list_method is not None:
selectKwargsForCallable(list_method, list_method_query_dict, REQUEST)
# Now if the list_method does not specify **kwargs we need to remove
# unwanted parameters like "portal_type" which is everywhere
if hasattr(list_method, 'params') and "**" not in list_method.params():
_param_key_list = tuple(list_method_query_dict.keys()) # copy the keys
for param_key in _param_key_list:
if param_key not in list_method.params(): # we search in raw string
del list_method_query_dict[param_key] # but it is enough
if (editable_column_list):
list_method_custom = url_template_dict["custom_search_template"] % {
"root_url": site_root.absolute_url(),
......@@ -452,7 +725,18 @@ def renderField(traversed_document, field, form, value=None, meta_type=None, key
def renderForm(traversed_document, form, response_dict, key_prefix=None, selection_params=None):
"""
:param selection_params: holds parameters to construct ERP5Form.Selection instance
for underlaying ListBox - since we do not use selections in RenderJS UI
we mitigate the functionality here by overriding ListBox's own values
for columns, editable columns, and sort with those found in `selection_params`
"""
previous_request_other = {
'form_id': REQUEST.other.pop('form_id', None),
'here': REQUEST.other.pop('here', None)
}
REQUEST.set('here', traversed_document)
REQUEST.set('form_id', form.id)
field_errors = REQUEST.get('field_errors', {})
#hardcoded
......@@ -979,131 +1263,202 @@ def calculateHateoas(is_portal=None, is_site_root=None, traversed_document=None,
# select_list: ['int_index', 'id', 'title', ...] (column names to select)
# limit: [15, 16] (begin_index, num_records)
# local_roles: TODO
#
# Default Param JSON contains
# portal_type: list of Portal Types to include (singular form matches the
# catalog column name)
#
# Discussion:
#
# Why you didn't use ListBoxRendererLine?
# > Method 'search' is used for getting related objects as well which are
# > not backed up by a ListBox thus the value resolution would have to be
# > there anyway. It is better to use one code for all in this case.
#################################################
if REQUEST.other['method'] != "GET":
response.setStatus(405)
return ""
# set 'here' for field rendering which contain TALES expressions
REQUEST.set('here', traversed_document)
# in case we have custom list method
catalog_kw = {}
# hardcoded responses for site and portal objects (which are not Documents!)
# we let the flow to continue because the result of a list_method call can
# be similar - they can in practice return anything
if query == "__root__":
sql_list = [site_root]
search_result_iterable = [site_root]
elif query == "__portal__":
sql_list = [portal]
search_result_iterable = [portal]
else:
# otherwise gather kwargs for list_method and get whatever result it gives
callable_list_method = portal.portal_catalog
if list_method:
callable_list_method = getattr(traversed_document, list_method)
catalog_kw = {
"local_roles": local_roles,
"limit": limit,
"sort_on": () # default is empty tuple
"sort_on": () # default is an empty tuple
}
if default_param_json is not None:
catalog_kw.update(byteify(json.loads(urlsafe_b64decode(default_param_json))))
if query:
catalog_kw["full_text"] = query
if sort_on is not None:
if isinstance(sort_on, list):
catalog_kw['sort_on'] = tuple((byteify(sort_col), byteify(sort_order))
for sort_col, sort_order in map(json.loads, sort_on))
def parseSortOn(raw_string):
"""Turn JSON serialized array into a tuple (col_name, order)."""
sort_col, sort_order = json.loads(raw_string)
sort_col, sort_order = byteify(sort_col), byteify(sort_order)
# JIO keeps sort order as whole word 'ascending' resp. 'descending'
if sort_order.lower().startswith("asc"):
sort_order = "ASC"
elif sort_order.lower().startswith("desc"):
sort_order = "DESC"
else:
sort_col, sort_order = json.loads(sort_on)
catalog_kw['sort_on'] = ((byteify(sort_col), byteify(sort_order)), )
# should raise an ValueError instead
log('Wrong sort order "{}" in {}! It must start with "asc" or "desc"'.format(sort_order, form_relative_url),
level=200) # error
return (sort_col, sort_order)
if (list_method is None):
callable_list_method = portal.portal_catalog
if isinstance(sort_on, list):
# sort_on argument is always a list of tuples(col_name, order)
catalog_kw['sort_on'] = list(map(parseSortOn, sort_on))
else:
callable_list_method = getattr(traversed_document, list_method)
catalog_kw['sort_on'] = [parseSortOn(sort_on), ]
sql_list = callable_list_method(**catalog_kw)
# Some search scripts impertinently grab their arguments from REQUEST
# instead of being nice and specify them as their input parameters.
#
# We expect that wise and mighty ListBox did copy all form field values
# from its REQUEST into `default_param_json` so we can put them back.
#
# XXX Kato: Seems that current scripts are behaving nicely (using only
# specified input parameters). In case some list_method does not work
# this is the first place to try to uncomment.
#
# for k, v in catalog_kw.items():
# REQUEST.set(k, v)
result_list = [] # returned "content" of the search
search_result_iterable = callable_list_method(**catalog_kw)
# Cast to list if only one element is provided
editable_field_dict = {}
if select_list is None:
select_list = []
elif same_type(select_list, ""):
select_list = [select_list]
if select_list:
if (form_relative_url is not None):
listbox_field = portal.restrictedTraverse(form_relative_url)
listbox_field_id = listbox_field.id
# form field issuing this search
source_field = portal.restrictedTraverse(form_relative_url) if form_relative_url else None
# extract form field definition into `editable_field_dict`
editable_field_dict = {}
listbox_form = None
listbox_field_id = None
source_field_meta_type = source_field.meta_type if source_field is not None else ""
if source_field_meta_type == "ProxyField":
source_field_meta_type = source_field.getRecursiveTemplateField().meta_type
if source_field is not None and source_field_meta_type == "ListBox":
listbox_field_id = source_field.id
# XXX Proxy field are not correctly handled in traversed_document of web site
listbox_form = getattr(traversed_document, listbox_field.aq_parent.id)
listbox_form = getattr(traversed_document, source_field.aq_parent.id)
# field TALES expression evaluated by Base_getRelatedObjectParameter requires that
REQUEST.other['form_id'] = listbox_form.id
for select in select_list:
# See Listbox.py getValueList --> getEditableField & getColumnAliasList method
tmp = select.replace('.', '_')
if listbox_form.has_field("%s_%s" % (listbox_field_id, tmp), include_disabled=1):
editable_field_dict[select] = listbox_form.get_field("%s_%s" % (listbox_field_id, tmp), include_disabled=1)
# In short: there are Form Field definitions which names start with
# matching ListBox name - those are template fields to be rendered in
# cells with actual values defined by row and column
field_name = "{}_{}".format(listbox_field_id, select.replace(".", "_"))
if listbox_form.has_field(field_name, include_disabled=1):
editable_field_dict[select] = listbox_form.get_field(field_name, include_disabled=1)
# handle the case when list-scripts are ignoring `limit` - paginate for them
if limit is not None and isinstance(limit, (tuple, list)):
start, num_items = map(int, limit)
if len(sql_list) <= num_items:
if len(search_result_iterable) <= num_items:
# the limit was most likely taken into account thus we don't need to slice
start, num_items = 0, len(sql_list)
start, num_items = 0, len(search_result_iterable)
else:
start, num_items = 0, len(sql_list)
for document_index, sql_document in enumerate(sql_list):
if document_index < start:
start, num_items = 0, len(search_result_iterable)
contents_list = [] # resolved fields from the search result
result_dict.update({
'_query': query,
'_local_roles': local_roles,
'_limit': limit,
'_select_list': select_list,
'_embedded': {}
})
# now fill in `contents_list` with actual information
# beware that search_result_iterable can hide anything inside!
for result_index, search_result in enumerate(search_result_iterable):
# skip documents out of `limit`
if result_index < start:
continue
if document_index >= start + num_items:
if result_index >= start + num_items:
break
try:
document = sql_document.getObject()
except AttributeError:
# XXX ERP5 Site is not an ERP5 document
document = sql_document
document_uid = sql_document.uid
document_result = {
'_links': {
# we can render fields which need 'here' to be set to currently rendered document
#REQUEST.set('here', search_result)
contents_item = {}
contents_uid, contents_relative_url, property_getter, property_hasser = \
getUidAndAccessorForAnything(search_result, result_index, traversed_document)
# _links.self.href is mandatory for JIO so it can create reference to the
# (listbox) item alone
contents_item['_links'] = {
'self': {
"href": default_document_uri_template % {
"root_url": site_root.absolute_url(),
# XXX ERP5 Site is not an ERP5 document
"relative_url": getRealRelativeUrl(document) or document.getId(),
"relative_url": contents_relative_url,
"script_id": script.id
},
},
}
}
if editable_field_dict:
document_result['listbox_uid:list'] = {
# ERP5 stores&send the list of editable elements in a hidden field called
# only database results can be editable so it belongs here
if editable_field_dict and listbox_field_id:
contents_item['listbox_uid:list'] = {
'key': "%s_uid:list" % listbox_field_id,
'value': document_uid
'value': contents_uid
}
for select in select_list:
if editable_field_dict.has_key(select):
REQUEST.set('cell', sql_document)
if ('default' in editable_field_dict[select].tales):
tmp_value = None
else:
tmp_value = getProtectedProperty(document, select)
# cell has a Form Field template thus render it using the field
# fields are nice because they are standard
REQUEST.set('cell', search_result)
# if default value is given by evaluating Tales expression then we only
# put "cell" to request (expected by tales) and let the field evaluate
if getattr(editable_field_dict[select].tales, "default", "") == "":
# if there is no tales expr (or is empty) we extract the value from search result
default_field_value = getAttrFromAnything(search_result, select, property_getter, property_hasser, {})
contents_item[select] = renderField(
traversed_document,
editable_field_dict[select],
listbox_form,
value=default_field_value,
key='field_%s_%s' % (editable_field_dict[select].id, contents_uid))
property_value = renderField(
traversed_document, editable_field_dict[select], form, tmp_value,
key='field_%s_%s' % (editable_field_dict[select].id, document_uid))
REQUEST.other.pop('cell', None)
else:
property_value = getProtectedProperty(document, select)
if property_value is not None:
if same_type(property_value, DateTime()):
# Serialize DateTime
property_value = property_value.rfc822()
elif isinstance(property_value, datetime.date):
property_value = formatdate(time.mktime(property_value.timetuple()))
elif getattr(property_value, 'translate', None) is not None:
property_value = "%s" % property_value
document_result[select] = property_value
result_list.append(document_result)
result_dict['_embedded'] = {"contents": result_list}
result_dict['_query'] = query
result_dict['_local_roles'] = local_roles
result_dict['_limit'] = limit
result_dict['_select_list'] = select_list
# most of the complicated magic happens here - we need to resolve field names
# given search_result. This name can unfortunately mean almost anything from
# a key name to Python Script with variable number of input parameters.
contents_item[select] = getAttrFromAnything(search_result, select, property_getter, property_hasser, {'brain': search_result})
# endfor select
contents_list.append(contents_item)
result_dict['_embedded']['contents'] = contents_list
return result_dict
elif mode == 'form':
#################################################
......
......@@ -63,18 +63,30 @@ def simulate(script_id, params_string, code_string):
return decorated
return upperWrap
def createIndexedDocument():
"""Create a Foo document inside Foo module and pass it as "document" argument into wrapped function."""
def wipeFolder(folder):
folder.deleteContent(list(folder.objectIds()))
transaction.commit()
def createIndexedDocument(quantity=1):
"""Create `quantity` Foo document(s) in Foo module and pass it as `document(_list)` argument into the wrapped function."""
def decorator(func):
def wrapped(self, *args, **kwargs):
wipeFolder(self.portal.foo_module)
if quantity <= 1:
kwargs.update(document=self._makeDocument())
else:
kwargs.update(document_list=[self._makeDocument() for _ in range(quantity)])
self.portal.portal_caches.clearAllCache()
self.tic()
try:
return func(self, *args, **kwargs)
finally:
wipeFolder(self.portal.foo_module)
self.tic() # unindex
return wrapped
return decorator
def do_fake_request(request_method, headers=None):
def do_fake_request(request_method, headers=None, data=()):
__version__ = "0.1"
if (headers is None):
headers = {}
......@@ -93,7 +105,25 @@ def do_fake_request(request_method, headers=None):
env['GATEWAY_INTERFACE']='CGI/1.1 '
env['SCRIPT_NAME']='Main'
env.update(headers)
return HTTPRequest(StringIO.StringIO(), env, HTTPResponse())
body_stream = StringIO.StringIO()
# for some mysterious reason QUERY_STRING does not get parsed into data fields
if data and request_method.upper() == 'GET':
# see: GET http://www.cgi101.com/book/ch3/text.html
env['QUERY_STRING'] = '&'.join(
'{}={}'.format(urllib.quote_plus(key), urllib.quote(value))
for key, value in data
)
if data and request_method.upper() == 'POST':
# see: POST request body https://tools.ietf.org/html/rfc1866#section-8.2.1
env['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
for key, value in data:
body_stream.write('{}={!s}&'.format(
urllib.quote_plus(key), urllib.quote(value)))
return HTTPRequest(body_stream, env, HTTPResponse())
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
......@@ -952,6 +982,145 @@ class TestERP5Document_getHateoas_mode_search(ERP5HALJSONStyleSkinsMixin):
mode="search",
default_param_json='eyJcdTAwZWEiOiAiXHUwMGU4In0=')
@simulate('Base_getRequestUrl', '*args, **kwargs', 'return "http://example.org/bar"')
@simulate('Base_getRequestHeader', '*args, **kwargs', 'return "application/hal+json"')
@simulate('Test_listObjects', '*args, **kwargs', """
from Products.PythonScripts.standard import Object
return [Object(debit_price=1000.00, credit_price=100.00),
Object(debit_price=10.00, credit_price=0.00)]
""")
@simulate('Test_listProducts', '*args, **kwargs', """
return context.getPortalObject().foo_module.values()
""")
@simulate('Test_listCatalog', '*args, **kwargs', """
return context.getPortalObject().portal_catalog(portal_type='Foo', sort_on=[('id', 'ASC')])
""")
@createIndexedDocument(quantity=2)
@changeSkin('Hal')
def test_getHateoas_exotic_search_results(self, document_list):
"""Test that ingestion of `list_method` result does not fail.
The only limit for the result of `list_method` is that it should be an iterable.
Practically, because we code in python, it can be any object.
"""
fake_request = do_fake_request("GET")
result = self.portal.web_site_module.hateoas.ERP5Document_getHateoas(
REQUEST=fake_request,
mode="search",
local_roles=["Assignor", "Assignee"],
list_method='Test_listObjects',
select_list=['credit_price', 'debit_price']
)
self.assertEquals(fake_request.RESPONSE.status, 200)
self.assertEquals(fake_request.RESPONSE.getHeader('Content-Type'),
"application/hal+json"
)
result_dict = json.loads(result)
self.assertEqual(len(result_dict['_embedded']['contents']), 2)
self.assertEqual(result_dict['_embedded']['contents'][0]['debit_price'], 1000.0)
self.assertEqual(result_dict['_embedded']['contents'][0]['credit_price'], 100.0)
self.assertEqual(result_dict['_embedded']['contents'][1]['debit_price'], 10.0)
self.assertEqual(result_dict['_embedded']['contents'][1]['credit_price'], 0.0)
# Render a Document using Form Field template (only for field 'id')
result = self.portal.web_site_module.hateoas.ERP5Document_getHateoas(
REQUEST=fake_request,
mode="search",
local_roles=["Assignor", "Assignee"],
list_method='Test_listProducts',
select_list=['id'],
form_relative_url='portal_skins/erp5_ui_test/FooModule_viewFooList/listbox'
)
result_dict = json.loads(result)
self.assertEqual(2, len(result_dict['_embedded']['contents']))
self.assertIn("field_listbox", result_dict['_embedded']['contents'][0]['id']['key'])
self.assertEqual("StringField", result_dict['_embedded']['contents'][0]['id']['type'])
self.assertEqual(document_list[0].getId(), result_dict['_embedded']['contents'][0]['id']['default'])
self.assertIn("field_listbox", result_dict['_embedded']['contents'][1]['id']['key'])
self.assertEqual("StringField", result_dict['_embedded']['contents'][1]['id']['type'])
self.assertEqual(document_list[1].getId(), result_dict['_embedded']['contents'][1]['id']['default'])
# Test rendering without form template of attribute, getterm and a script
result = self.portal.web_site_module.hateoas.ERP5Document_getHateoas(
REQUEST=fake_request,
mode="search",
local_roles=["Assignor", "Assignee"],
list_method='Test_listCatalog',
select_list=['title', 'Foo_getLocalTitle', 'getTotalQuantity'] # property, Script, method
)
result_dict = json.loads(result)
self.assertEqual(len(result_dict['_embedded']['contents']), 2)
self.assertEqual(result_dict['_embedded']['contents'][0]['title'].encode('utf-8'), document_list[0].getTitle())
self.assertEqual(result_dict['_embedded']['contents'][0]['Foo_getLocalTitle'], None)
self.assertEqual(result_dict['_embedded']['contents'][0]['getTotalQuantity'], 0)
self.assertEqual(result_dict['_embedded']['contents'][1]['title'].encode('utf-8'), document_list[1].getTitle())
self.assertEqual(result_dict['_embedded']['contents'][1]['Foo_getLocalTitle'], None)
self.assertEqual(result_dict['_embedded']['contents'][1]['getTotalQuantity'], 0)
class TestERP5PDM_getHateoas_mode_search(ERP5HALJSONStyleSkinsMixin):
"""This class allows ticking for Movements to be picked up by activities."""
def afterSetUp(self):
self.folder = getattr(self.portal, 'test_hal_json_folder', None)
if self.folder is None:
self.folder = self.portal.newContent(portal_type='Folder', id='test_hal_json_folder')
self.vendor = self.portal.organisation_module.newContent(
portal_type='Organisation', title="Test Vendor")
self.buyer = self.portal.organisation_module.newContent(
portal_type='Organisation', title="Test Buyer")
self.product = self.portal.product_module.newContent(
portal_type='Product', title="Resource")
self.movement = self.folder.newContent(portal_type='Dummy Movement')
self.movement.edit(
resource_value=self.product,
destination_section_value=self.buyer,
source_section_value=self.vendor,
destination_value=self.buyer,
source_value=self.vendor,
)
self.tic()
def beforeTearDown(self):
self.portal.organisation_module.deleteContent([
self.buyer.getId(), self.vendor.getId()])
self.portal.product_module.deleteContent(self.product.getId())
wipeFolder(self.folder)
self.portal.deleteContent(self.folder.getId())
@simulate('Base_getRequestUrl', '*args, **kwargs', 'return "http://example.org/bar"')
@simulate('Base_getRequestHeader', '*args, **kwargs', 'return "application/hal+json"')
@simulate('Organisation_listInventory', '*args, **kwargs', """
portal = context.getPortalObject()
return portal.portal_simulation.getInventoryList(section_uid=context.getUid())
""")
@changeSkin('Hal')
def test_getHateoas_getInventoryasListMethod(self):
"""Test that `list_method` can resolve dynamic objects from Inventory management.
This test has dependency on erp5_pdm, erp5_trade and base_trade_categories!
"""
fake_request = do_fake_request("GET")
result = self.portal.web_site_module.hateoas.ERP5Document_getHateoas(
REQUEST=fake_request,
mode="search",
relative_url=self.vendor.getRelativeUrl(),
local_roles=["Assignor", "Assignee"],
list_method='Organisation_listInventory',
select_list=['total_price', 'total_quantity']
)
self.assertEquals(fake_request.RESPONSE.status, 200)
self.assertEquals(fake_request.RESPONSE.getHeader('Content-Type'), "application/hal+json")
result_dict = json.loads(result)
self.assertEqual(len(result_dict['_embedded']['contents']), 1)
self.assertEqual(result_dict['_embedded']['contents'][0]['total_price'], 0)
self.assertEqual(result_dict['_embedded']['contents'][0]['total_quantity'],0)
class TestERP5Document_getHateoas_mode_bulk(ERP5HALJSONStyleSkinsMixin):
@simulate('Base_getRequestHeader', '*args, **kwargs',
......@@ -963,6 +1132,7 @@ class TestERP5Document_getHateoas_mode_bulk(ERP5HALJSONStyleSkinsMixin):
self.assertEquals(fake_request.RESPONSE.status, 405)
self.assertEquals(result, "")
@simulate('Base_getRequestUrl', '*args, **kwargs',
'return "http://example.org/bar"')
@simulate('Base_getRequestHeader', '*args, **kwargs',
......@@ -1072,7 +1242,6 @@ class TestERP5Document_getHateoas_mode_worklist(ERP5HALJSONStyleSkinsMixin):
self.assertEquals(fake_request.RESPONSE.status, 405)
self.assertEquals(result, "")
@simulate('Base_getRequestUrl', '*args, **kwargs',
'return "http://example.org/bar"')
@simulate('Base_getRequestHeader', '*args, **kwargs',
......
erp5_full_text_mroonga_catalog
erp5_ui_test_core
erp5_ui_test
erp5_dummy_movement
erp5_configurator_standard_trade_template
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment