Commit 9de1fca3 authored by David Wilson's avatar David Wilson

issue #9: ensure messages arrive on the expected stream

If no ADD_ROUTE message has been received from the master associating a
stream with a particular context ID, then it is expected messages
originating from that context ID can only be routed via the parent.
parent 4de321a3
......@@ -638,7 +638,7 @@ class Stream(BasicStream):
msg.data = self._input_buf[self.HEADER_LEN:self.HEADER_LEN+msg_len]
self._input_buf = self._input_buf[self.HEADER_LEN+msg_len:]
self._router._async_route(msg)
self._router._async_route(msg, self)
return True
def on_transmit(self, broker):
......@@ -929,8 +929,16 @@ class Router(object):
except Exception:
LOG.exception('%r._invoke(%r): %r crashed', self, msg, fn)
def _async_route(self, msg):
IOLOG.debug('%r._async_route(%r)', self, msg)
def _async_route(self, msg, stream=None):
IOLOG.debug('%r._async_route(%r, %r)', self, msg, stream)
# Perform source verification.
if stream is not None:
expected_stream = self._stream_by_id.get(msg.src_id,
self._stream_by_id.get(mitogen.parent_id))
if stream != expected_stream:
LOG.error('%r: bad source: got %r from %r, should be from %r',
self, msg, stream, expected_stream)
if msg.dst_id == mitogen.context_id:
return self._invoke(msg)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment