Commit b5890a9b authored by Tres Seaver's avatar Tres Seaver

Ensure that mailhosts sharing a queue do not double-deliver mails

Arrange this by sharing the thread which processes emails for
that directory.

See:  https://bugs.launchpad.net/zope2/+bug/574286
parent 06a2a774
...@@ -8,6 +8,10 @@ Zope Changes ...@@ -8,6 +8,10 @@ Zope Changes
Bugs Fixed Bugs Fixed
- Ensure that mailhosts which share a queue directory do not double-
deliver mails, by sharing the thread which processes emails for
that directory. https://bugs.launchpad.net/zope2/+bug/574286
- Process "evil" JSON cookies which contain double quotes in violation - Process "evil" JSON cookies which contain double quotes in violation
of RFC 2965 / 2616. https://bugs.launchpad.net/zope2/+bug/563229 of RFC 2965 / 2616. https://bugs.launchpad.net/zope2/+bug/563229
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
$Id$ $Id$
""" """
from os.path import realpath
import mimetools import mimetools
import rfc822 import rfc822
import time import time
...@@ -202,32 +203,37 @@ class MailBase(Acquisition.Implicit, OFS.SimpleItem.Item, RoleManager): ...@@ -202,32 +203,37 @@ class MailBase(Acquisition.Implicit, OFS.SimpleItem.Item, RoleManager):
force_tls=self.force_tls force_tls=self.force_tls
) )
security.declarePrivate('_getThreadKey')
def _getThreadKey(self):
""" Return the key used to find our processor thread.
"""
return realpath(self.smtp_queue_directory)
@synchronized(lock) @synchronized(lock)
def _stopQueueProcessorThread(self): def _stopQueueProcessorThread(self):
""" Stop thread for processing the mail queue """ """ Stop thread for processing the mail queue """
key = self._getThreadKey()
path = self.absolute_url(1) if queue_threads.has_key(key):
if queue_threads.has_key(path): thread = queue_threads[key]
thread = queue_threads[path]
thread.stop() thread.stop()
while thread.isAlive(): while thread.isAlive():
# wait until thread is really dead # wait until thread is really dead
time.sleep(0.3) time.sleep(0.3)
del queue_threads[path] del queue_threads[path]
LOG.info('Thread for %s stopped' % path) LOG.info('Thread for %s stopped' % key)
@synchronized(lock) @synchronized(lock)
def _startQueueProcessorThread(self): def _startQueueProcessorThread(self):
""" Start thread for processing the mail queue """ """ Start thread for processing the mail queue
"""
path = self.absolute_url(1) key = self._getThreadKey()
if not queue_threads.has_key(path): if not queue_threads.has_key(key):
thread = QueueProcessorThread() thread = QueueProcessorThread()
thread.setMailer(self._makeMailer()) thread.setMailer(self._makeMailer())
thread.setQueuePath(self.smtp_queue_directory) thread.setQueuePath(self.smtp_queue_directory)
thread.start() thread.start()
queue_threads[path] = thread queue_threads[key] = thread
LOG.info('Thread for %s started' % path) LOG.info('Thread for %s started' % key)
security.declareProtected(view, 'queueLength') security.declareProtected(view, 'queueLength')
def queueLength(self): def queueLength(self):
...@@ -243,9 +249,9 @@ class MailBase(Acquisition.Implicit, OFS.SimpleItem.Item, RoleManager): ...@@ -243,9 +249,9 @@ class MailBase(Acquisition.Implicit, OFS.SimpleItem.Item, RoleManager):
security.declareProtected(view, 'queueThreadAlive') security.declareProtected(view, 'queueThreadAlive')
def queueThreadAlive(self): def queueThreadAlive(self):
""" return True/False is queue thread is working """ """ return True/False is queue thread is working
"""
th = queue_threads.get(self.absolute_url(1)) th = queue_threads.get(self._getThreadKey())
if th: if th:
return th.isAlive() return th.isAlive()
return False return False
......
...@@ -208,6 +208,14 @@ This is the message body.""" ...@@ -208,6 +208,14 @@ This is the message body."""
self.assertEqual(mailhost.sent, outmsg) self.assertEqual(mailhost.sent, outmsg)
self.assertEqual(mailhost.immediate, True) self.assertEqual(mailhost.immediate, True)
def test__getThreadKey_uses_fspath(self):
mh1 = self._makeOne('mh1')
mh1.smtp_queue_directory = '/abc'
mh1.absolute_url = lambda self: 'http://example.com/mh1'
mh2 = self._makeOne('mh2')
mh2.smtp_queue_directory = '/abc'
mh2.absolute_url = lambda self: 'http://example.com/mh2'
self.assertEqual(mh1._getThreadKey(), mh2._getThreadKey())
def test_suite(): def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment