Commit 68432a03 authored by J. Bruce Fields's avatar J. Bruce Fields

Merge branch 'from-tomtucker' into for-2.6.26

parents d71a4dd7 a6f911c0
...@@ -71,7 +71,8 @@ extern atomic_t rdma_stat_sq_prod; ...@@ -71,7 +71,8 @@ extern atomic_t rdma_stat_sq_prod;
* completes. * completes.
*/ */
struct svc_rdma_op_ctxt { struct svc_rdma_op_ctxt {
struct svc_rdma_op_ctxt *next; struct svc_rdma_op_ctxt *read_hdr;
struct list_head free_list;
struct xdr_buf arg; struct xdr_buf arg;
struct list_head dto_q; struct list_head dto_q;
enum ib_wr_opcode wr_op; enum ib_wr_opcode wr_op;
...@@ -85,7 +86,6 @@ struct svc_rdma_op_ctxt { ...@@ -85,7 +86,6 @@ struct svc_rdma_op_ctxt {
struct page *pages[RPCSVC_MAXPAGES]; struct page *pages[RPCSVC_MAXPAGES];
}; };
#define RDMACTXT_F_READ_DONE 1
#define RDMACTXT_F_LAST_CTXT 2 #define RDMACTXT_F_LAST_CTXT 2
struct svcxprt_rdma { struct svcxprt_rdma {
...@@ -104,7 +104,8 @@ struct svcxprt_rdma { ...@@ -104,7 +104,8 @@ struct svcxprt_rdma {
struct ib_pd *sc_pd; struct ib_pd *sc_pd;
struct svc_rdma_op_ctxt *sc_ctxt_head; atomic_t sc_ctxt_used;
struct list_head sc_ctxt_free;
int sc_ctxt_cnt; int sc_ctxt_cnt;
int sc_ctxt_bump; int sc_ctxt_bump;
int sc_ctxt_max; int sc_ctxt_max;
...@@ -123,6 +124,7 @@ struct svcxprt_rdma { ...@@ -123,6 +124,7 @@ struct svcxprt_rdma {
struct list_head sc_dto_q; /* DTO tasklet I/O pending Q */ struct list_head sc_dto_q; /* DTO tasklet I/O pending Q */
struct list_head sc_read_complete_q; struct list_head sc_read_complete_q;
spinlock_t sc_read_complete_lock; spinlock_t sc_read_complete_lock;
struct work_struct sc_work;
}; };
/* sc_flags */ /* sc_flags */
#define RDMAXPRT_RQ_PENDING 1 #define RDMAXPRT_RQ_PENDING 1
...@@ -164,7 +166,7 @@ extern int svc_rdma_sendto(struct svc_rqst *); ...@@ -164,7 +166,7 @@ extern int svc_rdma_sendto(struct svc_rqst *);
/* svc_rdma_transport.c */ /* svc_rdma_transport.c */
extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *); extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
extern int svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *, extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
enum rpcrdma_errcode); enum rpcrdma_errcode);
struct page *svc_rdma_get_page(void); struct page *svc_rdma_get_page(void);
extern int svc_rdma_post_recv(struct svcxprt_rdma *); extern int svc_rdma_post_recv(struct svcxprt_rdma *);
......
...@@ -6,30 +6,9 @@ ...@@ -6,30 +6,9 @@
#include <linux/sched.h> #include <linux/sched.h>
#include <linux/errno.h> #include <linux/errno.h>
#include <linux/fcntl.h>
#include <linux/net.h>
#include <linux/in.h>
#include <linux/inet.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/unistd.h>
#include <linux/slab.h>
#include <linux/netdevice.h>
#include <linux/skbuff.h>
#include <linux/file.h>
#include <linux/freezer.h> #include <linux/freezer.h>
#include <linux/kthread.h> #include <linux/kthread.h>
#include <net/sock.h> #include <net/sock.h>
#include <net/checksum.h>
#include <net/ip.h>
#include <net/ipv6.h>
#include <net/tcp_states.h>
#include <linux/uaccess.h>
#include <asm/ioctls.h>
#include <linux/sunrpc/types.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/stats.h> #include <linux/sunrpc/stats.h>
#include <linux/sunrpc/svc_xprt.h> #include <linux/sunrpc/svc_xprt.h>
...@@ -296,8 +275,6 @@ void svc_xprt_enqueue(struct svc_xprt *xprt) ...@@ -296,8 +275,6 @@ void svc_xprt_enqueue(struct svc_xprt *xprt)
if (!(xprt->xpt_flags & if (!(xprt->xpt_flags &
((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED)))) ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED))))
return; return;
if (test_bit(XPT_DEAD, &xprt->xpt_flags))
return;
cpu = get_cpu(); cpu = get_cpu();
pool = svc_pool_for_cpu(xprt->xpt_server, cpu); pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
......
...@@ -260,11 +260,16 @@ static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count) ...@@ -260,11 +260,16 @@ static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
* On our side, we need to read into a pagelist. The first page immediately * On our side, we need to read into a pagelist. The first page immediately
* follows the RPC header. * follows the RPC header.
* *
* This function returns 1 to indicate success. The data is not yet in * This function returns:
* 0 - No error and no read-list found.
*
* 1 - Successful read-list processing. The data is not yet in
* the pagelist and therefore the RPC request must be deferred. The * the pagelist and therefore the RPC request must be deferred. The
* I/O completion will enqueue the transport again and * I/O completion will enqueue the transport again and
* svc_rdma_recvfrom will complete the request. * svc_rdma_recvfrom will complete the request.
* *
* <0 - Error processing/posting read-list.
*
* NOTE: The ctxt must not be touched after the last WR has been posted * NOTE: The ctxt must not be touched after the last WR has been posted
* because the I/O completion processing may occur on another * because the I/O completion processing may occur on another
* processor and free / modify the context. Ne touche pas! * processor and free / modify the context. Ne touche pas!
...@@ -284,7 +289,6 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt, ...@@ -284,7 +289,6 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
u64 sgl_offset; u64 sgl_offset;
struct rpcrdma_read_chunk *ch; struct rpcrdma_read_chunk *ch;
struct svc_rdma_op_ctxt *ctxt = NULL; struct svc_rdma_op_ctxt *ctxt = NULL;
struct svc_rdma_op_ctxt *head;
struct svc_rdma_op_ctxt *tmp_sge_ctxt; struct svc_rdma_op_ctxt *tmp_sge_ctxt;
struct svc_rdma_op_ctxt *tmp_ch_ctxt; struct svc_rdma_op_ctxt *tmp_ch_ctxt;
struct chunk_sge *ch_sge_ary; struct chunk_sge *ch_sge_ary;
...@@ -302,25 +306,19 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt, ...@@ -302,25 +306,19 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
ch_sge_ary = (struct chunk_sge *)tmp_ch_ctxt->sge; ch_sge_ary = (struct chunk_sge *)tmp_ch_ctxt->sge;
svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count); svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
if (ch_count > RPCSVC_MAXPAGES)
return -EINVAL;
sge_count = rdma_rcl_to_sge(xprt, rqstp, hdr_ctxt, rmsgp, sge_count = rdma_rcl_to_sge(xprt, rqstp, hdr_ctxt, rmsgp,
sge, ch_sge_ary, sge, ch_sge_ary,
ch_count, byte_count); ch_count, byte_count);
head = svc_rdma_get_context(xprt);
sgl_offset = 0; sgl_offset = 0;
ch_no = 0; ch_no = 0;
for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
ch->rc_discrim != 0; ch++, ch_no++) { ch->rc_discrim != 0; ch++, ch_no++) {
next_sge: next_sge:
if (!ctxt) ctxt = svc_rdma_get_context(xprt);
ctxt = head;
else {
ctxt->next = svc_rdma_get_context(xprt);
ctxt = ctxt->next;
}
ctxt->next = NULL;
ctxt->direction = DMA_FROM_DEVICE; ctxt->direction = DMA_FROM_DEVICE;
clear_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
/* Prepare READ WR */ /* Prepare READ WR */
...@@ -347,20 +345,15 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt, ...@@ -347,20 +345,15 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
* the client and the RPC needs to be enqueued. * the client and the RPC needs to be enqueued.
*/ */
set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
ctxt->next = hdr_ctxt; ctxt->read_hdr = hdr_ctxt;
hdr_ctxt->next = head;
} }
/* Post the read */ /* Post the read */
err = svc_rdma_send(xprt, &read_wr); err = svc_rdma_send(xprt, &read_wr);
if (err) { if (err) {
printk(KERN_ERR "svcrdma: Error posting send = %d\n", printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n",
err); err);
/* set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
* Break the circular list so free knows when svc_rdma_put_context(ctxt, 0);
* to stop if the error happened to occur on
* the last read
*/
ctxt->next = NULL;
goto out; goto out;
} }
atomic_inc(&rdma_stat_read); atomic_inc(&rdma_stat_read);
...@@ -371,7 +364,7 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt, ...@@ -371,7 +364,7 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
goto next_sge; goto next_sge;
} }
sgl_offset = 0; sgl_offset = 0;
err = 0; err = 1;
} }
out: out:
...@@ -389,25 +382,12 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt, ...@@ -389,25 +382,12 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
while (rqstp->rq_resused) while (rqstp->rq_resused)
rqstp->rq_respages[--rqstp->rq_resused] = NULL; rqstp->rq_respages[--rqstp->rq_resused] = NULL;
if (err) { return err;
printk(KERN_ERR "svcrdma : RDMA_READ error = %d\n", err);
set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
/* Free the linked list of read contexts */
while (head != NULL) {
ctxt = head->next;
svc_rdma_put_context(head, 1);
head = ctxt;
}
return 0;
}
return 1;
} }
static int rdma_read_complete(struct svc_rqst *rqstp, static int rdma_read_complete(struct svc_rqst *rqstp,
struct svc_rdma_op_ctxt *data) struct svc_rdma_op_ctxt *head)
{ {
struct svc_rdma_op_ctxt *head = data->next;
int page_no; int page_no;
int ret; int ret;
...@@ -433,21 +413,12 @@ static int rdma_read_complete(struct svc_rqst *rqstp, ...@@ -433,21 +413,12 @@ static int rdma_read_complete(struct svc_rqst *rqstp,
rqstp->rq_arg.len = head->arg.len; rqstp->rq_arg.len = head->arg.len;
rqstp->rq_arg.buflen = head->arg.buflen; rqstp->rq_arg.buflen = head->arg.buflen;
/* Free the context */
svc_rdma_put_context(head, 0);
/* XXX: What should this be? */ /* XXX: What should this be? */
rqstp->rq_prot = IPPROTO_MAX; rqstp->rq_prot = IPPROTO_MAX;
svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt);
/*
* Free the contexts we used to build the RDMA_READ. We have
* to be careful here because the context list uses the same
* next pointer used to chain the contexts associated with the
* RDMA_READ
*/
data->next = NULL; /* terminate circular list */
do {
data = head->next;
svc_rdma_put_context(head, 0);
head = data;
} while (head != NULL);
ret = rqstp->rq_arg.head[0].iov_len ret = rqstp->rq_arg.head[0].iov_len
+ rqstp->rq_arg.page_len + rqstp->rq_arg.page_len
...@@ -457,8 +428,6 @@ static int rdma_read_complete(struct svc_rqst *rqstp, ...@@ -457,8 +428,6 @@ static int rdma_read_complete(struct svc_rqst *rqstp,
ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base, ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
rqstp->rq_arg.head[0].iov_len); rqstp->rq_arg.head[0].iov_len);
/* Indicate that we've consumed an RQ credit */
rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
svc_xprt_received(rqstp->rq_xprt); svc_xprt_received(rqstp->rq_xprt);
return ret; return ret;
} }
...@@ -480,13 +449,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) ...@@ -480,13 +449,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
dprintk("svcrdma: rqstp=%p\n", rqstp); dprintk("svcrdma: rqstp=%p\n", rqstp);
/*
* The rq_xprt_ctxt indicates if we've consumed an RQ credit
* or not. It is used in the rdma xpo_release_rqst function to
* determine whether or not to return an RQ WQE to the RQ.
*/
rqstp->rq_xprt_ctxt = NULL;
spin_lock_bh(&rdma_xprt->sc_read_complete_lock); spin_lock_bh(&rdma_xprt->sc_read_complete_lock);
if (!list_empty(&rdma_xprt->sc_read_complete_q)) { if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
ctxt = list_entry(rdma_xprt->sc_read_complete_q.next, ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
...@@ -537,21 +499,22 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) ...@@ -537,21 +499,22 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
/* If the request is invalid, reply with an error */ /* If the request is invalid, reply with an error */
if (len < 0) { if (len < 0) {
if (len == -ENOSYS) if (len == -ENOSYS)
(void)svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS); svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
goto close_out; goto close_out;
} }
/* Read read-list data. If we would need to wait, defer /* Read read-list data. */
* it. Not that in this case, we don't return the RQ credit ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt);
* until after the read completes. if (ret > 0) {
*/ /* read-list posted, defer until data received from client. */
if (rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt)) {
svc_xprt_received(xprt); svc_xprt_received(xprt);
return 0; return 0;
} }
if (ret < 0) {
/* Indicate we've consumed an RQ credit */ /* Post of read-list failed, free context. */
rqstp->rq_xprt_ctxt = rqstp->rq_xprt; svc_rdma_put_context(ctxt, 1);
return 0;
}
ret = rqstp->rq_arg.head[0].iov_len ret = rqstp->rq_arg.head[0].iov_len
+ rqstp->rq_arg.page_len + rqstp->rq_arg.page_len
...@@ -569,11 +532,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) ...@@ -569,11 +532,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
return ret; return ret;
close_out: close_out:
if (ctxt) { if (ctxt)
svc_rdma_put_context(ctxt, 1); svc_rdma_put_context(ctxt, 1);
/* Indicate we've consumed an RQ credit */
rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
}
dprintk("svcrdma: transport %p is closing\n", xprt); dprintk("svcrdma: transport %p is closing\n", xprt);
/* /*
* Set the close bit and enqueue it. svc_recv will see the * Set the close bit and enqueue it. svc_recv will see the
......
...@@ -389,6 +389,17 @@ static int send_reply(struct svcxprt_rdma *rdma, ...@@ -389,6 +389,17 @@ static int send_reply(struct svcxprt_rdma *rdma,
int page_no; int page_no;
int ret; int ret;
/* Post a recv buffer to handle another request. */
ret = svc_rdma_post_recv(rdma);
if (ret) {
printk(KERN_INFO
"svcrdma: could not post a receive buffer, err=%d."
"Closing transport %p.\n", ret, rdma);
set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
svc_rdma_put_context(ctxt, 0);
return -ENOTCONN;
}
/* Prepare the context */ /* Prepare the context */
ctxt->pages[0] = page; ctxt->pages[0] = page;
ctxt->count = 1; ctxt->count = 1;
......
...@@ -103,8 +103,8 @@ static int rdma_bump_context_cache(struct svcxprt_rdma *xprt) ...@@ -103,8 +103,8 @@ static int rdma_bump_context_cache(struct svcxprt_rdma *xprt)
spin_lock_bh(&xprt->sc_ctxt_lock); spin_lock_bh(&xprt->sc_ctxt_lock);
if (ctxt) { if (ctxt) {
at_least_one = 1; at_least_one = 1;
ctxt->next = xprt->sc_ctxt_head; INIT_LIST_HEAD(&ctxt->free_list);
xprt->sc_ctxt_head = ctxt; list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
} else { } else {
/* kmalloc failed...give up for now */ /* kmalloc failed...give up for now */
xprt->sc_ctxt_cnt--; xprt->sc_ctxt_cnt--;
...@@ -123,7 +123,7 @@ struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt) ...@@ -123,7 +123,7 @@ struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
while (1) { while (1) {
spin_lock_bh(&xprt->sc_ctxt_lock); spin_lock_bh(&xprt->sc_ctxt_lock);
if (unlikely(xprt->sc_ctxt_head == NULL)) { if (unlikely(list_empty(&xprt->sc_ctxt_free))) {
/* Try to bump my cache. */ /* Try to bump my cache. */
spin_unlock_bh(&xprt->sc_ctxt_lock); spin_unlock_bh(&xprt->sc_ctxt_lock);
...@@ -136,12 +136,15 @@ struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt) ...@@ -136,12 +136,15 @@ struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
schedule_timeout_uninterruptible(msecs_to_jiffies(500)); schedule_timeout_uninterruptible(msecs_to_jiffies(500));
continue; continue;
} }
ctxt = xprt->sc_ctxt_head; ctxt = list_entry(xprt->sc_ctxt_free.next,
xprt->sc_ctxt_head = ctxt->next; struct svc_rdma_op_ctxt,
free_list);
list_del_init(&ctxt->free_list);
spin_unlock_bh(&xprt->sc_ctxt_lock); spin_unlock_bh(&xprt->sc_ctxt_lock);
ctxt->xprt = xprt; ctxt->xprt = xprt;
INIT_LIST_HEAD(&ctxt->dto_q); INIT_LIST_HEAD(&ctxt->dto_q);
ctxt->count = 0; ctxt->count = 0;
atomic_inc(&xprt->sc_ctxt_used);
break; break;
} }
return ctxt; return ctxt;
...@@ -159,14 +162,15 @@ void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages) ...@@ -159,14 +162,15 @@ void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
put_page(ctxt->pages[i]); put_page(ctxt->pages[i]);
for (i = 0; i < ctxt->count; i++) for (i = 0; i < ctxt->count; i++)
dma_unmap_single(xprt->sc_cm_id->device->dma_device, ib_dma_unmap_single(xprt->sc_cm_id->device,
ctxt->sge[i].addr, ctxt->sge[i].addr,
ctxt->sge[i].length, ctxt->sge[i].length,
ctxt->direction); ctxt->direction);
spin_lock_bh(&xprt->sc_ctxt_lock); spin_lock_bh(&xprt->sc_ctxt_lock);
ctxt->next = xprt->sc_ctxt_head; list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
xprt->sc_ctxt_head = ctxt;
spin_unlock_bh(&xprt->sc_ctxt_lock); spin_unlock_bh(&xprt->sc_ctxt_lock);
atomic_dec(&xprt->sc_ctxt_used);
} }
/* ib_cq event handler */ /* ib_cq event handler */
...@@ -228,23 +232,8 @@ static void dto_tasklet_func(unsigned long data) ...@@ -228,23 +232,8 @@ static void dto_tasklet_func(unsigned long data)
list_del_init(&xprt->sc_dto_q); list_del_init(&xprt->sc_dto_q);
spin_unlock_irqrestore(&dto_lock, flags); spin_unlock_irqrestore(&dto_lock, flags);
if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) {
ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
rq_cq_reap(xprt); rq_cq_reap(xprt);
set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
/*
* If data arrived before established event,
* don't enqueue. This defers RPC I/O until the
* RDMA connection is complete.
*/
if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
svc_xprt_enqueue(&xprt->sc_xprt);
}
if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) {
ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
sq_cq_reap(xprt); sq_cq_reap(xprt);
}
svc_xprt_put(&xprt->sc_xprt); svc_xprt_put(&xprt->sc_xprt);
spin_lock_irqsave(&dto_lock, flags); spin_lock_irqsave(&dto_lock, flags);
...@@ -263,6 +252,10 @@ static void rq_comp_handler(struct ib_cq *cq, void *cq_context) ...@@ -263,6 +252,10 @@ static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
struct svcxprt_rdma *xprt = cq_context; struct svcxprt_rdma *xprt = cq_context;
unsigned long flags; unsigned long flags;
/* Guard against unconditional flush call for destroyed QP */
if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
return;
/* /*
* Set the bit regardless of whether or not it's on the list * Set the bit regardless of whether or not it's on the list
* because it may be on the list already due to an SQ * because it may be on the list already due to an SQ
...@@ -290,6 +283,8 @@ static void rq_comp_handler(struct ib_cq *cq, void *cq_context) ...@@ -290,6 +283,8 @@ static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
* *
* Take all completing WC off the CQE and enqueue the associated DTO * Take all completing WC off the CQE and enqueue the associated DTO
* context on the dto_q for the transport. * context on the dto_q for the transport.
*
* Note that caller must hold a transport reference.
*/ */
static void rq_cq_reap(struct svcxprt_rdma *xprt) static void rq_cq_reap(struct svcxprt_rdma *xprt)
{ {
...@@ -297,29 +292,47 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt) ...@@ -297,29 +292,47 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt)
struct ib_wc wc; struct ib_wc wc;
struct svc_rdma_op_ctxt *ctxt = NULL; struct svc_rdma_op_ctxt *ctxt = NULL;
if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
return;
ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
atomic_inc(&rdma_stat_rq_poll); atomic_inc(&rdma_stat_rq_poll);
spin_lock_bh(&xprt->sc_rq_dto_lock);
while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) { while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id; ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
ctxt->wc_status = wc.status; ctxt->wc_status = wc.status;
ctxt->byte_len = wc.byte_len; ctxt->byte_len = wc.byte_len;
if (wc.status != IB_WC_SUCCESS) { if (wc.status != IB_WC_SUCCESS) {
/* Close the transport */ /* Close the transport */
dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags); set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
svc_rdma_put_context(ctxt, 1); svc_rdma_put_context(ctxt, 1);
svc_xprt_put(&xprt->sc_xprt);
continue; continue;
} }
spin_lock_bh(&xprt->sc_rq_dto_lock);
list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q); list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
}
spin_unlock_bh(&xprt->sc_rq_dto_lock); spin_unlock_bh(&xprt->sc_rq_dto_lock);
svc_xprt_put(&xprt->sc_xprt);
}
if (ctxt) if (ctxt)
atomic_inc(&rdma_stat_rq_prod); atomic_inc(&rdma_stat_rq_prod);
set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
/*
* If data arrived before established event,
* don't enqueue. This defers RPC I/O until the
* RDMA connection is complete.
*/
if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
svc_xprt_enqueue(&xprt->sc_xprt);
} }
/* /*
* Send Queue Completion Handler - potentially called on interrupt context. * Send Queue Completion Handler - potentially called on interrupt context.
*
* Note that caller must hold a transport reference.
*/ */
static void sq_cq_reap(struct svcxprt_rdma *xprt) static void sq_cq_reap(struct svcxprt_rdma *xprt)
{ {
...@@ -328,6 +341,11 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt) ...@@ -328,6 +341,11 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
struct ib_cq *cq = xprt->sc_sq_cq; struct ib_cq *cq = xprt->sc_sq_cq;
int ret; int ret;
if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
return;
ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
atomic_inc(&rdma_stat_sq_poll); atomic_inc(&rdma_stat_sq_poll);
while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) { while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id; ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
...@@ -349,14 +367,16 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt) ...@@ -349,14 +367,16 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
case IB_WR_RDMA_READ: case IB_WR_RDMA_READ:
if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) { if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
BUG_ON(!read_hdr);
set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags); set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
spin_lock_bh(&xprt->sc_read_complete_lock); spin_lock_bh(&xprt->sc_read_complete_lock);
list_add_tail(&ctxt->dto_q, list_add_tail(&read_hdr->dto_q,
&xprt->sc_read_complete_q); &xprt->sc_read_complete_q);
spin_unlock_bh(&xprt->sc_read_complete_lock); spin_unlock_bh(&xprt->sc_read_complete_lock);
svc_xprt_enqueue(&xprt->sc_xprt); svc_xprt_enqueue(&xprt->sc_xprt);
} }
svc_rdma_put_context(ctxt, 0);
break; break;
default: default:
...@@ -365,6 +385,7 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt) ...@@ -365,6 +385,7 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
wc.opcode, wc.status); wc.opcode, wc.status);
break; break;
} }
svc_xprt_put(&xprt->sc_xprt);
} }
if (ctxt) if (ctxt)
...@@ -376,6 +397,10 @@ static void sq_comp_handler(struct ib_cq *cq, void *cq_context) ...@@ -376,6 +397,10 @@ static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
struct svcxprt_rdma *xprt = cq_context; struct svcxprt_rdma *xprt = cq_context;
unsigned long flags; unsigned long flags;
/* Guard against unconditional flush call for destroyed QP */
if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
return;
/* /*
* Set the bit regardless of whether or not it's on the list * Set the bit regardless of whether or not it's on the list
* because it may be on the list already due to an RQ * because it may be on the list already due to an RQ
...@@ -407,28 +432,29 @@ static void create_context_cache(struct svcxprt_rdma *xprt, ...@@ -407,28 +432,29 @@ static void create_context_cache(struct svcxprt_rdma *xprt,
xprt->sc_ctxt_max = ctxt_max; xprt->sc_ctxt_max = ctxt_max;
xprt->sc_ctxt_bump = ctxt_bump; xprt->sc_ctxt_bump = ctxt_bump;
xprt->sc_ctxt_cnt = 0; xprt->sc_ctxt_cnt = 0;
xprt->sc_ctxt_head = NULL; atomic_set(&xprt->sc_ctxt_used, 0);
INIT_LIST_HEAD(&xprt->sc_ctxt_free);
for (i = 0; i < ctxt_count; i++) { for (i = 0; i < ctxt_count; i++) {
ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL); ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
if (ctxt) { if (ctxt) {
ctxt->next = xprt->sc_ctxt_head; INIT_LIST_HEAD(&ctxt->free_list);
xprt->sc_ctxt_head = ctxt; list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
xprt->sc_ctxt_cnt++; xprt->sc_ctxt_cnt++;
} }
} }
} }
static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt) static void destroy_context_cache(struct svcxprt_rdma *xprt)
{ {
struct svc_rdma_op_ctxt *next; while (!list_empty(&xprt->sc_ctxt_free)) {
if (!ctxt) struct svc_rdma_op_ctxt *ctxt;
return; ctxt = list_entry(xprt->sc_ctxt_free.next,
struct svc_rdma_op_ctxt,
do { free_list);
next = ctxt->next; list_del_init(&ctxt->free_list);
kfree(ctxt); kfree(ctxt);
ctxt = next; }
} while (next);
} }
static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
...@@ -465,7 +491,7 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, ...@@ -465,7 +491,7 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
reqs + reqs +
cma_xprt->sc_sq_depth + cma_xprt->sc_sq_depth +
RPCRDMA_MAX_THREADS + 1); /* max */ RPCRDMA_MAX_THREADS + 1); /* max */
if (!cma_xprt->sc_ctxt_head) { if (list_empty(&cma_xprt->sc_ctxt_free)) {
kfree(cma_xprt); kfree(cma_xprt);
return NULL; return NULL;
} }
...@@ -520,7 +546,12 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt) ...@@ -520,7 +546,12 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
recv_wr.num_sge = ctxt->count; recv_wr.num_sge = ctxt->count;
recv_wr.wr_id = (u64)(unsigned long)ctxt; recv_wr.wr_id = (u64)(unsigned long)ctxt;
svc_xprt_get(&xprt->sc_xprt);
ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr); ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
if (ret) {
svc_xprt_put(&xprt->sc_xprt);
svc_rdma_put_context(ctxt, 1);
}
return ret; return ret;
} }
...@@ -539,6 +570,7 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id) ...@@ -539,6 +570,7 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id)
{ {
struct svcxprt_rdma *listen_xprt = new_cma_id->context; struct svcxprt_rdma *listen_xprt = new_cma_id->context;
struct svcxprt_rdma *newxprt; struct svcxprt_rdma *newxprt;
struct sockaddr *sa;
/* Create a new transport */ /* Create a new transport */
newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0); newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
...@@ -551,6 +583,12 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id) ...@@ -551,6 +583,12 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id)
dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n", dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
newxprt, newxprt->sc_cm_id, listen_xprt); newxprt, newxprt->sc_cm_id, listen_xprt);
/* Set the local and remote addresses in the transport */
sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
/* /*
* Enqueue the new transport on the accept queue of the listening * Enqueue the new transport on the accept queue of the listening
* transport * transport
...@@ -627,6 +665,7 @@ static int rdma_cma_handler(struct rdma_cm_id *cma_id, ...@@ -627,6 +665,7 @@ static int rdma_cma_handler(struct rdma_cm_id *cma_id,
if (xprt) { if (xprt) {
set_bit(XPT_CLOSE, &xprt->xpt_flags); set_bit(XPT_CLOSE, &xprt->xpt_flags);
svc_xprt_enqueue(xprt); svc_xprt_enqueue(xprt);
svc_xprt_put(xprt);
} }
break; break;
case RDMA_CM_EVENT_DEVICE_REMOVAL: case RDMA_CM_EVENT_DEVICE_REMOVAL:
...@@ -661,31 +700,27 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, ...@@ -661,31 +700,27 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
cma_xprt = rdma_create_xprt(serv, 1); cma_xprt = rdma_create_xprt(serv, 1);
if (!cma_xprt) if (!cma_xprt)
return ERR_PTR(ENOMEM); return ERR_PTR(-ENOMEM);
xprt = &cma_xprt->sc_xprt; xprt = &cma_xprt->sc_xprt;
listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP); listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
if (IS_ERR(listen_id)) { if (IS_ERR(listen_id)) {
svc_xprt_put(&cma_xprt->sc_xprt); ret = PTR_ERR(listen_id);
dprintk("svcrdma: rdma_create_id failed = %ld\n", dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
PTR_ERR(listen_id)); goto err0;
return (void *)listen_id;
} }
ret = rdma_bind_addr(listen_id, sa); ret = rdma_bind_addr(listen_id, sa);
if (ret) { if (ret) {
rdma_destroy_id(listen_id);
svc_xprt_put(&cma_xprt->sc_xprt);
dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret); dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
return ERR_PTR(ret); goto err1;
} }
cma_xprt->sc_cm_id = listen_id; cma_xprt->sc_cm_id = listen_id;
ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG); ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
if (ret) { if (ret) {
rdma_destroy_id(listen_id);
svc_xprt_put(&cma_xprt->sc_xprt);
dprintk("svcrdma: rdma_listen failed = %d\n", ret); dprintk("svcrdma: rdma_listen failed = %d\n", ret);
return ERR_PTR(ret); goto err1;
} }
/* /*
...@@ -696,6 +731,12 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, ...@@ -696,6 +731,12 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen); svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);
return &cma_xprt->sc_xprt; return &cma_xprt->sc_xprt;
err1:
rdma_destroy_id(listen_id);
err0:
kfree(cma_xprt);
return ERR_PTR(ret);
} }
/* /*
...@@ -716,7 +757,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) ...@@ -716,7 +757,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
struct rdma_conn_param conn_param; struct rdma_conn_param conn_param;
struct ib_qp_init_attr qp_attr; struct ib_qp_init_attr qp_attr;
struct ib_device_attr devattr; struct ib_device_attr devattr;
struct sockaddr *sa;
int ret; int ret;
int i; int i;
...@@ -826,7 +866,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) ...@@ -826,7 +866,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
newxprt->sc_sq_depth = qp_attr.cap.max_send_wr; newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
newxprt->sc_max_requests = qp_attr.cap.max_recv_wr; newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
} }
svc_xprt_get(&newxprt->sc_xprt);
newxprt->sc_qp = newxprt->sc_cm_id->qp; newxprt->sc_qp = newxprt->sc_cm_id->qp;
/* Register all of physical memory */ /* Register all of physical memory */
...@@ -850,6 +889,13 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) ...@@ -850,6 +889,13 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
/* Swap out the handler */ /* Swap out the handler */
newxprt->sc_cm_id->event_handler = rdma_cma_handler; newxprt->sc_cm_id->event_handler = rdma_cma_handler;
/*
* Arm the CQs for the SQ and RQ before accepting so we can't
* miss the first message
*/
ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
/* Accept Connection */ /* Accept Connection */
set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags); set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
memset(&conn_param, 0, sizeof conn_param); memset(&conn_param, 0, sizeof conn_param);
...@@ -886,58 +932,26 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) ...@@ -886,58 +932,26 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
newxprt->sc_max_requests, newxprt->sc_max_requests,
newxprt->sc_ord); newxprt->sc_ord);
/* Set the local and remote addresses in the transport */
sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
return &newxprt->sc_xprt; return &newxprt->sc_xprt;
errout: errout:
dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret); dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
/* Take a reference in case the DTO handler runs */ /* Take a reference in case the DTO handler runs */
svc_xprt_get(&newxprt->sc_xprt); svc_xprt_get(&newxprt->sc_xprt);
if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp)) { if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
ib_destroy_qp(newxprt->sc_qp); ib_destroy_qp(newxprt->sc_qp);
svc_xprt_put(&newxprt->sc_xprt);
}
rdma_destroy_id(newxprt->sc_cm_id); rdma_destroy_id(newxprt->sc_cm_id);
/* This call to put will destroy the transport */ /* This call to put will destroy the transport */
svc_xprt_put(&newxprt->sc_xprt); svc_xprt_put(&newxprt->sc_xprt);
return NULL; return NULL;
} }
/*
* Post an RQ WQE to the RQ when the rqst is being released. This
* effectively returns an RQ credit to the client. The rq_xprt_ctxt
* will be null if the request is deferred due to an RDMA_READ or the
* transport had no data ready (EAGAIN). Note that an RPC deferred in
* svc_process will still return the credit, this is because the data
* is copied and no longer consume a WQE/WC.
*/
static void svc_rdma_release_rqst(struct svc_rqst *rqstp) static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
{ {
int err;
struct svcxprt_rdma *rdma =
container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
if (rqstp->rq_xprt_ctxt) {
BUG_ON(rqstp->rq_xprt_ctxt != rdma);
err = svc_rdma_post_recv(rdma);
if (err)
dprintk("svcrdma: failed to post an RQ WQE error=%d\n",
err);
}
rqstp->rq_xprt_ctxt = NULL;
} }
/* /*
* When connected, an svc_xprt has at least three references: * When connected, an svc_xprt has at least two references:
*
* - A reference held by the QP. We still hold that here because this
* code deletes the QP and puts the reference.
* *
* - A reference held by the cm_id between the ESTABLISHED and * - A reference held by the cm_id between the ESTABLISHED and
* DISCONNECTED events. If the remote peer disconnected first, this * DISCONNECTED events. If the remote peer disconnected first, this
...@@ -946,7 +960,7 @@ static void svc_rdma_release_rqst(struct svc_rqst *rqstp) ...@@ -946,7 +960,7 @@ static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
* - A reference held by the svc_recv code that called this function * - A reference held by the svc_recv code that called this function
* as part of close processing. * as part of close processing.
* *
* At a minimum two references should still be held. * At a minimum one references should still be held.
*/ */
static void svc_rdma_detach(struct svc_xprt *xprt) static void svc_rdma_detach(struct svc_xprt *xprt)
{ {
...@@ -956,23 +970,53 @@ static void svc_rdma_detach(struct svc_xprt *xprt) ...@@ -956,23 +970,53 @@ static void svc_rdma_detach(struct svc_xprt *xprt)
/* Disconnect and flush posted WQE */ /* Disconnect and flush posted WQE */
rdma_disconnect(rdma->sc_cm_id); rdma_disconnect(rdma->sc_cm_id);
/* Destroy the QP if present (not a listener) */
if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) {
ib_destroy_qp(rdma->sc_qp);
svc_xprt_put(xprt);
}
/* Destroy the CM ID */
rdma_destroy_id(rdma->sc_cm_id);
} }
static void svc_rdma_free(struct svc_xprt *xprt) static void __svc_rdma_free(struct work_struct *work)
{ {
struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt; struct svcxprt_rdma *rdma =
container_of(work, struct svcxprt_rdma, sc_work);
dprintk("svcrdma: svc_rdma_free(%p)\n", rdma); dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
/* We should only be called from kref_put */ /* We should only be called from kref_put */
BUG_ON(atomic_read(&xprt->xpt_ref.refcount) != 0); BUG_ON(atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0);
/*
* Destroy queued, but not processed read completions. Note
* that this cleanup has to be done before destroying the
* cm_id because the device ptr is needed to unmap the dma in
* svc_rdma_put_context.
*/
spin_lock_bh(&rdma->sc_read_complete_lock);
while (!list_empty(&rdma->sc_read_complete_q)) {
struct svc_rdma_op_ctxt *ctxt;
ctxt = list_entry(rdma->sc_read_complete_q.next,
struct svc_rdma_op_ctxt,
dto_q);
list_del_init(&ctxt->dto_q);
svc_rdma_put_context(ctxt, 1);
}
spin_unlock_bh(&rdma->sc_read_complete_lock);
/* Destroy queued, but not processed recv completions */
spin_lock_bh(&rdma->sc_rq_dto_lock);
while (!list_empty(&rdma->sc_rq_dto_q)) {
struct svc_rdma_op_ctxt *ctxt;
ctxt = list_entry(rdma->sc_rq_dto_q.next,
struct svc_rdma_op_ctxt,
dto_q);
list_del_init(&ctxt->dto_q);
svc_rdma_put_context(ctxt, 1);
}
spin_unlock_bh(&rdma->sc_rq_dto_lock);
/* Warn if we leaked a resource or under-referenced */
WARN_ON(atomic_read(&rdma->sc_ctxt_used) != 0);
/* Destroy the QP if present (not a listener) */
if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
ib_destroy_qp(rdma->sc_qp);
if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq)) if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
ib_destroy_cq(rdma->sc_sq_cq); ib_destroy_cq(rdma->sc_sq_cq);
...@@ -985,10 +1029,21 @@ static void svc_rdma_free(struct svc_xprt *xprt) ...@@ -985,10 +1029,21 @@ static void svc_rdma_free(struct svc_xprt *xprt)
if (rdma->sc_pd && !IS_ERR(rdma->sc_pd)) if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
ib_dealloc_pd(rdma->sc_pd); ib_dealloc_pd(rdma->sc_pd);
destroy_context_cache(rdma->sc_ctxt_head); /* Destroy the CM ID */
rdma_destroy_id(rdma->sc_cm_id);
destroy_context_cache(rdma);
kfree(rdma); kfree(rdma);
} }
static void svc_rdma_free(struct svc_xprt *xprt)
{
struct svcxprt_rdma *rdma =
container_of(xprt, struct svcxprt_rdma, sc_xprt);
INIT_WORK(&rdma->sc_work, __svc_rdma_free);
schedule_work(&rdma->sc_work);
}
static int svc_rdma_has_wspace(struct svc_xprt *xprt) static int svc_rdma_has_wspace(struct svc_xprt *xprt)
{ {
struct svcxprt_rdma *rdma = struct svcxprt_rdma *rdma =
...@@ -1018,7 +1073,7 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr) ...@@ -1018,7 +1073,7 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
int ret; int ret;
if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags)) if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
return 0; return -ENOTCONN;
BUG_ON(wr->send_flags != IB_SEND_SIGNALED); BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
BUG_ON(((struct svc_rdma_op_ctxt *)(unsigned long)wr->wr_id)->wr_op != BUG_ON(((struct svc_rdma_op_ctxt *)(unsigned long)wr->wr_id)->wr_op !=
...@@ -1029,7 +1084,8 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr) ...@@ -1029,7 +1084,8 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) { if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
spin_unlock_bh(&xprt->sc_lock); spin_unlock_bh(&xprt->sc_lock);
atomic_inc(&rdma_stat_sq_starve); atomic_inc(&rdma_stat_sq_starve);
/* See if we can reap some SQ WR */
/* See if we can opportunistically reap SQ WR to make room */
sq_cq_reap(xprt); sq_cq_reap(xprt);
/* Wait until SQ WR available if SQ still full */ /* Wait until SQ WR available if SQ still full */
...@@ -1041,21 +1097,24 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr) ...@@ -1041,21 +1097,24 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
continue; continue;
} }
/* Bumped used SQ WR count and post */ /* Bumped used SQ WR count and post */
svc_xprt_get(&xprt->sc_xprt);
ret = ib_post_send(xprt->sc_qp, wr, &bad_wr); ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
if (!ret) if (!ret)
atomic_inc(&xprt->sc_sq_count); atomic_inc(&xprt->sc_sq_count);
else else {
svc_xprt_put(&xprt->sc_xprt);
dprintk("svcrdma: failed to post SQ WR rc=%d, " dprintk("svcrdma: failed to post SQ WR rc=%d, "
"sc_sq_count=%d, sc_sq_depth=%d\n", "sc_sq_count=%d, sc_sq_depth=%d\n",
ret, atomic_read(&xprt->sc_sq_count), ret, atomic_read(&xprt->sc_sq_count),
xprt->sc_sq_depth); xprt->sc_sq_depth);
}
spin_unlock_bh(&xprt->sc_lock); spin_unlock_bh(&xprt->sc_lock);
break; break;
} }
return ret; return ret;
} }
int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp, void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
enum rpcrdma_errcode err) enum rpcrdma_errcode err)
{ {
struct ib_send_wr err_wr; struct ib_send_wr err_wr;
...@@ -1094,9 +1153,8 @@ int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp, ...@@ -1094,9 +1153,8 @@ int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
/* Post It */ /* Post It */
ret = svc_rdma_send(xprt, &err_wr); ret = svc_rdma_send(xprt, &err_wr);
if (ret) { if (ret) {
dprintk("svcrdma: Error posting send = %d\n", ret); dprintk("svcrdma: Error %d posting send for protocol error\n",
ret);
svc_rdma_put_context(ctxt, 1); svc_rdma_put_context(ctxt, 1);
} }
return ret;
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment