Commit 65a909fb authored by Trond Myklebust's avatar Trond Myklebust

RPC: Remove the rpc_queue_lock global spinlock. Replace it with per-rpc_queue

     spinlocks.
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@fys.uio.no>
parent ccc62d90
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#include <linux/timer.h> #include <linux/timer.h>
#include <linux/sunrpc/types.h> #include <linux/sunrpc/types.h>
#include <linux/spinlock.h>
#include <linux/wait.h> #include <linux/wait.h>
#include <linux/workqueue.h> #include <linux/workqueue.h>
#include <linux/sunrpc/xdr.h> #include <linux/sunrpc/xdr.h>
...@@ -185,6 +186,7 @@ typedef void (*rpc_action)(struct rpc_task *); ...@@ -185,6 +186,7 @@ typedef void (*rpc_action)(struct rpc_task *);
* RPC synchronization objects * RPC synchronization objects
*/ */
struct rpc_wait_queue { struct rpc_wait_queue {
spinlock_t lock;
struct list_head tasks[RPC_NR_PRIORITY]; /* task queue for each priority level */ struct list_head tasks[RPC_NR_PRIORITY]; /* task queue for each priority level */
unsigned long cookie; /* cookie of last task serviced */ unsigned long cookie; /* cookie of last task serviced */
unsigned char maxpriority; /* maximum priority (0 if queue is not a priority queue) */ unsigned char maxpriority; /* maximum priority (0 if queue is not a priority queue) */
...@@ -205,6 +207,7 @@ struct rpc_wait_queue { ...@@ -205,6 +207,7 @@ struct rpc_wait_queue {
#ifndef RPC_DEBUG #ifndef RPC_DEBUG
# define RPC_WAITQ_INIT(var,qname) { \ # define RPC_WAITQ_INIT(var,qname) { \
.lock = SPIN_LOCK_UNLOCKED, \
.tasks = { \ .tasks = { \
[0] = LIST_HEAD_INIT(var.tasks[0]), \ [0] = LIST_HEAD_INIT(var.tasks[0]), \
[1] = LIST_HEAD_INIT(var.tasks[1]), \ [1] = LIST_HEAD_INIT(var.tasks[1]), \
...@@ -213,6 +216,7 @@ struct rpc_wait_queue { ...@@ -213,6 +216,7 @@ struct rpc_wait_queue {
} }
#else #else
# define RPC_WAITQ_INIT(var,qname) { \ # define RPC_WAITQ_INIT(var,qname) { \
.lock = SPIN_LOCK_UNLOCKED, \
.tasks = { \ .tasks = { \
[0] = LIST_HEAD_INIT(var.tasks[0]), \ [0] = LIST_HEAD_INIT(var.tasks[0]), \
[1] = LIST_HEAD_INIT(var.tasks[1]), \ [1] = LIST_HEAD_INIT(var.tasks[1]), \
......
...@@ -68,11 +68,6 @@ static DECLARE_MUTEX(rpciod_sema); ...@@ -68,11 +68,6 @@ static DECLARE_MUTEX(rpciod_sema);
static unsigned int rpciod_users; static unsigned int rpciod_users;
static struct workqueue_struct *rpciod_workqueue; static struct workqueue_struct *rpciod_workqueue;
/*
* Spinlock for wait queues. Access to the latter also has to be
* interrupt-safe in order to allow timers to wake up sleeping tasks.
*/
static spinlock_t rpc_queue_lock = SPIN_LOCK_UNLOCKED;
/* /*
* Spinlock for other critical sections of code. * Spinlock for other critical sections of code.
*/ */
...@@ -80,7 +75,7 @@ static spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED; ...@@ -80,7 +75,7 @@ static spinlock_t rpc_sched_lock = SPIN_LOCK_UNLOCKED;
/* /*
* Disable the timer for a given RPC task. Should be called with * Disable the timer for a given RPC task. Should be called with
* rpc_queue_lock and bh_disabled in order to avoid races within * queue->lock and bh_disabled in order to avoid races within
* rpc_run_timer(). * rpc_run_timer().
*/ */
static inline void static inline void
...@@ -131,7 +126,7 @@ __rpc_add_timer(struct rpc_task *task, rpc_action timer) ...@@ -131,7 +126,7 @@ __rpc_add_timer(struct rpc_task *task, rpc_action timer)
/* /*
* Delete any timer for the current task. Because we use del_timer_sync(), * Delete any timer for the current task. Because we use del_timer_sync(),
* this function should never be called while holding rpc_queue_lock. * this function should never be called while holding queue->lock.
*/ */
static inline void static inline void
rpc_delete_timer(struct rpc_task *task) rpc_delete_timer(struct rpc_task *task)
...@@ -240,6 +235,7 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const c ...@@ -240,6 +235,7 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const c
{ {
int i; int i;
spin_lock_init(&queue->lock);
for (i = 0; i < ARRAY_SIZE(queue->tasks); i++) for (i = 0; i < ARRAY_SIZE(queue->tasks); i++)
INIT_LIST_HEAD(&queue->tasks[i]); INIT_LIST_HEAD(&queue->tasks[i]);
queue->maxpriority = maxprio; queue->maxpriority = maxprio;
...@@ -330,23 +326,22 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, ...@@ -330,23 +326,22 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
__rpc_add_timer(task, timer); __rpc_add_timer(task, timer);
} }
void void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
rpc_action action, rpc_action timer) rpc_action action, rpc_action timer)
{ {
/* /*
* Protect the queue operations. * Protect the queue operations.
*/ */
spin_lock_bh(&rpc_queue_lock); spin_lock_bh(&q->lock);
__rpc_sleep_on(q, task, action, timer); __rpc_sleep_on(q, task, action, timer);
spin_unlock_bh(&rpc_queue_lock); spin_unlock_bh(&q->lock);
} }
/** /**
* __rpc_do_wake_up_task - wake up a single rpc_task * __rpc_do_wake_up_task - wake up a single rpc_task
* @task: task to be woken up * @task: task to be woken up
* *
* Caller must hold rpc_queue_lock, and have cleared the task queued flag. * Caller must hold queue->lock, and have cleared the task queued flag.
*/ */
static void __rpc_do_wake_up_task(struct rpc_task *task) static void __rpc_do_wake_up_task(struct rpc_task *task)
{ {
...@@ -404,9 +399,11 @@ void rpc_wake_up_task(struct rpc_task *task) ...@@ -404,9 +399,11 @@ void rpc_wake_up_task(struct rpc_task *task)
{ {
if (rpc_start_wakeup(task)) { if (rpc_start_wakeup(task)) {
if (RPC_IS_QUEUED(task)) { if (RPC_IS_QUEUED(task)) {
spin_lock_bh(&rpc_queue_lock); struct rpc_wait_queue *queue = task->u.tk_wait.rpc_waitq;
spin_lock_bh(&queue->lock);
__rpc_do_wake_up_task(task); __rpc_do_wake_up_task(task);
spin_unlock_bh(&rpc_queue_lock); spin_unlock_bh(&queue->lock);
} }
rpc_finish_wakeup(task); rpc_finish_wakeup(task);
} }
...@@ -472,14 +469,14 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue) ...@@ -472,14 +469,14 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
struct rpc_task *task = NULL; struct rpc_task *task = NULL;
dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue)); dprintk("RPC: wake_up_next(%p \"%s\")\n", queue, rpc_qname(queue));
spin_lock_bh(&rpc_queue_lock); spin_lock_bh(&queue->lock);
if (RPC_IS_PRIORITY(queue)) if (RPC_IS_PRIORITY(queue))
task = __rpc_wake_up_next_priority(queue); task = __rpc_wake_up_next_priority(queue);
else { else {
task_for_first(task, &queue->tasks[0]) task_for_first(task, &queue->tasks[0])
__rpc_wake_up_task(task); __rpc_wake_up_task(task);
} }
spin_unlock_bh(&rpc_queue_lock); spin_unlock_bh(&queue->lock);
return task; return task;
} }
...@@ -488,14 +485,14 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue) ...@@ -488,14 +485,14 @@ struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
* rpc_wake_up - wake up all rpc_tasks * rpc_wake_up - wake up all rpc_tasks
* @queue: rpc_wait_queue on which the tasks are sleeping * @queue: rpc_wait_queue on which the tasks are sleeping
* *
* Grabs rpc_queue_lock * Grabs queue->lock
*/ */
void rpc_wake_up(struct rpc_wait_queue *queue) void rpc_wake_up(struct rpc_wait_queue *queue)
{ {
struct rpc_task *task; struct rpc_task *task;
struct list_head *head; struct list_head *head;
spin_lock_bh(&rpc_queue_lock); spin_lock_bh(&queue->lock);
head = &queue->tasks[queue->maxpriority]; head = &queue->tasks[queue->maxpriority];
for (;;) { for (;;) {
while (!list_empty(head)) { while (!list_empty(head)) {
...@@ -506,7 +503,7 @@ void rpc_wake_up(struct rpc_wait_queue *queue) ...@@ -506,7 +503,7 @@ void rpc_wake_up(struct rpc_wait_queue *queue)
break; break;
head--; head--;
} }
spin_unlock_bh(&rpc_queue_lock); spin_unlock_bh(&queue->lock);
} }
/** /**
...@@ -514,14 +511,14 @@ void rpc_wake_up(struct rpc_wait_queue *queue) ...@@ -514,14 +511,14 @@ void rpc_wake_up(struct rpc_wait_queue *queue)
* @queue: rpc_wait_queue on which the tasks are sleeping * @queue: rpc_wait_queue on which the tasks are sleeping
* @status: status value to set * @status: status value to set
* *
* Grabs rpc_queue_lock * Grabs queue->lock
*/ */
void rpc_wake_up_status(struct rpc_wait_queue *queue, int status) void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{ {
struct list_head *head; struct list_head *head;
struct rpc_task *task; struct rpc_task *task;
spin_lock_bh(&rpc_queue_lock); spin_lock_bh(&queue->lock);
head = &queue->tasks[queue->maxpriority]; head = &queue->tasks[queue->maxpriority];
for (;;) { for (;;) {
while (!list_empty(head)) { while (!list_empty(head)) {
...@@ -533,7 +530,7 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status) ...@@ -533,7 +530,7 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
break; break;
head--; head--;
} }
spin_unlock_bh(&rpc_queue_lock); spin_unlock_bh(&queue->lock);
} }
/* /*
...@@ -834,8 +831,7 @@ rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags) ...@@ -834,8 +831,7 @@ rpc_new_task(struct rpc_clnt *clnt, rpc_action callback, int flags)
goto out; goto out;
} }
void void rpc_release_task(struct rpc_task *task)
rpc_release_task(struct rpc_task *task)
{ {
dprintk("RPC: %4d release task\n", task->tk_pid); dprintk("RPC: %4d release task\n", task->tk_pid);
...@@ -885,10 +881,9 @@ rpc_release_task(struct rpc_task *task) ...@@ -885,10 +881,9 @@ rpc_release_task(struct rpc_task *task)
* queue 'childq'. If so returns a pointer to the parent. * queue 'childq'. If so returns a pointer to the parent.
* Upon failure returns NULL. * Upon failure returns NULL.
* *
* Caller must hold rpc_queue_lock * Caller must hold childq.lock
*/ */
static inline struct rpc_task * static inline struct rpc_task *rpc_find_parent(struct rpc_task *child)
rpc_find_parent(struct rpc_task *child)
{ {
struct rpc_task *task, *parent; struct rpc_task *task, *parent;
struct list_head *le; struct list_head *le;
...@@ -901,17 +896,16 @@ rpc_find_parent(struct rpc_task *child) ...@@ -901,17 +896,16 @@ rpc_find_parent(struct rpc_task *child)
return NULL; return NULL;
} }
static void static void rpc_child_exit(struct rpc_task *child)
rpc_child_exit(struct rpc_task *child)
{ {
struct rpc_task *parent; struct rpc_task *parent;
spin_lock_bh(&rpc_queue_lock); spin_lock_bh(&childq.lock);
if ((parent = rpc_find_parent(child)) != NULL) { if ((parent = rpc_find_parent(child)) != NULL) {
parent->tk_status = child->tk_status; parent->tk_status = child->tk_status;
__rpc_wake_up_task(parent); __rpc_wake_up_task(parent);
} }
spin_unlock_bh(&rpc_queue_lock); spin_unlock_bh(&childq.lock);
} }
/* /*
...@@ -934,22 +928,20 @@ rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent) ...@@ -934,22 +928,20 @@ rpc_new_child(struct rpc_clnt *clnt, struct rpc_task *parent)
return NULL; return NULL;
} }
void void rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
rpc_run_child(struct rpc_task *task, struct rpc_task *child, rpc_action func)
{ {
spin_lock_bh(&rpc_queue_lock); spin_lock_bh(&childq.lock);
/* N.B. Is it possible for the child to have already finished? */ /* N.B. Is it possible for the child to have already finished? */
__rpc_sleep_on(&childq, task, func, NULL); __rpc_sleep_on(&childq, task, func, NULL);
rpc_schedule_run(child); rpc_schedule_run(child);
spin_unlock_bh(&rpc_queue_lock); spin_unlock_bh(&childq.lock);
} }
/* /*
* Kill all tasks for the given client. * Kill all tasks for the given client.
* XXX: kill their descendants as well? * XXX: kill their descendants as well?
*/ */
void void rpc_killall_tasks(struct rpc_clnt *clnt)
rpc_killall_tasks(struct rpc_clnt *clnt)
{ {
struct rpc_task *rovr; struct rpc_task *rovr;
struct list_head *le; struct list_head *le;
...@@ -971,8 +963,7 @@ rpc_killall_tasks(struct rpc_clnt *clnt) ...@@ -971,8 +963,7 @@ rpc_killall_tasks(struct rpc_clnt *clnt)
static DECLARE_MUTEX_LOCKED(rpciod_running); static DECLARE_MUTEX_LOCKED(rpciod_running);
static void static void rpciod_killall(void)
rpciod_killall(void)
{ {
unsigned long flags; unsigned long flags;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment