Commit 26f887e0 authored by Ilya Dryomov's avatar Ilya Dryomov

libceph, rbd, ceph: move ceph_osdc_alloc_messages() calls

The current requirement is that ceph_osdc_alloc_messages() should be
called after oid and oloc are known.  In preparation for preallocating
message data items, move ceph_osdc_alloc_messages() further down, so
that it is called when OSD op codes are known.
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 39e58c34
...@@ -1500,9 +1500,6 @@ rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops) ...@@ -1500,9 +1500,6 @@ rbd_osd_req_create(struct rbd_obj_request *obj_req, unsigned int num_ops)
rbd_dev->header.object_prefix, obj_req->ex.oe_objno)) rbd_dev->header.object_prefix, obj_req->ex.oe_objno))
goto err_req; goto err_req;
if (ceph_osdc_alloc_messages(req, GFP_NOIO))
goto err_req;
return req; return req;
err_req: err_req:
...@@ -1945,6 +1942,10 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req) ...@@ -1945,6 +1942,10 @@ static int __rbd_img_fill_request(struct rbd_img_request *img_req)
} }
if (ret) if (ret)
return ret; return ret;
ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
if (ret)
return ret;
} }
return 0; return 0;
...@@ -2404,6 +2405,10 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes) ...@@ -2404,6 +2405,10 @@ static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
rbd_assert(0); rbd_assert(0);
} }
ret = ceph_osdc_alloc_messages(obj_req->osd_req, GFP_NOIO);
if (ret)
return ret;
rbd_obj_request_submit(obj_req); rbd_obj_request_submit(obj_req);
return 0; return 0;
} }
...@@ -3783,10 +3788,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, ...@@ -3783,10 +3788,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
ceph_oloc_copy(&req->r_base_oloc, oloc); ceph_oloc_copy(&req->r_base_oloc, oloc);
req->r_flags = CEPH_OSD_FLAG_READ; req->r_flags = CEPH_OSD_FLAG_READ;
ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
if (ret)
goto out_req;
pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
if (IS_ERR(pages)) { if (IS_ERR(pages)) {
ret = PTR_ERR(pages); ret = PTR_ERR(pages);
...@@ -3797,6 +3798,10 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev, ...@@ -3797,6 +3798,10 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false, osd_req_op_extent_osd_data_pages(req, 0, pages, buf_len, 0, false,
true); true);
ret = ceph_osdc_alloc_messages(req, GFP_KERNEL);
if (ret)
goto out_req;
ceph_osdc_start_request(osdc, req, false); ceph_osdc_start_request(osdc, req, false);
ret = ceph_osdc_wait_request(osdc, req); ret = ceph_osdc_wait_request(osdc, req);
if (ret >= 0) if (ret >= 0)
......
...@@ -870,6 +870,11 @@ static void ceph_aio_retry_work(struct work_struct *work) ...@@ -870,6 +870,11 @@ static void ceph_aio_retry_work(struct work_struct *work)
ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc); ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc);
ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid); ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid);
req->r_ops[0] = orig_req->r_ops[0];
req->r_mtime = aio_req->mtime;
req->r_data_offset = req->r_ops[0].extent.offset;
ret = ceph_osdc_alloc_messages(req, GFP_NOFS); ret = ceph_osdc_alloc_messages(req, GFP_NOFS);
if (ret) { if (ret) {
ceph_osdc_put_request(req); ceph_osdc_put_request(req);
...@@ -877,11 +882,6 @@ static void ceph_aio_retry_work(struct work_struct *work) ...@@ -877,11 +882,6 @@ static void ceph_aio_retry_work(struct work_struct *work)
goto out; goto out;
} }
req->r_ops[0] = orig_req->r_ops[0];
req->r_mtime = aio_req->mtime;
req->r_data_offset = req->r_ops[0].extent.offset;
ceph_osdc_put_request(orig_req); ceph_osdc_put_request(orig_req);
req->r_callback = ceph_aio_complete_req; req->r_callback = ceph_aio_complete_req;
......
...@@ -4483,12 +4483,6 @@ alloc_linger_request(struct ceph_osd_linger_request *lreq) ...@@ -4483,12 +4483,6 @@ alloc_linger_request(struct ceph_osd_linger_request *lreq)
ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid); ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc); ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
ceph_osdc_put_request(req);
return NULL;
}
return req; return req;
} }
...@@ -4506,6 +4500,12 @@ alloc_watch_request(struct ceph_osd_linger_request *lreq, u8 watch_opcode) ...@@ -4506,6 +4500,12 @@ alloc_watch_request(struct ceph_osd_linger_request *lreq, u8 watch_opcode)
* filled in by linger_submit(). * filled in by linger_submit().
*/ */
osd_req_op_watch_init(req, 0, 0, watch_opcode); osd_req_op_watch_init(req, 0, 0, watch_opcode);
if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
ceph_osdc_put_request(req);
return NULL;
}
return req; return req;
} }
...@@ -4656,12 +4656,12 @@ int ceph_osdc_notify_ack(struct ceph_osd_client *osdc, ...@@ -4656,12 +4656,12 @@ int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
ceph_oloc_copy(&req->r_base_oloc, oloc); ceph_oloc_copy(&req->r_base_oloc, oloc);
req->r_flags = CEPH_OSD_FLAG_READ; req->r_flags = CEPH_OSD_FLAG_READ;
ret = ceph_osdc_alloc_messages(req, GFP_NOIO); ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
payload_len);
if (ret) if (ret)
goto out_put_req; goto out_put_req;
ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload, ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
payload_len);
if (ret) if (ret)
goto out_put_req; goto out_put_req;
...@@ -4766,6 +4766,10 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc, ...@@ -4766,6 +4766,10 @@ int ceph_osdc_notify(struct ceph_osd_client *osdc,
response_data), response_data),
pages, PAGE_SIZE, 0, false, true); pages, PAGE_SIZE, 0, false, true);
ret = ceph_osdc_alloc_messages(lreq->reg_req, GFP_NOIO);
if (ret)
goto out_put_lreq;
linger_submit(lreq); linger_submit(lreq);
ret = linger_reg_commit_wait(lreq); ret = linger_reg_commit_wait(lreq);
if (!ret) if (!ret)
...@@ -4892,10 +4896,6 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, ...@@ -4892,10 +4896,6 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
ceph_oloc_copy(&req->r_base_oloc, oloc); ceph_oloc_copy(&req->r_base_oloc, oloc);
req->r_flags = CEPH_OSD_FLAG_READ; req->r_flags = CEPH_OSD_FLAG_READ;
ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
if (ret)
goto out_put_req;
pages = ceph_alloc_page_vector(1, GFP_NOIO); pages = ceph_alloc_page_vector(1, GFP_NOIO);
if (IS_ERR(pages)) { if (IS_ERR(pages)) {
ret = PTR_ERR(pages); ret = PTR_ERR(pages);
...@@ -4907,6 +4907,10 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc, ...@@ -4907,6 +4907,10 @@ int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
response_data), response_data),
pages, PAGE_SIZE, 0, false, true); pages, PAGE_SIZE, 0, false, true);
ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
if (ret)
goto out_put_req;
ceph_osdc_start_request(osdc, req, false); ceph_osdc_start_request(osdc, req, false);
ret = ceph_osdc_wait_request(osdc, req); ret = ceph_osdc_wait_request(osdc, req);
if (ret >= 0) { if (ret >= 0) {
...@@ -4969,10 +4973,6 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, ...@@ -4969,10 +4973,6 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
ceph_oloc_copy(&req->r_base_oloc, oloc); ceph_oloc_copy(&req->r_base_oloc, oloc);
req->r_flags = flags; req->r_flags = flags;
ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
if (ret)
goto out_put_req;
ret = osd_req_op_cls_init(req, 0, class, method); ret = osd_req_op_cls_init(req, 0, class, method);
if (ret) if (ret)
goto out_put_req; goto out_put_req;
...@@ -4984,6 +4984,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc, ...@@ -4984,6 +4984,10 @@ int ceph_osdc_call(struct ceph_osd_client *osdc,
osd_req_op_cls_response_data_pages(req, 0, &resp_page, osd_req_op_cls_response_data_pages(req, 0, &resp_page,
*resp_len, 0, false, false); *resp_len, 0, false, false);
ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
if (ret)
goto out_put_req;
ceph_osdc_start_request(osdc, req, false); ceph_osdc_start_request(osdc, req, false);
ret = ceph_osdc_wait_request(osdc, req); ret = ceph_osdc_wait_request(osdc, req);
if (ret >= 0) { if (ret >= 0) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment