Commit c99d2d4a authored by Alex Elder's avatar Alex Elder Committed by Sage Weil

libceph: specify osd op by index in request

An osd request now holds all of its source op structures, and every
place that initializes one of these is in fact initializing one
of the entries in the the osd request's array.

So rather than supplying the address of the op to initialize, have
caller specify the osd request and an indication of which op it
would like to initialize.  This better hides the details the
op structure (and faciltates moving the data pointers they use).

Since osd_req_op_init() is a common routine, and it's not used
outside the osd client code, give it static scope.  Also make
it return the address of the specified op (so all the other
init routines don't have to repeat that code).
Signed-off-by: default avatarAlex Elder <elder@inktank.com>
Reviewed-by: default avatarJosh Durgin <josh.durgin@inktank.com>
parent 8c042b0d
......@@ -1336,16 +1336,17 @@ static void rbd_osd_req_format_op(struct rbd_obj_request *obj_request,
snap_id = img_request->snap_id;
}
if (obj_request->type != OBJ_REQUEST_NODATA) {
struct ceph_osd_req_op *op = &obj_request->osd_req->r_ops[0];
/*
* If it has data, it's either a object class method
* call (cls) or it's an extent operation.
*/
if (op->op == CEPH_OSD_OP_CALL)
osd_req_op_cls_response_data(op, osd_data);
/* XXX This use of the ops array goes away in the next patch */
if (obj_request->osd_req->r_ops[0].op == CEPH_OSD_OP_CALL)
osd_req_op_cls_response_data(obj_request->osd_req, 0,
osd_data);
else
osd_req_op_extent_osd_data(op, osd_data);
osd_req_op_extent_osd_data(obj_request->osd_req, 0,
osd_data);
}
ceph_osdc_build_request(osd_req, obj_request->offset,
snapc, snap_id, mtime);
......@@ -1577,7 +1578,6 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
while (resid) {
const char *object_name;
unsigned int clone_size;
struct ceph_osd_req_op *op;
u64 offset;
u64 length;
......@@ -1606,8 +1606,8 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
if (!obj_request->osd_req)
goto out_partial;
op = &obj_request->osd_req->r_ops[0];
osd_req_op_extent_init(op, opcode, offset, length, 0, 0);
osd_req_op_extent_init(obj_request->osd_req, 0,
opcode, offset, length, 0, 0);
rbd_osd_req_format_op(obj_request, write_request);
/* status and version are initially zero-filled */
......@@ -1710,7 +1710,6 @@ static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
u64 ver, u64 notify_id)
{
struct rbd_obj_request *obj_request;
struct ceph_osd_req_op *op;
struct ceph_osd_client *osdc;
int ret;
......@@ -1724,8 +1723,8 @@ static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
if (!obj_request->osd_req)
goto out;
op = &obj_request->osd_req->r_ops[0];
osd_req_op_watch_init(op, CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0);
osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
notify_id, ver, 0);
rbd_osd_req_format_op(obj_request, false);
osdc = &rbd_dev->rbd_client->client->osdc;
......@@ -1766,7 +1765,6 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
{
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request;
struct ceph_osd_req_op *op;
int ret;
rbd_assert(start ^ !!rbd_dev->watch_event);
......@@ -1790,8 +1788,7 @@ static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
if (!obj_request->osd_req)
goto out_cancel;
op = &obj_request->osd_req->r_ops[0];
osd_req_op_watch_init(op, CEPH_OSD_OP_WATCH,
osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
rbd_dev->watch_event->cookie,
rbd_dev->header.obj_version, start);
rbd_osd_req_format_op(obj_request, true);
......@@ -1854,7 +1851,6 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
{
struct rbd_obj_request *obj_request;
struct ceph_osd_client *osdc;
struct ceph_osd_req_op *op;
struct page **pages;
u32 page_count;
int ret;
......@@ -1884,8 +1880,8 @@ static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
if (!obj_request->osd_req)
goto out;
op = &obj_request->osd_req->r_ops[0];
osd_req_op_cls_init(op, CEPH_OSD_OP_CALL, class_name, method_name,
osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
class_name, method_name,
outbound, outbound_size);
rbd_osd_req_format_op(obj_request, false);
......@@ -2066,7 +2062,6 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
{
struct rbd_obj_request *obj_request;
struct ceph_osd_req_op *op;
struct ceph_osd_client *osdc;
struct page **pages = NULL;
u32 page_count;
......@@ -2091,8 +2086,8 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
if (!obj_request->osd_req)
goto out;
op = &obj_request->osd_req->r_ops[0];
osd_req_op_extent_init(op, CEPH_OSD_OP_READ, offset, length, 0, 0);
osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
offset, length, 0, 0);
rbd_osd_req_format_op(obj_request, false);
osdc = &rbd_dev->rbd_client->client->osdc;
......
......@@ -926,7 +926,7 @@ static int ceph_writepages_start(struct address_space *mapping,
/* Update the write op length in case we changed it */
osd_req_op_extent_update(&req->r_ops[0], len);
osd_req_op_extent_update(req, 0, len);
vino = ceph_vino(inode);
ceph_osdc_build_request(req, offset, snapc, vino.snap,
......
......@@ -233,20 +233,25 @@ extern void ceph_osdc_handle_reply(struct ceph_osd_client *osdc,
extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
struct ceph_msg *msg);
extern void osd_req_op_init(struct ceph_osd_req_op *op, u16 opcode);
extern void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode,
extern void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
u64 offset, u64 length,
u64 truncate_size, u32 truncate_seq);
extern void osd_req_op_extent_update(struct ceph_osd_req_op *op, u64 length);
extern void osd_req_op_extent_osd_data(struct ceph_osd_req_op *op,
extern void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
unsigned int which, u64 length);
extern void osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
unsigned int which,
struct ceph_osd_data *osd_data);
extern void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode,
extern void osd_req_op_cls_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
const char *class, const char *method,
const void *request_data,
size_t request_data_size);
extern void osd_req_op_cls_response_data(struct ceph_osd_req_op *op,
extern void osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
unsigned int which,
struct ceph_osd_data *response_data);
extern void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode,
extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
u64 cookie, u64 version, int flag);
extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
......
......@@ -329,25 +329,32 @@ static bool osd_req_opcode_valid(u16 opcode)
* other information associated with them. It also serves as a
* common init routine for all the other init functions, below.
*/
void osd_req_op_init(struct ceph_osd_req_op *op, u16 opcode)
static struct ceph_osd_req_op *
osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
u16 opcode)
{
struct ceph_osd_req_op *op;
BUG_ON(which >= osd_req->r_num_ops);
BUG_ON(!osd_req_opcode_valid(opcode));
op = &osd_req->r_ops[which];
memset(op, 0, sizeof (*op));
op->op = opcode;
return op;
}
void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode,
void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
u64 offset, u64 length,
u64 truncate_size, u32 truncate_seq)
{
struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which, opcode);
size_t payload_len = 0;
BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
osd_req_op_init(op, opcode);
op->extent.offset = offset;
op->extent.length = length;
op->extent.truncate_size = truncate_size;
......@@ -359,9 +366,15 @@ void osd_req_op_extent_init(struct ceph_osd_req_op *op, u16 opcode,
}
EXPORT_SYMBOL(osd_req_op_extent_init);
void osd_req_op_extent_update(struct ceph_osd_req_op *op, u64 length)
void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
unsigned int which, u64 length)
{
u64 previous = op->extent.length;
struct ceph_osd_req_op *op;
u64 previous;
BUG_ON(which >= osd_req->r_num_ops);
op = &osd_req->r_ops[which];
previous = op->extent.length;
if (length == previous)
return; /* Nothing to do */
......@@ -372,24 +385,25 @@ void osd_req_op_extent_update(struct ceph_osd_req_op *op, u64 length)
}
EXPORT_SYMBOL(osd_req_op_extent_update);
void osd_req_op_extent_osd_data(struct ceph_osd_req_op *op,
void osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
unsigned int which,
struct ceph_osd_data *osd_data)
{
op->extent.osd_data = osd_data;
BUG_ON(which >= osd_req->r_num_ops);
osd_req->r_ops[which].extent.osd_data = osd_data;
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data);
void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode,
const char *class, const char *method,
void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
u16 opcode, const char *class, const char *method,
const void *request_data, size_t request_data_size)
{
struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which, opcode);
size_t payload_len = 0;
size_t size;
BUG_ON(opcode != CEPH_OSD_OP_CALL);
osd_req_op_init(op, opcode);
op->cls.class_name = class;
size = strlen(class);
BUG_ON(size > (size_t) U8_MAX);
......@@ -412,26 +426,28 @@ void osd_req_op_cls_init(struct ceph_osd_req_op *op, u16 opcode,
op->payload_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_cls_init);
void osd_req_op_cls_response_data(struct ceph_osd_req_op *op,
void osd_req_op_cls_response_data(struct ceph_osd_request *osd_req,
unsigned int which,
struct ceph_osd_data *response_data)
{
op->cls.response_data = response_data;
BUG_ON(which >= osd_req->r_num_ops);
osd_req->r_ops[which].cls.response_data = response_data;
}
EXPORT_SYMBOL(osd_req_op_cls_response_data);
void osd_req_op_watch_init(struct ceph_osd_req_op *op, u16 opcode,
void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
unsigned int which, u16 opcode,
u64 cookie, u64 version, int flag)
{
BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which, opcode);
osd_req_op_init(op, opcode);
BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
op->watch.cookie = cookie;
/* op->watch.ver = version; */ /* XXX 3847 */
op->watch.ver = cpu_to_le64(version);
if (opcode == CEPH_OSD_OP_WATCH && flag)
op->watch.flag = (u8) 1;
op->watch.flag = (u8)1;
}
EXPORT_SYMBOL(osd_req_op_watch_init);
......@@ -629,7 +645,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
{
struct ceph_osd_request *req;
struct ceph_osd_data *osd_data;
struct ceph_osd_req_op *op;
u64 objnum = 0;
u64 objoff = 0;
u64 objlen = 0;
......@@ -665,10 +680,9 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
truncate_size = object_size;
}
op = &req->r_ops[0];
osd_req_op_extent_init(op, opcode, objoff, objlen,
osd_req_op_extent_init(req, 0, opcode, objoff, objlen,
truncate_size, truncate_seq);
osd_req_op_extent_osd_data(op, osd_data);
osd_req_op_extent_osd_data(req, 0, osd_data);
/*
* A second op in the ops array means the caller wants to
......@@ -676,7 +690,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
* osd will flush data quickly.
*/
if (num_ops > 1)
osd_req_op_init(++op, CEPH_OSD_OP_STARTSYNC);
osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC);
req->r_file_layout = *layout; /* keep a copy */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment