Commit 523834b0 authored by Jon Maloy's avatar Jon Maloy Committed by Greg Kroah-Hartman

tipc: reduce risk of wakeup queue starvation

[ Upstream commit 7c5b4205 ]

In commit 365ad353 ("tipc: reduce risk of user starvation during
link congestion") we allowed senders to add exactly one list of extra
buffers to the link backlog queues during link congestion (aka
"oversubscription"). However, the criteria for when to stop adding
wakeup messages to the input queue when the overload abates is
inaccurate, and may cause starvation problems during very high load.

Currently, we stop adding wakeup messages after 10 total failed attempts
where we find that there is no space left in the backlog queue for a
certain importance level. The counter for this is accumulated across all
levels, which may lead the algorithm to leave the loop prematurely,
although there may still be plenty of space available at some levels.
The result is sometimes that messages near the wakeup queue tail are not
added to the input queue as they should be.

We now introduce a more exact algorithm, where we keep adding wakeup
messages to a level as long as the backlog queue has free slots for
the corresponding level, and stop at the moment there are no more such
slots or when there are no more wakeup messages to dequeue.

Fixes: 365ad353 ("tipc: reduce risk of user starvation during link congestion")
Reported-by: default avatarTung Nguyen <tung.q.nguyen@dektech.com.au>
Acked-by: default avatarYing Xue <ying.xue@windriver.com>
Signed-off-by: default avatarJon Maloy <jon.maloy@ericsson.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
Signed-off-by: default avatarSasha Levin <sashal@kernel.org>
parent b2b4ee81
......@@ -830,18 +830,31 @@ static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)
*/
static void link_prepare_wakeup(struct tipc_link *l)
{
struct sk_buff_head *wakeupq = &l->wakeupq;
struct sk_buff_head *inputq = l->inputq;
struct sk_buff *skb, *tmp;
int imp, i = 0;
struct sk_buff_head tmpq;
int avail[5] = {0,};
int imp = 0;
__skb_queue_head_init(&tmpq);
skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
for (; imp <= TIPC_SYSTEM_IMPORTANCE; imp++)
avail[imp] = l->backlog[imp].limit - l->backlog[imp].len;
skb_queue_walk_safe(wakeupq, skb, tmp) {
imp = TIPC_SKB_CB(skb)->chain_imp;
if (l->backlog[imp].len < l->backlog[imp].limit) {
skb_unlink(skb, &l->wakeupq);
skb_queue_tail(l->inputq, skb);
} else if (i++ > 10) {
break;
}
if (avail[imp] <= 0)
continue;
avail[imp]--;
__skb_unlink(skb, wakeupq);
__skb_queue_tail(&tmpq, skb);
}
spin_lock_bh(&inputq->lock);
skb_queue_splice_tail(&tmpq, inputq);
spin_unlock_bh(&inputq->lock);
}
void tipc_link_reset(struct tipc_link *l)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment