Commit c7eaf342 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'ceph-for-4.21-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "A fairly quiet round: a couple of messenger performance improvements
  from myself and a few cap handling fixes from Zheng"

* tag 'ceph-for-4.21-rc1' of git://github.com/ceph/ceph-client:
  ceph: don't encode inode pathes into reconnect message
  ceph: update wanted caps after resuming stale session
  ceph: skip updating 'wanted' caps if caps are already issued
  ceph: don't request excl caps when mount is readonly
  ceph: don't update importing cap's mseq when handing cap export
  libceph: switch more to bool in ceph_tcp_sendmsg()
  libceph: use MSG_SENDPAGE_NOTLAST with ceph_tcp_sendpage()
  libceph: use sock_no_sendpage() as a fallback in ceph_tcp_sendpage()
  libceph: drop last_piece logic from write_partial_message_data()
  ceph: remove redundant assignment
  ceph: cleanup splice_dentry()
parents 35004f2e 5ccedf1c
...@@ -657,6 +657,9 @@ void ceph_add_cap(struct inode *inode, ...@@ -657,6 +657,9 @@ void ceph_add_cap(struct inode *inode,
session->s_nr_caps++; session->s_nr_caps++;
spin_unlock(&session->s_cap_lock); spin_unlock(&session->s_cap_lock);
} else { } else {
if (cap->cap_gen < session->s_cap_gen)
cap->issued = cap->implemented = CEPH_CAP_PIN;
/* /*
* auth mds of the inode changed. we received the cap export * auth mds of the inode changed. we received the cap export
* message, but still haven't received the cap import message. * message, but still haven't received the cap import message.
...@@ -1862,6 +1865,9 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ...@@ -1862,6 +1865,9 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
* revoking the shared cap on every create/unlink * revoking the shared cap on every create/unlink
* operation. * operation.
*/ */
if (IS_RDONLY(inode))
want = CEPH_CAP_ANY_SHARED;
else
want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL; want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
retain |= want; retain |= want;
} else { } else {
...@@ -1970,8 +1976,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, ...@@ -1970,8 +1976,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
goto ack; goto ack;
/* things we might delay */ /* things we might delay */
if ((cap->issued & ~retain) == 0 && if ((cap->issued & ~retain) == 0)
cap->mds_wanted == want)
continue; /* nope, all good */ continue; /* nope, all good */
if (no_delay) if (no_delay)
...@@ -3048,7 +3053,8 @@ static void handle_cap_grant(struct inode *inode, ...@@ -3048,7 +3053,8 @@ static void handle_cap_grant(struct inode *inode,
int used, wanted, dirty; int used, wanted, dirty;
u64 size = le64_to_cpu(grant->size); u64 size = le64_to_cpu(grant->size);
u64 max_size = le64_to_cpu(grant->max_size); u64 max_size = le64_to_cpu(grant->max_size);
int check_caps = 0; unsigned char check_caps = 0;
bool was_stale = cap->cap_gen < session->s_cap_gen;
bool wake = false; bool wake = false;
bool writeback = false; bool writeback = false;
bool queue_trunc = false; bool queue_trunc = false;
...@@ -3062,21 +3068,6 @@ static void handle_cap_grant(struct inode *inode, ...@@ -3062,21 +3068,6 @@ static void handle_cap_grant(struct inode *inode,
inode->i_size); inode->i_size);
/*
* auth mds of the inode changed. we received the cap export message,
* but still haven't received the cap import message. handle_cap_export
* updated the new auth MDS' cap.
*
* "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message
* that was sent before the cap import message. So don't remove caps.
*/
if (ceph_seq_cmp(seq, cap->seq) <= 0) {
WARN_ON(cap != ci->i_auth_cap);
WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
seq = cap->seq;
newcaps |= cap->issued;
}
/* /*
* If CACHE is being revoked, and we have no dirty buffers, * If CACHE is being revoked, and we have no dirty buffers,
* try to invalidate (once). (If there are dirty buffers, we * try to invalidate (once). (If there are dirty buffers, we
...@@ -3096,6 +3087,24 @@ static void handle_cap_grant(struct inode *inode, ...@@ -3096,6 +3087,24 @@ static void handle_cap_grant(struct inode *inode,
} }
} }
if (was_stale)
cap->issued = cap->implemented = CEPH_CAP_PIN;
/*
* auth mds of the inode changed. we received the cap export message,
* but still haven't received the cap import message. handle_cap_export
* updated the new auth MDS' cap.
*
* "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message
* that was sent before the cap import message. So don't remove caps.
*/
if (ceph_seq_cmp(seq, cap->seq) <= 0) {
WARN_ON(cap != ci->i_auth_cap);
WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
seq = cap->seq;
newcaps |= cap->issued;
}
/* side effects now are allowed */ /* side effects now are allowed */
cap->cap_gen = session->s_cap_gen; cap->cap_gen = session->s_cap_gen;
cap->seq = seq; cap->seq = seq;
...@@ -3200,12 +3209,19 @@ static void handle_cap_grant(struct inode *inode, ...@@ -3200,12 +3209,19 @@ static void handle_cap_grant(struct inode *inode,
ceph_cap_string(wanted), ceph_cap_string(wanted),
ceph_cap_string(used), ceph_cap_string(used),
ceph_cap_string(dirty)); ceph_cap_string(dirty));
if (wanted != le32_to_cpu(grant->wanted)) {
dout("mds wanted %s -> %s\n", if ((was_stale || le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) &&
ceph_cap_string(le32_to_cpu(grant->wanted)), (wanted & ~(cap->mds_wanted | newcaps))) {
ceph_cap_string(wanted)); /*
/* imported cap may not have correct mds_wanted */ * If mds is importing cap, prior cap messages that update
if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) * 'wanted' may get dropped by mds (migrate seq mismatch).
*
* We don't send cap message to update 'wanted' if what we
* want are already issued. If mds revokes caps, cap message
* that releases caps also tells mds what we want. But if
* caps got revoked by mds forcedly (session stale). We may
* haven't told mds what we want.
*/
check_caps = 1; check_caps = 1;
} }
...@@ -3539,9 +3555,9 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, ...@@ -3539,9 +3555,9 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
goto out_unlock; goto out_unlock;
if (target < 0) { if (target < 0) {
__ceph_remove_cap(cap, false); if (cap->mds_wanted | cap->issued)
if (!ci->i_auth_cap)
ci->i_ceph_flags |= CEPH_I_CAP_DROPPED; ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
__ceph_remove_cap(cap, false);
goto out_unlock; goto out_unlock;
} }
...@@ -3569,7 +3585,6 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex, ...@@ -3569,7 +3585,6 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
tcap->cap_id = t_cap_id; tcap->cap_id = t_cap_id;
tcap->seq = t_seq - 1; tcap->seq = t_seq - 1;
tcap->issue_seq = t_seq - 1; tcap->issue_seq = t_seq - 1;
tcap->mseq = t_mseq;
tcap->issued |= issued; tcap->issued |= issued;
tcap->implemented |= issued; tcap->implemented |= issued;
if (cap == ci->i_auth_cap) if (cap == ci->i_auth_cap)
......
...@@ -1098,8 +1098,9 @@ static void update_dentry_lease(struct dentry *dentry, ...@@ -1098,8 +1098,9 @@ static void update_dentry_lease(struct dentry *dentry,
* splice a dentry to an inode. * splice a dentry to an inode.
* caller must hold directory i_mutex for this to be safe. * caller must hold directory i_mutex for this to be safe.
*/ */
static struct dentry *splice_dentry(struct dentry *dn, struct inode *in) static int splice_dentry(struct dentry **pdn, struct inode *in)
{ {
struct dentry *dn = *pdn;
struct dentry *realdn; struct dentry *realdn;
BUG_ON(d_inode(dn)); BUG_ON(d_inode(dn));
...@@ -1132,28 +1133,23 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in) ...@@ -1132,28 +1133,23 @@ static struct dentry *splice_dentry(struct dentry *dn, struct inode *in)
if (IS_ERR(realdn)) { if (IS_ERR(realdn)) {
pr_err("splice_dentry error %ld %p inode %p ino %llx.%llx\n", pr_err("splice_dentry error %ld %p inode %p ino %llx.%llx\n",
PTR_ERR(realdn), dn, in, ceph_vinop(in)); PTR_ERR(realdn), dn, in, ceph_vinop(in));
dn = realdn; return PTR_ERR(realdn);
/* }
* Caller should release 'dn' in the case of error.
* If 'req->r_dentry' is passed to this function, if (realdn) {
* caller should leave 'req->r_dentry' untouched.
*/
goto out;
} else if (realdn) {
dout("dn %p (%d) spliced with %p (%d) " dout("dn %p (%d) spliced with %p (%d) "
"inode %p ino %llx.%llx\n", "inode %p ino %llx.%llx\n",
dn, d_count(dn), dn, d_count(dn),
realdn, d_count(realdn), realdn, d_count(realdn),
d_inode(realdn), ceph_vinop(d_inode(realdn))); d_inode(realdn), ceph_vinop(d_inode(realdn)));
dput(dn); dput(dn);
dn = realdn; *pdn = realdn;
} else { } else {
BUG_ON(!ceph_dentry(dn)); BUG_ON(!ceph_dentry(dn));
dout("dn %p attached to %p ino %llx.%llx\n", dout("dn %p attached to %p ino %llx.%llx\n",
dn, d_inode(dn), ceph_vinop(d_inode(dn))); dn, d_inode(dn), ceph_vinop(d_inode(dn)));
} }
out: return 0;
return dn;
} }
/* /*
...@@ -1340,7 +1336,12 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req) ...@@ -1340,7 +1336,12 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
dout("dn %p gets new offset %lld\n", req->r_old_dentry, dout("dn %p gets new offset %lld\n", req->r_old_dentry,
ceph_dentry(req->r_old_dentry)->offset); ceph_dentry(req->r_old_dentry)->offset);
dn = req->r_old_dentry; /* use old_dentry */ /* swap r_dentry and r_old_dentry in case that
* splice_dentry() gets called later. This is safe
* because no other place will use them */
req->r_dentry = req->r_old_dentry;
req->r_old_dentry = dn;
dn = req->r_dentry;
} }
/* null dentry? */ /* null dentry? */
...@@ -1365,12 +1366,10 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req) ...@@ -1365,12 +1366,10 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
if (d_really_is_negative(dn)) { if (d_really_is_negative(dn)) {
ceph_dir_clear_ordered(dir); ceph_dir_clear_ordered(dir);
ihold(in); ihold(in);
dn = splice_dentry(dn, in); err = splice_dentry(&req->r_dentry, in);
if (IS_ERR(dn)) { if (err < 0)
err = PTR_ERR(dn);
goto done; goto done;
} dn = req->r_dentry; /* may have spliced */
req->r_dentry = dn; /* may have spliced */
} else if (d_really_is_positive(dn) && d_inode(dn) != in) { } else if (d_really_is_positive(dn) && d_inode(dn) != in) {
dout(" %p links to %p %llx.%llx, not %llx.%llx\n", dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
dn, d_inode(dn), ceph_vinop(d_inode(dn)), dn, d_inode(dn), ceph_vinop(d_inode(dn)),
...@@ -1390,22 +1389,18 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req) ...@@ -1390,22 +1389,18 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
} else if ((req->r_op == CEPH_MDS_OP_LOOKUPSNAP || } else if ((req->r_op == CEPH_MDS_OP_LOOKUPSNAP ||
req->r_op == CEPH_MDS_OP_MKSNAP) && req->r_op == CEPH_MDS_OP_MKSNAP) &&
!test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) { !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
struct dentry *dn = req->r_dentry;
struct inode *dir = req->r_parent; struct inode *dir = req->r_parent;
/* fill out a snapdir LOOKUPSNAP dentry */ /* fill out a snapdir LOOKUPSNAP dentry */
BUG_ON(!dn);
BUG_ON(!dir); BUG_ON(!dir);
BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR); BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
dout(" linking snapped dir %p to dn %p\n", in, dn); BUG_ON(!req->r_dentry);
dout(" linking snapped dir %p to dn %p\n", in, req->r_dentry);
ceph_dir_clear_ordered(dir); ceph_dir_clear_ordered(dir);
ihold(in); ihold(in);
dn = splice_dentry(dn, in); err = splice_dentry(&req->r_dentry, in);
if (IS_ERR(dn)) { if (err < 0)
err = PTR_ERR(dn);
goto done; goto done;
}
req->r_dentry = dn; /* may have spliced */
} else if (rinfo->head->is_dentry) { } else if (rinfo->head->is_dentry) {
struct ceph_vino *ptvino = NULL; struct ceph_vino *ptvino = NULL;
...@@ -1669,8 +1664,6 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, ...@@ -1669,8 +1664,6 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
} }
if (d_really_is_negative(dn)) { if (d_really_is_negative(dn)) {
struct dentry *realdn;
if (ceph_security_xattr_deadlock(in)) { if (ceph_security_xattr_deadlock(in)) {
dout(" skip splicing dn %p to inode %p" dout(" skip splicing dn %p to inode %p"
" (security xattr deadlock)\n", dn, in); " (security xattr deadlock)\n", dn, in);
...@@ -1679,14 +1672,10 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, ...@@ -1679,14 +1672,10 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
goto next_item; goto next_item;
} }
realdn = splice_dentry(dn, in); err = splice_dentry(&dn, in);
if (IS_ERR(realdn)) { if (err < 0)
err = PTR_ERR(realdn);
d_drop(dn);
goto next_item; goto next_item;
} }
dn = realdn;
}
ceph_dentry(dn)->offset = rde->offset; ceph_dentry(dn)->offset = rde->offset;
...@@ -1701,7 +1690,6 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, ...@@ -1701,7 +1690,6 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
err = ret; err = ret;
} }
next_item: next_item:
if (dn)
dput(dn); dput(dn);
} }
out: out:
......
...@@ -1232,13 +1232,13 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -1232,13 +1232,13 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
dout("removing cap %p, ci is %p, inode is %p\n", dout("removing cap %p, ci is %p, inode is %p\n",
cap, ci, &ci->vfs_inode); cap, ci, &ci->vfs_inode);
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
if (cap->mds_wanted | cap->issued)
ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
__ceph_remove_cap(cap, false); __ceph_remove_cap(cap, false);
if (!ci->i_auth_cap) { if (!ci->i_auth_cap) {
struct ceph_cap_flush *cf; struct ceph_cap_flush *cf;
struct ceph_mds_client *mdsc = fsc->mdsc; struct ceph_mds_client *mdsc = fsc->mdsc;
ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
if (ci->i_wrbuffer_ref > 0 && if (ci->i_wrbuffer_ref > 0 &&
READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) READ_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
invalidate = true; invalidate = true;
...@@ -1355,6 +1355,12 @@ static void remove_session_caps(struct ceph_mds_session *session) ...@@ -1355,6 +1355,12 @@ static void remove_session_caps(struct ceph_mds_session *session)
dispose_cap_releases(session->s_mdsc, &dispose); dispose_cap_releases(session->s_mdsc, &dispose);
} }
enum {
RECONNECT,
RENEWCAPS,
FORCE_RO,
};
/* /*
* wake up any threads waiting on this session's caps. if the cap is * wake up any threads waiting on this session's caps. if the cap is
* old (didn't get renewed on the client reconnect), remove it now. * old (didn't get renewed on the client reconnect), remove it now.
...@@ -1365,23 +1371,34 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -1365,23 +1371,34 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,
void *arg) void *arg)
{ {
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
unsigned long ev = (unsigned long)arg;
if (arg) { if (ev == RECONNECT) {
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
ci->i_wanted_max_size = 0; ci->i_wanted_max_size = 0;
ci->i_requested_max_size = 0; ci->i_requested_max_size = 0;
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
} else if (ev == RENEWCAPS) {
if (cap->cap_gen < cap->session->s_cap_gen) {
/* mds did not re-issue stale cap */
spin_lock(&ci->i_ceph_lock);
cap->issued = cap->implemented = CEPH_CAP_PIN;
/* make sure mds knows what we want */
if (__ceph_caps_file_wanted(ci) & ~cap->mds_wanted)
ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
spin_unlock(&ci->i_ceph_lock);
}
} else if (ev == FORCE_RO) {
} }
wake_up_all(&ci->i_cap_wq); wake_up_all(&ci->i_cap_wq);
return 0; return 0;
} }
static void wake_up_session_caps(struct ceph_mds_session *session, static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
int reconnect)
{ {
dout("wake_up_session_caps %p mds%d\n", session, session->s_mds); dout("wake_up_session_caps %p mds%d\n", session, session->s_mds);
iterate_session_caps(session, wake_up_session_cb, iterate_session_caps(session, wake_up_session_cb,
(void *)(unsigned long)reconnect); (void *)(unsigned long)ev);
} }
/* /*
...@@ -1466,7 +1483,7 @@ static void renewed_caps(struct ceph_mds_client *mdsc, ...@@ -1466,7 +1483,7 @@ static void renewed_caps(struct ceph_mds_client *mdsc,
spin_unlock(&session->s_cap_lock); spin_unlock(&session->s_cap_lock);
if (wake) if (wake)
wake_up_session_caps(session, 0); wake_up_session_caps(session, RENEWCAPS);
} }
/* /*
...@@ -2847,7 +2864,7 @@ static void handle_session(struct ceph_mds_session *session, ...@@ -2847,7 +2864,7 @@ static void handle_session(struct ceph_mds_session *session,
spin_lock(&session->s_cap_lock); spin_lock(&session->s_cap_lock);
session->s_readonly = true; session->s_readonly = true;
spin_unlock(&session->s_cap_lock); spin_unlock(&session->s_cap_lock);
wake_up_session_caps(session, 0); wake_up_session_caps(session, FORCE_RO);
break; break;
case CEPH_SESSION_REJECT: case CEPH_SESSION_REJECT:
...@@ -2943,11 +2960,8 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -2943,11 +2960,8 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
struct ceph_inode_info *ci = cap->ci; struct ceph_inode_info *ci = cap->ci;
struct ceph_reconnect_state *recon_state = arg; struct ceph_reconnect_state *recon_state = arg;
struct ceph_pagelist *pagelist = recon_state->pagelist; struct ceph_pagelist *pagelist = recon_state->pagelist;
char *path; int err;
int pathlen, err;
u64 pathbase;
u64 snap_follows; u64 snap_follows;
struct dentry *dentry;
dout(" adding %p ino %llx.%llx cap %p %lld %s\n", dout(" adding %p ino %llx.%llx cap %p %lld %s\n",
inode, ceph_vinop(inode), cap, cap->cap_id, inode, ceph_vinop(inode), cap, cap->cap_id,
...@@ -2956,19 +2970,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -2956,19 +2970,6 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
if (err) if (err)
return err; return err;
dentry = d_find_alias(inode);
if (dentry) {
path = ceph_mdsc_build_path(dentry, &pathlen, &pathbase, 0);
if (IS_ERR(path)) {
err = PTR_ERR(path);
goto out_dput;
}
} else {
path = NULL;
pathlen = 0;
pathbase = 0;
}
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
cap->seq = 0; /* reset cap seq */ cap->seq = 0; /* reset cap seq */
cap->issue_seq = 0; /* and issue_seq */ cap->issue_seq = 0; /* and issue_seq */
...@@ -2980,7 +2981,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -2980,7 +2981,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci)); rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
rec.v2.issued = cpu_to_le32(cap->issued); rec.v2.issued = cpu_to_le32(cap->issued);
rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
rec.v2.pathbase = cpu_to_le64(pathbase); rec.v2.pathbase = 0;
rec.v2.flock_len = (__force __le32) rec.v2.flock_len = (__force __le32)
((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1); ((ci->i_ceph_flags & CEPH_I_ERROR_FILELOCK) ? 0 : 1);
} else { } else {
...@@ -2991,7 +2992,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -2991,7 +2992,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime); ceph_encode_timespec64(&rec.v1.mtime, &inode->i_mtime);
ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime); ceph_encode_timespec64(&rec.v1.atime, &inode->i_atime);
rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino); rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
rec.v1.pathbase = cpu_to_le64(pathbase); rec.v1.pathbase = 0;
} }
if (list_empty(&ci->i_cap_snaps)) { if (list_empty(&ci->i_cap_snaps)) {
...@@ -3023,7 +3024,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -3023,7 +3024,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
GFP_NOFS); GFP_NOFS);
if (!flocks) { if (!flocks) {
err = -ENOMEM; err = -ENOMEM;
goto out_free; goto out_err;
} }
err = ceph_encode_locks_to_buffer(inode, flocks, err = ceph_encode_locks_to_buffer(inode, flocks,
num_fcntl_locks, num_fcntl_locks,
...@@ -3033,7 +3034,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -3033,7 +3034,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
flocks = NULL; flocks = NULL;
if (err == -ENOSPC) if (err == -ENOSPC)
goto encode_again; goto encode_again;
goto out_free; goto out_err;
} }
} else { } else {
kfree(flocks); kfree(flocks);
...@@ -3053,44 +3054,64 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap, ...@@ -3053,44 +3054,64 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
sizeof(struct ceph_filelock); sizeof(struct ceph_filelock);
rec.v2.flock_len = cpu_to_le32(struct_len); rec.v2.flock_len = cpu_to_le32(struct_len);
struct_len += sizeof(rec.v2); struct_len += sizeof(u32) + sizeof(rec.v2);
struct_len += sizeof(u32) + pathlen;
if (struct_v >= 2) if (struct_v >= 2)
struct_len += sizeof(u64); /* snap_follows */ struct_len += sizeof(u64); /* snap_follows */
total_len += struct_len; total_len += struct_len;
err = ceph_pagelist_reserve(pagelist, total_len); err = ceph_pagelist_reserve(pagelist, total_len);
if (err) {
kfree(flocks);
goto out_err;
}
if (!err) {
if (recon_state->msg_version >= 3) { if (recon_state->msg_version >= 3) {
ceph_pagelist_encode_8(pagelist, struct_v); ceph_pagelist_encode_8(pagelist, struct_v);
ceph_pagelist_encode_8(pagelist, 1); ceph_pagelist_encode_8(pagelist, 1);
ceph_pagelist_encode_32(pagelist, struct_len); ceph_pagelist_encode_32(pagelist, struct_len);
} }
ceph_pagelist_encode_string(pagelist, path, pathlen); ceph_pagelist_encode_string(pagelist, NULL, 0);
ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2)); ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
ceph_locks_to_pagelist(flocks, pagelist, ceph_locks_to_pagelist(flocks, pagelist,
num_fcntl_locks, num_fcntl_locks, num_flock_locks);
num_flock_locks);
if (struct_v >= 2) if (struct_v >= 2)
ceph_pagelist_encode_64(pagelist, snap_follows); ceph_pagelist_encode_64(pagelist, snap_follows);
}
kfree(flocks); kfree(flocks);
} else { } else {
size_t size = sizeof(u32) + pathlen + sizeof(rec.v1); u64 pathbase = 0;
err = ceph_pagelist_reserve(pagelist, size); int pathlen = 0;
if (!err) { char *path = NULL;
struct dentry *dentry;
dentry = d_find_alias(inode);
if (dentry) {
path = ceph_mdsc_build_path(dentry,
&pathlen, &pathbase, 0);
dput(dentry);
if (IS_ERR(path)) {
err = PTR_ERR(path);
goto out_err;
}
rec.v1.pathbase = cpu_to_le64(pathbase);
}
err = ceph_pagelist_reserve(pagelist,
pathlen + sizeof(u32) + sizeof(rec.v1));
if (err) {
kfree(path);
goto out_err;
}
ceph_pagelist_encode_string(pagelist, path, pathlen); ceph_pagelist_encode_string(pagelist, path, pathlen);
ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1)); ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
}
kfree(path);
} }
recon_state->nr_caps++; recon_state->nr_caps++;
out_free: out_err:
kfree(path);
out_dput:
dput(dentry);
return err; return err;
} }
...@@ -3339,7 +3360,7 @@ static void check_new_map(struct ceph_mds_client *mdsc, ...@@ -3339,7 +3360,7 @@ static void check_new_map(struct ceph_mds_client *mdsc,
pr_info("mds%d recovery completed\n", s->s_mds); pr_info("mds%d recovery completed\n", s->s_mds);
kick_requests(mdsc, i); kick_requests(mdsc, i);
ceph_kick_flushing_caps(mdsc, s); ceph_kick_flushing_caps(mdsc, s);
wake_up_session_caps(s, 1); wake_up_session_caps(s, RECONNECT);
} }
} }
......
...@@ -18,13 +18,15 @@ ...@@ -18,13 +18,15 @@
/* The first 8 bits are reserved for old ceph releases */ /* The first 8 bits are reserved for old ceph releases */
#define CEPHFS_FEATURE_MIMIC 8 #define CEPHFS_FEATURE_MIMIC 8
#define CEPHFS_FEATURE_REPLY_ENCODING 9
#define CEPHFS_FEATURE_RECLAIM_CLIENT 10
#define CEPHFS_FEATURE_LAZY_CAP_WANTED 11
#define CEPHFS_FEATURES_ALL { \ #define CEPHFS_FEATURES_CLIENT_SUPPORTED { \
0, 1, 2, 3, 4, 5, 6, 7, \ 0, 1, 2, 3, 4, 5, 6, 7, \
CEPHFS_FEATURE_MIMIC, \ CEPHFS_FEATURE_MIMIC, \
CEPHFS_FEATURE_LAZY_CAP_WANTED, \
} }
#define CEPHFS_FEATURES_CLIENT_SUPPORTED CEPHFS_FEATURES_ALL
#define CEPHFS_FEATURES_CLIENT_REQUIRED {} #define CEPHFS_FEATURES_CLIENT_REQUIRED {}
......
...@@ -35,7 +35,6 @@ int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m) ...@@ -35,7 +35,6 @@ int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m)
/* pick */ /* pick */
n = prandom_u32() % n; n = prandom_u32() % n;
i = 0;
for (i = 0; n > 0; i++, n--) for (i = 0; n > 0; i++, n--)
while (m->m_info[i].state <= 0) while (m->m_info[i].state <= 0)
i++; i++;
......
...@@ -544,7 +544,7 @@ static int ceph_tcp_recvpage(struct socket *sock, struct page *page, ...@@ -544,7 +544,7 @@ static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
* shortly. * shortly.
*/ */
static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
size_t kvlen, size_t len, int more) size_t kvlen, size_t len, bool more)
{ {
struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
int r; int r;
...@@ -560,24 +560,15 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov, ...@@ -560,24 +560,15 @@ static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
return r; return r;
} }
static int __ceph_tcp_sendpage(struct socket *sock, struct page *page, /*
int offset, size_t size, bool more) * @more: either or both of MSG_MORE and MSG_SENDPAGE_NOTLAST
{ */
int flags = MSG_DONTWAIT | MSG_NOSIGNAL | (more ? MSG_MORE : MSG_EOR);
int ret;
ret = kernel_sendpage(sock, page, offset, size, flags);
if (ret == -EAGAIN)
ret = 0;
return ret;
}
static int ceph_tcp_sendpage(struct socket *sock, struct page *page, static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
int offset, size_t size, bool more) int offset, size_t size, int more)
{ {
struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL }; ssize_t (*sendpage)(struct socket *sock, struct page *page,
struct bio_vec bvec; int offset, size_t size, int flags);
int flags = MSG_DONTWAIT | MSG_NOSIGNAL | more;
int ret; int ret;
/* /*
...@@ -589,19 +580,11 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page, ...@@ -589,19 +580,11 @@ static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
* triggers one of hardened usercopy checks. * triggers one of hardened usercopy checks.
*/ */
if (page_count(page) >= 1 && !PageSlab(page)) if (page_count(page) >= 1 && !PageSlab(page))
return __ceph_tcp_sendpage(sock, page, offset, size, more); sendpage = sock->ops->sendpage;
bvec.bv_page = page;
bvec.bv_offset = offset;
bvec.bv_len = size;
if (more)
msg.msg_flags |= MSG_MORE;
else else
msg.msg_flags |= MSG_EOR; /* superfluous, but what the hell */ sendpage = sock_no_sendpage;
iov_iter_bvec(&msg.msg_iter, WRITE, &bvec, 1, size); ret = sendpage(sock, page, offset, size, flags);
ret = sock_sendmsg(sock, &msg);
if (ret == -EAGAIN) if (ret == -EAGAIN)
ret = 0; ret = 0;
...@@ -1572,6 +1555,7 @@ static int write_partial_message_data(struct ceph_connection *con) ...@@ -1572,6 +1555,7 @@ static int write_partial_message_data(struct ceph_connection *con)
struct ceph_msg *msg = con->out_msg; struct ceph_msg *msg = con->out_msg;
struct ceph_msg_data_cursor *cursor = &msg->cursor; struct ceph_msg_data_cursor *cursor = &msg->cursor;
bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC); bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
u32 crc; u32 crc;
dout("%s %p msg %p\n", __func__, con, msg); dout("%s %p msg %p\n", __func__, con, msg);
...@@ -1592,7 +1576,6 @@ static int write_partial_message_data(struct ceph_connection *con) ...@@ -1592,7 +1576,6 @@ static int write_partial_message_data(struct ceph_connection *con)
struct page *page; struct page *page;
size_t page_offset; size_t page_offset;
size_t length; size_t length;
bool last_piece;
int ret; int ret;
if (!cursor->resid) { if (!cursor->resid) {
...@@ -1600,10 +1583,11 @@ static int write_partial_message_data(struct ceph_connection *con) ...@@ -1600,10 +1583,11 @@ static int write_partial_message_data(struct ceph_connection *con)
continue; continue;
} }
page = ceph_msg_data_next(cursor, &page_offset, &length, page = ceph_msg_data_next(cursor, &page_offset, &length, NULL);
&last_piece); if (length == cursor->total_resid)
ret = ceph_tcp_sendpage(con->sock, page, page_offset, more = MSG_MORE;
length, !last_piece); ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
more);
if (ret <= 0) { if (ret <= 0) {
if (do_datacrc) if (do_datacrc)
msg->footer.data_crc = cpu_to_le32(crc); msg->footer.data_crc = cpu_to_le32(crc);
...@@ -1633,13 +1617,16 @@ static int write_partial_message_data(struct ceph_connection *con) ...@@ -1633,13 +1617,16 @@ static int write_partial_message_data(struct ceph_connection *con)
*/ */
static int write_partial_skip(struct ceph_connection *con) static int write_partial_skip(struct ceph_connection *con)
{ {
int more = MSG_MORE | MSG_SENDPAGE_NOTLAST;
int ret; int ret;
dout("%s %p %d left\n", __func__, con, con->out_skip); dout("%s %p %d left\n", __func__, con, con->out_skip);
while (con->out_skip > 0) { while (con->out_skip > 0) {
size_t size = min(con->out_skip, (int) PAGE_SIZE); size_t size = min(con->out_skip, (int) PAGE_SIZE);
ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, true); if (size == con->out_skip)
more = MSG_MORE;
ret = ceph_tcp_sendpage(con->sock, zero_page, 0, size, more);
if (ret <= 0) if (ret <= 0)
goto out; goto out;
con->out_skip -= ret; con->out_skip -= ret;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment