Commit a9577554 authored by Andrew Morton's avatar Andrew Morton Committed by Jens Axboe

[PATCH] restructure direct-io to suit bio_add_page

The direct IO code was initially designed to allocate a known-sized
BIO, to fill it with pages and to then send it off.

Then along came bio_add_page().  Really, it broke direct-io.c - it
meant that the direct-IO BIO assembly code no longer had a-priori
knowledge of whether a page would fit into the current BIO.

Our attempts to rework the initial design to play well with
bio_add_page() really weren't adequate.  The code was getting more and
more twisty and we kept finding corner-cases which failed.

So this patch redesigns the BIO assembly and submission path of the
direct-IO code so that it better suits the bio_add_page() semantics.

It introduces another layer in the assembly phase: the 'cur_page' which
is cached in the dio structure.

The function which walks the file mapping do_direct_IO() simply emits a
sequence of (page,offset,len,sector) quads into the next layer down -
submit_page_section().

submit_page_section() is responsible for looking for a merge of the new
quad against the previous page section (same page).  If no merge is
possible it passes the currently-cached page down to the next level,
dio_send_cur_page().

dio_send_cur_page() will try to add the current page to the current
BIO.  If that fails, the current BIO is submitted for IO and we open a
new one.

So it's all nicely layered.  The assembly of sections-of-page into the
current page closely mirrors the assembly of sections-of-BIO into the
current BIO.

At both of these levels everything is done in a "deferred" manner: try
to merge a new request onto the currently-cached one.  If that fails
then send the currently-cached request and then cache this one instead.

Some variables have been renamed to more closely represent their usage.

Some thought has been put into ownership of the various state variables
within `struct dio'.  We were updating and inspecting these in various
places in a rather hard-to-follow manner.  So things have been reworked
so that particular functions "own" particular parts of the dio
structure.  Violators have been exterminated and commentary has been
added to describe this ownership.

The handling of file holes has been simplified.

As a consequence of all this, the code is clearer and simpler than it
used to be, and it now passes the modified-for-O_DIRECT fsx-linux
testing again.
parent caa2f807
...@@ -35,6 +35,7 @@ struct dio { ...@@ -35,6 +35,7 @@ struct dio {
struct inode *inode; struct inode *inode;
int rw; int rw;
unsigned blkbits; /* doesn't change */ unsigned blkbits; /* doesn't change */
int pages_in_io; /* approximate total IO pages */
sector_t block_in_file; /* changes */ sector_t block_in_file; /* changes */
unsigned blocks_available; /* At block_in_file. changes */ unsigned blocks_available; /* At block_in_file. changes */
sector_t final_block_in_request;/* doesn't change */ sector_t final_block_in_request;/* doesn't change */
...@@ -42,17 +43,31 @@ struct dio { ...@@ -42,17 +43,31 @@ struct dio {
int boundary; /* prev block is at a boundary */ int boundary; /* prev block is at a boundary */
int reap_counter; /* rate limit reaping */ int reap_counter; /* rate limit reaping */
get_blocks_t *get_blocks; /* block mapping function */ get_blocks_t *get_blocks; /* block mapping function */
sector_t last_block_in_bio; /* current final block in bio */ sector_t final_block_in_bio; /* current final block in bio + 1 */
sector_t next_block_in_bio; /* next block to be added to bio */ sector_t next_block_for_io; /* next block to be put under IO */
struct buffer_head map_bh; /* last get_blocks() result */ struct buffer_head map_bh; /* last get_blocks() result */
/* Page fetching state */ /*
* Deferred addition of a page to the dio. These variables are
* private to dio_send_cur_page(), submit_page_section() and
* dio_bio_add_page().
*/
struct page *cur_page; /* The page */
unsigned cur_page_offset; /* Offset into it, in bytes */
unsigned cur_page_len; /* Nr of bytes at cur_page_offset */
sector_t cur_page_block; /* Where it starts */
/*
* Page fetching state. These variables belong to dio_refill_pages().
*/
int curr_page; /* changes */ int curr_page; /* changes */
int total_pages; /* doesn't change */ int total_pages; /* doesn't change */
int pages_left; /* approximate total IO pages */
unsigned long curr_user_address;/* changes */ unsigned long curr_user_address;/* changes */
/* Page queue */ /*
* Page queue. These variables belong to dio_refill_pages() and
* dio_get_page().
*/
struct page *pages[DIO_PAGES]; /* page buffer */ struct page *pages[DIO_PAGES]; /* page buffer */
unsigned head; /* next page to process */ unsigned head; /* next page to process */
unsigned tail; /* last valid page + 1 */ unsigned tail; /* last valid page + 1 */
...@@ -318,73 +333,30 @@ static int dio_bio_reap(struct dio *dio) ...@@ -318,73 +333,30 @@ static int dio_bio_reap(struct dio *dio)
* *
* In the case of filesystem holes: the fs may return an arbitrarily-large * In the case of filesystem holes: the fs may return an arbitrarily-large
* hole by returning an appropriate value in b_size and by clearing * hole by returning an appropriate value in b_size and by clearing
* buffer_mapped(). This code _should_ handle that case correctly, but it has * buffer_mapped(). However the direct-io code will only process holes one
* only been tested against single-block holes (b_size == blocksize). * block at a time - it will repeatedly call get_blocks() as it walks the hole.
*/ */
static int get_more_blocks(struct dio *dio) static int get_more_blocks(struct dio *dio)
{ {
int ret; int ret;
struct buffer_head *map_bh = &dio->map_bh; struct buffer_head *map_bh = &dio->map_bh;
if (dio->blocks_available)
return 0;
/* /*
* If there was a memory error and we've overwritten all the * If there was a memory error and we've overwritten all the
* mapped blocks then we can now return that memory error * mapped blocks then we can now return that memory error
*/ */
if (dio->page_errors) {
ret = dio->page_errors; ret = dio->page_errors;
goto out; if (ret == 0) {
}
map_bh->b_state = 0; map_bh->b_state = 0;
map_bh->b_size = 0; map_bh->b_size = 0;
BUG_ON(dio->block_in_file >= dio->final_block_in_request); BUG_ON(dio->block_in_file >= dio->final_block_in_request);
ret = (*dio->get_blocks)(dio->inode, dio->block_in_file, ret = (*dio->get_blocks)(dio->inode, dio->block_in_file,
dio->final_block_in_request - dio->block_in_file, dio->final_block_in_request-dio->block_in_file,
map_bh, dio->rw == WRITE); map_bh, dio->rw == WRITE);
if (ret)
goto out;
if (buffer_mapped(map_bh)) {
BUG_ON(map_bh->b_size == 0);
BUG_ON((map_bh->b_size & ((1 << dio->blkbits) - 1)) != 0);
dio->blocks_available = map_bh->b_size >> dio->blkbits;
/* blockdevs do not set buffer_new */
if (buffer_new(map_bh)) {
sector_t block = map_bh->b_blocknr;
unsigned i;
for (i = 0; i < dio->blocks_available; i++)
unmap_underlying_metadata(map_bh->b_bdev,
block++);
}
} else {
BUG_ON(dio->rw != READ);
if (dio->bio)
dio_bio_submit(dio);
} }
dio->next_block_in_bio = map_bh->b_blocknr;
out:
return ret; return ret;
} }
/*
* Check to see if we can continue to grow the BIO. If not, then send it.
*/
static void dio_prep_bio(struct dio *dio)
{
if (dio->bio == NULL)
return;
if (dio->boundary ||
dio->last_block_in_bio != dio->next_block_in_bio - 1)
dio_bio_submit(dio);
}
/* /*
* There is no bio. Make one now. * There is no bio. Make one now.
*/ */
...@@ -397,7 +369,7 @@ static int dio_new_bio(struct dio *dio, sector_t blkno) ...@@ -397,7 +369,7 @@ static int dio_new_bio(struct dio *dio, sector_t blkno)
if (ret) if (ret)
goto out; goto out;
sector = blkno << (dio->blkbits - 9); sector = blkno << (dio->blkbits - 9);
nr_pages = min(dio->pages_left, bio_get_nr_vecs(dio->map_bh.b_bdev)); nr_pages = min(dio->pages_in_io, bio_get_nr_vecs(dio->map_bh.b_bdev));
BUG_ON(nr_pages <= 0); BUG_ON(nr_pages <= 0);
ret = dio_bio_alloc(dio, dio->map_bh.b_bdev, sector, nr_pages); ret = dio_bio_alloc(dio, dio->map_bh.b_bdev, sector, nr_pages);
dio->boundary = 0; dio->boundary = 0;
...@@ -405,37 +377,156 @@ static int dio_new_bio(struct dio *dio, sector_t blkno) ...@@ -405,37 +377,156 @@ static int dio_new_bio(struct dio *dio, sector_t blkno)
return ret; return ret;
} }
/*
* Attempt tp put the current chunk of 'cur_page' into the current BIO. If
* that was successful then update final_block_in_bio and take a ref against
* the just-added page.
*/
static int dio_bio_add_page(struct dio *dio)
{
int ret;
static int ret = bio_add_page(dio->bio, dio->cur_page,
dio_bio_add_page(struct dio *dio, struct page *page, dio->cur_page_len, dio->cur_page_offset);
unsigned int bv_len, unsigned int bv_offset, sector_t blkno) if (ret == dio->cur_page_len) {
dio->pages_in_io--;
page_cache_get(dio->cur_page);
dio->final_block_in_bio = dio->cur_page_block +
(dio->cur_page_len >> dio->blkbits);
ret = 0;
}
return ret;
}
/*
* Put cur_page under IO. The section of cur_page which is described by
* cur_page_offset,cur_page_len is put into a BIO. The section of cur_page
* starts on-disk at cur_page_block.
*
* We take a ref against the page here (on behalf of its presence in the bio).
*
* The caller of this function is responsible for removing cur_page from the
* dio, and for dropping the refcount which came from that presence.
*/
static int dio_send_cur_page(struct dio *dio)
{ {
int ret = 0; int ret = 0;
if (bv_len == 0) if (dio->bio) {
/*
* See whether this new request is contiguous with the old
*/
if (dio->final_block_in_bio != dio->cur_page_block)
dio_bio_submit(dio);
/*
* Submit now if the underlying fs is about to perform a
* metadata read
*/
if (dio->boundary)
dio_bio_submit(dio);
}
if (dio->bio == NULL) {
ret = dio_new_bio(dio, dio->cur_page_block);
if (ret)
goto out; goto out;
}
/* Take a ref against the page each time it is placed into a BIO */ if (dio_bio_add_page(dio) != 0) {
page_cache_get(page);
if (bio_add_page(dio->bio, page, bv_len, bv_offset) < bv_len) {
dio_bio_submit(dio); dio_bio_submit(dio);
ret = dio_new_bio(dio, blkno); ret = dio_new_bio(dio, dio->cur_page_block);
if (ret == 0) { if (ret == 0) {
ret = bio_add_page(dio->bio, page, bv_len, bv_offset); ret = dio_bio_add_page(dio);
BUG_ON(ret < bv_len); BUG_ON(ret != 0);
} else {
/* The page didn't make it into a BIO */
page_cache_release(page);
} }
} }
dio->pages_left--;
out: out:
return ret; return ret;
} }
/*
* An autonomous function to put a chunk of a page under deferred IO.
*
* The caller doesn't actually know (or care) whether this piece of page is in
* a BIO, or is under IO or whatever. We just take care of all possible
* situations here. The separation between the logic of do_direct_IO() and
* that of submit_page_section() is important for clarity. Please don't break.
*
* The chunk of page starts on-disk at blocknr.
*
* We perform deferred IO, by recording the last-submitted page inside our
* private part of the dio structure. If possible, we just expand the IO
* across that page here.
*
* If that doesn't work out then we put the old page into the bio and add this
* page to the dio instead.
*/
static int
submit_page_section(struct dio *dio, struct page *page,
unsigned offset, unsigned len, sector_t blocknr)
{
int ret = 0;
/*
* Can we just grow the current page's presence in the dio?
*/
if ( (dio->cur_page == page) &&
(dio->cur_page_offset + dio->cur_page_len == offset) &&
(dio->cur_page_block +
(dio->cur_page_len >> dio->blkbits) == blocknr)) {
dio->cur_page_len += len;
/*
* If dio->boundary then we want to schedule the IO now to
* avoid metadata seeks.
*/
if (dio->boundary) {
ret = dio_send_cur_page(dio);
page_cache_release(dio->cur_page);
dio->cur_page = NULL;
}
goto out;
}
/*
* If there's a deferred page already there then send it.
*/
if (dio->cur_page) {
ret = dio_send_cur_page(dio);
page_cache_release(dio->cur_page);
dio->cur_page = NULL;
if (ret)
goto out;
}
page_cache_get(page); /* It is in dio */
dio->cur_page = page;
dio->cur_page_offset = offset;
dio->cur_page_len = len;
dio->cur_page_block = blocknr;
out:
return ret;
}
/*
* Clean any dirty buffers in the blockdev mapping which alias newly-created
* file blocks. Only called for S_ISREG files - blockdevs do not set
* buffer_new
*/
static void clean_blockdev_aliases(struct dio *dio)
{
unsigned i;
for (i = 0; i < dio->blocks_available; i++) {
unmap_underlying_metadata(dio->map_bh.b_bdev,
dio->map_bh.b_blocknr + i);
}
}
/* /*
* Walk the user pages, and the file, mapping blocks to disk and emitting BIOs. * Walk the user pages, and the file, mapping blocks to disk and generating
* a sequence of (page,offset,len,block) mappings. These mappings are injected
* into submit_page_section(), which takes care of the next stage of submission
* *
* Direct IO against a blockdev is different from a file. Because we can * Direct IO against a blockdev is different from a file. Because we can
* happily perform page-sized but 512-byte aligned IOs. It is important that * happily perform page-sized but 512-byte aligned IOs. It is important that
...@@ -448,73 +539,65 @@ dio_bio_add_page(struct dio *dio, struct page *page, ...@@ -448,73 +539,65 @@ dio_bio_add_page(struct dio *dio, struct page *page,
* it should set b_size to PAGE_SIZE or more inside get_blocks(). This gives * it should set b_size to PAGE_SIZE or more inside get_blocks(). This gives
* fine alignment but still allows this function to work in PAGE_SIZE units. * fine alignment but still allows this function to work in PAGE_SIZE units.
*/ */
int do_direct_IO(struct dio *dio) static int do_direct_IO(struct dio *dio)
{ {
const unsigned blkbits = dio->blkbits; const unsigned blkbits = dio->blkbits;
const unsigned blocks_per_page = PAGE_SIZE >> blkbits; const unsigned blocks_per_page = PAGE_SIZE >> blkbits;
struct page *page; struct page *page;
unsigned block_in_page; unsigned block_in_page;
struct buffer_head *map_bh = &dio->map_bh;
int ret = 0; int ret = 0;
/* The I/O can start at any block offset within the first page */ /* The I/O can start at any block offset within the first page */
block_in_page = dio->first_block_in_page; block_in_page = dio->first_block_in_page;
while (dio->block_in_file < dio->final_block_in_request) { while (dio->block_in_file < dio->final_block_in_request) {
int new_page; /* Need to insert this page into the BIO? */
unsigned int bv_offset;
unsigned int bv_len;
sector_t curr_blkno;
page = dio_get_page(dio); page = dio_get_page(dio);
if (IS_ERR(page)) { if (IS_ERR(page)) {
ret = PTR_ERR(page); ret = PTR_ERR(page);
goto out; goto out;
} }
new_page = 1;
bv_offset = 0;
bv_len = 0;
curr_blkno = 0;
while (block_in_page < blocks_per_page) { while (block_in_page < blocks_per_page) {
unsigned offset_in_page = block_in_page << blkbits;
unsigned this_chunk_bytes; /* # of bytes mapped */ unsigned this_chunk_bytes; /* # of bytes mapped */
unsigned this_chunk_blocks; /* # of blocks */ unsigned this_chunk_blocks; /* # of blocks */
unsigned u; unsigned u;
if (dio->blocks_available == 0) {
ret = get_more_blocks(dio); ret = get_more_blocks(dio);
if (ret) if (ret) {
goto fail_release; page_cache_release(page);
goto out;
}
if (buffer_mapped(map_bh)) {
dio->blocks_available =
map_bh->b_size >> dio->blkbits;
dio->next_block_for_io =
map_bh->b_blocknr;
if (buffer_new(map_bh))
clean_blockdev_aliases(dio);
}
}
/* Handle holes */ /* Handle holes */
if (!buffer_mapped(&dio->map_bh)) { if (!buffer_mapped(map_bh)) {
char *kaddr = kmap_atomic(page, KM_USER0); char *kaddr = kmap_atomic(page, KM_USER0);
memset(kaddr + (block_in_page << blkbits), memset(kaddr + (block_in_page << blkbits),
0, 1 << blkbits); 0, 1 << blkbits);
flush_dcache_page(page); flush_dcache_page(page);
kunmap_atomic(kaddr, KM_USER0); kunmap_atomic(kaddr, KM_USER0);
dio->block_in_file++; dio->block_in_file++;
dio->next_block_in_bio++;
block_in_page++; block_in_page++;
goto next_block; goto next_block;
} }
dio_prep_bio(dio); /*
if (dio->bio == NULL) { * Work out, in this_chunk_blocks, how much disk we
ret = dio_new_bio(dio, dio->next_block_in_bio); * can add to this page
if (ret) */
goto fail_release;
new_page = 1;
}
if (new_page) {
bv_len = 0;
bv_offset = block_in_page << blkbits;
curr_blkno = dio->next_block_in_bio;
new_page = 0;
}
/* Work out how much disk we can add to this page */
this_chunk_blocks = dio->blocks_available; this_chunk_blocks = dio->blocks_available;
u = (PAGE_SIZE - (bv_len + bv_offset)) >> blkbits; u = (PAGE_SIZE - offset_in_page) >> blkbits;
if (this_chunk_blocks > u) if (this_chunk_blocks > u)
this_chunk_blocks = u; this_chunk_blocks = u;
u = dio->final_block_in_request - dio->block_in_file; u = dio->final_block_in_request - dio->block_in_file;
...@@ -523,10 +606,15 @@ int do_direct_IO(struct dio *dio) ...@@ -523,10 +606,15 @@ int do_direct_IO(struct dio *dio)
this_chunk_bytes = this_chunk_blocks << blkbits; this_chunk_bytes = this_chunk_blocks << blkbits;
BUG_ON(this_chunk_bytes == 0); BUG_ON(this_chunk_bytes == 0);
bv_len += this_chunk_bytes; dio->boundary = buffer_boundary(map_bh);
dio->next_block_in_bio += this_chunk_blocks; ret = submit_page_section(dio, page, offset_in_page,
dio->last_block_in_bio = dio->next_block_in_bio - 1; this_chunk_bytes, dio->next_block_for_io);
dio->boundary = buffer_boundary(&dio->map_bh); if (ret) {
page_cache_release(page);
goto out;
}
dio->next_block_for_io += this_chunk_blocks;
dio->block_in_file += this_chunk_blocks; dio->block_in_file += this_chunk_blocks;
block_in_page += this_chunk_blocks; block_in_page += this_chunk_blocks;
dio->blocks_available -= this_chunk_blocks; dio->blocks_available -= this_chunk_blocks;
...@@ -536,23 +624,16 @@ int do_direct_IO(struct dio *dio) ...@@ -536,23 +624,16 @@ int do_direct_IO(struct dio *dio)
if (dio->block_in_file == dio->final_block_in_request) if (dio->block_in_file == dio->final_block_in_request)
break; break;
} }
ret = dio_bio_add_page(dio, page, bv_len,
bv_offset, curr_blkno);
if (ret)
goto fail_release;
/* Drop the ref which was taken in get_user_pages() */ /* Drop the ref which was taken in get_user_pages() */
page_cache_release(page); page_cache_release(page);
block_in_page = 0; block_in_page = 0;
} }
goto out;
fail_release:
page_cache_release(page);
out: out:
return ret; return ret;
} }
int static int
direct_io_worker(int rw, struct inode *inode, const struct iovec *iov, direct_io_worker(int rw, struct inode *inode, const struct iovec *iov,
loff_t offset, unsigned long nr_segs, get_blocks_t get_blocks) loff_t offset, unsigned long nr_segs, get_blocks_t get_blocks)
{ {
...@@ -569,11 +650,13 @@ direct_io_worker(int rw, struct inode *inode, const struct iovec *iov, ...@@ -569,11 +650,13 @@ direct_io_worker(int rw, struct inode *inode, const struct iovec *iov,
dio.block_in_file = offset >> blkbits; dio.block_in_file = offset >> blkbits;
dio.blocks_available = 0; dio.blocks_available = 0;
dio.cur_page = NULL;
dio.boundary = 0; dio.boundary = 0;
dio.reap_counter = 0; dio.reap_counter = 0;
dio.get_blocks = get_blocks; dio.get_blocks = get_blocks;
dio.last_block_in_bio = -1; dio.final_block_in_bio = -1;
dio.next_block_in_bio = -1; dio.next_block_for_io = -1;
dio.page_errors = 0; dio.page_errors = 0;
...@@ -582,10 +665,10 @@ direct_io_worker(int rw, struct inode *inode, const struct iovec *iov, ...@@ -582,10 +665,10 @@ direct_io_worker(int rw, struct inode *inode, const struct iovec *iov,
spin_lock_init(&dio.bio_list_lock); spin_lock_init(&dio.bio_list_lock);
dio.bio_list = NULL; dio.bio_list = NULL;
dio.waiter = NULL; dio.waiter = NULL;
dio.pages_left = 0; dio.pages_in_io = 0;
for (seg = 0; seg < nr_segs; seg++) for (seg = 0; seg < nr_segs; seg++)
dio.pages_left += (iov[seg].iov_len / PAGE_SIZE) + 2; dio.pages_in_io += (iov[seg].iov_len >> blkbits) + 2;
for (seg = 0; seg < nr_segs; seg++) { for (seg = 0; seg < nr_segs; seg++) {
user_addr = (unsigned long)iov[seg].iov_base; user_addr = (unsigned long)iov[seg].iov_base;
...@@ -619,6 +702,12 @@ direct_io_worker(int rw, struct inode *inode, const struct iovec *iov, ...@@ -619,6 +702,12 @@ direct_io_worker(int rw, struct inode *inode, const struct iovec *iov,
} /* end iovec loop */ } /* end iovec loop */
if (dio.cur_page) {
ret2 = dio_send_cur_page(&dio);
page_cache_release(dio.cur_page);
if (ret == 0)
ret = ret2;
}
ret2 = dio_await_completion(&dio); ret2 = dio_await_completion(&dio);
if (ret == 0) if (ret == 0)
ret = ret2; ret = ret2;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment