Commit ca07eda3 authored by Chuck Lever's avatar Chuck Lever

SUNRPC: Refactor svc_recvfrom()

This function is not currently "generic" so remove the documenting
comment and rename it appropriately. Its internals are converted to
use bio_vecs for reading from the transport socket.

In existing typical sunrpc uses of bio_vecs, the bio_vec array is
allocated dynamically. Here, instead, an array of bio_vecs is added
to svc_rqst. The lifetime of this array can be greater than one call
to xpo_recvfrom():

- Multiple calls to xpo_recvfrom() might be needed to read an RPC
  message completely.

- At some later point, rq_arg.bvecs will point to this array and it
  will carry the received message into svc_process().

I also expect that a future optimization will remove either the
rq_vec or rq_pages array in favor of rq_bvec, thus conserving the
size of struct svc_rqst.
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent cca557a5
...@@ -254,6 +254,7 @@ struct svc_rqst { ...@@ -254,6 +254,7 @@ struct svc_rqst {
struct page * *rq_page_end; /* one past the last page */ struct page * *rq_page_end; /* one past the last page */
struct kvec rq_vec[RPCSVC_MAXPAGES]; /* generally useful.. */ struct kvec rq_vec[RPCSVC_MAXPAGES]; /* generally useful.. */
struct bio_vec rq_bvec[RPCSVC_MAXPAGES];
__be32 rq_xid; /* transmission id */ __be32 rq_xid; /* transmission id */
u32 rq_prog; /* program number */ u32 rq_prog; /* program number */
......
...@@ -223,26 +223,62 @@ static int svc_one_sock_name(struct svc_sock *svsk, char *buf, int remaining) ...@@ -223,26 +223,62 @@ static int svc_one_sock_name(struct svc_sock *svsk, char *buf, int remaining)
return len; return len;
} }
#if ARCH_IMPLEMENTS_FLUSH_DCACHE_PAGE
static void svc_flush_bvec(const struct bio_vec *bvec, size_t size, size_t seek)
{
struct bvec_iter bi = {
.bi_size = size,
};
struct bio_vec bv;
bvec_iter_advance(bvec, &bi, seek & PAGE_MASK);
for_each_bvec(bv, bvec, bi, bi)
flush_dcache_page(bv.bv_page);
}
#else
static inline void svc_flush_bvec(const struct bio_vec *bvec, size_t size,
size_t seek)
{
}
#endif
/* /*
* Generic recvfrom routine. * Read from @rqstp's transport socket. The incoming message fills whole
* pages in @rqstp's rq_pages array until the last page of the message
* has been received into a partial page.
*/ */
static ssize_t svc_recvfrom(struct svc_rqst *rqstp, struct kvec *iov, static ssize_t svc_tcp_read_msg(struct svc_rqst *rqstp, size_t buflen,
unsigned int nr, size_t buflen, unsigned int base) size_t seek)
{ {
struct svc_sock *svsk = struct svc_sock *svsk =
container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt); container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
struct bio_vec *bvec = rqstp->rq_bvec;
struct msghdr msg = { NULL }; struct msghdr msg = { NULL };
unsigned int i;
ssize_t len; ssize_t len;
size_t t;
rqstp->rq_xprt_hlen = 0; rqstp->rq_xprt_hlen = 0;
clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
iov_iter_kvec(&msg.msg_iter, READ, iov, nr, buflen);
if (base != 0) { for (i = 0, t = 0; t < buflen; i++, t += PAGE_SIZE) {
iov_iter_advance(&msg.msg_iter, base); bvec[i].bv_page = rqstp->rq_pages[i];
buflen -= base; bvec[i].bv_len = PAGE_SIZE;
bvec[i].bv_offset = 0;
}
rqstp->rq_respages = &rqstp->rq_pages[i];
rqstp->rq_next_page = rqstp->rq_respages + 1;
iov_iter_bvec(&msg.msg_iter, READ, bvec, i, buflen);
if (seek) {
iov_iter_advance(&msg.msg_iter, seek);
buflen -= seek;
} }
len = sock_recvmsg(svsk->sk_sock, &msg, MSG_DONTWAIT); len = sock_recvmsg(svsk->sk_sock, &msg, MSG_DONTWAIT);
if (len > 0)
svc_flush_bvec(bvec, len, seek);
/* If we read a full record, then assume there may be more /* If we read a full record, then assume there may be more
* data to read (stream based sockets only!) * data to read (stream based sockets only!)
*/ */
...@@ -775,13 +811,14 @@ static struct svc_xprt *svc_tcp_accept(struct svc_xprt *xprt) ...@@ -775,13 +811,14 @@ static struct svc_xprt *svc_tcp_accept(struct svc_xprt *xprt)
return NULL; return NULL;
} }
static unsigned int svc_tcp_restore_pages(struct svc_sock *svsk, struct svc_rqst *rqstp) static size_t svc_tcp_restore_pages(struct svc_sock *svsk,
struct svc_rqst *rqstp)
{ {
unsigned int i, len, npages; size_t len = svsk->sk_datalen;
unsigned int i, npages;
if (svsk->sk_datalen == 0) if (!len)
return 0; return 0;
len = svsk->sk_datalen;
npages = (len + PAGE_SIZE - 1) >> PAGE_SHIFT; npages = (len + PAGE_SIZE - 1) >> PAGE_SHIFT;
for (i = 0; i < npages; i++) { for (i = 0; i < npages; i++) {
if (rqstp->rq_pages[i] != NULL) if (rqstp->rq_pages[i] != NULL)
...@@ -917,20 +954,6 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp) ...@@ -917,20 +954,6 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
return -EAGAIN; return -EAGAIN;
} }
static int copy_pages_to_kvecs(struct kvec *vec, struct page **pages, int len)
{
int i = 0;
int t = 0;
while (t < len) {
vec[i].iov_base = page_address(pages[i]);
vec[i].iov_len = PAGE_SIZE;
i++;
t += PAGE_SIZE;
}
return i;
}
static void svc_tcp_fragment_received(struct svc_sock *svsk) static void svc_tcp_fragment_received(struct svc_sock *svsk)
{ {
/* If we have more data, signal svc_xprt_enqueue() to try again */ /* If we have more data, signal svc_xprt_enqueue() to try again */
...@@ -938,20 +961,33 @@ static void svc_tcp_fragment_received(struct svc_sock *svsk) ...@@ -938,20 +961,33 @@ static void svc_tcp_fragment_received(struct svc_sock *svsk)
svsk->sk_marker = xdr_zero; svsk->sk_marker = xdr_zero;
} }
/* /**
* Receive data from a TCP socket. * svc_tcp_recvfrom - Receive data from a TCP socket
* @rqstp: request structure into which to receive an RPC Call
*
* Called in a loop when XPT_DATA has been set.
*
* Read the 4-byte stream record marker, then use the record length
* in that marker to set up exactly the resources needed to receive
* the next RPC message into @rqstp.
*
* Returns:
* On success, the number of bytes in a received RPC Call, or
* %0 if a complete RPC Call message was not ready to return
*
* The zero return case handles partial receives and callback Replies.
* The state of a partial receive is preserved in the svc_sock for
* the next call to svc_tcp_recvfrom.
*/ */
static int svc_tcp_recvfrom(struct svc_rqst *rqstp) static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
{ {
struct svc_sock *svsk = struct svc_sock *svsk =
container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt); container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
struct svc_serv *serv = svsk->sk_xprt.xpt_server; struct svc_serv *serv = svsk->sk_xprt.xpt_server;
int len; size_t want, base;
struct kvec *vec; ssize_t len;
unsigned int want, base;
__be32 *p; __be32 *p;
__be32 calldir; __be32 calldir;
int pnum;
clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
len = svc_tcp_read_marker(svsk, rqstp); len = svc_tcp_read_marker(svsk, rqstp);
...@@ -960,16 +996,7 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp) ...@@ -960,16 +996,7 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
base = svc_tcp_restore_pages(svsk, rqstp); base = svc_tcp_restore_pages(svsk, rqstp);
want = len - (svsk->sk_tcplen - sizeof(rpc_fraghdr)); want = len - (svsk->sk_tcplen - sizeof(rpc_fraghdr));
len = svc_tcp_read_msg(rqstp, base + want, base);
vec = rqstp->rq_vec;
pnum = copy_pages_to_kvecs(&vec[0], &rqstp->rq_pages[0], base + want);
rqstp->rq_respages = &rqstp->rq_pages[pnum];
rqstp->rq_next_page = rqstp->rq_respages + 1;
/* Now receive data */
len = svc_recvfrom(rqstp, vec, pnum, base + want, base);
if (len >= 0) { if (len >= 0) {
trace_svcsock_tcp_recv(&svsk->sk_xprt, len); trace_svcsock_tcp_recv(&svsk->sk_xprt, len);
svsk->sk_tcplen += len; svsk->sk_tcplen += len;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment