Commit d15f9d69 authored by Ilya Dryomov's avatar Ilya Dryomov

libceph: check data_len in ->alloc_msg()

Only ->alloc_msg() should check data_len of the incoming message
against the preallocated ceph_msg, doing it in the messenger is not
right.  The contract is that either ->alloc_msg() returns a ceph_msg
which will fit all of the portions of the incoming message, or it
returns NULL and possibly sets skip, signaling whether NULL is due to
an -ENOMEM.  ->alloc_msg() should be the only place where we make the
skip/no-skip decision.

I stumbled upon this while looking at con/osd ref counting.  Right now,
if we get a non-extent message with a larger data portion than we are
prepared for, ->alloc_msg() returns a ceph_msg, and then, when we skip
it in the messenger, we don't put the con/osd ref acquired in
ceph_con_in_msg_alloc() (which is normally put in process_message()),
so this also fixes a memory leak.

An existing BUG_ON in ceph_msg_data_cursor_init() ensures we don't
corrupt random memory should a buggy ->alloc_msg() return an unfit
ceph_msg.

While at it, I changed the "unknown tid" dout() to a pr_warn() to make
sure all skips are seen and unified format strings.
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
Reviewed-by: default avatarAlex Elder <elder@linaro.org>
parent 8b9558aa
...@@ -2337,13 +2337,6 @@ static int read_partial_message(struct ceph_connection *con) ...@@ -2337,13 +2337,6 @@ static int read_partial_message(struct ceph_connection *con)
return ret; return ret;
BUG_ON(!con->in_msg ^ skip); BUG_ON(!con->in_msg ^ skip);
if (con->in_msg && data_len > con->in_msg->data_length) {
pr_warn("%s skipping long message (%u > %zd)\n",
__func__, data_len, con->in_msg->data_length);
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
skip = 1;
}
if (skip) { if (skip) {
/* skip this message */ /* skip this message */
dout("alloc_msg said skip message\n"); dout("alloc_msg said skip message\n");
......
...@@ -2817,8 +2817,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg) ...@@ -2817,8 +2817,9 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
} }
/* /*
* lookup and return message for incoming reply. set up reply message * Lookup and return message for incoming reply. Don't try to do
* pages. * anything about a larger than preallocated data portion of the
* message at the moment - for now, just skip the message.
*/ */
static struct ceph_msg *get_reply(struct ceph_connection *con, static struct ceph_msg *get_reply(struct ceph_connection *con,
struct ceph_msg_header *hdr, struct ceph_msg_header *hdr,
...@@ -2836,10 +2837,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, ...@@ -2836,10 +2837,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
mutex_lock(&osdc->request_mutex); mutex_lock(&osdc->request_mutex);
req = __lookup_request(osdc, tid); req = __lookup_request(osdc, tid);
if (!req) { if (!req) {
*skip = 1; pr_warn("%s osd%d tid %llu unknown, skipping\n",
__func__, osd->o_osd, tid);
m = NULL; m = NULL;
dout("get_reply unknown tid %llu from osd%d\n", tid, *skip = 1;
osd->o_osd);
goto out; goto out;
} }
...@@ -2849,10 +2850,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, ...@@ -2849,10 +2850,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
ceph_msg_revoke_incoming(req->r_reply); ceph_msg_revoke_incoming(req->r_reply);
if (front_len > req->r_reply->front_alloc_len) { if (front_len > req->r_reply->front_alloc_len) {
pr_warn("get_reply front %d > preallocated %d (%u#%llu)\n", pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
front_len, req->r_reply->front_alloc_len, __func__, osd->o_osd, req->r_tid, front_len,
(unsigned int)con->peer_name.type, req->r_reply->front_alloc_len);
le64_to_cpu(con->peer_name.num));
m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS, m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
false); false);
if (!m) if (!m)
...@@ -2860,37 +2860,22 @@ static struct ceph_msg *get_reply(struct ceph_connection *con, ...@@ -2860,37 +2860,22 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
ceph_msg_put(req->r_reply); ceph_msg_put(req->r_reply);
req->r_reply = m; req->r_reply = m;
} }
m = ceph_msg_get(req->r_reply);
if (data_len > 0) {
struct ceph_osd_data *osd_data;
/*
* XXX This is assuming there is only one op containing
* XXX page data. Probably OK for reads, but this
* XXX ought to be done more generally.
*/
osd_data = osd_req_op_extent_osd_data(req, 0);
if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
if (osd_data->pages &&
unlikely(osd_data->length < data_len)) {
pr_warn("tid %lld reply has %d bytes we had only %llu bytes ready\n", if (data_len > req->r_reply->data_length) {
tid, data_len, osd_data->length); pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
*skip = 1; __func__, osd->o_osd, req->r_tid, data_len,
ceph_msg_put(m); req->r_reply->data_length);
m = NULL; m = NULL;
*skip = 1;
goto out; goto out;
} }
}
} m = ceph_msg_get(req->r_reply);
*skip = 0;
dout("get_reply tid %lld %p\n", tid, m); dout("get_reply tid %lld %p\n", tid, m);
out: out:
mutex_unlock(&osdc->request_mutex); mutex_unlock(&osdc->request_mutex);
return m; return m;
} }
static struct ceph_msg *alloc_msg(struct ceph_connection *con, static struct ceph_msg *alloc_msg(struct ceph_connection *con,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment