Commit 547e009b authored by R David Murray's avatar R David Murray

#13700: Make imap.authenticate with authobject work.

This fixes a bytes/string confusion in the API which prevented
custom authobjects from working at all.

Original patch by Erno Tukia.
parent e570cc71
...@@ -176,9 +176,10 @@ An :class:`IMAP4` instance has the following methods: ...@@ -176,9 +176,10 @@ An :class:`IMAP4` instance has the following methods:
data = authobject(response) data = authobject(response)
It will be called to process server continuation responses. It should return It will be called to process server continuation responses; the *response*
``data`` that will be encoded and sent to server. It should return ``None`` if argument it is passed will be ``bytes``. It should return ``bytes`` *data*
the client abort response ``*`` should be sent instead. that will be base64 encoded and sent to the server. It should return
``None`` if the client abort response ``*`` should be sent instead.
.. method:: IMAP4.check() .. method:: IMAP4.check()
......
...@@ -360,10 +360,10 @@ class IMAP4: ...@@ -360,10 +360,10 @@ class IMAP4:
data = authobject(response) data = authobject(response)
It will be called to process server continuation responses. It will be called to process server continuation responses; the
It should return data that will be encoded and sent to server. response argument it is passed will be a bytes. It should return bytes
It should return None if the client abort response '*' should data that will be base64 encoded and sent to the server. It should
be sent instead. return None if the client abort response '*' should be sent instead.
""" """
mech = mechanism.upper() mech = mechanism.upper()
# XXX: shouldn't this code be removed, not commented out? # XXX: shouldn't this code be removed, not commented out?
...@@ -546,7 +546,9 @@ class IMAP4: ...@@ -546,7 +546,9 @@ class IMAP4:
def _CRAM_MD5_AUTH(self, challenge): def _CRAM_MD5_AUTH(self, challenge):
""" Authobject to use with CRAM-MD5 authentication. """ """ Authobject to use with CRAM-MD5 authentication. """
import hmac import hmac
return self.user + " " + hmac.HMAC(self.password, challenge).hexdigest() pwd = (self.password.encode('ASCII') if isinstance(self.password, str)
else self.password)
return self.user + " " + hmac.HMAC(pwd, challenge).hexdigest()
def logout(self): def logout(self):
...@@ -1288,14 +1290,16 @@ class _Authenticator: ...@@ -1288,14 +1290,16 @@ class _Authenticator:
# so when it gets to the end of the 8-bit input # so when it gets to the end of the 8-bit input
# there's no partial 6-bit output. # there's no partial 6-bit output.
# #
oup = '' oup = b''
if isinstance(inp, str):
inp = inp.encode('ASCII')
while inp: while inp:
if len(inp) > 48: if len(inp) > 48:
t = inp[:48] t = inp[:48]
inp = inp[48:] inp = inp[48:]
else: else:
t = inp t = inp
inp = '' inp = b''
e = binascii.b2a_base64(t) e = binascii.b2a_base64(t)
if e: if e:
oup = oup + e[:-1] oup = oup + e[:-1]
...@@ -1303,7 +1307,7 @@ class _Authenticator: ...@@ -1303,7 +1307,7 @@ class _Authenticator:
def decode(self, inp): def decode(self, inp):
if not inp: if not inp:
return '' return b''
return binascii.a2b_base64(inp) return binascii.a2b_base64(inp)
......
...@@ -78,14 +78,25 @@ else: ...@@ -78,14 +78,25 @@ else:
class SimpleIMAPHandler(socketserver.StreamRequestHandler): class SimpleIMAPHandler(socketserver.StreamRequestHandler):
timeout = 1 timeout = 1
continuation = None
capabilities = ''
def _send(self, message): def _send(self, message):
if verbose: print("SENT: %r" % message.strip()) if verbose: print("SENT: %r" % message.strip())
self.wfile.write(message) self.wfile.write(message)
def _send_line(self, message):
self._send(message + b'\r\n')
def _send_textline(self, message):
self._send_line(message.encode('ASCII'))
def _send_tagged(self, tag, code, message):
self._send_textline(' '.join((tag, code, message)))
def handle(self): def handle(self):
# Send a welcome message. # Send a welcome message.
self._send(b'* OK IMAP4rev1\r\n') self._send_textline('* OK IMAP4rev1')
while 1: while 1:
# Gather up input until we receive a line terminator or we timeout. # Gather up input until we receive a line terminator or we timeout.
# Accumulate read(1) because it's simpler to handle the differences # Accumulate read(1) because it's simpler to handle the differences
...@@ -105,19 +116,33 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler): ...@@ -105,19 +116,33 @@ class SimpleIMAPHandler(socketserver.StreamRequestHandler):
break break
if verbose: print('GOT: %r' % line.strip()) if verbose: print('GOT: %r' % line.strip())
splitline = line.split() if self.continuation:
tag = splitline[0].decode('ASCII') try:
cmd = splitline[1].decode('ASCII') self.continuation.send(line)
except StopIteration:
self.continuation = None
continue
splitline = line.decode('ASCII').split()
tag = splitline[0]
cmd = splitline[1]
args = splitline[2:] args = splitline[2:]
if hasattr(self, 'cmd_'+cmd): if hasattr(self, 'cmd_'+cmd):
getattr(self, 'cmd_'+cmd)(tag, args) continuation = getattr(self, 'cmd_'+cmd)(tag, args)
if continuation:
self.continuation = continuation
next(continuation)
else: else:
self._send('{} BAD {} unknown\r\n'.format(tag, cmd).encode('ASCII')) self._send_tagged(tag, 'BAD', cmd + ' unknown')
def cmd_CAPABILITY(self, tag, args): def cmd_CAPABILITY(self, tag, args):
self._send(b'* CAPABILITY IMAP4rev1\r\n') caps = 'IMAP4rev1 ' + self.capabilities if self.capabilities else 'IMAP4rev1'
self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII')) self._send_textline('* CAPABILITY ' + caps)
self._send_tagged(tag, 'OK', 'CAPABILITY completed')
def cmd_LOGOUT(self, tag, args):
self._send_textline('* BYE IMAP4ref1 Server logging out')
self._send_tagged(tag, 'OK', 'LOGOUT completed')
class BaseThreadedNetworkedTests(unittest.TestCase): class BaseThreadedNetworkedTests(unittest.TestCase):
...@@ -167,6 +192,16 @@ class BaseThreadedNetworkedTests(unittest.TestCase): ...@@ -167,6 +192,16 @@ class BaseThreadedNetworkedTests(unittest.TestCase):
finally: finally:
self.reap_server(server, thread) self.reap_server(server, thread)
@contextmanager
def reaped_pair(self, hdlr):
server, thread = self.make_server((support.HOST, 0), hdlr)
client = self.imap_class(*server.server_address)
try:
yield server, client
finally:
client.logout()
self.reap_server(server, thread)
@reap_threads @reap_threads
def test_connect(self): def test_connect(self):
with self.reaped_server(SimpleIMAPHandler) as server: with self.reaped_server(SimpleIMAPHandler) as server:
...@@ -192,12 +227,86 @@ class BaseThreadedNetworkedTests(unittest.TestCase): ...@@ -192,12 +227,86 @@ class BaseThreadedNetworkedTests(unittest.TestCase):
def cmd_CAPABILITY(self, tag, args): def cmd_CAPABILITY(self, tag, args):
self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII')) self._send_tagged(tag, 'OK', 'CAPABILITY completed')
with self.reaped_server(BadNewlineHandler) as server: with self.reaped_server(BadNewlineHandler) as server:
self.assertRaises(imaplib.IMAP4.abort, self.assertRaises(imaplib.IMAP4.abort,
self.imap_class, *server.server_address) self.imap_class, *server.server_address)
@reap_threads
def test_bad_auth_name(self):
class MyServer(SimpleIMAPHandler):
def cmd_AUTHENTICATE(self, tag, args):
self._send_tagged(tag, 'NO', 'unrecognized authentication '
'type {}'.format(args[0]))
with self.reaped_pair(MyServer) as (server, client):
with self.assertRaises(imaplib.IMAP4.error):
client.authenticate('METHOD', lambda: 1)
@reap_threads
def test_invalid_authentication(self):
class MyServer(SimpleIMAPHandler):
def cmd_AUTHENTICATE(self, tag, args):
self._send_textline('+')
self.response = yield
self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
with self.reaped_pair(MyServer) as (server, client):
with self.assertRaises(imaplib.IMAP4.error):
code, data = client.authenticate('MYAUTH', lambda x: b'fake')
@reap_threads
def test_valid_authentication(self):
class MyServer(SimpleIMAPHandler):
def cmd_AUTHENTICATE(self, tag, args):
self._send_textline('+')
self.server.response = yield
self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
with self.reaped_pair(MyServer) as (server, client):
code, data = client.authenticate('MYAUTH', lambda x: b'fake')
self.assertEqual(code, 'OK')
self.assertEqual(server.response,
b'ZmFrZQ==\r\n') #b64 encoded 'fake'
with self.reaped_pair(MyServer) as (server, client):
code, data = client.authenticate('MYAUTH', lambda x: 'fake')
self.assertEqual(code, 'OK')
self.assertEqual(server.response,
b'ZmFrZQ==\r\n') #b64 encoded 'fake'
@reap_threads
def test_login_cram_md5(self):
class AuthHandler(SimpleIMAPHandler):
capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
def cmd_AUTHENTICATE(self, tag, args):
self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
'VzdG9uLm1jaS5uZXQ=')
r = yield
if r == b'dGltIGYxY2E2YmU0NjRiOWVmYTFjY2E2ZmZkNmNmMmQ5ZjMy\r\n':
self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
else:
self._send_tagged(tag, 'NO', 'No access')
with self.reaped_pair(AuthHandler) as (server, client):
self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf")
self.assertEqual(ret, "OK")
with self.reaped_pair(AuthHandler) as (server, client):
self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf")
self.assertEqual(ret, "OK")
class ThreadedNetworkedTests(BaseThreadedNetworkedTests): class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
......
...@@ -227,6 +227,9 @@ Core and Builtins ...@@ -227,6 +227,9 @@ Core and Builtins
Library Library
------- -------
- Issue #13700: Fix byte/string handling in imaplib authentication when an
authobject is specified.
- Issue #13153: Tkinter functions now raise TclError instead of ValueError when - Issue #13153: Tkinter functions now raise TclError instead of ValueError when
a string argument contains non-BMP character. a string argument contains non-BMP character.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment