Commit 1fec7093 authored by Yehuda Sadeh's avatar Yehuda Sadeh Committed by Sage Weil

rbd: fix split bio handling

The rbd driver currently splits bios when they span an object boundary.
However, the blk_end_request expects the completions to roll up the results
in block device order, and the split rbd/ceph ops can complete in any
order.  This patch adds a struct rbd_req_coll to track completion of split
requests and ensures that the results are passed back up to the block layer
in order.

This fixes errors where the file system gets completion of a read operation
that spans an object boundary before the data has actually arrived.  The
bug is easily reproduced with iozone with a working set larger than
available RAM.
Reported-by: default avatarFyodor Ustinov <ufm@ufm.su>
Signed-off-by: default avatarYehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: default avatarSage Weil <sage@newdream.net>
parent 11f77002
......@@ -92,6 +92,8 @@ struct rbd_client {
struct list_head node;
};
struct rbd_req_coll;
/*
* a single io request
*/
......@@ -100,6 +102,24 @@ struct rbd_request {
struct bio *bio; /* cloned bio */
struct page **pages; /* list of used pages */
u64 len;
int coll_index;
struct rbd_req_coll *coll;
};
struct rbd_req_status {
int done;
int rc;
u64 bytes;
};
/*
* a collection of requests
*/
struct rbd_req_coll {
int total;
int num_done;
struct kref kref;
struct rbd_req_status status[0];
};
struct rbd_snap {
......@@ -416,6 +436,17 @@ static void rbd_put_client(struct rbd_device *rbd_dev)
rbd_dev->client = NULL;
}
/*
* Destroy requests collection
*/
static void rbd_coll_release(struct kref *kref)
{
struct rbd_req_coll *coll =
container_of(kref, struct rbd_req_coll, kref);
dout("rbd_coll_release %p\n", coll);
kfree(coll);
}
/*
* Create a new header structure, translate header format from the on-disk
......@@ -590,6 +621,14 @@ static u64 rbd_get_segment(struct rbd_image_header *header,
return len;
}
static int rbd_get_num_segments(struct rbd_image_header *header,
u64 ofs, u64 len)
{
u64 start_seg = ofs >> header->obj_order;
u64 end_seg = (ofs + len - 1) >> header->obj_order;
return end_seg - start_seg + 1;
}
/*
* bio helpers
*/
......@@ -735,6 +774,50 @@ static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
kfree(ops);
}
static void rbd_coll_end_req_index(struct request *rq,
struct rbd_req_coll *coll,
int index,
int ret, u64 len)
{
struct request_queue *q;
int min, max, i;
dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
coll, index, ret, len);
if (!rq)
return;
if (!coll) {
blk_end_request(rq, ret, len);
return;
}
q = rq->q;
spin_lock_irq(q->queue_lock);
coll->status[index].done = 1;
coll->status[index].rc = ret;
coll->status[index].bytes = len;
max = min = coll->num_done;
while (max < coll->total && coll->status[max].done)
max++;
for (i = min; i<max; i++) {
__blk_end_request(rq, coll->status[i].rc,
coll->status[i].bytes);
coll->num_done++;
kref_put(&coll->kref, rbd_coll_release);
}
spin_unlock_irq(q->queue_lock);
}
static void rbd_coll_end_req(struct rbd_request *req,
int ret, u64 len)
{
rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
/*
* Send ceph osd request
*/
......@@ -749,6 +832,8 @@ static int rbd_do_request(struct request *rq,
int flags,
struct ceph_osd_req_op *ops,
int num_reply,
struct rbd_req_coll *coll,
int coll_index,
void (*rbd_cb)(struct ceph_osd_request *req,
struct ceph_msg *msg),
struct ceph_osd_request **linger_req,
......@@ -763,12 +848,20 @@ static int rbd_do_request(struct request *rq,
struct ceph_osd_request_head *reqhead;
struct rbd_image_header *header = &dev->header;
ret = -ENOMEM;
req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
if (!req_data)
goto done;
if (!req_data) {
if (coll)
rbd_coll_end_req_index(rq, coll, coll_index,
-ENOMEM, len);
return -ENOMEM;
}
if (coll) {
req_data->coll = coll;
req_data->coll_index = coll_index;
}
dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
down_read(&header->snap_rwsem);
......@@ -828,7 +921,8 @@ static int rbd_do_request(struct request *rq,
ret = ceph_osdc_wait_request(&dev->client->osdc, req);
if (ver)
*ver = le64_to_cpu(req->r_reassert_version.version);
dout("reassert_ver=%lld\n", le64_to_cpu(req->r_reassert_version.version));
dout("reassert_ver=%lld\n",
le64_to_cpu(req->r_reassert_version.version));
ceph_osdc_put_request(req);
}
return ret;
......@@ -837,10 +931,8 @@ static int rbd_do_request(struct request *rq,
bio_chain_put(req_data->bio);
ceph_osdc_put_request(req);
done_pages:
rbd_coll_end_req(req_data, ret, len);
kfree(req_data);
done:
if (rq)
blk_end_request(rq, ret, len);
return ret;
}
......@@ -874,7 +966,7 @@ static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
bytes = req_data->len;
}
blk_end_request(req_data->rq, rc, bytes);
rbd_coll_end_req(req_data, rc, bytes);
if (req_data->bio)
bio_chain_put(req_data->bio);
......@@ -934,6 +1026,7 @@ static int rbd_req_sync_op(struct rbd_device *dev,
flags,
ops,
2,
NULL, 0,
NULL,
linger_req, ver);
if (ret < 0)
......@@ -959,7 +1052,9 @@ static int rbd_do_op(struct request *rq,
u64 snapid,
int opcode, int flags, int num_reply,
u64 ofs, u64 len,
struct bio *bio)
struct bio *bio,
struct rbd_req_coll *coll,
int coll_index)
{
char *seg_name;
u64 seg_ofs;
......@@ -995,6 +1090,7 @@ static int rbd_do_op(struct request *rq,
flags,
ops,
num_reply,
coll, coll_index,
rbd_req_cb, 0, NULL);
rbd_destroy_ops(ops);
......@@ -1010,13 +1106,15 @@ static int rbd_req_write(struct request *rq,
struct rbd_device *rbd_dev,
struct ceph_snap_context *snapc,
u64 ofs, u64 len,
struct bio *bio)
struct bio *bio,
struct rbd_req_coll *coll,
int coll_index)
{
return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
CEPH_OSD_OP_WRITE,
CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
2,
ofs, len, bio);
ofs, len, bio, coll, coll_index);
}
/*
......@@ -1026,14 +1124,16 @@ static int rbd_req_read(struct request *rq,
struct rbd_device *rbd_dev,
u64 snapid,
u64 ofs, u64 len,
struct bio *bio)
struct bio *bio,
struct rbd_req_coll *coll,
int coll_index)
{
return rbd_do_op(rq, rbd_dev, NULL,
(snapid ? snapid : CEPH_NOSNAP),
CEPH_OSD_OP_READ,
CEPH_OSD_FLAG_READ,
2,
ofs, len, bio);
ofs, len, bio, coll, coll_index);
}
/*
......@@ -1081,6 +1181,7 @@ static int rbd_req_sync_notify_ack(struct rbd_device *dev,
CEPH_OSD_FLAG_READ,
ops,
1,
NULL, 0,
rbd_simple_req_cb, 0, NULL);
rbd_destroy_ops(ops);
......@@ -1278,6 +1379,20 @@ static int rbd_req_sync_exec(struct rbd_device *dev,
return ret;
}
static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
struct rbd_req_coll *coll =
kzalloc(sizeof(struct rbd_req_coll) +
sizeof(struct rbd_req_status) * num_reqs,
GFP_ATOMIC);
if (!coll)
return NULL;
coll->total = num_reqs;
kref_init(&coll->kref);
return coll;
}
/*
* block device queue callback
*/
......@@ -1295,6 +1410,8 @@ static void rbd_rq_fn(struct request_queue *q)
bool do_write;
int size, op_size = 0;
u64 ofs;
int num_segs, cur_seg = 0;
struct rbd_req_coll *coll;
/* peek at request from block layer */
if (!rq)
......@@ -1325,6 +1442,14 @@ static void rbd_rq_fn(struct request_queue *q)
do_write ? "write" : "read",
size, blk_rq_pos(rq) * 512ULL);
num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
coll = rbd_alloc_coll(num_segs);
if (!coll) {
spin_lock_irq(q->queue_lock);
__blk_end_request_all(rq, -ENOMEM);
goto next;
}
do {
/* a bio clone to be passed down to OSD req */
dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
......@@ -1332,35 +1457,41 @@ static void rbd_rq_fn(struct request_queue *q)
rbd_dev->header.block_name,
ofs, size,
NULL, NULL);
kref_get(&coll->kref);
bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
op_size, GFP_ATOMIC);
if (!bio) {
spin_lock_irq(q->queue_lock);
__blk_end_request_all(rq, -ENOMEM);
goto next;
rbd_coll_end_req_index(rq, coll, cur_seg,
-ENOMEM, op_size);
goto next_seg;
}
/* init OSD command: write or read */
if (do_write)
rbd_req_write(rq, rbd_dev,
rbd_dev->header.snapc,
ofs,
op_size, bio);
op_size, bio,
coll, cur_seg);
else
rbd_req_read(rq, rbd_dev,
cur_snap_id(rbd_dev),
ofs,
op_size, bio);
op_size, bio,
coll, cur_seg);
next_seg:
size -= op_size;
ofs += op_size;
cur_seg++;
rq_bio = next_bio;
} while (size > 0);
kref_put(&coll->kref, rbd_coll_release);
if (bp)
bio_pair_release(bp);
spin_lock_irq(q->queue_lock);
next:
rq = blk_fetch_request(q);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment