Commit 6df058c0 authored by Sage Weil's avatar Sage Weil

ceph: include transaction id in ceph_msg_header (protocol change)

Many (most?) message types include a transaction id.  By including it in
the fixed size header, we always have it available even when we are unable
to allocate memory for the (larger, variable sized) message body.  This
will allow us to error out the appropriate request instead of (silently)
dropping the reply.
Signed-off-by: default avatarSage Weil <sage@newdream.net>
parent 0cf90ab5
...@@ -922,14 +922,14 @@ static int send_cap_msg(struct ceph_mds_session *session, ...@@ -922,14 +922,14 @@ static int send_cap_msg(struct ceph_mds_session *session,
if (IS_ERR(msg)) if (IS_ERR(msg))
return PTR_ERR(msg); return PTR_ERR(msg);
fc = msg->front.iov_base; msg->hdr.tid = cpu_to_le64(flush_tid);
fc = msg->front.iov_base;
memset(fc, 0, sizeof(*fc)); memset(fc, 0, sizeof(*fc));
fc->cap_id = cpu_to_le64(cid); fc->cap_id = cpu_to_le64(cid);
fc->op = cpu_to_le32(op); fc->op = cpu_to_le32(op);
fc->seq = cpu_to_le32(seq); fc->seq = cpu_to_le32(seq);
fc->client_tid = cpu_to_le64(flush_tid);
fc->issue_seq = cpu_to_le32(issue_seq); fc->issue_seq = cpu_to_le32(issue_seq);
fc->migrate_seq = cpu_to_le32(mseq); fc->migrate_seq = cpu_to_le32(mseq);
fc->caps = cpu_to_le32(caps); fc->caps = cpu_to_le32(caps);
...@@ -2329,7 +2329,7 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant, ...@@ -2329,7 +2329,7 @@ static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
* Handle FLUSH_ACK from MDS, indicating that metadata we sent to the * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the
* MDS has been safely committed. * MDS has been safely committed.
*/ */
static void handle_cap_flush_ack(struct inode *inode, static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
struct ceph_mds_caps *m, struct ceph_mds_caps *m,
struct ceph_mds_session *session, struct ceph_mds_session *session,
struct ceph_cap *cap) struct ceph_cap *cap)
...@@ -2340,7 +2340,6 @@ static void handle_cap_flush_ack(struct inode *inode, ...@@ -2340,7 +2340,6 @@ static void handle_cap_flush_ack(struct inode *inode,
unsigned seq = le32_to_cpu(m->seq); unsigned seq = le32_to_cpu(m->seq);
int dirty = le32_to_cpu(m->dirty); int dirty = le32_to_cpu(m->dirty);
int cleaned = 0; int cleaned = 0;
u64 flush_tid = le64_to_cpu(m->client_tid);
int drop = 0; int drop = 0;
int i; int i;
...@@ -2396,13 +2395,12 @@ static void handle_cap_flush_ack(struct inode *inode, ...@@ -2396,13 +2395,12 @@ static void handle_cap_flush_ack(struct inode *inode,
* *
* Caller hold s_mutex. * Caller hold s_mutex.
*/ */
static void handle_cap_flushsnap_ack(struct inode *inode, static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
struct ceph_mds_caps *m, struct ceph_mds_caps *m,
struct ceph_mds_session *session) struct ceph_mds_session *session)
{ {
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
u64 follows = le64_to_cpu(m->snap_follows); u64 follows = le64_to_cpu(m->snap_follows);
u64 flush_tid = le64_to_cpu(m->client_tid);
struct ceph_cap_snap *capsnap; struct ceph_cap_snap *capsnap;
int drop = 0; int drop = 0;
...@@ -2587,12 +2585,14 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -2587,12 +2585,14 @@ void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_vino vino; struct ceph_vino vino;
u64 cap_id; u64 cap_id;
u64 size, max_size; u64 size, max_size;
u64 tid;
int check_caps = 0; int check_caps = 0;
int r; int r;
dout("handle_caps from mds%d\n", mds); dout("handle_caps from mds%d\n", mds);
/* decode */ /* decode */
tid = le64_to_cpu(msg->hdr.tid);
if (msg->front.iov_len < sizeof(*h)) if (msg->front.iov_len < sizeof(*h))
goto bad; goto bad;
h = msg->front.iov_base; h = msg->front.iov_base;
...@@ -2621,7 +2621,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -2621,7 +2621,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
/* these will work even if we don't have a cap yet */ /* these will work even if we don't have a cap yet */
switch (op) { switch (op) {
case CEPH_CAP_OP_FLUSHSNAP_ACK: case CEPH_CAP_OP_FLUSHSNAP_ACK:
handle_cap_flushsnap_ack(inode, h, session); handle_cap_flushsnap_ack(inode, tid, h, session);
goto done; goto done;
case CEPH_CAP_OP_EXPORT: case CEPH_CAP_OP_EXPORT:
...@@ -2662,7 +2662,7 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -2662,7 +2662,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
break; break;
case CEPH_CAP_OP_FLUSH_ACK: case CEPH_CAP_OP_FLUSH_ACK:
handle_cap_flush_ack(inode, h, session, cap); handle_cap_flush_ack(inode, tid, h, session, cap);
break; break;
case CEPH_CAP_OP_TRUNC: case CEPH_CAP_OP_TRUNC:
......
...@@ -35,7 +35,7 @@ ...@@ -35,7 +35,7 @@
* internal cluster protocols separately from the public, * internal cluster protocols separately from the public,
* client-facing protocol. * client-facing protocol.
*/ */
#define CEPH_OSD_PROTOCOL 7 /* cluster internal */ #define CEPH_OSD_PROTOCOL 8 /* cluster internal */
#define CEPH_MDS_PROTOCOL 9 /* cluster internal */ #define CEPH_MDS_PROTOCOL 9 /* cluster internal */
#define CEPH_MON_PROTOCOL 5 /* cluster internal */ #define CEPH_MON_PROTOCOL 5 /* cluster internal */
#define CEPH_OSDC_PROTOCOL 22 /* server/client */ #define CEPH_OSDC_PROTOCOL 22 /* server/client */
...@@ -136,7 +136,6 @@ struct ceph_mon_request_header { ...@@ -136,7 +136,6 @@ struct ceph_mon_request_header {
struct ceph_mon_statfs { struct ceph_mon_statfs {
struct ceph_mon_request_header monhdr; struct ceph_mon_request_header monhdr;
struct ceph_fsid fsid; struct ceph_fsid fsid;
__le64 tid;
} __attribute__ ((packed)); } __attribute__ ((packed));
struct ceph_statfs { struct ceph_statfs {
...@@ -146,7 +145,6 @@ struct ceph_statfs { ...@@ -146,7 +145,6 @@ struct ceph_statfs {
struct ceph_mon_statfs_reply { struct ceph_mon_statfs_reply {
struct ceph_fsid fsid; struct ceph_fsid fsid;
__le64 tid;
__le64 version; __le64 version;
struct ceph_statfs st; struct ceph_statfs st;
} __attribute__ ((packed)); } __attribute__ ((packed));
...@@ -333,7 +331,7 @@ union ceph_mds_request_args { ...@@ -333,7 +331,7 @@ union ceph_mds_request_args {
#define CEPH_MDS_FLAG_WANT_DENTRY 2 /* want dentry in reply */ #define CEPH_MDS_FLAG_WANT_DENTRY 2 /* want dentry in reply */
struct ceph_mds_request_head { struct ceph_mds_request_head {
__le64 tid, oldest_client_tid; __le64 oldest_client_tid;
__le32 mdsmap_epoch; /* on client */ __le32 mdsmap_epoch; /* on client */
__le32 flags; /* CEPH_MDS_FLAG_* */ __le32 flags; /* CEPH_MDS_FLAG_* */
__u8 num_retry, num_fwd; /* count retry, fwd attempts */ __u8 num_retry, num_fwd; /* count retry, fwd attempts */
...@@ -356,7 +354,6 @@ struct ceph_mds_request_release { ...@@ -356,7 +354,6 @@ struct ceph_mds_request_release {
/* client reply */ /* client reply */
struct ceph_mds_reply_head { struct ceph_mds_reply_head {
__le64 tid;
__le32 op; __le32 op;
__le32 result; __le32 result;
__le32 mdsmap_epoch; __le32 mdsmap_epoch;
...@@ -542,7 +539,6 @@ struct ceph_mds_caps { ...@@ -542,7 +539,6 @@ struct ceph_mds_caps {
__le32 migrate_seq; __le32 migrate_seq;
__le64 snap_follows; __le64 snap_follows;
__le32 snap_trace_len; __le32 snap_trace_len;
__le64 client_tid; /* for FLUSH(SNAP) -> FLUSH(SNAP)_ACK */
/* authlock */ /* authlock */
__le32 uid, gid, mode; __le32 uid, gid, mode;
......
...@@ -1339,6 +1339,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc, ...@@ -1339,6 +1339,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
if (IS_ERR(msg)) if (IS_ERR(msg))
goto out_free2; goto out_free2;
msg->hdr.tid = cpu_to_le64(req->r_tid);
head = msg->front.iov_base; head = msg->front.iov_base;
p = msg->front.iov_base + sizeof(*head); p = msg->front.iov_base + sizeof(*head);
end = msg->front.iov_base + msg->front.iov_len; end = msg->front.iov_base + msg->front.iov_len;
...@@ -1431,7 +1433,6 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc, ...@@ -1431,7 +1433,6 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
req->r_request = msg; req->r_request = msg;
rhead = msg->front.iov_base; rhead = msg->front.iov_base;
rhead->tid = cpu_to_le64(req->r_tid);
rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc)); rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
if (req->r_got_unsafe) if (req->r_got_unsafe)
flags |= CEPH_MDS_FLAG_REPLAY; flags |= CEPH_MDS_FLAG_REPLAY;
...@@ -1664,7 +1665,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg) ...@@ -1664,7 +1665,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
} }
/* get request, session */ /* get request, session */
tid = le64_to_cpu(head->tid); tid = le64_to_cpu(msg->hdr.tid);
mutex_lock(&mdsc->mutex); mutex_lock(&mdsc->mutex);
req = __lookup_request(mdsc, tid); req = __lookup_request(mdsc, tid);
if (!req) { if (!req) {
......
...@@ -349,7 +349,7 @@ static void handle_statfs_reply(struct ceph_mon_client *monc, ...@@ -349,7 +349,7 @@ static void handle_statfs_reply(struct ceph_mon_client *monc,
if (msg->front.iov_len != sizeof(*reply)) if (msg->front.iov_len != sizeof(*reply))
goto bad; goto bad;
tid = le64_to_cpu(reply->tid); tid = le64_to_cpu(msg->hdr.tid);
dout("handle_statfs_reply %p tid %llu\n", msg, tid); dout("handle_statfs_reply %p tid %llu\n", msg, tid);
mutex_lock(&monc->mutex); mutex_lock(&monc->mutex);
...@@ -382,12 +382,12 @@ static int send_statfs(struct ceph_mon_client *monc, ...@@ -382,12 +382,12 @@ static int send_statfs(struct ceph_mon_client *monc,
if (IS_ERR(msg)) if (IS_ERR(msg))
return PTR_ERR(msg); return PTR_ERR(msg);
req->request = msg; req->request = msg;
msg->hdr.tid = cpu_to_le64(req->tid);
h = msg->front.iov_base; h = msg->front.iov_base;
h->monhdr.have_version = 0; h->monhdr.have_version = 0;
h->monhdr.session_mon = cpu_to_le16(-1); h->monhdr.session_mon = cpu_to_le16(-1);
h->monhdr.session_mon_tid = 0; h->monhdr.session_mon_tid = 0;
h->fsid = monc->monmap->fsid; h->fsid = monc->monmap->fsid;
h->tid = cpu_to_le64(req->tid);
ceph_con_send(monc->con, msg); ceph_con_send(monc->con, msg);
return 0; return 0;
} }
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
* whenever the wire protocol changes. try to keep this string length * whenever the wire protocol changes. try to keep this string length
* constant. * constant.
*/ */
#define CEPH_BANNER "ceph v024" #define CEPH_BANNER "ceph v025"
#define CEPH_BANNER_MAX_LEN 30 #define CEPH_BANNER_MAX_LEN 30
...@@ -132,6 +132,7 @@ struct ceph_msg_connect_reply { ...@@ -132,6 +132,7 @@ struct ceph_msg_connect_reply {
*/ */
struct ceph_msg_header { struct ceph_msg_header {
__le64 seq; /* message seq# for this session */ __le64 seq; /* message seq# for this session */
__le64 tid; /* transaction id */
__le16 type; /* message type */ __le16 type; /* message type */
__le16 priority; /* priority. higher value == higher priority */ __le16 priority; /* priority. higher value == higher priority */
__le16 version; /* version of message encoding */ __le16 version; /* version of message encoding */
......
...@@ -439,11 +439,9 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) ...@@ -439,11 +439,9 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
static void register_request(struct ceph_osd_client *osdc, static void register_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req) struct ceph_osd_request *req)
{ {
struct ceph_osd_request_head *head = req->r_request->front.iov_base;
mutex_lock(&osdc->request_mutex); mutex_lock(&osdc->request_mutex);
req->r_tid = ++osdc->last_tid; req->r_tid = ++osdc->last_tid;
head->tid = cpu_to_le64(req->r_tid); req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
dout("register_request %p tid %lld\n", req, req->r_tid); dout("register_request %p tid %lld\n", req, req->r_tid);
__insert_request(osdc, req); __insert_request(osdc, req);
...@@ -702,9 +700,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg, ...@@ -702,9 +700,9 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
u64 tid; u64 tid;
int numops, object_len, flags; int numops, object_len, flags;
tid = le64_to_cpu(msg->hdr.tid);
if (msg->front.iov_len < sizeof(*rhead)) if (msg->front.iov_len < sizeof(*rhead))
goto bad; goto bad;
tid = le64_to_cpu(rhead->tid);
numops = le32_to_cpu(rhead->num_ops); numops = le32_to_cpu(rhead->num_ops);
object_len = le32_to_cpu(rhead->object_len); object_len = le32_to_cpu(rhead->object_len);
if (msg->front.iov_len != sizeof(*rhead) + object_len + if (msg->front.iov_len != sizeof(*rhead) + object_len +
...@@ -1002,7 +1000,6 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m, ...@@ -1002,7 +1000,6 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
{ {
struct ceph_osd *osd = con->private; struct ceph_osd *osd = con->private;
struct ceph_osd_client *osdc; struct ceph_osd_client *osdc;
struct ceph_osd_reply_head *rhead = m->front.iov_base;
struct ceph_osd_request *req; struct ceph_osd_request *req;
u64 tid; u64 tid;
int ret = -1; int ret = -1;
...@@ -1016,7 +1013,7 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m, ...@@ -1016,7 +1013,7 @@ static int prepare_pages(struct ceph_connection *con, struct ceph_msg *m,
if (unlikely(type != CEPH_MSG_OSD_OPREPLY)) if (unlikely(type != CEPH_MSG_OSD_OPREPLY))
return -1; /* hmm! */ return -1; /* hmm! */
tid = le64_to_cpu(rhead->tid); tid = le64_to_cpu(m->hdr.tid);
mutex_lock(&osdc->request_mutex); mutex_lock(&osdc->request_mutex);
req = __lookup_request(osdc, tid); req = __lookup_request(osdc, tid);
if (!req) { if (!req) {
......
...@@ -331,7 +331,6 @@ struct ceph_osd_op { ...@@ -331,7 +331,6 @@ struct ceph_osd_op {
* ceph_osd_op object operations. * ceph_osd_op object operations.
*/ */
struct ceph_osd_request_head { struct ceph_osd_request_head {
__le64 tid; /* transaction id */
__le32 client_inc; /* client incarnation */ __le32 client_inc; /* client incarnation */
struct ceph_object_layout layout; /* pgid */ struct ceph_object_layout layout; /* pgid */
__le32 osdmap_epoch; /* client's osdmap epoch */ __le32 osdmap_epoch; /* client's osdmap epoch */
...@@ -352,7 +351,6 @@ struct ceph_osd_request_head { ...@@ -352,7 +351,6 @@ struct ceph_osd_request_head {
} __attribute__ ((packed)); } __attribute__ ((packed));
struct ceph_osd_reply_head { struct ceph_osd_reply_head {
__le64 tid; /* transaction id */
__le32 client_inc; /* client incarnation */ __le32 client_inc; /* client incarnation */
__le32 flags; __le32 flags;
struct ceph_object_layout layout; struct ceph_object_layout layout;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment