Commit d025fbf1 authored by Linus Torvalds

Merge tag 'nfs-for-4.15-3' of git://git.linux-nfs.org/projects/anna/linux-nfs

Pull NFS client fixes from Anna Schumaker:
 "This has two stable bugfixes, one to fix a BUG_ON() when
  nfs_commit_inode() is called with no outstanding commit requests and
  another to fix a race in the SUNRPC receive codepath.

  Additionally, there are also fixes for an NFS client deadlock and an
  xprtrdma performance regression.

  Summary:

  Stable bugfixes:
   - NFS: Avoid a BUG_ON() in nfs_commit_inode() by not waiting for a
     commit in the case that there were no commit requests.
   - SUNRPC: Fix a race in the receive code path

  Other fixes:
   - NFS: Fix a deadlock in nfs client initialization
   - xprtrdma: Fix a performance regression for small IOs"

* tag 'nfs-for-4.15-3' of git://git.linux-nfs.org/projects/anna/linux-nfs:
  SUNRPC: Fix a race in the receive code path
  nfs: don't wait on commit in nfs_commit_inode() if there were no commit requests
  xprtrdma: Spread reply processing over more CPUs
  nfs: fix a deadlock in nfs client initialization
parents f6f37321 90d91b0c
diff --git a/fs/nfs/client.c b/fs/nfs/client.c
--- a/fs/nfs/client.c
+++ b/fs/nfs/client.c
@@ -291,12 +291,23 @@ static struct nfs_client *nfs_match_client(const struct nfs_client_initdata *data)
 	const struct sockaddr *sap = data->addr;
 	struct nfs_net *nn = net_generic(data->net, nfs_net_id);
 
+again:
 	list_for_each_entry(clp, &nn->nfs_client_list, cl_share_link) {
 		const struct sockaddr *clap = (struct sockaddr *)&clp->cl_addr;
 		/* Don't match clients that failed to initialise properly */
 		if (clp->cl_cons_state < 0)
 			continue;
 
+		/* If a client is still initializing then we need to wait */
+		if (clp->cl_cons_state > NFS_CS_READY) {
+			refcount_inc(&clp->cl_count);
+			spin_unlock(&nn->nfs_client_lock);
+			nfs_wait_client_init_complete(clp);
+			nfs_put_client(clp);
+			spin_lock(&nn->nfs_client_lock);
+			goto again;
+		}
+
 		/* Different NFS versions cannot share the same nfs_client */
 		if (clp->rpc_ops != data->nfs_mod->rpc_ops)
 			continue;
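Note: cl_cons_state is the nfs_client initialization state. In the 4.15-era include/linux/nfs_fs_sb.h, NFS_CS_READY is 0, NFS_CS_INITING is 1 and NFS_CS_SESSION_INITING is 2, with negative values recording an initialization error, so cl_cons_state > NFS_CS_READY means "still being set up". The new block takes a reference before dropping nn->nfs_client_lock so the client cannot be freed while the caller sleeps, and it restarts the walk at again: because the list may have changed in the meantime. A minimal sketch of the wait helper being called here (the real helper lives in fs/nfs/client.c; this body is an approximation):

    /* Sleep until clp either becomes ready (cl_cons_state == NFS_CS_READY)
     * or records a failure (cl_cons_state < 0). Killable, so a fatal
     * signal can abort a mount stuck behind a wedged initialization. */
    int nfs_wait_client_init_complete(const struct nfs_client *clp)
    {
            return wait_event_killable(nfs_client_active_wq,
                                       clp->cl_cons_state <= NFS_CS_READY);
    }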
diff --git a/fs/nfs/nfs4client.c b/fs/nfs/nfs4client.c
--- a/fs/nfs/nfs4client.c
+++ b/fs/nfs/nfs4client.c
@@ -404,15 +404,19 @@ struct nfs_client *nfs4_init_client(struct nfs_client *clp,
 	if (error < 0)
 		goto error;
 
-	if (!nfs4_has_session(clp))
-		nfs_mark_client_ready(clp, NFS_CS_READY);
-
 	error = nfs4_discover_server_trunking(clp, &old);
 	if (error < 0)
 		goto error;
 
-	if (clp != old)
+	if (clp != old) {
 		clp->cl_preserve_clid = true;
+		/*
+		 * Mark the client as having failed initialization so other
+		 * processes walking the nfs_client_list in nfs_match_client()
+		 * won't try to use it.
+		 */
+		nfs_mark_client_ready(clp, -EPERM);
+	}
 	nfs_put_client(clp);
 	clear_bit(NFS_CS_TSM_POSSIBLE, &clp->cl_flags);
 	return old;
@@ -539,6 +543,9 @@ int nfs40_walk_client_list(struct nfs_client *new,
 	spin_lock(&nn->nfs_client_lock);
 	list_for_each_entry(pos, &nn->nfs_client_list, cl_share_link) {
 
+		if (pos == new)
+			goto found;
+
 		status = nfs4_match_client(pos, new, &prev, nn);
 		if (status < 0)
 			goto out_unlock;
@@ -559,6 +566,7 @@ int nfs40_walk_client_list(struct nfs_client *new,
 		 * way that a SETCLIENTID_CONFIRM to pos can succeed is
 		 * if new and pos point to the same server:
 		 */
+found:
 		refcount_inc(&pos->cl_count);
 		spin_unlock(&nn->nfs_client_lock);
@@ -572,6 +580,7 @@ int nfs40_walk_client_list(struct nfs_client *new,
 		case 0:
 			nfs4_swap_callback_idents(pos, new);
 			pos->cl_confirm = new->cl_confirm;
+			nfs_mark_client_ready(pos, NFS_CS_READY);
 
 			prev = NULL;
 			*result = pos;
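Note: the counterpart of the wait in nfs_match_client() is nfs_mark_client_ready(), which publishes a final state and wakes all waiters. That is what ties these hunks together: a client that loses trunking discovery is marked -EPERM, so woken waiters see cl_cons_state < 0 and skip it, and a surviving NFSv4.0 client is marked NFS_CS_READY only after SETCLIENTID_CONFIRM succeeds, instead of before trunking discovery as the deleted nfs4_init_client() lines did. A hedged sketch of the helper, approximating the 4.15-era fs/nfs/client.c:

    /* Publish the final state, then wake every task sleeping in
     * nfs_wait_client_init_complete(). The write barrier orders the
     * state store ahead of the wakeup. */
    void nfs_mark_client_ready(struct nfs_client *clp, int state)
    {
            smp_wmb();
            clp->cl_cons_state = state;
            wake_up_all(&nfs_client_active_wq);
    }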
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -1890,6 +1890,8 @@ int nfs_commit_inode(struct inode *inode, int how)
 	if (res)
 		error = nfs_generic_commit_list(inode, &head, how, &cinfo);
 	nfs_commit_end(cinfo.mds);
+	if (res == 0)
+		return res;
 	if (error < 0)
 		goto out_error;
 	if (!may_wait)
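Note: res counts the commit requests scanned off the inode, so res == 0 means nothing is in flight and there is nothing to wait for; the old code nevertheless fell through to the wait/mark-dirty path, which could redirty an inode with no outstanding commit requests, which is what ultimately tripped the BUG_ON() mentioned in the pull message. Condensed control flow after the fix (abridged sketch; nfs_scan_commit() is the call earlier in nfs_commit_inode() that produces res):

    res = nfs_scan_commit(inode, &head, &cinfo);    /* how many requests? */
    if (res)
            error = nfs_generic_commit_list(inode, &head, how, &cinfo);
    nfs_commit_end(cinfo.mds);
    if (res == 0)           /* nothing queued: skip the wait/redirty path */
            return res;
    if (error < 0)
            goto out_error;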
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1001,6 +1001,7 @@ void xprt_transmit(struct rpc_task *task)
 {
 	struct rpc_rqst *req = task->tk_rqstp;
 	struct rpc_xprt *xprt = req->rq_xprt;
+	unsigned int connect_cookie;
 	int status, numreqs;
 
 	dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
@@ -1024,6 +1025,7 @@ void xprt_transmit(struct rpc_task *task)
 	} else if (!req->rq_bytes_sent)
 		return;
 
+	connect_cookie = xprt->connect_cookie;
 	req->rq_xtime = ktime_get();
 	status = xprt->ops->send_request(task);
 	trace_xprt_transmit(xprt, req->rq_xid, status);
@@ -1047,20 +1049,28 @@ void xprt_transmit(struct rpc_task *task)
 	xprt->stat.bklog_u += xprt->backlog.qlen;
 	xprt->stat.sending_u += xprt->sending.qlen;
 	xprt->stat.pending_u += xprt->pending.qlen;
+	spin_unlock_bh(&xprt->transport_lock);
 
-	/* Don't race with disconnect */
-	if (!xprt_connected(xprt))
-		task->tk_status = -ENOTCONN;
-	else {
+	req->rq_connect_cookie = connect_cookie;
+	if (rpc_reply_expected(task) && !READ_ONCE(req->rq_reply_bytes_recvd)) {
 		/*
-		 * Sleep on the pending queue since
-		 * we're expecting a reply.
+		 * Sleep on the pending queue if we're expecting a reply.
+		 * The spinlock ensures atomicity between the test of
+		 * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
 		 */
-		if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task))
+		spin_lock(&xprt->recv_lock);
+		if (!req->rq_reply_bytes_recvd) {
 			rpc_sleep_on(&xprt->pending, task, xprt_timer);
-		req->rq_connect_cookie = xprt->connect_cookie;
+			/*
+			 * Send an extra queue wakeup call if the
+			 * connection was dropped in case the call to
+			 * rpc_sleep_on() raced.
+			 */
+			if (!xprt_connected(xprt))
+				xprt_wake_pending_tasks(xprt, -ENOTCONN);
+		}
+		spin_unlock(&xprt->recv_lock);
 	}
-	spin_unlock_bh(&xprt->transport_lock);
 }
 
 static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
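Note: the race being closed is that the reply can arrive on another CPU the moment send_request() puts the call on the wire. The receive path records the reply and wakes the task under xprt->recv_lock, so if the transmit path tests rq_reply_bytes_recvd and calls rpc_sleep_on() outside that lock, the wakeup can land in the window between the two and the task sleeps until it times out. Sampling connect_cookie before the send, rather than after, likewise tags the request with the connection it was actually transmitted on. A hedged sketch of the receive side this now pairs with, modeled on the 4.15-era xprt_complete_rqst() (heavily abridged):

    /* Caller holds xprt->recv_lock, making the store to
     * rq_reply_bytes_recvd and the wakeup atomic with respect to the
     * locked test-and-sleep in xprt_transmit() above. */
    void xprt_complete_rqst(struct rpc_task *task, int copied)
    {
            struct rpc_rqst *req = task->tk_rqstp;

            smp_wmb();              /* order reply data before the flag */
            req->rq_reply_bytes_recvd = copied;
            rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
    }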
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -1408,11 +1408,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
 		__func__, rep, req, be32_to_cpu(rep->rr_xid));
 
-	if (list_empty(&req->rl_registered) &&
-	    !test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags))
-		rpcrdma_complete_rqst(rep);
-	else
-		queue_work(rpcrdma_receive_wq, &rep->rr_work);
+	queue_work_on(req->rl_cpu, rpcrdma_receive_wq, &rep->rr_work);
 	return;
 
 out_badstatus:
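Note: queue_work_on() is the standard workqueue API that takes an explicit CPU; on a bound workqueue the item runs in that CPU's worker pool. rl_cpu records where the RPC was issued (see the transport.c hunk below), so reply processing is spread back across the submitting CPUs instead of funneling through the single CPU servicing the RDMA completion queue. A self-contained sketch of the pattern with generic, made-up names (demo_*; nothing here is xprtrdma API beyond queue_work_on itself):

    #include <linux/printk.h>
    #include <linux/smp.h>
    #include <linux/workqueue.h>

    struct demo_req {
            int                     cpu;    /* CPU that issued the request */
            struct work_struct      work;   /* deferred reply processing   */
    };

    static void demo_reply_work(struct work_struct *work)
    {
            struct demo_req *req = container_of(work, struct demo_req, work);

            /* Reply processing runs near the CPU that issued the request. */
            pr_debug("processing reply for request issued on CPU %d\n",
                     req->cpu);
    }

    /* Submit side: remember where the caller is running. (xprtrdma uses
     * smp_processor_id(); raw_smp_processor_id() is used here because this
     * generic sketch makes no claim about the caller's preemption state.) */
    static void demo_submit(struct demo_req *req)
    {
            req->cpu = raw_smp_processor_id();
            INIT_WORK(&req->work, demo_reply_work);
    }

    /* Completion side: fan the work back out to the recorded CPU. */
    static void demo_complete(struct workqueue_struct *wq, struct demo_req *req)
    {
            queue_work_on(req->cpu, wq, &req->work);
    }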
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -52,6 +52,7 @@
 #include <linux/slab.h>
 #include <linux/seq_file.h>
 #include <linux/sunrpc/addr.h>
+#include <linux/smp.h>
 
 #include "xprt_rdma.h"
@@ -656,6 +657,7 @@ xprt_rdma_allocate(struct rpc_task *task)
 		task->tk_pid, __func__, rqst->rq_callsize,
 		rqst->rq_rcvsize, req);
 
+	req->rl_cpu = smp_processor_id();
 	req->rl_connect_cookie = 0;	/* our reserved value */
 	rpcrdma_set_xprtdata(rqst, req);
 	rqst->rq_buffer = req->rl_sendbuf->rg_base;
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -83,7 +83,7 @@ rpcrdma_alloc_wq(void)
 	struct workqueue_struct *recv_wq;
 
 	recv_wq = alloc_workqueue("xprtrdma_receive",
-				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
+				  WQ_MEM_RECLAIM | WQ_HIGHPRI,
 				  0);
 	if (!recv_wq)
 		return -ENOMEM;
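Note: dropping WQ_UNBOUND is what gives the queue_work_on() call above its teeth: a bound workqueue has per-CPU worker pools, so the CPU argument determines where the work executes, whereas an unbound queue only picks a worker somewhere on the corresponding NUMA node. WQ_MEM_RECLAIM keeps a rescuer thread so receive processing can make progress under memory pressure, and WQ_HIGHPRI uses the high-priority pool. In short (flags as in the hunk above):

    /* Bound (no WQ_UNBOUND): queue_work_on(cpu, wq, work) now executes
     * the work on that CPU's worker pool. */
    recv_wq = alloc_workqueue("xprtrdma_receive",
                              WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);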
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -342,6 +342,7 @@ enum {
 struct rpcrdma_buffer;
 
 struct rpcrdma_req {
 	struct list_head	rl_list;
+	int			rl_cpu;
 	unsigned int		rl_connect_cookie;
 	struct rpcrdma_buffer	*rl_buffer;
 	struct rpcrdma_rep	*rl_reply;