Commit 84193c07 authored by Tejun Heo's avatar Tejun Heo

workqueue: Generalize unbound CPU pods

While renamed to pod, the code still assumes that the pods are defined by
NUMA boundaries. Let's generalize it:

* workqueue_attrs->affn_scope is added. Each enum represents the type of
  boundaries that define the pods. There are currently two scopes -
  WQ_AFFN_NUMA and WQ_AFFN_SYSTEM. The former is the same behavior as before
  - one pod per NUMA node. The latter defines one global pod across the
  whole system.

* struct wq_pod_type is added which describes how pods are configured for
  each affnity scope. For each pod, it lists the member CPUs and the
  preferred NUMA node for memory allocations. The reverse mapping from CPU
  to pod is also available.

* wq_pod_enabled is dropped. Pod is now always enabled. The previously
  disabled behavior is now implemented through WQ_AFFN_SYSTEM.

* get_unbound_pool() wants to determine the NUMA node to allocate memory
  from for the new pool. The variables are renamed from node to pod but the
  logic still assumes they're one and the same. Clearly distinguish them -
  walk the WQ_AFFN_NUMA pods to find the matching pod and then use the pod's
  NUMA node.

* wq_calc_pod_cpumask() was taking @pod but assumed that it was the NUMA
  node. Take @cpu instead and determine the cpumask to use from the pod_type
  matching @attrs.

* apply_wqattrs_prepare() is update to return ERR_PTR() on error instead of
  NULL so that it can indicate -EINVAL on invalid affinity scopes.

This patch allows CPUs to be grouped into pods however desired per type.
While this patch causes some internal behavior changes, nothing material
should change for workqueue users.

v2: Trigger WARN_ON_ONCE() in wqattrs_pod_type() if affn_scope is
    WQ_AFFN_NR_TYPES which indicates that the function is called with a
    worker_pool's attrs instead of a workqueue's.
Signed-off-by: default avatarTejun Heo <tj@kernel.org>
parent 5de7a03c
......@@ -125,6 +125,15 @@ struct rcu_work {
struct workqueue_struct *wq;
};
enum wq_affn_scope {
WQ_AFFN_NUMA, /* one pod per NUMA node */
WQ_AFFN_SYSTEM, /* one pod across the whole system */
WQ_AFFN_NR_TYPES,
WQ_AFFN_DFL = WQ_AFFN_NUMA,
};
/**
* struct workqueue_attrs - A struct for workqueue attributes.
*
......@@ -141,12 +150,26 @@ struct workqueue_attrs {
*/
cpumask_var_t cpumask;
/*
* Below fields aren't properties of a worker_pool. They only modify how
* :c:func:`apply_workqueue_attrs` select pools and thus don't
* participate in pool hash calculations or equality comparisons.
*/
/**
* @affn_scope: unbound CPU affinity scope
*
* CPU pods are used to improve execution locality of unbound work
* items. There are multiple pod types, one for each wq_affn_scope, and
* every CPU in the system belongs to one pod in every pod type. CPUs
* that belong to the same pod share the worker pool. For example,
* selecting %WQ_AFFN_NUMA makes the workqueue use a separate worker
* pool for each NUMA node.
*/
enum wq_affn_scope affn_scope;
/**
* @ordered: work items must be executed one by one in queueing order
*
* Unlike other fields, ``ordered`` isn't a property of a worker_pool. It
* only modifies how :c:func:`apply_workqueue_attrs` select pools and thus
* doesn't participate in pool hash calculations or equality comparisons.
*/
bool ordered;
};
......
......@@ -326,7 +326,18 @@ struct workqueue_struct {
static struct kmem_cache *pwq_cache;
static cpumask_var_t *wq_pod_cpus; /* possible CPUs of each node */
/*
* Each pod type describes how CPUs should be grouped for unbound workqueues.
* See the comment above workqueue_attrs->affn_scope.
*/
struct wq_pod_type {
int nr_pods; /* number of pods */
cpumask_var_t *pod_cpus; /* pod -> cpus */
int *pod_node; /* pod -> node */
int *cpu_pod; /* cpu -> pod */
};
static struct wq_pod_type wq_pod_types[WQ_AFFN_NR_TYPES];
/*
* Per-cpu work items which run for longer than the following threshold are
......@@ -344,8 +355,6 @@ module_param_named(power_efficient, wq_power_efficient, bool, 0444);
static bool wq_online; /* can kworkers be created yet? */
static bool wq_pod_enabled; /* unbound CPU pod affinity enabled */
/* buf for wq_update_unbound_pod_attrs(), protected by CPU hotplug exclusion */
static struct workqueue_attrs *wq_update_pod_attrs_buf;
static cpumask_var_t wq_update_pod_cpumask_buf;
......@@ -1774,10 +1783,6 @@ static int select_numa_node_cpu(int node)
{
int cpu;
/* No point in doing this if NUMA isn't enabled for workqueues */
if (!wq_pod_enabled)
return WORK_CPU_UNBOUND;
/* Delay binding to CPU if node is not valid or online */
if (node < 0 || node >= MAX_NUMNODES || !node_online(node))
return WORK_CPU_UNBOUND;
......@@ -3659,6 +3664,7 @@ struct workqueue_attrs *alloc_workqueue_attrs(void)
goto fail;
cpumask_copy(attrs->cpumask, cpu_possible_mask);
attrs->affn_scope = WQ_AFFN_DFL;
return attrs;
fail:
free_workqueue_attrs(attrs);
......@@ -3670,11 +3676,13 @@ static void copy_workqueue_attrs(struct workqueue_attrs *to,
{
to->nice = from->nice;
cpumask_copy(to->cpumask, from->cpumask);
/*
* Unlike hash and equality test, this function doesn't ignore
* ->ordered as it is used for both pool and wq attrs. Instead,
* get_unbound_pool() explicitly clears ->ordered after copying.
* Unlike hash and equality test, copying shouldn't ignore wq-only
* fields as copying is used for both pool and wq attrs. Instead,
* get_unbound_pool() explicitly clears the fields.
*/
to->affn_scope = from->affn_scope;
to->ordered = from->ordered;
}
......@@ -3684,6 +3692,7 @@ static void copy_workqueue_attrs(struct workqueue_attrs *to,
*/
static void wqattrs_clear_for_pool(struct workqueue_attrs *attrs)
{
attrs->affn_scope = WQ_AFFN_NR_TYPES;
attrs->ordered = false;
}
......@@ -3723,6 +3732,25 @@ static void wqattrs_actualize_cpumask(struct workqueue_attrs *attrs,
cpumask_copy(attrs->cpumask, unbound_cpumask);
}
/* find wq_pod_type to use for @attrs */
static const struct wq_pod_type *
wqattrs_pod_type(const struct workqueue_attrs *attrs)
{
struct wq_pod_type *pt = &wq_pod_types[attrs->affn_scope];
if (!WARN_ON_ONCE(attrs->affn_scope == WQ_AFFN_NR_TYPES) &&
likely(pt->nr_pods))
return pt;
/*
* Before workqueue_init_topology(), only SYSTEM is available which is
* initialized in workqueue_init_early().
*/
pt = &wq_pod_types[WQ_AFFN_SYSTEM];
BUG_ON(!pt->nr_pods);
return pt;
}
/**
* init_worker_pool - initialize a newly zalloc'd worker_pool
* @pool: worker_pool to initialize
......@@ -3924,10 +3952,10 @@ static void put_unbound_pool(struct worker_pool *pool)
*/
static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
{
struct wq_pod_type *pt = &wq_pod_types[WQ_AFFN_NUMA];
u32 hash = wqattrs_hash(attrs);
struct worker_pool *pool;
int pod;
int target_pod = NUMA_NO_NODE;
int pod, node = NUMA_NO_NODE;
lockdep_assert_held(&wq_pool_mutex);
......@@ -3939,23 +3967,20 @@ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
}
}
/* if cpumask is contained inside a pod, we belong to that pod */
if (wq_pod_enabled) {
for_each_node(pod) {
if (cpumask_subset(attrs->cpumask, wq_pod_cpus[pod])) {
target_pod = pod;
/* If cpumask is contained inside a NUMA pod, that's our NUMA node */
for (pod = 0; pod < pt->nr_pods; pod++) {
if (cpumask_subset(attrs->cpumask, pt->pod_cpus[pod])) {
node = pt->pod_node[pod];
break;
}
}
}
/* nope, create a new one */
pool = kzalloc_node(sizeof(*pool), GFP_KERNEL, target_pod);
pool = kzalloc_node(sizeof(*pool), GFP_KERNEL, node);
if (!pool || init_worker_pool(pool) < 0)
goto fail;
pool->node = target_pod;
pool->node = node;
copy_workqueue_attrs(pool->attrs, attrs);
wqattrs_clear_for_pool(pool->attrs);
......@@ -4143,7 +4168,7 @@ static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
/**
* wq_calc_pod_cpumask - calculate a wq_attrs' cpumask for a pod
* @attrs: the wq_attrs of the default pwq of the target workqueue
* @pod: the target CPU pod
* @cpu: the target CPU
* @cpu_going_down: if >= 0, the CPU to consider as offline
* @cpumask: outarg, the resulting cpumask
*
......@@ -4157,30 +4182,29 @@ static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
*
* The caller is responsible for ensuring that the cpumask of @pod stays stable.
*/
static void wq_calc_pod_cpumask(const struct workqueue_attrs *attrs, int pod,
static void wq_calc_pod_cpumask(const struct workqueue_attrs *attrs, int cpu,
int cpu_going_down, cpumask_t *cpumask)
{
if (!wq_pod_enabled || attrs->ordered)
goto use_dfl;
const struct wq_pod_type *pt = wqattrs_pod_type(attrs);
int pod = pt->cpu_pod[cpu];
/* does @pod have any online CPUs @attrs wants? */
cpumask_and(cpumask, cpumask_of_node(pod), attrs->cpumask);
cpumask_and(cpumask, pt->pod_cpus[pod], attrs->cpumask);
cpumask_and(cpumask, cpumask, cpu_online_mask);
if (cpu_going_down >= 0)
cpumask_clear_cpu(cpu_going_down, cpumask);
if (cpumask_empty(cpumask))
goto use_dfl;
if (cpumask_empty(cpumask)) {
cpumask_copy(cpumask, attrs->cpumask);
return;
}
/* yeap, return possible CPUs in @pod that @attrs wants */
cpumask_and(cpumask, attrs->cpumask, wq_pod_cpus[pod]);
cpumask_and(cpumask, attrs->cpumask, pt->pod_cpus[pod]);
if (cpumask_empty(cpumask))
pr_warn_once("WARNING: workqueue cpumask: online intersect > "
"possible intersect\n");
return;
use_dfl:
cpumask_copy(cpumask, attrs->cpumask);
}
/* install @pwq into @wq's cpu_pwq and return the old pwq */
......@@ -4237,6 +4261,10 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
lockdep_assert_held(&wq_pool_mutex);
if (WARN_ON(attrs->affn_scope < 0 ||
attrs->affn_scope >= WQ_AFFN_NR_TYPES))
return ERR_PTR(-EINVAL);
ctx = kzalloc(struct_size(ctx, pwq_tbl, nr_cpu_ids), GFP_KERNEL);
new_attrs = alloc_workqueue_attrs();
......@@ -4266,8 +4294,7 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
ctx->dfl_pwq->refcnt++;
ctx->pwq_tbl[cpu] = ctx->dfl_pwq;
} else {
wq_calc_pod_cpumask(new_attrs, cpu_to_node(cpu), -1,
tmp_attrs->cpumask);
wq_calc_pod_cpumask(new_attrs, cpu, -1, tmp_attrs->cpumask);
ctx->pwq_tbl[cpu] = alloc_unbound_pwq(wq, tmp_attrs);
if (!ctx->pwq_tbl[cpu])
goto out_free;
......@@ -4287,7 +4314,7 @@ apply_wqattrs_prepare(struct workqueue_struct *wq,
free_workqueue_attrs(tmp_attrs);
free_workqueue_attrs(new_attrs);
apply_wqattrs_cleanup(ctx);
return NULL;
return ERR_PTR(-ENOMEM);
}
/* set attrs and install prepared pwqs, @ctx points to old pwqs on return */
......@@ -4343,8 +4370,8 @@ static int apply_workqueue_attrs_locked(struct workqueue_struct *wq,
}
ctx = apply_wqattrs_prepare(wq, attrs, wq_unbound_cpumask);
if (!ctx)
return -ENOMEM;
if (IS_ERR(ctx))
return PTR_ERR(ctx);
/* the ctx has been prepared successfully, let's commit it */
apply_wqattrs_commit(ctx);
......@@ -4409,7 +4436,6 @@ int apply_workqueue_attrs(struct workqueue_struct *wq,
static void wq_update_pod(struct workqueue_struct *wq, int cpu,
int hotplug_cpu, bool online)
{
int pod = cpu_to_node(cpu);
int off_cpu = online ? -1 : hotplug_cpu;
struct pool_workqueue *old_pwq = NULL, *pwq;
struct workqueue_attrs *target_attrs;
......@@ -4417,8 +4443,7 @@ static void wq_update_pod(struct workqueue_struct *wq, int cpu,
lockdep_assert_held(&wq_pool_mutex);
if (!wq_pod_enabled || !(wq->flags & WQ_UNBOUND) ||
wq->unbound_attrs->ordered)
if (!(wq->flags & WQ_UNBOUND) || wq->unbound_attrs->ordered)
return;
/*
......@@ -4433,7 +4458,7 @@ static void wq_update_pod(struct workqueue_struct *wq, int cpu,
wqattrs_actualize_cpumask(target_attrs, wq_unbound_cpumask);
/* nothing to do if the target cpumask matches the current pwq */
wq_calc_pod_cpumask(target_attrs, pod, off_cpu, cpumask);
wq_calc_pod_cpumask(target_attrs, cpu, off_cpu, cpumask);
pwq = rcu_dereference_protected(*per_cpu_ptr(wq->cpu_pwq, cpu),
lockdep_is_held(&wq_pool_mutex));
if (cpumask_equal(cpumask, pwq->pool->attrs->cpumask))
......@@ -5460,14 +5485,16 @@ int workqueue_online_cpu(unsigned int cpu)
/* update pod affinity of unbound workqueues */
list_for_each_entry(wq, &workqueues, list) {
struct workqueue_attrs *attrs = wq->unbound_attrs;
if (attrs) {
const struct wq_pod_type *pt = wqattrs_pod_type(attrs);
int tcpu;
for_each_possible_cpu(tcpu) {
if (cpu_to_node(tcpu) == cpu_to_node(cpu)) {
for_each_cpu(tcpu, pt->pod_cpus[pt->cpu_pod[cpu]])
wq_update_pod(wq, tcpu, cpu, true);
}
}
}
mutex_unlock(&wq_pool_mutex);
return 0;
......@@ -5486,14 +5513,16 @@ int workqueue_offline_cpu(unsigned int cpu)
/* update pod affinity of unbound workqueues */
mutex_lock(&wq_pool_mutex);
list_for_each_entry(wq, &workqueues, list) {
struct workqueue_attrs *attrs = wq->unbound_attrs;
if (attrs) {
const struct wq_pod_type *pt = wqattrs_pod_type(attrs);
int tcpu;
for_each_possible_cpu(tcpu) {
if (cpu_to_node(tcpu) == cpu_to_node(cpu)) {
for_each_cpu(tcpu, pt->pod_cpus[pt->cpu_pod[cpu]])
wq_update_pod(wq, tcpu, cpu, false);
}
}
}
mutex_unlock(&wq_pool_mutex);
return 0;
......@@ -5689,8 +5718,8 @@ static int workqueue_apply_unbound_cpumask(const cpumask_var_t unbound_cpumask)
continue;
ctx = apply_wqattrs_prepare(wq, wq->unbound_attrs, unbound_cpumask);
if (!ctx) {
ret = -ENOMEM;
if (IS_ERR(ctx)) {
ret = PTR_ERR(ctx);
break;
}
......@@ -6283,6 +6312,7 @@ static inline void wq_watchdog_init(void) { }
*/
void __init workqueue_init_early(void)
{
struct wq_pod_type *pt = &wq_pod_types[WQ_AFFN_SYSTEM];
int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
int i, cpu;
......@@ -6302,6 +6332,22 @@ void __init workqueue_init_early(void)
BUG_ON(!alloc_cpumask_var(&wq_update_pod_cpumask_buf, GFP_KERNEL));
/* initialize WQ_AFFN_SYSTEM pods */
pt->pod_cpus = kcalloc(1, sizeof(pt->pod_cpus[0]), GFP_KERNEL);
pt->pod_node = kcalloc(1, sizeof(pt->pod_node[0]), GFP_KERNEL);
pt->cpu_pod = kcalloc(nr_cpu_ids, sizeof(pt->cpu_pod[0]), GFP_KERNEL);
BUG_ON(!pt->pod_cpus || !pt->pod_node || !pt->cpu_pod);
BUG_ON(!zalloc_cpumask_var_node(&pt->pod_cpus[0], GFP_KERNEL, NUMA_NO_NODE));
wq_update_pod_attrs_buf = alloc_workqueue_attrs();
BUG_ON(!wq_update_pod_attrs_buf);
pt->nr_pods = 1;
cpumask_copy(pt->pod_cpus[0], cpu_possible_mask);
pt->pod_node[0] = NUMA_NO_NODE;
pt->cpu_pod[0] = 0;
/* initialize CPU pools */
for_each_possible_cpu(cpu) {
struct worker_pool *pool;
......@@ -6457,8 +6503,8 @@ void __init workqueue_init(void)
*/
void __init workqueue_init_topology(void)
{
struct wq_pod_type *pt = &wq_pod_types[WQ_AFFN_NUMA];
struct workqueue_struct *wq;
cpumask_var_t *tbl;
int node, cpu;
if (num_possible_nodes() <= 1)
......@@ -6478,20 +6524,23 @@ void __init workqueue_init_topology(void)
* available. Build one from cpu_to_node() which should have been
* fully initialized by now.
*/
tbl = kcalloc(nr_node_ids, sizeof(tbl[0]), GFP_KERNEL);
BUG_ON(!tbl);
pt->pod_cpus = kcalloc(nr_node_ids, sizeof(pt->pod_cpus[0]), GFP_KERNEL);
pt->pod_node = kcalloc(nr_node_ids, sizeof(pt->pod_node[0]), GFP_KERNEL);
pt->cpu_pod = kcalloc(nr_cpu_ids, sizeof(pt->cpu_pod[0]), GFP_KERNEL);
BUG_ON(!pt->pod_cpus || !pt->pod_node || !pt->cpu_pod);
for_each_node(node)
BUG_ON(!zalloc_cpumask_var_node(&tbl[node], GFP_KERNEL,
BUG_ON(!zalloc_cpumask_var_node(&pt->pod_cpus[node], GFP_KERNEL,
node_online(node) ? node : NUMA_NO_NODE));
for_each_possible_cpu(cpu) {
node = cpu_to_node(cpu);
cpumask_set_cpu(cpu, tbl[node]);
cpumask_set_cpu(cpu, pt->pod_cpus[node]);
pt->pod_node[node] = node;
pt->cpu_pod[cpu] = node;
}
wq_pod_cpus = tbl;
wq_pod_enabled = true;
pt->nr_pods = nr_node_ids;
/*
* Workqueues allocated earlier would have all CPUs sharing the default
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment