Commit 7e0a0e38 authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Replace the queue timer with a delayed work function

The queue timer function, which walks the RPC queue in order to locate
candidates for waking up is one of the current constraints against
removing the bh-safe queue spin locks. Replace it with a delayed
work queue, so that we can do the actual rpc task wake ups from an
ordinary process context.
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@hammerspace.com>
parent 44942b4e
...@@ -183,8 +183,9 @@ struct rpc_task_setup { ...@@ -183,8 +183,9 @@ struct rpc_task_setup {
#define RPC_NR_PRIORITY (1 + RPC_PRIORITY_PRIVILEGED - RPC_PRIORITY_LOW) #define RPC_NR_PRIORITY (1 + RPC_PRIORITY_PRIVILEGED - RPC_PRIORITY_LOW)
struct rpc_timer { struct rpc_timer {
struct timer_list timer;
struct list_head list; struct list_head list;
unsigned long expires;
struct delayed_work dwork;
}; };
/* /*
......
...@@ -46,7 +46,7 @@ static mempool_t *rpc_buffer_mempool __read_mostly; ...@@ -46,7 +46,7 @@ static mempool_t *rpc_buffer_mempool __read_mostly;
static void rpc_async_schedule(struct work_struct *); static void rpc_async_schedule(struct work_struct *);
static void rpc_release_task(struct rpc_task *task); static void rpc_release_task(struct rpc_task *task);
static void __rpc_queue_timer_fn(struct timer_list *t); static void __rpc_queue_timer_fn(struct work_struct *);
/* /*
* RPC tasks sit here while waiting for conditions to improve. * RPC tasks sit here while waiting for conditions to improve.
...@@ -87,13 +87,19 @@ __rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task) ...@@ -87,13 +87,19 @@ __rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
task->tk_timeout = 0; task->tk_timeout = 0;
list_del(&task->u.tk_wait.timer_list); list_del(&task->u.tk_wait.timer_list);
if (list_empty(&queue->timer_list.list)) if (list_empty(&queue->timer_list.list))
del_timer(&queue->timer_list.timer); cancel_delayed_work(&queue->timer_list.dwork);
} }
static void static void
rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires) rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
{ {
timer_reduce(&queue->timer_list.timer, expires); unsigned long now = jiffies;
queue->timer_list.expires = expires;
if (time_before_eq(expires, now))
expires = 0;
else
expires -= now;
mod_delayed_work(rpciod_workqueue, &queue->timer_list.dwork, expires);
} }
/* /*
...@@ -107,7 +113,8 @@ __rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task, ...@@ -107,7 +113,8 @@ __rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task,
task->tk_pid, jiffies_to_msecs(timeout - jiffies)); task->tk_pid, jiffies_to_msecs(timeout - jiffies));
task->tk_timeout = timeout; task->tk_timeout = timeout;
rpc_set_queue_timer(queue, timeout); if (list_empty(&queue->timer_list.list) || time_before(timeout, queue->timer_list.expires))
rpc_set_queue_timer(queue, timeout);
list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list); list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
} }
...@@ -250,7 +257,8 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const c ...@@ -250,7 +257,8 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const c
queue->maxpriority = nr_queues - 1; queue->maxpriority = nr_queues - 1;
rpc_reset_waitqueue_priority(queue); rpc_reset_waitqueue_priority(queue);
queue->qlen = 0; queue->qlen = 0;
timer_setup(&queue->timer_list.timer, __rpc_queue_timer_fn, 0); queue->timer_list.expires = 0;
INIT_DEFERRABLE_WORK(&queue->timer_list.dwork, __rpc_queue_timer_fn);
INIT_LIST_HEAD(&queue->timer_list.list); INIT_LIST_HEAD(&queue->timer_list.list);
rpc_assign_waitqueue_name(queue, qname); rpc_assign_waitqueue_name(queue, qname);
} }
...@@ -269,7 +277,7 @@ EXPORT_SYMBOL_GPL(rpc_init_wait_queue); ...@@ -269,7 +277,7 @@ EXPORT_SYMBOL_GPL(rpc_init_wait_queue);
void rpc_destroy_wait_queue(struct rpc_wait_queue *queue) void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
{ {
del_timer_sync(&queue->timer_list.timer); cancel_delayed_work_sync(&queue->timer_list.dwork);
} }
EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue); EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);
...@@ -759,13 +767,15 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status) ...@@ -759,13 +767,15 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
} }
EXPORT_SYMBOL_GPL(rpc_wake_up_status); EXPORT_SYMBOL_GPL(rpc_wake_up_status);
static void __rpc_queue_timer_fn(struct timer_list *t) static void __rpc_queue_timer_fn(struct work_struct *work)
{ {
struct rpc_wait_queue *queue = from_timer(queue, t, timer_list.timer); struct rpc_wait_queue *queue = container_of(work,
struct rpc_wait_queue,
timer_list.dwork.work);
struct rpc_task *task, *n; struct rpc_task *task, *n;
unsigned long expires, now, timeo; unsigned long expires, now, timeo;
spin_lock(&queue->lock); spin_lock_bh(&queue->lock);
expires = now = jiffies; expires = now = jiffies;
list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) { list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
timeo = task->tk_timeout; timeo = task->tk_timeout;
...@@ -780,7 +790,7 @@ static void __rpc_queue_timer_fn(struct timer_list *t) ...@@ -780,7 +790,7 @@ static void __rpc_queue_timer_fn(struct timer_list *t)
} }
if (!list_empty(&queue->timer_list.list)) if (!list_empty(&queue->timer_list.list))
rpc_set_queue_timer(queue, expires); rpc_set_queue_timer(queue, expires);
spin_unlock(&queue->lock); spin_unlock_bh(&queue->lock);
} }
static void __rpc_atrun(struct rpc_task *task) static void __rpc_atrun(struct rpc_task *task)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment