Commit f1dc237c authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Reduce latency when send queue is congested

Use the low latency transport workqueue to process the task that is
next in line on the xprt->sending queue.
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@primarydata.com>
parent 40a5f1b1
...@@ -230,6 +230,10 @@ void rpc_wake_up_queued_task(struct rpc_wait_queue *, ...@@ -230,6 +230,10 @@ void rpc_wake_up_queued_task(struct rpc_wait_queue *,
struct rpc_task *); struct rpc_task *);
void rpc_wake_up(struct rpc_wait_queue *); void rpc_wake_up(struct rpc_wait_queue *);
struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *); struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *);
struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
struct rpc_wait_queue *,
bool (*)(struct rpc_task *, void *),
void *);
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *, struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *,
bool (*)(struct rpc_task *, void *), bool (*)(struct rpc_task *, void *),
void *); void *);
......
...@@ -330,7 +330,8 @@ EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task); ...@@ -330,7 +330,8 @@ EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task);
* lockless RPC_IS_QUEUED() test) before we've had a chance to test * lockless RPC_IS_QUEUED() test) before we've had a chance to test
* the RPC_TASK_RUNNING flag. * the RPC_TASK_RUNNING flag.
*/ */
static void rpc_make_runnable(struct rpc_task *task) static void rpc_make_runnable(struct workqueue_struct *wq,
struct rpc_task *task)
{ {
bool need_wakeup = !rpc_test_and_set_running(task); bool need_wakeup = !rpc_test_and_set_running(task);
...@@ -339,7 +340,7 @@ static void rpc_make_runnable(struct rpc_task *task) ...@@ -339,7 +340,7 @@ static void rpc_make_runnable(struct rpc_task *task)
return; return;
if (RPC_IS_ASYNC(task)) { if (RPC_IS_ASYNC(task)) {
INIT_WORK(&task->u.tk_work, rpc_async_schedule); INIT_WORK(&task->u.tk_work, rpc_async_schedule);
queue_work(rpciod_workqueue, &task->u.tk_work); queue_work(wq, &task->u.tk_work);
} else } else
wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED); wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED);
} }
...@@ -408,13 +409,16 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task, ...@@ -408,13 +409,16 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
EXPORT_SYMBOL_GPL(rpc_sleep_on_priority); EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);
/** /**
* __rpc_do_wake_up_task - wake up a single rpc_task * __rpc_do_wake_up_task_on_wq - wake up a single rpc_task
* @wq: workqueue on which to run task
* @queue: wait queue * @queue: wait queue
* @task: task to be woken up * @task: task to be woken up
* *
* Caller must hold queue->lock, and have cleared the task queued flag. * Caller must hold queue->lock, and have cleared the task queued flag.
*/ */
static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task) static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq,
struct rpc_wait_queue *queue,
struct rpc_task *task)
{ {
dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n", dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n",
task->tk_pid, jiffies); task->tk_pid, jiffies);
...@@ -429,7 +433,7 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task ...@@ -429,7 +433,7 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task
__rpc_remove_wait_queue(queue, task); __rpc_remove_wait_queue(queue, task);
rpc_make_runnable(task); rpc_make_runnable(wq, task);
dprintk("RPC: __rpc_wake_up_task done\n"); dprintk("RPC: __rpc_wake_up_task done\n");
} }
...@@ -437,15 +441,24 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task ...@@ -437,15 +441,24 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task
/* /*
* Wake up a queued task while the queue lock is being held * Wake up a queued task while the queue lock is being held
*/ */
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task) static void rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq,
struct rpc_wait_queue *queue, struct rpc_task *task)
{ {
if (RPC_IS_QUEUED(task)) { if (RPC_IS_QUEUED(task)) {
smp_rmb(); smp_rmb();
if (task->tk_waitqueue == queue) if (task->tk_waitqueue == queue)
__rpc_do_wake_up_task(queue, task); __rpc_do_wake_up_task_on_wq(wq, queue, task);
} }
} }
/*
* Wake up a queued task while the queue lock is being held
*/
static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task)
{
rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task);
}
/* /*
* Wake up a task on a specific queue * Wake up a task on a specific queue
*/ */
...@@ -519,7 +532,8 @@ static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue) ...@@ -519,7 +532,8 @@ static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
/* /*
* Wake up the first task on the wait queue. * Wake up the first task on the wait queue.
*/ */
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue, struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
struct rpc_wait_queue *queue,
bool (*func)(struct rpc_task *, void *), void *data) bool (*func)(struct rpc_task *, void *), void *data)
{ {
struct rpc_task *task = NULL; struct rpc_task *task = NULL;
...@@ -530,7 +544,7 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue, ...@@ -530,7 +544,7 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
task = __rpc_find_next_queued(queue); task = __rpc_find_next_queued(queue);
if (task != NULL) { if (task != NULL) {
if (func(task, data)) if (func(task, data))
rpc_wake_up_task_queue_locked(queue, task); rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
else else
task = NULL; task = NULL;
} }
...@@ -538,6 +552,15 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue, ...@@ -538,6 +552,15 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
return task; return task;
} }
/*
* Wake up the first task on the wait queue.
*/
struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
bool (*func)(struct rpc_task *, void *), void *data)
{
return rpc_wake_up_first_on_wq(rpciod_workqueue, queue, func, data);
}
EXPORT_SYMBOL_GPL(rpc_wake_up_first); EXPORT_SYMBOL_GPL(rpc_wake_up_first);
static bool rpc_wake_up_next_func(struct rpc_task *task, void *data) static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
...@@ -815,7 +838,7 @@ void rpc_execute(struct rpc_task *task) ...@@ -815,7 +838,7 @@ void rpc_execute(struct rpc_task *task)
bool is_async = RPC_IS_ASYNC(task); bool is_async = RPC_IS_ASYNC(task);
rpc_set_active(task); rpc_set_active(task);
rpc_make_runnable(task); rpc_make_runnable(rpciod_workqueue, task);
if (!is_async) if (!is_async)
__rpc_execute(task); __rpc_execute(task);
} }
......
...@@ -295,7 +295,8 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt) ...@@ -295,7 +295,8 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt)
if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
return; return;
if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt)) if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
__xprt_lock_write_func, xprt))
return; return;
xprt_clear_locked(xprt); xprt_clear_locked(xprt);
} }
...@@ -324,7 +325,8 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) ...@@ -324,7 +325,8 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
return; return;
if (RPCXPRT_CONGESTED(xprt)) if (RPCXPRT_CONGESTED(xprt))
goto out_unlock; goto out_unlock;
if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt)) if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending,
__xprt_lock_write_cong_func, xprt))
return; return;
out_unlock: out_unlock:
xprt_clear_locked(xprt); xprt_clear_locked(xprt);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment