Commit 5240d9f9 authored by Alex Elder's avatar Alex Elder Committed by Sage Weil

libceph: replace message data pointer with list

In place of the message data pointer, use a list head which links
through message data items.  For now we only support a single entry
on that list.
Signed-off-by: default avatarAlex Elder <elder@inktank.com>
Reviewed-by: default avatarJosh Durgin <josh.durgin@inktank.com>
parent 8ae4f4f5
...@@ -89,6 +89,7 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type) ...@@ -89,6 +89,7 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
} }
struct ceph_msg_data { struct ceph_msg_data {
struct list_head links; /* ceph_msg->data */
enum ceph_msg_data_type type; enum ceph_msg_data_type type;
union { union {
#ifdef CONFIG_BLOCK #ifdef CONFIG_BLOCK
...@@ -143,7 +144,7 @@ struct ceph_msg { ...@@ -143,7 +144,7 @@ struct ceph_msg {
struct ceph_buffer *middle; struct ceph_buffer *middle;
size_t data_length; size_t data_length;
struct ceph_msg_data *data; struct list_head data;
struct ceph_msg_data_cursor cursor; struct ceph_msg_data_cursor cursor;
struct ceph_connection *con; struct ceph_connection *con;
......
...@@ -985,8 +985,10 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor, ...@@ -985,8 +985,10 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length) static void ceph_msg_data_cursor_init(struct ceph_msg *msg, size_t length)
{ {
struct ceph_msg_data_cursor *cursor = &msg->cursor; struct ceph_msg_data_cursor *cursor = &msg->cursor;
struct ceph_msg_data *data;
cursor->data = msg->data; data = list_first_entry(&msg->data, struct ceph_msg_data, links);
cursor->data = data;
switch (cursor->data->type) { switch (cursor->data->type) {
case CEPH_MSG_DATA_PAGELIST: case CEPH_MSG_DATA_PAGELIST:
ceph_msg_data_pagelist_cursor_init(cursor, length); ceph_msg_data_pagelist_cursor_init(cursor, length);
...@@ -1410,7 +1412,7 @@ static int write_partial_message_data(struct ceph_connection *con) ...@@ -1410,7 +1412,7 @@ static int write_partial_message_data(struct ceph_connection *con)
dout("%s %p msg %p\n", __func__, con, msg); dout("%s %p msg %p\n", __func__, con, msg);
if (WARN_ON(!msg->data)) if (list_empty(&msg->data))
return -EINVAL; return -EINVAL;
/* /*
...@@ -2111,7 +2113,7 @@ static int read_partial_msg_data(struct ceph_connection *con) ...@@ -2111,7 +2113,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
int ret; int ret;
BUG_ON(!msg); BUG_ON(!msg);
if (!msg->data) if (list_empty(&msg->data))
return -EIO; return -EIO;
if (do_datacrc) if (do_datacrc)
...@@ -2963,6 +2965,7 @@ static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type) ...@@ -2963,6 +2965,7 @@ static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
data = kzalloc(sizeof (*data), GFP_NOFS); data = kzalloc(sizeof (*data), GFP_NOFS);
if (data) if (data)
data->type = type; data->type = type;
INIT_LIST_HEAD(&data->links);
return data; return data;
} }
...@@ -2972,6 +2975,7 @@ static void ceph_msg_data_destroy(struct ceph_msg_data *data) ...@@ -2972,6 +2975,7 @@ static void ceph_msg_data_destroy(struct ceph_msg_data *data)
if (!data) if (!data)
return; return;
WARN_ON(!list_empty(&data->links));
if (data->type == CEPH_MSG_DATA_PAGELIST) { if (data->type == CEPH_MSG_DATA_PAGELIST) {
ceph_pagelist_release(data->pagelist); ceph_pagelist_release(data->pagelist);
kfree(data->pagelist); kfree(data->pagelist);
...@@ -2987,7 +2991,7 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages, ...@@ -2987,7 +2991,7 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
BUG_ON(!pages); BUG_ON(!pages);
BUG_ON(!length); BUG_ON(!length);
BUG_ON(msg->data_length); BUG_ON(msg->data_length);
BUG_ON(msg->data != NULL); BUG_ON(!list_empty(&msg->data));
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES); data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
BUG_ON(!data); BUG_ON(!data);
...@@ -2995,8 +2999,9 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages, ...@@ -2995,8 +2999,9 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
data->length = length; data->length = length;
data->alignment = alignment & ~PAGE_MASK; data->alignment = alignment & ~PAGE_MASK;
msg->data = data; BUG_ON(!list_empty(&msg->data));
msg->data_length = length; list_add_tail(&data->links, &msg->data);
msg->data_length += length;
} }
EXPORT_SYMBOL(ceph_msg_data_set_pages); EXPORT_SYMBOL(ceph_msg_data_set_pages);
...@@ -3008,14 +3013,14 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg, ...@@ -3008,14 +3013,14 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
BUG_ON(!pagelist); BUG_ON(!pagelist);
BUG_ON(!pagelist->length); BUG_ON(!pagelist->length);
BUG_ON(msg->data_length); BUG_ON(msg->data_length);
BUG_ON(msg->data != NULL); BUG_ON(!list_empty(&msg->data));
data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST); data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
BUG_ON(!data); BUG_ON(!data);
data->pagelist = pagelist; data->pagelist = pagelist;
msg->data = data; list_add_tail(&data->links, &msg->data);
msg->data_length = pagelist->length; msg->data_length += pagelist->length;
} }
EXPORT_SYMBOL(ceph_msg_data_set_pagelist); EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
...@@ -3027,15 +3032,15 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio, ...@@ -3027,15 +3032,15 @@ void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio,
BUG_ON(!bio); BUG_ON(!bio);
BUG_ON(msg->data_length); BUG_ON(msg->data_length);
BUG_ON(msg->data != NULL); BUG_ON(!list_empty(&msg->data));
data = ceph_msg_data_create(CEPH_MSG_DATA_BIO); data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
BUG_ON(!data); BUG_ON(!data);
data->bio = bio; data->bio = bio;
data->bio_length = length; data->bio_length = length;
msg->data = data; list_add_tail(&data->links, &msg->data);
msg->data_length = length; msg->data_length += length;
} }
EXPORT_SYMBOL(ceph_msg_data_set_bio); EXPORT_SYMBOL(ceph_msg_data_set_bio);
#endif /* CONFIG_BLOCK */ #endif /* CONFIG_BLOCK */
...@@ -3059,6 +3064,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags, ...@@ -3059,6 +3064,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
INIT_LIST_HEAD(&m->list_head); INIT_LIST_HEAD(&m->list_head);
kref_init(&m->kref); kref_init(&m->kref);
INIT_LIST_HEAD(&m->data);
/* front */ /* front */
m->front_max = front_len; m->front_max = front_len;
...@@ -3204,6 +3210,9 @@ void ceph_msg_kfree(struct ceph_msg *m) ...@@ -3204,6 +3210,9 @@ void ceph_msg_kfree(struct ceph_msg *m)
void ceph_msg_last_put(struct kref *kref) void ceph_msg_last_put(struct kref *kref)
{ {
struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
LIST_HEAD(data);
struct list_head *links;
struct list_head *next;
dout("ceph_msg_put last one on %p\n", m); dout("ceph_msg_put last one on %p\n", m);
WARN_ON(!list_empty(&m->list_head)); WARN_ON(!list_empty(&m->list_head));
...@@ -3213,8 +3222,15 @@ void ceph_msg_last_put(struct kref *kref) ...@@ -3213,8 +3222,15 @@ void ceph_msg_last_put(struct kref *kref)
ceph_buffer_put(m->middle); ceph_buffer_put(m->middle);
m->middle = NULL; m->middle = NULL;
} }
ceph_msg_data_destroy(m->data);
m->data = NULL; list_splice_init(&m->data, &data);
list_for_each_safe(links, next, &data) {
struct ceph_msg_data *data;
data = list_entry(links, struct ceph_msg_data, links);
list_del_init(links);
ceph_msg_data_destroy(data);
}
m->data_length = 0; m->data_length = 0;
if (m->pool) if (m->pool)
...@@ -3227,7 +3243,7 @@ EXPORT_SYMBOL(ceph_msg_last_put); ...@@ -3227,7 +3243,7 @@ EXPORT_SYMBOL(ceph_msg_last_put);
void ceph_msg_dump(struct ceph_msg *msg) void ceph_msg_dump(struct ceph_msg *msg)
{ {
pr_debug("msg_dump %p (front_max %d length %zd)\n", msg, pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
msg->front_max, msg->data->length); msg->front_max, msg->data_length);
print_hex_dump(KERN_DEBUG, "header: ", print_hex_dump(KERN_DEBUG, "header: ",
DUMP_PREFIX_OFFSET, 16, 1, DUMP_PREFIX_OFFSET, 16, 1,
&msg->hdr, sizeof(msg->hdr), true); &msg->hdr, sizeof(msg->hdr), true);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment