Commit 00967e84 authored by Alexei Starovoitov

Merge branch 'af_xdp-smp_mb-fixes'

Magnus Karlsson says:

====================
This patch set fixes one bug and removes two dependencies on Linux
kernel headers from the XDP socket code in libbpf. A number of people
have pointed out that these two dependencies make it hard to build the
XDP socket part of libbpf without any kernel header dependencies. The
two removed dependencies are:

* Remove the usage of likely and unlikely (compiler.h) in xsk.h. It
  has been reported that the use of these actually decreases the
  performance of the ring access code due to an increase in
  instruction cache misses, so let us just remove these.

* Remove the dependency on barrier.h as it brings in a lot of kernel
  headers. As the XDP socket code only uses two simple functions from
  it, we can reimplement these. As a bonus, the new implementation is
  faster as it uses the same barrier primitives as the kernel does
  when the same code is compiled there. Without this patch, the
  user-space code uses lfence and sfence on x86, which are
  unnecessarily heavy for this purpose.

While removing these dependencies, a missing barrier function for at
least PPC64 was discovered. For a full explanation of the missing
barrier, please refer to patch 1. The patch set therefore starts with
two patches fixing this. I have also added a patch at the end that
removes this full memory barrier on x86 only, as it is not needed
there.

Structure of the patch set:
Patches 1-2: Add the missing barrier function in kernel and user space.
Patches 3-4: Remove the dependencies.
Patch 5: Optimize the barrier added in patch 2 so that it does not
         do unnecessary work on x86.

v2 -> v3:
* Added missing memory barrier in ring code
* Added an explanation on the three barriers we use in the code
* Moved barrier functions from xsk.h to libbpf_util.h
* Added comment on why we have these functions in libbpf_util.h
* Added a new barrier function in user space that makes it possible to
  remove the full memory barrier on x86.

v1 -> v2:
* Added comment about validity of ARM 32-bit barriers.
  Only armv7 and above.

/Magnus
====================
Acked-by: Song Liu <songliubraving@fb.com>
Signed-off-by: Alexei Starovoitov <ast@kernel.org>
parents d1b7725d 2c5935f1
net/xdp/xsk_queue.h:

@@ -43,6 +43,48 @@ struct xsk_queue {
        u64 invalid_descs;
};

/* The structure of the shared state of the rings is the same as the
* ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
* ring, the kernel is the producer and user space is the consumer. For
* the Tx and fill rings, the kernel is the consumer and user space is
* the producer.
*
* producer consumer
*
* if (LOAD ->consumer) { LOAD ->producer
* (A) smp_rmb() (C)
* STORE $data LOAD $data
* smp_wmb() (B) smp_mb() (D)
* STORE ->producer STORE ->consumer
* }
*
* (A) pairs with (D), and (B) pairs with (C).
*
* Starting with (B), it protects the data from being written after
* the producer pointer. If this barrier was missing, the consumer
* could observe the producer pointer being set and thus load the data
* before the producer has written the new data. The consumer would in
* this case load the old data.
*
* (C) protects the consumer from speculatively loading the data before
* the producer pointer actually has been read. If we do not have this
* barrier, some architectures could load old data as speculative loads
* are not discarded as the CPU does not know there is a dependency
* between ->producer and data.
*
* (A) is a control dependency that separates the load of ->consumer
* from the stores of $data. In case ->consumer indicates there is no
* room in the buffer to store $data, we simply do not store it, so no
* barrier is needed.
*
* (D) protects the load of the data to be observed to happen after the
* store of the consumer pointer. If we did not have this memory
* barrier, the producer could observe the consumer pointer being set
* and overwrite the data with a new value before the consumer got the
* chance to read the old value. The consumer would thus miss reading
* the old entry and very likely read the new entry twice, once right
* now and again after circling through the ring.
*/
/* Common functions operating for both RXTX and umem queues */
static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)

@@ -106,6 +148,7 @@ static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr)
static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr)
{
        if (q->cons_tail == q->cons_head) {
                smp_mb(); /* D, matches A */
                WRITE_ONCE(q->ring->consumer, q->cons_tail);
                q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

@@ -128,10 +171,11 @@ static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
        if (xskq_nb_free(q, q->prod_tail, 1) == 0)
                return -ENOSPC;

        /* A, matches D */
        ring->desc[q->prod_tail++ & q->ring_mask] = addr;

        /* Order producer and data */
        smp_wmb(); /* B, matches C */

        WRITE_ONCE(q->ring->producer, q->prod_tail);
        return 0;
@@ -144,6 +188,7 @@ static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
        if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
                return -ENOSPC;

        /* A, matches D */
        ring->desc[q->prod_head++ & q->ring_mask] = addr;
        return 0;
}

@@ -152,7 +197,7 @@ static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
                                             u32 nb_entries)
{
        /* Order producer and data */
        smp_wmb(); /* B, matches C */

        q->prod_tail += nb_entries;
        WRITE_ONCE(q->ring->producer, q->prod_tail);

@@ -163,6 +208,7 @@ static inline int xskq_reserve_addr(struct xsk_queue *q)
        if (xskq_nb_free(q, q->prod_head, 1) == 0)
                return -ENOSPC;

        /* A, matches D */
        q->prod_head++;
        return 0;
}
@@ -204,11 +250,12 @@ static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
                                              struct xdp_desc *desc)
{
        if (q->cons_tail == q->cons_head) {
                smp_mb(); /* D, matches A */
                WRITE_ONCE(q->ring->consumer, q->cons_tail);
                q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);

                /* Order consumer and data */
                smp_rmb(); /* C, matches B */
        }

        return xskq_validate_desc(q, desc);

@@ -228,6 +275,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
        if (xskq_nb_free(q, q->prod_head, 1) == 0)
                return -ENOSPC;

        /* A, matches D */
        idx = (q->prod_head++) & q->ring_mask;
        ring->desc[idx].addr = addr;
        ring->desc[idx].len = len;

@@ -238,7 +286,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
static inline void xskq_produce_flush_desc(struct xsk_queue *q)
{
        /* Order producer and data */
        smp_wmb(); /* B, matches C */

        q->prod_tail = q->prod_head,
        WRITE_ONCE(q->ring->producer, q->prod_tail);
...
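To make the (A)/(B)/(C)/(D) pairing documented above easier to follow in one place, here is a condensed, stand-alone sketch of the same scheme. It is illustrative only and not part of the patch: the ring type and function names are invented for the example, and it uses C11 fences in place of the kernel's smp_*() macros, with a full fence standing in for the smp_mb() at (D).

#include <stdatomic.h>
#include <stdint.h>

#define RING_SIZE 64u   /* must be a power of two */

struct toy_ring {
        _Atomic uint32_t producer;      /* written by the producer only */
        _Atomic uint32_t consumer;      /* written by the consumer only */
        uint64_t data[RING_SIZE];
};

/* Producer side: (A) followed by (B). */
static int toy_produce(struct toy_ring *r, uint64_t val)
{
        uint32_t prod = atomic_load_explicit(&r->producer, memory_order_relaxed);

        /* (A): the load of ->consumer gates the data store through the
         * branch below, so no explicit barrier is needed here.
         */
        if (prod - atomic_load_explicit(&r->consumer, memory_order_relaxed) == RING_SIZE)
                return -1;              /* ring full */

        r->data[prod & (RING_SIZE - 1)] = val;
        atomic_thread_fence(memory_order_release);      /* (B): data before ->producer */
        atomic_store_explicit(&r->producer, prod + 1, memory_order_relaxed);
        return 0;
}

/* Consumer side: (C) followed by (D). */
static int toy_consume(struct toy_ring *r, uint64_t *val)
{
        uint32_t cons = atomic_load_explicit(&r->consumer, memory_order_relaxed);

        if (cons == atomic_load_explicit(&r->producer, memory_order_relaxed))
                return -1;              /* ring empty */

        atomic_thread_fence(memory_order_acquire);      /* (C): ->producer before data */
        *val = r->data[cons & (RING_SIZE - 1)];
        atomic_thread_fence(memory_order_seq_cst);      /* (D): data read before ->consumer
                                                         * update; the counterpart of the
                                                         * smp_mb() added above
                                                         */
        atomic_store_explicit(&r->consumer, cons + 1, memory_order_relaxed);
        return 0;
}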
tools/lib/bpf/libbpf_util.h:

@@ -23,6 +23,36 @@ do { \
#define pr_info(fmt, ...)   __pr(LIBBPF_INFO, fmt, ##__VA_ARGS__)
#define pr_debug(fmt, ...)  __pr(LIBBPF_DEBUG, fmt, ##__VA_ARGS__)

/* Use these barrier functions instead of smp_[rw]mb() when they are
* used in a libbpf header file. That way they can be built into the
* application that uses libbpf.
*/
#if defined(__i386__) || defined(__x86_64__)
# define libbpf_smp_rmb() asm volatile("" : : : "memory")
# define libbpf_smp_wmb() asm volatile("" : : : "memory")
# define libbpf_smp_mb() \
asm volatile("lock; addl $0,-4(%%rsp)" : : : "memory", "cc")
/* Hinders stores to be observed before older loads. */
# define libbpf_smp_rwmb() asm volatile("" : : : "memory")
#elif defined(__aarch64__)
# define libbpf_smp_rmb() asm volatile("dmb ishld" : : : "memory")
# define libbpf_smp_wmb() asm volatile("dmb ishst" : : : "memory")
# define libbpf_smp_mb() asm volatile("dmb ish" : : : "memory")
# define libbpf_smp_rwmb() libbpf_smp_mb()
#elif defined(__arm__)
/* These are only valid for armv7 and above */
# define libbpf_smp_rmb() asm volatile("dmb ish" : : : "memory")
# define libbpf_smp_wmb() asm volatile("dmb ishst" : : : "memory")
# define libbpf_smp_mb() asm volatile("dmb ish" : : : "memory")
# define libbpf_smp_rwmb() libbpf_smp_mb()
#else
# warning Architecture missing native barrier functions in libbpf_util.h.
# define libbpf_smp_rmb() __sync_synchronize()
# define libbpf_smp_wmb() __sync_synchronize()
# define libbpf_smp_mb() __sync_synchronize()
# define libbpf_smp_rwmb() __sync_synchronize()
#endif
#ifdef __cplusplus
} /* extern "C" */
#endif
...
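As a quick sanity check of what these macros cost on a given target, they can be dropped into a trivial translation unit and the generated assembly inspected. The snippet below is a hypothetical helper, not part of libbpf; it assumes libbpf.h and libbpf_util.h are on the include path (mirroring the include order used by xsk.h), and the comments describe the x86-64 case defined above.

/* Hypothetical helper, e.g. barrier_check.c; build with something like
 *   gcc -O2 -S -I tools/lib/bpf barrier_check.c
 * and inspect barrier_check.s.
 */
#include "libbpf.h"
#include "libbpf_util.h"

void barrier_check(void)
{
        libbpf_smp_rmb();       /* x86-64: compiler barrier only, no instruction */
        libbpf_smp_wmb();       /* x86-64: compiler barrier only, no instruction */
        libbpf_smp_mb();        /* x86-64: lock addl $0,-4(%rsp) */
        libbpf_smp_rwmb();      /* x86-64: compiler barrier only, no instruction */
}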
tools/lib/bpf/xsk.h:

@@ -16,6 +16,7 @@
#include <linux/if_xdp.h>
#include "libbpf.h"
#include "libbpf_util.h"

#ifdef __cplusplus
extern "C" {

@@ -36,6 +37,10 @@ struct name { \
DEFINE_XSK_RING(xsk_ring_prod);
DEFINE_XSK_RING(xsk_ring_cons);

/* For a detailed explanation on the memory barriers associated with the
* ring, please take a look at net/xdp/xsk_queue.h.
*/
struct xsk_umem;
struct xsk_socket;

@@ -105,7 +110,7 @@ static inline __u32 xsk_cons_nb_avail(struct xsk_ring_cons *r, __u32 nb)
static inline size_t xsk_ring_prod__reserve(struct xsk_ring_prod *prod,
                                            size_t nb, __u32 *idx)
{
        if (xsk_prod_nb_free(prod, nb) < nb)
                return 0;

        *idx = prod->cached_prod;

@@ -116,10 +121,10 @@ static inline size_t xsk_ring_prod__reserve(struct xsk_ring_prod *prod,
static inline void xsk_ring_prod__submit(struct xsk_ring_prod *prod, size_t nb)
{
        /* Make sure everything has been written to the ring before indicating
         * this to the kernel by writing the producer pointer.
         */
        libbpf_smp_wmb();

        *prod->producer += nb;
}
@@ -129,11 +134,11 @@ static inline size_t xsk_ring_cons__peek(struct xsk_ring_cons *cons,
{
        size_t entries = xsk_cons_nb_avail(cons, nb);

        if (entries > 0) {
                /* Make sure we do not speculatively read the data before
                 * we have received the packet buffers from the ring.
                 */
                libbpf_smp_rmb();

                *idx = cons->cached_cons;
                cons->cached_cons += entries;

@@ -144,6 +149,11 @@ static inline size_t xsk_ring_cons__peek(struct xsk_ring_cons *cons,
static inline void xsk_ring_cons__release(struct xsk_ring_cons *cons, size_t nb)
{
        /* Make sure data has been read before indicating we are done
         * with the entries by updating the consumer pointer.
         */
        libbpf_smp_rwmb();

        *cons->consumer += nb;
}
...
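To show where these barriers fire from an application's point of view, here is a minimal RX-side sketch. It is illustrative only: socket/umem setup and error handling are omitted, and the xsk_ring_cons__rx_desc()/xsk_ring_prod__fill_addr() accessors are assumed from the rest of xsk.h, which is not part of this diff.

#include <stddef.h>
#include <linux/if_xdp.h>
#include "xsk.h"

#define BATCH_SIZE 64

/* Receive up to BATCH_SIZE descriptors from the Rx ring and hand the
 * buffers straight back to the kernel via the fill ring.
 */
static void rx_and_recycle(struct xsk_ring_cons *rx, struct xsk_ring_prod *fill)
{
        __u32 rx_idx = 0, fq_idx = 0;
        size_t rcvd, i;

        rcvd = xsk_ring_cons__peek(rx, BATCH_SIZE, &rx_idx);    /* libbpf_smp_rmb() runs inside on success */
        if (!rcvd)
                return;

        while (xsk_ring_prod__reserve(fill, rcvd, &fq_idx) != rcvd)
                ;       /* fill ring full; a real application would poll or kick the kernel here */

        for (i = 0; i < rcvd; i++) {
                const struct xdp_desc *desc = xsk_ring_cons__rx_desc(rx, rx_idx + i);

                /* ... process the frame at desc->addr / desc->len in the umem ... */

                *xsk_ring_prod__fill_addr(fill, fq_idx + i) = desc->addr;
        }

        xsk_ring_prod__submit(fill, rcvd);      /* libbpf_smp_wmb() before writing ->producer */
        xsk_ring_cons__release(rx, rcvd);       /* libbpf_smp_rwmb() before writing ->consumer */
}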