Commit a2971c8c authored by Yan, Zheng; committed by Ilya Dryomov

ceph: send TID of the oldest pending caps flush to MDS

Using this information, the MDS can trim its list of completed cap
flushes (which is used to detect duplicate cap flushes).
Signed-off-by: Yan, Zheng <zyan@redhat.com>
parent 8310b089
......@@ -986,8 +986,8 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
static int send_cap_msg(struct ceph_mds_session *session,
u64 ino, u64 cid, int op,
int caps, int wanted, int dirty,
u32 seq, u64 flush_tid, u32 issue_seq, u32 mseq,
u64 size, u64 max_size,
u32 seq, u64 flush_tid, u64 oldest_flush_tid,
u32 issue_seq, u32 mseq, u64 size, u64 max_size,
struct timespec *mtime, struct timespec *atime,
u64 time_warp_seq,
kuid_t uid, kgid_t gid, umode_t mode,
......@@ -1001,20 +1001,23 @@ static int send_cap_msg(struct ceph_mds_session *session,
size_t extra_len;
dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
" seq %u/%u mseq %u follows %lld size %llu/%llu"
" seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
" xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(op),
cid, ino, ceph_cap_string(caps), ceph_cap_string(wanted),
ceph_cap_string(dirty),
seq, issue_seq, mseq, follows, size, max_size,
seq, issue_seq, flush_tid, oldest_flush_tid,
mseq, follows, size, max_size,
xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
/* flock buffer size + inline version + inline data size */
extra_len = 4 + 8 + 4;
/* flock buffer size + inline version + inline data size +
* osd_epoch_barrier + oldest_flush_tid */
extra_len = 4 + 8 + 4 + 4 + 8;
msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
GFP_NOFS, false);
if (!msg)
return -ENOMEM;
msg->hdr.version = cpu_to_le16(6);
msg->hdr.tid = cpu_to_le64(flush_tid);
fc = msg->front.iov_base;
......@@ -1050,6 +1053,10 @@ static int send_cap_msg(struct ceph_mds_session *session,
ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE);
/* inline data size */
ceph_encode_32(&p, 0);
/* osd_epoch_barrier */
ceph_encode_32(&p, 0);
/* oldest_flush_tid */
ceph_encode_64(&p, oldest_flush_tid);
fc->xattr_version = cpu_to_le64(xattr_version);
if (xattrs_buf) {
......@@ -1098,7 +1105,7 @@ void ceph_queue_caps_release(struct inode *inode)
*/
static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
int op, int used, int want, int retain, int flushing,
u64 flush_tid)
u64 flush_tid, u64 oldest_flush_tid)
__releases(cap->ci->i_ceph_lock)
{
struct ceph_inode_info *ci = cap->ci;
......@@ -1187,7 +1194,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
spin_unlock(&ci->i_ceph_lock);
ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
op, keep, want, flushing, seq,
flush_tid, oldest_flush_tid, issue_seq, mseq,
size, max_size, &mtime, &atime, time_warp_seq,
uid, gid, mode, xattr_version, xattr_blob,
follows, inline_data);
......@@ -1307,8 +1315,8 @@ void __ceph_flush_snaps(struct ceph_inode_info *ci,
inode, capsnap, capsnap->follows, capsnap->flush_tid);
send_cap_msg(session, ceph_vino(inode).ino, 0,
CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
capsnap->dirty, 0, capsnap->flush_tid, 0, mseq,
capsnap->size, 0,
capsnap->dirty, 0, capsnap->flush_tid, 0,
0, mseq, capsnap->size, 0,
&capsnap->mtime, &capsnap->atime,
capsnap->time_warp_seq,
capsnap->uid, capsnap->gid, capsnap->mode,
......@@ -1438,6 +1446,17 @@ static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc,
rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree);
}
/*
 * Return the tid of the first (leftmost) entry in mdsc->cap_flush_tree,
 * or 0 when the tree is empty.
 *
 * NOTE(review): the tree is presumably ordered by flush tid, which would
 * make the first entry the oldest pending flush -- confirm against the
 * insertion helper.  The callers visible in this file take
 * mdsc->cap_dirty_lock around the call.
 */
static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
{
	struct rb_node *first = rb_first(&mdsc->cap_flush_tree);

	/* Empty tree: nothing is pending, report 0. */
	if (!first)
		return 0;

	return rb_entry(first, struct ceph_cap_flush, g_node)->tid;
}
/*
* Add dirty inode to the flushing list. Assigned a seq number so we
* can wait for caps to flush without starving.
......@@ -1446,7 +1465,7 @@ static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc,
*/
static int __mark_caps_flushing(struct inode *inode,
struct ceph_mds_session *session,
u64 *flush_tid)
u64 *flush_tid, u64 *oldest_flush_tid)
{
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode);
......@@ -1473,6 +1492,7 @@ static int __mark_caps_flushing(struct inode *inode,
cf->tid = ++mdsc->last_cap_flush_tid;
__add_cap_flushing_to_mdsc(mdsc, cf);
*oldest_flush_tid = __get_oldest_flush_tid(mdsc);
if (list_empty(&ci->i_flushing_item)) {
list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
......@@ -1533,7 +1553,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
struct ceph_mds_client *mdsc = fsc->mdsc;
struct inode *inode = &ci->vfs_inode;
struct ceph_cap *cap;
u64 flush_tid;
u64 flush_tid, oldest_flush_tid;
int file_wanted, used, cap_used;
int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */
int issued, implemented, want, retain, revoking, flushing = 0;
......@@ -1754,10 +1774,14 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
flushing = __mark_caps_flushing(inode, session,
&flush_tid);
&flush_tid,
&oldest_flush_tid);
} else {
flushing = 0;
flush_tid = 0;
spin_lock(&mdsc->cap_dirty_lock);
oldest_flush_tid = __get_oldest_flush_tid(mdsc);
spin_unlock(&mdsc->cap_dirty_lock);
}
mds = cap->mds; /* remember mds, so we don't repeat */
......@@ -1765,7 +1789,8 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
/* __send_cap drops i_ceph_lock */
delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used,
want, retain, flushing, flush_tid);
want, retain, flushing,
flush_tid, oldest_flush_tid);
goto retry; /* retake i_ceph_lock and restart our cap scan. */
}
......@@ -1800,7 +1825,7 @@ static int try_flush_caps(struct inode *inode, u64 *ptid)
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_session *session = NULL;
int flushing = 0;
u64 flush_tid = 0;
u64 flush_tid = 0, oldest_flush_tid = 0;
retry:
spin_lock(&ci->i_ceph_lock);
......@@ -1825,12 +1850,13 @@ static int try_flush_caps(struct inode *inode, u64 *ptid)
if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
goto out;
flushing = __mark_caps_flushing(inode, session, &flush_tid);
flushing = __mark_caps_flushing(inode, session, &flush_tid,
&oldest_flush_tid);
/* __send_cap drops i_ceph_lock */
delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
(cap->issued | cap->implemented),
flushing, flush_tid);
flushing, flush_tid, oldest_flush_tid);
if (delayed) {
spin_lock(&ci->i_ceph_lock);
......@@ -2083,6 +2109,11 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
struct rb_node *n;
int delayed = 0;
u64 first_tid = 0;
u64 oldest_flush_tid;
spin_lock(&mdsc->cap_dirty_lock);
oldest_flush_tid = __get_oldest_flush_tid(mdsc);
spin_unlock(&mdsc->cap_dirty_lock);
while (true) {
spin_lock(&ci->i_ceph_lock);
......@@ -2113,7 +2144,7 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
__ceph_caps_used(ci),
__ceph_caps_wanted(ci),
cap->issued | cap->implemented,
cf->caps, cf->tid);
cf->caps, cf->tid, oldest_flush_tid);
}
return delayed;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment