Commit b94ff136 authored by Jack Diederich's avatar Jack Diederich

- issue #6748 intermittent test failures from sockets

- telnetlib tests now use mock sockets for most tests
parent 2a9f18e4
import socket import socket
import select
import threading import threading
import telnetlib import telnetlib
import time import time
import queue import contextlib
import sys
import io
from unittest import TestCase from unittest import TestCase
from test import support from test import support
HOST = support.HOST HOST = support.HOST
EOF_sigil = object()
def server(evt, serv):
def server(evt, serv, dataq=None, test_done=None):
""" Open a tcp server in four steps
1) set evt to true to let the parent know we are ready
2) [optional] if is not False, write the list of data from dataq.get()
to the socket.
3) [optional] if test_done is not None, it's an event; wait
for parent to set test_done before closing connection
4) set evt to true to let the parent know we're done
"""
serv.listen(5) serv.listen(5)
evt.set() evt.set()
try: try:
conn, addr = serv.accept() conn, addr = serv.accept()
if dataq:
data = b''
new_data = dataq.get(True, 0.5)
dataq.task_done()
for item in new_data:
if item == EOF_sigil:
break
if type(item) in [int, float]:
time.sleep(item)
else:
data += item
written = conn.send(data)
data = data[written:]
except socket.timeout: except socket.timeout:
pass pass
finally: finally:
if test_done is not None:
test_done.wait()
serv.close() serv.close()
evt.set() evt.set()
...@@ -100,163 +75,159 @@ class GeneralTests(TestCase): ...@@ -100,163 +75,159 @@ class GeneralTests(TestCase):
self.assertEqual(telnet.sock.gettimeout(), 30) self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close() telnet.sock.close()
def _read_setUp(self): class SocketStub(object):
self.evt = threading.Event() ''' a socket proxy that re-defines sendall() '''
self.dataq = queue.Queue() def __init__(self, reads=[]):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.reads = reads
self.sock.settimeout(3) self.writes = []
self.port = support.bind_port(self.sock) self.block = False
self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq)) def sendall(self, data):
self.thread.start() self.writes.append(data)
self.evt.wait() def recv(self, size):
self.evt.clear() out = b''
time.sleep(.1) while self.reads and len(out) < size:
out += self.reads.pop(0)
def _read_tearDown(self): if len(out) > size:
self.evt.wait() self.reads.insert(0, out[size:])
self.thread.join() out = out[:size]
return out
class TelnetAlike(telnetlib.Telnet):
def fileno(self):
raise NotImplementedError()
def close(self): pass
def sock_avail(self):
return (not self.sock.block)
def msg(self, msg, *args):
with support.captured_stdout() as out:
telnetlib.Telnet.msg(self, msg, *args)
self._messages += out.getvalue()
return
def new_select(*s_args):
block = False
for l in s_args:
for fob in l:
if isinstance(fob, TelnetAlike):
block = fob.sock.block
if block:
return [[], [], []]
else:
return s_args
@contextlib.contextmanager
def test_socket(reads):
def new_conn(*ignored):
return SocketStub(reads)
try:
old_conn = socket.create_connection
socket.create_connection = new_conn
yield None
finally:
socket.create_connection = old_conn
return
def test_telnet(reads=[], cls=TelnetAlike):
''' return a telnetlib.Telnet object that uses a SocketStub with
reads queued up to be read '''
for x in reads:
assert type(x) is bytes, x
with test_socket(reads):
telnet = cls('dummy', 0)
telnet._messages = '' # debuglevel output
return telnet
class ReadTests(TestCase): class ReadTests(TestCase):
setUp = _read_setUp def setUp(self):
tearDown = _read_tearDown self.old_select = select.select
select.select = new_select
# use a similar approach to testing timeouts as test_timeout.py def tearDown(self):
# these will never pass 100% but make the fuzz big enough that it is rare select.select = self.old_select
block_long = 0.6
block_short = 0.3 def test_read_until(self):
def test_read_until_A(self):
""" """
read_until(expected, [timeout]) read_until(expected, timeout=None)
Read until the expected string has been seen, or a timeout is test the blocking version of read_util
hit (default is no timeout); may block.
""" """
want = [b'x' * 10, b'match', b'y' * 10, EOF_sigil] want = [b'xxxmatchyyy']
self.dataq.put(want) telnet = test_telnet(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
data = telnet.read_until(b'match') data = telnet.read_until(b'match')
self.assertEqual(data, b''.join(want[:-2])) self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq, telnet.rawq, telnet.sock.reads))
reads = [b'x' * 50, b'match', b'y' * 50]
expect = b''.join(reads[:-1])
telnet = test_telnet(reads)
data = telnet.read_until(b'match')
self.assertEqual(data, expect)
def test_read_until_B(self):
# test the timeout - it does NOT raise socket.timeout
want = [b'hello', self.block_long, b'not seen', EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
data = telnet.read_until(b'not seen', self.block_short)
self.assertEqual(data, want[0])
self.assertEqual(telnet.read_all(), b'not seen')
def test_read_all_A(self): def test_read_all(self):
""" """
read_all() read_all()
Read all data until EOF; may block. Read all data until EOF; may block.
""" """
want = [b'x' * 500, b'y' * 500, b'z' * 500, EOF_sigil] reads = [b'x' * 500, b'y' * 500, b'z' * 500]
self.dataq.put(want) expect = b''.join(reads)
telnet = telnetlib.Telnet(HOST, self.port) telnet = test_telnet(reads)
self.dataq.join()
data = telnet.read_all() data = telnet.read_all()
self.assertEqual(data, b''.join(want[:-1])) self.assertEqual(data, expect)
return return
def _test_blocking(self, func): def test_read_some(self):
self.dataq.put([self.block_long, EOF_sigil])
self.dataq.join()
start = time.time()
data = func()
self.assertTrue(self.block_short <= time.time() - start)
def test_read_all_B(self):
self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
def test_read_all_C(self):
self.dataq.put([EOF_sigil])
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
telnet.read_all()
telnet.read_all() # shouldn't raise
def test_read_some_A(self):
""" """
read_some() read_some()
Read at least one byte or EOF; may block. Read at least one byte or EOF; may block.
""" """
# test 'at least one byte' # test 'at least one byte'
want = [b'x' * 500, EOF_sigil] telnet = test_telnet([b'x' * 500])
self.dataq.put(want) data = telnet.read_some()
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
data = telnet.read_all()
self.assertTrue(len(data) >= 1) self.assertTrue(len(data) >= 1)
def test_read_some_B(self):
# test EOF # test EOF
self.dataq.put([EOF_sigil]) telnet = test_telnet()
telnet = telnetlib.Telnet(HOST, self.port) data = telnet.read_some()
self.dataq.join() self.assertEqual(b'', data)
self.assertEqual(b'', telnet.read_some())
def test_read_some_C(self): def _read_eager(self, func_name):
self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
def _test_read_any_eager_A(self, func_name):
""" """
read_very_eager() read_*_eager()
Read all data available already queued or on the socket, Read all data available already queued or on the socket,
without blocking. without blocking.
""" """
want = [self.block_long, b'x' * 100, b'y' * 100, EOF_sigil] want = b'x' * 100
expects = want[1] + want[2] telnet = test_telnet([want])
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
func = getattr(telnet, func_name) func = getattr(telnet, func_name)
telnet.sock.block = True
self.assertEqual(b'', func())
telnet.sock.block = False
data = b'' data = b''
while True: while True:
try: try:
data += func() data += func()
self.assertTrue(expects.startswith(data))
except EOFError: except EOFError:
break break
self.assertEqual(expects, data) self.assertEqual(data, want)
def _test_read_any_eager_B(self, func_name): def test_read_eager(self):
# test EOF # read_eager and read_very_eager make the same gaurantees
self.dataq.put([EOF_sigil]) # (they behave differently but we only test the gaurantees)
telnet = telnetlib.Telnet(HOST, self.port) self._read_eager('read_eager')
self.dataq.join() self._read_eager('read_very_eager')
time.sleep(self.block_short) # NB -- we need to test the IAC block which is mentioned in the
func = getattr(telnet, func_name) # docstring but not in the module docs
self.assertRaises(EOFError, func)
def read_very_lazy(self):
# read_eager and read_very_eager make the same gaurantees want = b'x' * 100
# (they behave differently but we only test the gaurantees) telnet = test_telnet([want])
def test_read_very_eager_A(self): self.assertEqual(b'', telnet.read_very_lazy())
self._test_read_any_eager_A('read_very_eager') while telnet.sock.reads:
def test_read_very_eager_B(self): telnet.fill_rawq()
self._test_read_any_eager_B('read_very_eager') data = telnet.read_very_lazy()
def test_read_eager_A(self): self.assertEqual(want, data)
self._test_read_any_eager_A('read_eager') self.assertRaises(EOFError, telnet.read_very_lazy)
def test_read_eager_B(self):
self._test_read_any_eager_B('read_eager') def test_read_lazy(self):
# NB -- we need to test the IAC block which is mentioned in the docstring want = b'x' * 100
# but not in the module docs telnet = test_telnet([want])
def _test_read_any_lazy_B(self, func_name):
self.dataq.put([EOF_sigil])
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
func = getattr(telnet, func_name)
telnet.fill_rawq()
self.assertRaises(EOFError, func)
def test_read_lazy_A(self):
want = [b'x' * 100, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
time.sleep(self.block_short)
self.assertEqual(b'', telnet.read_lazy()) self.assertEqual(b'', telnet.read_lazy())
data = b'' data = b''
while True: while True:
...@@ -267,35 +238,8 @@ class ReadTests(TestCase): ...@@ -267,35 +238,8 @@ class ReadTests(TestCase):
telnet.fill_rawq() telnet.fill_rawq()
except EOFError: except EOFError:
break break
self.assertTrue(want[0].startswith(data)) self.assertTrue(want.startswith(data))
self.assertEqual(data, want[0]) self.assertEqual(data, want)
def test_read_lazy_B(self):
self._test_read_any_lazy_B('read_lazy')
def test_read_very_lazy_A(self):
want = [b'x' * 100, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
time.sleep(self.block_short)
self.assertEqual(b'', telnet.read_very_lazy())
data = b''
while True:
try:
read_data = telnet.read_very_lazy()
except EOFError:
break
data += read_data
if not read_data:
telnet.fill_rawq()
self.assertEqual(b'', telnet.cookedq)
telnet.process_rawq()
self.assertTrue(want[0].startswith(data))
self.assertEqual(data, want[0])
def test_read_very_lazy_B(self):
self._test_read_any_lazy_B('read_very_lazy')
class nego_collector(object): class nego_collector(object):
def __init__(self, sb_getter=None): def __init__(self, sb_getter=None):
...@@ -309,91 +253,32 @@ class nego_collector(object): ...@@ -309,91 +253,32 @@ class nego_collector(object):
sb_data = self.sb_getter() sb_data = self.sb_getter()
self.sb_seen += sb_data self.sb_seen += sb_data
class SocketProxy(object): tl = telnetlib
''' a socket proxy that re-defines sendall() '''
def __init__(self, real_sock):
self.socket = real_sock
self._raw_sent = b''
def __getattr__(self, k):
return getattr(self.socket, k)
def sendall(self, data):
self._raw_sent += data
self.socket.sendall(data)
class TelnetSockSendall(telnetlib.Telnet):
def open(self, *args, **opts):
telnetlib.Telnet.open(self, *args, **opts)
self.sock = SocketProxy(self.sock)
class WriteTests(TestCase): class WriteTests(TestCase):
'''The only thing that write does is replace each tl.IAC for '''The only thing that write does is replace each tl.IAC for
tl.IAC+tl.IAC''' tl.IAC+tl.IAC'''
def setUp(self):
self.evt = threading.Event()
self.test_done = threading.Event()
self.dataq = queue.Queue()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(3)
self.port = support.bind_port(self.sock)
self.thread = threading.Thread(target=server, args=(
self.evt, self.sock, self.dataq, self.test_done))
self.thread.start()
self.evt.wait()
self.evt.clear()
time.sleep(.1)
def tearDown(self):
self.test_done.set()
self.evt.wait()
self.thread.join()
def _test_write(self, data):
self.telnet.sock._raw_sent = b''
self.telnet.write(data)
after_write = self.telnet.sock._raw_sent
self.assertEqual(data.replace(tl.IAC,tl.IAC+tl.IAC),
after_write)
def test_write(self): def test_write(self):
self.telnet = TelnetSockSendall()
data_sample = [b'data sample without IAC', data_sample = [b'data sample without IAC',
b'data sample with' + tl.IAC + b' one IAC', b'data sample with' + tl.IAC + b' one IAC',
b'a few' + tl.IAC + tl.IAC + b' iacs' + tl.IAC, b'a few' + tl.IAC + tl.IAC + b' iacs' + tl.IAC,
tl.IAC, tl.IAC,
b''] b'']
self.telnet.open(HOST, self.port) for data in data_sample:
for d in data_sample: telnet = test_telnet()
self.dataq.put([b'']) telnet.write(data)
self._test_write(d) written = b''.join(telnet.sock.writes)
self.telnet.close() self.assertEqual(data.replace(tl.IAC,tl.IAC+tl.IAC), written)
tl = telnetlib
class TelnetDebuglevel(tl.Telnet):
''' Telnet-alike that captures messages written to stdout when
debuglevel > 0
'''
_messages = ''
def msg(self, msg, *args):
orig_stdout = sys.stdout
sys.stdout = fake_stdout = io.StringIO()
tl.Telnet.msg(self, msg, *args)
self._messages += fake_stdout.getvalue()
sys.stdout = orig_stdout
return
class OptionTests(TestCase): class OptionTests(TestCase):
setUp = _read_setUp
tearDown = _read_tearDown
# RFC 854 commands # RFC 854 commands
cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP] cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
def _test_command(self, data): def _test_command(self, data):
""" helper for testing IAC + cmd """ """ helper for testing IAC + cmd """
self.setUp() telnet = test_telnet(data)
self.dataq.put(data) data_len = len(b''.join(data))
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
nego = nego_collector() nego = nego_collector()
telnet.set_option_negotiation_callback(nego.do_nego) telnet.set_option_negotiation_callback(nego.do_nego)
txt = telnet.read_all() txt = telnet.read_all()
...@@ -401,24 +286,16 @@ class OptionTests(TestCase): ...@@ -401,24 +286,16 @@ class OptionTests(TestCase):
self.assertTrue(len(cmd) > 0) # we expect at least one command self.assertTrue(len(cmd) > 0) # we expect at least one command
self.assertTrue(cmd[:1] in self.cmds) self.assertTrue(cmd[:1] in self.cmds)
self.assertEqual(cmd[1:2], tl.NOOPT) self.assertEqual(cmd[1:2], tl.NOOPT)
self.assertEqual(len(b''.join(data[:-1])), len(txt + cmd)) self.assertEqual(data_len, len(txt + cmd))
nego.sb_getter = None # break the nego => telnet cycle nego.sb_getter = None # break the nego => telnet cycle
self.tearDown()
def test_IAC_commands(self): def test_IAC_commands(self):
# reset our setup
self.dataq.put([EOF_sigil])
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
self.tearDown()
for cmd in self.cmds: for cmd in self.cmds:
self._test_command([tl.IAC, cmd, EOF_sigil]) self._test_command([tl.IAC, cmd])
self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100, EOF_sigil]) self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100])
self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10, EOF_sigil]) self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10])
# all at once # all at once
self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil]) self._test_command([tl.IAC + cmd for (cmd) in self.cmds])
self.assertEqual(b'', telnet.read_sb_data())
def test_SB_commands(self): def test_SB_commands(self):
# RFC 855, subnegotiations portion # RFC 855, subnegotiations portion
...@@ -427,11 +304,8 @@ class OptionTests(TestCase): ...@@ -427,11 +304,8 @@ class OptionTests(TestCase):
tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE, tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE,
tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE, tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE, tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE,
EOF_sigil,
] ]
self.dataq.put(send) telnet = test_telnet(send)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
nego = nego_collector(telnet.read_sb_data) nego = nego_collector(telnet.read_sb_data)
telnet.set_option_negotiation_callback(nego.do_nego) telnet.set_option_negotiation_callback(nego.do_nego)
txt = telnet.read_all() txt = telnet.read_all()
...@@ -441,19 +315,6 @@ class OptionTests(TestCase): ...@@ -441,19 +315,6 @@ class OptionTests(TestCase):
self.assertEqual(b'', telnet.read_sb_data()) self.assertEqual(b'', telnet.read_sb_data())
nego.sb_getter = None # break the nego => telnet cycle nego.sb_getter = None # break the nego => telnet cycle
def _test_debuglevel(self, data, expected_msg):
""" helper for testing debuglevel messages """
self.setUp()
self.dataq.put(data + [EOF_sigil])
telnet = TelnetDebuglevel(HOST, self.port)
telnet.set_debuglevel(1)
self.dataq.join()
txt = telnet.read_all()
self.assertTrue(expected_msg in telnet._messages,
msg=(telnet._messages, expected_msg))
telnet.close()
self.tearDown()
def test_debuglevel_reads(self): def test_debuglevel_reads(self):
# test all the various places that self.msg(...) is called # test all the various places that self.msg(...) is called
given_a_expect_b = [ given_a_expect_b = [
...@@ -467,21 +328,18 @@ class OptionTests(TestCase): ...@@ -467,21 +328,18 @@ class OptionTests(TestCase):
(tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"), (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"),
] ]
for a, b in given_a_expect_b: for a, b in given_a_expect_b:
self._test_debuglevel([a, EOF_sigil], b) telnet = test_telnet([a])
telnet.set_debuglevel(1)
txt = telnet.read_all()
self.assertTrue(b in telnet._messages)
return return
def test_debuglevel_write(self): def test_debuglevel_write(self):
self.setUp() telnet = test_telnet()
telnet = TelnetDebuglevel(HOST, self.port)
telnet.set_debuglevel(1) telnet.set_debuglevel(1)
self.dataq.put([b'', EOF_sigil])
self.dataq.join()
telnet.write(b'xxx') telnet.write(b'xxx')
expected = "send b'xxx'\n" expected = "send b'xxx'\n"
self.assertTrue(expected in telnet._messages, self.assertTrue(expected in telnet._messages)
msg=(telnet._messages, expected))
telnet.close()
self.tearDown()
def test_main(verbose=None): def test_main(verbose=None):
support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests) support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment