Commit c2a8ec07 authored by Paul E. McKenney's avatar Paul E. McKenney

srcu: Move to state-based grace-period sequencing

The current SRCU grace-period processing might never reach the last
portion of srcu_advance_batches().  This is OK given the current
implementation, as the first portion, up to the try_check_zero()
following the srcu_flip() is sufficient to drive grace periods forward.
However, it has the unfortunate side-effect of making it impossible to
determine when a given grace period has ended, and it will be necessary
to efficiently trace ends of grace periods in order to efficiently handle
per-CPU SRCU callback lists.

This commit therefore adds states to the SRCU grace-period processing,
so that the end of a given SRCU grace period is marked by the transition
to the SRCU_STATE_DONE state.
Signed-off-by: default avatarPaul E. McKenney <paulmck@linux.vnet.ibm.com>
parent c6e56f59
...@@ -48,7 +48,7 @@ struct srcu_struct { ...@@ -48,7 +48,7 @@ struct srcu_struct {
unsigned long completed; unsigned long completed;
struct srcu_array __percpu *per_cpu_ref; struct srcu_array __percpu *per_cpu_ref;
spinlock_t queue_lock; /* protect ->batch_queue, ->running */ spinlock_t queue_lock; /* protect ->batch_queue, ->running */
bool running; int srcu_state;
/* callbacks just queued */ /* callbacks just queued */
struct rcu_batch batch_queue; struct rcu_batch batch_queue;
/* callbacks try to do the first check_zero */ /* callbacks try to do the first check_zero */
...@@ -62,6 +62,12 @@ struct srcu_struct { ...@@ -62,6 +62,12 @@ struct srcu_struct {
#endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */ #endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */
}; };
/* Values for -> state variable. */
#define SRCU_STATE_IDLE 0
#define SRCU_STATE_SCAN1 1
#define SRCU_STATE_SCAN2 2
#define SRCU_STATE_DONE 3
#ifdef CONFIG_DEBUG_LOCK_ALLOC #ifdef CONFIG_DEBUG_LOCK_ALLOC
int __init_srcu_struct(struct srcu_struct *sp, const char *name, int __init_srcu_struct(struct srcu_struct *sp, const char *name,
...@@ -89,7 +95,7 @@ void process_srcu(struct work_struct *work); ...@@ -89,7 +95,7 @@ void process_srcu(struct work_struct *work);
.completed = -300, \ .completed = -300, \
.per_cpu_ref = &name##_srcu_array, \ .per_cpu_ref = &name##_srcu_array, \
.queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock), \ .queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock), \
.running = false, \ .srcu_state = SRCU_STATE_IDLE, \
.batch_queue = RCU_BATCH_INIT(name.batch_queue), \ .batch_queue = RCU_BATCH_INIT(name.batch_queue), \
.batch_check0 = RCU_BATCH_INIT(name.batch_check0), \ .batch_check0 = RCU_BATCH_INIT(name.batch_check0), \
.batch_check1 = RCU_BATCH_INIT(name.batch_check1), \ .batch_check1 = RCU_BATCH_INIT(name.batch_check1), \
......
...@@ -111,7 +111,7 @@ static int init_srcu_struct_fields(struct srcu_struct *sp) ...@@ -111,7 +111,7 @@ static int init_srcu_struct_fields(struct srcu_struct *sp)
{ {
sp->completed = 0; sp->completed = 0;
spin_lock_init(&sp->queue_lock); spin_lock_init(&sp->queue_lock);
sp->running = false; sp->srcu_state = SRCU_STATE_IDLE;
rcu_batch_init(&sp->batch_queue); rcu_batch_init(&sp->batch_queue);
rcu_batch_init(&sp->batch_check0); rcu_batch_init(&sp->batch_check0);
rcu_batch_init(&sp->batch_check1); rcu_batch_init(&sp->batch_check1);
...@@ -270,7 +270,7 @@ void cleanup_srcu_struct(struct srcu_struct *sp) ...@@ -270,7 +270,7 @@ void cleanup_srcu_struct(struct srcu_struct *sp)
if (WARN_ON(!rcu_all_batches_empty(sp))) if (WARN_ON(!rcu_all_batches_empty(sp)))
return; /* Leakage unless caller handles error. */ return; /* Leakage unless caller handles error. */
flush_delayed_work(&sp->work); flush_delayed_work(&sp->work);
if (WARN_ON(sp->running)) if (WARN_ON(READ_ONCE(sp->srcu_state) != SRCU_STATE_IDLE))
return; /* Caller forgot to stop doing call_srcu()? */ return; /* Caller forgot to stop doing call_srcu()? */
free_percpu(sp->per_cpu_ref); free_percpu(sp->per_cpu_ref);
sp->per_cpu_ref = NULL; sp->per_cpu_ref = NULL;
...@@ -391,8 +391,8 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *head, ...@@ -391,8 +391,8 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *head,
spin_lock_irqsave(&sp->queue_lock, flags); spin_lock_irqsave(&sp->queue_lock, flags);
smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
rcu_batch_queue(&sp->batch_queue, head); rcu_batch_queue(&sp->batch_queue, head);
if (!sp->running) { if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) {
sp->running = true; WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN1);
queue_delayed_work(system_power_efficient_wq, &sp->work, 0); queue_delayed_work(system_power_efficient_wq, &sp->work, 0);
} }
spin_unlock_irqrestore(&sp->queue_lock, flags); spin_unlock_irqrestore(&sp->queue_lock, flags);
...@@ -424,9 +424,9 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount) ...@@ -424,9 +424,9 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
head->func = wakeme_after_rcu; head->func = wakeme_after_rcu;
spin_lock_irq(&sp->queue_lock); spin_lock_irq(&sp->queue_lock);
smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */ smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
if (!sp->running) { if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) {
/* steal the processing owner */ /* steal the processing owner */
sp->running = true; WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN1);
rcu_batch_queue(&sp->batch_check0, head); rcu_batch_queue(&sp->batch_check0, head);
spin_unlock_irq(&sp->queue_lock); spin_unlock_irq(&sp->queue_lock);
/* give the processing owner to work_struct */ /* give the processing owner to work_struct */
...@@ -548,7 +548,9 @@ static void srcu_collect_new(struct srcu_struct *sp) ...@@ -548,7 +548,9 @@ static void srcu_collect_new(struct srcu_struct *sp)
*/ */
static void srcu_advance_batches(struct srcu_struct *sp, int trycount) static void srcu_advance_batches(struct srcu_struct *sp, int trycount)
{ {
int idx = 1 ^ (sp->completed & 1); int idx;
WARN_ON_ONCE(sp->srcu_state == SRCU_STATE_IDLE);
/* /*
* Because readers might be delayed for an extended period after * Because readers might be delayed for an extended period after
...@@ -558,48 +560,56 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount) ...@@ -558,48 +560,56 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount)
* invoking a callback. * invoking a callback.
*/ */
if (rcu_batch_empty(&sp->batch_check0) && if (sp->srcu_state == SRCU_STATE_DONE)
rcu_batch_empty(&sp->batch_check1)) WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN1);
return; /* no callbacks need to be advanced */
if (sp->srcu_state == SRCU_STATE_SCAN1) {
if (!try_check_zero(sp, idx, trycount)) idx = 1 ^ (sp->completed & 1);
return; /* failed to advance, will try after SRCU_INTERVAL */ if (!try_check_zero(sp, idx, trycount))
return; /* readers present, retry after SRCU_INTERVAL */
/*
* The callbacks in ->batch_check1 have already done with their /*
* first zero check and flip back when they were enqueued on * The callbacks in ->batch_check1 have already done
* ->batch_check0 in a previous invocation of srcu_advance_batches(). * with their first zero check and flip back when they were
* (Presumably try_check_zero() returned false during that * enqueued on ->batch_check0 in a previous invocation of
* invocation, leaving the callbacks stranded on ->batch_check1.) * srcu_advance_batches(). (Presumably try_check_zero()
* They are therefore ready to invoke, so move them to ->batch_done. * returned false during that invocation, leaving the
*/ * callbacks stranded on ->batch_check1.) They are therefore
rcu_batch_move(&sp->batch_done, &sp->batch_check1); * ready to invoke, so move them to ->batch_done.
*/
if (rcu_batch_empty(&sp->batch_check0)) rcu_batch_move(&sp->batch_done, &sp->batch_check1);
return; /* no callbacks need to be advanced */ srcu_flip(sp);
srcu_flip(sp);
/*
/* * The callbacks in ->batch_check0 just finished their
* The callbacks in ->batch_check0 just finished their * first check zero and flip, so move them to ->batch_check1
* first check zero and flip, so move them to ->batch_check1 * for future checking on the other idx.
* for future checking on the other idx. */
*/ rcu_batch_move(&sp->batch_check1, &sp->batch_check0);
rcu_batch_move(&sp->batch_check1, &sp->batch_check0);
WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN2);
/* }
* SRCU read-side critical sections are normally short, so check
* at least twice in quick succession after a flip.
*/
trycount = trycount < 2 ? 2 : trycount;
if (!try_check_zero(sp, idx^1, trycount))
return; /* failed to advance, will try after SRCU_INTERVAL */
/* if (sp->srcu_state == SRCU_STATE_SCAN2) {
* The callbacks in ->batch_check1 have now waited for all
* pre-existing readers using both idx values. They are therefore /*
* ready to invoke, so move them to ->batch_done. * SRCU read-side critical sections are normally short,
*/ * so check at least twice in quick succession after a flip.
rcu_batch_move(&sp->batch_done, &sp->batch_check1); */
idx = 1 ^ (sp->completed & 1);
trycount = trycount < 2 ? 2 : trycount;
if (!try_check_zero(sp, idx, trycount))
return; /* readers present, retry after SRCU_INTERVAL */
/*
* The callbacks in ->batch_check1 have now waited for
* all pre-existing readers using both idx values. They are
* therefore ready to invoke, so move them to ->batch_done.
*/
rcu_batch_move(&sp->batch_done, &sp->batch_check1);
WRITE_ONCE(sp->srcu_state, SRCU_STATE_DONE);
}
} }
/* /*
...@@ -633,8 +643,9 @@ static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay) ...@@ -633,8 +643,9 @@ static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay)
if (rcu_all_batches_empty(sp)) { if (rcu_all_batches_empty(sp)) {
spin_lock_irq(&sp->queue_lock); spin_lock_irq(&sp->queue_lock);
if (rcu_all_batches_empty(sp)) { if (rcu_all_batches_empty(sp) &&
sp->running = false; READ_ONCE(sp->srcu_state) == SRCU_STATE_DONE) {
WRITE_ONCE(sp->srcu_state, SRCU_STATE_IDLE);
pending = false; pending = false;
} }
spin_unlock_irq(&sp->queue_lock); spin_unlock_irq(&sp->queue_lock);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment