Commit efbc429f authored by Jérome Perrin's avatar Jérome Perrin

CRM py3

parent 4a9161db
......@@ -50,9 +50,6 @@ def makeFilePath(name):
return os.path.join(os.path.dirname(Products.ERP5.tests.__file__),
'test_data', 'crm_emails', name)
def makeFileUpload(name):
path = makeFilePath(name)
return FileUpload(path, name)
clear_module_name_list = """
campaign_module
......@@ -80,6 +77,13 @@ class BaseTestCRM(ERP5TypeTestCase):
self.tic()
super(BaseTestCRM, self).beforeTearDown()
def makeFileUpload(self, name):
path = makeFilePath(name)
fu = FileUpload(path, name)
self.addCleanup(fu.close)
return fu
class TestCRM(BaseTestCRM):
def getTitle(self):
return "CRM"
......@@ -885,67 +889,67 @@ class TestCRMMailIngestion(BaseTestCRM):
return object_list[-1]
portal = self.portal
message = message_from_string(self._readTestData('simple'))
message = message_from_string(self._readTestData('simple').decode())
message.replace_header('subject', 'Visit:Company A')
data = message.as_string()
data = message.as_string().encode()
self._ingestMail(data=data)
self.tic()
document = getLastCreatedEvent(portal.event_module)
self.assertEqual(document.getPortalType(), 'Visit')
message = message_from_string(self._readTestData('simple'))
message = message_from_string(self._readTestData('simple').decode())
message.replace_header('subject', 'Fax:Company B')
data = message.as_string()
data = message.as_string().encode()
self._ingestMail(data=data)
self.tic()
document = getLastCreatedEvent(portal.event_module)
self.assertEqual(document.getPortalType(), 'Fax Message')
message = message_from_string(self._readTestData('simple'))
message = message_from_string(self._readTestData('simple').decode())
message.replace_header('subject', 'TEST:Company B')
data = message.as_string()
data = message.as_string().encode()
self._ingestMail(data=data)
self.tic()
document = getLastCreatedEvent(portal.event_module)
self.assertEqual(document.getPortalType(), 'Mail Message')
message = message_from_string(self._readTestData('simple'))
message = message_from_string(self._readTestData('simple').decode())
message.replace_header('subject', 'visit:Company A')
data = message.as_string()
data = message.as_string().encode()
self._ingestMail(data=data)
self.tic()
document = getLastCreatedEvent(portal.event_module)
self.assertEqual(document.getPortalType(), 'Visit')
message = message_from_string(self._readTestData('simple'))
message = message_from_string(self._readTestData('simple').decode())
message.replace_header('subject', 'phone:Company B')
data = message.as_string()
data = message.as_string().encode()
self._ingestMail(data=data)
self.tic()
document = portal.event_module[portal.event_module.objectIds()[-1]]
self.assertEqual(document.getPortalType(), 'Phone Call')
message = message_from_string(self._readTestData('simple'))
message = message_from_string(self._readTestData('simple').decode())
message.replace_header('subject', 'LETTER:Company C')
data = message.as_string()
data = message.as_string().encode()
self._ingestMail(data=data)
self.tic()
document = getLastCreatedEvent(portal.event_module)
self.assertEqual(document.getPortalType(), 'Letter')
message = message_from_string(self._readTestData('simple'))
message = message_from_string(self._readTestData('simple').decode())
body = message.get_payload()
message.set_payload('Visit:%s' % body)
data = message.as_string()
data = message.as_string().encode()
self._ingestMail(data=data)
self.tic()
document = getLastCreatedEvent(portal.event_module)
self.assertEqual(document.getPortalType(), 'Visit')
message = message_from_string(self._readTestData('simple'))
message = message_from_string(self._readTestData('simple').decode())
body = message.get_payload()
message.set_payload('PHONE CALL:%s' % body)
data = message.as_string()
data = message.as_string().encode()
self._ingestMail(data=data)
self.tic()
document = getLastCreatedEvent(portal.event_module)
......@@ -1006,23 +1010,23 @@ class TestCRMMailIngestion(BaseTestCRM):
self.tic()
stripped_html = document.asStrippedHTML()
self.assertNotIn('<form', stripped_html)
self.assertNotIn('<form', document.getAttachmentData(4))
self.assertEqual('This is my content.\n*ERP5* is a Free _Software_\n',
self.assertNotIn(b'<form', document.getAttachmentData(4))
self.assertEqual(b'This is my content.\n*ERP5* is a Free _Software_\n',
document.getAttachmentData(2))
self.assertEqual('text/html', document.getContentType())
self.assertEqual('\n<html>\n<head>\n\n<meta http-equiv="content-type"'\
' content="text/html; charset=utf-8" />\n'\
'</head>\n<body text="#000000"'\
' bgcolor="#ffffff">\nThis is my content.<br />\n'\
'<b>ERP5</b> is a Free <u>Software</u><br />'\
'\n\n</body>\n</html>\n', document.getAttachmentData(3))
self.assertEqual(document.getAttachmentData(3), document.getTextContent())
self.assertEqual(b'\n<html>\n<head>\n\n<meta http-equiv="content-type"'\
b' content="text/html; charset=utf-8" />\n'\
b'</head>\n<body text="#000000"'\
b' bgcolor="#ffffff">\nThis is my content.<br />\n'\
b'<b>ERP5</b> is a Free <u>Software</u><br />'\
b'\n\n</body>\n</html>\n', document.getAttachmentData(3))
self.assertEqual(document.getAttachmentData(3), document.getTextContent().encode())
# now check a message with multipart/mixed
mixed_document = self._ingestMail(filename='sample_html_attachment')
self.tic()
self.assertEqual(mixed_document.getAttachmentData(1),
mixed_document.getTextContent())
mixed_document.getTextContent().encode())
self.assertEqual('Hi, this is the Message.\nERP5 is a free software.\n\n',
mixed_document.getTextContent())
self.assertEqual('text/plain', mixed_document.getContentType())
......@@ -1035,7 +1039,8 @@ class TestCRMMailIngestion(BaseTestCRM):
file_path = '%s/test_data/%s' % (
os.path.dirname(Products.ERP5.tests.__file__),
html_filename)
html_message = open(file_path, 'r').read()
with open(file_path, 'rb') as f:
html_message = f.read()
message = MIMEMultipart('alternative')
message.attach(MIMEText('text plain content', _charset='utf-8'))
part = MIMEBase('text', 'html')
......@@ -1046,7 +1051,7 @@ class TestCRMMailIngestion(BaseTestCRM):
part.add_header('Content-ID', '<%s>' % \
''.join(['%s' % ord(i) for i in html_filename]))
message.attach(part)
event.setData(message.as_string())
event.setData(message.as_string().encode())
self.tic()
self.assertIn('html', event.getTextContent())
self.assertEqual(len(event.getAttachmentInformationList()), 2)
......@@ -1153,14 +1158,14 @@ class TestCRMMailSend(BaseTestCRM):
self.assertEqual('"Me," <me@erp5.org>', mfrom)
self.assertEqual(['"Recipient," <recipient@example.com>'], mto)
self.assertEqual(event.getTextContent(), text_content)
message = message_from_string(messageText)
message = message_from_string(messageText.decode())
self.assertEqual('A Mail', decode_header(message['Subject'])[0][0])
part = None
for i in message.get_payload():
if i.get_content_type()=='text/plain':
part = i
self.assertEqual(text_content, part.get_payload(decode=True))
self.assertEqual(text_content, part.get_payload(decode=True).decode())
#
# Test multiple recipients.
......@@ -1237,13 +1242,13 @@ class TestCRMMailSend(BaseTestCRM):
self.assertEqual('"Me," <me@erp5.org>', mfrom)
self.assertEqual(['"Recipient," <recipient@example.com>'], mto)
message = message_from_string(messageText)
message = message_from_string(messageText.decode())
part = None
for i in message.get_payload():
if i.get_content_type()=='text/html':
part = i
self.assertNotEqual(part, None)
self.assertEqual('<html><body>%s</body></html>' % text_content, part.get_payload(decode=True))
self.assertEqual('<html><body>%s</body></html>' % text_content, part.get_payload(decode=True).decode())
def test_MailMessageEncoding(self):
# test sending a mail message with non ascii characters
......@@ -1260,16 +1265,16 @@ class TestCRMMailSend(BaseTestCRM):
self.assertEqual('=?utf-8?q?Me=2C_=F0=9F=90=88_fan?= <me@erp5.org>', mfrom)
self.assertEqual(['=?utf-8?q?Recipient=2C_=F0=9F=90=88_fan?= <recipient@example.com>'], mto)
message = message_from_string(messageText)
message = message_from_string(messageText.decode())
self.assertEqual('Héhé', decode_header(message['Subject'])[0][0])
self.assertEqual('Me, 🐈 fan', decode_header(message['From'])[0][0])
self.assertEqual('Recipient, 🐈 fan', decode_header(message['To'])[0][0])
self.assertEqual(u'Héhé', decode_header(message['Subject'])[0][0].decode('utf-8'))
self.assertEqual(u'Me, 🐈 fan', decode_header(message['From'])[0][0].decode('utf-8'))
self.assertEqual(u'Recipient, 🐈 fan', decode_header(message['To'])[0][0].decode('utf-8'))
part = None
for i in message.get_payload():
if i.get_content_type()=='text/plain':
part = i
self.assertEqual('Hàhà', part.get_payload(decode=True))
self.assertEqual(u'Hàhà', part.get_payload(decode=True).decode('utf-8'))
def test_MailAttachmentPdf(self):
"""
......@@ -1278,7 +1283,7 @@ class TestCRMMailSend(BaseTestCRM):
# Add a document which will be attached.
# pdf
filename = 'sample_attachment.pdf'
file_object = makeFileUpload(filename)
file_object = self.makeFileUpload(filename)
document = self.portal.portal_contributions.newContent(file=file_object)
self.tic()
......@@ -1325,7 +1330,7 @@ class TestCRMMailSend(BaseTestCRM):
"""
# Add a document which will be attached.
filename = 'sample_attachment.odt'
file_object = makeFileUpload(filename)
file_object = self.makeFileUpload(filename)
document = self.portal.portal_contributions.newContent(file=file_object)
self.tic()
......@@ -1372,7 +1377,7 @@ class TestCRMMailSend(BaseTestCRM):
"""
# Add a document which will be attached.
filename = 'sample_attachment.zip'
file_object = makeFileUpload(filename)
file_object = self.makeFileUpload(filename)
document = self.portal.portal_contributions.newContent(file=file_object)
self.tic()
......@@ -1417,7 +1422,7 @@ class TestCRMMailSend(BaseTestCRM):
"""
# Add a document which will be attached.
filename = 'sample_attachment.gif'
file_object = makeFileUpload(filename)
file_object = self.makeFileUpload(filename)
document = self.portal.portal_contributions.newContent(file=file_object)
self.tic()
......@@ -1504,7 +1509,7 @@ class TestCRMMailSend(BaseTestCRM):
if i.get_filename() == filename:
part = i
self.assertEqual(part.get_payload(decode=True),
document.getTextContent())
document.getTextContent().encode('utf-8'))
self.assertEqual(part.get_content_type(), 'text/html')
def test_AttachPdfToMailUsingNewEventDialog(self):
......@@ -1514,7 +1519,7 @@ class TestCRMMailSend(BaseTestCRM):
# Add a document which will be attached.
# pdf
filename = 'sample_attachment.pdf'
file_object = makeFileUpload(filename)
file_object = self.makeFileUpload(filename)
# Add a ticket
ticket = self.portal.campaign_module.newContent(portal_type='Campaign',
......@@ -1562,7 +1567,7 @@ class TestCRMMailSend(BaseTestCRM):
"""
# Add a document which will be attached.
filename = 'sample_attachment.zip'
file_object = makeFileUpload(filename)
file_object = self.makeFileUpload(filename)
# Add a ticket
ticket = self.portal.campaign_module.newContent(portal_type='Campaign',
......@@ -1611,7 +1616,7 @@ class TestCRMMailSend(BaseTestCRM):
"""
# Add a document which will be attached.
filename = 'sample_attachment.zip'
file_object = makeFileUpload(filename)
file_object = self.makeFileUpload(filename)
# Add a ticket
ticket = self.portal.campaign_module.newContent(portal_type='Campaign',
......@@ -1688,7 +1693,7 @@ class TestCRMMailSend(BaseTestCRM):
# Add a document on a person which will be attached.
def add_document(filename, container, portal_type):
f = makeFileUpload(filename)
f = self.makeFileUpload(filename)
document = container.newContent(portal_type=portal_type)
document.edit(file=f, reference=filename)
return document
......@@ -1745,7 +1750,7 @@ class TestCRMMailSend(BaseTestCRM):
# Add a document on a person which will be attached.
def add_document(filename, container, portal_type):
f = makeFileUpload(filename)
f = self.makeFileUpload(filename)
document = container.newContent(portal_type=portal_type)
document.edit(file=f, reference=filename)
return document
......@@ -1821,7 +1826,7 @@ class TestCRMMailSend(BaseTestCRM):
self.tic()
new_event = event.Base_createCloneDocument(batch_mode=1)
self.assertFalse(new_event.hasFile(), '%r has a file' % (new_event,))
self.assertEqual(new_event.getData(), '')
self.assertEqual(new_event.getData(), b'')
self.assertEqual(new_event.getTitle(), real_title)
self.assertEqual(new_event.getTextContent(), real_content)
self.assertNotEqual(new_event.getReference(), event.getReference())
......@@ -2036,8 +2041,8 @@ class TestCRMMailSend(BaseTestCRM):
self.assertEqual(5, len(self.portal.MailHost._message_list))
for message_info in self.portal.MailHost._message_list:
self.assertIn(mail_text_content, message_info[-1])
message = message_from_string(message_info[-1])
self.assertIn(mail_text_content, message_info[-1].decode())
message = message_from_string(message_info[-1].decode())
self.assertTrue(DateTime(message.get("Date")).isCurrentDay())
def test_MailMessage_send_simple_case(self):
......@@ -2069,7 +2074,7 @@ class TestCRMMailSend(BaseTestCRM):
mail_message.send(extra_header_dict={"X-test-header": "test"})
self.tic()
(_, _, last_message,), = self.portal.MailHost._message_list
message = message_from_string(last_message)
message = message_from_string(last_message.decode())
self.assertEqual("test", message.get("X-test-header"))
......
......@@ -49,9 +49,10 @@ except ImportError:
"""
if six.PY2:
from email import message_from_string as message_from_x
from email import message_from_string as message_from_bytes
else:
from email import message_from_bytes as message_from_x
from email import message_from_bytes
from email.utils import parsedate_tz, mktime_tz
DEFAULT_TEXT_FORMAT = 'text/html'
......@@ -162,7 +163,7 @@ class EmailDocument(TextDocument, MailMessageMixin):
# store it in 'data' property.
result = getattr(self, '_v_message', None)
if result is None:
data = self.getData()
data = bytes(self.getData() or b'')
if not data:
# Generated a mail message temporarily to provide backward compatibility.
document_type_list = list(self.getPortalEmbeddedDocumentTypeList()) + list(self.getPortalDocumentTypeList())
......@@ -174,7 +175,9 @@ class EmailDocument(TextDocument, MailMessageMixin):
content_type=self.getContentType(),
embedded_file_list=self.getAggregateValueList(portal_type=document_type_list),
)
result = message_from_x(data)
if six.PY3:
data = data.encode()
result = message_from_bytes(data)
self._v_message = result
return result
......
......@@ -205,9 +205,9 @@ class File(Document, OFS_File):
security.declareProtected(Permissions.AccessContentsInformation, 'getMimeTypeAndContent')
def getMimeTypeAndContent(self):
# type: () -> tuple[str, bytes]
"""This method returns a tuple which contains mimetype and content."""
from erp5.component.document.EmailDocument import MimeTypeException
# return a tuple (mime_type, data)
content = None
mime_type = self.getContentType()
......@@ -229,6 +229,8 @@ class File(Document, OFS_File):
elif getattr(self, 'getBaseData', None) is not None:
content = self.getBaseData()
if isinstance(content, six.text_type):
content = content.encode('utf-8')
if content and not isinstance(content, bytes):
content = bytes(content)
......
......@@ -40,22 +40,24 @@ import six
filename_regexp = 'name="([^"]*)"'
def testCharsetAndConvert(text_content, content_type, encoding):
try:
if encoding is not None:
text_content = text_content.decode(encoding)
else:
if six.PY2:
text_content = text_content.decode().encode('utf-8')
except (UnicodeDecodeError, LookupError):
encoding = guessEncodingFromText(text_content, content_type)
if encoding is not None:
try:
if not isinstance(text_content, six.text_type):
try:
if encoding is not None:
text_content = text_content.decode(encoding)
except (UnicodeDecodeError, LookupError):
# TODO: errors= repr ?
else:
text_content = text_content.decode()
if six.PY2:
text_content = text_content.encode('utf-8')
except (UnicodeDecodeError, LookupError):
encoding = guessEncodingFromText(text_content, content_type)
if encoding is not None:
try:
text_content = text_content.decode(encoding)
except (UnicodeDecodeError, LookupError):
# TODO: errors= repr ?
text_content = repr(text_content)[1:-1]
else:
text_content = repr(text_content)[1:-1]
else:
text_content = repr(text_content)[1:-1]
return text_content, encoding
......@@ -118,18 +120,21 @@ class MailMessageMixin:
result = {}
for (name, value) in self._getMessage().items():
try:
decoded_header = decode_header(value)
decoded_header_parts = decode_header(value)
except HeaderParseError as error_message:
decoded_header = ()
decoded_header_parts = ()
LOG('MailMessageMixin.getContentInformation', INFO,
'Failed to decode %s header of %s with error: %s' %
(name, self.getPath(), error_message))
for text, encoding in decoded_header:
text, encoding = testCharsetAndConvert(text, 'text/plain', encoding)
if name in result:
result[name] = '%s %s' % (result[name], text)
else:
result[name] = text
header_parts = []
for text, encoding in decoded_header_parts:
text, _ = testCharsetAndConvert(text, 'text/plain', encoding)
header_parts.append(text)
if six.PY3:
result[name] = ''.join(header_parts)
else:
# https://bugs.python.org/issue1079
result[name] = ' '.join(header_parts)
return result
security.declareProtected(Permissions.AccessContentsInformation, 'getAttachmentInformationList')
......@@ -215,6 +220,8 @@ class MailMessageMixin:
encoding=part_encoding,
index=index) # add index to generate
# a unique cache key per attachment
if six.PY3:
content = content.encode()
else:
content = part.get_payload(decode=1)
return content
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment