Commit ec3bc567 authored by Jeff Layton's avatar Jeff Layton Committed by Ilya Dryomov

libceph: new sparse_read op, support sparse reads on msgr2 crc codepath

Add support for a new sparse_read ceph_connection operation. The idea is
that the client driver can define this operation use it to do special
handling for incoming reads.

The alloc_msg routine will look at the request and determine whether the
reply is expected to be sparse. If it is, then we'll dispatch to a
different set of state machine states that will repeatedly call the
driver's sparse_read op to get length and placement info for reading the
extent map, and the extents themselves.

This necessitates adding some new field to some other structs:

- The msg gets a new bool to track whether it's a sparse_read request.

- A new field is added to the cursor to track the amount remaining in the
current extent. This is used to cap the read from the socket into the
msg_data

- Handing a revoke with all of this is particularly difficult, so I've
added a new data_len_remain field to the v2 connection info, and then
use that to skip that much on a revoke. We may want to expand the use of
that to the normal read path as well, just for consistency's sake.
Signed-off-by: default avatarJeff Layton <jlayton@kernel.org>
Reviewed-by: default avatarXiubo Li <xiubli@redhat.com>
Reviewed-and-tested-by: default avatarLuís Henriques <lhenriques@suse.de>
Reviewed-by: default avatarMilind Changire <mchangir@redhat.com>
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent a679e50f
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
struct ceph_msg; struct ceph_msg;
struct ceph_connection; struct ceph_connection;
struct ceph_msg_data_cursor;
/* /*
* Ceph defines these callbacks for handling connection events. * Ceph defines these callbacks for handling connection events.
...@@ -70,6 +71,30 @@ struct ceph_connection_operations { ...@@ -70,6 +71,30 @@ struct ceph_connection_operations {
int used_proto, int result, int used_proto, int result,
const int *allowed_protos, int proto_cnt, const int *allowed_protos, int proto_cnt,
const int *allowed_modes, int mode_cnt); const int *allowed_modes, int mode_cnt);
/**
* sparse_read: read sparse data
* @con: connection we're reading from
* @cursor: data cursor for reading extents
* @buf: optional buffer to read into
*
* This should be called more than once, each time setting up to
* receive an extent into the current cursor position, and zeroing
* the holes between them.
*
* Returns amount of data to be read (in bytes), 0 if reading is
* complete, or -errno if there was an error.
*
* If @buf is set on a >0 return, then the data should be read into
* the provided buffer. Otherwise, it should be read into the cursor.
*
* The sparse read operation is expected to initialize the cursor
* with a length covering up to the end of the last extent.
*/
int (*sparse_read)(struct ceph_connection *con,
struct ceph_msg_data_cursor *cursor,
char **buf);
}; };
/* use format string %s%lld */ /* use format string %s%lld */
...@@ -207,6 +232,7 @@ struct ceph_msg_data_cursor { ...@@ -207,6 +232,7 @@ struct ceph_msg_data_cursor {
struct ceph_msg_data *data; /* current data item */ struct ceph_msg_data *data; /* current data item */
size_t resid; /* bytes not yet consumed */ size_t resid; /* bytes not yet consumed */
int sr_resid; /* residual sparse_read len */
bool need_crc; /* crc update needed */ bool need_crc; /* crc update needed */
union { union {
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
...@@ -251,6 +277,7 @@ struct ceph_msg { ...@@ -251,6 +277,7 @@ struct ceph_msg {
struct kref kref; struct kref kref;
bool more_to_follow; bool more_to_follow;
bool needs_out_seq; bool needs_out_seq;
bool sparse_read;
int front_alloc_len; int front_alloc_len;
struct ceph_msgpool *pool; struct ceph_msgpool *pool;
...@@ -395,6 +422,7 @@ struct ceph_connection_v2_info { ...@@ -395,6 +422,7 @@ struct ceph_connection_v2_info {
void *conn_bufs[16]; void *conn_bufs[16];
int conn_buf_cnt; int conn_buf_cnt;
int data_len_remain;
struct kvec in_sign_kvecs[8]; struct kvec in_sign_kvecs[8];
struct kvec out_sign_kvecs[8]; struct kvec out_sign_kvecs[8];
......
...@@ -1013,6 +1013,7 @@ void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor, ...@@ -1013,6 +1013,7 @@ void ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor,
cursor->total_resid = length; cursor->total_resid = length;
cursor->data = msg->data; cursor->data = msg->data;
cursor->sr_resid = 0;
__ceph_msg_data_cursor_init(cursor); __ceph_msg_data_cursor_init(cursor);
} }
......
...@@ -58,8 +58,10 @@ ...@@ -58,8 +58,10 @@
#define IN_S_PREPARE_READ_DATA 4 #define IN_S_PREPARE_READ_DATA 4
#define IN_S_PREPARE_READ_DATA_CONT 5 #define IN_S_PREPARE_READ_DATA_CONT 5
#define IN_S_PREPARE_READ_ENC_PAGE 6 #define IN_S_PREPARE_READ_ENC_PAGE 6
#define IN_S_HANDLE_EPILOGUE 7 #define IN_S_PREPARE_SPARSE_DATA 7
#define IN_S_FINISH_SKIP 8 #define IN_S_PREPARE_SPARSE_DATA_CONT 8
#define IN_S_HANDLE_EPILOGUE 9
#define IN_S_FINISH_SKIP 10
#define OUT_S_QUEUE_DATA 1 #define OUT_S_QUEUE_DATA 1
#define OUT_S_QUEUE_DATA_CONT 2 #define OUT_S_QUEUE_DATA_CONT 2
...@@ -1825,6 +1827,123 @@ static void prepare_read_data_cont(struct ceph_connection *con) ...@@ -1825,6 +1827,123 @@ static void prepare_read_data_cont(struct ceph_connection *con)
con->v2.in_state = IN_S_HANDLE_EPILOGUE; con->v2.in_state = IN_S_HANDLE_EPILOGUE;
} }
static int prepare_sparse_read_cont(struct ceph_connection *con)
{
int ret;
struct bio_vec bv;
char *buf = NULL;
struct ceph_msg_data_cursor *cursor = &con->v2.in_cursor;
WARN_ON(con->v2.in_state != IN_S_PREPARE_SPARSE_DATA_CONT);
if (iov_iter_is_bvec(&con->v2.in_iter)) {
if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
con->in_data_crc = crc32c(con->in_data_crc,
page_address(con->bounce_page),
con->v2.in_bvec.bv_len);
get_bvec_at(cursor, &bv);
memcpy_to_page(bv.bv_page, bv.bv_offset,
page_address(con->bounce_page),
con->v2.in_bvec.bv_len);
} else {
con->in_data_crc = ceph_crc32c_page(con->in_data_crc,
con->v2.in_bvec.bv_page,
con->v2.in_bvec.bv_offset,
con->v2.in_bvec.bv_len);
}
ceph_msg_data_advance(cursor, con->v2.in_bvec.bv_len);
cursor->sr_resid -= con->v2.in_bvec.bv_len;
dout("%s: advance by 0x%x sr_resid 0x%x\n", __func__,
con->v2.in_bvec.bv_len, cursor->sr_resid);
WARN_ON_ONCE(cursor->sr_resid > cursor->total_resid);
if (cursor->sr_resid) {
get_bvec_at(cursor, &bv);
if (bv.bv_len > cursor->sr_resid)
bv.bv_len = cursor->sr_resid;
if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
bv.bv_page = con->bounce_page;
bv.bv_offset = 0;
}
set_in_bvec(con, &bv);
con->v2.data_len_remain -= bv.bv_len;
return 0;
}
} else if (iov_iter_is_kvec(&con->v2.in_iter)) {
/* On first call, we have no kvec so don't compute crc */
if (con->v2.in_kvec_cnt) {
WARN_ON_ONCE(con->v2.in_kvec_cnt > 1);
con->in_data_crc = crc32c(con->in_data_crc,
con->v2.in_kvecs[0].iov_base,
con->v2.in_kvecs[0].iov_len);
}
} else {
return -EIO;
}
/* get next extent */
ret = con->ops->sparse_read(con, cursor, &buf);
if (ret <= 0) {
if (ret < 0)
return ret;
reset_in_kvecs(con);
add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN);
con->v2.in_state = IN_S_HANDLE_EPILOGUE;
return 0;
}
if (buf) {
/* receive into buffer */
reset_in_kvecs(con);
add_in_kvec(con, buf, ret);
con->v2.data_len_remain -= ret;
return 0;
}
if (ret > cursor->total_resid) {
pr_warn("%s: ret 0x%x total_resid 0x%zx resid 0x%zx\n",
__func__, ret, cursor->total_resid, cursor->resid);
return -EIO;
}
get_bvec_at(cursor, &bv);
if (bv.bv_len > cursor->sr_resid)
bv.bv_len = cursor->sr_resid;
if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
if (unlikely(!con->bounce_page)) {
con->bounce_page = alloc_page(GFP_NOIO);
if (!con->bounce_page) {
pr_err("failed to allocate bounce page\n");
return -ENOMEM;
}
}
bv.bv_page = con->bounce_page;
bv.bv_offset = 0;
}
set_in_bvec(con, &bv);
con->v2.data_len_remain -= ret;
return ret;
}
static int prepare_sparse_read_data(struct ceph_connection *con)
{
struct ceph_msg *msg = con->in_msg;
dout("%s: starting sparse read\n", __func__);
if (WARN_ON_ONCE(!con->ops->sparse_read))
return -EOPNOTSUPP;
if (!con_secure(con))
con->in_data_crc = -1;
reset_in_kvecs(con);
con->v2.in_state = IN_S_PREPARE_SPARSE_DATA_CONT;
con->v2.data_len_remain = data_len(msg);
return prepare_sparse_read_cont(con);
}
static int prepare_read_tail_plain(struct ceph_connection *con) static int prepare_read_tail_plain(struct ceph_connection *con)
{ {
struct ceph_msg *msg = con->in_msg; struct ceph_msg *msg = con->in_msg;
...@@ -1845,6 +1964,9 @@ static int prepare_read_tail_plain(struct ceph_connection *con) ...@@ -1845,6 +1964,9 @@ static int prepare_read_tail_plain(struct ceph_connection *con)
} }
if (data_len(msg)) { if (data_len(msg)) {
if (msg->sparse_read)
con->v2.in_state = IN_S_PREPARE_SPARSE_DATA;
else
con->v2.in_state = IN_S_PREPARE_READ_DATA; con->v2.in_state = IN_S_PREPARE_READ_DATA;
} else { } else {
add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN); add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN);
...@@ -2898,6 +3020,12 @@ static int populate_in_iter(struct ceph_connection *con) ...@@ -2898,6 +3020,12 @@ static int populate_in_iter(struct ceph_connection *con)
prepare_read_enc_page(con); prepare_read_enc_page(con);
ret = 0; ret = 0;
break; break;
case IN_S_PREPARE_SPARSE_DATA:
ret = prepare_sparse_read_data(con);
break;
case IN_S_PREPARE_SPARSE_DATA_CONT:
ret = prepare_sparse_read_cont(con);
break;
case IN_S_HANDLE_EPILOGUE: case IN_S_HANDLE_EPILOGUE:
ret = handle_epilogue(con); ret = handle_epilogue(con);
break; break;
...@@ -3489,6 +3617,23 @@ static void revoke_at_prepare_read_enc_page(struct ceph_connection *con) ...@@ -3489,6 +3617,23 @@ static void revoke_at_prepare_read_enc_page(struct ceph_connection *con)
con->v2.in_state = IN_S_FINISH_SKIP; con->v2.in_state = IN_S_FINISH_SKIP;
} }
static void revoke_at_prepare_sparse_data(struct ceph_connection *con)
{
int resid; /* current piece of data */
int remaining;
WARN_ON(con_secure(con));
WARN_ON(!data_len(con->in_msg));
WARN_ON(!iov_iter_is_bvec(&con->v2.in_iter));
resid = iov_iter_count(&con->v2.in_iter);
dout("%s con %p resid %d\n", __func__, con, resid);
remaining = CEPH_EPILOGUE_PLAIN_LEN + con->v2.data_len_remain;
con->v2.in_iter.count -= resid;
set_in_skip(con, resid + remaining);
con->v2.in_state = IN_S_FINISH_SKIP;
}
static void revoke_at_handle_epilogue(struct ceph_connection *con) static void revoke_at_handle_epilogue(struct ceph_connection *con)
{ {
int resid; int resid;
...@@ -3505,6 +3650,7 @@ static void revoke_at_handle_epilogue(struct ceph_connection *con) ...@@ -3505,6 +3650,7 @@ static void revoke_at_handle_epilogue(struct ceph_connection *con)
void ceph_con_v2_revoke_incoming(struct ceph_connection *con) void ceph_con_v2_revoke_incoming(struct ceph_connection *con)
{ {
switch (con->v2.in_state) { switch (con->v2.in_state) {
case IN_S_PREPARE_SPARSE_DATA:
case IN_S_PREPARE_READ_DATA: case IN_S_PREPARE_READ_DATA:
revoke_at_prepare_read_data(con); revoke_at_prepare_read_data(con);
break; break;
...@@ -3514,6 +3660,9 @@ void ceph_con_v2_revoke_incoming(struct ceph_connection *con) ...@@ -3514,6 +3660,9 @@ void ceph_con_v2_revoke_incoming(struct ceph_connection *con)
case IN_S_PREPARE_READ_ENC_PAGE: case IN_S_PREPARE_READ_ENC_PAGE:
revoke_at_prepare_read_enc_page(con); revoke_at_prepare_read_enc_page(con);
break; break;
case IN_S_PREPARE_SPARSE_DATA_CONT:
revoke_at_prepare_sparse_data(con);
break;
case IN_S_HANDLE_EPILOGUE: case IN_S_HANDLE_EPILOGUE:
revoke_at_handle_epilogue(con); revoke_at_handle_epilogue(con);
break; break;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment