Commit df07e47d authored by David Wilson's avatar David Wilson

core: de-munge Message.unpickle() vs. Receiver.get().

parent a39cd44b
...@@ -254,17 +254,27 @@ class Message(object): ...@@ -254,17 +254,27 @@ class Message(object):
self.data = cPickle.dumps(CallError(e), protocol=2) self.data = cPickle.dumps(CallError(e), protocol=2)
return self return self
def unpickle(self): def unpickle(self, throw=True):
"""Deserialize `data` into an object.""" """Deserialize `data` into an object."""
IOLOG.debug('%r.unpickle()', self) IOLOG.debug('%r.unpickle()', self)
fp = cStringIO.StringIO(self.data) fp = cStringIO.StringIO(self.data)
unpickler = cPickle.Unpickler(fp) unpickler = cPickle.Unpickler(fp)
unpickler.find_global = self._find_global unpickler.find_global = self._find_global
try: try:
return unpickler.load() # Must occur off the broker thread.
obj = unpickler.load()
except (TypeError, ValueError), ex: except (TypeError, ValueError), ex:
raise StreamError('invalid message: %s', ex) raise StreamError('invalid message: %s', ex)
if throw:
if obj == _DEAD:
raise ChannelError(ChannelError.remote_msg)
if isinstance(obj, CallError):
raise obj
return obj
def __repr__(self): def __repr__(self):
return 'Message(%r, %r, %r, %r, %r, %r..%d)' % ( return 'Message(%r, %r, %r, %r, %r, %r..%d)' % (
self.dst_id, self.src_id, self.auth_id, self.handle, self.dst_id, self.src_id, self.auth_id, self.handle,
...@@ -358,19 +368,7 @@ class Receiver(object): ...@@ -358,19 +368,7 @@ class Receiver(object):
if msg == _DEAD: if msg == _DEAD:
raise ChannelError(ChannelError.local_msg) raise ChannelError(ChannelError.local_msg)
return msg
# Must occur off the broker thread.
data = msg.unpickle()
if data == _DEAD and self.raise_channelerror:
raise ChannelError(ChannelError.remote_msg)
if isinstance(data, CallError):
raise data
return msg, data
def get_data(self, timeout=None):
return self.get(timeout)[1]
def __iter__(self): def __iter__(self):
while True: while True:
...@@ -1230,7 +1228,8 @@ class ExternalContext(object): ...@@ -1230,7 +1228,8 @@ class ExternalContext(object):
fp.close() fp.close()
def _dispatch_calls(self): def _dispatch_calls(self):
for msg, data in self.channel: for msg in self.channel:
data = msg.unpickle(throw=False)
LOG.debug('_dispatch_calls(%r)', data) LOG.debug('_dispatch_calls(%r)', data)
if msg.src_id not in mitogen.parent_ids: if msg.src_id not in mitogen.parent_ids:
LOG.warning('CALL_FUNCTION from non-parent %r', msg.src_id) LOG.warning('CALL_FUNCTION from non-parent %r', msg.src_id)
......
...@@ -564,17 +564,15 @@ class Context(mitogen.core.Context): ...@@ -564,17 +564,15 @@ class Context(mitogen.core.Context):
else: else:
klass = None klass = None
recv = self.send_async( return self.send_async(
mitogen.core.Message.pickled( mitogen.core.Message.pickled(
(fn.__module__, klass, fn.__name__, args, kwargs), (fn.__module__, klass, fn.__name__, args, kwargs),
handle=mitogen.core.CALL_FUNCTION, handle=mitogen.core.CALL_FUNCTION,
) )
) )
recv.raise_channelerror = False
return recv
def call(self, fn, *args, **kwargs): def call(self, fn, *args, **kwargs):
return self.call_async(fn, *args, **kwargs).get_data() return self.call_async(fn, *args, **kwargs).get().unpickle()
class Router(mitogen.parent.Router): class Router(mitogen.parent.Router):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment