Commit e0fa6523 authored by Andrii Nakryiko's avatar Andrii Nakryiko

Merge branch 'add libbpf getters for individual ringbuffers'

Martin Kelly says:

====================
This patch series adds a new ring__ API to libbpf exposing getters for
accessing the individual ringbuffers inside a struct ring_buffer. This is
useful for polling individually, getting available data, or similar use
cases. The API looks like this, and was roughly proposed by Andrii Nakryiko
in another thread:

Getting a ring struct:
struct ring *ring_buffer__ring(struct ring_buffer *rb, unsigned int idx);

Using the ring struct:
unsigned long ring__consumer_pos(const struct ring *r);
unsigned long ring__producer_pos(const struct ring *r);
size_t ring__avail_data_size(const struct ring *r);
size_t ring__size(const struct ring *r);
int ring__map_fd(const struct ring *r);
int ring__consume(struct ring *r);

Changes in v2:
- Addressed all feedback from Andrii Nakryiko
====================
Signed-off-by: default avatarAndrii Nakryiko <andrii@kernel.org>
parents 831916fb cb3d7dd2
...@@ -1229,6 +1229,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook, ...@@ -1229,6 +1229,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook,
/* Ring buffer APIs */ /* Ring buffer APIs */
struct ring_buffer; struct ring_buffer;
struct ring;
struct user_ring_buffer; struct user_ring_buffer;
typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size); typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
...@@ -1249,6 +1250,78 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms); ...@@ -1249,6 +1250,78 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb); LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb); LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
/**
* @brief **ring_buffer__ring()** returns the ringbuffer object inside a given
* ringbuffer manager representing a single BPF_MAP_TYPE_RINGBUF map instance.
*
* @param rb A ringbuffer manager object.
* @param idx An index into the ringbuffers contained within the ringbuffer
* manager object. The index is 0-based and corresponds to the order in which
* ring_buffer__add was called.
* @return A ringbuffer object on success; NULL and errno set if the index is
* invalid.
*/
LIBBPF_API struct ring *ring_buffer__ring(struct ring_buffer *rb,
unsigned int idx);
/**
* @brief **ring__consumer_pos()** returns the current consumer position in the
* given ringbuffer.
*
* @param r A ringbuffer object.
* @return The current consumer position.
*/
LIBBPF_API unsigned long ring__consumer_pos(const struct ring *r);
/**
* @brief **ring__producer_pos()** returns the current producer position in the
* given ringbuffer.
*
* @param r A ringbuffer object.
* @return The current producer position.
*/
LIBBPF_API unsigned long ring__producer_pos(const struct ring *r);
/**
* @brief **ring__avail_data_size()** returns the number of bytes in the
* ringbuffer not yet consumed. This has no locking associated with it, so it
* can be inaccurate if operations are ongoing while this is called. However, it
* should still show the correct trend over the long-term.
*
* @param r A ringbuffer object.
* @return The number of bytes not yet consumed.
*/
LIBBPF_API size_t ring__avail_data_size(const struct ring *r);
/**
* @brief **ring__size()** returns the total size of the ringbuffer's map data
* area (excluding special producer/consumer pages). Effectively this gives the
* amount of usable bytes of data inside the ringbuffer.
*
* @param r A ringbuffer object.
* @return The total size of the ringbuffer map data area.
*/
LIBBPF_API size_t ring__size(const struct ring *r);
/**
* @brief **ring__map_fd()** returns the file descriptor underlying the given
* ringbuffer.
*
* @param r A ringbuffer object.
* @return The underlying ringbuffer file descriptor
*/
LIBBPF_API int ring__map_fd(const struct ring *r);
/**
* @brief **ring__consume()** consumes available ringbuffer data without event
* polling.
*
* @param r A ringbuffer object.
* @return The number of records consumed (or INT_MAX, whichever is less), or
* a negative number if any of the callbacks return an error.
*/
LIBBPF_API int ring__consume(struct ring *r);
struct user_ring_buffer_opts { struct user_ring_buffer_opts {
size_t sz; /* size of this struct, for forward/backward compatibility */ size_t sz; /* size of this struct, for forward/backward compatibility */
}; };
......
...@@ -400,4 +400,11 @@ LIBBPF_1.3.0 { ...@@ -400,4 +400,11 @@ LIBBPF_1.3.0 {
bpf_program__attach_netfilter; bpf_program__attach_netfilter;
bpf_program__attach_tcx; bpf_program__attach_tcx;
bpf_program__attach_uprobe_multi; bpf_program__attach_uprobe_multi;
ring__avail_data_size;
ring__consume;
ring__consumer_pos;
ring__map_fd;
ring__producer_pos;
ring__size;
ring_buffer__ring;
} LIBBPF_1.2.0; } LIBBPF_1.2.0;
...@@ -34,7 +34,7 @@ struct ring { ...@@ -34,7 +34,7 @@ struct ring {
struct ring_buffer { struct ring_buffer {
struct epoll_event *events; struct epoll_event *events;
struct ring *rings; struct ring **rings;
size_t page_size; size_t page_size;
int epoll_fd; int epoll_fd;
int ring_cnt; int ring_cnt;
...@@ -57,7 +57,7 @@ struct ringbuf_hdr { ...@@ -57,7 +57,7 @@ struct ringbuf_hdr {
__u32 pad; __u32 pad;
}; };
static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r) static void ringbuf_free_ring(struct ring_buffer *rb, struct ring *r)
{ {
if (r->consumer_pos) { if (r->consumer_pos) {
munmap(r->consumer_pos, rb->page_size); munmap(r->consumer_pos, rb->page_size);
...@@ -67,6 +67,8 @@ static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r) ...@@ -67,6 +67,8 @@ static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
munmap(r->producer_pos, rb->page_size + 2 * (r->mask + 1)); munmap(r->producer_pos, rb->page_size + 2 * (r->mask + 1));
r->producer_pos = NULL; r->producer_pos = NULL;
} }
free(r);
} }
/* Add extra RINGBUF maps to this ring buffer manager */ /* Add extra RINGBUF maps to this ring buffer manager */
...@@ -107,8 +109,10 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd, ...@@ -107,8 +109,10 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
return libbpf_err(-ENOMEM); return libbpf_err(-ENOMEM);
rb->events = tmp; rb->events = tmp;
r = &rb->rings[rb->ring_cnt]; r = calloc(1, sizeof(*r));
memset(r, 0, sizeof(*r)); if (!r)
return libbpf_err(-ENOMEM);
rb->rings[rb->ring_cnt] = r;
r->map_fd = map_fd; r->map_fd = map_fd;
r->sample_cb = sample_cb; r->sample_cb = sample_cb;
...@@ -121,7 +125,7 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd, ...@@ -121,7 +125,7 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
err = -errno; err = -errno;
pr_warn("ringbuf: failed to mmap consumer page for map fd=%d: %d\n", pr_warn("ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
map_fd, err); map_fd, err);
return libbpf_err(err); goto err_out;
} }
r->consumer_pos = tmp; r->consumer_pos = tmp;
...@@ -131,16 +135,16 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd, ...@@ -131,16 +135,16 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
*/ */
mmap_sz = rb->page_size + 2 * (__u64)info.max_entries; mmap_sz = rb->page_size + 2 * (__u64)info.max_entries;
if (mmap_sz != (__u64)(size_t)mmap_sz) { if (mmap_sz != (__u64)(size_t)mmap_sz) {
err = -E2BIG;
pr_warn("ringbuf: ring buffer size (%u) is too big\n", info.max_entries); pr_warn("ringbuf: ring buffer size (%u) is too big\n", info.max_entries);
return libbpf_err(-E2BIG); goto err_out;
} }
tmp = mmap(NULL, (size_t)mmap_sz, PROT_READ, MAP_SHARED, map_fd, rb->page_size); tmp = mmap(NULL, (size_t)mmap_sz, PROT_READ, MAP_SHARED, map_fd, rb->page_size);
if (tmp == MAP_FAILED) { if (tmp == MAP_FAILED) {
err = -errno; err = -errno;
ringbuf_unmap_ring(rb, r);
pr_warn("ringbuf: failed to mmap data pages for map fd=%d: %d\n", pr_warn("ringbuf: failed to mmap data pages for map fd=%d: %d\n",
map_fd, err); map_fd, err);
return libbpf_err(err); goto err_out;
} }
r->producer_pos = tmp; r->producer_pos = tmp;
r->data = tmp + rb->page_size; r->data = tmp + rb->page_size;
...@@ -152,14 +156,17 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd, ...@@ -152,14 +156,17 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
e->data.fd = rb->ring_cnt; e->data.fd = rb->ring_cnt;
if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, e) < 0) { if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, e) < 0) {
err = -errno; err = -errno;
ringbuf_unmap_ring(rb, r);
pr_warn("ringbuf: failed to epoll add map fd=%d: %d\n", pr_warn("ringbuf: failed to epoll add map fd=%d: %d\n",
map_fd, err); map_fd, err);
return libbpf_err(err); goto err_out;
} }
rb->ring_cnt++; rb->ring_cnt++;
return 0; return 0;
err_out:
ringbuf_free_ring(rb, r);
return libbpf_err(err);
} }
void ring_buffer__free(struct ring_buffer *rb) void ring_buffer__free(struct ring_buffer *rb)
...@@ -170,7 +177,7 @@ void ring_buffer__free(struct ring_buffer *rb) ...@@ -170,7 +177,7 @@ void ring_buffer__free(struct ring_buffer *rb)
return; return;
for (i = 0; i < rb->ring_cnt; ++i) for (i = 0; i < rb->ring_cnt; ++i)
ringbuf_unmap_ring(rb, &rb->rings[i]); ringbuf_free_ring(rb, rb->rings[i]);
if (rb->epoll_fd >= 0) if (rb->epoll_fd >= 0)
close(rb->epoll_fd); close(rb->epoll_fd);
...@@ -278,7 +285,7 @@ int ring_buffer__consume(struct ring_buffer *rb) ...@@ -278,7 +285,7 @@ int ring_buffer__consume(struct ring_buffer *rb)
int i; int i;
for (i = 0; i < rb->ring_cnt; i++) { for (i = 0; i < rb->ring_cnt; i++) {
struct ring *ring = &rb->rings[i]; struct ring *ring = rb->rings[i];
err = ringbuf_process_ring(ring); err = ringbuf_process_ring(ring);
if (err < 0) if (err < 0)
...@@ -305,7 +312,7 @@ int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms) ...@@ -305,7 +312,7 @@ int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
for (i = 0; i < cnt; i++) { for (i = 0; i < cnt; i++) {
__u32 ring_id = rb->events[i].data.fd; __u32 ring_id = rb->events[i].data.fd;
struct ring *ring = &rb->rings[ring_id]; struct ring *ring = rb->rings[ring_id];
err = ringbuf_process_ring(ring); err = ringbuf_process_ring(ring);
if (err < 0) if (err < 0)
...@@ -323,6 +330,58 @@ int ring_buffer__epoll_fd(const struct ring_buffer *rb) ...@@ -323,6 +330,58 @@ int ring_buffer__epoll_fd(const struct ring_buffer *rb)
return rb->epoll_fd; return rb->epoll_fd;
} }
struct ring *ring_buffer__ring(struct ring_buffer *rb, unsigned int idx)
{
if (idx >= rb->ring_cnt)
return errno = ERANGE, NULL;
return rb->rings[idx];
}
unsigned long ring__consumer_pos(const struct ring *r)
{
/* Synchronizes with smp_store_release() in ringbuf_process_ring(). */
return smp_load_acquire(r->consumer_pos);
}
unsigned long ring__producer_pos(const struct ring *r)
{
/* Synchronizes with smp_store_release() in __bpf_ringbuf_reserve() in
* the kernel.
*/
return smp_load_acquire(r->producer_pos);
}
size_t ring__avail_data_size(const struct ring *r)
{
unsigned long cons_pos, prod_pos;
cons_pos = ring__consumer_pos(r);
prod_pos = ring__producer_pos(r);
return prod_pos - cons_pos;
}
size_t ring__size(const struct ring *r)
{
return r->mask + 1;
}
int ring__map_fd(const struct ring *r)
{
return r->map_fd;
}
int ring__consume(struct ring *r)
{
int64_t res;
res = ringbuf_process_ring(r);
if (res < 0)
return libbpf_err(res);
return res > INT_MAX ? INT_MAX : res;
}
static void user_ringbuf_unmap_ring(struct user_ring_buffer *rb) static void user_ringbuf_unmap_ring(struct user_ring_buffer *rb)
{ {
if (rb->consumer_pos) { if (rb->consumer_pos) {
......
...@@ -91,6 +91,9 @@ static void ringbuf_subtest(void) ...@@ -91,6 +91,9 @@ static void ringbuf_subtest(void)
int err, cnt, rb_fd; int err, cnt, rb_fd;
int page_size = getpagesize(); int page_size = getpagesize();
void *mmap_ptr, *tmp_ptr; void *mmap_ptr, *tmp_ptr;
struct ring *ring;
int map_fd;
unsigned long avail_data, ring_size, cons_pos, prod_pos;
skel = test_ringbuf_lskel__open(); skel = test_ringbuf_lskel__open();
if (CHECK(!skel, "skel_open", "skeleton open failed\n")) if (CHECK(!skel, "skel_open", "skeleton open failed\n"))
...@@ -162,6 +165,13 @@ static void ringbuf_subtest(void) ...@@ -162,6 +165,13 @@ static void ringbuf_subtest(void)
trigger_samples(); trigger_samples();
ring = ring_buffer__ring(ringbuf, 0);
if (!ASSERT_OK_PTR(ring, "ring_buffer__ring_idx_0"))
goto cleanup;
map_fd = ring__map_fd(ring);
ASSERT_EQ(map_fd, skel->maps.ringbuf.map_fd, "ring_map_fd");
/* 2 submitted + 1 discarded records */ /* 2 submitted + 1 discarded records */
CHECK(skel->bss->avail_data != 3 * rec_sz, CHECK(skel->bss->avail_data != 3 * rec_sz,
"err_avail_size", "exp %ld, got %ld\n", "err_avail_size", "exp %ld, got %ld\n",
...@@ -176,6 +186,18 @@ static void ringbuf_subtest(void) ...@@ -176,6 +186,18 @@ static void ringbuf_subtest(void)
"err_prod_pos", "exp %ld, got %ld\n", "err_prod_pos", "exp %ld, got %ld\n",
3L * rec_sz, skel->bss->prod_pos); 3L * rec_sz, skel->bss->prod_pos);
/* verify getting this data directly via the ring object yields the same
* results
*/
avail_data = ring__avail_data_size(ring);
ASSERT_EQ(avail_data, 3 * rec_sz, "ring_avail_size");
ring_size = ring__size(ring);
ASSERT_EQ(ring_size, page_size, "ring_ring_size");
cons_pos = ring__consumer_pos(ring);
ASSERT_EQ(cons_pos, 0, "ring_cons_pos");
prod_pos = ring__producer_pos(ring);
ASSERT_EQ(prod_pos, 3 * rec_sz, "ring_prod_pos");
/* poll for samples */ /* poll for samples */
err = ring_buffer__poll(ringbuf, -1); err = ring_buffer__poll(ringbuf, -1);
...@@ -282,6 +304,10 @@ static void ringbuf_subtest(void) ...@@ -282,6 +304,10 @@ static void ringbuf_subtest(void)
err = ring_buffer__consume(ringbuf); err = ring_buffer__consume(ringbuf);
CHECK(err < 0, "rb_consume", "failed: %d\b", err); CHECK(err < 0, "rb_consume", "failed: %d\b", err);
/* also consume using ring__consume to make sure it works the same */
err = ring__consume(ring);
ASSERT_GE(err, 0, "ring_consume");
/* 3 rounds, 2 samples each */ /* 3 rounds, 2 samples each */
cnt = atomic_xchg(&sample_cnt, 0); cnt = atomic_xchg(&sample_cnt, 0);
CHECK(cnt != 6, "cnt", "exp %d samples, got %d\n", 6, cnt); CHECK(cnt != 6, "cnt", "exp %d samples, got %d\n", 6, cnt);
......
...@@ -42,6 +42,8 @@ void test_ringbuf_multi(void) ...@@ -42,6 +42,8 @@ void test_ringbuf_multi(void)
{ {
struct test_ringbuf_multi *skel; struct test_ringbuf_multi *skel;
struct ring_buffer *ringbuf = NULL; struct ring_buffer *ringbuf = NULL;
struct ring *ring_old;
struct ring *ring;
int err; int err;
int page_size = getpagesize(); int page_size = getpagesize();
int proto_fd = -1; int proto_fd = -1;
...@@ -84,11 +86,24 @@ void test_ringbuf_multi(void) ...@@ -84,11 +86,24 @@ void test_ringbuf_multi(void)
if (CHECK(!ringbuf, "ringbuf_create", "failed to create ringbuf\n")) if (CHECK(!ringbuf, "ringbuf_create", "failed to create ringbuf\n"))
goto cleanup; goto cleanup;
/* verify ring_buffer__ring returns expected results */
ring = ring_buffer__ring(ringbuf, 0);
if (!ASSERT_OK_PTR(ring, "ring_buffer__ring_idx_0"))
goto cleanup;
ring_old = ring;
ring = ring_buffer__ring(ringbuf, 1);
ASSERT_ERR_PTR(ring, "ring_buffer__ring_idx_1");
err = ring_buffer__add(ringbuf, bpf_map__fd(skel->maps.ringbuf2), err = ring_buffer__add(ringbuf, bpf_map__fd(skel->maps.ringbuf2),
process_sample, (void *)(long)2); process_sample, (void *)(long)2);
if (CHECK(err, "ringbuf_add", "failed to add another ring\n")) if (CHECK(err, "ringbuf_add", "failed to add another ring\n"))
goto cleanup; goto cleanup;
/* verify adding a new ring didn't invalidate our older pointer */
ring = ring_buffer__ring(ringbuf, 0);
if (!ASSERT_EQ(ring, ring_old, "ring_buffer__ring_again"))
goto cleanup;
err = test_ringbuf_multi__attach(skel); err = test_ringbuf_multi__attach(skel);
if (CHECK(err, "skel_attach", "skeleton attachment failed: %d\n", err)) if (CHECK(err, "skel_attach", "skeleton attachment failed: %d\n", err))
goto cleanup; goto cleanup;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment