Commit 44a49517 authored by Jim Fulton's avatar Jim Fulton

Added support for message iterators. This allows one, for example, to

use an iterator to send a large file without loading it in memory.
parent 8f074201
...@@ -101,7 +101,7 @@ class SizedMessageAsyncConnection(asyncore.dispatcher): ...@@ -101,7 +101,7 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
self.__state = 0 self.__state = 0
self.__has_mac = 0 self.__has_mac = 0
self.__msg_size = 4 self.__msg_size = 4
self.__output_lock = threading.Lock() # Protects __output self.__output_messages = []
self.__output = [] self.__output = []
self.__closed = False self.__closed = False
# Each side of the connection sends and receives messages. A # Each side of the connection sends and receives messages. A
...@@ -129,8 +129,30 @@ class SizedMessageAsyncConnection(asyncore.dispatcher): ...@@ -129,8 +129,30 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
def setSessionKey(self, sesskey): def setSessionKey(self, sesskey):
log("set session key %r" % sesskey) log("set session key %r" % sesskey)
# Low-level construction is now delayed until data are sent.
# This is to allow use of iterators that generate messages
# only when we're ready to do I/O so that we can effeciently
# transmit large files. Because we delay messages, we also
# have to delay setting the session key to retain proper
# ordering.
# The low-level output queue supports strings, a special close
# marker, and iterators. It doesn't support callbacks. We
# can create a allback by providing an iterator that doesn't
# yield anything.
# The hack fucntion below is a callback in iterator's
# clothing. :) It never yields anything, but is a generator
# and thus iterator, because it contains a yield statement.
def hack():
self.__hmac_send = hmac.HMAC(sesskey, digestmod=sha) self.__hmac_send = hmac.HMAC(sesskey, digestmod=sha)
self.__hmac_recv = hmac.HMAC(sesskey, digestmod=sha) self.__hmac_recv = hmac.HMAC(sesskey, digestmod=sha)
if False:
yield ''
self.message_output(hack())
def get_addr(self): def get_addr(self):
return self.addr return self.addr
...@@ -232,19 +254,34 @@ class SizedMessageAsyncConnection(asyncore.dispatcher): ...@@ -232,19 +254,34 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
return True return True
def writable(self): def writable(self):
if len(self.__output) == 0: return bool(self.__output_messages or self.__output)
return False
else:
return True
def should_close(self): def should_close(self):
self.__output.append(_close_marker) self.__output_messages.append(_close_marker)
def handle_write(self): def handle_write(self):
self.__output_lock.acquire()
try:
output = self.__output output = self.__output
while output: messages = self.__output_messages
while output or messages:
# Process queued messages until we have enough output
size = sum((len(s) for s in output))
while (size <= SEND_SIZE) and messages:
message = messages[0]
if message.__class__ is str:
size += self.__message_output(messages.pop(0), output)
elif message is _close_marker:
del messages[:]
del output[:]
return self.close()
else:
try:
message = message.next()
except StopIteration:
messages.pop(0)
else:
size += self.__message_output(message, output)
# Accumulate output into a single string so that we avoid # Accumulate output into a single string so that we avoid
# multiple send() calls, but avoid accumulating too much # multiple send() calls, but avoid accumulating too much
# data. If we send a very small string and have more data # data. If we send a very small string and have more data
...@@ -252,16 +289,9 @@ class SizedMessageAsyncConnection(asyncore.dispatcher): ...@@ -252,16 +289,9 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
# unfortunate interaction between the Nagle algorithm and # unfortunate interaction between the Nagle algorithm and
# delayed acks. If we send a very large string, only a # delayed acks. If we send a very large string, only a
# portion of it will actually be delivered at a time. # portion of it will actually be delivered at a time.
l = 0 l = 0
for i in range(len(output)): for i in range(len(output)):
try:
l += len(output[i]) l += len(output[i])
except TypeError:
# We had an output marker, close the connection
assert output[i] is _close_marker
return self.close()
if l > SEND_SIZE: if l > SEND_SIZE:
break break
...@@ -276,42 +306,38 @@ class SizedMessageAsyncConnection(asyncore.dispatcher): ...@@ -276,42 +306,38 @@ class SizedMessageAsyncConnection(asyncore.dispatcher):
if err[0] in expected_socket_write_errors: if err[0] in expected_socket_write_errors:
break # we couldn't write anything break # we couldn't write anything
raise raise
if n < len(v):
if n < l:
output.insert(0, v[n:]) output.insert(0, v[n:])
break # we can't write any more break # we can't write any more
finally:
self.__output_lock.release()
def handle_close(self): def handle_close(self):
self.close() self.close()
def message_output(self, message): def message_output(self, message):
if __debug__:
if self._debug:
log("message_output %d bytes: %s hmac=%d" %
(len(message), short_repr(message),
self.__hmac_send and 1 or 0),
level=TRACE)
if self.__closed: if self.__closed:
raise DisconnectedError( raise DisconnectedError(
"This action is temporarily unavailable.<p>") "This action is temporarily unavailable.<p>")
self.__output_lock.acquire() self.__output_messages.append(message)
try:
def __message_output(self, message, output):
# do two separate appends to avoid copying the message string # do two separate appends to avoid copying the message string
size = 4
if self.__hmac_send: if self.__hmac_send:
self.__output.append(struct.pack(">I", len(message) | MAC_BIT)) output.append(struct.pack(">I", len(message) | MAC_BIT))
self.__hmac_send.update(message) self.__hmac_send.update(message)
self.__output.append(self.__hmac_send.digest()) output.append(self.__hmac_send.digest())
size += 20
else: else:
self.__output.append(struct.pack(">I", len(message))) output.append(struct.pack(">I", len(message)))
if len(message) <= SEND_SIZE: if len(message) <= SEND_SIZE:
self.__output.append(message) output.append(message)
else: else:
for i in range(0, len(message), SEND_SIZE): for i in range(0, len(message), SEND_SIZE):
self.__output.append(message[i:i+SEND_SIZE]) output.append(message[i:i+SEND_SIZE])
finally:
self.__output_lock.release() return size + len(message)
def close(self): def close(self):
if not self.__closed: if not self.__closed:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment