Commit 95781f98 authored by Facundo Batista's avatar Facundo Batista

Added tests for asynchat classes simple_producer & fifo, and the

find_prefix_at_end function. Check behavior of a string given as a
producer.  Added tests for behavior of asynchat.async_chat when given
int, long, and None terminator arguments. Added usepoll attribute to
TestAsynchat to allow running the asynchat tests with poll support
chosen whether it's available or not (improves coverage of asyncore
code). [GSoC - Alan McIntyre]
parent d8d47aff
......@@ -7,8 +7,12 @@ from test import test_support
HOST = "127.0.0.1"
PORT = 54322
SERVER_QUIT = 'QUIT\n'
class echo_server(threading.Thread):
# parameter to determine the number of bytes passed back to the
# client each send
chunk_size = 1
def run(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
......@@ -17,15 +21,28 @@ class echo_server(threading.Thread):
PORT = test_support.bind_port(sock, HOST, PORT)
sock.listen(1)
conn, client = sock.accept()
buffer = ""
while "\n" not in buffer:
self.buffer = ""
# collect data until quit message is seen
while SERVER_QUIT not in self.buffer:
data = conn.recv(1)
if not data:
break
buffer = buffer + data
while buffer:
n = conn.send(buffer)
buffer = buffer[n:]
self.buffer = self.buffer + data
# remove the SERVER_QUIT message
self.buffer = self.buffer.replace(SERVER_QUIT, '')
# re-send entire set of collected data
try:
# this may fail on some tests, such as test_close_when_done, since
# the client closes the channel when it's done sending
while self.buffer:
n = conn.send(self.buffer[:self.chunk_size])
time.sleep(0.001)
self.buffer = self.buffer[n:]
except:
pass
conn.close()
sock.close()
......@@ -33,61 +50,197 @@ class echo_client(asynchat.async_chat):
def __init__(self, terminator):
asynchat.async_chat.__init__(self)
self.contents = None
self.contents = []
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect((HOST, PORT))
self.set_terminator(terminator)
self.buffer = ""
self.buffer = ''
def handle_connect(self):
pass
##print "Connected"
def collect_incoming_data(self, data):
self.buffer = self.buffer + data
self.buffer += data
def found_terminator(self):
#print "Received:", repr(self.buffer)
self.contents = self.buffer
self.contents.append(self.buffer)
self.buffer = ""
self.close()
class TestAsynchat(unittest.TestCase):
usepoll = False
def setUp (self):
pass
def tearDown (self):
pass
def test_line_terminator(self):
def line_terminator_check(self, term, server_chunk):
s = echo_server()
s.chunk_size = server_chunk
s.start()
time.sleep(1) # Give server time to initialize
c = echo_client('\n')
time.sleep(0.5) # Give server time to initialize
c = echo_client(term)
c.push("hello ")
c.push("world\n")
asyncore.loop()
c.push("world%s" % term)
c.push("I'm not dead yet!%s" % term)
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5)
s.join()
self.assertEqual(c.contents, 'hello world')
self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
# the line terminator tests below check receiving variously-sized
# chunks back from the server in order to exercise all branches of
# async_chat.handle_read
def test_numeric_terminator(self):
def test_line_terminator1(self):
# test one-character terminator
for l in (1,2,3):
self.line_terminator_check('\n', l)
def test_line_terminator2(self):
# test two-character terminator
for l in (1,2,3):
self.line_terminator_check('\r\n', l)
def test_line_terminator3(self):
# test three-character terminator
for l in (1,2,3):
self.line_terminator_check('qqq', l)
def numeric_terminator_check(self, termlen):
# Try reading a fixed number of bytes
s = echo_server()
s.start()
time.sleep(1) # Give server time to initialize
c = echo_client(6L)
c.push("hello ")
c.push("world\n")
asyncore.loop()
time.sleep(0.5) # Give server time to initialize
c = echo_client(termlen)
data = "hello world, I'm not dead yet!\n"
c.push(data)
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5)
s.join()
self.assertEqual(c.contents, [data[:termlen]])
def test_numeric_terminator1(self):
# check that ints & longs both work (since type is
# explicitly checked in async_chat.handle_read)
self.numeric_terminator_check(1)
self.numeric_terminator_check(1L)
def test_numeric_terminator2(self):
self.numeric_terminator_check(6L)
def test_none_terminator(self):
# Try reading a fixed number of bytes
s = echo_server()
s.start()
time.sleep(0.5) # Give server time to initialize
c = echo_client(None)
data = "hello world, I'm not dead yet!\n"
c.push(data)
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5)
s.join()
self.assertEqual(c.contents, [])
self.assertEqual(c.buffer, data)
def test_simple_producer(self):
s = echo_server()
s.start()
time.sleep(0.5) # Give server time to initialize
c = echo_client('\n')
data = "hello world\nI'm not dead yet!\n"
p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
c.push_with_producer(p)
asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5)
s.join()
self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
def test_string_producer(self):
s = echo_server()
s.start()
time.sleep(0.5) # Give server time to initialize
c = echo_client('\n')
data = "hello world\nI'm not dead yet!\n"
c.push_with_producer(data+SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5)
s.join()
self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
def test_empty_line(self):
# checks that empty lines are handled correctly
s = echo_server()
s.start()
time.sleep(0.5) # Give server time to initialize
c = echo_client('\n')
c.push("hello world\n\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll)
s.join()
self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"])
def test_close_when_done(self):
s = echo_server()
s.start()
time.sleep(0.5) # Give server time to initialize
c = echo_client('\n')
c.push("hello world\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
c.close_when_done()
asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5)
s.join()
self.assertEqual(c.contents, 'hello ')
self.assertEqual(c.contents, [])
# the server might have been able to send a byte or two back, but this
# at least checks that it received something and didn't just fail
# (which could still result in the client not having received anything)
self.assertTrue(len(s.buffer) > 0)
class TestAsynchat_WithPoll(TestAsynchat):
usepoll = True
class TestHelperFunctions(unittest.TestCase):
def test_find_prefix_at_end(self):
self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
class TestFifo(unittest.TestCase):
def test_basic(self):
f = asynchat.fifo()
f.push(7)
f.push('a')
self.assertEqual(len(f), 2)
self.assertEqual(f.first(), 7)
self.assertEqual(f.pop(), (1, 7))
self.assertEqual(len(f), 1)
self.assertEqual(f.first(), 'a')
self.assertEqual(f.is_empty(), False)
self.assertEqual(f.pop(), (1, 'a'))
self.assertEqual(len(f), 0)
self.assertEqual(f.is_empty(), True)
self.assertEqual(f.pop(), (0, None))
def test_given_list(self):
f = asynchat.fifo(['x', 17, 3])
self.assertEqual(len(f), 3)
self.assertEqual(f.pop(), (1, 'x'))
self.assertEqual(f.pop(), (1, 17))
self.assertEqual(f.pop(), (1, 3))
self.assertEqual(f.pop(), (0, None))
def test_main(verbose=None):
test_support.run_unittest(TestAsynchat)
test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll,
TestHelperFunctions, TestFifo)
if __name__ == "__main__":
test_main(verbose=True)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment