Commit c58d3aeb authored by Andrew Morton's avatar Andrew Morton Committed by Linus Torvalds

[PATCH] AIO+DIO bio_count race fix

From: Suparna Bhattacharya <suparna@in.ibm.com>,
      Daniel McNeil <daniel@osdl.org>

This patch ensures that when the DIO code falls back to buffered i/o after
having submitted part of the i/o, then buffered i/o is issued only for the
remaining part of the request (i.e.  the part not already covered by DIO),
rather than redo the entire i/o.  Now, instead of returning written ==
-ENOTBLK, generic_file_direct_IO returns the number of bytes already handled
by DIO, so that the caller knows how much of the I/O is left to be handled
via fallback to buffered write.

We need to careful not to access dio fields if its possible that the dio
could already have been freed asynchronously during i/o completion.  A tricky
part of this involves plugging the window between the decrement of bio_count
and accessing dio->waiter during i/o completion where the dio could get freed
by the submission path.  This potential "bio_count race" was tackled (by
Daniel) by changing bio_list_lock into bio_lock and using that for all the
bio fields.  Now bio_count and bios_in_flight have been converted from
atomics into int and are both protected by the bio_lock.  The race in
finished_one_bio() could thus be fixed by leaving the bio_count at 1 until
after the dio_complete() and then doing the bio_count decrement and wakeup
holding the bio_lock.  It appears that shifting to the spin_lock instead of
atomic_inc/decs is ok performance wise as well.

Update:

An AIO O_DIRECT request was extending the file so it was done
synchronously.  However, the request got an EFAULT and direct_io_worker()
was calling aio_complete() on the iocb and returning the EFAULT.  When
io_submit_one() got the EFAULT return, it assume it had to call
aio_complete() since the i/o never got queued.

The fix is for direct_io_worker() to only call aio_complete() when the
upper layer is going to return -EIOCBQUEUED and not when getting errors
that are being return to the submit path.
parent 332c8cf1
...@@ -74,6 +74,7 @@ struct dio { ...@@ -74,6 +74,7 @@ struct dio {
been performed at the start of a been performed at the start of a
write */ write */
int pages_in_io; /* approximate total IO pages */ int pages_in_io; /* approximate total IO pages */
size_t size; /* total request size (doesn't change)*/
sector_t block_in_file; /* Current offset into the underlying sector_t block_in_file; /* Current offset into the underlying
file in dio_block units. */ file in dio_block units. */
unsigned blocks_available; /* At block_in_file. changes */ unsigned blocks_available; /* At block_in_file. changes */
...@@ -115,9 +116,9 @@ struct dio { ...@@ -115,9 +116,9 @@ struct dio {
int page_errors; /* errno from get_user_pages() */ int page_errors; /* errno from get_user_pages() */
/* BIO completion state */ /* BIO completion state */
atomic_t bio_count; /* nr bios to be completed */ spinlock_t bio_lock; /* protects BIO fields below */
atomic_t bios_in_flight; /* nr bios in flight */ int bio_count; /* nr bios to be completed */
spinlock_t bio_list_lock; /* protects bio_list */ int bios_in_flight; /* nr bios in flight */
struct bio *bio_list; /* singly linked via bi_private */ struct bio *bio_list; /* singly linked via bi_private */
struct task_struct *waiter; /* waiting task (NULL if none) */ struct task_struct *waiter; /* waiting task (NULL if none) */
...@@ -221,20 +222,38 @@ static void dio_complete(struct dio *dio, loff_t offset, ssize_t bytes) ...@@ -221,20 +222,38 @@ static void dio_complete(struct dio *dio, loff_t offset, ssize_t bytes)
*/ */
static void finished_one_bio(struct dio *dio) static void finished_one_bio(struct dio *dio)
{ {
if (atomic_dec_and_test(&dio->bio_count)) { unsigned long flags;
spin_lock_irqsave(&dio->bio_lock, flags);
if (dio->bio_count == 1) {
if (dio->is_async) { if (dio->is_async) {
/*
* Last reference to the dio is going away.
* Drop spinlock and complete the DIO.
*/
spin_unlock_irqrestore(&dio->bio_lock, flags);
dio_complete(dio, dio->block_in_file << dio->blkbits, dio_complete(dio, dio->block_in_file << dio->blkbits,
dio->result); dio->result);
/* Complete AIO later if falling back to buffered i/o */ /* Complete AIO later if falling back to buffered i/o */
if (dio->result != -ENOTBLK) { if (dio->result == dio->size || dio->rw == READ) {
aio_complete(dio->iocb, dio->result, 0); aio_complete(dio->iocb, dio->result, 0);
kfree(dio); kfree(dio);
return;
} else { } else {
/*
* Falling back to buffered
*/
spin_lock_irqsave(&dio->bio_lock, flags);
dio->bio_count--;
if (dio->waiter) if (dio->waiter)
wake_up_process(dio->waiter); wake_up_process(dio->waiter);
spin_unlock_irqrestore(&dio->bio_lock, flags);
return;
} }
} }
} }
dio->bio_count--;
spin_unlock_irqrestore(&dio->bio_lock, flags);
} }
static int dio_bio_complete(struct dio *dio, struct bio *bio); static int dio_bio_complete(struct dio *dio, struct bio *bio);
...@@ -268,13 +287,13 @@ static int dio_bio_end_io(struct bio *bio, unsigned int bytes_done, int error) ...@@ -268,13 +287,13 @@ static int dio_bio_end_io(struct bio *bio, unsigned int bytes_done, int error)
if (bio->bi_size) if (bio->bi_size)
return 1; return 1;
spin_lock_irqsave(&dio->bio_list_lock, flags); spin_lock_irqsave(&dio->bio_lock, flags);
bio->bi_private = dio->bio_list; bio->bi_private = dio->bio_list;
dio->bio_list = bio; dio->bio_list = bio;
atomic_dec(&dio->bios_in_flight); dio->bios_in_flight--;
if (dio->waiter && atomic_read(&dio->bios_in_flight) == 0) if (dio->waiter && dio->bios_in_flight == 0)
wake_up_process(dio->waiter); wake_up_process(dio->waiter);
spin_unlock_irqrestore(&dio->bio_list_lock, flags); spin_unlock_irqrestore(&dio->bio_lock, flags);
return 0; return 0;
} }
...@@ -307,10 +326,13 @@ dio_bio_alloc(struct dio *dio, struct block_device *bdev, ...@@ -307,10 +326,13 @@ dio_bio_alloc(struct dio *dio, struct block_device *bdev,
static void dio_bio_submit(struct dio *dio) static void dio_bio_submit(struct dio *dio)
{ {
struct bio *bio = dio->bio; struct bio *bio = dio->bio;
unsigned long flags;
bio->bi_private = dio; bio->bi_private = dio;
atomic_inc(&dio->bio_count); spin_lock_irqsave(&dio->bio_lock, flags);
atomic_inc(&dio->bios_in_flight); dio->bio_count++;
dio->bios_in_flight++;
spin_unlock_irqrestore(&dio->bio_lock, flags);
if (dio->is_async && dio->rw == READ) if (dio->is_async && dio->rw == READ)
bio_set_pages_dirty(bio); bio_set_pages_dirty(bio);
submit_bio(dio->rw, bio); submit_bio(dio->rw, bio);
...@@ -336,22 +358,22 @@ static struct bio *dio_await_one(struct dio *dio) ...@@ -336,22 +358,22 @@ static struct bio *dio_await_one(struct dio *dio)
unsigned long flags; unsigned long flags;
struct bio *bio; struct bio *bio;
spin_lock_irqsave(&dio->bio_list_lock, flags); spin_lock_irqsave(&dio->bio_lock, flags);
while (dio->bio_list == NULL) { while (dio->bio_list == NULL) {
set_current_state(TASK_UNINTERRUPTIBLE); set_current_state(TASK_UNINTERRUPTIBLE);
if (dio->bio_list == NULL) { if (dio->bio_list == NULL) {
dio->waiter = current; dio->waiter = current;
spin_unlock_irqrestore(&dio->bio_list_lock, flags); spin_unlock_irqrestore(&dio->bio_lock, flags);
blk_run_queues(); blk_run_queues();
io_schedule(); io_schedule();
spin_lock_irqsave(&dio->bio_list_lock, flags); spin_lock_irqsave(&dio->bio_lock, flags);
dio->waiter = NULL; dio->waiter = NULL;
} }
set_current_state(TASK_RUNNING); set_current_state(TASK_RUNNING);
} }
bio = dio->bio_list; bio = dio->bio_list;
dio->bio_list = bio->bi_private; dio->bio_list = bio->bi_private;
spin_unlock_irqrestore(&dio->bio_list_lock, flags); spin_unlock_irqrestore(&dio->bio_lock, flags);
return bio; return bio;
} }
...@@ -393,7 +415,12 @@ static int dio_await_completion(struct dio *dio) ...@@ -393,7 +415,12 @@ static int dio_await_completion(struct dio *dio)
if (dio->bio) if (dio->bio)
dio_bio_submit(dio); dio_bio_submit(dio);
while (atomic_read(&dio->bio_count)) { /*
* The bio_lock is not held for the read of bio_count.
* This is ok since it is the dio_bio_complete() that changes
* bio_count.
*/
while (dio->bio_count) {
struct bio *bio = dio_await_one(dio); struct bio *bio = dio_await_one(dio);
int ret2; int ret2;
...@@ -420,10 +447,10 @@ static int dio_bio_reap(struct dio *dio) ...@@ -420,10 +447,10 @@ static int dio_bio_reap(struct dio *dio)
unsigned long flags; unsigned long flags;
struct bio *bio; struct bio *bio;
spin_lock_irqsave(&dio->bio_list_lock, flags); spin_lock_irqsave(&dio->bio_lock, flags);
bio = dio->bio_list; bio = dio->bio_list;
dio->bio_list = bio->bi_private; dio->bio_list = bio->bi_private;
spin_unlock_irqrestore(&dio->bio_list_lock, flags); spin_unlock_irqrestore(&dio->bio_lock, flags);
ret = dio_bio_complete(dio, bio); ret = dio_bio_complete(dio, bio);
} }
dio->reap_counter = 0; dio->reap_counter = 0;
...@@ -889,6 +916,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode, ...@@ -889,6 +916,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
dio->blkbits = blkbits; dio->blkbits = blkbits;
dio->blkfactor = inode->i_blkbits - blkbits; dio->blkfactor = inode->i_blkbits - blkbits;
dio->start_zero_done = 0; dio->start_zero_done = 0;
dio->size = 0;
dio->block_in_file = offset >> blkbits; dio->block_in_file = offset >> blkbits;
dio->blocks_available = 0; dio->blocks_available = 0;
dio->cur_page = NULL; dio->cur_page = NULL;
...@@ -913,9 +941,9 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode, ...@@ -913,9 +941,9 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
* (or synchronous) device could take the count to zero while we're * (or synchronous) device could take the count to zero while we're
* still submitting BIOs. * still submitting BIOs.
*/ */
atomic_set(&dio->bio_count, 1); dio->bio_count = 1;
atomic_set(&dio->bios_in_flight, 0); dio->bios_in_flight = 0;
spin_lock_init(&dio->bio_list_lock); spin_lock_init(&dio->bio_lock);
dio->bio_list = NULL; dio->bio_list = NULL;
dio->waiter = NULL; dio->waiter = NULL;
...@@ -925,7 +953,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode, ...@@ -925,7 +953,7 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
for (seg = 0; seg < nr_segs; seg++) { for (seg = 0; seg < nr_segs; seg++) {
user_addr = (unsigned long)iov[seg].iov_base; user_addr = (unsigned long)iov[seg].iov_base;
bytes = iov[seg].iov_len; dio->size += bytes = iov[seg].iov_len;
/* Index into the first page of the first block */ /* Index into the first page of the first block */
dio->first_block_in_page = (user_addr & ~PAGE_MASK) >> blkbits; dio->first_block_in_page = (user_addr & ~PAGE_MASK) >> blkbits;
...@@ -956,6 +984,13 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode, ...@@ -956,6 +984,13 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
} }
} /* end iovec loop */ } /* end iovec loop */
if (ret == -ENOTBLK && rw == WRITE) {
/*
* The remaining part of the request will be
* be handled by buffered I/O when we return
*/
ret = 0;
}
/* /*
* There may be some unwritten disk at the end of a part-written * There may be some unwritten disk at the end of a part-written
* fs-block-sized block. Go zero that now. * fs-block-sized block. Go zero that now.
...@@ -991,32 +1026,35 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode, ...@@ -991,32 +1026,35 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
* reflect the number of to-be-processed BIOs. * reflect the number of to-be-processed BIOs.
*/ */
if (dio->is_async) { if (dio->is_async) {
if (ret == 0) int should_wait = 0;
ret = dio->result; /* Bytes written */
if (ret == -ENOTBLK) { if (dio->result < dio->size && rw == WRITE) {
/*
* The request will be reissued via buffered I/O
* when we return; Any I/O already issued
* effectively becomes redundant.
*/
dio->result = ret;
dio->waiter = current; dio->waiter = current;
should_wait = 1;
} }
if (ret == 0)
ret = dio->result;
finished_one_bio(dio); /* This can free the dio */ finished_one_bio(dio); /* This can free the dio */
blk_run_queues(); blk_run_queues();
if (ret == -ENOTBLK) { if (should_wait) {
unsigned long flags;
/* /*
* Wait for already issued I/O to drain out and * Wait for already issued I/O to drain out and
* release its references to user-space pages * release its references to user-space pages
* before returning to fallback on buffered I/O * before returning to fallback on buffered I/O
*/ */
spin_lock_irqsave(&dio->bio_lock, flags);
set_current_state(TASK_UNINTERRUPTIBLE); set_current_state(TASK_UNINTERRUPTIBLE);
while (atomic_read(&dio->bio_count)) { while (dio->bio_count) {
spin_unlock_irqrestore(&dio->bio_lock, flags);
io_schedule(); io_schedule();
spin_lock_irqsave(&dio->bio_lock, flags);
set_current_state(TASK_UNINTERRUPTIBLE); set_current_state(TASK_UNINTERRUPTIBLE);
} }
spin_unlock_irqrestore(&dio->bio_lock, flags);
set_current_state(TASK_RUNNING); set_current_state(TASK_RUNNING);
dio->waiter = NULL; kfree(dio);
} }
} else { } else {
finished_one_bio(dio); finished_one_bio(dio);
...@@ -1038,7 +1076,12 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode, ...@@ -1038,7 +1076,12 @@ direct_io_worker(int rw, struct kiocb *iocb, struct inode *inode,
} }
dio_complete(dio, offset, ret); dio_complete(dio, offset, ret);
/* We could have also come here on an AIO file extend */ /* We could have also come here on an AIO file extend */
if (!is_sync_kiocb(iocb) && (ret != -ENOTBLK)) if (!is_sync_kiocb(iocb) && rw == WRITE &&
ret >= 0 && dio->result == dio->size)
/*
* For AIO writes where we have completed the
* i/o, we have to mark the the aio complete.
*/
aio_complete(iocb, ret, 0); aio_complete(iocb, ret, 0);
kfree(dio); kfree(dio);
} }
......
...@@ -1827,14 +1827,16 @@ generic_file_aio_write_nolock(struct kiocb *iocb, const struct iovec *iov, ...@@ -1827,14 +1827,16 @@ generic_file_aio_write_nolock(struct kiocb *iocb, const struct iovec *iov,
*/ */
if (written >= 0 && file->f_flags & O_SYNC) if (written >= 0 && file->f_flags & O_SYNC)
status = generic_osync_inode(inode, mapping, OSYNC_METADATA); status = generic_osync_inode(inode, mapping, OSYNC_METADATA);
if (written >= 0 && !is_sync_kiocb(iocb)) if (written == count && !is_sync_kiocb(iocb))
written = -EIOCBQUEUED; written = -EIOCBQUEUED;
if (written != -ENOTBLK) if (written < 0 || written == count)
goto out_status; goto out_status;
/* /*
* direct-io write to a hole: fall through to buffered I/O * direct-io write to a hole: fall through to buffered I/O
* for completing the rest of the request.
*/ */
written = 0; pos += written;
count -= written;
} }
buf = iov->iov_base; buf = iov->iov_base;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment