Commit 25fa1c14 authored by Denis Bilenko's avatar Denis Bilenko

test__pywsgi.py: do not use urllib2 as that makes totalrefcount check fail

parent a40090fc
# @author Donovan Preston
#
# Copyright (c) 2007, Linden Research, Inc. # Copyright (c) 2007, Linden Research, Inc.
# Copyright (c) 2009-2010 gevent contributors
# Permission is hereby granted, free of charge, to any person obtaining a copy # Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal # of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights # in the Software without restriction, including without limitation the rights
...@@ -50,6 +49,9 @@ try: ...@@ -50,6 +49,9 @@ try:
except ImportError: except ImportError:
pass pass
REASONS = {200: 'OK',
500: 'Internal Server Error'}
class ConnectionClosed(Exception): class ConnectionClosed(Exception):
pass pass
...@@ -147,7 +149,7 @@ class Response(object): ...@@ -147,7 +149,7 @@ class Response(object):
if code is not None: if code is not None:
self.assertCode(code) self.assertCode(code)
if reason == 'default': if reason == 'default':
reason = {200: 'OK'}.get(code) reason = REASONS.get(code)
if reason is not None: if reason is not None:
self.assertReason(reason) self.assertReason(reason)
if version is not None: if version is not None:
...@@ -235,11 +237,19 @@ class TestCase(greentest.TestCase): ...@@ -235,11 +237,19 @@ class TestCase(greentest.TestCase):
def connect(self): def connect(self):
return socket.create_connection(('127.0.0.1', self.port)) return socket.create_connection(('127.0.0.1', self.port))
def makefile(self):
return self.connect().makefile(bufsize=1)
def urlopen(self, *args, **kwargs):
fd = self.connect().makefile(bufsize=1)
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
return read_http(fd, *args, **kwargs)
class CommonTests(TestCase): class CommonTests(TestCase):
def test_basic(self): def test_basic(self):
fd = self.connect().makefile(bufsize=1) fd = self.makefile()
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
response = read_http(fd, body='hello world') response = read_http(fd, body='hello world')
if response.headers.get('Connection') == 'close' and not server_implements_pipeline: if response.headers.get('Connection') == 'close' and not server_implements_pipeline:
...@@ -253,7 +263,7 @@ class CommonTests(TestCase): ...@@ -253,7 +263,7 @@ class CommonTests(TestCase):
def test_pipeline(self): def test_pipeline(self):
if not server_implements_pipeline: if not server_implements_pipeline:
return return
fd = self.connect().makefile(bufsize=1) fd = self.makefile()
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n' + 'GET /notexist HTTP/1.1\r\nHost: localhost\r\n\r\n') fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n' + 'GET /notexist HTTP/1.1\r\nHost: localhost\r\n\r\n')
read_http(fd, body='hello world') read_http(fd, body='hello world')
exception = AssertionError('HTTP pipelining not supported; the second request is thrown away') exception = AssertionError('HTTP pipelining not supported; the second request is thrown away')
...@@ -269,7 +279,7 @@ class CommonTests(TestCase): ...@@ -269,7 +279,7 @@ class CommonTests(TestCase):
raise raise
def test_connection_close(self): def test_connection_close(self):
fd = self.connect().makefile(bufsize=1) fd = self.makefile()
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
response = read_http(fd) response = read_http(fd)
if response.headers.get('Connection') == 'close' and not server_implements_pipeline: if response.headers.get('Connection') == 'close' and not server_implements_pipeline:
...@@ -285,7 +295,7 @@ class CommonTests(TestCase): ...@@ -285,7 +295,7 @@ class CommonTests(TestCase):
raise raise
def SKIP_test_006_reject_long_urls(self): def SKIP_test_006_reject_long_urls(self):
fd = self.connect().makefile(bufsize=1) fd = self.makefile()
path_parts = [] path_parts = []
for ii in range(3000): for ii in range(3000):
path_parts.append('path') path_parts.append('path')
...@@ -314,14 +324,14 @@ class TestNoChunks(CommonTests): ...@@ -314,14 +324,14 @@ class TestNoChunks(CommonTests):
return ['not ', 'found'] return ['not ', 'found']
def test(self): def test(self):
fd = self.connect().makefile(bufsize=1) fd = self.makefile()
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
response = read_http(fd, body='hello world') response = read_http(fd, body='hello world')
assert response.chunks is None, response.chunks assert response.chunks is None, response.chunks
response.assertHeader('Content-Length', '11') response.assertHeader('Content-Length', '11')
if not server_implements_pipeline: if not server_implements_pipeline:
fd = self.connect().makefile(bufsize=1) fd = self.makefile()
fd.write('GET /not-found HTTP/1.1\r\nHost: localhost\r\n\r\n') fd.write('GET /not-found HTTP/1.1\r\nHost: localhost\r\n\r\n')
response = read_http(fd, code=404, reason='Not Found', body='not found') response = read_http(fd, code=404, reason='Not Found', body='not found')
...@@ -367,7 +377,7 @@ class TestGetArg(TestCase): ...@@ -367,7 +377,7 @@ class TestGetArg(TestCase):
def test_007_get_arg(self): def test_007_get_arg(self):
# define a new handler that does a get_arg as well as a read_body # define a new handler that does a get_arg as well as a read_body
fd = self.connect().makefile(bufsize=1) fd = self.makefile()
request = '\r\n'.join(( request = '\r\n'.join((
'POST / HTTP/1.0', 'POST / HTTP/1.0',
'Host: localhost', 'Host: localhost',
...@@ -395,7 +405,7 @@ class TestChunkedApp(TestCase): ...@@ -395,7 +405,7 @@ class TestChunkedApp(TestCase):
yield chunk yield chunk
def test_chunked_response(self): def test_chunked_response(self):
fd = self.connect().makefile(bufsize=1) fd = self.makefile()
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
response = read_http(fd, body=self.body()) response = read_http(fd, body=self.body())
if server_implements_chunked: if server_implements_chunked:
...@@ -407,7 +417,7 @@ class TestChunkedApp(TestCase): ...@@ -407,7 +417,7 @@ class TestChunkedApp(TestCase):
self.assertEqual(response.chunks, None) self.assertEqual(response.chunks, None)
def test_no_chunked_http_1_0(self): def test_no_chunked_http_1_0(self):
fd = self.connect().makefile(bufsize=1) fd = self.makefile()
fd.write('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: close\r\n\r\n') fd.write('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: close\r\n\r\n')
response = read_http(fd, version='1.0') response = read_http(fd, version='1.0')
self.assertEqual(response.body, self.body()) self.assertEqual(response.body, self.body())
...@@ -435,19 +445,19 @@ class TestChunkedPost(TestCase): ...@@ -435,19 +445,19 @@ class TestChunkedPost(TestCase):
return [x for x in iter(lambda: env['wsgi.input'].read(1), '')] return [x for x in iter(lambda: env['wsgi.input'].read(1), '')]
def test_014_chunked_post(self): def test_014_chunked_post(self):
fd = self.connect().makefile(bufsize=1) fd = self.makefile()
fd.write('POST /a HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n' fd.write('POST /a HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n'
'Transfer-Encoding: chunked\r\n\r\n' 'Transfer-Encoding: chunked\r\n\r\n'
'2\r\noh\r\n4\r\n hai\r\n0\r\n\r\n') '2\r\noh\r\n4\r\n hai\r\n0\r\n\r\n')
read_http(fd, body='oh hai') read_http(fd, body='oh hai')
fd = self.connect().makefile(bufsize=1) fd = self.makefile()
fd.write('POST /b HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n' fd.write('POST /b HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n'
'Transfer-Encoding: chunked\r\n\r\n' 'Transfer-Encoding: chunked\r\n\r\n'
'2\r\noh\r\n4\r\n hai\r\n0\r\n\r\n') '2\r\noh\r\n4\r\n hai\r\n0\r\n\r\n')
read_http(fd, body='oh hai') read_http(fd, body='oh hai')
fd = self.connect().makefile(bufsize=1) fd = self.makefile()
fd.write('POST /c HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n' fd.write('POST /c HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n'
'Transfer-Encoding: chunked\r\n\r\n' 'Transfer-Encoding: chunked\r\n\r\n'
'2\r\noh\r\n4\r\n hai\r\n0\r\n\r\n') '2\r\noh\r\n4\r\n hai\r\n0\r\n\r\n')
...@@ -477,14 +487,14 @@ class TestUseWrite(TestCase): ...@@ -477,14 +487,14 @@ class TestUseWrite(TestCase):
return [self.end] return [self.end]
def test_explicit_content_length(self): def test_explicit_content_length(self):
fd = self.connect().makefile(bufsize=1) fd = self.makefile()
fd.write('GET /explicit-content-length HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') fd.write('GET /explicit-content-length HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
response = read_http(fd, body=self.body + self.end) response = read_http(fd, body=self.body + self.end)
response.assertHeader('Content-Length', self.content_length) response.assertHeader('Content-Length', self.content_length)
response.assertHeader('Transfer-Encoding', None) response.assertHeader('Transfer-Encoding', None)
def test_no_content_length(self): def test_no_content_length(self):
fd = self.connect().makefile(bufsize=1) fd = self.makefile()
fd.write('GET /no-content-length HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') fd.write('GET /no-content-length HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
response = read_http(fd, body=self.body + self.end) response = read_http(fd, body=self.body + self.end)
if server_implements_chunked: if server_implements_chunked:
...@@ -494,7 +504,7 @@ class TestUseWrite(TestCase): ...@@ -494,7 +504,7 @@ class TestUseWrite(TestCase):
response.assertHeader('Content-Length', self.content_length) response.assertHeader('Content-Length', self.content_length)
def test_no_content_length_twice(self): def test_no_content_length_twice(self):
fd = self.connect().makefile(bufsize=1) fd = self.makefile()
fd.write('GET /no-content-length-twice HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n') fd.write('GET /no-content-length-twice HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
response = read_http(fd, body=self.body + self.body + self.end) response = read_http(fd, body=self.body + self.body + self.end)
if server_implements_chunked: if server_implements_chunked:
...@@ -513,9 +523,20 @@ class HttpsTestCase(TestCase): ...@@ -513,9 +523,20 @@ class HttpsTestCase(TestCase):
def init_server(self, application): def init_server(self, application):
self.server = self.get_wsgi_module().WSGIServer(('127.0.0.1', 0), application, certfile=self.certfile, keyfile=self.keyfile) self.server = self.get_wsgi_module().WSGIServer(('127.0.0.1', 0), application, certfile=self.certfile, keyfile=self.keyfile)
def urlopen(self, *args, **kwargs): def urlopen(self, method='GET', post_body=None, **kwargs):
req = HTTPRequest("https://localhost:%s/foo" % self.server.server_port, *args, **kwargs) import ssl
return urllib2.urlopen(req) sock = self.connect()
sock = ssl.wrap_socket(sock)
fd = sock.makefile(bufsize=1)
fd.write('%s / HTTP/1.1\r\nHost: localhost\r\n' % method)
if post_body is not None:
fd.write('Content-Length: %s\r\n\r\n' % len(post_body))
fd.write(post_body)
if kwargs.get('body') is None:
kwargs['body'] = post_body
else:
fd.write('\r\n')
return read_http(fd, **kwargs)
def application(self, environ, start_response): def application(self, environ, start_response):
assert environ['wsgi.url_scheme'] == 'https', environ['wsgi.url_scheme'] assert environ['wsgi.url_scheme'] == 'https', environ['wsgi.url_scheme']
...@@ -528,12 +549,12 @@ class TestHttps(HttpsTestCase): ...@@ -528,12 +549,12 @@ class TestHttps(HttpsTestCase):
if hasattr(socket, 'ssl'): if hasattr(socket, 'ssl'):
def test_012_ssl_server(self): def test_012_ssl_server(self):
result = self.urlopen(method="POST", data='abc').read() result = self.urlopen(method="POST", post_body='abc')
self.assertEquals(result, 'abc') self.assertEquals(result.body, 'abc')
def test_013_empty_return(self): def test_013_empty_return(self):
result = self.urlopen().read() result = self.urlopen()
self.assertEquals(result, '') self.assertEquals(result.body, '')
class TestInternational(TestCase): class TestInternational(TestCase):
...@@ -569,7 +590,7 @@ class TestInputReadline(TestCase): ...@@ -569,7 +590,7 @@ class TestInputReadline(TestCase):
return lines return lines
def test(self): def test(self):
fd = self.connect().makefile() fd = self.makefile()
content = 'hello\n\nworld\n123' content = 'hello\n\nworld\n123'
fd.write('POST / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n' fd.write('POST / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n'
'Content-Length: %s\r\n\r\n%s' % (len(content), content)) 'Content-Length: %s\r\n\r\n%s' % (len(content), content))
...@@ -606,17 +627,8 @@ class TestError(TestCase): ...@@ -606,17 +627,8 @@ class TestError(TestCase):
def application(env, start_response): def application(env, start_response):
raise greentest.ExpectedException('TestError.application') raise greentest.ExpectedException('TestError.application')
@property
def url(self):
return 'http://127.0.0.1:%s' % self.port
def test(self): def test(self):
try: self.urlopen(code=500)
r = urllib2.urlopen(self.url)
raise AssertionError('Must raise HTTPError, returned %r: %s' % (r, r.code))
except urllib2.HTTPError, ex:
assert ex.code == 500, ex
assert ex.msg == 'Internal Server Error', ex
class TestError_after_start_response(TestError): class TestError_after_start_response(TestError):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment