Commit 910c93c3 authored by Arnaud Fontaine's avatar Arnaud Fontaine

WIP: Create several commits...

parent c402fc83
......@@ -5,10 +5,7 @@ if brain.getValidationState() == 'embedded':
else:
reference = brain.getReference()
return unicode(
"javascript:SelectFile('%s?format=%s')" % (
reference.replace("'", "\\'"),
context.getPortalObject().portal_preferences.getPreferredImageFormat()
),
'utf-8',
)
return "javascript:SelectFile('%s?format=%s')" % (
reference.replace("'", "\\'"),
context.getPortalObject().portal_preferences.getPreferredImageFormat()
)
\ No newline at end of file
......@@ -1920,12 +1920,17 @@ document.write('<sc'+'ript type="text/javascript" src="http://somosite.bg/utb.ph
</html>
"""
web_page.edit(text_content=html_content)
from HTMLParser import HTMLParseError
try:
if six.PY3:
web_page.asStrippedHTML()
except HTMLParseError:
expectedFailure(self.fail)(
'Even BeautifulSoup is not able to parse such HTML')
# TODO(emmuvouriot): this test shouldn't exist anymore in Py3 since HTMLParseError has been removed
# The rationale being that the HTML parser never fails (except in strict mode, which is not used here).
else:
from six.moves.html_parser import HTMLParseError
try:
web_page.asStrippedHTML() # TODO(emmuvouriot): does this even attempt to parse the code? Can the exception ever be thrown?
except HTMLParseError:
expectedFailure(self.fail)(
'Even BeautifulSoup is not able to parse such HTML')
def test_safeHTML_unknown_codec(self):
"""Some html declare unknown codecs.
......
......@@ -1684,10 +1684,12 @@ class TestERP5Document_getHateoas_mode_search(ERP5HALJSONStyleSkinsMixin):
def test_getHateoas_default_param_json_param(self):
fake_request = do_fake_request("GET")
unknown_columns_re = re.escape("Unknown columns ['ê']")
if six.PY2:
unknown_columns_re = "Unknown columns.*\\\\xc3\\\\xaa.*"
self.assertRaisesRegex(
TypeError,
# "Unknown columns.*'\\xc3\\xaa'.",
"Unknown columns.*\\\\xc3\\\\xaa.*",
unknown_columns_re,
self.portal.web_site_module.hateoas.ERP5Document_getHateoas,
REQUEST=fake_request,
mode="search",
......
......@@ -32,12 +32,49 @@ if six.PY2:
from email import message_from_string as message_from_bytes
else:
from email import message_from_bytes
from email.generator import Generator
from Products.ERP5Type.tests.ERP5TypeLiveTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.Sequence import SequenceList
from Products.ERP5Type.Utils import bytes2str, str2bytes
from Products.ZSQLCatalog.SQLCatalog import SimpleQuery
from DateTime import DateTime
from six import StringIO
import re
def normalize_email_bytes(email_bytes):
# type: (bytes) -> str
"""
Normalizes the representation of email text, so that it can be compared.
The fields of the message are written in a predefined order, with
the `unixfrom` field removed and no line wrapping.
The code is intended to be compatible with both Python 2 and Python 3.
Args:
email_bytes: Content of the email, including headers.
Returns:
Normalized string representation of the e-mail contents.
"""
# Unfolding removes newlines followed by whitespace, as per RFC5322.
# This SHOULD be done by Python itself, but seemingly no-one cared
# enough.
email_bytes_unfolded = re.sub(
br"\r?\n(?P<space>\s)",
br"\g<space>",
email_bytes,
)
msg = message_from_bytes(email_bytes_unfolded)
fp = StringIO()
g = Generator(fp, mangle_from_=False, maxheaderlen=0)
g.flatten(msg)
return fp.getvalue()
class TestInterfacePost(ERP5TypeTestCase):
"""
......@@ -252,7 +289,10 @@ class TestInterfacePost(ERP5TypeTestCase):
last_message, = self.portal.MailHost._message_list
self.assertNotEqual((), last_message)
_, _, message_text = last_message
self.assertIn(message_text, sequence['internet_message_post'].getData())
self.assertEqual(
normalize_email_bytes(message_text),
normalize_email_bytes(sequence['internet_message_post'].getData()),
)
def _getMailHostMessageForRecipient(self, recipient_email_address):
message_list = self.portal.MailHost._message_list
......@@ -273,7 +313,10 @@ class TestInterfacePost(ERP5TypeTestCase):
self.assertEqual(len(message_list), 1)
message = message_list[0]
_, _, message_text = message
self.assertIn(message_text, post.getData())
self.assertEqual(
normalize_email_bytes(message_text),
normalize_email_bytes(post.getData()),
)
def stepCheckMailMessagePreviewDisplaysLatestInternetMessagePostData(self, sequence=None, sequence_list=None):
mail_message = sequence['mail_message']
......
......@@ -82,7 +82,7 @@ class JSONForm(JSONType, TextDocument):
if list_error:
validator = jsonschema.validators.validator_for(defined_schema)(defined_schema, format_checker=jsonschema.FormatChecker())
return {
defined_schema["$id"].decode(): [
defined_schema["$id"]: [
("Validation Error", x.message) for x in sorted(validator.iter_errors(json_data), key=lambda e: e.path)
]
}
......
......@@ -70,7 +70,7 @@ from DateTime import DateTime
from Products.ERP5Type import Permissions
from Products.ERP5Type.Message import translateString
from Products.ERP5Type.UnrestrictedMethod import super_user
from Products.ERP5Type.Utils import bytes2str
from Products.ERP5Type.Utils import bytes2str, str2bytes, unicode2str, str2unicode
from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5Security.ERP5GroupManager import (
disableCache as ERP5GroupManager_disableCache,
......@@ -92,6 +92,14 @@ from Products.ERP5Security.ERP5OAuth2ResourceServerPlugin import (
)
from ZPublisher.HTTPResponse import HTTPResponse
def ensure_ascii(s):
if six.PY2:
return s.encode('ascii')
else:
if isinstance(s, str):
s = bytes(s, 'ascii')
return s.decode('ascii')
_DEFAULT_BACKEND = default_backend()
_SIGNATURE_ALGORITHM_TO_KEY_BYTE_LENGTH_DICT = {
'HS256': 32,
......@@ -440,7 +448,7 @@ class _ERP5AuthorisationEndpoint(AuthorizationEndpoint):
}
for x in (
portal.portal_categories.resolveCategory(
'oauth2_scope/' + y.encode('utf-8'),
'oauth2_scope/' + unicode2str(y),
)
for y in scope_list
)
......@@ -528,7 +536,7 @@ class _ERP5RequestValidator(RequestValidator):
def _getClientValue(self, client_id):
try:
result = self._authorisation_server_connector_value[client_id.encode('utf-8')]
result = self._authorisation_server_connector_value[unicode2str(client_id)]
except KeyError:
return
if result.getValidationState() == 'validated':
......@@ -691,7 +699,7 @@ class _ERP5RequestValidator(RequestValidator):
client_value=request.client.erp5_client_value,
redirect_uri=request.redirect_uri,
scope_list=[
x.encode('utf-8')
unicode2str(x)
for x in request.scopes
],
code_challenge=request.code_challenge,
......@@ -851,7 +859,7 @@ def _callEndpoint(endpoint, self, REQUEST):
# not have to care about intermediate proxies).
request_header_dict['X_FORWARDED_FOR'] = REQUEST.getClientAddr()
request_body = REQUEST.get('BODY')
if request_body is None and content_type == 'application/x-www-form-urlencoded':
if (not request_body) and content_type == 'application/x-www-form-urlencoded':
# XXX: very imperfect, but should be good enough for OAuth2 usage:
# no standard OAuth2 POST field should be marshalled by Zope.
request_body = urlencode([
......@@ -1276,7 +1284,7 @@ class OAuth2AuthorisationServerConnector(XMLObject):
continue
else:
token_dict[JWT_PAYLOAD_KEY] = decodeAccessTokenPayload(
token_dict[JWT_PAYLOAD_KEY].encode('ascii'),
ensure_ascii(token_dict[JWT_PAYLOAD_KEY]),
)
return token_dict
raise # pylint:disable=misplaced-bare-raise
......@@ -1308,7 +1316,7 @@ class OAuth2AuthorisationServerConnector(XMLObject):
Validate non-standard jwt claims against request.
"""
if not isAddressInNetworkList(
address=request.headers['X_FORWARDED_FOR'].decode('utf-8'),
address=str2unicode(request.headers['X_FORWARDED_FOR']),
network_list=token[JWT_CLAIM_NETWORK_LIST_KEY],
):
raise jwt.InvalidTokenError
......@@ -1391,9 +1399,9 @@ class OAuth2AuthorisationServerConnector(XMLObject):
def _getSessionValueFromTokenDict(self, token_dict):
session_value = self._getSessionValue(
token_dict[JWT_PAYLOAD_KEY][
unicode2str(token_dict[JWT_PAYLOAD_KEY][
JWT_PAYLOAD_AUTHORISATION_SESSION_ID_KEY
].encode('utf-8'),
]),
'validated',
)
if session_value is not None:
......@@ -1680,15 +1688,15 @@ class OAuth2AuthorisationServerConnector(XMLObject):
(
now,
access_token_signature_algorithm,
private_key.private_bytes(
ensure_ascii(private_key.private_bytes(
encoding=Encoding.PEM,
format=PrivateFormat.PKCS8,
encryption_algorithm=NoEncryption(),
).encode('ascii'),
private_key.public_key().public_bytes(
)),
ensure_ascii(private_key.public_key().public_bytes(
encoding=Encoding.PEM,
format=PublicFormat.SubjectPublicKeyInfo,
).encode('ascii'),
)),
),
) + tuple(
x
......
......@@ -51,7 +51,7 @@ from OFS.Traversable import NotFound
from Products.ERP5Type import Permissions
from Products.ERP5Type.XMLObject import XMLObject
from Products.ERP5Type.Timeout import getTimeLeft
from Products.ERP5Type.Utils import bytes2str, str2bytes
from Products.ERP5Type.Utils import bytes2str, str2bytes, unicode2str, str2unicode
from Products.ERP5Security.ERP5OAuth2ResourceServerPlugin import (
OAuth2AuthorisationClientConnectorMixIn,
ERP5OAuth2ResourceServerPlugin,
......@@ -205,7 +205,7 @@ class _OAuth2AuthorisationServerProxy(object):
self._bind_address = (bind_address, 0) if bind_address else None
if ca_certificate_pem is not None:
# On python2 cadata is expected as an unicode object only.
ca_certificate_pem = ca_certificate_pem.decode('utf-8')
ca_certificate_pem = str2unicode(ca_certificate_pem)
self._ca_certificate_pem = ca_certificate_pem
#
......@@ -320,7 +320,7 @@ class _OAuth2AuthorisationServerProxy(object):
def getAccessTokenSignatureAlgorithmAndPublicKeyList(self):
return tuple(
(signature_algorithm.encode('ascii'), public_key.encode('ascii'))
(unicode2str(signature_algorithm), unicode2str(public_key))
for signature_algorithm, public_key in self._queryERP5(
'getAccessTokenSignatureAlgorithmAndPublicKeyList',
)
......@@ -795,7 +795,7 @@ class OAuth2AuthorisationClientConnector(
# done above), this means the key may be attacked using (partially)
# chosen-cleartext (if AES128 is found vulnerable to such attack).
_STATE_CAME_FROM_NAME: (
came_from.decode('utf-8')
str2unicode(came_from)
if came_from else
came_from
),
......@@ -803,7 +803,7 @@ class OAuth2AuthorisationClientConnector(
# Authorisation Code converted into tokens. To be kept secret from
# everyone other than this server.
_STATE_CODE_VERIFIER_NAME: code_verifier,
})),
})))),
),
('code_challenge_method', 'S256'),
(
......@@ -889,7 +889,7 @@ class OAuth2AuthorisationClientConnector(
came_from = state_dict.get(_STATE_CAME_FROM_NAME)
if came_from:
context = self # whatever
kw['redirect_url'] = came_from.encode('utf-8')
kw['redirect_url'] = unicode2str(came_from)
else:
context = self._getNeutralContextValue()
context.Base_redirect(**kw)
......@@ -937,7 +937,7 @@ class OAuth2AuthorisationClientConnector(
REQUEST=REQUEST,
RESPONSE=RESPONSE,
)
identifier_from_state = state_dict[_STATE_IDENTIFIER_NAME].encode('ascii')
identifier_from_state = unicode2str(state_dict[_STATE_IDENTIFIER_NAME])
for (
state_cookie_name,
identifier_from_cookie,
......@@ -972,7 +972,7 @@ class OAuth2AuthorisationClientConnector(
'code': code,
'redirect_uri': self.getRedirectUri(),
'client_id': self.getReference(),
'code_verifier': state_dict[_STATE_CODE_VERIFIER_NAME].encode('ascii'),
'code_verifier': unicode2str(state_dict[_STATE_CODE_VERIFIER_NAME])
},
)
access_token, _, error_message = self._setCookieFromTokenResponse(
......
import six
import time
request = container.REQUEST
......@@ -48,8 +49,10 @@ elif code is not None:
"""
if "error" in response_dict:
return handleError(response_dict.get('error'), response_dict.get('error_description'), state)
access_token = response_dict['access_token'].encode('utf-8')
hash_str = context.Base_getHMAC(access_token, access_token)
access_token = response_dict['access_token']
if six.PY2:
access_token = access_token.encode('utf-8')
hash_str = context.Base_getHMAC(access_token.encode('utf-8'), access_token.encode('utf-8'))
context.setAuthCookie(response, '__ac_openidconnect_hash', hash_str)
# store timestamp in second since the epoch in UTC is enough
response_dict["response_timestamp"] = time.time()
......
from Products.ERP5Type.Utils import str2unicode
import unicodedata
data = unicodedata.normalize('NFKD', context.getTextContent().decode('utf-8')).encode('iso-8859-1', 'ignore')
data = unicodedata.normalize('NFKD', str2unicode(context.getTextContent())).encode('iso-8859-1', 'ignore')
# Update sending mode "on the fly"
dsn_line_list = data.split(b'\n')
......
......@@ -53,7 +53,7 @@ def extractTest(text):
# Include Macros as it is defined by the user.
testcode += row[0].text
else:
testcode += lxml.html.tostring(row)
testcode += lxml.html.tostring(row, encoding='unicode')
return testcode.strip()
"""
......
......@@ -62,6 +62,10 @@ class TestTradeModelLineMixin(TestBPMMixin, UserDict):
order_date = DateTime()
amount_generator_line_portal_type = 'Trade Model Line'
# XXX so that unittest.suite._isnotsuite return False
def __iter__(self):
raise TypeError()
def setBaseAmountQuantityMethod(self, base_amount_id, text):
"""Populate TradeModelLine_getBaseAmountQuantityMethod shared script
......@@ -416,6 +420,7 @@ class TestTradeModelLine(TestTradeModelLineMixin):
expected_result_dict = self[delivery.getPath()]
for line in delivery.getMovementList():
currency_precision = line.getPricePrecision()
simulation_movement_list_list = self.getTradeModelSimulationMovementList(line)
self.assertEqual(len(simulation_movement_list_list), 1)
simulation_movement_list = simulation_movement_list_list[0]
......@@ -426,7 +431,7 @@ class TestTradeModelLine(TestTradeModelLineMixin):
for use in 'discount', 'tax':
total_price = expected_result_dict[use].get(line.getId()) or 0.0
sm = result_dict.pop(use)
self.assertEqual(str(sm.getTotalPrice() or 0.0), str(total_price))
self.assertEqual(round(sm.getTotalPrice() or 0.0, currency_precision), round(total_price, currency_precision))
self.assertEqual(3, len(sm.getCausalityValueList()))
self.assertEqual(1, len(sm.getCausalityValueList(
portal_type=self.business_link_portal_type)))
......@@ -466,12 +471,12 @@ class TestTradeModelLine(TestTradeModelLineMixin):
rounded_total_price = round(line_dict['normal'], currency_precision)
rounded_tax_price = round(line_dict['tax'], currency_precision)
rounded_discount_price = round(line_dict['discount'], currency_precision)
self.assertEqual(str(abs(line_dict['payable_receivable'])),
str(rounded_total_price + rounded_tax_price + rounded_discount_price))
self.assertEqual(str(abs(line_dict['vat'])),
str(rounded_tax_price))
self.assertEqual(str(abs(line_dict['income_expense'])),
str(rounded_total_price + rounded_discount_price))
self.assertEqual(round(abs(line_dict['payable_receivable']), currency_precision),
round(rounded_total_price + rounded_tax_price + rounded_discount_price, currency_precision))
self.assertEqual(round(abs(line_dict['vat']), currency_precision),
rounded_tax_price)
self.assertEqual(round(abs(line_dict['income_expense']), currency_precision),
round(rounded_total_price + rounded_discount_price, currency_precision))
def buildPackingLists(self):
self.portal.portal_alarms.packing_list_builder_alarm.activeSense()
......
# -*- coding: utf-8 -*-
import six
from Products.PortalTransforms.interfaces import ITransform
from zope.interface import implementer
from erp5.component.module.TransformLib import DocumentConversionServerTransform
......@@ -19,11 +20,13 @@ class TransformHtmlToPdf(DocumentConversionServerTransform):
# (https://lab.nexedi.com/nexedi/cloudooo/merge_requests/20)
return 'html' if mimetype == 'text/html' else 'pdf'
def convert(self, *args, **kwargs):
def convert(self, orig, *args, **kwargs):
# wkhtmltopdf handler currently requires conversion_kw (hack in convertFile())...
if 'conversion_kw' not in kwargs:
kwargs['conversion_kw'] = {'encoding': 'utf-8'}
return DocumentConversionServerTransform.convert(self, *args, **kwargs)
if six.PY3 and isinstance(orig, str):
orig = orig.encode()
return DocumentConversionServerTransform.convert(self, orig, *args, **kwargs)
def register():
return TransformHtmlToPdf()
\ No newline at end of file
......@@ -148,7 +148,9 @@ for attachment in attachment_list:
for key, value in attachment.get("add_header_list", []):
part.add_header(key, value)
if attachment.get("filename", None) is not None:
part.add_header("Content-Disposition", "attachment", attachment["filename"])
# XXX disable too-many-function-args because there is no error with this code,
# but it might just be not tested.
part.add_header("Content-Disposition", "attachment", attachment["filename"]) # pylint:disable=too-many-function-args
outer.attach(part)
#return outer.as_string()
......
# This script has 'Anonymous' proxy role to check 'View' permission for Anonymous.
return 'format' in context.REQUEST and context.getPortalObject().portal_membership.checkPermission('View', context)
if 'format' in container.REQUEST:
from zExceptions import Unauthorized
portal = context.getPortalObject()
try:
return portal.portal_membership.checkPermission('View', context)
except Unauthorized: # six.PY3:
pass
return False
......@@ -260,6 +260,7 @@ class TestERP5Web(ERP5TypeTestCase):
page.edit(text_content='<p>Hé Hé Hé!</p>', content_type='text/html')
self.tic()
self.assertEqual('Hé Hé Hé!', page.asText().strip())
self.assertIn('Hé Hé Hé!', page.getSearchableText())
def test_WebPageAsTextHTMLEntities(self):
"""Check if Web Page's asText() converts html entities properly
......
"""Returns the `text_content` that should be set on the translation data script for this RJS website.
"""
import json
from Products.ERP5Type.Utils import str2unicode, unicode2str
portal = context.getPortalObject()
Base_translateString = context.Base_translateString
......@@ -38,14 +39,13 @@ tmp = {}
for language in context.getAvailableLanguageSet():
tmp[language] = {}
for word in translatable_message_set:
tmp[language][word] = unicode(Base_translateString(word, lang = language), 'utf-8')
tmp[language][word] = str2unicode(Base_translateString(word, lang = language))
# We pass unicode to this json.dump(ensure_ascii=False), so that it produce
# UTF-8 string and not escaped characters. At the end we return an UTF-8
# encoded string and not an unicode instance, because text_content property
# is usually UTF-8 encoded str (not unicode).
return (u"""/**
return unicode2str(u"""/**
* This translation data is generated automatically and updated with upgrader in post-upgarde.
* Do not edit manually, but use "Update Translation Data" action on web site to update from
* Localizer and from data-i18n tags on web pages.
......@@ -64,4 +64,4 @@ return (u"""/**
sort_keys=True,
indent=2,
ensure_ascii=False,
separators=(',', ': ')).splitlines()))).encode('utf-8')
separators=(',', ': ')).splitlines())))
last_message = context.getLastLog()[-1]
line_list = filter(None, last_message.replace("=\n","").split("\n"))
for line in line_list:
if "http" in line:
return context.REQUEST.RESPONSE.redirect(line.replace("=3D", "="))
raise RuntimeError("URL not found in the email")
import re
last_message_text = context.getMessageList()[-1][2]
return container.REQUEST.RESPONSE.redirect(re.findall(r"http.*", last_message_text)[0])
......@@ -86,7 +86,9 @@ def File_viewAsWeb(self):
# For Pdata type, we must iterate and send chunk by chunk.
# And no need to continue if the client closed the connection.
while data and not RESPONSE.stdout._channel.closed:
while data:
if six.PY2 and RESPONSE.stdout._channel.closed:
break
# Send data to the client.
RESPONSE.write(data.data)
# Load next object without keeping previous chunks in memory.
......
......@@ -80,6 +80,7 @@ def WebSection_setObject(self, id, ob, **kw):
Make any change related to the file uploaded.
"""
portal = self.getPortalObject()
ob = ob.getOriginalDocument()
data = self.REQUEST.get('BODY')
try:
metadata, signature = loads(data)
......@@ -128,7 +129,13 @@ def WebSection_putFactory(self, name, typ, body):
document = portal.portal_contributions.newContent(data=body,
filename=name,
discover_metadata=False)
return document
# return a document for which getId() returns the name for _setObject to be
# called with id=name ( for WebSection_setObject ), but for which
# getRelativeUrl returns the relative url of the real document, for
# VirtualFolderMixin transactional variable cache between _setObject and
# _getOb
return document.asContext(getId=lambda: name)
# The following scripts are helpers to search & clean up shadir entries.
# XXX: Due to lack of View skin for shadir, external methods are currently
......
......@@ -105,19 +105,23 @@ class TestShaDir(ShaDirMixin, ERP5TypeTestCase):
data_set = self.portal.portal_catalog.getResultValue(
reference=self.key)
self.assertEqual(self.key, data_set.getReference())
self.assertNotEqual(self.key, data_set.getId())
self.assertEqual('published', data_set.getValidationState())
self.assertEqual(len(self.portal.data_set_module.contentValues()), 1)
# Asserting Document
document = self.portal.portal_catalog.getResultValue(
reference=self.sha512sum)
self.assertEqual(self.sha512sum, document.getTitle())
self.assertEqual(self.sha512sum, document.getReference())
self.assertNotEqual(self.sha512sum, document.getId())
self.assertEqual(self.data, document.getData())
self.assertEqual(data_set, document.getFollowUpValue())
self.assertEqual(str(self.expiration_date),
str(document.getExpirationDate()))
self.assertEqual('application/json', document.getContentType())
self.assertEqual('Published', document.getValidationStateTitle())
self.assertEqual(len(self.portal.document_module.contentValues()), 1)
def test_get_information(self):
"""
......
......@@ -26,9 +26,13 @@
##############################################################################
import string, re
import six
redundant_chars='"\'.:;,-+<>()*~' # chars we need to strip from a word before we see if it matches, and from the searchwords to eliminate boolean mode chars
tr=string.maketrans(redundant_chars,' '*len(redundant_chars))
if six.PY2:
tr=string.maketrans(redundant_chars,' '*len(redundant_chars))
else:
tr = str.maketrans('', '', redundant_chars)
class Done(Exception):
pass
......
......@@ -101,8 +101,12 @@ class CachedConvertableMixin:
http://pypi.python.org/pypi/uuid/ to generate
a uuid stored as private property.
"""
format_cache_id = str(makeSortedTuple(kw)).\
translate(string.maketrans('', ''), '[]()<>\'", ')
if six.PY2:
format_cache_id = str(makeSortedTuple(kw)).\
translate(string.maketrans('', ''), '[]()<>\'", ')
else:
format_cache_id = str(makeSortedTuple(kw)).\
translate(str.maketrans('', '', '[]()<>\'", '))
return '%s:%s:%s' % (aq_base(self).getUid(), self.getRevision(),
format_cache_id)
......
......@@ -26,10 +26,11 @@
#
##############################################################################
import six
from AccessControl import ClassSecurityInfo
from Products.ERP5Type.Globals import InitializeClass
from Products.ERP5Type import Permissions
from Products.ERP5Type.Utils import guessEncodingFromText
from Products.ERP5Type.Utils import guessEncodingFromText # TODO: guessEncodingFromBytes
from zLOG import LOG, INFO
from email.header import decode_header, HeaderParseError
......@@ -39,20 +40,24 @@ import re
filename_regexp = 'name="([^"]*)"'
def testCharsetAndConvert(text_content, content_type, encoding):
try:
if encoding is not None:
text_content = text_content.decode(encoding).encode('utf-8')
else:
text_content = text_content.decode().encode('utf-8')
except (UnicodeDecodeError, LookupError):
encoding = guessEncodingFromText(text_content, content_type)
if encoding is not None:
try:
text_content = text_content.decode(encoding).encode('utf-8')
except (UnicodeDecodeError, LookupError):
if not isinstance(text_content, six.text_type):
try:
if encoding is not None:
text_content = text_content.decode(encoding)
else:
text_content = text_content.decode()
if six.PY2:
text_content = text_content.encode('utf-8')
except (UnicodeDecodeError, LookupError):
encoding = guessEncodingFromText(text_content, content_type)
if encoding is not None:
try:
text_content = text_content.decode(encoding)
except (UnicodeDecodeError, LookupError):
# TODO: errors= repr ?
text_content = repr(text_content)[1:-1]
else:
text_content = repr(text_content)[1:-1]
else:
text_content = repr(text_content)[1:-1]
return text_content, encoding
......@@ -111,25 +116,25 @@ class MailMessageMixin:
"""
Returns the content information from the header information.
This is used by the metadata discovery system.
Header information is converted in UTF-8 since this is the standard
way of representing strings in ERP5.
"""
result = {}
for (name, value) in self._getMessage().items():
try:
decoded_header = decode_header(value)
decoded_header_parts = decode_header(value)
except HeaderParseError as error_message:
decoded_header = ()
decoded_header_parts = ()
LOG('MailMessageMixin.getContentInformation', INFO,
'Failed to decode %s header of %s with error: %s' %
(name, self.getPath(), error_message))
for text, encoding in decoded_header:
text, encoding = testCharsetAndConvert(text, 'text/plain', encoding)
if name in result:
result[name] = '%s %s' % (result[name], text)
else:
result[name] = text
header_parts = []
for text, encoding in decoded_header_parts:
text, _ = testCharsetAndConvert(text, 'text/plain', encoding)
header_parts.append(text)
if six.PY3:
result[name] = ''.join(header_parts)
else:
# https://bugs.python.org/issue1079
result[name] = ' '.join(header_parts)
return result
security.declareProtected(Permissions.AccessContentsInformation, 'getAttachmentInformationList')
......@@ -215,6 +220,8 @@ class MailMessageMixin:
encoding=part_encoding,
index=index) # add index to generate
# a unique cache key per attachment
if six.PY3:
content = content.encode()
else:
content = part.get_payload(decode=1)
return content
......
import six
from Products.PythonScripts.standard import Object
from ZODB.POSException import ConflictError
from zExceptions import Unauthorized
......@@ -23,6 +24,20 @@ result = []
binary_data_explanation = Base_translateString("Binary data can't be displayed")
base_error_message = Base_translateString('(value retrieval failed)')
def get_value_as_text(value):
"""check if values are unicode convertible (binary are not)
"""
if not isinstance(value, six.text_type):
try:
if isinstance(value, bytes):
value.decode('utf-8')
else:
str(value)
except UnicodeDecodeError:
value = binary_data_explanation
return value
for prop_dict in sorted(context.getPropertyMap(), key=lambda prop: prop['id']):
prop = prop_dict['id']
error = False
......@@ -42,22 +57,9 @@ for prop_dict in sorted(context.getPropertyMap(), key=lambda prop: prop['id']):
error = True
new_value = base_error_message
if new_value != old_value or error:
# check if values are unicode convertible (binary are not)
if isinstance(new_value, (str, unicode)):
try:
unicode(str(new_value), 'utf-8')
except UnicodeDecodeError:
new_value = binary_data_explanation
if isinstance(old_value, (str, unicode)):
try:
unicode(str(old_value), 'utf-8')
except UnicodeDecodeError:
old_value = binary_data_explanation
if isinstance(current_value, (str, unicode)):
try:
unicode(str(current_value), 'utf-8')
except UnicodeDecodeError:
current_value = binary_data_explanation
new_value = get_value_as_text(new_value)
old_value = get_value_as_text(old_value)
current_value = get_value_as_text(current_value)
x = {'property_name': prop,
'new_value': new_value,
'old_value': old_value,
......
......@@ -18,9 +18,13 @@ if is_gadget_mode:
def getRandomDocumentTextExcerpt(document_text):
# try to get somewhat arbitrary choice of searchable attrs
if isinstance(document_text, str) and document_text!='':
document_text = document_text.decode(encoding, 'ignore')
if six.PY2:
document_text = document_text.decode(encoding, 'ignore')
start = min(len(document_text) - 300, 200)
return '... %s ...' %document_text[start:start + max_text_length].encode(encoding)
result = '... %s ...' %document_text[start:start + max_text_length]
if six.PY2:
result = result.encode(encoding)
return result
else:
return ''
......@@ -54,7 +58,8 @@ else:
result = ' '.join(map(str, found_text_fragments))
# Document may contains charactors which utf8 codec cannot decode.
unicode_result = result.decode(encoding, 'ignore')
result = unicode_result.encode(encoding)
if six.PY2:
unicode_result = result.decode(encoding, 'ignore')
result = unicode_result.encode(encoding)
return result
......@@ -63,6 +63,7 @@ from hashlib import md5
from warnings import warn
from six.moves.cPickle import loads, dumps
from copy import deepcopy
import base64
import six
MYSQL_MIN_DATETIME_RESOLUTION = 1/86400.
......@@ -1439,7 +1440,7 @@ class SimulationTool(BaseTool):
if src__:
sql_source_list.append(Resource_zGetInventoryCacheResult(src__=1, **inventory_cache_kw))
if cached_sql_result:
brain_result = loads(cached_sql_result[0].result)
brain_result = loads(base64.b64decode(cached_sql_result[0].result))
# Rebuild the brains
cached_result = Results(
(brain_result['items'], brain_result['data']),
......@@ -1488,10 +1489,10 @@ class SimulationTool(BaseTool):
self.Resource_zInsertInventoryCacheResult(
query=sql_text_hash,
date=cached_date,
result=dumps({
result=base64.b64encode(dumps({
'items': result.__items__,
'data': result._data,
}),
})),
)
else:
# Cache miss and this getInventory() not specifying to_date,
......
......@@ -48,7 +48,7 @@ class TestFormPrintoutMixin(ERP5TypeTestCase):
'''return odf document from the printout
'''
document_file = getattr(self.portal, printout_form.template, None)
document_file = StringIO(document_file).read()
document_file = bytes(document_file)
if document_file is not None:
return document_file
raise ValueError ('%s template not found' % printout_form.template)
......
......@@ -17,3 +17,72 @@ def initialize(registry):
mime.extensions = tuple(x)
MimeTypesRegistry.initialize = initialize
# patched from https://github.com/plone/Products.MimetypesRegistry/blob/2.1.8/Products/MimetypesRegistry/MimeTypesRegistry.py#L305-L359
# to change type of `data`. Originally, Products.MimetypesRegistry only "native str" for data, but this data is passed
# to magic.guessMime(data), which expects bytes and later before passing it to guess_content_type
# it is .encode()'ed, which so this expectes str - which is inconsistent.
# This relaxes the data type to tolerate bytes or str on python3
from Products.MimetypesRegistry.mime_types import magic
from zope.contenttype import guess_content_type
from Acquisition import aq_base
def classify(self, data, mimetype=None, filename=None):
"""Classify works as follows:
1) you tell me the rfc-2046 name and I give you an IMimetype
object
2) the filename includes an extension from which we can guess
the mimetype
3) we can optionally introspect the data
4) default to self.defaultMimetype if no data was provided
else to application/octet-stream of no filename was provided,
else to text/plain
Return an IMimetype object or None
"""
mt = None
if mimetype:
mt = self.lookup(mimetype)
if mt:
mt = mt[0]
elif filename:
mt = self.lookupExtension(filename)
if mt is None:
mt = self.globFilename(filename)
if data and not mt:
for c in self._classifiers():
if c.classify(data):
mt = c
break
if not mt:
if six.PY3 and isinstance(data, str): # <<<< patch allow bytes or str
data = data.encode()
mstr = magic.guessMime(data)
if mstr:
_mt = self.lookup(mstr)
if len(_mt) > 0:
mt = _mt[0]
if not mt:
if not data:
mtlist = self.lookup(self.defaultMimetype)
elif filename:
mtlist = self.lookup('application/octet-stream')
else:
failed = 'text/x-unknown-content-type'
filename = filename or ''
data = data or ''
if six.PY3 and isinstance(data, str): # <<<< patch allow bytes or str
data = data.encode()
ct, enc = guess_content_type(filename, data, None)
if ct == failed:
ct = 'text/plain'
mtlist = self.lookup(ct)
if len(mtlist) > 0:
mt = mtlist[0]
else:
return None
# Remove acquisition wrappers
return aq_base(mt)
MimeTypesRegistry.MimeTypesRegistry.classify = classify
......@@ -46,7 +46,7 @@ def PropertyManager_updateProperty(self, id, value, local_properties=False):
if not hasattr(self, 'isRADContent'):
if not self.hasProperty(id):
raise BadRequest('The property %s does not exist' % escape(id))
if isinstance(value, str):
if isinstance(value, (str, bytes)):
proptype=self.getPropertyType(id, local_properties=local_properties) \
or 'string'
if proptype in type_converters:
......
......@@ -42,7 +42,7 @@ from ZPublisher.HTTPResponse import HTTPResponse
from zExceptions.ExceptionFormatter import format_exception
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5Type.tests.runUnitTest import log_directory
from Products.ERP5Type.Utils import stopProcess, PR_SET_PDEATHSIG
from Products.ERP5Type.Utils import stopProcess, PR_SET_PDEATHSIG, unicode2str
from lxml import etree
from lxml.html.builder import E
import certifi
......@@ -397,7 +397,7 @@ class FunctionalTestRunner:
for test_tr in test_table.xpath('.//tr[contains(@class, "status_failed")]'):
test_tr.set('style', 'background-color: red;')
details_attribute_dict = {}
if etree.tostring(test_table).find("expected failure") != -1:
if u"expected failure" in etree.tostring(test_table, encoding="unicode"):
expected_failure_amount += 1
else:
failure_amount += 1
......@@ -405,7 +405,7 @@ class FunctionalTestRunner:
details_attribute_dict['open'] = 'true'
detail_element = E.div()
detail_element.append(E.details(E.summary(test_name), test_table, **details_attribute_dict))
detail += etree.tostring(detail_element)
detail += unicode2str(etree.tostring(detail_element, encoding="unicode"))
tr_count += 1
success_amount = tr_count - 1 - failure_amount - expected_failure_amount
if detail:
......
......@@ -912,6 +912,8 @@ def normalizeFullWidthNumber(value):
if value[0] in fullwidth_minus_character_list:
value = u'-' + value[1:]
value = value.encode('ASCII', 'ignore')
if six.PY3:
value = value.decode()
except UnicodeDecodeError:
pass
return value
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import six
from . import XMLObjects
from Products.Formulator.TALESField import TALESMethod
from Products.Formulator.MethodField import Method
......@@ -127,7 +128,7 @@ def XMLToForm(s, form, override_encoding=None):
form.remove_group('Default')
def encode(text, encoding):
if encoding is None:
if six.PY3 or encoding is None:
return text
else:
return text.encode(encoding)
......@@ -226,7 +226,7 @@ class StrippingParser(HTMLParser):
def handle_data(self, data):
if self.suppress: return
data = html_quote(data)
if self.original_charset and isinstance(data, str):
if self.original_charset and isinstance(data, bytes):
data = data.decode(self.original_charset)
self.result.append(data)
......@@ -332,7 +332,7 @@ class StrippingParser(HTMLParser):
k = len(self.rawdata)
data = self.rawdata[i+9:k]
j = k+3
if self.original_charset and isinstance(data, str):
if self.original_charset and isinstance(data, bytes):
data = data.decode(self.original_charset)
self.result.append("<![CDATA[%s]]>" % data)
else:
......@@ -378,7 +378,7 @@ def scrubHTML(html, valid=VALID_TAGS, nasty=NASTY_TAGS,
parser.feed(html)
parser.close()
result = parser.getResult()
if parser.original_charset and isinstance(result, str):
if parser.original_charset and isinstance(result, bytes):
result = result.decode(parser.original_charset).encode(default_encoding)
return result
......
import six
from Products.PortalTransforms.interfaces import ITransform
from zope.interface import implementer
from DocumentTemplate.html_quote import html_quote
......@@ -30,6 +31,7 @@ class TextPreToHTML:
raise AttributeError(attr)
def convert(self, orig, data, **kwargs):
orig = six.ensure_text(orig, errors='replace')
data.setData('<pre class="data">%s</pre>' % html_quote(orig))
return data
......
import six
from Products.PortalTransforms.interfaces import ITransform
from zope.interface import implementer
from DocumentTemplate.html_quote import html_quote
......@@ -30,6 +31,7 @@ class TextToHTML:
raise AttributeError(attr)
def convert(self, orig, data, **kwargs):
orig = six.ensure_text(orig, errors='replace')
# Replaces all line breaks with a br tag, and wraps it in a p tag.
data.setData('<p>%s</p>' % html_quote(orig.strip()).replace('\n', '<br />'))
return data
......
......@@ -46,7 +46,11 @@ class ZSQLBrain(Acquisition.Implicit):
return self.path
def getPath(self):
return self.path
path = self.path
# TODO py3: understand why this is bytes sometimes
if not isinstance(path, str):
path = path.decode()
return path
def getUid(self):
return self.uid
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment