WIP: Why?
... | ... | @@ -32,6 +32,7 @@ if six.PY2: |
from email import message_from_string as message_from_bytes | ||
else: | ||
from email import message_from_bytes | ||
from email.generator import Generator | ||
from Products.ERP5Type.tests.ERP5TypeLiveTestCase import ERP5TypeTestCase | ||
from Products.ERP5Type.tests.Sequence import SequenceList | ||
... | ... | @@ -39,6 +40,41 @@ from Products.ERP5Type.Utils import bytes2str, str2bytes |
from Products.ZSQLCatalog.SQLCatalog import SimpleQuery | ||
from DateTime import DateTime | ||
from six import StringIO | ||
import re | ||
def normalize_email_bytes(email_bytes): | ||
# type: (bytes) -> str | ||
""" | ||
Normalizes the representation of email text, so that it can be compared. | ||
The fields of the message are written in a predefined order, with | ||
the `unixfrom` field removed and no line wrapping. | ||
The code is intended to be compatible with both Python 2 and Python 3. | ||
|
||
Args: | ||
email_bytes: Content of the email, including headers. | ||
Returns: | ||
Normalized string representation of the e-mail contents. | ||
""" | ||
# Unfolding removes newlines followed by whitespace, as per RFC5322. | ||
# This SHOULD be done by Python itself, but seemingly no-one cared | ||
# enough. | ||
email_bytes_unfolded = re.sub( | ||
br"\r?\n(?P<space>\s)", | ||
br"\g<space>", | ||
email_bytes, | ||
) | ||
msg = message_from_bytes(email_bytes_unfolded) | ||
fp = StringIO() | ||
g = Generator(fp, mangle_from_=False, maxheaderlen=0) | ||
g.flatten(msg) | ||
return fp.getvalue() | ||
class TestInterfacePost(ERP5TypeTestCase): | ||
""" | ||
... | ... | @@ -253,7 +289,10 @@ class TestInterfacePost(ERP5TypeTestCase): |
last_message, = self.portal.MailHost._message_list | ||
self.assertNotEqual((), last_message) | ||
_, _, message_text = last_message | ||
self.assertIn(message_text, sequence['internet_message_post'].getData()) | ||
self.assertEqual( | ||
normalize_email_bytes(message_text), | ||
normalize_email_bytes(sequence['internet_message_post'].getData()), | ||
) | ||
def _getMailHostMessageForRecipient(self, recipient_email_address): | ||
message_list = self.portal.MailHost._message_list | ||
... | ... | @@ -274,7 +313,10 @@ class TestInterfacePost(ERP5TypeTestCase): |
self.assertEqual(len(message_list), 1) | ||
message = message_list[0] | ||
_, _, message_text = message | ||
self.assertIn(message_text, post.getData()) | ||
self.assertEqual( | ||
normalize_email_bytes(message_text), | ||
normalize_email_bytes(post.getData()), | ||
) | ||
def stepCheckMailMessagePreviewDisplaysLatestInternetMessagePostData(self, sequence=None, sequence_list=None): | ||
mail_message = sequence['mail_message'] | ||
... | ... |
... | ... | @@ -31,6 +31,9 @@ from Products.ERP5Type import PropertySheet |
from Products.ERP5Type.Permissions import AccessContentsInformation | ||
from Products.ERP5Type.Base import Base | ||
import six | ||
import zope.component | ||
import zope.interface | ||
from ZPublisher.interfaces import IXmlrpcChecker | ||
try: | ||
from spyne import MethodContext | ||
except ImportError: | ||
... | ... | @@ -42,6 +45,20 @@ else: |
from spyne.server.http import HttpBase | ||
_default_xmrpc_checker = zope.component.queryUtility(IXmlrpcChecker) | ||
@zope.interface.implementer(IXmlrpcChecker) | ||
def soap_xmlrpc_checker(request): | ||
if request.getHeader('SOAPACTION'): | ||
return False | ||
return _default_xmrpc_checker is None or _default_xmrpc_checker(request) | ||
zope.component.getGlobalSiteManager().registerUtility( | ||
soap_xmlrpc_checker, IXmlrpcChecker) | ||
|
||
class SOAPBinding(Base): | ||
meta_type = 'ERP5 SOAP Binding' | ||
... | ... |
... | ... | @@ -62,6 +62,10 @@ class TestTradeModelLineMixin(TestBPMMixin, UserDict): |
order_date = DateTime() | ||
amount_generator_line_portal_type = 'Trade Model Line' | ||
# XXX so that unittest.suite._isnotsuite return False | ||
def __iter__(self): | ||
raise TypeError() | ||
|
||
def setBaseAmountQuantityMethod(self, base_amount_id, text): | ||
"""Populate TradeModelLine_getBaseAmountQuantityMethod shared script | ||
... | ... | @@ -416,6 +420,7 @@ class TestTradeModelLine(TestTradeModelLineMixin): |
expected_result_dict = self[delivery.getPath()] | ||
for line in delivery.getMovementList(): | ||
currency_precision = line.getPricePrecision() | ||
simulation_movement_list_list = self.getTradeModelSimulationMovementList(line) | ||
self.assertEqual(len(simulation_movement_list_list), 1) | ||
simulation_movement_list = simulation_movement_list_list[0] | ||
... | ... | @@ -426,7 +431,7 @@ class TestTradeModelLine(TestTradeModelLineMixin): |
for use in 'discount', 'tax': | ||
total_price = expected_result_dict[use].get(line.getId()) or 0.0 | ||
sm = result_dict.pop(use) | ||
self.assertEqual(str(sm.getTotalPrice() or 0.0), str(total_price)) | ||
self.assertEqual(round(sm.getTotalPrice() or 0.0, currency_precision), round(total_price, currency_precision)) | ||
|
||
self.assertEqual(3, len(sm.getCausalityValueList())) | ||
self.assertEqual(1, len(sm.getCausalityValueList( | ||
portal_type=self.business_link_portal_type))) | ||
... | ... | @@ -466,12 +471,12 @@ class TestTradeModelLine(TestTradeModelLineMixin): |
rounded_total_price = round(line_dict['normal'], currency_precision) | ||
rounded_tax_price = round(line_dict['tax'], currency_precision) | ||
rounded_discount_price = round(line_dict['discount'], currency_precision) | ||
self.assertEqual(str(abs(line_dict['payable_receivable'])), | ||
str(rounded_total_price + rounded_tax_price + rounded_discount_price)) | ||
self.assertEqual(str(abs(line_dict['vat'])), | ||
str(rounded_tax_price)) | ||
self.assertEqual(str(abs(line_dict['income_expense'])), | ||
str(rounded_total_price + rounded_discount_price)) | ||
self.assertEqual(round(abs(line_dict['payable_receivable']), currency_precision), | ||
round(rounded_total_price + rounded_tax_price + rounded_discount_price, currency_precision)) | ||
self.assertEqual(round(abs(line_dict['vat']), currency_precision), | ||
rounded_tax_price) | ||
self.assertEqual(round(abs(line_dict['income_expense']), currency_precision), | ||
round(rounded_total_price + rounded_discount_price, currency_precision)) | ||
|
||
def buildPackingLists(self): | ||
self.portal.portal_alarms.packing_list_builder_alarm.activeSense() | ||
... | ... |
... | ... | @@ -376,7 +376,7 @@ class PortalTypeMetaClass(GhostBaseMetaClass, PropertyHolder): |
for key, value in six.iteritems(attribute_dict): | ||
setattr(klass, key, value) | ||
if getattr(klass.__setstate__, 'im_func', None) is \ | ||
if getattr(klass.__setstate__, '__func__', klass.__setstate__) is \ | ||
|
||
persistent_migration.__setstate__: | ||
# optimization to reduce overhead of compatibility code | ||
klass.__setstate__ = persistent_migration.Base__setstate__ | ||
... | ... |
... | ... | @@ -34,6 +34,7 @@ |
# class may be copied in the pickle of the container, and we can't access it | ||
# from __setstate__. | ||
import six | ||
import logging, re | ||
from AccessControl import ClassSecurityInfo | ||
from Acquisition import aq_base | ||
... | ... | @@ -97,7 +98,6 @@ class PickleUpdater(ObjectReader, ObjectWriter, object): |
if _setOb: | ||
if isinstance(_setOb, WorkflowMethod): | ||
_setOb = _setOb._m | ||
import six | ||
if six.get_unbound_function(_setOb) is six.get_unbound_function(OFS_Folder._setOb): | ||
self.lazy = Ghost | ||
elif klass.__module__[:7] == 'BTrees.' and klass.__name__ != 'Length': | ||
... | ... | @@ -114,8 +114,12 @@ class PickleUpdater(ObjectReader, ObjectWriter, object): |
self.do_migrate = args != (klass.__module__, klass.__name__) and \ | ||
not isOldBTree('%s.%s' % args) | ||
unpickler.find_global = self._get_class | ||
if six.PY3: | ||
unpickler.find_class = self._get_class | ||
return self._get_class(*args) | ||
unpickler.find_global = find_global | ||
if six.PY3: | ||
unpickler.find_class = find_global | ||
|
||
unpickler.load() # class | ||
state = unpickler.load() | ||
if isinstance(self.lazy, LazyPersistent): | ||
... | ... |
... | ... | @@ -46,7 +46,11 @@ class ZSQLBrain(Acquisition.Implicit): |
return self.path | ||
def getPath(self): | ||
return self.path | ||
path = self.path | ||
# TODO py3: understand why this is bytes sometimes | ||
|
||
if not isinstance(path, str): | ||
path = path.decode() | ||
return path | ||
def getUid(self): | ||
return self.uid | ||
... | ... |