# -*- Mode: Python; tab-width: 4 -*-
#       Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
#       Author: Sam Rushing <rushing@nightmare.com>

# ======================================================================
# Copyright 1996 by Sam Rushing
#
#                         All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================

r"""A class supporting chat-style (command/response) protocols.

This class adds support for 'chat' style protocols - where one side
sends a 'command', and the other sends a response (examples would be
the common internet protocols - smtp, nntp, ftp, etc.).

The handle_read() method looks at the input stream for the current
'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
for multi-line output), calling self.found_terminator() on its
receipt.

For example:
Say you build an async nntp client using this class.  At the start
of the connection, you'll have self.terminator set to '\r\n', in
order to process the single-line greeting.  Just before issuing a
'LIST' command you'll set it to '\r\n.\r\n'.  The output of the LIST
command will be accumulated (using your own 'collect_incoming_data'
method) up to the terminator, and then control will be returned to
you - by calling your self.found_terminator() method.
"""
import socket
import asyncore
from collections import deque


def buffer(obj, start=None, stop=None):
    """Return the slice obj[start:stop], requiring a buffer-protocol object.

    *start* defaults to 0 and *stop* defaults to len(obj).  Objects that
    do not support the buffer protocol (for example str, or producer
    objects) raise TypeError.  async_chat.initiate_send() relies on that
    TypeError to tell raw outgoing data apart from producer objects.
    """
    # memoryview() is called only for the TypeError it raises on
    # non-buffer objects; the view itself is discarded.  If memoryview
    # objects gain slicing semantics, this function can change for the
    # better.
    memoryview(obj)
    if start is None:
        start = 0
    if stop is None:
        stop = len(obj)
    return obj[start:stop]

class async_chat (asyncore.dispatcher):
    """Abstract base class for chat-style (command/response) channels.

    You must derive from this class and add the two methods
    collect_incoming_data() and found_terminator().  Incoming data is
    split on the current terminator (see set_terminator()).  Outgoing
    data is queued with push() / push_with_producer() and sent by
    initiate_send(), which push() and handle_write() both call.
    """

    # these are overridable defaults

    ac_in_buffer_size       = 4096
    ac_out_buffer_size      = 4096

    # we don't want to enable the use of encoding by default, because that is a
    # sign of an application bug that we don't want to pass silently

    use_encoding            = 0
    encoding                = 'latin1'

    def __init__ (self, sock=None, map=None):
        # for string terminator matching
        self.ac_in_buffer = b''

        # we use a list here rather than a StringIO for a few reasons...
        # del lst[:] is faster than sio.truncate(0)
        # lst = [] is faster than sio.truncate(0)
        # a b''.join() over a list performs well for bytes
        self.incoming = []

        # we toss the use of the "simple producer" and replace it with
        # a pure deque, which the original fifo was a wrapping of
        self.producer_fifo = deque()
        asyncore.dispatcher.__init__ (self, sock, map)

    def collect_incoming_data(self, data):
        """Handle a chunk of incoming data; must be overridden."""
        raise NotImplementedError("must be implemented in subclass")

    def _collect_incoming_data(self, data):
        """Append *data* to self.incoming (joined later by _get_data())."""
        self.incoming.append(data)

    def _get_data(self):
        """Return everything collected so far as bytes and reset the list."""
        d = b''.join(self.incoming)
        del self.incoming[:]
        return d

    def found_terminator(self):
        """Called when the terminator is reached; must be overridden."""
        raise NotImplementedError("must be implemented in subclass")

    def set_terminator (self, term):
        """Set the input delimiter.

        Can be a fixed string of any length, an integer, or None.  A str
        terminator is converted to bytes only when use_encoding is set.
        """
        if isinstance(term, str) and self.use_encoding:
            term = bytes(term, self.encoding)
        self.terminator = term

    def get_terminator (self):
        """Return the current terminator."""
        return self.terminator

    # grab some more data from the socket,
    # throw it to the collector method,
    # check for the terminator,
    # if found, transition to the next state.

    def handle_read (self):
        """Read from the socket and dispatch the data by terminator.

        Everything up to the terminator goes to collect_incoming_data().
        found_terminator() is called each time the terminator is reached.
        socket.error from recv() is reported through handle_error().
        """

        try:
            data = self.recv (self.ac_in_buffer_size)
        except socket.error:
            self.handle_error()
            return

        # Text is accepted only when the application opted in via
        # use_encoding; encode the received data itself (not the str type).
        if isinstance(data, str) and self.use_encoding:
            data = bytes(data, self.encoding)
        self.ac_in_buffer = self.ac_in_buffer + data

        # Continue to search for self.terminator in self.ac_in_buffer,
        # while calling self.collect_incoming_data.  The while loop
        # is necessary because we might read several data+terminator
        # combos with a single recv(4096).

        while self.ac_in_buffer:
            lb = len(self.ac_in_buffer)
            terminator = self.get_terminator()
            if not terminator:
                # no terminator, collect it all
                self.collect_incoming_data (self.ac_in_buffer)
                self.ac_in_buffer = b''
            elif isinstance(terminator, int):
                # numeric terminator: count down the remaining bytes
                n = terminator
                if lb < n:
                    self.collect_incoming_data (self.ac_in_buffer)
                    self.ac_in_buffer = b''
                    self.terminator = self.terminator - lb
                else:
                    self.collect_incoming_data (self.ac_in_buffer[:n])
                    self.ac_in_buffer = self.ac_in_buffer[n:]
                    self.terminator = 0
                    self.found_terminator()
            else:
                # 3 cases:
                # 1) end of buffer matches terminator exactly:
                #    collect data, transition
                # 2) end of buffer matches some prefix:
                #    collect data to the prefix
                # 3) end of buffer does not match any prefix:
                #    collect data
                terminator_len = len(terminator)
                index = self.ac_in_buffer.find(terminator)
                if index != -1:
                    # we found the terminator
                    if index > 0:
                        # don't bother reporting the empty string (source of subtle bugs)
                        self.collect_incoming_data (self.ac_in_buffer[:index])
                    self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
                    # This does the Right Thing if the terminator is changed here.
                    self.found_terminator()
                else:
                    # check for a prefix of the terminator
                    index = find_prefix_at_end (self.ac_in_buffer, terminator)
                    if index:
                        if index != lb:
                            # we found a prefix, collect up to the prefix
                            self.collect_incoming_data (self.ac_in_buffer[:-index])
                            self.ac_in_buffer = self.ac_in_buffer[-index:]
                        # keep the partial terminator buffered until more data arrives
                        break
                    else:
                        # no prefix, collect it all
                        self.collect_incoming_data (self.ac_in_buffer)
                        self.ac_in_buffer = b''

    def handle_write (self):
        """Socket is writable: try to send queued output."""
        self.initiate_send()

    def handle_close (self):
        """Close the channel."""
        self.close()

    def push (self, data):
        """Queue *data* in ac_out_buffer_size pieces, then try to send."""
        sabs = self.ac_out_buffer_size
        if len(data) > sabs:
            for i in range(0, len(data), sabs):
                self.producer_fifo.append(data[i:i+sabs])
        else:
            self.producer_fifo.append(data)
        self.initiate_send()

    def push_with_producer (self, producer):
        """Queue a producer; initiate_send() calls its more() until it is empty."""
        self.producer_fifo.append(producer)
        self.initiate_send()

    def readable (self):
        "predicate for inclusion in the readable for select()"
        # cannot use the old predicate, it violates the claim of the
        # set_terminator method.

        # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
        return 1

    def writable (self):
        "predicate for inclusion in the writable for select()"
        return self.producer_fifo or (not self.connected)

    def close_when_done (self):
        "automatically close this channel once the outgoing queue is empty"
        self.producer_fifo.append(None)

    def initiate_send(self):
        """Send at most one chunk taken from the head of producer_fifo.

        Empty entries are skipped.  A None entry (see close_when_done())
        triggers handle_close().  Producer entries are expanded via
        their more() method.  The loop stops after the first send()
        attempt, when the queue is empty, or when the channel is not
        connected.  socket.error from send() is reported through
        handle_error().
        """
        while self.producer_fifo and self.connected:
            first = self.producer_fifo[0]
            # handle empty string/buffer or None entry
            if not first:
                del self.producer_fifo[0]
                if first is None:
                    self.handle_close()
                    return
                # an empty buffer has nothing to send; move on to the next
                # entry instead of issuing a zero-length send()
                continue

            # Text entries are only allowed with use_encoding.  Encode them
            # in place so that len(first) and first[num_sent:] below count
            # bytes, the same unit send() reports.  Without this, buffer()
            # rejects str and the entry is mistaken for a producer.
            if isinstance(first, str) and self.use_encoding:
                first = bytes(first, self.encoding)
                self.producer_fifo[0] = first

            # handle classic producer behavior
            obs = self.ac_out_buffer_size
            try:
                data = buffer(first, 0, obs)
            except TypeError:
                # not a buffer: treat it as a producer and queue its next
                # chunk (encoded on the next iteration if it is text)
                data = first.more()
                if data:
                    self.producer_fifo.appendleft(data)
                else:
                    del self.producer_fifo[0]
                continue

            # send the data
            try:
                num_sent = self.send(data)
            except socket.error:
                self.handle_error()
                return

            if num_sent:
                if num_sent < len(data) or obs < len(first):
                    # partial send, or the entry is larger than one chunk
                    self.producer_fifo[0] = first[num_sent:]
                else:
                    del self.producer_fifo[0]
            # we tried to send some actual data
            return

    def discard_buffers (self):
        """Drop all buffered input and queued output.  Emergencies only!"""
        self.ac_in_buffer = b''
        del self.incoming[:]
        self.producer_fifo.clear()


class simple_producer:
    """Producer that hands out a fixed payload in chunks of buffer_size.

    Each more() call returns the next slice of at most buffer_size items.
    Once the payload has been handed out, self.data becomes b'' and any
    later call returns b''.
    """

    def __init__(self, data, buffer_size=512):
        self.data = data
        self.buffer_size = buffer_size

    def more(self):
        """Return the next chunk of the payload (b'' once exhausted)."""
        size = self.buffer_size
        if len(self.data) <= size:
            # final (or only) chunk: hand over the remainder, mark exhausted
            chunk, self.data = self.data, b''
        else:
            chunk, self.data = self.data[:size], self.data[size:]
        return chunk


class fifo:
    """Minimal first-in/first-out queue backed by collections.deque.

    pop() reports emptiness with a (flag, item) pair instead of raising.
    """

    def __init__(self, list=None):
        # a missing or empty initial sequence gives an empty queue
        self.list = deque(list) if list else deque()

    def __len__(self):
        return len(self.list)

    def is_empty(self):
        """Return True when the queue holds no items."""
        return len(self.list) == 0

    def first(self):
        """Return the oldest item without removing it (IndexError if empty)."""
        return self.list[0]

    def push(self, data):
        """Append *data* to the back of the queue."""
        self.list.append(data)

    def pop(self):
        """Remove the oldest item; return (1, item), or (0, None) if empty."""
        if not self.list:
            return (0, None)
        return (1, self.list.popleft())


# Given 'haystack', see if any prefix of 'needle' is at its end.  This
# assumes an exact match has already been checked.  Return the number of
# characters matched.
# For example:
# find_prefix_at_end ("qwerty\r", "\r\n") => 1
# find_prefix_at_end ("qwertydkjf", "\r\n") => 0
# find_prefix_at_end ("qwerty\r\n", "\r\n") => <undefined>

# this could maybe be made faster with a computed regex?
# [answer: no; circa Python-2.0, Jan 2001]
# new python:   28961/s
# old python:   18307/s
# re:        12820/s
# regex:     14035/s

def find_prefix_at_end (haystack, needle):
    """Return the length of the longest proper prefix of *needle* ending *haystack*.

    Only prefixes shorter than *needle* are tried, so the caller must
    already have checked for a full match.  Returns 0 when no prefix
    matches, including for an empty or one-character *needle*.  Works for
    str and bytes alike.
    """
    # Try the longest candidate first so the first hit is the answer.
    # range() is empty for len(needle) <= 1, avoiding the -1 the old
    # `while l` loop returned for an empty needle.
    for length in range(len(needle) - 1, 0, -1):
        if haystack.endswith(needle[:length]):
            return length
    return 0