Commit 7a476082 authored by Kirill Smelkov's avatar Kirill Smelkov

tracing/pyruntraced: New tool to run Python code with tracepoints activated (draft)

We will hopefully need it in the future to test external python code
similarly to when testing in-process go code with the help of
tracepoints. See module description for details (tracetest is not yet
done).

There is no yet Go client API for interacting with such-traced python
program. For the record: manually inspecting traced python process could
be done via e.g.

	socat EXEC:"./pyruntraced 3 neo.trace.py -- ../../../t/backup-play/N1-writer",fdin=3,fdout=3 TCP:localhost:8888

and telnetting to localhost:8888 from another xterm.
parent 78c29936
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2018 Nexedi SA and Contributors.
# Kirill Smelkov <kirr@nexedi.com>
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.
"""pyruntraced ... - run python ... with tracepoints attached
This program runs Python code with tracepoints activated.
Whenever a tracepoint is reached, attached probe sends corresponding trace event
to driving process and awaits further commands from it. Commands could be:
- to execute arbitrary python code, thus allowing program state inspection, or
- to continue original code execution.
Overall this allows Go driver process to test several concurrent child
subprocesses via tracetest, the same way to how it is done in pure NEO/go.
Please see documentation for next Go packages for context:
- lab.nexedi.com/kirr/go123/tracing
- lab.nexedi.com/kirr/go123/tracing/tracetest
Implementation notes
--------------------
The channel for tracing communication in between driver and traced process is
passed as opened file descriptor by driver, and its number is specified via
<fd> commandline argument.
The protocol in between driver and traced process is as follows (from the point
of view of traced process):
> tid T eventname event # event happenned on a thread
< tid C # driver tells tracee to continue execution
< tid !... # driver asks tracee to evaluate ... in context where probe is attached
> tid E exception # tracee reports result as exception
> tid R result # tracee reports result
eventname is just regular string
event, exception, result are JSON-encoded objects.
Fork is not allowed in order not to confuse the driver(*).
Tracepoints and probes definitions are provided in separate files that are
passed as <trace>* command line arguments.
See trace_entry() for documentation on how to define probes.
(*) fork could be supported by making every new process to reattach to the
driver via new file descriptor - e.g. connecting via socket.
We don't currently do that for simplicity.
"""
from functools import wraps
import json
import thread, threading
# Tracer implements synchronous tracing in compatible way to tracetest on
# NEO/go side.
#
# See top-level module description for details.
class Tracer(object):
def __init__(self, ftracetx, ftracerx):
self.ftx = ftracetx # file object to send data to tracer
self.txlock = threading.Lock() # serializes writes to .ftx
self.frx = ftracerx # file object to receive data from tracer
self.rxtab = {} # {} tid -> RxQueue
self.rxlock = threading.Lock() # serializes access to .rxtab
# NOTE 2 separate ftx/frx file objects are used because python for
# stdio objects does locking and this way if it would be only 1 file
# object readline in _serve_recv() would block tx until readline wakes up.
self.trecv = threading.Thread(target=self._serve_recv)
self.trecv.daemon = True # XXX better to gracefully stop it?
self.trecv.start()
# _send sends 1 line to tracer from current thread.
def _send(self, line):
assert '\n' not in line
tid = thread.get_ident()
self.txlock.acquire()
try:
self.ftx.write(('%d ' % tid) + line + '\n')
self.ftx.flush()
finally:
self.txlock.release()
# _serve_recv receives lines from .frx and multiplexes them to
# destination threads RX queues.
#
# runs in a dedicated thread.
def _serve_recv(self):
while 1:
line = self.frx.readline()
# tid SP rest \n
tid, line = line.split(None, 1)
line = line.rstrip('\n')
tid = int(tid)
self.rxlock.acquire()
rxq = self.rxtab.get(tid)
if rxq is None:
self.rxtab[tid] = rxq = RxQueue()
rxq.lineq.append(line)
rxq.event.set()
self.rxlock.release()
# _recv receives 1 line from tracer for current thread.
def _recv(self):
tid = thread.get_ident()
while 1:
self.rxlock.acquire()
rxq = self.rxtab.get(tid)
if rxq is None:
rxq = self.rxtab[tid] = RxQueue()
if len(rxq.lineq) > 0:
# data was already queued for us
line = rxq.lineq.pop(0)
self.rxlock.release()
return line
# there is no data - we have to wait for it
rxq.event.clear()
self.rxlock.release()
rxq.event.wait()
# woken up -> retry to dequeue data
# trace1 handles 1 trace event.
#
# it sends the event to tracer and awaits commands from it to either
# inspect current state or continue execution.
#
# globals/locals are the mapping used in eval, if driver asks to inspect
# program state.
def trace1(self, eventname, event, globals, locals):
# send trace event
evstr = json.dumps(event)
assert '\n' not in evstr
self._send('T %s %s' % (eventname, evstr))
# wait for commands
# ! ... - eval python code
# C - continue
while 1:
line = self._recv()
if len(line) == 0 or line[0] not in "!C":
raise RuntimeError("trace1: got invalid line from driver: %r" % line)
if line[0] == 'C':
return # probe finishes - continue execution of original code
# eval python in context of probed function
try:
r = eval(line[1:], globals, locals)
except Exception as e:
reply = 'E %s' % json.dumps(str(e))
else:
try:
reply = 'R %s' % json.dumps(r)
except Exception as e:
# some types are not json-serializable
# XXX ok to play such games here?
# XXX too many objects are not JSON-serializable.
reply = 'E %s' % json.dumps(str(e))
self._send(reply)
# RxQueue represents receive queue for 1 thread
class RxQueue(object):
def __init__(self):
self.lineq = [] # [] of lines received
self.event = threading.Event() # sender signals consumer there is new data
# gtracer is the global tracer object
gtracer = None # Tracer
# trace_entry attaches probe to func entry.
#
# For example
#
# class MyClass:
# def meth(self, x, y):
# ...
#
# @trace_entry(MyClass.meth, 'MyEvent')
# def _(self, x, y):
# return {'x': x, 'y': y}
#
# will emit event 'MyEvent' with corresponding x/y dict on every call entry to
# MyClass.meth() .
def trace_entry(func, eventname):
klass = func.im_class
fname = func.im_func.func_name
def deco(f):
@wraps(func)
def probe(self, *args, **kw):
event = f(self, *args, **kw)
gtracer.trace1(eventname, event, func.func_globals, {'self': self, 'args': args, 'kw': kw})
return func(self, *args, **kw)
setattr(klass, fname, probe)
return deco
# ----------------------------------------
import os, sys, code, runpy
def usage(out):
print >>out, "Usage: pyruntraced <fd> <trace>* -- ..."
def die(msg):
print >>sys.stderr, msg
sys.exit(2)
# main mimics `python ...`,
# but with tracepoints already attached.
def main():
argv = sys.argv[1:]
if not argv:
usage(sys.stderr)
sys.exit(2)
# setup global tracer
global gtracer
fdtrace = argv[0]
fdtrace = int(fdtrace)
ttx = os.fdopen(fdtrace, 'w')
trx = os.fdopen(fdtrace, 'r')
gtracer = Tracer(ttx, trx)
argv = argv[1:]
# forbid fork (see top-level description about why)
def nofork(): raise RuntimeError('pyruntraced: fork forbidden')
os.fork = nofork
os.forkpty = nofork
# load trace probes
# NOTE list of trace files to load might be empty
tracev = []
dashdash = False
for i, arg in enumerate(argv):
if arg == '--':
dashdash = True
break
if arg.startswith('-'):
die("option '%s' goes before '--'" % arg)
tracev.append(arg)
if not dashdash:
die("no '--' found")
argv = argv[i+1:]
for t in tracev:
# load in current context so that e.g. @trace_entry is visible in the
# tracing code.
execfile(t, globals())
# now mimic `python ...` # XXX -> mimicpython(argv) ? pythonmain(argv) ?
# interactive console
if not argv:
code.interact()
return
# -c command
if argv[0] == '-c':
sys.argv = argv[0:1] + argv[2:] # python leaves '-c' as argv[0]
# exec with the same globals `python -c ...` does
g = {'__name__': '__main__',
'__doc__': None,
'__package__': None}
exec argv[1] in g
# -m module
elif argv[0] == '-m':
# search sys.path for module and run corresponding .py file as script
sys.argv = argv[1:]
runpy.run_module(sys.argv[0], init_globals={'__doc__': None}, run_name='__main__')
elif argv[0].startswith('-'):
die("invalid option '%s'" % argv[0])
# file
else:
sys.argv = argv
filepath = argv[0]
# exec with same globals `python file.py` does
# XXX use runpy.run_path() instead?
g = {'__name__': '__main__',
'__file__': filepath,
'__doc__': None,
'__package__': None}
execfile(filepath, g)
return
if __name__ == '__main__':
main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment