Commit 8e46a2d0 authored by Xiubo Li's avatar Xiubo Li Committed by Ilya Dryomov

libceph: just wait for more data to be available on the socket

A short read may occur while reading the message footer from the
socket.  Later, when the socket is ready for another read, the
messenger invokes all read_partial_*() handlers, including
read_partial_sparse_msg_data().  The expectation is that
read_partial_sparse_msg_data() would bail, allowing the messenger to
invoke read_partial() for the footer and pick up where it left off.

However read_partial_sparse_msg_data() violates that and ends up
calling into the state machine in the OSD client.  The sparse-read
state machine assumes that it's a new op and interprets some piece of
the footer as the sparse-read header and returns bogus extents/data
length, etc.

To determine whether read_partial_sparse_msg_data() should bail, let's
reuse cursor->total_resid.  Because once it reaches to zero that means
all the extents and data have been successfully received in last read,
else it could break out when partially reading any of the extents and
data.  And then osd_sparse_read() could continue where it left off.

[ idryomov: changelog ]

Link: https://tracker.ceph.com/issues/63586
Fixes: d396f89d ("libceph: add sparse read support to msgr1")
Signed-off-by: default avatarXiubo Li <xiubli@redhat.com>
Reviewed-by: default avatarJeff Layton <jlayton@kernel.org>
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent ee97302f
...@@ -283,7 +283,7 @@ struct ceph_msg { ...@@ -283,7 +283,7 @@ struct ceph_msg {
struct kref kref; struct kref kref;
bool more_to_follow; bool more_to_follow;
bool needs_out_seq; bool needs_out_seq;
bool sparse_read; u64 sparse_read_total;
int front_alloc_len; int front_alloc_len;
struct ceph_msgpool *pool; struct ceph_msgpool *pool;
......
...@@ -160,8 +160,9 @@ static size_t sizeof_footer(struct ceph_connection *con) ...@@ -160,8 +160,9 @@ static size_t sizeof_footer(struct ceph_connection *con)
static void prepare_message_data(struct ceph_msg *msg, u32 data_len) static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
{ {
/* Initialize data cursor if it's not a sparse read */ /* Initialize data cursor if it's not a sparse read */
if (!msg->sparse_read) u64 len = msg->sparse_read_total ? : data_len;
ceph_msg_data_cursor_init(&msg->cursor, msg, data_len);
ceph_msg_data_cursor_init(&msg->cursor, msg, len);
} }
/* /*
...@@ -1036,7 +1037,7 @@ static int read_partial_sparse_msg_data(struct ceph_connection *con) ...@@ -1036,7 +1037,7 @@ static int read_partial_sparse_msg_data(struct ceph_connection *con)
if (do_datacrc) if (do_datacrc)
crc = con->in_data_crc; crc = con->in_data_crc;
do { while (cursor->total_resid) {
if (con->v1.in_sr_kvec.iov_base) if (con->v1.in_sr_kvec.iov_base)
ret = read_partial_message_chunk(con, ret = read_partial_message_chunk(con,
&con->v1.in_sr_kvec, &con->v1.in_sr_kvec,
...@@ -1044,23 +1045,23 @@ static int read_partial_sparse_msg_data(struct ceph_connection *con) ...@@ -1044,23 +1045,23 @@ static int read_partial_sparse_msg_data(struct ceph_connection *con)
&crc); &crc);
else if (cursor->sr_resid > 0) else if (cursor->sr_resid > 0)
ret = read_partial_sparse_msg_extent(con, &crc); ret = read_partial_sparse_msg_extent(con, &crc);
if (ret <= 0)
if (ret <= 0) { break;
if (do_datacrc)
con->in_data_crc = crc;
return ret;
}
memset(&con->v1.in_sr_kvec, 0, sizeof(con->v1.in_sr_kvec)); memset(&con->v1.in_sr_kvec, 0, sizeof(con->v1.in_sr_kvec));
ret = con->ops->sparse_read(con, cursor, ret = con->ops->sparse_read(con, cursor,
(char **)&con->v1.in_sr_kvec.iov_base); (char **)&con->v1.in_sr_kvec.iov_base);
if (ret <= 0) {
ret = ret ? ret : 1; /* must return > 0 to indicate success */
break;
}
con->v1.in_sr_len = ret; con->v1.in_sr_len = ret;
} while (ret > 0); }
if (do_datacrc) if (do_datacrc)
con->in_data_crc = crc; con->in_data_crc = crc;
return ret < 0 ? ret : 1; /* must return > 0 to indicate success */ return ret;
} }
static int read_partial_msg_data(struct ceph_connection *con) static int read_partial_msg_data(struct ceph_connection *con)
...@@ -1253,7 +1254,7 @@ static int read_partial_message(struct ceph_connection *con) ...@@ -1253,7 +1254,7 @@ static int read_partial_message(struct ceph_connection *con)
if (!m->num_data_items) if (!m->num_data_items)
return -EIO; return -EIO;
if (m->sparse_read) if (m->sparse_read_total)
ret = read_partial_sparse_msg_data(con); ret = read_partial_sparse_msg_data(con);
else if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) else if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
ret = read_partial_msg_data_bounce(con); ret = read_partial_msg_data_bounce(con);
......
...@@ -1128,7 +1128,7 @@ static int decrypt_tail(struct ceph_connection *con) ...@@ -1128,7 +1128,7 @@ static int decrypt_tail(struct ceph_connection *con)
struct sg_table enc_sgt = {}; struct sg_table enc_sgt = {};
struct sg_table sgt = {}; struct sg_table sgt = {};
struct page **pages = NULL; struct page **pages = NULL;
bool sparse = con->in_msg->sparse_read; bool sparse = !!con->in_msg->sparse_read_total;
int dpos = 0; int dpos = 0;
int tail_len; int tail_len;
int ret; int ret;
...@@ -2060,7 +2060,7 @@ static int prepare_read_tail_plain(struct ceph_connection *con) ...@@ -2060,7 +2060,7 @@ static int prepare_read_tail_plain(struct ceph_connection *con)
} }
if (data_len(msg)) { if (data_len(msg)) {
if (msg->sparse_read) if (msg->sparse_read_total)
con->v2.in_state = IN_S_PREPARE_SPARSE_DATA; con->v2.in_state = IN_S_PREPARE_SPARSE_DATA;
else else
con->v2.in_state = IN_S_PREPARE_READ_DATA; con->v2.in_state = IN_S_PREPARE_READ_DATA;
......
...@@ -5510,7 +5510,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, ...@@ -5510,7 +5510,7 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
} }
m = ceph_msg_get(req->r_reply); m = ceph_msg_get(req->r_reply);
m->sparse_read = (bool)srlen; m->sparse_read_total = srlen;
dout("get_reply tid %lld %p\n", tid, m); dout("get_reply tid %lld %p\n", tid, m);
...@@ -5777,11 +5777,8 @@ static int prep_next_sparse_read(struct ceph_connection *con, ...@@ -5777,11 +5777,8 @@ static int prep_next_sparse_read(struct ceph_connection *con,
} }
if (o->o_sparse_op_idx < 0) { if (o->o_sparse_op_idx < 0) {
u64 srlen = sparse_data_requested(req); dout("%s: [%d] starting new sparse read req\n",
__func__, o->o_osd);
dout("%s: [%d] starting new sparse read req. srlen=0x%llx\n",
__func__, o->o_osd, srlen);
ceph_msg_data_cursor_init(cursor, con->in_msg, srlen);
} else { } else {
u64 end; u64 end;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment