Commit 948cefe1 authored by Brenden Blanco's avatar Brenden Blanco

Add python support for attaching bpf programs to uprobes

This adds a similar set of functions to kprobes for userspace probes.
The calling convention is different, however, since the user must
provide a library/binary name and function symbol or address. Add two
simple test cases for both.

I don't see an easy way in this api to add auto-loading support, as in
kprobe functions that start with "kprobe__". Such niceties can come
later.
Signed-off-by: default avatarBrenden Blanco <bblanco@plumgrid.com>
parent 68e2d149
......@@ -20,7 +20,9 @@ import fcntl
import json
import multiprocessing
import os
import re
from subprocess import Popen, PIPE
import struct
import sys
basestring = (unicode if sys.version_info[0] < 3 else str)
......@@ -97,6 +99,14 @@ lib.bpf_attach_kprobe.argtypes = [ct.c_int, ct.c_char_p, ct.c_char_p, ct.c_int,
ct.c_int, ct.c_int, _CB_TYPE, ct.py_object]
lib.bpf_detach_kprobe.restype = ct.c_int
lib.bpf_detach_kprobe.argtypes = [ct.c_char_p]
lib.bpf_attach_uprobe.restype = ct.c_void_p
_CB_TYPE = ct.CFUNCTYPE(None, ct.py_object, ct.c_int,
ct.c_ulonglong, ct.POINTER(ct.c_ulonglong))
_RAW_CB_TYPE = ct.CFUNCTYPE(None, ct.py_object, ct.c_void_p, ct.c_int)
lib.bpf_attach_uprobe.argtypes = [ct.c_int, ct.c_char_p, ct.c_char_p, ct.c_int,
ct.c_int, ct.c_int, _CB_TYPE, ct.py_object]
lib.bpf_detach_uprobe.restype = ct.c_int
lib.bpf_detach_uprobe.argtypes = [ct.c_char_p]
lib.bpf_open_perf_buffer.restype = ct.c_void_p
lib.bpf_open_perf_buffer.argtypes = [_RAW_CB_TYPE, ct.py_object, ct.c_int, ct.c_int]
lib.perf_reader_poll.restype = ct.c_int
......@@ -107,6 +117,7 @@ lib.perf_reader_fd.restype = int
lib.perf_reader_fd.argtypes = [ct.c_void_p]
open_kprobes = {}
open_uprobes = {}
tracefile = None
TRACEFS = "/sys/kernel/debug/tracing"
KALLSYMS = "/proc/kallsyms"
......@@ -122,7 +133,13 @@ def cleanup_kprobes():
if isinstance(k, str):
desc = "-:kprobes/%s" % k
lib.bpf_detach_kprobe(desc.encode("ascii"))
for k, v in open_uprobes.items():
lib.perf_reader_free(v)
if isinstance(k, str):
desc = "-:uprobes/%s" % k
lib.bpf_detach_uprobe(desc.encode("ascii"))
open_kprobes.clear()
open_uprobes.clear()
if tracefile:
tracefile.close()
......@@ -137,6 +154,11 @@ class BPF(object):
PROG_ARRAY = 3
PERF_EVENT_ARRAY = 4
_probe_repl = re.compile("[^a-zA-Z0-9_]")
_libsearch_cache = {}
_lib_load_address_cache = {}
_lib_symbol_cache = {}
class Function(object):
def __init__(self, bpf, name, fd):
self.bpf = bpf
......@@ -670,6 +692,141 @@ class BPF(object):
raise Exception("Failed to detach BPF from kprobe")
del open_kprobes[ev_name]
@classmethod
def find_library(cls, name):
if name in cls._libsearch_cache:
return cls._libsearch_cache[name]
if struct.calcsize("l") == 4:
machine = os.uname()[4] + "-32"
else:
machine = os.uname()[4] + "-64"
mach_map = {
"x86_64-64": "libc6,x86-64",
"ppc64-64": "libc6,64bit",
"sparc64-64": "libc6,64bit",
"s390x-64": "libc6,64bit",
"ia64-64": "libc6,IA-64",
}
abi_type = mach_map.get(machine, "libc6")
expr = r"\s+lib%s\.[^\s]+\s+\(%s, [^)]+[^/]+([^\s]+)" % (name, abi_type)
with os.popen("/sbin/ldconfig -p 2>/dev/null") as f:
data = f.read()
res = re.search(expr, data)
if not res:
return None
path = res.group(1)
cls._libsearch_cache[name] = path
return path
@classmethod
def find_load_address(cls, path):
if path in cls._lib_load_address_cache:
return cls._lib_load_address_cache[path]
# "LOAD off 0x0000000000000000 vaddr 0x0000000000400000 paddr 0x..."
with os.popen("""/usr/bin/objdump -x %s | awk '$1 == "LOAD" && $3 ~ /^[0x]*$/ { print $5 }'""" % path) as f:
data = f.read().rstrip()
if not data:
return None
addr = int(data, 16)
cls._lib_load_address_cache[path] = addr
cls._lib_symbol_cache[path] = {}
return addr
@classmethod
def find_symbol(cls, path, sym):
# initialized in find_load_address
symbols = cls._lib_symbol_cache[path]
if sym in symbols:
return symbols[sym]
with os.popen("""objdump -tT %s | awk -v sym=%s '$NF == sym && $4 == ".text" { print $1; exit }'""" % (path, sym)) as f:
data = f.read().rstrip()
if not data:
return None
addr = int(data, 16)
symbols[sym] = addr
return addr
@classmethod
def _check_path_symbol(cls, name, sym, addr):
if name.startswith("/"):
path = name
else:
path = BPF.find_library(name)
if not path:
raise Exception("could not find library %s" % name)
path = os.path.realpath(path)
load_addr = BPF.find_load_address(path)
if not addr and sym:
addr = BPF.find_symbol(path, sym)
if not addr:
raise Exception("could not determine address of symbol %s" % sym)
return (path, load_addr+addr)
def attach_uprobe(self, name="", sym="", addr=None,
fn_name="", pid=-1, cpu=0, group_fd=-1):
(path, addr) = BPF._check_path_symbol(name, sym, addr)
fn = self.load_func(fn_name, BPF.KPROBE)
ev_name = "p_%s_0x%x" % (self._probe_repl.sub("_", path), addr)
desc = "p:uprobes/%s %s:0x%x" % (ev_name, path, addr)
res = lib.bpf_attach_uprobe(fn.fd, ev_name.encode("ascii"),
desc.encode("ascii"), pid, cpu, group_fd,
self._reader_cb_impl, ct.cast(id(self), ct.py_object))
res = ct.cast(res, ct.c_void_p)
if res == None:
raise Exception("Failed to attach BPF to uprobe")
open_uprobes[ev_name] = res
return self
@classmethod
def detach_uprobe(cls, name="", sym="", addr=None):
(path, addr) = BPF._check_path_symbol(name, sym, addr)
ev_name = "p_%s_0x%x" % (cls._probe_repl.sub("_", path), addr)
if ev_name not in open_uprobes:
raise Exception("Uprobe %s is not attached" % event)
lib.perf_reader_free(open_uprobes[ev_name])
desc = "-:uprobes/%s" % ev_name
res = lib.bpf_detach_uprobe(desc.encode("ascii"))
if res < 0:
raise Exception("Failed to detach BPF from uprobe")
del open_uprobes[ev_name]
def attach_uretprobe(self, name="", sym="", addr=None,
fn_name="", pid=-1, cpu=0, group_fd=-1):
(path, addr) = BPF._check_path_symbol(name, sym, addr)
fn = self.load_func(fn_name, BPF.KPROBE)
ev_name = "r_%s_0x%x" % (self._probe_repl.sub("_", path), addr)
desc = "r:uprobes/%s %s:0x%x" % (ev_name, path, addr)
res = lib.bpf_attach_uprobe(fn.fd, ev_name.encode("ascii"),
desc.encode("ascii"), pid, cpu, group_fd,
self._reader_cb_impl, ct.cast(id(self), ct.py_object))
res = ct.cast(res, ct.c_void_p)
if res == None:
raise Exception("Failed to attach BPF to uprobe")
open_uprobes[ev_name] = res
return self
@classmethod
def detach_uretprobe(cls, name="", sym="", addr=None):
(path, addr) = BPF._check_path_symbol(name, sym, addr)
ev_name = "r_%s_0x%x" % (cls._probe_repl.sub("_", path), addr)
if ev_name not in open_uprobes:
raise Exception("Kretprobe %s is not attached" % event)
lib.perf_reader_free(open_uprobes[ev_name])
desc = "-:uprobes/%s" % ev_name
res = lib.bpf_detach_uprobe(desc.encode("ascii"))
if res < 0:
raise Exception("Failed to detach BPF from uprobe")
del open_uprobes[ev_name]
def _trace_autoload(self):
# Cater to one-liner case where attach_kprobe is omitted and C function
# name matches that of the kprobe.
......
......@@ -46,3 +46,5 @@ add_test(NAME py_test_callchain WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
COMMAND ${TEST_WRAPPER} py_callchain sudo ${CMAKE_CURRENT_SOURCE_DIR}/test_callchain.py)
add_test(NAME py_array WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
COMMAND ${TEST_WRAPPER} py_array sudo ${CMAKE_CURRENT_SOURCE_DIR}/test_array.py)
add_test(NAME py_uprobes WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
COMMAND ${TEST_WRAPPER} py_uprobes sudo ${CMAKE_CURRENT_SOURCE_DIR}/test_uprobes.py)
#!/usr/bin/env python
# Copyright (c) PLUMgrid, Inc.
# Licensed under the Apache License, Version 2.0 (the "License")
import bcc
import ctypes
import os
import re
import struct
import time
import unittest
## code from ctypes impl
#if struct.calcsize("l") == 4:
# machine = os.uname()[4] + "-32"
#else:
# machine = os.uname()[4] + "-64"
#mach_map = {
# "x86_64-64": "libc6,x86-64",
# "ppc64-64": "libc6,64bit",
# "sparc64-64": "libc6,64bit",
# "s390x-64": "libc6,64bit",
# "ia64-64": "libc6,IA-64",
#}
#abi_type = mach_map.get(machine, "libc6")
#
#def find_library_fullpath(name):
# expr = r"\s+lib%s\.[^\s]+\s+\(%s, [^)]+[^/]+([^\s]+)" % (name, abi_type)
# with os.popen("/sbin/ldconfig -p 2>/dev/null") as f:
# data = f.read()
# res = re.search(expr, data)
# if not res:
# return None
# return res.group(1)
class TestUprobes(unittest.TestCase):
def test_simple_library(self):
text = """
#include <uapi/linux/ptrace.h>
BPF_TABLE("array", int, u64, stats, 1);
static void incr(int idx) {
u64 *ptr = stats.lookup(&idx);
if (ptr)
++(*ptr);
}
int count(struct pt_regs *ctx) {
u32 pid = bpf_get_current_pid_tgid();
if (pid == PID)
incr(0);
return 0;
}"""
text = text.replace("PID", "%d" % os.getpid())
b = bcc.BPF(text=text)
b.attach_uprobe(name="c", sym="malloc_stats", fn_name="count")
b.attach_uretprobe(name="c", sym="malloc_stats", fn_name="count")
libc = ctypes.CDLL("libc.so.6")
libc.malloc_stats.restype = None
libc.malloc_stats.argtypes = []
libc.malloc_stats()
self.assertEqual(b["stats"][ctypes.c_int(0)].value, 2)
b.detach_uprobe(name="c", sym="malloc_stats")
def test_simple_binary(self):
text = """
#include <uapi/linux/ptrace.h>
BPF_TABLE("array", int, u64, stats, 1);
static void incr(int idx) {
u64 *ptr = stats.lookup(&idx);
if (ptr)
++(*ptr);
}
int count(struct pt_regs *ctx) {
u32 pid = bpf_get_current_pid_tgid();
incr(0);
return 0;
}"""
text = text.replace("PID", "%d" % os.getpid())
b = bcc.BPF(text=text)
b.attach_uprobe(name="/usr/bin/python2", sym="main", fn_name="count")
b.attach_uretprobe(name="/usr/bin/python2", sym="main", fn_name="count")
with os.popen("/usr/bin/python2 -V") as f:
pass
self.assertGreater(b["stats"][ctypes.c_int(0)].value, 0)
b.detach_uprobe(name="/usr/bin/python2", sym="main")
if __name__ == "__main__":
unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment