Commit 7a153655 authored by David S. Miller's avatar David S. Miller

Merge branch 'Refactor-classifier-API-to-work-with-Qdisc-blocks-without-rtnl-lock'

Vlad Buslov says:

====================
Refactor classifier API to work with Qdisc/blocks without rtnl lock

Currently, all netlink protocol handlers for updating rules, actions and
qdiscs are protected with single global rtnl lock which removes any
possibility for parallelism. This patch set is a third step to remove
rtnl lock dependency from TC rules update path.

Recently, new rtnl registration flag RTNL_FLAG_DOIT_UNLOCKED was added.
Handlers registered with this flag are called without RTNL taken. End
goal is to have rule update handlers(RTM_NEWTFILTER, RTM_DELTFILTER,
etc.) to be registered with UNLOCKED flag to allow parallel execution.
However, there is no intention to completely remove or split rtnl lock
itself. This patch set addresses specific problems in implementation of
classifiers API that prevent its control path from being executed
concurrently. Additional changes are required to refactor classifiers
API and individual classifiers for parallel execution. This patch set
lays groundwork to eventually register rule update handlers as
rtnl-unlocked by modifying code in cls API that works with Qdiscs and
blocks. Following patch set does the same for chains and classifiers.

The goal of this change is to refactor tcf_block_find() and its
dependencies to allow concurrent execution:
- Extend Qdisc API with rcu to lookup and take reference to Qdisc
  without relying on rtnl lock.
- Extend tcf_block with atomic reference counting and rcu.
- Always take reference to tcf_block while working with it.
- Implement tcf_block_release() to release resources obtained by
  tcf_block_find()
- Create infrastructure to allow registering Qdiscs with class ops that
  do not require the caller to hold rtnl lock.

All three netlink rule update handlers use tcf_block_find() to lookup
Qdisc and block, and this patch set introduces additional means of
synchronization to substitute rtnl lock in cls API.

Some functions in cls and sch APIs have historic names that no longer
clearly describe their intent. In order not make this code even more
confusing when introducing their concurrency-friendly versions, rename
these functions to describe actual implementation.

Changes from V2 to V3:
- Patch 1:
  - Explicitly include refcount.h in rtnetlink.h.
- Patch 3:
  - Move rcu_head field to the end of struct Qdisc.
  - Rearrange local variable declarations in qdisc_lookup_rcu().
- Patch 5:
  - Remove tcf_qdisc_put() and inline its content to callers.

Changes from V1 to V2:
- Rebase on latest net-next.
- Patch 8 - remove.
- Patch 9 - fold into patch 11.
- Patch 11:
  - Rename tcf_block_{get|put}() to tcf_block_refcnt_{get|put}().
- Patch 13 - remove.
====================
Acked-by: default avatarCong Wang <xiyou.wangcong@gmail.com>
Signed-off-by: default avatarDavid S. Miller <davem@davemloft.net>
parents c774973e 787ce6d0
......@@ -6,6 +6,7 @@
#include <linux/mutex.h>
#include <linux/netdevice.h>
#include <linux/wait.h>
#include <linux/refcount.h>
#include <uapi/linux/rtnetlink.h>
extern int rtnetlink_send(struct sk_buff *skb, struct net *net, u32 pid, u32 group, int echo);
......@@ -34,6 +35,7 @@ extern void rtnl_unlock(void);
extern int rtnl_trylock(void);
extern int rtnl_is_locked(void);
extern int rtnl_lock_killable(void);
extern bool refcount_dec_and_rtnl_lock(refcount_t *r);
extern wait_queue_head_t netdev_unregistering_wq;
extern struct rw_semaphore pernet_ops_rwsem;
......@@ -83,6 +85,11 @@ static inline struct netdev_queue *dev_ingress_queue(struct net_device *dev)
return rtnl_dereference(dev->ingress_queue);
}
static inline struct netdev_queue *dev_ingress_queue_rcu(struct net_device *dev)
{
return rcu_dereference(dev->ingress_queue);
}
struct netdev_queue *dev_ingress_queue_create(struct net_device *dev);
#ifdef CONFIG_NET_INGRESS
......
......@@ -102,6 +102,7 @@ int qdisc_set_default(const char *id);
void qdisc_hash_add(struct Qdisc *q, bool invisible);
void qdisc_hash_del(struct Qdisc *q);
struct Qdisc *qdisc_lookup(struct net_device *dev, u32 handle);
struct Qdisc *qdisc_lookup_rcu(struct net_device *dev, u32 handle);
struct qdisc_rate_table *qdisc_get_rtab(struct tc_ratespec *r,
struct nlattr *tab,
struct netlink_ext_ack *extack);
......
......@@ -105,6 +105,7 @@ struct Qdisc {
spinlock_t busylock ____cacheline_aligned_in_smp;
spinlock_t seqlock;
struct rcu_head rcu;
};
static inline void qdisc_refcount_inc(struct Qdisc *qdisc)
......@@ -114,6 +115,19 @@ static inline void qdisc_refcount_inc(struct Qdisc *qdisc)
refcount_inc(&qdisc->refcnt);
}
/* Intended to be used by unlocked users, when concurrent qdisc release is
* possible.
*/
static inline struct Qdisc *qdisc_refcount_inc_nz(struct Qdisc *qdisc)
{
if (qdisc->flags & TCQ_F_BUILTIN)
return qdisc;
if (refcount_inc_not_zero(&qdisc->refcnt))
return qdisc;
return NULL;
}
static inline bool qdisc_is_running(struct Qdisc *qdisc)
{
if (qdisc->flags & TCQ_F_NOLOCK)
......@@ -331,7 +345,7 @@ struct tcf_chain {
struct tcf_block {
struct list_head chain_list;
u32 index; /* block index for shared blocks */
unsigned int refcnt;
refcount_t refcnt;
struct net *net;
struct Qdisc *q;
struct list_head cb_list;
......@@ -343,6 +357,7 @@ struct tcf_block {
struct tcf_chain *chain;
struct list_head filter_chain_list;
} chain0;
struct rcu_head rcu;
};
static inline void tcf_block_offload_inc(struct tcf_block *block, u32 *flags)
......@@ -554,7 +569,8 @@ void dev_deactivate_many(struct list_head *head);
struct Qdisc *dev_graft_qdisc(struct netdev_queue *dev_queue,
struct Qdisc *qdisc);
void qdisc_reset(struct Qdisc *qdisc);
void qdisc_destroy(struct Qdisc *qdisc);
void qdisc_put(struct Qdisc *qdisc);
void qdisc_put_unlocked(struct Qdisc *qdisc);
void qdisc_tree_reduce_backlog(struct Qdisc *qdisc, unsigned int n,
unsigned int len);
struct Qdisc *qdisc_alloc(struct netdev_queue *dev_queue,
......
......@@ -130,6 +130,12 @@ int rtnl_is_locked(void)
}
EXPORT_SYMBOL(rtnl_is_locked);
bool refcount_dec_and_rtnl_lock(refcount_t *r)
{
return refcount_dec_and_mutex_lock(r, &rtnl_mutex);
}
EXPORT_SYMBOL(refcount_dec_and_rtnl_lock);
#ifdef CONFIG_PROVE_LOCKING
bool lockdep_rtnl_is_held(void)
{
......
This diff is collapsed.
......@@ -314,6 +314,24 @@ struct Qdisc *qdisc_lookup(struct net_device *dev, u32 handle)
return q;
}
struct Qdisc *qdisc_lookup_rcu(struct net_device *dev, u32 handle)
{
struct netdev_queue *nq;
struct Qdisc *q;
if (!handle)
return NULL;
q = qdisc_match_from_root(dev->qdisc, handle);
if (q)
goto out;
nq = dev_ingress_queue_rcu(dev);
if (nq)
q = qdisc_match_from_root(nq->qdisc_sleeping, handle);
out:
return q;
}
static struct Qdisc *qdisc_leaf(struct Qdisc *p, u32 classid)
{
unsigned long cl;
......@@ -920,7 +938,7 @@ static void notify_and_destroy(struct net *net, struct sk_buff *skb,
qdisc_notify(net, skb, n, clid, old, new);
if (old)
qdisc_destroy(old);
qdisc_put(old);
}
/* Graft qdisc "new" to class "classid" of qdisc "parent" or
......@@ -973,7 +991,7 @@ static int qdisc_graft(struct net_device *dev, struct Qdisc *parent,
qdisc_refcount_inc(new);
if (!ingress)
qdisc_destroy(old);
qdisc_put(old);
}
skip:
......@@ -1561,7 +1579,7 @@ static int tc_modify_qdisc(struct sk_buff *skb, struct nlmsghdr *n,
err = qdisc_graft(dev, p, skb, n, clid, q, NULL, extack);
if (err) {
if (q)
qdisc_destroy(q);
qdisc_put(q);
return err;
}
......
......@@ -150,7 +150,7 @@ static void atm_tc_put(struct Qdisc *sch, unsigned long cl)
pr_debug("atm_tc_put: destroying\n");
list_del_init(&flow->list);
pr_debug("atm_tc_put: qdisc %p\n", flow->q);
qdisc_destroy(flow->q);
qdisc_put(flow->q);
tcf_block_put(flow->block);
if (flow->sock) {
pr_debug("atm_tc_put: f_count %ld\n",
......
......@@ -1418,7 +1418,7 @@ static void cbq_destroy_class(struct Qdisc *sch, struct cbq_class *cl)
WARN_ON(cl->filters);
tcf_block_put(cl->block);
qdisc_destroy(cl->q);
qdisc_put(cl->q);
qdisc_put_rtab(cl->R_tab);
gen_kill_estimator(&cl->rate_est);
if (cl != &q->link)
......
......@@ -379,7 +379,7 @@ static void cbs_destroy(struct Qdisc *sch)
cbs_disable_offload(dev, q);
if (q->qdisc)
qdisc_destroy(q->qdisc);
qdisc_put(q->qdisc);
}
static int cbs_dump(struct Qdisc *sch, struct sk_buff *skb)
......
......@@ -134,7 +134,7 @@ static int drr_change_class(struct Qdisc *sch, u32 classid, u32 parentid,
tca[TCA_RATE]);
if (err) {
NL_SET_ERR_MSG(extack, "Failed to replace estimator");
qdisc_destroy(cl->qdisc);
qdisc_put(cl->qdisc);
kfree(cl);
return err;
}
......@@ -153,7 +153,7 @@ static int drr_change_class(struct Qdisc *sch, u32 classid, u32 parentid,
static void drr_destroy_class(struct Qdisc *sch, struct drr_class *cl)
{
gen_kill_estimator(&cl->rate_est);
qdisc_destroy(cl->qdisc);
qdisc_put(cl->qdisc);
kfree(cl);
}
......
......@@ -412,7 +412,7 @@ static void dsmark_destroy(struct Qdisc *sch)
pr_debug("%s(sch %p,[qdisc %p])\n", __func__, sch, p);
tcf_block_put(p->block);
qdisc_destroy(p->q);
qdisc_put(p->q);
if (p->mv != p->embedded)
kfree(p->mv);
}
......
......@@ -177,7 +177,7 @@ struct Qdisc *fifo_create_dflt(struct Qdisc *sch, struct Qdisc_ops *ops,
if (q) {
err = fifo_set_limit(q, limit);
if (err < 0) {
qdisc_destroy(q);
qdisc_put(q);
q = NULL;
}
}
......
......@@ -901,7 +901,7 @@ struct Qdisc *qdisc_create_dflt(struct netdev_queue *dev_queue,
if (!ops->init || ops->init(sch, NULL, extack) == 0)
return sch;
qdisc_destroy(sch);
qdisc_put(sch);
return NULL;
}
EXPORT_SYMBOL(qdisc_create_dflt);
......@@ -941,15 +941,18 @@ void qdisc_free(struct Qdisc *qdisc)
kfree((char *) qdisc - qdisc->padded);
}
void qdisc_destroy(struct Qdisc *qdisc)
void qdisc_free_cb(struct rcu_head *head)
{
struct Qdisc *q = container_of(head, struct Qdisc, rcu);
qdisc_free(q);
}
static void qdisc_destroy(struct Qdisc *qdisc)
{
const struct Qdisc_ops *ops = qdisc->ops;
struct sk_buff *skb, *tmp;
if (qdisc->flags & TCQ_F_BUILTIN ||
!refcount_dec_and_test(&qdisc->refcnt))
return;
#ifdef CONFIG_NET_SCHED
qdisc_hash_del(qdisc);
......@@ -974,9 +977,34 @@ void qdisc_destroy(struct Qdisc *qdisc)
kfree_skb_list(skb);
}
qdisc_free(qdisc);
call_rcu(&qdisc->rcu, qdisc_free_cb);
}
void qdisc_put(struct Qdisc *qdisc)
{
if (qdisc->flags & TCQ_F_BUILTIN ||
!refcount_dec_and_test(&qdisc->refcnt))
return;
qdisc_destroy(qdisc);
}
EXPORT_SYMBOL(qdisc_put);
/* Version of qdisc_put() that is called with rtnl mutex unlocked.
* Intended to be used as optimization, this function only takes rtnl lock if
* qdisc reference counter reached zero.
*/
void qdisc_put_unlocked(struct Qdisc *qdisc)
{
if (qdisc->flags & TCQ_F_BUILTIN ||
!refcount_dec_and_rtnl_lock(&qdisc->refcnt))
return;
qdisc_destroy(qdisc);
rtnl_unlock();
}
EXPORT_SYMBOL(qdisc_destroy);
EXPORT_SYMBOL(qdisc_put_unlocked);
/* Attach toplevel qdisc to device queue. */
struct Qdisc *dev_graft_qdisc(struct netdev_queue *dev_queue,
......@@ -1270,7 +1298,7 @@ static void shutdown_scheduler_queue(struct net_device *dev,
rcu_assign_pointer(dev_queue->qdisc, qdisc_default);
dev_queue->qdisc_sleeping = qdisc_default;
qdisc_destroy(qdisc);
qdisc_put(qdisc);
}
}
......@@ -1279,7 +1307,7 @@ void dev_shutdown(struct net_device *dev)
netdev_for_each_tx_queue(dev, shutdown_scheduler_queue, &noop_qdisc);
if (dev_ingress_queue(dev))
shutdown_scheduler_queue(dev, dev_ingress_queue(dev), &noop_qdisc);
qdisc_destroy(dev->qdisc);
qdisc_put(dev->qdisc);
dev->qdisc = &noop_qdisc;
WARN_ON(timer_pending(&dev->watchdog_timer));
......
......@@ -1092,7 +1092,7 @@ hfsc_destroy_class(struct Qdisc *sch, struct hfsc_class *cl)
struct hfsc_sched *q = qdisc_priv(sch);
tcf_block_put(cl->block);
qdisc_destroy(cl->qdisc);
qdisc_put(cl->qdisc);
gen_kill_estimator(&cl->rate_est);
if (cl != &q->root)
kfree(cl);
......
......@@ -1208,7 +1208,7 @@ static void htb_destroy_class(struct Qdisc *sch, struct htb_class *cl)
{
if (!cl->level) {
WARN_ON(!cl->leaf.q);
qdisc_destroy(cl->leaf.q);
qdisc_put(cl->leaf.q);
}
gen_kill_estimator(&cl->rate_est);
tcf_block_put(cl->block);
......@@ -1409,7 +1409,7 @@ static int htb_change_class(struct Qdisc *sch, u32 classid,
/* turn parent into inner node */
qdisc_reset(parent->leaf.q);
qdisc_tree_reduce_backlog(parent->leaf.q, qlen, backlog);
qdisc_destroy(parent->leaf.q);
qdisc_put(parent->leaf.q);
if (parent->prio_activity)
htb_deactivate(q, parent);
......
......@@ -65,7 +65,7 @@ static void mq_destroy(struct Qdisc *sch)
if (!priv->qdiscs)
return;
for (ntx = 0; ntx < dev->num_tx_queues && priv->qdiscs[ntx]; ntx++)
qdisc_destroy(priv->qdiscs[ntx]);
qdisc_put(priv->qdiscs[ntx]);
kfree(priv->qdiscs);
}
......@@ -119,7 +119,7 @@ static void mq_attach(struct Qdisc *sch)
qdisc = priv->qdiscs[ntx];
old = dev_graft_qdisc(qdisc->dev_queue, qdisc);
if (old)
qdisc_destroy(old);
qdisc_put(old);
#ifdef CONFIG_NET_SCHED
if (ntx < dev->real_num_tx_queues)
qdisc_hash_add(qdisc, false);
......
......@@ -40,7 +40,7 @@ static void mqprio_destroy(struct Qdisc *sch)
for (ntx = 0;
ntx < dev->num_tx_queues && priv->qdiscs[ntx];
ntx++)
qdisc_destroy(priv->qdiscs[ntx]);
qdisc_put(priv->qdiscs[ntx]);
kfree(priv->qdiscs);
}
......@@ -300,7 +300,7 @@ static void mqprio_attach(struct Qdisc *sch)
qdisc = priv->qdiscs[ntx];
old = dev_graft_qdisc(qdisc->dev_queue, qdisc);
if (old)
qdisc_destroy(old);
qdisc_put(old);
if (ntx < dev->real_num_tx_queues)
qdisc_hash_add(qdisc, false);
}
......
......@@ -175,7 +175,7 @@ multiq_destroy(struct Qdisc *sch)
tcf_block_put(q->block);
for (band = 0; band < q->bands; band++)
qdisc_destroy(q->queues[band]);
qdisc_put(q->queues[band]);
kfree(q->queues);
}
......@@ -204,7 +204,7 @@ static int multiq_tune(struct Qdisc *sch, struct nlattr *opt,
q->queues[i] = &noop_qdisc;
qdisc_tree_reduce_backlog(child, child->q.qlen,
child->qstats.backlog);
qdisc_destroy(child);
qdisc_put(child);
}
}
......@@ -228,7 +228,7 @@ static int multiq_tune(struct Qdisc *sch, struct nlattr *opt,
qdisc_tree_reduce_backlog(old,
old->q.qlen,
old->qstats.backlog);
qdisc_destroy(old);
qdisc_put(old);
}
sch_tree_unlock(sch);
}
......
......@@ -1022,7 +1022,7 @@ static void netem_destroy(struct Qdisc *sch)
qdisc_watchdog_cancel(&q->watchdog);
if (q->qdisc)
qdisc_destroy(q->qdisc);
qdisc_put(q->qdisc);
dist_free(q->delay_dist);
dist_free(q->slot_dist);
}
......
......@@ -175,7 +175,7 @@ prio_destroy(struct Qdisc *sch)
tcf_block_put(q->block);
prio_offload(sch, NULL);
for (prio = 0; prio < q->bands; prio++)
qdisc_destroy(q->queues[prio]);
qdisc_put(q->queues[prio]);
}
static int prio_tune(struct Qdisc *sch, struct nlattr *opt,
......@@ -205,7 +205,7 @@ static int prio_tune(struct Qdisc *sch, struct nlattr *opt,
extack);
if (!queues[i]) {
while (i > oldbands)
qdisc_destroy(queues[--i]);
qdisc_put(queues[--i]);
return -ENOMEM;
}
}
......@@ -220,7 +220,7 @@ static int prio_tune(struct Qdisc *sch, struct nlattr *opt,
qdisc_tree_reduce_backlog(child, child->q.qlen,
child->qstats.backlog);
qdisc_destroy(child);
qdisc_put(child);
}
for (i = oldbands; i < q->bands; i++) {
......
......@@ -526,7 +526,7 @@ static int qfq_change_class(struct Qdisc *sch, u32 classid, u32 parentid,
return 0;
destroy_class:
qdisc_destroy(cl->qdisc);
qdisc_put(cl->qdisc);
kfree(cl);
return err;
}
......@@ -537,7 +537,7 @@ static void qfq_destroy_class(struct Qdisc *sch, struct qfq_class *cl)
qfq_rm_from_agg(q, cl);
gen_kill_estimator(&cl->rate_est);
qdisc_destroy(cl->qdisc);
qdisc_put(cl->qdisc);
kfree(cl);
}
......
......@@ -181,7 +181,7 @@ static void red_destroy(struct Qdisc *sch)
del_timer_sync(&q->adapt_timer);
red_offload(sch, false);
qdisc_destroy(q->qdisc);
qdisc_put(q->qdisc);
}
static const struct nla_policy red_policy[TCA_RED_MAX + 1] = {
......@@ -233,7 +233,7 @@ static int red_change(struct Qdisc *sch, struct nlattr *opt,
if (child) {
qdisc_tree_reduce_backlog(q->qdisc, q->qdisc->q.qlen,
q->qdisc->qstats.backlog);
qdisc_destroy(q->qdisc);
qdisc_put(q->qdisc);
q->qdisc = child;
}
......
......@@ -469,7 +469,7 @@ static void sfb_destroy(struct Qdisc *sch)
struct sfb_sched_data *q = qdisc_priv(sch);
tcf_block_put(q->block);
qdisc_destroy(q->qdisc);
qdisc_put(q->qdisc);
}
static const struct nla_policy sfb_policy[TCA_SFB_MAX + 1] = {
......@@ -523,7 +523,7 @@ static int sfb_change(struct Qdisc *sch, struct nlattr *opt,
qdisc_tree_reduce_backlog(q->qdisc, q->qdisc->q.qlen,
q->qdisc->qstats.backlog);
qdisc_destroy(q->qdisc);
qdisc_put(q->qdisc);
q->qdisc = child;
q->rehash_interval = msecs_to_jiffies(ctl->rehash_interval);
......
......@@ -392,7 +392,7 @@ static int tbf_change(struct Qdisc *sch, struct nlattr *opt,
if (child) {
qdisc_tree_reduce_backlog(q->qdisc, q->qdisc->q.qlen,
q->qdisc->qstats.backlog);
qdisc_destroy(q->qdisc);
qdisc_put(q->qdisc);
q->qdisc = child;
}
q->limit = qopt->limit;
......@@ -438,7 +438,7 @@ static void tbf_destroy(struct Qdisc *sch)
struct tbf_sched_data *q = qdisc_priv(sch);
qdisc_watchdog_cancel(&q->watchdog);
qdisc_destroy(q->qdisc);
qdisc_put(q->qdisc);
}
static int tbf_dump(struct Qdisc *sch, struct sk_buff *skb)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment