Commit 3106c580 authored by Magnus Karlsson's avatar Magnus Karlsson Committed by Daniel Borkmann

i40e: Use batched xsk Tx interfaces to increase performance

Use the new batched xsk interfaces for the Tx path in the i40e driver
to improve performance. On my machine, this yields a throughput
increase of 4% for the l2fwd sample app in xdpsock. If we instead just
look at the Tx part, this patch set increases throughput with above
20% for Tx.

Note that I had to explicitly loop unroll the inner loop to get to
this performance level, by using a pragma. It is honored by both clang
and gcc and should be ignored by versions that do not support
it. Using the -funroll-loops compiler command line switch on the
source file resulted in a loop unrolling on a higher level that
lead to a performance decrease instead of an increase.
Signed-off-by: default avatarMagnus Karlsson <magnus.karlsson@intel.com>
Signed-off-by: default avatarDaniel Borkmann <daniel@iogearbox.net>
Acked-by: default avatarJohn Fastabend <john.fastabend@gmail.com>
Link: https://lore.kernel.org/bpf/1605525167-14450-6-git-send-email-magnus.karlsson@gmail.com
parent 9349eb3a
...@@ -676,6 +676,8 @@ void i40e_free_tx_resources(struct i40e_ring *tx_ring) ...@@ -676,6 +676,8 @@ void i40e_free_tx_resources(struct i40e_ring *tx_ring)
i40e_clean_tx_ring(tx_ring); i40e_clean_tx_ring(tx_ring);
kfree(tx_ring->tx_bi); kfree(tx_ring->tx_bi);
tx_ring->tx_bi = NULL; tx_ring->tx_bi = NULL;
kfree(tx_ring->xsk_descs);
tx_ring->xsk_descs = NULL;
if (tx_ring->desc) { if (tx_ring->desc) {
dma_free_coherent(tx_ring->dev, tx_ring->size, dma_free_coherent(tx_ring->dev, tx_ring->size,
...@@ -1277,6 +1279,13 @@ int i40e_setup_tx_descriptors(struct i40e_ring *tx_ring) ...@@ -1277,6 +1279,13 @@ int i40e_setup_tx_descriptors(struct i40e_ring *tx_ring)
if (!tx_ring->tx_bi) if (!tx_ring->tx_bi)
goto err; goto err;
if (ring_is_xdp(tx_ring)) {
tx_ring->xsk_descs = kcalloc(I40E_MAX_NUM_DESCRIPTORS, sizeof(*tx_ring->xsk_descs),
GFP_KERNEL);
if (!tx_ring->xsk_descs)
goto err;
}
u64_stats_init(&tx_ring->syncp); u64_stats_init(&tx_ring->syncp);
/* round up to nearest 4K */ /* round up to nearest 4K */
...@@ -1300,6 +1309,8 @@ int i40e_setup_tx_descriptors(struct i40e_ring *tx_ring) ...@@ -1300,6 +1309,8 @@ int i40e_setup_tx_descriptors(struct i40e_ring *tx_ring)
return 0; return 0;
err: err:
kfree(tx_ring->xsk_descs);
tx_ring->xsk_descs = NULL;
kfree(tx_ring->tx_bi); kfree(tx_ring->tx_bi);
tx_ring->tx_bi = NULL; tx_ring->tx_bi = NULL;
return -ENOMEM; return -ENOMEM;
......
...@@ -389,6 +389,7 @@ struct i40e_ring { ...@@ -389,6 +389,7 @@ struct i40e_ring {
struct i40e_channel *ch; struct i40e_channel *ch;
struct xdp_rxq_info xdp_rxq; struct xdp_rxq_info xdp_rxq;
struct xsk_buff_pool *xsk_pool; struct xsk_buff_pool *xsk_pool;
struct xdp_desc *xsk_descs; /* For storing descriptors in the AF_XDP ZC path */
} ____cacheline_internodealigned_in_smp; } ____cacheline_internodealigned_in_smp;
static inline bool ring_uses_build_skb(struct i40e_ring *ring) static inline bool ring_uses_build_skb(struct i40e_ring *ring)
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
/* Copyright(c) 2018 Intel Corporation. */ /* Copyright(c) 2018 Intel Corporation. */
#include <linux/bpf_trace.h> #include <linux/bpf_trace.h>
#include <linux/stringify.h>
#include <net/xdp_sock_drv.h> #include <net/xdp_sock_drv.h>
#include <net/xdp.h> #include <net/xdp.h>
...@@ -381,6 +382,69 @@ int i40e_clean_rx_irq_zc(struct i40e_ring *rx_ring, int budget) ...@@ -381,6 +382,69 @@ int i40e_clean_rx_irq_zc(struct i40e_ring *rx_ring, int budget)
return failure ? budget : (int)total_rx_packets; return failure ? budget : (int)total_rx_packets;
} }
static void i40e_xmit_pkt(struct i40e_ring *xdp_ring, struct xdp_desc *desc,
unsigned int *total_bytes)
{
struct i40e_tx_desc *tx_desc;
dma_addr_t dma;
dma = xsk_buff_raw_get_dma(xdp_ring->xsk_pool, desc->addr);
xsk_buff_raw_dma_sync_for_device(xdp_ring->xsk_pool, dma, desc->len);
tx_desc = I40E_TX_DESC(xdp_ring, xdp_ring->next_to_use++);
tx_desc->buffer_addr = cpu_to_le64(dma);
tx_desc->cmd_type_offset_bsz = build_ctob(I40E_TX_DESC_CMD_ICRC | I40E_TX_DESC_CMD_EOP,
0, desc->len, 0);
*total_bytes += desc->len;
}
static void i40e_xmit_pkt_batch(struct i40e_ring *xdp_ring, struct xdp_desc *desc,
unsigned int *total_bytes)
{
u16 ntu = xdp_ring->next_to_use;
struct i40e_tx_desc *tx_desc;
dma_addr_t dma;
u32 i;
loop_unrolled_for(i = 0; i < PKTS_PER_BATCH; i++) {
dma = xsk_buff_raw_get_dma(xdp_ring->xsk_pool, desc[i].addr);
xsk_buff_raw_dma_sync_for_device(xdp_ring->xsk_pool, dma, desc[i].len);
tx_desc = I40E_TX_DESC(xdp_ring, ntu++);
tx_desc->buffer_addr = cpu_to_le64(dma);
tx_desc->cmd_type_offset_bsz = build_ctob(I40E_TX_DESC_CMD_ICRC |
I40E_TX_DESC_CMD_EOP,
0, desc[i].len, 0);
*total_bytes += desc[i].len;
}
xdp_ring->next_to_use = ntu;
}
static void i40e_fill_tx_hw_ring(struct i40e_ring *xdp_ring, struct xdp_desc *descs, u32 nb_pkts,
unsigned int *total_bytes)
{
u32 batched, leftover, i;
batched = nb_pkts & ~(PKTS_PER_BATCH - 1);
leftover = nb_pkts & (PKTS_PER_BATCH - 1);
for (i = 0; i < batched; i += PKTS_PER_BATCH)
i40e_xmit_pkt_batch(xdp_ring, &descs[i], total_bytes);
for (i = batched; i < batched + leftover; i++)
i40e_xmit_pkt(xdp_ring, &descs[i], total_bytes);
}
static void i40e_set_rs_bit(struct i40e_ring *xdp_ring)
{
u16 ntu = xdp_ring->next_to_use ? xdp_ring->next_to_use - 1 : xdp_ring->count - 1;
struct i40e_tx_desc *tx_desc;
tx_desc = I40E_TX_DESC(xdp_ring, ntu);
tx_desc->cmd_type_offset_bsz |= (I40E_TX_DESC_CMD_RS << I40E_TXD_QW1_CMD_SHIFT);
}
/** /**
* i40e_xmit_zc - Performs zero-copy Tx AF_XDP * i40e_xmit_zc - Performs zero-copy Tx AF_XDP
* @xdp_ring: XDP Tx ring * @xdp_ring: XDP Tx ring
...@@ -390,45 +454,30 @@ int i40e_clean_rx_irq_zc(struct i40e_ring *rx_ring, int budget) ...@@ -390,45 +454,30 @@ int i40e_clean_rx_irq_zc(struct i40e_ring *rx_ring, int budget)
**/ **/
static bool i40e_xmit_zc(struct i40e_ring *xdp_ring, unsigned int budget) static bool i40e_xmit_zc(struct i40e_ring *xdp_ring, unsigned int budget)
{ {
unsigned int sent_frames = 0, total_bytes = 0; struct xdp_desc *descs = xdp_ring->xsk_descs;
struct i40e_tx_desc *tx_desc = NULL; u32 nb_pkts, nb_processed = 0;
struct xdp_desc desc; unsigned int total_bytes = 0;
dma_addr_t dma;
nb_pkts = xsk_tx_peek_release_desc_batch(xdp_ring->xsk_pool, descs, budget);
while (budget-- > 0) { if (!nb_pkts)
if (!xsk_tx_peek_desc(xdp_ring->xsk_pool, &desc)) return false;
break;
if (xdp_ring->next_to_use + nb_pkts >= xdp_ring->count) {
dma = xsk_buff_raw_get_dma(xdp_ring->xsk_pool, desc.addr); nb_processed = xdp_ring->count - xdp_ring->next_to_use;
xsk_buff_raw_dma_sync_for_device(xdp_ring->xsk_pool, dma, i40e_fill_tx_hw_ring(xdp_ring, descs, nb_processed, &total_bytes);
desc.len); xdp_ring->next_to_use = 0;
tx_desc = I40E_TX_DESC(xdp_ring, xdp_ring->next_to_use);
tx_desc->buffer_addr = cpu_to_le64(dma);
tx_desc->cmd_type_offset_bsz =
build_ctob(I40E_TX_DESC_CMD_ICRC
| I40E_TX_DESC_CMD_EOP,
0, desc.len, 0);
sent_frames++;
total_bytes += desc.len;
xdp_ring->next_to_use++;
if (xdp_ring->next_to_use == xdp_ring->count)
xdp_ring->next_to_use = 0;
} }
if (tx_desc) { i40e_fill_tx_hw_ring(xdp_ring, &descs[nb_processed], nb_pkts - nb_processed,
/* Request an interrupt for the last frame and bump tail ptr. */ &total_bytes);
tx_desc->cmd_type_offset_bsz |= (I40E_TX_DESC_CMD_RS <<
I40E_TXD_QW1_CMD_SHIFT);
i40e_xdp_ring_update_tail(xdp_ring);
xsk_tx_release(xdp_ring->xsk_pool); /* Request an interrupt for the last frame and bump tail ptr. */
i40e_update_tx_stats(xdp_ring, sent_frames, total_bytes); i40e_set_rs_bit(xdp_ring);
} i40e_xdp_ring_update_tail(xdp_ring);
i40e_update_tx_stats(xdp_ring, nb_pkts, total_bytes);
return !!budget; return true;
} }
/** /**
......
...@@ -4,6 +4,22 @@ ...@@ -4,6 +4,22 @@
#ifndef _I40E_XSK_H_ #ifndef _I40E_XSK_H_
#define _I40E_XSK_H_ #define _I40E_XSK_H_
/* This value should match the pragma in the loop_unrolled_for
* macro. Why 4? It is strictly empirical. It seems to be a good
* compromise between the advantage of having simultaneous outstanding
* reads to the DMA array that can hide each others latency and the
* disadvantage of having a larger code path.
*/
#define PKTS_PER_BATCH 4
#ifdef __clang__
#define loop_unrolled_for _Pragma("clang loop unroll_count(4)") for
#elif __GNUC__ >= 8
#define loop_unrolled_for _Pragma("GCC unroll 4") for
#else
#define loop_unrolled_for for
#endif
struct i40e_vsi; struct i40e_vsi;
struct xsk_buff_pool; struct xsk_buff_pool;
struct zero_copy_allocator; struct zero_copy_allocator;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment