Commit 95288a9b authored by Linus Torvalds

Merge tag 'ceph-for-5.8-rc1' of git://github.com/ceph/ceph-client

Pull ceph updates from Ilya Dryomov:
 "The highlights are:

   - OSD/MDS latency and caps cache metrics infrastructure for the
     filesystem (Xiubo Li). Currently available through debugfs and will
     be periodically sent to the MDS in the future.

   - support for replica reads (balanced and localized reads) for rbd
     and the filesystem (myself). The default remains to always read
     from primary, users can opt-in with the new crush_location and
     read_from_replica options. Note that reading from replica is safe
     for general use only since Octopus.

   - support for RADOS allocation hint flags (myself). Currently used by
     rbd to propagate the compressible/incompressible hint given with
     the new compression_hint map option and ready for passing on more
     advanced hints, e.g. based on fadvise() from the filesystem.

   - support for efficient cross-quota-realm renames (Luis Henriques)

   - assorted cap handling improvements and cleanups, particularly
     untangling some of the locking (Jeff Layton)"
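As background for the hunks below: the alloc hint work widens osd_req_op_alloc_hint_init() to carry a flags word (see the include/linux/ceph/osd_client.h hunk). A minimal sketch of a caller passing the new hint, modeled on the __rbd_osd_setup_write_ops() hunk in this diff; the surrounding request setup is elided and the snippet is illustrative, not part of the patch:

        /*
         * Sketch: once an rbd image is mapped with the new
         * compression_hint=compressible option, opts->alloc_hint_flags
         * carries the flag and every object write propagates it to the
         * OSDs through the extended helper.
         */
        u32 hint_flags = CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;

        osd_req_op_alloc_hint_init(osd_req, which++,
                                   rbd_dev->layout.object_size, /* expected object size */
                                   rbd_dev->layout.object_size, /* expected write size */
                                   hint_flags);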

* tag 'ceph-for-5.8-rc1' of git://github.com/ceph/ceph-client: (29 commits)
  rbd: compression_hint option
  libceph: support for alloc hint flags
  libceph: read_from_replica option
  libceph: support for balanced and localized reads
  libceph: crush_location infrastructure
  libceph: decode CRUSH device/bucket types and names
  libceph: add non-asserting rbtree insertion helper
  ceph: skip checking caps when session reconnecting and releasing reqs
  ceph: make sure mdsc->mutex is nested in s->s_mutex to fix dead lock
  ceph: don't return -ESTALE if there's still an open file
  libceph, rbd: replace zero-length array with flexible-array
  ceph: allow rename operation under different quota realms
  ceph: normalize 'delta' parameter usage in check_quota_exceeded
  ceph: ceph_kick_flushing_caps needs the s_mutex
  ceph: request expedited service on session's last cap flush
  ceph: convert mdsc->cap_dirty to a per-session list
  ceph: reset i_requested_max_size if file write is not wanted
  ceph: throw a warning if we destroy session with mutex still locked
  ceph: fix potential race in ceph_check_caps
  ceph: document what protects i_dirty_item and i_flushing_item
  ...
parents ca687877 dc1dad8e
drivers/block/rbd.c

@@ -836,6 +836,7 @@ enum {
 	Opt_lock_timeout,
 	/* int args above */
 	Opt_pool_ns,
+	Opt_compression_hint,
 	/* string args above */
 	Opt_read_only,
 	Opt_read_write,
@@ -844,8 +845,23 @@ enum {
 	Opt_notrim,
 };

+enum {
+	Opt_compression_hint_none,
+	Opt_compression_hint_compressible,
+	Opt_compression_hint_incompressible,
+};
+
+static const struct constant_table rbd_param_compression_hint[] = {
+	{"none",		Opt_compression_hint_none},
+	{"compressible",	Opt_compression_hint_compressible},
+	{"incompressible",	Opt_compression_hint_incompressible},
+	{}
+};
+
 static const struct fs_parameter_spec rbd_parameters[] = {
 	fsparam_u32	("alloc_size",		Opt_alloc_size),
+	fsparam_enum	("compression_hint",	Opt_compression_hint,
+			 rbd_param_compression_hint),
 	fsparam_flag	("exclusive",		Opt_exclusive),
 	fsparam_flag	("lock_on_read",	Opt_lock_on_read),
 	fsparam_u32	("lock_timeout",	Opt_lock_timeout),
@@ -867,6 +883,8 @@ struct rbd_options {
 	bool	lock_on_read;
 	bool	exclusive;
 	bool	trim;
+
+	u32	alloc_hint_flags;  /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
 };

 #define RBD_QUEUE_DEPTH_DEFAULT	BLKDEV_MAX_RQ
@@ -2253,7 +2271,8 @@ static void __rbd_osd_setup_write_ops(struct ceph_osd_request *osd_req,
 	    !(obj_req->flags & RBD_OBJ_FLAG_MAY_EXIST)) {
 		osd_req_op_alloc_hint_init(osd_req, which++,
 					   rbd_dev->layout.object_size,
-					   rbd_dev->layout.object_size);
+					   rbd_dev->layout.object_size,
+					   rbd_dev->opts->alloc_hint_flags);
 	}

 	if (rbd_obj_is_entire(obj_req))
@@ -6331,6 +6350,29 @@ static int rbd_parse_param(struct fs_parameter *param,
 		pctx->spec->pool_ns = param->string;
 		param->string = NULL;
 		break;
+	case Opt_compression_hint:
+		switch (result.uint_32) {
+		case Opt_compression_hint_none:
+			opt->alloc_hint_flags &=
+			    ~(CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE |
+			      CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE);
+			break;
+		case Opt_compression_hint_compressible:
+			opt->alloc_hint_flags |=
+			    CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
+			opt->alloc_hint_flags &=
+			    ~CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
+			break;
+		case Opt_compression_hint_incompressible:
+			opt->alloc_hint_flags |=
+			    CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE;
+			opt->alloc_hint_flags &=
+			    ~CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE;
+			break;
+		default:
+			BUG();
+		}
+		break;
 	case Opt_read_only:
 		opt->read_only = true;
 		break;
...
drivers/block/rbd_types.h

@@ -93,7 +93,7 @@ struct rbd_image_header_ondisk {
 	__le32 snap_count;
 	__le32 reserved;
 	__le64 snap_names_len;
-	struct rbd_image_snap_ondisk snaps[0];
+	struct rbd_image_snap_ondisk snaps[];
 } __attribute__((packed));
...
fs/ceph/Makefile

@@ -8,7 +8,7 @@ obj-$(CONFIG_CEPH_FS) += ceph.o
 ceph-y := super.o inode.o dir.o file.o locks.o addr.o ioctl.o \
 	export.o caps.o snap.o xattr.o quota.o io.o \
 	mds_client.o mdsmap.o strings.o ceph_frag.o \
-	debugfs.o util.o
+	debugfs.o util.o metric.o

 ceph-$(CONFIG_CEPH_FSCACHE) += cache.o
 ceph-$(CONFIG_CEPH_FS_POSIX_ACL) += acl.o
fs/ceph/acl.c

@@ -22,7 +22,7 @@ static inline void ceph_set_cached_acl(struct inode *inode,
 	struct ceph_inode_info *ci = ceph_inode(inode);

 	spin_lock(&ci->i_ceph_lock);
-	if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0))
+	if (__ceph_caps_issued_mask_metric(ci, CEPH_CAP_XATTR_SHARED, 0))
 		set_cached_acl(inode, type, acl);
 	else
 		forget_cached_acl(inode, type);
...
fs/ceph/addr.c

@@ -11,10 +11,12 @@
 #include <linux/task_io_accounting_ops.h>
 #include <linux/signal.h>
 #include <linux/iversion.h>
+#include <linux/ktime.h>

 #include "super.h"
 #include "mds_client.h"
 #include "cache.h"
+#include "metric.h"
 #include <linux/ceph/osd_client.h>
 #include <linux/ceph/striper.h>

@@ -216,6 +218,9 @@ static int ceph_sync_readpages(struct ceph_fs_client *fsc,
 	if (!rc)
 		rc = ceph_osdc_wait_request(osdc, req);

+	ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
+				 req->r_end_latency, rc);
+
 	ceph_osdc_put_request(req);
 	dout("readpages result %d\n", rc);
 	return rc;
@@ -299,6 +304,7 @@ static int ceph_readpage(struct file *filp, struct page *page)
 static void finish_read(struct ceph_osd_request *req)
 {
 	struct inode *inode = req->r_inode;
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	struct ceph_osd_data *osd_data;
 	int rc = req->r_result <= 0 ? req->r_result : 0;
 	int bytes = req->r_result >= 0 ? req->r_result : 0;
@@ -336,6 +342,10 @@ static void finish_read(struct ceph_osd_request *req)
 		put_page(page);
 		bytes -= PAGE_SIZE;
 	}
+
+	ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
+				 req->r_end_latency, rc);
+
 	kfree(osd_data->pages);
 }

@@ -643,6 +653,9 @@ static int ceph_sync_writepages(struct ceph_fs_client *fsc,
 	if (!rc)
 		rc = ceph_osdc_wait_request(osdc, req);

+	ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+				  req->r_end_latency, rc);
+
 	ceph_osdc_put_request(req);
 	if (rc == 0)
 		rc = len;
@@ -794,6 +807,9 @@ static void writepages_finish(struct ceph_osd_request *req)
 		ceph_clear_error_write(ci);
 	}

+	ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+				  req->r_end_latency, rc);
+
 	/*
 	 * We lost the cache cap, need to truncate the page before
 	 * it is unlocked, otherwise we'd truncate it later in the
@@ -1852,6 +1868,10 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)
 	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
 	if (!err)
 		err = ceph_osdc_wait_request(&fsc->client->osdc, req);
+
+	ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+				  req->r_end_latency, err);
+
 out_put:
 	ceph_osdc_put_request(req);
 	if (err == -ECANCELED)
...
fs/ceph/caps.c: this diff is collapsed.
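The collapsed file is the large fs/ceph/caps.c change. Among other things it has to supply the __ceph_caps_issued_mask_metric() helper that the hunks below call; a plausible sketch of that wrapper, reconstructed from the callers and the metric.h API rather than quoted from the patch:

        /*
         * Sketch, not the verbatim patch: count a caps-cache hit or miss
         * around the existing __ceph_caps_issued_mask() check. The caller
         * holds i_ceph_lock, matching the other __ceph_caps_* helpers.
         */
        int __ceph_caps_issued_mask_metric(struct ceph_inode_info *ci, int mask,
                                           int touch)
        {
                struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
                int r;

                r = __ceph_caps_issued_mask(ci, mask, touch);
                if (r)
                        ceph_update_cap_hit(&fsc->mdsc->metric);
                else
                        ceph_update_cap_mis(&fsc->mdsc->metric);
                return r;
        }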
fs/ceph/debugfs.c

@@ -7,6 +7,8 @@
 #include <linux/ctype.h>
 #include <linux/debugfs.h>
 #include <linux/seq_file.h>
+#include <linux/math64.h>
+#include <linux/ktime.h>

 #include <linux/ceph/libceph.h>
 #include <linux/ceph/mon_client.h>
@@ -18,6 +20,7 @@
 #ifdef CONFIG_DEBUG_FS

 #include "mds_client.h"
+#include "metric.h"

 static int mdsmap_show(struct seq_file *s, void *p)
 {
@@ -124,6 +127,87 @@ static int mdsc_show(struct seq_file *s, void *p)
 	return 0;
 }

+#define CEPH_METRIC_SHOW(name, total, avg, min, max, sq) {		\
+	s64 _total, _avg, _min, _max, _sq, _st;				\
+	_avg = ktime_to_us(avg);					\
+	_min = ktime_to_us(min == KTIME_MAX ? 0 : min);			\
+	_max = ktime_to_us(max);					\
+	_total = total - 1;						\
+	_sq = _total > 0 ? DIV64_U64_ROUND_CLOSEST(sq, _total) : 0;	\
+	_st = int_sqrt64(_sq);						\
+	_st = ktime_to_us(_st);						\
+	seq_printf(s, "%-14s%-12lld%-16lld%-16lld%-16lld%lld\n",	\
+		   name, total, _avg, _min, _max, _st);			\
+}
+
+static int metric_show(struct seq_file *s, void *p)
+{
+	struct ceph_fs_client *fsc = s->private;
+	struct ceph_mds_client *mdsc = fsc->mdsc;
+	struct ceph_client_metric *m = &mdsc->metric;
+	int i, nr_caps = 0;
+	s64 total, sum, avg, min, max, sq;
+
+	seq_printf(s, "item          total       avg_lat(us)     min_lat(us)     max_lat(us)     stdev(us)\n");
+	seq_printf(s, "-----------------------------------------------------------------------------------\n");
+
+	spin_lock(&m->read_latency_lock);
+	total = m->total_reads;
+	sum = m->read_latency_sum;
+	avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
+	min = m->read_latency_min;
+	max = m->read_latency_max;
+	sq = m->read_latency_sq_sum;
+	spin_unlock(&m->read_latency_lock);
+	CEPH_METRIC_SHOW("read", total, avg, min, max, sq);
+
+	spin_lock(&m->write_latency_lock);
+	total = m->total_writes;
+	sum = m->write_latency_sum;
+	avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
+	min = m->write_latency_min;
+	max = m->write_latency_max;
+	sq = m->write_latency_sq_sum;
+	spin_unlock(&m->write_latency_lock);
+	CEPH_METRIC_SHOW("write", total, avg, min, max, sq);
+
+	spin_lock(&m->metadata_latency_lock);
+	total = m->total_metadatas;
+	sum = m->metadata_latency_sum;
+	avg = total > 0 ? DIV64_U64_ROUND_CLOSEST(sum, total) : 0;
+	min = m->metadata_latency_min;
+	max = m->metadata_latency_max;
+	sq = m->metadata_latency_sq_sum;
+	spin_unlock(&m->metadata_latency_lock);
+	CEPH_METRIC_SHOW("metadata", total, avg, min, max, sq);
+
+	seq_printf(s, "\n");
+	seq_printf(s, "item          total           miss            hit\n");
+	seq_printf(s, "-------------------------------------------------\n");
+
+	seq_printf(s, "%-14s%-16lld%-16lld%lld\n", "d_lease",
+		   atomic64_read(&m->total_dentries),
+		   percpu_counter_sum(&m->d_lease_mis),
+		   percpu_counter_sum(&m->d_lease_hit));
+
+	mutex_lock(&mdsc->mutex);
+	for (i = 0; i < mdsc->max_sessions; i++) {
+		struct ceph_mds_session *s;
+
+		s = __ceph_lookup_mds_session(mdsc, i);
+		if (!s)
+			continue;
+		nr_caps += s->s_nr_caps;
+		ceph_put_mds_session(s);
+	}
+	mutex_unlock(&mdsc->mutex);
+	seq_printf(s, "%-14s%-16d%-16lld%lld\n", "caps", nr_caps,
+		   percpu_counter_sum(&m->i_caps_mis),
+		   percpu_counter_sum(&m->i_caps_hit));
+
+	return 0;
+}
+
 static int caps_show_cb(struct inode *inode, struct ceph_cap *cap, void *p)
 {
 	struct seq_file *s = p;
@@ -222,6 +306,7 @@ DEFINE_SHOW_ATTRIBUTE(mdsmap);
 DEFINE_SHOW_ATTRIBUTE(mdsc);
 DEFINE_SHOW_ATTRIBUTE(caps);
 DEFINE_SHOW_ATTRIBUTE(mds_sessions);
+DEFINE_SHOW_ATTRIBUTE(metric);

 /*
@@ -255,6 +340,7 @@ void ceph_fs_debugfs_cleanup(struct ceph_fs_client *fsc)
 	debugfs_remove(fsc->debugfs_mdsmap);
 	debugfs_remove(fsc->debugfs_mds_sessions);
 	debugfs_remove(fsc->debugfs_caps);
+	debugfs_remove(fsc->debugfs_metric);
 	debugfs_remove(fsc->debugfs_mdsc);
 }

@@ -295,11 +381,17 @@ void ceph_fs_debugfs_init(struct ceph_fs_client *fsc)
 						fsc,
 						&mdsc_fops);

+	fsc->debugfs_metric = debugfs_create_file("metrics",
+						  0400,
+						  fsc->client->debugfs_dir,
+						  fsc,
+						  &metric_fops);
+
 	fsc->debugfs_caps = debugfs_create_file("caps",
 						0400,
 						fsc->client->debugfs_dir,
 						fsc,
 						&caps_fops);
 }
...
fs/ceph/dir.c

@@ -38,6 +38,8 @@ static int __dir_lease_try_check(const struct dentry *dentry);
 static int ceph_d_init(struct dentry *dentry)
 {
 	struct ceph_dentry_info *di;
+	struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
+	struct ceph_mds_client *mdsc = fsc->mdsc;

 	di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL);
 	if (!di)
@@ -48,6 +50,9 @@ static int ceph_d_init(struct dentry *dentry)
 	di->time = jiffies;
 	dentry->d_fsdata = di;
 	INIT_LIST_HEAD(&di->lease_list);
+
+	atomic64_inc(&mdsc->metric.total_dentries);
+
 	return 0;
 }

@@ -344,8 +349,9 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
 	    !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
 	    ceph_snap(inode) != CEPH_SNAPDIR &&
 	    __ceph_dir_is_complete_ordered(ci) &&
-	    __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
+	    __ceph_caps_issued_mask_metric(ci, CEPH_CAP_FILE_SHARED, 1)) {
 		int shared_gen = atomic_read(&ci->i_shared_gen);
+
 		spin_unlock(&ci->i_ceph_lock);
 		err = __dcache_readdir(file, ctx, shared_gen);
 		if (err != -EAGAIN)
@@ -762,7 +768,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
 	    !is_root_ceph_dentry(dir, dentry) &&
 	    ceph_test_mount_opt(fsc, DCACHE) &&
 	    __ceph_dir_is_complete(ci) &&
-	    (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) {
+	    __ceph_caps_issued_mask_metric(ci, CEPH_CAP_FILE_SHARED, 1)) {
 		__ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD);
 		spin_unlock(&ci->i_ceph_lock);
 		dout(" dir %p complete, -ENOENT\n", dir);
@@ -1203,11 +1209,12 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
 			op = CEPH_MDS_OP_RENAMESNAP;
 		else
 			return -EROFS;
+	} else if (old_dir != new_dir) {
+		err = ceph_quota_check_rename(mdsc, d_inode(old_dentry),
+					      new_dir);
+		if (err)
+			return err;
 	}
-	/* don't allow cross-quota renames */
-	if ((old_dir != new_dir) &&
-	    (!ceph_quota_is_same_realm(old_dir, new_dir)))
-		return -EXDEV;

 	dout("rename dir %p dentry %p to dir %p dentry %p\n",
 	     old_dir, old_dentry, new_dir, new_dentry);
@@ -1709,6 +1716,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
 		if (flags & LOOKUP_RCU)
 			return -ECHILD;

+		percpu_counter_inc(&mdsc->metric.d_lease_mis);
+
 		op = ceph_snap(dir) == CEPH_SNAPDIR ?
 			CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
 		req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
@@ -1740,6 +1749,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags)
 			dout("d_revalidate %p lookup result=%d\n",
 			     dentry, err);
 		}
+	} else {
+		percpu_counter_inc(&mdsc->metric.d_lease_hit);
 	}

 	dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid");
@@ -1782,9 +1793,12 @@ static int ceph_d_delete(const struct dentry *dentry)
 static void ceph_d_release(struct dentry *dentry)
 {
 	struct ceph_dentry_info *di = ceph_dentry(dentry);
+	struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);

 	dout("d_release %p\n", dentry);

+	atomic64_dec(&fsc->mdsc->metric.total_dentries);
+
 	spin_lock(&dentry->d_lock);
 	__dentry_lease_unlist(di);
 	dentry->d_fsdata = NULL;
...
fs/ceph/export.c

@@ -172,9 +172,16 @@ struct inode *ceph_lookup_inode(struct super_block *sb, u64 ino)
 static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
 {
 	struct inode *inode = __lookup_inode(sb, ino);
+	int err;
+
 	if (IS_ERR(inode))
 		return ERR_CAST(inode);
-	if (inode->i_nlink == 0) {
+	/* We need LINK caps to reliably check i_nlink */
+	err = ceph_do_getattr(inode, CEPH_CAP_LINK_SHARED, false);
+	if (err)
+		return ERR_PTR(err);
+	/* -ESTALE if inode has been unlinked and no file is open */
+	if ((inode->i_nlink == 0) && (atomic_read(&inode->i_count) == 1)) {
 		iput(inode);
 		return ERR_PTR(-ESTALE);
 	}
...
fs/ceph/file.c

@@ -11,11 +11,13 @@
 #include <linux/writeback.h>
 #include <linux/falloc.h>
 #include <linux/iversion.h>
+#include <linux/ktime.h>

 #include "super.h"
 #include "mds_client.h"
 #include "cache.h"
 #include "io.h"
+#include "metric.h"

 static __le32 ceph_flags_sys2wire(u32 flags)
 {
@@ -906,6 +908,12 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
 		ret = ceph_osdc_start_request(osdc, req, false);
 		if (!ret)
 			ret = ceph_osdc_wait_request(osdc, req);
+
+		ceph_update_read_latency(&fsc->mdsc->metric,
+					 req->r_start_latency,
+					 req->r_end_latency,
+					 ret);
+
 		ceph_osdc_put_request(req);

 		i_size = i_size_read(inode);
@@ -1044,6 +1052,8 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
 	struct inode *inode = req->r_inode;
 	struct ceph_aio_request *aio_req = req->r_priv;
 	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_client_metric *metric = &fsc->mdsc->metric;

 	BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_BVECS);
 	BUG_ON(!osd_data->num_bvecs);
@@ -1051,6 +1061,16 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
 	dout("ceph_aio_complete_req %p rc %d bytes %u\n",
 	     inode, rc, osd_data->bvec_pos.iter.bi_size);

+	/* r_start_latency == 0 means the request was not submitted */
+	if (req->r_start_latency) {
+		if (aio_req->write)
+			ceph_update_write_latency(metric, req->r_start_latency,
+						  req->r_end_latency, rc);
+		else
+			ceph_update_read_latency(metric, req->r_start_latency,
+						 req->r_end_latency, rc);
+	}
+
 	if (rc == -EOLDSNAPC) {
 		struct ceph_aio_work *aio_work;
 		BUG_ON(!aio_req->write);
@@ -1179,6 +1199,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 	struct inode *inode = file_inode(file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_client_metric *metric = &fsc->mdsc->metric;
 	struct ceph_vino vino;
 	struct ceph_osd_request *req;
 	struct bio_vec *bvecs;
@@ -1295,6 +1316,13 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
 		if (!ret)
 			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);

+		if (write)
+			ceph_update_write_latency(metric, req->r_start_latency,
+						  req->r_end_latency, ret);
+		else
+			ceph_update_read_latency(metric, req->r_start_latency,
+						 req->r_end_latency, ret);
+
 		size = i_size_read(inode);
 		if (!write) {
 			if (ret == -ENOENT)
@@ -1466,6 +1494,8 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
 		if (!ret)
 			ret = ceph_osdc_wait_request(&fsc->client->osdc, req);

+		ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
+					  req->r_end_latency, ret);
+
 out:
 		ceph_osdc_put_request(req);
 		if (ret != 0) {
...
fs/ceph/inode.c

@@ -2288,8 +2288,8 @@ int __ceph_do_getattr(struct inode *inode, struct page *locked_page,
 	dout("do_getattr inode %p mask %s mode 0%o\n",
 	     inode, ceph_cap_string(mask), inode->i_mode);
-	if (!force && ceph_caps_issued_mask(ceph_inode(inode), mask, 1))
+	if (!force && ceph_caps_issued_mask_metric(ceph_inode(inode), mask, 1))
 		return 0;

 	mode = (mask & CEPH_STAT_RSTAT) ? USE_AUTH_MDS : USE_ANY_MDS;
 	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, mode);
...
fs/ceph/mds_client.c

@@ -10,6 +10,7 @@
 #include <linux/seq_file.h>
 #include <linux/ratelimit.h>
 #include <linux/bits.h>
+#include <linux/ktime.h>

 #include "super.h"
 #include "mds_client.h"
@@ -658,6 +659,7 @@ void ceph_put_mds_session(struct ceph_mds_session *s)
 	if (refcount_dec_and_test(&s->s_ref)) {
 		if (s->s_auth.authorizer)
 			ceph_auth_destroy_authorizer(s->s_auth.authorizer);
+		WARN_ON(mutex_is_locked(&s->s_mutex));
 		xa_destroy(&s->s_delegated_inos);
 		kfree(s);
 	}
@@ -753,6 +755,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
 	INIT_LIST_HEAD(&s->s_cap_releases);
 	INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

+	INIT_LIST_HEAD(&s->s_cap_dirty);
 	INIT_LIST_HEAD(&s->s_cap_flushing);

 	mdsc->sessions[mds] = s;
@@ -801,7 +804,7 @@ void ceph_mdsc_release_request(struct kref *kref)
 	struct ceph_mds_request *req = container_of(kref,
 						    struct ceph_mds_request,
 						    r_kref);
-	ceph_mdsc_release_dir_caps(req);
+	ceph_mdsc_release_dir_caps_no_check(req);
 	destroy_reply_info(&req->r_reply_info);
 	if (req->r_request)
 		ceph_msg_put(req->r_request);
@@ -2201,6 +2204,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
 	mutex_init(&req->r_fill_mutex);
 	req->r_mdsc = mdsc;
 	req->r_started = jiffies;
+	req->r_start_latency = ktime_get();
 	req->r_resend_mds = -1;
 	INIT_LIST_HEAD(&req->r_unsafe_dir_item);
 	INIT_LIST_HEAD(&req->r_unsafe_target_item);
@@ -2547,6 +2551,8 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
 static void complete_request(struct ceph_mds_client *mdsc,
 			     struct ceph_mds_request *req)
 {
+	req->r_end_latency = ktime_get();
+
 	if (req->r_callback)
 		req->r_callback(mdsc, req);
 	complete_all(&req->r_completion);
@@ -3155,6 +3161,9 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)

 	/* kick calling process */
 	complete_request(mdsc, req);
+
+	ceph_update_metadata_latency(&mdsc->metric, req->r_start_latency,
+				     req->r_end_latency, err);
 out:
 	ceph_mdsc_put_request(req);
 	return;
@@ -3393,6 +3402,18 @@ void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req)
 	}
 }

+void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req)
+{
+	int dcaps;
+
+	dcaps = xchg(&req->r_dir_caps, 0);
+	if (dcaps) {
+		dout("releasing r_dir_caps=%s\n", ceph_cap_string(dcaps));
+		ceph_put_cap_refs_no_check_caps(ceph_inode(req->r_parent),
+						dcaps);
+	}
+}
+
 /*
  * called under session->mutex.
  */
@@ -3425,7 +3446,7 @@ static void replay_unsafe_requests(struct ceph_mds_client *mdsc,
 		if (req->r_session->s_mds != session->s_mds)
 			continue;

-		ceph_mdsc_release_dir_caps(req);
+		ceph_mdsc_release_dir_caps_no_check(req);
 		__send_request(mdsc, session, req, true);
 	}

@@ -3760,8 +3781,6 @@ static int encode_snap_realms(struct ceph_mds_client *mdsc,
  * recovering MDS might have.
  *
  * This is a relatively heavyweight operation, but it's rare.
- *
- * called with mdsc->mutex held.
  */
 static void send_mds_reconnect(struct ceph_mds_client *mdsc,
 			       struct ceph_mds_session *session)
@@ -4015,7 +4034,11 @@ static void check_new_map(struct ceph_mds_client *mdsc,
 			    oldstate != CEPH_MDS_STATE_STARTING)
 				pr_info("mds%d recovery completed\n", s->s_mds);
 			kick_requests(mdsc, i);
+			mutex_unlock(&mdsc->mutex);
+			mutex_lock(&s->s_mutex);
+			mutex_lock(&mdsc->mutex);
 			ceph_kick_flushing_caps(mdsc, s);
+			mutex_unlock(&s->s_mutex);
 			wake_up_session_caps(s, RECONNECT);
 		}
 	}
@@ -4323,6 +4346,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 {
 	struct ceph_mds_client *mdsc;
+	int err;

 	mdsc = kzalloc(sizeof(struct ceph_mds_client), GFP_NOFS);
 	if (!mdsc)
@@ -4331,8 +4355,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 	mutex_init(&mdsc->mutex);
 	mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
 	if (!mdsc->mdsmap) {
-		kfree(mdsc);
-		return -ENOMEM;
+		err = -ENOMEM;
+		goto err_mdsc;
 	}

 	fsc->mdsc = mdsc;
@@ -4364,13 +4388,15 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 	spin_lock_init(&mdsc->snap_flush_lock);
 	mdsc->last_cap_flush_tid = 1;
 	INIT_LIST_HEAD(&mdsc->cap_flush_list);
-	INIT_LIST_HEAD(&mdsc->cap_dirty);
 	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
 	mdsc->num_cap_flushing = 0;
 	spin_lock_init(&mdsc->cap_dirty_lock);
 	init_waitqueue_head(&mdsc->cap_flushing_wq);
 	INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
 	atomic_set(&mdsc->cap_reclaim_pending, 0);
+	err = ceph_metric_init(&mdsc->metric);
+	if (err)
+		goto err_mdsmap;

 	spin_lock_init(&mdsc->dentry_list_lock);
 	INIT_LIST_HEAD(&mdsc->dentry_leases);
@@ -4389,6 +4415,12 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 	strscpy(mdsc->nodename, utsname()->nodename,
 		sizeof(mdsc->nodename));
 	return 0;
+
+err_mdsmap:
+	kfree(mdsc->mdsmap);
+err_mdsc:
+	kfree(mdsc);
+	return err;
 }

 /*
@@ -4646,6 +4678,8 @@ void ceph_mdsc_destroy(struct ceph_fs_client *fsc)

 	ceph_mdsc_stop(mdsc);

+	ceph_metric_destroy(&mdsc->metric);
+
 	fsc->mdsc = NULL;
 	kfree(mdsc);
 	dout("mdsc_destroy %p done\n", mdsc);
...
fs/ceph/mds_client.h

@@ -10,12 +10,15 @@
 #include <linux/spinlock.h>
 #include <linux/refcount.h>
 #include <linux/utsname.h>
+#include <linux/ktime.h>

 #include <linux/ceph/types.h>
 #include <linux/ceph/messenger.h>
 #include <linux/ceph/mdsmap.h>
 #include <linux/ceph/auth.h>

+#include "metric.h"
+
 /* The first 8 bits are reserved for old ceph releases */
 enum ceph_feature_type {
 	CEPHFS_FEATURE_MIMIC = 8,
@@ -196,8 +199,12 @@ struct ceph_mds_session {
 	struct list_head  s_cap_releases; /* waiting cap_release messages */
 	struct work_struct s_cap_release_work;

-	/* protected by mutex */
+	/* See ceph_inode_info->i_dirty_item. */
+	struct list_head  s_cap_dirty;	      /* inodes w/ dirty caps */
+
+	/* See ceph_inode_info->i_flushing_item. */
 	struct list_head  s_cap_flushing;     /* inodes w/ flushing caps */
+
 	unsigned long     s_renew_requested; /* last time we sent a renew req */
 	u64               s_renew_seq;

@@ -297,6 +304,8 @@ struct ceph_mds_request {
 	unsigned long r_timeout;  /* optional. jiffies, 0 is "wait forever" */
 	unsigned long r_started;  /* start time to measure timeout against */
+	unsigned long r_start_latency;  /* start time to measure latency */
+	unsigned long r_end_latency;    /* finish time to measure latency */
 	unsigned long r_request_started; /* start time for mds request only,
 					    used to measure lease durations */

@@ -419,7 +428,6 @@ struct ceph_mds_client {

 	u64               last_cap_flush_tid;
 	struct list_head  cap_flush_list;
-	struct list_head  cap_dirty;        /* inodes with dirty caps */
 	struct list_head  cap_dirty_migrating; /* ...that are migration... */
 	int               num_cap_flushing; /* # caps we are flushing */
 	spinlock_t        cap_dirty_lock;   /* protects above items */
@@ -454,6 +462,8 @@ struct ceph_mds_client {
 	struct list_head  dentry_leases;     /* fifo list */
 	struct list_head  dentry_dir_leases; /* lru list */

+	struct ceph_client_metric metric;
+
 	spinlock_t	  snapid_map_lock;
 	struct rb_root	  snapid_map_tree;
 	struct list_head  snapid_map_lru;
@@ -497,6 +507,7 @@ extern int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
 				struct inode *dir,
 				struct ceph_mds_request *req);
 extern void ceph_mdsc_release_dir_caps(struct ceph_mds_request *req);
+extern void ceph_mdsc_release_dir_caps_no_check(struct ceph_mds_request *req);
 static inline void ceph_mdsc_get_request(struct ceph_mds_request *req)
 {
 	kref_get(&req->r_kref);
...
fs/ceph/metric.c (new file)

/* SPDX-License-Identifier: GPL-2.0 */
#include <linux/types.h>
#include <linux/percpu_counter.h>
#include <linux/math64.h>
#include "metric.h"
int ceph_metric_init(struct ceph_client_metric *m)
{
int ret;
if (!m)
return -EINVAL;
atomic64_set(&m->total_dentries, 0);
ret = percpu_counter_init(&m->d_lease_hit, 0, GFP_KERNEL);
if (ret)
return ret;
ret = percpu_counter_init(&m->d_lease_mis, 0, GFP_KERNEL);
if (ret)
goto err_d_lease_mis;
ret = percpu_counter_init(&m->i_caps_hit, 0, GFP_KERNEL);
if (ret)
goto err_i_caps_hit;
ret = percpu_counter_init(&m->i_caps_mis, 0, GFP_KERNEL);
if (ret)
goto err_i_caps_mis;
spin_lock_init(&m->read_latency_lock);
m->read_latency_sq_sum = 0;
m->read_latency_min = KTIME_MAX;
m->read_latency_max = 0;
m->total_reads = 0;
m->read_latency_sum = 0;
spin_lock_init(&m->write_latency_lock);
m->write_latency_sq_sum = 0;
m->write_latency_min = KTIME_MAX;
m->write_latency_max = 0;
m->total_writes = 0;
m->write_latency_sum = 0;
spin_lock_init(&m->metadata_latency_lock);
m->metadata_latency_sq_sum = 0;
m->metadata_latency_min = KTIME_MAX;
m->metadata_latency_max = 0;
m->total_metadatas = 0;
m->metadata_latency_sum = 0;
return 0;
err_i_caps_mis:
percpu_counter_destroy(&m->i_caps_hit);
err_i_caps_hit:
percpu_counter_destroy(&m->d_lease_mis);
err_d_lease_mis:
percpu_counter_destroy(&m->d_lease_hit);
return ret;
}
void ceph_metric_destroy(struct ceph_client_metric *m)
{
if (!m)
return;
percpu_counter_destroy(&m->i_caps_mis);
percpu_counter_destroy(&m->i_caps_hit);
percpu_counter_destroy(&m->d_lease_mis);
percpu_counter_destroy(&m->d_lease_hit);
}
static inline void __update_latency(ktime_t *totalp, ktime_t *lsump,
ktime_t *min, ktime_t *max,
ktime_t *sq_sump, ktime_t lat)
{
ktime_t total, avg, sq, lsum;
total = ++(*totalp);
lsum = (*lsump += lat);
if (unlikely(lat < *min))
*min = lat;
if (unlikely(lat > *max))
*max = lat;
if (unlikely(total == 1))
return;
/* the sq is (lat - old_avg) * (lat - new_avg) */
avg = DIV64_U64_ROUND_CLOSEST((lsum - lat), (total - 1));
sq = lat - avg;
avg = DIV64_U64_ROUND_CLOSEST(lsum, total);
sq = sq * (lat - avg);
*sq_sump += sq;
}
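
For reference, the update above is Welford's online algorithm: with $x_n$ the new latency sample, $\mu_n$ the mean of the first $n$ samples, and $S_n$ the accumulated *sq_sump,

	S_n = S_{n-1} + (x_n - \mu_{n-1})(x_n - \mu_n), \qquad
	\mathrm{stdev} = \sqrt{S_n / (n - 1)}

which is exactly the sample standard deviation that CEPH_METRIC_SHOW in debugfs.c later reports, after conversion to microseconds.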
void ceph_update_read_latency(struct ceph_client_metric *m,
ktime_t r_start, ktime_t r_end,
int rc)
{
ktime_t lat = ktime_sub(r_end, r_start);
if (unlikely(rc < 0 && rc != -ENOENT && rc != -ETIMEDOUT))
return;
spin_lock(&m->read_latency_lock);
__update_latency(&m->total_reads, &m->read_latency_sum,
&m->read_latency_min, &m->read_latency_max,
&m->read_latency_sq_sum, lat);
spin_unlock(&m->read_latency_lock);
}
void ceph_update_write_latency(struct ceph_client_metric *m,
ktime_t r_start, ktime_t r_end,
int rc)
{
ktime_t lat = ktime_sub(r_end, r_start);
if (unlikely(rc && rc != -ETIMEDOUT))
return;
spin_lock(&m->write_latency_lock);
__update_latency(&m->total_writes, &m->write_latency_sum,
&m->write_latency_min, &m->write_latency_max,
&m->write_latency_sq_sum, lat);
spin_unlock(&m->write_latency_lock);
}
void ceph_update_metadata_latency(struct ceph_client_metric *m,
ktime_t r_start, ktime_t r_end,
int rc)
{
ktime_t lat = ktime_sub(r_end, r_start);
if (unlikely(rc && rc != -ENOENT))
return;
spin_lock(&m->metadata_latency_lock);
__update_latency(&m->total_metadatas, &m->metadata_latency_sum,
&m->metadata_latency_min, &m->metadata_latency_max,
&m->metadata_latency_sq_sum, lat);
spin_unlock(&m->metadata_latency_lock);
}
fs/ceph/metric.h (new file)

/* SPDX-License-Identifier: GPL-2.0 */
#ifndef _FS_CEPH_MDS_METRIC_H
#define _FS_CEPH_MDS_METRIC_H
#include <linux/types.h>
#include <linux/percpu_counter.h>
#include <linux/ktime.h>
/* This is the global metrics */
struct ceph_client_metric {
atomic64_t total_dentries;
struct percpu_counter d_lease_hit;
struct percpu_counter d_lease_mis;
struct percpu_counter i_caps_hit;
struct percpu_counter i_caps_mis;
spinlock_t read_latency_lock;
u64 total_reads;
ktime_t read_latency_sum;
ktime_t read_latency_sq_sum;
ktime_t read_latency_min;
ktime_t read_latency_max;
spinlock_t write_latency_lock;
u64 total_writes;
ktime_t write_latency_sum;
ktime_t write_latency_sq_sum;
ktime_t write_latency_min;
ktime_t write_latency_max;
spinlock_t metadata_latency_lock;
u64 total_metadatas;
ktime_t metadata_latency_sum;
ktime_t metadata_latency_sq_sum;
ktime_t metadata_latency_min;
ktime_t metadata_latency_max;
};
extern int ceph_metric_init(struct ceph_client_metric *m);
extern void ceph_metric_destroy(struct ceph_client_metric *m);
static inline void ceph_update_cap_hit(struct ceph_client_metric *m)
{
percpu_counter_inc(&m->i_caps_hit);
}
static inline void ceph_update_cap_mis(struct ceph_client_metric *m)
{
percpu_counter_inc(&m->i_caps_mis);
}
extern void ceph_update_read_latency(struct ceph_client_metric *m,
ktime_t r_start, ktime_t r_end,
int rc);
extern void ceph_update_write_latency(struct ceph_client_metric *m,
ktime_t r_start, ktime_t r_end,
int rc);
extern void ceph_update_metadata_latency(struct ceph_client_metric *m,
ktime_t r_start, ktime_t r_end,
int rc);
#endif /* _FS_CEPH_MDS_METRIC_H */
fs/ceph/quota.c

@@ -264,7 +264,7 @@ static struct ceph_snap_realm *get_quota_realm(struct ceph_mds_client *mdsc,
 	return NULL;
 }

-bool ceph_quota_is_same_realm(struct inode *old, struct inode *new)
+static bool ceph_quota_is_same_realm(struct inode *old, struct inode *new)
 {
 	struct ceph_mds_client *mdsc = ceph_inode_to_client(old)->mdsc;
 	struct ceph_snap_realm *old_realm, *new_realm;
@@ -361,8 +361,6 @@ static bool check_quota_exceeded(struct inode *inode, enum quota_check_op op,
 	spin_unlock(&ci->i_ceph_lock);
 	switch (op) {
 	case QUOTA_CHECK_MAX_FILES_OP:
-		exceeded = (max && (rvalue >= max));
-		break;
 	case QUOTA_CHECK_MAX_BYTES_OP:
 		exceeded = (max && (rvalue + delta > max));
 		break;
@@ -417,7 +415,7 @@ bool ceph_quota_is_max_files_exceeded(struct inode *inode)

 	WARN_ON(!S_ISDIR(inode->i_mode));

-	return check_quota_exceeded(inode, QUOTA_CHECK_MAX_FILES_OP, 0);
+	return check_quota_exceeded(inode, QUOTA_CHECK_MAX_FILES_OP, 1);
 }

 /*
@@ -518,3 +516,59 @@ bool ceph_quota_update_statfs(struct ceph_fs_client *fsc, struct kstatfs *buf)

 	return is_updated;
 }

+/*
+ * ceph_quota_check_rename - check if a rename can be executed
+ * @mdsc: MDS client instance
+ * @old: inode to be copied
+ * @new: destination inode (directory)
+ *
+ * This function verifies if a rename (e.g. moving a file or directory) can be
+ * executed. It forces an rstat update in the @new target directory (and in the
+ * source @old as well, if it's a directory). The actual check is done both for
+ * max_files and max_bytes.
+ *
+ * This function returns 0 if it's OK to do the rename, or, if quotas are
+ * exceeded, -EXDEV (if @old is a directory) or -EDQUOT.
+ */
+int ceph_quota_check_rename(struct ceph_mds_client *mdsc,
+			    struct inode *old, struct inode *new)
+{
+	struct ceph_inode_info *ci_old = ceph_inode(old);
+	int ret = 0;
+
+	if (ceph_quota_is_same_realm(old, new))
+		return 0;
+
+	/*
+	 * Get the latest rstat for target directory (and for source, if a
+	 * directory)
+	 */
+	ret = ceph_do_getattr(new, CEPH_STAT_RSTAT, false);
+	if (ret)
+		return ret;
+
+	if (S_ISDIR(old->i_mode)) {
+		ret = ceph_do_getattr(old, CEPH_STAT_RSTAT, false);
+		if (ret)
+			return ret;
+		ret = check_quota_exceeded(new, QUOTA_CHECK_MAX_BYTES_OP,
+					   ci_old->i_rbytes);
+		if (!ret)
+			ret = check_quota_exceeded(new,
+						   QUOTA_CHECK_MAX_FILES_OP,
+						   ci_old->i_rfiles +
+						   ci_old->i_rsubdirs);
+		if (ret)
+			ret = -EXDEV;
+	} else {
+		ret = check_quota_exceeded(new, QUOTA_CHECK_MAX_BYTES_OP,
+					   i_size_read(old));
+		if (!ret)
+			ret = check_quota_exceeded(new,
+						   QUOTA_CHECK_MAX_FILES_OP, 1);
+		if (ret)
+			ret = -EDQUOT;
+	}
+
+	return ret;
+}
fs/ceph/super.h

@@ -128,6 +128,7 @@ struct ceph_fs_client {
 	struct dentry *debugfs_congestion_kb;
 	struct dentry *debugfs_bdi;
 	struct dentry *debugfs_mdsc, *debugfs_mdsmap;
+	struct dentry *debugfs_metric;
 	struct dentry *debugfs_mds_sessions;
 #endif

@@ -350,7 +351,25 @@ struct ceph_inode_info {
 	struct rb_root i_caps;           /* cap list */
 	struct ceph_cap *i_auth_cap;     /* authoritative cap, if any */
 	unsigned i_dirty_caps, i_flushing_caps;     /* mask of dirtied fields */
-	struct list_head i_dirty_item, i_flushing_item;
+
+	/*
+	 * Link to the auth cap's session's s_cap_dirty list. s_cap_dirty
+	 * is protected by the mdsc->cap_dirty_lock, but each individual item
+	 * is also protected by the inode's i_ceph_lock. Walking s_cap_dirty
+	 * requires the mdsc->cap_dirty_lock. List presence for an item can
+	 * be tested under the i_ceph_lock. Changing anything requires both.
+	 */
+	struct list_head i_dirty_item;
+
+	/*
+	 * Link to session's s_cap_flushing list. Protected in a similar
+	 * fashion to i_dirty_item, but also by the s_mutex for changes. The
+	 * s_cap_flushing list can be walked while holding either the s_mutex
+	 * or mdsc->cap_dirty_lock. List presence can also be checked while
+	 * holding the i_ceph_lock for this inode.
+	 */
+	struct list_head i_flushing_item;
+
 	/* we need to track cap writeback on a per-cap-bit basis, to allow
 	 * overlapping, pipelined cap flushes to the mds. we can probably
 	 * reduce the tid to 8 bits if we're concerned about inode size. */
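
To make the locking rule in the new i_dirty_item comment concrete, here is an illustrative walk of a session's dirty-caps list (not from the patch; the loop body is a placeholder):

	/*
	 * Illustrative only: per the comment above, walking s_cap_dirty
	 * takes mdsc->cap_dirty_lock; testing or changing a single inode's
	 * list membership additionally involves that inode's i_ceph_lock.
	 */
	struct ceph_inode_info *ci;

	spin_lock(&mdsc->cap_dirty_lock);
	list_for_each_entry(ci, &session->s_cap_dirty, i_dirty_item) {
		/* inspect inodes with dirty caps on this session */
	}
	spin_unlock(&mdsc->cap_dirty_lock);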
@@ -644,6 +663,8 @@ static inline bool __ceph_is_any_real_caps(struct ceph_inode_info *ci)

 extern int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented);
 extern int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int t);
+extern int __ceph_caps_issued_mask_metric(struct ceph_inode_info *ci, int mask,
+					  int t);
 extern int __ceph_caps_issued_other(struct ceph_inode_info *ci,
 				    struct ceph_cap *cap);

@@ -656,12 +677,12 @@ static inline int ceph_caps_issued(struct ceph_inode_info *ci)
 	return issued;
 }

-static inline int ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask,
-					int touch)
+static inline int ceph_caps_issued_mask_metric(struct ceph_inode_info *ci,
+					       int mask, int touch)
 {
 	int r;

 	spin_lock(&ci->i_ceph_lock);
-	r = __ceph_caps_issued_mask(ci, mask, touch);
+	r = __ceph_caps_issued_mask_metric(ci, mask, touch);
 	spin_unlock(&ci->i_ceph_lock);
 	return r;
 }

@@ -1074,6 +1095,8 @@ extern void ceph_take_cap_refs(struct ceph_inode_info *ci, int caps,
 			       bool snap_rwsem_locked);
 extern void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps);
 extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
+extern void ceph_put_cap_refs_no_check_caps(struct ceph_inode_info *ci,
+					    int had);
 extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
 				       struct ceph_snap_context *snapc);
 extern void ceph_flush_snaps(struct ceph_inode_info *ci,
@@ -1189,13 +1212,14 @@ extern void ceph_handle_quota(struct ceph_mds_client *mdsc,
 			      struct ceph_mds_session *session,
 			      struct ceph_msg *msg);
 extern bool ceph_quota_is_max_files_exceeded(struct inode *inode);
-extern bool ceph_quota_is_same_realm(struct inode *old, struct inode *new);
 extern bool ceph_quota_is_max_bytes_exceeded(struct inode *inode,
 					     loff_t newlen);
 extern bool ceph_quota_is_max_bytes_approaching(struct inode *inode,
 						loff_t newlen);
 extern bool ceph_quota_update_statfs(struct ceph_fs_client *fsc,
 				     struct kstatfs *buf);
+extern int ceph_quota_check_rename(struct ceph_mds_client *mdsc,
+				   struct inode *old, struct inode *new);
 extern void ceph_cleanup_quotarealms_inodes(struct ceph_mds_client *mdsc);

 #endif /* _FS_CEPH_SUPER_H */
fs/ceph/xattr.c

@@ -856,7 +856,7 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,

 	if (ci->i_xattrs.version == 0 ||
 	    !((req_mask & CEPH_CAP_XATTR_SHARED) ||
-	      __ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1))) {
+	      __ceph_caps_issued_mask_metric(ci, CEPH_CAP_XATTR_SHARED, 1))) {
 		spin_unlock(&ci->i_ceph_lock);

 		/* security module gets xattr while filling trace */
@@ -914,7 +914,7 @@ ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
 	     ci->i_xattrs.version, ci->i_xattrs.index_version);

 	if (ci->i_xattrs.version == 0 ||
-	    !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) {
+	    !__ceph_caps_issued_mask_metric(ci, CEPH_CAP_XATTR_SHARED, 1)) {
 		spin_unlock(&ci->i_ceph_lock);

 		err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true);
 		if (err)
...
@@ -53,6 +53,8 @@ struct ceph_options {
 	unsigned long osd_keepalive_timeout;	/* jiffies */
 	unsigned long osd_request_timeout;	/* jiffies */
+	u32 osd_req_flags;  /* CEPH_OSD_FLAG_*, applied to each OSD request */

 	/*
 	 * any type that can't be simply compared or doesn't need
 	 * to be compared should go beyond this point,
@@ -64,6 +66,7 @@ struct ceph_options {
 	int num_mon;
 	char *name;
 	struct ceph_crypto_key *key;
+	struct rb_root crush_locs;
 };

 /*
@@ -188,7 +191,7 @@ static inline int calc_pages_for(u64 off, u64 len)
 #define RB_CMP3WAY(a, b) ((a) < (b) ? -1 : (a) > (b))

 #define DEFINE_RB_INSDEL_FUNCS2(name, type, keyfld, cmpexp, keyexp, nodefld) \
-static void insert_##name(struct rb_root *root, type *t)	\
+static bool __insert_##name(struct rb_root *root, type *t)	\
 {								\
 	struct rb_node **n = &root->rb_node;			\
 	struct rb_node *parent = NULL;				\
@@ -206,11 +209,17 @@ static void insert_##name(struct rb_root *root, type *t)	\
 		else if (cmp > 0)				\
 			n = &(*n)->rb_right;			\
 		else						\
-			BUG();					\
+			return false;				\
 	}							\
 								\
 	rb_link_node(&t->nodefld, parent, n);			\
 	rb_insert_color(&t->nodefld, root);			\
+	return true;						\
+}								\
+static void __maybe_unused insert_##name(struct rb_root *root, type *t) \
+{								\
+	if (!__insert_##name(root, t))				\
+		BUG();						\
 }								\
 static void erase_##name(struct rb_root *root, type *t)	\
 {								\
...
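The double-underscore variant lets callers detect duplicate keys instead of crashing the kernel. A minimal caller-side sketch, with a hypothetical node type and tree name (the generated __insert_foo() is what the macro produces; foo_node is illustrative only):

	/* Sketch: foo_node and __insert_foo() are hypothetical names. */
	static int add_foo(struct rb_root *root, struct foo_node *fn)
	{
		if (!__insert_foo(root, fn))
			return -EEXIST;	/* duplicate key, no BUG() */
		return 0;
	}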
@@ -19,7 +19,7 @@ struct ceph_monmap {
 	struct ceph_fsid fsid;
 	u32 epoch;
 	u32 num_mon;
-	struct ceph_entity_inst mon_inst[0];
+	struct ceph_entity_inst mon_inst[];
 };

 struct ceph_mon_client;
...
@@ -8,6 +8,7 @@
 #include <linux/mempool.h>
 #include <linux/rbtree.h>
 #include <linux/refcount.h>
+#include <linux/ktime.h>

 #include <linux/ceph/types.h>
 #include <linux/ceph/osdmap.h>
@@ -135,6 +136,7 @@ struct ceph_osd_req_op {
 		struct {
 			u64 expected_object_size;
 			u64 expected_write_size;
+			u32 flags;  /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
 		} alloc_hint;
 		struct {
 			u64 snapid;
@@ -164,6 +166,7 @@ struct ceph_osd_request_target {
 	bool recovery_deletes;

 	unsigned int flags;	/* CEPH_OSD_FLAG_* */
+	bool used_replica;
 	bool paused;

 	u32 epoch;
@@ -213,6 +216,8 @@ struct ceph_osd_request {
 	/* internal */
 	unsigned long r_stamp;		/* jiffies, send or check time */
 	unsigned long r_start_stamp;	/* jiffies */
+	ktime_t r_start_latency;	/* ktime_t */
+	ktime_t r_end_latency;		/* ktime_t */
 	int r_attempts;
 	u32 r_map_dne_bound;
@@ -468,7 +473,8 @@ extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int
 extern void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
 				       unsigned int which,
 				       u64 expected_object_size,
-				       u64 expected_write_size);
+				       u64 expected_write_size,
+				       u32 flags);

 extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 							struct ceph_snap_context *snapc,
...
@@ -302,9 +302,26 @@ bool ceph_pg_to_primary_shard(struct ceph_osdmap *osdmap,
 int ceph_pg_to_acting_primary(struct ceph_osdmap *osdmap,
 			      const struct ceph_pg *raw_pgid);

+struct crush_loc {
+	char *cl_type_name;
+	char *cl_name;
+};
+
+struct crush_loc_node {
+	struct rb_node cl_node;
+	struct crush_loc cl_loc;  /* pointers into cl_data */
+	char cl_data[];
+};
+
+int ceph_parse_crush_location(char *crush_location, struct rb_root *locs);
+int ceph_compare_crush_locs(struct rb_root *locs1, struct rb_root *locs2);
+void ceph_clear_crush_locs(struct rb_root *locs);
+
+int ceph_get_crush_locality(struct ceph_osdmap *osdmap, int id,
+			    struct rb_root *locs);
+
 extern struct ceph_pg_pool_info *ceph_pg_pool_by_id(struct ceph_osdmap *map,
 						    u64 id);
 extern const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id);
 extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name);
 u64 ceph_pg_pool_flags(struct ceph_osdmap *map, u64 id);
...
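As the "pointers into cl_data" comment suggests, each parsed type:name pair is presumably kept in a single allocation, with both strings packed into the trailing flexible array. A hedged sketch of that layout for a "host:foo" entry (allocation details and string copies are assumptions, not shown in the patch):

	/* Sketch only: construction details assumed; strcpy calls omitted. */
	node = kzalloc(struct_size(node, cl_data,
				   strlen("host") + strlen("foo") + 2),
		       GFP_KERNEL);
	node->cl_loc.cl_type_name = node->cl_data;		/* -> "host" */
	node->cl_loc.cl_name = node->cl_data + sizeof("host");	/* -> "foo" */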
@@ -464,6 +464,19 @@ enum {

 const char *ceph_osd_watch_op_name(int o);

+enum {
+	CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE = 1,
+	CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_WRITE = 2,
+	CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_READ = 4,
+	CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_READ = 8,
+	CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY = 16,
+	CEPH_OSD_ALLOC_HINT_FLAG_IMMUTABLE = 32,
+	CEPH_OSD_ALLOC_HINT_FLAG_SHORTLIVED = 64,
+	CEPH_OSD_ALLOC_HINT_FLAG_LONGLIVED = 128,
+	CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE = 256,
+	CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE = 512,
+};
+
 enum {
 	CEPH_OSD_BACKOFF_OP_BLOCK = 1,
 	CEPH_OSD_BACKOFF_OP_ACK_BLOCK = 2,
@@ -517,6 +530,7 @@ struct ceph_osd_op {
 		struct {
 			__le64 expected_object_size;
 			__le64 expected_write_size;
+			__le32 flags;  /* CEPH_OSD_OP_ALLOC_HINT_FLAG_* */
 		} __attribute__ ((packed)) alloc_hint;
 		struct {
 			__le64 snapid;
...
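To illustrate the extended interface, a hedged sketch of a caller propagating the compressible hint; "req" and "object_size" are assumed to be set up elsewhere, only the trailing flags argument is new:

	/* Sketch: request setup and surrounding rbd plumbing not shown. */
	osd_req_op_alloc_hint_init(req, 0,
				   object_size,	/* expected_object_size */
				   object_size,	/* expected_write_size */
				   CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE);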
@@ -87,7 +87,7 @@ struct crush_rule_mask {
 struct crush_rule {
 	__u32 len;
 	struct crush_rule_mask mask;
-	struct crush_rule_step steps[0];
+	struct crush_rule_step steps[];
 };

 #define crush_rule_size(len) (sizeof(struct crush_rule) + \
@@ -301,6 +301,12 @@ struct crush_map {
 	__u32 *choose_tries;
 #else
+	/* device/bucket type id -> type name (CrushWrapper::type_map) */
+	struct rb_root type_names;
+
+	/* device/bucket id -> name (CrushWrapper::name_map) */
+	struct rb_root names;
+
 	/* CrushWrapper::choose_args */
 	struct rb_root choose_args;
 #endif
@@ -342,4 +348,10 @@ struct crush_work {
 	struct crush_work_bucket **work; /* Per-bucket working store */
 };

+#ifdef __KERNEL__
+/* osdmap.c */
+void clear_crush_names(struct rb_root *root);
+void clear_choose_args(struct crush_map *c);
+#endif
+
 #endif
...
@@ -176,6 +176,10 @@ int ceph_compare_options(struct ceph_options *new_opt,
 		}
 	}

+	ret = ceph_compare_crush_locs(&opt1->crush_locs, &opt2->crush_locs);
+	if (ret)
+		return ret;
+
 	/* any matching mon ip implies a match */
 	for (i = 0; i < opt1->num_mon; i++) {
 		if (ceph_monmap_contains(client->monc.monmap,
@@ -259,6 +263,8 @@ enum {
 	Opt_secret,
 	Opt_key,
 	Opt_ip,
+	Opt_crush_location,
+	Opt_read_from_replica,
 	/* string args above */
 	Opt_share,
 	Opt_crc,
@@ -268,11 +274,25 @@ enum {
 	Opt_abort_on_full,
 };

+enum {
+	Opt_read_from_replica_no,
+	Opt_read_from_replica_balance,
+	Opt_read_from_replica_localize,
+};
+
+static const struct constant_table ceph_param_read_from_replica[] = {
+	{"no", Opt_read_from_replica_no},
+	{"balance", Opt_read_from_replica_balance},
+	{"localize", Opt_read_from_replica_localize},
+	{}
+};
+
 static const struct fs_parameter_spec ceph_parameters[] = {
 	fsparam_flag ("abort_on_full", Opt_abort_on_full),
 	fsparam_flag_no ("cephx_require_signatures", Opt_cephx_require_signatures),
 	fsparam_flag_no ("cephx_sign_messages", Opt_cephx_sign_messages),
 	fsparam_flag_no ("crc", Opt_crc),
+	fsparam_string ("crush_location", Opt_crush_location),
 	fsparam_string ("fsid", Opt_fsid),
 	fsparam_string ("ip", Opt_ip),
 	fsparam_string ("key", Opt_key),
@@ -283,6 +303,8 @@ static const struct fs_parameter_spec ceph_parameters[] = {
 	fsparam_u32 ("osdkeepalive", Opt_osdkeepalivetimeout),
 	__fsparam (fs_param_is_s32, "osdtimeout", Opt_osdtimeout,
 		   fs_param_deprecated, NULL),
+	fsparam_enum ("read_from_replica", Opt_read_from_replica,
+		      ceph_param_read_from_replica),
 	fsparam_string ("secret", Opt_secret),
 	fsparam_flag_no ("share", Opt_share),
 	fsparam_flag_no ("tcp_nodelay", Opt_tcp_nodelay),
@@ -297,6 +319,7 @@ struct ceph_options *ceph_alloc_options(void)
 	if (!opt)
 		return NULL;

+	opt->crush_locs = RB_ROOT;
 	opt->mon_addr = kcalloc(CEPH_MAX_MON, sizeof(*opt->mon_addr),
 				GFP_KERNEL);
 	if (!opt->mon_addr) {
@@ -319,6 +342,7 @@ void ceph_destroy_options(struct ceph_options *opt)
 	if (!opt)
 		return;

+	ceph_clear_crush_locs(&opt->crush_locs);
 	kfree(opt->name);
 	if (opt->key) {
 		ceph_crypto_key_destroy(opt->key);
@@ -453,6 +477,34 @@ int ceph_parse_param(struct fs_parameter *param, struct ceph_options *opt,
 		if (!opt->key)
 			return -ENOMEM;
 		return get_secret(opt->key, param->string, &log);
+	case Opt_crush_location:
+		ceph_clear_crush_locs(&opt->crush_locs);
+		err = ceph_parse_crush_location(param->string,
+						&opt->crush_locs);
+		if (err) {
+			error_plog(&log, "Failed to parse CRUSH location: %d",
+				   err);
+			return err;
+		}
+		break;
+	case Opt_read_from_replica:
+		switch (result.uint_32) {
+		case Opt_read_from_replica_no:
+			opt->osd_req_flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
+						CEPH_OSD_FLAG_LOCALIZE_READS);
+			break;
+		case Opt_read_from_replica_balance:
+			opt->osd_req_flags |= CEPH_OSD_FLAG_BALANCE_READS;
+			opt->osd_req_flags &= ~CEPH_OSD_FLAG_LOCALIZE_READS;
+			break;
+		case Opt_read_from_replica_localize:
+			opt->osd_req_flags |= CEPH_OSD_FLAG_LOCALIZE_READS;
+			opt->osd_req_flags &= ~CEPH_OSD_FLAG_BALANCE_READS;
+			break;
+		default:
+			BUG();
+		}
+		break;

 	case Opt_osdtimeout:
 		warn_plog(&log, "Ignoring osdtimeout");
@@ -535,6 +587,7 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client,
 {
 	struct ceph_options *opt = client->options;
 	size_t pos = m->count;
+	struct rb_node *n;

 	if (opt->name) {
 		seq_puts(m, "name=");
@@ -544,6 +597,28 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client,
 	if (opt->key)
 		seq_puts(m, "secret=<hidden>,");

+	if (!RB_EMPTY_ROOT(&opt->crush_locs)) {
+		seq_puts(m, "crush_location=");
+		for (n = rb_first(&opt->crush_locs); ; ) {
+			struct crush_loc_node *loc =
+			    rb_entry(n, struct crush_loc_node, cl_node);
+
+			seq_printf(m, "%s:%s", loc->cl_loc.cl_type_name,
+				   loc->cl_loc.cl_name);
+			n = rb_next(n);
+			if (!n)
+				break;
+
+			seq_putc(m, '|');
+		}
+		seq_putc(m, ',');
+	}
+	if (opt->osd_req_flags & CEPH_OSD_FLAG_BALANCE_READS) {
+		seq_puts(m, "read_from_replica=balance,");
+	} else if (opt->osd_req_flags & CEPH_OSD_FLAG_LOCALIZE_READS) {
+		seq_puts(m, "read_from_replica=localize,");
+	}
+
 	if (opt->flags & CEPH_OPT_FSID)
 		seq_printf(m, "fsid=%pU,", &opt->fsid);
 	if (opt->flags & CEPH_OPT_NOSHARE)
...
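Working backwards from the printing code above, the option value appears to be a '|'-separated list of bucket-type:name pairs. A hedged parse/teardown sketch (the option string itself is illustrative):

	/* Sketch: format inferred from ceph_print_client_options(). */
	struct rb_root locs = RB_ROOT;
	char str[] = "rack:fast|host:client1";	/* non-const: parser may modify */
	int err;

	err = ceph_parse_crush_location(str, &locs);
	if (err)
		pr_err("bad crush_location: %d\n", err);
	/* ... use locs, e.g. via ceph_get_crush_locality() ... */
	ceph_clear_crush_locs(&locs);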
@@ -2,7 +2,6 @@
 #ifdef __KERNEL__
 # include <linux/slab.h>
 # include <linux/crush/crush.h>
-void clear_choose_args(struct crush_map *c);
 #else
 # include "crush_compat.h"
 # include "crush.h"
@@ -130,6 +129,8 @@ void crush_destroy(struct crush_map *map)
 #ifndef __KERNEL__
 	kfree(map->choose_tries);
 #else
+	clear_crush_names(&map->type_names);
+	clear_crush_names(&map->names);
 	clear_choose_args(map);
 #endif
 	kfree(map);
...
@@ -81,11 +81,13 @@ static int osdmap_show(struct seq_file *s, void *p)
 		u32 state = map->osd_state[i];
 		char sb[64];

-		seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\n",
+		seq_printf(s, "osd%d\t%s\t%3d%%\t(%s)\t%3d%%\t%2d\n",
 			   i, ceph_pr_addr(addr),
 			   ((map->osd_weight[i]*100) >> 16),
 			   ceph_osdmap_state_str(sb, sizeof(sb), state),
-			   ((ceph_get_primary_affinity(map, i)*100) >> 16));
+			   ((ceph_get_primary_affinity(map, i)*100) >> 16),
+			   ceph_get_crush_locality(map, i,
+						   &client->options->crush_locs));
 	}

 	for (n = rb_first(&map->pg_temp); n; n = rb_next(n)) {
 		struct ceph_pg_mapping *pg =
...
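Given the format string above, each osdmap line in debugfs now ends with the client's locality for that OSD. An illustrative rendering (values made up; presumably -1 means the client's crush_location matched nothing for that OSD):

	/*
	 * osd0  192.168.1.10:6801  100%  (exists, up)  100%   1
	 * osd1  192.168.1.11:6801  100%  (exists, up)  100%  -1
	 */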
@@ -932,10 +932,14 @@ static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
 	op->watch.gen = 0;
 }

+/*
+ * @flags: CEPH_OSD_OP_ALLOC_HINT_FLAG_*
+ */
 void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
 				unsigned int which,
 				u64 expected_object_size,
-				u64 expected_write_size)
+				u64 expected_write_size,
+				u32 flags)
 {
 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
 						      CEPH_OSD_OP_SETALLOCHINT,
@@ -943,6 +947,7 @@ void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
 	op->alloc_hint.expected_object_size = expected_object_size;
 	op->alloc_hint.expected_write_size = expected_write_size;
+	op->alloc_hint.flags = flags;

 	/*
 	 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
@@ -1018,6 +1023,7 @@ static u32 osd_req_encode_op(struct ceph_osd_op *dst,
 		    cpu_to_le64(src->alloc_hint.expected_object_size);
 		dst->alloc_hint.expected_write_size =
 		    cpu_to_le64(src->alloc_hint.expected_write_size);
+		dst->alloc_hint.flags = cpu_to_le32(src->alloc_hint.flags);
 		break;
 	case CEPH_OSD_OP_SETXATTR:
 	case CEPH_OSD_OP_CMPXATTR:
@@ -1497,6 +1503,45 @@ static bool target_should_be_paused(struct ceph_osd_client *osdc,
 	       (osdc->osdmap->epoch < osdc->epoch_barrier);
 }

+static int pick_random_replica(const struct ceph_osds *acting)
+{
+	int i = prandom_u32() % acting->size;
+
+	dout("%s picked osd%d, primary osd%d\n", __func__,
+	     acting->osds[i], acting->primary);
+	return i;
+}
+
+/*
+ * Picks the closest replica based on client's location given by
+ * crush_location option.  Prefers the primary if the locality is
+ * the same.
+ */
+static int pick_closest_replica(struct ceph_osd_client *osdc,
+				const struct ceph_osds *acting)
+{
+	struct ceph_options *opt = osdc->client->options;
+	int best_i, best_locality;
+	int i = 0, locality;
+
+	do {
+		locality = ceph_get_crush_locality(osdc->osdmap,
+						   acting->osds[i],
+						   &opt->crush_locs);
+		if (i == 0 ||
+		    (locality >= 0 && best_locality < 0) ||
+		    (locality >= 0 && best_locality >= 0 &&
+		     locality < best_locality)) {
+			best_i = i;
+			best_locality = locality;
+		}
+	} while (++i < acting->size);
+
+	dout("%s picked osd%d with locality %d, primary osd%d\n", __func__,
+	     acting->osds[best_i], best_locality, acting->primary);
+	return best_i;
+}
+
 enum calc_target_result {
 	CALC_TARGET_NO_ACTION = 0,
 	CALC_TARGET_NEED_RESEND,
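A hedged reading of the loop above: ceph_get_crush_locality() appears to return the CRUSH bucket type id at which the client's location matches the OSD's position (-1 for no match), so "closest" means the lowest non-negative locality, with ties keeping the earliest index, i.e. the primary:

	/*
	 * Illustration (hypothetical type ids: host < rack < root):
	 *
	 *   i=0  primary  locality  3 (rack)  -> best_i = 0
	 *   i=1  replica  locality  1 (host)  -> best_i = 1 (closer)
	 *   i=2  replica  locality -1 (none)  -> best_i stays 1
	 */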
@@ -1510,6 +1555,8 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
 	struct ceph_pg_pool_info *pi;
 	struct ceph_pg pgid, last_pgid;
 	struct ceph_osds up, acting;
+	bool is_read = t->flags & CEPH_OSD_FLAG_READ;
+	bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
 	bool force_resend = false;
 	bool unpaused = false;
 	bool legacy_change = false;
@@ -1540,9 +1587,9 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
 	ceph_oid_copy(&t->target_oid, &t->base_oid);
 	ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
 	if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
-		if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
+		if (is_read && pi->read_tier >= 0)
 			t->target_oloc.pool = pi->read_tier;
-		if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
+		if (is_write && pi->write_tier >= 0)
 			t->target_oloc.pool = pi->write_tier;

 		pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
@@ -1581,7 +1628,8 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
 		unpaused = true;
 	}
 	legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
-			ceph_osds_changed(&t->acting, &acting, any_change);
+			ceph_osds_changed(&t->acting, &acting,
+					  t->used_replica || any_change);
 	if (t->pg_num)
 		split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);
@@ -1597,7 +1645,24 @@
 		t->sort_bitwise = sort_bitwise;
 		t->recovery_deletes = recovery_deletes;
-		t->osd = acting.primary;
+
+		if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
+				 CEPH_OSD_FLAG_LOCALIZE_READS)) &&
+		    !is_write && pi->type == CEPH_POOL_TYPE_REP &&
+		    acting.size > 1) {
+			int pos;
+
+			WARN_ON(!is_read || acting.osds[0] != acting.primary);
+			if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
+				pos = pick_random_replica(&acting);
+			} else {
+				pos = pick_closest_replica(osdc, &acting);
+			}
+			t->osd = acting.osds[pos];
+			t->used_replica = pos > 0;
+		} else {
+			t->osd = acting.primary;
+			t->used_replica = false;
+		}
 	}

 	if (unpaused || legacy_change || force_resend || split)
@@ -2366,13 +2431,17 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)

 static void account_request(struct ceph_osd_request *req)
 {
+	struct ceph_osd_client *osdc = req->r_osdc;
+
 	WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK));
 	WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)));

 	req->r_flags |= CEPH_OSD_FLAG_ONDISK;
-	atomic_inc(&req->r_osdc->num_requests);
+	req->r_flags |= osdc->client->options->osd_req_flags;
+	atomic_inc(&osdc->num_requests);

 	req->r_start_stamp = jiffies;
+	req->r_start_latency = ktime_get();
 }

 static void submit_request(struct ceph_osd_request *req, bool wrlocked)
@@ -2389,6 +2458,8 @@ static void finish_request(struct ceph_osd_request *req)
 	WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
 	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);

+	req->r_end_latency = ktime_get();
+
 	if (req->r_osd)
 		unlink_request(req->r_osd, req);
 	atomic_dec(&osdc->num_requests);
@@ -3657,6 +3728,26 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
 		goto out_unlock_osdc;
 	}

+	if (m.result == -EAGAIN) {
+		dout("req %p tid %llu EAGAIN\n", req, req->r_tid);
+		unlink_request(osd, req);
+		mutex_unlock(&osd->lock);
+
+		/*
+		 * The object is missing on the replica or not (yet)
+		 * readable.  Clear pgid to force a resend to the primary
+		 * via legacy_change.
+		 */
+		req->r_t.pgid.pool = 0;
+		req->r_t.pgid.seed = 0;
+		WARN_ON(!req->r_t.used_replica);
+		req->r_flags &= ~(CEPH_OSD_FLAG_BALANCE_READS |
+				  CEPH_OSD_FLAG_LOCALIZE_READS);
+		req->r_tid = 0;
+		__submit_request(req, false);
+		goto out_unlock_osdc;
+	}
+
 	if (m.num_ops != req->r_num_ops) {
 		pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
 		       req->r_num_ops, req->r_tid);
...
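For orientation, a hedged summary of the replica-read fallback implemented by the hunk above:

	/*
	 * client --- read (BALANCE/LOCALIZE flag set) ---> replica OSD
	 * client <-- -EAGAIN (object missing / not readable) --- replica OSD
	 * client: strips the replica-read flags, zeroes pgid and tid
	 * client --- read (calc_target() retargets) ---> primary OSD
	 */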