Issue #19170: telnetlib: use selectors.

parent 3b6b76b8
This diff is collapsed.
import socket
import select
import selectors
import telnetlib
import time
import contextlib
import unittest
from unittest import TestCase
from test import support
threading = support.import_module('threading')
......@@ -112,40 +111,32 @@ class TelnetAlike(telnetlib.Telnet):
self._messages += out.getvalue()
return
def mock_select(*s_args):
block = False
for l in s_args:
for fob in l:
if isinstance(fob, TelnetAlike):
block = fob.sock.block
if block:
return [[], [], []]
else:
return s_args
class MockPoller(object):
test_case = None # Set during TestCase setUp.
class MockSelector(selectors.BaseSelector):
def __init__(self):
self._file_objs = []
super().__init__()
self.keys = {}
def register(self, fileobj, events, data=None):
key = selectors.SelectorKey(fileobj, 0, events, data)
self.keys[fileobj] = key
return key
def register(self, fd, eventmask):
self.test_case.assertTrue(hasattr(fd, 'fileno'), fd)
self.test_case.assertEqual(eventmask, select.POLLIN|select.POLLPRI)
self._file_objs.append(fd)
def unregister(self, fileobj):
key = self.keys.pop(fileobj)
return key
def poll(self, timeout=None):
def select(self, timeout=None):
block = False
for fob in self._file_objs:
if isinstance(fob, TelnetAlike):
block = fob.sock.block
for fileobj in self.keys:
if isinstance(fileobj, TelnetAlike):
block = fileobj.sock.block
break
if block:
return []
else:
return zip(self._file_objs, [select.POLLIN]*len(self._file_objs))
return [(key, key.events) for key in self.keys.values()]
def unregister(self, fd):
self._file_objs.remove(fd)
@contextlib.contextmanager
def test_socket(reads):
......@@ -159,7 +150,7 @@ def test_socket(reads):
socket.create_connection = old_conn
return
def test_telnet(reads=(), cls=TelnetAlike, use_poll=None):
def test_telnet(reads=(), cls=TelnetAlike):
''' return a telnetlib.Telnet object that uses a SocketStub with
reads queued up to be read '''
for x in reads:
......@@ -167,29 +158,14 @@ def test_telnet(reads=(), cls=TelnetAlike, use_poll=None):
with test_socket(reads):
telnet = cls('dummy', 0)
telnet._messages = '' # debuglevel output
if use_poll is not None:
if use_poll and not telnet._has_poll:
raise unittest.SkipTest('select.poll() required.')
telnet._has_poll = use_poll
return telnet
class ExpectAndReadTestCase(TestCase):
def setUp(self):
self.old_select = select.select
select.select = mock_select
self.old_poll = False
if hasattr(select, 'poll'):
self.old_poll = select.poll
select.poll = MockPoller
MockPoller.test_case = self
self.old_selector = telnetlib._TelnetSelector
telnetlib._TelnetSelector = MockSelector
def tearDown(self):
if self.old_poll:
MockPoller.test_case = None
select.poll = self.old_poll
select.select = self.old_select
telnetlib._TelnetSelector = self.old_selector
class ReadTests(ExpectAndReadTestCase):
def test_read_until(self):
......@@ -208,22 +184,6 @@ class ReadTests(ExpectAndReadTestCase):
data = telnet.read_until(b'match')
self.assertEqual(data, expect)
def test_read_until_with_poll(self):
"""Use select.poll() to implement telnet.read_until()."""
want = [b'x' * 10, b'match', b'y' * 10]
telnet = test_telnet(want, use_poll=True)
select.select = lambda *_: self.fail('unexpected select() call.')
data = telnet.read_until(b'match')
self.assertEqual(data, b''.join(want[:-1]))
def test_read_until_with_select(self):
"""Use select.select() to implement telnet.read_until()."""
want = [b'x' * 10, b'match', b'y' * 10]
telnet = test_telnet(want, use_poll=False)
if self.old_poll:
select.poll = lambda *_: self.fail('unexpected poll() call.')
data = telnet.read_until(b'match')
self.assertEqual(data, b''.join(want[:-1]))
def test_read_all(self):
"""
......@@ -427,23 +387,6 @@ class ExpectTests(ExpectAndReadTestCase):
(_,_,data) = telnet.expect([b'match'])
self.assertEqual(data, b''.join(want[:-1]))
def test_expect_with_poll(self):
"""Use select.poll() to implement telnet.expect()."""
want = [b'x' * 10, b'match', b'y' * 10]
telnet = test_telnet(want, use_poll=True)
select.select = lambda *_: self.fail('unexpected select() call.')
(_,_,data) = telnet.expect([b'match'])
self.assertEqual(data, b''.join(want[:-1]))
def test_expect_with_select(self):
"""Use select.select() to implement telnet.expect()."""
want = [b'x' * 10, b'match', b'y' * 10]
telnet = test_telnet(want, use_poll=False)
if self.old_poll:
select.poll = lambda *_: self.fail('unexpected poll() call.')
(_,_,data) = telnet.expect([b'match'])
self.assertEqual(data, b''.join(want[:-1]))
def test_main(verbose=None):
support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment