Commit 01cde153 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'nfs-for-4.6-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs

Pull NFS client updates from Trond Myklebust:
 "Highlights include:

  Features:
   - Add support for multiple NFSv4.1 callbacks in flight
   - Initial patchset for RPC multipath support
   - Adapt RPC/RDMA to use the new completion queue API

  Bugfixes and cleanups:
   - nfs4: nfs4_ff_layout_prepare_ds should return NULL if connection failed
   - Cleanups to remove nfs_inode_dio_wait and nfs4_file_fsync
   - Fix RPC/RDMA credit accounting
   - Properly handle RDMA_ERROR replies
   - xprtrdma: Do not wait if ib_post_send() fails
   - xprtrdma: Segment head and tail XDR buffers on page boundaries
   - xprtrdma cleanups for dprintk, physical_op_map and unused macros"

* tag 'nfs-for-4.6-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (35 commits)
  nfs/blocklayout: make sure making a aligned read request
  nfs4: nfs4_ff_layout_prepare_ds should return NULL if connection failed
  nfs: remove nfs_inode_dio_wait
  nfs: remove nfs4_file_fsync
  xprtrdma: Use new CQ API for RPC-over-RDMA client send CQs
  xprtrdma: Use an anonymous union in struct rpcrdma_mw
  xprtrdma: Use new CQ API for RPC-over-RDMA client receive CQs
  xprtrdma: Serialize credit accounting again
  xprtrdma: Properly handle RDMA_ERROR replies
  rpcrdma: Add RPCRDMA_HDRLEN_ERR
  xprtrdma: Do not wait if ib_post_send() fails
  xprtrdma: Segment head and tail XDR buffers on page boundaries
  xprtrdma: Clean up dprintk format string containing a newline
  xprtrdma: Clean up physical_op_map()
  xprtrdma: Clean up unused RPCRDMA_INLINE_PAD_THRESH macro
  NFS add callback_ops to nfs4_proc_bind_conn_to_session_callback
  pnfs/NFSv4.1: Add multipath capabilities to pNFS flexfiles servers over NFSv3
  SUNRPC: Allow addition of new transports to a struct rpc_clnt
  NFSv4.1: nfs4_proc_bind_conn_to_session must iterate over all connections
  SUNRPC: Make NFS swap work with multipath
  ...
parents 243d5067 f35592a9
......@@ -743,7 +743,7 @@ bl_set_layoutdriver(struct nfs_server *server, const struct nfs_fh *fh)
static bool
is_aligned_req(struct nfs_pageio_descriptor *pgio,
struct nfs_page *req, unsigned int alignment)
struct nfs_page *req, unsigned int alignment, bool is_write)
{
/*
* Always accept buffered writes, higher layers take care of the
......@@ -758,7 +758,8 @@ is_aligned_req(struct nfs_pageio_descriptor *pgio,
if (IS_ALIGNED(req->wb_bytes, alignment))
return true;
if (req_offset(req) + req->wb_bytes == i_size_read(pgio->pg_inode)) {
if (is_write &&
(req_offset(req) + req->wb_bytes == i_size_read(pgio->pg_inode))) {
/*
* If the write goes up to the inode size, just write
* the full page. Data past the inode size is
......@@ -775,7 +776,7 @@ is_aligned_req(struct nfs_pageio_descriptor *pgio,
static void
bl_pg_init_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *req)
{
if (!is_aligned_req(pgio, req, SECTOR_SIZE)) {
if (!is_aligned_req(pgio, req, SECTOR_SIZE, false)) {
nfs_pageio_reset_read_mds(pgio);
return;
}
......@@ -791,7 +792,7 @@ static size_t
bl_pg_test_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *prev,
struct nfs_page *req)
{
if (!is_aligned_req(pgio, req, SECTOR_SIZE))
if (!is_aligned_req(pgio, req, SECTOR_SIZE, false))
return 0;
return pnfs_generic_pg_test(pgio, prev, req);
}
......@@ -824,7 +825,7 @@ bl_pg_init_write(struct nfs_pageio_descriptor *pgio, struct nfs_page *req)
{
u64 wb_size;
if (!is_aligned_req(pgio, req, PAGE_SIZE)) {
if (!is_aligned_req(pgio, req, PAGE_SIZE, true)) {
nfs_pageio_reset_write_mds(pgio);
return;
}
......@@ -846,7 +847,7 @@ static size_t
bl_pg_test_write(struct nfs_pageio_descriptor *pgio, struct nfs_page *prev,
struct nfs_page *req)
{
if (!is_aligned_req(pgio, req, PAGE_SIZE))
if (!is_aligned_req(pgio, req, PAGE_SIZE, true))
return 0;
return pnfs_generic_pg_test(pgio, prev, req);
}
......
......@@ -37,10 +37,11 @@ enum nfs4_callback_opnum {
OP_CB_ILLEGAL = 10044,
};
struct nfs4_slot;
struct cb_process_state {
__be32 drc_status;
struct nfs_client *clp;
u32 slotid;
struct nfs4_slot *slot;
u32 minorversion;
struct net *net;
};
......
......@@ -354,47 +354,38 @@ __be32 nfs4_callback_devicenotify(struct cb_devicenotifyargs *args,
* a single outstanding callback request at a time.
*/
static __be32
validate_seqid(struct nfs4_slot_table *tbl, struct cb_sequenceargs * args)
validate_seqid(const struct nfs4_slot_table *tbl, const struct nfs4_slot *slot,
const struct cb_sequenceargs * args)
{
struct nfs4_slot *slot;
dprintk("%s enter. slotid %u seqid %u\n",
__func__, args->csa_slotid, args->csa_sequenceid);
dprintk("%s enter. slotid %u seqid %u, slot table seqid: %u\n",
__func__, args->csa_slotid, args->csa_sequenceid, slot->seq_nr);
if (args->csa_slotid >= NFS41_BC_MAX_CALLBACKS)
if (args->csa_slotid > tbl->server_highest_slotid)
return htonl(NFS4ERR_BADSLOT);
slot = tbl->slots + args->csa_slotid;
dprintk("%s slot table seqid: %u\n", __func__, slot->seq_nr);
/* Normal */
if (likely(args->csa_sequenceid == slot->seq_nr + 1))
goto out_ok;
/* Replay */
if (args->csa_sequenceid == slot->seq_nr) {
dprintk("%s seqid %u is a replay\n",
__func__, args->csa_sequenceid);
if (nfs4_test_locked_slot(tbl, slot->slot_nr))
return htonl(NFS4ERR_DELAY);
/* Signal process_op to set this error on next op */
if (args->csa_cachethis == 0)
return htonl(NFS4ERR_RETRY_UNCACHED_REP);
/* The ca_maxresponsesize_cached is 0 with no DRC */
else if (args->csa_cachethis == 1)
return htonl(NFS4ERR_REP_TOO_BIG_TO_CACHE);
/* Liar! We never allowed you to set csa_cachethis != 0 */
return htonl(NFS4ERR_SEQ_FALSE_RETRY);
}
/* Wraparound */
if (args->csa_sequenceid == 1 && (slot->seq_nr + 1) == 0) {
slot->seq_nr = 1;
goto out_ok;
}
if (unlikely(slot->seq_nr == 0xFFFFFFFFU)) {
if (args->csa_sequenceid == 1)
return htonl(NFS4_OK);
} else if (likely(args->csa_sequenceid == slot->seq_nr + 1))
return htonl(NFS4_OK);
/* Misordered request */
return htonl(NFS4ERR_SEQ_MISORDERED);
out_ok:
tbl->highest_used_slotid = args->csa_slotid;
return htonl(NFS4_OK);
}
/*
......@@ -473,6 +464,12 @@ __be32 nfs4_callback_sequence(struct cb_sequenceargs *args,
tbl = &clp->cl_session->bc_slot_table;
slot = tbl->slots + args->csa_slotid;
/* Set up res before grabbing the spinlock */
memcpy(&res->csr_sessionid, &args->csa_sessionid,
sizeof(res->csr_sessionid));
res->csr_sequenceid = args->csa_sequenceid;
res->csr_slotid = args->csa_slotid;
spin_lock(&tbl->slot_tbl_lock);
/* state manager is resetting the session */
if (test_bit(NFS4_SLOT_TBL_DRAINING, &tbl->slot_tbl_state)) {
......@@ -485,18 +482,26 @@ __be32 nfs4_callback_sequence(struct cb_sequenceargs *args,
goto out_unlock;
}
memcpy(&res->csr_sessionid, &args->csa_sessionid,
sizeof(res->csr_sessionid));
res->csr_sequenceid = args->csa_sequenceid;
res->csr_slotid = args->csa_slotid;
res->csr_highestslotid = NFS41_BC_MAX_CALLBACKS - 1;
res->csr_target_highestslotid = NFS41_BC_MAX_CALLBACKS - 1;
status = htonl(NFS4ERR_BADSLOT);
slot = nfs4_lookup_slot(tbl, args->csa_slotid);
if (IS_ERR(slot))
goto out_unlock;
res->csr_highestslotid = tbl->server_highest_slotid;
res->csr_target_highestslotid = tbl->target_highest_slotid;
status = validate_seqid(tbl, args);
status = validate_seqid(tbl, slot, args);
if (status)
goto out_unlock;
if (!nfs4_try_to_lock_slot(tbl, slot)) {
status = htonl(NFS4ERR_DELAY);
goto out_unlock;
}
cps->slot = slot;
cps->slotid = args->csa_slotid;
/* The ca_maxresponsesize_cached is 0 with no DRC */
if (args->csa_cachethis != 0)
return htonl(NFS4ERR_REP_TOO_BIG_TO_CACHE);
/*
* Check for pending referring calls. If a match is found, a
......@@ -513,7 +518,7 @@ __be32 nfs4_callback_sequence(struct cb_sequenceargs *args,
* If CB_SEQUENCE returns an error, then the state of the slot
* (sequence ID, cached reply) MUST NOT change.
*/
slot->seq_nr++;
slot->seq_nr = args->csa_sequenceid;
out_unlock:
spin_unlock(&tbl->slot_tbl_lock);
......
......@@ -752,7 +752,8 @@ preprocess_nfs41_op(int nop, unsigned int op_nr, struct callback_op **op)
return htonl(NFS_OK);
}
static void nfs4_callback_free_slot(struct nfs4_session *session)
static void nfs4_callback_free_slot(struct nfs4_session *session,
struct nfs4_slot *slot)
{
struct nfs4_slot_table *tbl = &session->bc_slot_table;
......@@ -761,15 +762,17 @@ static void nfs4_callback_free_slot(struct nfs4_session *session)
* Let the state manager know callback processing done.
* A single slot, so highest used slotid is either 0 or -1
*/
tbl->highest_used_slotid = NFS4_NO_SLOT;
nfs4_free_slot(tbl, slot);
nfs4_slot_tbl_drain_complete(tbl);
spin_unlock(&tbl->slot_tbl_lock);
}
static void nfs4_cb_free_slot(struct cb_process_state *cps)
{
if (cps->slotid != NFS4_NO_SLOT)
nfs4_callback_free_slot(cps->clp->cl_session);
if (cps->slot) {
nfs4_callback_free_slot(cps->clp->cl_session, cps->slot);
cps->slot = NULL;
}
}
#else /* CONFIG_NFS_V4_1 */
......@@ -893,7 +896,6 @@ static __be32 nfs4_callback_compound(struct svc_rqst *rqstp, void *argp, void *r
struct cb_process_state cps = {
.drc_status = 0,
.clp = NULL,
.slotid = NFS4_NO_SLOT,
.net = SVC_NET(rqstp),
};
unsigned int nops = 0;
......
......@@ -233,7 +233,7 @@ EXPORT_SYMBOL_GPL(nfs_file_mmap);
* nfs_file_write() that a write error occurred, and hence cause it to
* fall back to doing a synchronous write.
*/
int
static int
nfs_file_fsync_commit(struct file *file, loff_t start, loff_t end, int datasync)
{
struct nfs_open_context *ctx = nfs_file_open_context(file);
......@@ -263,9 +263,8 @@ nfs_file_fsync_commit(struct file *file, loff_t start, loff_t end, int datasync)
out:
return ret;
}
EXPORT_SYMBOL_GPL(nfs_file_fsync_commit);
static int
int
nfs_file_fsync(struct file *file, loff_t start, loff_t end, int datasync)
{
int ret;
......@@ -273,13 +272,15 @@ nfs_file_fsync(struct file *file, loff_t start, loff_t end, int datasync)
trace_nfs_fsync_enter(inode);
nfs_inode_dio_wait(inode);
inode_dio_wait(inode);
do {
ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
if (ret != 0)
break;
inode_lock(inode);
ret = nfs_file_fsync_commit(file, start, end, datasync);
if (!ret)
ret = pnfs_sync_inode(inode, !!datasync);
inode_unlock(inode);
/*
* If nfs_file_fsync_commit detected a server reboot, then
......@@ -293,6 +294,7 @@ nfs_file_fsync(struct file *file, loff_t start, loff_t end, int datasync)
trace_nfs_fsync_exit(inode, ret);
return ret;
}
EXPORT_SYMBOL_GPL(nfs_file_fsync);
/*
* Decide whether a read/modify/write cycle may be more efficient
......@@ -368,7 +370,7 @@ static int nfs_write_begin(struct file *file, struct address_space *mapping,
/*
* Wait for O_DIRECT to complete
*/
nfs_inode_dio_wait(mapping->host);
inode_dio_wait(mapping->host);
page = grab_cache_page_write_begin(mapping, index, flags);
if (!page)
......
......@@ -418,6 +418,8 @@ nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg, u32 ds_idx,
pnfs_error_mark_layout_for_return(ino, lseg);
} else
pnfs_error_mark_layout_for_return(ino, lseg);
ds = NULL;
goto out;
}
out_update_creds:
if (ff_layout_update_mirror_cred(mirror, ds))
......
......@@ -141,7 +141,7 @@ void nfs_evict_inode(struct inode *inode)
int nfs_sync_inode(struct inode *inode)
{
nfs_inode_dio_wait(inode);
inode_dio_wait(inode);
return nfs_wb_all(inode);
}
EXPORT_SYMBOL_GPL(nfs_sync_inode);
......
......@@ -358,7 +358,7 @@ int nfs_mknod(struct inode *, struct dentry *, umode_t, dev_t);
int nfs_rename(struct inode *, struct dentry *, struct inode *, struct dentry *);
/* file.c */
int nfs_file_fsync_commit(struct file *, loff_t, loff_t, int);
int nfs_file_fsync(struct file *file, loff_t start, loff_t end, int datasync);
loff_t nfs_file_llseek(struct file *, loff_t, int);
ssize_t nfs_file_read(struct kiocb *, struct iov_iter *);
ssize_t nfs_file_splice_read(struct file *, loff_t *, struct pipe_inode_info *,
......@@ -515,10 +515,6 @@ extern int nfs_sillyrename(struct inode *dir, struct dentry *dentry);
/* direct.c */
void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo,
struct nfs_direct_req *dreq);
static inline void nfs_inode_dio_wait(struct inode *inode)
{
inode_dio_wait(inode);
}
extern ssize_t nfs_dreq_bytes_left(struct nfs_direct_req *dreq);
/* nfs4proc.c */
......
......@@ -128,37 +128,6 @@ nfs4_file_flush(struct file *file, fl_owner_t id)
return vfs_fsync(file, 0);
}
static int
nfs4_file_fsync(struct file *file, loff_t start, loff_t end, int datasync)
{
int ret;
struct inode *inode = file_inode(file);
trace_nfs_fsync_enter(inode);
nfs_inode_dio_wait(inode);
do {
ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
if (ret != 0)
break;
inode_lock(inode);
ret = nfs_file_fsync_commit(file, start, end, datasync);
if (!ret)
ret = pnfs_sync_inode(inode, !!datasync);
inode_unlock(inode);
/*
* If nfs_file_fsync_commit detected a server reboot, then
* resend all dirty pages that might have been covered by
* the NFS_CONTEXT_RESEND_WRITES flag
*/
start = 0;
end = LLONG_MAX;
} while (ret == -EAGAIN);
trace_nfs_fsync_exit(inode, ret);
return ret;
}
#ifdef CONFIG_NFS_V4_2
static loff_t nfs4_file_llseek(struct file *filep, loff_t offset, int whence)
{
......@@ -266,7 +235,7 @@ const struct file_operations nfs4_file_operations = {
.open = nfs4_file_open,
.flush = nfs4_file_flush,
.release = nfs_file_release,
.fsync = nfs4_file_fsync,
.fsync = nfs_file_fsync,
.lock = nfs_lock,
.flock = nfs_flock,
.splice_read = nfs_file_splice_read,
......
......@@ -6783,13 +6783,26 @@ nfs41_same_server_scope(struct nfs41_server_scope *a,
return false;
}
static void
nfs4_bind_one_conn_to_session_done(struct rpc_task *task, void *calldata)
{
}
static const struct rpc_call_ops nfs4_bind_one_conn_to_session_ops = {
.rpc_call_done = &nfs4_bind_one_conn_to_session_done,
};
/*
* nfs4_proc_bind_conn_to_session()
* nfs4_proc_bind_one_conn_to_session()
*
* The 4.1 client currently uses the same TCP connection for the
* fore and backchannel.
*/
int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred)
static
int nfs4_proc_bind_one_conn_to_session(struct rpc_clnt *clnt,
struct rpc_xprt *xprt,
struct nfs_client *clp,
struct rpc_cred *cred)
{
int status;
struct nfs41_bind_conn_to_session_args args = {
......@@ -6804,6 +6817,14 @@ int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred
.rpc_resp = &res,
.rpc_cred = cred,
};
struct rpc_task_setup task_setup_data = {
.rpc_client = clnt,
.rpc_xprt = xprt,
.callback_ops = &nfs4_bind_one_conn_to_session_ops,
.rpc_message = &msg,
.flags = RPC_TASK_TIMEOUT,
};
struct rpc_task *task;
dprintk("--> %s\n", __func__);
......@@ -6811,7 +6832,16 @@ int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred
if (!(clp->cl_session->flags & SESSION4_BACK_CHAN))
args.dir = NFS4_CDFC4_FORE;
status = rpc_call_sync(clp->cl_rpcclient, &msg, RPC_TASK_TIMEOUT);
/* Do not set the backchannel flag unless this is clnt->cl_xprt */
if (xprt != rcu_access_pointer(clnt->cl_xprt))
args.dir = NFS4_CDFC4_FORE;
task = rpc_run_task(&task_setup_data);
if (!IS_ERR(task)) {
status = task->tk_status;
rpc_put_task(task);
} else
status = PTR_ERR(task);
trace_nfs4_bind_conn_to_session(clp, status);
if (status == 0) {
if (memcmp(res.sessionid.data,
......@@ -6838,6 +6868,31 @@ int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred
return status;
}
struct rpc_bind_conn_calldata {
struct nfs_client *clp;
struct rpc_cred *cred;
};
static int
nfs4_proc_bind_conn_to_session_callback(struct rpc_clnt *clnt,
struct rpc_xprt *xprt,
void *calldata)
{
struct rpc_bind_conn_calldata *p = calldata;
return nfs4_proc_bind_one_conn_to_session(clnt, xprt, p->clp, p->cred);
}
int nfs4_proc_bind_conn_to_session(struct nfs_client *clp, struct rpc_cred *cred)
{
struct rpc_bind_conn_calldata data = {
.clp = clp,
.cred = cred,
};
return rpc_clnt_iterate_for_each_xprt(clp->cl_rpcclient,
nfs4_proc_bind_conn_to_session_callback, &data);
}
/*
* Minimum set of SP4_MACH_CRED operations from RFC 5661 in the enforce map
* and operations we'd like to see to enable certain features in the allow map
......@@ -7320,7 +7375,7 @@ static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args)
args->bc_attrs.max_resp_sz = PAGE_SIZE;
args->bc_attrs.max_resp_sz_cached = 0;
args->bc_attrs.max_ops = NFS4_MAX_BACK_CHANNEL_OPS;
args->bc_attrs.max_reqs = 1;
args->bc_attrs.max_reqs = NFS41_BC_MAX_CALLBACKS;
dprintk("%s: Back Channel : max_rqst_sz=%u max_resp_sz=%u "
"max_resp_sz_cached=%u max_ops=%u max_reqs=%u\n",
......
......@@ -135,6 +135,43 @@ static struct nfs4_slot *nfs4_find_or_create_slot(struct nfs4_slot_table *tbl,
return ERR_PTR(-ENOMEM);
}
static void nfs4_lock_slot(struct nfs4_slot_table *tbl,
struct nfs4_slot *slot)
{
u32 slotid = slot->slot_nr;
__set_bit(slotid, tbl->used_slots);
if (slotid > tbl->highest_used_slotid ||
tbl->highest_used_slotid == NFS4_NO_SLOT)
tbl->highest_used_slotid = slotid;
slot->generation = tbl->generation;
}
/*
* nfs4_try_to_lock_slot - Given a slot try to allocate it
*
* Note: must be called with the slot_tbl_lock held.
*/
bool nfs4_try_to_lock_slot(struct nfs4_slot_table *tbl, struct nfs4_slot *slot)
{
if (nfs4_test_locked_slot(tbl, slot->slot_nr))
return false;
nfs4_lock_slot(tbl, slot);
return true;
}
/*
* nfs4_lookup_slot - Find a slot but don't allocate it
*
* Note: must be called with the slot_tbl_lock held.
*/
struct nfs4_slot *nfs4_lookup_slot(struct nfs4_slot_table *tbl, u32 slotid)
{
if (slotid <= tbl->max_slotid)
return nfs4_find_or_create_slot(tbl, slotid, 1, GFP_NOWAIT);
return ERR_PTR(-E2BIG);
}
/*
* nfs4_alloc_slot - efficiently look for a free slot
*
......@@ -153,18 +190,11 @@ struct nfs4_slot *nfs4_alloc_slot(struct nfs4_slot_table *tbl)
__func__, tbl->used_slots[0], tbl->highest_used_slotid,
tbl->max_slotid + 1);
slotid = find_first_zero_bit(tbl->used_slots, tbl->max_slotid + 1);
if (slotid > tbl->max_slotid)
goto out;
ret = nfs4_find_or_create_slot(tbl, slotid, 1, GFP_NOWAIT);
if (IS_ERR(ret))
goto out;
__set_bit(slotid, tbl->used_slots);
if (slotid > tbl->highest_used_slotid ||
tbl->highest_used_slotid == NFS4_NO_SLOT)
tbl->highest_used_slotid = slotid;
ret->generation = tbl->generation;
out:
if (slotid <= tbl->max_slotid) {
ret = nfs4_find_or_create_slot(tbl, slotid, 1, GFP_NOWAIT);
if (!IS_ERR(ret))
nfs4_lock_slot(tbl, ret);
}
dprintk("<-- %s used_slots=%04lx highest_used=%u slotid=%u\n",
__func__, tbl->used_slots[0], tbl->highest_used_slotid,
!IS_ERR(ret) ? ret->slot_nr : NFS4_NO_SLOT);
......
......@@ -77,6 +77,8 @@ extern int nfs4_setup_slot_table(struct nfs4_slot_table *tbl,
unsigned int max_reqs, const char *queue);
extern void nfs4_shutdown_slot_table(struct nfs4_slot_table *tbl);
extern struct nfs4_slot *nfs4_alloc_slot(struct nfs4_slot_table *tbl);
extern struct nfs4_slot *nfs4_lookup_slot(struct nfs4_slot_table *tbl, u32 slotid);
extern bool nfs4_try_to_lock_slot(struct nfs4_slot_table *tbl, struct nfs4_slot *slot);
extern void nfs4_free_slot(struct nfs4_slot_table *tbl, struct nfs4_slot *slot);
extern void nfs4_slot_tbl_drain_complete(struct nfs4_slot_table *tbl);
bool nfs41_wake_and_assign_slot(struct nfs4_slot_table *tbl,
......@@ -88,6 +90,12 @@ static inline bool nfs4_slot_tbl_draining(struct nfs4_slot_table *tbl)
return !!test_bit(NFS4_SLOT_TBL_DRAINING, &tbl->slot_tbl_state);
}
static inline bool nfs4_test_locked_slot(const struct nfs4_slot_table *tbl,
u32 slotid)
{
return !!test_bit(slotid, tbl->used_slots);
}
#if defined(CONFIG_NFS_V4_1)
extern void nfs41_set_target_slotid(struct nfs4_slot_table *tbl,
u32 target_highest_slotid);
......
......@@ -606,12 +606,22 @@ static int _nfs4_pnfs_v3_ds_connect(struct nfs_server *mds_srv,
dprintk("%s: DS %s: trying address %s\n",
__func__, ds->ds_remotestr, da->da_remotestr);
clp = get_v3_ds_connect(mds_srv->nfs_client,
if (!IS_ERR(clp)) {
struct xprt_create xprt_args = {
.ident = XPRT_TRANSPORT_TCP,
.net = clp->cl_net,
.dstaddr = (struct sockaddr *)&da->da_addr,
.addrlen = da->da_addrlen,
.servername = clp->cl_hostname,
};
/* Add this address as an alias */
rpc_clnt_add_xprt(clp->cl_rpcclient, &xprt_args,
rpc_clnt_test_and_add_xprt, NULL);
} else
clp = get_v3_ds_connect(mds_srv->nfs_client,
(struct sockaddr *)&da->da_addr,
da->da_addrlen, IPPROTO_TCP,
timeo, retrans, au_flavor);
if (!IS_ERR(clp))
break;
}
if (IS_ERR(clp)) {
......
......@@ -25,6 +25,7 @@
#include <asm/signal.h>
#include <linux/path.h>
#include <net/ipv6.h>
#include <linux/sunrpc/xprtmultipath.h>
struct rpc_inode;
......@@ -67,6 +68,7 @@ struct rpc_clnt {
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
struct dentry *cl_debugfs; /* debugfs directory */
#endif
struct rpc_xprt_iter cl_xpi;
};
/*
......@@ -139,7 +141,6 @@ struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args,
struct rpc_xprt *xprt);
struct rpc_clnt *rpc_bind_new_program(struct rpc_clnt *,
const struct rpc_program *, u32);
void rpc_task_reset_client(struct rpc_task *task, struct rpc_clnt *clnt);
struct rpc_clnt *rpc_clone_client(struct rpc_clnt *);
struct rpc_clnt *rpc_clone_client_set_auth(struct rpc_clnt *,
rpc_authflavor_t);
......@@ -181,6 +182,21 @@ size_t rpc_peeraddr(struct rpc_clnt *, struct sockaddr *, size_t);
const char *rpc_peeraddr2str(struct rpc_clnt *, enum rpc_display_format_t);
int rpc_localaddr(struct rpc_clnt *, struct sockaddr *, size_t);
int rpc_clnt_iterate_for_each_xprt(struct rpc_clnt *clnt,
int (*fn)(struct rpc_clnt *, struct rpc_xprt *, void *),
void *data);
int rpc_clnt_test_and_add_xprt(struct rpc_clnt *clnt,
struct rpc_xprt_switch *xps,
struct rpc_xprt *xprt,
void *dummy);
int rpc_clnt_add_xprt(struct rpc_clnt *, struct xprt_create *,
int (*setup)(struct rpc_clnt *,
struct rpc_xprt_switch *,
struct rpc_xprt *,
void *),
void *data);
const char *rpc_proc_name(const struct rpc_task *task);
#endif /* __KERNEL__ */
#endif /* _LINUX_SUNRPC_CLNT_H */
......@@ -93,6 +93,12 @@ struct rpcrdma_msg {
__be32 rm_pempty[3]; /* 3 empty chunk lists */
} rm_padded;
struct {
__be32 rm_err;
__be32 rm_vers_low;
__be32 rm_vers_high;
} rm_error;
__be32 rm_chunks[0]; /* read, write and reply chunks */
} rm_body;
......@@ -102,17 +108,13 @@ struct rpcrdma_msg {
* Smallest RPC/RDMA header: rm_xid through rm_type, then rm_nochunks
*/
#define RPCRDMA_HDRLEN_MIN (sizeof(__be32) * 7)
#define RPCRDMA_HDRLEN_ERR (sizeof(__be32) * 5)
enum rpcrdma_errcode {
ERR_VERS = 1,
ERR_CHUNK = 2
};
struct rpcrdma_err_vers {
uint32_t rdma_vers_low; /* Version range supported by peer */
uint32_t rdma_vers_high;
};
enum rpcrdma_proc {
RDMA_MSG = 0, /* An RPC call or reply msg */
RDMA_NOMSG = 1, /* An RPC call or reply msg - separate body */
......
......@@ -42,40 +42,43 @@ struct rpc_wait {
*/
struct rpc_task {
atomic_t tk_count; /* Reference count */
int tk_status; /* result of last operation */
struct list_head tk_task; /* global list of tasks */
struct rpc_clnt * tk_client; /* RPC client */
struct rpc_rqst * tk_rqstp; /* RPC request */
/*
* RPC call state
*/
struct rpc_message tk_msg; /* RPC call info */
/*
* callback to be executed after waking up
* action next procedure for async tasks
* tk_ops caller callbacks
*/
void (*tk_callback)(struct rpc_task *);
void (*tk_action)(struct rpc_task *);
const struct rpc_call_ops *tk_ops;
void * tk_calldata;
unsigned long tk_timeout; /* timeout for rpc_sleep() */
unsigned long tk_runstate; /* Task run status */
struct workqueue_struct *tk_workqueue; /* Normally rpciod, but could
* be any workqueue
*/
struct rpc_wait_queue *tk_waitqueue; /* RPC wait queue we're on */
union {
struct work_struct tk_work; /* Async task work queue */
struct rpc_wait tk_wait; /* RPC wait */
} u;
/*
* RPC call state
*/
struct rpc_message tk_msg; /* RPC call info */
void * tk_calldata; /* Caller private data */
const struct rpc_call_ops *tk_ops; /* Caller callbacks */
struct rpc_clnt * tk_client; /* RPC client */
struct rpc_xprt * tk_xprt; /* Transport */
struct rpc_rqst * tk_rqstp; /* RPC request */
struct workqueue_struct *tk_workqueue; /* Normally rpciod, but could
* be any workqueue
*/
ktime_t tk_start; /* RPC task init timestamp */
pid_t tk_owner; /* Process id for batching tasks */
int tk_status; /* result of last operation */
unsigned short tk_flags; /* misc flags */
unsigned short tk_timeouts; /* maj timeouts */
......@@ -100,6 +103,7 @@ struct rpc_call_ops {
struct rpc_task_setup {
struct rpc_task *task;
struct rpc_clnt *rpc_client;
struct rpc_xprt *rpc_xprt;
const struct rpc_message *rpc_message;
const struct rpc_call_ops *callback_ops;
void *callback_data;
......
......@@ -13,6 +13,7 @@
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/ktime.h>
#include <linux/kref.h>
#include <linux/sunrpc/sched.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/msg_prot.h>
......@@ -166,7 +167,7 @@ enum xprt_transports {
};
struct rpc_xprt {
atomic_t count; /* Reference count */
struct kref kref; /* Reference count */
struct rpc_xprt_ops * ops; /* transport methods */
const struct rpc_timeout *timeout; /* timeout parms */
......@@ -196,6 +197,11 @@ struct rpc_xprt {
transport */
unsigned int bind_index; /* bind function index */
/*
* Multipath
*/
struct list_head xprt_switch;
/*
* Connection of transports
*/
......@@ -256,6 +262,7 @@ struct rpc_xprt {
struct dentry *debugfs; /* debugfs directory */
atomic_t inject_disconnect;
#endif
struct rcu_head rcu;
};
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
......@@ -318,24 +325,13 @@ int xprt_adjust_timeout(struct rpc_rqst *req);
void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task);
void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task);
void xprt_release(struct rpc_task *task);
struct rpc_xprt * xprt_get(struct rpc_xprt *xprt);
void xprt_put(struct rpc_xprt *xprt);
struct rpc_xprt * xprt_alloc(struct net *net, size_t size,
unsigned int num_prealloc,
unsigned int max_req);
void xprt_free(struct rpc_xprt *);
/**
* xprt_get - return a reference to an RPC transport.
* @xprt: pointer to the transport
*
*/
static inline struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
if (atomic_inc_not_zero(&xprt->count))
return xprt;
return NULL;
}
static inline __be32 *xprt_skip_transport_header(struct rpc_xprt *xprt, __be32 *p)
{
return p + xprt->tsh_size;
......
/*
* RPC client multipathing definitions
*
* Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved.
*
* Trond Myklebust <trond.myklebust@primarydata.com>
*/
#ifndef _NET_SUNRPC_XPRTMULTIPATH_H
#define _NET_SUNRPC_XPRTMULTIPATH_H
struct rpc_xprt_iter_ops;
struct rpc_xprt_switch {
spinlock_t xps_lock;
struct kref xps_kref;
unsigned int xps_nxprts;
struct list_head xps_xprt_list;
struct net * xps_net;
const struct rpc_xprt_iter_ops *xps_iter_ops;
struct rcu_head xps_rcu;
};
struct rpc_xprt_iter {
struct rpc_xprt_switch __rcu *xpi_xpswitch;
struct rpc_xprt * xpi_cursor;
const struct rpc_xprt_iter_ops *xpi_ops;
};
struct rpc_xprt_iter_ops {
void (*xpi_rewind)(struct rpc_xprt_iter *);
struct rpc_xprt *(*xpi_xprt)(struct rpc_xprt_iter *);
struct rpc_xprt *(*xpi_next)(struct rpc_xprt_iter *);
};
extern struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt,
gfp_t gfp_flags);
extern struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps);
extern void xprt_switch_put(struct rpc_xprt_switch *xps);
extern void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps);
extern void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps,
struct rpc_xprt *xprt);
extern void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps,
struct rpc_xprt *xprt);
extern void xprt_iter_init(struct rpc_xprt_iter *xpi,
struct rpc_xprt_switch *xps);
extern void xprt_iter_init_listall(struct rpc_xprt_iter *xpi,
struct rpc_xprt_switch *xps);
extern void xprt_iter_destroy(struct rpc_xprt_iter *xpi);
extern struct rpc_xprt_switch *xprt_iter_xchg_switch(
struct rpc_xprt_iter *xpi,
struct rpc_xprt_switch *newswitch);
extern struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi);
extern struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi);
extern struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi);
#endif
......@@ -54,8 +54,6 @@
#define RPCRDMA_DEF_INLINE (1024) /* default inline max */
#define RPCRDMA_INLINE_PAD_THRESH (512)/* payload threshold to pad (bytes) */
/* Memory registration strategies, by number.
* This is part of a kernel / user space API. Do not remove. */
enum rpcrdma_memreg {
......
......@@ -12,7 +12,8 @@ sunrpc-y := clnt.o xprt.o socklib.o xprtsock.o sched.o \
svc.o svcsock.o svcauth.o svcauth_unix.o \
addr.o rpcb_clnt.o timer.o xdr.o \
sunrpc_syms.o cache.o rpc_pipe.o \
svc_xprt.o
svc_xprt.o \
xprtmultipath.o
sunrpc-$(CONFIG_SUNRPC_DEBUG) += debugfs.o
sunrpc-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel_rqst.o
sunrpc-$(CONFIG_PROC_FS) += stats.o
......
......@@ -1181,12 +1181,12 @@ static struct rpc_auth *
gss_create(struct rpc_auth_create_args *args, struct rpc_clnt *clnt)
{
struct gss_auth *gss_auth;
struct rpc_xprt *xprt = rcu_access_pointer(clnt->cl_xprt);
struct rpc_xprt_switch *xps = rcu_access_pointer(clnt->cl_xpi.xpi_xpswitch);
while (clnt != clnt->cl_parent) {
struct rpc_clnt *parent = clnt->cl_parent;
/* Find the original parent for this transport */
if (rcu_access_pointer(parent->cl_xprt) != xprt)
if (rcu_access_pointer(parent->cl_xpi.xpi_xpswitch) != xps)
break;
clnt = parent;
}
......
This diff is collapsed.
......@@ -648,10 +648,10 @@ static struct rpc_task *rpcb_call_async(struct rpc_clnt *rpcb_clnt, struct rpcbi
static struct rpc_clnt *rpcb_find_transport_owner(struct rpc_clnt *clnt)
{
struct rpc_clnt *parent = clnt->cl_parent;
struct rpc_xprt *xprt = rcu_dereference(clnt->cl_xprt);
struct rpc_xprt_switch *xps = rcu_access_pointer(clnt->cl_xpi.xpi_xpswitch);
while (parent != clnt) {
if (rcu_dereference(parent->cl_xprt) != xprt)
if (rcu_access_pointer(parent->cl_xpi.xpi_xpswitch) != xps)
break;
if (clnt->cl_autobind)
break;
......@@ -683,11 +683,9 @@ void rpcb_getport_async(struct rpc_task *task)
int status;
rcu_read_lock();
do {
clnt = rpcb_find_transport_owner(task->tk_client);
xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
} while (xprt == NULL);
clnt = rpcb_find_transport_owner(task->tk_client);
rcu_read_unlock();
xprt = xprt_get(task->tk_xprt);
dprintk("RPC: %5u %s(%s, %u, %u, %d)\n",
task->tk_pid, __func__,
......
......@@ -909,6 +909,8 @@ static void rpc_init_task(struct rpc_task *task, const struct rpc_task_setup *ta
/* Initialize workqueue for async tasks */
task->tk_workqueue = task_setup_data->workqueue;
task->tk_xprt = xprt_get(task_setup_data->rpc_xprt);
if (task->tk_ops->rpc_call_prepare != NULL)
task->tk_action = rpc_prepare_task;
......
......@@ -48,6 +48,7 @@
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>
#include <linux/sunrpc/bc_xprt.h>
#include <linux/rcupdate.h>
#include <trace/events/sunrpc.h>
......@@ -1166,7 +1167,7 @@ void xprt_free(struct rpc_xprt *xprt)
{
put_net(xprt->xprt_net);
xprt_free_all_slots(xprt);
kfree(xprt);
kfree_rcu(xprt, rcu);
}
EXPORT_SYMBOL_GPL(xprt_free);
......@@ -1180,7 +1181,7 @@ EXPORT_SYMBOL_GPL(xprt_free);
*/
void xprt_reserve(struct rpc_task *task)
{
struct rpc_xprt *xprt;
struct rpc_xprt *xprt = task->tk_xprt;
task->tk_status = 0;
if (task->tk_rqstp != NULL)
......@@ -1188,11 +1189,8 @@ void xprt_reserve(struct rpc_task *task)
task->tk_timeout = 0;
task->tk_status = -EAGAIN;
rcu_read_lock();
xprt = rcu_dereference(task->tk_client->cl_xprt);
if (!xprt_throttle_congested(xprt, task))
xprt->ops->alloc_slot(xprt, task);
rcu_read_unlock();
}
/**
......@@ -1206,7 +1204,7 @@ void xprt_reserve(struct rpc_task *task)
*/
void xprt_retry_reserve(struct rpc_task *task)
{
struct rpc_xprt *xprt;
struct rpc_xprt *xprt = task->tk_xprt;
task->tk_status = 0;
if (task->tk_rqstp != NULL)
......@@ -1214,10 +1212,7 @@ void xprt_retry_reserve(struct rpc_task *task)
task->tk_timeout = 0;
task->tk_status = -EAGAIN;
rcu_read_lock();
xprt = rcu_dereference(task->tk_client->cl_xprt);
xprt->ops->alloc_slot(xprt, task);
rcu_read_unlock();
}
static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
......@@ -1264,11 +1259,9 @@ void xprt_release(struct rpc_task *task)
if (req == NULL) {
if (task->tk_client) {
rcu_read_lock();
xprt = rcu_dereference(task->tk_client->cl_xprt);
xprt = task->tk_xprt;
if (xprt->snd_task == task)
xprt_release_write(xprt, task);
rcu_read_unlock();
}
return;
}
......@@ -1307,7 +1300,7 @@ void xprt_release(struct rpc_task *task)
static void xprt_init(struct rpc_xprt *xprt, struct net *net)
{
atomic_set(&xprt->count, 1);
kref_init(&xprt->kref);
spin_lock_init(&xprt->transport_lock);
spin_lock_init(&xprt->reserve_lock);
......@@ -1318,6 +1311,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
spin_lock_init(&xprt->bc_pa_lock);
INIT_LIST_HEAD(&xprt->bc_pa_list);
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
INIT_LIST_HEAD(&xprt->xprt_switch);
xprt->last_used = jiffies;
xprt->cwnd = RPC_INITCWND;
......@@ -1415,6 +1409,24 @@ static void xprt_destroy(struct rpc_xprt *xprt)
xprt->ops->destroy(xprt);
}
static void xprt_destroy_kref(struct kref *kref)
{
xprt_destroy(container_of(kref, struct rpc_xprt, kref));
}
/**
* xprt_get - return a reference to an RPC transport.
* @xprt: pointer to the transport
*
*/
struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
{
if (xprt != NULL && kref_get_unless_zero(&xprt->kref))
return xprt;
return NULL;
}
EXPORT_SYMBOL_GPL(xprt_get);
/**
* xprt_put - release a reference to an RPC transport.
* @xprt: pointer to the transport
......@@ -1422,7 +1434,7 @@ static void xprt_destroy(struct rpc_xprt *xprt)
*/
void xprt_put(struct rpc_xprt *xprt)
{
if (atomic_dec_and_test(&xprt->count))
xprt_destroy(xprt);
if (xprt != NULL)
kref_put(&xprt->kref, xprt_destroy_kref);
}
EXPORT_SYMBOL_GPL(xprt_put);
This diff is collapsed.
......@@ -80,13 +80,13 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
if (!r)
goto out;
r->r.fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
sizeof(u64), GFP_KERNEL);
if (!r->r.fmr.physaddrs)
r->fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
sizeof(u64), GFP_KERNEL);
if (!r->fmr.physaddrs)
goto out_free;
r->r.fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
if (IS_ERR(r->r.fmr.fmr))
r->fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
if (IS_ERR(r->fmr.fmr))
goto out_fmr_err;
list_add(&r->mw_list, &buf->rb_mws);
......@@ -95,9 +95,9 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
return 0;
out_fmr_err:
rc = PTR_ERR(r->r.fmr.fmr);
rc = PTR_ERR(r->fmr.fmr);
dprintk("RPC: %s: ib_alloc_fmr status %i\n", __func__, rc);
kfree(r->r.fmr.physaddrs);
kfree(r->fmr.physaddrs);
out_free:
kfree(r);
out:
......@@ -109,7 +109,7 @@ __fmr_unmap(struct rpcrdma_mw *r)
{
LIST_HEAD(l);
list_add(&r->r.fmr.fmr->list, &l);
list_add(&r->fmr.fmr->list, &l);
return ib_unmap_fmr(&l);
}
......@@ -148,7 +148,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
nsegs = RPCRDMA_MAX_FMR_SGES;
for (i = 0; i < nsegs;) {
rpcrdma_map_one(device, seg, direction);
mw->r.fmr.physaddrs[i] = seg->mr_dma;
mw->fmr.physaddrs[i] = seg->mr_dma;
len += seg->mr_len;
++seg;
++i;
......@@ -158,13 +158,13 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
break;
}
rc = ib_map_phys_fmr(mw->r.fmr.fmr, mw->r.fmr.physaddrs,
rc = ib_map_phys_fmr(mw->fmr.fmr, mw->fmr.physaddrs,
i, seg1->mr_dma);
if (rc)
goto out_maperr;
seg1->rl_mw = mw;
seg1->mr_rkey = mw->r.fmr.fmr->rkey;
seg1->mr_rkey = mw->fmr.fmr->rkey;
seg1->mr_base = seg1->mr_dma + pageoff;
seg1->mr_nsegs = i;
seg1->mr_len = len;
......@@ -219,7 +219,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
seg = &req->rl_segments[i];
mw = seg->rl_mw;
list_add(&mw->r.fmr.fmr->list, &unmap_list);
list_add(&mw->fmr.fmr->list, &unmap_list);
i += seg->mr_nsegs;
}
......@@ -281,9 +281,9 @@ fmr_op_destroy(struct rpcrdma_buffer *buf)
while (!list_empty(&buf->rb_all)) {
r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
list_del(&r->mw_all);
kfree(r->r.fmr.physaddrs);
kfree(r->fmr.physaddrs);
rc = ib_dealloc_fmr(r->r.fmr.fmr);
rc = ib_dealloc_fmr(r->fmr.fmr);
if (rc)
dprintk("RPC: %s: ib_dealloc_fmr failed %i\n",
__func__, rc);
......
......@@ -109,20 +109,20 @@ static void
__frwr_recovery_worker(struct work_struct *work)
{
struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
r.frmr.fr_work);
struct rpcrdma_xprt *r_xprt = r->r.frmr.fr_xprt;
frmr.fr_work);
struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt;
unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
if (ib_dereg_mr(r->r.frmr.fr_mr))
if (ib_dereg_mr(r->frmr.fr_mr))
goto out_fail;
r->r.frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
if (IS_ERR(r->r.frmr.fr_mr))
r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
if (IS_ERR(r->frmr.fr_mr))
goto out_fail;
dprintk("RPC: %s: recovered FRMR %p\n", __func__, r);
r->r.frmr.fr_state = FRMR_IS_INVALID;
r->frmr.fr_state = FRMR_IS_INVALID;
rpcrdma_put_mw(r_xprt, r);
return;
......@@ -137,15 +137,15 @@ __frwr_recovery_worker(struct work_struct *work)
static void
__frwr_queue_recovery(struct rpcrdma_mw *r)
{
INIT_WORK(&r->r.frmr.fr_work, __frwr_recovery_worker);
queue_work(frwr_recovery_wq, &r->r.frmr.fr_work);
INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker);
queue_work(frwr_recovery_wq, &r->frmr.fr_work);
}
static int
__frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
unsigned int depth)
{
struct rpcrdma_frmr *f = &r->r.frmr;
struct rpcrdma_frmr *f = &r->frmr;
int rc;
f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
......@@ -158,6 +158,8 @@ __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
sg_init_table(f->sg, depth);
init_completion(&f->fr_linv_done);
return 0;
out_mr_err:
......@@ -179,11 +181,11 @@ __frwr_release(struct rpcrdma_mw *r)
{
int rc;
rc = ib_dereg_mr(r->r.frmr.fr_mr);
rc = ib_dereg_mr(r->frmr.fr_mr);
if (rc)
dprintk("RPC: %s: ib_dereg_mr status %i\n",
__func__, rc);
kfree(r->r.frmr.sg);
kfree(r->frmr.sg);
}
static int
......@@ -244,39 +246,76 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth);
}
/* If FAST_REG or LOCAL_INV failed, indicate the frmr needs
* to be reset.
static void
__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_frmr *frmr,
const char *wr)
{
frmr->fr_state = FRMR_IS_STALE;
if (wc->status != IB_WC_WR_FLUSH_ERR)
pr_err("rpcrdma: %s: %s (%u/0x%x)\n",
wr, ib_wc_status_msg(wc->status),
wc->status, wc->vendor_err);
}
/**
* frwr_wc_fastreg - Invoked by RDMA provider for each polled FastReg WC
* @cq: completion queue (ignored)
* @wc: completed WR
*
* WARNING: Only wr_id and status are reliable at this point
*/
static void
__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_mw *r)
frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc)
{
if (likely(wc->status == IB_WC_SUCCESS))
return;
/* WARNING: Only wr_id and status are reliable at this point */
r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
if (wc->status == IB_WC_WR_FLUSH_ERR)
dprintk("RPC: %s: frmr %p flushed\n", __func__, r);
else
pr_warn("RPC: %s: frmr %p error, status %s (%d)\n",
__func__, r, ib_wc_status_msg(wc->status), wc->status);
struct rpcrdma_frmr *frmr;
struct ib_cqe *cqe;
r->r.frmr.fr_state = FRMR_IS_STALE;
/* WARNING: Only wr_cqe and status are reliable at this point */
if (wc->status != IB_WC_SUCCESS) {
cqe = wc->wr_cqe;
frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
__frwr_sendcompletion_flush(wc, frmr, "fastreg");
}
}
/**
* frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC
* @cq: completion queue (ignored)
* @wc: completed WR
*
*/
static void
frwr_sendcompletion(struct ib_wc *wc)
frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc)
{
struct rpcrdma_mw *r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
struct rpcrdma_frmr *f = &r->r.frmr;
struct rpcrdma_frmr *frmr;
struct ib_cqe *cqe;
if (unlikely(wc->status != IB_WC_SUCCESS))
__frwr_sendcompletion_flush(wc, r);
/* WARNING: Only wr_cqe and status are reliable at this point */
if (wc->status != IB_WC_SUCCESS) {
cqe = wc->wr_cqe;
frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
__frwr_sendcompletion_flush(wc, frmr, "localinv");
}
}
if (f->fr_waiter)
complete(&f->fr_linv_done);
/**
* frwr_wc_localinv - Invoked by RDMA provider for each polled LocalInv WC
* @cq: completion queue (ignored)
* @wc: completed WR
*
* Awaken anyone waiting for an MR to finish being fenced.
*/
static void
frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc)
{
struct rpcrdma_frmr *frmr;
struct ib_cqe *cqe;
/* WARNING: Only wr_cqe and status are reliable at this point */
cqe = wc->wr_cqe;
frmr = container_of(cqe, struct rpcrdma_frmr, fr_cqe);
if (wc->status != IB_WC_SUCCESS)
__frwr_sendcompletion_flush(wc, frmr, "localinv");
complete_all(&frmr->fr_linv_done);
}
static int
......@@ -313,8 +352,7 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
list_add(&r->mw_list, &buf->rb_mws);
list_add(&r->mw_all, &buf->rb_all);
r->mw_sendcompletion = frwr_sendcompletion;
r->r.frmr.fr_xprt = r_xprt;
r->frmr.fr_xprt = r_xprt;
}
return 0;
......@@ -347,10 +385,9 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
mw = rpcrdma_get_mw(r_xprt);
if (!mw)
return -ENOMEM;
} while (mw->r.frmr.fr_state != FRMR_IS_INVALID);
frmr = &mw->r.frmr;
} while (mw->frmr.fr_state != FRMR_IS_INVALID);
frmr = &mw->frmr;
frmr->fr_state = FRMR_IS_VALID;
frmr->fr_waiter = false;
mr = frmr->fr_mr;
reg_wr = &frmr->fr_regwr;
......@@ -400,7 +437,8 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
reg_wr->wr.next = NULL;
reg_wr->wr.opcode = IB_WR_REG_MR;
reg_wr->wr.wr_id = (uintptr_t)mw;
frmr->fr_cqe.done = frwr_wc_fastreg;
reg_wr->wr.wr_cqe = &frmr->fr_cqe;
reg_wr->wr.num_sge = 0;
reg_wr->wr.send_flags = 0;
reg_wr->mr = mr;
......@@ -434,15 +472,15 @@ static struct ib_send_wr *
__frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg)
{
struct rpcrdma_mw *mw = seg->rl_mw;
struct rpcrdma_frmr *f = &mw->r.frmr;
struct rpcrdma_frmr *f = &mw->frmr;
struct ib_send_wr *invalidate_wr;
f->fr_waiter = false;
f->fr_state = FRMR_IS_INVALID;
invalidate_wr = &f->fr_invwr;
memset(invalidate_wr, 0, sizeof(*invalidate_wr));
invalidate_wr->wr_id = (unsigned long)(void *)mw;
f->fr_cqe.done = frwr_wc_localinv;
invalidate_wr->wr_cqe = &f->fr_cqe;
invalidate_wr->opcode = IB_WR_LOCAL_INV;
invalidate_wr->ex.invalidate_rkey = f->fr_mr->rkey;
......@@ -455,7 +493,7 @@ __frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
{
struct ib_device *device = r_xprt->rx_ia.ri_device;
struct rpcrdma_mw *mw = seg->rl_mw;
struct rpcrdma_frmr *f = &mw->r.frmr;
struct rpcrdma_frmr *f = &mw->frmr;
seg->rl_mw = NULL;
......@@ -504,15 +542,15 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
i += seg->mr_nsegs;
}
f = &seg->rl_mw->r.frmr;
f = &seg->rl_mw->frmr;
/* Strong send queue ordering guarantees that when the
* last WR in the chain completes, all WRs in the chain
* are complete.
*/
f->fr_invwr.send_flags = IB_SEND_SIGNALED;
f->fr_waiter = true;
init_completion(&f->fr_linv_done);
f->fr_cqe.done = frwr_wc_localinv_wake;
reinit_completion(&f->fr_linv_done);
INIT_CQCOUNT(&r_xprt->rx_ep);
/* Transport disconnect drains the receive CQ before it
......@@ -520,14 +558,18 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
* unless ri_id->qp is a valid pointer.
*/
rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);
if (rc)
if (rc) {
pr_warn("%s: ib_post_send failed %i\n", __func__, rc);
rdma_disconnect(ia->ri_id);
goto unmap;
}
wait_for_completion(&f->fr_linv_done);
/* ORDER: Now DMA unmap all of the req's MRs, and return
* them to the free MW list.
*/
unmap:
for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
seg = &req->rl_segments[i];
......@@ -549,7 +591,7 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
struct rpcrdma_mr_seg *seg1 = seg;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_mw *mw = seg1->rl_mw;
struct rpcrdma_frmr *frmr = &mw->r.frmr;
struct rpcrdma_frmr *frmr = &mw->frmr;
struct ib_send_wr *invalidate_wr, *bad_wr;
int rc, nsegs = seg->mr_nsegs;
......@@ -557,10 +599,11 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
seg1->rl_mw = NULL;
frmr->fr_state = FRMR_IS_INVALID;
invalidate_wr = &mw->r.frmr.fr_invwr;
invalidate_wr = &mw->frmr.fr_invwr;
memset(invalidate_wr, 0, sizeof(*invalidate_wr));
invalidate_wr->wr_id = (uintptr_t)mw;
frmr->fr_cqe.done = frwr_wc_localinv;
invalidate_wr->wr_cqe = &frmr->fr_cqe;
invalidate_wr->opcode = IB_WR_LOCAL_INV;
invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey;
DECR_CQCOUNT(&r_xprt->rx_ep);
......
......@@ -68,7 +68,6 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing));
seg->mr_rkey = ia->ri_dma_mr->rkey;
seg->mr_base = seg->mr_dma;
seg->mr_nsegs = 1;
return 1;
}
......
......@@ -132,6 +132,33 @@ rpcrdma_tail_pullup(struct xdr_buf *buf)
return tlen;
}
/* Split "vec" on page boundaries into segments. FMR registers pages,
* not a byte range. Other modes coalesce these segments into a single
* MR when they can.
*/
static int
rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
int n, int nsegs)
{
size_t page_offset;
u32 remaining;
char *base;
base = vec->iov_base;
page_offset = offset_in_page(base);
remaining = vec->iov_len;
while (remaining && n < nsegs) {
seg[n].mr_page = NULL;
seg[n].mr_offset = base;
seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
remaining -= seg[n].mr_len;
base += seg[n].mr_len;
++n;
page_offset = 0;
}
return n;
}
/*
* Chunk assembly from upper layer xdr_buf.
*
......@@ -150,11 +177,10 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
int page_base;
struct page **ppages;
if (pos == 0 && xdrbuf->head[0].iov_len) {
seg[n].mr_page = NULL;
seg[n].mr_offset = xdrbuf->head[0].iov_base;
seg[n].mr_len = xdrbuf->head[0].iov_len;
++n;
if (pos == 0) {
n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs);
if (n == nsegs)
return -EIO;
}
len = xdrbuf->page_len;
......@@ -192,13 +218,9 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
* xdr pad bytes, saving the server an RDMA operation. */
if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
return n;
n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs);
if (n == nsegs)
/* Tail remains, but we're out of segments */
return -EIO;
seg[n].mr_page = NULL;
seg[n].mr_offset = xdrbuf->tail[0].iov_base;
seg[n].mr_len = xdrbuf->tail[0].iov_len;
++n;
}
return n;
......@@ -773,20 +795,17 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
__be32 *iptr;
int rdmalen, status;
int rdmalen, status, rmerr;
unsigned long cwnd;
u32 credits;
dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
if (rep->rr_len == RPCRDMA_BAD_LEN)
goto out_badstatus;
if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
goto out_shortreply;
headerp = rdmab_to_msg(rep->rr_rdmabuf);
if (headerp->rm_vers != rpcrdma_version)
goto out_badversion;
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
if (rpcrdma_is_bcall(headerp))
goto out_bcall;
......@@ -809,15 +828,16 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
*/
list_del_init(&rqst->rq_list);
spin_unlock_bh(&xprt->transport_lock);
dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
" RPC request 0x%p xid 0x%08x\n",
__func__, rep, req, rqst,
be32_to_cpu(headerp->rm_xid));
dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
__func__, rep, req, be32_to_cpu(headerp->rm_xid));
/* from here on, the reply is no longer an orphan */
req->rl_reply = rep;
xprt->reestablish_timeout = 0;
if (headerp->rm_vers != rpcrdma_version)
goto out_badversion;
/* check for expected message types */
/* The order of some of these tests is important. */
switch (headerp->rm_type) {
......@@ -878,6 +898,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
status = rdmalen;
break;
case rdma_error:
goto out_rdmaerr;
badheader:
default:
dprintk("%s: invalid rpcrdma reply header (type %d):"
......@@ -893,6 +916,7 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
break;
}
out:
/* Invalidate and flush the data payloads before waking the
* waiting application. This guarantees the memory region is
* properly fenced from the server before the application
......@@ -903,15 +927,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
if (req->rl_nchunks)
r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);
credits = be32_to_cpu(headerp->rm_credit);
if (credits == 0)
credits = 1; /* don't deadlock */
else if (credits > r_xprt->rx_buf.rb_max_requests)
credits = r_xprt->rx_buf.rb_max_requests;
spin_lock_bh(&xprt->transport_lock);
cwnd = xprt->cwnd;
xprt->cwnd = credits << RPC_CWNDSHIFT;
xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
if (xprt->cwnd > cwnd)
xprt_release_rqst_cong(rqst->rq_task);
......@@ -935,13 +953,43 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
return;
#endif
out_shortreply:
dprintk("RPC: %s: short/invalid reply\n", __func__);
goto repost;
/* If the incoming reply terminated a pending RPC, the next
* RPC call will post a replacement receive buffer as it is
* being marshaled.
*/
out_badversion:
dprintk("RPC: %s: invalid version %d\n",
__func__, be32_to_cpu(headerp->rm_vers));
status = -EIO;
r_xprt->rx_stats.bad_reply_count++;
goto out;
out_rdmaerr:
rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
switch (rmerr) {
case ERR_VERS:
pr_err("%s: server reports header version error (%u-%u)\n",
__func__,
be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
break;
case ERR_CHUNK:
pr_err("%s: server reports header decoding error\n",
__func__);
break;
default:
pr_err("%s: server reports unknown error %d\n",
__func__, rmerr);
}
status = -EREMOTEIO;
r_xprt->rx_stats.bad_reply_count++;
goto out;
/* If no pending RPC transaction was matched, post a replacement
* receive buffer before returning.
*/
out_shortreply:
dprintk("RPC: %s: short/invalid reply\n", __func__);
goto repost;
out_nomatch:
......
......@@ -112,89 +112,65 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
}
}
/**
* rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
* @cq: completion queue (ignored)
* @wc: completed WR
*
*/
static void
rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
struct rpcrdma_ep *ep = context;
pr_err("RPC: %s: %s on device %s ep %p\n",
__func__, ib_event_msg(event->event),
event->device->name, context);
if (ep->rep_connected == 1) {
ep->rep_connected = -EIO;
rpcrdma_conn_func(ep);
wake_up_all(&ep->rep_connect_wait);
}
/* WARNING: Only wr_cqe and status are reliable at this point */
if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
ib_wc_status_msg(wc->status),
wc->status, wc->vendor_err);
}
static void
rpcrdma_sendcq_process_wc(struct ib_wc *wc)
rpcrdma_receive_worker(struct work_struct *work)
{
/* WARNING: Only wr_id and status are reliable at this point */
if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
if (wc->status != IB_WC_SUCCESS &&
wc->status != IB_WC_WR_FLUSH_ERR)
pr_err("RPC: %s: SEND: %s\n",
__func__, ib_wc_status_msg(wc->status));
} else {
struct rpcrdma_mw *r;
struct rpcrdma_rep *rep =
container_of(work, struct rpcrdma_rep, rr_work);
r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
r->mw_sendcompletion(wc);
}
rpcrdma_reply_handler(rep);
}
/* The common case is a single send completion is waiting. By
* passing two WC entries to ib_poll_cq, a return code of 1
* means there is exactly one WC waiting and no more. We don't
* have to invoke ib_poll_cq again to know that the CQ has been
* properly drained.
/* Perform basic sanity checking to avoid using garbage
* to update the credit grant value.
*/
static void
rpcrdma_sendcq_poll(struct ib_cq *cq)
rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
{
struct ib_wc *pos, wcs[2];
int count, rc;
struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
u32 credits;
do {
pos = wcs;
if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
return;
rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
if (rc < 0)
break;
credits = be32_to_cpu(rmsgp->rm_credit);
if (credits == 0)
credits = 1; /* don't deadlock */
else if (credits > buffer->rb_max_requests)
credits = buffer->rb_max_requests;
count = rc;
while (count-- > 0)
rpcrdma_sendcq_process_wc(pos++);
} while (rc == ARRAY_SIZE(wcs));
return;
atomic_set(&buffer->rb_credits, credits);
}
/* Handle provider send completion upcalls.
/**
* rpcrdma_receive_wc - Invoked by RDMA provider for each polled Receive WC
* @cq: completion queue (ignored)
* @wc: completed WR
*
*/
static void
rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
rpcrdma_receive_wc(struct ib_cq *cq, struct ib_wc *wc)
{
do {
rpcrdma_sendcq_poll(cq);
} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
IB_CQ_REPORT_MISSED_EVENTS) > 0);
}
static void
rpcrdma_receive_worker(struct work_struct *work)
{
struct rpcrdma_rep *rep =
container_of(work, struct rpcrdma_rep, rr_work);
rpcrdma_reply_handler(rep);
}
static void
rpcrdma_recvcq_process_wc(struct ib_wc *wc)
{
struct rpcrdma_rep *rep =
(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
struct ib_cqe *cqe = wc->wr_cqe;
struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
rr_cqe);
/* WARNING: Only wr_id and status are reliable at this point */
if (wc->status != IB_WC_SUCCESS)
......@@ -211,7 +187,8 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc)
ib_dma_sync_single_for_cpu(rep->rr_device,
rdmab_addr(rep->rr_rdmabuf),
rep->rr_len, DMA_FROM_DEVICE);
prefetch(rdmab_to_msg(rep->rr_rdmabuf));
rpcrdma_update_granted_credits(rep);
out_schedule:
queue_work(rpcrdma_receive_wq, &rep->rr_work);
......@@ -219,57 +196,20 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc)
out_fail:
if (wc->status != IB_WC_WR_FLUSH_ERR)
pr_err("RPC: %s: rep %p: %s\n",
__func__, rep, ib_wc_status_msg(wc->status));
pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
ib_wc_status_msg(wc->status),
wc->status, wc->vendor_err);
rep->rr_len = RPCRDMA_BAD_LEN;
goto out_schedule;
}
/* The wc array is on stack: automatic memory is always CPU-local.
*
* struct ib_wc is 64 bytes, making the poll array potentially
* large. But this is at the bottom of the call chain. Further
* substantial work is done in another thread.
*/
static void
rpcrdma_recvcq_poll(struct ib_cq *cq)
{
struct ib_wc *pos, wcs[4];
int count, rc;
do {
pos = wcs;
rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
if (rc < 0)
break;
count = rc;
while (count-- > 0)
rpcrdma_recvcq_process_wc(pos++);
} while (rc == ARRAY_SIZE(wcs));
}
/* Handle provider receive completion upcalls.
*/
static void
rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
{
do {
rpcrdma_recvcq_poll(cq);
} while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
IB_CQ_REPORT_MISSED_EVENTS) > 0);
}
static void
rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
{
struct ib_wc wc;
while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
rpcrdma_recvcq_process_wc(&wc);
while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
rpcrdma_sendcq_process_wc(&wc);
rpcrdma_receive_wc(NULL, &wc);
}
static int
......@@ -330,6 +270,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
connected:
dprintk("RPC: %s: %sconnected\n",
__func__, connstate > 0 ? "" : "dis");
atomic_set(&xprt->rx_buf.rb_credits, 1);
ep->rep_connected = connstate;
rpcrdma_conn_func(ep);
wake_up_all(&ep->rep_connect_wait);
......@@ -560,9 +501,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
struct rpcrdma_create_data_internal *cdata)
{
struct ib_cq *sendcq, *recvcq;
struct ib_cq_init_attr cq_attr = {};
unsigned int max_qp_wr;
int rc, err;
int rc;
if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) {
dprintk("RPC: %s: insufficient sge's available\n",
......@@ -614,9 +554,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
init_waitqueue_head(&ep->rep_connect_wait);
INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
sendcq = ib_alloc_cq(ia->ri_device, NULL,
ep->rep_attr.cap.max_send_wr + 1,
0, IB_POLL_SOFTIRQ);
if (IS_ERR(sendcq)) {
rc = PTR_ERR(sendcq);
dprintk("RPC: %s: failed to create send CQ: %i\n",
......@@ -624,16 +564,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
goto out1;
}
rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
if (rc) {
dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
__func__, rc);
goto out2;
}
cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
recvcq = ib_alloc_cq(ia->ri_device, NULL,
ep->rep_attr.cap.max_recv_wr + 1,
0, IB_POLL_SOFTIRQ);
if (IS_ERR(recvcq)) {
rc = PTR_ERR(recvcq);
dprintk("RPC: %s: failed to create recv CQ: %i\n",
......@@ -641,14 +574,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
goto out2;
}
rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
if (rc) {
dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
__func__, rc);
ib_destroy_cq(recvcq);
goto out2;
}
ep->rep_attr.send_cq = sendcq;
ep->rep_attr.recv_cq = recvcq;
......@@ -673,10 +598,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
return 0;
out2:
err = ib_destroy_cq(sendcq);
if (err)
dprintk("RPC: %s: ib_destroy_cq returned %i\n",
__func__, err);
ib_free_cq(sendcq);
out1:
if (ia->ri_dma_mr)
ib_dereg_mr(ia->ri_dma_mr);
......@@ -711,15 +633,8 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
ia->ri_id->qp = NULL;
}
rc = ib_destroy_cq(ep->rep_attr.recv_cq);
if (rc)
dprintk("RPC: %s: ib_destroy_cq returned %i\n",
__func__, rc);
rc = ib_destroy_cq(ep->rep_attr.send_cq);
if (rc)
dprintk("RPC: %s: ib_destroy_cq returned %i\n",
__func__, rc);
ib_free_cq(ep->rep_attr.recv_cq);
ib_free_cq(ep->rep_attr.send_cq);
if (ia->ri_dma_mr) {
rc = ib_dereg_mr(ia->ri_dma_mr);
......@@ -898,6 +813,7 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
spin_lock(&buffer->rb_reqslock);
list_add(&req->rl_all, &buffer->rb_allreqs);
spin_unlock(&buffer->rb_reqslock);
req->rl_cqe.done = rpcrdma_wc_send;
req->rl_buffer = &r_xprt->rx_buf;
return req;
}
......@@ -923,6 +839,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
}
rep->rr_device = ia->ri_device;
rep->rr_cqe.done = rpcrdma_receive_wc;
rep->rr_rxprt = r_xprt;
INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
return rep;
......@@ -943,6 +860,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
buf->rb_max_requests = r_xprt->rx_data.max_requests;
buf->rb_bc_srv_max_requests = 0;
spin_lock_init(&buf->rb_lock);
atomic_set(&buf->rb_credits, 1);
rc = ia->ri_ops->ro_init(r_xprt);
if (rc)
......@@ -1259,7 +1177,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
}
send_wr.next = NULL;
send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
send_wr.wr_cqe = &req->rl_cqe;
send_wr.sg_list = iov;
send_wr.num_sge = req->rl_niovs;
send_wr.opcode = IB_WR_SEND;
......@@ -1297,7 +1215,7 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
int rc;
recv_wr.next = NULL;
recv_wr.wr_id = (u64) (unsigned long) rep;
recv_wr.wr_cqe = &rep->rr_cqe;
recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
recv_wr.num_sge = 1;
......
......@@ -95,10 +95,6 @@ struct rpcrdma_ep {
#define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
#define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
/* Force completion handler to ignore the signal
*/
#define RPCRDMA_IGNORE_COMPLETION (0ULL)
/* Pre-allocate extra Work Requests for handling backward receives
* and sends. This is a fixed value because the Work Queues are
* allocated when the forward channel is set up.
......@@ -171,6 +167,7 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb)
struct rpcrdma_buffer;
struct rpcrdma_rep {
struct ib_cqe rr_cqe;
unsigned int rr_len;
struct ib_device *rr_device;
struct rpcrdma_xprt *rr_rxprt;
......@@ -204,11 +201,11 @@ struct rpcrdma_frmr {
struct scatterlist *sg;
int sg_nents;
struct ib_mr *fr_mr;
struct ib_cqe fr_cqe;
enum rpcrdma_frmr_state fr_state;
struct completion fr_linv_done;
struct work_struct fr_work;
struct rpcrdma_xprt *fr_xprt;
bool fr_waiter;
struct completion fr_linv_done;;
union {
struct ib_reg_wr fr_regwr;
struct ib_send_wr fr_invwr;
......@@ -224,8 +221,7 @@ struct rpcrdma_mw {
union {
struct rpcrdma_fmr fmr;
struct rpcrdma_frmr frmr;
} r;
void (*mw_sendcompletion)(struct ib_wc *);
};
struct list_head mw_list;
struct list_head mw_all;
};
......@@ -281,6 +277,7 @@ struct rpcrdma_req {
struct rpcrdma_regbuf *rl_sendbuf;
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
struct ib_cqe rl_cqe;
struct list_head rl_all;
bool rl_backchannel;
};
......@@ -311,6 +308,7 @@ struct rpcrdma_buffer {
struct list_head rb_send_bufs;
struct list_head rb_recv_bufs;
u32 rb_max_requests;
atomic_t rb_credits; /* most recent credit grant */
u32 rb_bc_srv_max_requests;
spinlock_t rb_reqslock; /* protect rb_allreqs */
......
......@@ -1844,9 +1844,7 @@ static int xs_bind(struct sock_xprt *transport, struct socket *sock)
*/
static void xs_local_rpcbind(struct rpc_task *task)
{
rcu_read_lock();
xprt_set_bound(rcu_dereference(task->tk_client->cl_xprt));
rcu_read_unlock();
xprt_set_bound(task->tk_xprt);
}
static void xs_local_set_port(struct rpc_xprt *xprt, unsigned short port)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment