Commit e940eb3c authored by David S. Miller's avatar David S. Miller

Merge branch 'lockless-qdisc-opts'

Yunsheng Lin says:

====================
Some optimization for lockless qdisc

Patch 1: remove unnecessary seqcount operation.
Patch 2: implement TCQ_F_CAN_BYPASS.
Patch 3: remove qdisc->empty.

Performance data for pktgen in queue_xmit mode + dummy netdev
with pfifo_fast:

 threads    unpatched           patched             delta
    1       2.60Mpps            3.21Mpps             +23%
    2       3.84Mpps            5.56Mpps             +44%
    4       5.52Mpps            5.58Mpps             +1%
    8       2.77Mpps            2.76Mpps             -0.3%
   16       2.24Mpps            2.23Mpps             -0.4%

Performance for IP forward testing: 1.05Mpps increases to
1.16Mpps, about 10% improvement.

V3: Add 'Acked-by' from Jakub and 'Tested-by' from Vladimir,
    and resend based on latest net-next.
V2: Adjust the comment and commit log according to discussion
    in V1.
V1: Drop RFC tag, add nolock_qdisc_is_empty() and do the qdisc
    empty checking without the protection of qdisc->seqlock to
    avoid doing unnecessary spin_trylock() for the contention case.
RFC v4: Use STATE_MISSED and STATE_DRAINING to indicate non-empty
        qdisc, and add patch 1 and 3.
====================
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents 38f75922 d3e0f575
...@@ -37,8 +37,15 @@ enum qdisc_state_t { ...@@ -37,8 +37,15 @@ enum qdisc_state_t {
__QDISC_STATE_SCHED, __QDISC_STATE_SCHED,
__QDISC_STATE_DEACTIVATED, __QDISC_STATE_DEACTIVATED,
__QDISC_STATE_MISSED, __QDISC_STATE_MISSED,
__QDISC_STATE_DRAINING,
}; };
#define QDISC_STATE_MISSED BIT(__QDISC_STATE_MISSED)
#define QDISC_STATE_DRAINING BIT(__QDISC_STATE_DRAINING)
#define QDISC_STATE_NON_EMPTY (QDISC_STATE_MISSED | \
QDISC_STATE_DRAINING)
struct qdisc_size_table { struct qdisc_size_table {
struct rcu_head rcu; struct rcu_head rcu;
struct list_head list; struct list_head list;
...@@ -110,8 +117,6 @@ struct Qdisc { ...@@ -110,8 +117,6 @@ struct Qdisc {
spinlock_t busylock ____cacheline_aligned_in_smp; spinlock_t busylock ____cacheline_aligned_in_smp;
spinlock_t seqlock; spinlock_t seqlock;
/* for NOLOCK qdisc, true if there are no enqueued skbs */
bool empty;
struct rcu_head rcu; struct rcu_head rcu;
/* private data */ /* private data */
...@@ -145,6 +150,11 @@ static inline bool qdisc_is_running(struct Qdisc *qdisc) ...@@ -145,6 +150,11 @@ static inline bool qdisc_is_running(struct Qdisc *qdisc)
return (raw_read_seqcount(&qdisc->running) & 1) ? true : false; return (raw_read_seqcount(&qdisc->running) & 1) ? true : false;
} }
/* For a NOLOCK qdisc: true when neither STATE_MISSED nor STATE_DRAINING
 * is set in qdisc->state, i.e. no CPU has signalled pending work.
 * READ_ONCE() pairs with the set_bit()/clear_bit() updaters of these
 * state bits; the check is intentionally done without taking
 * qdisc->seqlock, so callers that need certainty must re-check under
 * the lock (as __dev_xmit_skb() does to guard against racing requeue).
 */
static inline bool nolock_qdisc_is_empty(const struct Qdisc *qdisc)
{
return !(READ_ONCE(qdisc->state) & QDISC_STATE_NON_EMPTY);
}
static inline bool qdisc_is_percpu_stats(const struct Qdisc *q) static inline bool qdisc_is_percpu_stats(const struct Qdisc *q)
{ {
return q->flags & TCQ_F_CPUSTATS; return q->flags & TCQ_F_CPUSTATS;
...@@ -153,7 +163,7 @@ static inline bool qdisc_is_percpu_stats(const struct Qdisc *q) ...@@ -153,7 +163,7 @@ static inline bool qdisc_is_percpu_stats(const struct Qdisc *q)
static inline bool qdisc_is_empty(const struct Qdisc *qdisc) static inline bool qdisc_is_empty(const struct Qdisc *qdisc)
{ {
if (qdisc_is_percpu_stats(qdisc)) if (qdisc_is_percpu_stats(qdisc))
return READ_ONCE(qdisc->empty); return nolock_qdisc_is_empty(qdisc);
return !READ_ONCE(qdisc->q.qlen); return !READ_ONCE(qdisc->q.qlen);
} }
...@@ -161,7 +171,7 @@ static inline bool qdisc_run_begin(struct Qdisc *qdisc) ...@@ -161,7 +171,7 @@ static inline bool qdisc_run_begin(struct Qdisc *qdisc)
{ {
if (qdisc->flags & TCQ_F_NOLOCK) { if (qdisc->flags & TCQ_F_NOLOCK) {
if (spin_trylock(&qdisc->seqlock)) if (spin_trylock(&qdisc->seqlock))
goto nolock_empty; return true;
/* If the MISSED flag is set, it means other thread has /* If the MISSED flag is set, it means other thread has
* set the MISSED flag before second spin_trylock(), so * set the MISSED flag before second spin_trylock(), so
...@@ -183,11 +193,7 @@ static inline bool qdisc_run_begin(struct Qdisc *qdisc) ...@@ -183,11 +193,7 @@ static inline bool qdisc_run_begin(struct Qdisc *qdisc)
/* Retry again in case other CPU may not see the new flag /* Retry again in case other CPU may not see the new flag
* after it releases the lock at the end of qdisc_run_end(). * after it releases the lock at the end of qdisc_run_end().
*/ */
if (!spin_trylock(&qdisc->seqlock)) return spin_trylock(&qdisc->seqlock);
return false;
nolock_empty:
WRITE_ONCE(qdisc->empty, false);
} else if (qdisc_is_running(qdisc)) { } else if (qdisc_is_running(qdisc)) {
return false; return false;
} }
...@@ -201,15 +207,14 @@ static inline bool qdisc_run_begin(struct Qdisc *qdisc) ...@@ -201,15 +207,14 @@ static inline bool qdisc_run_begin(struct Qdisc *qdisc)
static inline void qdisc_run_end(struct Qdisc *qdisc) static inline void qdisc_run_end(struct Qdisc *qdisc)
{ {
write_seqcount_end(&qdisc->running);
if (qdisc->flags & TCQ_F_NOLOCK) { if (qdisc->flags & TCQ_F_NOLOCK) {
spin_unlock(&qdisc->seqlock); spin_unlock(&qdisc->seqlock);
if (unlikely(test_bit(__QDISC_STATE_MISSED, if (unlikely(test_bit(__QDISC_STATE_MISSED,
&qdisc->state))) { &qdisc->state)))
clear_bit(__QDISC_STATE_MISSED, &qdisc->state);
__netif_schedule(qdisc); __netif_schedule(qdisc);
} } else {
write_seqcount_end(&qdisc->running);
} }
} }
......
...@@ -3852,10 +3852,33 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q, ...@@ -3852,10 +3852,33 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
qdisc_calculate_pkt_len(skb, q); qdisc_calculate_pkt_len(skb, q);
if (q->flags & TCQ_F_NOLOCK) { if (q->flags & TCQ_F_NOLOCK) {
if (q->flags & TCQ_F_CAN_BYPASS && nolock_qdisc_is_empty(q) &&
qdisc_run_begin(q)) {
/* Retest nolock_qdisc_is_empty() within the protection
* of q->seqlock to protect from racing with requeuing.
*/
if (unlikely(!nolock_qdisc_is_empty(q))) {
rc = q->enqueue(skb, q, &to_free) &
NET_XMIT_MASK;
__qdisc_run(q);
qdisc_run_end(q);
goto no_lock_out;
}
qdisc_bstats_cpu_update(q, skb);
if (sch_direct_xmit(skb, q, dev, txq, NULL, true) &&
!nolock_qdisc_is_empty(q))
__qdisc_run(q);
qdisc_run_end(q);
return NET_XMIT_SUCCESS;
}
rc = q->enqueue(skb, q, &to_free) & NET_XMIT_MASK; rc = q->enqueue(skb, q, &to_free) & NET_XMIT_MASK;
if (likely(!netif_xmit_frozen_or_stopped(txq))) qdisc_run(q);
qdisc_run(q);
no_lock_out:
if (unlikely(to_free)) if (unlikely(to_free))
kfree_skb_list(to_free); kfree_skb_list(to_free);
return rc; return rc;
......
...@@ -52,6 +52,8 @@ static void qdisc_maybe_clear_missed(struct Qdisc *q, ...@@ -52,6 +52,8 @@ static void qdisc_maybe_clear_missed(struct Qdisc *q,
*/ */
if (!netif_xmit_frozen_or_stopped(txq)) if (!netif_xmit_frozen_or_stopped(txq))
set_bit(__QDISC_STATE_MISSED, &q->state); set_bit(__QDISC_STATE_MISSED, &q->state);
else
set_bit(__QDISC_STATE_DRAINING, &q->state);
} }
/* Main transmission queue. */ /* Main transmission queue. */
...@@ -164,9 +166,13 @@ static inline void dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q) ...@@ -164,9 +166,13 @@ static inline void dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
skb = next; skb = next;
} }
if (lock)
if (lock) {
spin_unlock(lock); spin_unlock(lock);
__netif_schedule(q); set_bit(__QDISC_STATE_MISSED, &q->state);
} else {
__netif_schedule(q);
}
} }
static void try_bulk_dequeue_skb(struct Qdisc *q, static void try_bulk_dequeue_skb(struct Qdisc *q,
...@@ -409,7 +415,11 @@ void __qdisc_run(struct Qdisc *q) ...@@ -409,7 +415,11 @@ void __qdisc_run(struct Qdisc *q)
while (qdisc_restart(q, &packets)) { while (qdisc_restart(q, &packets)) {
quota -= packets; quota -= packets;
if (quota <= 0) { if (quota <= 0) {
__netif_schedule(q); if (q->flags & TCQ_F_NOLOCK)
set_bit(__QDISC_STATE_MISSED, &q->state);
else
__netif_schedule(q);
break; break;
} }
} }
...@@ -698,13 +708,14 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc) ...@@ -698,13 +708,14 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
if (likely(skb)) { if (likely(skb)) {
qdisc_update_stats_at_dequeue(qdisc, skb); qdisc_update_stats_at_dequeue(qdisc, skb);
} else if (need_retry && } else if (need_retry &&
test_bit(__QDISC_STATE_MISSED, &qdisc->state)) { READ_ONCE(qdisc->state) & QDISC_STATE_NON_EMPTY) {
/* Delay clearing the STATE_MISSED here to reduce /* Delay clearing the STATE_MISSED here to reduce
* the overhead of the second spin_trylock() in * the overhead of the second spin_trylock() in
* qdisc_run_begin() and __netif_schedule() calling * qdisc_run_begin() and __netif_schedule() calling
* in qdisc_run_end(). * in qdisc_run_end().
*/ */
clear_bit(__QDISC_STATE_MISSED, &qdisc->state); clear_bit(__QDISC_STATE_MISSED, &qdisc->state);
clear_bit(__QDISC_STATE_DRAINING, &qdisc->state);
/* Make sure dequeuing happens after clearing /* Make sure dequeuing happens after clearing
* STATE_MISSED. * STATE_MISSED.
...@@ -714,8 +725,6 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc) ...@@ -714,8 +725,6 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc *qdisc)
need_retry = false; need_retry = false;
goto retry; goto retry;
} else {
WRITE_ONCE(qdisc->empty, true);
} }
return skb; return skb;
...@@ -916,7 +925,6 @@ struct Qdisc *qdisc_alloc(struct netdev_queue *dev_queue, ...@@ -916,7 +925,6 @@ struct Qdisc *qdisc_alloc(struct netdev_queue *dev_queue,
sch->enqueue = ops->enqueue; sch->enqueue = ops->enqueue;
sch->dequeue = ops->dequeue; sch->dequeue = ops->dequeue;
sch->dev_queue = dev_queue; sch->dev_queue = dev_queue;
sch->empty = true;
dev_hold(dev); dev_hold(dev);
refcount_set(&sch->refcnt, 1); refcount_set(&sch->refcnt, 1);
...@@ -1222,6 +1230,7 @@ static void dev_reset_queue(struct net_device *dev, ...@@ -1222,6 +1230,7 @@ static void dev_reset_queue(struct net_device *dev,
spin_unlock_bh(qdisc_lock(qdisc)); spin_unlock_bh(qdisc_lock(qdisc));
if (nolock) { if (nolock) {
clear_bit(__QDISC_STATE_MISSED, &qdisc->state); clear_bit(__QDISC_STATE_MISSED, &qdisc->state);
clear_bit(__QDISC_STATE_DRAINING, &qdisc->state);
spin_unlock_bh(&qdisc->seqlock); spin_unlock_bh(&qdisc->seqlock);
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment