Commit 4e563944 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'ceph-for-5.3-rc6' of git://github.com/ceph/ceph-client

Pull ceph fixes from Ilya Dryomov:
 "Three important fixes tagged for stable (an indefinite hang, a crash
  on an assert and a NULL pointer dereference) plus a small series from
  Luis fixing instances of vfree() under spinlock"

* tag 'ceph-for-5.3-rc6' of git://github.com/ceph/ceph-client:
  libceph: fix PG split vs OSD (re)connect race
  ceph: don't try fill file_lock on unsuccessful GETFILELOCK reply
  ceph: clear page dirty before invalidate page
  ceph: fix buffer free while holding i_ceph_lock in fill_inode()
  ceph: fix buffer free while holding i_ceph_lock in __ceph_build_xattrs_blob()
  ceph: fix buffer free while holding i_ceph_lock in __ceph_setxattr()
  libceph: allow ceph_buffer_put() to receive a NULL ceph_buffer
parents 1374a22e a5613724
...@@ -913,8 +913,9 @@ static int ceph_writepages_start(struct address_space *mapping, ...@@ -913,8 +913,9 @@ static int ceph_writepages_start(struct address_space *mapping,
if (page_offset(page) >= ceph_wbc.i_size) { if (page_offset(page) >= ceph_wbc.i_size) {
dout("%p page eof %llu\n", dout("%p page eof %llu\n",
page, ceph_wbc.i_size); page, ceph_wbc.i_size);
if (ceph_wbc.size_stable || if ((ceph_wbc.size_stable ||
page_offset(page) >= i_size_read(inode)) page_offset(page) >= i_size_read(inode)) &&
clear_page_dirty_for_io(page))
mapping->a_ops->invalidatepage(page, mapping->a_ops->invalidatepage(page,
0, PAGE_SIZE); 0, PAGE_SIZE);
unlock_page(page); unlock_page(page);
......
...@@ -1301,6 +1301,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, ...@@ -1301,6 +1301,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
{ {
struct ceph_inode_info *ci = cap->ci; struct ceph_inode_info *ci = cap->ci;
struct inode *inode = &ci->vfs_inode; struct inode *inode = &ci->vfs_inode;
struct ceph_buffer *old_blob = NULL;
struct cap_msg_args arg; struct cap_msg_args arg;
int held, revoking; int held, revoking;
int wake = 0; int wake = 0;
...@@ -1365,7 +1366,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, ...@@ -1365,7 +1366,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
ci->i_requested_max_size = arg.max_size; ci->i_requested_max_size = arg.max_size;
if (flushing & CEPH_CAP_XATTR_EXCL) { if (flushing & CEPH_CAP_XATTR_EXCL) {
__ceph_build_xattrs_blob(ci); old_blob = __ceph_build_xattrs_blob(ci);
arg.xattr_version = ci->i_xattrs.version; arg.xattr_version = ci->i_xattrs.version;
arg.xattr_buf = ci->i_xattrs.blob; arg.xattr_buf = ci->i_xattrs.blob;
} else { } else {
...@@ -1409,6 +1410,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, ...@@ -1409,6 +1410,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
ceph_buffer_put(old_blob);
ret = send_cap_msg(&arg); ret = send_cap_msg(&arg);
if (ret < 0) { if (ret < 0) {
dout("error sending cap msg, must requeue %p\n", inode); dout("error sending cap msg, must requeue %p\n", inode);
......
...@@ -736,6 +736,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page, ...@@ -736,6 +736,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
int issued, new_issued, info_caps; int issued, new_issued, info_caps;
struct timespec64 mtime, atime, ctime; struct timespec64 mtime, atime, ctime;
struct ceph_buffer *xattr_blob = NULL; struct ceph_buffer *xattr_blob = NULL;
struct ceph_buffer *old_blob = NULL;
struct ceph_string *pool_ns = NULL; struct ceph_string *pool_ns = NULL;
struct ceph_cap *new_cap = NULL; struct ceph_cap *new_cap = NULL;
int err = 0; int err = 0;
...@@ -881,7 +882,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page, ...@@ -881,7 +882,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
if ((ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) && if ((ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) &&
le64_to_cpu(info->xattr_version) > ci->i_xattrs.version) { le64_to_cpu(info->xattr_version) > ci->i_xattrs.version) {
if (ci->i_xattrs.blob) if (ci->i_xattrs.blob)
ceph_buffer_put(ci->i_xattrs.blob); old_blob = ci->i_xattrs.blob;
ci->i_xattrs.blob = xattr_blob; ci->i_xattrs.blob = xattr_blob;
if (xattr_blob) if (xattr_blob)
memcpy(ci->i_xattrs.blob->vec.iov_base, memcpy(ci->i_xattrs.blob->vec.iov_base,
...@@ -1022,8 +1023,8 @@ static int fill_inode(struct inode *inode, struct page *locked_page, ...@@ -1022,8 +1023,8 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
out: out:
if (new_cap) if (new_cap)
ceph_put_cap(mdsc, new_cap); ceph_put_cap(mdsc, new_cap);
if (xattr_blob) ceph_buffer_put(old_blob);
ceph_buffer_put(xattr_blob); ceph_buffer_put(xattr_blob);
ceph_put_string(pool_ns); ceph_put_string(pool_ns);
return err; return err;
} }
......
...@@ -111,8 +111,7 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct inode *inode, ...@@ -111,8 +111,7 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct inode *inode,
req->r_wait_for_completion = ceph_lock_wait_for_completion; req->r_wait_for_completion = ceph_lock_wait_for_completion;
err = ceph_mdsc_do_request(mdsc, inode, req); err = ceph_mdsc_do_request(mdsc, inode, req);
if (!err && operation == CEPH_MDS_OP_GETFILELOCK) {
if (operation == CEPH_MDS_OP_GETFILELOCK) {
fl->fl_pid = -le64_to_cpu(req->r_reply_info.filelock_reply->pid); fl->fl_pid = -le64_to_cpu(req->r_reply_info.filelock_reply->pid);
if (CEPH_LOCK_SHARED == req->r_reply_info.filelock_reply->type) if (CEPH_LOCK_SHARED == req->r_reply_info.filelock_reply->type)
fl->fl_type = F_RDLCK; fl->fl_type = F_RDLCK;
......
...@@ -465,6 +465,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) ...@@ -465,6 +465,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
struct inode *inode = &ci->vfs_inode; struct inode *inode = &ci->vfs_inode;
struct ceph_cap_snap *capsnap; struct ceph_cap_snap *capsnap;
struct ceph_snap_context *old_snapc, *new_snapc; struct ceph_snap_context *old_snapc, *new_snapc;
struct ceph_buffer *old_blob = NULL;
int used, dirty; int used, dirty;
capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS); capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
...@@ -541,7 +542,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) ...@@ -541,7 +542,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
capsnap->gid = inode->i_gid; capsnap->gid = inode->i_gid;
if (dirty & CEPH_CAP_XATTR_EXCL) { if (dirty & CEPH_CAP_XATTR_EXCL) {
__ceph_build_xattrs_blob(ci); old_blob = __ceph_build_xattrs_blob(ci);
capsnap->xattr_blob = capsnap->xattr_blob =
ceph_buffer_get(ci->i_xattrs.blob); ceph_buffer_get(ci->i_xattrs.blob);
capsnap->xattr_version = ci->i_xattrs.version; capsnap->xattr_version = ci->i_xattrs.version;
...@@ -584,6 +585,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) ...@@ -584,6 +585,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
} }
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
ceph_buffer_put(old_blob);
kfree(capsnap); kfree(capsnap);
ceph_put_snap_context(old_snapc); ceph_put_snap_context(old_snapc);
} }
......
...@@ -926,7 +926,7 @@ extern int ceph_getattr(const struct path *path, struct kstat *stat, ...@@ -926,7 +926,7 @@ extern int ceph_getattr(const struct path *path, struct kstat *stat,
int __ceph_setxattr(struct inode *, const char *, const void *, size_t, int); int __ceph_setxattr(struct inode *, const char *, const void *, size_t, int);
ssize_t __ceph_getxattr(struct inode *, const char *, void *, size_t); ssize_t __ceph_getxattr(struct inode *, const char *, void *, size_t);
extern ssize_t ceph_listxattr(struct dentry *, char *, size_t); extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci); extern struct ceph_buffer *__ceph_build_xattrs_blob(struct ceph_inode_info *ci);
extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci); extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci);
extern const struct xattr_handler *ceph_xattr_handlers[]; extern const struct xattr_handler *ceph_xattr_handlers[];
......
...@@ -754,12 +754,15 @@ static int __get_required_blob_size(struct ceph_inode_info *ci, int name_size, ...@@ -754,12 +754,15 @@ static int __get_required_blob_size(struct ceph_inode_info *ci, int name_size,
/* /*
* If there are dirty xattrs, reencode xattrs into the prealloc_blob * If there are dirty xattrs, reencode xattrs into the prealloc_blob
* and swap into place. * and swap into place. It returns the old i_xattrs.blob (or NULL) so
* that it can be freed by the caller as the i_ceph_lock is likely to be
* held.
*/ */
void __ceph_build_xattrs_blob(struct ceph_inode_info *ci) struct ceph_buffer *__ceph_build_xattrs_blob(struct ceph_inode_info *ci)
{ {
struct rb_node *p; struct rb_node *p;
struct ceph_inode_xattr *xattr = NULL; struct ceph_inode_xattr *xattr = NULL;
struct ceph_buffer *old_blob = NULL;
void *dest; void *dest;
dout("__build_xattrs_blob %p\n", &ci->vfs_inode); dout("__build_xattrs_blob %p\n", &ci->vfs_inode);
...@@ -790,12 +793,14 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci) ...@@ -790,12 +793,14 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
dest - ci->i_xattrs.prealloc_blob->vec.iov_base; dest - ci->i_xattrs.prealloc_blob->vec.iov_base;
if (ci->i_xattrs.blob) if (ci->i_xattrs.blob)
ceph_buffer_put(ci->i_xattrs.blob); old_blob = ci->i_xattrs.blob;
ci->i_xattrs.blob = ci->i_xattrs.prealloc_blob; ci->i_xattrs.blob = ci->i_xattrs.prealloc_blob;
ci->i_xattrs.prealloc_blob = NULL; ci->i_xattrs.prealloc_blob = NULL;
ci->i_xattrs.dirty = false; ci->i_xattrs.dirty = false;
ci->i_xattrs.version++; ci->i_xattrs.version++;
} }
return old_blob;
} }
static inline int __get_request_mask(struct inode *in) { static inline int __get_request_mask(struct inode *in) {
...@@ -1036,6 +1041,7 @@ int __ceph_setxattr(struct inode *inode, const char *name, ...@@ -1036,6 +1041,7 @@ int __ceph_setxattr(struct inode *inode, const char *name,
struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
struct ceph_cap_flush *prealloc_cf = NULL; struct ceph_cap_flush *prealloc_cf = NULL;
struct ceph_buffer *old_blob = NULL;
int issued; int issued;
int err; int err;
int dirty = 0; int dirty = 0;
...@@ -1109,13 +1115,15 @@ int __ceph_setxattr(struct inode *inode, const char *name, ...@@ -1109,13 +1115,15 @@ int __ceph_setxattr(struct inode *inode, const char *name,
struct ceph_buffer *blob; struct ceph_buffer *blob;
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
dout(" preaallocating new blob size=%d\n", required_blob_size); ceph_buffer_put(old_blob); /* Shouldn't be required */
dout(" pre-allocating new blob size=%d\n", required_blob_size);
blob = ceph_buffer_new(required_blob_size, GFP_NOFS); blob = ceph_buffer_new(required_blob_size, GFP_NOFS);
if (!blob) if (!blob)
goto do_sync_unlocked; goto do_sync_unlocked;
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
/* prealloc_blob can't be released while holding i_ceph_lock */
if (ci->i_xattrs.prealloc_blob) if (ci->i_xattrs.prealloc_blob)
ceph_buffer_put(ci->i_xattrs.prealloc_blob); old_blob = ci->i_xattrs.prealloc_blob;
ci->i_xattrs.prealloc_blob = blob; ci->i_xattrs.prealloc_blob = blob;
goto retry; goto retry;
} }
...@@ -1131,6 +1139,7 @@ int __ceph_setxattr(struct inode *inode, const char *name, ...@@ -1131,6 +1139,7 @@ int __ceph_setxattr(struct inode *inode, const char *name,
} }
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
ceph_buffer_put(old_blob);
if (lock_snap_rwsem) if (lock_snap_rwsem)
up_read(&mdsc->snap_rwsem); up_read(&mdsc->snap_rwsem);
if (dirty) if (dirty)
......
...@@ -30,7 +30,8 @@ static inline struct ceph_buffer *ceph_buffer_get(struct ceph_buffer *b) ...@@ -30,7 +30,8 @@ static inline struct ceph_buffer *ceph_buffer_get(struct ceph_buffer *b)
static inline void ceph_buffer_put(struct ceph_buffer *b) static inline void ceph_buffer_put(struct ceph_buffer *b)
{ {
kref_put(&b->kref, ceph_buffer_release); if (b)
kref_put(&b->kref, ceph_buffer_release);
} }
extern int ceph_decode_buffer(struct ceph_buffer **b, void **p, void *end); extern int ceph_decode_buffer(struct ceph_buffer **b, void **p, void *end);
......
...@@ -1496,7 +1496,7 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc, ...@@ -1496,7 +1496,7 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
struct ceph_osds up, acting; struct ceph_osds up, acting;
bool force_resend = false; bool force_resend = false;
bool unpaused = false; bool unpaused = false;
bool legacy_change; bool legacy_change = false;
bool split = false; bool split = false;
bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE); bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
bool recovery_deletes = ceph_osdmap_flag(osdc, bool recovery_deletes = ceph_osdmap_flag(osdc,
...@@ -1584,15 +1584,14 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc, ...@@ -1584,15 +1584,14 @@ static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
t->osd = acting.primary; t->osd = acting.primary;
} }
if (unpaused || legacy_change || force_resend || if (unpaused || legacy_change || force_resend || split)
(split && con && CEPH_HAVE_FEATURE(con->peer_features,
RESEND_ON_SPLIT)))
ct_res = CALC_TARGET_NEED_RESEND; ct_res = CALC_TARGET_NEED_RESEND;
else else
ct_res = CALC_TARGET_NO_ACTION; ct_res = CALC_TARGET_NO_ACTION;
out: out:
dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd); dout("%s t %p -> %d%d%d%d ct_res %d osd%d\n", __func__, t, unpaused,
legacy_change, force_resend, split, ct_res, t->osd);
return ct_res; return ct_res;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment