Commit f51164a8 authored by Paul E. McKenney

rcu: Employ jiffies-based backstop to callback time limit

Currently, if there are more than 100 ready-to-invoke RCU callbacks queued
on a given CPU, the rcu_do_batch() function sets a timeout for invocation
of the series.  This timeout defaults to three milliseconds, and may be
adjusted using the rcutree.rcu_resched_ns kernel boot parameter.  This
timeout is checked using local_clock(), but the overhead of this function,
combined with the common-case very short callback-invocation time, means
that local_clock() is checked only once per 32 invocations.
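
As a rough userspace illustration of this amortization pattern (not the
kernel code; now_ns() is a hypothetical stand-in for local_clock(), and the
callback invocation itself is elided):

```c
/* Userspace sketch of the every-32nd-iteration clock check; the real
 * logic lives in rcu_do_batch() and rcu_do_batch_check_time(). */
#include <stdio.h>
#include <time.h>

static long long now_ns(void)		/* stand-in for local_clock() */
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC, &ts);
	return ts.tv_sec * 1000000000LL + ts.tv_nsec;
}

int main(void)
{
	long long tlimit = now_ns() + 3000000;	/* 3 ms budget */
	long count;

	for (count = 1; ; count++) {
		/* ... one callback would be invoked here ... */

		/* Consult the comparatively expensive clock only once
		 * per 32 iterations. */
		if (!(count & 31) && now_ns() >= tlimit)
			break;
	}
	printf("stopped after %ld iterations\n", count);
	return 0;
}
```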

This works well except for longer-than-average callbacks.  For example,
a series of 500-microsecond-duration callbacks means that local_clock()
is checked only once every 32 * 500 microseconds = 16 milliseconds, which
makes it difficult to enforce a three-millisecond timeout.

This commit therefore adds a Kconfig option RCU_DOUBLE_CHECK_CB_TIME
that enables backup timeout checking using the coarser-grained but
lighter-weight jiffies counter.  If the jiffies counter detects a
timeout, then local_clock() is consulted even if this is not the 32nd
callback.  This prevents the aforementioned 16-millisecond latency
blowout.
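
In outline, the check becomes the following predicate (taken from
rcu_do_batch_check_time() in the diff below, with annotations added):

```c
return unlikely(tlimit) &&		/* a time limit is in force, and */
       (!likely(count & 31) ||		/* this is a 32nd callback, or   */
	(IS_ENABLED(CONFIG_RCU_DOUBLE_CHECK_CB_TIME) &&
	 jlimit_check && time_after(jiffies, jlimit))) &&  /* backstop tripped */
       local_clock() >= tlimit;		/* ... and the precise clock agrees */
```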
Reported-by: Domas Mituzas <dmituzas@meta.com>
Signed-off-by: Paul E. McKenney <paulmck@kernel.org>
parent fea1c1f0
@@ -314,4 +314,22 @@ config RCU_LAZY
 	  To save power, batch RCU callbacks and flush after delay, memory
 	  pressure, or callback list growing too big.
 
+config RCU_DOUBLE_CHECK_CB_TIME
+	bool "RCU callback-batch backup time check"
+	depends on RCU_EXPERT
+	default n
+	help
+	  Use this option to provide more precise enforcement of the
+	  rcutree.rcu_resched_ns module parameter in situations where
+	  a single RCU callback might run for hundreds of microseconds,
+	  thus defeating the 32-callback batching used to amortize the
+	  cost of the fine-grained but expensive local_clock() function.
+
+	  This option rounds rcutree.rcu_resched_ns up to the next
+	  jiffy, and overrides the 32-callback batching if this limit
+	  is exceeded.
+
+	  Say Y here if you need tighter callback-limit enforcement.
+	  Say N here if you are unsure.
+
 endmenu # "RCU Subsystem"
@@ -2047,10 +2047,15 @@ rcu_check_quiescent_state(struct rcu_data *rdp)
 }
 
 /* Return true if callback-invocation time limit exceeded. */
-static bool rcu_do_batch_check_time(long count, long tlimit)
+static bool rcu_do_batch_check_time(long count, long tlimit,
+				    bool jlimit_check, unsigned long jlimit)
 {
 	// Invoke local_clock() only once per 32 consecutive callbacks.
-	return unlikely(tlimit) && !likely(count & 31) && local_clock() >= tlimit;
+	return unlikely(tlimit) &&
+	       (!likely(count & 31) ||
+		(IS_ENABLED(CONFIG_RCU_DOUBLE_CHECK_CB_TIME) &&
+		 jlimit_check && time_after(jiffies, jlimit))) &&
+	       local_clock() >= tlimit;
 }
 
 /*
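
To make the new predicate concrete: suppose CONFIG_RCU_DOUBLE_CHECK_CB_TIME=y
and callback number count = 200.  Since 200 & 31 = 8, this is not a 32nd
callback, so the old code would skip the time check entirely; the new code
instead consults local_clock() as soon as jiffies has advanced past jlimit.
With the option disabled, IS_ENABLED() compiles the backstop away, leaving
the original behavior.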
@@ -2059,13 +2064,17 @@ static bool rcu_do_batch_check_time(long count, long tlimit)
  */
 static void rcu_do_batch(struct rcu_data *rdp)
 {
+	long bl;
+	long count = 0;
 	int div;
 	bool __maybe_unused empty;
 	unsigned long flags;
-	struct rcu_head *rhp;
+	unsigned long jlimit;
+	bool jlimit_check = false;
+	long pending;
 	struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
-	long bl, count = 0;
-	long pending, tlimit = 0;
+	struct rcu_head *rhp;
+	long tlimit = 0;
 
 	/* If no callbacks are ready, just return. */
 	if (!rcu_segcblist_ready_cbs(&rdp->cblist)) {
@@ -2090,11 +2099,14 @@ static void rcu_do_batch(struct rcu_data *rdp)
 	div = div < 0 ? 7 : div > sizeof(long) * 8 - 2 ? sizeof(long) * 8 - 2 : div;
 	bl = max(rdp->blimit, pending >> div);
 	if ((in_serving_softirq() || rdp->rcu_cpu_kthread_status == RCU_KTHREAD_RUNNING) &&
-	    unlikely(bl > 100)) {
+	    (IS_ENABLED(CONFIG_RCU_DOUBLE_CHECK_CB_TIME) || unlikely(bl > 100))) {
+		const long npj = NSEC_PER_SEC / HZ;
 		long rrn = READ_ONCE(rcu_resched_ns);
 
 		rrn = rrn < NSEC_PER_MSEC ? NSEC_PER_MSEC : rrn > NSEC_PER_SEC ? NSEC_PER_SEC : rrn;
 		tlimit = local_clock() + rrn;
+		jlimit = jiffies + (rrn + npj + 1) / npj;
+		jlimit_check = true;
 	}
 	trace_rcu_batch_start(rcu_state.name,
 			      rcu_segcblist_n_cbs(&rdp->cblist), bl);
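
A worked example of the jlimit computation above, assuming HZ=1000 and the
default three-millisecond rcu_resched_ns: npj = NSEC_PER_SEC / HZ =
1,000,000, so jlimit = jiffies + (3,000,000 + 1,000,000 + 1) / 1,000,000 =
jiffies + 4.  The rounding errs on the late side, and even when the backstop
trips, the final local_clock() comparison still decides whether the limit
has really been exceeded.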
@@ -2134,7 +2146,7 @@ static void rcu_do_batch(struct rcu_data *rdp)
 		 * Make sure we don't spend too much time here and deprive other
 		 * softirq vectors of CPU cycles.
 		 */
-		if (rcu_do_batch_check_time(count, tlimit))
+		if (rcu_do_batch_check_time(count, tlimit, jlimit_check, jlimit))
 			break;
 	} else {
 		// In rcuc/rcuoc context, so no worries about
@@ -2147,7 +2159,7 @@ static void rcu_do_batch(struct rcu_data *rdp)
 		// But rcuc kthreads can delay quiescent-state
 		// reporting, so check time limits for them.
 		if (rdp->rcu_cpu_kthread_status == RCU_KTHREAD_RUNNING &&
-		    rcu_do_batch_check_time(count, tlimit)) {
+		    rcu_do_batch_check_time(count, tlimit, jlimit_check, jlimit)) {
 			rdp->rcu_cpu_has_work = 1;
 			break;
 		}
...