Commit ffdb8f1b authored by Linus Torvalds's avatar Linus Torvalds

Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  ceph: unwind canceled flock state
  ceph: fix ENOENT logic in striped_read
  ceph: fix short sync reads from the OSD
  ceph: fix sync vs canceled write
  ceph: use ihold when we already have an inode ref
parents 80dadf86 0c1f91f2
......@@ -453,7 +453,7 @@ static int ceph_writepage(struct page *page, struct writeback_control *wbc)
int err;
struct inode *inode = page->mapping->host;
BUG_ON(!inode);
igrab(inode);
ihold(inode);
err = writepage_nounlock(page, wbc);
unlock_page(page);
iput(inode);
......
......@@ -2940,14 +2940,12 @@ void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
while (!list_empty(&mdsc->cap_dirty)) {
ci = list_first_entry(&mdsc->cap_dirty, struct ceph_inode_info,
i_dirty_item);
inode = igrab(&ci->vfs_inode);
inode = &ci->vfs_inode;
ihold(inode);
dout("flush_dirty_caps %p\n", inode);
spin_unlock(&mdsc->cap_dirty_lock);
if (inode) {
ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_FLUSH,
NULL);
ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_FLUSH, NULL);
iput(inode);
}
spin_lock(&mdsc->cap_dirty_lock);
}
spin_unlock(&mdsc->cap_dirty_lock);
......
......@@ -308,7 +308,8 @@ static int ceph_readdir(struct file *filp, void *dirent, filldir_t filldir)
req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
if (IS_ERR(req))
return PTR_ERR(req);
req->r_inode = igrab(inode);
req->r_inode = inode;
ihold(inode);
req->r_dentry = dget(filp->f_dentry);
/* hints to request -> mds selection code */
req->r_direct_mode = USE_AUTH_MDS;
......@@ -787,10 +788,12 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
err = ceph_mdsc_do_request(mdsc, dir, req);
if (err)
if (err) {
d_drop(dentry);
else if (!req->r_reply_info.head->is_dentry)
d_instantiate(dentry, igrab(old_dentry->d_inode));
} else if (!req->r_reply_info.head->is_dentry) {
ihold(old_dentry->d_inode);
d_instantiate(dentry, old_dentry->d_inode);
}
ceph_mdsc_put_request(req);
return err;
}
......
......@@ -109,7 +109,7 @@ static struct dentry *__fh_to_dentry(struct super_block *sb,
err = ceph_mdsc_do_request(mdsc, NULL, req);
inode = req->r_target_inode;
if (inode)
igrab(inode);
ihold(inode);
ceph_mdsc_put_request(req);
if (!inode)
return ERR_PTR(-ESTALE);
......@@ -167,7 +167,7 @@ static struct dentry *__cfh_to_dentry(struct super_block *sb,
err = ceph_mdsc_do_request(mdsc, NULL, req);
inode = req->r_target_inode;
if (inode)
igrab(inode);
ihold(inode);
ceph_mdsc_put_request(req);
if (!inode)
return ERR_PTR(err ? err : -ESTALE);
......
......@@ -191,7 +191,8 @@ int ceph_open(struct inode *inode, struct file *file)
err = PTR_ERR(req);
goto out;
}
req->r_inode = igrab(inode);
req->r_inode = inode;
ihold(inode);
req->r_num_caps = 1;
err = ceph_mdsc_do_request(mdsc, parent_inode, req);
if (!err)
......@@ -282,7 +283,7 @@ int ceph_release(struct inode *inode, struct file *file)
static int striped_read(struct inode *inode,
u64 off, u64 len,
struct page **pages, int num_pages,
int *checkeof, bool align_to_pages,
int *checkeof, bool o_direct,
unsigned long buf_align)
{
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
......@@ -307,7 +308,7 @@ static int striped_read(struct inode *inode,
io_align = off & ~PAGE_MASK;
more:
if (align_to_pages)
if (o_direct)
page_align = (pos - io_align + buf_align) & ~PAGE_MASK;
else
page_align = pos & ~PAGE_MASK;
......@@ -317,10 +318,10 @@ static int striped_read(struct inode *inode,
ci->i_truncate_seq,
ci->i_truncate_size,
page_pos, pages_left, page_align);
hit_stripe = this_len < left;
was_short = ret >= 0 && ret < this_len;
if (ret == -ENOENT)
ret = 0;
hit_stripe = this_len < left;
was_short = ret >= 0 && ret < this_len;
dout("striped_read %llu~%u (read %u) got %d%s%s\n", pos, left, read,
ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
......@@ -345,20 +346,22 @@ static int striped_read(struct inode *inode,
}
if (was_short) {
/* was original extent fully inside i_size? */
if (pos + left <= inode->i_size) {
dout("zero tail\n");
ceph_zero_page_vector_range(page_off + read, len - read,
/* did we bounce off eof? */
if (pos + left > inode->i_size)
*checkeof = 1;
/* zero trailing bytes (inside i_size) */
if (left > 0 && pos < inode->i_size) {
if (pos + left > inode->i_size)
left = inode->i_size - pos;
dout("zero tail %d\n", left);
ceph_zero_page_vector_range(page_off + read, left,
pages);
read = len;
goto out;
read += left;
}
/* check i_size */
*checkeof = 1;
}
out:
if (ret >= 0)
ret = read;
dout("striped_read returns %d\n", ret);
......@@ -658,7 +661,7 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
/* hit EOF or hole? */
if (statret == 0 && *ppos < inode->i_size) {
dout("aio_read sync_read hit hole, reading more\n");
dout("aio_read sync_read hit hole, ppos %lld < size %lld, reading more\n", *ppos, inode->i_size);
read += ret;
base += ret;
len -= ret;
......
......@@ -1101,10 +1101,10 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
goto done;
}
req->r_dentry = dn; /* may have spliced */
igrab(in);
ihold(in);
} else if (ceph_ino(in) == vino.ino &&
ceph_snap(in) == vino.snap) {
igrab(in);
ihold(in);
} else {
dout(" %p links to %p %llx.%llx, not %llx.%llx\n",
dn, in, ceph_ino(in), ceph_snap(in),
......@@ -1144,7 +1144,7 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req,
goto done;
}
req->r_dentry = dn; /* may have spliced */
igrab(in);
ihold(in);
rinfo->head->is_dentry = 1; /* fool notrace handlers */
}
......@@ -1328,7 +1328,7 @@ void ceph_queue_writeback(struct inode *inode)
if (queue_work(ceph_inode_to_client(inode)->wb_wq,
&ceph_inode(inode)->i_wb_work)) {
dout("ceph_queue_writeback %p\n", inode);
igrab(inode);
ihold(inode);
} else {
dout("ceph_queue_writeback %p failed\n", inode);
}
......@@ -1353,7 +1353,7 @@ void ceph_queue_invalidate(struct inode *inode)
if (queue_work(ceph_inode_to_client(inode)->pg_inv_wq,
&ceph_inode(inode)->i_pg_inv_work)) {
dout("ceph_queue_invalidate %p\n", inode);
igrab(inode);
ihold(inode);
} else {
dout("ceph_queue_invalidate %p failed\n", inode);
}
......@@ -1477,7 +1477,7 @@ void ceph_queue_vmtruncate(struct inode *inode)
if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq,
&ci->i_vmtruncate_work)) {
dout("ceph_queue_vmtruncate %p\n", inode);
igrab(inode);
ihold(inode);
} else {
dout("ceph_queue_vmtruncate %p failed, pending=%d\n",
inode, ci->i_truncate_pending);
......@@ -1738,7 +1738,8 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
__mark_inode_dirty(inode, inode_dirty_flags);
if (mask) {
req->r_inode = igrab(inode);
req->r_inode = inode;
ihold(inode);
req->r_inode_drop = release;
req->r_args.setattr.mask = cpu_to_le32(mask);
req->r_num_caps = 1;
......@@ -1779,7 +1780,8 @@ int ceph_do_getattr(struct inode *inode, int mask)
req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_GETATTR, USE_ANY_MDS);
if (IS_ERR(req))
return PTR_ERR(req);
req->r_inode = igrab(inode);
req->r_inode = inode;
ihold(inode);
req->r_num_caps = 1;
req->r_args.getattr.mask = cpu_to_le32(mask);
err = ceph_mdsc_do_request(mdsc, NULL, req);
......
......@@ -73,7 +73,8 @@ static long ceph_ioctl_set_layout(struct file *file, void __user *arg)
USE_AUTH_MDS);
if (IS_ERR(req))
return PTR_ERR(req);
req->r_inode = igrab(inode);
req->r_inode = inode;
ihold(inode);
req->r_inode_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_FILE_EXCL;
req->r_args.setlayout.layout.fl_stripe_unit =
......@@ -135,7 +136,8 @@ static long ceph_ioctl_set_layout_policy (struct file *file, void __user *arg)
if (IS_ERR(req))
return PTR_ERR(req);
req->r_inode = igrab(inode);
req->r_inode = inode;
ihold(inode);
req->r_args.setlayout.layout.fl_stripe_unit =
cpu_to_le32(l.stripe_unit);
......
......@@ -23,7 +23,8 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
req = ceph_mdsc_create_request(mdsc, operation, USE_AUTH_MDS);
if (IS_ERR(req))
return PTR_ERR(req);
req->r_inode = igrab(inode);
req->r_inode = inode;
ihold(inode);
/* mds requires start and length rather than start and end */
if (LLONG_MAX == fl->fl_end)
......@@ -32,11 +33,10 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
length = fl->fl_end - fl->fl_start + 1;
dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, "
"length: %llu, wait: %d, type`: %d", (int)lock_type,
"length: %llu, wait: %d, type: %d", (int)lock_type,
(int)operation, (u64)fl->fl_pid, fl->fl_start,
length, wait, fl->fl_type);
req->r_args.filelock_change.rule = lock_type;
req->r_args.filelock_change.type = cmd;
req->r_args.filelock_change.pid = cpu_to_le64((u64)fl->fl_pid);
......@@ -70,7 +70,7 @@ static int ceph_lock_message(u8 lock_type, u16 operation, struct file *file,
}
ceph_mdsc_put_request(req);
dout("ceph_lock_message: rule: %d, op: %d, pid: %llu, start: %llu, "
"length: %llu, wait: %d, type`: %d, err code %d", (int)lock_type,
"length: %llu, wait: %d, type: %d, err code %d", (int)lock_type,
(int)operation, (u64)fl->fl_pid, fl->fl_start,
length, wait, fl->fl_type, err);
return err;
......@@ -109,16 +109,20 @@ int ceph_lock(struct file *file, int cmd, struct file_lock *fl)
dout("mds locked, locking locally");
err = posix_lock_file(file, fl, NULL);
if (err && (CEPH_MDS_OP_SETFILELOCK == op)) {
/* undo! This should only happen if the kernel detects
* local deadlock. */
/* undo! This should only happen if
* the kernel detects local
* deadlock. */
ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
CEPH_LOCK_UNLOCK, 0, fl);
dout("got %d on posix_lock_file, undid lock", err);
dout("got %d on posix_lock_file, undid lock",
err);
}
}
} else {
dout("mds returned error code %d", err);
} else if (err == -ERESTARTSYS) {
dout("undoing lock\n");
ceph_lock_message(CEPH_LOCK_FCNTL, op, file,
CEPH_LOCK_UNLOCK, 0, fl);
}
return err;
}
......@@ -155,8 +159,11 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
file, CEPH_LOCK_UNLOCK, 0, fl);
dout("got %d on flock_lock_file_wait, undid lock", err);
}
} else {
dout("mds error code %d", err);
} else if (err == -ERESTARTSYS) {
dout("undoing lock\n");
ceph_lock_message(CEPH_LOCK_FLOCK,
CEPH_MDS_OP_SETFILELOCK,
file, CEPH_LOCK_UNLOCK, 0, fl);
}
return err;
}
......
......@@ -722,7 +722,7 @@ static void flush_snaps(struct ceph_mds_client *mdsc)
ci = list_first_entry(&mdsc->snap_flush_list,
struct ceph_inode_info, i_snap_flush_item);
inode = &ci->vfs_inode;
igrab(inode);
ihold(inode);
spin_unlock(&mdsc->snap_flush_lock);
spin_lock(&inode->i_lock);
__ceph_flush_snaps(ci, &session, 0);
......
......@@ -665,7 +665,8 @@ static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
err = PTR_ERR(req);
goto out;
}
req->r_inode = igrab(inode);
req->r_inode = inode;
ihold(inode);
req->r_inode_drop = CEPH_CAP_XATTR_SHARED;
req->r_num_caps = 1;
req->r_args.setxattr.flags = cpu_to_le32(flags);
......@@ -795,7 +796,8 @@ static int ceph_send_removexattr(struct dentry *dentry, const char *name)
USE_AUTH_MDS);
if (IS_ERR(req))
return PTR_ERR(req);
req->r_inode = igrab(inode);
req->r_inode = inode;
ihold(inode);
req->r_inode_drop = CEPH_CAP_XATTR_SHARED;
req->r_num_caps = 1;
req->r_path2 = kstrdup(name, GFP_NOFS);
......
......@@ -1144,6 +1144,13 @@ static void handle_osds_timeout(struct work_struct *work)
round_jiffies_relative(delay));
}
static void complete_request(struct ceph_osd_request *req)
{
if (req->r_safe_callback)
req->r_safe_callback(req, NULL);
complete_all(&req->r_safe_completion); /* fsync waiter */
}
/*
* handle osd op reply. either call the callback if it is specified,
* or do the completion to wake up the waiting thread.
......@@ -1226,11 +1233,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
else
complete_all(&req->r_completion);
if (flags & CEPH_OSD_FLAG_ONDISK) {
if (req->r_safe_callback)
req->r_safe_callback(req, msg);
complete_all(&req->r_safe_completion); /* fsync waiter */
}
if (flags & CEPH_OSD_FLAG_ONDISK)
complete_request(req);
done:
dout("req=%p req->r_linger=%d\n", req, req->r_linger);
......@@ -1732,6 +1736,7 @@ int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
__cancel_request(req);
__unregister_request(osdc, req);
mutex_unlock(&osdc->request_mutex);
complete_request(req);
dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
return rc;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment