Commit b158259c authored by David Wilson's avatar David Wilson

Split up parent and master modules

Knocks 4kb off network footprint for a proxy connection.
parent 5f2fa2cd
......@@ -30,7 +30,6 @@ import inspect
import logging
import os
import shutil
import signal
import socket
import subprocess
import sys
......@@ -39,6 +38,7 @@ import threading
import mitogen.core
import mitogen.master
import mitogen.parent
from mitogen.core import LOG, IOLOG
......@@ -123,7 +123,7 @@ class Process(object):
mitogen.core.listen(self.pump, 'receive', self._on_pump_receive)
if proc:
pmon = mitogen.master.ProcessMonitor.instance()
pmon = mitogen.parent.ProcessMonitor.instance()
pmon.add(proc.pid, self._on_proc_exit)
def __repr__(self):
......@@ -313,7 +313,7 @@ def _fakessh_main(dest_context_id, econtext):
@mitogen.core.takes_router
def run(dest, router, args, deadline=None, econtext=None):
if econtext is not None:
mitogen.master.upgrade_router(econtext)
mitogen.parent.upgrade_router(econtext)
context_id = router.allocate_id()
fakessh = mitogen.master.Context(router, context_id)
......
......@@ -26,24 +26,15 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import dis
import errno
import getpass
import imp
import inspect
import itertools
import logging
import os
import pkgutil
import pty
import re
import select
import signal
import socket
import sys
import termios
import textwrap
import threading
import time
import types
import zlib
......@@ -58,188 +49,18 @@ if not hasattr(pkgutil, 'find_loader'):
from mitogen.compat import pkgutil
import mitogen.core
import mitogen.parent
from mitogen.core import LOG
LOG = logging.getLogger('mitogen')
IOLOG = logging.getLogger('mitogen.io')
RLOG = logging.getLogger('mitogen.ctx')
DOCSTRING_RE = re.compile(r'""".+?"""', re.M | re.S)
COMMENT_RE = re.compile(r'^[ ]*#[^\n]*$', re.M)
IOLOG_RE = re.compile(r'^[ ]*IOLOG.debug\(.+?\)$', re.M)
def minimize_source(source):
subber = lambda match: '""' + ('\n' * match.group(0).count('\n'))
source = DOCSTRING_RE.sub(subber, source)
source = COMMENT_RE.sub('', source)
return source.replace(' ', '\t')
def get_child_modules(path, fullname):
it = pkgutil.iter_modules([os.path.dirname(path)])
return ['%s.%s' % (fullname, name) for _, name, _ in it]
class Argv(object):
def __init__(self, argv):
self.argv = argv
def escape(self, x):
s = '"'
for c in x:
if c in '\\$"`':
s += '\\'
s += c
s += '"'
return s
def __str__(self):
return ' '.join(map(self.escape, self.argv))
def create_child(*args):
parentfp, childfp = socket.socketpair()
pid = os.fork()
if not pid:
mitogen.core.set_block(childfp.fileno())
os.dup2(childfp.fileno(), 0)
os.dup2(childfp.fileno(), 1)
childfp.close()
parentfp.close()
os.execvp(args[0], args)
childfp.close()
# Decouple the socket from the lifetime of the Python socket object.
fd = os.dup(parentfp.fileno())
parentfp.close()
LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s',
pid, fd, os.getpid(), Argv(args))
return pid, fd
def flags(names):
"""Return the result of ORing a set of (space separated) :py:mod:`termios`
module constants together."""
return sum(getattr(termios, name) for name in names.split())
def cfmakeraw(tflags):
"""Given a list returned by :py:func:`termios.tcgetattr`, return a list
that has been modified in the same manner as the `cfmakeraw()` C library
function."""
iflag, oflag, cflag, lflag, ispeed, ospeed, cc = tflags
iflag &= ~flags('IGNBRK BRKINT PARMRK ISTRIP INLCR IGNCR ICRNL IXON')
oflag &= ~flags('OPOST IXOFF')
lflag &= ~flags('ECHO ECHOE ECHONL ICANON ISIG IEXTEN')
cflag &= ~flags('CSIZE PARENB')
cflag |= flags('CS8')
# TODO: one or more of the above bit twiddles sets or omits a necessary
# flag. Forcing these fields to zero, as shown below, gets us what we want
# on Linux/OS X, but it is possibly broken on some other OS.
iflag = 0
oflag = 0
lflag = 0
return [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
def disable_echo(fd):
old = termios.tcgetattr(fd)
new = cfmakeraw(old)
flags = (
termios.TCSAFLUSH |
getattr(termios, 'TCSASOFT', 0)
)
termios.tcsetattr(fd, flags, new)
def close_nonstandard_fds():
for fd in xrange(3, 1024):
try:
os.close(fd)
except OSError:
pass
def tty_create_child(*args):
master_fd, slave_fd = os.openpty()
disable_echo(master_fd)
disable_echo(slave_fd)
pid = os.fork()
if not pid:
mitogen.core.set_block(slave_fd)
os.dup2(slave_fd, 0)
os.dup2(slave_fd, 1)
os.dup2(slave_fd, 2)
close_nonstandard_fds()
os.setsid()
os.close(os.open(os.ttyname(1), os.O_RDWR))
os.execvp(args[0], args)
os.close(slave_fd)
LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s',
pid, master_fd, os.getpid(), Argv(args))
return pid, master_fd
def write_all(fd, s, deadline=None):
timeout = None
written = 0
while written < len(s):
if deadline is not None:
timeout = max(0, deadline - time.time())
if timeout == 0:
raise mitogen.core.TimeoutError('write timed out')
_, wfds, _ = select.select([], [fd], [], timeout)
if not wfds:
continue
n, disconnected = mitogen.core.io_op(os.write, fd, buffer(s, written))
if disconnected:
raise mitogen.core.StreamError('EOF on stream during write')
written += n
def iter_read(fd, deadline=None):
bits = []
timeout = None
while True:
if deadline is not None:
timeout = max(0, deadline - time.time())
if timeout == 0:
break
rfds, _, _ = select.select([fd], [], [], timeout)
if not rfds:
continue
s, disconnected = mitogen.core.io_op(os.read, fd, 4096)
IOLOG.debug('iter_read(%r) -> %r', fd, s)
if disconnected or not s:
raise mitogen.core.StreamError(
'EOF on stream; last 300 bytes received: %r' %
(''.join(bits)[-300:],)
)
bits.append(s)
yield s
raise mitogen.core.TimeoutError('read timed out')
def discard_until(fd, s, deadline):
for buf in iter_read(fd, deadline):
if buf.endswith(s):
return
def scan_code_imports(co, LOAD_CONST=dis.opname.index('LOAD_CONST'),
IMPORT_NAME=dis.opname.index('IMPORT_NAME')):
"""Given a code object `co`, scan its bytecode yielding any
......@@ -713,170 +534,6 @@ class ModuleResponder(object):
)
class ModuleForwarder(object):
"""
Respond to GET_MODULE requests in a slave by forwarding the request to our
parent context, or satisfying the request from our local Importer cache.
"""
def __init__(self, router, parent_context, importer):
self.router = router
self.parent_context = parent_context
self.importer = importer
router.add_handler(self._on_get_module, mitogen.core.GET_MODULE)
def __repr__(self):
return 'ModuleForwarder(%r)' % (self.router,)
def _on_get_module(self, msg):
LOG.debug('%r._on_get_module(%r)', self, msg)
if msg == mitogen.core._DEAD:
return
fullname = msg.data
callback = lambda: self._on_cache_callback(msg, fullname)
self.importer._request_module(fullname, callback)
def _send_one_module(self, msg, tup):
self.router.route(
mitogen.core.Message.pickled(
tup,
dst_id=msg.src_id,
handle=mitogen.core.LOAD_MODULE,
)
)
def _on_cache_callback(self, msg, fullname):
LOG.debug('%r._on_get_module(): sending %r', self, fullname)
tup = self.importer._cache[fullname]
if tup is not None:
for related in tup[4]:
rtup = self.importer._cache[fullname]
self._send_one_module(msg, rtup)
self._send_one_module(msg, tup)
class Stream(mitogen.core.Stream):
"""
Base for streams capable of starting new slaves.
"""
#: The path to the remote Python interpreter.
python_path = 'python2.7'
#: True to cause context to write verbose /tmp/mitogen.<pid>.log.
debug = False
#: True to cause context to write /tmp/mitogen.stats.<pid>.<thread>.log.
profiling = False
def __init__(self, *args, **kwargs):
super(Stream, self).__init__(*args, **kwargs)
self.sent_modules = set(['mitogen', 'mitogen.core'])
self.sent_packages = set(['mitogen'])
def construct(self, remote_name=None, python_path=None, debug=False,
profiling=False, **kwargs):
"""Get the named context running on the local machine, creating it if
it does not exist."""
super(Stream, self).construct(**kwargs)
if python_path:
self.python_path = python_path
if remote_name is None:
remote_name = '%s@%s:%d'
remote_name %= (getpass.getuser(), socket.gethostname(), os.getpid())
self.remote_name = remote_name
self.debug = debug
self.profiling = profiling
def on_shutdown(self, broker):
"""Request the slave gracefully shut itself down."""
LOG.debug('%r closing CALL_FUNCTION channel', self)
self.send(
mitogen.core.Message(
src_id=mitogen.context_id,
dst_id=self.remote_id,
handle=mitogen.core.SHUTDOWN,
)
)
# base64'd and passed to 'python -c'. It forks, dups 0->100, creates a
# pipe, then execs a new interpreter with a custom argv. 'CONTEXT_NAME' is
# replaced with the context name. Optimized for size.
@staticmethod
def _first_stage():
import os,sys,zlib
R,W=os.pipe()
r,w=os.pipe()
if os.fork():
os.dup2(0,100)
os.dup2(R,0)
os.dup2(r,101)
for f in R,r,W,w:os.close(f)
os.environ['ARGV0']=e=sys.executable
os.execv(e,['mitogen:CONTEXT_NAME'])
os.write(1,'EC0\n')
C=zlib.decompress(sys.stdin.read(input()))
os.fdopen(W,'w',0).write(C)
os.fdopen(w,'w',0).write('%s\n'%len(C)+C)
os.write(1,'EC1\n')
sys.exit(0)
def get_boot_command(self):
source = inspect.getsource(self._first_stage)
source = textwrap.dedent('\n'.join(source.strip().split('\n')[2:]))
source = source.replace(' ', '\t')
source = source.replace('CONTEXT_NAME', self.remote_name)
encoded = source.encode('zlib').encode('base64').replace('\n', '')
# We can't use bytes.decode() in 3.x since it was restricted to always
# return unicode, so codecs.decode() is used instead. In 3.x
# codecs.decode() requires a bytes object. Since we must be compatible
# with 2.4 (no bytes literal), an extra .encode() either returns the
# same str (2.x) or an equivalent bytes (3.x).
return [
self.python_path, '-c',
'from codecs import decode as _;'
'exec(_(_("%s".encode(),"base64"),"zlib"))' % (encoded,)
]
def get_preamble(self):
parent_ids = mitogen.parent_ids[:]
parent_ids.insert(0, mitogen.context_id)
source = inspect.getsource(mitogen.core)
source += '\nExternalContext().main%r\n' % ((
parent_ids, # parent_ids
self.remote_id, # context_id
self.debug,
self.profiling,
LOG.level or logging.getLogger().level or logging.INFO,
),)
compressed = zlib.compress(minimize_source(source))
return str(len(compressed)) + '\n' + compressed
create_child = staticmethod(create_child)
def connect(self):
LOG.debug('%r.connect()', self)
pid, fd = self.create_child(*self.get_boot_command())
self.name = 'local.%s' % (pid,)
self.receive_side = mitogen.core.Side(self, fd)
self.transmit_side = mitogen.core.Side(self, os.dup(fd))
LOG.debug('%r.connect(): child process stdin/stdout=%r',
self, self.receive_side.fd)
self._connect_bootstrap()
def _ec0_received(self):
LOG.debug('%r._ec0_received()', self)
write_all(self.transmit_side.fd, self.get_preamble())
discard_until(self.receive_side.fd, 'EC1\n', time.time() + 10.0)
def _connect_bootstrap(self):
discard_until(self.receive_side.fd, 'EC0\n', time.time() + 10.0)
self._ec0_received()
class Broker(mitogen.core.Broker):
shutdown_timeout = 5.0
......@@ -919,98 +576,10 @@ class Context(mitogen.core.Context):
return self.call_async(fn, *args, **kwargs).get_data()
def _local_method():
return Stream
def _ssh_method():
import mitogen.ssh
return mitogen.ssh.Stream
def _sudo_method():
import mitogen.sudo
return mitogen.sudo.Stream
METHOD_NAMES = {
'local': _local_method,
'ssh': _ssh_method,
'sudo': _sudo_method,
}
def upgrade_router(econtext):
if not isinstance(econtext.router, Router): # TODO
econtext.router.__class__ = Router # TODO
econtext.router.id_allocator = ChildIdAllocator(econtext.router)
LOG.debug('_proxy_connect(): constructing ModuleForwarder')
ModuleForwarder(econtext.router, econtext.parent, econtext.importer)
@mitogen.core.takes_econtext
def _proxy_connect(name, context_id, method_name, kwargs, econtext):
upgrade_router(econtext)
context = econtext.router._connect(
context_id,
METHOD_NAMES[method_name](),
name=name,
**kwargs
)
return context.name
class IdAllocator(object):
def __init__(self, router):
self.router = router
self.next_id = 1
self.lock = threading.Lock()
router.add_handler(self.on_allocate_id, mitogen.core.ALLOCATE_ID)
def __repr__(self):
return 'IdAllocator(%r)' % (self.router,)
def allocate(self):
self.lock.acquire()
try:
id_ = self.next_id
self.next_id += 1
return id_
finally:
self.lock.release()
def on_allocate_id(self, msg):
id_ = self.allocate()
requestee = self.router.context_by_id(msg.src_id)
allocated = self.router.context_by_id(id_, msg.src_id)
LOG.debug('%r: allocating %r to %r', self, allocated, requestee)
self.router.route(
mitogen.core.Message.pickled(
id_,
dst_id=msg.src_id,
handle=msg.reply_to,
)
)
LOG.debug('%r: publishing route to %r via %r', self,
allocated, requestee)
self.router.propagate_route(allocated, requestee)
class ChildIdAllocator(object):
def __init__(self, router):
self.router = router
def allocate(self):
master = Context(self.router, 0)
return master.send_await(
mitogen.core.Message(dst_id=0, handle=mitogen.core.ALLOCATE_ID)
)
class Router(mitogen.core.Router):
class Router(mitogen.parent.Router):
context_class = Context
broker_class = Broker
debug = False
profiling = False
def __init__(self, broker=None):
......@@ -1032,18 +601,6 @@ class Router(mitogen.core.Router):
self.broker.shutdown()
self.broker.join()
def allocate_id(self):
return self.id_allocator.allocate()
def context_by_id(self, context_id, via_id=None):
context = self._context_by_id.get(context_id)
if context is None:
context = Context(self, context_id)
if via_id is not None:
context.via = self.context_by_id(via_id)
self._context_by_id[context_id] = context
return context
def local(self, **kwargs):
return self.connect('local', **kwargs)
......@@ -1053,27 +610,6 @@ class Router(mitogen.core.Router):
def ssh(self, **kwargs):
return self.connect('ssh', **kwargs)
def _connect(self, context_id, klass, name=None, **kwargs):
context = Context(self, context_id)
stream = klass(self, context.context_id, **kwargs)
if name is not None:
stream.name = name
stream.connect()
context.name = stream.name
self.register(context, stream)
return context
def connect(self, method_name, name=None, **kwargs):
klass = METHOD_NAMES[method_name]()
kwargs.setdefault('debug', self.debug)
kwargs.setdefault('profiling', self.profiling)
via = kwargs.pop('via', None)
if via is not None:
return self.proxy_connect(via, method_name, name=name, **kwargs)
context_id = self.allocate_id()
return self._connect(context_id, klass, name=name, **kwargs)
def propagate_route(self, target, via):
self.add_route(target.context_id, via.context_id)
child = via
......@@ -1090,43 +626,40 @@ class Router(mitogen.core.Router):
child = parent
parent = parent.via
def proxy_connect(self, via_context, method_name, name=None, **kwargs):
context_id = self.allocate_id()
# Must be added prior to _proxy_connect() to avoid a race.
self.add_route(context_id, via_context.context_id)
name = via_context.call(_proxy_connect,
name, context_id, method_name, kwargs
)
name = '%s.%s' % (via_context.name, name)
context = Context(self, context_id, name=name)
context.via = via_context
self._context_by_id[context.context_id] = context
class IdAllocator(object):
def __init__(self, router):
self.router = router
self.next_id = 1
self.lock = threading.Lock()
router.add_handler(self.on_allocate_id, mitogen.core.ALLOCATE_ID)
self.propagate_route(context, via_context)
return context
def __repr__(self):
return 'IdAllocator(%r)' % (self.router,)
def allocate(self):
self.lock.acquire()
try:
id_ = self.next_id
self.next_id += 1
return id_
finally:
self.lock.release()
class ProcessMonitor(object):
def __init__(self):
# pid -> callback()
self.callback_by_pid = {}
signal.signal(signal.SIGCHLD, self._on_sigchld)
def _on_sigchld(self, _signum, _frame):
for pid, callback in self.callback_by_pid.items():
pid, status = os.waitpid(pid, os.WNOHANG)
if pid:
callback(status)
del self.callback_by_pid[pid]
def add(self, pid, callback):
self.callback_by_pid[pid] = callback
_instance = None
@classmethod
def instance(cls):
if cls._instance is None:
cls._instance = cls()
return cls._instance
def on_allocate_id(self, msg):
id_ = self.allocate()
requestee = self.router.context_by_id(msg.src_id)
allocated = self.router.context_by_id(id_, msg.src_id)
LOG.debug('%r: allocating %r to %r', self, allocated, requestee)
self.router.route(
mitogen.core.Message.pickled(
id_,
dst_id=msg.src_id,
handle=msg.reply_to,
)
)
LOG.debug('%r: publishing route to %r via %r', self,
allocated, requestee)
self.router.propagate_route(allocated, requestee)
# Copyright 2017, David Wilson
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its contributors
# may be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import re
import logging
import os
import termios
import signal
import select
import getpass
import time
import socket
import inspect
import textwrap
import zlib
import mitogen.core
from mitogen.core import LOG, IOLOG
DOCSTRING_RE = re.compile(r'""".+?"""', re.M | re.S)
COMMENT_RE = re.compile(r'^[ ]*#[^\n]*$', re.M)
class Argv(object):
def __init__(self, argv):
self.argv = argv
def escape(self, x):
s = '"'
for c in x:
if c in '\\$"`':
s += '\\'
s += c
s += '"'
return s
def __str__(self):
return ' '.join(map(self.escape, self.argv))
def minimize_source(source):
subber = lambda match: '""' + ('\n' * match.group(0).count('\n'))
source = DOCSTRING_RE.sub(subber, source)
source = COMMENT_RE.sub('', source)
return source.replace(' ', '\t')
def flags(names):
"""Return the result of ORing a set of (space separated) :py:mod:`termios`
module constants together."""
return sum(getattr(termios, name) for name in names.split())
def cfmakeraw(tflags):
"""Given a list returned by :py:func:`termios.tcgetattr`, return a list
that has been modified in the same manner as the `cfmakeraw()` C library
function."""
iflag, oflag, cflag, lflag, ispeed, ospeed, cc = tflags
iflag &= ~flags('IGNBRK BRKINT PARMRK ISTRIP INLCR IGNCR ICRNL IXON')
oflag &= ~flags('OPOST IXOFF')
lflag &= ~flags('ECHO ECHOE ECHONL ICANON ISIG IEXTEN')
cflag &= ~flags('CSIZE PARENB')
cflag |= flags('CS8')
# TODO: one or more of the above bit twiddles sets or omits a necessary
# flag. Forcing these fields to zero, as shown below, gets us what we want
# on Linux/OS X, but it is possibly broken on some other OS.
iflag = 0
oflag = 0
lflag = 0
return [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
def disable_echo(fd):
old = termios.tcgetattr(fd)
new = cfmakeraw(old)
flags = (
termios.TCSAFLUSH |
getattr(termios, 'TCSASOFT', 0)
)
termios.tcsetattr(fd, flags, new)
def close_nonstandard_fds():
for fd in xrange(3, 1024):
try:
os.close(fd)
except OSError:
pass
def create_child(*args):
parentfp, childfp = socket.socketpair()
pid = os.fork()
if not pid:
mitogen.core.set_block(childfp.fileno())
os.dup2(childfp.fileno(), 0)
os.dup2(childfp.fileno(), 1)
childfp.close()
parentfp.close()
os.execvp(args[0], args)
childfp.close()
# Decouple the socket from the lifetime of the Python socket object.
fd = os.dup(parentfp.fileno())
parentfp.close()
LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s',
pid, fd, os.getpid(), Argv(args))
return pid, fd
def tty_create_child(*args):
master_fd, slave_fd = os.openpty()
disable_echo(master_fd)
disable_echo(slave_fd)
pid = os.fork()
if not pid:
mitogen.core.set_block(slave_fd)
os.dup2(slave_fd, 0)
os.dup2(slave_fd, 1)
os.dup2(slave_fd, 2)
close_nonstandard_fds()
os.setsid()
os.close(os.open(os.ttyname(1), os.O_RDWR))
os.execvp(args[0], args)
os.close(slave_fd)
LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s',
pid, master_fd, os.getpid(), Argv(args))
return pid, master_fd
def write_all(fd, s, deadline=None):
timeout = None
written = 0
while written < len(s):
if deadline is not None:
timeout = max(0, deadline - time.time())
if timeout == 0:
raise mitogen.core.TimeoutError('write timed out')
_, wfds, _ = select.select([], [fd], [], timeout)
if not wfds:
continue
n, disconnected = mitogen.core.io_op(os.write, fd, buffer(s, written))
if disconnected:
raise mitogen.core.StreamError('EOF on stream during write')
written += n
def iter_read(fd, deadline=None):
bits = []
timeout = None
while True:
if deadline is not None:
timeout = max(0, deadline - time.time())
if timeout == 0:
break
rfds, _, _ = select.select([fd], [], [], timeout)
if not rfds:
continue
s, disconnected = mitogen.core.io_op(os.read, fd, 4096)
IOLOG.debug('iter_read(%r) -> %r', fd, s)
if disconnected or not s:
raise mitogen.core.StreamError(
'EOF on stream; last 300 bytes received: %r' %
(''.join(bits)[-300:],)
)
bits.append(s)
yield s
raise mitogen.core.TimeoutError('read timed out')
def discard_until(fd, s, deadline):
for buf in iter_read(fd, deadline):
if buf.endswith(s):
return
def upgrade_router(econtext):
if not isinstance(econtext.router, Router): # TODO
econtext.router.__class__ = Router # TODO
econtext.router.id_allocator = ChildIdAllocator(econtext.router)
LOG.debug('_proxy_connect(): constructing ModuleForwarder')
ModuleForwarder(econtext.router, econtext.parent, econtext.importer)
def _local_method():
return mitogen.parent.Stream
def _ssh_method():
import mitogen.ssh
return mitogen.ssh.Stream
def _sudo_method():
import mitogen.sudo
return mitogen.sudo.Stream
METHOD_NAMES = {
'local': _local_method,
'ssh': _ssh_method,
'sudo': _sudo_method,
}
@mitogen.core.takes_econtext
def _proxy_connect(name, context_id, method_name, kwargs, econtext):
mitogen.parent.upgrade_router(econtext)
context = econtext.router._connect(
context_id,
METHOD_NAMES[method_name](),
name=name,
**kwargs
)
return context.name
class Stream(mitogen.core.Stream):
"""
Base for streams capable of starting new slaves.
"""
#: The path to the remote Python interpreter.
python_path = 'python2.7'
#: True to cause context to write verbose /tmp/mitogen.<pid>.log.
debug = False
#: True to cause context to write /tmp/mitogen.stats.<pid>.<thread>.log.
profiling = False
def __init__(self, *args, **kwargs):
super(Stream, self).__init__(*args, **kwargs)
self.sent_modules = set(['mitogen', 'mitogen.core'])
self.sent_packages = set(['mitogen'])
def construct(self, remote_name=None, python_path=None, debug=False,
profiling=False, **kwargs):
"""Get the named context running on the local machine, creating it if
it does not exist."""
super(Stream, self).construct(**kwargs)
if python_path:
self.python_path = python_path
if remote_name is None:
remote_name = '%s@%s:%d'
remote_name %= (getpass.getuser(), socket.gethostname(), os.getpid())
self.remote_name = remote_name
self.debug = debug
self.profiling = profiling
def on_shutdown(self, broker):
"""Request the slave gracefully shut itself down."""
LOG.debug('%r closing CALL_FUNCTION channel', self)
self.send(
mitogen.core.Message(
src_id=mitogen.context_id,
dst_id=self.remote_id,
handle=mitogen.core.SHUTDOWN,
)
)
# base64'd and passed to 'python -c'. It forks, dups 0->100, creates a
# pipe, then execs a new interpreter with a custom argv. 'CONTEXT_NAME' is
# replaced with the context name. Optimized for size.
@staticmethod
def _first_stage():
import os,sys,zlib
R,W=os.pipe()
r,w=os.pipe()
if os.fork():
os.dup2(0,100)
os.dup2(R,0)
os.dup2(r,101)
for f in R,r,W,w:os.close(f)
os.environ['ARGV0']=e=sys.executable
os.execv(e,['mitogen:CONTEXT_NAME'])
os.write(1,'EC0\n')
C=zlib.decompress(sys.stdin.read(input()))
os.fdopen(W,'w',0).write(C)
os.fdopen(w,'w',0).write('%s\n'%len(C)+C)
os.write(1,'EC1\n')
sys.exit(0)
def get_boot_command(self):
source = inspect.getsource(self._first_stage)
source = textwrap.dedent('\n'.join(source.strip().split('\n')[2:]))
source = source.replace(' ', '\t')
source = source.replace('CONTEXT_NAME', self.remote_name)
encoded = source.encode('zlib').encode('base64').replace('\n', '')
# We can't use bytes.decode() in 3.x since it was restricted to always
# return unicode, so codecs.decode() is used instead. In 3.x
# codecs.decode() requires a bytes object. Since we must be compatible
# with 2.4 (no bytes literal), an extra .encode() either returns the
# same str (2.x) or an equivalent bytes (3.x).
return [
self.python_path, '-c',
'from codecs import decode as _;'
'exec(_(_("%s".encode(),"base64"),"zlib"))' % (encoded,)
]
def get_preamble(self):
parent_ids = mitogen.parent_ids[:]
parent_ids.insert(0, mitogen.context_id)
source = inspect.getsource(mitogen.core)
source += '\nExternalContext().main%r\n' % ((
parent_ids, # parent_ids
self.remote_id, # context_id
self.debug,
self.profiling,
LOG.level or logging.getLogger().level or logging.INFO,
),)
compressed = zlib.compress(minimize_source(source))
return str(len(compressed)) + '\n' + compressed
create_child = staticmethod(create_child)
def connect(self):
LOG.debug('%r.connect()', self)
pid, fd = self.create_child(*self.get_boot_command())
self.name = 'local.%s' % (pid,)
self.receive_side = mitogen.core.Side(self, fd)
self.transmit_side = mitogen.core.Side(self, os.dup(fd))
LOG.debug('%r.connect(): child process stdin/stdout=%r',
self, self.receive_side.fd)
self._connect_bootstrap()
def _ec0_received(self):
LOG.debug('%r._ec0_received()', self)
write_all(self.transmit_side.fd, self.get_preamble())
discard_until(self.receive_side.fd, 'EC1\n', time.time() + 10.0)
def _connect_bootstrap(self):
discard_until(self.receive_side.fd, 'EC0\n', time.time() + 10.0)
self._ec0_received()
class ChildIdAllocator(object):
def __init__(self, router):
self.router = router
def allocate(self):
master = mitogen.core.Context(self.router, 0)
return master.send_await(
mitogen.core.Message(dst_id=0, handle=mitogen.core.ALLOCATE_ID)
)
class Router(mitogen.core.Router):
context_class = mitogen.core.Context
def allocate_id(self):
return self.id_allocator.allocate()
def context_by_id(self, context_id, via_id=None):
context = self._context_by_id.get(context_id)
if context is None:
context = self.context_class(self, context_id)
if via_id is not None:
context.via = self.context_by_id(via_id)
self._context_by_id[context_id] = context
return context
def _connect(self, context_id, klass, name=None, **kwargs):
context = self.context_class(self, context_id)
stream = klass(self, context.context_id, **kwargs)
if name is not None:
stream.name = name
stream.connect()
context.name = stream.name
self.register(context, stream)
return context
def connect(self, method_name, name=None, **kwargs):
klass = METHOD_NAMES[method_name]()
kwargs.setdefault('debug', self.debug)
kwargs.setdefault('profiling', self.profiling)
via = kwargs.pop('via', None)
if via is not None:
return self.proxy_connect(via, method_name, name=name, **kwargs)
context_id = self.allocate_id()
return self._connect(context_id, klass, name=name, **kwargs)
def proxy_connect(self, via_context, method_name, name=None, **kwargs):
context_id = self.allocate_id()
# Must be added prior to _proxy_connect() to avoid a race.
self.add_route(context_id, via_context.context_id)
name = via_context.call(_proxy_connect,
name, context_id, method_name, kwargs
)
name = '%s.%s' % (via_context.name, name)
context = self.context_class(self, context_id, name=name)
context.via = via_context
self._context_by_id[context.context_id] = context
self.propagate_route(context, via_context)
return context
class ProcessMonitor(object):
def __init__(self):
# pid -> callback()
self.callback_by_pid = {}
signal.signal(signal.SIGCHLD, self._on_sigchld)
def _on_sigchld(self, _signum, _frame):
for pid, callback in self.callback_by_pid.items():
pid, status = os.waitpid(pid, os.WNOHANG)
if pid:
callback(status)
del self.callback_by_pid[pid]
def add(self, pid, callback):
self.callback_by_pid[pid] = callback
_instance = None
@classmethod
def instance(cls):
if cls._instance is None:
cls._instance = cls()
return cls._instance
class ModuleForwarder(object):
"""
Respond to GET_MODULE requests in a slave by forwarding the request to our
parent context, or satisfying the request from our local Importer cache.
"""
def __init__(self, router, parent_context, importer):
self.router = router
self.parent_context = parent_context
self.importer = importer
router.add_handler(self._on_get_module, mitogen.core.GET_MODULE)
def __repr__(self):
return 'ModuleForwarder(%r)' % (self.router,)
def _on_get_module(self, msg):
LOG.debug('%r._on_get_module(%r)', self, msg)
if msg == mitogen.core._DEAD:
return
fullname = msg.data
callback = lambda: self._on_cache_callback(msg, fullname)
self.importer._request_module(fullname, callback)
def _send_one_module(self, msg, tup):
self.router.route(
mitogen.core.Message.pickled(
tup,
dst_id=msg.src_id,
handle=mitogen.core.LOAD_MODULE,
)
)
def _on_cache_callback(self, msg, fullname):
LOG.debug('%r._on_get_module(): sending %r', self, fullname)
tup = self.importer._cache[fullname]
if tup is not None:
for related in tup[4]:
rtup = self.importer._cache[fullname]
self._send_one_module(msg, rtup)
self._send_one_module(msg, tup)
......@@ -33,7 +33,7 @@ import commands
import logging
import time
import mitogen.master
import mitogen.parent
LOG = logging.getLogger('mitogen')
......@@ -46,8 +46,8 @@ class PasswordError(mitogen.core.Error):
pass
class Stream(mitogen.master.Stream):
create_child = staticmethod(mitogen.master.tty_create_child)
class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.tty_create_child)
python_path = 'python2.7'
#: The path to the SSH binary.
......@@ -103,8 +103,8 @@ class Stream(mitogen.master.Stream):
def _connect_bootstrap(self):
password_sent = False
for buf in mitogen.master.iter_read(self.receive_side.fd,
time.time() + 10.0):
for buf in mitogen.parent.iter_read(self.receive_side.fd,
time.time() + 10.0):
LOG.debug('%r: received %r', self, buf)
if buf.endswith('EC0\n'):
self._ec0_received()
......
......@@ -30,7 +30,7 @@ import os
import time
import mitogen.core
import mitogen.master
import mitogen.parent
LOG = logging.getLogger(__name__)
......@@ -41,8 +41,8 @@ class PasswordError(mitogen.core.Error):
pass
class Stream(mitogen.master.Stream):
create_child = staticmethod(mitogen.master.tty_create_child)
class Stream(mitogen.parent.Stream):
create_child = staticmethod(mitogen.parent.tty_create_child)
sudo_path = 'sudo'
password = None
......@@ -93,8 +93,8 @@ class Stream(mitogen.master.Stream):
def _connect_bootstrap(self):
password_sent = False
for buf in mitogen.master.iter_read(self.receive_side.fd,
time.time() + 10.0):
for buf in mitogen.parent.iter_read(self.receive_side.fd,
time.time() + 10.0):
LOG.debug('%r: received %r', self, buf)
if buf.endswith('EC0\n'):
self._ec0_received()
......
......@@ -8,6 +8,7 @@ import zlib
import mitogen.fakessh
import mitogen.master
import mitogen.parent
import mitogen.ssh
import mitogen.sudo
......@@ -26,9 +27,10 @@ print 'Preamble size: %s (%.2fKiB)' % (
for mod in (
mitogen.master,
mitogen.parent,
mitogen.ssh,
mitogen.sudo,
mitogen.fakessh,
):
sz = len(zlib.compress(mitogen.master.minimize_source(inspect.getsource(mod))))
sz = len(zlib.compress(mitogen.parent.minimize_source(inspect.getsource(mod))))
print '%s size: %s (%.2fKiB)' % (mod.__name__, sz, sz / 1024.0)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment