Commit 1bf36496 authored by Magnus Karlsson, committed by Daniel Borkmann

selftests: xsk: Introduce pacing of traffic

Introduce pacing of traffic so that the Tx thread can never send more
packets than the receiver has processed plus the number of packets its
umem can hold. Thus, at any point in time, the number of in-flight
packets (those not yet processed by the Rx thread) is less than or
equal to the number of packets that can be held in the Rx thread's umem.

The batch size is also increased, from 8 to 64 packets, to improve running time.
Signed-off-by: Magnus Karlsson <magnus.karlsson@intel.com>
Signed-off-by: Daniel Borkmann <daniel@iogearbox.net>
Link: https://lore.kernel.org/bpf/20210922075613.12186-11-magnus.karlsson@gmail.com
parent 89013b8a
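The pacing in the diff below is a bounded producer/consumer handshake built on a pthread mutex and condition variable: the Tx thread blocks once the in-flight count approaches the receiver's buffer budget, and the Rx thread signals as it drains. The following standalone sketch is illustrative only, not the selftest code; NUM_FRAMES, BATCH, and TOTAL are made-up stand-ins for the umem size, BATCH_SIZE, and packet count.

#include <pthread.h>
#include <stdio.h>

#define NUM_FRAMES 16	/* stand-in for the receiver's umem->num_frames */
#define BATCH 4		/* stand-in for BATCH_SIZE */
#define TOTAL 64	/* packets to move in this demo */

static pthread_mutex_t pacing_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t pacing_cond = PTHREAD_COND_INITIALIZER;
static unsigned int pkts_in_flight;

/* Rx side: drain batches and wake the producer once below the budget. */
static void *rx_thread(void *arg)
{
	for (int rcvd = 0; rcvd < TOTAL; rcvd += BATCH) {
		pthread_mutex_lock(&pacing_mutex);
		while (pkts_in_flight < BATCH)	/* nothing to drain yet */
			pthread_cond_wait(&pacing_cond, &pacing_mutex);
		pkts_in_flight -= BATCH;
		if (pkts_in_flight < NUM_FRAMES)
			pthread_cond_signal(&pacing_cond);
		pthread_mutex_unlock(&pacing_mutex);
	}
	return NULL;
}

int main(void)
{
	pthread_t rx;

	pthread_create(&rx, NULL, rx_thread, NULL);

	/* Tx side: never let more than the receiver's budget pile up. */
	for (int sent = 0; sent < TOTAL; sent += BATCH) {
		pthread_mutex_lock(&pacing_mutex);
		pkts_in_flight += BATCH;
		pthread_cond_signal(&pacing_cond);	/* data is available */
		while (pkts_in_flight >= NUM_FRAMES - BATCH)
			pthread_cond_wait(&pacing_cond, &pacing_mutex);
		pthread_mutex_unlock(&pacing_mutex);
	}

	pthread_join(rx, NULL);
	printf("moved %d packets; %u still in flight\n", TOTAL, pkts_in_flight);
	return 0;
}

One difference worth noting: the sketch re-checks its predicate in a while loop around pthread_cond_wait(), the canonical defence against spurious wakeups, whereas the patch's Tx side uses a single if.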
tools/testing/selftests/bpf/xdpxceiver.c

@@ -384,6 +384,7 @@ static void __test_spec_init(struct test_spec *test, struct ifobject *ifobj_tx,
 		ifobj->umem = &ifobj->umem_arr[0];
 		ifobj->xsk = &ifobj->xsk_arr[0];
 		ifobj->use_poll = false;
+		ifobj->pacing_on = true;
 		ifobj->pkt_stream = test->pkt_stream_default;
 
 		if (i == 0) {
@@ -724,6 +725,7 @@ static void receive_pkts(struct pkt_stream *pkt_stream, struct xsk_socket_info *
 {
 	struct pkt *pkt = pkt_stream_get_next_rx_pkt(pkt_stream);
 	u32 idx_rx = 0, idx_fq = 0, rcvd, i;
+	u32 total = 0;
 	int ret;
 
 	while (pkt) {
@@ -772,6 +774,13 @@ static void receive_pkts(struct pkt_stream *pkt_stream, struct xsk_socket_info *
 		xsk_ring_prod__submit(&xsk->umem->fq, rcvd);
 		xsk_ring_cons__release(&xsk->rx, rcvd);
+
+		pthread_mutex_lock(&pacing_mutex);
+		pkts_in_flight -= rcvd;
+		total += rcvd;
+		if (pkts_in_flight < umem->num_frames)
+			pthread_cond_signal(&pacing_cond);
+		pthread_mutex_unlock(&pacing_mutex);
 	}
 }
@@ -797,10 +806,19 @@ static u32 __send_pkts(struct ifobject *ifobject, u32 pkt_nb)
 		valid_pkts++;
 	}
 
+	pthread_mutex_lock(&pacing_mutex);
+	pkts_in_flight += valid_pkts;
+	if (ifobject->pacing_on && pkts_in_flight >= ifobject->umem->num_frames - BATCH_SIZE) {
+		kick_tx(xsk);
+		pthread_cond_wait(&pacing_cond, &pacing_mutex);
+	}
+	pthread_mutex_unlock(&pacing_mutex);
+
 	xsk_ring_prod__submit(&xsk->tx, i);
 	xsk->outstanding_tx += valid_pkts;
-	complete_pkts(xsk, BATCH_SIZE);
+	complete_pkts(xsk, i);
+
+	usleep(10);
 
 	return i;
 }
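Note the ordering on the Tx side above: kick_tx() runs before the thread goes to sleep. The kick makes the kernel actually transmit the descriptors already queued; without it, the Rx thread would never receive anything, never decrement pkts_in_flight, and never signal, leaving the Tx thread asleep forever. A compile-only sketch of that idea, where kick_tx() and the pacing globals are external stand-ins for the selftest's and tx_throttle() is a hypothetical helper, not a function from the patch:

#include <pthread.h>

/* Stand-ins for the selftest's globals and helper; declarations only. */
extern pthread_mutex_t pacing_mutex;
extern pthread_cond_t pacing_cond;
extern unsigned int pkts_in_flight;
extern void kick_tx(void);

/* Hypothetical helper: block until in-flight packets drop below budget. */
static void tx_throttle(unsigned int budget)
{
	pthread_mutex_lock(&pacing_mutex);
	while (pkts_in_flight >= budget) {
		/* Kick first: queued descriptors must hit the wire, or Rx
		 * can never drain pkts_in_flight and signal us. */
		kick_tx();
		pthread_cond_wait(&pacing_cond, &pacing_mutex);
	}
	pthread_mutex_unlock(&pacing_mutex);
}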
@@ -819,8 +837,6 @@ static void send_pkts(struct ifobject *ifobject)
 	fds.events = POLLOUT;
 
 	while (pkt_cnt < ifobject->pkt_stream->nb_pkts) {
-		u32 sent;
-
 		if (ifobject->use_poll) {
 			int ret;
@@ -832,9 +848,7 @@ static void send_pkts(struct ifobject *ifobject)
 			continue;
 		}
 
-		sent = __send_pkts(ifobject, pkt_cnt);
-		pkt_cnt += sent;
-		usleep(10);
+		pkt_cnt += __send_pkts(ifobject, pkt_cnt);
 	}
 
 	wait_for_tx_completion(ifobject->xsk);
@@ -1043,6 +1057,7 @@ static void testapp_validate_traffic(struct test_spec *test)
 	test->current_step++;
 	pkt_stream_reset(ifobj_rx->pkt_stream);
+	pkts_in_flight = 0;
 
 	/* Spawn RX thread */
 	pthread_create(&t0, NULL, ifobj_rx->func_ptr, test);
@@ -1126,6 +1141,8 @@ static void testapp_stats(struct test_spec *test)
 	for (i = 0; i < STAT_TEST_TYPE_MAX; i++) {
 		test_spec_reset(test);
 		stat_test_type = i;
+		/* No or few packets will be received so cannot pace packets */
+		test->ifobj_tx->pacing_on = false;
 
 		switch (stat_test_type) {
 		case STAT_TEST_RX_DROPPED:

tools/testing/selftests/bpf/xdpxceiver.h
@@ -35,7 +35,7 @@
 #define UDP_PKT_DATA_SIZE (UDP_PKT_SIZE - sizeof(struct udphdr))
 #define USLEEP_MAX 10000
 #define SOCK_RECONF_CTR 10
-#define BATCH_SIZE 8
+#define BATCH_SIZE 64
 #define POLL_TMOUT 1000
 #define DEFAULT_PKT_CNT (4 * 1024)
 #define DEFAULT_UMEM_BUFFERS (DEFAULT_PKT_CNT / 4)
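For scale: with the defaults above, DEFAULT_UMEM_BUFFERS works out to (4 * 1024) / 4 = 1024 frames, so for a umem of that size the Tx-side check pkts_in_flight >= num_frames - BATCH_SIZE trips at 1024 - 64 = 960, always leaving at least one full batch of headroom in the receiver's umem.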
@@ -136,6 +136,7 @@ struct ifobject {
 	bool tx_on;
 	bool rx_on;
 	bool use_poll;
+	bool pacing_on;
 	u8 dst_mac[ETH_ALEN];
 	u8 src_mac[ETH_ALEN];
 };
@@ -151,5 +152,9 @@ struct test_spec {
 };
 
 pthread_barrier_t barr;
+pthread_mutex_t pacing_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_cond_t pacing_cond = PTHREAD_COND_INITIALIZER;
+u32 pkts_in_flight;
 
 #endif /* XDPXCEIVER_H */
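One design note on the hunk above: pacing_mutex, pacing_cond, and pkts_in_flight are defined, not merely declared, in the header, complete with initializers. That is safe here because the header is included by a single compilation unit; a program with multiple translation units would instead split each one into an extern declaration in the header and exactly one definition in a .c file, roughly:

/* in the header, shared by all translation units: */
extern pthread_mutex_t pacing_mutex;

/* in exactly one .c file: */
pthread_mutex_t pacing_mutex = PTHREAD_MUTEX_INITIALIZER;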