Commit a3021a59 authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph fixes from Sage Weil:
 "We have a few follow-up fixes for the libceph refactor from Ilya, and
  then some cephfs + fscache fixes from Zheng.

  The first two FS-Cache patches are acked by David Howells and deemed
  trivial enough to go through our tree.  The rest fix some issues with
  the ceph fscache handling (disable cache for inodes opened for write,
  and simplify the revalidation logic accordingly, dropping the
  now-unnecessary work queue)"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  ceph: use i_version to check validity of fscache
  ceph: improve fscache revalidation
  ceph: disable fscache when inode is opened for write
  ceph: avoid unnecessary fscache invalidation/revlidation
  ceph: call __fscache_uncache_page() if readpages fails
  FS-Cache: make check_consistency callback return int
  FS-Cache: wake write waiter after invalidating writes
  libceph: use %s instead of %pE in dout()s
  libceph: put request only if it's done in handle_reply()
  libceph: change ceph_osdmap_flag() to take osdc
parents eb10a7b7 f6973c09
......@@ -380,7 +380,7 @@ static void cachefiles_sync_cache(struct fscache_cache *_cache)
* check if the backing cache is updated to FS-Cache
* - called by FS-Cache when evaluates if need to invalidate the cache
*/
static bool cachefiles_check_consistency(struct fscache_operation *op)
static int cachefiles_check_consistency(struct fscache_operation *op)
{
struct cachefiles_object *object;
struct cachefiles_cache *cache;
......
......@@ -276,8 +276,10 @@ static void finish_read(struct ceph_osd_request *req)
for (i = 0; i < num_pages; i++) {
struct page *page = osd_data->pages[i];
if (rc < 0 && rc != -ENOENT)
if (rc < 0 && rc != -ENOENT) {
ceph_fscache_readpage_cancel(inode, page);
goto unlock;
}
if (bytes < (int)PAGE_SIZE) {
/* zero (remainder of) page */
int s = bytes < 0 ? 0 : bytes;
......@@ -535,8 +537,6 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);
ceph_readpage_to_fscache(inode, page);
set_page_writeback(page);
err = ceph_osdc_writepages(osdc, ceph_vino(inode),
&ci->i_layout, snapc,
......
......@@ -25,6 +25,7 @@
#include "cache.h"
struct ceph_aux_inode {
u64 version;
struct timespec mtime;
loff_t size;
};
......@@ -69,15 +70,8 @@ int ceph_fscache_register_fs(struct ceph_fs_client* fsc)
fsc->fscache = fscache_acquire_cookie(ceph_cache_netfs.primary_index,
&ceph_fscache_fsid_object_def,
fsc, true);
if (fsc->fscache == NULL) {
if (!fsc->fscache)
pr_err("Unable to resgister fsid: %p fscache cookie", fsc);
return 0;
}
fsc->revalidate_wq = alloc_workqueue("ceph-revalidate", 0, 1);
if (fsc->revalidate_wq == NULL)
return -ENOMEM;
return 0;
}
......@@ -105,6 +99,7 @@ static uint16_t ceph_fscache_inode_get_aux(const void *cookie_netfs_data,
const struct inode* inode = &ci->vfs_inode;
memset(&aux, 0, sizeof(aux));
aux.version = ci->i_version;
aux.mtime = inode->i_mtime;
aux.size = i_size_read(inode);
......@@ -131,6 +126,7 @@ static enum fscache_checkaux ceph_fscache_inode_check_aux(
return FSCACHE_CHECKAUX_OBSOLETE;
memset(&aux, 0, sizeof(aux));
aux.version = ci->i_version;
aux.mtime = inode->i_mtime;
aux.size = i_size_read(inode);
......@@ -181,32 +177,26 @@ static const struct fscache_cookie_def ceph_fscache_inode_object_def = {
.now_uncached = ceph_fscache_inode_now_uncached,
};
void ceph_fscache_register_inode_cookie(struct ceph_fs_client* fsc,
struct ceph_inode_info* ci)
void ceph_fscache_register_inode_cookie(struct inode *inode)
{
struct inode* inode = &ci->vfs_inode;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
/* No caching for filesystem */
if (fsc->fscache == NULL)
return;
/* Only cache for regular files that are read only */
if ((ci->vfs_inode.i_mode & S_IFREG) == 0)
if (!S_ISREG(inode->i_mode))
return;
/* Avoid multiple racing open requests */
inode_lock(inode);
if (ci->fscache)
goto done;
ci->fscache = fscache_acquire_cookie(fsc->fscache,
&ceph_fscache_inode_object_def,
ci, true);
fscache_check_consistency(ci->fscache);
done:
inode_lock_nested(inode, I_MUTEX_CHILD);
if (!ci->fscache) {
ci->fscache = fscache_acquire_cookie(fsc->fscache,
&ceph_fscache_inode_object_def,
ci, false);
}
inode_unlock(inode);
}
void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci)
......@@ -222,6 +212,34 @@ void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci)
fscache_relinquish_cookie(cookie, 0);
}
static bool ceph_fscache_can_enable(void *data)
{
struct inode *inode = data;
return !inode_is_open_for_write(inode);
}
void ceph_fscache_file_set_cookie(struct inode *inode, struct file *filp)
{
struct ceph_inode_info *ci = ceph_inode(inode);
if (!fscache_cookie_valid(ci->fscache))
return;
if (inode_is_open_for_write(inode)) {
dout("fscache_file_set_cookie %p %p disabling cache\n",
inode, filp);
fscache_disable_cookie(ci->fscache, false);
fscache_uncache_all_inode_pages(ci->fscache, inode);
} else {
fscache_enable_cookie(ci->fscache, ceph_fscache_can_enable,
inode);
if (fscache_cookie_enabled(ci->fscache)) {
dout("fscache_file_set_cookie %p %p enabing cache\n",
inode, filp);
}
}
}
static void ceph_vfs_readpage_complete(struct page *page, void *data, int error)
{
if (!error)
......@@ -238,8 +256,7 @@ static void ceph_vfs_readpage_complete_unlock(struct page *page, void *data, int
static inline bool cache_valid(struct ceph_inode_info *ci)
{
return ((ceph_caps_issued(ci) & CEPH_CAP_FILE_CACHE) &&
(ci->i_fscache_gen == ci->i_rdcache_gen));
return ci->i_fscache_gen == ci->i_rdcache_gen;
}
......@@ -332,69 +349,27 @@ void ceph_invalidate_fscache_page(struct inode* inode, struct page *page)
void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc)
{
if (fsc->revalidate_wq)
destroy_workqueue(fsc->revalidate_wq);
fscache_relinquish_cookie(fsc->fscache, 0);
fsc->fscache = NULL;
}
static void ceph_revalidate_work(struct work_struct *work)
{
int issued;
u32 orig_gen;
struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
i_revalidate_work);
struct inode *inode = &ci->vfs_inode;
spin_lock(&ci->i_ceph_lock);
issued = __ceph_caps_issued(ci, NULL);
orig_gen = ci->i_rdcache_gen;
spin_unlock(&ci->i_ceph_lock);
if (!(issued & CEPH_CAP_FILE_CACHE)) {
dout("revalidate_work lost cache before validation %p\n",
inode);
goto out;
}
if (!fscache_check_consistency(ci->fscache))
fscache_invalidate(ci->fscache);
spin_lock(&ci->i_ceph_lock);
/* Update the new valid generation (backwards sanity check too) */
if (orig_gen > ci->i_fscache_gen) {
ci->i_fscache_gen = orig_gen;
}
spin_unlock(&ci->i_ceph_lock);
out:
iput(&ci->vfs_inode);
}
void ceph_queue_revalidate(struct inode *inode)
/*
* caller should hold CEPH_CAP_FILE_{RD,CACHE}
*/
void ceph_fscache_revalidate_cookie(struct ceph_inode_info *ci)
{
struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
struct ceph_inode_info *ci = ceph_inode(inode);
if (fsc->revalidate_wq == NULL || ci->fscache == NULL)
if (cache_valid(ci))
return;
ihold(inode);
if (queue_work(ceph_sb_to_client(inode->i_sb)->revalidate_wq,
&ci->i_revalidate_work)) {
dout("ceph_queue_revalidate %p\n", inode);
} else {
dout("ceph_queue_revalidate %p failed\n)", inode);
iput(inode);
/* resue i_truncate_mutex. There should be no pending
* truncate while the caller holds CEPH_CAP_FILE_RD */
mutex_lock(&ci->i_truncate_mutex);
if (!cache_valid(ci)) {
if (fscache_check_consistency(ci->fscache))
fscache_invalidate(ci->fscache);
spin_lock(&ci->i_ceph_lock);
ci->i_fscache_gen = ci->i_rdcache_gen;
spin_unlock(&ci->i_ceph_lock);
}
}
void ceph_fscache_inode_init(struct ceph_inode_info *ci)
{
ci->fscache = NULL;
/* The first load is verifed cookie open time */
ci->i_fscache_gen = 1;
INIT_WORK(&ci->i_revalidate_work, ceph_revalidate_work);
mutex_unlock(&ci->i_truncate_mutex);
}
......@@ -34,10 +34,10 @@ void ceph_fscache_unregister(void);
int ceph_fscache_register_fs(struct ceph_fs_client* fsc);
void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc);
void ceph_fscache_inode_init(struct ceph_inode_info *ci);
void ceph_fscache_register_inode_cookie(struct ceph_fs_client* fsc,
struct ceph_inode_info* ci);
void ceph_fscache_register_inode_cookie(struct inode *inode);
void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci);
void ceph_fscache_file_set_cookie(struct inode *inode, struct file *filp);
void ceph_fscache_revalidate_cookie(struct ceph_inode_info *ci);
int ceph_readpage_from_fscache(struct inode *inode, struct page *page);
int ceph_readpages_from_fscache(struct inode *inode,
......@@ -46,12 +46,11 @@ int ceph_readpages_from_fscache(struct inode *inode,
unsigned *nr_pages);
void ceph_readpage_to_fscache(struct inode *inode, struct page *page);
void ceph_invalidate_fscache_page(struct inode* inode, struct page *page);
void ceph_queue_revalidate(struct inode *inode);
static inline void ceph_fscache_update_objectsize(struct inode *inode)
static inline void ceph_fscache_inode_init(struct ceph_inode_info *ci)
{
struct ceph_inode_info *ci = ceph_inode(inode);
fscache_attr_changed(ci->fscache);
ci->fscache = NULL;
ci->i_fscache_gen = 0;
}
static inline void ceph_fscache_invalidate(struct inode *inode)
......@@ -88,6 +87,11 @@ static inline void ceph_fscache_readpages_cancel(struct inode *inode,
return fscache_readpages_cancel(ci->fscache, pages);
}
static inline void ceph_disable_fscache_readpage(struct ceph_inode_info *ci)
{
ci->i_fscache_gen = ci->i_rdcache_gen - 1;
}
#else
static inline int ceph_fscache_register(void)
......@@ -112,8 +116,20 @@ static inline void ceph_fscache_inode_init(struct ceph_inode_info *ci)
{
}
static inline void ceph_fscache_register_inode_cookie(struct ceph_fs_client* parent_fsc,
struct ceph_inode_info* ci)
static inline void ceph_fscache_register_inode_cookie(struct inode *inode)
{
}
static inline void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci)
{
}
static inline void ceph_fscache_file_set_cookie(struct inode *inode,
struct file *filp)
{
}
static inline void ceph_fscache_revalidate_cookie(struct ceph_inode_info *ci)
{
}
......@@ -141,10 +157,6 @@ static inline void ceph_readpage_to_fscache(struct inode *inode,
{
}
static inline void ceph_fscache_update_objectsize(struct inode *inode)
{
}
static inline void ceph_fscache_invalidate(struct inode *inode)
{
}
......@@ -154,10 +166,6 @@ static inline void ceph_invalidate_fscache_page(struct inode *inode,
{
}
static inline void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci)
{
}
static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp)
{
return 1;
......@@ -173,7 +181,7 @@ static inline void ceph_fscache_readpages_cancel(struct inode *inode,
{
}
static inline void ceph_queue_revalidate(struct inode *inode)
static inline void ceph_disable_fscache_readpage(struct ceph_inode_info *ci)
{
}
......
......@@ -2393,6 +2393,9 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
snap_rwsem_locked = true;
}
*got = need | (have & want);
if ((need & CEPH_CAP_FILE_RD) &&
!(*got & CEPH_CAP_FILE_CACHE))
ceph_disable_fscache_readpage(ci);
__take_cap_refs(ci, *got, true);
ret = 1;
}
......@@ -2554,6 +2557,9 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
break;
}
if ((_got & CEPH_CAP_FILE_RD) && (_got & CEPH_CAP_FILE_CACHE))
ceph_fscache_revalidate_cookie(ci);
*got = _got;
return 0;
}
......@@ -2795,7 +2801,6 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
bool writeback = false;
bool queue_trunc = false;
bool queue_invalidate = false;
bool queue_revalidate = false;
bool deleted_inode = false;
bool fill_inline = false;
......@@ -2837,8 +2842,6 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
ci->i_rdcache_revoking = ci->i_rdcache_gen;
}
}
ceph_fscache_invalidate(inode);
}
/* side effects now are allowed */
......@@ -2880,11 +2883,6 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
}
}
/* Do we need to revalidate our fscache cookie. Don't bother on the
* first cache cap as we already validate at cookie creation time. */
if ((issued & CEPH_CAP_FILE_CACHE) && ci->i_rdcache_gen > 1)
queue_revalidate = true;
if (newcaps & CEPH_CAP_ANY_RD) {
/* ctime/mtime/atime? */
ceph_decode_timespec(&mtime, &grant->mtime);
......@@ -2993,11 +2991,8 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
if (fill_inline)
ceph_fill_inline_data(inode, NULL, inline_data, inline_len);
if (queue_trunc) {
if (queue_trunc)
ceph_queue_vmtruncate(inode);
ceph_queue_revalidate(inode);
} else if (queue_revalidate)
ceph_queue_revalidate(inode);
if (writeback)
/*
......@@ -3199,10 +3194,8 @@ static void handle_cap_trunc(struct inode *inode,
truncate_seq, truncate_size, size);
spin_unlock(&ci->i_ceph_lock);
if (queue_trunc) {
if (queue_trunc)
ceph_queue_vmtruncate(inode);
ceph_fscache_invalidate(inode);
}
}
/*
......
......@@ -137,23 +137,11 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)
{
struct ceph_file_info *cf;
int ret = 0;
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
struct ceph_mds_client *mdsc = fsc->mdsc;
switch (inode->i_mode & S_IFMT) {
case S_IFREG:
/* First file open request creates the cookie, we want to keep
* this cookie around for the filetime of the inode as not to
* have to worry about fscache register / revoke / operation
* races.
*
* Also, if we know the operation is going to invalidate data
* (non readonly) just nuke the cache right away.
*/
ceph_fscache_register_inode_cookie(mdsc->fsc, ci);
if ((fmode & CEPH_FILE_MODE_WR))
ceph_fscache_invalidate(inode);
ceph_fscache_register_inode_cookie(inode);
ceph_fscache_file_set_cookie(inode, file);
case S_IFDIR:
dout("init_file %p %p 0%o (regular)\n", inode, file,
inode->i_mode);
......@@ -1349,7 +1337,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
}
retry_snap:
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL)) {
if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL)) {
err = -ENOSPC;
goto out;
}
......@@ -1407,7 +1395,6 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
iov_iter_advance(from, written);
ceph_put_snap_context(snapc);
} else {
loff_t old_size = i_size_read(inode);
/*
* No need to acquire the i_truncate_mutex. Because
* the MDS revokes Fwb caps before sending truncate
......@@ -1418,8 +1405,6 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
written = generic_perform_write(file, from, pos);
if (likely(written >= 0))
iocb->ki_pos = pos + written;
if (i_size_read(inode) > old_size)
ceph_fscache_update_objectsize(inode);
inode_unlock(inode);
}
......@@ -1440,7 +1425,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from)
ceph_put_cap_refs(ci, got);
if (written >= 0) {
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))
if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_NEARFULL))
iocb->ki_flags |= IOCB_DSYNC;
written = generic_write_sync(iocb, written);
......@@ -1672,8 +1657,8 @@ static long ceph_fallocate(struct file *file, int mode,
goto unlock;
}
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) &&
!(mode & FALLOC_FL_PUNCH_HOLE)) {
if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) &&
!(mode & FALLOC_FL_PUNCH_HOLE)) {
ret = -ENOSPC;
goto unlock;
}
......
......@@ -103,7 +103,6 @@ struct ceph_fs_client {
#ifdef CONFIG_CEPH_FSCACHE
struct fscache_cookie *fscache;
struct workqueue_struct *revalidate_wq;
#endif
};
......@@ -360,8 +359,7 @@ struct ceph_inode_info {
#ifdef CONFIG_CEPH_FSCACHE
struct fscache_cookie *fscache;
u32 i_fscache_gen; /* sequence, for delayed fscache validate */
struct work_struct i_revalidate_work;
u32 i_fscache_gen;
#endif
struct inode vfs_inode; /* at end */
};
......
......@@ -887,6 +887,8 @@ void fscache_invalidate_writes(struct fscache_cookie *cookie)
put_page(results[i]);
}
wake_up_bit(&cookie->flags, 0);
_leave("");
}
......
......@@ -279,6 +279,11 @@ struct ceph_osd_client {
struct workqueue_struct *notify_wq;
};
static inline bool ceph_osdmap_flag(struct ceph_osd_client *osdc, int flag)
{
return osdc->osdmap->flags & flag;
}
extern int ceph_osdc_setup(void);
extern void ceph_osdc_cleanup(void);
......
......@@ -189,11 +189,6 @@ static inline bool ceph_osd_is_down(struct ceph_osdmap *map, int osd)
return !ceph_osd_is_up(map, osd);
}
static inline bool ceph_osdmap_flag(struct ceph_osdmap *map, int flag)
{
return map && (map->flags & flag);
}
extern char *ceph_osdmap_state_str(char *str, int len, int state);
extern u32 ceph_get_primary_affinity(struct ceph_osdmap *map, int osd);
......
......@@ -241,7 +241,7 @@ struct fscache_cache_ops {
/* check the consistency between the backing cache and the FS-Cache
* cookie */
bool (*check_consistency)(struct fscache_operation *op);
int (*check_consistency)(struct fscache_operation *op);
/* store the updated auxiliary data on an object */
void (*update_object)(struct fscache_object *object);
......
......@@ -1276,9 +1276,9 @@ static bool target_should_be_paused(struct ceph_osd_client *osdc,
const struct ceph_osd_request_target *t,
struct ceph_pg_pool_info *pi)
{
bool pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
bool pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
__pool_full(pi);
WARN_ON(pi->id != t->base_oloc.pool);
......@@ -1303,8 +1303,7 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
bool force_resend = false;
bool need_check_tiering = false;
bool need_resend = false;
bool sort_bitwise = ceph_osdmap_flag(osdc->osdmap,
CEPH_OSDMAP_SORTBITWISE);
bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
enum calc_target_result ct_res;
int ret;
......@@ -1540,9 +1539,9 @@ static void encode_request(struct ceph_osd_request *req, struct ceph_msg *msg)
*/
msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
dout("%s req %p oid %*pE oid_len %d front %zu data %u\n", __func__,
req, req->r_t.target_oid.name_len, req->r_t.target_oid.name,
req->r_t.target_oid.name_len, msg->front.iov_len, data_len);
dout("%s req %p oid %s oid_len %d front %zu data %u\n", __func__,
req, req->r_t.target_oid.name, req->r_t.target_oid.name_len,
msg->front.iov_len, data_len);
}
/*
......@@ -1590,9 +1589,9 @@ static void maybe_request_map(struct ceph_osd_client *osdc)
verify_osdc_locked(osdc);
WARN_ON(!osdc->osdmap->epoch);
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) {
if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) ||
ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
dout("%s osdc %p continuous\n", __func__, osdc);
continuous = true;
} else {
......@@ -1629,19 +1628,19 @@ static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
}
if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) {
ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
dout("req %p pausewr\n", req);
req->r_t.paused = true;
maybe_request_map(osdc);
} else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD)) {
ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
dout("req %p pauserd\n", req);
req->r_t.paused = true;
maybe_request_map(osdc);
} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
!(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
CEPH_OSD_FLAG_FULL_FORCE)) &&
(ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
(ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
pool_full(osdc, req->r_t.base_oloc.pool))) {
dout("req %p full/pool_full\n", req);
pr_warn_ratelimited("FULL or reached pool quota\n");
......@@ -2280,7 +2279,7 @@ static void send_linger_ping(struct ceph_osd_linger_request *lreq)
struct ceph_osd_request *req = lreq->ping_req;
struct ceph_osd_req_op *op = &req->r_ops[0];
if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD)) {
if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
dout("%s PAUSERD\n", __func__);
return;
}
......@@ -2893,6 +2892,9 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
dout("req %p tid %llu cb\n", req, req->r_tid);
__complete_request(req);
}
if (m.flags & CEPH_OSD_FLAG_ONDISK)
complete_all(&req->r_safe_completion);
ceph_osdc_put_request(req);
} else {
if (req->r_unsafe_callback) {
dout("req %p tid %llu unsafe-cb\n", req, req->r_tid);
......@@ -2901,10 +2903,7 @@ static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
WARN_ON(1);
}
}
if (m.flags & CEPH_OSD_FLAG_ONDISK)
complete_all(&req->r_safe_completion);
ceph_osdc_put_request(req);
return;
fail_request:
......@@ -3050,7 +3049,7 @@ static int handle_one_map(struct ceph_osd_client *osdc,
bool skipped_map = false;
bool was_full;
was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
set_pool_was_full(osdc);
if (incremental)
......@@ -3088,7 +3087,7 @@ static int handle_one_map(struct ceph_osd_client *osdc,
osdc->osdmap = newmap;
}
was_full &= !ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
need_resend, need_resend_linger);
......@@ -3174,9 +3173,9 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
if (ceph_check_fsid(osdc->client, &fsid) < 0)
goto bad;
was_pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
was_pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
have_pool_full(osdc);
/* incremental maps */
......@@ -3238,9 +3237,9 @@ void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
* we find out when we are no longer full and stop returning
* ENOSPC.
*/
pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
have_pool_full(osdc);
if (was_pauserd || was_pausewr || pauserd || pausewr)
maybe_request_map(osdc);
......
......@@ -1778,8 +1778,8 @@ int ceph_object_locator_to_pg(struct ceph_osdmap *osdmap,
raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
oid->name_len);
dout("%s %*pE -> raw_pgid %llu.%x\n", __func__, oid->name_len,
oid->name, raw_pgid->pool, raw_pgid->seed);
dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
raw_pgid->pool, raw_pgid->seed);
return 0;
}
EXPORT_SYMBOL(ceph_object_locator_to_pg);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment