Commit 76af4d93 authored by Tejun Heo's avatar Tejun Heo

workqueue: update synchronization rules on workqueue->pwqs

Make workqueue->pwqs protected by workqueue_lock for writes and
sched-RCU protected for reads.  Lockdep assertions are added to
for_each_pwq() and first_pwq() and all their users are converted to
either hold workqueue_lock or disable preemption/irq.

alloc_and_link_pwqs() is updated to use list_add_tail_rcu() for
consistency which isn't strictly necessary as the workqueue isn't
visible.  destroy_workqueue() isn't updated to sched-RCU release pwqs.
This is okay as the workqueue should have on users left by that point.

The locking is superflous at this point.  This is to help
implementation of unbound pools/pwqs with custom attributes.

This patch doesn't introduce any behavior changes.

v2: Updated for_each_pwq() use if/else for the hidden assertion
    statement instead of just if as suggested by Lai.  This avoids
    confusing the following else clause.
Signed-off-by: default avatarTejun Heo <tj@kernel.org>
Reviewed-by: default avatarLai Jiangshan <laijs@cn.fujitsu.com>
parent 7fb98ea7
...@@ -42,6 +42,7 @@ ...@@ -42,6 +42,7 @@
#include <linux/lockdep.h> #include <linux/lockdep.h>
#include <linux/idr.h> #include <linux/idr.h>
#include <linux/hashtable.h> #include <linux/hashtable.h>
#include <linux/rculist.h>
#include "workqueue_internal.h" #include "workqueue_internal.h"
...@@ -118,6 +119,8 @@ enum { ...@@ -118,6 +119,8 @@ enum {
* F: wq->flush_mutex protected. * F: wq->flush_mutex protected.
* *
* W: workqueue_lock protected. * W: workqueue_lock protected.
*
* R: workqueue_lock protected for writes. Sched-RCU protected for reads.
*/ */
/* struct worker is defined in workqueue_internal.h */ /* struct worker is defined in workqueue_internal.h */
...@@ -169,7 +172,7 @@ struct pool_workqueue { ...@@ -169,7 +172,7 @@ struct pool_workqueue {
int nr_active; /* L: nr of active works */ int nr_active; /* L: nr of active works */
int max_active; /* L: max active works */ int max_active; /* L: max active works */
struct list_head delayed_works; /* L: delayed works */ struct list_head delayed_works; /* L: delayed works */
struct list_head pwqs_node; /* I: node on wq->pwqs */ struct list_head pwqs_node; /* R: node on wq->pwqs */
struct list_head mayday_node; /* W: node on wq->maydays */ struct list_head mayday_node; /* W: node on wq->maydays */
} __aligned(1 << WORK_STRUCT_FLAG_BITS); } __aligned(1 << WORK_STRUCT_FLAG_BITS);
...@@ -189,7 +192,7 @@ struct wq_flusher { ...@@ -189,7 +192,7 @@ struct wq_flusher {
struct workqueue_struct { struct workqueue_struct {
unsigned int flags; /* W: WQ_* flags */ unsigned int flags; /* W: WQ_* flags */
struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwq's */ struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwq's */
struct list_head pwqs; /* I: all pwqs of this wq */ struct list_head pwqs; /* R: all pwqs of this wq */
struct list_head list; /* W: list of all workqueues */ struct list_head list; /* W: list of all workqueues */
struct mutex flush_mutex; /* protects wq flushing */ struct mutex flush_mutex; /* protects wq flushing */
...@@ -227,6 +230,11 @@ EXPORT_SYMBOL_GPL(system_freezable_wq); ...@@ -227,6 +230,11 @@ EXPORT_SYMBOL_GPL(system_freezable_wq);
#define CREATE_TRACE_POINTS #define CREATE_TRACE_POINTS
#include <trace/events/workqueue.h> #include <trace/events/workqueue.h>
#define assert_rcu_or_wq_lock() \
rcu_lockdep_assert(rcu_read_lock_sched_held() || \
lockdep_is_held(&workqueue_lock), \
"sched RCU or workqueue lock should be held")
#define for_each_std_worker_pool(pool, cpu) \ #define for_each_std_worker_pool(pool, cpu) \
for ((pool) = &std_worker_pools(cpu)[0]; \ for ((pool) = &std_worker_pools(cpu)[0]; \
(pool) < &std_worker_pools(cpu)[NR_STD_WORKER_POOLS]; (pool)++) (pool) < &std_worker_pools(cpu)[NR_STD_WORKER_POOLS]; (pool)++)
...@@ -282,9 +290,18 @@ static inline int __next_wq_cpu(int cpu, const struct cpumask *mask, ...@@ -282,9 +290,18 @@ static inline int __next_wq_cpu(int cpu, const struct cpumask *mask,
* for_each_pwq - iterate through all pool_workqueues of the specified workqueue * for_each_pwq - iterate through all pool_workqueues of the specified workqueue
* @pwq: iteration cursor * @pwq: iteration cursor
* @wq: the target workqueue * @wq: the target workqueue
*
* This must be called either with workqueue_lock held or sched RCU read
* locked. If the pwq needs to be used beyond the locking in effect, the
* caller is responsible for guaranteeing that the pwq stays online.
*
* The if/else clause exists only for the lockdep assertion and can be
* ignored.
*/ */
#define for_each_pwq(pwq, wq) \ #define for_each_pwq(pwq, wq) \
list_for_each_entry((pwq), &(wq)->pwqs, pwqs_node) list_for_each_entry_rcu((pwq), &(wq)->pwqs, pwqs_node) \
if (({ assert_rcu_or_wq_lock(); false; })) { } \
else
#ifdef CONFIG_DEBUG_OBJECTS_WORK #ifdef CONFIG_DEBUG_OBJECTS_WORK
...@@ -463,9 +480,19 @@ static struct worker_pool *get_std_worker_pool(int cpu, bool highpri) ...@@ -463,9 +480,19 @@ static struct worker_pool *get_std_worker_pool(int cpu, bool highpri)
return &pools[highpri]; return &pools[highpri];
} }
/**
* first_pwq - return the first pool_workqueue of the specified workqueue
* @wq: the target workqueue
*
* This must be called either with workqueue_lock held or sched RCU read
* locked. If the pwq needs to be used beyond the locking in effect, the
* caller is responsible for guaranteeing that the pwq stays online.
*/
static struct pool_workqueue *first_pwq(struct workqueue_struct *wq) static struct pool_workqueue *first_pwq(struct workqueue_struct *wq)
{ {
return list_first_entry(&wq->pwqs, struct pool_workqueue, pwqs_node); assert_rcu_or_wq_lock();
return list_first_or_null_rcu(&wq->pwqs, struct pool_workqueue,
pwqs_node);
} }
static unsigned int work_color_to_flags(int color) static unsigned int work_color_to_flags(int color)
...@@ -2486,10 +2513,12 @@ static bool flush_workqueue_prep_pwqs(struct workqueue_struct *wq, ...@@ -2486,10 +2513,12 @@ static bool flush_workqueue_prep_pwqs(struct workqueue_struct *wq,
atomic_set(&wq->nr_pwqs_to_flush, 1); atomic_set(&wq->nr_pwqs_to_flush, 1);
} }
local_irq_disable();
for_each_pwq(pwq, wq) { for_each_pwq(pwq, wq) {
struct worker_pool *pool = pwq->pool; struct worker_pool *pool = pwq->pool;
spin_lock_irq(&pool->lock); spin_lock(&pool->lock);
if (flush_color >= 0) { if (flush_color >= 0) {
WARN_ON_ONCE(pwq->flush_color != -1); WARN_ON_ONCE(pwq->flush_color != -1);
...@@ -2506,9 +2535,11 @@ static bool flush_workqueue_prep_pwqs(struct workqueue_struct *wq, ...@@ -2506,9 +2535,11 @@ static bool flush_workqueue_prep_pwqs(struct workqueue_struct *wq,
pwq->work_color = work_color; pwq->work_color = work_color;
} }
spin_unlock_irq(&pool->lock); spin_unlock(&pool->lock);
} }
local_irq_enable();
if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_pwqs_to_flush)) if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_pwqs_to_flush))
complete(&wq->first_flusher->done); complete(&wq->first_flusher->done);
...@@ -2699,12 +2730,14 @@ void drain_workqueue(struct workqueue_struct *wq) ...@@ -2699,12 +2730,14 @@ void drain_workqueue(struct workqueue_struct *wq)
reflush: reflush:
flush_workqueue(wq); flush_workqueue(wq);
local_irq_disable();
for_each_pwq(pwq, wq) { for_each_pwq(pwq, wq) {
bool drained; bool drained;
spin_lock_irq(&pwq->pool->lock); spin_lock(&pwq->pool->lock);
drained = !pwq->nr_active && list_empty(&pwq->delayed_works); drained = !pwq->nr_active && list_empty(&pwq->delayed_works);
spin_unlock_irq(&pwq->pool->lock); spin_unlock(&pwq->pool->lock);
if (drained) if (drained)
continue; continue;
...@@ -2713,13 +2746,17 @@ void drain_workqueue(struct workqueue_struct *wq) ...@@ -2713,13 +2746,17 @@ void drain_workqueue(struct workqueue_struct *wq)
(flush_cnt % 100 == 0 && flush_cnt <= 1000)) (flush_cnt % 100 == 0 && flush_cnt <= 1000))
pr_warn("workqueue %s: flush on destruction isn't complete after %u tries\n", pr_warn("workqueue %s: flush on destruction isn't complete after %u tries\n",
wq->name, flush_cnt); wq->name, flush_cnt);
local_irq_enable();
goto reflush; goto reflush;
} }
spin_lock_irq(&workqueue_lock); spin_lock(&workqueue_lock);
if (!--wq->nr_drainers) if (!--wq->nr_drainers)
wq->flags &= ~WQ_DRAINING; wq->flags &= ~WQ_DRAINING;
spin_unlock_irq(&workqueue_lock); spin_unlock(&workqueue_lock);
local_irq_enable();
} }
EXPORT_SYMBOL_GPL(drain_workqueue); EXPORT_SYMBOL_GPL(drain_workqueue);
...@@ -3085,7 +3122,7 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq) ...@@ -3085,7 +3122,7 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
per_cpu_ptr(wq->cpu_pwqs, cpu); per_cpu_ptr(wq->cpu_pwqs, cpu);
pwq->pool = get_std_worker_pool(cpu, highpri); pwq->pool = get_std_worker_pool(cpu, highpri);
list_add_tail(&pwq->pwqs_node, &wq->pwqs); list_add_tail_rcu(&pwq->pwqs_node, &wq->pwqs);
} }
} else { } else {
struct pool_workqueue *pwq; struct pool_workqueue *pwq;
...@@ -3095,7 +3132,7 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq) ...@@ -3095,7 +3132,7 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
return -ENOMEM; return -ENOMEM;
pwq->pool = get_std_worker_pool(WORK_CPU_UNBOUND, highpri); pwq->pool = get_std_worker_pool(WORK_CPU_UNBOUND, highpri);
list_add_tail(&pwq->pwqs_node, &wq->pwqs); list_add_tail_rcu(&pwq->pwqs_node, &wq->pwqs);
} }
return 0; return 0;
...@@ -3172,6 +3209,7 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt, ...@@ -3172,6 +3209,7 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
if (alloc_and_link_pwqs(wq) < 0) if (alloc_and_link_pwqs(wq) < 0)
goto err; goto err;
local_irq_disable();
for_each_pwq(pwq, wq) { for_each_pwq(pwq, wq) {
BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK); BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK);
pwq->wq = wq; pwq->wq = wq;
...@@ -3180,6 +3218,7 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt, ...@@ -3180,6 +3218,7 @@ struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
INIT_LIST_HEAD(&pwq->delayed_works); INIT_LIST_HEAD(&pwq->delayed_works);
INIT_LIST_HEAD(&pwq->mayday_node); INIT_LIST_HEAD(&pwq->mayday_node);
} }
local_irq_enable();
if (flags & WQ_RESCUER) { if (flags & WQ_RESCUER) {
struct worker *rescuer; struct worker *rescuer;
...@@ -3237,24 +3276,32 @@ void destroy_workqueue(struct workqueue_struct *wq) ...@@ -3237,24 +3276,32 @@ void destroy_workqueue(struct workqueue_struct *wq)
/* drain it before proceeding with destruction */ /* drain it before proceeding with destruction */
drain_workqueue(wq); drain_workqueue(wq);
spin_lock_irq(&workqueue_lock);
/* sanity checks */ /* sanity checks */
for_each_pwq(pwq, wq) { for_each_pwq(pwq, wq) {
int i; int i;
for (i = 0; i < WORK_NR_COLORS; i++) for (i = 0; i < WORK_NR_COLORS; i++) {
if (WARN_ON(pwq->nr_in_flight[i])) if (WARN_ON(pwq->nr_in_flight[i])) {
spin_unlock_irq(&workqueue_lock);
return; return;
}
}
if (WARN_ON(pwq->nr_active) || if (WARN_ON(pwq->nr_active) ||
WARN_ON(!list_empty(&pwq->delayed_works))) WARN_ON(!list_empty(&pwq->delayed_works))) {
spin_unlock_irq(&workqueue_lock);
return; return;
} }
}
/* /*
* wq list is used to freeze wq, remove from list after * wq list is used to freeze wq, remove from list after
* flushing is complete in case freeze races us. * flushing is complete in case freeze races us.
*/ */
spin_lock_irq(&workqueue_lock);
list_del(&wq->list); list_del(&wq->list);
spin_unlock_irq(&workqueue_lock); spin_unlock_irq(&workqueue_lock);
if (wq->flags & WQ_RESCUER) { if (wq->flags & WQ_RESCUER) {
...@@ -3338,13 +3385,19 @@ EXPORT_SYMBOL_GPL(workqueue_set_max_active); ...@@ -3338,13 +3385,19 @@ EXPORT_SYMBOL_GPL(workqueue_set_max_active);
bool workqueue_congested(int cpu, struct workqueue_struct *wq) bool workqueue_congested(int cpu, struct workqueue_struct *wq)
{ {
struct pool_workqueue *pwq; struct pool_workqueue *pwq;
bool ret;
preempt_disable();
if (!(wq->flags & WQ_UNBOUND)) if (!(wq->flags & WQ_UNBOUND))
pwq = per_cpu_ptr(wq->cpu_pwqs, cpu); pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
else else
pwq = first_pwq(wq); pwq = first_pwq(wq);
return !list_empty(&pwq->delayed_works); ret = !list_empty(&pwq->delayed_works);
preempt_enable();
return ret;
} }
EXPORT_SYMBOL_GPL(workqueue_congested); EXPORT_SYMBOL_GPL(workqueue_congested);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment