Commit 9c7504aa authored by Brian Foster, committed by Dave Chinner

xfs: track and serialize in-flight async buffers against unmount

Newly allocated XFS metadata buffers are added to the LRU once the hold
count is released, which typically occurs after I/O completion. There is
currently no other mechanism that tracks the existence or I/O state of
a new buffer. Further, readahead I/O tends to be submitted
asynchronously by nature, which means the I/O can remain in flight and
actually complete long after the calling context is gone. This means
that file descriptors or any other holds on the filesystem can be
released, allowing the filesystem to be unmounted while I/O is still in
flight. When I/O completion occurs, core data structures may have been
freed, causing completion to run into invalid memory accesses that are
likely to panic the system.

This problem is reproduced on XFS via directory readahead. A filesystem
is mounted, a directory is opened/closed and the filesystem immediately
unmounted. The open/close cycle triggers a directory readahead that, if
delayed long enough, runs buffer I/O completion after the unmount has
completed.
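
For illustration, a minimal userspace sketch of that reproduction sequence
(the mount point and directory name are hypothetical, and the directory is
presumed large enough that its entries live in data blocks rather than inline
in the inode; this is not the original test case):

#define _GNU_SOURCE
#include <fcntl.h>
#include <stdio.h>
#include <sys/mount.h>
#include <unistd.h>

int main(void)
{
	int	fd;

	/* assumes an XFS filesystem is already mounted at /mnt/scratch */
	fd = open("/mnt/scratch/somedir", O_RDONLY | O_DIRECTORY);
	if (fd < 0) {
		perror("open");
		return 1;
	}
	close(fd);	/* the open/close cycle issues the async readahead */

	/*
	 * Unmount right away; on an unpatched kernel the readahead I/O can
	 * still be in flight and complete after the buftarg is torn down.
	 */
	if (umount("/mnt/scratch") < 0) {
		perror("umount");
		return 1;
	}
	return 0;
}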

To address this problem, add a mechanism to track all in-flight,
asynchronous buffers using per-cpu counters in the buftarg. The buffer
is accounted on the first I/O submission after the current reference is
acquired and unaccounted once the buffer is returned to the LRU or
freed. Update xfs_wait_buftarg() to wait on all in-flight I/O before
walking the LRU list. Once in-flight I/O has completed and the workqueue
has drained, all new buffers should have been released onto the LRU.
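
The accounting pattern itself is small. A rough userspace analogue of what the
patch below does (not XFS code: a C11 atomic stands in for the kernel's per-cpu
counter, and all locking/serialization is omitted):

#include <stdatomic.h>
#include <stdbool.h>

struct target {
	atomic_long	io_count;	/* plays the role of bt_io_count */
};

struct buf {
	struct target	*target;
	bool		in_flight;	/* plays the role of _XBF_IN_FLIGHT */
};

static void ioacct_inc(struct buf *bp)
{
	if (bp->in_flight)
		return;		/* only the first submission per hold cycle counts */
	bp->in_flight = true;
	atomic_fetch_add(&bp->target->io_count, 1);
}

static void ioacct_dec(struct buf *bp)
{
	if (!bp->in_flight)
		return;		/* decremented exactly once, when the buffer is released */
	bp->in_flight = false;
	atomic_fetch_sub(&bp->target->io_count, 1);
}

static void wait_target(struct target *tp)
{
	/* unmount side: nothing may remain in flight before the LRU walk starts */
	while (atomic_load(&tp->io_count) != 0)
		;		/* the patch uses delay(100) here */
}
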
Signed-off-by: Brian Foster <bfoster@redhat.com>
Reviewed-by: Dave Chinner <dchinner@redhat.com>
Signed-off-by: Dave Chinner <david@fromorbit.com>
parent c891c30a

--- a/fs/xfs/xfs_buf.c
+++ b/fs/xfs/xfs_buf.c
@@ -79,6 +79,47 @@ xfs_buf_vmap_len(
 	return (bp->b_page_count * PAGE_SIZE) - bp->b_offset;
 }
 
+/*
+ * Bump the I/O in flight count on the buftarg if we haven't yet done so for
+ * this buffer. The count is incremented once per buffer (per hold cycle)
+ * because the corresponding decrement is deferred to buffer release. Buffers
+ * can undergo I/O multiple times in a hold-release cycle and per buffer I/O
+ * tracking adds unnecessary overhead. This is used for synchronization
+ * purposes with unmount (see xfs_wait_buftarg()), so all we really need is a
+ * count of in-flight buffers.
+ *
+ * Buffers that are never released (e.g., superblock, iclog buffers) must set
+ * the XBF_NO_IOACCT flag before I/O submission. Otherwise, the buftarg count
+ * never reaches zero and unmount hangs indefinitely.
+ */
+static inline void
+xfs_buf_ioacct_inc(
+	struct xfs_buf	*bp)
+{
+	if (bp->b_flags & (XBF_NO_IOACCT|_XBF_IN_FLIGHT))
+		return;
+
+	ASSERT(bp->b_flags & XBF_ASYNC);
+	bp->b_flags |= _XBF_IN_FLIGHT;
+	percpu_counter_inc(&bp->b_target->bt_io_count);
+}
+
+/*
+ * Clear the in-flight state on a buffer about to be released to the LRU or
+ * freed and unaccount from the buftarg.
+ */
+static inline void
+xfs_buf_ioacct_dec(
+	struct xfs_buf	*bp)
+{
+	if (!(bp->b_flags & _XBF_IN_FLIGHT))
+		return;
+
+	ASSERT(bp->b_flags & XBF_ASYNC);
+	bp->b_flags &= ~_XBF_IN_FLIGHT;
+	percpu_counter_dec(&bp->b_target->bt_io_count);
+}
+
 /*
  * When we mark a buffer stale, we remove the buffer from the LRU and clear the
  * b_lru_ref count so that the buffer is freed immediately when the buffer
@@ -102,6 +143,14 @@ xfs_buf_stale(
 	 */
 	bp->b_flags &= ~_XBF_DELWRI_Q;
 
+	/*
+	 * Once the buffer is marked stale and unlocked, a subsequent lookup
+	 * could reset b_flags. There is no guarantee that the buffer is
+	 * unaccounted (released to LRU) before that occurs. Drop in-flight
+	 * status now to preserve accounting consistency.
+	 */
+	xfs_buf_ioacct_dec(bp);
+
 	spin_lock(&bp->b_lock);
 	atomic_set(&bp->b_lru_ref, 0);
 	if (!(bp->b_state & XFS_BSTATE_DISPOSE) &&
@@ -867,63 +916,85 @@ xfs_buf_hold(
 }
 
 /*
- * Releases a hold on the specified buffer. If the
- * the hold count is 1, calls xfs_buf_free.
+ * Release a hold on the specified buffer. If the hold count is 1, the buffer is
+ * placed on LRU or freed (depending on b_lru_ref).
  */
 void
 xfs_buf_rele(
 	xfs_buf_t		*bp)
 {
 	struct xfs_perag	*pag = bp->b_pag;
+	bool			release;
+	bool			freebuf = false;
 
 	trace_xfs_buf_rele(bp, _RET_IP_);
 
 	if (!pag) {
 		ASSERT(list_empty(&bp->b_lru));
 		ASSERT(RB_EMPTY_NODE(&bp->b_rbnode));
-		if (atomic_dec_and_test(&bp->b_hold))
+		if (atomic_dec_and_test(&bp->b_hold)) {
+			xfs_buf_ioacct_dec(bp);
 			xfs_buf_free(bp);
+		}
 		return;
 	}
 
 	ASSERT(!RB_EMPTY_NODE(&bp->b_rbnode));
 
 	ASSERT(atomic_read(&bp->b_hold) > 0);
-	if (atomic_dec_and_lock(&bp->b_hold, &pag->pag_buf_lock)) {
-		spin_lock(&bp->b_lock);
-		if (!(bp->b_flags & XBF_STALE) && atomic_read(&bp->b_lru_ref)) {
-			/*
-			 * If the buffer is added to the LRU take a new
-			 * reference to the buffer for the LRU and clear the
-			 * (now stale) dispose list state flag
-			 */
-			if (list_lru_add(&bp->b_target->bt_lru, &bp->b_lru)) {
-				bp->b_state &= ~XFS_BSTATE_DISPOSE;
-				atomic_inc(&bp->b_hold);
-			}
-			spin_unlock(&bp->b_lock);
-			spin_unlock(&pag->pag_buf_lock);
-		} else {
-			/*
-			 * most of the time buffers will already be removed from
-			 * the LRU, so optimise that case by checking for the
-			 * XFS_BSTATE_DISPOSE flag indicating the last list the
-			 * buffer was on was the disposal list
-			 */
-			if (!(bp->b_state & XFS_BSTATE_DISPOSE)) {
-				list_lru_del(&bp->b_target->bt_lru, &bp->b_lru);
-			} else {
-				ASSERT(list_empty(&bp->b_lru));
-			}
-			spin_unlock(&bp->b_lock);
-
-			ASSERT(!(bp->b_flags & _XBF_DELWRI_Q));
-			rb_erase(&bp->b_rbnode, &pag->pag_buf_tree);
-			spin_unlock(&pag->pag_buf_lock);
-			xfs_perag_put(pag);
-			xfs_buf_free(bp);
-		}
-	}
+	release = atomic_dec_and_lock(&bp->b_hold, &pag->pag_buf_lock);
+	spin_lock(&bp->b_lock);
+	if (!release) {
+		/*
+		 * Drop the in-flight state if the buffer is already on the LRU
+		 * and it holds the only reference. This is racy because we
+		 * haven't acquired the pag lock, but the use of _XBF_IN_FLIGHT
+		 * ensures the decrement occurs only once per-buf.
+		 */
+		if ((atomic_read(&bp->b_hold) == 1) && !list_empty(&bp->b_lru))
+			xfs_buf_ioacct_dec(bp);
+		goto out_unlock;
+	}
+
+	/* the last reference has been dropped ... */
+	xfs_buf_ioacct_dec(bp);
+	if (!(bp->b_flags & XBF_STALE) && atomic_read(&bp->b_lru_ref)) {
+		/*
+		 * If the buffer is added to the LRU take a new reference to the
+		 * buffer for the LRU and clear the (now stale) dispose list
+		 * state flag
+		 */
+		if (list_lru_add(&bp->b_target->bt_lru, &bp->b_lru)) {
+			bp->b_state &= ~XFS_BSTATE_DISPOSE;
+			atomic_inc(&bp->b_hold);
+		}
+		spin_unlock(&pag->pag_buf_lock);
+	} else {
+		/*
+		 * most of the time buffers will already be removed from the
+		 * LRU, so optimise that case by checking for the
+		 * XFS_BSTATE_DISPOSE flag indicating the last list the buffer
+		 * was on was the disposal list
+		 */
+		if (!(bp->b_state & XFS_BSTATE_DISPOSE)) {
+			list_lru_del(&bp->b_target->bt_lru, &bp->b_lru);
+		} else {
+			ASSERT(list_empty(&bp->b_lru));
+		}
+
+		ASSERT(!(bp->b_flags & _XBF_DELWRI_Q));
+		rb_erase(&bp->b_rbnode, &pag->pag_buf_tree);
+		spin_unlock(&pag->pag_buf_lock);
+		xfs_perag_put(pag);
+		freebuf = true;
+	}
+
+out_unlock:
+	spin_unlock(&bp->b_lock);
+
+	if (freebuf)
+		xfs_buf_free(bp);
 }
@@ -1340,6 +1411,7 @@ xfs_buf_submit(
 	 * xfs_buf_ioend too early.
 	 */
 	atomic_set(&bp->b_io_remaining, 1);
+	xfs_buf_ioacct_inc(bp);
 	_xfs_buf_ioapply(bp);
 
 	/*
@@ -1525,13 +1597,19 @@ xfs_wait_buftarg(
 	int loop = 0;
 
 	/*
-	 * We need to flush the buffer workqueue to ensure that all IO
-	 * completion processing is 100% done. Just waiting on buffer locks is
-	 * not sufficient for async IO as the reference count held over IO is
-	 * not released until after the buffer lock is dropped. Hence we need to
-	 * ensure here that all reference counts have been dropped before we
-	 * start walking the LRU list.
+	 * First wait on the buftarg I/O count for all in-flight buffers to be
+	 * released. This is critical as new buffers do not make the LRU until
+	 * they are released.
+	 *
+	 * Next, flush the buffer workqueue to ensure all completion processing
+	 * has finished. Just waiting on buffer locks is not sufficient for
+	 * async IO as the reference count held over IO is not released until
+	 * after the buffer lock is dropped. Hence we need to ensure here that
+	 * all reference counts have been dropped before we start walking the
+	 * LRU list.
 	 */
+	while (percpu_counter_sum(&btp->bt_io_count))
+		delay(100);
 	drain_workqueue(btp->bt_mount->m_buf_workqueue);
 
 	/* loop until there is nothing left on the lru list. */
@@ -1628,6 +1706,8 @@ xfs_free_buftarg(
 	struct xfs_buftarg	*btp)
 {
 	unregister_shrinker(&btp->bt_shrinker);
+	ASSERT(percpu_counter_sum(&btp->bt_io_count) == 0);
+	percpu_counter_destroy(&btp->bt_io_count);
 	list_lru_destroy(&btp->bt_lru);
 
 	if (mp->m_flags & XFS_MOUNT_BARRIER)
@@ -1692,6 +1772,9 @@ xfs_alloc_buftarg(
 	if (list_lru_init(&btp->bt_lru))
 		goto error;
 
+	if (percpu_counter_init(&btp->bt_io_count, 0, GFP_KERNEL))
+		goto error;
+
 	btp->bt_shrinker.count_objects = xfs_buftarg_shrink_count;
 	btp->bt_shrinker.scan_objects = xfs_buftarg_shrink_scan;
 	btp->bt_shrinker.seeks = DEFAULT_SEEKS;
--- a/fs/xfs/xfs_buf.h
+++ b/fs/xfs/xfs_buf.h
@@ -63,6 +63,7 @@ typedef enum {
 #define _XBF_KMEM	 (1 << 21)/* backed by heap memory */
 #define _XBF_DELWRI_Q	 (1 << 22)/* buffer on a delwri queue */
 #define _XBF_COMPOUND	 (1 << 23)/* compound buffer */
+#define _XBF_IN_FLIGHT	 (1 << 25) /* I/O in flight, for accounting purposes */
 
 typedef unsigned int xfs_buf_flags_t;
 
@@ -82,7 +83,8 @@ typedef unsigned int xfs_buf_flags_t;
 	{ _XBF_PAGES,		"PAGES" }, \
 	{ _XBF_KMEM,		"KMEM" }, \
 	{ _XBF_DELWRI_Q,	"DELWRI_Q" }, \
-	{ _XBF_COMPOUND,	"COMPOUND" }
+	{ _XBF_COMPOUND,	"COMPOUND" }, \
+	{ _XBF_IN_FLIGHT,	"IN_FLIGHT" }
 
 /*
@@ -116,6 +118,8 @@ typedef struct xfs_buftarg {
 	/* LRU control structures */
 	struct shrinker		bt_shrinker;
 	struct list_lru		bt_lru;
+
+	struct percpu_counter	bt_io_count;
 } xfs_buftarg_t;
 
 struct xfs_buf;