Commit d7cc7397 authored by Chuck Lever's avatar Chuck Lever

svcrdma: support multiple Read chunks per RPC

An efficient way to handle multiple Read chunks is to post them all
together and then take a single completion. This is also how the
code is already structured: when the Read completion fires, all
portions of the incoming RPC message are available to be assembled.

The difficult problem is setting up the Read sink buffers so that
the server pulls the client's data into place, making subsequent
pull-up unnecessary. There are several cases:

* No Read chunks. No-op.

* One data item Read chunk. This is the fast case, where the inline
  part of the RPC-over-RDMA message becomes the head and tail, and
  the data item chunk is placed in buf->pages.

* A Position-zero Read chunk. Treated like TCP: the Read chunk is
  pulled into contiguous pages.

+ A Position-zero Read chunk with data item chunks. Treated like
  TCP: all of the Read chunks are pulled into contiguous pages.

+ Multiple data item chunks. Treated like TCP: the inline part is
  copied and the data item chunks are pulled into contiguous pages.

The "*" cases are already supported. This patch adds support for the
"+" cases.
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent d96962e6
...@@ -756,7 +756,121 @@ static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info, ...@@ -756,7 +756,121 @@ static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info,
} }
/** /**
* svc_rdma_read_data_items - Construct RDMA Reads to pull data item Read chunks * svc_rdma_copy_inline_range - Copy part of the inline content into pages
* @info: context for RDMA Reads
* @offset: offset into the Receive buffer of region to copy
* @remaining: length of region to copy
*
* Take a page at a time from rqstp->rq_pages and copy the inline
* content from the Receive buffer into that page. Update
* info->ri_pageno and info->ri_pageoff so that the next RDMA Read
* result will land contiguously with the copied content.
*
* Return values:
* %0: Inline content was successfully copied
* %-EINVAL: offset or length was incorrect
*/
static int svc_rdma_copy_inline_range(struct svc_rdma_read_info *info,
unsigned int offset,
unsigned int remaining)
{
struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
unsigned char *dst, *src = head->rc_recv_buf;
struct svc_rqst *rqstp = info->ri_rqst;
unsigned int page_no, numpages;
numpages = PAGE_ALIGN(info->ri_pageoff + remaining) >> PAGE_SHIFT;
for (page_no = 0; page_no < numpages; page_no++) {
unsigned int page_len;
page_len = min_t(unsigned int, remaining,
PAGE_SIZE - info->ri_pageoff);
head->rc_arg.pages[info->ri_pageno] =
rqstp->rq_pages[info->ri_pageno];
if (!info->ri_pageoff)
head->rc_page_count++;
dst = page_address(head->rc_arg.pages[info->ri_pageno]);
memcpy(dst + info->ri_pageno, src + offset, page_len);
info->ri_totalbytes += page_len;
info->ri_pageoff += page_len;
if (info->ri_pageoff == PAGE_SIZE) {
info->ri_pageno++;
info->ri_pageoff = 0;
}
remaining -= page_len;
offset += page_len;
}
return -EINVAL;
}
/**
* svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
* @info: context for RDMA Reads
*
* The chunk data lands in head->rc_arg as a series of contiguous pages,
* like an incoming TCP call.
*
* Return values:
* %0: RDMA Read WQEs were successfully built
* %-EINVAL: client provided too many chunks or segments,
* %-ENOMEM: rdma_rw context pool was exhausted,
* %-ENOTCONN: posting failed (connection is lost),
* %-EIO: rdma_rw initialization failed (DMA mapping, etc).
*/
static noinline int svc_rdma_read_multiple_chunks(struct svc_rdma_read_info *info)
{
struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
struct svc_rdma_chunk *chunk, *next;
struct xdr_buf *buf = &head->rc_arg;
unsigned int start, length;
int ret;
start = 0;
chunk = pcl_first_chunk(pcl);
length = chunk->ch_position;
ret = svc_rdma_copy_inline_range(info, start, length);
if (ret < 0)
return ret;
pcl_for_each_chunk(chunk, pcl) {
ret = svc_rdma_build_read_chunk(info, chunk);
if (ret < 0)
return ret;
next = pcl_next_chunk(pcl, chunk);
if (!next)
break;
start += length;
length = next->ch_position - info->ri_totalbytes;
ret = svc_rdma_copy_inline_range(info, start, length);
if (ret < 0)
return ret;
}
start += length;
length = head->rc_byte_len - start;
ret = svc_rdma_copy_inline_range(info, start, length);
if (ret < 0)
return ret;
buf->len += info->ri_totalbytes;
buf->buflen += info->ri_totalbytes;
head->rc_hdr_count = 1;
buf->head[0].iov_base = page_address(head->rc_pages[0]);
buf->head[0].iov_len = min_t(size_t, PAGE_SIZE, info->ri_totalbytes);
buf->page_len = info->ri_totalbytes - buf->head[0].iov_len;
return 0;
}
/**
* svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
* @info: context for RDMA Reads * @info: context for RDMA Reads
* *
* The chunk data lands in the page list of head->rc_arg.pages. * The chunk data lands in the page list of head->rc_arg.pages.
...@@ -772,7 +886,7 @@ static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info, ...@@ -772,7 +886,7 @@ static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info,
* %-ENOTCONN: posting failed (connection is lost), * %-ENOTCONN: posting failed (connection is lost),
* %-EIO: rdma_rw initialization failed (DMA mapping, etc). * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
*/ */
static int svc_rdma_read_data_items(struct svc_rdma_read_info *info) static int svc_rdma_read_data_item(struct svc_rdma_read_info *info)
{ {
struct svc_rdma_recv_ctxt *head = info->ri_readctxt; struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
struct xdr_buf *buf = &head->rc_arg; struct xdr_buf *buf = &head->rc_arg;
...@@ -780,9 +894,6 @@ static int svc_rdma_read_data_items(struct svc_rdma_read_info *info) ...@@ -780,9 +894,6 @@ static int svc_rdma_read_data_items(struct svc_rdma_read_info *info)
unsigned int length; unsigned int length;
int ret; int ret;
if (head->rc_read_pcl.cl_count > 1)
return -EINVAL;
chunk = pcl_first_chunk(&head->rc_read_pcl); chunk = pcl_first_chunk(&head->rc_read_pcl);
ret = svc_rdma_build_read_chunk(info, chunk); ret = svc_rdma_build_read_chunk(info, chunk);
if (ret < 0) if (ret < 0)
...@@ -817,6 +928,104 @@ static int svc_rdma_read_data_items(struct svc_rdma_read_info *info) ...@@ -817,6 +928,104 @@ static int svc_rdma_read_data_items(struct svc_rdma_read_info *info)
return ret; return ret;
} }
/**
* svc_rdma_read_chunk_range - Build RDMA Read WQEs for portion of a chunk
* @info: context for RDMA Reads
* @chunk: parsed Call chunk to pull
* @offset: offset of region to pull
* @length: length of region to pull
*
* Return values:
* %0: RDMA Read WQEs were successfully built
* %-EINVAL: there were not enough resources to finish
* %-ENOMEM: rdma_rw context pool was exhausted,
* %-ENOTCONN: posting failed (connection is lost),
* %-EIO: rdma_rw initialization failed (DMA mapping, etc).
*/
static int svc_rdma_read_chunk_range(struct svc_rdma_read_info *info,
const struct svc_rdma_chunk *chunk,
unsigned int offset, unsigned int length)
{
const struct svc_rdma_segment *segment;
int ret;
ret = -EINVAL;
pcl_for_each_segment(segment, chunk) {
struct svc_rdma_segment dummy;
if (offset > segment->rs_length) {
offset -= segment->rs_length;
continue;
}
dummy.rs_handle = segment->rs_handle;
dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
dummy.rs_offset = segment->rs_offset + offset;
ret = svc_rdma_build_read_segment(info, &dummy);
if (ret < 0)
break;
info->ri_totalbytes += dummy.rs_length;
length -= dummy.rs_length;
offset = 0;
}
return ret;
}
/**
* svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
* @info: context for RDMA Reads
*
* Return values:
* %0: RDMA Read WQEs were successfully built
* %-EINVAL: there were not enough resources to finish
* %-ENOMEM: rdma_rw context pool was exhausted,
* %-ENOTCONN: posting failed (connection is lost),
* %-EIO: rdma_rw initialization failed (DMA mapping, etc).
*/
static int svc_rdma_read_call_chunk(struct svc_rdma_read_info *info)
{
struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
const struct svc_rdma_chunk *call_chunk =
pcl_first_chunk(&head->rc_call_pcl);
const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
struct svc_rdma_chunk *chunk, *next;
unsigned int start, length;
int ret;
if (pcl_is_empty(pcl))
return svc_rdma_build_read_chunk(info, call_chunk);
start = 0;
chunk = pcl_first_chunk(pcl);
length = chunk->ch_position;
ret = svc_rdma_read_chunk_range(info, call_chunk, start, length);
if (ret < 0)
return ret;
pcl_for_each_chunk(chunk, pcl) {
ret = svc_rdma_build_read_chunk(info, chunk);
if (ret < 0)
return ret;
next = pcl_next_chunk(pcl, chunk);
if (!next)
break;
start += length;
length = next->ch_position - info->ri_totalbytes;
ret = svc_rdma_read_chunk_range(info, call_chunk,
start, length);
if (ret < 0)
return ret;
}
start += length;
length = call_chunk->ch_length - start;
return svc_rdma_read_chunk_range(info, call_chunk, start, length);
}
/** /**
* svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
* @info: context for RDMA Reads * @info: context for RDMA Reads
...@@ -836,17 +1045,13 @@ static int svc_rdma_read_data_items(struct svc_rdma_read_info *info) ...@@ -836,17 +1045,13 @@ static int svc_rdma_read_data_items(struct svc_rdma_read_info *info)
* %-ENOTCONN: posting failed (connection is lost), * %-ENOTCONN: posting failed (connection is lost),
* %-EIO: rdma_rw initialization failed (DMA mapping, etc). * %-EIO: rdma_rw initialization failed (DMA mapping, etc).
*/ */
static int svc_rdma_read_special(struct svc_rdma_read_info *info) static noinline int svc_rdma_read_special(struct svc_rdma_read_info *info)
{ {
struct svc_rdma_recv_ctxt *head = info->ri_readctxt; struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
struct xdr_buf *buf = &head->rc_arg; struct xdr_buf *buf = &head->rc_arg;
int ret; int ret;
if (head->rc_call_pcl.cl_count > 1) ret = svc_rdma_read_call_chunk(info);
return -EINVAL;
ret = svc_rdma_build_read_chunk(info,
pcl_first_chunk(&head->rc_call_pcl));
if (ret < 0) if (ret < 0)
goto out; goto out;
...@@ -935,9 +1140,12 @@ int svc_rdma_process_read_list(struct svcxprt_rdma *rdma, ...@@ -935,9 +1140,12 @@ int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
info->ri_pageoff = 0; info->ri_pageoff = 0;
info->ri_totalbytes = 0; info->ri_totalbytes = 0;
if (pcl_is_empty(&head->rc_call_pcl)) if (pcl_is_empty(&head->rc_call_pcl)) {
ret = svc_rdma_read_data_items(info); if (head->rc_read_pcl.cl_count == 1)
ret = svc_rdma_read_data_item(info);
else else
ret = svc_rdma_read_multiple_chunks(info);
} else
ret = svc_rdma_read_special(info); ret = svc_rdma_read_special(info);
if (ret < 0) if (ret < 0)
goto out_err; goto out_err;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment