Commit e20d258d authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: flush inline version

After converting inline data to normal data, client need to flush
the new i_inline_version (CEPH_INLINE_NONE) to MDS. This commit makes
cap messages (sent to MDS) contain inline_version and inline_data.
Client always converts inline data to normal data before data write,
so the inline data length part is always zero.
Signed-off-by: default avatarYan, Zheng <zyan@redhat.com>
parent 28127bdd
...@@ -975,10 +975,12 @@ static int send_cap_msg(struct ceph_mds_session *session, ...@@ -975,10 +975,12 @@ static int send_cap_msg(struct ceph_mds_session *session,
kuid_t uid, kgid_t gid, umode_t mode, kuid_t uid, kgid_t gid, umode_t mode,
u64 xattr_version, u64 xattr_version,
struct ceph_buffer *xattrs_buf, struct ceph_buffer *xattrs_buf,
u64 follows) u64 follows, bool inline_data)
{ {
struct ceph_mds_caps *fc; struct ceph_mds_caps *fc;
struct ceph_msg *msg; struct ceph_msg *msg;
void *p;
size_t extra_len;
dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s" dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
" seq %u/%u mseq %u follows %lld size %llu/%llu" " seq %u/%u mseq %u follows %lld size %llu/%llu"
...@@ -988,7 +990,10 @@ static int send_cap_msg(struct ceph_mds_session *session, ...@@ -988,7 +990,10 @@ static int send_cap_msg(struct ceph_mds_session *session,
seq, issue_seq, mseq, follows, size, max_size, seq, issue_seq, mseq, follows, size, max_size,
xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0); xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false); /* flock buffer size + inline version + inline data size */
extra_len = 4 + 8 + 4;
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
GFP_NOFS, false);
if (!msg) if (!msg)
return -ENOMEM; return -ENOMEM;
...@@ -1020,6 +1025,14 @@ static int send_cap_msg(struct ceph_mds_session *session, ...@@ -1020,6 +1025,14 @@ static int send_cap_msg(struct ceph_mds_session *session,
fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid)); fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid));
fc->mode = cpu_to_le32(mode); fc->mode = cpu_to_le32(mode);
p = fc + 1;
/* flock buffer size */
ceph_encode_32(&p, 0);
/* inline version */
ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE);
/* inline data size */
ceph_encode_32(&p, 0);
fc->xattr_version = cpu_to_le64(xattr_version); fc->xattr_version = cpu_to_le64(xattr_version);
if (xattrs_buf) { if (xattrs_buf) {
msg->middle = ceph_buffer_get(xattrs_buf); msg->middle = ceph_buffer_get(xattrs_buf);
...@@ -1126,6 +1139,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, ...@@ -1126,6 +1139,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
u64 flush_tid = 0; u64 flush_tid = 0;
int i; int i;
int ret; int ret;
bool inline_data;
held = cap->issued | cap->implemented; held = cap->issued | cap->implemented;
revoking = cap->implemented & ~cap->issued; revoking = cap->implemented & ~cap->issued;
...@@ -1209,13 +1223,15 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, ...@@ -1209,13 +1223,15 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
xattr_version = ci->i_xattrs.version; xattr_version = ci->i_xattrs.version;
} }
inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id, ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
op, keep, want, flushing, seq, flush_tid, issue_seq, mseq, op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
size, max_size, &mtime, &atime, time_warp_seq, size, max_size, &mtime, &atime, time_warp_seq,
uid, gid, mode, xattr_version, xattr_blob, uid, gid, mode, xattr_version, xattr_blob,
follows); follows, inline_data);
if (ret < 0) { if (ret < 0) {
dout("error sending cap msg, must requeue %p\n", inode); dout("error sending cap msg, must requeue %p\n", inode);
delayed = 1; delayed = 1;
...@@ -1336,7 +1352,7 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci, ...@@ -1336,7 +1352,7 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci,
capsnap->time_warp_seq, capsnap->time_warp_seq,
capsnap->uid, capsnap->gid, capsnap->mode, capsnap->uid, capsnap->gid, capsnap->mode,
capsnap->xattr_version, capsnap->xattr_blob, capsnap->xattr_version, capsnap->xattr_blob,
capsnap->follows); capsnap->follows, capsnap->inline_data);
next_follows = capsnap->follows + 1; next_follows = capsnap->follows + 1;
ceph_put_cap_snap(capsnap); ceph_put_cap_snap(capsnap);
......
...@@ -516,6 +516,8 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) ...@@ -516,6 +516,8 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
capsnap->xattr_version = 0; capsnap->xattr_version = 0;
} }
capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
/* dirty page count moved from _head to this cap_snap; /* dirty page count moved from _head to this cap_snap;
all subsequent writes page dirties occur _after_ this all subsequent writes page dirties occur _after_ this
snapshot. */ snapshot. */
......
...@@ -161,6 +161,7 @@ struct ceph_cap_snap { ...@@ -161,6 +161,7 @@ struct ceph_cap_snap {
u64 time_warp_seq; u64 time_warp_seq;
int writing; /* a sync write is still in progress */ int writing; /* a sync write is still in progress */
int dirty_pages; /* dirty pages awaiting writeback */ int dirty_pages; /* dirty pages awaiting writeback */
bool inline_data;
}; };
static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment