Commit f8cc6401 authored by Gregory P. Smith's avatar Gregory P. Smith

This should fix issue2632. A long description of the two competing

problems is in the bug report (one old, one recently introduced trying
to fix the old one).  In short:

buffer data during socket._fileobject.read() and readlines() within a
cStringIO object instead of a [] of str()s returned from the recv()
call.

This prevents excessive memory use due to the size parameter being
passed to recv() being grossly larger than the actual size of the data
returned *and* prevents excessive cpu usage due to looping in python
calling recv() with a very tiny size value if min() is used as the
previous memory-use bug "fix" did.

It also documents what the socket._fileobject._rbufsize member is
actually used for.

This is a candidate for back porting to 2.5.
parent b457ddaf
...@@ -78,6 +78,11 @@ else: ...@@ -78,6 +78,11 @@ else:
import os, sys, warnings import os, sys, warnings
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO
try: try:
from errno import EBADF from errno import EBADF
except ImportError: except ImportError:
...@@ -234,6 +239,9 @@ class _fileobject(object): ...@@ -234,6 +239,9 @@ class _fileobject(object):
bufsize = self.default_bufsize bufsize = self.default_bufsize
self.bufsize = bufsize self.bufsize = bufsize
self.softspace = False self.softspace = False
# _rbufsize is the suggested recv buffer size. It is *strictly*
# obeyed within readline() for recv calls. If it is larger than
# default_bufsize it will be used for recv calls within read().
if bufsize == 0: if bufsize == 0:
self._rbufsize = 1 self._rbufsize = 1
elif bufsize == 1: elif bufsize == 1:
...@@ -241,7 +249,11 @@ class _fileobject(object): ...@@ -241,7 +249,11 @@ class _fileobject(object):
else: else:
self._rbufsize = bufsize self._rbufsize = bufsize
self._wbufsize = bufsize self._wbufsize = bufsize
self._rbuf = "" # A string # We use StringIO for the read buffer to avoid holding a list
# of variously sized string objects which have been known to
# fragment the heap due to how they are malloc()ed and often
# realloc()ed down much smaller than their original allocation.
self._rbuf = StringIO()
self._wbuf = [] # A list of strings self._wbuf = [] # A list of strings
self._close = close self._close = close
...@@ -299,56 +311,86 @@ class _fileobject(object): ...@@ -299,56 +311,86 @@ class _fileobject(object):
return buf_len return buf_len
def read(self, size=-1): def read(self, size=-1):
data = self._rbuf # Use max, disallow tiny reads in a loop as they are very inefficient.
# We never leave read() with any leftover data in our internal buffer.
rbufsize = max(self._rbufsize, self.default_bufsize)
# Our use of StringIO rather than lists of string objects returned by
# recv() minimizes memory usage and fragmentation that occurs when
# rbufsize is large compared to the typical return value of recv().
buf = self._rbuf
buf.seek(0, 2) # seek end
if size < 0: if size < 0:
# Read until EOF # Read until EOF
buffers = [] self._rbuf = StringIO() # reset _rbuf. we consume it via buf.
if data:
buffers.append(data)
self._rbuf = ""
if self._rbufsize <= 1:
recv_size = self.default_bufsize
else:
recv_size = self._rbufsize
while True: while True:
data = self._sock.recv(recv_size) data = self._sock.recv(rbufsize)
if not data: if not data:
break break
buffers.append(data) buf.write(data)
return "".join(buffers) return buf.getvalue()
else: else:
# Read until size bytes or EOF seen, whichever comes first # Read until size bytes or EOF seen, whichever comes first
buf_len = len(data) buf_len = buf.tell()
if buf_len >= size: if buf_len >= size:
self._rbuf = data[size:] # Already have size bytes in our buffer? Extract and return.
return data[:size] buf.seek(0)
buffers = [] rv = buf.read(size)
if data: self._rbuf = StringIO()
buffers.append(data) self._rbuf.write(buf.read())
self._rbuf = "" return rv
self._rbuf = StringIO() # reset _rbuf. we consume it via buf.
while True: while True:
left = size - buf_len left = size - buf_len
recv_size = min(self._rbufsize, left) # Using max() here means that recv() can malloc a
# large amount of memory even though recv may return
# much less data than that. But the returned data
# string is short lived in that case as we copy it
# into a StringIO and free it.
recv_size = max(rbufsize, left)
data = self._sock.recv(recv_size) data = self._sock.recv(recv_size)
if not data: if not data:
break break
buffers.append(data)
n = len(data) n = len(data)
if n == size and not buf_len:
# Shortcut. Avoid buffer data copies when:
# - We have no data in our buffer.
# AND
# - Our call to recv returned exactly the
# number of bytes we were asked to read.
return data
if n >= left: if n >= left:
self._rbuf = data[left:] # avoids data copy of: buf.write(data[:left])
buffers[-1] = data[:left] buf.write(buffer(data, 0, left))
# avoids data copy of: self._rbuf.write(data[left:])
self._rbuf.write(buffer(data, left))
del data # explicit free
break break
buf.write(data)
buf_len += n buf_len += n
return "".join(buffers) del data # explicit free
#assert buf_len == buf.tell()
return buf.getvalue()
def readline(self, size=-1): def readline(self, size=-1):
data = self._rbuf buf = self._rbuf
if self._rbufsize > 1:
# if we're buffering, check if we already have it in our buffer
buf.seek(0)
bline = buf.readline(size)
if bline.endswith('\n') or len(bline) == size:
self._rbuf = StringIO()
self._rbuf.write(buf.read())
return bline
del bline
buf.seek(0, 2) # seek end
if size < 0: if size < 0:
# Read until \n or EOF, whichever comes first # Read until \n or EOF, whichever comes first
if self._rbufsize <= 1: if self._rbufsize <= 1:
# Speed up unbuffered case # Speed up unbuffered case
assert data == "" assert buf.tell() == 0
buffers = [] buffers = []
data = None
recv = self._sock.recv recv = self._sock.recv
while data != "\n": while data != "\n":
data = recv(1) data = recv(1)
...@@ -356,61 +398,64 @@ class _fileobject(object): ...@@ -356,61 +398,64 @@ class _fileobject(object):
break break
buffers.append(data) buffers.append(data)
return "".join(buffers) return "".join(buffers)
nl = data.find('\n')
if nl >= 0: buf = self._rbuf
nl += 1 buf.seek(0, 2) # seek end
self._rbuf = data[nl:] self._rbuf = StringIO() # reset _rbuf. we consume it via buf.
return data[:nl]
buffers = []
if data:
buffers.append(data)
self._rbuf = ""
while True: while True:
data = self._sock.recv(self._rbufsize) data = self._sock.recv(self._rbufsize)
if not data: if not data:
break break
buffers.append(data)
nl = data.find('\n') nl = data.find('\n')
if nl >= 0: if nl >= 0:
nl += 1 nl += 1
self._rbuf = data[nl:] buf.write(buffer(data, 0, nl))
buffers[-1] = data[:nl] self._rbuf.write(buffer(data, nl))
del data
break break
return "".join(buffers) buf.write(data)
return buf.getvalue()
else: else:
# Read until size bytes or \n or EOF seen, whichever comes first # Read until size bytes or \n or EOF seen, whichever comes first
nl = data.find('\n', 0, size) buf_len = buf.tell()
if nl >= 0:
nl += 1
self._rbuf = data[nl:]
return data[:nl]
buf_len = len(data)
if buf_len >= size: if buf_len >= size:
self._rbuf = data[size:] buf.seek(0)
return data[:size] rv = buf.read(size)
buffers = [] self._rbuf = StringIO()
if data: self._rbuf.write(buf.read())
buffers.append(data) return rv
self._rbuf = "" self._rbuf = StringIO() # reset _rbuf. we consume it via buf.
while True: while True:
data = self._sock.recv(self._rbufsize) data = self._sock.recv(self._rbufsize)
if not data: if not data:
break break
buffers.append(data)
left = size - buf_len left = size - buf_len
# did we just receive a newline?
nl = data.find('\n', 0, left) nl = data.find('\n', 0, left)
if nl >= 0: if nl >= 0:
nl += 1 nl += 1
self._rbuf = data[nl:] # save the excess data to _rbuf
buffers[-1] = data[:nl] self._rbuf.write(buffer(data, nl))
if buf_len:
buf.write(buffer(data, 0, nl))
break break
else:
# Shortcut. Avoid data copy through buf when returning
# a substring of our first recv().
return data[:nl]
n = len(data) n = len(data)
if n == size and not buf_len:
# Shortcut. Avoid data copy through buf when
# returning exactly all of our first recv().
return data
if n >= left: if n >= left:
self._rbuf = data[left:] buf.write(buffer(data, 0, left))
buffers[-1] = data[:left] self._rbuf.write(buffer(data, left))
break break
buf.write(data)
buf_len += n buf_len += n
return "".join(buffers) #assert buf_len == buf.tell()
return buf.getvalue()
def readlines(self, sizehint=0): def readlines(self, sizehint=0):
total = 0 total = 0
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment