Commit f0942fa4 authored by Jim Fulton's avatar Jim Fulton

Provide shorter code path for loads, which are most common operation.

Simplified and optimized marshalling code.
parent 532ecb81
......@@ -24,7 +24,7 @@ import logging
import ZEO.ServerStub
from ZEO.ClientStorage import ClientStorage
from ZEO.Exceptions import ClientDisconnected
from ZEO.zrpc.marshal import Marshaller
from ZEO.zrpc.marshal import encode
from ZEO.tests import forker
from ZODB.DB import DB
......@@ -475,7 +475,7 @@ class ConnectionTests(CommonSetupTearDown):
class Hack:
pass
msg = Marshaller().encode(1, 0, "foo", (Hack(),))
msg = encode(1, 0, "foo", (Hack(),))
self._bad_message(msg)
del Hack
......
......@@ -13,11 +13,13 @@
##############################################################################
import asyncore
import atexit
import cPickle
import errno
import select
import sys
import threading
import logging
import ZEO.zrpc.marshal
import traceback, time
......@@ -25,7 +27,6 @@ import ZEO.zrpc.trigger
from ZEO.zrpc import smac
from ZEO.zrpc.error import ZRPCError, DisconnectedError
from ZEO.zrpc.marshal import Marshaller, ServerMarshaller
from ZEO.zrpc.log import short_repr, log
from ZODB.loglevels import BLATHER, TRACE
import ZODB.POSException
......@@ -287,7 +288,10 @@ class Connection(smac.SizedMessageAsyncConnection, object):
# our peer.
def __init__(self, sock, addr, obj, tag, map=None):
self.obj = None
self.marshal = Marshaller()
self.decode = ZEO.zrpc.marshal.decode
self.encode = ZEO.zrpc.marshal.encode
self.fast_encode = ZEO.zrpc.marshal.fast_encode
self.closed = False
self.peer_protocol_version = None # set in recv_handshake()
......@@ -413,13 +417,34 @@ class Connection(smac.SizedMessageAsyncConnection, object):
# will raise an exception. The exception will ultimately
# result in asycnore calling handle_error(), which will
# close the connection.
msgid, async, name, args = self.marshal.decode(message)
msgid, async, name, args = self.decode(message)
if debug_zrpc:
self.log("recv msg: %s, %s, %s, %s" % (msgid, async, name,
short_repr(args)),
level=TRACE)
if name == REPLY:
if name == 'loadEx':
# Special case and inline the heck out of load case:
try:
ret = self.obj.loadEx(*args)
except (SystemExit, KeyboardInterrupt):
raise
except Exception, msg:
if not isinstance(msg, self.unlogged_exception_types):
self.log("%s() raised exception: %s" % (name, msg),
logging.ERROR, exc_info=True)
self.return_error(msgid, *sys.exc_info()[:2])
else:
try:
self.message_output(self.fast_encode(msgid, 0, REPLY, ret))
self.poll()
except:
# Fall back to normal version for better error handling
self.send_reply(msgid, ret)
elif name == REPLY:
assert not async
self.handle_reply(msgid, args)
else:
......@@ -493,14 +518,14 @@ class Connection(smac.SizedMessageAsyncConnection, object):
# it's acceptable -- we really do want to catch every exception
# cPickle may raise.
try:
msg = self.marshal.encode(msgid, 0, REPLY, (err_type, err_value))
msg = self.encode(msgid, 0, REPLY, (err_type, err_value))
except: # see above
try:
r = short_repr(err_value)
except:
r = "<unreprable>"
err = ZRPCError("Couldn't pickle error %.100s" % r)
msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
msg = self.encode(msgid, 0, REPLY, (ZRPCError, err))
self.message_output(msg)
self.poll()
......@@ -527,7 +552,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
if debug_zrpc:
self.log("send msg: %d, %d, %s, ..." % (msgid, async, method),
level=TRACE)
buf = self.marshal.encode(msgid, async, method, args)
buf = self.encode(msgid, async, method, args)
self.message_output(buf)
return msgid
......@@ -560,7 +585,7 @@ class Connection(smac.SizedMessageAsyncConnection, object):
The calls will not be interleaved with other calls from the same
client.
"""
self.message_output(self.marshal.encode(0, 1, method, args)
self.message_output(self.encode(0, 1, method, args)
for method, args in iterator)
def handle_reply(self, msgid, ret):
......@@ -573,6 +598,8 @@ class Connection(smac.SizedMessageAsyncConnection, object):
self.trigger.pull_trigger()
# import cProfile, time
class ManagedServerConnection(Connection):
"""Server-side Connection subclass."""
......@@ -583,7 +610,9 @@ class ManagedServerConnection(Connection):
self.mgr = mgr
map = {}
Connection.__init__(self, sock, addr, obj, 'S', map=map)
self.marshal = ServerMarshaller()
self.decode = ZEO.zrpc.marshal.server_decode
self.trigger = ZEO.zrpc.trigger.trigger(map)
self.call_from_thread = self.trigger.pull_trigger
......@@ -591,6 +620,15 @@ class ManagedServerConnection(Connection):
t.setDaemon(True)
t.start()
# self.profile = cProfile.Profile()
# def message_input(self, message):
# self.profile.enable()
# try:
# Connection.message_input(self, message)
# finally:
# self.profile.disable()
def handshake(self):
# Send the server's preferred protocol to the client.
self.message_output(self.current_protocol)
......@@ -602,6 +640,7 @@ class ManagedServerConnection(Connection):
def close(self):
self.obj.notifyDisconnected()
Connection.close(self)
# self.profile.dump_stats(str(time.time())+'.stats')
def send_reply(self, msgid, ret, immediately=True):
# encode() can pass on a wide variety of exceptions from cPickle.
......@@ -609,14 +648,14 @@ class ManagedServerConnection(Connection):
# it's acceptable -- we really do want to catch every exception
# cPickle may raise.
try:
msg = self.marshal.encode(msgid, 0, REPLY, ret)
msg = self.encode(msgid, 0, REPLY, ret)
except: # see above
try:
r = short_repr(ret)
except:
r = "<unreprable>"
err = ZRPCError("Couldn't pickle return %.100s" % r)
msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
msg = self.encode(msgid, 0, REPLY, (ZRPCError, err))
self.message_output(msg)
if immediately:
self.poll()
......
......@@ -11,60 +11,65 @@
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import cPickle
from cPickle import Unpickler, Pickler
from cStringIO import StringIO
import logging
from ZEO.zrpc.error import ZRPCError
from ZEO.zrpc.log import log, short_repr
class Marshaller:
"""Marshal requests and replies to second across network"""
def encode(self, msgid, flags, name, args):
"""Returns an encoded message"""
# (We used to have a global pickler, but that's not thread-safe. :-( )
# Note that args may contain very large binary pickles already; for
# this reason, it's important to use proto 1 (or higher) pickles here
# too. For a long time, this used proto 0 pickles, and that can
# bloat our pickle to 4x the size (due to high-bit and control bytes
# being represented by \xij escapes in proto 0).
# Undocumented: cPickle.Pickler accepts a lone protocol argument;
# pickle.py does not.
pickler = cPickle.Pickler(1)
pickler.fast = 1
# Undocumented: pickler.dump(), for a cPickle.Pickler, takes
# an optional boolean argument. When true, it returns the pickle;
# when false or unspecified, it returns the pickler object itself.
# pickle.py does none of this.
return pickler.dump((msgid, flags, name, args), 1)
def decode(self, msg):
"""Decodes msg and returns its parts"""
unpickler = cPickle.Unpickler(StringIO(msg))
unpickler.find_global = find_global
try:
return unpickler.load() # msgid, flags, name, args
except:
log("can't decode message: %s" % short_repr(msg),
level=logging.ERROR)
raise
class ServerMarshaller(Marshaller):
def decode(self, msg):
"""Decodes msg and returns its parts"""
unpickler = cPickle.Unpickler(StringIO(msg))
unpickler.find_global = server_find_global
try:
return unpickler.load() # msgid, flags, name, args
except:
log("can't decode message: %s" % short_repr(msg),
level=logging.ERROR)
raise
def encode(*args): # args: (msgid, flags, name, args)
# (We used to have a global pickler, but that's not thread-safe. :-( )
# It's not thread safe if, in the couse of pickling, we call the
# Python interpeter, which releases the GIL.
# Note that args may contain very large binary pickles already; for
# this reason, it's important to use proto 1 (or higher) pickles here
# too. For a long time, this used proto 0 pickles, and that can
# bloat our pickle to 4x the size (due to high-bit and control bytes
# being represented by \xij escapes in proto 0).
# Undocumented: cPickle.Pickler accepts a lone protocol argument;
# pickle.py does not.
pickler = Pickler(1)
pickler.fast = 1
return pickler.dump(args, 1)
@apply
def fast_encode():
# Only use in cases where you *know* the data contains only basic
# Python objects
pickler = Pickler(1)
pickler.fast = 1
dump = pickler.dump
def fast_encode(*args):
return dump(args, 1)
return fast_encode
def decode(msg):
"""Decodes msg and returns its parts"""
unpickler = Unpickler(StringIO(msg))
unpickler.find_global = find_global
try:
return unpickler.load() # msgid, flags, name, args
except:
log("can't decode message: %s" % short_repr(msg),
level=logging.ERROR)
raise
def server_decode(msg):
"""Decodes msg and returns its parts"""
unpickler = Unpickler(StringIO(msg))
unpickler.find_global = server_find_global
try:
return unpickler.load() # msgid, flags, name, args
except:
log("can't decode message: %s" % short_repr(msg),
level=logging.ERROR)
raise
_globals = globals()
_silly = ('__doc__',)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment