Commit 3834eac2 authored by Denis Bilenko's avatar Denis Bilenko

Fix issue #86: bytearrays are not supported by pywsgi. based on pull request...

Fix issue #86: bytearrays are not supported by pywsgi. based on pull request #174 by Aaron Westendorf.

- on Python >= 2.6, where bytearray is present, it is now used to prepare the data
- on Python 2.5, string.join is continued to be used
- add test for bytearrays to test__pywsgi.py
parent aa9e748c
......@@ -345,11 +345,60 @@ class WSGIHandler(object):
self.response_use_chunked = True
self.response_headers.append(('Transfer-Encoding', 'chunked'))
def _sendall(self, data):
try:
self.socket.sendall(data)
except socket.error, ex:
self.status = 'socket error: %s' % ex
if self.code > 0:
self.code = -self.code
raise
self.response_length += len(data)
def _write(self, data):
if not data:
return
if self.response_use_chunked:
## Write the chunked encoding
data = "%x\r\n%s\r\n" % (len(data), data)
self._sendall(data)
def write(self, data):
towrite = []
if self.code in (304, 204) and data:
raise AssertionError('The %s response must have no body' % self.code)
if self.headers_sent:
self._write(data)
else:
if not self.status:
raise AssertionError("The application did not call start_response()")
if not self.headers_sent:
self._write_with_headers(data)
if sys.version_info[:2] >= (2, 6):
def _write_with_headers(self, data):
towrite = bytearray()
self.headers_sent = True
self.finalize_headers()
towrite.extend('%s %s\r\n' % (self.request_version, self.status))
for header in self.response_headers:
towrite.extend('%s: %s\r\n' % header)
towrite.extend('\r\n')
if data:
if self.response_use_chunked:
## Write the chunked encoding
towrite.extend("%x\r\n%s\r\n" % (len(data), data))
else:
towrite.extend(data)
self._sendall(towrite)
else:
# Python 2.5 does not have bytearray
def _write_with_headers(self, data):
towrite = []
self.headers_sent = True
self.finalize_headers()
......@@ -358,26 +407,13 @@ class WSGIHandler(object):
towrite.append('%s: %s\r\n' % header)
towrite.append('\r\n')
if data:
if self.code in (304, 204):
raise AssertionError('The %s response must have no body' % self.code)
if self.response_use_chunked:
## Write the chunked encoding
towrite.append("%x\r\n%s\r\n" % (len(data), data))
else:
towrite.append(data)
msg = ''.join(towrite)
try:
self.socket.sendall(msg)
except socket.error, ex:
self.status = 'socket error: %s' % ex
if self.code > 0:
self.code = -self.code
raise
self.response_length += len(msg)
self._sendall(''.join(towrite))
def start_response(self, status, headers, exc_info=None):
if exc_info:
......
......@@ -371,6 +371,23 @@ class TestYield(CommonTests):
yield "not found"
if 'bytearray' in __builtins__.__dict__:
class TestBytearray(CommonTests):
validator = None
@staticmethod
def application(env, start_response):
path = env['PATH_INFO']
if path == '/':
start_response('200 OK', [('Content-Type', 'text/plain')])
return [bytearray("hello "), bytearray("world")]
else:
start_response('404 Not Found', [('Content-Type', 'text/plain')])
return [bytearray("not found")]
class TestGetArg(TestCase):
@staticmethod
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment