Commit bfde23ce authored by Daniel Jordan's avatar Daniel Jordan Committed by Herbert Xu

padata: unbind parallel jobs from specific CPUs

Padata binds the parallel part of a job to a single CPU and round-robins
over all CPUs in the system for each successive job.  Though the serial
parts rely on per-CPU queues for correct ordering, they're not necessary
for parallel work, and it improves performance to run the job locally on
NUMA machines and let the scheduler pick the CPU within a node on a busy
system.

So, make the parallel workqueue unbound.

Update the parallel workqueue's cpumask when the instance's parallel
cpumask changes.

Now that parallel jobs no longer run on max_active=1 workqueues, two or
more parallel works that hash to the same CPU may run simultaneously,
finish out of order, and so be serialized out of order.  Prevent this by
keeping the works sorted on the reorder list by sequence number and
checking that in the reordering logic.

padata_get_next becomes padata_find_next so it can be reused for the end
of padata_reorder, where it's used to avoid uselessly queueing work when
the next job by sequence number isn't finished yet but a later job that
hashed to the same CPU has.

The ENODATA case in padata_find_next no longer makes sense because
parallel jobs aren't bound to specific CPUs.  The EINPROGRESS case takes
care of the scenario where a parallel job is potentially running on the
same CPU as padata_find_next, and with only one error code left, just
use NULL instead.
Signed-off-by: default avatarDaniel Jordan <daniel.m.jordan@oracle.com>
Cc: Herbert Xu <herbert@gondor.apana.org.au>
Cc: Lai Jiangshan <jiangshanlai@gmail.com>
Cc: Peter Zijlstra <peterz@infradead.org>
Cc: Steffen Klassert <steffen.klassert@secunet.com>
Cc: Tejun Heo <tj@kernel.org>
Cc: linux-crypto@vger.kernel.org
Cc: linux-kernel@vger.kernel.org
Signed-off-by: default avatarHerbert Xu <herbert@gondor.apana.org.au>
parent 45d153c0
...@@ -35,6 +35,7 @@ struct padata_priv { ...@@ -35,6 +35,7 @@ struct padata_priv {
struct parallel_data *pd; struct parallel_data *pd;
int cb_cpu; int cb_cpu;
int cpu; int cpu;
unsigned int seq_nr;
int info; int info;
void (*parallel)(struct padata_priv *padata); void (*parallel)(struct padata_priv *padata);
void (*serial)(struct padata_priv *padata); void (*serial)(struct padata_priv *padata);
...@@ -105,6 +106,7 @@ struct padata_cpumask { ...@@ -105,6 +106,7 @@ struct padata_cpumask {
* @reorder_objects: Number of objects waiting in the reorder queues. * @reorder_objects: Number of objects waiting in the reorder queues.
* @refcnt: Number of objects holding a reference on this parallel_data. * @refcnt: Number of objects holding a reference on this parallel_data.
* @max_seq_nr: Maximal used sequence number. * @max_seq_nr: Maximal used sequence number.
* @processed: Number of already processed objects.
* @cpu: Next CPU to be processed. * @cpu: Next CPU to be processed.
* @cpumask: The cpumasks in use for parallel and serial workers. * @cpumask: The cpumasks in use for parallel and serial workers.
* @reorder_work: work struct for reordering. * @reorder_work: work struct for reordering.
...@@ -117,6 +119,7 @@ struct parallel_data { ...@@ -117,6 +119,7 @@ struct parallel_data {
atomic_t reorder_objects; atomic_t reorder_objects;
atomic_t refcnt; atomic_t refcnt;
atomic_t seq_nr; atomic_t seq_nr;
unsigned int processed;
int cpu; int cpu;
struct padata_cpumask cpumask; struct padata_cpumask cpumask;
struct work_struct reorder_work; struct work_struct reorder_work;
......
...@@ -46,18 +46,13 @@ static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index) ...@@ -46,18 +46,13 @@ static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)
return target_cpu; return target_cpu;
} }
static int padata_cpu_hash(struct parallel_data *pd) static int padata_cpu_hash(struct parallel_data *pd, unsigned int seq_nr)
{ {
unsigned int seq_nr;
int cpu_index;
/* /*
* Hash the sequence numbers to the cpus by taking * Hash the sequence numbers to the cpus by taking
* seq_nr mod. number of cpus in use. * seq_nr mod. number of cpus in use.
*/ */
int cpu_index = seq_nr % cpumask_weight(pd->cpumask.pcpu);
seq_nr = atomic_inc_return(&pd->seq_nr);
cpu_index = seq_nr % cpumask_weight(pd->cpumask.pcpu);
return padata_index_to_cpu(pd, cpu_index); return padata_index_to_cpu(pd, cpu_index);
} }
...@@ -144,7 +139,8 @@ int padata_do_parallel(struct padata_instance *pinst, ...@@ -144,7 +139,8 @@ int padata_do_parallel(struct padata_instance *pinst,
padata->pd = pd; padata->pd = pd;
padata->cb_cpu = *cb_cpu; padata->cb_cpu = *cb_cpu;
target_cpu = padata_cpu_hash(pd); padata->seq_nr = atomic_inc_return(&pd->seq_nr);
target_cpu = padata_cpu_hash(pd, padata->seq_nr);
padata->cpu = target_cpu; padata->cpu = target_cpu;
queue = per_cpu_ptr(pd->pqueue, target_cpu); queue = per_cpu_ptr(pd->pqueue, target_cpu);
...@@ -152,7 +148,7 @@ int padata_do_parallel(struct padata_instance *pinst, ...@@ -152,7 +148,7 @@ int padata_do_parallel(struct padata_instance *pinst,
list_add_tail(&padata->list, &queue->parallel.list); list_add_tail(&padata->list, &queue->parallel.list);
spin_unlock(&queue->parallel.lock); spin_unlock(&queue->parallel.lock);
queue_work_on(target_cpu, pinst->parallel_wq, &queue->work); queue_work(pinst->parallel_wq, &queue->work);
out: out:
rcu_read_unlock_bh(); rcu_read_unlock_bh();
...@@ -162,21 +158,19 @@ int padata_do_parallel(struct padata_instance *pinst, ...@@ -162,21 +158,19 @@ int padata_do_parallel(struct padata_instance *pinst,
EXPORT_SYMBOL(padata_do_parallel); EXPORT_SYMBOL(padata_do_parallel);
/* /*
* padata_get_next - Get the next object that needs serialization. * padata_find_next - Find the next object that needs serialization.
* *
* Return values are: * Return values are:
* *
* A pointer to the control struct of the next object that needs * A pointer to the control struct of the next object that needs
* serialization, if present in one of the percpu reorder queues. * serialization, if present in one of the percpu reorder queues.
* *
* -EINPROGRESS, if the next object that needs serialization will * NULL, if the next object that needs serialization will
* be parallel processed by another cpu and is not yet present in * be parallel processed by another cpu and is not yet present in
* the cpu's reorder queue. * the cpu's reorder queue.
*
* -ENODATA, if this cpu has to do the parallel processing for
* the next object.
*/ */
static struct padata_priv *padata_get_next(struct parallel_data *pd) static struct padata_priv *padata_find_next(struct parallel_data *pd,
bool remove_object)
{ {
struct padata_parallel_queue *next_queue; struct padata_parallel_queue *next_queue;
struct padata_priv *padata; struct padata_priv *padata;
...@@ -187,28 +181,30 @@ static struct padata_priv *padata_get_next(struct parallel_data *pd) ...@@ -187,28 +181,30 @@ static struct padata_priv *padata_get_next(struct parallel_data *pd)
reorder = &next_queue->reorder; reorder = &next_queue->reorder;
spin_lock(&reorder->lock); spin_lock(&reorder->lock);
if (!list_empty(&reorder->list)) { if (list_empty(&reorder->list)) {
padata = list_entry(reorder->list.next, spin_unlock(&reorder->lock);
struct padata_priv, list); return NULL;
}
list_del_init(&padata->list);
atomic_dec(&pd->reorder_objects);
pd->cpu = cpumask_next_wrap(cpu, pd->cpumask.pcpu, -1, padata = list_entry(reorder->list.next, struct padata_priv, list);
false);
/*
* Checks the rare case where two or more parallel jobs have hashed to
* the same CPU and one of the later ones finishes first.
*/
if (padata->seq_nr != pd->processed) {
spin_unlock(&reorder->lock); spin_unlock(&reorder->lock);
goto out; return NULL;
} }
spin_unlock(&reorder->lock);
if (__this_cpu_read(pd->pqueue->cpu_index) == next_queue->cpu_index) { if (remove_object) {
padata = ERR_PTR(-ENODATA); list_del_init(&padata->list);
goto out; atomic_dec(&pd->reorder_objects);
++pd->processed;
pd->cpu = cpumask_next_wrap(cpu, pd->cpumask.pcpu, -1, false);
} }
padata = ERR_PTR(-EINPROGRESS); spin_unlock(&reorder->lock);
out:
return padata; return padata;
} }
...@@ -234,26 +230,16 @@ static void padata_reorder(struct parallel_data *pd) ...@@ -234,26 +230,16 @@ static void padata_reorder(struct parallel_data *pd)
return; return;
while (1) { while (1) {
padata = padata_get_next(pd); padata = padata_find_next(pd, true);
/* /*
* If the next object that needs serialization is parallel * If the next object that needs serialization is parallel
* processed by another cpu and is still on it's way to the * processed by another cpu and is still on it's way to the
* cpu's reorder queue, nothing to do for now. * cpu's reorder queue, nothing to do for now.
*/ */
if (PTR_ERR(padata) == -EINPROGRESS) if (!padata)
break; break;
/*
* This cpu has to do the parallel processing of the next
* object. It's waiting in the cpu's parallelization queue,
* so exit immediately.
*/
if (PTR_ERR(padata) == -ENODATA) {
spin_unlock_bh(&pd->lock);
return;
}
cb_cpu = padata->cb_cpu; cb_cpu = padata->cb_cpu;
squeue = per_cpu_ptr(pd->squeue, cb_cpu); squeue = per_cpu_ptr(pd->squeue, cb_cpu);
...@@ -277,7 +263,8 @@ static void padata_reorder(struct parallel_data *pd) ...@@ -277,7 +263,8 @@ static void padata_reorder(struct parallel_data *pd)
smp_mb(); smp_mb();
next_queue = per_cpu_ptr(pd->pqueue, pd->cpu); next_queue = per_cpu_ptr(pd->pqueue, pd->cpu);
if (!list_empty(&next_queue->reorder.list)) if (!list_empty(&next_queue->reorder.list) &&
padata_find_next(pd, false))
queue_work(pinst->serial_wq, &pd->reorder_work); queue_work(pinst->serial_wq, &pd->reorder_work);
} }
...@@ -332,9 +319,14 @@ void padata_do_serial(struct padata_priv *padata) ...@@ -332,9 +319,14 @@ void padata_do_serial(struct padata_priv *padata)
struct parallel_data *pd = padata->pd; struct parallel_data *pd = padata->pd;
struct padata_parallel_queue *pqueue = per_cpu_ptr(pd->pqueue, struct padata_parallel_queue *pqueue = per_cpu_ptr(pd->pqueue,
padata->cpu); padata->cpu);
struct padata_priv *cur;
spin_lock(&pqueue->reorder.lock); spin_lock(&pqueue->reorder.lock);
list_add_tail(&padata->list, &pqueue->reorder.list); /* Sort in ascending order of sequence number. */
list_for_each_entry_reverse(cur, &pqueue->reorder.list, list)
if (cur->seq_nr < padata->seq_nr)
break;
list_add(&padata->list, &cur->list);
atomic_inc(&pd->reorder_objects); atomic_inc(&pd->reorder_objects);
spin_unlock(&pqueue->reorder.lock); spin_unlock(&pqueue->reorder.lock);
...@@ -353,17 +345,36 @@ static int padata_setup_cpumasks(struct parallel_data *pd, ...@@ -353,17 +345,36 @@ static int padata_setup_cpumasks(struct parallel_data *pd,
const struct cpumask *pcpumask, const struct cpumask *pcpumask,
const struct cpumask *cbcpumask) const struct cpumask *cbcpumask)
{ {
if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL)) struct workqueue_attrs *attrs;
return -ENOMEM; int err = -ENOMEM;
if (!alloc_cpumask_var(&pd->cpumask.pcpu, GFP_KERNEL))
goto out;
cpumask_and(pd->cpumask.pcpu, pcpumask, cpu_online_mask); cpumask_and(pd->cpumask.pcpu, pcpumask, cpu_online_mask);
if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL)) {
free_cpumask_var(pd->cpumask.pcpu);
return -ENOMEM;
}
if (!alloc_cpumask_var(&pd->cpumask.cbcpu, GFP_KERNEL))
goto free_pcpu_mask;
cpumask_and(pd->cpumask.cbcpu, cbcpumask, cpu_online_mask); cpumask_and(pd->cpumask.cbcpu, cbcpumask, cpu_online_mask);
attrs = alloc_workqueue_attrs();
if (!attrs)
goto free_cbcpu_mask;
/* Restrict parallel_wq workers to pd->cpumask.pcpu. */
cpumask_copy(attrs->cpumask, pd->cpumask.pcpu);
err = apply_workqueue_attrs(pd->pinst->parallel_wq, attrs);
free_workqueue_attrs(attrs);
if (err < 0)
goto free_cbcpu_mask;
return 0; return 0;
free_cbcpu_mask:
free_cpumask_var(pd->cpumask.cbcpu);
free_pcpu_mask:
free_cpumask_var(pd->cpumask.pcpu);
out:
return err;
} }
static void __padata_list_init(struct padata_list *pd_list) static void __padata_list_init(struct padata_list *pd_list)
...@@ -429,6 +440,8 @@ static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst, ...@@ -429,6 +440,8 @@ static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
pd->squeue = alloc_percpu(struct padata_serial_queue); pd->squeue = alloc_percpu(struct padata_serial_queue);
if (!pd->squeue) if (!pd->squeue)
goto err_free_pqueue; goto err_free_pqueue;
pd->pinst = pinst;
if (padata_setup_cpumasks(pd, pcpumask, cbcpumask) < 0) if (padata_setup_cpumasks(pd, pcpumask, cbcpumask) < 0)
goto err_free_squeue; goto err_free_squeue;
...@@ -437,7 +450,6 @@ static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst, ...@@ -437,7 +450,6 @@ static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
atomic_set(&pd->seq_nr, -1); atomic_set(&pd->seq_nr, -1);
atomic_set(&pd->reorder_objects, 0); atomic_set(&pd->reorder_objects, 0);
atomic_set(&pd->refcnt, 0); atomic_set(&pd->refcnt, 0);
pd->pinst = pinst;
spin_lock_init(&pd->lock); spin_lock_init(&pd->lock);
pd->cpu = cpumask_first(pd->cpumask.pcpu); pd->cpu = cpumask_first(pd->cpumask.pcpu);
INIT_WORK(&pd->reorder_work, invoke_padata_reorder); INIT_WORK(&pd->reorder_work, invoke_padata_reorder);
...@@ -968,8 +980,8 @@ static struct padata_instance *padata_alloc(const char *name, ...@@ -968,8 +980,8 @@ static struct padata_instance *padata_alloc(const char *name,
if (!pinst) if (!pinst)
goto err; goto err;
pinst->parallel_wq = alloc_workqueue("%s_parallel", WQ_MEM_RECLAIM | pinst->parallel_wq = alloc_workqueue("%s_parallel", WQ_UNBOUND, 0,
WQ_CPU_INTENSIVE, 1, name); name);
if (!pinst->parallel_wq) if (!pinst->parallel_wq)
goto err_free_inst; goto err_free_inst;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment