Commit a1c6b835 authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: define argument structure for handle_cap_grant

The data structure includes the versioned feilds of cap message.
Signed-off-by: default avatar"Yan, Zheng" <zyan@redhat.com>
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 2af54a72
......@@ -3024,24 +3024,32 @@ static void invalidate_aliases(struct inode *inode)
dput(prev);
}
struct cap_extra_info {
struct ceph_string *pool_ns;
/* inline data */
u64 inline_version;
void *inline_data;
u32 inline_len;
/* currently issued */
int issued;
};
/*
* Handle a cap GRANT message from the MDS. (Note that a GRANT may
* actually be a revocation if it specifies a smaller cap set.)
*
* caller holds s_mutex and i_ceph_lock, we drop both.
*/
static void handle_cap_grant(struct ceph_mds_client *mdsc,
struct inode *inode, struct ceph_mds_caps *grant,
struct ceph_string **pns, u64 inline_version,
void *inline_data, u32 inline_len,
struct ceph_buffer *xattr_buf,
static void handle_cap_grant(struct inode *inode,
struct ceph_mds_session *session,
struct ceph_cap *cap, int issued)
struct ceph_cap *cap,
struct ceph_mds_caps *grant,
struct ceph_buffer *xattr_buf,
struct cap_extra_info *extra_info)
__releases(ci->i_ceph_lock)
__releases(mdsc->snap_rwsem)
__releases(session->s_mdsc->snap_rwsem)
{
struct ceph_inode_info *ci = ceph_inode(inode);
int mds = session->s_mds;
int seq = le32_to_cpu(grant->seq);
int newcaps = le32_to_cpu(grant->caps);
int used, wanted, dirty;
......@@ -3057,7 +3065,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
bool fill_inline = false;
dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
inode, cap, mds, seq, ceph_cap_string(newcaps));
inode, cap, session->s_mds, seq, ceph_cap_string(newcaps));
dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,
inode->i_size);
......@@ -3103,7 +3111,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
__check_cap_issue(ci, cap, newcaps);
if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
(issued & CEPH_CAP_AUTH_EXCL) == 0) {
(extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) {
inode->i_mode = le32_to_cpu(grant->mode);
inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
......@@ -3113,14 +3121,15 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
}
if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
(issued & CEPH_CAP_LINK_EXCL) == 0) {
(extra_info->issued & CEPH_CAP_LINK_EXCL) == 0) {
set_nlink(inode, le32_to_cpu(grant->nlink));
if (inode->i_nlink == 0 &&
(newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
deleted_inode = true;
}
if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) {
if ((extra_info->issued & CEPH_CAP_XATTR_EXCL) == 0 &&
grant->xattr_len) {
int len = le32_to_cpu(grant->xattr_len);
u64 version = le64_to_cpu(grant->xattr_version);
......@@ -3140,7 +3149,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
ceph_decode_timespec(&mtime, &grant->mtime);
ceph_decode_timespec(&atime, &grant->atime);
ceph_decode_timespec(&ctime, &grant->ctime);
ceph_fill_file_time(inode, issued,
ceph_fill_file_time(inode, extra_info->issued,
le32_to_cpu(grant->time_warp_seq),
&ctime, &mtime, &atime);
}
......@@ -3153,15 +3162,16 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
lockdep_is_held(&ci->i_ceph_lock));
rcu_assign_pointer(ci->i_layout.pool_ns, *pns);
rcu_assign_pointer(ci->i_layout.pool_ns, extra_info->pool_ns);
if (ci->i_layout.pool_id != old_pool || *pns != old_ns)
if (ci->i_layout.pool_id != old_pool ||
extra_info->pool_ns != old_ns)
ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
*pns = old_ns;
extra_info->pool_ns = old_ns;
/* size/truncate_seq? */
queue_trunc = ceph_fill_file_size(inode, issued,
queue_trunc = ceph_fill_file_size(inode, extra_info->issued,
le32_to_cpu(grant->truncate_seq),
le64_to_cpu(grant->truncate_size),
size);
......@@ -3240,24 +3250,26 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
}
BUG_ON(cap->issued & ~cap->implemented);
if (inline_version > 0 && inline_version >= ci->i_inline_version) {
ci->i_inline_version = inline_version;
if (extra_info->inline_version > 0 &&
extra_info->inline_version >= ci->i_inline_version) {
ci->i_inline_version = extra_info->inline_version;
if (ci->i_inline_version != CEPH_INLINE_NONE &&
(newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
fill_inline = true;
}
if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
if (newcaps & ~issued)
if (newcaps & ~extra_info->issued)
wake = true;
kick_flushing_inode_caps(mdsc, session, inode);
up_read(&mdsc->snap_rwsem);
kick_flushing_inode_caps(session->s_mdsc, session, inode);
up_read(&session->s_mdsc->snap_rwsem);
} else {
spin_unlock(&ci->i_ceph_lock);
}
if (fill_inline)
ceph_fill_inline_data(inode, NULL, inline_data, inline_len);
ceph_fill_inline_data(inode, NULL, extra_info->inline_data,
extra_info->inline_len);
if (queue_trunc)
ceph_queue_vmtruncate(inode);
......@@ -3722,31 +3734,24 @@ void ceph_handle_caps(struct ceph_mds_session *session,
struct ceph_msg *msg)
{
struct ceph_mds_client *mdsc = session->s_mdsc;
struct super_block *sb = mdsc->fsc->sb;
struct inode *inode;
struct ceph_inode_info *ci;
struct ceph_cap *cap;
struct ceph_mds_caps *h;
struct ceph_mds_cap_peer *peer = NULL;
struct ceph_snap_realm *realm = NULL;
struct ceph_string *pool_ns = NULL;
int mds = session->s_mds;
int op, issued;
int op;
u32 seq, mseq;
struct ceph_vino vino;
u64 tid;
u64 inline_version = 0;
void *inline_data = NULL;
u32 inline_len = 0;
void *snaptrace;
size_t snaptrace_len;
void *p, *end;
struct cap_extra_info extra_info = {};
dout("handle_caps from mds%d\n", mds);
dout("handle_caps from mds%d\n", session->s_mds);
/* decode */
end = msg->front.iov_base + msg->front.iov_len;
tid = le64_to_cpu(msg->hdr.tid);
if (msg->front.iov_len < sizeof(*h))
goto bad;
h = msg->front.iov_base;
......@@ -3781,12 +3786,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
}
if (le16_to_cpu(msg->hdr.version) >= 4) {
ceph_decode_64_safe(&p, end, inline_version, bad);
ceph_decode_32_safe(&p, end, inline_len, bad);
if (p + inline_len > end)
ceph_decode_64_safe(&p, end, extra_info.inline_version, bad);
ceph_decode_32_safe(&p, end, extra_info.inline_len, bad);
if (p + extra_info.inline_len > end)
goto bad;
inline_data = p;
p += inline_len;
extra_info.inline_data = p;
p += extra_info.inline_len;
}
if (le16_to_cpu(msg->hdr.version) >= 5) {
......@@ -3811,13 +3816,14 @@ void ceph_handle_caps(struct ceph_mds_session *session,
ceph_decode_32_safe(&p, end, pool_ns_len, bad);
if (pool_ns_len > 0) {
ceph_decode_need(&p, end, pool_ns_len, bad);
pool_ns = ceph_find_or_create_string(p, pool_ns_len);
extra_info.pool_ns =
ceph_find_or_create_string(p, pool_ns_len);
p += pool_ns_len;
}
}
/* lookup ino */
inode = ceph_find_inode(sb, vino);
inode = ceph_find_inode(mdsc->fsc->sb, vino);
ci = ceph_inode(inode);
dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino,
vino.snap, inode);
......@@ -3850,7 +3856,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
/* these will work even if we don't have a cap yet */
switch (op) {
case CEPH_CAP_OP_FLUSHSNAP_ACK:
handle_cap_flushsnap_ack(inode, tid, h, session);
handle_cap_flushsnap_ack(inode, le64_to_cpu(msg->hdr.tid),
h, session);
goto done;
case CEPH_CAP_OP_EXPORT:
......@@ -3869,10 +3876,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
down_read(&mdsc->snap_rwsem);
}
handle_cap_import(mdsc, inode, h, peer, session,
&cap, &issued);
handle_cap_grant(mdsc, inode, h, &pool_ns,
inline_version, inline_data, inline_len,
msg->middle, session, cap, issued);
&cap, &extra_info.issued);
handle_cap_grant(inode, session, cap,
h, msg->middle, &extra_info);
if (realm)
ceph_put_snap_realm(mdsc, realm);
goto done_unlocked;
......@@ -3880,10 +3886,11 @@ void ceph_handle_caps(struct ceph_mds_session *session,
/* the rest require a cap */
spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ceph_inode(inode), mds);
cap = __get_cap_for_mds(ceph_inode(inode), session->s_mds);
if (!cap) {
dout(" no cap on %p ino %llx.%llx from mds%d\n",
inode, ceph_ino(inode), ceph_snap(inode), mds);
inode, ceph_ino(inode), ceph_snap(inode),
session->s_mds);
spin_unlock(&ci->i_ceph_lock);
goto flush_cap_releases;
}
......@@ -3892,15 +3899,15 @@ void ceph_handle_caps(struct ceph_mds_session *session,
switch (op) {
case CEPH_CAP_OP_REVOKE:
case CEPH_CAP_OP_GRANT:
__ceph_caps_issued(ci, &issued);
issued |= __ceph_caps_dirty(ci);
handle_cap_grant(mdsc, inode, h, &pool_ns,
inline_version, inline_data, inline_len,
msg->middle, session, cap, issued);
__ceph_caps_issued(ci, &extra_info.issued);
extra_info.issued |= __ceph_caps_dirty(ci);
handle_cap_grant(inode, session, cap,
h, msg->middle, &extra_info);
goto done_unlocked;
case CEPH_CAP_OP_FLUSH_ACK:
handle_cap_flush_ack(inode, tid, h, session, cap);
handle_cap_flush_ack(inode, le64_to_cpu(msg->hdr.tid),
h, session, cap);
break;
case CEPH_CAP_OP_TRUNC:
......@@ -3927,7 +3934,7 @@ void ceph_handle_caps(struct ceph_mds_session *session,
mutex_unlock(&session->s_mutex);
done_unlocked:
iput(inode);
ceph_put_string(pool_ns);
ceph_put_string(extra_info.pool_ns);
return;
bad:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment