Commit b02c520f authored by Linus Torvalds

Merge tag 'wq-for-6.11' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq

Pull workqueue updates from Tejun Heo:

 - Lai fixed a bug where CPU hotplug and workqueue attribute changes
   could race, leaving some workqueues not fully updated. This involved
   refactoring and changing how online CPUs are tracked, and the
   resulting code is cleaner (a simplified sketch of the tracking idea
   follows this list).

 - The workqueue watchdog touch operation was causing too much cacheline
   contention on very large machines. Nicholas improved scalability by
   avoiding unnecessary global updates (see the second sketch after this
   list).

 - Code cleanups and minor rescuer behavior improvement.

 - The last commit 58629d48 ("workqueue: Always queue work items to
   the newest PWQ for order workqueues") is a cherry-picked straggler
   commit from for-6.10-fixes, a fix for a bug which may not actually
   trigger.
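
A minimal userspace sketch of the first item above (not kernel code): the
online-CPU set is mirrored into a private mask that is only changed under
the same lock the attribute updates take, and the effective cpumask is
recomputed as (requested & online) with a fallback when the intersection
is empty, loosely mirroring what wq_online_cpumask and wq_calc_pod_cpumask()
do in the diff below. The names wq_lock, wq_online_mask, wq_requested_mask,
mark_cpu_online() and recompute_effective() are invented for this sketch.

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

static pthread_mutex_t wq_lock = PTHREAD_MUTEX_INITIALIZER;
static uint64_t wq_online_mask;            /* mirrors the online CPUs */
static uint64_t wq_requested_mask = ~0ull; /* CPUs the attrs ask for */
static uint64_t wq_effective_mask;         /* what the workqueue actually uses */

/* caller holds wq_lock */
static void recompute_effective(void)
{
	uint64_t m = wq_requested_mask & wq_online_mask;

	/* fall back to the requested mask if no requested CPU is online */
	wq_effective_mask = m ? m : wq_requested_mask;
}

void mark_cpu_online(int cpu)
{
	pthread_mutex_lock(&wq_lock);
	wq_online_mask |= 1ull << cpu;
	recompute_effective();
	pthread_mutex_unlock(&wq_lock);
}

void mark_cpu_offline(int cpu)
{
	pthread_mutex_lock(&wq_lock);
	wq_online_mask &= ~(1ull << cpu);
	recompute_effective();
	pthread_mutex_unlock(&wq_lock);
}

void apply_attrs(uint64_t requested)
{
	pthread_mutex_lock(&wq_lock);
	wq_requested_mask = requested;
	recompute_effective();
	pthread_mutex_unlock(&wq_lock);
}

int main(void)
{
	mark_cpu_online(0);
	mark_cpu_online(1);
	apply_attrs(0x2);     /* restrict to CPU 1 */
	mark_cpu_offline(1);  /* intersection empty, falls back to the request */
	printf("effective mask: %#llx\n", (unsigned long long)wq_effective_mask);
	return 0;
}

Because every update funnels through the same lock, a hotplug event and an
attribute change can never compute the effective mask from a stale view of
the online CPUs, which is the race the series closes.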

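A similarly hedged sketch of the watchdog change: every touch writes a
cheap per-CPU timestamp, but the shared timestamp is only rewritten once it
is at least a quarter of the stall threshold old, so frequent touches stop
bouncing a single global cacheline between CPUs. global_touched,
per_cpu_touched and touch_one_cpu() are invented names; the real code in
the diff below uses wq_watchdog_touched and jiffies.

#include <stdatomic.h>
#include <stdio.h>
#include <time.h>

#define NCPU   8
#define THRESH 30	/* stall threshold in seconds */

static _Atomic long global_touched;	/* shared, written rarely */
static long per_cpu_touched[NCPU];	/* one slot per CPU, written often */

static long now_sec(void)
{
	return (long)time(NULL);
}

void touch_one_cpu(int cpu)
{
	long now = now_sec();

	per_cpu_touched[cpu] = now;	/* cheap: stays in a CPU-local line */

	/* only pay for the shared store when the stamp is getting stale */
	if (now > atomic_load_explicit(&global_touched, memory_order_relaxed) + THRESH / 4)
		atomic_store_explicit(&global_touched, now, memory_order_relaxed);
}

int main(void)
{
	touch_one_cpu(3);
	printf("cpu3=%ld global=%ld\n", per_cpu_touched[3],
	       (long)atomic_load(&global_touched));
	return 0;
}

The stall checker then has to tolerate a global stamp that lags by up to a
quarter of the threshold, which is the trade-off described above as
"avoiding unnecessary global updates".
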
* tag 'wq-for-6.11' of git://git.kernel.org/pub/scm/linux/kernel/git/tj/wq: (24 commits)
  workqueue: Always queue work items to the newest PWQ for order workqueues
  workqueue: Rename wq_update_pod() to unbound_wq_update_pwq()
  workqueue: Remove the arguments @hotplug_cpu and @online from wq_update_pod()
  workqueue: Remove the argument @cpu_going_down from wq_calc_pod_cpumask()
  workqueue: Remove the unneeded cpumask empty check in wq_calc_pod_cpumask()
  workqueue: Remove cpus_read_lock() from apply_wqattrs_lock()
  workqueue: Simplify wq_calc_pod_cpumask() with wq_online_cpumask
  workqueue: Add wq_online_cpumask
  workqueue: Init rescuer's affinities as the wq's effective cpumask
  workqueue: Put PWQ allocation and WQ enlistment in the same lock C.S.
  workqueue: Move kthread_flush_worker() out of alloc_and_link_pwqs()
  workqueue: Make rescuer initialization as the last step of the creation of a new wq
  workqueue: Register sysfs after the whole creation of the new wq
  workqueue: Simplify goto statement
  workqueue: Update cpumasks after only applying it successfully
  workqueue: Improve scalability of workqueue watchdog touch
  workqueue: wq_watchdog_touch is always called with valid CPU
  workqueue: Remove useless pool->dying_workers
  workqueue: Detach workers directly in idle_cull_fn()
  workqueue: Don't bind the rescuer in the last working cpu
  ...
parents 895b9b12 58629d48
@@ -216,8 +216,6 @@ struct worker_pool {
 	struct worker *manager;	/* L: purely informational */
 	struct list_head workers;	/* A: attached workers */
-	struct list_head dying_workers;	/* A: workers about to die */
-	struct completion *detach_completion; /* all workers detached */
 	struct ida worker_ida;	/* worker IDs for task name */
@@ -436,7 +434,7 @@ static struct wq_pod_type wq_pod_types[WQ_AFFN_NR_TYPES];
 static enum wq_affn_scope wq_affn_dfl = WQ_AFFN_CACHE;
 /* buf for wq_update_unbound_pod_attrs(), protected by CPU hotplug exclusion */
-static struct workqueue_attrs *wq_update_pod_attrs_buf;
+static struct workqueue_attrs *unbound_wq_update_pwq_attrs_buf;
 static DEFINE_MUTEX(wq_pool_mutex);	/* protects pools and workqueues list */
 static DEFINE_MUTEX(wq_pool_attach_mutex); /* protects worker attach/detach */
@@ -447,6 +445,9 @@ static struct rcuwait manager_wait = __RCUWAIT_INITIALIZER(manager_wait);
 static LIST_HEAD(workqueues);		/* PR: list of all workqueues */
 static bool workqueue_freezing;		/* PL: have wqs started freezing? */
+/* PL: mirror the cpu_online_mask excluding the CPU in the midst of hotplugging */
+static cpumask_var_t wq_online_cpumask;
 /* PL&A: allowable cpus for unbound wqs and work items */
 static cpumask_var_t wq_unbound_cpumask;
@@ -1684,33 +1685,6 @@ static void __pwq_activate_work(struct pool_workqueue *pwq,
 	__clear_bit(WORK_STRUCT_INACTIVE_BIT, wdb);
 }
-/**
- * pwq_activate_work - Activate a work item if inactive
- * @pwq: pool_workqueue @work belongs to
- * @work: work item to activate
- *
- * Returns %true if activated. %false if already active.
- */
-static bool pwq_activate_work(struct pool_workqueue *pwq,
-			      struct work_struct *work)
-{
-	struct worker_pool *pool = pwq->pool;
-	struct wq_node_nr_active *nna;
-	lockdep_assert_held(&pool->lock);
-	if (!(*work_data_bits(work) & WORK_STRUCT_INACTIVE))
-		return false;
-	nna = wq_node_nr_active(pwq->wq, pool->node);
-	if (nna)
-		atomic_inc(&nna->nr);
-	pwq->nr_active++;
-	__pwq_activate_work(pwq, work);
-	return true;
-}
 static bool tryinc_node_nr_active(struct wq_node_nr_active *nna)
 {
 	int max = READ_ONCE(nna->max);
@@ -2117,7 +2091,7 @@ static int try_to_grab_pending(struct work_struct *work, u32 cflags,
 	 */
 	pwq = get_work_pwq(work);
 	if (pwq && pwq->pool == pool) {
-		unsigned long work_data;
+		unsigned long work_data = *work_data_bits(work);
 		debug_work_deactivate(work);
@@ -2126,13 +2100,17 @@ static int try_to_grab_pending(struct work_struct *work, u32 cflags,
 		 * pwq->inactive_works since a queued barrier can't be
 		 * canceled (see the comments in insert_wq_barrier()).
 		 *
-		 * An inactive work item cannot be grabbed directly because
+		 * An inactive work item cannot be deleted directly because
 		 * it might have linked barrier work items which, if left
 		 * on the inactive_works list, will confuse pwq->nr_active
-		 * management later on and cause stall. Make sure the work
-		 * item is activated before grabbing.
+		 * management later on and cause stall. Move the linked
+		 * barrier work items to the worklist when deleting the grabbed
+		 * item. Also keep WORK_STRUCT_INACTIVE in work_data, so that
+		 * it doesn't participate in nr_active management in later
+		 * pwq_dec_nr_in_flight().
 		 */
-		pwq_activate_work(pwq, work);
+		if (work_data & WORK_STRUCT_INACTIVE)
+			move_linked_works(work, &pwq->pool->worklist, NULL);
 		list_del_init(&work->entry);
@@ -2140,7 +2118,6 @@ static int try_to_grab_pending(struct work_struct *work, u32 cflags,
 		 * work->data points to pwq iff queued. Let's point to pool. As
 		 * this destroys work->data needed by the next step, stash it.
 		 */
-		work_data = *work_data_bits(work);
 		set_work_pool_and_keep_pending(work, pool->id,
					       pool_offq_flags(pool));
@@ -2298,9 +2275,13 @@ static void __queue_work(int cpu, struct workqueue_struct *wq,
 	 * If @work was previously on a different pool, it might still be
 	 * running there, in which case the work needs to be queued on that
 	 * pool to guarantee non-reentrancy.
+	 *
+	 * For ordered workqueue, work items must be queued on the newest pwq
+	 * for accurate order management. Guaranteed order also guarantees
+	 * non-reentrancy. See the comments above unplug_oldest_pwq().
 	 */
 	last_pool = get_work_pool(work);
-	if (last_pool && last_pool != pool) {
+	if (last_pool && last_pool != pool && !(wq->flags & __WQ_ORDERED)) {
 		struct worker *worker;
 		raw_spin_lock(&last_pool->lock);
@@ -2710,6 +2691,27 @@ static void worker_attach_to_pool(struct worker *worker,
 	mutex_unlock(&wq_pool_attach_mutex);
 }
+static void unbind_worker(struct worker *worker)
+{
+	lockdep_assert_held(&wq_pool_attach_mutex);
+	kthread_set_per_cpu(worker->task, -1);
+	if (cpumask_intersects(wq_unbound_cpumask, cpu_active_mask))
+		WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task, wq_unbound_cpumask) < 0);
+	else
+		WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task, cpu_possible_mask) < 0);
+}
+static void detach_worker(struct worker *worker)
+{
+	lockdep_assert_held(&wq_pool_attach_mutex);
+	unbind_worker(worker);
+	list_del(&worker->node);
+	worker->pool = NULL;
+}
 /**
  * worker_detach_from_pool() - detach a worker from its pool
  * @worker: worker which is attached to its pool
@@ -2721,26 +2723,16 @@ static void worker_attach_to_pool(struct worker *worker,
 static void worker_detach_from_pool(struct worker *worker)
 {
 	struct worker_pool *pool = worker->pool;
-	struct completion *detach_completion = NULL;
 	/* there is one permanent BH worker per CPU which should never detach */
 	WARN_ON_ONCE(pool->flags & POOL_BH);
 	mutex_lock(&wq_pool_attach_mutex);
-	kthread_set_per_cpu(worker->task, -1);
-	list_del(&worker->node);
-	worker->pool = NULL;
-	if (list_empty(&pool->workers) && list_empty(&pool->dying_workers))
-		detach_completion = pool->detach_completion;
+	detach_worker(worker);
 	mutex_unlock(&wq_pool_attach_mutex);
 	/* clear leftover flags without pool->lock after it is detached */
 	worker->flags &= ~(WORKER_UNBOUND | WORKER_REBOUND);
-	if (detach_completion)
-		complete(detach_completion);
 }
 static int format_worker_id(char *buf, size_t size, struct worker *worker,
@@ -2844,35 +2836,22 @@ static struct worker *create_worker(struct worker_pool *pool)
 	return NULL;
 }
-static void unbind_worker(struct worker *worker)
+static void detach_dying_workers(struct list_head *cull_list)
 {
-	lockdep_assert_held(&wq_pool_attach_mutex);
-	kthread_set_per_cpu(worker->task, -1);
-	if (cpumask_intersects(wq_unbound_cpumask, cpu_active_mask))
-		WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task, wq_unbound_cpumask) < 0);
-	else
-		WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task, cpu_possible_mask) < 0);
+	struct worker *worker;
+	list_for_each_entry(worker, cull_list, entry)
+		detach_worker(worker);
 }
-static void wake_dying_workers(struct list_head *cull_list)
+static void reap_dying_workers(struct list_head *cull_list)
 {
 	struct worker *worker, *tmp;
 	list_for_each_entry_safe(worker, tmp, cull_list, entry) {
 		list_del_init(&worker->entry);
-		unbind_worker(worker);
-		/*
-		 * If the worker was somehow already running, then it had to be
-		 * in pool->idle_list when set_worker_dying() happened or we
-		 * wouldn't have gotten here.
-		 *
-		 * Thus, the worker must either have observed the WORKER_DIE
-		 * flag, or have set its state to TASK_IDLE. Either way, the
-		 * below will be observed by the worker and is safe to do
-		 * outside of pool->lock.
-		 */
-		wake_up_process(worker->task);
+		kthread_stop_put(worker->task);
+		kfree(worker);
 	}
 }
@@ -2906,7 +2885,9 @@ static void set_worker_dying(struct worker *worker, struct list_head *list)
 	worker->flags |= WORKER_DIE;
 	list_move(&worker->entry, list);
-	list_move(&worker->node, &pool->dying_workers);
+	/* get an extra task struct reference for later kthread_stop_put() */
+	get_task_struct(worker->task);
 }
 /**
@@ -2965,9 +2946,9 @@ static void idle_cull_fn(struct work_struct *work)
 	/*
 	 * Grabbing wq_pool_attach_mutex here ensures an already-running worker
-	 * cannot proceed beyong worker_detach_from_pool() in its self-destruct
-	 * path. This is required as a previously-preempted worker could run after
-	 * set_worker_dying() has happened but before wake_dying_workers() did.
+	 * cannot proceed beyong set_pf_worker() in its self-destruct path.
+	 * This is required as a previously-preempted worker could run after
+	 * set_worker_dying() has happened but before detach_dying_workers() did.
 	 */
 	mutex_lock(&wq_pool_attach_mutex);
 	raw_spin_lock_irq(&pool->lock);
@@ -2988,8 +2969,10 @@ static void idle_cull_fn(struct work_struct *work)
 	}
 	raw_spin_unlock_irq(&pool->lock);
-	wake_dying_workers(&cull_list);
+	detach_dying_workers(&cull_list);
 	mutex_unlock(&wq_pool_attach_mutex);
+	reap_dying_workers(&cull_list);
 }
 static void send_mayday(struct work_struct *work)
@@ -3368,9 +3351,7 @@ static int worker_thread(void *__worker)
 	set_pf_worker(false);
 	ida_free(&pool->worker_ida, worker->id);
-	worker_detach_from_pool(worker);
 	WARN_ON_ONCE(!list_empty(&worker->entry));
-	kfree(worker);
 	return 0;
 }
@@ -4761,7 +4742,6 @@ static int init_worker_pool(struct worker_pool *pool)
 	timer_setup(&pool->mayday_timer, pool_mayday_timeout, 0);
 	INIT_LIST_HEAD(&pool->workers);
-	INIT_LIST_HEAD(&pool->dying_workers);
 	ida_init(&pool->worker_ida);
 	INIT_HLIST_NODE(&pool->hash_node);
@@ -4903,7 +4883,6 @@ static void rcu_free_pool(struct rcu_head *rcu)
  */
 static void put_unbound_pool(struct worker_pool *pool)
 {
-	DECLARE_COMPLETION_ONSTACK(detach_completion);
 	struct worker *worker;
 	LIST_HEAD(cull_list);
@@ -4955,14 +4934,11 @@ static void put_unbound_pool(struct worker_pool *pool)
 	WARN_ON(pool->nr_workers || pool->nr_idle);
 	raw_spin_unlock_irq(&pool->lock);
-	wake_dying_workers(&cull_list);
-	if (!list_empty(&pool->workers) || !list_empty(&pool->dying_workers))
-		pool->detach_completion = &detach_completion;
+	detach_dying_workers(&cull_list);
 	mutex_unlock(&wq_pool_attach_mutex);
-	if (pool->detach_completion)
-		wait_for_completion(pool->detach_completion);
+	reap_dying_workers(&cull_list);
 	/* shut down the timers */
 	del_timer_sync(&pool->idle_timer);
@@ -5038,12 +5014,6 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
 	return NULL;
 }
-static void rcu_free_pwq(struct rcu_head *rcu)
-{
-	kmem_cache_free(pwq_cache,
-			container_of(rcu, struct pool_workqueue, rcu));
-}
 /*
  * Scheduled on pwq_release_worker by put_pwq() when an unbound pwq hits zero
  * refcnt and needs to be destroyed.
@@ -5089,7 +5059,7 @@ static void pwq_release_workfn(struct kthread_work *work)
 		raw_spin_unlock_irq(&nna->lock);
 	}
-	call_rcu(&pwq->rcu, rcu_free_pwq);
+	kfree_rcu(pwq, rcu);
 	/*
 	 * If we're the last pwq going away, @wq is already dead and no one
@@ -5161,14 +5131,22 @@ static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
 	return pwq;
 }
+static void apply_wqattrs_lock(void)
+{
+	mutex_lock(&wq_pool_mutex);
+}
+static void apply_wqattrs_unlock(void)
+{
+	mutex_unlock(&wq_pool_mutex);
+}
 /**
  * wq_calc_pod_cpumask - calculate a wq_attrs' cpumask for a pod
  * @attrs: the wq_attrs of the default pwq of the target workqueue
  * @cpu: the target CPU
- * @cpu_going_down: if >= 0, the CPU to consider as offline
  *
- * Calculate the cpumask a workqueue with @attrs should use on @pod. If
- * @cpu_going_down is >= 0, that cpu is considered offline during calculation.
+ * Calculate the cpumask a workqueue with @attrs should use on @pod.
  * The result is stored in @attrs->__pod_cpumask.
 *
 * If pod affinity is not enabled, @attrs->cpumask is always used. If enabled
@@ -5177,29 +5155,18 @@ static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
 *
 * The caller is responsible for ensuring that the cpumask of @pod stays stable.
 */
-static void wq_calc_pod_cpumask(struct workqueue_attrs *attrs, int cpu,
-				int cpu_going_down)
+static void wq_calc_pod_cpumask(struct workqueue_attrs *attrs, int cpu)
 {
 	const struct wq_pod_type *pt = wqattrs_pod_type(attrs);
 	int pod = pt->cpu_pod[cpu];
-	/* does @pod have any online CPUs @attrs wants? */
+	/* calculate possible CPUs in @pod that @attrs wants */
 	cpumask_and(attrs->__pod_cpumask, pt->pod_cpus[pod], attrs->cpumask);
-	cpumask_and(attrs->__pod_cpumask, attrs->__pod_cpumask, cpu_online_mask);
-	if (cpu_going_down >= 0)
-		cpumask_clear_cpu(cpu_going_down, attrs->__pod_cpumask);
-	if (cpumask_empty(attrs->__pod_cpumask)) {
+	/* does @pod have any online CPUs @attrs wants? */
+	if (!cpumask_intersects(attrs->__pod_cpumask, wq_online_cpumask)) {
 		cpumask_copy(attrs->__pod_cpumask, attrs->cpumask);
 		return;
 	}
-	/* yeap, return possible CPUs in @pod that @attrs wants */
-	cpumask_and(attrs->__pod_cpumask, attrs->cpumask, pt->pod_cpus[pod]);
-	if (cpumask_empty(attrs->__pod_cpumask))
-		pr_warn_once("WARNING: workqueue cpumask: online intersect > "
-			     "possible intersect\n");
 }
 /* install @pwq into @wq and return the old pwq, @cpu < 0 for dfl_pwq */
@@ -5284,7 +5251,7 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
 			ctx->dfl_pwq->refcnt++;
 			ctx->pwq_tbl[cpu] = ctx->dfl_pwq;
 		} else {
-			wq_calc_pod_cpumask(new_attrs, cpu, -1);
+			wq_calc_pod_cpumask(new_attrs, cpu);
 			ctx->pwq_tbl[cpu] = alloc_unbound_pwq(wq, new_attrs);
 			if (!ctx->pwq_tbl[cpu])
 				goto out_free;
@@ -5394,15 +5361,12 @@ int apply_workqueue_attrs(struct workqueue_struct *wq,
 }
 /**
- * wq_update_pod - update pod affinity of a wq for CPU hot[un]plug
+ * unbound_wq_update_pwq - update a pwq slot for CPU hot[un]plug
  * @wq: the target workqueue
- * @cpu: the CPU to update pool association for
- * @hotplug_cpu: the CPU coming up or going down
- * @online: whether @cpu is coming up or going down
+ * @cpu: the CPU to update the pwq slot for
 *
 * This function is to be called from %CPU_DOWN_PREPARE, %CPU_ONLINE and
- * %CPU_DOWN_FAILED. @cpu is being hot[un]plugged, update pod affinity of
- * @wq accordingly.
+ * %CPU_DOWN_FAILED. @cpu is in the same pod of the CPU being hot[un]plugged.
 *
 *
 * If pod affinity can't be adjusted due to memory allocation failure, it falls
@@ -5415,10 +5379,8 @@ int apply_workqueue_attrs(struct workqueue_struct *wq,
 * CPU_DOWN. If a workqueue user wants strict affinity, it's the user's
 * responsibility to flush the work item from CPU_DOWN_PREPARE.
 */
-static void wq_update_pod(struct workqueue_struct *wq, int cpu,
-			  int hotplug_cpu, bool online)
+static void unbound_wq_update_pwq(struct workqueue_struct *wq, int cpu)
 {
-	int off_cpu = online ? -1 : hotplug_cpu;
 	struct pool_workqueue *old_pwq = NULL, *pwq;
 	struct workqueue_attrs *target_attrs;
@@ -5432,13 +5394,13 @@ static void wq_update_pod(struct workqueue_struct *wq, int cpu,
 	 * Let's use a preallocated one. The following buf is protected by
 	 * CPU hotplug exclusion.
 	 */
-	target_attrs = wq_update_pod_attrs_buf;
+	target_attrs = unbound_wq_update_pwq_attrs_buf;
 	copy_workqueue_attrs(target_attrs, wq->unbound_attrs);
 	wqattrs_actualize_cpumask(target_attrs, wq_unbound_cpumask);
 	/* nothing to do if the target cpumask matches the current pwq */
-	wq_calc_pod_cpumask(target_attrs, cpu, off_cpu);
+	wq_calc_pod_cpumask(target_attrs, cpu);
 	if (wqattrs_equal(target_attrs, unbound_pwq(wq, cpu)->pool->attrs))
 		return;
@@ -5472,21 +5434,25 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
 	bool highpri = wq->flags & WQ_HIGHPRI;
 	int cpu, ret;
+	lockdep_assert_cpus_held();
+	lockdep_assert_held(&wq_pool_mutex);
 	wq->cpu_pwq = alloc_percpu(struct pool_workqueue *);
 	if (!wq->cpu_pwq)
 		goto enomem;
 	if (!(wq->flags & WQ_UNBOUND)) {
+		struct worker_pool __percpu *pools;
+		if (wq->flags & WQ_BH)
+			pools = bh_worker_pools;
+		else
+			pools = cpu_worker_pools;
 		for_each_possible_cpu(cpu) {
 			struct pool_workqueue **pwq_p;
-			struct worker_pool __percpu *pools;
 			struct worker_pool *pool;
-			if (wq->flags & WQ_BH)
-				pools = bh_worker_pools;
-			else
-				pools = cpu_worker_pools;
 			pool = &(per_cpu_ptr(pools, cpu)[highpri]);
 			pwq_p = per_cpu_ptr(wq->cpu_pwq, cpu);
@@ -5504,26 +5470,18 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq)
 		return 0;
 	}
-	cpus_read_lock();
 	if (wq->flags & __WQ_ORDERED) {
 		struct pool_workqueue *dfl_pwq;
-		ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
+		ret = apply_workqueue_attrs_locked(wq, ordered_wq_attrs[highpri]);
 		/* there should only be single pwq for ordering guarantee */
 		dfl_pwq = rcu_access_pointer(wq->dfl_pwq);
 		WARN(!ret && (wq->pwqs.next != &dfl_pwq->pwqs_node ||
			      wq->pwqs.prev != &dfl_pwq->pwqs_node),
		     "ordering guarantee broken for workqueue %s\n", wq->name);
 	} else {
-		ret = apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
+		ret = apply_workqueue_attrs_locked(wq, unbound_std_wq_attrs[highpri]);
 	}
-	cpus_read_unlock();
-	/* for unbound pwq, flush the pwq_release_worker ensures that the
-	 * pwq_release_workfn() completes before calling kfree(wq).
-	 */
-	if (ret)
-		kthread_flush_worker(pwq_release_worker);
 	return ret;
@@ -5561,6 +5519,8 @@ static int init_rescuer(struct workqueue_struct *wq)
 	char id_buf[WORKER_ID_LEN];
 	int ret;
+	lockdep_assert_held(&wq_pool_mutex);
 	if (!(wq->flags & WQ_MEM_RECLAIM))
 		return 0;
@@ -5585,7 +5545,7 @@ static int init_rescuer(struct workqueue_struct *wq)
 	wq->rescuer = rescuer;
 	if (wq->flags & WQ_UNBOUND)
-		kthread_bind_mask(rescuer->task, wq_unbound_cpumask);
+		kthread_bind_mask(rescuer->task, unbound_effective_cpumask(wq));
 	else
 		kthread_bind_mask(rescuer->task, cpu_possible_mask);
 	wake_up_process(rescuer->task);
@@ -5733,21 +5693,15 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
 			goto err_unreg_lockdep;
 	}
-	if (alloc_and_link_pwqs(wq) < 0)
-		goto err_free_node_nr_active;
-	if (wq_online && init_rescuer(wq) < 0)
-		goto err_destroy;
-	if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
-		goto err_destroy;
 	/*
-	 * wq_pool_mutex protects global freeze state and workqueues list.
-	 * Grab it, adjust max_active and add the new @wq to workqueues
-	 * list.
+	 * wq_pool_mutex protects the workqueues list, allocations of PWQs,
+	 * and the global freeze state. alloc_and_link_pwqs() also requires
+	 * cpus_read_lock() for PWQs' affinities.
 	 */
-	mutex_lock(&wq_pool_mutex);
+	apply_wqattrs_lock();
+	if (alloc_and_link_pwqs(wq) < 0)
+		goto err_unlock_free_node_nr_active;
 	mutex_lock(&wq->mutex);
 	wq_adjust_max_active(wq);
@@ -5755,13 +5709,27 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
 	list_add_tail_rcu(&wq->list, &workqueues);
-	mutex_unlock(&wq_pool_mutex);
+	if (wq_online && init_rescuer(wq) < 0)
+		goto err_unlock_destroy;
+	apply_wqattrs_unlock();
+	if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
+		goto err_destroy;
 	return wq;
-err_free_node_nr_active:
-	if (wq->flags & WQ_UNBOUND)
+err_unlock_free_node_nr_active:
+	apply_wqattrs_unlock();
+	/*
+	 * Failed alloc_and_link_pwqs() may leave pending pwq->release_work,
+	 * flushing the pwq_release_worker ensures that the pwq_release_workfn()
+	 * completes before calling kfree(wq).
+	 */
+	if (wq->flags & WQ_UNBOUND) {
+		kthread_flush_worker(pwq_release_worker);
 		free_node_nr_active(wq->node_nr_active);
+	}
 err_unreg_lockdep:
 	wq_unregister_lockdep(wq);
 	wq_free_lockdep(wq);
@@ -5769,6 +5737,8 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
 	free_workqueue_attrs(wq->unbound_attrs);
 	kfree(wq);
 	return NULL;
+err_unlock_destroy:
+	apply_wqattrs_unlock();
 err_destroy:
 	destroy_workqueue(wq);
 	return NULL;
@@ -6607,6 +6577,8 @@ int workqueue_online_cpu(unsigned int cpu)
 	mutex_lock(&wq_pool_mutex);
+	cpumask_set_cpu(cpu, wq_online_cpumask);
 	for_each_pool(pool, pi) {
 		/* BH pools aren't affected by hotplug */
 		if (pool->flags & POOL_BH)
@@ -6629,7 +6601,7 @@ int workqueue_online_cpu(unsigned int cpu)
 			int tcpu;
 			for_each_cpu(tcpu, pt->pod_cpus[pt->cpu_pod[cpu]])
-				wq_update_pod(wq, tcpu, cpu, true);
+				unbound_wq_update_pwq(wq, tcpu);
 			mutex_lock(&wq->mutex);
 			wq_update_node_max_active(wq, -1);
@@ -6653,6 +6625,9 @@ int workqueue_offline_cpu(unsigned int cpu)
 	/* update pod affinity of unbound workqueues */
 	mutex_lock(&wq_pool_mutex);
+	cpumask_clear_cpu(cpu, wq_online_cpumask);
 	list_for_each_entry(wq, &workqueues, list) {
 		struct workqueue_attrs *attrs = wq->unbound_attrs;
@@ -6661,7 +6636,7 @@ int workqueue_offline_cpu(unsigned int cpu)
 			int tcpu;
 			for_each_cpu(tcpu, pt->pod_cpus[pt->cpu_pod[cpu]])
-				wq_update_pod(wq, tcpu, cpu, false);
+				unbound_wq_update_pwq(wq, tcpu);
 			mutex_lock(&wq->mutex);
 			wq_update_node_max_active(wq, cpu);
@@ -6901,9 +6876,6 @@ int workqueue_unbound_exclude_cpumask(cpumask_var_t exclude_cpumask)
 	lockdep_assert_cpus_held();
 	mutex_lock(&wq_pool_mutex);
-	/* Save the current isolated cpumask & export it via sysfs */
-	cpumask_copy(wq_isolated_cpumask, exclude_cpumask);
 	/*
 	 * If the operation fails, it will fall back to
 	 * wq_requested_unbound_cpumask which is initially set to
@@ -6915,6 +6887,10 @@ int workqueue_unbound_exclude_cpumask(cpumask_var_t exclude_cpumask)
 	if (!cpumask_equal(cpumask, wq_unbound_cpumask))
 		ret = workqueue_apply_unbound_cpumask(cpumask);
+	/* Save the current isolated cpumask & export it via sysfs */
+	if (!ret)
+		cpumask_copy(wq_isolated_cpumask, exclude_cpumask);
 	mutex_unlock(&wq_pool_mutex);
 	free_cpumask_var(cpumask);
 	return ret;
@@ -6948,9 +6924,8 @@ static int wq_affn_dfl_set(const char *val, const struct kernel_param *kp)
 	wq_affn_dfl = affn;
 	list_for_each_entry(wq, &workqueues, list) {
-		for_each_online_cpu(cpu) {
-			wq_update_pod(wq, cpu, cpu, true);
-		}
+		for_each_online_cpu(cpu)
+			unbound_wq_update_pwq(wq, cpu);
 	}
 	mutex_unlock(&wq_pool_mutex);
@@ -7038,19 +7013,6 @@ static struct attribute *wq_sysfs_attrs[] = {
 };
 ATTRIBUTE_GROUPS(wq_sysfs);
-static void apply_wqattrs_lock(void)
-{
-	/* CPUs should stay stable across pwq creations and installations */
-	cpus_read_lock();
-	mutex_lock(&wq_pool_mutex);
-}
-static void apply_wqattrs_unlock(void)
-{
-	mutex_unlock(&wq_pool_mutex);
-	cpus_read_unlock();
-}
 static ssize_t wq_nice_show(struct device *dev, struct device_attribute *attr,
			     char *buf)
 {
@@ -7249,16 +7211,12 @@ static int workqueue_set_unbound_cpumask(cpumask_var_t cpumask)
 	 */
 	cpumask_and(cpumask, cpumask, cpu_possible_mask);
 	if (!cpumask_empty(cpumask)) {
+		ret = 0;
 		apply_wqattrs_lock();
-		cpumask_copy(wq_requested_unbound_cpumask, cpumask);
-		if (cpumask_equal(cpumask, wq_unbound_cpumask)) {
-			ret = 0;
-			goto out_unlock;
-		}
-		ret = workqueue_apply_unbound_cpumask(cpumask);
-out_unlock:
+		if (!cpumask_equal(cpumask, wq_unbound_cpumask))
+			ret = workqueue_apply_unbound_cpumask(cpumask);
+		if (!ret)
+			cpumask_copy(wq_requested_unbound_cpumask, cpumask);
 		apply_wqattrs_unlock();
 	}
@@ -7577,10 +7535,18 @@ static void wq_watchdog_timer_fn(struct timer_list *unused)
 notrace void wq_watchdog_touch(int cpu)
 {
+	unsigned long thresh = READ_ONCE(wq_watchdog_thresh) * HZ;
+	unsigned long touch_ts = READ_ONCE(wq_watchdog_touched);
+	unsigned long now = jiffies;
 	if (cpu >= 0)
-		per_cpu(wq_watchdog_touched_cpu, cpu) = jiffies;
+		per_cpu(wq_watchdog_touched_cpu, cpu) = now;
+	else
+		WARN_ONCE(1, "%s should be called with valid CPU", __func__);
-	wq_watchdog_touched = jiffies;
+	/* Don't unnecessarily store to global cacheline */
+	if (time_after(now, touch_ts + thresh / 4))
+		WRITE_ONCE(wq_watchdog_touched, jiffies);
 }
 static void wq_watchdog_set_thresh(unsigned long thresh)
@@ -7690,10 +7656,12 @@ void __init workqueue_init_early(void)
 	BUILD_BUG_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));
+	BUG_ON(!alloc_cpumask_var(&wq_online_cpumask, GFP_KERNEL));
 	BUG_ON(!alloc_cpumask_var(&wq_unbound_cpumask, GFP_KERNEL));
 	BUG_ON(!alloc_cpumask_var(&wq_requested_unbound_cpumask, GFP_KERNEL));
 	BUG_ON(!zalloc_cpumask_var(&wq_isolated_cpumask, GFP_KERNEL));
+	cpumask_copy(wq_online_cpumask, cpu_online_mask);
 	cpumask_copy(wq_unbound_cpumask, cpu_possible_mask);
 	restrict_unbound_cpumask("HK_TYPE_WQ", housekeeping_cpumask(HK_TYPE_WQ));
 	restrict_unbound_cpumask("HK_TYPE_DOMAIN", housekeeping_cpumask(HK_TYPE_DOMAIN));
@@ -7704,8 +7672,8 @@ void __init workqueue_init_early(void)
 	pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);
-	wq_update_pod_attrs_buf = alloc_workqueue_attrs();
-	BUG_ON(!wq_update_pod_attrs_buf);
+	unbound_wq_update_pwq_attrs_buf = alloc_workqueue_attrs();
+	BUG_ON(!unbound_wq_update_pwq_attrs_buf);
 	/*
 	 * If nohz_full is enabled, set power efficient workqueue as unbound.
@@ -7970,12 +7938,12 @@ void __init workqueue_init_topology(void)
 	/*
 	 * Workqueues allocated earlier would have all CPUs sharing the default
-	 * worker pool. Explicitly call wq_update_pod() on all workqueue and CPU
-	 * combinations to apply per-pod sharing.
+	 * worker pool. Explicitly call unbound_wq_update_pwq() on all workqueue
+	 * and CPU combinations to apply per-pod sharing.
 	 */
 	list_for_each_entry(wq, &workqueues, list) {
 		for_each_online_cpu(cpu)
-			wq_update_pod(wq, cpu, cpu, true);
+			unbound_wq_update_pwq(wq, cpu);
 		if (wq->flags & WQ_UNBOUND) {
 			mutex_lock(&wq->mutex);
 			wq_update_node_max_active(wq, -1);