Commit 4d01dc3b authored by David Wilson's avatar David Wilson

Initial pass at module preloading

* Don't implement the rules for when preloading occurs yet
* Don't attempt to streamily preload modules downstream while this
  context hasn't yet received the final module. There is quite
  significant latency buried in here, but for now it's a lot of work to
  fix.

This works well enough to handle at least the mitogen package, but it's
likely broken for anything bigger.
parent 21971874
...@@ -56,6 +56,7 @@ FORWARD_LOG = 102 ...@@ -56,6 +56,7 @@ FORWARD_LOG = 102
ADD_ROUTE = 103 ADD_ROUTE = 103
ALLOCATE_ID = 104 ALLOCATE_ID = 104
SHUTDOWN = 105 SHUTDOWN = 105
LOAD_MODULE = 106
CHUNK_SIZE = 16384 CHUNK_SIZE = 16384
...@@ -305,6 +306,8 @@ def _queue_interruptible_get(queue, timeout=None, block=True): ...@@ -305,6 +306,8 @@ def _queue_interruptible_get(queue, timeout=None, block=True):
if timeout is not None: if timeout is not None:
timeout += time.time() timeout += time.time()
LOG.info('timeout = %r, block = %r', timeout, block)
msg = None msg = None
while msg is None and (timeout is None or timeout > time.time()): while msg is None and (timeout is None or timeout > time.time()):
try: try:
...@@ -395,7 +398,7 @@ class Importer(object): ...@@ -395,7 +398,7 @@ class Importer(object):
:param context: Context to communicate via. :param context: Context to communicate via.
""" """
def __init__(self, context, core_src): def __init__(self, router, context, core_src):
self._context = context self._context = context
self._present = {'mitogen': [ self._present = {'mitogen': [
'mitogen.ansible', 'mitogen.ansible',
...@@ -407,13 +410,19 @@ class Importer(object): ...@@ -407,13 +410,19 @@ class Importer(object):
'mitogen.sudo', 'mitogen.sudo',
'mitogen.utils', 'mitogen.utils',
]} ]}
self._lock = threading.Lock()
# Presence of an entry in this map indicates in-flight GET_MODULE.
self._callbacks = {}
self.tls = threading.local() self.tls = threading.local()
router.add_handler(self._on_load_module, LOAD_MODULE)
self._cache = {} self._cache = {}
if core_src: if core_src:
self._cache['mitogen.core'] = ( self._cache['mitogen.core'] = (
'mitogen.core',
None, None,
'mitogen/core.py', 'mitogen/core.py',
zlib.compress(core_src), zlib.compress(core_src),
[],
) )
def __repr__(self): def __repr__(self):
...@@ -468,23 +477,53 @@ class Importer(object): ...@@ -468,23 +477,53 @@ class Importer(object):
# later. # later.
os.environ['PBR_VERSION'] = '0.0.0' os.environ['PBR_VERSION'] = '0.0.0'
def _on_load_module(self, msg):
tup = msg.unpickle()
fullname = tup[0]
LOG.debug('Importer._on_load_module(%r)', fullname)
self._lock.acquire()
try:
self._cache[fullname] = tup
callbacks = self._callbacks.pop(fullname, [])
finally:
self._lock.release()
for callback in callbacks:
callback()
def _request_module(self, fullname, callback):
self._lock.acquire()
try:
present = fullname in self._cache
if not present:
funcs = self._callbacks.get(fullname)
if funcs is not None:
LOG.debug('_request_module(%r): in flight', fullname)
funcs.append(callback)
else:
LOG.debug('_request_module(%r): new request', fullname)
self._callbacks[fullname] = [callback]
self._context.send(Message(data=fullname, handle=GET_MODULE))
finally:
self._lock.release()
if present:
callback()
def load_module(self, fullname): def load_module(self, fullname):
LOG.debug('Importer.load_module(%r)', fullname) LOG.debug('Importer.load_module(%r)', fullname)
self._load_module_hacks(fullname) self._load_module_hacks(fullname)
try: event = threading.Event()
ret = self._cache[fullname] self._request_module(fullname, event.set)
except KeyError: event.wait()
self._cache[fullname] = ret = (
self._context.send_await(
Message(data=fullname, handle=GET_MODULE)
)
)
ret = self._cache[fullname]
if ret is None: if ret is None:
raise ImportError('Master does not have %r' % (fullname,)) raise ImportError('Master does not have %r' % (fullname,))
pkg_present = ret[0] pkg_present = ret[1]
mod = sys.modules.setdefault(fullname, imp.new_module(fullname)) mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
mod.__file__ = self.get_filename(fullname) mod.__file__ = self.get_filename(fullname)
mod.__loader__ = self mod.__loader__ = self
...@@ -500,11 +539,11 @@ class Importer(object): ...@@ -500,11 +539,11 @@ class Importer(object):
def get_filename(self, fullname): def get_filename(self, fullname):
if fullname in self._cache: if fullname in self._cache:
return 'master:' + self._cache[fullname][1] return 'master:' + self._cache[fullname][2]
def get_source(self, fullname): def get_source(self, fullname):
if fullname in self._cache: if fullname in self._cache:
return zlib.decompress(self._cache[fullname][2]) return zlib.decompress(self._cache[fullname][3])
class LogHandler(logging.Handler): class LogHandler(logging.Handler):
...@@ -593,6 +632,7 @@ class Stream(BasicStream): ...@@ -593,6 +632,7 @@ class Stream(BasicStream):
self._router = router self._router = router
self.remote_id = remote_id self.remote_id = remote_id
self.name = 'default' self.name = 'default'
self.modules_sent = set()
self.construct(**kwargs) self.construct(**kwargs)
self._output_buf = collections.deque() self._output_buf = collections.deque()
...@@ -853,6 +893,10 @@ class Router(object): ...@@ -853,6 +893,10 @@ class Router(object):
def __repr__(self): def __repr__(self):
return 'Router(%r)' % (self.broker,) return 'Router(%r)' % (self.broker,)
def stream_by_id(self, dst_id):
return self._stream_by_id.get(dst_id,
self._stream_by_id.get(mitogen.parent_id))
def on_disconnect(self, stream, broker): def on_disconnect(self, stream, broker):
"""Invoked by Stream.on_disconnect().""" """Invoked by Stream.on_disconnect()."""
for context in self._context_by_id.itervalues(): for context in self._context_by_id.itervalues():
...@@ -1132,7 +1176,7 @@ class ExternalContext(object): ...@@ -1132,7 +1176,7 @@ class ExternalContext(object):
else: else:
core_src = None core_src = None
self.importer = Importer(self.parent, core_src) self.importer = Importer(self.router, self.parent, core_src)
sys.meta_path.append(self.importer) sys.meta_path.append(self.importer)
def _setup_package(self, context_id, parent_ids): def _setup_package(self, context_id, parent_ids):
......
...@@ -599,13 +599,14 @@ class ModuleFinder(object): ...@@ -599,13 +599,14 @@ class ModuleFinder(object):
stack.extend(set(fullnames).difference(found, stack, [fullname])) stack.extend(set(fullnames).difference(found, stack, [fullname]))
found.update(fullnames) found.update(fullnames)
return found return sorted(found)
class ModuleResponder(object): class ModuleResponder(object):
def __init__(self, router): def __init__(self, router):
self._router = router self._router = router
self._finder = ModuleFinder() self._finder = ModuleFinder()
self._cache = {} # fullname -> pickled
router.add_handler(self._on_get_module, mitogen.core.GET_MODULE) router.add_handler(self._on_get_module, mitogen.core.GET_MODULE)
def __repr__(self): def __repr__(self):
...@@ -622,40 +623,57 @@ class ModuleResponder(object): ...@@ -622,40 +623,57 @@ class ModuleResponder(object):
return src[:match.start()] return src[:match.start()]
return src return src
def _on_get_module(self, msg): def _build_tuple(self, fullname):
LOG.debug('%r.get_module(%r)', self, msg) if fullname in self._cache:
if msg == mitogen.core._DEAD: return self._cache[fullname]
return
fullname = msg.data path, source, is_pkg = self._finder.get_module_source(fullname)
try: if source is None:
path, source, is_pkg = self._finder.get_module_source(fullname) raise ImportError('could not find %r' % (fullname,))
if source is None:
raise ImportError('could not find %r' % (fullname,))
if is_pkg:
pkg_present = get_child_modules(path, fullname)
LOG.debug('get_child_modules(%r, %r) -> %r',
path, fullname, pkg_present)
else:
pkg_present = None
if fullname == '__main__': if is_pkg:
source = self.neutralize_main(source) pkg_present = get_child_modules(path, fullname)
compressed = zlib.compress(source) LOG.debug('_build_tuple(%r, %r) -> %r',
related = list(self._finder.find_related(fullname)) path, fullname, pkg_present)
else:
pkg_present = None
if fullname == '__main__':
source = self.neutralize_main(source)
compressed = zlib.compress(source)
related = list(self._finder.find_related(fullname))
# 0:fullname 1:pkg_present 2:path 3:compressed 4:related
return fullname, pkg_present, path, compressed, related
def _send_load_module(self, msg, fullname):
stream = self._router.stream_by_id(msg.src_id)
if fullname not in stream.sent_modules:
self._router.route( self._router.route(
mitogen.core.Message.pickled( mitogen.core.Message.pickled(
(pkg_present, path, compressed, related), self._build_tuple(fullname),
dst_id=msg.src_id, dst_id=msg.src_id,
handle=msg.reply_to, handle=mitogen.core.LOAD_MODULE,
) )
) )
stream.sent_modules.add(fullname)
def _on_get_module(self, msg):
LOG.debug('%r.get_module(%r)', self, msg)
if msg == mitogen.core._DEAD:
return
fullname = msg.data
try:
tup = self._build_tuple(fullname)
related = tup[4]
for name in related + [fullname]:
if name not in ('mitogen', 'mitogen.core'):
self._send_load_module(msg, name)
except Exception: except Exception:
LOG.debug('While importing %r', fullname, exc_info=True) LOG.debug('While importing %r', fullname, exc_info=True)
self._router.route( self._router.route(
mitogen.core.Message.pickled( mitogen.core.Message.pickled(
None, (fullname, None, None, None, []),
dst_id=msg.src_id, dst_id=msg.src_id,
handle=msg.reply_to, handle=msg.reply_to,
) )
...@@ -682,41 +700,28 @@ class ModuleForwarder(object): ...@@ -682,41 +700,28 @@ class ModuleForwarder(object):
return return
fullname = msg.data fullname = msg.data
cached = self.importer._cache.get(fullname) callback = lambda: self._on_cache_callback(msg, fullname)
if cached: self.importer._request_module(fullname, callback)
LOG.debug('%r._on_get_module(): using cached %r', self, fullname)
self.router.route(
mitogen.core.Message.pickled(
cached,
dst_id=msg.src_id,
handle=msg.reply_to,
)
)
else:
LOG.debug('%r._on_get_module(): requesting %r', self, fullname)
self.parent_context.send(
mitogen.core.Message(
data=msg.data,
handle=mitogen.core.GET_MODULE,
reply_to=self.router.add_handler(
lambda m: self._on_got_source(m, msg),
persist=False
)
)
)
def _on_got_source(self, msg, original_msg): def _send_one_module(self, msg, tup):
LOG.debug('%r._on_got_source(%r, %r)', self, msg, original_msg)
fullname = original_msg.data
self.importer._cache[fullname] = msg.unpickle()
self.router.route( self.router.route(
mitogen.core.Message( mitogen.core.Message.pickled(
data=msg.data, self.importer._cache[fullname],
dst_id=original_msg.src_id, dst_id=msg.src_id,
handle=original_msg.reply_to, handle=mitogen.core.LOAD_MODULE,
) )
) )
def _on_cache_callback(self, msg, fullname):
LOG.debug('%r._on_get_module(): sending %r', self, fullname)
tup = self.importer._cache[fullname]
if tup is not None:
for related in tup[4]:
rtup = self.importer._cache[fullname]
self._send_one_module(rtup)
self._send_one_module(tup)
class Stream(mitogen.core.Stream): class Stream(mitogen.core.Stream):
""" """
...@@ -731,6 +736,10 @@ class Stream(mitogen.core.Stream): ...@@ -731,6 +736,10 @@ class Stream(mitogen.core.Stream):
#: True to cause context to write /tmp/mitogen.stats.<pid>.<thread>.log. #: True to cause context to write /tmp/mitogen.stats.<pid>.<thread>.log.
profiling = False profiling = False
def __init__(self, *args, **kwargs):
super(Stream, self).__init__(*args, **kwargs)
self.sent_modules = set()
def construct(self, remote_name=None, python_path=None, debug=False, def construct(self, remote_name=None, python_path=None, debug=False,
profiling=False, **kwargs): profiling=False, **kwargs):
"""Get the named context running on the local machine, creating it if """Get the named context running on the local machine, creating it if
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment