test_asynchat.py 10.9 KB
Newer Older
1 2 3 4 5 6
# test asynchat

from test import support

# If this fails, the test will be skipped.
thread = support.import_module('_thread')
7

Victor Stinner's avatar
Victor Stinner committed
8 9
import asynchat
import asyncore
10
import errno
Victor Stinner's avatar
Victor Stinner committed
11
import socket
12
import sys
Victor Stinner's avatar
Victor Stinner committed
13 14
import time
import unittest
15
import unittest.mock
16 17 18 19
try:
    import threading
except ImportError:
    threading = None
20

21
# Address the test client connects to (provided by test.support).
HOST = support.HOST
# Message a client sends to tell echo_server to stop collecting data and
# start echoing it back.
SERVER_QUIT = b'QUIT\n'
# Upper bound, in seconds, on how long a test waits when joining the
# echo server thread.
TIMEOUT = 3.0
24

25 26 27 28 29 30 31 32 33 34 35
if threading:
    class echo_server(threading.Thread):
        """Single-connection echo server running in its own thread.

        The server accepts one client, collects everything it sends until
        SERVER_QUIT shows up in the stream (or the peer closes), strips
        SERVER_QUIT from the collected data and sends the remainder back,
        ``chunk_size`` bytes per send() call.  The collected data stays
        available to the test as ``self.buffer``.
        """

        # parameter to determine the number of bytes passed back to the
        # client each send
        chunk_size = 1

        def __init__(self, event):
            """Bind a listening socket; *event* is set once listen() is done."""
            threading.Thread.__init__(self)
            self.event = event
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.port = support.bind_port(self.sock)
            # This will be set if the client wants us to wait before echoing
            # data back.
            self.start_resend_event = None

        def run(self):
            """Serve exactly one client, then close both sockets."""
            try:
                self.sock.listen(1)
                self.event.set()
                conn, _ = self.sock.accept()
                try:
                    self._serve(conn)
                finally:
                    conn.close()
            finally:
                # Always release the listening socket, even when accept()
                # or the conversation with the client raised.
                self.sock.close()

        def _serve(self, conn):
            """Collect data from *conn* until SERVER_QUIT, then echo it back."""
            self.buffer = b""
            # collect data until quit message is seen
            while SERVER_QUIT not in self.buffer:
                data = conn.recv(1)
                if not data:
                    break
                self.buffer += data

            # remove the SERVER_QUIT message
            self.buffer = self.buffer.replace(SERVER_QUIT, b'')

            if self.start_resend_event:
                self.start_resend_event.wait()

            # re-send entire set of collected data
            try:
                # this may fail on some tests, such as test_close_when_done,
                # since the client closes the channel when it's done sending
                while self.buffer:
                    n = conn.send(self.buffer[:self.chunk_size])
                    time.sleep(0.001)
                    self.buffer = self.buffer[n:]
            except OSError:
                # Best effort only: the peer may already be gone.  Anything
                # other than a socket error is a real bug and must surface.
                pass

    class echo_client(asynchat.async_chat):
        """async_chat client that records every terminator-delimited chunk.

        Incoming data is accumulated in ``self.buffer``; each time the
        terminator is found the accumulated bytes are appended to
        ``self.contents`` and the buffer is reset.
        """

        def __init__(self, terminator, server_port):
            """Connect to (HOST, server_port) and install *terminator*."""
            asynchat.async_chat.__init__(self)
            self.contents = []
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            self.connect((HOST, server_port))
            self.set_terminator(terminator)
            self.buffer = b""

        # These handlers must live at class level: defined inside __init__
        # they would only be unused local functions and would never
        # override the dispatcher's default handlers.
        def handle_connect(self):
            pass

        if sys.platform == 'darwin':
            # select.poll returns a select.POLLHUP at the end of the tests
            # on darwin, so just ignore it
            def handle_expt(self):
                pass

        def collect_incoming_data(self, data):
            self.buffer += data

        def found_terminator(self):
            self.contents.append(self.buffer)
            self.buffer = b""

    def start_echo_server():
        """Start an echo_server thread and wait until it is listening.

        Returns a ``(server, event)`` tuple; the event has been cleared
        again after the server signalled that listen() completed.
        """
        event = threading.Event()
        s = echo_server(event)
        s.start()
        event.wait()
        event.clear()
        time.sleep(0.01)   # Give server time to start accepting.
        return s, event


@unittest.skipUnless(threading, 'Threading required for this test.')
class TestAsynchat(unittest.TestCase):
    """End-to-end tests driving an echo_client against an echo_server thread.

    Each test pushes data followed by SERVER_QUIT, runs the asyncore loop
    with ``use_poll=self.usepoll`` and checks what the client collected.
    """

    usepoll = False

    def setUp(self):
        self._threads = support.threading_setup()

    def tearDown(self):
        support.threading_cleanup(*self._threads)

    def _run_loop(self):
        """Run the asyncore loop for a bounded number of iterations."""
        asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)

    def _join_server(self, server):
        """Join the server thread, failing the test if it is still alive."""
        server.join(timeout=TIMEOUT)
        if server.is_alive():
            self.fail("join() timed out")

    def line_terminator_check(self, term, server_chunk):
        # The server is started by hand (not via start_echo_server) because
        # chunk_size has to be set before the thread starts running.
        event = threading.Event()
        s = echo_server(event)
        s.chunk_size = server_chunk
        s.start()
        event.wait()
        event.clear()
        time.sleep(0.01)   # Give server time to start accepting.
        c = echo_client(term, s.port)
        c.push(b"hello ")
        c.push(b"world" + term)
        c.push(b"I'm not dead yet!" + term)
        c.push(SERVER_QUIT)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    # the line terminator tests below check receiving variously-sized
    # chunks back from the server in order to exercise all branches of
    # async_chat.handle_read

    def test_line_terminator1(self):
        # test one-character terminator
        for chunk_size in (1, 2, 3):
            self.line_terminator_check(b'\n', chunk_size)

    def test_line_terminator2(self):
        # test two-character terminator
        for chunk_size in (1, 2, 3):
            self.line_terminator_check(b'\r\n', chunk_size)

    def test_line_terminator3(self):
        # test three-character terminator
        for chunk_size in (1, 2, 3):
            self.line_terminator_check(b'qqq', chunk_size)

    def numeric_terminator_check(self, termlen):
        # Try reading a fixed number of bytes
        s, event = start_echo_server()
        c = echo_client(termlen, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents, [data[:termlen]])

    def test_numeric_terminator1(self):
        # check that ints & longs both work (since type is
        # explicitly checked in async_chat.handle_read)
        self.numeric_terminator_check(1)

    def test_numeric_terminator2(self):
        self.numeric_terminator_check(6)

    def test_none_terminator(self):
        # With a None terminator nothing is ever delimited: all echoed data
        # must end up in the client's buffer and contents must stay empty.
        s, event = start_echo_server()
        c = echo_client(None, s.port)
        data = b"hello world, I'm not dead yet!\n"
        c.push(data)
        c.push(SERVER_QUIT)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents, [])
        self.assertEqual(c.buffer, data)

    def test_simple_producer(self):
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
        c.push_with_producer(p)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_string_producer(self):
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b"hello world\nI'm not dead yet!\n"
        c.push_with_producer(data+SERVER_QUIT)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents, [b"hello world", b"I'm not dead yet!"])

    def test_empty_line(self):
        # checks that empty lines are handled correctly
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\n\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        self._run_loop()
        self._join_server(s)

        self.assertEqual(c.contents,
                         [b"hello world", b"", b"I'm not dead yet!"])

    def test_close_when_done(self):
        s, event = start_echo_server()
        s.start_resend_event = threading.Event()
        c = echo_client(b'\n', s.port)
        c.push(b"hello world\nI'm not dead yet!\n")
        c.push(SERVER_QUIT)
        c.close_when_done()
        self._run_loop()

        # Only allow the server to start echoing data back to the client after
        # the client has closed its connection.  This prevents a race condition
        # where the server echoes all of its data before we can check that it
        # got any down below.
        s.start_resend_event.set()
        self._join_server(s)

        self.assertEqual(c.contents, [])
        # the server might have been able to send a byte or two back, but this
        # at least checks that it received something and didn't just fail
        # (which could still result in the client not having received anything)
        self.assertGreater(len(s.buffer), 0)

    def test_push(self):
        # Issue #12523: push() should raise a TypeError if it doesn't get
        # a bytes string
        s, event = start_echo_server()
        c = echo_client(b'\n', s.port)
        data = b'bytes\n'
        c.push(data)
        c.push(bytearray(data))
        c.push(memoryview(data))
        self.assertRaises(TypeError, c.push, 10)
        self.assertRaises(TypeError, c.push, 'unicode')
        c.push(SERVER_QUIT)
        self._run_loop()
        # Like every other test, report a server thread that failed to
        # finish instead of silently carrying on after the join timeout.
        self._join_server(s)
        self.assertEqual(c.contents, [b'bytes', b'bytes', b'bytes'])

273 274 275 276

class TestAsynchat_WithPoll(TestAsynchat):
    # Re-runs every inherited TestAsynchat test with use_poll=True passed
    # to asyncore.loop().
    usepoll = True

Victor Stinner's avatar
Victor Stinner committed
277

278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
class TestAsynchatMocked(unittest.TestCase):
    def test_blockingioerror(self):
        # Issue #16133: handle_read() must ignore BlockingIOError
        fake_sock = unittest.mock.Mock()
        fake_sock.recv.side_effect = BlockingIOError(errno.EAGAIN)

        chat = asynchat.async_chat()
        chat.set_socket(fake_sock)
        # set_socket() registered the channel; unregister it afterwards.
        self.addCleanup(chat.del_channel)

        # Replace handle_error so that any call to it can be detected.
        patcher = unittest.mock.patch.object(chat, 'handle_error')
        with patcher as handle_error:
            chat.handle_read()
        self.assertFalse(handle_error.called)


293 294 295 296 297
class TestHelperFunctions(unittest.TestCase):
    def test_find_prefix_at_end(self):
        # Each case pairs a haystack with the expected result of looking
        # for a prefix of "\r\n" at its end.
        cases = (
            ("qwerty\r", 1),
            ("qwertydkjf", 0),
        )
        for haystack, expected in cases:
            found = asynchat.find_prefix_at_end(haystack, "\r\n")
            self.assertEqual(found, expected)

Victor Stinner's avatar
Victor Stinner committed
298

299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321
class TestFifo(unittest.TestCase):
    def test_basic(self):
        # Push two items, then drain the queue while checking len(),
        # first(), is_empty() and the (flag, item) tuples pop() returns.
        queue = asynchat.fifo()
        for item in (7, b'a'):
            queue.push(item)
        self.assertEqual(len(queue), 2)
        self.assertEqual(queue.first(), 7)
        self.assertEqual(queue.pop(), (1, 7))
        self.assertEqual(len(queue), 1)
        self.assertEqual(queue.first(), b'a')
        self.assertEqual(queue.is_empty(), False)
        self.assertEqual(queue.pop(), (1, b'a'))
        self.assertEqual(len(queue), 0)
        self.assertEqual(queue.is_empty(), True)
        self.assertEqual(queue.pop(), (0, None))

    def test_given_list(self):
        # A fifo built from a list hands its items back in list order.
        queue = asynchat.fifo([b'x', 17, 3])
        self.assertEqual(len(queue), 3)
        for expected in (b'x', 17, 3):
            self.assertEqual(queue.pop(), (1, expected))
        self.assertEqual(queue.pop(), (0, None))
322 323


324 325 326 327 328 329 330 331
class TestNotConnected(unittest.TestCase):
    def test_disallow_negative_terminator(self):
        # Issue #11259
        chat = asynchat.async_chat()
        with self.assertRaises(ValueError):
            chat.set_terminator(-1)



332
if __name__ == "__main__":
    # Run every test in this module when the file is executed as a script.
    unittest.main()