Commit 7f3a1d1e authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Refactor xprt_transmit() to remove wait for reply code

Allow the caller in clnt.c to call into the code to wait for a reply
after calling xprt_transmit(). Again, the reason is that the backchannel
code does not need this functionality.
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@hammerspace.com>
parent edc81dcd
...@@ -335,6 +335,7 @@ void xprt_free_slot(struct rpc_xprt *xprt, ...@@ -335,6 +335,7 @@ void xprt_free_slot(struct rpc_xprt *xprt,
void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task); void xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task);
bool xprt_prepare_transmit(struct rpc_task *task); bool xprt_prepare_transmit(struct rpc_task *task);
void xprt_request_enqueue_receive(struct rpc_task *task); void xprt_request_enqueue_receive(struct rpc_task *task);
void xprt_request_wait_receive(struct rpc_task *task);
void xprt_transmit(struct rpc_task *task); void xprt_transmit(struct rpc_task *task);
void xprt_end_transmit(struct rpc_task *task); void xprt_end_transmit(struct rpc_task *task);
int xprt_adjust_timeout(struct rpc_rqst *req); int xprt_adjust_timeout(struct rpc_rqst *req);
......
...@@ -1975,15 +1975,6 @@ call_transmit(struct rpc_task *task) ...@@ -1975,15 +1975,6 @@ call_transmit(struct rpc_task *task)
return; return;
if (is_retrans) if (is_retrans)
task->tk_client->cl_stats->rpcretrans++; task->tk_client->cl_stats->rpcretrans++;
/*
* On success, ensure that we call xprt_end_transmit() before sleeping
* in order to allow access to the socket to other RPC requests.
*/
call_transmit_status(task);
if (rpc_reply_expected(task))
return;
task->tk_action = rpc_exit_task;
rpc_wake_up_queued_task(&task->tk_rqstp->rq_xprt->pending, task);
} }
/* /*
...@@ -2000,6 +1991,7 @@ call_transmit_status(struct rpc_task *task) ...@@ -2000,6 +1991,7 @@ call_transmit_status(struct rpc_task *task)
*/ */
if (task->tk_status == 0) { if (task->tk_status == 0) {
xprt_end_transmit(task); xprt_end_transmit(task);
xprt_request_wait_receive(task);
return; return;
} }
......
...@@ -654,6 +654,22 @@ void xprt_force_disconnect(struct rpc_xprt *xprt) ...@@ -654,6 +654,22 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
} }
EXPORT_SYMBOL_GPL(xprt_force_disconnect); EXPORT_SYMBOL_GPL(xprt_force_disconnect);
static unsigned int
xprt_connect_cookie(struct rpc_xprt *xprt)
{
return READ_ONCE(xprt->connect_cookie);
}
static bool
xprt_request_retransmit_after_disconnect(struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
return req->rq_connect_cookie != xprt_connect_cookie(xprt) ||
!xprt_connected(xprt);
}
/** /**
* xprt_conditional_disconnect - force a transport to disconnect * xprt_conditional_disconnect - force a transport to disconnect
* @xprt: transport to disconnect * @xprt: transport to disconnect
...@@ -1009,6 +1025,39 @@ static void xprt_timer(struct rpc_task *task) ...@@ -1009,6 +1025,39 @@ static void xprt_timer(struct rpc_task *task)
task->tk_status = 0; task->tk_status = 0;
} }
/**
* xprt_request_wait_receive - wait for the reply to an RPC request
* @task: RPC task about to send a request
*
*/
void xprt_request_wait_receive(struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = req->rq_xprt;
if (!test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate))
return;
/*
* Sleep on the pending queue if we're expecting a reply.
* The spinlock ensures atomicity between the test of
* req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
*/
spin_lock(&xprt->queue_lock);
if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
xprt->ops->set_retrans_timeout(task);
rpc_sleep_on(&xprt->pending, task, xprt_timer);
/*
* Send an extra queue wakeup call if the
* connection was dropped in case the call to
* rpc_sleep_on() raced.
*/
if (xprt_request_retransmit_after_disconnect(task))
rpc_wake_up_queued_task_set_status(&xprt->pending,
task, -ENOTCONN);
}
spin_unlock(&xprt->queue_lock);
}
/** /**
* xprt_prepare_transmit - reserve the transport before sending a request * xprt_prepare_transmit - reserve the transport before sending a request
* @task: RPC task about to send a request * @task: RPC task about to send a request
...@@ -1028,9 +1077,8 @@ bool xprt_prepare_transmit(struct rpc_task *task) ...@@ -1028,9 +1077,8 @@ bool xprt_prepare_transmit(struct rpc_task *task)
task->tk_status = req->rq_reply_bytes_recvd; task->tk_status = req->rq_reply_bytes_recvd;
goto out_unlock; goto out_unlock;
} }
if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
&& xprt_connected(xprt) !xprt_request_retransmit_after_disconnect(task)) {
&& req->rq_connect_cookie == xprt->connect_cookie) {
xprt->ops->set_retrans_timeout(task); xprt->ops->set_retrans_timeout(task);
rpc_sleep_on(&xprt->pending, task, xprt_timer); rpc_sleep_on(&xprt->pending, task, xprt_timer);
goto out_unlock; goto out_unlock;
...@@ -1091,8 +1139,6 @@ void xprt_transmit(struct rpc_task *task) ...@@ -1091,8 +1139,6 @@ void xprt_transmit(struct rpc_task *task)
task->tk_flags |= RPC_TASK_SENT; task->tk_flags |= RPC_TASK_SENT;
spin_lock_bh(&xprt->transport_lock); spin_lock_bh(&xprt->transport_lock);
xprt->ops->set_retrans_timeout(task);
xprt->stat.sends++; xprt->stat.sends++;
xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
xprt->stat.bklog_u += xprt->backlog.qlen; xprt->stat.bklog_u += xprt->backlog.qlen;
...@@ -1101,22 +1147,6 @@ void xprt_transmit(struct rpc_task *task) ...@@ -1101,22 +1147,6 @@ void xprt_transmit(struct rpc_task *task)
spin_unlock_bh(&xprt->transport_lock); spin_unlock_bh(&xprt->transport_lock);
req->rq_connect_cookie = connect_cookie; req->rq_connect_cookie = connect_cookie;
if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
/*
* Sleep on the pending queue if we're expecting a reply.
* The spinlock ensures atomicity between the test of
* req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
*/
spin_lock(&xprt->queue_lock);
if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
rpc_sleep_on(&xprt->pending, task, xprt_timer);
/* Wake up immediately if the connection was dropped */
if (!xprt_connected(xprt))
rpc_wake_up_queued_task_set_status(&xprt->pending,
task, -ENOTCONN);
}
spin_unlock(&xprt->queue_lock);
}
} }
static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task) static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
...@@ -1321,7 +1351,7 @@ xprt_request_init(struct rpc_task *task) ...@@ -1321,7 +1351,7 @@ xprt_request_init(struct rpc_task *task)
req->rq_xprt = xprt; req->rq_xprt = xprt;
req->rq_buffer = NULL; req->rq_buffer = NULL;
req->rq_xid = xprt_alloc_xid(xprt); req->rq_xid = xprt_alloc_xid(xprt);
req->rq_connect_cookie = xprt->connect_cookie - 1; req->rq_connect_cookie = xprt_connect_cookie(xprt) - 1;
req->rq_bytes_sent = 0; req->rq_bytes_sent = 0;
req->rq_snd_buf.len = 0; req->rq_snd_buf.len = 0;
req->rq_snd_buf.buflen = 0; req->rq_snd_buf.buflen = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment