Commit 279a0910 authored by Georg Brandl's avatar Georg Brandl

Convert test_poll to unittest.

parent 8e132d6e
test_poll
Running poll test 1
This is a test.
This is a test.
This is a test.
This is a test.
This is a test.
This is a test.
This is a test.
This is a test.
This is a test.
This is a test.
This is a test.
This is a test.
Poll test 1 complete
Running poll test 2
Poll test 2 complete
Running poll test 3
Poll test 3 complete
# Test case for the os.poll() function # Test case for the os.poll() function
import sys, os, select, random import sys, os, select, random, unittest
from test.test_support import verify, verbose, TestSkipped, TESTFN from test.test_support import TestSkipped, TESTFN, run_unittest
try: try:
select.poll select.poll
...@@ -16,12 +16,12 @@ def find_ready_matching(ready, flag): ...@@ -16,12 +16,12 @@ def find_ready_matching(ready, flag):
match.append(fd) match.append(fd)
return match return match
def test_poll1(): class PollTests(unittest.TestCase):
"""Basic functional test of poll object
def test_poll1(self):
# Basic functional test of poll object
# Create a bunch of pipe and test that poll works with them.
Create a bunch of pipe and test that poll works with them.
"""
print 'Running poll test 1'
p = select.poll() p = select.poll()
NUM_PIPES = 12 NUM_PIPES = 12
...@@ -41,6 +41,8 @@ def test_poll1(): ...@@ -41,6 +41,8 @@ def test_poll1():
r2w[rd] = wr r2w[rd] = wr
w2r[wr] = rd w2r[wr] = rd
bufs = []
while writers: while writers:
ready = p.poll() ready = p.poll()
ready_writers = find_ready_matching(ready, select.POLLOUT) ready_writers = find_ready_matching(ready, select.POLLOUT)
...@@ -55,17 +57,16 @@ def test_poll1(): ...@@ -55,17 +57,16 @@ def test_poll1():
raise RuntimeError, "no pipes ready for reading" raise RuntimeError, "no pipes ready for reading"
rd = random.choice(ready_readers) rd = random.choice(ready_readers)
buf = os.read(rd, MSG_LEN) buf = os.read(rd, MSG_LEN)
verify(len(buf) == MSG_LEN) self.assertEqual(len(buf), MSG_LEN)
print buf bufs.append(buf)
os.close(r2w[rd]) ; os.close( rd ) os.close(r2w[rd]) ; os.close( rd )
p.unregister( r2w[rd] ) p.unregister( r2w[rd] )
p.unregister( rd ) p.unregister( rd )
writers.remove(r2w[rd]) writers.remove(r2w[rd])
poll_unit_tests() self.assertEqual(bufs, [MSG] * NUM_PIPES)
print 'Poll test 1 complete'
def poll_unit_tests(): def poll_unit_tests(self):
# returns NVAL for invalid file descriptor # returns NVAL for invalid file descriptor
FD = 42 FD = 42
try: try:
...@@ -75,42 +76,27 @@ def poll_unit_tests(): ...@@ -75,42 +76,27 @@ def poll_unit_tests():
p = select.poll() p = select.poll()
p.register(FD) p.register(FD)
r = p.poll() r = p.poll()
verify(r[0] == (FD, select.POLLNVAL)) self.assertEqual(r[0], (FD, select.POLLNVAL))
f = open(TESTFN, 'w') f = open(TESTFN, 'w')
fd = f.fileno() fd = f.fileno()
p = select.poll() p = select.poll()
p.register(f) p.register(f)
r = p.poll() r = p.poll()
verify(r[0][0] == fd) self.assertEqual(r[0][0], fd)
f.close() f.close()
r = p.poll() r = p.poll()
verify(r[0] == (fd, select.POLLNVAL)) self.assertEqual(r[0], (fd, select.POLLNVAL))
os.unlink(TESTFN) os.unlink(TESTFN)
# type error for invalid arguments # type error for invalid arguments
p = select.poll() p = select.poll()
try: self.assertRaises(TypeError, p.register, p)
p.register(p) self.assertRaises(TypeError, p.unregister, p)
except TypeError:
pass
else:
print "Bogus register call did not raise TypeError"
try:
p.unregister(p)
except TypeError:
pass
else:
print "Bogus unregister call did not raise TypeError"
# can't unregister non-existent object # can't unregister non-existent object
p = select.poll() p = select.poll()
try: self.assertRaises(KeyError, p.unregister, 3)
p.unregister(3)
except KeyError:
pass
else:
print "Bogus unregister call did not raise KeyError"
# Test error cases # Test error cases
pollster = select.poll() pollster = select.poll()
...@@ -121,29 +107,18 @@ def poll_unit_tests(): ...@@ -121,29 +107,18 @@ def poll_unit_tests():
def fileno(self): def fileno(self):
return 'fileno' return 'fileno'
try: self.assertRaises(TypeError, pollster.register, Nope(), 0)
pollster.register( Nope(), 0 ) self.assertRaises(TypeError, pollster.register, Almost(), 0)
except TypeError: pass
else: print 'expected TypeError exception, not raised'
try:
pollster.register( Almost(), 0 )
except TypeError: pass
else: print 'expected TypeError exception, not raised'
# Another test case for poll(). This is copied from the test case for
# select(), modified to use poll() instead.
# Another test case for poll(). This is copied from the test case for def test_poll2(self):
# select(), modified to use poll() instead.
def test_poll2():
print 'Running poll test 2'
cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done' cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
p = os.popen(cmd, 'r') p = os.popen(cmd, 'r')
pollster = select.poll() pollster = select.poll()
pollster.register( p, select.POLLIN ) pollster.register( p, select.POLLIN )
for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10: for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
if verbose:
print 'timeout =', tout
fdlist = pollster.poll(tout) fdlist = pollster.poll(tout)
if (fdlist == []): if (fdlist == []):
continue continue
...@@ -151,42 +126,31 @@ def test_poll2(): ...@@ -151,42 +126,31 @@ def test_poll2():
if flags & select.POLLHUP: if flags & select.POLLHUP:
line = p.readline() line = p.readline()
if line != "": if line != "":
print 'error: pipe seems to be closed, but still returns data' self.fail('error: pipe seems to be closed, but still returns data')
continue continue
elif flags & select.POLLIN: elif flags & select.POLLIN:
line = p.readline() line = p.readline()
if verbose:
print repr(line)
if not line: if not line:
if verbose:
print 'EOF'
break break
continue continue
else: else:
print 'Unexpected return value from select.poll:', fdlist self.fail('Unexpected return value from select.poll: %s' % fdlist)
p.close() p.close()
print 'Poll test 2 complete'
def test_poll3(): def test_poll3(self):
# test int overflow # test int overflow
print 'Running poll test 3'
pollster = select.poll() pollster = select.poll()
pollster.register(1) pollster.register(1)
try: self.assertRaises(OverflowError, pollster.poll, 1L << 64)
pollster.poll(1L << 64)
except OverflowError:
pass
else:
print 'Expected OverflowError with excessive timeout'
x = 2 + 3 x = 2 + 3
if x != 5: if x != 5:
print 'Overflow must have occurred' self.fail('Overflow must have occurred')
print 'Poll test 3 complete'
def test_main():
run_unittest(PollTests)
test_poll1() if __name__ == '__main__':
test_poll2() test_main()
test_poll3()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment