Commit 4b764de6 authored by Teng Qin's avatar Teng Qin Committed by Sasha Goldshtein

Add callback for perf buffer lost sample events (#1092)

When events were lost, we used to print a message to stderr, which is less useful.
Now, we allow attaching an optional callback that would be invoked when events
are lost, allowing the user's program to perform some additional processing.
parent b255a0a2
......@@ -897,9 +897,9 @@ These are equivalent.
### 2. open_perf_buffer()
Syntax: ```table.open_perf_buffers(callback, page_cnt=N)```
Syntax: ```table.open_perf_buffers(callback, page_cnt=N, lost_cb=None)```
This operates on a table as defined in BPF as BPF_PERF_OUTPUT(), and associates the callback Python function ```callback``` to be called when data is available in the perf ring buffer. This is part of the recommended mechanism for transferring per-event data from kernel to user space. The size of the perf ring buffer can be specified via the ```page_cnt``` parameter, which must be a power of two number of pages and defaults to 8.
This operates on a table as defined in BPF as BPF_PERF_OUTPUT(), and associates the callback Python function ```callback``` to be called when data is available in the perf ring buffer. This is part of the recommended mechanism for transferring per-event data from kernel to user space. The size of the perf ring buffer can be specified via the ```page_cnt``` parameter, which must be a power of two number of pages and defaults to 8. If the callback is not processing data fast enough, some submitted data may be lost. ```lost_cb``` will be called to log / monitor the lost count. If ```lost_cb``` is the default ```None``` value, it will just print a line of message to ```stderr```.
Example:
......
......@@ -393,7 +393,9 @@ StatusTuple BPF::detach_perf_event(uint32_t ev_type, uint32_t ev_config) {
}
StatusTuple BPF::open_perf_buffer(const std::string& name,
perf_reader_raw_cb cb, void* cb_cookie,
perf_reader_raw_cb cb,
perf_reader_lost_cb lost_cb,
void* cb_cookie,
int page_cnt) {
if (perf_buffers_.find(name) == perf_buffers_.end()) {
TableStorage::iterator it;
......@@ -406,7 +408,7 @@ StatusTuple BPF::open_perf_buffer(const std::string& name,
if ((page_cnt & (page_cnt - 1)) != 0)
return StatusTuple(-1, "open_perf_buffer page_cnt must be a power of two");
auto table = perf_buffers_[name];
TRY2(table->open_all_cpu(cb, cb_cookie, page_cnt));
TRY2(table->open_all_cpu(cb, lost_cb, cb_cookie, page_cnt));
return StatusTuple(0);
}
......
......@@ -105,7 +105,9 @@ public:
return BPFStackTable({});
}
StatusTuple open_perf_buffer(const std::string& name, perf_reader_raw_cb cb,
StatusTuple open_perf_buffer(const std::string& name,
perf_reader_raw_cb cb,
perf_reader_lost_cb lost_cb = nullptr,
void* cb_cookie = nullptr,
int page_cnt = DEFAULT_PERF_BUFFER_PAGE_CNT);
StatusTuple close_perf_buffer(const std::string& name);
......
......@@ -68,13 +68,14 @@ std::vector<std::string> BPFStackTable::get_stack_symbol(int stack_id,
return res;
}
StatusTuple BPFPerfBuffer::open_on_cpu(perf_reader_raw_cb cb, int cpu,
void* cb_cookie, int page_cnt) {
StatusTuple BPFPerfBuffer::open_on_cpu(perf_reader_raw_cb cb,
perf_reader_lost_cb lost_cb,
int cpu, void* cb_cookie, int page_cnt) {
if (cpu_readers_.find(cpu) != cpu_readers_.end())
return StatusTuple(-1, "Perf buffer already open on CPU %d", cpu);
auto reader = static_cast<perf_reader*>(
bpf_open_perf_buffer(cb, cb_cookie, -1, cpu, page_cnt));
bpf_open_perf_buffer(cb, lost_cb, cb_cookie, -1, cpu, page_cnt));
if (reader == nullptr)
return StatusTuple(-1, "Unable to construct perf reader");
......@@ -98,8 +99,9 @@ StatusTuple BPFPerfBuffer::open_on_cpu(perf_reader_raw_cb cb, int cpu,
return StatusTuple(0);
}
StatusTuple BPFPerfBuffer::open_all_cpu(perf_reader_raw_cb cb, void* cb_cookie,
int page_cnt) {
StatusTuple BPFPerfBuffer::open_all_cpu(perf_reader_raw_cb cb,
perf_reader_lost_cb lost_cb,
void* cb_cookie, int page_cnt) {
if (cpu_readers_.size() != 0 || epfd_ != -1)
return StatusTuple(-1, "Previously opened perf buffer not cleaned");
......@@ -108,7 +110,7 @@ StatusTuple BPFPerfBuffer::open_all_cpu(perf_reader_raw_cb cb, void* cb_cookie,
epfd_ = epoll_create1(EPOLL_CLOEXEC);
for (int i : cpus) {
auto res = open_on_cpu(cb, i, cb_cookie, page_cnt);
auto res = open_on_cpu(cb, lost_cb, i, cb_cookie, page_cnt);
if (res.code() != 0) {
TRY2(close_all_cpu());
return res;
......
......@@ -125,14 +125,14 @@ class BPFPerfBuffer : protected BPFTableBase<int, int> {
: BPFTableBase<int, int>(desc), epfd_(-1) {}
~BPFPerfBuffer();
StatusTuple open_all_cpu(perf_reader_raw_cb cb, void* cb_cookie,
int page_cnt);
StatusTuple open_all_cpu(perf_reader_raw_cb cb, perf_reader_lost_cb lost_cb,
void* cb_cookie, int page_cnt);
StatusTuple close_all_cpu();
void poll(int timeout);
private:
StatusTuple open_on_cpu(perf_reader_raw_cb cb, int cpu, void* cb_cookie,
int page_cnt);
StatusTuple open_on_cpu(perf_reader_raw_cb cb, perf_reader_lost_cb lost_cb,
int cpu, void* cb_cookie, int page_cnt);
StatusTuple close_on_cpu(int cpu);
std::map<int, perf_reader*> cpu_readers_;
......
......@@ -356,7 +356,7 @@ void * bpf_attach_kprobe(int progfd, enum bpf_probe_attach_type attach_type, con
int n;
snprintf(new_name, sizeof(new_name), "%s_bcc_%d", ev_name, getpid());
reader = perf_reader_new(cb, NULL, cb_cookie, probe_perf_reader_page_cnt);
reader = perf_reader_new(cb, NULL, NULL, cb_cookie, probe_perf_reader_page_cnt);
if (!reader)
goto error;
......@@ -417,7 +417,7 @@ void * bpf_attach_uprobe(int progfd, enum bpf_probe_attach_type attach_type, con
int n;
snprintf(new_name, sizeof(new_name), "%s_bcc_%d", ev_name, getpid());
reader = perf_reader_new(cb, NULL, cb_cookie, probe_perf_reader_page_cnt);
reader = perf_reader_new(cb, NULL, NULL, cb_cookie, probe_perf_reader_page_cnt);
if (!reader)
goto error;
......@@ -503,7 +503,7 @@ void * bpf_attach_tracepoint(int progfd, const char *tp_category,
char buf[256];
struct perf_reader *reader = NULL;
reader = perf_reader_new(cb, NULL, cb_cookie, probe_perf_reader_page_cnt);
reader = perf_reader_new(cb, NULL, NULL, cb_cookie, probe_perf_reader_page_cnt);
if (!reader)
goto error;
......@@ -525,13 +525,14 @@ int bpf_detach_tracepoint(const char *tp_category, const char *tp_name) {
return 0;
}
void * bpf_open_perf_buffer(perf_reader_raw_cb raw_cb, void *cb_cookie, int pid,
int cpu, int page_cnt) {
void * bpf_open_perf_buffer(perf_reader_raw_cb raw_cb,
perf_reader_lost_cb lost_cb, void *cb_cookie,
int pid, int cpu, int page_cnt) {
int pfd;
struct perf_event_attr attr = {};
struct perf_reader *reader = NULL;
reader = perf_reader_new(NULL, raw_cb, cb_cookie, page_cnt);
reader = perf_reader_new(NULL, raw_cb, lost_cb, cb_cookie, page_cnt);
if (!reader)
goto error;
......
......@@ -48,6 +48,7 @@ int bpf_open_raw_sock(const char *name);
typedef void (*perf_reader_cb)(void *cb_cookie, int pid, uint64_t callchain_num,
void *callchain);
typedef void (*perf_reader_raw_cb)(void *cb_cookie, void *raw, int raw_size);
typedef void (*perf_reader_lost_cb)(uint64_t lost);
void * bpf_attach_kprobe(int progfd, enum bpf_probe_attach_type attach_type,
const char *ev_name, const char *fn_name,
......@@ -68,8 +69,9 @@ void * bpf_attach_tracepoint(int progfd, const char *tp_category,
int group_fd, perf_reader_cb cb, void *cb_cookie);
int bpf_detach_tracepoint(const char *tp_category, const char *tp_name);
void * bpf_open_perf_buffer(perf_reader_raw_cb raw_cb, void *cb_cookie, int pid,
int cpu, int page_cnt);
void * bpf_open_perf_buffer(perf_reader_raw_cb raw_cb,
perf_reader_lost_cb lost_cb, void *cb_cookie,
int pid, int cpu, int page_cnt);
/* attached a prog expressed by progfd to the device specified in dev_name */
int bpf_attach_xdp(const char *dev_name, int progfd);
......
......@@ -29,6 +29,7 @@
struct perf_reader {
perf_reader_cb cb;
perf_reader_raw_cb raw_cb;
perf_reader_lost_cb lost_cb;
void *cb_cookie; // to be returned in the cb
void *buf; // for keeping segmented data
size_t buf_size;
......@@ -41,12 +42,15 @@ struct perf_reader {
};
struct perf_reader * perf_reader_new(perf_reader_cb cb,
perf_reader_raw_cb raw_cb, void *cb_cookie, int page_cnt) {
perf_reader_raw_cb raw_cb,
perf_reader_lost_cb lost_cb,
void *cb_cookie, int page_cnt) {
struct perf_reader *reader = calloc(1, sizeof(struct perf_reader));
if (!reader)
return NULL;
reader->cb = cb;
reader->raw_cb = raw_cb;
reader->lost_cb = lost_cb;
reader->cb_cookie = cb_cookie;
reader->fd = -1;
reader->page_size = getpagesize();
......@@ -235,7 +239,12 @@ void perf_reader_event_read(struct perf_reader *reader) {
}
if (e->type == PERF_RECORD_LOST) {
fprintf(stderr, "Lost %lu samples\n", *(uint64_t *)(ptr + sizeof(*e)));
uint64_t lost = *(uint64_t *)(ptr + sizeof(*e));
if (reader->lost_cb) {
reader->lost_cb(lost);
} else {
fprintf(stderr, "Possibly lost %llu samples\n", lost);
}
} else if (e->type == PERF_RECORD_SAMPLE) {
if (reader->type == PERF_TYPE_TRACEPOINT)
parse_tracepoint(reader, ptr, e->size);
......
......@@ -26,7 +26,9 @@ extern "C" {
struct perf_reader;
struct perf_reader * perf_reader_new(perf_reader_cb cb,
perf_reader_raw_cb raw_cb, void *cb_cookie, int page_cnt);
perf_reader_raw_cb raw_cb,
perf_reader_lost_cb lost_cb,
void *cb_cookie, int page_cnt);
void perf_reader_free(void *ptr);
int perf_reader_mmap(struct perf_reader *reader, unsigned type, unsigned long sample_type);
void perf_reader_event_read(struct perf_reader *reader);
......
......@@ -39,6 +39,7 @@ int bpf_open_raw_sock(const char *name);
typedef void (*perf_reader_cb)(void *cb_cookie, int pid, uint64_t callchain_num, void *callchain);
typedef void (*perf_reader_raw_cb)(void *cb_cookie, void *raw, int raw_size);
typedef void (*perf_reader_lost_cb)(uint64_t lost);
void * bpf_attach_kprobe(int progfd, int attach_type, const char *ev_name,
const char *fn_name,
......@@ -54,7 +55,7 @@ void * bpf_attach_uprobe(int progfd, int attach_type, const char *ev_name,
int bpf_detach_uprobe(const char *ev_name);
void * bpf_open_perf_buffer(perf_reader_raw_cb raw_cb, void *cb_cookie, int pid, int cpu, int page_cnt);
void * bpf_open_perf_buffer(perf_reader_raw_cb raw_cb, perf_reader_lost_cb lost_cb, void *cb_cookie, int pid, int cpu, int page_cnt);
]]
ffi.cdef[[
......@@ -98,7 +99,6 @@ int bpf_table_leaf_sscanf(void *program, size_t id, const char *buf, void *leaf)
ffi.cdef[[
struct perf_reader;
struct perf_reader * perf_reader_new(perf_reader_cb cb, perf_reader_raw_cb raw_cb, void *cb_cookie);
void perf_reader_free(void *ptr);
int perf_reader_mmap(struct perf_reader *reader, unsigned type, unsigned long sample_type);
int perf_reader_poll(int num_readers, struct perf_reader **readers, int timeout);
......
......@@ -243,14 +243,22 @@ local function _perf_id(id, cpu)
return string.format("bcc:perf_event_array:%d:%d", tonumber(id), cpu or 0)
end
function PerfEventArray:_open_perf_buffer(cpu, callback, ctype, page_cnt)
function PerfEventArray:_open_perf_buffer(cpu, callback, ctype, page_cnt, lost_cb)
local _cb = ffi.cast("perf_reader_raw_cb",
function (cookie, data, size)
callback(cpu, ctype(data)[0])
end)
local _lost_cb = nil
if lost_cb then
_lost_cb = ffi.cast("perf_reader_lost_cb",
function (lost)
lost_cb(lost)
end)
end
-- default to 8 pages per buffer
local reader = libbcc.bpf_open_perf_buffer(_cb, nil, -1, cpu, page_cnt or 8)
local reader = libbcc.bpf_open_perf_buffer(_cb, nil, _lost_cb, -1, cpu, page_cnt or 8)
assert(reader, "failed to open perf buffer")
local fd = libbcc.perf_reader_fd(reader)
......@@ -259,11 +267,11 @@ function PerfEventArray:_open_perf_buffer(cpu, callback, ctype, page_cnt)
self._callbacks[cpu] = _cb
end
function PerfEventArray:open_perf_buffer(callback, data_type, data_params, page_cnt)
function PerfEventArray:open_perf_buffer(callback, data_type, data_params, page_cnt, lost_cb)
assert(data_type, "a data type is needed for callback conversion")
local ctype = ffi.typeof(data_type.."*", unpack(data_params or {}))
for i = 0, Posix.cpu_count() - 1 do
self:_open_perf_buffer(i, callback, ctype, page_cnt)
self:_open_perf_buffer(i, callback, ctype, page_cnt, lost_cb)
end
end
......
......@@ -87,6 +87,7 @@ lib.bpf_attach_kprobe.restype = ct.c_void_p
_CB_TYPE = ct.CFUNCTYPE(None, ct.py_object, ct.c_int,
ct.c_ulonglong, ct.POINTER(ct.c_ulonglong))
_RAW_CB_TYPE = ct.CFUNCTYPE(None, ct.py_object, ct.c_void_p, ct.c_int)
_LOST_CB_TYPE = ct.CFUNCTYPE(None, ct.c_ulonglong)
lib.bpf_attach_kprobe.argtypes = [ct.c_int, ct.c_int, ct.c_char_p, ct.c_char_p, ct.c_int,
ct.c_int, ct.c_int, _CB_TYPE, ct.py_object]
lib.bpf_detach_kprobe.restype = ct.c_int
......@@ -102,7 +103,7 @@ lib.bpf_attach_tracepoint.argtypes = [ct.c_int, ct.c_char_p, ct.c_char_p, ct.c_i
lib.bpf_detach_tracepoint.restype = ct.c_int
lib.bpf_detach_tracepoint.argtypes = [ct.c_char_p, ct.c_char_p]
lib.bpf_open_perf_buffer.restype = ct.c_void_p
lib.bpf_open_perf_buffer.argtypes = [_RAW_CB_TYPE, ct.py_object, ct.c_int, ct.c_int, ct.c_int]
lib.bpf_open_perf_buffer.argtypes = [_RAW_CB_TYPE, _LOST_CB_TYPE, ct.py_object, ct.c_int, ct.c_int, ct.c_int]
lib.bpf_open_perf_event.restype = ct.c_int
lib.bpf_open_perf_event.argtypes = [ct.c_uint, ct.c_ulonglong, ct.c_int, ct.c_int]
lib.perf_reader_poll.restype = ct.c_int
......
......@@ -18,7 +18,7 @@ from functools import reduce
import multiprocessing
import os
from .libbcc import lib, _RAW_CB_TYPE
from .libbcc import lib, _RAW_CB_TYPE, _LOST_CB_TYPE
from .perf import Perf
from .utils import get_online_cpus
from .utils import get_possible_cpus
......@@ -507,7 +507,7 @@ class PerfEventArray(ArrayBase):
super(PerfEventArray, self).__delitem__(key)
self.close_perf_buffer(key)
def open_perf_buffer(self, callback, page_cnt=8):
def open_perf_buffer(self, callback, page_cnt=8, lost_cb=None):
"""open_perf_buffers(callback)
Opens a set of per-cpu ring buffer to receive custom perf event
......@@ -521,11 +521,12 @@ class PerfEventArray(ArrayBase):
raise Exception("Perf buffer page_cnt must be a power of two")
for i in get_online_cpus():
self._open_perf_buffer(i, callback, page_cnt)
self._open_perf_buffer(i, callback, page_cnt, lost_cb)
def _open_perf_buffer(self, cpu, callback, page_cnt):
def _open_perf_buffer(self, cpu, callback, page_cnt, lost_cb):
fn = _RAW_CB_TYPE(lambda _, data, size: callback(cpu, data, size))
reader = lib.bpf_open_perf_buffer(fn, None, -1, cpu, page_cnt)
lost_fn = _LOST_CB_TYPE(lambda lost: lost_cb(lost) if lost_cb else -1)
reader = lib.bpf_open_perf_buffer(fn, lost_fn, None, -1, cpu, page_cnt)
if not reader:
raise Exception("Could not open perf buffer")
fd = lib.perf_reader_fd(reader)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment