Commit d74900eb authored by Josiah Carlson's avatar Josiah Carlson

Committing Py3k version of changelist 64080 and 64257, along with updated tests

for smtpd, which required updating with the new semantics.
parent d51ee54a
......@@ -81,6 +81,12 @@ connection requests.
:exc:`NotImplementedError` exception.
.. method:: async_chat._collect_incoming_data(data)
Sample implementation of a data collection rutine to be used in conjunction
with :meth:`_get_data` in a user-specified :meth:`found_terminator`.
.. method:: async_chat.discard_buffers()
In emergencies this method will discard any data held in the input and/or
......@@ -95,6 +101,12 @@ connection requests.
should be available via an instance attribute.
.. method:: async_chat._get_data()
Will return and clear the data received with the sample
:meth:`_collect_incoming_data` implementation.
.. method:: async_chat.get_terminator()
Returns the current terminator for the channel.
......
......@@ -222,6 +222,21 @@ any that have been added to the map during asynchronous service) is closed.
flushed). Sockets are automatically closed when they are
garbage-collected.
.. class:: file_dispatcher()
A file_dispatcher takes a file descriptor or file object along with an
optional map argument and wraps it for use with the :cfunc:`poll` or
:cfunc:`loop` functions. If provided a file object or anything with a
:cfunc:`fileno` method, that method will be called and passed to the
:class:`file_wrapper` constructor. Availability: UNIX.
.. class:: file_wrapper()
A file_wrapper takes an integer file descriptor and calls :func:`os.dup` to
duplicate the handle so that the original handle may be closed independently
of the file_wrapper. This class implements sufficient methods to emulate a
socket for use by the :class:`file_dispatcher` class. Availability: UNIX.
.. _asyncore-example:
......
......@@ -45,12 +45,23 @@ command will be accumulated (using your own 'collect_incoming_data'
method) up to the terminator, and then control will be returned to
you - by calling your self.found_terminator() method.
"""
import sys
import socket
import asyncore
from collections import deque
def buffer(obj, start=None, stop=None):
# if memoryview objects gain slicing semantics,
# this function will change for the better
# memoryview used for the TypeError
memoryview(obj)
if start == None:
start = 0
if stop == None:
stop = len(obj)
x = obj[start:stop]
## print("buffer type is: %s"%(type(x),))
return x
class async_chat (asyncore.dispatcher):
"""This is an abstract class. You must derive from this class, and add
the two methods collect_incoming_data() and found_terminator()"""
......@@ -60,20 +71,47 @@ class async_chat (asyncore.dispatcher):
ac_in_buffer_size = 4096
ac_out_buffer_size = 4096
# we don't want to enable the use of encoding by default, because that is a
# sign of an application bug that we don't want to pass silently
use_encoding = 0
encoding = 'latin1'
def __init__ (self, conn=None):
# for string terminator matching
self.ac_in_buffer = b''
self.ac_out_buffer = b''
self.producer_fifo = fifo()
# we use a list here rather than cStringIO for a few reasons...
# del lst[:] is faster than sio.truncate(0)
# lst = [] is faster than sio.truncate(0)
# cStringIO will be gaining unicode support in py3k, which
# will negatively affect the performance of bytes compared to
# a ''.join() equivalent
self.incoming = []
# we toss the use of the "simple producer" and replace it with
# a pure deque, which the original fifo was a wrapping of
self.producer_fifo = deque()
asyncore.dispatcher.__init__ (self, conn)
def collect_incoming_data(self, data):
raise NotImplementedError("must be implemented in subclass")
def _collect_incoming_data(self, data):
self.incoming.append(data)
def _get_data(self):
d = b''.join(self.incoming)
del self.incoming[:]
return d
def found_terminator(self):
raise NotImplementedError("must be implemented in subclass")
def set_terminator (self, term):
"Set the input delimiter. Can be a fixed string of any length, an integer, or None"
if isinstance(term, str) and self.use_encoding:
term = bytes(term, self.encoding)
self.terminator = term
def get_terminator (self):
......@@ -92,14 +130,14 @@ class async_chat (asyncore.dispatcher):
self.handle_error()
return
if isinstance(data, str):
data = data.encode('ascii')
self.ac_in_buffer = self.ac_in_buffer + bytes(data)
if isinstance(data, str) and self.use_encoding:
data = bytes(str, self.encoding)
self.ac_in_buffer = self.ac_in_buffer + data
# Continue to search for self.terminator in self.ac_in_buffer,
# while calling self.collect_incoming_data. The while loop
# is necessary because we might read several data+terminator
# combos with a single recv(1024).
# combos with a single recv(4096).
while self.ac_in_buffer:
lb = len(self.ac_in_buffer)
......@@ -108,7 +146,7 @@ class async_chat (asyncore.dispatcher):
# no terminator, collect it all
self.collect_incoming_data (self.ac_in_buffer)
self.ac_in_buffer = b''
elif isinstance(terminator, int) or isinstance(terminator, int):
elif isinstance(terminator, int):
# numeric terminator
n = terminator
if lb < n:
......@@ -129,8 +167,6 @@ class async_chat (asyncore.dispatcher):
# 3) end of buffer does not match any prefix:
# collect data
terminator_len = len(terminator)
if isinstance(terminator, str):
terminator = terminator.encode('ascii')
index = self.ac_in_buffer.find(terminator)
if index != -1:
# we found the terminator
......@@ -155,91 +191,87 @@ class async_chat (asyncore.dispatcher):
self.ac_in_buffer = b''
def handle_write (self):
self.initiate_send ()
self.initiate_send()
def handle_close (self):
self.close()
def push (self, data):
self.producer_fifo.push (simple_producer (data))
sabs = self.ac_out_buffer_size
if len(data) > sabs:
for i in range(0, len(data), sabs):
self.producer_fifo.append(data[i:i+sabs])
else:
self.producer_fifo.append(data)
self.initiate_send()
def push_with_producer (self, producer):
self.producer_fifo.push (producer)
self.producer_fifo.append(producer)
self.initiate_send()
def readable (self):
"predicate for inclusion in the readable for select()"
return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
# cannot use the old predicate, it violates the claim of the
# set_terminator method.
# return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
return 1
def writable (self):
"predicate for inclusion in the writable for select()"
# return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected)
# this is about twice as fast, though not as clear.
return not (
(self.ac_out_buffer == b'') and
self.producer_fifo.is_empty() and
self.connected
)
return self.producer_fifo or (not self.connected)
def close_when_done (self):
"automatically close this channel once the outgoing queue is empty"
self.producer_fifo.push (None)
# refill the outgoing buffer by calling the more() method
# of the first producer in the queue
def refill_buffer (self):
while 1:
if len(self.producer_fifo):
p = self.producer_fifo.first()
# a 'None' in the producer fifo is a sentinel,
# telling us to close the channel.
if p is None:
if not self.ac_out_buffer:
self.producer_fifo.pop()
self.close()
return
elif isinstance(p, str) or isinstance(p, bytes):
if isinstance(p, str):
p = p.encode('ascii')
self.producer_fifo.pop()
self.ac_out_buffer = self.ac_out_buffer + p
self.producer_fifo.append(None)
def initiate_send(self):
while self.producer_fifo and self.connected:
first = self.producer_fifo[0]
# handle empty string/buffer or None entry
if not first:
del self.producer_fifo[0]
if first is None:
## print("first is None")
self.handle_close()
return
data = p.more()
## print("first is not None")
# handle classic producer behavior
obs = self.ac_out_buffer_size
try:
data = buffer(first, 0, obs)
except TypeError:
data = first.more()
if data:
if isinstance(data, str):
data = data.encode('ascii')
self.ac_out_buffer = self.ac_out_buffer + bytes(data)
return
self.producer_fifo.appendleft(data)
else:
self.producer_fifo.pop()
else:
return
del self.producer_fifo[0]
continue
def initiate_send (self):
obs = self.ac_out_buffer_size
# try to refill the buffer
if (len (self.ac_out_buffer) < obs):
self.refill_buffer()
if isinstance(data, str) and self.use_encoding:
data = bytes(data, self.encoding)
if self.ac_out_buffer and self.connected:
# try to send the buffer
# send the data
try:
num_sent = self.send (self.ac_out_buffer[:obs])
if num_sent:
self.ac_out_buffer = self.ac_out_buffer[num_sent:]
except socket.error as why:
num_sent = self.send(data)
except socket.error:
self.handle_error()
return
if num_sent:
if num_sent < len(data) or obs < len(first):
self.producer_fifo[0] = first[num_sent:]
else:
del self.producer_fifo[0]
# we tried to send some actual data
return
def discard_buffers (self):
# Emergencies only!
self.ac_in_buffer = b''
self.ac_out_buffer = b''
while self.producer_fifo:
self.producer_fifo.pop()
del self.incoming[:]
self.producer_fifo.clear()
class simple_producer:
......
This diff is collapsed.
......@@ -124,11 +124,11 @@ class SMTPChannel(asynchat.async_chat):
self.__peer = conn.getpeername()
print('Peer:', repr(self.__peer), file=DEBUGSTREAM)
self.push('220 %s %s' % (self.__fqdn, __version__))
self.set_terminator('\r\n')
self.set_terminator(b'\r\n')
# Overrides base class for convenience
def push(self, msg):
asynchat.async_chat.push(self, msg + '\r\n')
asynchat.async_chat.push(self, bytes(msg + '\r\n', 'ascii'))
# Implementation of base class abstract method
def collect_incoming_data(self, data):
......@@ -177,7 +177,7 @@ class SMTPChannel(asynchat.async_chat):
self.__rcpttos = []
self.__mailfrom = None
self.__state = self.COMMAND
self.set_terminator('\r\n')
self.set_terminator(b'\r\n')
if not status:
self.push('250 Ok')
else:
......@@ -264,7 +264,7 @@ class SMTPChannel(asynchat.async_chat):
self.push('501 Syntax: DATA')
return
self.__state = self.DATA
self.set_terminator('\r\n.\r\n')
self.set_terminator(b'\r\n.\r\n')
self.push('354 End data with <CR><LF>.<CR><LF>')
......
......@@ -105,8 +105,8 @@ class TestAsynchat(unittest.TestCase):
time.sleep(0.01) # Give server time to start accepting.
c = echo_client(term, s.port)
c.push(b"hello ")
c.push(bytes("world%s" % term, "ascii"))
c.push(bytes("I'm not dead yet!%s" % term, "ascii"))
c.push(b"world" + term)
c.push(b"I'm not dead yet!" + term)
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
......@@ -120,17 +120,17 @@ class TestAsynchat(unittest.TestCase):
def test_line_terminator1(self):
# test one-character terminator
for l in (1,2,3):
self.line_terminator_check('\n', l)
self.line_terminator_check(b'\n', l)
def test_line_terminator2(self):
# test two-character terminator
for l in (1,2,3):
self.line_terminator_check('\r\n', l)
self.line_terminator_check(b'\r\n', l)
def test_line_terminator3(self):
# test three-character terminator
for l in (1,2,3):
self.line_terminator_check('qqq', l)
self.line_terminator_check(b'qqq', l)
def numeric_terminator_check(self, termlen):
# Try reading a fixed number of bytes
......@@ -190,7 +190,7 @@ class TestAsynchat(unittest.TestCase):
# checks that empty lines are handled correctly
s, event = start_echo_server()
c = echo_client(b'\n', s.port)
c.push("hello world\n\nI'm not dead yet!\n")
c.push(b"hello world\n\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
......@@ -201,7 +201,7 @@ class TestAsynchat(unittest.TestCase):
def test_close_when_done(self):
s, event = start_echo_server()
c = echo_client(b'\n', s.port)
c.push("hello world\nI'm not dead yet!\n")
c.push(b"hello world\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
c.close_when_done()
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
......
......@@ -28,6 +28,9 @@ class dummychannel:
def __init__(self):
self.socket = dummysocket()
def close(self):
self.socket.close()
class exitingdummy:
def __init__(self):
pass
......@@ -382,8 +385,8 @@ if hasattr(asyncore, 'file_wrapper'):
fd = os.open(TESTFN, os.O_RDONLY)
w = asyncore.file_wrapper(fd)
self.assertEqual(w.fd, fd)
self.assertEqual(w.fileno(), fd)
self.assertNotEqual(w.fd, fd)
self.assertNotEqual(w.fileno(), fd)
self.assertEqual(w.recv(13), b"It's not dead")
self.assertEqual(w.read(6), b", it's")
w.close()
......
......@@ -14,6 +14,14 @@ from test import support
HOST = support.HOST
if sys.platform == 'darwin':
# select.poll returns a select.POLLHUP at the end of the tests
# on darwin, so just ignore it
def handle_expt(self):
pass
smtpd.SMTPChannel.handle_expt = handle_expt
def server(evt, buf, serv):
serv.listen(5)
evt.set()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment