Commit 50dc757f authored by Jason Madden's avatar Jason Madden

Fix #874

Refactor test__backdoor.py to be simpler, and add a test case for this
specific issue. Take the proposed patch of switch in by overriding
throw() just for backdoor because I don't really see a general place for
it in the hub.
parent 3af257af
......@@ -140,6 +140,9 @@ Other Changes
- The various ``FileObject`` implementations are more consistent with
each other. **Note:** Writing to the *io* property of a FileObject should be
considered deprecated.
- Timeout exceptions (and other asynchronous exceptions) could cause
the Backdoor server to fail to properly manage the
stdout/stderr/stdin values. Reported with a patch in :pr:`874` by stefanmh.
Servers
~~~~~~~
......
......@@ -28,7 +28,6 @@ try:
except AttributeError:
sys.ps2 = '... '
class _Greenlet_stdreplace(Greenlet):
# A greenlet that replaces sys.std[in/out/err] while running.
_fileobj = None
......@@ -37,7 +36,7 @@ class _Greenlet_stdreplace(Greenlet):
def switch(self, *args, **kw):
if self._fileobj is not None:
self.switch_in()
Greenlet.switch(self, *args, **kw) # pylint:disable=no-member
Greenlet.switch(self, *args, **kw)
def switch_in(self):
self.saved = sys.stdin, sys.stderr, sys.stdout
......@@ -47,11 +46,16 @@ class _Greenlet_stdreplace(Greenlet):
sys.stdin, sys.stderr, sys.stdout = self.saved
self.saved = None
def throw(self, *args, **kwargs):
if self.saved is None and self._fileobj is not None:
self.switch_in()
Greenlet.throw(self, *args, **kwargs)
def run(self):
try:
return Greenlet.run(self)
finally:
# XXX why is this necessary?
# Make sure to restore the originals.
self.switch_out()
......
......@@ -367,8 +367,9 @@ class TestCase(TestCaseMetaClass("NewBase", (BaseTestCase,), {})):
self.cleanup()
self._error = self._none
for x in self.close_on_teardown:
close = getattr(x, 'close', x)
try:
x.close()
close()
except Exception:
pass
try:
......
......@@ -7,39 +7,45 @@ from _six import xrange
def read_until(conn, postfix):
read = b''
if isinstance(postfix, str) and str != bytes:
if not isinstance(postfix, bytes):
postfix = postfix.encode('utf-8')
while not read.endswith(postfix):
result = conn.recv(1)
if not result:
raise AssertionError('Connection ended before %r. Data read:\n%r' % (postfix, read))
read += result
if str != bytes:
read = read.decode('utf-8')
return read
return read if isinstance(read, str) else read.decode('utf-8')
def create_connection(address):
conn = socket.socket()
conn.connect(address)
return conn
def readline(conn):
with conn.makefile() as f:
return f.readline()
class Test(greentest.TestCase):
def readline(conn):
f = conn.makefile()
line = f.readline()
f.close()
return line
_server = None
def tearDown(self):
self._server.stop()
self._server = None
super(Test, self).tearDown()
class Test(greentest.TestCase):
def _make_server(self, *args, **kwargs):
self._server = backdoor.BackdoorServer(('127.0.0.1', 0), *args, **kwargs)
self._close_on_teardown(self._server.stop)
self._server.start()
def _create_connection(self):
conn = socket.socket()
self._close_on_teardown(conn)
conn.connect(('127.0.0.1', self._server.server_port))
return conn
def test(self):
server = backdoor.BackdoorServer(('127.0.0.1', 0))
server.start()
self._make_server()
def connect():
conn = create_connection(('127.0.0.1', server.server_port))
conn = self._create_connection()
try:
read_until(conn, '>>> ')
conn.sendall(b'2+2\r\n')
......@@ -48,70 +54,66 @@ class Test(greentest.TestCase):
finally:
conn.close()
try:
jobs = [gevent.spawn(connect) for _ in xrange(10)]
gevent.joinall(jobs, raise_error=True)
finally:
server.close()
#self.assertEqual(conn.recv(1), '')
jobs = [gevent.spawn(connect) for _ in xrange(10)]
gevent.joinall(jobs, raise_error=True)
def test_quit(self):
server = backdoor.BackdoorServer(('127.0.0.1', 0))
server.start()
conn = None
try:
conn = create_connection(('127.0.0.1', server.server_port))
read_until(conn, '>>> ')
conn.sendall(b'quit()\r\n')
line = readline(conn)
self.assertEqual(line, '')
finally:
if conn is not None:
conn.close()
server.stop()
self._make_server()
conn = self._create_connection()
read_until(conn, '>>> ')
conn.sendall(b'quit()\r\n')
line = readline(conn)
self.assertEqual(line, '')
def test_sys_exit(self):
server = backdoor.BackdoorServer(('127.0.0.1', 0))
server.start()
conn = None
try:
conn = create_connection(('127.0.0.1', server.server_port))
read_until(conn, b'>>> ')
conn.sendall(b'import sys; sys.exit(0)\r\n')
line = readline(conn)
self.assertEqual(line, '')
finally:
if conn is not None:
conn.close()
server.stop()
self._make_server()
conn = self._create_connection()
read_until(conn, b'>>> ')
conn.sendall(b'import sys; sys.exit(0)\r\n')
line = readline(conn)
self.assertEqual(line, '')
def test_banner(self):
banner = "Welcome stranger!" # native string
server = backdoor.BackdoorServer(('127.0.0.1', 0), banner=banner)
server.start()
try:
conn = create_connection(('127.0.0.1', server.server_port))
response = read_until(conn, b'>>> ')
self.assertEqual(response[:len(banner)], banner, response)
conn.close()
finally:
server.stop()
self._make_server(banner=banner)
conn = self._create_connection()
response = read_until(conn, b'>>> ')
self.assertEqual(response[:len(banner)], banner, response)
def test_builtins(self):
server = backdoor.BackdoorServer(('127.0.0.1', 0))
server.start()
conn = None
try:
conn = create_connection(('127.0.0.1', server.server_port))
read_until(conn, b'>>> ')
conn.sendall(b'locals()["__builtins__"]\r\n')
response = read_until(conn, '>>> ')
self.assertTrue(len(response) < 300, msg="locals() unusable: %s..." % response)
finally:
if conn is not None:
conn.close()
server.stop()
self._make_server()
conn = self._create_connection()
read_until(conn, b'>>> ')
conn.sendall(b'locals()["__builtins__"]\r\n')
response = read_until(conn, '>>> ')
self.assertTrue(len(response) < 300, msg="locals() unusable: %s..." % response)
def test_switch_exc(self):
from gevent.queue import Queue, Empty
def bad():
q = Queue()
print('switching out, then throwing in')
try:
q.get(block=True, timeout=0.1)
except Empty:
print("Got Empty")
print('switching out')
gevent.sleep(0.1)
print('switched in')
self._make_server(locals={'bad': bad})
conn = self._create_connection()
read_until(conn, b'>>> ')
conn.sendall(b'bad()\r\n')
response = read_until(conn, '>>> ')
response = response.replace('\r\n', '\n')
self.assertEqual('switching out, then throwing in\nGot Empty\nswitching out\nswitched in\n>>> ', response)
conn.sendall(b'quit()\r\n')
line = readline(conn)
self.assertEqual(line, '')
conn.close()
if __name__ == '__main__':
greentest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment