Commit ff513ace authored by Ilya Dryomov's avatar Ilya Dryomov Committed by Sage Weil

libceph: take map_sem for read in handle_reply()

Handling redirect replies requires both map_sem and request_mutex.
Taking map_sem unconditionally near the top of handle_reply() avoids
possible race conditions that arise from releasing request_mutex to be
able to acquire map_sem in redirect reply case.  (Lock ordering is:
map_sem, request_mutex, crush_mutex.)
Signed-off-by: default avatarIlya Dryomov <ilya.dryomov@inktank.com>
Reviewed-by: default avatarSage Weil <sage@inktank.com>
parent 0bbfdfe8
...@@ -1687,6 +1687,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, ...@@ -1687,6 +1687,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
osdmap_epoch = ceph_decode_32(&p); osdmap_epoch = ceph_decode_32(&p);
/* lookup */ /* lookup */
down_read(&osdc->map_sem);
mutex_lock(&osdc->request_mutex); mutex_lock(&osdc->request_mutex);
req = __lookup_request(osdc, tid); req = __lookup_request(osdc, tid);
if (req == NULL) { if (req == NULL) {
...@@ -1743,7 +1744,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, ...@@ -1743,7 +1744,6 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
dout("redirect pool %lld\n", redir.oloc.pool); dout("redirect pool %lld\n", redir.oloc.pool);
__unregister_request(osdc, req); __unregister_request(osdc, req);
mutex_unlock(&osdc->request_mutex);
req->r_target_oloc = redir.oloc; /* struct */ req->r_target_oloc = redir.oloc; /* struct */
...@@ -1755,10 +1755,10 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, ...@@ -1755,10 +1755,10 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
* successfully. In the future we might want to follow * successfully. In the future we might want to follow
* original request's nofail setting here. * original request's nofail setting here.
*/ */
err = ceph_osdc_start_request(osdc, req, true); err = __ceph_osdc_start_request(osdc, req, true);
BUG_ON(err); BUG_ON(err);
goto done; goto out_unlock;
} }
already_completed = req->r_got_reply; already_completed = req->r_got_reply;
...@@ -1776,8 +1776,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, ...@@ -1776,8 +1776,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
req->r_got_reply = 1; req->r_got_reply = 1;
} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) { } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
dout("handle_reply tid %llu dup ack\n", tid); dout("handle_reply tid %llu dup ack\n", tid);
mutex_unlock(&osdc->request_mutex); goto out_unlock;
goto done;
} }
dout("handle_reply tid %llu flags %d\n", tid, flags); dout("handle_reply tid %llu flags %d\n", tid, flags);
...@@ -1792,6 +1791,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, ...@@ -1792,6 +1791,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
__unregister_request(osdc, req); __unregister_request(osdc, req);
mutex_unlock(&osdc->request_mutex); mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem);
if (!already_completed) { if (!already_completed) {
if (req->r_unsafe_callback && if (req->r_unsafe_callback &&
...@@ -1809,10 +1809,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, ...@@ -1809,10 +1809,14 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
complete_request(req); complete_request(req);
} }
done: out:
dout("req=%p req->r_linger=%d\n", req, req->r_linger); dout("req=%p req->r_linger=%d\n", req, req->r_linger);
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
return; return;
out_unlock:
mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem);
goto out;
bad_put: bad_put:
req->r_result = -EIO; req->r_result = -EIO;
...@@ -1825,6 +1829,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, ...@@ -1825,6 +1829,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
bad_mutex: bad_mutex:
mutex_unlock(&osdc->request_mutex); mutex_unlock(&osdc->request_mutex);
up_read(&osdc->map_sem);
bad: bad:
pr_err("corrupt osd_op_reply got %d %d\n", pr_err("corrupt osd_op_reply got %d %d\n",
(int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len)); (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment