Commit 6dfb4367 authored by Paolo Abeni's avatar Paolo Abeni Committed by David S. Miller

udp: keep the sk_receive_queue held when splicing

On packet reception, when we are forced to splice the
sk_receive_queue, we can keep the related lock held, so
that we can avoid re-acquiring it, if fwd memory
scheduling is required.

v1 -> v2:
  the rx_queue_lock_held param in udp_rmem_release() is
  now a bool
Signed-off-by: default avatarPaolo Abeni <pabeni@redhat.com>
Acked-by: default avatarEric Dumazet <edumazet@google.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parent 2276f58a
...@@ -1164,7 +1164,8 @@ int udp_sendpage(struct sock *sk, struct page *page, int offset, ...@@ -1164,7 +1164,8 @@ int udp_sendpage(struct sock *sk, struct page *page, int offset,
} }
/* fully reclaim rmem/fwd memory allocated for skb */ /* fully reclaim rmem/fwd memory allocated for skb */
static void udp_rmem_release(struct sock *sk, int size, int partial) static void udp_rmem_release(struct sock *sk, int size, int partial,
bool rx_queue_lock_held)
{ {
struct udp_sock *up = udp_sk(sk); struct udp_sock *up = udp_sk(sk);
struct sk_buff_head *sk_queue; struct sk_buff_head *sk_queue;
...@@ -1181,10 +1182,14 @@ static void udp_rmem_release(struct sock *sk, int size, int partial) ...@@ -1181,10 +1182,14 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
} }
up->forward_deficit = 0; up->forward_deficit = 0;
/* acquire the sk_receive_queue for fwd allocated memory scheduling */ /* acquire the sk_receive_queue for fwd allocated memory scheduling,
* if the called don't held it already
*/
sk_queue = &sk->sk_receive_queue; sk_queue = &sk->sk_receive_queue;
if (!rx_queue_lock_held)
spin_lock(&sk_queue->lock); spin_lock(&sk_queue->lock);
sk->sk_forward_alloc += size; sk->sk_forward_alloc += size;
amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1); amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1);
sk->sk_forward_alloc -= amt; sk->sk_forward_alloc -= amt;
...@@ -1197,6 +1202,7 @@ static void udp_rmem_release(struct sock *sk, int size, int partial) ...@@ -1197,6 +1202,7 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
/* this can save us from acquiring the rx queue lock on next receive */ /* this can save us from acquiring the rx queue lock on next receive */
skb_queue_splice_tail_init(sk_queue, &up->reader_queue); skb_queue_splice_tail_init(sk_queue, &up->reader_queue);
if (!rx_queue_lock_held)
spin_unlock(&sk_queue->lock); spin_unlock(&sk_queue->lock);
} }
...@@ -1207,10 +1213,16 @@ static void udp_rmem_release(struct sock *sk, int size, int partial) ...@@ -1207,10 +1213,16 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
*/ */
void udp_skb_destructor(struct sock *sk, struct sk_buff *skb) void udp_skb_destructor(struct sock *sk, struct sk_buff *skb)
{ {
udp_rmem_release(sk, skb->dev_scratch, 1); udp_rmem_release(sk, skb->dev_scratch, 1, false);
} }
EXPORT_SYMBOL(udp_skb_destructor); EXPORT_SYMBOL(udp_skb_destructor);
/* as above, but the caller held the rx queue lock, too */
void udp_skb_dtor_locked(struct sock *sk, struct sk_buff *skb)
{
udp_rmem_release(sk, skb->dev_scratch, 1, true);
}
/* Idea of busylocks is to let producers grab an extra spinlock /* Idea of busylocks is to let producers grab an extra spinlock
* to relieve pressure on the receive_queue spinlock shared by consumer. * to relieve pressure on the receive_queue spinlock shared by consumer.
* Under flood, this means that only one producer can be in line * Under flood, this means that only one producer can be in line
...@@ -1325,7 +1337,7 @@ void udp_destruct_sock(struct sock *sk) ...@@ -1325,7 +1337,7 @@ void udp_destruct_sock(struct sock *sk)
total += skb->truesize; total += skb->truesize;
kfree_skb(skb); kfree_skb(skb);
} }
udp_rmem_release(sk, total, 0); udp_rmem_release(sk, total, 0, true);
inet_sock_destruct(sk); inet_sock_destruct(sk);
} }
...@@ -1397,7 +1409,7 @@ static int first_packet_length(struct sock *sk) ...@@ -1397,7 +1409,7 @@ static int first_packet_length(struct sock *sk)
} }
res = skb ? skb->len : -1; res = skb ? skb->len : -1;
if (total) if (total)
udp_rmem_release(sk, total, 1); udp_rmem_release(sk, total, 1, false);
spin_unlock_bh(&rcvq->lock); spin_unlock_bh(&rcvq->lock);
return res; return res;
} }
...@@ -1471,16 +1483,20 @@ struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags, ...@@ -1471,16 +1483,20 @@ struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
goto busy_check; goto busy_check;
} }
/* refill the reader queue and walk it again */ /* refill the reader queue and walk it again
* keep both queues locked to avoid re-acquiring
* the sk_receive_queue lock if fwd memory scheduling
* is needed.
*/
_off = *off; _off = *off;
spin_lock(&sk_queue->lock); spin_lock(&sk_queue->lock);
skb_queue_splice_tail_init(sk_queue, queue); skb_queue_splice_tail_init(sk_queue, queue);
spin_unlock(&sk_queue->lock);
skb = __skb_try_recv_from_queue(sk, queue, flags, skb = __skb_try_recv_from_queue(sk, queue, flags,
udp_skb_destructor, udp_skb_dtor_locked,
peeked, &_off, err, peeked, &_off, err,
&last); &last);
spin_unlock(&sk_queue->lock);
spin_unlock_bh(&queue->lock); spin_unlock_bh(&queue->lock);
if (skb) { if (skb) {
*off = _off; *off = _off;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment