Commit 55ae1aab authored by Ricardo Labiaga's avatar Ricardo Labiaga Committed by Benny Halevy

nfs41: Add backchannel processing support to RPC state machine

Adds rpc_run_bc_task() which is called by the NFS callback service to
process backchannel requests.  It performs similar work to rpc_run_task()
though "schedules" the backchannel task to be executed starting at the
call_trasmit state in the RPC state machine.

It also introduces some miscellaneous updates to the argument validation,
call_transmit, and transport cleanup functions to take into account
that there are now forechannel and backchannel tasks.

Backchannel requests do not carry an RPC message structure, since the
payload has already been XDR encoded using the existing NFSv4 callback
mechanism.

Introduce a new transmit state for the client to reply on to backchannel
requests.  This new state simply reserves the transport and issues the
reply.  In case of a connection related error, disconnects the transport and
drops the reply.  It requires the forechannel to re-establish the connection
and the server to retransmit the request, as stated in NFSv4.1 section
2.9.2 "Client and Server Transport Behavior".

Note: There is no need to loop attempting to reserve the transport.  If EAGAIN
is returned by xprt_prepare_transmit(), return with tk_status == 0,
setting tk_action to call_bc_transmit.  rpc_execute() will invoke it again
after the task is taken off the sleep queue.

[nfs41: rpc_run_bc_task() need not be exported outside RPC module]
[nfs41: New call_bc_transmit RPC state]
Signed-off-by: default avatarRicardo Labiaga <Ricardo.Labiaga@netapp.com>
Signed-off-by: default avatarBenny Halevy <bhalevy@panasas.com>
[nfs41: Backchannel: No need to loop in call_bc_transmit()]
Signed-off-by: default avatarAndy Adamson <andros@netapp.com>
Signed-off-by: default avatarRicardo Labiaga <Ricardo.Labiaga@netapp.com>
Signed-off-by: default avatarBenny Halevy <bhalevy@panasas.com>
[rpc_count_iostats incorrectly exits early]
Signed-off-by: default avatarRicardo Labiaga <Ricardo.Labiaga@netapp.com>
Signed-off-by: default avatarBenny Halevy <bhalevy@panasas.com>
[Convert rpc_reply_expected() to inline function]
[Remove unnecessary BUG_ON()]
[Rename variable]
Signed-off-by: default avatarRicardo Labiaga <Ricardo.Labiaga@netapp.com>
Signed-off-by: default avatarBenny Halevy <bhalevy@panasas.com>
parent 44b98efd
...@@ -210,6 +210,8 @@ struct rpc_wait_queue { ...@@ -210,6 +210,8 @@ struct rpc_wait_queue {
*/ */
struct rpc_task *rpc_new_task(const struct rpc_task_setup *); struct rpc_task *rpc_new_task(const struct rpc_task_setup *);
struct rpc_task *rpc_run_task(const struct rpc_task_setup *); struct rpc_task *rpc_run_task(const struct rpc_task_setup *);
struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
const struct rpc_call_ops *ops);
void rpc_put_task(struct rpc_task *); void rpc_put_task(struct rpc_task *);
void rpc_exit_task(struct rpc_task *); void rpc_exit_task(struct rpc_task *);
void rpc_release_calldata(const struct rpc_call_ops *, void *); void rpc_release_calldata(const struct rpc_call_ops *, void *);
......
...@@ -215,6 +215,18 @@ struct rpc_xprt { ...@@ -215,6 +215,18 @@ struct rpc_xprt {
/* buffer in use */ /* buffer in use */
#endif /* CONFIG_NFS_V4_1 */ #endif /* CONFIG_NFS_V4_1 */
#if defined(CONFIG_NFS_V4_1)
static inline int bc_prealloc(struct rpc_rqst *req)
{
return test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
}
#else
static inline int bc_prealloc(struct rpc_rqst *req)
{
return 0;
}
#endif /* CONFIG_NFS_V4_1 */
struct xprt_create { struct xprt_create {
int ident; /* XPRT_TRANSPORT identifier */ int ident; /* XPRT_TRANSPORT identifier */
struct sockaddr * srcaddr; /* optional local address */ struct sockaddr * srcaddr; /* optional local address */
......
...@@ -36,7 +36,9 @@ ...@@ -36,7 +36,9 @@
#include <linux/sunrpc/clnt.h> #include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/rpc_pipe_fs.h> #include <linux/sunrpc/rpc_pipe_fs.h>
#include <linux/sunrpc/metrics.h> #include <linux/sunrpc/metrics.h>
#include <linux/sunrpc/bc_xprt.h>
#include "sunrpc.h"
#ifdef RPC_DEBUG #ifdef RPC_DEBUG
# define RPCDBG_FACILITY RPCDBG_CALL # define RPCDBG_FACILITY RPCDBG_CALL
...@@ -63,6 +65,9 @@ static void call_decode(struct rpc_task *task); ...@@ -63,6 +65,9 @@ static void call_decode(struct rpc_task *task);
static void call_bind(struct rpc_task *task); static void call_bind(struct rpc_task *task);
static void call_bind_status(struct rpc_task *task); static void call_bind_status(struct rpc_task *task);
static void call_transmit(struct rpc_task *task); static void call_transmit(struct rpc_task *task);
#if defined(CONFIG_NFS_V4_1)
static void call_bc_transmit(struct rpc_task *task);
#endif /* CONFIG_NFS_V4_1 */
static void call_status(struct rpc_task *task); static void call_status(struct rpc_task *task);
static void call_transmit_status(struct rpc_task *task); static void call_transmit_status(struct rpc_task *task);
static void call_refresh(struct rpc_task *task); static void call_refresh(struct rpc_task *task);
...@@ -613,6 +618,50 @@ rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags, ...@@ -613,6 +618,50 @@ rpc_call_async(struct rpc_clnt *clnt, const struct rpc_message *msg, int flags,
} }
EXPORT_SYMBOL_GPL(rpc_call_async); EXPORT_SYMBOL_GPL(rpc_call_async);
#if defined(CONFIG_NFS_V4_1)
/**
* rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
* rpc_execute against it
* @ops: RPC call ops
*/
struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
const struct rpc_call_ops *tk_ops)
{
struct rpc_task *task;
struct xdr_buf *xbufp = &req->rq_snd_buf;
struct rpc_task_setup task_setup_data = {
.callback_ops = tk_ops,
};
dprintk("RPC: rpc_run_bc_task req= %p\n", req);
/*
* Create an rpc_task to send the data
*/
task = rpc_new_task(&task_setup_data);
if (!task) {
xprt_free_bc_request(req);
goto out;
}
task->tk_rqstp = req;
/*
* Set up the xdr_buf length.
* This also indicates that the buffer is XDR encoded already.
*/
xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
xbufp->tail[0].iov_len;
task->tk_action = call_bc_transmit;
atomic_inc(&task->tk_count);
BUG_ON(atomic_read(&task->tk_count) != 2);
rpc_execute(task);
out:
dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
return task;
}
#endif /* CONFIG_NFS_V4_1 */
void void
rpc_call_start(struct rpc_task *task) rpc_call_start(struct rpc_task *task)
{ {
...@@ -1098,7 +1147,7 @@ call_transmit(struct rpc_task *task) ...@@ -1098,7 +1147,7 @@ call_transmit(struct rpc_task *task)
* in order to allow access to the socket to other RPC requests. * in order to allow access to the socket to other RPC requests.
*/ */
call_transmit_status(task); call_transmit_status(task);
if (task->tk_msg.rpc_proc->p_decode != NULL) if (rpc_reply_expected(task))
return; return;
task->tk_action = rpc_exit_task; task->tk_action = rpc_exit_task;
rpc_wake_up_queued_task(&task->tk_xprt->pending, task); rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
...@@ -1133,6 +1182,72 @@ call_transmit_status(struct rpc_task *task) ...@@ -1133,6 +1182,72 @@ call_transmit_status(struct rpc_task *task)
} }
} }
#if defined(CONFIG_NFS_V4_1)
/*
* 5b. Send the backchannel RPC reply. On error, drop the reply. In
* addition, disconnect on connectivity errors.
*/
static void
call_bc_transmit(struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
BUG_ON(task->tk_status != 0);
task->tk_status = xprt_prepare_transmit(task);
if (task->tk_status == -EAGAIN) {
/*
* Could not reserve the transport. Try again after the
* transport is released.
*/
task->tk_status = 0;
task->tk_action = call_bc_transmit;
return;
}
task->tk_action = rpc_exit_task;
if (task->tk_status < 0) {
printk(KERN_NOTICE "RPC: Could not send backchannel reply "
"error: %d\n", task->tk_status);
return;
}
xprt_transmit(task);
xprt_end_transmit(task);
dprint_status(task);
switch (task->tk_status) {
case 0:
/* Success */
break;
case -EHOSTDOWN:
case -EHOSTUNREACH:
case -ENETUNREACH:
case -ETIMEDOUT:
/*
* Problem reaching the server. Disconnect and let the
* forechannel reestablish the connection. The server will
* have to retransmit the backchannel request and we'll
* reprocess it. Since these ops are idempotent, there's no
* need to cache our reply at this time.
*/
printk(KERN_NOTICE "RPC: Could not send backchannel reply "
"error: %d\n", task->tk_status);
xprt_conditional_disconnect(task->tk_xprt,
req->rq_connect_cookie);
break;
default:
/*
* We were unable to reply and will have to drop the
* request. The server should reconnect and retransmit.
*/
BUG_ON(task->tk_status == -EAGAIN);
printk(KERN_NOTICE "RPC: Could not send backchannel reply "
"error: %d\n", task->tk_status);
break;
}
rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
}
#endif /* CONFIG_NFS_V4_1 */
/* /*
* 6. Sort out the RPC call status * 6. Sort out the RPC call status
*/ */
......
...@@ -141,12 +141,14 @@ EXPORT_SYMBOL_GPL(rpc_free_iostats); ...@@ -141,12 +141,14 @@ EXPORT_SYMBOL_GPL(rpc_free_iostats);
void rpc_count_iostats(struct rpc_task *task) void rpc_count_iostats(struct rpc_task *task)
{ {
struct rpc_rqst *req = task->tk_rqstp; struct rpc_rqst *req = task->tk_rqstp;
struct rpc_iostats *stats = task->tk_client->cl_metrics; struct rpc_iostats *stats;
struct rpc_iostats *op_metrics; struct rpc_iostats *op_metrics;
long rtt, execute, queue; long rtt, execute, queue;
if (!stats || !req) if (!task->tk_client || !task->tk_client->cl_metrics || !req)
return; return;
stats = task->tk_client->cl_metrics;
op_metrics = &stats[task->tk_msg.rpc_proc->p_statidx]; op_metrics = &stats[task->tk_msg.rpc_proc->p_statidx];
op_metrics->om_ops++; op_metrics->om_ops++;
......
/******************************************************************************
(c) 2008 NetApp. All Rights Reserved.
NetApp provides this source code under the GPL v2 License.
The GPL v2 license is available at
http://opensource.org/licenses/gpl-license.php.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
******************************************************************************/
/*
* Functions and macros used internally by RPC
*/
#ifndef _NET_SUNRPC_SUNRPC_H
#define _NET_SUNRPC_SUNRPC_H
static inline int rpc_reply_expected(struct rpc_task *task)
{
return (task->tk_msg.rpc_proc != NULL) &&
(task->tk_msg.rpc_proc->p_decode != NULL);
}
#endif /* _NET_SUNRPC_SUNRPC_H */
...@@ -12,8 +12,9 @@ ...@@ -12,8 +12,9 @@
* - Next, the caller puts together the RPC message, stuffs it into * - Next, the caller puts together the RPC message, stuffs it into
* the request struct, and calls xprt_transmit(). * the request struct, and calls xprt_transmit().
* - xprt_transmit sends the message and installs the caller on the * - xprt_transmit sends the message and installs the caller on the
* transport's wait list. At the same time, it installs a timer that * transport's wait list. At the same time, if a reply is expected,
* is run after the packet's timeout has expired. * it installs a timer that is run after the packet's timeout has
* expired.
* - When a packet arrives, the data_ready handler walks the list of * - When a packet arrives, the data_ready handler walks the list of
* pending requests for that transport. If a matching XID is found, the * pending requests for that transport. If a matching XID is found, the
* caller is woken up, and the timer removed. * caller is woken up, and the timer removed.
...@@ -46,6 +47,8 @@ ...@@ -46,6 +47,8 @@
#include <linux/sunrpc/clnt.h> #include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h> #include <linux/sunrpc/metrics.h>
#include "sunrpc.h"
/* /*
* Local variables * Local variables
*/ */
...@@ -873,7 +876,10 @@ void xprt_transmit(struct rpc_task *task) ...@@ -873,7 +876,10 @@ void xprt_transmit(struct rpc_task *task)
dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
if (!req->rq_received) { if (!req->rq_received) {
if (list_empty(&req->rq_list)) { if (list_empty(&req->rq_list) && rpc_reply_expected(task)) {
/*
* Add to the list only if we're expecting a reply
*/
spin_lock_bh(&xprt->transport_lock); spin_lock_bh(&xprt->transport_lock);
/* Update the softirq receive buffer */ /* Update the softirq receive buffer */
memcpy(&req->rq_private_buf, &req->rq_rcv_buf, memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
...@@ -908,8 +914,13 @@ void xprt_transmit(struct rpc_task *task) ...@@ -908,8 +914,13 @@ void xprt_transmit(struct rpc_task *task)
/* Don't race with disconnect */ /* Don't race with disconnect */
if (!xprt_connected(xprt)) if (!xprt_connected(xprt))
task->tk_status = -ENOTCONN; task->tk_status = -ENOTCONN;
else if (!req->rq_received) else if (!req->rq_received && rpc_reply_expected(task)) {
/*
* Sleep on the pending queue since
* we're expecting a reply.
*/
rpc_sleep_on(&xprt->pending, task, xprt_timer); rpc_sleep_on(&xprt->pending, task, xprt_timer);
}
spin_unlock_bh(&xprt->transport_lock); spin_unlock_bh(&xprt->transport_lock);
} }
...@@ -982,11 +993,17 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) ...@@ -982,11 +993,17 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
*/ */
void xprt_release(struct rpc_task *task) void xprt_release(struct rpc_task *task)
{ {
struct rpc_xprt *xprt = task->tk_xprt; struct rpc_xprt *xprt;
struct rpc_rqst *req; struct rpc_rqst *req;
int is_bc_request;
if (!(req = task->tk_rqstp)) if (!(req = task->tk_rqstp))
return; return;
/* Preallocated backchannel request? */
is_bc_request = bc_prealloc(req);
xprt = req->rq_xprt;
rpc_count_iostats(task); rpc_count_iostats(task);
spin_lock_bh(&xprt->transport_lock); spin_lock_bh(&xprt->transport_lock);
xprt->ops->release_xprt(xprt, task); xprt->ops->release_xprt(xprt, task);
...@@ -999,10 +1016,19 @@ void xprt_release(struct rpc_task *task) ...@@ -999,10 +1016,19 @@ void xprt_release(struct rpc_task *task)
mod_timer(&xprt->timer, mod_timer(&xprt->timer,
xprt->last_used + xprt->idle_timeout); xprt->last_used + xprt->idle_timeout);
spin_unlock_bh(&xprt->transport_lock); spin_unlock_bh(&xprt->transport_lock);
if (!bc_prealloc(req))
xprt->ops->buf_free(req->rq_buffer); xprt->ops->buf_free(req->rq_buffer);
task->tk_rqstp = NULL; task->tk_rqstp = NULL;
if (req->rq_release_snd_buf) if (req->rq_release_snd_buf)
req->rq_release_snd_buf(req); req->rq_release_snd_buf(req);
/*
* Early exit if this is a backchannel preallocated request.
* There is no need to have it added to the RPC slot list.
*/
if (is_bc_request)
return;
memset(req, 0, sizeof(*req)); /* mark unused */ memset(req, 0, sizeof(*req)); /* mark unused */
dprintk("RPC: %5u release request %p\n", task->tk_pid, req); dprintk("RPC: %5u release request %p\n", task->tk_pid, req);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment