Commit 75c9627e authored by Yan, Zheng's avatar Yan, Zheng Committed by Ilya Dryomov

ceph: map snapid to anonymous bdev ID

ceph_getattr() return zero dev ID for head inodes and set dev ID to
snapid directly for snaphost inodes. This is not good because userspace
utilities may consider device ID of 0 as invalid, snapid may conflict
with other device's ID.

This patch introduces "snapids to anonymous bdev IDs" map. we create a
new mapping when we see a snapid for the first time. we trim unused
mapping after it is ilde for 5 minutes.

Link: http://tracker.ceph.com/issues/22353Signed-off-by: default avatar"Yan, Zheng" <zyan@redhat.com>
Acked-by: default avatarJeff Layton <jlayton@redhat.com>
Signed-off-by: default avatarIlya Dryomov <idryomov@gmail.com>
parent 81c5a148
......@@ -548,10 +548,11 @@ void ceph_destroy_inode(struct inode *inode)
*/
if (ci->i_snap_realm) {
struct ceph_mds_client *mdsc =
ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
ceph_inode_to_client(inode)->mdsc;
if (ceph_snap(inode) == CEPH_NOSNAP) {
struct ceph_snap_realm *realm = ci->i_snap_realm;
dout(" dropping residual ref to snap realm %p\n", realm);
dout(" dropping residual ref to snap realm %p\n",
realm);
spin_lock(&realm->inodes_with_caps_lock);
list_del_init(&ci->i_snap_realm_item);
ci->i_snap_realm = NULL;
......@@ -559,6 +560,10 @@ void ceph_destroy_inode(struct inode *inode)
realm->inode = NULL;
spin_unlock(&realm->inodes_with_caps_lock);
ceph_put_snap_realm(mdsc, realm);
} else {
ceph_put_snapid_map(mdsc, ci->i_snapid_map);
ci->i_snap_realm = NULL;
}
}
kfree(ci->i_symlink);
......@@ -776,6 +781,9 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
pool_ns = ceph_find_or_create_string(iinfo->pool_ns_data,
iinfo->pool_ns_len);
if (ceph_snap(inode) != CEPH_NOSNAP && !ci->i_snapid_map)
ci->i_snapid_map = ceph_get_snapid_map(mdsc, ceph_snap(inode));
spin_lock(&ci->i_ceph_lock);
/*
......@@ -2260,10 +2268,11 @@ int ceph_getattr(const struct path *path, struct kstat *stat,
if (!err) {
generic_fillattr(inode, stat);
stat->ino = ceph_translate_ino(inode->i_sb, inode->i_ino);
if (ceph_snap(inode) != CEPH_NOSNAP)
stat->dev = ceph_snap(inode);
if (ceph_snap(inode) == CEPH_NOSNAP)
stat->dev = inode->i_sb->s_dev;
else
stat->dev = 0;
stat->dev = ci->i_snapid_map ? ci->i_snapid_map->dev : 0;
if (S_ISDIR(inode->i_mode)) {
if (ceph_test_mount_opt(ceph_sb_to_client(inode->i_sb),
RBYTES))
......
......@@ -3791,6 +3791,8 @@ static void delayed_work(struct work_struct *work)
dout("mdsc delayed_work\n");
ceph_check_delayed_caps(mdsc);
ceph_trim_snapid_map(mdsc);
mutex_lock(&mdsc->mutex);
renew_interval = mdsc->mdsmap->m_session_timeout >> 2;
renew_caps = time_after_eq(jiffies, HZ*renew_interval +
......@@ -3893,6 +3895,10 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
ceph_caps_init(mdsc);
ceph_adjust_min_caps(mdsc, fsc->min_caps);
spin_lock_init(&mdsc->snapid_map_lock);
mdsc->snapid_map_tree = RB_ROOT;
INIT_LIST_HEAD(&mdsc->snapid_map_lru);
init_rwsem(&mdsc->pool_perm_rwsem);
mdsc->pool_perm_tree = RB_ROOT;
......@@ -4086,6 +4092,8 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
WARN_ON(!list_empty(&mdsc->cap_delay_list));
mutex_unlock(&mdsc->mutex);
ceph_cleanup_snapid_map(mdsc);
ceph_cleanup_empty_realms(mdsc);
cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
......
......@@ -313,6 +313,15 @@ struct ceph_pool_perm {
char pool_ns[];
};
struct ceph_snapid_map {
struct rb_node node;
struct list_head lru;
atomic_t ref;
u64 snap;
dev_t dev;
unsigned long last_used;
};
/*
* mds client state
*/
......@@ -390,6 +399,10 @@ struct ceph_mds_client {
struct list_head dentry_lru;
int num_dentry;
spinlock_t snapid_map_lock;
struct rb_root snapid_map_tree;
struct list_head snapid_map_lru;
struct rw_semaphore pool_perm_rwsem;
struct rb_root pool_perm_tree;
......
......@@ -3,12 +3,13 @@
#include <linux/sort.h>
#include <linux/slab.h>
#include "super.h"
#include "mds_client.h"
#include <linux/ceph/decode.h>
/* unused map expires after 5 minutes */
#define CEPH_SNAPID_MAP_TIMEOUT (5 * 60 * HZ)
/*
* Snapshots in ceph are driven in large part by cooperation from the
* client. In contrast to local file systems or file servers that
......@@ -989,3 +990,154 @@ void ceph_handle_snap(struct ceph_mds_client *mdsc,
up_write(&mdsc->snap_rwsem);
return;
}
struct ceph_snapid_map* ceph_get_snapid_map(struct ceph_mds_client *mdsc,
u64 snap)
{
struct ceph_snapid_map *sm, *exist;
struct rb_node **p, *parent;
int ret;
exist = NULL;
spin_lock(&mdsc->snapid_map_lock);
p = &mdsc->snapid_map_tree.rb_node;
while (*p) {
exist = rb_entry(*p, struct ceph_snapid_map, node);
if (snap > exist->snap) {
p = &(*p)->rb_left;
} else if (snap < exist->snap) {
p = &(*p)->rb_right;
} else {
if (atomic_inc_return(&exist->ref) == 1)
list_del_init(&exist->lru);
break;
}
exist = NULL;
}
spin_unlock(&mdsc->snapid_map_lock);
if (exist) {
dout("found snapid map %llx -> %x\n", exist->snap, exist->dev);
return exist;
}
sm = kmalloc(sizeof(*sm), GFP_NOFS);
if (!sm)
return NULL;
ret = get_anon_bdev(&sm->dev);
if (ret < 0) {
kfree(sm);
return NULL;
}
INIT_LIST_HEAD(&sm->lru);
atomic_set(&sm->ref, 1);
sm->snap = snap;
exist = NULL;
parent = NULL;
p = &mdsc->snapid_map_tree.rb_node;
spin_lock(&mdsc->snapid_map_lock);
while (*p) {
parent = *p;
exist = rb_entry(*p, struct ceph_snapid_map, node);
if (snap > exist->snap)
p = &(*p)->rb_left;
else if (snap < exist->snap)
p = &(*p)->rb_right;
else
break;
exist = NULL;
}
if (exist) {
if (atomic_inc_return(&exist->ref) == 1)
list_del_init(&exist->lru);
} else {
rb_link_node(&sm->node, parent, p);
rb_insert_color(&sm->node, &mdsc->snapid_map_tree);
}
spin_unlock(&mdsc->snapid_map_lock);
if (exist) {
free_anon_bdev(sm->dev);
kfree(sm);
dout("found snapid map %llx -> %x\n", exist->snap, exist->dev);
return exist;
}
dout("create snapid map %llx -> %x\n", sm->snap, sm->dev);
return sm;
}
void ceph_put_snapid_map(struct ceph_mds_client* mdsc,
struct ceph_snapid_map *sm)
{
if (!sm)
return;
if (atomic_dec_and_lock(&sm->ref, &mdsc->snapid_map_lock)) {
if (!RB_EMPTY_NODE(&sm->node)) {
sm->last_used = jiffies;
list_add_tail(&sm->lru, &mdsc->snapid_map_lru);
spin_unlock(&mdsc->snapid_map_lock);
} else {
/* already cleaned up by
* ceph_cleanup_snapid_map() */
spin_unlock(&mdsc->snapid_map_lock);
kfree(sm);
}
}
}
void ceph_trim_snapid_map(struct ceph_mds_client *mdsc)
{
struct ceph_snapid_map *sm;
unsigned long now;
LIST_HEAD(to_free);
spin_lock(&mdsc->snapid_map_lock);
now = jiffies;
while (!list_empty(&mdsc->snapid_map_lru)) {
sm = list_first_entry(&mdsc->snapid_map_lru,
struct ceph_snapid_map, lru);
if (time_after(sm->last_used + CEPH_SNAPID_MAP_TIMEOUT, now))
break;
rb_erase(&sm->node, &mdsc->snapid_map_tree);
list_move(&sm->lru, &to_free);
}
spin_unlock(&mdsc->snapid_map_lock);
while (!list_empty(&to_free)) {
sm = list_first_entry(&to_free, struct ceph_snapid_map, lru);
list_del(&sm->lru);
dout("trim snapid map %llx -> %x\n", sm->snap, sm->dev);
free_anon_bdev(sm->dev);
kfree(sm);
}
}
void ceph_cleanup_snapid_map(struct ceph_mds_client *mdsc)
{
struct ceph_snapid_map *sm;
struct rb_node *p;
LIST_HEAD(to_free);
spin_lock(&mdsc->snapid_map_lock);
while ((p = rb_first(&mdsc->snapid_map_tree))) {
sm = rb_entry(p, struct ceph_snapid_map, node);
rb_erase(p, &mdsc->snapid_map_tree);
RB_CLEAR_NODE(p);
list_move(&sm->lru, &to_free);
}
spin_unlock(&mdsc->snapid_map_lock);
while (!list_empty(&to_free)) {
sm = list_first_entry(&to_free, struct ceph_snapid_map, lru);
list_del(&sm->lru);
free_anon_bdev(sm->dev);
if (WARN_ON_ONCE(atomic_read(&sm->ref))) {
pr_err("snapid map %llx -> %x still in use\n",
sm->snap, sm->dev);
}
}
}
......@@ -370,7 +370,10 @@ struct ceph_inode_info {
struct list_head i_unsafe_iops; /* uncommitted mds inode ops */
spinlock_t i_unsafe_lock;
union {
struct ceph_snap_realm *i_snap_realm; /* snap realm (if caps) */
struct ceph_snapid_map *i_snapid_map; /* snapid -> dev_t */
};
int i_snap_realm_counter; /* snap realm (if caps) */
struct list_head i_snap_realm_item;
struct list_head i_snap_flush_item;
......@@ -837,6 +840,14 @@ extern int __ceph_finish_cap_snap(struct ceph_inode_info *ci,
struct ceph_cap_snap *capsnap);
extern void ceph_cleanup_empty_realms(struct ceph_mds_client *mdsc);
extern struct ceph_snapid_map *ceph_get_snapid_map(struct ceph_mds_client *mdsc,
u64 snap);
extern void ceph_put_snapid_map(struct ceph_mds_client* mdsc,
struct ceph_snapid_map *sm);
extern void ceph_trim_snapid_map(struct ceph_mds_client *mdsc);
extern void ceph_cleanup_snapid_map(struct ceph_mds_client *mdsc);
/*
* a cap_snap is "pending" if it is still awaiting an in-progress
* sync write (that may/may not still update size, mtime, etc.).
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment