Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
cpython
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
cpython
Commits
580774e9
Commit
580774e9
authored
Apr 09, 1998
by
Guido van Rossum
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Piers Lauder's IMAP module.
parent
d68af6d1
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
792 additions
and
0 deletions
+792
-0
Lib/imaplib.py
Lib/imaplib.py
+792
-0
No files found.
Lib/imaplib.py
0 → 100644
View file @
580774e9
"""IMAP4 client.
Based on RFC 2060.
Author: Piers Lauder <piers@cs.su.oz.au> December 1997.
Public class: IMAP4
Public variable: Debug
Public functions: Internaldate2tuple
Int2AP
ParseFlags
Time2Internaldate
"""
import
os
,
re
,
socket
,
string
,
time
# Globals
CRLF
=
'
\
r
\
n
'
Debug
=
0
IMAP4_PORT
=
143
# Commands
Commands
=
{
# name valid states
'APPEND'
:
(
'AUTH'
,
'SELECTED'
),
'AUTHENTICATE'
:
(
'NONAUTH'
,),
'CAPABILITY'
:
(
'NONAUTH'
,
'AUTH'
,
'SELECTED'
,
'LOGOUT'
),
'CHECK'
:
(
'SELECTED'
,),
'CLOSE'
:
(
'SELECTED'
,),
'COPY'
:
(
'SELECTED'
,),
'CREATE'
:
(
'AUTH'
,
'SELECTED'
),
'DELETE'
:
(
'AUTH'
,
'SELECTED'
),
'EXAMINE'
:
(
'AUTH'
,
'SELECTED'
),
'EXPUNGE'
:
(
'SELECTED'
,),
'FETCH'
:
(
'SELECTED'
,),
'LIST'
:
(
'AUTH'
,
'SELECTED'
),
'LOGIN'
:
(
'NONAUTH'
,),
'LOGOUT'
:
(
'NONAUTH'
,
'AUTH'
,
'SELECTED'
,
'LOGOUT'
),
'LSUB'
:
(
'AUTH'
,
'SELECTED'
),
'NOOP'
:
(
'NONAUTH'
,
'AUTH'
,
'SELECTED'
,
'LOGOUT'
),
'RENAME'
:
(
'AUTH'
,
'SELECTED'
),
'SEARCH'
:
(
'SELECTED'
,),
'SELECT'
:
(
'AUTH'
,
'SELECTED'
),
'STATUS'
:
(
'AUTH'
,
'SELECTED'
),
'STORE'
:
(
'SELECTED'
,),
'SUBSCRIBE'
:
(
'AUTH'
,
'SELECTED'
),
'UID'
:
(
'SELECTED'
,),
'UNSUBSCRIBE'
:
(
'AUTH'
,
'SELECTED'
),
}
# Patterns to match server responses
Continuation
=
re
.
compile
(
r'\
+ (?P<d
ata>.*)'
)
Flags
=
re
.
compile
(
r'.*FLAGS \
((?P<
flags>[^\
)]*)
\)'
)
InternalDate
=
re
.
compile
(
r'.*INTERNALDATE "'
r'(?P<day>[ 123][0-9])-(?P<mon>[A-Z][a-z][a-z])-(?P<year>[0-9][0-9][0-9][0-9])'
r' (?P<hour>[0-9][0-9]):(?P<min>[0-9][0-9]):(?P<sec>[0-9][0-9])'
r' (?P<zonen>[-+])(?P<zoneh>[0-9][0-9])(?P<zonem>[0-9][0-9])'
r'"'
)
Literal
=
re
.
compile
(
r'(?P<data>.*) {(?P<size>\
d+)}$
')
Response_code = re.compile(r'
\
[(
?
P
<
type
>
[
A
-
Z
-
]
+
)(
(
?
P
<
data
>
[
^
\
]]
*
))
?
\
]
')
Untagged_response = re.compile(r'
\
*
(
?
P
<
type
>
[
A
-
Z
-
]
+
)
(
?
P
<
data
>
.
*
)
')
Untagged_status = re.compile(r'
\
*
(
?
P
<
data
>
\
d
+
)
(
?
P
<
type
>
[
A
-
Z
-
]
+
)(
(
?
P
<
data2
>
.
*
))
?
')
class IMAP4:
"""IMAP4 client class.
Instantiate with: IMAP4([host[, port]])
host - host'
s
name
(
default
:
localhost
);
port
-
port
number
(
default
:
standard
IMAP4
port
).
All
IMAP4rev1
commands
are
supported
by
methods
of
the
same
name
(
in
lower
-
case
).
Each
command
returns
a
tuple
:
(
type
,
[
data
,
...])
where
'type'
is
usually
'OK'
or
'NO'
,
and
'data'
is
either
the
text
from
the
tagged
response
,
or
untagged
results
from
command
.
Errors
raise
the
exception
class
<
instance
>
.
error
(
"<reason>"
).
IMAP4
server
errors
raise
<
instance
>
.
abort
(
"<reason>"
),
which
is
a
sub
-
class
of
'error'
.
"""
class error(Exception): pass # Logical errors - debug required
class abort(error): pass # Service errors - close and retry
COUNT = [0] # Count instantiations
def __init__(self, host = '', port = IMAP4_PORT):
self.host = host
self.port = port
self.debug = Debug
self.state = 'LOGOUT'
self.tagged_commands = {} # Tagged commands awaiting response
self.untagged_responses = {} # {typ: [data, ...], ...}
self.continuation_response = '' # Last continuation response
self.tagnum = 0
# Open socket to server.
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect(self.host, self.port)
self.file = self.sock.makefile('r')
# Create unique tag for this session,
# and compile tagged response matcher.
self.COUNT[0] = self.COUNT[0] + 1
self.tagpre = Int2AP((os.getpid()<<8)+self.COUNT[0])
self.tagre = re.compile(r'(?P<tag>'
+ self.tagpre
+ r'
\
d+) (?P<
t
ype>[A-Z]+) (?P<data>.*)')
# Get server welcome message,
# request and store CAPABILITY response.
if __debug__ and self.debug >= 1:
print '
\
t
new IMAP4 connection, tag=%s' % self.tagpre
self.welcome = self._get_response()
if self.untagged_responses.has_key('PREAUTH'):
self.state = 'AUTH'
elif self.untagged_responses.has_key('OK'):
self.state = 'NONAUTH'
# elif self.untagged_responses.has_key('BYE'):
else:
raise self.error(self.welcome)
cap = 'CAPABILITY'
self._simple_command(cap)
if not self.untagged_responses.has_key(cap):
raise self.error('no CAPABILITY response from server')
self.capabilities = tuple(string.split(self.untagged_responses[cap][-1]))
if not 'IMAP4REV1' in self.capabilities:
raise self.error('server not IMAP4REV1 compliant')
if __debug__ and self.debug >= 3:
print '
\
t
CAPABILITIES: %s' % `self.capabilities`
def __getattr__(self, attr):
"""
Allow
UPPERCASE
variants
of
all
following
IMAP4
commands
.
"""
if Commands.has_key(attr):
return eval("self.%s" % string.lower(attr))
raise AttributeError("Unknown IMAP4 command: '%s'" % attr)
# Public methods
def append(self, mailbox, flags, date_time, message):
"""
Append
message
to
named
mailbox
.
(
typ
,
[
data
])
=
<
instance
>
.
append
(
mailbox
,
flags
,
date_time
,
message
)
"""
name = 'APPEND'
if flags:
flags = '(%s)' % flags
else:
flags = None
if date_time:
date_time = Time2Internaldate(date_time)
else:
date_time = None
tag = self._command(name, mailbox, flags, date_time, message)
return self._command_complete(name, tag)
def authenticate(self, func):
"""
Authenticate
command
-
requires
response
processing
.
UNIMPLEMENTED
"""
raise self.error('UNIMPLEMENTED')
def check(self):
"""
Checkpoint
mailbox
on
server
.
(
typ
,
[
data
])
=
<
instance
>
.
check
()
"""
return self._simple_command('CHECK')
def close(self):
"""
Close
currently
selected
mailbox
.
Deleted
messages
are
removed
from
writable
mailbox
.
This
is
the
recommended
command
before
'LOGOUT'
.
(
typ
,
[
data
])
=
<
instance
>
.
close
()
"""
try:
try: typ, dat = self._simple_command('CLOSE')
except EOFError: typ, dat = None, [None]
finally:
self.state = 'AUTH'
return typ, dat
def copy(self, message_set, new_mailbox):
"""
Copy
'message_set'
messages
onto
end
of
'new_mailbox'
.
(
typ
,
[
data
])
=
<
instance
>
.
copy
(
message_set
,
new_mailbox
)
"""
return self._simple_command('COPY', message_set, new_mailbox)
def create(self, mailbox):
"""
Create
new
mailbox
.
(
typ
,
[
data
])
=
<
instance
>
.
create
(
mailbox
)
"""
return self._simple_command('CREATE', mailbox)
def delete(self, mailbox):
"""
Delete
old
mailbox
.
(
typ
,
[
data
])
=
<
instance
>
.
delete
(
mailbox
)
"""
return self._simple_command('DELETE', mailbox)
def expunge(self):
"""
Permanently
remove
deleted
items
from
selected
mailbox
.
Generates
'EXPUNGE'
response
for
each
deleted
message
.
(
typ
,
[
data
])
=
<
instance
>
.
expunge
()
'data'
is
list
of
'EXPUNGE'
d
message
numbers
in
order
received
.
"""
name = 'EXPUNGE'
typ, dat = self._simple_command(name)
return self._untagged_response(typ, name)
def fetch(self, message_set, message_parts):
"""
Fetch
(
parts
of
)
messages
.
(
typ
,
[
data
,
...])
=
<
instance
>
.
fetch
(
message_set
,
message_parts
)
'data'
are
tuples
of
message
part
envelope
and
data
.
"""
name = 'FETCH'
typ, dat = self._simple_command(name, message_set, message_parts)
return self._untagged_response(typ, name)
def list(self, directory='""', pattern='*'):
"""
List
mailbox
names
in
directory
matching
pattern
.
(
typ
,
[
data
])
=
<
instance
>
.
list
(
directory
=
'""'
,
pattern
=
'*'
)
'data'
is
list
of
LIST
responses
.
"""
name = 'LIST'
typ, dat = self._simple_command(name, directory, pattern)
return self._untagged_response(typ, name)
def login(self, user, password):
"""
Identify
client
using
plaintext
password
.
(
typ
,
[
data
])
=
<
instance
>
.
list
(
user
,
password
)
"""
if not 'AUTH=LOGIN' in self.capabilities:
raise self.error("server doesn't allow LOGIN authorisation")
typ, dat = self._simple_command('LOGIN', user, password)
if typ != 'OK':
raise self.error(dat)
self.state = 'AUTH'
return typ, dat
def logout(self):
"""
Shutdown
connection
to
server
.
(
typ
,
[
data
])
=
<
instance
>
.
logout
()
Returns
server
'BYE'
response
.
"""
self.state = 'LOGOUT'
try: typ, dat = self._simple_command('LOGOUT')
except EOFError: typ, dat = None, [None]
self.file.close()
self.sock.close()
if self.untagged_responses.has_key('BYE'):
return 'BYE', self.untagged_responses['BYE']
return typ, dat
def lsub(self, directory='""', pattern='*'):
"""
List
'subscribed'
mailbox
names
in
directory
matching
pattern
.
(
typ
,
[
data
,
...])
=
<
instance
>
.
lsub
(
directory
=
'""'
,
pattern
=
'*'
)
'data'
are
tuples
of
message
part
envelope
and
data
.
"""
name = 'LSUB'
typ, dat = self._simple_command(name, directory, pattern)
return self._untagged_response(typ, name)
def recent(self):
"""
Prompt
server
for
an
update
.
(
typ
,
[
data
])
=
<
instance
>
.
recent
()
'data'
is
None
if
no
new
messages
,
else
value
of
RECENT
response
.
"""
name = 'RECENT'
typ, dat = self._untagged_response('OK', name)
if dat[-1]:
return typ, dat
typ, dat = self._simple_command('NOOP')
return self._untagged_response(typ, name)
def rename(self, oldmailbox, newmailbox):
"""
Rename
old
mailbox
name
to
new
.
(
typ
,
data
)
=
<
instance
>
.
rename
(
oldmailbox
,
newmailbox
)
"""
return self._simple_command('RENAME', oldmailbox, newmailbox)
def response(self, code):
"""
Return
data
for
response
'code'
if
received
,
or
None
.
(
code
,
[
data
])
=
<
instance
>
.
response
(
code
)
"""
return code, self.untagged_responses.get(code, [None])
def search(self, charset, criteria):
"""
Search
mailbox
for
matching
messages
.
(
typ
,
[
data
])
=
<
instance
>
.
search
(
charset
,
criteria
)
'data'
is
space
separated
list
of
matching
message
numbers
.
"""
name = 'SEARCH'
if charset:
charset = 'CHARSET ' + charset
typ, dat = self._simple_command(name, charset, criteria)
return self._untagged_response(typ, name)
def select(self, mailbox='INBOX', readonly=None):
"""
Select
a
mailbox
.
(
typ
,
[
data
])
=
<
instance
>
.
select
(
mailbox
=
'INBOX'
,
readonly
=
None
)
'data'
is
count
of
messages
in
mailbox
(
'EXISTS'
response
).
"""
# Mandated responses are ('FLAGS', 'EXISTS', 'RECENT', 'UIDVALIDITY')
# Remove immediately interesting responses
for r in ('EXISTS', 'READ-WRITE'):
if self.untagged_responses.has_key(r):
del self.untagged_responses[r]
if readonly:
name = 'EXAMINE'
else:
name = 'SELECT'
typ, dat = self._simple_command(name, mailbox)
if typ == 'OK':
self.state = 'SELECTED'
elif typ == 'NO':
self.state = 'AUTH'
if not readonly and not self.untagged_responses.has_key('READ-WRITE'):
raise self.error('%s is not writable' % mailbox)
return typ, self.untagged_responses.get('EXISTS', [None])
def status(self, mailbox, names):
"""
Request
named
status
conditions
for
mailbox
.
(
typ
,
[
data
])
=
<
instance
>
.
status
(
mailbox
,
names
)
"""
name = 'STATUS'
typ, dat = self._simple_command(name, mailbox, names)
return self._untagged_response(typ, name)
def store(self, message_set, command, flag_list):
"""
Alters
flag
dispositions
for
messages
in
mailbox
.
(
typ
,
[
data
])
=
<
instance
>
.
store
(
message_set
,
command
,
flag_list
)
"""
command = '%s %s' % (command, flag_list)
typ, dat = self._simple_command('STORE', message_set, command)
return self._untagged_response(typ, 'FETCH')
def subscribe(self, mailbox):
"""
Subscribe
to
new
mailbox
.
(
typ
,
[
data
])
=
<
instance
>
.
subscribe
(
mailbox
)
"""
return self._simple_command('SUBSCRIBE', mailbox)
def uid(self, command, args):
"""
Execute
"command args"
with
messages
identified
by
UID
,
rather
than
message
number
.
(
typ
,
[
data
])
=
<
instance
>
.
uid
(
command
,
args
)
Returns
response
appropriate
to
'command'
.
"""
name = 'UID'
typ, dat = self._simple_command('UID', command, args)
if command == 'SEARCH':
name = 'SEARCH'
else:
name = 'FETCH'
typ, dat2 = self._untagged_response(typ, name)
if dat2[-1]: dat = dat2
return typ, dat
def unsubscribe(self, mailbox):
"""
Unsubscribe
from
old
mailbox
.
(
typ
,
[
data
])
=
<
instance
>
.
unsubscribe
(
mailbox
)
"""
return self._simple_command('UNSUBSCRIBE', mailbox)
def xatom(self, name, arg1=None, arg2=None):
"""
Allow
simple
extension
commands
notified
by
server
in
CAPABILITY
response
.
(
typ
,
[
data
])
=
<
instance
>
.
xatom
(
name
,
arg1
=
None
,
arg2
=
None
)
"""
if name[0] != 'X' or not name in self.capabilities:
raise self.error('unknown extension command: %s' % name)
return self._simple_command(name, arg1, arg2)
# Private methods
def _append_untagged(self, typ, dat):
if self.untagged_responses.has_key(typ):
self.untagged_responses[typ].append(dat)
else:
self.untagged_responses[typ] = [dat]
if __debug__ and self.debug >= 5:
print '
\
t
untagged_responses[%s] += %.20s..' % (typ, `dat`)
def _command(self, name, dat1=None, dat2=None, dat3=None, literal=None):
if self.state not in Commands[name]:
raise self.error(
'command %s illegal in state %s' % (name, self.state))
tag = self._new_tag()
data = '%s %s' % (tag, name)
for d in (dat1, dat2, dat3):
if d is not None: data = '%s %s' % (data, d)
if literal is not None:
data = '%s {%s}' % (data, len(literal))
try:
self.sock.send('%s%s' % (data, CRLF))
except socket.error, val:
raise self.abort('socket error: %s' % val)
if __debug__ and self.debug >= 4:
print '
\
t
> %s' % data
if literal is None:
return tag
# Wait for continuation response
while self._get_response():
if self.tagged_commands[tag]: # BAD/NO?
return tag
# Send literal
if __debug__ and self.debug >= 4:
print '
\
t
write literal size %s' % len(literal)
try:
self.sock.send(literal)
self.sock.send(CRLF)
except socket.error, val:
raise self.abort('socket error: %s' % val)
return tag
def _command_complete(self, name, tag):
try:
typ, data = self._get_tagged_response(tag)
except self.abort, val:
raise self.abort('command: %s => %s' % (name, val))
except self.error, val:
raise self.error('command: %s => %s' % (name, val))
if self.untagged_responses.has_key('BYE') and name != 'LOGOUT':
raise self.abort(self.untagged_responses['BYE'][-1])
if typ == 'BAD':
raise self.error('%s command error: %s %s' % (name, typ, data))
return typ, data
def _get_response(self):
# Read response and store.
#
# Returns None for continuation responses,
# otherwise first response line received
# Protocol mandates all lines terminated by CRLF.
resp = self._get_line()[:-2]
# Command completion response?
if self._match(self.tagre, resp):
tag = self.mo.group('tag')
if not self.tagged_commands.has_key(tag):
raise self.abort('unexpected tagged response: %s' % resp)
typ = self.mo.group('type')
dat = self.mo.group('data')
self.tagged_commands[tag] = (typ, [dat])
else:
dat2 = None
# '*' (untagged) responses?
if not self._match(Untagged_response, resp):
if self._match(Untagged_status, resp):
dat2 = self.mo.group('data2')
if self.mo is None:
# Only other possibility is '+' (continuation) rsponse...
if self._match(Continuation, resp):
self.continuation_response = self.mo.group('data')
return None # NB: indicates continuation
raise self.abort('unexpected response: %s' % resp)
typ = self.mo.group('type')
dat = self.mo.group('data')
if dat2: dat = dat + ' ' + dat2
# Is there a literal to come?
while self._match(Literal, dat):
# Read literal direct from connection.
size = string.atoi(self.mo.group('size'))
if __debug__ and self.debug >= 4:
print '
\
t
read literal size %s' % size
data = self.file.read(size)
# Store response with literal as tuple
self._append_untagged(typ, (dat, data))
# Read trailer - possibly containing another literal
dat = self._get_line()[:-2]
self._append_untagged(typ, dat)
# Bracketed response information?
if typ in ('OK', 'NO', 'BAD') and self._match(Response_code, dat):
self._append_untagged(self.mo.group('type'), self.mo.group('data'))
return resp
def _get_tagged_response(self, tag):
while 1:
result = self.tagged_commands[tag]
if result is not None:
del self.tagged_commands[tag]
return result
self._get_response()
def _get_line(self):
line = self.file.readline()
if not line:
raise EOFError
# Protocol mandates all lines terminated by CRLF
if __debug__ and self.debug >= 4:
print '
\
t
< %s' % line[:-2]
return line
def _match(self, cre, s):
# Run compiled regular expression match method on 's'.
# Save result, return success.
self.mo = cre.match(s)
if __debug__ and self.mo is not None and self.debug >= 5:
print "
\
t
matched r'%s' => %s" % (cre.pattern, `self.mo.groups()`)
return self.mo is not None
def _new_tag(self):
tag = '%s%s' % (self.tagpre, self.tagnum)
self.tagnum = self.tagnum + 1
self.tagged_commands[tag] = None
return tag
def _simple_command(self, name, dat1=None, dat2=None):
return self._command_complete(name, self._command(name, dat1, dat2))
def _untagged_response(self, typ, name):
if not self.untagged_responses.has_key(name):
return typ, [None]
data = self.untagged_responses[name]
del self.untagged_responses[name]
return typ, data
Mon2num = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}
def Internaldate2tuple(resp):
"""
Convert
IMAP4
INTERNALDATE
to
UT
.
Returns
Python
time
module
tuple
.
"""
mo = InternalDate.match(resp)
if not mo:
return None
mon = Mon2num[mo.group('mon')]
zonen = mo.group('zonen')
for name in ('day', 'year', 'hour', 'min', 'sec', 'zoneh', 'zonem'):
exec "%s = string.atoi(mo.group('%s'))" % (name, name)
# INTERNALDATE timezone must be subtracted to get UT
zone = (zoneh*60 + zonem)*60
if zonen == '-':
zone = -zone
tt = (year, mon, day, hour, min, sec, -1, -1, -1)
utc = time.mktime(tt)
# Following is necessary because the time module has no 'mkgmtime'.
# 'mktime' assumes arg in local timezone, so adds timezone/altzone.
lt = time.localtime(utc)
if time.daylight and lt[-1]:
zone = zone + time.altzone
else:
zone = zone + time.timezone
return time.localtime(utc - zone)
def Int2AP(num):
"""
Convert
integer
to
A
-
P
string
representation
.
"""
val = ''; AP = 'ABCDEFGHIJKLMNOP'
while num:
num, mod = divmod(num, 16)
val = AP[mod] + val
return val
def ParseFlags(resp):
"""
Convert
IMAP4
flags
response
to
python
tuple
.
"""
mo = Flags.match(resp)
if not mo:
return ()
return tuple(string.split(mo.group('flags')))
def Time2Internaldate(date_time):
"""
Convert
'date_time'
to
IMAP4
INTERNALDATE
representation
.
Return
string
in
form
:
'"DD-Mmm-YYYY HH:MM:SS +HHMM"'
"""
dttype = type(date_time)
if dttype is type(1):
tt = time.localtime(date_time)
elif dttype is type(()):
tt = date_time
elif dttype is type(""):
return date_time # Assume in correct format
else: raise ValueError
dt = time.strftime("%d-%b-%Y %H:%M:%S", tt)
if dt[0] == '0':
dt = ' ' + dt[1:]
if time.daylight and tt[-1]:
zone = -time.altzone
else:
zone = -time.timezone
return '"' + dt + " %+02d%02d" % divmod(zone/60, 60) + '"'
if __debug__ and __name__ == '__main__':
import getpass
USER = getpass.getuser()
PASSWD = getpass.getpass()
test_seq1 = (
('login', (USER, PASSWD)),
('create', ('/tmp/xxx',)),
('rename', ('/tmp/xxx', '/tmp/yyy')),
('CREATE', ('/tmp/yyz',)),
('append', ('/tmp/yyz', None, None, 'From: anon@x.y.z
\
n
\
n
data...')),
('select', ('/tmp/yyz',)),
('recent', ()),
('uid', ('SEARCH', 'ALL')),
('fetch', ('1', '(INTERNALDATE RFC822)')),
('store', ('1', 'FLAGS', '(
\
Dele
t
ed)')),
('expunge', ()),
('close', ()),
)
test_seq2 = (
('select', ()),
('response',('UIDVALIDITY',)),
('uid', ('SEARCH', 'ALL')),
('recent', ()),
('response', ('EXISTS',)),
('logout', ()),
)
def run(cmd, args):
typ, dat = apply(eval('M.%s' % cmd), args)
print ' %s %s
\
n
=> %s %s' % (cmd, args, typ, dat)
return dat
Debug = 4
M = IMAP4()
for cmd,args in test_seq1:
run(cmd, args)
for ml in M.list('/tmp/', 'yy%')[1]:
path = string.split(ml)[-1]
print '%s %s' % M.delete(path)
for cmd,args in test_seq2:
dat = run(cmd, args)
if (cmd,args) == ('uid', ('SEARCH', 'ALL')):
uid = string.split(dat[0])[-1]
run('uid', ('FETCH', '%s (FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER RFC822)' % uid))
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment