Commit 2eba4f56 authored by Amos Latteier's avatar Amos Latteier

Improved FCGI server's memory use for large uploads and downloads. Also fixed...

Improved FCGI server's memory use for large uploads and downloads. Also fixed broken shutdown handling.
parent 2fd740aa
......@@ -109,7 +109,7 @@ from PubCore import handle
from PubCore.ZEvent import Wakeup
from ZPublisher.HTTPResponse import HTTPResponse
from ZPublisher.HTTPRequest import HTTPRequest
from Producers import ShutdownProducer, LoggingProducer
from Producers import ShutdownProducer, LoggingProducer, file_part_producer, file_close_producer
from cStringIO import StringIO
from tempfile import TemporaryFile
......@@ -338,7 +338,8 @@ class FCGIChannel(asynchat.async_chat):
great loss.
"""
closed=0
using_temp_stdin=None
def __init__(self, server, sock, addr):
self.server = server
self.addr = addr
......@@ -437,6 +438,14 @@ class FCGIChannel(asynchat.async_chat):
if rec.contentLength == 0: # end of the stream
self.remainingRecs = self.remainingRecs - 1
else:
# see if stdin is getting too big, and
# replace it with a tempfile if necessary
if len(rec.content) + self.stdin.tell() > 1048576 and \
not self.using_temp_stdin:
t=TemporaryFile()
t.write(self.stdin.getvalue())
self.stdin=t
self.using_temp_stdin=1
self.stdin.write(rec.content)
# read some filter data
......@@ -470,7 +479,7 @@ class FCGIChannel(asynchat.async_chat):
to ZPublisher for processing.
"""
response = FCGIResponse(stdout = FCGIPipe(self, FCGI_STDOUT),
stderr = FCGIPipe(self, FCGI_STDERR))
stderr = StringIO())
response.setChannel(self)
request = HTTPRequest(self.stdin, self.env, response)
handle(self.server.module, request, response)
......@@ -530,12 +539,31 @@ class FCGIChannel(asynchat.async_chat):
rec.recType = recType
rec.reqId = self.requestId
# Can't send more than 64K minus header size. 8K seems about right.
while data:
chunk = data[:8192]
data = data[8192:]
rec.content = chunk
self.push(rec.getRecordAsString(), 0)
if type(data)==type(''):
# send some string data
while data:
chunk = data[:8192]
data = data[8192:]
rec.content = chunk
self.push(rec.getRecordAsString(), 0)
else:
# send a producer
p, cLen=data
eLen = (cLen + 7) & (0xFFFF - 7) # align to an 8-byte boundary
padLen = eLen - cLen
hdr = [ rec.version,
rec.recType,
rec.reqId >> 8,
rec.reqId & 0xFF,
cLen >> 8,
cLen & 0xFF,
padLen,
0]
hdr = string.join(map(chr, hdr), '')
self.push(hdr, 0)
self.push(p, 0)
self.push(padLen * '\000', 0)
def sendStreamTerminator(self, recType):
rec = FCGIRecord()
......@@ -544,7 +572,6 @@ class FCGIChannel(asynchat.async_chat):
rec.content = ""
self.push(rec.getRecordAsString(), 0)
def sendEndRecord(self, appStatus=0):
rec = FCGIRecord()
rec.recType = FCGI_END_REQUEST
......@@ -667,17 +694,52 @@ class FCGIServer(asyncore.dispatcher):
class FCGIResponse(HTTPResponse):
_tempfile=None
_tempstart=0
def setChannel(self, channel):
self.channel = channel
def write(self, data):
stdout=self.stdout
if not self._wrote:
self.stdout.write(str(self))
l=self.headers.get('content-length', None)
if l is not None:
try:
if type(l) is type(''): l=string.atoi(l)
if l > 128000:
self._tempfile=TemporaryFile()
except: pass
stdout.write(str(self))
self._wrote=1
self.stdout.write(data)
if not data: return
t=self._tempfile
if t is None:
stdout.write(data)
else:
while data:
# write file producers
# each producer holds 32K data
chunk=data[:32768]
data=data[32768:]
l=len(chunk)
b=self._tempstart
e=b+l
t.seek(b)
t.write(chunk)
self._tempstart=e
stdout.write((file_part_producer(t,b,e), l))
def _finish(self):
self.channel.sendStreamTerminator(FCGI_STDERR)
t=self._tempfile
if t is not None:
self.stdout.write((file_close_producer(t), 0))
self._tempfile=None
self.channel.sendStreamTerminator(FCGI_STDOUT)
self.channel.sendEndRecord()
self.stdout.close()
......@@ -691,20 +753,19 @@ class FCGIResponse(HTTPResponse):
r = self.headers.get('bobo-exception-value','0')
try: r=string.atoi(r)
except: r = r and 1 or 0
shutdown = r
shutdown = r,
if not self.channel.closed:
self.channel.push_with_producer(LoggingProducer(self.channel,
self.stdout.length,
'log_request'))
'log_request'), 0)
if shutdown:
sys.ZServerExitCode = shutdown
self.channel.push_with_producer(ShutdownProducer())
sys.ZServerExitCode = shutdown[0]
self.channel.push(ShutdownProducer(), 0)
Wakeup(lambda: asyncore.close_all())
else:
self.channel.push(None,0)
Wakeup()
self.channel.close_when_done()
self.channel=None
......@@ -724,8 +785,11 @@ class FCGIPipe:
def write(self, data):
datalen = len(data)
if datalen:
if type(data)==type(''):
datalen = len(data)
else:
p, datalen = data
if data:
self.channel.sendDataRecord(data, self.recType)
self.length = self.length + datalen
......
......@@ -109,7 +109,7 @@ from PubCore import handle
from PubCore.ZEvent import Wakeup
from ZPublisher.HTTPResponse import HTTPResponse
from ZPublisher.HTTPRequest import HTTPRequest
from Producers import ShutdownProducer, LoggingProducer
from Producers import ShutdownProducer, LoggingProducer, file_part_producer, file_close_producer
from cStringIO import StringIO
from tempfile import TemporaryFile
......@@ -338,7 +338,8 @@ class FCGIChannel(asynchat.async_chat):
great loss.
"""
closed=0
using_temp_stdin=None
def __init__(self, server, sock, addr):
self.server = server
self.addr = addr
......@@ -437,6 +438,14 @@ class FCGIChannel(asynchat.async_chat):
if rec.contentLength == 0: # end of the stream
self.remainingRecs = self.remainingRecs - 1
else:
# see if stdin is getting too big, and
# replace it with a tempfile if necessary
if len(rec.content) + self.stdin.tell() > 1048576 and \
not self.using_temp_stdin:
t=TemporaryFile()
t.write(self.stdin.getvalue())
self.stdin=t
self.using_temp_stdin=1
self.stdin.write(rec.content)
# read some filter data
......@@ -470,7 +479,7 @@ class FCGIChannel(asynchat.async_chat):
to ZPublisher for processing.
"""
response = FCGIResponse(stdout = FCGIPipe(self, FCGI_STDOUT),
stderr = FCGIPipe(self, FCGI_STDERR))
stderr = StringIO())
response.setChannel(self)
request = HTTPRequest(self.stdin, self.env, response)
handle(self.server.module, request, response)
......@@ -530,12 +539,31 @@ class FCGIChannel(asynchat.async_chat):
rec.recType = recType
rec.reqId = self.requestId
# Can't send more than 64K minus header size. 8K seems about right.
while data:
chunk = data[:8192]
data = data[8192:]
rec.content = chunk
self.push(rec.getRecordAsString(), 0)
if type(data)==type(''):
# send some string data
while data:
chunk = data[:8192]
data = data[8192:]
rec.content = chunk
self.push(rec.getRecordAsString(), 0)
else:
# send a producer
p, cLen=data
eLen = (cLen + 7) & (0xFFFF - 7) # align to an 8-byte boundary
padLen = eLen - cLen
hdr = [ rec.version,
rec.recType,
rec.reqId >> 8,
rec.reqId & 0xFF,
cLen >> 8,
cLen & 0xFF,
padLen,
0]
hdr = string.join(map(chr, hdr), '')
self.push(hdr, 0)
self.push(p, 0)
self.push(padLen * '\000', 0)
def sendStreamTerminator(self, recType):
rec = FCGIRecord()
......@@ -544,7 +572,6 @@ class FCGIChannel(asynchat.async_chat):
rec.content = ""
self.push(rec.getRecordAsString(), 0)
def sendEndRecord(self, appStatus=0):
rec = FCGIRecord()
rec.recType = FCGI_END_REQUEST
......@@ -667,17 +694,52 @@ class FCGIServer(asyncore.dispatcher):
class FCGIResponse(HTTPResponse):
_tempfile=None
_tempstart=0
def setChannel(self, channel):
self.channel = channel
def write(self, data):
stdout=self.stdout
if not self._wrote:
self.stdout.write(str(self))
l=self.headers.get('content-length', None)
if l is not None:
try:
if type(l) is type(''): l=string.atoi(l)
if l > 128000:
self._tempfile=TemporaryFile()
except: pass
stdout.write(str(self))
self._wrote=1
self.stdout.write(data)
if not data: return
t=self._tempfile
if t is None:
stdout.write(data)
else:
while data:
# write file producers
# each producer holds 32K data
chunk=data[:32768]
data=data[32768:]
l=len(chunk)
b=self._tempstart
e=b+l
t.seek(b)
t.write(chunk)
self._tempstart=e
stdout.write((file_part_producer(t,b,e), l))
def _finish(self):
self.channel.sendStreamTerminator(FCGI_STDERR)
t=self._tempfile
if t is not None:
self.stdout.write((file_close_producer(t), 0))
self._tempfile=None
self.channel.sendStreamTerminator(FCGI_STDOUT)
self.channel.sendEndRecord()
self.stdout.close()
......@@ -691,20 +753,19 @@ class FCGIResponse(HTTPResponse):
r = self.headers.get('bobo-exception-value','0')
try: r=string.atoi(r)
except: r = r and 1 or 0
shutdown = r
shutdown = r,
if not self.channel.closed:
self.channel.push_with_producer(LoggingProducer(self.channel,
self.stdout.length,
'log_request'))
'log_request'), 0)
if shutdown:
sys.ZServerExitCode = shutdown
self.channel.push_with_producer(ShutdownProducer())
sys.ZServerExitCode = shutdown[0]
self.channel.push(ShutdownProducer(), 0)
Wakeup(lambda: asyncore.close_all())
else:
self.channel.push(None,0)
Wakeup()
self.channel.close_when_done()
self.channel=None
......@@ -724,8 +785,11 @@ class FCGIPipe:
def write(self, data):
datalen = len(data)
if datalen:
if type(data)==type(''):
datalen = len(data)
else:
p, datalen = data
if data:
self.channel.sendDataRecord(data, self.recType)
self.length = self.length + datalen
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment