Commit 17d8e3d9 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'ceph-for-5.19-rc1' of https://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "A big pile of assorted fixes and improvements for the filesystem with
  nothing in particular standing out, except perhaps that the fact that
  the MDS never really maintained atime was made official and thus it's
  no longer updated on the client either.

  We also have a MAINTAINERS update: Jeff is transitioning his
  filesystem maintainership duties to Xiubo"

* tag 'ceph-for-5.19-rc1' of https://github.com/ceph/ceph-client: (23 commits)
  MAINTAINERS: move myself from ceph "Maintainer" to "Reviewer"
  ceph: fix decoding of client session messages flags
  ceph: switch TASK_INTERRUPTIBLE to TASK_KILLABLE
  ceph: remove redundant variable ino
  ceph: try to queue a writeback if revoking fails
  ceph: fix statfs for subdir mounts
  ceph: fix possible deadlock when holding Fwb to get inline_data
  ceph: redirty the page for writepage on failure
  ceph: try to choose the auth MDS if possible for getattr
  ceph: disable updating the atime since cephfs won't maintain it
  ceph: flush the mdlog for filesystem sync
  ceph: rename unsafe_request_wait()
  libceph: use swap() macro instead of taking tmp variable
  ceph: fix statx AT_STATX_DONT_SYNC vs AT_STATX_FORCE_SYNC check
  ceph: no need to invalidate the fscache twice
  ceph: replace usage of found with dedicated list iterator variable
  ceph: use dedicated list iterator variable
  ceph: update the dlease for the hashed dentry when removing
  ceph: stop retrying the request when exceeding 256 times
  ceph: stop forwarding the request when exceeding 256 times
  ...
parents 7c9e960c af7dc8e5
......@@ -4566,8 +4566,8 @@ F: drivers/power/supply/cw2015_battery.c
CEPH COMMON CODE (LIBCEPH)
M: Ilya Dryomov <idryomov@gmail.com>
M: Jeff Layton <jlayton@kernel.org>
M: Xiubo Li <xiubli@redhat.com>
R: Jeff Layton <jlayton@kernel.org>
L: ceph-devel@vger.kernel.org
S: Supported
W: http://ceph.com/
......@@ -4577,9 +4577,9 @@ F: include/linux/crush/
F: net/ceph/
CEPH DISTRIBUTED FILE SYSTEM CLIENT (CEPH)
M: Jeff Layton <jlayton@kernel.org>
M: Xiubo Li <xiubli@redhat.com>
M: Ilya Dryomov <idryomov@gmail.com>
R: Jeff Layton <jlayton@kernel.org>
L: ceph-devel@vger.kernel.org
S: Supported
W: http://ceph.com/
......
......@@ -756,24 +756,23 @@ static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
*/
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
struct rbd_client *client_node;
bool found = false;
struct rbd_client *rbdc = NULL, *iter;
if (ceph_opts->flags & CEPH_OPT_NOSHARE)
return NULL;
spin_lock(&rbd_client_list_lock);
list_for_each_entry(client_node, &rbd_client_list, node) {
if (!ceph_compare_options(ceph_opts, client_node->client)) {
__rbd_get_client(client_node);
list_for_each_entry(iter, &rbd_client_list, node) {
if (!ceph_compare_options(ceph_opts, iter->client)) {
__rbd_get_client(iter);
found = true;
rbdc = iter;
break;
}
}
spin_unlock(&rbd_client_list_lock);
return found ? client_node : NULL;
return rbdc;
}
/*
......
......@@ -256,6 +256,7 @@ static bool ceph_netfs_issue_op_inline(struct netfs_io_subrequest *subreq)
struct iov_iter iter;
ssize_t err = 0;
size_t len;
int mode;
__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);
__clear_bit(NETFS_SREQ_COPY_TO_CACHE, &subreq->flags);
......@@ -264,7 +265,8 @@ static bool ceph_netfs_issue_op_inline(struct netfs_io_subrequest *subreq)
goto out;
/* We need to fetch the inline data. */
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
mode = ceph_try_to_choose_auth_mds(inode, CEPH_STAT_CAP_INLINE_DATA);
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode);
if (IS_ERR(req)) {
err = PTR_ERR(req);
goto out;
......@@ -604,8 +606,10 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, snapc,
ceph_wbc.truncate_seq, ceph_wbc.truncate_size,
true);
if (IS_ERR(req))
if (IS_ERR(req)) {
redirty_page_for_writepage(wbc, page);
return PTR_ERR(req);
}
set_page_writeback(page);
if (caching)
......@@ -1644,7 +1648,7 @@ int ceph_uninline_data(struct file *file)
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
struct ceph_osd_request *req;
struct ceph_osd_request *req = NULL;
struct ceph_cap_flush *prealloc_cf;
struct folio *folio = NULL;
u64 inline_version = CEPH_INLINE_NONE;
......@@ -1652,10 +1656,23 @@ int ceph_uninline_data(struct file *file)
int err = 0;
u64 len;
spin_lock(&ci->i_ceph_lock);
inline_version = ci->i_inline_version;
spin_unlock(&ci->i_ceph_lock);
dout("uninline_data %p %llx.%llx inline_version %llu\n",
inode, ceph_vinop(inode), inline_version);
if (inline_version == CEPH_INLINE_NONE)
return 0;
prealloc_cf = ceph_alloc_cap_flush();
if (!prealloc_cf)
return -ENOMEM;
if (inline_version == 1) /* initial version, no data */
goto out_uninline;
folio = read_mapping_folio(inode->i_mapping, 0, file);
if (IS_ERR(folio)) {
err = PTR_ERR(folio);
......@@ -1664,17 +1681,6 @@ int ceph_uninline_data(struct file *file)
folio_lock(folio);
spin_lock(&ci->i_ceph_lock);
inline_version = ci->i_inline_version;
spin_unlock(&ci->i_ceph_lock);
dout("uninline_data %p %llx.%llx inline_version %llu\n",
inode, ceph_vinop(inode), inline_version);
if (inline_version == 1 || /* initial version, no data */
inline_version == CEPH_INLINE_NONE)
goto out_unlock;
len = i_size_read(inode);
if (len > folio_size(folio))
len = folio_size(folio);
......@@ -1739,6 +1745,7 @@ int ceph_uninline_data(struct file *file)
ceph_update_write_metrics(&fsc->mdsc->metric, req->r_start_latency,
req->r_end_latency, len, err);
out_uninline:
if (!err) {
int dirty;
......@@ -1757,8 +1764,10 @@ int ceph_uninline_data(struct file *file)
if (err == -ECANCELED)
err = 0;
out_unlock:
folio_unlock(folio);
folio_put(folio);
if (folio) {
folio_unlock(folio);
folio_put(folio);
}
out:
ceph_free_cap_flush(prealloc_cf);
dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
......@@ -1777,7 +1786,6 @@ int ceph_mmap(struct file *file, struct vm_area_struct *vma)
if (!mapping->a_ops->read_folio)
return -ENOEXEC;
file_accessed(file);
vma->vm_ops = &ceph_vmops;
return 0;
}
......
......@@ -1577,7 +1577,7 @@ static void __ceph_flush_snaps(struct ceph_inode_info *ci,
while (first_tid <= last_tid) {
struct ceph_cap *cap = ci->i_auth_cap;
struct ceph_cap_flush *cf;
struct ceph_cap_flush *cf = NULL, *iter;
int ret;
if (!(cap && cap->session == session)) {
......@@ -1587,8 +1587,9 @@ static void __ceph_flush_snaps(struct ceph_inode_info *ci,
}
ret = -ENOENT;
list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
if (cf->tid >= first_tid) {
list_for_each_entry(iter, &ci->i_cap_flush_list, i_list) {
if (iter->tid >= first_tid) {
cf = iter;
ret = 0;
break;
}
......@@ -1910,6 +1911,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
struct rb_node *p;
bool queue_invalidate = false;
bool tried_invalidate = false;
bool queue_writeback = false;
if (session)
ceph_get_mds_session(session);
......@@ -2062,10 +2064,27 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
}
/* completed revocation? going down and there are no caps? */
if (revoking && (revoking & cap_used) == 0) {
dout("completed revocation of %s\n",
ceph_cap_string(cap->implemented & ~cap->issued));
goto ack;
if (revoking) {
if ((revoking & cap_used) == 0) {
dout("completed revocation of %s\n",
ceph_cap_string(cap->implemented & ~cap->issued));
goto ack;
}
/*
* If the "i_wrbuffer_ref" was increased by mmap or generic
* cache write just before the ceph_check_caps() is called,
* the Fb capability revoking will fail this time. Then we
* must wait for the BDI's delayed work to flush the dirty
* pages and to release the "i_wrbuffer_ref", which will cost
* at most 5 seconds. That means the MDS needs to wait at
* most 5 seconds to finished the Fb capability's revocation.
*
* Let's queue a writeback for it.
*/
if (S_ISREG(inode->i_mode) && ci->i_wrbuffer_ref &&
(revoking & CEPH_CAP_FILE_BUFFER))
queue_writeback = true;
}
/* want more caps from mds? */
......@@ -2135,6 +2154,8 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
spin_unlock(&ci->i_ceph_lock);
ceph_put_mds_session(session);
if (queue_writeback)
ceph_queue_writeback(inode);
if (queue_invalidate)
ceph_queue_invalidate(inode);
}
......@@ -2218,9 +2239,9 @@ static int caps_are_flushed(struct inode *inode, u64 flush_tid)
}
/*
* wait for any unsafe requests to complete.
* flush the mdlog and wait for any unsafe requests to complete.
*/
static int unsafe_request_wait(struct inode *inode)
static int flush_mdlog_and_wait_inode_unsafe_requests(struct inode *inode)
{
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_inode_info *ci = ceph_inode(inode);
......@@ -2336,7 +2357,7 @@ static int unsafe_request_wait(struct inode *inode)
kfree(sessions);
}
dout("unsafe_request_wait %p wait on tid %llu %llu\n",
dout("%s %p wait on tid %llu %llu\n", __func__,
inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL);
if (req1) {
ret = !wait_for_completion_timeout(&req1->r_safe_completion,
......@@ -2380,7 +2401,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
dirty = try_flush_caps(inode, &flush_tid);
dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
err = unsafe_request_wait(inode);
err = flush_mdlog_and_wait_inode_unsafe_requests(inode);
/*
* only wait on non-file metadata writeback (the mds
......@@ -3182,10 +3203,9 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
struct ceph_snap_context *snapc)
{
struct inode *inode = &ci->vfs_inode;
struct ceph_cap_snap *capsnap = NULL;
struct ceph_cap_snap *capsnap = NULL, *iter;
int put = 0;
bool last = false;
bool found = false;
bool flush_snaps = false;
bool complete_capsnap = false;
......@@ -3212,14 +3232,14 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
last ? " LAST" : "");
} else {
list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
if (capsnap->context == snapc) {
found = true;
list_for_each_entry(iter, &ci->i_cap_snaps, ci_item) {
if (iter->context == snapc) {
capsnap = iter;
break;
}
}
if (!found) {
if (!capsnap) {
/*
* The capsnap should already be removed when removing
* auth cap in the case of a forced unmount.
......@@ -3769,8 +3789,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
u64 follows = le64_to_cpu(m->snap_follows);
struct ceph_cap_snap *capsnap;
bool flushed = false;
struct ceph_cap_snap *capsnap = NULL, *iter;
bool wake_ci = false;
bool wake_mdsc = false;
......@@ -3778,26 +3797,26 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
inode, ci, session->s_mds, follows);
spin_lock(&ci->i_ceph_lock);
list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
if (capsnap->follows == follows) {
if (capsnap->cap_flush.tid != flush_tid) {
list_for_each_entry(iter, &ci->i_cap_snaps, ci_item) {
if (iter->follows == follows) {
if (iter->cap_flush.tid != flush_tid) {
dout(" cap_snap %p follows %lld tid %lld !="
" %lld\n", capsnap, follows,
flush_tid, capsnap->cap_flush.tid);
" %lld\n", iter, follows,
flush_tid, iter->cap_flush.tid);
break;
}
flushed = true;
capsnap = iter;
break;
} else {
dout(" skipping cap_snap %p follows %lld\n",
capsnap, capsnap->follows);
iter, iter->follows);
}
}
if (flushed)
if (capsnap)
ceph_remove_capsnap(inode, capsnap, &wake_ci, &wake_mdsc);
spin_unlock(&ci->i_ceph_lock);
if (flushed) {
if (capsnap) {
ceph_put_snap_context(capsnap->context);
ceph_put_cap_snap(capsnap);
if (wake_ci)
......
......@@ -578,7 +578,7 @@ void ceph_evict_inode(struct inode *inode)
__ceph_remove_caps(ci);
if (__ceph_has_any_quota(ci))
if (__ceph_has_quota(ci, QUOTA_GET_ANY))
ceph_adjust_quota_realms_count(inode, false);
/*
......@@ -1466,10 +1466,12 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
} else if (have_lease) {
if (d_unhashed(dn))
d_add(dn, NULL);
}
if (!d_unhashed(dn) && have_lease)
update_dentry_lease(dir, dn,
rinfo->dlease, session,
req->r_request_started);
}
goto done;
}
......@@ -1884,7 +1886,6 @@ static void ceph_do_invalidate_pages(struct inode *inode)
orig_gen = ci->i_rdcache_gen;
spin_unlock(&ci->i_ceph_lock);
ceph_fscache_invalidate(inode, false);
if (invalidate_inode_pages2(inode->i_mapping) < 0) {
pr_err("invalidate_inode_pages2 %llx.%llx failed\n",
ceph_vinop(inode));
......@@ -2258,6 +2259,30 @@ int ceph_setattr(struct user_namespace *mnt_userns, struct dentry *dentry,
return err;
}
int ceph_try_to_choose_auth_mds(struct inode *inode, int mask)
{
int issued = ceph_caps_issued(ceph_inode(inode));
/*
* If any 'x' caps is issued we can just choose the auth MDS
* instead of the random replica MDSes. Because only when the
* Locker is in LOCK_EXEC state will the loner client could
* get the 'x' caps. And if we send the getattr requests to
* any replica MDS it must auth pin and tries to rdlock from
* the auth MDS, and then the auth MDS need to do the Locker
* state transition to LOCK_SYNC. And after that the lock state
* will change back.
*
* This cost much when doing the Locker state transition and
* usually will need to revoke caps from clients.
*/
if (((mask & CEPH_CAP_ANY_SHARED) && (issued & CEPH_CAP_ANY_EXCL))
|| (mask & CEPH_STAT_RSTAT))
return USE_AUTH_MDS;
else
return USE_ANY_MDS;
}
/*
* Verify that we have a lease on the given mask. If not,
* do a getattr against an mds.
......@@ -2281,7 +2306,7 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
if (!force && ceph_caps_issued_mask_metric(ceph_inode(inode), mask, 1))
return 0;
mode = (mask & CEPH_STAT_RSTAT) ? USE_AUTH_MDS : USE_ANY_MDS;
mode = ceph_try_to_choose_auth_mds(inode, mask);
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode);
if (IS_ERR(req))
return PTR_ERR(req);
......@@ -2423,7 +2448,7 @@ int ceph_getattr(struct user_namespace *mnt_userns, const struct path *path,
return -ESTALE;
/* Skip the getattr altogether if we're asked not to sync */
if (!(flags & AT_STATX_DONT_SYNC)) {
if ((flags & AT_STATX_SYNC_TYPE) != AT_STATX_DONT_SYNC) {
err = ceph_do_getattr(inode,
statx_to_caps(request_mask, inode->i_mode),
flags & AT_STATX_FORCE_SYNC);
......
......@@ -437,7 +437,7 @@ static int ceph_parse_deleg_inos(void **p, void *end,
ceph_decode_32_safe(p, end, sets, bad);
dout("got %u sets of delegated inodes\n", sets);
while (sets--) {
u64 start, len, ino;
u64 start, len;
ceph_decode_64_safe(p, end, start, bad);
ceph_decode_64_safe(p, end, len, bad);
......@@ -449,7 +449,7 @@ static int ceph_parse_deleg_inos(void **p, void *end,
continue;
}
while (len--) {
int err = xa_insert(&s->s_delegated_inos, ino = start++,
int err = xa_insert(&s->s_delegated_inos, start++,
DELEGATED_INO_AVAILABLE,
GFP_KERNEL);
if (!err) {
......@@ -2651,7 +2651,28 @@ static int __prepare_send_request(struct ceph_mds_session *session,
struct ceph_mds_client *mdsc = session->s_mdsc;
struct ceph_mds_request_head_old *rhead;
struct ceph_msg *msg;
int flags = 0;
int flags = 0, max_retry;
/*
* The type of 'r_attempts' in kernel 'ceph_mds_request'
* is 'int', while in 'ceph_mds_request_head' the type of
* 'num_retry' is '__u8'. So in case the request retries
* exceeding 256 times, the MDS will receive a incorrect
* retry seq.
*
* In this case it's ususally a bug in MDS and continue
* retrying the request makes no sense.
*
* In future this could be fixed in ceph code, so avoid
* using the hardcode here.
*/
max_retry = sizeof_field(struct ceph_mds_request_head, num_retry);
max_retry = 1 << (max_retry * BITS_PER_BYTE);
if (req->r_attempts >= max_retry) {
pr_warn_ratelimited("%s request tid %llu seq overflow\n",
__func__, req->r_tid);
return -EMULTIHOP;
}
req->r_attempts++;
if (req->r_inode) {
......@@ -2663,7 +2684,7 @@ static int __prepare_send_request(struct ceph_mds_session *session,
else
req->r_sent_on_mseq = -1;
}
dout("prepare_send_request %p tid %lld %s (attempt %d)\n", req,
dout("%s %p tid %lld %s (attempt %d)\n", __func__, req,
req->r_tid, ceph_mds_op_name(req->r_op), req->r_attempts);
if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
......@@ -3265,6 +3286,7 @@ static void handle_forward(struct ceph_mds_client *mdsc,
int err = -EINVAL;
void *p = msg->front.iov_base;
void *end = p + msg->front.iov_len;
bool aborted = false;
ceph_decode_need(&p, end, 2*sizeof(u32), bad);
next_mds = ceph_decode_32(&p);
......@@ -3273,16 +3295,41 @@ static void handle_forward(struct ceph_mds_client *mdsc,
mutex_lock(&mdsc->mutex);
req = lookup_get_request(mdsc, tid);
if (!req) {
mutex_unlock(&mdsc->mutex);
dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);
goto out; /* dup reply? */
return; /* dup reply? */
}
if (test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
dout("forward tid %llu aborted, unregistering\n", tid);
__unregister_request(mdsc, req);
} else if (fwd_seq <= req->r_num_fwd) {
dout("forward tid %llu to mds%d - old seq %d <= %d\n",
tid, next_mds, req->r_num_fwd, fwd_seq);
/*
* The type of 'num_fwd' in ceph 'MClientRequestForward'
* is 'int32_t', while in 'ceph_mds_request_head' the
* type is '__u8'. So in case the request bounces between
* MDSes exceeding 256 times, the client will get stuck.
*
* In this case it's ususally a bug in MDS and continue
* bouncing the request makes no sense.
*
* In future this could be fixed in ceph code, so avoid
* using the hardcode here.
*/
int max = sizeof_field(struct ceph_mds_request_head, num_fwd);
max = 1 << (max * BITS_PER_BYTE);
if (req->r_num_fwd >= max) {
mutex_lock(&req->r_fill_mutex);
req->r_err = -EMULTIHOP;
set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
mutex_unlock(&req->r_fill_mutex);
aborted = true;
pr_warn_ratelimited("forward tid %llu seq overflow\n",
tid);
} else {
dout("forward tid %llu to mds%d - old seq %d <= %d\n",
tid, next_mds, req->r_num_fwd, fwd_seq);
}
} else {
/* resend. forward race not possible; mds would drop */
dout("forward tid %llu to mds%d (we resend)\n", tid, next_mds);
......@@ -3294,9 +3341,12 @@ static void handle_forward(struct ceph_mds_client *mdsc,
put_request_session(req);
__do_request(mdsc, req);
}
ceph_mdsc_put_request(req);
out:
mutex_unlock(&mdsc->mutex);
/* kick calling process */
if (aborted)
complete_request(mdsc, req);
ceph_mdsc_put_request(req);
return;
bad:
......@@ -3375,13 +3425,17 @@ static void handle_session(struct ceph_mds_session *session,
}
if (msg_version >= 5) {
u32 flags;
/* version >= 4, struct_v, struct_cv, len, metric_spec */
ceph_decode_skip_n(&p, end, 2 + sizeof(u32) * 2, bad);
u32 flags, len;
/* version >= 4 */
ceph_decode_skip_16(&p, end, bad); /* struct_v, struct_cv */
ceph_decode_32_safe(&p, end, len, bad); /* len */
ceph_decode_skip_n(&p, end, len, bad); /* metric_spec */
/* version >= 5, flags */
ceph_decode_32_safe(&p, end, flags, bad);
ceph_decode_32_safe(&p, end, flags, bad);
if (flags & CEPH_SESSION_BLOCKLISTED) {
pr_warn("mds%d session blocklisted\n", session->s_mds);
pr_warn("mds%d session blocklisted\n", session->s_mds);
blocklisted = true;
}
}
......@@ -4396,12 +4450,6 @@ void ceph_mdsc_lease_send_msg(struct ceph_mds_session *session,
memcpy((void *)(lease + 1) + 4,
dentry->d_name.name, dentry->d_name.len);
spin_unlock(&dentry->d_lock);
/*
* if this is a preemptive lease RELEASE, no need to
* flush request stream, since the actual request will
* soon follow.
*/
msg->more_to_follow = (action == CEPH_MDS_LEASE_RELEASE);
ceph_con_send(&session->s_con, msg);
}
......@@ -4696,15 +4744,17 @@ void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc)
}
/*
* wait for all write mds requests to flush.
* flush the mdlog and wait for all write mds requests to flush.
*/
static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
static void flush_mdlog_and_wait_mdsc_unsafe_requests(struct ceph_mds_client *mdsc,
u64 want_tid)
{
struct ceph_mds_request *req = NULL, *nextreq;
struct ceph_mds_session *last_session = NULL;
struct rb_node *n;
mutex_lock(&mdsc->mutex);
dout("wait_unsafe_requests want %lld\n", want_tid);
dout("%s want %lld\n", __func__, want_tid);
restart:
req = __get_oldest_req(mdsc);
while (req && req->r_tid <= want_tid) {
......@@ -4716,14 +4766,32 @@ static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
nextreq = NULL;
if (req->r_op != CEPH_MDS_OP_SETFILELOCK &&
(req->r_op & CEPH_MDS_OP_WRITE)) {
struct ceph_mds_session *s = req->r_session;
if (!s) {
req = nextreq;
continue;
}
/* write op */
ceph_mdsc_get_request(req);
if (nextreq)
ceph_mdsc_get_request(nextreq);
s = ceph_get_mds_session(s);
mutex_unlock(&mdsc->mutex);
dout("wait_unsafe_requests wait on %llu (want %llu)\n",
/* send flush mdlog request to MDS */
if (last_session != s) {
send_flush_mdlog(s);
ceph_put_mds_session(last_session);
last_session = s;
} else {
ceph_put_mds_session(s);
}
dout("%s wait on %llu (want %llu)\n", __func__,
req->r_tid, want_tid);
wait_for_completion(&req->r_safe_completion);
mutex_lock(&mdsc->mutex);
ceph_mdsc_put_request(req);
if (!nextreq)
......@@ -4738,7 +4806,8 @@ static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
req = nextreq;
}
mutex_unlock(&mdsc->mutex);
dout("wait_unsafe_requests done\n");
ceph_put_mds_session(last_session);
dout("%s done\n", __func__);
}
void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
......@@ -4767,7 +4836,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
dout("sync want tid %lld flush_seq %lld\n",
want_tid, want_flush);
wait_unsafe_requests(mdsc, want_tid);
flush_mdlog_and_wait_mdsc_unsafe_requests(mdsc, want_tid);
wait_caps_flush(mdsc, want_flush);
}
......
......@@ -579,7 +579,7 @@ static inline int ceph_wait_on_async_create(struct inode *inode)
struct ceph_inode_info *ci = ceph_inode(inode);
return wait_on_bit(&ci->i_ceph_flags, CEPH_ASYNC_CREATE_BIT,
TASK_INTERRUPTIBLE);
TASK_KILLABLE);
}
extern u64 ceph_get_deleg_ino(struct ceph_mds_session *session);
......
......@@ -195,9 +195,9 @@ void ceph_cleanup_quotarealms_inodes(struct ceph_mds_client *mdsc)
/*
* This function walks through the snaprealm for an inode and returns the
* ceph_snap_realm for the first snaprealm that has quotas set (either max_files
* or max_bytes). If the root is reached, return the root ceph_snap_realm
* instead.
* ceph_snap_realm for the first snaprealm that has quotas set (max_files,
* max_bytes, or any, depending on the 'which_quota' argument). If the root is
* reached, return the root ceph_snap_realm instead.
*
* Note that the caller is responsible for calling ceph_put_snap_realm() on the
* returned realm.
......@@ -209,7 +209,9 @@ void ceph_cleanup_quotarealms_inodes(struct ceph_mds_client *mdsc)
* will be restarted.
*/
static struct ceph_snap_realm *get_quota_realm(struct ceph_mds_client *mdsc,
struct inode *inode, bool retry)
struct inode *inode,
enum quota_get_realm which_quota,
bool retry)
{
struct ceph_inode_info *ci = NULL;
struct ceph_snap_realm *realm, *next;
......@@ -248,7 +250,7 @@ static struct ceph_snap_realm *get_quota_realm(struct ceph_mds_client *mdsc,
}
ci = ceph_inode(in);
has_quota = __ceph_has_any_quota(ci);
has_quota = __ceph_has_quota(ci, which_quota);
iput(in);
next = realm->parent;
......@@ -279,8 +281,8 @@ bool ceph_quota_is_same_realm(struct inode *old, struct inode *new)
* dropped and we can then restart the whole operation.
*/
down_read(&mdsc->snap_rwsem);
old_realm = get_quota_realm(mdsc, old, true);
new_realm = get_quota_realm(mdsc, new, false);
old_realm = get_quota_realm(mdsc, old, QUOTA_GET_ANY, true);
new_realm = get_quota_realm(mdsc, new, QUOTA_GET_ANY, false);
if (PTR_ERR(new_realm) == -EAGAIN) {
up_read(&mdsc->snap_rwsem);
if (old_realm)
......@@ -483,7 +485,8 @@ bool ceph_quota_update_statfs(struct ceph_fs_client *fsc, struct kstatfs *buf)
bool is_updated = false;
down_read(&mdsc->snap_rwsem);
realm = get_quota_realm(mdsc, d_inode(fsc->sb->s_root), true);
realm = get_quota_realm(mdsc, d_inode(fsc->sb->s_root),
QUOTA_GET_MAX_BYTES, true);
up_read(&mdsc->snap_rwsem);
if (!realm)
return false;
......
......@@ -1119,6 +1119,7 @@ static int ceph_set_super(struct super_block *s, struct fs_context *fc)
s->s_time_gran = 1;
s->s_time_min = 0;
s->s_time_max = U32_MAX;
s->s_flags |= SB_NODIRATIME | SB_NOATIME;
ret = set_anon_super_fc(s, fc);
if (ret != 0)
......
......@@ -1022,6 +1022,7 @@ static inline void ceph_queue_flush_snaps(struct inode *inode)
ceph_queue_inode_work(inode, CEPH_I_WORK_FLUSH_SNAPS);
}
extern int ceph_try_to_choose_auth_mds(struct inode *inode, int mask);
extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
int mask, bool force);
static inline int ceph_do_getattr(struct inode *inode, int mask, bool force)
......@@ -1278,9 +1279,29 @@ extern void ceph_fs_debugfs_init(struct ceph_fs_client *client);
extern void ceph_fs_debugfs_cleanup(struct ceph_fs_client *client);
/* quota.c */
static inline bool __ceph_has_any_quota(struct ceph_inode_info *ci)
enum quota_get_realm {
QUOTA_GET_MAX_FILES,
QUOTA_GET_MAX_BYTES,
QUOTA_GET_ANY
};
static inline bool __ceph_has_quota(struct ceph_inode_info *ci,
enum quota_get_realm which)
{
return ci->i_max_files || ci->i_max_bytes;
bool has_quota = false;
switch (which) {
case QUOTA_GET_MAX_BYTES:
has_quota = !!ci->i_max_bytes;
break;
case QUOTA_GET_MAX_FILES:
has_quota = !!ci->i_max_files;
break;
default:
has_quota = !!(ci->i_max_files || ci->i_max_bytes);
}
return has_quota;
}
extern void ceph_adjust_quota_realms_count(struct inode *inode, bool inc);
......@@ -1289,10 +1310,10 @@ static inline void __ceph_update_quota(struct ceph_inode_info *ci,
u64 max_bytes, u64 max_files)
{
bool had_quota, has_quota;
had_quota = __ceph_has_any_quota(ci);
had_quota = __ceph_has_quota(ci, QUOTA_GET_ANY);
ci->i_max_bytes = max_bytes;
ci->i_max_files = max_files;
has_quota = __ceph_has_any_quota(ci);
has_quota = __ceph_has_quota(ci, QUOTA_GET_ANY);
if (had_quota != has_quota)
ceph_adjust_quota_realms_count(&ci->vfs_inode, has_quota);
......
......@@ -366,6 +366,14 @@ static ssize_t ceph_vxattrcb_auth_mds(struct ceph_inode_info *ci,
}
#define XATTR_RSTAT_FIELD(_type, _name) \
XATTR_NAME_CEPH(_type, _name, VXATTR_FLAG_RSTAT)
#define XATTR_RSTAT_FIELD_UPDATABLE(_type, _name) \
{ \
.name = CEPH_XATTR_NAME(_type, _name), \
.name_size = sizeof (CEPH_XATTR_NAME(_type, _name)), \
.getxattr_cb = ceph_vxattrcb_ ## _type ## _ ## _name, \
.exists_cb = NULL, \
.flags = VXATTR_FLAG_RSTAT, \
}
#define XATTR_LAYOUT_FIELD(_type, _name, _field) \
{ \
.name = CEPH_XATTR_NAME2(_type, _name, _field), \
......@@ -404,7 +412,7 @@ static struct ceph_vxattr ceph_dir_vxattrs[] = {
XATTR_RSTAT_FIELD(dir, rsubdirs),
XATTR_RSTAT_FIELD(dir, rsnaps),
XATTR_RSTAT_FIELD(dir, rbytes),
XATTR_RSTAT_FIELD(dir, rctime),
XATTR_RSTAT_FIELD_UPDATABLE(dir, rctime),
{
.name = "ceph.dir.pin",
.name_size = sizeof("ceph.dir.pin"),
......
......@@ -906,7 +906,6 @@ int crush_do_rule(const struct crush_map *map,
int recurse_to_leaf;
int wsize = 0;
int osize;
int *tmp;
const struct crush_rule *rule;
__u32 step;
int i, j;
......@@ -1073,9 +1072,7 @@ int crush_do_rule(const struct crush_map *map,
memcpy(o, c, osize*sizeof(*o));
/* swap o and w arrays */
tmp = o;
o = w;
w = tmp;
swap(o, w);
wsize = osize;
break;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment