Commit 1d4899b8 authored by Jason Madden's avatar Jason Madden

Simplifiy test__pywsgi.py

Remove redundant checks for constant values.
parent 9fe8ba12
...@@ -54,13 +54,11 @@ from gevent.pywsgi import Input ...@@ -54,13 +54,11 @@ from gevent.pywsgi import Input
CONTENT_LENGTH = 'Content-Length' CONTENT_LENGTH = 'Content-Length'
CONN_ABORTED_ERRORS = greentest.CONN_ABORTED_ERRORS CONN_ABORTED_ERRORS = greentest.CONN_ABORTED_ERRORS
server_implements_chunked = True
server_implements_pipeline = True
server_implements_100continue = True
DEBUG = '-v' in sys.argv
REASONS = {200: 'OK', REASONS = {
500: 'Internal Server Error'} 200: 'OK',
500: 'Internal Server Error'
}
class ConnectionClosed(Exception): class ConnectionClosed(Exception):
...@@ -316,25 +314,45 @@ class TestCase(greentest.TestCase): ...@@ -316,25 +314,45 @@ class TestCase(greentest.TestCase):
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
return read_http(fd, *args, **kwargs) return read_http(fd, *args, **kwargs)
class CommonTests(TestCase): HTTP_CLIENT_VERSION = '1.1'
def format_request(self, method='GET', path='/', **headers):
headers = '\r\n'.join('%s: %s' % item for item in headers.items())
headers = headers + '\r\n' if headers else headers
result = (
'%(method)s %(path)s HTTP/%(http_ver)s\r\n'
'Host: localhost\r\n'
'%(headers)s'
'\r\n'
)
result = result % dict(
method=method,
path=path,
http_ver=self.HTTP_CLIENT_VERSION,
headers=headers
)
return result
class CommonTestMixin(object):
def test_basic(self): def test_basic(self):
with self.makefile() as fd: with self.makefile() as fd:
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') fd.write(self.format_request())
response = read_http(fd, body='hello world') response = read_http(fd, body='hello world')
if response.headers.get('Connection') == 'close' and not server_implements_pipeline: if response.headers.get('Connection') == 'close':
return return
fd.write('GET /notexist HTTP/1.1\r\nHost: localhost\r\n\r\n')
fd.write(self.format_request(path='/notexist'))
read_http(fd, code=404, reason='Not Found', body='not found') read_http(fd, code=404, reason='Not Found', body='not found')
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') fd.write(self.format_request())
read_http(fd, body='hello world') read_http(fd, body='hello world')
def test_pipeline(self): def test_pipeline(self):
if not server_implements_pipeline:
return
exception = AssertionError('HTTP pipelining not supported; the second request is thrown away') exception = AssertionError('HTTP pipelining not supported; the second request is thrown away')
with self.makefile() as fd: with self.makefile() as fd:
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n' + 'GET /notexist HTTP/1.1\r\nHost: localhost\r\n\r\n') fd.write(self.format_request() + self.format_request(path='/notexist'))
read_http(fd, body='hello world') read_http(fd, body='hello world')
try: try:
...@@ -349,13 +367,13 @@ class CommonTests(TestCase): ...@@ -349,13 +367,13 @@ class CommonTests(TestCase):
def test_connection_close(self): def test_connection_close(self):
with self.makefile() as fd: with self.makefile() as fd:
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') fd.write(self.format_request())
response = read_http(fd) response = read_http(fd)
if response.headers.get('Connection') == 'close' and not server_implements_pipeline: if response.headers.get('Connection') == 'close':
return return
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') fd.write(self.format_request(Connection='close'))
read_http(fd) read_http(fd)
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') fd.write(self.format_request())
# This may either raise, or it may return an empty response, # This may either raise, or it may return an empty response,
# depend on timing and the Python version. # depend on timing and the Python version.
try: try:
...@@ -384,7 +402,7 @@ class CommonTests(TestCase): ...@@ -384,7 +402,7 @@ class CommonTests(TestCase):
self.assertEqual(status, '414') self.assertEqual(status, '414')
class TestNoChunks(CommonTests): class TestNoChunks(CommonTestMixin, TestCase):
# when returning a list of strings a shortcut is employed by the server: # when returning a list of strings a shortcut is employed by the server:
# it calculates the content-length and joins all the chunks before sending # it calculates the content-length and joins all the chunks before sending
validator = None validator = None
...@@ -399,9 +417,6 @@ class TestNoChunks(CommonTests): ...@@ -399,9 +417,6 @@ class TestNoChunks(CommonTests):
return [b'not ', b'found'] return [b'not ', b'found']
def test(self): def test(self):
if not server_implements_pipeline:
raise unittest.SkipTest("No pipelines")
with self.makefile() as fd: with self.makefile() as fd:
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
response = read_http(fd, body='hello world') response = read_http(fd, body='hello world')
...@@ -431,7 +446,7 @@ class TestExplicitContentLength(TestNoChunks): # pylint:disable=too-many-ancesto ...@@ -431,7 +446,7 @@ class TestExplicitContentLength(TestNoChunks): # pylint:disable=too-many-ancesto
return [b'not ', b'found'] return [b'not ', b'found']
class TestYield(CommonTests): class TestYield(CommonTestMixin, TestCase):
@staticmethod @staticmethod
def application(env, start_response): def application(env, start_response):
...@@ -444,7 +459,7 @@ class TestYield(CommonTests): ...@@ -444,7 +459,7 @@ class TestYield(CommonTests):
yield b"not found" yield b"not found"
class TestBytearray(CommonTests): class TestBytearray(CommonTestMixin, TestCase):
validator = None validator = None
...@@ -458,7 +473,7 @@ class TestBytearray(CommonTests): ...@@ -458,7 +473,7 @@ class TestBytearray(CommonTests):
return [bytearray(b"not found")] return [bytearray(b"not found")]
class MultiLineHeader(TestCase): class TestMultiLineHeader(TestCase):
@staticmethod @staticmethod
def application(env, start_response): def application(env, start_response):
assert "test.submit" in env["CONTENT_TYPE"] assert "test.submit" in env["CONTENT_TYPE"]
...@@ -550,13 +565,9 @@ class TestChunkedApp(TestCase): ...@@ -550,13 +565,9 @@ class TestChunkedApp(TestCase):
with self.makefile() as fd: with self.makefile() as fd:
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
response = read_http(fd, body=self.body(), chunks=None) response = read_http(fd, body=self.body(), chunks=None)
if server_implements_chunked:
response.assertHeader('Transfer-Encoding', 'chunked') response.assertHeader('Transfer-Encoding', 'chunked')
self.assertEqual(response.chunks, self.chunks) self.assertEqual(response.chunks, self.chunks)
else:
response.assertHeader('Transfer-Encoding', False)
response.assertHeader('Content-Length', str(len(self.body())))
self.assertEqual(response.chunks, False)
def test_no_chunked_http_1_0(self): def test_no_chunked_http_1_0(self):
with self.makefile() as fd: with self.makefile() as fd:
...@@ -743,22 +754,18 @@ class TestUseWrite(TestCase): ...@@ -743,22 +754,18 @@ class TestUseWrite(TestCase):
with self.makefile() as fd: with self.makefile() as fd:
fd.write('GET /no-content-length HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') fd.write('GET /no-content-length HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
response = read_http(fd, body=self.body + self.end) response = read_http(fd, body=self.body + self.end)
if server_implements_chunked:
response.assertHeader('Content-Length', False) response.assertHeader('Content-Length', False)
response.assertHeader('Transfer-Encoding', 'chunked') response.assertHeader('Transfer-Encoding', 'chunked')
else:
response.assertHeader('Content-Length', self.content_length)
def test_no_content_length_twice(self): def test_no_content_length_twice(self):
with self.makefile() as fd: with self.makefile() as fd:
fd.write('GET /no-content-length-twice HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') fd.write('GET /no-content-length-twice HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
response = read_http(fd, body=self.body + self.body + self.end) response = read_http(fd, body=self.body + self.body + self.end)
if server_implements_chunked:
response.assertHeader('Content-Length', False) response.assertHeader('Content-Length', False)
response.assertHeader('Transfer-Encoding', 'chunked') response.assertHeader('Transfer-Encoding', 'chunked')
self.assertEqual(response.chunks, [self.body, self.body, self.end]) self.assertEqual(response.chunks, [self.body, self.body, self.end])
else:
response.assertHeader('Content-Length', str(5 + 5 + 3))
class HttpsTestCase(TestCase): class HttpsTestCase(TestCase):
...@@ -1005,10 +1012,7 @@ class TestEmptyYield(TestCase): ...@@ -1005,10 +1012,7 @@ class TestEmptyYield(TestCase):
yield b"" yield b""
def test_err(self): def test_err(self):
if server_implements_chunked:
chunks = [] chunks = []
else:
chunks = False
with self.makefile() as fd: with self.makefile() as fd:
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
...@@ -1028,10 +1032,7 @@ class TestFirstEmptyYield(TestCase): ...@@ -1028,10 +1032,7 @@ class TestFirstEmptyYield(TestCase):
yield b"hello" yield b"hello"
def test_err(self): def test_err(self):
if server_implements_chunked:
chunks = [b'hello'] chunks = [b'hello']
else:
chunks = False
with self.makefile() as fd: with self.makefile() as fd:
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
...@@ -1194,13 +1195,7 @@ class ChunkedInputTests(TestCase): ...@@ -1194,13 +1195,7 @@ class ChunkedInputTests(TestCase):
read_http(fd, body="pong") read_http(fd, body="pong")
def ping_if_possible(self, fd): def ping_if_possible(self, fd):
try:
self.ping(fd) self.ping(fd)
except ConnectionClosed:
if server_implements_pipeline:
raise
with self.makefile() as fd2:
self.ping(fd2)
def test_short_read_with_content_length(self): def test_short_read_with_content_length(self):
body = self.body() body = self.body()
...@@ -1239,14 +1234,7 @@ class ChunkedInputTests(TestCase): ...@@ -1239,14 +1234,7 @@ class ChunkedInputTests(TestCase):
with self.makefile() as fd: with self.makefile() as fd:
fd.write(req) fd.write(req)
try:
read_http(fd, body="pong") read_http(fd, body="pong")
except AssertionError as ex:
if str(ex).startswith('Unexpected code: 400'):
if not server_implements_chunked:
print('ChunkedNotImplementedWarning')
return
raise
self.ping_if_possible(fd) self.ping_if_possible(fd)
...@@ -1324,16 +1312,8 @@ class Expect100ContinueTests(TestCase): ...@@ -1324,16 +1312,8 @@ class Expect100ContinueTests(TestCase):
def test_continue(self): def test_continue(self):
with self.makefile() as fd: with self.makefile() as fd:
fd.write('PUT / HTTP/1.1\r\nHost: localhost\r\nContent-length: 1025\r\nExpect: 100-continue\r\n\r\n') fd.write('PUT / HTTP/1.1\r\nHost: localhost\r\nContent-length: 1025\r\nExpect: 100-continue\r\n\r\n')
try:
read_http(fd, code=417, body="failure") read_http(fd, code=417, body="failure")
except AssertionError as ex:
if str(ex).startswith('Unexpected code: 400'):
if not server_implements_100continue:
print('100ContinueNotImplementedWarning')
return
raise
fd.write('PUT / HTTP/1.1\r\nHost: localhost\r\nContent-length: 7\r\nExpect: 100-continue\r\n\r\ntesting') fd.write('PUT / HTTP/1.1\r\nHost: localhost\r\nContent-length: 7\r\nExpect: 100-continue\r\n\r\ntesting')
read_http(fd, code=100) read_http(fd, code=100)
...@@ -1824,7 +1804,6 @@ class TestEnviron(TestCase): ...@@ -1824,7 +1804,6 @@ class TestEnviron(TestCase):
self.assertEqual(json.dumps(bltin), json.dumps(env)) self.assertEqual(json.dumps(bltin), json.dumps(env))
del CommonTests
if __name__ == '__main__': if __name__ == '__main__':
greentest.main() greentest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment