Commit 451d26e1 authored by Chuck Lever's avatar Chuck Lever Committed by Anna Schumaker

xprtrdma: Pass only the list of registered MRs to ro_unmap_sync

There are rare cases where an rpcrdma_req can be re-used (via
rpcrdma_buffer_put) while the RPC reply handler is still running.
This is due to a signal firing at just the wrong instant.

Since commit 9d6b0409 ("xprtrdma: Place registered MWs on a
per-req list"), rpcrdma_mws are self-contained; ie., they fully
describe an MR and scatterlist, and no part of that information is
stored in struct rpcrdma_req.

As part of closing the above race window, pass only the req's list
of registered MRs to ro_unmap_sync, rather than the rpcrdma_req
itself.

Some extra transport header sanity checking is removed. Since the
client depends on its own recollection of what memory had been
registered, there doesn't seem to be a way to abuse this change.

And, the check was not terribly effective. If the client had sent
Read chunks, the "list_empty" test is negative in both of the
removed cases, which are actually looking for Write or Reply
chunks.

BugLink: https://bugzilla.linux-nfs.org/show_bug.cgi?id=305
Fixes: 68791649 ('xprtrdma: Invalidate in the RPC reply ... ')
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
Signed-off-by: default avatarAnna Schumaker <Anna.Schumaker@Netapp.com>
parent 4b196dc6
...@@ -255,24 +255,26 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, ...@@ -255,24 +255,26 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
* Sleeps until it is safe for the host CPU to access the * Sleeps until it is safe for the host CPU to access the
* previously mapped memory regions. * previously mapped memory regions.
* *
* Caller ensures that req->rl_registered is not empty. * Caller ensures that @mws is not empty before the call. This
* function empties the list.
*/ */
static void static void
fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws)
{ {
struct rpcrdma_mw *mw, *tmp; struct rpcrdma_mw *mw, *tmp;
LIST_HEAD(unmap_list); LIST_HEAD(unmap_list);
int rc; int rc;
dprintk("RPC: %s: req %p\n", __func__, req);
/* ORDER: Invalidate all of the req's MRs first /* ORDER: Invalidate all of the req's MRs first
* *
* ib_unmap_fmr() is slow, so use a single call instead * ib_unmap_fmr() is slow, so use a single call instead
* of one call per mapped FMR. * of one call per mapped FMR.
*/ */
list_for_each_entry(mw, &req->rl_registered, mw_list) list_for_each_entry(mw, mws, mw_list) {
dprintk("RPC: %s: unmapping fmr %p\n",
__func__, &mw->fmr);
list_add_tail(&mw->fmr.fm_mr->list, &unmap_list); list_add_tail(&mw->fmr.fm_mr->list, &unmap_list);
}
r_xprt->rx_stats.local_inv_needed++; r_xprt->rx_stats.local_inv_needed++;
rc = ib_unmap_fmr(&unmap_list); rc = ib_unmap_fmr(&unmap_list);
if (rc) if (rc)
...@@ -281,7 +283,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) ...@@ -281,7 +283,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
/* ORDER: Now DMA unmap all of the req's MRs, and return /* ORDER: Now DMA unmap all of the req's MRs, and return
* them to the free MW list. * them to the free MW list.
*/ */
list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) { list_for_each_entry_safe(mw, tmp, mws, mw_list) {
list_del_init(&mw->mw_list); list_del_init(&mw->mw_list);
list_del_init(&mw->fmr.fm_mr->list); list_del_init(&mw->fmr.fm_mr->list);
ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
...@@ -294,7 +296,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) ...@@ -294,7 +296,7 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
out_reset: out_reset:
pr_err("rpcrdma: ib_unmap_fmr failed (%i)\n", rc); pr_err("rpcrdma: ib_unmap_fmr failed (%i)\n", rc);
list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) { list_for_each_entry_safe(mw, tmp, mws, mw_list) {
list_del_init(&mw->mw_list); list_del_init(&mw->mw_list);
list_del_init(&mw->fmr.fm_mr->list); list_del_init(&mw->fmr.fm_mr->list);
fmr_op_recover_mr(mw); fmr_op_recover_mr(mw);
......
...@@ -458,10 +458,11 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, ...@@ -458,10 +458,11 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
* Sleeps until it is safe for the host CPU to access the * Sleeps until it is safe for the host CPU to access the
* previously mapped memory regions. * previously mapped memory regions.
* *
* Caller ensures that req->rl_registered is not empty. * Caller ensures that @mws is not empty before the call. This
* function empties the list.
*/ */
static void static void
frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mws)
{ {
struct ib_send_wr *first, **prev, *last, *bad_wr; struct ib_send_wr *first, **prev, *last, *bad_wr;
struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rpcrdma_ia *ia = &r_xprt->rx_ia;
...@@ -469,9 +470,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) ...@@ -469,9 +470,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
struct rpcrdma_mw *mw; struct rpcrdma_mw *mw;
int count, rc; int count, rc;
dprintk("RPC: %s: req %p\n", __func__, req); /* ORDER: Invalidate all of the MRs first
/* ORDER: Invalidate all of the req's MRs first
* *
* Chain the LOCAL_INV Work Requests and post them with * Chain the LOCAL_INV Work Requests and post them with
* a single ib_post_send() call. * a single ib_post_send() call.
...@@ -479,7 +478,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) ...@@ -479,7 +478,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
f = NULL; f = NULL;
count = 0; count = 0;
prev = &first; prev = &first;
list_for_each_entry(mw, &req->rl_registered, mw_list) { list_for_each_entry(mw, mws, mw_list) {
mw->frmr.fr_state = FRMR_IS_INVALID; mw->frmr.fr_state = FRMR_IS_INVALID;
if (mw->mw_flags & RPCRDMA_MW_F_RI) if (mw->mw_flags & RPCRDMA_MW_F_RI)
...@@ -528,12 +527,12 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) ...@@ -528,12 +527,12 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
wait_for_completion(&f->fr_linv_done); wait_for_completion(&f->fr_linv_done);
/* ORDER: Now DMA unmap all of the req's MRs, and return /* ORDER: Now DMA unmap all of the MRs, and return
* them to the free MW list. * them to the free MW list.
*/ */
unmap: unmap:
while (!list_empty(&req->rl_registered)) { while (!list_empty(mws)) {
mw = rpcrdma_pop_mw(&req->rl_registered); mw = rpcrdma_pop_mw(mws);
dprintk("RPC: %s: DMA unmapping frmr %p\n", dprintk("RPC: %s: DMA unmapping frmr %p\n",
__func__, &mw->frmr); __func__, &mw->frmr);
ib_dma_unmap_sg(ia->ri_device, ib_dma_unmap_sg(ia->ri_device,
...@@ -549,7 +548,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) ...@@ -549,7 +548,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
/* Find and reset the MRs in the LOCAL_INV WRs that did not /* Find and reset the MRs in the LOCAL_INV WRs that did not
* get posted. This is synchronous, and slow. * get posted. This is synchronous, and slow.
*/ */
list_for_each_entry(mw, &req->rl_registered, mw_list) { list_for_each_entry(mw, mws, mw_list) {
f = &mw->frmr; f = &mw->frmr;
if (mw->mw_handle == bad_wr->ex.invalidate_rkey) { if (mw->mw_handle == bad_wr->ex.invalidate_rkey) {
__frwr_reset_mr(ia, mw); __frwr_reset_mr(ia, mw);
......
...@@ -995,6 +995,7 @@ rpcrdma_reply_handler(struct work_struct *work) ...@@ -995,6 +995,7 @@ rpcrdma_reply_handler(struct work_struct *work)
__be32 *iptr; __be32 *iptr;
int rdmalen, status, rmerr; int rdmalen, status, rmerr;
unsigned long cwnd; unsigned long cwnd;
struct list_head mws;
dprintk("RPC: %s: incoming rep %p\n", __func__, rep); dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
...@@ -1024,7 +1025,8 @@ rpcrdma_reply_handler(struct work_struct *work) ...@@ -1024,7 +1025,8 @@ rpcrdma_reply_handler(struct work_struct *work)
/* Sanity checking has passed. We are now committed /* Sanity checking has passed. We are now committed
* to complete this transaction. * to complete this transaction.
*/ */
rpcrdma_mark_remote_invalidation(&req->rl_registered, rep); list_replace_init(&req->rl_registered, &mws);
rpcrdma_mark_remote_invalidation(&mws, rep);
list_del_init(&rqst->rq_list); list_del_init(&rqst->rq_list);
req->rl_reply = rep; req->rl_reply = rep;
spin_unlock_bh(&xprt->transport_lock); spin_unlock_bh(&xprt->transport_lock);
...@@ -1042,12 +1044,9 @@ rpcrdma_reply_handler(struct work_struct *work) ...@@ -1042,12 +1044,9 @@ rpcrdma_reply_handler(struct work_struct *work)
case rdma_msg: case rdma_msg:
/* never expect read chunks */ /* never expect read chunks */
/* never expect reply chunks (two ways to check) */ /* never expect reply chunks (two ways to check) */
/* never expect write chunks without having offered RDMA */
if (headerp->rm_body.rm_chunks[0] != xdr_zero || if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
(headerp->rm_body.rm_chunks[1] == xdr_zero && (headerp->rm_body.rm_chunks[1] == xdr_zero &&
headerp->rm_body.rm_chunks[2] != xdr_zero) || headerp->rm_body.rm_chunks[2] != xdr_zero))
(headerp->rm_body.rm_chunks[1] != xdr_zero &&
list_empty(&req->rl_registered)))
goto badheader; goto badheader;
if (headerp->rm_body.rm_chunks[1] != xdr_zero) { if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
/* count any expected write chunks in read reply */ /* count any expected write chunks in read reply */
...@@ -1084,8 +1083,7 @@ rpcrdma_reply_handler(struct work_struct *work) ...@@ -1084,8 +1083,7 @@ rpcrdma_reply_handler(struct work_struct *work)
/* never expect read or write chunks, always reply chunks */ /* never expect read or write chunks, always reply chunks */
if (headerp->rm_body.rm_chunks[0] != xdr_zero || if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
headerp->rm_body.rm_chunks[1] != xdr_zero || headerp->rm_body.rm_chunks[1] != xdr_zero ||
headerp->rm_body.rm_chunks[2] != xdr_one || headerp->rm_body.rm_chunks[2] != xdr_one)
list_empty(&req->rl_registered))
goto badheader; goto badheader;
iptr = (__be32 *)((unsigned char *)headerp + iptr = (__be32 *)((unsigned char *)headerp +
RPCRDMA_HDRLEN_MIN); RPCRDMA_HDRLEN_MIN);
...@@ -1118,8 +1116,8 @@ rpcrdma_reply_handler(struct work_struct *work) ...@@ -1118,8 +1116,8 @@ rpcrdma_reply_handler(struct work_struct *work)
* control: waking the next RPC waits until this RPC has * control: waking the next RPC waits until this RPC has
* relinquished all its Send Queue entries. * relinquished all its Send Queue entries.
*/ */
if (!list_empty(&req->rl_registered)) if (!list_empty(&mws))
r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req); r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
spin_lock_bh(&xprt->transport_lock); spin_lock_bh(&xprt->transport_lock);
cwnd = xprt->cwnd; cwnd = xprt->cwnd;
......
...@@ -467,7 +467,7 @@ struct rpcrdma_memreg_ops { ...@@ -467,7 +467,7 @@ struct rpcrdma_memreg_ops {
struct rpcrdma_mr_seg *, int, bool, struct rpcrdma_mr_seg *, int, bool,
struct rpcrdma_mw **); struct rpcrdma_mw **);
void (*ro_unmap_sync)(struct rpcrdma_xprt *, void (*ro_unmap_sync)(struct rpcrdma_xprt *,
struct rpcrdma_req *); struct list_head *);
void (*ro_unmap_safe)(struct rpcrdma_xprt *, void (*ro_unmap_safe)(struct rpcrdma_xprt *,
struct rpcrdma_req *, bool); struct rpcrdma_req *, bool);
void (*ro_recover_mr)(struct rpcrdma_mw *); void (*ro_recover_mr)(struct rpcrdma_mw *);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment