Commit 720e8a63 authored by Andrew Morton's avatar Andrew Morton Committed by Linus Torvalds

[PATCH] rcu lock update: Use a sequence lock for starting batches

From: Manfred Spraul <manfred@colorfullife.com>

Step two for reducing cacheline trashing within rcupdate.c:

rcu_process_callbacks always acquires rcu_ctrlblk.state.mutex and calls
rcu_start_batch, even if the batch is already running or already scheduled to
run.

This can be avoided with a sequence lock: A sequence lock allows to read the
current batch number and next_pending atomically.  If next_pending is already
set, then there is no need to acquire the global mutex.

This means that for each grace period, there will be

- one write access to the rcu_ctrlblk.batch cacheline

- lots of read accesses to rcu_ctrlblk.batch (3-10*cpus_online()).  Behavior
  similar to the jiffies cacheline, shouldn't be a problem.

- cpus_online()+1 write accesses to rcu_ctrlblk.state, all of them starting
  with spin_lock(&rcu_ctrlblk.state.mutex).

  For large enough cpus_online() this will be a problem, but all except two
  of the spin_lock calls only protect the rcu_cpu_mask bitmap, thus a
  hierarchical bitmap would allow to split the write accesses to multiple
  cachelines.

Tested on an 8-way with reaim.  Unfortunately it probably won't help with Jack
Steiner's 'ls' test since in this test only one cpu generates rcu entries.
Signed-off-by: default avatarManfred Spraul <manfred@colorfullife.com>
Signed-off-by: default avatarAndrew Morton <akpm@osdl.org>
Signed-off-by: default avatarLinus Torvalds <torvalds@osdl.org>
parent 5c60169a
......@@ -41,6 +41,7 @@
#include <linux/threads.h>
#include <linux/percpu.h>
#include <linux/cpumask.h>
#include <linux/seqlock.h>
/**
* struct rcu_head - callback structure for use with RCU
......@@ -69,11 +70,14 @@ struct rcu_ctrlblk {
struct {
long cur; /* Current batch number. */
long completed; /* Number of the last completed batch */
int next_pending; /* Is the next batch already waiting? */
seqcount_t lock; /* for atomically reading cur and */
/* next_pending. Spinlock not used, */
/* protected by state.mutex */
} batch ____cacheline_maxaligned_in_smp;
/* remaining members: bookkeeping of the progress of the grace period */
struct {
spinlock_t mutex; /* Guard this struct */
int next_pending; /* Is the next batch already waiting? */
cpumask_t rcu_cpu_mask; /* CPUs that need to switch */
/* in order for current batch to proceed. */
} state ____cacheline_maxaligned_in_smp;
......
......@@ -47,7 +47,7 @@
/* Definition for rcupdate control block. */
struct rcu_ctrlblk rcu_ctrlblk =
{ .batch = { .cur = -300, .completed = -300 },
{ .batch = { .cur = -300, .completed = -300 , .lock = SEQCNT_ZERO },
.state = {.mutex = SPIN_LOCK_UNLOCKED, .rcu_cpu_mask = CPU_MASK_NONE } };
DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
......@@ -124,16 +124,18 @@ static void rcu_start_batch(int next_pending)
cpumask_t active;
if (next_pending)
rcu_ctrlblk.state.next_pending = 1;
rcu_ctrlblk.batch.next_pending = 1;
if (rcu_ctrlblk.state.next_pending &&
if (rcu_ctrlblk.batch.next_pending &&
rcu_ctrlblk.batch.completed == rcu_ctrlblk.batch.cur) {
rcu_ctrlblk.state.next_pending = 0;
/* Can't change, since spin lock held. */
active = nohz_cpu_mask;
cpus_complement(active);
cpus_and(rcu_ctrlblk.state.rcu_cpu_mask, cpu_online_map, active);
write_seqcount_begin(&rcu_ctrlblk.batch.lock);
rcu_ctrlblk.batch.next_pending = 0;
rcu_ctrlblk.batch.cur++;
write_seqcount_end(&rcu_ctrlblk.batch.lock);
}
}
......@@ -261,6 +263,8 @@ static void rcu_process_callbacks(unsigned long unused)
local_irq_disable();
if (!list_empty(&RCU_nxtlist(cpu)) && list_empty(&RCU_curlist(cpu))) {
int next_pending, seq;
__list_splice(&RCU_nxtlist(cpu), &RCU_curlist(cpu));
INIT_LIST_HEAD(&RCU_nxtlist(cpu));
local_irq_enable();
......@@ -268,10 +272,19 @@ static void rcu_process_callbacks(unsigned long unused)
/*
* start the next batch of callbacks
*/
spin_lock(&rcu_ctrlblk.state.mutex);
RCU_batch(cpu) = rcu_ctrlblk.batch.cur + 1;
rcu_start_batch(1);
spin_unlock(&rcu_ctrlblk.state.mutex);
do {
seq = read_seqcount_begin(&rcu_ctrlblk.batch.lock);
/* determine batch number */
RCU_batch(cpu) = rcu_ctrlblk.batch.cur + 1;
next_pending = rcu_ctrlblk.batch.next_pending;
} while (read_seqcount_retry(&rcu_ctrlblk.batch.lock, seq));
if (!next_pending) {
/* and start it/schedule start if it's a new batch */
spin_lock(&rcu_ctrlblk.state.mutex);
rcu_start_batch(1);
spin_unlock(&rcu_ctrlblk.state.mutex);
}
} else {
local_irq_enable();
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment