Commit a296e1e3 authored by Brenden Blanco's avatar Brenden Blanco Committed by yonghong-song

python3 fixes and testing support (#1916)

* python3: check ksymname calls with _assert_is_bytes

Fixes a bytes/string concatenation error when get/fix_syscall_fnname is
called from a python3 system.

* python3: use env python invocation in tools

In order to facilitate testing, but not necessarily as an example of
good practice, I am changing the invocation of the test tools to use
`/usr/bin/env python`, so that we can control which python (2 vs 3)
gets invoked for the test. On the buildbots, I plan to add an optional
`ln -s /usr/bin/python3 /usr/local/bin/python` on systems that have
python3-bcc package built. This way, we get more test coverage. Having a
cmake mechanism to enable both python2 and python3 testing could be a
further enhancement.

* tools/memleak: add an explicit stdout.flush to print loop

The stdout flush behavior seems to have changed in python3, breaking one
of the tests. I think it makes sense to flush stdout at the end of each
timed interval loop anyway, so adding that to the tool itself.

* tests: add b'' strings and fix dangling handles

Add b'' strings in a few places in the test tools, and fix one dangling
process handle in the memleak test tool runner.
parent b84714a4
......@@ -551,7 +551,7 @@ class BPF(object):
# would probably lead to error in later API calls.
def get_syscall_prefix(self):
for prefix in self._syscall_prefixes:
if self.ksymname("{}bpf".format(prefix)) != -1:
if self.ksymname(b"%sbpf" % prefix) != -1:
return prefix
return self._syscall_prefixes[0]
......@@ -559,12 +559,14 @@ class BPF(object):
# system's syscall prefix. For example, given "clone" the helper would
# return "sys_clone" or "__x64_sys_clone".
def get_syscall_fnname(self, name):
name = _assert_is_bytes(name)
return self.get_syscall_prefix() + name
# Given a Kernel function name that represents a syscall but already has a
# prefix included, transform it to current system's prefix. For example,
# if "sys_clone" provided, the helper may translate it to "__x64_sys_clone".
def fix_syscall_fnname(self, name):
name = _assert_is_bytes(name)
for prefix in self._syscall_prefixes:
if name.startswith(prefix):
return self.get_syscall_fnname(name[len(prefix):])
......
#!/usr/bin/python
#!/usr/bin/env python
# Copyright (c) PLUMgrid, Inc.
# Licensed under the Apache License, Version 2.0 (the "License")
......
......@@ -56,30 +56,33 @@ def setUpModule():
@skipUnless(kernel_version_ge(4, 6), "requires kernel >= 4.6")
class MemleakToolTests(TestCase):
def tearDown(self):
if self.p:
del(self.p)
def run_leaker(self, leak_kind):
# Starting memleak.py, which in turn launches the leaking application.
p = subprocess.Popen(cfg.cmd_format.format(leak_kind),
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
shell=True)
self.p = subprocess.Popen(cfg.cmd_format.format(leak_kind),
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
shell=True)
# Waiting for the first report.
while True:
p.poll()
if p.returncode is not None:
self.p.poll()
if self.p.returncode is not None:
break
line = p.stdout.readline()
if "with outstanding allocations" in line:
line = self.p.stdout.readline()
if b"with outstanding allocations" in line:
break
# At this point, memleak.py have already launched application and set
# probes. Sending command to the leaking application to make its
# allocations.
out = p.communicate(input="\n")[0]
out = self.p.communicate(input=b"\n")[0]
# If there were memory leaks, they are in the output. Filter the lines
# containing "byte" substring. Every interesting line is expected to
# start with "N bytes from"
x = [x for x in out.split('\n') if 'byte' in x]
x = [x for x in out.split(b'\n') if b'byte' in x]
self.assertTrue(len(x) >= 1,
msg="At least one line should have 'byte' substring.")
......
......@@ -9,7 +9,7 @@ from unittest import main, TestCase
class TestKprobeRgx(TestCase):
def setUp(self):
self.b = BPF(text="""
self.b = BPF(text=b"""
typedef struct { int idx; } Key;
typedef struct { u64 val; } Val;
BPF_HASH(stats, Key, Val, 3);
......@@ -22,23 +22,23 @@ class TestKprobeRgx(TestCase):
return 0;
}
""")
self.b.attach_kprobe(event_re="^" + self.b.get_syscall_prefix() + "bp.*",
fn_name="hello")
self.b.attach_kretprobe(event_re="^" + self.b.get_syscall_prefix() + "bp.*",
fn_name="goodbye")
self.b.attach_kprobe(event_re=b"^" + self.b.get_syscall_prefix() + b"bp.*",
fn_name=b"hello")
self.b.attach_kretprobe(event_re=b"^" + self.b.get_syscall_prefix() + b"bp.*",
fn_name=b"goodbye")
def test_send1(self):
k1 = self.b["stats"].Key(1)
k2 = self.b["stats"].Key(2)
self.assertTrue(self.b["stats"][k1].val >= 2)
self.assertTrue(self.b["stats"][k2].val == 1)
k1 = self.b[b"stats"].Key(1)
k2 = self.b[b"stats"].Key(2)
self.assertTrue(self.b[b"stats"][k1].val >= 2)
self.assertTrue(self.b[b"stats"][k2].val == 1)
class TestKprobeReplace(TestCase):
def setUp(self):
self.b = BPF(text="int empty(void *ctx) { return 0; }")
self.b = BPF(text=b"int empty(void *ctx) { return 0; }")
def test_periods(self):
self.b.attach_kprobe(event_re="^tcp_enter_cwr.*", fn_name="empty")
self.b.attach_kprobe(event_re=b"^tcp_enter_cwr.*", fn_name=b"empty")
if __name__ == "__main__":
main()
#!/usr/bin/python
#!/usr/bin/env python
#
# USAGE: test_usdt.py
#
......
#!/usr/bin/python
#!/usr/bin/env python
#
# USAGE: test_usdt2.py
#
......
#!/usr/bin/python
#!/usr/bin/env python
#
# USAGE: test_usdt3.py
#
......
#!/usr/bin/python
#!/usr/bin/env python
# Copyright (c) Catalysts GmbH
# Licensed under the Apache License, Version 2.0 (the "License")
......
......@@ -18,6 +18,7 @@ import resource
import argparse
import subprocess
import os
import sys
class Allocation(object):
def __init__(self, stack, size):
......@@ -517,6 +518,7 @@ while True:
print_outstanding_combined()
else:
print_outstanding()
sys.stdout.flush()
count_so_far += 1
if num_prints is not None and count_so_far >= num_prints:
exit()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment