Commit 9d0b09d5 authored by Chuck Lever

svcrdma: Support multiple write chunks when pulling up

When counting the number of SGEs needed to construct a Send request,
do not count result payloads. Likewise, when copying the Reply message
into the pull-up buffer, skip result payloads: the client receives
those via RDMA Write, so they must not be copied into the Send buffer.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
parent 6911f3e1
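
To make the new SGE accounting concrete before the diff itself: the sketch below is a standalone userspace model, not kernel code. The names xb_region and count_sges are invented for illustration. It mirrors the per-region arithmetic of svc_rdma_xb_count_sges in the svc_rdma_sendto.c hunk below: one SGE for a non-empty head, one for each page fragment touched, one for a non-empty tail, plus one reserved for the transport header. Regions that are result payloads never reach the counter at all, which is how a Send stays within the device's SGE limit when multiple Write chunks are present.

/* Userspace model of the SGE-counting logic; not kernel code. */
#include <stdio.h>

#define PAGE_SIZE 4096u

/* Simplified stand-in for one non-payload region of an xdr_buf. */
struct xb_region {
	unsigned int head_len;   /* bytes in the head kvec */
	unsigned int page_base;  /* offset into the first page */
	unsigned int page_len;   /* bytes carried in the page list */
	unsigned int tail_len;   /* bytes in the tail kvec */
};

/* Mirrors svc_rdma_xb_count_sges: one SGE for a non-empty head,
 * one per touched page, one for a non-empty tail.
 */
static unsigned int count_sges(const struct xb_region *xb)
{
	unsigned int sges = 0, remaining = xb->page_len;
	unsigned int offset = xb->page_base % PAGE_SIZE;

	if (xb->head_len)
		++sges;
	while (remaining) {
		unsigned int len = PAGE_SIZE - offset;

		if (len > remaining)
			len = remaining;
		++sges;
		remaining -= len;
		offset = 0;
	}
	if (xb->tail_len)
		++sges;
	return sges;
}

int main(void)
{
	/* A reply region whose 8KB of page data starts 100 bytes into
	 * its first page, so it spans three pages. A result payload of
	 * the same shape would contribute zero SGEs, since it moves by
	 * RDMA Write, not by the Send.
	 */
	struct xb_region nonpayload = {
		.head_len = 128, .page_base = 100,
		.page_len = 8192, .tail_len = 4,
	};
	/* +1: one SGE is always reserved for the transport header */
	unsigned int sges = 1 + count_sges(&nonpayload);

	printf("SGEs needed: %u\n", sges);
	return 0;
}

Compiled with any stock C compiler, this reports 6 SGEs: head, three page fragments, tail, and the transport header.
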
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -182,6 +182,8 @@ extern void svc_rdma_handle_bc_reply(struct svc_rqst *rqstp,
 /* svc_rdma_recvfrom.c */
 extern void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma);
 extern bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma);
+extern struct svc_rdma_recv_ctxt *
+		svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma);
 extern void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
 				   struct svc_rdma_recv_ctxt *ctxt);
 extern void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma);
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -1805,20 +1805,30 @@ TRACE_EVENT(svcrdma_small_wrch_err,
 
 TRACE_EVENT(svcrdma_send_pullup,
 	TP_PROTO(
-		unsigned int len
+		const struct svc_rdma_send_ctxt *ctxt,
+		unsigned int msglen
 	),
 
-	TP_ARGS(len),
+	TP_ARGS(ctxt, msglen),
 
 	TP_STRUCT__entry(
-		__field(unsigned int, len)
+		__field(u32, cq_id)
+		__field(int, completion_id)
+		__field(unsigned int, hdrlen)
+		__field(unsigned int, msglen)
 	),
 
 	TP_fast_assign(
-		__entry->len = len;
+		__entry->cq_id = ctxt->sc_cid.ci_queue_id;
+		__entry->completion_id = ctxt->sc_cid.ci_completion_id;
+		__entry->hdrlen = ctxt->sc_hdrbuf.len;
+		__entry->msglen = msglen;
 	),
 
-	TP_printk("len=%u", __entry->len)
+	TP_printk("cq_id=%u cid=%d hdr=%u msg=%u (total %u)",
+		__entry->cq_id, __entry->completion_id,
+		__entry->hdrlen, __entry->msglen,
+		__entry->hdrlen + __entry->msglen)
 );
 
 TRACE_EVENT(svcrdma_send_err,
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -74,11 +74,17 @@ void svc_rdma_handle_bc_reply(struct svc_rqst *rqstp,
  */
 static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 			      struct rpc_rqst *rqst,
-			      struct svc_rdma_send_ctxt *ctxt)
+			      struct svc_rdma_send_ctxt *sctxt)
 {
+	struct svc_rdma_recv_ctxt *rctxt;
 	int ret;
 
-	ret = svc_rdma_map_reply_msg(rdma, ctxt, NULL, &rqst->rq_snd_buf);
+	rctxt = svc_rdma_recv_ctxt_get(rdma);
+	if (!rctxt)
+		return -EIO;
+
+	ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqst->rq_snd_buf);
+	svc_rdma_recv_ctxt_put(rdma, rctxt);
 	if (ret < 0)
 		return -EIO;
 
@@ -86,8 +92,8 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 	 * the rq_buffer before all retransmits are complete.
 	 */
 	get_page(virt_to_page(rqst->rq_buffer));
-	ctxt->sc_send_wr.opcode = IB_WR_SEND;
-	return svc_rdma_send(rdma, ctxt);
+	sctxt->sc_send_wr.opcode = IB_WR_SEND;
+	return svc_rdma_send(rdma, sctxt);
 }
 
 /* Server-side transport endpoint wants a whole page for its send
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -194,8 +194,13 @@ void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
 	}
 }
 
-static struct svc_rdma_recv_ctxt *
-svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
+/**
+ * svc_rdma_recv_ctxt_get - Allocate a recv_ctxt
+ * @rdma: controlling svcxprt_rdma
+ *
+ * Returns a recv_ctxt or (rarely) NULL if none are available.
+ */
+struct svc_rdma_recv_ctxt *svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
 {
 	struct svc_rdma_recv_ctxt *ctxt;
 	struct llist_node *node;
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -531,6 +531,45 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
 				 offset_in_page(base), len);
 }
 
+struct svc_rdma_pullup_data {
+	u8		*pd_dest;
+	unsigned int	pd_length;
+	unsigned int	pd_num_sges;
+};
+
+/**
+ * svc_rdma_xb_count_sges - Count how many SGEs will be needed
+ * @xdr: xdr_buf containing portion of an RPC message to transmit
+ * @data: pointer to arguments
+ *
+ * Returns:
+ *   Number of SGEs needed to Send the contents of @xdr inline
+ */
+static int svc_rdma_xb_count_sges(const struct xdr_buf *xdr,
+				  void *data)
+{
+	struct svc_rdma_pullup_data *args = data;
+	unsigned int remaining;
+	unsigned long offset;
+
+	if (xdr->head[0].iov_len)
+		++args->pd_num_sges;
+
+	offset = offset_in_page(xdr->page_base);
+	remaining = xdr->page_len;
+	while (remaining) {
+		++args->pd_num_sges;
+		remaining -= min_t(u32, PAGE_SIZE - offset, remaining);
+		offset = 0;
+	}
+
+	if (xdr->tail[0].iov_len)
+		++args->pd_num_sges;
+
+	args->pd_length += xdr->len;
+	return 0;
+}
+
 /**
  * svc_rdma_pull_up_needed - Determine whether to use pull-up
  * @rdma: controlling transport
@@ -539,50 +578,71 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
  * @xdr: xdr_buf containing RPC message to transmit
  *
  * Returns:
  *	%true if pull-up must be used
  *	%false otherwise
  */
-static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
-				    struct svc_rdma_send_ctxt *sctxt,
+static bool svc_rdma_pull_up_needed(const struct svcxprt_rdma *rdma,
+				    const struct svc_rdma_send_ctxt *sctxt,
 				    const struct svc_rdma_recv_ctxt *rctxt,
-				    struct xdr_buf *xdr)
+				    const struct xdr_buf *xdr)
 {
-	bool write_chunk_present = rctxt && rctxt->rc_write_list;
-	int elements;
+	/* Resources needed for the transport header */
+	struct svc_rdma_pullup_data args = {
+		.pd_length	= sctxt->sc_hdrbuf.len,
+		.pd_num_sges	= 1,
+	};
+	int ret;
 
-	/* For small messages, copying bytes is cheaper than DMA mapping.
-	 */
-	if (!write_chunk_present &&
-	    sctxt->sc_hdrbuf.len + xdr->len < RPCRDMA_PULLUP_THRESH)
+	ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
+				      svc_rdma_xb_count_sges, &args);
+	if (ret < 0)
+		return false;
+
+	if (args.pd_length < RPCRDMA_PULLUP_THRESH)
 		return true;
+	return args.pd_num_sges >= rdma->sc_max_send_sges;
+}
 
-	/* Check whether the xdr_buf has more elements than can
-	 * fit in a single RDMA Send.
-	 */
-	/* xdr->head */
-	elements = 1;
-
-	/* xdr->pages */
-	if (!rctxt || !rctxt->rc_write_list) {
-		unsigned int remaining;
-		unsigned long pageoff;
-
-		pageoff = xdr->page_base & ~PAGE_MASK;
-		remaining = xdr->page_len;
-		while (remaining) {
-			++elements;
-			remaining -= min_t(u32, PAGE_SIZE - pageoff,
-					   remaining);
-			pageoff = 0;
-		}
-	}
-
-	/* xdr->tail */
-	if (xdr->tail[0].iov_len)
-		++elements;
-
-	/* assume 1 SGE is needed for the transport header */
-	return elements >= rdma->sc_max_send_sges;
+/**
+ * svc_rdma_xb_linearize - Copy region of xdr_buf to flat buffer
+ * @xdr: xdr_buf containing portion of an RPC message to copy
+ * @data: pointer to arguments
+ *
+ * Returns:
+ *   Always zero.
+ */
+static int svc_rdma_xb_linearize(const struct xdr_buf *xdr,
+				 void *data)
+{
+	struct svc_rdma_pullup_data *args = data;
+	unsigned int len, remaining;
+	unsigned long pageoff;
+	struct page **ppages;
+
+	if (xdr->head[0].iov_len) {
+		memcpy(args->pd_dest, xdr->head[0].iov_base, xdr->head[0].iov_len);
+		args->pd_dest += xdr->head[0].iov_len;
+	}
+
+	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+	pageoff = offset_in_page(xdr->page_base);
+	remaining = xdr->page_len;
+	while (remaining) {
+		len = min_t(u32, PAGE_SIZE - pageoff, remaining);
+		memcpy(args->pd_dest, page_address(*ppages) + pageoff, len);
+		remaining -= len;
+		args->pd_dest += len;
+		pageoff = 0;
+		ppages++;
+	}
+
+	if (xdr->tail[0].iov_len) {
+		memcpy(args->pd_dest, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
+		args->pd_dest += xdr->tail[0].iov_len;
+	}
+
+	args->pd_length += xdr->len;
+	return 0;
 }
 
 /**
@@ -595,54 +655,30 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
  * The device is not capable of sending the reply directly.
  * Assemble the elements of @xdr into the transport header buffer.
  *
- * Returns zero on success, or a negative errno on failure.
+ * Assumptions:
+ *	pull_up_needed has determined that @xdr will fit in the buffer.
+ *
+ * Returns:
+ *	%0 if pull-up was successful
+ *	%-EMSGSIZE if a buffer manipulation problem occurred
  */
-static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
+static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma,
 				      struct svc_rdma_send_ctxt *sctxt,
 				      const struct svc_rdma_recv_ctxt *rctxt,
 				      const struct xdr_buf *xdr)
 {
-	unsigned char *dst, *tailbase;
-	unsigned int taillen;
-
-	dst = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len;
-	memcpy(dst, xdr->head[0].iov_base, xdr->head[0].iov_len);
-	dst += xdr->head[0].iov_len;
-
-	tailbase = xdr->tail[0].iov_base;
-	taillen = xdr->tail[0].iov_len;
-	if (rctxt && rctxt->rc_write_list) {
-		u32 xdrpad;
-
-		xdrpad = xdr_pad_size(xdr->page_len);
-		if (taillen && xdrpad) {
-			tailbase += xdrpad;
-			taillen -= xdrpad;
-		}
-	} else {
-		unsigned int len, remaining;
-		unsigned long pageoff;
-		struct page **ppages;
-
-		ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
-		pageoff = xdr->page_base & ~PAGE_MASK;
-		remaining = xdr->page_len;
-		while (remaining) {
-			len = min_t(u32, PAGE_SIZE - pageoff, remaining);
-			memcpy(dst, page_address(*ppages) + pageoff, len);
-			remaining -= len;
-			dst += len;
-			pageoff = 0;
-			ppages++;
-		}
-	}
-
-	if (taillen)
-		memcpy(dst, tailbase, taillen);
-
-	sctxt->sc_sges[0].length += xdr->len;
-	trace_svcrdma_send_pullup(sctxt->sc_sges[0].length);
+	struct svc_rdma_pullup_data args = {
+		.pd_dest	= sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len,
+	};
+	int ret;
+
+	ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
+				      svc_rdma_xb_linearize, &args);
+	if (ret < 0)
+		return ret;
+
+	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len + args.pd_length;
+	trace_svcrdma_send_pullup(sctxt, args.pd_length);
 	return 0;
 }
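
The copy path deserves the same treatment: the sketch below is a userspace model, not kernel code, and names like pullup_cursor and linearize are invented for illustration. It mirrors svc_rdma_xb_linearize above: each non-payload region is appended at a moving destination cursor placed directly behind the already-built transport header, just as pd_dest advances in the diff. Result payloads are never copied, because pcl_process_nonpayloads never presents them to the callback.

/* Userspace model of the pull-up copy; not kernel code. */
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define PAGE_SIZE 4096u

struct pullup_cursor {
	unsigned char *dest;     /* plays the role of pd_dest */
	unsigned int   length;   /* plays the role of pd_length */
};

/* Mirrors svc_rdma_xb_linearize for one region: copy the head, then
 * each page fragment, then the tail, appending at the cursor.
 */
static void linearize(struct pullup_cursor *c,
		      const unsigned char *head, unsigned int head_len,
		      unsigned char * const *pages, unsigned int page_base,
		      unsigned int page_len,
		      const unsigned char *tail, unsigned int tail_len)
{
	unsigned int remaining = page_len;
	unsigned int pageoff = page_base % PAGE_SIZE;
	unsigned int pg = page_base / PAGE_SIZE;

	memcpy(c->dest, head, head_len);
	c->dest += head_len;
	while (remaining) {
		unsigned int len = PAGE_SIZE - pageoff;

		if (len > remaining)
			len = remaining;
		memcpy(c->dest, pages[pg] + pageoff, len);
		c->dest += len;
		remaining -= len;
		pageoff = 0;
		pg++;
	}
	memcpy(c->dest, tail, tail_len);
	c->dest += tail_len;
	c->length += head_len + page_len + tail_len;
}

int main(void)
{
	unsigned char page0[PAGE_SIZE] = "page-data";
	unsigned char *pages[] = { page0 };
	unsigned char sendbuf[256] = "HDR:";	/* stand-in transport header */
	struct pullup_cursor c = { .dest = sendbuf + 4 };

	/* Flatten one region (head + 9 page bytes + tail) behind the
	 * 4-byte "header", as the pull-up does in the Send buffer. */
	linearize(&c, (const unsigned char *)"head|", 5,
		  pages, 0, 9, (const unsigned char *)"|tail", 5);
	assert(c.length == 19);
	printf("%.*s\n", (int)(4 + c.length), sendbuf);
	return 0;
}

Running it prints "HDR:head|page-data|tail", the flattened Send buffer; sges[0].length would then be set to header length plus the copied length, matching the final hunk above.
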