Commit 69ab9510 authored by Antoine Pitrou's avatar Antoine Pitrou

Issue #9360: Cleanup and improvements to the nntplib module. The API

now conforms to the philosophy of bytes and unicode separation in Python 3.
A test suite has also been added.
parent 926f0da5
......@@ -11,100 +11,99 @@
single: Network News Transfer Protocol
This module defines the class :class:`NNTP` which implements the client side of
the NNTP protocol. It can be used to implement a news reader or poster, or
automated news processors. For more information on NNTP (Network News Transfer
Protocol), see Internet :rfc:`977`.
the Network News Transfer Protocol. It can be used to implement a news reader
or poster, or automated news processors. It is compatible with :rfc:`3977`
as well as the older :rfc:`977` and :rfc:`2980`.
Here are two small examples of how it can be used. To list some statistics
about a newsgroup and print the subjects of the last 10 articles::
>>> s = NNTP('news.gmane.org')
>>> s = nntplib.NNTP('news.gmane.org')
>>> resp, count, first, last, name = s.group('gmane.comp.python.committers')
>>> print('Group', name, 'has', count, 'articles, range', first, 'to', last)
Group gmane.comp.python.committers has 1071 articles, range 1 to 1071
>>> resp, subs = s.xhdr('subject', first + '-' + last)
>>> for id, sub in subs[-10:]: print(id, sub)
Group gmane.comp.python.committers has 1096 articles, range 1 to 1096
>>> resp, overviews = s.over((last - 9, last))
>>> for id, over in overviews:
... print(id, nntplib.decode_header(over['subject']))
...
1062 Re: Mercurial Status?
1063 Re: [python-committers] (Windows) buildbots on 3.x
1064 Re: Mercurial Status?
1065 Re: Mercurial Status?
1066 Python 2.6.6 status
1067 Commit Privileges for Ask Solem
1068 Re: Commit Privileges for Ask Solem
1069 Re: Commit Privileges for Ask Solem
1070 Re: Commit Privileges for Ask Solem
1071 2.6.6 rc 2
1087 Re: Commit privileges for Łukasz Langa
1088 Re: 3.2 alpha 2 freeze
1089 Re: 3.2 alpha 2 freeze
1090 Re: Commit privileges for Łukasz Langa
1091 Re: Commit privileges for Łukasz Langa
1092 Updated ssh key
1093 Re: Updated ssh key
1094 Re: Updated ssh key
1095 Hello fellow committers!
1096 Re: Hello fellow committers!
>>> s.quit()
'205 Bye!'
To post an article from a file (this assumes that the article has valid
To post an article from a binary file (this assumes that the article has valid
headers, and that you have right to post on the particular newsgroup)::
>>> s = NNTP('news.gmane.org')
>>> f = open('/tmp/article')
>>> s = nntplib.NNTP('news.gmane.org')
>>> f = open('/tmp/article.txt', 'rb')
>>> s.post(f)
'240 Article posted successfully.'
>>> s.quit()
'205 Bye!'
The module itself defines the following items:
The module itself defines the following classes:
.. class:: NNTP(host[, port [, user[, password [, readermode][, usenetrc]]]])
.. class:: NNTP(host, port=119, user=None, password=None, readermode=None, usenetrc=True, [timeout])
Return a new instance of the :class:`NNTP` class, representing a connection
to the NNTP server running on host *host*, listening at port *port*. The
default *port* is 119. If the optional *user* and *password* are provided,
or if suitable credentials are present in :file:`/.netrc` and the optional
flag *usenetrc* is true (the default), the ``AUTHINFO USER`` and ``AUTHINFO
PASS`` commands are used to identify and authenticate the user to the server.
If the optional flag *readermode* is true, then a ``mode reader`` command is
sent before authentication is performed. Reader mode is sometimes necessary
if you are connecting to an NNTP server on the local machine and intend to
call reader-specific commands, such as ``group``. If you get unexpected
to the NNTP server running on host *host*, listening at port *port*.
An optional *timeout* can be specified for the socket connection.
If the optional *user* and *password* are provided, or if suitable
credentials are present in :file:`/.netrc` and the optional flag *usenetrc*
is true (the default), the ``AUTHINFO USER`` and ``AUTHINFO PASS`` commands
are used to identify and authenticate the user to the server. If the optional
flag *readermode* is true, then a ``mode reader`` command is sent before
authentication is performed. Reader mode is sometimes necessary if you are
connecting to an NNTP server on the local machine and intend to call
reader-specific commands, such as ``group``. If you get unexpected
:exc:`NNTPPermanentError`\ s, you might need to set *readermode*.
*readermode* defaults to ``None``. *usenetrc* defaults to ``True``.
.. exception:: NNTPError
Derived from the standard exception :exc:`Exception`, this is the base class for
all exceptions raised by the :mod:`nntplib` module.
Derived from the standard exception :exc:`Exception`, this is the base
class for all exceptions raised by the :mod:`nntplib` module. Instances
of this class have the following attribute:
.. attribute:: response
The response of the server if available, as a :class:`str` object.
.. exception:: NNTPReplyError
Exception raised when an unexpected reply is received from the server. For
backwards compatibility, the exception ``error_reply`` is equivalent to this
class.
Exception raised when an unexpected reply is received from the server.
.. exception:: NNTPTemporaryError
Exception raised when an error code in the range 400--499 is received. For
backwards compatibility, the exception ``error_temp`` is equivalent to this
class.
Exception raised when a response code in the range 400--499 is received.
.. exception:: NNTPPermanentError
Exception raised when an error code in the range 500--599 is received. For
backwards compatibility, the exception ``error_perm`` is equivalent to this
class.
Exception raised when a response code in the range 500--599 is received.
.. exception:: NNTPProtocolError
Exception raised when a reply is received from the server that does not begin
with a digit in the range 1--5. For backwards compatibility, the exception
``error_proto`` is equivalent to this class.
with a digit in the range 1--5.
.. exception:: NNTPDataError
Exception raised when there is some error in the response data. For backwards
compatibility, the exception ``error_data`` is equivalent to this class.
Exception raised when there is some error in the response data.
.. _nntp-objects:
......@@ -112,10 +111,29 @@ The module itself defines the following items:
NNTP Objects
------------
NNTP instances have the following methods. The *response* that is returned as
the first item in the return tuple of almost all methods is the server's
response: a string beginning with a three-digit code. If the server's response
indicates an error, the method raises one of the above exceptions.
:class:`NNTP` instances have the following methods. The *response* that is
returned as the first item in the return tuple of almost all methods is the
server's response: a string beginning with a three-digit code. If the server's
response indicates an error, the method raises one of the above exceptions.
.. note::
Many of the following methods take an optional keyword-only argument *file*.
When the *file* argument is supplied, it must be either a :term:`file object`
opened for binary writing, or the name of an on-disk file to be written to.
The method will then write any data returned by the server (except for the
response line and the terminating dot) to the file; any list of lines,
tuples or objects that the method normally returns will be empty.
.. versionchanged:: 3.2
Many of the following methods have been reworked and fixed, which makes
them incompatible with their 3.1 counterparts.
.. method:: NNTP.quit()
Send a ``QUIT`` command and close the connection. Once this method has been
called, no other methods of the NNTP object should be called.
.. method:: NNTP.getwelcome()
......@@ -125,62 +143,70 @@ indicates an error, the method raises one of the above exceptions.
that may be relevant to the user.)
.. method:: NNTP.set_debuglevel(level)
.. method:: NNTP.getcapabilities()
Set the instance's debugging level. This controls the amount of debugging
output printed. The default, ``0``, produces no debugging output. A value of
``1`` produces a moderate amount of debugging output, generally a single line
per request or response. A value of ``2`` or higher produces the maximum amount
of debugging output, logging each line sent and received on the connection
(including message text).
Return the :rfc:`3977` capabilities advertised by the server, as a
:class:`dict` instance mapping capability names to (possibly empty) lists
of values. On legacy servers which don't understand the ``CAPABILITIES``
command, an empty dictionary is returned instead.
>>> s = NNTP('news.gmane.org')
>>> 'POST' in s.getcapabilities()
True
.. versionadded:: 3.2
.. method:: NNTP.newgroups(date, time, [file])
.. method:: NNTP.newgroups(date, *, file=None)
Send a ``NEWGROUPS`` command. The *date* argument should be a string of the
form ``'yymmdd'`` indicating the date, and *time* should be a string of the form
``'hhmmss'`` indicating the time. Return a pair ``(response, groups)`` where
*groups* is a list of group names that are new since the given date and time. If
the *file* parameter is supplied, then the output of the ``NEWGROUPS`` command
is stored in a file. If *file* is a string, then the method will open a file
object with that name, write to it then close it. If *file* is a :term:`file
object`, then it will start calling :meth:`write` on it to store the lines of
the command output. If *file* is supplied, then the returned *list* is an empty list.
Send a ``NEWGROUPS`` command. The *date* argument should be a
:class:`datetime.date` or :class:`datetime.datetime` object.
Return a pair ``(response, groups)`` where *groups* is a list representing
the groups that are new since the given *date*. If *file* is supplied,
though, then *groups* will be empty.
>>> from datetime import date, timedelta
>>> resp, groups = s.newgroups(date.today() - timedelta(days=3))
>>> len(groups)
85
>>> groups[0]
GroupInfo(group='gmane.network.tor.devel', last='4', first='1', flag='m')
.. method:: NNTP.newnews(group, date, time, [file])
.. method:: NNTP.newnews(group, date, *, file=None)
Send a ``NEWNEWS`` command. Here, *group* is a group name or ``'*'``, and
*date* and *time* have the same meaning as for :meth:`newgroups`. Return a pair
``(response, articles)`` where *articles* is a list of message ids. If the
*file* parameter is supplied, then the output of the ``NEWNEWS`` command is
stored in a file. If *file* is a string, then the method will open a file
object with that name, write to it then close it. If *file* is a :term:`file
object`, then it will start calling :meth:`write` on it to store the lines of the
command output. If *file* is supplied, then the returned *list* is an empty list.
*date* has the same meaning as for :meth:`newgroups`. Return a pair
``(response, articles)`` where *articles* is a list of message ids.
This command is frequently disabled by NNTP server administrators.
.. method:: NNTP.list([file])
.. method:: NNTP.list(*, file=None)
Send a ``LIST`` command. Return a pair ``(response, list)`` where *list* is a
list of tuples. Each tuple has the form ``(group, last, first, flag)``, where
list of tuples representing all the groups available from this NNTP server.
Each tuple has the form ``(group, last, first, flag)``, where
*group* is a group name, *last* and *first* are the last and first article
numbers (as strings), and *flag* is ``'y'`` if posting is allowed, ``'n'`` if
not, and ``'m'`` if the newsgroup is moderated. (Note the ordering: *last*,
*first*.) If the *file* parameter is supplied, then the output of the ``LIST``
command is stored in a file. If *file* is a string, then the method will open
a file with that name, write to it then close it. If *file* is a :term:`file
object`, then it will start calling :meth:`write` on it to store the lines of
the command output. If *file* is supplied, then the returned *list* is an empty
list.
numbers, and *flag* is ``'y'`` if posting is allowed, ``'n'`` if not,
and ``'m'`` if the newsgroup is moderated. (Note the ordering: *last*, *first*.)
This command will often return very large results. It is best to cache the
results offline unless you really need to refresh them.
.. method:: NNTP.descriptions(grouppattern)
Send a ``LIST NEWSGROUPS`` command, where *grouppattern* is a wildmat string as
specified in RFC2980 (it's essentially the same as DOS or UNIX shell wildcard
strings). Return a pair ``(response, list)``, where *list* is a list of tuples
containing ``(name, title)``.
specified in :rfc:`3977` (it's essentially the same as DOS or UNIX shell wildcard
strings). Return a pair ``(response, descriptions)``, where *descriptions*
is a dictionary mapping group names to textual descriptions.
>>> resp, descs = s.descriptions('gmane.comp.python.*')
>>> len(descs)
295
>>> descs.popitem()
('gmane.comp.python.bio.general', 'BioPython discussion list (Moderated)')
.. method:: NNTP.description(group)
......@@ -195,30 +221,73 @@ indicates an error, the method raises one of the above exceptions.
.. method:: NNTP.group(name)
Send a ``GROUP`` command, where *name* is the group name. Return a tuple
``(response, count, first, last, name)`` where *count* is the (estimated) number
of articles in the group, *first* is the first article number in the group,
*last* is the last article number in the group, and *name* is the group name.
The numbers are returned as strings.
.. method:: NNTP.help([file])
Send a ``GROUP`` command, where *name* is the group name. The group is
selected as the current group, if it exists. Return a tuple
``(response, count, first, last, name)`` where *count* is the (estimated)
number of articles in the group, *first* is the first article number in
the group, *last* is the last article number in the group, and *name*
is the group name.
.. method:: NNTP.over(message_spec, *, file=None)
Send a ``OVER`` command, or a ``XOVER`` command on legacy servers.
*message_spec* can be either a string representing a message id, or
a ``(first, last)`` tuple of numbers indicating a range of articles in
the current group, or a ``(first, None)`` tuple indicating a range of
articles starting from *first* to the last article in the current group,
or :const:`None` to select the current article in the current group.
Return a pair ``(response, overviews)``. *overviews* is a list of
``(article_number, overview)`` tuples, one for each article selected
by *message_spec*. Each *overview* is a dictionary with the same number
of items, but this number depends on the server. These items are either
message headers (the key is then the lower-cased header name) or metadata
items (the key is then the metadata name prepended with ``":"``). The
following items are guaranteed to be present by the NNTP specification:
* the ``subject``, ``from``, ``date``, ``message-id`` and ``references``
headers
* the ``:bytes`` metadata: the number of bytes in the entire raw article
(including headers and body)
* the ``:lines`` metadata: the number of lines in the article body
It is advisable to use the :func:`decode_header` function on header
values when they may contain non-ASCII characters::
>>> _, _, first, last, _ = s.group('gmane.comp.python.devel')
>>> resp, overviews = s.over((last, last))
>>> art_num, over = overviews[0]
>>> art_num
117216
>>> list(over.keys())
['xref', 'from', ':lines', ':bytes', 'references', 'date', 'message-id', 'subject']
>>> over['from']
'=?UTF-8?B?Ik1hcnRpbiB2LiBMw7Z3aXMi?= <martin@v.loewis.de>'
>>> nntplib.decode_header(over['from'])
'"Martin v. Löwis" <martin@v.loewis.de>'
.. versionadded:: 3.2
.. method:: NNTP.help(*, file=None)
Send a ``HELP`` command. Return a pair ``(response, list)`` where *list* is a
list of help strings. If the *file* parameter is supplied, then the output of
the ``HELP`` command is stored in a file. If *file* is a string, then the
method will open a file with that name, write to it then close it. If *file*
is a :term:`file object`, then it will start calling :meth:`write` on it to store
the lines of the command output. If *file* is supplied, then the returned *list*
is an empty list.
list of help strings.
.. method:: NNTP.stat(id)
.. method:: NNTP.stat(message_spec=None)
Send a ``STAT`` command, where *id* is the message id (enclosed in ``'<'`` and
``'>'``) or an article number (as a string). Return a triple ``(response,
number, id)`` where *number* is the article number (as a string) and *id* is the
message id (enclosed in ``'<'`` and ``'>'``).
Send a ``STAT`` command, where *message_spec* is either a message id
(enclosed in ``'<'`` and ``'>'``) or an article number in the current group.
If *message_spec* is omitted or :const:`None`, the current article in the
current group is considered. Return a triple ``(response, number, id)``
where *number* is the article number and *id* is the message id.
>>> _, _, first, last, _ = s.group('gmane.comp.python.devel')
>>> resp, number, message_id = s.stat(first)
>>> number, message_id
(9099, '<20030112190404.GE29873@epoch.metaslash.com>')
.. method:: NNTP.next()
......@@ -231,28 +300,69 @@ indicates an error, the method raises one of the above exceptions.
Send a ``LAST`` command. Return as for :meth:`stat`.
.. method:: NNTP.head(id)
.. method:: NNTP.article(message_spec=None, *, file=None)
Send an ``ARTICLE`` command, where *message_spec* has the same meaning as
for :meth:`stat`. Return a tuple ``(response, info)`` where *info*
is a :class:`~collections.namedtuple` with three members *number*,
*message_id* and *lines* (in that order). *number* is the article number
in the group (or 0 if the information is not available), *message_id* the
message id as a string, and *lines* a list of lines (without terminating
newlines) comprising the raw message including headers and body.
>>> resp, info = s.article('<20030112190404.GE29873@epoch.metaslash.com>')
>>> info.number
0
>>> info.message_id
'<20030112190404.GE29873@epoch.metaslash.com>'
>>> len(info.lines)
65
>>> info.lines[0]
b'Path: main.gmane.org!not-for-mail'
>>> info.lines[1]
b'From: Neal Norwitz <neal@metaslash.com>'
>>> info.lines[-3:]
[b'There is a patch for 2.3 as well as 2.2.', b'', b'Neal']
.. method:: NNTP.head(message_spec=None, *, file=None)
Same as :meth:`article()`, but sends a ``HEAD`` command. The *lines*
returned (or written to *file*) will only contain the message headers, not
the body.
Send a ``HEAD`` command, where *id* has the same meaning as for :meth:`stat`.
Return a tuple ``(response, number, id, list)`` where the first three are the
same as for :meth:`stat`, and *list* is a list of the article's headers (an
uninterpreted list of lines, without trailing newlines).
.. method:: NNTP.body(message_spec=None, *, file=None)
.. method:: NNTP.body(id,[file])
Same as :meth:`article()`, but sends a ``BODY`` command. The *lines*
returned (or written to *file*) will only contain the message body, not the
headers.
Send a ``BODY`` command, where *id* has the same meaning as for :meth:`stat`.
If the *file* parameter is supplied, then the body is stored in a file. If
*file* is a string, then the method will open a file with that name, write
to it then close it. If *file* is a :term:`file object`, then it will start
calling :meth:`write` on it to store the lines of the body. Return as for
:meth:`head`. If *file* is supplied, then the returned *list* is an empty list.
.. method:: NNTP.post(data)
.. method:: NNTP.article(id)
Post an article using the ``POST`` command. The *data* argument is either
a :term:`file object` opened for binary reading, or any iterable of bytes
objects (representing raw lines of the article to be posted). It should
represent a well-formed news article, including the required headers. The
:meth:`post` method automatically escapes lines beginning with ``.`` and
appends the termination line.
Send an ``ARTICLE`` command, where *id* has the same meaning as for
:meth:`stat`. Return as for :meth:`head`.
If the method succeeds, the server's response is returned. If the server
refuses posting, a :class:`NNTPReplyError` is raised.
.. method:: NNTP.ihave(message_id, data)
Send an ``IHAVE`` command. *message_id* is the id of the message to send
to the server (enclosed in ``'<'`` and ``'>'``). The *data* parameter
and the return value are the same as for :meth:`post()`.
.. method:: NNTP.date()
Return a pair ``(response, date)``. *date* is a :class:`~datetime.datetime`
object containing the current date and time of the server.
.. method:: NNTP.slave()
......@@ -260,10 +370,23 @@ indicates an error, the method raises one of the above exceptions.
Send a ``SLAVE`` command. Return the server's *response*.
.. method:: NNTP.xhdr(header, string, [file])
.. method:: NNTP.set_debuglevel(level)
Set the instance's debugging level. This controls the amount of debugging
output printed. The default, ``0``, produces no debugging output. A value of
``1`` produces a moderate amount of debugging output, generally a single line
per request or response. A value of ``2`` or higher produces the maximum amount
of debugging output, logging each line sent and received on the connection
(including message text).
The following are optional NNTP extensions defined in :rfc:`2980`. Some of
them have been superseded by newer commands in :rfc:`3977`.
Send an ``XHDR`` command. This command is not defined in the RFC but is a
common extension. The *header* argument is a header keyword, e.g.
.. method:: NNTP.xhdr(header, string, *, file=None)
Send an ``XHDR`` command. The *header* argument is a header keyword, e.g.
``'subject'``. The *string* argument should have the form ``'first-last'``
where *first* and *last* are the first and last article numbers to search.
Return a pair ``(response, list)``, where *list* is a list of pairs ``(id,
......@@ -276,66 +399,55 @@ indicates an error, the method raises one of the above exceptions.
returned *list* is an empty list.
.. method:: NNTP.post(file)
Post an article using the ``POST`` command. The *file* argument is an open file
object which is read until EOF using its :meth:`readline` method. It should be
a well-formed news article, including the required headers. The :meth:`post`
method automatically escapes lines beginning with ``.``.
.. method:: NNTP.ihave(id, file)
Send an ``IHAVE`` command. *id* is a message id (enclosed in ``'<'`` and
``'>'``). If the response is not an error, treat *file* exactly as for the
:meth:`post` method.
.. method:: NNTP.xover(start, end, *, file=None)
.. method:: NNTP.date()
Send an ``XOVER`` command. *start* and *end* are article numbers
delimiting the range of articles to select. The return value is the
same of for :meth:`over()`. It is recommended to use :meth:`over()`
instead, since it will automatically use the newer ``OVER`` command
if available.
Return a triple ``(response, date, time)``, containing the current date and time
in a form suitable for the :meth:`newnews` and :meth:`newgroups` methods. This
is an optional NNTP extension, and may not be supported by all servers.
.. method:: NNTP.xpath(id)
.. method:: NNTP.xgtitle(name, [file])
Return a pair ``(resp, path)``, where *path* is the directory path to the
article with message ID *id*. Most of the time, this extension is not
enabled by NNTP server administrators.
Process an ``XGTITLE`` command, returning a pair ``(response, list)``, where
*list* is a list of tuples containing ``(name, title)``. If the *file* parameter
is supplied, then the output of the ``XGTITLE`` command is stored in a file.
If *file* is a string, then the method will open a file with that name, write
to it then close it. If *file* is a :term:`file object`, then it will start
calling :meth:`write` on it to store the lines of the command output. If *file*
is supplied, then the returned *list* is an empty list. This is an optional NNTP
extension, and may not be supported by all servers.
RFC2980 says "It is suggested that this extension be deprecated". Use
:meth:`descriptions` or :meth:`description` instead.
.. XXX deprecated:
.. method:: NNTP.xgtitle(name, *, file=None)
.. method:: NNTP.xover(start, end, [file])
Process an ``XGTITLE`` command, returning a pair ``(response, list)``, where
*list* is a list of tuples containing ``(name, title)``. If the *file* parameter
is supplied, then the output of the ``XGTITLE`` command is stored in a file.
If *file* is a string, then the method will open a file with that name, write
to it then close it. If *file* is a :term:`file object`, then it will start
calling :meth:`write` on it to store the lines of the command output. If *file*
is supplied, then the returned *list* is an empty list. This is an optional NNTP
extension, and may not be supported by all servers.
Return a pair ``(resp, list)``. *list* is a list of tuples, one for each
article in the range delimited by the *start* and *end* article numbers. Each
tuple is of the form ``(article number, subject, poster, date, id, references,
size, lines)``. If the *file* parameter is supplied, then the output of the
``XOVER`` command is stored in a file. If *file* is a string, then the method
will open a file with that name, write to it then close it. If *file* is a
:term:`file object`, then it will start calling :meth:`write` on it to store the
lines of the command output. If *file* is supplied, then the returned *list* is
an empty list. This is an optional NNTP extension, and may not be supported by
all servers.
RFC2980 says "It is suggested that this extension be deprecated". Use
:meth:`descriptions` or :meth:`description` instead.
.. method:: NNTP.xpath(id)
Utility functions
-----------------
Return a pair ``(resp, path)``, where *path* is the directory path to the
article with message ID *id*. This is an optional NNTP extension, and may not
be supported by all servers.
The module also defines the following utility function:
.. method:: NNTP.quit()
.. function:: decode_header(header_str)
Send a ``QUIT`` command and close the connection. Once this method has been
called, no other methods of the NNTP object should be called.
Decode a header value, un-escaping any escaped non-ASCII characters.
*header_str* must be a :class:`str` object. The unescaped value is
returned. Using this function is recommended to display some headers
in a human readable form::
>>> decode_header("Some subject")
'Some subject'
>>> decode_header("=?ISO-8859-15?Q?D=E9buter_en_Python?=")
'Débuter en Python'
>>> decode_header("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=")
'Re: problème de matrice'
"""An NNTP client class based on RFC 977: Network News Transfer Protocol.
"""An NNTP client class based on:
- RFC 977: Network News Transfer Protocol
- RFC 2980: Common NNTP Extensions
- RFC 3977: Network News Transfer Protocol (version 2)
Example:
......@@ -27,15 +30,53 @@ are strings, not numbers, since they are rarely used for calculations.
# RFC 977 by Brian Kantor and Phil Lapsley.
# xover, xgtitle, xpath, date methods by Kevan Heydon
# Incompatible changes from the 2.x nntplib:
# - all commands are encoded as UTF-8 data (using the "surrogateescape"
# error handler), except for raw message data (POST, IHAVE)
# - all responses are decoded as UTF-8 data (using the "surrogateescape"
# error handler), except for raw message data (ARTICLE, HEAD, BODY)
# - the `file` argument to various methods is keyword-only
#
# - NNTP.date() returns a datetime object
# - NNTP.newgroups() and NNTP.newnews() take a datetime (or date) object,
# rather than a pair of (date, time) strings.
# - NNTP.newgroups() and NNTP.list() return a list of GroupInfo named tuples
# - NNTP.descriptions() returns a dict mapping group names to descriptions
# - NNTP.xover() returns a list of dicts mapping field names (header or metadata)
# to field values; each dict representing a message overview.
# - NNTP.article(), NNTP.head() and NNTP.body() return a (response, ArticleInfo)
# tuple.
# - the "internal" methods have been marked private (they now start with
# an underscore)
# Other changes from the 2.x/3.1 nntplib:
# - automatic querying of capabilities at connect
# - New method NNTP.getcapabilities()
# - New method NNTP.over()
# - New helper function decode_header()
# - NNTP.post() and NNTP.ihave() accept file objects, bytes-like objects and
# arbitrary iterables yielding lines.
# - An extensive test suite :-)
# TODO:
# - return structured data (GroupInfo etc.) everywhere
# - support HDR
# Imports
import re
import socket
import collections
import datetime
import warnings
__all__ = ["NNTP","NNTPReplyError","NNTPTemporaryError",
"NNTPPermanentError","NNTPProtocolError","NNTPDataError",
"error_reply","error_temp","error_perm","error_proto",
"error_data",]
from email.header import decode_header as _email_decode_header
from socket import _GLOBAL_DEFAULT_TIMEOUT
__all__ = ["NNTP",
"NNTPReplyError", "NNTPTemporaryError", "NNTPPermanentError",
"NNTPProtocolError", "NNTPDataError",
"decode_header",
]
# Exceptions raised when an error or invalid response is received
class NNTPError(Exception):
......@@ -67,39 +108,189 @@ class NNTPDataError(NNTPError):
"""Error in response data"""
pass
# for backwards compatibility
error_reply = NNTPReplyError
error_temp = NNTPTemporaryError
error_perm = NNTPPermanentError
error_proto = NNTPProtocolError
error_data = NNTPDataError
# Standard port used by NNTP servers
NNTP_PORT = 119
# Response numbers that are followed by additional text (e.g. article)
LONGRESP = [b'100', b'215', b'220', b'221', b'222', b'224', b'230', b'231', b'282']
_LONGRESP = {
'100', # HELP
'101', # CAPABILITIES
'211', # LISTGROUP (also not multi-line with GROUP)
'215', # LIST
'220', # ARTICLE
'221', # HEAD, XHDR
'222', # BODY
'224', # OVER, XOVER
'225', # HDR
'230', # NEWNEWS
'231', # NEWGROUPS
'282', # XGTITLE
}
# Default decoded value for LIST OVERVIEW.FMT if not supported
_DEFAULT_OVERVIEW_FMT = [
"subject", "from", "date", "message-id", "references", ":bytes", ":lines"]
# Alternative names allowed in LIST OVERVIEW.FMT response
_OVERVIEW_FMT_ALTERNATIVES = {
'bytes': ':bytes',
'lines': ':lines',
}
# Line terminators (we always output CRLF, but accept any of CRLF, CR, LF)
CRLF = b'\r\n'
_CRLF = b'\r\n'
GroupInfo = collections.namedtuple('GroupInfo',
['group', 'last', 'first', 'flag'])
ArticleInfo = collections.namedtuple('ArticleInfo',
['number', 'message_id', 'lines'])
# The class itself
class NNTP:
def __init__(self, host, port=NNTP_PORT, user=None, password=None,
readermode=None, usenetrc=True):
# Helper function(s)
def decode_header(header_str):
"""Takes an unicode string representing a munged header value
and decodes it as a (possibly non-ASCII) readable value."""
parts = []
for v, enc in _email_decode_header(header_str):
if isinstance(v, bytes):
parts.append(v.decode(enc or 'ascii'))
else:
parts.append(v)
return ' '.join(parts)
def _parse_overview_fmt(lines):
"""Parse a list of string representing the response to LIST OVERVIEW.FMT
and return a list of header/metadata names.
Raises NNTPDataError if the response is not compliant
(cf. RFC 3977, section 8.4)."""
fmt = []
for line in lines:
if line[0] == ':':
# Metadata name (e.g. ":bytes")
name, _, suffix = line[1:].partition(':')
name = ':' + name
else:
# Header name (e.g. "Subject:" or "Xref:full")
name, _, suffix = line.partition(':')
name = name.lower()
name = _OVERVIEW_FMT_ALTERNATIVES.get(name, name)
# Should we do something with the suffix?
fmt.append(name)
defaults = _DEFAULT_OVERVIEW_FMT
if len(fmt) < len(defaults):
raise NNTPDataError("LIST OVERVIEW.FMT response too short")
if fmt[:len(defaults)] != defaults:
raise NNTPDataError("LIST OVERVIEW.FMT redefines default fields")
return fmt
def _parse_overview(lines, fmt, data_process_func=None):
"""Parse the response to a OVER or XOVER command according to the
overview format `fmt`."""
n_defaults = len(_DEFAULT_OVERVIEW_FMT)
overview = []
for line in lines:
fields = {}
article_number, *tokens = line.split('\t')
article_number = int(article_number)
for i, token in enumerate(tokens):
if i >= len(fmt):
# XXX should we raise an error? Some servers might not
# support LIST OVERVIEW.FMT and still return additional
# headers.
continue
field_name = fmt[i]
is_metadata = field_name.startswith(':')
if i >= n_defaults and not is_metadata:
# Non-default header names are included in full in the response
h = field_name + ":"
if token[:len(h)].lower() != h:
raise NNTPDataError("OVER/XOVER response doesn't include "
"names of additional headers")
token = token[len(h):].lstrip(" ")
fields[fmt[i]] = token
overview.append((article_number, fields))
return overview
def _parse_datetime(date_str, time_str=None):
"""Parse a pair of (date, time) strings, and return a datetime object.
If only the date is given, it is assumed to be date and time
concatenated together (e.g. response to the DATE command).
"""
if time_str is None:
time_str = date_str[-6:]
date_str = date_str[:-6]
hours = int(time_str[:2])
minutes = int(time_str[2:4])
seconds = int(time_str[4:])
year = int(date_str[:-4])
month = int(date_str[-4:-2])
day = int(date_str[-2:])
# RFC 3977 doesn't say how to interpret 2-char years. Assume that
# there are no dates before 1970 on Usenet.
if year < 70:
year += 2000
elif year < 100:
year += 1900
return datetime.datetime(year, month, day, hours, minutes, seconds)
def _unparse_datetime(dt, legacy=False):
"""Format a date or datetime object as a pair of (date, time) strings
in the format required by the NEWNEWS and NEWGROUPS commands. If a
date object is passed, the time is assumed to be midnight (00h00).
The returned representation depends on the legacy flag:
* if legacy is False (the default):
date has the YYYYMMDD format and time the HHMMSS format
* if legacy is True:
date has the YYMMDD format and time the HHMMSS format.
RFC 3977 compliant servers should understand both formats; therefore,
legacy is only needed when talking to old servers.
"""
if not isinstance(dt, datetime.datetime):
time_str = "000000"
else:
time_str = "{0.hour:02d}{0.minute:02d}{0.second:02d}".format(dt)
y = dt.year
if legacy:
y = y % 100
date_str = "{0:02d}{1.month:02d}{1.day:02d}".format(y, dt)
else:
date_str = "{0:04d}{1.month:02d}{1.day:02d}".format(y, dt)
return date_str, time_str
# The classes themselves
class _NNTPBase:
# UTF-8 is the character set for all NNTP commands and responses: they
# are automatically encoded (when sending) and decoded (and receiving)
# by this class.
# However, some multi-line data blocks can contain arbitrary bytes (for
# example, latin-1 or utf-16 data in the body of a message). Commands
# taking (POST, IHAVE) or returning (HEAD, BODY, ARTICLE) raw message
# data will therefore only accept and produce bytes objects.
# Furthermore, since there could be non-compliant servers out there,
# we use 'surrogateescape' as the error handler for fault tolerance
# and easy round-tripping. This could be useful for some applications
# (e.g. NNTP gateways).
encoding = 'utf-8'
errors = 'surrogateescape'
def __init__(self, file, user=None, password=None,
readermode=None, usenetrc=True,
timeout=_GLOBAL_DEFAULT_TIMEOUT):
"""Initialize an instance. Arguments:
- host: hostname to connect to
- port: port to connect to (default the standard NNTP port)
- file: file-like object (open for read/write in binary mode)
- user: username to authenticate with
- password: password to use with username
- readermode: if true, send 'mode reader' command after
connecting.
- usenetrc: allow loading username and password from ~/.netrc file
if not specified explicitly
- timeout: timeout (in seconds) used for socket connections
readermode is sometimes necessary if you are connecting to an
NNTP server on the local machine and intend to call
......@@ -107,12 +298,9 @@ class NNTP:
unexpected NNTPPermanentErrors, you might need to set
readermode.
"""
self.host = host
self.port = port
self.sock = socket.create_connection((host, port))
self.file = self.sock.makefile('rb')
self.file = file
self.debugging = 0
self.welcome = self.getresp()
self.welcome = self._getresp()
# 'mode reader' is sometimes necessary to enable 'reader' mode.
# However, the order in which 'mode reader' and 'authinfo' need to
......@@ -122,12 +310,12 @@ class NNTP:
readermode_afterauth = 0
if readermode:
try:
self.welcome = self.shortcmd('mode reader')
self.welcome = self._shortcmd('mode reader')
except NNTPPermanentError:
# error 500, probably 'not implemented'
pass
except NNTPTemporaryError as e:
if user and e.response.startswith(b'480'):
if user and e.response.startswith('480'):
# Need authorization before 'mode reader'
readermode_afterauth = 1
else:
......@@ -144,29 +332,35 @@ class NNTP:
password = auth[2]
except IOError:
pass
# Perform NNRP authentication if needed.
# Perform NNTP authentication if needed.
if user:
resp = self.shortcmd('authinfo user '+user)
if resp.startswith(b'381'):
resp = self._shortcmd('authinfo user '+user)
if resp.startswith('381'):
if not password:
raise NNTPReplyError(resp)
else:
resp = self.shortcmd(
resp = self._shortcmd(
'authinfo pass '+password)
if not resp.startswith(b'281'):
if not resp.startswith('281'):
raise NNTPPermanentError(resp)
if readermode_afterauth:
try:
self.welcome = self.shortcmd('mode reader')
self.welcome = self._shortcmd('mode reader')
except NNTPPermanentError:
# error 500, probably 'not implemented'
pass
# Get the welcome message from the server
# (this is read and squirreled away by __init__()).
# If the response code is 200, posting is allowed;
# if it 201, posting is not allowed
# Inquire about capabilities (RFC 3977)
self.nntp_version = 1
try:
resp, caps = self.capabilities()
except NNTPPermanentError:
# Server doesn't support capabilities
self._caps = {}
else:
self._caps = caps
if 'VERSION' in caps:
self.nntp_version = int(caps['VERSION'][0])
def getwelcome(self):
"""Get the welcome message from the server
......@@ -177,6 +371,12 @@ class NNTP:
if self.debugging: print('*welcome*', repr(self.welcome))
return self.welcome
def getcapabilities(self):
"""Get the server capabilities, as read by __init__().
If the CAPABILITIES command is not supported, an empty dict is
returned."""
return self._caps
def set_debuglevel(self, level):
"""Set the debugging level. Argument 'level' means:
0: no debugging output (default)
......@@ -186,121 +386,221 @@ class NNTP:
self.debugging = level
debug = set_debuglevel
def putline(self, line):
"""Internal: send one line to the server, appending CRLF."""
line = line + CRLF
def _putline(self, line):
"""Internal: send one line to the server, appending CRLF.
The `line` must be a bytes-like object."""
line = line + _CRLF
if self.debugging > 1: print('*put*', repr(line))
self.sock.sendall(line)
self.file.write(line)
self.file.flush()
def putcmd(self, line):
"""Internal: send one command to the server (through putline())."""
def _putcmd(self, line):
"""Internal: send one command to the server (through _putline()).
The `line` must be an unicode string."""
if self.debugging: print('*cmd*', repr(line))
line = bytes(line, "ASCII")
self.putline(line)
line = line.encode(self.encoding, self.errors)
self._putline(line)
def getline(self):
"""Internal: return one line from the server, stripping CRLF.
Raise EOFError if the connection is closed."""
def _getline(self, strip_crlf=True):
"""Internal: return one line from the server, stripping _CRLF.
Raise EOFError if the connection is closed.
Returns a bytes object."""
line = self.file.readline()
if self.debugging > 1:
print('*get*', repr(line))
if not line: raise EOFError
if line[-2:] == CRLF:
line = line[:-2]
elif line[-1:] in CRLF:
line = line[:-1]
if strip_crlf:
if line[-2:] == _CRLF:
line = line[:-2]
elif line[-1:] in _CRLF:
line = line[:-1]
return line
def getresp(self):
def _getresp(self):
"""Internal: get a response from the server.
Raise various errors if the response indicates an error."""
resp = self.getline()
Raise various errors if the response indicates an error.
Returns an unicode string."""
resp = self._getline()
if self.debugging: print('*resp*', repr(resp))
resp = resp.decode(self.encoding, self.errors)
c = resp[:1]
if c == b'4':
if c == '4':
raise NNTPTemporaryError(resp)
if c == b'5':
if c == '5':
raise NNTPPermanentError(resp)
if c not in b'123':
if c not in '123':
raise NNTPProtocolError(resp)
return resp
def getlongresp(self, file=None):
def _getlongresp(self, file=None):
"""Internal: get a response plus following text from the server.
Raise various errors if the response indicates an error."""
Raise various errors if the response indicates an error.
Returns a (response, lines) tuple where `response` is an unicode
string and `lines` is a list of bytes objects.
If `file` is a file-like object, it must be open in binary mode.
"""
openedFile = None
try:
# If a string was passed then open a file with that name
if isinstance(file, str):
openedFile = file = open(file, "w")
if isinstance(file, (str, bytes)):
openedFile = file = open(file, "wb")
resp = self.getresp()
if resp[:3] not in LONGRESP:
resp = self._getresp()
if resp[:3] not in _LONGRESP:
raise NNTPReplyError(resp)
list = []
while 1:
line = self.getline()
if line == b'.':
break
if line.startswith(b'..'):
line = line[1:]
if file:
file.write(line + b'\n')
else:
list.append(line)
lines = []
if file is not None:
# XXX lines = None instead?
terminators = (b'.' + _CRLF, b'.\n')
while 1:
line = self._getline(False)
if line in terminators:
break
if line.startswith(b'..'):
line = line[1:]
file.write(line)
else:
terminator = b'.'
while 1:
line = self._getline()
if line == terminator:
break
if line.startswith(b'..'):
line = line[1:]
lines.append(line)
finally:
# If this method created the file, then it must close it
if openedFile:
openedFile.close()
return resp, list
return resp, lines
def shortcmd(self, line):
"""Internal: send a command and get the response."""
self.putcmd(line)
return self.getresp()
def _shortcmd(self, line):
"""Internal: send a command and get the response.
Same return value as _getresp()."""
self._putcmd(line)
return self._getresp()
def _longcmd(self, line, file=None):
"""Internal: send a command and get the response plus following text.
Same return value as _getlongresp()."""
self._putcmd(line)
return self._getlongresp(file)
def _longcmdstring(self, line, file=None):
"""Internal: send a command and get the response plus following text.
Same as _longcmd() and _getlongresp(), except that the returned `lines`
are unicode strings rather than bytes objects.
"""
self._putcmd(line)
resp, list = self._getlongresp(file)
return resp, [line.decode(self.encoding, self.errors)
for line in list]
def _getoverviewfmt(self):
"""Internal: get the overview format. Queries the server if not
already done, else returns the cached value."""
try:
return self._cachedoverviewfmt
except AttributeError:
pass
try:
resp, lines = self._longcmdstring("LIST OVERVIEW.FMT")
except NNTPPermanentError:
# Not supported by server?
fmt = _DEFAULT_OVERVIEW_FMT[:]
else:
fmt = _parse_overview_fmt(lines)
self._cachedoverviewfmt = fmt
return fmt
def longcmd(self, line, file=None):
"""Internal: send a command and get the response plus following text."""
self.putcmd(line)
return self.getlongresp(file)
def _grouplist(self, lines):
# Parse lines into "group last first flag"
return [GroupInfo(*line.split()) for line in lines]
def newgroups(self, date, time, file=None):
"""Process a NEWGROUPS command. Arguments:
- date: string 'yymmdd' indicating the date
- time: string 'hhmmss' indicating the time
def capabilities(self):
"""Process a CAPABILITIES command. Not supported by all servers.
Return:
- resp: server response if successful
- list: list of newsgroup names"""
return self.longcmd('NEWGROUPS ' + date + ' ' + time, file)
- caps: a dictionary mapping capability names to lists of tokens
(for example {'VERSION': ['2'], 'OVER': [], LIST: ['ACTIVE', 'HEADERS'] })
"""
caps = {}
resp, lines = self._longcmdstring("CAPABILITIES")
for line in lines:
name, *tokens = line.split()
caps[name] = tokens
return resp, caps
def newnews(self, group, date, time, file=None):
def newgroups(self, date, *, file=None):
"""Process a NEWGROUPS command. Arguments:
- date: a date or datetime object
Return:
- resp: server response if successful
- list: list of newsgroup names
"""
if not isinstance(date, (datetime.date, datetime.date)):
raise TypeError(
"the date parameter must be a date or datetime object, "
"not '{:40}'".format(date.__class__.__name__))
date_str, time_str = _unparse_datetime(date, self.nntp_version < 2)
cmd = 'NEWGROUPS {0} {1}'.format(date_str, time_str)
resp, lines = self._longcmdstring(cmd, file)
return resp, self._grouplist(lines)
def newnews(self, group, date, *, file=None):
"""Process a NEWNEWS command. Arguments:
- group: group name or '*'
- date: string 'yymmdd' indicating the date
- time: string 'hhmmss' indicating the time
- date: a date or datetime object
Return:
- resp: server response if successful
- list: list of message ids"""
cmd = 'NEWNEWS ' + group + ' ' + date + ' ' + time
return self.longcmd(cmd, file)
def list(self, file=None):
"""Process a LIST command. Return:
- list: list of message ids
"""
if not isinstance(date, (datetime.date, datetime.date)):
raise TypeError(
"the date parameter must be a date or datetime object, "
"not '{:40}'".format(date.__class__.__name__))
date_str, time_str = _unparse_datetime(date, self.nntp_version < 2)
cmd = 'NEWNEWS {0} {1} {2}'.format(group, date_str, time_str)
return self._longcmdstring(cmd, file)
def list(self, *, file=None):
"""Process a LIST command. Argument:
- file: Filename string or file object to store the result in
Returns:
- resp: server response if successful
- list: list of (group, last, first, flag) (strings)"""
- list: list of (group, last, first, flag) (strings)
"""
resp, lines = self._longcmdstring('LIST', file)
return resp, self._grouplist(lines)
resp, list = self.longcmd('LIST', file)
for i in range(len(list)):
# Parse lines into "group last first flag"
list[i] = tuple(list[i].split())
return resp, list
def _getdescriptions(self, group_pattern, return_all):
line_pat = re.compile('^(?P<group>[^ \t]+)[ \t]+(.*)$')
# Try the more std (acc. to RFC2980) LIST NEWSGROUPS first
resp, lines = self._longcmdstring('LIST NEWSGROUPS ' + group_pattern)
if not resp.startswith('215'):
# Now the deprecated XGTITLE. This either raises an error
# or succeeds with the same output structure as LIST
# NEWSGROUPS.
resp, lines = self._longcmdstring('XGTITLE ' + group_pattern)
groups = {}
for raw_line in lines:
match = line_pat.search(raw_line.strip())
if match:
name, desc = match.group(1, 2)
if not return_all:
return desc
groups[name] = desc
if return_all:
return resp, groups
else:
# Nothing found
return ''
def description(self, group):
"""Get a description for a single group. If more than one
group matches ('group' is a pattern), return the first. If no
group matches, return an empty string.
......@@ -311,42 +611,24 @@ class NNTP:
NOTE: This neither checks for a wildcard in 'group' nor does
it check whether the group actually exists."""
resp, lines = self.descriptions(group)
if len(lines) == 0:
return b''
else:
return lines[0][1]
return self._getdescriptions(group, False)
def descriptions(self, group_pattern):
"""Get descriptions for a range of groups."""
line_pat = re.compile(b'^(?P<group>[^ \t]+)[ \t]+(.*)$')
# Try the more std (acc. to RFC2980) LIST NEWSGROUPS first
resp, raw_lines = self.longcmd('LIST NEWSGROUPS ' + group_pattern)
if not resp.startswith(b'215'):
# Now the deprecated XGTITLE. This either raises an error
# or succeeds with the same output structure as LIST
# NEWSGROUPS.
resp, raw_lines = self.longcmd('XGTITLE ' + group_pattern)
lines = []
for raw_line in raw_lines:
match = line_pat.search(raw_line.strip())
if match:
lines.append(match.group(1, 2))
return resp, lines
return self._getdescriptions(group_pattern, True)
def group(self, name):
"""Process a GROUP command. Argument:
- group: the group name
Returns:
- resp: server response if successful
- count: number of articles (string)
- first: first article number (string)
- last: last article number (string)
- name: the group name"""
resp = self.shortcmd('GROUP ' + name)
if not resp.startswith(b'211'):
- count: number of articles
- first: first article number
- last: last article number
- name: the group name
"""
resp = self._shortcmd('GROUP ' + name)
if not resp.startswith('211'):
raise NNTPReplyError(resp)
words = resp.split()
count = first = last = 0
......@@ -359,151 +641,177 @@ class NNTP:
last = words[3]
if n > 4:
name = words[4].lower()
return resp, count, first, last, name
return resp, int(count), int(first), int(last), name
def help(self, file=None):
"""Process a HELP command. Returns:
def help(self, *, file=None):
"""Process a HELP command. Argument:
- file: Filename string or file object to store the result in
Returns:
- resp: server response if successful
- list: list of strings"""
return self.longcmd('HELP',file)
- list: list of strings returned by the server in response to the
HELP command
"""
return self._longcmdstring('HELP', file)
def statparse(self, resp):
"""Internal: parse the response of a STAT, NEXT or LAST command."""
if not resp.startswith(b'22'):
def _statparse(self, resp):
"""Internal: parse the response line of a STAT, NEXT, LAST,
ARTICLE, HEAD or BODY command."""
if not resp.startswith('22'):
raise NNTPReplyError(resp)
words = resp.split()
nr = 0
id = b''
n = len(words)
if n > 1:
nr = words[1]
if n > 2:
id = words[2]
return resp, nr, id
art_num = int(words[1])
message_id = words[2]
return resp, art_num, message_id
def statcmd(self, line):
def _statcmd(self, line):
"""Internal: process a STAT, NEXT or LAST command."""
resp = self.shortcmd(line)
return self.statparse(resp)
resp = self._shortcmd(line)
return self._statparse(resp)
def stat(self, id):
def stat(self, message_spec=None):
"""Process a STAT command. Argument:
- id: article number or message id
- message_spec: article number or message id (if not specified,
the current article is selected)
Returns:
- resp: server response if successful
- nr: the article number
- id: the message id"""
return self.statcmd('STAT {0}'.format(id))
- art_num: the article number
- message_id: the message id
"""
if message_spec:
return self._statcmd('STAT {0}'.format(message_spec))
else:
return self._statcmd('STAT')
def next(self):
"""Process a NEXT command. No arguments. Return as for STAT."""
return self.statcmd('NEXT')
return self._statcmd('NEXT')
def last(self):
"""Process a LAST command. No arguments. Return as for STAT."""
return self.statcmd('LAST')
return self._statcmd('LAST')
def artcmd(self, line, file=None):
def _artcmd(self, line, file=None):
"""Internal: process a HEAD, BODY or ARTICLE command."""
resp, list = self.longcmd(line, file)
resp, nr, id = self.statparse(resp)
return resp, nr, id, list
resp, lines = self._longcmd(line, file)
resp, art_num, message_id = self._statparse(resp)
return resp, ArticleInfo(art_num, message_id, lines)
def head(self, id):
def head(self, message_spec=None, *, file=None):
"""Process a HEAD command. Argument:
- id: article number or message id
- message_spec: article number or message id
- file: filename string or file object to store the headers in
Returns:
- resp: server response if successful
- nr: article number
- id: message id
- list: the lines of the article's header"""
return self.artcmd('HEAD {0}'.format(id))
- ArticleInfo: (article number, message id, list of header lines)
"""
if message_spec is not None:
cmd = 'HEAD {0}'.format(message_spec)
else:
cmd = 'HEAD'
return self._artcmd(cmd, file)
def body(self, id, file=None):
def body(self, message_spec=None, *, file=None):
"""Process a BODY command. Argument:
- id: article number or message id
- file: Filename string or file object to store the article in
- message_spec: article number or message id
- file: filename string or file object to store the body in
Returns:
- resp: server response if successful
- nr: article number
- id: message id
- list: the lines of the article's body or an empty list
if file was used"""
return self.artcmd('BODY {0}'.format(id), file)
- ArticleInfo: (article number, message id, list of body lines)
"""
if message_spec is not None:
cmd = 'BODY {0}'.format(message_spec)
else:
cmd = 'BODY'
return self._artcmd(cmd, file)
def article(self, id):
def article(self, message_spec=None, *, file=None):
"""Process an ARTICLE command. Argument:
- id: article number or message id
- message_spec: article number or message id
- file: filename string or file object to store the article in
Returns:
- resp: server response if successful
- nr: article number
- id: message id
- list: the lines of the article"""
return self.artcmd('ARTICLE {0}'.format(id))
- ArticleInfo: (article number, message id, list of article lines)
"""
if message_spec is not None:
cmd = 'ARTICLE {0}'.format(message_spec)
else:
cmd = 'ARTICLE'
return self._artcmd(cmd, file)
def slave(self):
"""Process a SLAVE command. Returns:
- resp: server response if successful"""
return self.shortcmd('SLAVE')
- resp: server response if successful
"""
return self._shortcmd('SLAVE')
def xhdr(self, hdr, str, file=None):
def xhdr(self, hdr, str, *, file=None):
"""Process an XHDR command (optional server extension). Arguments:
- hdr: the header type (e.g. 'subject')
- str: an article nr, a message id, or a range nr1-nr2
- file: Filename string or file object to store the result in
Returns:
- resp: server response if successful
- list: list of (nr, value) strings"""
pat = re.compile(b'^([0-9]+) ?(.*)\n?')
resp, lines = self.longcmd('XHDR {0} {1}'.format(hdr, str), file)
for i in range(len(lines)):
line = lines[i]
- list: list of (nr, value) strings
"""
pat = re.compile('^([0-9]+) ?(.*)\n?')
resp, lines = self._longcmdstring('XHDR {0} {1}'.format(hdr, str), file)
def remove_number(line):
m = pat.match(line)
if m:
lines[i] = m.group(1, 2)
return resp, lines
return m.group(1, 2) if m else line
return resp, [remove_number(line) for line in lines]
def xover(self, start, end, file=None):
def xover(self, start, end, *, file=None):
"""Process an XOVER command (optional server extension) Arguments:
- start: start of range
- end: end of range
- file: Filename string or file object to store the result in
Returns:
- resp: server response if successful
- list: list of dicts containing the response fields
"""
resp, lines = self._longcmdstring('XOVER {0}-{1}'.format(start, end),
file)
fmt = self._getoverviewfmt()
return resp, _parse_overview(lines, fmt)
def over(self, message_spec, *, file=None):
"""Process an OVER command. If the command isn't supported, fall
back to XOVER. Arguments:
- message_spec:
- either a message id, indicating the article to fetch
information about
- or a (start, end) tuple, indicating a range of article numbers;
if end is None, information up to the newest message will be
retrieved
- or None, indicating the current article number must be used
- file: Filename string or file object to store the result in
Returns:
- resp: server response if successful
- list: list of (art-nr, subject, poster, date,
id, references, size, lines)"""
- list: list of dicts containing the response fields
resp, lines = self.longcmd('XOVER {0}-{1}'.format(start, end), file)
xover_lines = []
for line in lines:
elem = line.split(b'\t')
try:
xover_lines.append((elem[0],
elem[1],
elem[2],
elem[3],
elem[4],
elem[5].split(),
elem[6],
elem[7]))
except IndexError:
raise NNTPDataError(line)
return resp,xover_lines
def xgtitle(self, group, file=None):
NOTE: the "message id" form isn't supported by XOVER
"""
cmd = 'OVER' if 'OVER' in self._caps else 'XOVER'
if isinstance(message_spec, (tuple, list)):
start, end = message_spec
cmd += ' {0}-{1}'.format(start, end or '')
elif message_spec is not None:
cmd = cmd + ' ' + message_spec
resp, lines = self._longcmdstring(cmd, file)
fmt = self._getoverviewfmt()
return resp, _parse_overview(lines, fmt)
def xgtitle(self, group, *, file=None):
"""Process an XGTITLE command (optional server extension) Arguments:
- group: group name wildcard (i.e. news.*)
Returns:
- resp: server response if successful
- list: list of (name,title) strings"""
line_pat = re.compile(b'^([^ \t]+)[ \t]+(.*)$')
resp, raw_lines = self.longcmd('XGTITLE ' + group, file)
warnings.warn("The XGTITLE extension is not actively used, "
"use descriptions() instead",
PendingDeprecationWarning, 2)
line_pat = re.compile('^([^ \t]+)[ \t]+(.*)$')
resp, raw_lines = self._longcmdstring('XGTITLE ' + group, file)
lines = []
for raw_line in raw_lines:
match = line_pat.search(raw_line.strip())
......@@ -511,15 +819,18 @@ class NNTP:
lines.append(match.group(1, 2))
return resp, lines
def xpath(self,id):
def xpath(self, id):
"""Process an XPATH command (optional server extension) Arguments:
- id: Message id of article
Returns:
resp: server response if successful
path: directory path to article"""
path: directory path to article
"""
warnings.warn("The XPATH extension is not actively used",
PendingDeprecationWarning, 2)
resp = self.shortcmd('XPATH {0}'.format(id))
if not resp.startswith(b'223'):
resp = self._shortcmd('XPATH {0}'.format(id))
if not resp.startswith('223'):
raise NNTPReplyError(resp)
try:
[resp_num, path] = resp.split()
......@@ -528,89 +839,144 @@ class NNTP:
else:
return resp, path
def date (self):
"""Process the DATE command. Arguments:
None
def date(self):
"""Process the DATE command.
Returns:
resp: server response if successful
date: Date suitable for newnews/newgroups commands etc.
time: Time suitable for newnews/newgroups commands etc."""
resp = self.shortcmd("DATE")
if not resp.startswith(b'111'):
- resp: server response if successful
- date: datetime object
"""
resp = self._shortcmd("DATE")
if not resp.startswith('111'):
raise NNTPReplyError(resp)
elem = resp.split()
if len(elem) != 2:
raise NNTPDataError(resp)
date = elem[1][2:8]
time = elem[1][-6:]
if len(date) != 6 or len(time) != 6:
date = elem[1]
if len(date) != 14:
raise NNTPDataError(resp)
return resp, date, time
return resp, _parse_datetime(date, None)
def _post(self, command, f):
resp = self.shortcmd(command)
# Raises error_??? if posting is not allowed
if not resp.startswith(b'3'):
resp = self._shortcmd(command)
# Raises a specific exception if posting is not allowed
if not resp.startswith('3'):
raise NNTPReplyError(resp)
while 1:
line = f.readline()
if not line:
break
if line.endswith(b'\n'):
line = line[:-1]
if isinstance(f, (bytes, bytearray)):
f = f.splitlines()
# We don't use _putline() because:
# - we don't want additional CRLF if the file or iterable is already
# in the right format
# - we don't want a spurious flush() after each line is written
for line in f:
if not line.endswith(_CRLF):
line = line.rstrip(b"\r\n") + _CRLF
if line.startswith(b'.'):
line = b'.' + line
self.putline(line)
self.putline(b'.')
return self.getresp()
self.file.write(line)
self.file.write(b".\r\n")
self.file.flush()
return self._getresp()
def post(self, f):
def post(self, data):
"""Process a POST command. Arguments:
- f: file containing the article
- data: bytes object, iterable or file containing the article
Returns:
- resp: server response if successful"""
return self._post('POST', f)
return self._post('POST', data)
def ihave(self, id, f):
def ihave(self, message_id, data):
"""Process an IHAVE command. Arguments:
- id: message-id of the article
- f: file containing the article
- message_id: message-id of the article
- data: file containing the article
Returns:
- resp: server response if successful
Note that if the server refuses the article an exception is raised."""
return self._post('IHAVE {0}'.format(id), f)
return self._post('IHAVE {0}'.format(message_id), data)
def _close(self):
self.file.close()
del self.file
def quit(self):
"""Process a QUIT command and close the socket. Returns:
- resp: server response if successful"""
resp = self.shortcmd('QUIT')
self.file.close()
self.sock.close()
del self.file, self.sock
try:
resp = self._shortcmd('QUIT')
finally:
self._close()
return resp
class NNTP(_NNTPBase):
def __init__(self, host, port=NNTP_PORT, user=None, password=None,
readermode=None, usenetrc=True,
timeout=_GLOBAL_DEFAULT_TIMEOUT):
"""Initialize an instance. Arguments:
- host: hostname to connect to
- port: port to connect to (default the standard NNTP port)
- user: username to authenticate with
- password: password to use with username
- readermode: if true, send 'mode reader' command after
connecting.
- usenetrc: allow loading username and password from ~/.netrc file
if not specified explicitly
- timeout: timeout (in seconds) used for socket connections
readermode is sometimes necessary if you are connecting to an
NNTP server on the local machine and intend to call
reader-specific comamnds, such as `group'. If you get
unexpected NNTPPermanentErrors, you might need to set
readermode.
"""
self.host = host
self.port = port
self.sock = socket.create_connection((host, port), timeout)
file = self.sock.makefile("rwb")
_NNTPBase.__init__(self, file, user, password,
readermode, usenetrc, timeout)
def _close(self):
try:
_NNTPBase._close(self)
finally:
self.sock.close()
# Test retrieval when run as a script.
# Assumption: if there's a local news server, it's called 'news'.
# Assumption: if user queries a remote news server, it's named
# in the environment variable NNTPSERVER (used by slrn and kin)
# and we want readermode off.
if __name__ == '__main__':
import os
newshost = 'news' and os.environ["NNTPSERVER"]
if newshost.find('.') == -1:
mode = 'readermode'
else:
mode = None
s = NNTP(newshost, readermode=mode)
resp, count, first, last, name = s.group('comp.lang.python')
print(resp)
import argparse
from email.utils import parsedate
parser = argparse.ArgumentParser(description="""\
nntplib built-in demo - display the latest articles in a newsgroup""")
parser.add_argument('-g', '--group', default='gmane.comp.python.general',
help='group to fetch messages from (default: %(default)s)')
parser.add_argument('-s', '--server', default='news.gmane.org',
help='NNTP server hostname (default: %(default)s)')
parser.add_argument('-p', '--port', default=NNTP_PORT, type=int,
help='NNTP port number (default: %(default)s)')
parser.add_argument('-n', '--nb-articles', default=10, type=int,
help='number of articles to fetch (default: %(default)s)')
args = parser.parse_args()
s = NNTP(host=args.server, port=args.port)
resp, count, first, last, name = s.group(args.group)
print('Group', name, 'has', count, 'articles, range', first, 'to', last)
resp, subs = s.xhdr('subject', '{0}-{1}'.format(first, last))
print(resp)
for item in subs:
print("%7s %s" % item)
resp = s.quit()
print(resp)
def cut(s, lim):
if len(s) > lim:
s = s[:lim - 4] + "..."
return s
first = str(int(last) - args.nb_articles + 1)
resp, overviews = s.xover(first, last)
for artnum, over in overviews:
author = decode_header(over['from']).split('<', 1)[0]
subject = decode_header(over['subject'])
lines = int(over[':lines'])
print("{:7} {:20} {:42} ({})".format(
artnum, cut(author, 20), cut(subject, 42), lines)
)
s.quit()
import io
import datetime
import textwrap
import unittest
import contextlib
from test import support
from nntplib import NNTP, GroupInfo
import nntplib
TIMEOUT = 30
# TODO:
# - test the `file` arg to more commands
# - test error conditions
class NetworkedNNTPTestsMixin:
def test_welcome(self):
welcome = self.server.getwelcome()
self.assertEqual(str, type(welcome))
def test_help(self):
resp, list = self.server.help()
self.assertTrue(resp.startswith("100 "), resp)
for line in list:
self.assertEqual(str, type(line))
def test_list(self):
resp, list = self.server.list()
if len(list) > 0:
self.assertEqual(GroupInfo, type(list[0]))
self.assertEqual(str, type(list[0].group))
def test_unknown_command(self):
with self.assertRaises(nntplib.NNTPPermanentError) as cm:
self.server._shortcmd("XYZZY")
resp = cm.exception.response
self.assertTrue(resp.startswith("500 "), resp)
def test_newgroups(self):
# gmane gets a constant influx of new groups. In order not to stress
# the server too much, we choose a recent date in the past.
dt = datetime.date.today() - datetime.timedelta(days=7)
resp, groups = self.server.newgroups(dt)
if len(groups) > 0:
self.assertIsInstance(groups[0], GroupInfo)
self.assertIsInstance(groups[0].group, str)
def test_description(self):
def _check_desc(desc):
# Sanity checks
self.assertIsInstance(desc, str)
self.assertNotIn(self.GROUP_NAME, desc)
desc = self.server.description(self.GROUP_NAME)
_check_desc(desc)
# Another sanity check
self.assertIn("Python", desc)
# With a pattern
desc = self.server.description(self.GROUP_PAT)
_check_desc(desc)
# Shouldn't exist
desc = self.server.description("zk.brrtt.baz")
self.assertEqual(desc, '')
def test_descriptions(self):
resp, descs = self.server.descriptions(self.GROUP_PAT)
# 215 for LIST NEWSGROUPS, 282 for XGTITLE
self.assertTrue(
resp.startswith("215 ") or resp.startswith("282 "), resp)
self.assertIsInstance(descs, dict)
desc = descs[self.GROUP_NAME]
self.assertEqual(desc, self.server.description(self.GROUP_NAME))
def test_group(self):
result = self.server.group(self.GROUP_NAME)
self.assertEqual(5, len(result))
resp, count, first, last, group = result
self.assertEqual(group, self.GROUP_NAME)
self.assertIsInstance(count, int)
self.assertIsInstance(first, int)
self.assertIsInstance(last, int)
self.assertLessEqual(first, last)
self.assertTrue(resp.startswith("211 "), resp)
def test_date(self):
resp, date = self.server.date()
self.assertIsInstance(date, datetime.datetime)
# Sanity check
self.assertGreaterEqual(date.year, 1995)
self.assertLessEqual(date.year, 2030)
def _check_art_dict(self, art_dict):
# Some sanity checks for a field dictionary returned by OVER / XOVER
self.assertIsInstance(art_dict, dict)
# NNTP has 7 mandatory fields
self.assertGreaterEqual(art_dict.keys(),
{"subject", "from", "date", "message-id",
"references", ":bytes", ":lines"}
)
for v in art_dict.values():
self.assertIsInstance(v, str)
def test_xover(self):
resp, count, first, last, name = self.server.group(self.GROUP_NAME)
resp, lines = self.server.xover(last, last)
art_num, art_dict = lines[0]
self.assertEqual(art_num, last)
self._check_art_dict(art_dict)
def test_over(self):
resp, count, first, last, name = self.server.group(self.GROUP_NAME)
start = last - 10
# The "start-" article range form
resp, lines = self.server.over((start, None))
art_num, art_dict = lines[0]
self._check_art_dict(art_dict)
# The "start-end" article range form
resp, lines = self.server.over((start, last))
art_num, art_dict = lines[-1]
self.assertEqual(art_num, last)
self._check_art_dict(art_dict)
# XXX The "message_id" form is unsupported by gmane
# 503 Overview by message-ID unsupported
def test_xhdr(self):
resp, count, first, last, name = self.server.group(self.GROUP_NAME)
resp, lines = self.server.xhdr('subject', last)
for line in lines:
self.assertEqual(str, type(line[1]))
def check_article_resp(self, resp, article, art_num=None):
self.assertIsInstance(article, nntplib.ArticleInfo)
if art_num is not None:
self.assertEqual(article.number, art_num)
for line in article.lines:
self.assertIsInstance(line, bytes)
# XXX this could exceptionally happen...
self.assertNotIn(article.lines[-1], (b".", b".\n", b".\r\n"))
def test_article_head_body(self):
resp, count, first, last, name = self.server.group(self.GROUP_NAME)
resp, head = self.server.head(last)
self.assertTrue(resp.startswith("221 "), resp)
self.check_article_resp(resp, head, last)
resp, body = self.server.body(last)
self.assertTrue(resp.startswith("222 "), resp)
self.check_article_resp(resp, body, last)
resp, article = self.server.article(last)
self.assertTrue(resp.startswith("220 "), resp)
self.check_article_resp(resp, article, last)
self.assertEqual(article.lines, head.lines + [b''] + body.lines)
def test_quit(self):
self.server.quit()
self.server = None
class NetworkedNNTPTests(NetworkedNNTPTestsMixin, unittest.TestCase):
NNTP_HOST = 'news.gmane.org'
GROUP_NAME = 'gmane.comp.python.devel'
GROUP_PAT = 'gmane.comp.python.d*'
def setUp(self):
support.requires("network")
with support.transient_internet(self.NNTP_HOST):
self.server = NNTP(self.NNTP_HOST, timeout=TIMEOUT)
def tearDown(self):
if self.server is not None:
self.server.quit()
# Disabled with gmane as it produces too much data
test_list = None
def test_capabilities(self):
# As of this writing, gmane implements NNTP version 2 and has a
# couple of well-known capabilities. Just sanity check that we
# got them.
def _check_caps(caps):
caps_list = caps['LIST']
self.assertIsInstance(caps_list, (list, tuple))
self.assertIn('OVERVIEW.FMT', caps_list)
self.assertGreaterEqual(self.server.nntp_version, 2)
_check_caps(self.server.getcapabilities())
# This re-emits the command
resp, caps = self.server.capabilities()
_check_caps(caps)
#
# Non-networked tests using a local server (or something mocking it).
#
class _NNTPServerIO(io.RawIOBase):
"""A raw IO object allowing NNTP commands to be received and processed
by a handler. The handler can push responses which can then be read
from the IO object."""
def __init__(self, handler):
io.RawIOBase.__init__(self)
# The channel from the client
self.c2s = io.BytesIO()
# The channel to the client
self.s2c = io.BytesIO()
self.handler = handler
self.handler.start(self.c2s.readline, self.push_data)
def readable(self):
return True
def writable(self):
return True
def push_data(self, data):
"""Push (buffer) some data to send to the client."""
pos = self.s2c.tell()
self.s2c.seek(0, 2)
self.s2c.write(data)
self.s2c.seek(pos)
def write(self, b):
"""The client sends us some data"""
pos = self.c2s.tell()
self.c2s.write(b)
self.c2s.seek(pos)
self.handler.process_pending()
return len(b)
def readinto(self, buf):
"""The client wants to read a response"""
self.handler.process_pending()
b = self.s2c.read(len(buf))
n = len(b)
buf[:n] = b
return n
class MockedNNTPTestsMixin:
# Override in derived classes
handler_class = None
def setUp(self):
super().setUp()
self.make_server()
def tearDown(self):
super().tearDown()
del self.server
def make_server(self, *args, **kwargs):
self.handler = self.handler_class()
self.sio = _NNTPServerIO(self.handler)
# Using BufferedRWPair instead of BufferedRandom ensures the file
# isn't seekable.
file = io.BufferedRWPair(self.sio, self.sio)
self.server = nntplib._NNTPBase(file, *args, **kwargs)
return self.server
class NNTPv1Handler:
"""A handler for RFC 977"""
welcome = "200 NNTP mock server"
def start(self, readline, push_data):
self.in_body = False
self.allow_posting = True
self._readline = readline
self._push_data = push_data
# Our welcome
self.handle_welcome()
def _decode(self, data):
return str(data, "utf-8", "surrogateescape")
def process_pending(self):
if self.in_body:
while True:
line = self._readline()
if not line:
return
self.body.append(line)
if line == b".\r\n":
break
try:
meth, tokens = self.body_callback
meth(*tokens, body=self.body)
finally:
self.body_callback = None
self.body = None
self.in_body = False
while True:
line = self._decode(self._readline())
if not line:
return
if not line.endswith("\r\n"):
raise ValueError("line doesn't end with \\r\\n: {!r}".format(line))
line = line[:-2]
cmd, *tokens = line.split()
#meth = getattr(self.handler, "handle_" + cmd.upper(), None)
meth = getattr(self, "handle_" + cmd.upper(), None)
if meth is None:
self.handle_unknown()
else:
try:
meth(*tokens)
except Exception as e:
raise ValueError("command failed: {!r}".format(line)) from e
else:
if self.in_body:
self.body_callback = meth, tokens
self.body = []
def expect_body(self):
"""Flag that the client is expected to post a request body"""
self.in_body = True
def push_data(self, data):
"""Push some binary data"""
self._push_data(data)
def push_lit(self, lit):
"""Push a string literal"""
lit = textwrap.dedent(lit)
lit = "\r\n".join(lit.splitlines()) + "\r\n"
lit = lit.encode('utf-8')
self.push_data(lit)
def handle_unknown(self):
self.push_lit("500 What?")
def handle_welcome(self):
self.push_lit(self.welcome)
def handle_QUIT(self):
self.push_lit("205 Bye!")
def handle_DATE(self):
self.push_lit("111 20100914001155")
def handle_GROUP(self, group):
if group == "fr.comp.lang.python":
self.push_lit("211 486 761 1265 fr.comp.lang.python")
else:
self.push_lit("411 No such group {}".format(group))
def handle_HELP(self):
self.push_lit("""\
100 Legal commands
authinfo user Name|pass Password|generic <prog> <args>
date
help
Report problems to <root@example.org>
.""")
def handle_STAT(self, message_spec=None):
if message_spec is None:
self.push_lit("412 No newsgroup selected")
elif message_spec == "3000234":
self.push_lit("223 3000234 <45223423@example.com>")
elif message_spec == "<45223423@example.com>":
self.push_lit("223 0 <45223423@example.com>")
else:
self.push_lit("430 No Such Article Found")
def handle_NEXT(self):
self.push_lit("223 3000237 <668929@example.org> retrieved")
def handle_LAST(self):
self.push_lit("223 3000234 <45223423@example.com> retrieved")
def handle_LIST(self, action=None, param=None):
if action is None:
self.push_lit("""\
215 Newsgroups in form "group high low flags".
comp.lang.python 0000052340 0000002828 y
comp.lang.python.announce 0000001153 0000000993 m
free.it.comp.lang.python 0000000002 0000000002 y
fr.comp.lang.python 0000001254 0000000760 y
free.it.comp.lang.python.learner 0000000000 0000000001 y
tw.bbs.comp.lang.python 0000000304 0000000304 y
.""")
elif action == "OVERVIEW.FMT":
self.push_lit("""\
215 Order of fields in overview database.
Subject:
From:
Date:
Message-ID:
References:
Bytes:
Lines:
Xref:full
.""")
elif action == "NEWSGROUPS":
assert param is not None
if param == "comp.lang.python":
self.push_lit("""\
215 Descriptions in form "group description".
comp.lang.python\tThe Python computer language.
.""")
elif param == "comp.lang.python*":
self.push_lit("""\
215 Descriptions in form "group description".
comp.lang.python.announce\tAnnouncements about the Python language. (Moderated)
comp.lang.python\tThe Python computer language.
.""")
else:
self.push_lit("""\
215 Descriptions in form "group description".
.""")
else:
self.push_lit('501 Unknown LIST keyword')
def handle_NEWNEWS(self, group, date_str, time_str):
# We hard code different return messages depending on passed
# argument and date syntax.
if (group == "comp.lang.python" and date_str == "20100913"
and time_str == "082004"):
# Date was passed in RFC 3977 format (NNTP "v2")
self.push_lit("""\
230 list of newsarticles (NNTP v2) created after Mon Sep 13 08:20:04 2010 follows
<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
.""")
elif (group == "comp.lang.python" and date_str == "100913"
and time_str == "082004"):
# Date was passed in RFC 977 format (NNTP "v1")
self.push_lit("""\
230 list of newsarticles (NNTP v1) created after Mon Sep 13 08:20:04 2010 follows
<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>
<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>
.""")
else:
self.push_lit("""\
230 An empty list of newsarticles follows
.""")
# (Note for experiments: many servers disable NEWNEWS.
# As of this writing, sicinfo3.epfl.ch doesn't.)
def handle_XOVER(self, message_spec):
if message_spec == "57-59":
self.push_lit(
"224 Overview information for 57-58 follows\n"
"57\tRe: ANN: New Plone book with strong Python (and Zope) themes throughout"
"\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
"\tSat, 19 Jun 2010 18:04:08 -0400"
"\t<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>"
"\t<hvalf7$ort$1@dough.gmane.org>\t7103\t16"
"\tXref: news.gmane.org gmane.comp.python.authors:57"
"\n"
"58\tLooking for a few good bloggers"
"\tDoug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>"
"\tThu, 22 Jul 2010 09:14:14 -0400"
"\t<A29863FA-F388-40C3-AA25-0FD06B09B5BF@gmail.com>"
"\t\t6683\t16"
"\tXref: news.gmane.org gmane.comp.python.authors:58"
"\n"
# An UTF-8 overview line from fr.comp.lang.python
"59\tRe: Message d'erreur incompréhensible (par moi)"
"\tEric Brunel <eric.brunel@pragmadev.nospam.com>"
"\tWed, 15 Sep 2010 18:09:15 +0200"
"\t<eric.brunel-2B8B56.18091515092010@news.wanadoo.fr>"
"\t<4c90ec87$0$32425$ba4acef3@reader.news.orange.fr>\t1641\t27"
"\tXref: saria.nerim.net fr.comp.lang.python:1265"
"\n"
".\n")
else:
self.push_lit("""\
224 No articles
.""")
def handle_POST(self, *, body=None):
if body is None:
if self.allow_posting:
self.push_lit("340 Input article; end with <CR-LF>.<CR-LF>")
self.expect_body()
else:
self.push_lit("440 Posting not permitted")
else:
assert self.allow_posting
self.push_lit("240 Article received OK")
self.posted_body = body
def handle_IHAVE(self, message_id, *, body=None):
if body is None:
if (self.allow_posting and
message_id == "<i.am.an.article.you.will.want@example.com>"):
self.push_lit("335 Send it; end with <CR-LF>.<CR-LF>")
self.expect_body()
else:
self.push_lit("435 Article not wanted")
else:
assert self.allow_posting
self.push_lit("235 Article transferred OK")
self.posted_body = body
sample_head = """\
From: "Demo User" <nobody@example.net>
Subject: I am just a test article
Content-Type: text/plain; charset=UTF-8; format=flowed
Message-ID: <i.am.an.article.you.will.want@example.com>"""
sample_body = """\
This is just a test article.
..Here is a dot-starting line.
-- Signed by Andr\xe9."""
sample_article = sample_head + "\n\n" + sample_body
def handle_ARTICLE(self, message_spec=None):
if message_spec is None:
self.push_lit("220 3000237 <45223423@example.com>")
elif message_spec == "<45223423@example.com>":
self.push_lit("220 0 <45223423@example.com>")
elif message_spec == "3000234":
self.push_lit("220 3000234 <45223423@example.com>")
else:
self.push_lit("430 No Such Article Found")
return
self.push_lit(self.sample_article)
self.push_lit(".")
def handle_HEAD(self, message_spec=None):
if message_spec is None:
self.push_lit("221 3000237 <45223423@example.com>")
elif message_spec == "<45223423@example.com>":
self.push_lit("221 0 <45223423@example.com>")
elif message_spec == "3000234":
self.push_lit("221 3000234 <45223423@example.com>")
else:
self.push_lit("430 No Such Article Found")
return
self.push_lit(self.sample_head)
self.push_lit(".")
def handle_BODY(self, message_spec=None):
if message_spec is None:
self.push_lit("222 3000237 <45223423@example.com>")
elif message_spec == "<45223423@example.com>":
self.push_lit("222 0 <45223423@example.com>")
elif message_spec == "3000234":
self.push_lit("222 3000234 <45223423@example.com>")
else:
self.push_lit("430 No Such Article Found")
return
self.push_lit(self.sample_body)
self.push_lit(".")
class NNTPv2Handler(NNTPv1Handler):
"""A handler for RFC 3977 (NNTP "v2")"""
def handle_CAPABILITIES(self):
self.push_lit("""\
101 Capability list:
VERSION 2
IMPLEMENTATION INN 2.5.1
AUTHINFO USER
HDR
LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
OVER
POST
READER
.""")
def handle_OVER(self, message_spec=None):
return self.handle_XOVER(message_spec)
class NNTPv1v2TestsMixin:
def setUp(self):
super().setUp()
def test_welcome(self):
self.assertEqual(self.server.welcome, self.handler.welcome)
def test_date(self):
resp, date = self.server.date()
self.assertEqual(resp, "111 20100914001155")
self.assertEqual(date, datetime.datetime(2010, 9, 14, 0, 11, 55))
def test_quit(self):
self.assertFalse(self.sio.closed)
resp = self.server.quit()
self.assertEqual(resp, "205 Bye!")
self.assertTrue(self.sio.closed)
def test_help(self):
resp, help = self.server.help()
self.assertEqual(resp, "100 Legal commands")
self.assertEqual(help, [
' authinfo user Name|pass Password|generic <prog> <args>',
' date',
' help',
'Report problems to <root@example.org>',
])
def test_list(self):
resp, groups = self.server.list()
self.assertEqual(len(groups), 6)
g = groups[1]
self.assertEqual(g,
GroupInfo("comp.lang.python.announce", "0000001153",
"0000000993", "m"))
def test_stat(self):
resp, art_num, message_id = self.server.stat(3000234)
self.assertEqual(resp, "223 3000234 <45223423@example.com>")
self.assertEqual(art_num, 3000234)
self.assertEqual(message_id, "<45223423@example.com>")
resp, art_num, message_id = self.server.stat("<45223423@example.com>")
self.assertEqual(resp, "223 0 <45223423@example.com>")
self.assertEqual(art_num, 0)
self.assertEqual(message_id, "<45223423@example.com>")
with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
self.server.stat("<non.existent.id>")
self.assertEqual(cm.exception.response, "430 No Such Article Found")
with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
self.server.stat()
self.assertEqual(cm.exception.response, "412 No newsgroup selected")
def test_next(self):
resp, art_num, message_id = self.server.next()
self.assertEqual(resp, "223 3000237 <668929@example.org> retrieved")
self.assertEqual(art_num, 3000237)
self.assertEqual(message_id, "<668929@example.org>")
def test_last(self):
resp, art_num, message_id = self.server.last()
self.assertEqual(resp, "223 3000234 <45223423@example.com> retrieved")
self.assertEqual(art_num, 3000234)
self.assertEqual(message_id, "<45223423@example.com>")
def test_description(self):
desc = self.server.description("comp.lang.python")
self.assertEqual(desc, "The Python computer language.")
desc = self.server.description("comp.lang.pythonx")
self.assertEqual(desc, "")
def test_descriptions(self):
resp, groups = self.server.descriptions("comp.lang.python")
self.assertEqual(resp, '215 Descriptions in form "group description".')
self.assertEqual(groups, {
"comp.lang.python": "The Python computer language.",
})
resp, groups = self.server.descriptions("comp.lang.python*")
self.assertEqual(groups, {
"comp.lang.python": "The Python computer language.",
"comp.lang.python.announce": "Announcements about the Python language. (Moderated)",
})
resp, groups = self.server.descriptions("comp.lang.pythonx")
self.assertEqual(groups, {})
def test_group(self):
resp, count, first, last, group = self.server.group("fr.comp.lang.python")
self.assertTrue(resp.startswith("211 "), resp)
self.assertEqual(first, 761)
self.assertEqual(last, 1265)
self.assertEqual(count, 486)
self.assertEqual(group, "fr.comp.lang.python")
with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
self.server.group("comp.lang.python.devel")
exc = cm.exception
self.assertTrue(exc.response.startswith("411 No such group"),
exc.response)
def test_newnews(self):
# NEWNEWS comp.lang.python [20]100913 082004
dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
resp, ids = self.server.newnews("comp.lang.python", dt)
expected = (
"230 list of newsarticles (NNTP v{0}) "
"created after Mon Sep 13 08:20:04 2010 follows"
).format(self.nntp_version)
self.assertEqual(resp, expected)
self.assertEqual(ids, [
"<a4929a40-6328-491a-aaaf-cb79ed7309a2@q2g2000vbk.googlegroups.com>",
"<f30c0419-f549-4218-848f-d7d0131da931@y3g2000vbm.googlegroups.com>",
])
# NEWNEWS fr.comp.lang.python [20]100913 082004
dt = datetime.datetime(2010, 9, 13, 8, 20, 4)
resp, ids = self.server.newnews("fr.comp.lang.python", dt)
self.assertEqual(resp, "230 An empty list of newsarticles follows")
self.assertEqual(ids, [])
def _check_article_body(self, lines):
self.assertEqual(len(lines), 4)
self.assertEqual(lines[-1].decode('utf8'), "-- Signed by André.")
self.assertEqual(lines[-2], b"")
self.assertEqual(lines[-3], b".Here is a dot-starting line.")
self.assertEqual(lines[-4], b"This is just a test article.")
def _check_article_head(self, lines):
self.assertEqual(len(lines), 4)
self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>')
self.assertEqual(lines[3], b"Message-ID: <i.am.an.article.you.will.want@example.com>")
def _check_article_data(self, lines):
self.assertEqual(len(lines), 9)
self._check_article_head(lines[:4])
self._check_article_body(lines[-4:])
self.assertEqual(lines[4], b"")
def test_article(self):
# ARTICLE
resp, info = self.server.article()
self.assertEqual(resp, "220 3000237 <45223423@example.com>")
art_num, message_id, lines = info
self.assertEqual(art_num, 3000237)
self.assertEqual(message_id, "<45223423@example.com>")
self._check_article_data(lines)
# ARTICLE num
resp, info = self.server.article(3000234)
self.assertEqual(resp, "220 3000234 <45223423@example.com>")
art_num, message_id, lines = info
self.assertEqual(art_num, 3000234)
self.assertEqual(message_id, "<45223423@example.com>")
self._check_article_data(lines)
# ARTICLE id
resp, info = self.server.article("<45223423@example.com>")
self.assertEqual(resp, "220 0 <45223423@example.com>")
art_num, message_id, lines = info
self.assertEqual(art_num, 0)
self.assertEqual(message_id, "<45223423@example.com>")
self._check_article_data(lines)
# Non-existent id
with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
self.server.article("<non-existent@example.com>")
self.assertEqual(cm.exception.response, "430 No Such Article Found")
def test_article_file(self):
# With a "file" argument
f = io.BytesIO()
resp, info = self.server.article(file=f)
self.assertEqual(resp, "220 3000237 <45223423@example.com>")
art_num, message_id, lines = info
self.assertEqual(art_num, 3000237)
self.assertEqual(message_id, "<45223423@example.com>")
self.assertEqual(lines, [])
data = f.getvalue()
self.assertTrue(data.startswith(
b'From: "Demo User" <nobody@example.net>\r\n'
b'Subject: I am just a test article\r\n'
), ascii(data))
self.assertTrue(data.endswith(
b'This is just a test article.\r\n'
b'.Here is a dot-starting line.\r\n'
b'\r\n'
b'-- Signed by Andr\xc3\xa9.\r\n'
), ascii(data))
def test_head(self):
# HEAD
resp, info = self.server.head()
self.assertEqual(resp, "221 3000237 <45223423@example.com>")
art_num, message_id, lines = info
self.assertEqual(art_num, 3000237)
self.assertEqual(message_id, "<45223423@example.com>")
self._check_article_head(lines)
# HEAD num
resp, info = self.server.head(3000234)
self.assertEqual(resp, "221 3000234 <45223423@example.com>")
art_num, message_id, lines = info
self.assertEqual(art_num, 3000234)
self.assertEqual(message_id, "<45223423@example.com>")
self._check_article_head(lines)
# HEAD id
resp, info = self.server.head("<45223423@example.com>")
self.assertEqual(resp, "221 0 <45223423@example.com>")
art_num, message_id, lines = info
self.assertEqual(art_num, 0)
self.assertEqual(message_id, "<45223423@example.com>")
self._check_article_head(lines)
# Non-existent id
with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
self.server.head("<non-existent@example.com>")
self.assertEqual(cm.exception.response, "430 No Such Article Found")
def test_body(self):
# BODY
resp, info = self.server.body()
self.assertEqual(resp, "222 3000237 <45223423@example.com>")
art_num, message_id, lines = info
self.assertEqual(art_num, 3000237)
self.assertEqual(message_id, "<45223423@example.com>")
self._check_article_body(lines)
# BODY num
resp, info = self.server.body(3000234)
self.assertEqual(resp, "222 3000234 <45223423@example.com>")
art_num, message_id, lines = info
self.assertEqual(art_num, 3000234)
self.assertEqual(message_id, "<45223423@example.com>")
self._check_article_body(lines)
# BODY id
resp, info = self.server.body("<45223423@example.com>")
self.assertEqual(resp, "222 0 <45223423@example.com>")
art_num, message_id, lines = info
self.assertEqual(art_num, 0)
self.assertEqual(message_id, "<45223423@example.com>")
self._check_article_body(lines)
# Non-existent id
with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
self.server.body("<non-existent@example.com>")
self.assertEqual(cm.exception.response, "430 No Such Article Found")
def check_over_xover_resp(self, resp, overviews):
self.assertTrue(resp.startswith("224 "), resp)
self.assertEqual(len(overviews), 3)
art_num, over = overviews[0]
self.assertEqual(art_num, 57)
self.assertEqual(over, {
"from": "Doug Hellmann <doug.hellmann-Re5JQEeQqe8AvxtiuMwx3w@public.gmane.org>",
"subject": "Re: ANN: New Plone book with strong Python (and Zope) themes throughout",
"date": "Sat, 19 Jun 2010 18:04:08 -0400",
"message-id": "<4FD05F05-F98B-44DC-8111-C6009C925F0C@gmail.com>",
"references": "<hvalf7$ort$1@dough.gmane.org>",
":bytes": "7103",
":lines": "16",
"xref": "news.gmane.org gmane.comp.python.authors:57"
})
art_num, over = overviews[2]
self.assertEqual(over["subject"],
"Re: Message d'erreur incompréhensible (par moi)")
def test_xover(self):
resp, overviews = self.server.xover(57, 59)
self.check_over_xover_resp(resp, overviews)
def test_over(self):
# In NNTP "v1", this will fallback on XOVER
resp, overviews = self.server.over((57, 59))
self.check_over_xover_resp(resp, overviews)
sample_post = (
b'From: "Demo User" <nobody@example.net>\r\n'
b'Subject: I am just a test article\r\n'
b'Content-Type: text/plain; charset=UTF-8; format=flowed\r\n'
b'Message-ID: <i.am.an.article.you.will.want@example.com>\r\n'
b'\r\n'
b'This is just a test article.\r\n'
b'.Here is a dot-starting line.\r\n'
b'\r\n'
b'-- Signed by Andr\xc3\xa9.\r\n'
)
def _check_posted_body(self):
# Check the raw body as received by the server
lines = self.handler.posted_body
# One additional line for the "." terminator
self.assertEqual(len(lines), 10)
self.assertEqual(lines[-1], b'.\r\n')
self.assertEqual(lines[-2], b'-- Signed by Andr\xc3\xa9.\r\n')
self.assertEqual(lines[-3], b'\r\n')
self.assertEqual(lines[-4], b'..Here is a dot-starting line.\r\n')
self.assertEqual(lines[0], b'From: "Demo User" <nobody@example.net>\r\n')
def _check_post_ihave_sub(self, func, *args, file_factory):
# First the prepared post with CRLF endings
post = self.sample_post
func_args = args + (file_factory(post),)
self.handler.posted_body = None
resp = func(*func_args)
self._check_posted_body()
# Then the same post with "normal" line endings - they should be
# converted by NNTP.post and NNTP.ihave.
post = self.sample_post.replace(b"\r\n", b"\n")
func_args = args + (file_factory(post),)
self.handler.posted_body = None
resp = func(*func_args)
self._check_posted_body()
return resp
def check_post_ihave(self, func, success_resp, *args):
# With a bytes object
resp = self._check_post_ihave_sub(func, *args, file_factory=bytes)
self.assertEqual(resp, success_resp)
# With a bytearray object
resp = self._check_post_ihave_sub(func, *args, file_factory=bytearray)
self.assertEqual(resp, success_resp)
# With a file object
resp = self._check_post_ihave_sub(func, *args, file_factory=io.BytesIO)
self.assertEqual(resp, success_resp)
# With an iterable of terminated lines
def iterlines(b):
return iter(b.splitlines(True))
resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
self.assertEqual(resp, success_resp)
# With an iterable of non-terminated lines
def iterlines(b):
return iter(b.splitlines(False))
resp = self._check_post_ihave_sub(func, *args, file_factory=iterlines)
self.assertEqual(resp, success_resp)
def test_post(self):
self.check_post_ihave(self.server.post, "240 Article received OK")
self.handler.allow_posting = False
with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
self.server.post(self.sample_post)
self.assertEqual(cm.exception.response,
"440 Posting not permitted")
def test_ihave(self):
self.check_post_ihave(self.server.ihave, "235 Article transferred OK",
"<i.am.an.article.you.will.want@example.com>")
with self.assertRaises(nntplib.NNTPTemporaryError) as cm:
self.server.ihave("<another.message.id>", self.sample_post)
self.assertEqual(cm.exception.response,
"435 Article not wanted")
class NNTPv1Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
"""Tests an NNTP v1 server (no capabilities)."""
nntp_version = 1
handler_class = NNTPv1Handler
def test_caps(self):
caps = self.server.getcapabilities()
self.assertEqual(caps, {})
self.assertEqual(self.server.nntp_version, 1)
class NNTPv2Tests(NNTPv1v2TestsMixin, MockedNNTPTestsMixin, unittest.TestCase):
"""Tests an NNTP v2 server (with capabilities)."""
nntp_version = 2
handler_class = NNTPv2Handler
def test_caps(self):
caps = self.server.getcapabilities()
self.assertEqual(caps, {
'VERSION': ['2'],
'IMPLEMENTATION': ['INN', '2.5.1'],
'AUTHINFO': ['USER'],
'HDR': [],
'LIST': ['ACTIVE', 'ACTIVE.TIMES', 'DISTRIB.PATS',
'HEADERS', 'NEWSGROUPS', 'OVERVIEW.FMT'],
'OVER': [],
'POST': [],
'READER': [],
})
self.assertEqual(self.server.nntp_version, 2)
class MiscTests(unittest.TestCase):
def test_decode_header(self):
def gives(a, b):
self.assertEqual(nntplib.decode_header(a), b)
gives("" , "")
gives("a plain header", "a plain header")
gives(" with extra spaces ", " with extra spaces ")
gives("=?ISO-8859-15?Q?D=E9buter_en_Python?=", "Débuter en Python")
gives("=?utf-8?q?Re=3A_=5Bsqlite=5D_probl=C3=A8me_avec_ORDER_BY_sur_des_cha?="
" =?utf-8?q?=C3=AEnes_de_caract=C3=A8res_accentu=C3=A9es?=",
"Re: [sqlite] problème avec ORDER BY sur des chaînes de caractères accentuées")
gives("Re: =?UTF-8?B?cHJvYmzDqG1lIGRlIG1hdHJpY2U=?=",
"Re: problème de matrice")
# A natively utf-8 header (found in the real world!)
gives("Re: Message d'erreur incompréhensible (par moi)",
"Re: Message d'erreur incompréhensible (par moi)")
def test_parse_overview_fmt(self):
# The minimal (default) response
lines = ["Subject:", "From:", "Date:", "Message-ID:",
"References:", ":bytes", ":lines"]
self.assertEqual(nntplib._parse_overview_fmt(lines),
["subject", "from", "date", "message-id", "references",
":bytes", ":lines"])
# The minimal response using alternative names
lines = ["Subject:", "From:", "Date:", "Message-ID:",
"References:", "Bytes:", "Lines:"]
self.assertEqual(nntplib._parse_overview_fmt(lines),
["subject", "from", "date", "message-id", "references",
":bytes", ":lines"])
# Variations in casing
lines = ["subject:", "FROM:", "DaTe:", "message-ID:",
"References:", "BYTES:", "Lines:"]
self.assertEqual(nntplib._parse_overview_fmt(lines),
["subject", "from", "date", "message-id", "references",
":bytes", ":lines"])
# First example from RFC 3977
lines = ["Subject:", "From:", "Date:", "Message-ID:",
"References:", ":bytes", ":lines", "Xref:full",
"Distribution:full"]
self.assertEqual(nntplib._parse_overview_fmt(lines),
["subject", "from", "date", "message-id", "references",
":bytes", ":lines", "xref", "distribution"])
# Second example from RFC 3977
lines = ["Subject:", "From:", "Date:", "Message-ID:",
"References:", "Bytes:", "Lines:", "Xref:FULL",
"Distribution:FULL"]
self.assertEqual(nntplib._parse_overview_fmt(lines),
["subject", "from", "date", "message-id", "references",
":bytes", ":lines", "xref", "distribution"])
# A classic response from INN
lines = ["Subject:", "From:", "Date:", "Message-ID:",
"References:", "Bytes:", "Lines:", "Xref:full"]
self.assertEqual(nntplib._parse_overview_fmt(lines),
["subject", "from", "date", "message-id", "references",
":bytes", ":lines", "xref"])
def test_parse_overview(self):
fmt = nntplib._DEFAULT_OVERVIEW_FMT + ["xref"]
# First example from RFC 3977
lines = [
'3000234\tI am just a test article\t"Demo User" '
'<nobody@example.com>\t6 Oct 1998 04:38:40 -0500\t'
'<45223423@example.com>\t<45454@example.net>\t1234\t'
'17\tXref: news.example.com misc.test:3000363',
]
overview = nntplib._parse_overview(lines, fmt)
(art_num, fields), = overview
self.assertEqual(art_num, 3000234)
self.assertEqual(fields, {
'subject': 'I am just a test article',
'from': '"Demo User" <nobody@example.com>',
'date': '6 Oct 1998 04:38:40 -0500',
'message-id': '<45223423@example.com>',
'references': '<45454@example.net>',
':bytes': '1234',
':lines': '17',
'xref': 'news.example.com misc.test:3000363',
})
def test_parse_datetime(self):
def gives(a, b, *c):
self.assertEqual(nntplib._parse_datetime(a, b),
datetime.datetime(*c))
# Output of DATE command
gives("19990623135624", None, 1999, 6, 23, 13, 56, 24)
# Variations
gives("19990623", "135624", 1999, 6, 23, 13, 56, 24)
gives("990623", "135624", 1999, 6, 23, 13, 56, 24)
gives("090623", "135624", 2009, 6, 23, 13, 56, 24)
def test_unparse_datetime(self):
# Test non-legacy mode
# 1) with a datetime
def gives(y, M, d, h, m, s, date_str, time_str):
dt = datetime.datetime(y, M, d, h, m, s)
self.assertEqual(nntplib._unparse_datetime(dt),
(date_str, time_str))
self.assertEqual(nntplib._unparse_datetime(dt, False),
(date_str, time_str))
gives(1999, 6, 23, 13, 56, 24, "19990623", "135624")
gives(2000, 6, 23, 13, 56, 24, "20000623", "135624")
gives(2010, 6, 5, 1, 2, 3, "20100605", "010203")
# 2) with a date
def gives(y, M, d, date_str, time_str):
dt = datetime.date(y, M, d)
self.assertEqual(nntplib._unparse_datetime(dt),
(date_str, time_str))
self.assertEqual(nntplib._unparse_datetime(dt, False),
(date_str, time_str))
gives(1999, 6, 23, "19990623", "000000")
gives(2000, 6, 23, "20000623", "000000")
gives(2010, 6, 5, "20100605", "000000")
def test_unparse_datetime_legacy(self):
# Test legacy mode (RFC 977)
# 1) with a datetime
def gives(y, M, d, h, m, s, date_str, time_str):
dt = datetime.datetime(y, M, d, h, m, s)
self.assertEqual(nntplib._unparse_datetime(dt, True),
(date_str, time_str))
gives(1999, 6, 23, 13, 56, 24, "990623", "135624")
gives(2000, 6, 23, 13, 56, 24, "000623", "135624")
gives(2010, 6, 5, 1, 2, 3, "100605", "010203")
# 2) with a date
def gives(y, M, d, date_str, time_str):
dt = datetime.date(y, M, d)
self.assertEqual(nntplib._unparse_datetime(dt, True),
(date_str, time_str))
gives(1999, 6, 23, "990623", "000000")
gives(2000, 6, 23, "000623", "000000")
gives(2010, 6, 5, "100605", "000000")
def test_main():
support.run_unittest(MiscTests, NNTPv1Tests, NNTPv2Tests,
NetworkedNNTPTests
)
if __name__ == "__main__":
test_main()
......@@ -76,6 +76,10 @@ Core and Builtins
Library
-------
- Issue #9360: Cleanup and improvements to the nntplib module. The API
now conforms to the philosophy of bytes and unicode separation in Python 3.
A test suite has also been added.
- Issue #9962: GzipFile now has the peek() method.
- Issue #9090: When a socket with a timeout fails with EWOULDBLOCK or EAGAIN,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment