Commit 2759e05c authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'ceph-for-5.2-rc4' of git://github.com/ceph/ceph-client

Pull ceph fixes from Ilya Dryomov:
 "A change to call iput() asynchronously to avoid a possible deadlock
  when iput_final() needs to wait for in-flight I/O (e.g. readahead) and
  a fixup for a cleanup that went into -rc1"

* tag 'ceph-for-5.2-rc4' of git://github.com/ceph/ceph-client:
  ceph: fix error handling in ceph_get_caps()
  ceph: avoid iput_final() while holding mutex or in dispatch thread
  ceph: single workqueue for inode related works
parents 8e61f6f7 7b2f936f
...@@ -2738,15 +2738,13 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, ...@@ -2738,15 +2738,13 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
_got = 0; _got = 0;
ret = try_get_cap_refs(ci, need, want, endoff, ret = try_get_cap_refs(ci, need, want, endoff,
false, &_got); false, &_got);
if (ret == -EAGAIN) { if (ret == -EAGAIN)
continue; continue;
} else if (!ret) { if (!ret) {
int err;
DEFINE_WAIT_FUNC(wait, woken_wake_function); DEFINE_WAIT_FUNC(wait, woken_wake_function);
add_wait_queue(&ci->i_cap_wq, &wait); add_wait_queue(&ci->i_cap_wq, &wait);
while (!(err = try_get_cap_refs(ci, need, want, endoff, while (!(ret = try_get_cap_refs(ci, need, want, endoff,
true, &_got))) { true, &_got))) {
if (signal_pending(current)) { if (signal_pending(current)) {
ret = -ERESTARTSYS; ret = -ERESTARTSYS;
...@@ -2756,14 +2754,16 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, ...@@ -2756,14 +2754,16 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
} }
remove_wait_queue(&ci->i_cap_wq, &wait); remove_wait_queue(&ci->i_cap_wq, &wait);
if (err == -EAGAIN) if (ret == -EAGAIN)
continue; continue;
} }
if (ret < 0) {
if (ret == -ESTALE) { if (ret == -ESTALE) {
/* session was killed, try renew caps */ /* session was killed, try renew caps */
ret = ceph_renew_caps(&ci->vfs_inode); ret = ceph_renew_caps(&ci->vfs_inode);
if (ret == 0) if (ret == 0)
continue; continue;
}
return ret; return ret;
} }
...@@ -2992,8 +2992,10 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, ...@@ -2992,8 +2992,10 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
} }
if (complete_capsnap) if (complete_capsnap)
wake_up_all(&ci->i_cap_wq); wake_up_all(&ci->i_cap_wq);
while (put-- > 0) while (put-- > 0) {
iput(inode); /* avoid calling iput_final() in osd dispatch threads */
ceph_async_iput(inode);
}
} }
/* /*
...@@ -3964,8 +3966,9 @@ void ceph_handle_caps(struct ceph_mds_session *session, ...@@ -3964,8 +3966,9 @@ void ceph_handle_caps(struct ceph_mds_session *session,
done: done:
mutex_unlock(&session->s_mutex); mutex_unlock(&session->s_mutex);
done_unlocked: done_unlocked:
iput(inode);
ceph_put_string(extra_info.pool_ns); ceph_put_string(extra_info.pool_ns);
/* avoid calling iput_final() in mds dispatch threads */
ceph_async_iput(inode);
return; return;
flush_cap_releases: flush_cap_releases:
...@@ -4011,7 +4014,8 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc) ...@@ -4011,7 +4014,8 @@ void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
if (inode) { if (inode) {
dout("check_delayed_caps on %p\n", inode); dout("check_delayed_caps on %p\n", inode);
ceph_check_caps(ci, flags, NULL); ceph_check_caps(ci, flags, NULL);
iput(inode); /* avoid calling iput_final() in tick thread */
ceph_async_iput(inode);
} }
} }
spin_unlock(&mdsc->cap_delay_lock); spin_unlock(&mdsc->cap_delay_lock);
......
...@@ -791,7 +791,7 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req) ...@@ -791,7 +791,7 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
if (aio_work) { if (aio_work) {
INIT_WORK(&aio_work->work, ceph_aio_retry_work); INIT_WORK(&aio_work->work, ceph_aio_retry_work);
aio_work->req = req; aio_work->req = req;
queue_work(ceph_inode_to_client(inode)->wb_wq, queue_work(ceph_inode_to_client(inode)->inode_wq,
&aio_work->work); &aio_work->work);
return; return;
} }
......
...@@ -33,9 +33,7 @@ ...@@ -33,9 +33,7 @@
static const struct inode_operations ceph_symlink_iops; static const struct inode_operations ceph_symlink_iops;
static void ceph_invalidate_work(struct work_struct *work); static void ceph_inode_work(struct work_struct *work);
static void ceph_writeback_work(struct work_struct *work);
static void ceph_vmtruncate_work(struct work_struct *work);
/* /*
* find or create an inode, given the ceph ino number * find or create an inode, given the ceph ino number
...@@ -509,10 +507,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ...@@ -509,10 +507,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
INIT_LIST_HEAD(&ci->i_snap_realm_item); INIT_LIST_HEAD(&ci->i_snap_realm_item);
INIT_LIST_HEAD(&ci->i_snap_flush_item); INIT_LIST_HEAD(&ci->i_snap_flush_item);
INIT_WORK(&ci->i_wb_work, ceph_writeback_work); INIT_WORK(&ci->i_work, ceph_inode_work);
INIT_WORK(&ci->i_pg_inv_work, ceph_invalidate_work); ci->i_work_mask = 0;
INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
ceph_fscache_inode_init(ci); ceph_fscache_inode_init(ci);
...@@ -1480,7 +1476,8 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req, ...@@ -1480,7 +1476,8 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,
pr_err("fill_inode badness on %p got %d\n", in, rc); pr_err("fill_inode badness on %p got %d\n", in, rc);
err = rc; err = rc;
} }
iput(in); /* avoid calling iput_final() in mds dispatch threads */
ceph_async_iput(in);
} }
return err; return err;
...@@ -1678,8 +1675,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, ...@@ -1678,8 +1675,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
&req->r_caps_reservation); &req->r_caps_reservation);
if (ret < 0) { if (ret < 0) {
pr_err("fill_inode badness on %p\n", in); pr_err("fill_inode badness on %p\n", in);
if (d_really_is_negative(dn)) if (d_really_is_negative(dn)) {
iput(in); /* avoid calling iput_final() in mds
* dispatch threads */
ceph_async_iput(in);
}
d_drop(dn); d_drop(dn);
err = ret; err = ret;
goto next_item; goto next_item;
...@@ -1689,7 +1689,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, ...@@ -1689,7 +1689,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
if (ceph_security_xattr_deadlock(in)) { if (ceph_security_xattr_deadlock(in)) {
dout(" skip splicing dn %p to inode %p" dout(" skip splicing dn %p to inode %p"
" (security xattr deadlock)\n", dn, in); " (security xattr deadlock)\n", dn, in);
iput(in); ceph_async_iput(in);
skipped++; skipped++;
goto next_item; goto next_item;
} }
...@@ -1740,57 +1740,87 @@ bool ceph_inode_set_size(struct inode *inode, loff_t size) ...@@ -1740,57 +1740,87 @@ bool ceph_inode_set_size(struct inode *inode, loff_t size)
return ret; return ret;
} }
/*
* Put reference to inode, but avoid calling iput_final() in current thread.
* iput_final() may wait for reahahead pages. The wait can cause deadlock in
* some contexts.
*/
void ceph_async_iput(struct inode *inode)
{
if (!inode)
return;
for (;;) {
if (atomic_add_unless(&inode->i_count, -1, 1))
break;
if (queue_work(ceph_inode_to_client(inode)->inode_wq,
&ceph_inode(inode)->i_work))
break;
/* queue work failed, i_count must be at least 2 */
}
}
/* /*
* Write back inode data in a worker thread. (This can't be done * Write back inode data in a worker thread. (This can't be done
* in the message handler context.) * in the message handler context.)
*/ */
void ceph_queue_writeback(struct inode *inode) void ceph_queue_writeback(struct inode *inode)
{ {
struct ceph_inode_info *ci = ceph_inode(inode);
set_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask);
ihold(inode); ihold(inode);
if (queue_work(ceph_inode_to_client(inode)->wb_wq, if (queue_work(ceph_inode_to_client(inode)->inode_wq,
&ceph_inode(inode)->i_wb_work)) { &ci->i_work)) {
dout("ceph_queue_writeback %p\n", inode); dout("ceph_queue_writeback %p\n", inode);
} else { } else {
dout("ceph_queue_writeback %p failed\n", inode); dout("ceph_queue_writeback %p already queued, mask=%lx\n",
inode, ci->i_work_mask);
iput(inode); iput(inode);
} }
} }
static void ceph_writeback_work(struct work_struct *work)
{
struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
i_wb_work);
struct inode *inode = &ci->vfs_inode;
dout("writeback %p\n", inode);
filemap_fdatawrite(&inode->i_data);
iput(inode);
}
/* /*
* queue an async invalidation * queue an async invalidation
*/ */
void ceph_queue_invalidate(struct inode *inode) void ceph_queue_invalidate(struct inode *inode)
{ {
struct ceph_inode_info *ci = ceph_inode(inode);
set_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask);
ihold(inode); ihold(inode);
if (queue_work(ceph_inode_to_client(inode)->pg_inv_wq, if (queue_work(ceph_inode_to_client(inode)->inode_wq,
&ceph_inode(inode)->i_pg_inv_work)) { &ceph_inode(inode)->i_work)) {
dout("ceph_queue_invalidate %p\n", inode); dout("ceph_queue_invalidate %p\n", inode);
} else { } else {
dout("ceph_queue_invalidate %p failed\n", inode); dout("ceph_queue_invalidate %p already queued, mask=%lx\n",
inode, ci->i_work_mask);
iput(inode); iput(inode);
} }
} }
/* /*
* Invalidate inode pages in a worker thread. (This can't be done * Queue an async vmtruncate. If we fail to queue work, we will handle
* in the message handler context.) * the truncation the next time we call __ceph_do_pending_vmtruncate.
*/ */
static void ceph_invalidate_work(struct work_struct *work) void ceph_queue_vmtruncate(struct inode *inode)
{ {
struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info, struct ceph_inode_info *ci = ceph_inode(inode);
i_pg_inv_work); set_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask);
struct inode *inode = &ci->vfs_inode;
ihold(inode);
if (queue_work(ceph_inode_to_client(inode)->inode_wq,
&ci->i_work)) {
dout("ceph_queue_vmtruncate %p\n", inode);
} else {
dout("ceph_queue_vmtruncate %p already queued, mask=%lx\n",
inode, ci->i_work_mask);
iput(inode);
}
}
static void ceph_do_invalidate_pages(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
u32 orig_gen; u32 orig_gen;
int check = 0; int check = 0;
...@@ -1842,44 +1872,6 @@ static void ceph_invalidate_work(struct work_struct *work) ...@@ -1842,44 +1872,6 @@ static void ceph_invalidate_work(struct work_struct *work)
out: out:
if (check) if (check)
ceph_check_caps(ci, 0, NULL); ceph_check_caps(ci, 0, NULL);
iput(inode);
}
/*
* called by trunc_wq;
*
* We also truncate in a separate thread as well.
*/
static void ceph_vmtruncate_work(struct work_struct *work)
{
struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
i_vmtruncate_work);
struct inode *inode = &ci->vfs_inode;
dout("vmtruncate_work %p\n", inode);
__ceph_do_pending_vmtruncate(inode);
iput(inode);
}
/*
* Queue an async vmtruncate. If we fail to queue work, we will handle
* the truncation the next time we call __ceph_do_pending_vmtruncate.
*/
void ceph_queue_vmtruncate(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
ihold(inode);
if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq,
&ci->i_vmtruncate_work)) {
dout("ceph_queue_vmtruncate %p\n", inode);
} else {
dout("ceph_queue_vmtruncate %p failed, pending=%d\n",
inode, ci->i_truncate_pending);
iput(inode);
}
} }
/* /*
...@@ -1943,6 +1935,25 @@ void __ceph_do_pending_vmtruncate(struct inode *inode) ...@@ -1943,6 +1935,25 @@ void __ceph_do_pending_vmtruncate(struct inode *inode)
wake_up_all(&ci->i_cap_wq); wake_up_all(&ci->i_cap_wq);
} }
static void ceph_inode_work(struct work_struct *work)
{
struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
i_work);
struct inode *inode = &ci->vfs_inode;
if (test_and_clear_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask)) {
dout("writeback %p\n", inode);
filemap_fdatawrite(&inode->i_data);
}
if (test_and_clear_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask))
ceph_do_invalidate_pages(inode);
if (test_and_clear_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask))
__ceph_do_pending_vmtruncate(inode);
iput(inode);
}
/* /*
* symlinks * symlinks
*/ */
......
...@@ -690,11 +690,12 @@ void ceph_mdsc_release_request(struct kref *kref) ...@@ -690,11 +690,12 @@ void ceph_mdsc_release_request(struct kref *kref)
ceph_msg_put(req->r_reply); ceph_msg_put(req->r_reply);
if (req->r_inode) { if (req->r_inode) {
ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN); ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
iput(req->r_inode); /* avoid calling iput_final() in mds dispatch threads */
ceph_async_iput(req->r_inode);
} }
if (req->r_parent) if (req->r_parent)
ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN); ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
iput(req->r_target_inode); ceph_async_iput(req->r_target_inode);
if (req->r_dentry) if (req->r_dentry)
dput(req->r_dentry); dput(req->r_dentry);
if (req->r_old_dentry) if (req->r_old_dentry)
...@@ -708,7 +709,7 @@ void ceph_mdsc_release_request(struct kref *kref) ...@@ -708,7 +709,7 @@ void ceph_mdsc_release_request(struct kref *kref)
*/ */
ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir), ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN); CEPH_CAP_PIN);
iput(req->r_old_dentry_dir); ceph_async_iput(req->r_old_dentry_dir);
} }
kfree(req->r_path1); kfree(req->r_path1);
kfree(req->r_path2); kfree(req->r_path2);
...@@ -818,7 +819,8 @@ static void __unregister_request(struct ceph_mds_client *mdsc, ...@@ -818,7 +819,8 @@ static void __unregister_request(struct ceph_mds_client *mdsc,
} }
if (req->r_unsafe_dir) { if (req->r_unsafe_dir) {
iput(req->r_unsafe_dir); /* avoid calling iput_final() in mds dispatch threads */
ceph_async_iput(req->r_unsafe_dir);
req->r_unsafe_dir = NULL; req->r_unsafe_dir = NULL;
} }
...@@ -983,7 +985,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc, ...@@ -983,7 +985,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
if (!cap) { if (!cap) {
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
iput(inode); ceph_async_iput(inode);
goto random; goto random;
} }
mds = cap->session->s_mds; mds = cap->session->s_mds;
...@@ -992,7 +994,9 @@ static int __choose_mds(struct ceph_mds_client *mdsc, ...@@ -992,7 +994,9 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
cap == ci->i_auth_cap ? "auth " : "", cap); cap == ci->i_auth_cap ? "auth " : "", cap);
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
out: out:
iput(inode); /* avoid calling iput_final() while holding mdsc->mutex or
* in mds dispatch threads */
ceph_async_iput(inode);
return mds; return mds;
random: random:
...@@ -1302,7 +1306,9 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session, ...@@ -1302,7 +1306,9 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session,
spin_unlock(&session->s_cap_lock); spin_unlock(&session->s_cap_lock);
if (last_inode) { if (last_inode) {
iput(last_inode); /* avoid calling iput_final() while holding
* s_mutex or in mds dispatch threads */
ceph_async_iput(last_inode);
last_inode = NULL; last_inode = NULL;
} }
if (old_cap) { if (old_cap) {
...@@ -1335,7 +1341,7 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session, ...@@ -1335,7 +1341,7 @@ int ceph_iterate_session_caps(struct ceph_mds_session *session,
session->s_cap_iterator = NULL; session->s_cap_iterator = NULL;
spin_unlock(&session->s_cap_lock); spin_unlock(&session->s_cap_lock);
iput(last_inode); ceph_async_iput(last_inode);
if (old_cap) if (old_cap)
ceph_put_cap(session->s_mdsc, old_cap); ceph_put_cap(session->s_mdsc, old_cap);
...@@ -1471,7 +1477,8 @@ static void remove_session_caps(struct ceph_mds_session *session) ...@@ -1471,7 +1477,8 @@ static void remove_session_caps(struct ceph_mds_session *session)
spin_unlock(&session->s_cap_lock); spin_unlock(&session->s_cap_lock);
inode = ceph_find_inode(sb, vino); inode = ceph_find_inode(sb, vino);
iput(inode); /* avoid calling iput_final() while holding s_mutex */
ceph_async_iput(inode);
spin_lock(&session->s_cap_lock); spin_lock(&session->s_cap_lock);
} }
...@@ -3912,8 +3919,9 @@ static void handle_lease(struct ceph_mds_client *mdsc, ...@@ -3912,8 +3919,9 @@ static void handle_lease(struct ceph_mds_client *mdsc,
ceph_con_send(&session->s_con, msg); ceph_con_send(&session->s_con, msg);
out: out:
iput(inode);
mutex_unlock(&session->s_mutex); mutex_unlock(&session->s_mutex);
/* avoid calling iput_final() in mds dispatch threads */
ceph_async_iput(inode);
return; return;
bad: bad:
......
...@@ -74,7 +74,8 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc, ...@@ -74,7 +74,8 @@ void ceph_handle_quota(struct ceph_mds_client *mdsc,
le64_to_cpu(h->max_files)); le64_to_cpu(h->max_files));
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
iput(inode); /* avoid calling iput_final() in dispatch thread */
ceph_async_iput(inode);
} }
static struct ceph_quotarealm_inode * static struct ceph_quotarealm_inode *
...@@ -235,7 +236,8 @@ static struct ceph_snap_realm *get_quota_realm(struct ceph_mds_client *mdsc, ...@@ -235,7 +236,8 @@ static struct ceph_snap_realm *get_quota_realm(struct ceph_mds_client *mdsc,
ci = ceph_inode(in); ci = ceph_inode(in);
has_quota = __ceph_has_any_quota(ci); has_quota = __ceph_has_any_quota(ci);
iput(in); /* avoid calling iput_final() while holding mdsc->snap_rwsem */
ceph_async_iput(in);
next = realm->parent; next = realm->parent;
if (has_quota || !next) if (has_quota || !next)
...@@ -372,7 +374,8 @@ static bool check_quota_exceeded(struct inode *inode, enum quota_check_op op, ...@@ -372,7 +374,8 @@ static bool check_quota_exceeded(struct inode *inode, enum quota_check_op op,
pr_warn("Invalid quota check op (%d)\n", op); pr_warn("Invalid quota check op (%d)\n", op);
exceeded = true; /* Just break the loop */ exceeded = true; /* Just break the loop */
} }
iput(in); /* avoid calling iput_final() while holding mdsc->snap_rwsem */
ceph_async_iput(in);
next = realm->parent; next = realm->parent;
if (exceeded || !next) if (exceeded || !next)
......
...@@ -648,13 +648,15 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm) ...@@ -648,13 +648,15 @@ static void queue_realm_cap_snaps(struct ceph_snap_realm *realm)
if (!inode) if (!inode)
continue; continue;
spin_unlock(&realm->inodes_with_caps_lock); spin_unlock(&realm->inodes_with_caps_lock);
iput(lastinode); /* avoid calling iput_final() while holding
* mdsc->snap_rwsem or in mds dispatch threads */
ceph_async_iput(lastinode);
lastinode = inode; lastinode = inode;
ceph_queue_cap_snap(ci); ceph_queue_cap_snap(ci);
spin_lock(&realm->inodes_with_caps_lock); spin_lock(&realm->inodes_with_caps_lock);
} }
spin_unlock(&realm->inodes_with_caps_lock); spin_unlock(&realm->inodes_with_caps_lock);
iput(lastinode); ceph_async_iput(lastinode);
dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino); dout("queue_realm_cap_snaps %p %llx done\n", realm, realm->ino);
} }
...@@ -806,7 +808,9 @@ static void flush_snaps(struct ceph_mds_client *mdsc) ...@@ -806,7 +808,9 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
ihold(inode); ihold(inode);
spin_unlock(&mdsc->snap_flush_lock); spin_unlock(&mdsc->snap_flush_lock);
ceph_flush_snaps(ci, &session); ceph_flush_snaps(ci, &session);
iput(inode); /* avoid calling iput_final() while holding
* session->s_mutex or in mds dispatch threads */
ceph_async_iput(inode);
spin_lock(&mdsc->snap_flush_lock); spin_lock(&mdsc->snap_flush_lock);
} }
spin_unlock(&mdsc->snap_flush_lock); spin_unlock(&mdsc->snap_flush_lock);
...@@ -950,12 +954,14 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc, ...@@ -950,12 +954,14 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
ceph_get_snap_realm(mdsc, realm); ceph_get_snap_realm(mdsc, realm);
ceph_put_snap_realm(mdsc, oldrealm); ceph_put_snap_realm(mdsc, oldrealm);
iput(inode); /* avoid calling iput_final() while holding
* mdsc->snap_rwsem or mds in dispatch threads */
ceph_async_iput(inode);
continue; continue;
skip_inode: skip_inode:
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
iput(inode); ceph_async_iput(inode);
} }
/* we may have taken some of the old realm's children. */ /* we may have taken some of the old realm's children. */
......
...@@ -672,18 +672,12 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, ...@@ -672,18 +672,12 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
* The number of concurrent works can be high but they don't need * The number of concurrent works can be high but they don't need
* to be processed in parallel, limit concurrency. * to be processed in parallel, limit concurrency.
*/ */
fsc->wb_wq = alloc_workqueue("ceph-writeback", 0, 1); fsc->inode_wq = alloc_workqueue("ceph-inode", WQ_UNBOUND, 0);
if (!fsc->wb_wq) if (!fsc->inode_wq)
goto fail_client; goto fail_client;
fsc->pg_inv_wq = alloc_workqueue("ceph-pg-invalid", 0, 1);
if (!fsc->pg_inv_wq)
goto fail_wb_wq;
fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1);
if (!fsc->trunc_wq)
goto fail_pg_inv_wq;
fsc->cap_wq = alloc_workqueue("ceph-cap", 0, 1); fsc->cap_wq = alloc_workqueue("ceph-cap", 0, 1);
if (!fsc->cap_wq) if (!fsc->cap_wq)
goto fail_trunc_wq; goto fail_inode_wq;
/* set up mempools */ /* set up mempools */
err = -ENOMEM; err = -ENOMEM;
...@@ -697,12 +691,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, ...@@ -697,12 +691,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
fail_cap_wq: fail_cap_wq:
destroy_workqueue(fsc->cap_wq); destroy_workqueue(fsc->cap_wq);
fail_trunc_wq: fail_inode_wq:
destroy_workqueue(fsc->trunc_wq); destroy_workqueue(fsc->inode_wq);
fail_pg_inv_wq:
destroy_workqueue(fsc->pg_inv_wq);
fail_wb_wq:
destroy_workqueue(fsc->wb_wq);
fail_client: fail_client:
ceph_destroy_client(fsc->client); ceph_destroy_client(fsc->client);
fail: fail:
...@@ -715,9 +705,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt, ...@@ -715,9 +705,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
static void flush_fs_workqueues(struct ceph_fs_client *fsc) static void flush_fs_workqueues(struct ceph_fs_client *fsc)
{ {
flush_workqueue(fsc->wb_wq); flush_workqueue(fsc->inode_wq);
flush_workqueue(fsc->pg_inv_wq);
flush_workqueue(fsc->trunc_wq);
flush_workqueue(fsc->cap_wq); flush_workqueue(fsc->cap_wq);
} }
...@@ -725,9 +713,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc) ...@@ -725,9 +713,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
{ {
dout("destroy_fs_client %p\n", fsc); dout("destroy_fs_client %p\n", fsc);
destroy_workqueue(fsc->wb_wq); destroy_workqueue(fsc->inode_wq);
destroy_workqueue(fsc->pg_inv_wq);
destroy_workqueue(fsc->trunc_wq);
destroy_workqueue(fsc->cap_wq); destroy_workqueue(fsc->cap_wq);
mempool_destroy(fsc->wb_pagevec_pool); mempool_destroy(fsc->wb_pagevec_pool);
......
...@@ -109,9 +109,7 @@ struct ceph_fs_client { ...@@ -109,9 +109,7 @@ struct ceph_fs_client {
mempool_t *wb_pagevec_pool; mempool_t *wb_pagevec_pool;
atomic_long_t writeback_count; atomic_long_t writeback_count;
struct workqueue_struct *wb_wq; struct workqueue_struct *inode_wq;
struct workqueue_struct *pg_inv_wq;
struct workqueue_struct *trunc_wq;
struct workqueue_struct *cap_wq; struct workqueue_struct *cap_wq;
#ifdef CONFIG_DEBUG_FS #ifdef CONFIG_DEBUG_FS
...@@ -387,10 +385,8 @@ struct ceph_inode_info { ...@@ -387,10 +385,8 @@ struct ceph_inode_info {
struct list_head i_snap_realm_item; struct list_head i_snap_realm_item;
struct list_head i_snap_flush_item; struct list_head i_snap_flush_item;
struct work_struct i_wb_work; /* writeback work */ struct work_struct i_work;
struct work_struct i_pg_inv_work; /* page invalidation work */ unsigned long i_work_mask;
struct work_struct i_vmtruncate_work;
#ifdef CONFIG_CEPH_FSCACHE #ifdef CONFIG_CEPH_FSCACHE
struct fscache_cookie *fscache; struct fscache_cookie *fscache;
...@@ -512,6 +508,13 @@ static inline struct inode *ceph_find_inode(struct super_block *sb, ...@@ -512,6 +508,13 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
#define CEPH_I_ERROR_FILELOCK (1 << 12) /* have seen file lock errors */ #define CEPH_I_ERROR_FILELOCK (1 << 12) /* have seen file lock errors */
/*
* Masks of ceph inode work.
*/
#define CEPH_I_WORK_WRITEBACK 0 /* writeback */
#define CEPH_I_WORK_INVALIDATE_PAGES 1 /* invalidate pages */
#define CEPH_I_WORK_VMTRUNCATE 2 /* vmtruncate */
/* /*
* We set the ERROR_WRITE bit when we start seeing write errors on an inode * We set the ERROR_WRITE bit when we start seeing write errors on an inode
* and then clear it when they start succeeding. Note that we do a lockless * and then clear it when they start succeeding. Note that we do a lockless
...@@ -896,9 +899,9 @@ extern int ceph_inode_holds_cap(struct inode *inode, int mask); ...@@ -896,9 +899,9 @@ extern int ceph_inode_holds_cap(struct inode *inode, int mask);
extern bool ceph_inode_set_size(struct inode *inode, loff_t size); extern bool ceph_inode_set_size(struct inode *inode, loff_t size);
extern void __ceph_do_pending_vmtruncate(struct inode *inode); extern void __ceph_do_pending_vmtruncate(struct inode *inode);
extern void ceph_queue_vmtruncate(struct inode *inode); extern void ceph_queue_vmtruncate(struct inode *inode);
extern void ceph_queue_invalidate(struct inode *inode); extern void ceph_queue_invalidate(struct inode *inode);
extern void ceph_queue_writeback(struct inode *inode); extern void ceph_queue_writeback(struct inode *inode);
extern void ceph_async_iput(struct inode *inode);
extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page, extern int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
int mask, bool force); int mask, bool force);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment