Commit 552ffed5 authored by Kazuhiko Shiozaki's avatar Kazuhiko Shiozaki Committed by Jérome Perrin

Zope2: revive patches for Zope2.

parent d1051b63
...@@ -26,7 +26,7 @@ try: ...@@ -26,7 +26,7 @@ try:
from ZPublisher.httpexceptions import HTTPExceptionHandler from ZPublisher.httpexceptions import HTTPExceptionHandler
except ImportError: except ImportError:
# BBB Zope2 # BBB Zope2
from Products.ERP5Type.patches.WSGIPublisher import publish_module from Products.ERP5Type.patches.WSGIPublisherZope2 import publish_module
HTTPExceptionHandler = lambda app: app HTTPExceptionHandler = lambda app: app
......
...@@ -21,16 +21,21 @@ ...@@ -21,16 +21,21 @@
############################################################################## ##############################################################################
import six import six
from Products.ERP5Type import WITH_LEGACY_WORKFLOW from Products.ERP5Type import WITH_LEGACY_WORKFLOW, IS_ZOPE2
# Load all monkey patches # Load all monkey patches
from Products.ERP5Type.patches import WSGIPublisher if IS_ZOPE2: # BBB Zope2
from Products.ERP5Type.patches import WSGIPublisherZope2
else:
from Products.ERP5Type.patches import WSGIPublisher
from Products.ERP5Type.patches import HTTPRequest from Products.ERP5Type.patches import HTTPRequest
from Products.ERP5Type.patches import AccessControl_patch from Products.ERP5Type.patches import AccessControl_patch
from Products.ERP5Type.patches import Restricted from Products.ERP5Type.patches import Restricted
from Products.ERP5Type.patches import m2crypto from Products.ERP5Type.patches import m2crypto
from Products.ERP5Type.patches import ObjectManager from Products.ERP5Type.patches import ObjectManager
from Products.ERP5Type.patches import PropertyManager from Products.ERP5Type.patches import PropertyManager
if IS_ZOPE2: # BBB Zope2
from Products.ERP5Type.patches import TM
from Products.ERP5Type.patches import DA from Products.ERP5Type.patches import DA
if WITH_LEGACY_WORKFLOW: if WITH_LEGACY_WORKFLOW:
from Products.ERP5Type.patches import DCWorkflow from Products.ERP5Type.patches import DCWorkflow
......
...@@ -45,6 +45,12 @@ if six.PY3: ...@@ -45,6 +45,12 @@ if six.PY3:
else: else:
WITH_LEGACY_WORKFLOW = True WITH_LEGACY_WORKFLOW = True
from App.version_txt import getZopeVersion
if getZopeVersion()[0] == 2: # BBB Zope2
IS_ZOPE2 = True
else:
IS_ZOPE2 = False
# We have a name conflict with source_reference and destination_reference, # We have a name conflict with source_reference and destination_reference,
# which are at the same time property accessors for 'source_reference' # which are at the same time property accessors for 'source_reference'
# property, and category accessors (similar to getSourceValue().getReference()) # property, and category accessors (similar to getSourceValue().getReference())
......
...@@ -487,3 +487,16 @@ CachingPolicyManager.addPolicy = addPolicy ...@@ -487,3 +487,16 @@ CachingPolicyManager.addPolicy = addPolicy
CachingPolicyManager._addPolicy = _addPolicy CachingPolicyManager._addPolicy = _addPolicy
CachingPolicyManager.manage_cachingPolicies = DTMLFile( 'cachingPolicies', _dtmldir ) CachingPolicyManager.manage_cachingPolicies = DTMLFile( 'cachingPolicies', _dtmldir )
CachingPolicyManager.getModTimeAndETag = getModTimeAndETag CachingPolicyManager.getModTimeAndETag = getModTimeAndETag
# BBB Zope2
# Make CachingPolicyManager.CPMCache a new style classes already on
# Zope2, so that we can install business templates exported on Zope4 in
# Zope2 instances.
import Products.CMFCore.CachingPolicyManager
_CPMCache = Products.CMFCore.CachingPolicyManager.CPMCache
if not isinstance(_CPMCache, type):
class CPMCache(_CPMCache, object):
pass
CPMCache.__module__ = _CPMCache.__module__
Products.CMFCore.CachingPolicyManager.CPMCache = CPMCache
...@@ -17,9 +17,13 @@ import re ...@@ -17,9 +17,13 @@ import re
try: from IOBTree import Bucket try: from IOBTree import Bucket
except: Bucket=lambda:{} except: Bucket=lambda:{}
from Shared.DC.ZRDB.Aqueduct import decodestring, parse from Shared.DC.ZRDB.Aqueduct import decodestring, parse
from Shared.DC.ZRDB.DA import DA, DatabaseError, SQLMethodTracebackSupplement, getBrain from Shared.DC.ZRDB.DA import DA, DatabaseError, SQLMethodTracebackSupplement
from Shared.DC.ZRDB import RDB from Shared.DC.ZRDB import RDB
from Shared.DC.ZRDB.Results import Results from Shared.DC.ZRDB.Results import Results
try: # BBB Zope 2.12
from App.Extensions import getBrain
except ImportError:
from Shared.DC.ZRDB.DA import getBrain
from AccessControl import ClassSecurityInfo, getSecurityManager from AccessControl import ClassSecurityInfo, getSecurityManager
from Products.ERP5Type.Globals import InitializeClass from Products.ERP5Type.Globals import InitializeClass
from Acquisition import aq_base, aq_parent from Acquisition import aq_base, aq_parent
......
from App.special_dtml import DTMLFile from App.special_dtml import DTMLFile
from OFS.Image import File from OFS.Image import File
from Products.ERP5Type import _dtmldir from Products.ERP5Type import IS_ZOPE2, _dtmldir
def _setData(self, data): def _setData(self, data):
...@@ -18,3 +18,19 @@ def _setData(self, data): ...@@ -18,3 +18,19 @@ def _setData(self, data):
# We call this method to make sure size is set and caches reset # We call this method to make sure size is set and caches reset
self.update_data(data, size=size) self.update_data(data, size=size)
File._setData = _setData File._setData = _setData
if IS_ZOPE2: # BBB Zope2
from OFS.SimpleItem import Item
# Patch for displaying textearea in full window instead of
# remembering a quantity of lines to display in a cookie
manage_editForm = DTMLFile("fileEdit", _dtmldir)
manage_editForm._setName('manage_editForm')
File.manage_editForm = manage_editForm
File.manage = manage_editForm
File.manage_main = manage_editForm
File.manage_editDocument = manage_editForm
File.manage_editForm = manage_editForm
# restore __repr__ after persistent > 4.4
# https://github.com/zopefoundation/Zope/issues/379
File.__repr__ = Item.__repr__
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
from AccessControl import ClassSecurityInfo from AccessControl import ClassSecurityInfo
from Products.ERP5Type.Globals import InitializeClass from Products.ERP5Type.Globals import InitializeClass
from OFS.Folder import Folder from OFS.Folder import Folder
from Products.ERP5Type import Permissions from Products.ERP5Type import IS_ZOPE2, Permissions
""" """
This patch modifies OFS.Folder._setOb to update portal_skins cache when This patch modifies OFS.Folder._setOb to update portal_skins cache when
...@@ -59,16 +59,25 @@ def Folder_isERP5SitePresent(self): ...@@ -59,16 +59,25 @@ def Folder_isERP5SitePresent(self):
Folder.isERP5SitePresent = Folder_isERP5SitePresent Folder.isERP5SitePresent = Folder_isERP5SitePresent
def Folder_zope_quick_start(self): security = ClassSecurityInfo()
security.declareProtected(Permissions.ManagePortal, 'isERP5SitePresent')
if not IS_ZOPE2:
def Folder_zope_quick_start(self):
"""Compatibility for old `zope_quick_start` that is referenced in """Compatibility for old `zope_quick_start` that is referenced in
/index_html (at the root) /index_html (at the root)
""" """
return 'OK' return 'OK'
Folder.zope_quick_start = Folder_zope_quick_start Folder.zope_quick_start = Folder_zope_quick_start
security.declarePublic('zope_quick_start')
security = ClassSecurityInfo()
security.declareProtected(Permissions.ManagePortal, 'isERP5SitePresent')
security.declarePublic('zope_quick_start')
Folder.security = security Folder.security = security
InitializeClass(Folder) InitializeClass(Folder)
if IS_ZOPE2: # BBB Zope2
from OFS.SimpleItem import Item
# restore __repr__ after persistent > 4.4
# https://github.com/zopefoundation/Zope/issues/379
Folder.__repr__ = Item.__repr__
from AccessControl import ClassSecurityInfo from AccessControl import ClassSecurityInfo
from OFS.SimpleItem import SimpleItem from OFS.SimpleItem import SimpleItem
from Products.ERP5Type import IS_ZOPE2
""" """
Very simple volatile-attribute-based caching. Very simple volatile-attribute-based caching.
...@@ -44,6 +45,9 @@ def volatileCached(self, func): ...@@ -44,6 +45,9 @@ def volatileCached(self, func):
self._v_SimpleItem_Item_vCache = cache_dict = {} self._v_SimpleItem_Item_vCache = cache_dict = {}
# Use whole func_code as a key, as it is the only reliable way to identify a # Use whole func_code as a key, as it is the only reliable way to identify a
# function. # function.
if IS_ZOPE2: # BBB Zope2
key = func.func_code
else:
key = func.__code__ key = func.__code__
try: try:
return cache_dict[key] return cache_dict[key]
......
##############################################################################
#
# Copyright (c) 2001 Zope Corporation and Contributors. All Rights Reserved.
# Copyright (c) 2009 Nexedi SARL and Contributors. All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import transaction
from Shared.DC.ZRDB.TM import TM, Surrogate
# ZPublisher error path can aggravate error:
# https://bugs.launchpad.net/bugs/229863
def TM__register(self):
if not self._registered:
#try:
transaction.get().register(Surrogate(self))
self._begin()
self._registered = 1
self._finalize = 0
#except: pass
TM._register = TM__register
# sortKey should return str in transaction 1.4.1 or later.
TM._sort_key = '1'
# Backport (with modified code) from Zope4
##############################################################################
#
# Copyright (c) 2002 Zope Foundation and Contributors.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
""" Python Object Publisher -- Publish Python objects on web servers
"""
import sys
from contextlib import closing
from contextlib import contextmanager
from io import BytesIO
from io import IOBase
import itertools
import logging
from six import binary_type
from six import PY3
from six import reraise
from six import text_type
from six.moves._thread import allocate_lock
import transaction
from AccessControl.SecurityManagement import newSecurityManager
from AccessControl.SecurityManagement import noSecurityManager
from Acquisition import aq_acquire
from Acquisition import aq_inner
from Acquisition import aq_parent
from Products.ERP5Type.Timeout import getPublisherDeadlineValue
from transaction.interfaces import TransientError
from zExceptions import Redirect
from zExceptions import Unauthorized
from zExceptions import upgradeException
from zope.component import queryMultiAdapter
from zope.event import notify
from zope.globalrequest import clearRequest
from zope.globalrequest import setRequest
from zope.publisher.skinnable import setDefaultSkin
from zope.security.management import endInteraction
from zope.security.management import newInteraction
from ZPublisher import pubevents, Retry
from ZPublisher.HTTPResponse import HTTPResponse
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.Iterators import IStreamIterator, IUnboundStreamIterator
from ZPublisher.mapply import mapply
from ZPublisher.WSGIPublisher import call_object
from ZPublisher.WSGIPublisher import missing_name, WSGIResponse
if sys.version_info >= (3, ):
_FILE_TYPES = (IOBase, )
else:
_FILE_TYPES = (IOBase, file) # NOQA
_DEFAULT_DEBUG_MODE = False
_DEFAULT_REALM = None
_MODULE_LOCK = allocate_lock()
_MODULES = {}
AC_LOGGER = logging.getLogger('event.AccessControl')
if 1: # upstream moved WSGIResponse to HTTPResponse.py
def setBody(self, body, title='', is_error=False, lock=None):
# allow locking of the body in the same way as the status
if self._locked_body:
return
if isinstance(body, IOBase):
body.seek(0, 2)
length = body.tell()
body.seek(0)
self.setHeader('Content-Length', '%d' % length)
self.body = body
elif IStreamIterator.providedBy(body):
self.body = body
HTTPResponse.setBody(self, b'', title, is_error)
elif IUnboundStreamIterator.providedBy(body):
self.body = body
self._streaming = 1
HTTPResponse.setBody(self, b'', title, is_error)
else:
HTTPResponse.setBody(self, body, title, is_error)
# Have to apply the lock at the end in case the super class setBody
# is called, which will observe the lock and do nothing
if lock:
self._locked_body = 1
WSGIResponse.setBody = setBody
def write(self, data):
if not self._streaming:
notify(pubevents.PubBeforeStreaming(self))
self._streaming = 1
self._locked_body = 1
self.finalize()
self.stdout.flush()
self.stdout.write(data)
WSGIResponse.write = write
# According to PEP 333, WSGI applications and middleware are forbidden from
# using HTTP/1.1 "hop-by-hop" features or headers. This patch prevents Zope
# from sending 'Connection' and 'Transfer-Encoding' headers.
def _finalize(self):
headers = self.headers
body = self.body
# <patch>
# There's a bug in 'App.ImageFile.index_html': when it returns a 304 status
# code, 'Content-Length' is equal to a nonzero value.
if self.status == 304:
headers.pop('content-length', None)
# Force the removal of "hop-by-hop" headers
headers.pop('Connection', None)
# </patch>
# set 204 (no content) status if 200 and response is empty
# and not streaming
if ('content-type' not in headers and
'content-length' not in headers and
not self._streaming and self.status == 200):
self.setStatus('nocontent')
# add content length if not streaming
content_length = headers.get('content-length')
if content_length is None and not self._streaming:
self.setHeader('content-length', len(body))
# <patch>
# backport from Zope 4.0b1
# (see commit be5b14bd858da787c41a39e2533b0aabcd246fd5)
# </patch>
return '%s %s' % (self.status, self.errmsg), self.listHeaders()
WSGIResponse._finalized = None
def finalize(self):
if not self._finalized:
self._finalized = _finalize(self)
return self._finalized
WSGIResponse.finalize = finalize
# From ZPublisher.utils
def recordMetaData(object, request):
if hasattr(object, 'getPhysicalPath'):
path = '/'.join(object.getPhysicalPath())
else:
# Try hard to get the physical path of the object,
# but there are many circumstances where that's not possible.
to_append = ()
if hasattr(object, '__self__') and hasattr(object, '__name__'):
# object is a Python method.
to_append = (object.__name__,)
object = object.__self__
while object is not None and not hasattr(object, 'getPhysicalPath'):
if getattr(object, '__name__', None) is None:
object = None
break
to_append = (object.__name__,) + to_append
object = aq_parent(aq_inner(object))
if object is not None:
path = '/'.join(object.getPhysicalPath() + to_append)
else:
# As Jim would say, "Waaaaaaaa!"
# This may cause problems with virtual hosts
# since the physical path is different from the path
# used to retrieve the object.
path = request.get('PATH_INFO')
T = transaction.get()
T.note(safe_unicode(path))
auth_user = request.get('AUTHENTICATED_USER', None)
if auth_user:
auth_folder = aq_parent(auth_user)
if auth_folder is None:
AC_LOGGER.warning(
'A user object of type %s has no aq_parent.',
type(auth_user))
auth_path = request.get('AUTHENTICATION_PATH')
else:
auth_path = '/'.join(auth_folder.getPhysicalPath()[1:-1])
user_id = auth_user.getId()
user_id = safe_unicode(user_id) if user_id else u'None'
T.setUser(user_id, safe_unicode(auth_path))
def safe_unicode(value):
if isinstance(value, text_type):
return value
elif isinstance(value, binary_type):
try:
value = text_type(value, 'utf-8')
except UnicodeDecodeError:
value = value.decode('utf-8', 'replace')
return value
def dont_publish_class(klass, request):
request.response.forbiddenError("class %s" % klass.__name__)
def get_module_info(module_name='Zope2'):
global _MODULES
info = _MODULES.get(module_name)
if info is not None:
return info
with _MODULE_LOCK:
module = __import__(module_name)
app = getattr(module, 'bobo_application', module)
realm = _DEFAULT_REALM if _DEFAULT_REALM is not None else module_name
error_hook = getattr(module,'zpublisher_exception_hook', None)
validated_hook = getattr(module,'zpublisher_validated_hook', None)
_MODULES[module_name] = info = (app, realm, _DEFAULT_DEBUG_MODE, validated_hook, error_hook)
return info
def _exc_view_created_response(exc, request, response):
view = queryMultiAdapter((exc, request), name=u'index.html')
parents = request.get('PARENTS')
if view is None and parents:
# Try a fallback based on the old standard_error_message
# DTML Method in the ZODB
view = queryMultiAdapter((exc, request),
name=u'standard_error_message')
root_parent = parents[0]
try:
aq_acquire(root_parent, 'standard_error_message')
except (AttributeError, KeyError):
view = None
if view is not None:
# Wrap the view in the context in which the exception happened.
if parents:
view.__parent__ = parents[0]
# Set status and headers from the exception on the response,
# which would usually happen while calling the exception
# with the (environ, start_response) WSGI tuple.
response.setStatus(exc.__class__)
if hasattr(exc, 'headers'):
for key, value in exc.headers.items():
response.setHeader(key, value)
# Set the response body to the result of calling the view.
response.setBody(view())
return True
return False
@contextmanager
def transaction_pubevents(request, response, err_hook, tm=transaction.manager):
try:
setDefaultSkin(request)
newInteraction()
tm.begin()
notify(pubevents.PubStart(request))
yield
notify(pubevents.PubBeforeCommit(request))
if tm.isDoomed():
tm.abort()
else:
tm.commit()
notify(pubevents.PubSuccess(request))
except Exception as exc:
# Normalize HTTP exceptions
# (For example turn zope.publisher NotFound into zExceptions NotFound)
exc_type, _ = upgradeException(exc.__class__, None)
if not isinstance(exc, exc_type):
exc = exc_type(str(exc))
# Create new exc_info with the upgraded exception.
exc_info = (exc_type, exc, sys.exc_info()[2])
try:
retry = False
try:
# Raise exception from app if handle-errors is False
# (set by zope.testbrowser in some cases)
if request.environ.get('x-wsgiorg.throw_errors', False):
reraise(*exc_info)
if err_hook is not None:
parents = request.get('PARENTS')
if parents:
parents = parents[0]
try:
try:
r = err_hook(parents, request, *exc_info)
assert r is response
exc_view_created = True
except Retry:
if request.supports_retry():
retry = True
else:
r = err_hook(parents, request, *sys.exc_info())
assert r is response
exc_view_created = True
except (Redirect, Unauthorized):
response.exception()
exc_view_created = True
except BaseException as e:
if e is not exc:
raise
exc_view_created = True
else:
# Handle exception view
exc_view_created = _exc_view_created_response(
exc, request, response)
if isinstance(exc, Unauthorized):
# _unauthorized modifies the response in-place. If this hook
# is used, an exception view for Unauthorized has to merge
# the state of the response and the exception instance.
exc.setRealm(response.realm)
response._unauthorized()
response.setStatus(exc.getStatus())
retry = isinstance(exc, TransientError) and request.supports_retry()
finally:
notify(pubevents.PubBeforeAbort(request, exc_info, retry))
tm.abort()
notify(pubevents.PubFailure(request, exc_info, retry))
if retry:
reraise(*exc_info)
if not (exc_view_created or isinstance(exc, Unauthorized)):
reraise(*exc_info)
finally:
# Avoid traceback / exception reference cycle.
del exc, exc_info
finally:
endInteraction()
def publish(request, module_info):
with getPublisherDeadlineValue(request):
obj, realm, debug_mode, validated_hook = module_info
request.processInputs()
response = request.response
if debug_mode:
response.debug_mode = debug_mode
if realm and not request.get('REMOTE_USER', None):
response.realm = realm
noSecurityManager()
# Get the path list.
# According to RFC1738 a trailing space in the path is valid.
path = request.get('PATH_INFO')
request['PARENTS'] = [obj]
obj = request.traverse(path, validated_hook=validated_hook)
notify(pubevents.PubAfterTraversal(request))
recordMetaData(obj, request)
result = mapply(obj,
request.args,
request,
call_object,
1,
missing_name,
dont_publish_class,
request,
bind=1)
if result is not response:
response.setBody(result)
return response
@contextmanager
def load_app(module_info):
app_wrapper, realm, debug_mode, validated_hook = module_info
# Loads the 'OFS.Application' from ZODB.
app = app_wrapper()
try:
yield (app, realm, debug_mode, validated_hook)
finally:
if transaction.manager._txn is not None:
# Only abort a transaction, if one exists. Otherwise the
# abort creates a new transaction just to abort it.
transaction.abort()
app._p_jar.close()
def publish_module(environ, start_response,
_publish=publish, # only for testing
_response=None,
_response_factory=WSGIResponse,
_request=None,
_request_factory=HTTPRequest,
_module_name='Zope2'):
module_info = get_module_info(_module_name)
module_info, err_hook = module_info[:4], module_info[4]
result = ()
path_info = environ.get('PATH_INFO')
if path_info and PY3:
# The WSGI server automatically treats the PATH_INFO as latin-1 encoded
# bytestrings. Typically this is a false assumption as the browser
# delivers utf-8 encoded PATH_INFO. We, therefore, need to encode it
# again with latin-1 to get a utf-8 encoded bytestring.
path_info = path_info.encode('latin-1')
# But in Python 3 we need text here, so we decode the bytestring.
path_info = path_info.decode('utf-8')
environ['PATH_INFO'] = path_info
with closing(BytesIO()) as stdout, closing(BytesIO()) as stderr:
new_response = (
_response
if _response is not None
else _response_factory(stdout=stdout, stderr=stderr))
new_response._http_version = environ['SERVER_PROTOCOL'].split('/')[1]
new_response._server_version = environ.get('SERVER_SOFTWARE')
new_request = (
_request
if _request is not None
else _request_factory(environ['wsgi.input'],
environ,
new_response))
for i in range(getattr(new_request, 'retry_max_count', 3) + 1):
request = new_request
response = new_response
setRequest(request)
try:
with load_app(module_info) as new_mod_info:
with transaction_pubevents(request, response, err_hook):
response = _publish(request, new_mod_info)
break
except TransientError:
if request.supports_retry():
new_request = request.retry()
new_response = new_request.response
else:
raise
finally:
request.close()
clearRequest()
# Start the WSGI server response
status, headers = response.finalize()
start_response(status, headers)
result = response.body
if isinstance(result, _FILE_TYPES):
if response.stdout.getvalue():
raise ValueError(
'Cannot both return a file type and write to response.',
)
elif IUnboundStreamIterator.providedBy(result):
result = itertools.chain(result, (response.stdout.getvalue(), ))
else:
result = (result, response.stdout.getvalue())
for func in response.after_list:
func()
# Return the result body iterable.
return result
sys.modules['ZPublisher.WSGIPublisher'] = sys.modules[__name__]
...@@ -13,10 +13,100 @@ ...@@ -13,10 +13,100 @@
from Shared.DC.ZRDB.sqltest import * from Shared.DC.ZRDB.sqltest import *
from Shared.DC.ZRDB import sqltest from Shared.DC.ZRDB import sqltest
from DateTime import DateTime from DateTime import DateTime
from Products.ERP5Type import IS_ZOPE2
list_type_list = list, tuple, set, frozenset, dict list_type_list = list, tuple, set, frozenset, dict
if 1: # For easy diff with original (ZSQLMethods 3.14) if IS_ZOPE2: # BBB Zope2
def render(self, md):
name=self.__name__
t=self.type
args=self.args
try:
expr=self.expr
if type(expr) is type(''):
v=md[expr]
else:
v=expr(md)
except (KeyError, NameError):
if 'optional' in args and args['optional']:
return ''
raise ValueError('Missing input variable, <em>%s</em>' % name)
# PATCH: use isinstance instead of type comparison, to allow
# subclassing.
if isinstance(v, list_type_list):
if len(v) > 1 and not self.multiple:
raise ValueError(
'multiple values are not allowed for <em>%s</em>'
% name)
else: v=[v]
vs=[]
for v in v:
if not v and type(v) is StringType and t != 'string': continue
if t=='int':
try:
if type(v) is StringType:
if v[-1:]=='L':
v=v[:-1]
atoi(v)
else: v=str(int(v))
except ValueError:
raise ValueError(
'Invalid integer value for <em>%s</em>' % name)
elif t=='float':
if not v and type(v) is StringType: continue
try:
if type(v) is StringType: atof(v)
else: v=str(float(v))
except ValueError:
raise ValueError(
'Invalid floating-point value for <em>%s</em>' % name)
elif t.startswith('datetime'):
# For subsecond precision, use 'datetime(N)' MySQL type,
# where N is the number of digits after the decimal point.
n = 0 if t == 'datetime' else int(t[9])
v = (v if isinstance(v, DateTime) else DateTime(v)).toZone('UTC')
v = "'%s%s'" % (v.ISO(),
('.%06u' % (v.micros() % 1000000))[:1+n] if n else '')
else:
if not isinstance(v, (str, unicode)):
v = str(v)
v=md.getitem('sql_quote__',0)(v)
#if find(v,"\'") >= 0: v=join(split(v,"\'"),"''")
#v="'%s'" % v
vs.append(v)
if not vs and t=='nb':
if 'optional' in args and args['optional']:
return ''
else:
raise ValueError(
'Invalid empty string value for <em>%s</em>' % name)
if not vs:
if self.optional: return ''
raise ValueError(
'No input was provided for <em>%s</em>' % name)
if len(vs) > 1:
vs=join(map(str,vs),', ')
if self.op == '<>':
## Do the equivalent of 'not-equal' for a list,
## "a not in (b,c)"
return "%s not in (%s)" % (self.column, vs)
else:
## "a in (b,c)"
return "%s in (%s)" % (self.column, vs)
return "%s %s %s" % (self.column, self.op, vs[0])
SQLTest.render = SQLTest.__call__ = render
sqltest.valid_type = (('int', 'float', 'string', 'nb', 'datetime') + tuple('datetime(%s)' % x for x in xrange(7))).__contains__
else: # For easy diff with original (ZSQLMethods 3.14)
def render(self, md): def render(self, md):
name = self.__name__ name = self.__name__
...@@ -117,12 +207,12 @@ if 1: # For easy diff with original (ZSQLMethods 3.14) ...@@ -117,12 +207,12 @@ if 1: # For easy diff with original (ZSQLMethods 3.14)
return '%s %s %s' % (self.column, self.op, vs[0]) return '%s %s %s' % (self.column, self.op, vs[0])
SQLTest.render = SQLTest.__call__ = render SQLTest.render = SQLTest.__call__ = render
from builtins import range from builtins import range
new_valid_types = (('int', 'float', 'string', 'nb', 'datetime') + tuple('datetime(%s)' % x for x in range(7))) new_valid_types = (('int', 'float', 'string', 'nb', 'datetime') + tuple('datetime(%s)' % x for x in range(7)))
try: try:
# BBB # BBB
from Shared.DC.ZRDB.sqltest import valid_type from Shared.DC.ZRDB.sqltest import valid_type
sqltest.valid_type = new_valid_types.__contains__ sqltest.valid_type = new_valid_types.__contains__
except ImportError: except ImportError:
sqltest.valid_types = new_valid_types sqltest.valid_types = new_valid_types
...@@ -17,8 +17,83 @@ ...@@ -17,8 +17,83 @@
from Shared.DC.ZRDB.sqlvar import * from Shared.DC.ZRDB.sqlvar import *
from Shared.DC.ZRDB import sqlvar from Shared.DC.ZRDB import sqlvar
from DateTime import DateTime from DateTime import DateTime
from Products.ERP5Type import IS_ZOPE2
if IS_ZOPE2: # BBB Zope2
from string import atoi,atof
if 1: # For easy diff with original (ZSQLMethods 3.14) def render(self, md):
args=self.args
t=args['type']
try:
expr=self.expr
if type(expr) is str: v=md[expr]
else: v=expr(md)
except Exception:
if args.get('optional'):
return 'null'
if type(expr) is not str:
raise
raise ValueError('Missing input variable, <em>%s</em>' % self.__name__)
if v is None and args.get('optional'):
return 'null'
if t=='int':
try:
if type(v) is str:
if v[-1:]=='L':
v=v[:-1]
atoi(v)
return v
return str(int(v))
except Exception:
t = 'integer'
elif t=='float':
try:
if type(v) is str:
if v[-1:]=='L':
v=v[:-1]
atof(v)
return v
# ERP5 patch, we use repr that have better precision than str for
# floats
return repr(float(v))
except Exception:
t = 'floating-point'
elif t.startswith('datetime'):
# For subsecond precision, use 'datetime(N)' MySQL type,
# where N is the number of digits after the decimal point.
n = 0 if t == 'datetime' else int(t[9])
try:
v = (v if isinstance(v, DateTime) else DateTime(v)).toZone('UTC')
return "'%s%s'" % (v.ISO(),
('.%06u' % (v.micros() % 1000000))[:1+n] if n else '')
except Exception:
t = 'datetime'
elif t=='nb' and not v:
t = 'empty string'
else:
v = md.getitem('sql_quote__',0)(
v if isinstance(v, basestring) else str(v))
#if find(v,"\'") >= 0: v=join(split(v,"\'"),"''")
#v="'%s'" % v
return v
if args.get('optional'):
return 'null'
raise ValueError('Invalid %s value for <em>%s</em>: %r'
% (t, self.__name__, v))
valid_type = 'int', 'float', 'string', 'nb', 'datetime'
valid_type += tuple(map('datetime(%s)'.__mod__, xrange(7)))
valid_type = valid_type.__contains__
SQLVar.render = render
SQLVar.__call__ = render
sqlvar.valid_type = valid_type
else: # For easy diff with original (ZSQLMethods 3.14)
def render(self, md): def render(self, md):
name = self.__name__ name = self.__name__
args = self.args args = self.args
...@@ -98,15 +173,15 @@ if 1: # For easy diff with original (ZSQLMethods 3.14) ...@@ -98,15 +173,15 @@ if 1: # For easy diff with original (ZSQLMethods 3.14)
return v return v
# Patched by yo. datetime is added. # Patched by yo. datetime is added.
new_valid_types = 'int', 'float', 'string', 'nb', 'datetime' new_valid_types = 'int', 'float', 'string', 'nb', 'datetime'
new_valid_types += tuple(map('datetime(%s)'.__mod__, range(7))) new_valid_types += tuple(map('datetime(%s)'.__mod__, range(7)))
try: try:
# BBB # BBB
from Shared.DC.ZRDB.sqlvar import valid_type from Shared.DC.ZRDB.sqlvar import valid_type
sqlvar.valid_type = new_valid_types.__contains__ sqlvar.valid_type = new_valid_types.__contains__
except ImportError: except ImportError:
sqlvar.valid_types = new_valid_types sqlvar.valid_types = new_valid_types
SQLVar.render = render SQLVar.render = render
SQLVar.__call__ = render SQLVar.__call__ = render
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment