test_smtplib.py 27.6 KB
Newer Older
1
import asyncore
2
import email.mime.text
3
import email.utils
4
import socket
5
import smtpd
6
import smtplib
7
import io
8
import re
9
import sys
10
import time
11
import select
12

13
import unittest
14
from test import support, mock_socket
15

16 17 18 19 20
try:
    import threading
except ImportError:
    threading = None

21
# Address the in-process test servers bind to and the SMTP clients connect to.
HOST = support.HOST
Christian Heimes's avatar
Christian Heimes committed
22

23 24 25 26 27 28 29 30
# Platform workaround: replace SMTPChannel.handle_expt with a no-op on darwin.
if sys.platform == 'darwin':
    # select.poll returns a select.POLLHUP at the end of the tests
    # on darwin, so just ignore it
    def handle_expt(self):
        pass
    # Patch the class itself so every channel created by the tests is covered.
    smtpd.SMTPChannel.handle_expt = handle_expt


Christian Heimes's avatar
Christian Heimes committed
31
def server(evt, buf, serv):
    """Accept a single connection on *serv* and write *buf* to it.

    evt  -- event object; set once the socket is listening and set again
            when the function is about to return.
    buf  -- bytes to send to the connecting client.
    serv -- bound server socket; it is always closed before returning.

    A ``socket.timeout`` raised by ``accept()`` is treated as "no client
    showed up" and is silently ignored.
    """
    serv.listen(5)
    evt.set()
    try:
        conn, _addr = serv.accept()
    except socket.timeout:
        # Nobody connected in time; there is nothing to send.
        pass
    else:
        try:
            # At most 500 select() rounds, so the loop is bounded even when
            # the peer never drains the data.
            for _ in range(500):
                if not buf:
                    break
                _, writable, _ = select.select([], [conn], [])
                if writable:
                    sent = conn.send(buf)
                    buf = buf[sent:]
        finally:
            # Close the connection even if select()/send() raised.
            conn.close()
    finally:
        serv.close()
        evt.set()

53
class GeneralTests(unittest.TestCase):
    """Constructor and timeout checks that run against ``mock_socket``.

    ``smtplib.socket`` is swapped for the mock module for the duration of
    each test, so no real connection is made.
    """

    def setUp(self):
        smtplib.socket = mock_socket
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket

    # This method is no longer used but is retained for backward compatibility,
    # so test to make sure it still works.
    def testQuoteData(self):
        teststr  = "abc\n.jkl\rfoo\r\n..blue"
        expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
        self.assertEqual(expected, smtplib.quotedata(teststr))

    def testBasic1(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects
        smtp = smtplib.SMTP(HOST, self.port)
        smtp.close()

    def testBasic2(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # connects, include port in host name
        smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
        smtp.close()

    def testLocalHostName(self):
        mock_socket.reply_with(b"220 Hola mundo")
        # check that supplied local_hostname is used
        smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
        self.assertEqual(smtp.local_hostname, "testhost")
        smtp.close()

    def testTimeoutDefault(self):
        # With no explicit timeout the socket inherits the module default.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        self.assertEqual(mock_socket.getdefaulttimeout(), 30)
        try:
            smtp = smtplib.SMTP(HOST, self.port)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()

    def testTimeoutNone(self):
        # An explicit timeout=None must win over a non-None module default.
        # The default is set on mock_socket because that is the module
        # smtplib is using during this test.
        mock_socket.reply_with(b"220 Hola mundo")
        self.assertIsNone(mock_socket.getdefaulttimeout())
        mock_socket.setdefaulttimeout(30)
        try:
            smtp = smtplib.SMTP(HOST, self.port, timeout=None)
        finally:
            mock_socket.setdefaulttimeout(None)
        self.assertIsNone(smtp.sock.gettimeout())
        smtp.close()

    def testTimeoutValue(self):
        mock_socket.reply_with(b"220 Hola mundo")
        smtp = smtplib.SMTP(HOST, self.port, timeout=30)
        self.assertEqual(smtp.sock.gettimeout(), 30)
        smtp.close()
116 117


118
# Test server thread using the specified SMTP server class
Christian Heimes's avatar
Christian Heimes committed
119
def debugging_server(serv, serv_evt, client_evt):
    """Thread target that drives the asyncore loop for a test SMTP server.

    serv       -- the server object to close when the conversation is over.
    serv_evt   -- set once on entry (the thread is running) and once more
                  just before returning (the thread is done).
    client_evt -- set by the test when the client side has finished; the
                  loop stops polling as soon as it is observed.
    """
    serv_evt.set()

    try:
        # Prefer the poll()-based loop when the platform provides it.
        poll_fun = asyncore.poll2 if hasattr(select, 'poll') else asyncore.poll

        # Bounded number of rounds so a forgotten client_evt cannot make the
        # thread spin forever.
        for _ in range(1000):
            if not asyncore.socket_map:
                break
            poll_fun(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break

    except socket.timeout:
        pass
    finally:
        if not client_evt.is_set():
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()

# Banner lines the tests expect around each message in the captured stdout.
MSG_BEGIN = "---------- MESSAGE FOLLOWS ----------\n"
MSG_END = "------------ END MESSAGE ------------\n"

153 154 155
# NOTE: Some SMTP objects in the tests below are created with a non-default
# local_hostname argument to the constructor, since (on some systems) the FQDN
# lookup caused by the default local_hostname sometimes takes so long that the
# test server times out, causing the test to fail.
157 158

# Test behavior of smtpd.DebuggingServer
159 160
@unittest.skipUnless(threading, 'Threading required for this test.')
class DebuggingServerTests(unittest.TestCase):
    """Run smtplib against an ``smtpd.DebuggingServer`` served from a thread.

    setUp replaces ``sys.stdout`` and ``smtpd.DEBUGSTREAM`` with StringIO
    buffers; the tests compare those buffers with the message and envelope
    they sent.
    """

    maxDiff = None

    def setUp(self):
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        # temporarily replace sys.stdout to capture DebuggingServer output
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output

        self._threads = support.threading_setup()
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Capture SMTPChannel debug output
        self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
        smtpd.DEBUGSTREAM = io.StringIO()
        # Pick a random unused port by passing 0 for the port number
        self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until the server thread has started, then re-arm the event so
        # it can be used to detect the thread finishing
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()
        support.threading_cleanup(*self._threads)
        # restore sys.stdout
        sys.stdout = self.old_stdout
        # restore DEBUGSTREAM
        smtpd.DEBUGSTREAM.close()
        smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM

    # ---- private helpers shared by the tests below ----

    def _connect(self):
        """Return an SMTP client connected to the server started in setUp."""
        return smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=3)

    def _finish_session(self, smtp):
        """Quit *smtp*, then wait until the server thread has shut down."""
        # XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
        # in asyncore.  This sleep might help, but should really be fixed
        # properly by using an Event variable.
        time.sleep(0.01)
        smtp.quit()

        self.client_evt.set()
        self.serv_evt.wait()
        self.output.flush()

    def _check_output(self, body):
        """Assert that the captured stdout is exactly *body* between banners."""
        mexpect = '%s%s\n%s' % (MSG_BEGIN, body, MSG_END)
        self.assertEqual(self.output.getvalue(), mexpect)

    def _check_message_output(self, m):
        """Assert that message object *m* is what the server printed."""
        # Add the X-Peer header that DebuggingServer adds
        m['X-Peer'] = socket.gethostbyname('localhost')
        self._check_output(m.as_string())

    def _check_envelope(self, sender, recips=(), absent=()):
        """Check the envelope recorded in the SMTPChannel debug output.

        *sender* must be the logged sender, every address in *recips* must
        appear on a ``recips:`` line and no address in *absent* may.
        Addresses are escaped so that '.' and similar characters only match
        themselves.
        """
        debugout = smtpd.DEBUGSTREAM.getvalue()
        sender_re = re.compile("^sender: {}$".format(re.escape(sender)),
                               re.MULTILINE)
        self.assertRegex(debugout, sender_re)
        for addr in recips:
            self.assertRegex(debugout, self._recip_re(addr))
        for addr in absent:
            self.assertNotRegex(debugout, self._recip_re(addr))

    @staticmethod
    def _recip_re(addr):
        """Return a pattern matching a ``recips:`` line that lists *addr*."""
        return re.compile(r"^recips: .*'{}'.*$".format(re.escape(addr)),
                          re.MULTILINE)

    # ---- tests ----

    def testBasic(self):
        # connect
        smtp = self._connect()
        smtp.quit()

    def testNOOP(self):
        smtp = self._connect()
        expected = (250, b'Ok')
        self.assertEqual(smtp.noop(), expected)
        smtp.quit()

    def testRSET(self):
        smtp = self._connect()
        expected = (250, b'Ok')
        self.assertEqual(smtp.rset(), expected)
        smtp.quit()

    def testNotImplemented(self):
        # EHLO isn't implemented in DebuggingServer
        smtp = self._connect()
        expected = (502, b'Error: command "EHLO" not implemented')
        self.assertEqual(smtp.ehlo(), expected)
        smtp.quit()

    def testVRFY(self):
        # VRFY isn't implemented in DebuggingServer
        smtp = self._connect()
        expected = (502, b'Error: command "VRFY" not implemented')
        self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
        self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
        smtp.quit()

    def testSecondHELO(self):
        # check that a second HELO returns a message that it's a duplicate
        # (this behavior is specific to smtpd.SMTPChannel)
        smtp = self._connect()
        smtp.helo()
        expected = (503, b'Duplicate HELO/EHLO')
        self.assertEqual(smtp.helo(), expected)
        smtp.quit()

    def testHELP(self):
        smtp = self._connect()
        self.assertEqual(smtp.help(), b'Error: command "HELP" not implemented')
        smtp.quit()

    def testSend(self):
        # connect and send mail
        m = 'A test message'
        smtp = self._connect()
        smtp.sendmail('John', 'Sally', m)
        self._finish_session(smtp)
        self._check_output(m)

    def testSendBinary(self):
        m = b'A test message'
        smtp = self._connect()
        smtp.sendmail('John', 'Sally', m)
        self._finish_session(smtp)
        self._check_output(m.decode('ascii'))

    def testSendNeedingDotQuote(self):
        # Issue 12283
        m = '.A test\n.mes.sage.'
        smtp = self._connect()
        smtp.sendmail('John', 'Sally', m)
        self._finish_session(smtp)
        self._check_output(m)

    def testSendMessage(self):
        m = email.mime.text.MIMEText('A test message')
        smtp = self._connect()
        smtp.send_message(m, from_addr='John', to_addrs='Sally')
        self._finish_session(smtp)
        self._check_message_output(m)

    def testSendMessageWithAddresses(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        smtp = self._connect()
        smtp.send_message(m)
        self._finish_session(smtp)
        # make sure the Bcc header is still in the message.
        self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
                                    '<warped@silly.walks.com>')

        # The Bcc header should not be transmitted.
        del m['Bcc']
        self._check_message_output(m)
        self._check_envelope(
            'foo@bar.com',
            recips=('John', 'Sally', 'Fred', 'root@localhost',
                    'warped@silly.walks.com'))

    def testSendMessageWithSomeAddresses(self):
        # Make sure nothing breaks if not all of the three 'to' headers exist
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = self._connect()
        smtp.send_message(m)
        self._finish_session(smtp)
        self._check_message_output(m)
        self._check_envelope('foo@bar.com', recips=('John', 'Dinsdale'))

    def testSendMessageWithSpecifiedAddresses(self):
        # Make sure addresses specified in call override those in message.
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John, Dinsdale'
        smtp = self._connect()
        smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example.net')
        self._finish_session(smtp)
        self._check_message_output(m)
        self._check_envelope('joe@example.com',
                             recips=('foo@example.net',),
                             absent=('John', 'Dinsdale'))

    def testSendMessageWithMultipleFrom(self):
        # Sender overrides To
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'Bernard, Bianca'
        m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
        m['To'] = 'John, Dinsdale'
        smtp = self._connect()
        smtp.send_message(m)
        self._finish_session(smtp)
        self._check_message_output(m)
        self._check_envelope('the_rescuers@Rescue-Aid-Society.com',
                             recips=('John', 'Dinsdale'))

    def testSendMessageResent(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        smtp = self._connect()
        smtp.send_message(m)
        self._finish_session(smtp)
        # The Resent-Bcc headers are deleted before serialization.
        del m['Bcc']
        del m['Resent-Bcc']
        self._check_message_output(m)
        self._check_envelope(
            'holy@grail.net',
            recips=('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'))

    def testSendMessageMultipleResentRaises(self):
        m = email.mime.text.MIMEText('A test message')
        m['From'] = 'foo@bar.com'
        m['To'] = 'John'
        m['CC'] = 'Sally, Fred'
        m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.com>'
        m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
        m['Resent-From'] = 'holy@grail.net'
        m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        m['Resent-Bcc'] = 'doe@losthope.net'
        m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
        m['Resent-To'] = 'holy@grail.net'
        m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
        smtp = self._connect()
        with self.assertRaises(ValueError):
            smtp.send_message(m)
        smtp.close()
477

478
class NonConnectingTests(unittest.TestCase):
    """Error paths that are reached without a completed connection."""

    def setUp(self):
        smtplib.socket = mock_socket

    def tearDown(self):
        smtplib.socket = socket

    def testNotConnected(self):
        # Operations on an SMTP object that was never connected must raise
        # SMTPServerDisconnected.
        smtp = smtplib.SMTP()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            smtp.ehlo()
        with self.assertRaises(smtplib.SMTPServerDisconnected):
            smtp.send('test msg')

    def testNonnumericPort(self):
        # check that non-numeric port raises socket.error, whether the port
        # is passed separately or embedded in the host string
        with self.assertRaises(mock_socket.error):
            smtplib.SMTP("localhost", "bogus")
        with self.assertRaises(mock_socket.error):
            smtplib.SMTP("localhost:bogus")


504
# test response of client to a non-successful HELO message
505 506
@unittest.skipUnless(threading, 'Threading required for this test.')
class BadHELOServerTests(unittest.TestCase):
    """Check the client's reaction to a non-successful server greeting."""

    def setUp(self):
        smtplib.socket = mock_socket
        # Reply code 199 is what makes the constructor below fail.
        mock_socket.reply_with(b"199 no hello for you!")
        # Swap stdout for a buffer for the duration of the test.
        self.old_stdout = sys.stdout
        self.output = io.StringIO()
        sys.stdout = self.output
        self.port = 25

    def tearDown(self):
        smtplib.socket = socket
        sys.stdout = self.old_stdout

    def testFailingHELO(self):
        with self.assertRaises(smtplib.SMTPConnectError):
            smtplib.SMTP(HOST, self.port, 'localhost', 3)
523

524 525 526 527 528 529

# Fixture data shared by the simulated SMTP server and the tests that use it.

# Mailbox address -> display name, used for VRFY/EXPN replies.
sim_users = {
    'Mr.A@somewhere.com': 'John A',
    'Ms.B@somewhere.com': 'Sally B',
    'Mrs.C@somewhereesle.com': 'Ruth C',
}

# (user, password) pair passed to SMTP.login() by the AUTH tests.
sim_auth = ('Mr.A@somewhere.com', 'somepassword')
# Challenge string sent back in the 334 reply to "AUTH CRAM-MD5".
sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
                          'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
# Expected credential token per AUTH mechanism.
sim_auth_credentials = {
    'login': 'TXIuQUBzb21ld2hlcmUuY29t',
    'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=',
    'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ'
                 'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'),
}
# Token the AUTH LOGIN test looks for in the server's error message.
sim_auth_login_password = 'C29TZXBHC3N3B3JK'

# Mailing-list name -> member addresses (each a key of sim_users), for EXPN.
sim_lists = {
    'list-1': ['Mr.A@somewhere.com', 'Mrs.C@somewhereesle.com'],
    'list-2': ['Ms.B@somewhere.com'],
}

# Simulated SMTP channel & server
class SimSMTPChannel(smtpd.SMTPChannel):
    """SMTP channel that answers EHLO, VRFY, EXPN and AUTH from fixture data."""

    def __init__(self, extra_features, *args, **kw):
        """Create the channel.

        extra_features -- iterable of feature strings; each becomes an extra
                          "250-<feature>" line in the EHLO reply.
        Remaining arguments are passed on to smtpd.SMTPChannel.
        """
        self._extrafeatures = ''.join(
            "250-{0}\r\n".format(x) for x in extra_features)
        super().__init__(*args, **kw)

    def smtp_EHLO(self, arg):
        """Reply with the fixed feature list plus any extra features."""
        resp = ('250-testhost\r\n'
                '250-EXPN\r\n'
                '250-SIZE 20000000\r\n'
                '250-STARTTLS\r\n'
                '250-DELIVERBY\r\n')
        resp = resp + self._extrafeatures + '250 HELP'
        self.push(resp)

    def smtp_VRFY(self, arg):
        """Reply 250 with name and address for users in sim_users, else 550."""
        raw_addr = email.utils.parseaddr(arg)[1]
        quoted_addr = smtplib.quoteaddr(arg)
        if raw_addr in sim_users:
            self.push('250 %s %s' % (sim_users[raw_addr], quoted_addr))
        else:
            self.push('550 No such user: %s' % arg)

    def smtp_EXPN(self, arg):
        """Reply with one 250 line per member of a list in sim_lists, else 550."""
        list_name = email.utils.parseaddr(arg)[1].lower()
        if list_name not in sim_lists:
            self.push('550 No access for you!')
            return
        user_list = sim_lists[list_name]
        last = len(user_list) - 1
        for n, user_email in enumerate(user_list):
            quoted_addr = smtplib.quoteaddr(user_email)
            # "250-" marks a continuation line; the final line uses "250 ".
            sep = ' ' if n == last else '-'
            self.push('250%s%s %s' % (sep, sim_users[user_email], quoted_addr))

    def smtp_AUTH(self, arg):
        """Handle the first step of AUTH PLAIN, LOGIN and CRAM-MD5.

        A bare "cram-md5" gets the 334 challenge.  Any other argument is
        expected to be "<mechanism> <credential>" and is checked against
        sim_auth_credentials.
        """
        if arg.strip().lower()=='cram-md5':
            self.push('334 {}'.format(sim_cram_md5_challenge))
            return
        mech, auth = arg.split()
        mech = mech.lower()
        if mech not in sim_auth_credentials:
            self.push('504 auth type unimplemented')
            return
        if mech == 'plain' and auth==sim_auth_credentials['plain']:
            self.push('235 plain auth ok')
        elif mech=='login' and auth==sim_auth_credentials['login']:
            self.push('334 Password:')
        else:
            self.push('550 No access for you!')

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise

602 603

class SimSMTPServer(smtpd.SMTPServer):
    """SMTP server that hands each connection to a SimSMTPChannel."""

    def __init__(self, *args, **kw):
        # Feature strings added through add_feature(); handed to each channel.
        self._extra_features = []
        super().__init__(*args, **kw)

    def handle_accepted(self, conn, addr):
        """Wrap the accepted connection in a SimSMTPChannel."""
        self._SMTPchannel = SimSMTPChannel(self._extra_features,
                                           self, conn, addr)

    def process_message(self, peer, mailfrom, rcpttos, data):
        """Discard the message."""
        pass

    def add_feature(self, feature):
        """Add *feature* to the list passed to channels created afterwards."""
        self._extra_features.append(feature)

    def handle_error(self):
        # Re-raise the exception currently being handled.
        raise

622 623 624

# Test various SMTP & ESMTP commands/behaviors that require a simulated server
# (i.e., something with more features than DebuggingServer)
625 626
@unittest.skipUnless(threading, 'Threading required for this test.')
class SMTPSimTests(unittest.TestCase):
    """Run smtplib against a SimSMTPServer served from a background thread."""

    def setUp(self):
        self.real_getfqdn = socket.getfqdn
        socket.getfqdn = mock_socket.getfqdn
        self._threads = support.threading_setup()
        self.serv_evt = threading.Event()
        self.client_evt = threading.Event()
        # Pick a random unused port by passing 0 for the port number
        self.serv = SimSMTPServer((HOST, 0), ('nowhere', -1))
        # Keep a note of what port was assigned
        self.port = self.serv.socket.getsockname()[1]
        serv_args = (self.serv, self.serv_evt, self.client_evt)
        self.thread = threading.Thread(target=debugging_server, args=serv_args)
        self.thread.start()

        # wait until the server thread has started, then re-arm the event so
        # it can be used to detect the thread finishing
        self.serv_evt.wait()
        self.serv_evt.clear()

    def tearDown(self):
        socket.getfqdn = self.real_getfqdn
        # indicate that the client is finished
        self.client_evt.set()
        # wait for the server thread to terminate
        self.serv_evt.wait()
        self.thread.join()
        support.threading_cleanup(*self._threads)

    def _connect(self):
        """Return an SMTP client connected to the server started in setUp."""
        return smtplib.SMTP(HOST, self.port, local_hostname='localhost',
                            timeout=15)

    def testBasic(self):
        # smoke test
        smtp = self._connect()
        smtp.quit()

    def testEHLO(self):
        smtp = self._connect()

        # no features should be present before the EHLO
        self.assertEqual(smtp.esmtp_features, {})

        # features expected from the test server
        expected_features = {'expn':'',
                             'size': '20000000',
                             'starttls': '',
                             'deliverby': '',
                             'help': '',
                             }

        smtp.ehlo()
        self.assertEqual(smtp.esmtp_features, expected_features)
        for k in expected_features:
            self.assertTrue(smtp.has_extn(k))
        self.assertFalse(smtp.has_extn('unsupported-feature'))
        smtp.quit()

    def testVRFY(self):
        smtp = self._connect()

        # The loop variable is not called "email" so that it does not shadow
        # the email package inside this method.
        for addr, name in sim_users.items():
            expected_known = (250, bytes('%s %s' %
                                         (name, smtplib.quoteaddr(addr)),
                                         "ascii"))
            self.assertEqual(smtp.vrfy(addr), expected_known)

        u = 'nobody@nowhere.com'
        expected_unknown = (550, ('No such user: %s'
                                       % smtplib.quoteaddr(u)).encode('ascii'))
        self.assertEqual(smtp.vrfy(u), expected_unknown)
        smtp.quit()

    def testEXPN(self):
        smtp = self._connect()

        for listname, members in sim_lists.items():
            users = ['%s %s' % (sim_users[m], smtplib.quoteaddr(m))
                     for m in members]
            expected_known = (250, bytes('\n'.join(users), "ascii"))
            self.assertEqual(smtp.expn(listname), expected_known)

        u = 'PSU-Members-List'
        expected_unknown = (550, b'No access for you!')
        self.assertEqual(smtp.expn(u), expected_unknown)
        smtp.quit()

    def testAUTH_PLAIN(self):
        self.serv.add_feature("AUTH PLAIN")
        smtp = self._connect()

        expected_auth_ok = (235, b'plain auth ok')
        self.assertEqual(smtp.login(sim_auth[0], sim_auth[1]), expected_auth_ok)
        smtp.close()

    # SimSMTPChannel doesn't fully support LOGIN or CRAM-MD5 auth because they
    # require a synchronous read to obtain the credentials...so instead smtpd
    # sees the credential sent by smtplib's login method as an unknown command,
    # which results in smtplib raising an auth error.  Fortunately the error
    # message contains the encoded credential, so we can partially check that it
    # was generated correctly (partially, because the 'word' is uppercased in
    # the error message).

    def testAUTH_LOGIN(self):
        self.serv.add_feature("AUTH LOGIN")
        smtp = self._connect()
        try:
            # assertRaises makes the test fail if login() unexpectedly
            # succeeds, instead of passing without checking anything.
            with self.assertRaises(smtplib.SMTPAuthenticationError) as cm:
                smtp.login(sim_auth[0], sim_auth[1])
            self.assertIn(sim_auth_login_password, str(cm.exception))
        finally:
            smtp.close()

    def testAUTH_CRAM_MD5(self):
        self.serv.add_feature("AUTH CRAM-MD5")
        smtp = self._connect()
        try:
            # See testAUTH_LOGIN for why assertRaises is used here.
            with self.assertRaises(smtplib.SMTPAuthenticationError) as cm:
                smtp.login(sim_auth[0], sim_auth[1])
            self.assertIn(sim_auth_credentials['cram-md5'], str(cm.exception))
        finally:
            smtp.close()

    #TODO: add tests for correct AUTH method fallback now that the
    #test infrastructure can support it.

747

748
def test_main(verbose=None):
    """Run every test case class defined in this module.

    *verbose* is accepted but not used.
    """
    support.run_unittest(
        GeneralTests,
        DebuggingServerTests,
        NonConnectingTests,
        BadHELOServerTests,
        SMTPSimTests,
    )


if __name__ == '__main__':
    test_main()