Commit e3ec8d68 authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: send cap releases more aggressively

When pending cap releases fill up one message, start a work to send
cap release message. (old way is sending cap releases every 5 seconds)
Signed-off-by: default avatar"Yan, Zheng" <zyan@redhat.com>
Reviewed-by: default avatarJeff Layton <jlayton@redhat.com>
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 08796873
......@@ -1081,9 +1081,7 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
(!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
cap->queue_release = 1;
if (removed) {
list_add_tail(&cap->session_caps,
&session->s_cap_releases);
session->s_num_cap_releases++;
__ceph_queue_cap_release(session, cap);
removed = 0;
}
} else {
......@@ -1245,7 +1243,7 @@ static int send_cap_msg(struct cap_msg_args *arg)
* Queue cap releases when an inode is dropped from our cache. Since
* inode is about to be destroyed, there is no need for i_ceph_lock.
*/
void ceph_queue_caps_release(struct inode *inode)
void __ceph_remove_caps(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct rb_node *p;
......@@ -3886,12 +3884,10 @@ void ceph_handle_caps(struct ceph_mds_session *session,
cap->seq = seq;
cap->issue_seq = seq;
spin_lock(&session->s_cap_lock);
list_add_tail(&cap->session_caps,
&session->s_cap_releases);
session->s_num_cap_releases++;
__ceph_queue_cap_release(session, cap);
spin_unlock(&session->s_cap_lock);
}
goto flush_cap_releases;
goto done;
}
/* these will work even if we don't have a cap yet */
......@@ -3961,7 +3957,12 @@ void ceph_handle_caps(struct ceph_mds_session *session,
ceph_cap_op_name(op));
}
goto done;
done:
mutex_unlock(&session->s_mutex);
done_unlocked:
iput(inode);
ceph_put_string(extra_info.pool_ns);
return;
flush_cap_releases:
/*
......@@ -3969,14 +3970,8 @@ void ceph_handle_caps(struct ceph_mds_session *session,
* along for the mds (who clearly thinks we still have this
* cap).
*/
ceph_send_cap_releases(mdsc, session);
done:
mutex_unlock(&session->s_mutex);
done_unlocked:
iput(inode);
ceph_put_string(extra_info.pool_ns);
return;
ceph_flush_cap_releases(mdsc, session);
goto done;
bad:
pr_err("ceph_handle_caps: corrupt message\n");
......
......@@ -537,7 +537,7 @@ void ceph_destroy_inode(struct inode *inode)
ceph_fscache_unregister_inode_cookie(ci);
ceph_queue_caps_release(inode);
__ceph_remove_caps(inode);
if (__ceph_has_any_quota(ci))
ceph_adjust_quota_realms_count(inode, false);
......
......@@ -57,6 +57,7 @@ struct ceph_reconnect_state {
static void __wake_requests(struct ceph_mds_client *mdsc,
struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static const struct ceph_connection_operations mds_con_ops;
......@@ -636,6 +637,8 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
s->s_cap_reconnect = 0;
s->s_cap_iterator = NULL;
INIT_LIST_HEAD(&s->s_cap_releases);
INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);
INIT_LIST_HEAD(&s->s_cap_flushing);
mdsc->sessions[mds] = s;
......@@ -661,6 +664,7 @@ static void __unregister_session(struct ceph_mds_client *mdsc,
dout("__unregister_session mds%d %p\n", s->s_mds, s);
BUG_ON(mdsc->sessions[s->s_mds] != s);
mdsc->sessions[s->s_mds] = NULL;
s->s_state = 0;
ceph_con_close(&s->s_con);
ceph_put_mds_session(s);
atomic_dec(&mdsc->num_sessions);
......@@ -1323,14 +1327,11 @@ static int iterate_session_caps(struct ceph_mds_session *session,
cap->session = NULL;
list_del_init(&cap->session_caps);
session->s_nr_caps--;
if (cap->queue_release) {
list_add_tail(&cap->session_caps,
&session->s_cap_releases);
session->s_num_cap_releases++;
} else {
if (cap->queue_release)
__ceph_queue_cap_release(session, cap);
else
old_cap = cap; /* put_cap it w/o locks held */
}
}
if (ret < 0)
goto out;
}
......@@ -1764,7 +1765,7 @@ int ceph_trim_caps(struct ceph_mds_client *mdsc,
session->s_trim_caps = 0;
}
ceph_send_cap_releases(mdsc, session);
ceph_flush_cap_releases(mdsc, session);
return 0;
}
......@@ -1807,7 +1808,7 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc,
/*
* called under s_mutex
*/
void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
{
struct ceph_msg *msg = NULL;
......@@ -1900,6 +1901,48 @@ void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
spin_unlock(&session->s_cap_lock);
}
static void ceph_cap_release_work(struct work_struct *work)
{
struct ceph_mds_session *session =
container_of(work, struct ceph_mds_session, s_cap_release_work);
mutex_lock(&session->s_mutex);
if (session->s_state == CEPH_MDS_SESSION_OPEN ||
session->s_state == CEPH_MDS_SESSION_HUNG)
ceph_send_cap_releases(session->s_mdsc, session);
mutex_unlock(&session->s_mutex);
ceph_put_mds_session(session);
}
void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session)
{
if (mdsc->stopping)
return;
get_session(session);
if (queue_work(mdsc->fsc->cap_wq,
&session->s_cap_release_work)) {
dout("cap release work queued\n");
} else {
ceph_put_mds_session(session);
dout("failed to queue cap release work\n");
}
}
/*
* caller holds session->s_cap_lock
*/
void __ceph_queue_cap_release(struct ceph_mds_session *session,
struct ceph_cap *cap)
{
list_add_tail(&cap->session_caps, &session->s_cap_releases);
session->s_num_cap_releases++;
if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
ceph_flush_cap_releases(session->s_mdsc, session);
}
/*
* requests
*/
......
......@@ -172,12 +172,13 @@ struct ceph_mds_session {
/* protected by s_cap_lock */
spinlock_t s_cap_lock;
struct list_head s_caps; /* all caps issued by this session */
struct ceph_cap *s_cap_iterator;
int s_nr_caps, s_trim_caps;
int s_num_cap_releases;
int s_cap_reconnect;
int s_readonly;
struct list_head s_cap_releases; /* waiting cap_release messages */
struct ceph_cap *s_cap_iterator;
struct work_struct s_cap_release_work;
/* protected by mutex */
struct list_head s_cap_flushing; /* inodes w/ flushing caps */
......@@ -457,9 +458,10 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req)
kref_put(&req->r_kref, ceph_mdsc_release_request);
}
extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
extern void __ceph_queue_cap_release(struct ceph_mds_session *session,
struct ceph_cap *cap);
extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session);
extern void ceph_mdsc_pre_umount(struct ceph_mds_client *mdsc);
extern char *ceph_mdsc_build_path(struct dentry *dentry, int *plen, u64 *base,
......
......@@ -671,6 +671,9 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1);
if (!fsc->trunc_wq)
goto fail_pg_inv_wq;
fsc->cap_wq = alloc_workqueue("ceph-cap", 0, 1);
if (!fsc->cap_wq)
goto fail_trunc_wq;
/* set up mempools */
err = -ENOMEM;
......@@ -678,13 +681,15 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
size = sizeof (struct page *) * (page_count ? page_count : 1);
fsc->wb_pagevec_pool = mempool_create_kmalloc_pool(10, size);
if (!fsc->wb_pagevec_pool)
goto fail_trunc_wq;
goto fail_cap_wq;
/* caps */
fsc->min_caps = fsopt->max_readdir;
return fsc;
fail_cap_wq:
destroy_workqueue(fsc->cap_wq);
fail_trunc_wq:
destroy_workqueue(fsc->trunc_wq);
fail_pg_inv_wq:
......@@ -706,6 +711,7 @@ static void flush_fs_workqueues(struct ceph_fs_client *fsc)
flush_workqueue(fsc->wb_wq);
flush_workqueue(fsc->pg_inv_wq);
flush_workqueue(fsc->trunc_wq);
flush_workqueue(fsc->cap_wq);
}
static void destroy_fs_client(struct ceph_fs_client *fsc)
......@@ -715,6 +721,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
destroy_workqueue(fsc->wb_wq);
destroy_workqueue(fsc->pg_inv_wq);
destroy_workqueue(fsc->trunc_wq);
destroy_workqueue(fsc->cap_wq);
mempool_destroy(fsc->wb_pagevec_pool);
......
......@@ -107,10 +107,12 @@ struct ceph_fs_client {
/* writeback */
mempool_t *wb_pagevec_pool;
atomic_long_t writeback_count;
struct workqueue_struct *wb_wq;
struct workqueue_struct *pg_inv_wq;
struct workqueue_struct *trunc_wq;
atomic_long_t writeback_count;
struct workqueue_struct *cap_wq;
#ifdef CONFIG_DEBUG_FS
struct dentry *debugfs_dentry_lru, *debugfs_caps;
......@@ -988,11 +990,11 @@ extern void ceph_add_cap(struct inode *inode,
unsigned cap, unsigned seq, u64 realmino, int flags,
struct ceph_cap **new_cap);
extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
extern void __ceph_remove_caps(struct inode* inode);
extern void ceph_put_cap(struct ceph_mds_client *mdsc,
struct ceph_cap *cap);
extern int ceph_is_any_caps(struct inode *inode);
extern void ceph_queue_caps_release(struct inode *inode);
extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
extern int ceph_fsync(struct file *file, loff_t start, loff_t end,
int datasync);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment