Commit 40786717 authored by Darrick J. Wong's avatar Darrick J. Wong

xfs: multithreaded iwalk implementation

Create a parallel iwalk implementation and switch quotacheck to use it.
Signed-off-by: default avatarDarrick J. Wong <darrick.wong@oracle.com>
Reviewed-by: default avatarBrian Foster <bfoster@redhat.com>
parent 677717fb
......@@ -85,6 +85,7 @@ xfs-y += xfs_aops.o \
xfs_message.o \
xfs_mount.o \
xfs_mru_cache.o \
xfs_pwork.o \
xfs_reflink.o \
xfs_stats.o \
xfs_super.o \
......
......@@ -40,4 +40,7 @@ struct xfs_globals xfs_globals = {
#else
.bug_on_assert = false, /* assert failures WARN() */
#endif
#ifdef DEBUG
.pwork_threads = -1, /* automatic thread detection */
#endif
};
......@@ -20,6 +20,7 @@
#include "xfs_icache.h"
#include "xfs_health.h"
#include "xfs_trans.h"
#include "xfs_pwork.h"
/*
* Walking Inodes in the Filesystem
......@@ -45,6 +46,9 @@
*/
struct xfs_iwalk_ag {
/* parallel work control data; will be null if single threaded */
struct xfs_pwork pwork;
struct xfs_mount *mp;
struct xfs_trans *tp;
......@@ -182,6 +186,9 @@ xfs_iwalk_ag_recs(
trace_xfs_iwalk_ag_rec(mp, agno, irec);
if (xfs_pwork_want_abort(&iwag->pwork))
return 0;
if (iwag->inobt_walk_fn) {
error = iwag->inobt_walk_fn(mp, tp, agno, irec,
iwag->data);
......@@ -193,6 +200,9 @@ xfs_iwalk_ag_recs(
continue;
for (j = 0; j < XFS_INODES_PER_CHUNK; j++) {
if (xfs_pwork_want_abort(&iwag->pwork))
return 0;
/* Skip if this inode is free */
if (XFS_INOBT_MASK(j) & irec->ir_free)
continue;
......@@ -387,6 +397,8 @@ xfs_iwalk_ag(
struct xfs_inobt_rec_incore *irec;
cond_resched();
if (xfs_pwork_want_abort(&iwag->pwork))
goto out;
/* Fetch the inobt record. */
irec = &iwag->recs[iwag->nr_recs];
......@@ -520,6 +532,7 @@ xfs_iwalk(
.sz_recs = xfs_iwalk_prefetch(inode_records),
.trim_start = 1,
.skip_empty = 1,
.pwork = XFS_PWORK_SINGLE_THREADED,
};
xfs_agnumber_t agno = XFS_INO_TO_AGNO(mp, startino);
int error;
......@@ -541,6 +554,74 @@ xfs_iwalk(
return error;
}
/* Run per-thread iwalk work. */
static int
xfs_iwalk_ag_work(
struct xfs_mount *mp,
struct xfs_pwork *pwork)
{
struct xfs_iwalk_ag *iwag;
int error = 0;
iwag = container_of(pwork, struct xfs_iwalk_ag, pwork);
if (xfs_pwork_want_abort(pwork))
goto out;
error = xfs_iwalk_alloc(iwag);
if (error)
goto out;
error = xfs_iwalk_ag(iwag);
xfs_iwalk_free(iwag);
out:
kmem_free(iwag);
return error;
}
/*
* Walk all the inodes in the filesystem using multiple threads to process each
* AG.
*/
int
xfs_iwalk_threaded(
struct xfs_mount *mp,
xfs_ino_t startino,
xfs_iwalk_fn iwalk_fn,
unsigned int inode_records,
void *data)
{
struct xfs_pwork_ctl pctl;
xfs_agnumber_t agno = XFS_INO_TO_AGNO(mp, startino);
unsigned int nr_threads;
int error;
ASSERT(agno < mp->m_sb.sb_agcount);
nr_threads = xfs_pwork_guess_datadev_parallelism(mp);
error = xfs_pwork_init(mp, &pctl, xfs_iwalk_ag_work, "xfs_iwalk",
nr_threads);
if (error)
return error;
for (; agno < mp->m_sb.sb_agcount; agno++) {
struct xfs_iwalk_ag *iwag;
if (xfs_pwork_ctl_want_abort(&pctl))
break;
iwag = kmem_zalloc(sizeof(struct xfs_iwalk_ag), KM_SLEEP);
iwag->mp = mp;
iwag->iwalk_fn = iwalk_fn;
iwag->data = data;
iwag->startino = startino;
iwag->sz_recs = xfs_iwalk_prefetch(inode_records);
xfs_pwork_queue(&pctl, &iwag->pwork);
startino = XFS_AGINO_TO_INO(mp, agno + 1, 0);
}
return xfs_pwork_destroy(&pctl);
}
/*
* Allow callers to cache up to a page's worth of inobt records. This reflects
* the existing inumbers prefetching behavior. Since the inobt walk does not
......@@ -601,6 +682,7 @@ xfs_inobt_walk(
.data = data,
.startino = startino,
.sz_recs = xfs_inobt_walk_prefetch(inobt_records),
.pwork = XFS_PWORK_SINGLE_THREADED,
};
xfs_agnumber_t agno = XFS_INO_TO_AGNO(mp, startino);
int error;
......
......@@ -15,6 +15,8 @@ typedef int (*xfs_iwalk_fn)(struct xfs_mount *mp, struct xfs_trans *tp,
int xfs_iwalk(struct xfs_mount *mp, struct xfs_trans *tp, xfs_ino_t startino,
xfs_iwalk_fn iwalk_fn, unsigned int inode_records, void *data);
int xfs_iwalk_threaded(struct xfs_mount *mp, xfs_ino_t startino,
xfs_iwalk_fn iwalk_fn, unsigned int inode_records, void *data);
/* Walk all inode btree records in the filesystem starting from @startino. */
typedef int (*xfs_inobt_walk_fn)(struct xfs_mount *mp, struct xfs_trans *tp,
......
// SPDX-License-Identifier: GPL-2.0-or-later
/*
* Copyright (C) 2019 Oracle. All Rights Reserved.
* Author: Darrick J. Wong <darrick.wong@oracle.com>
*/
#include "xfs.h"
#include "xfs_fs.h"
#include "xfs_shared.h"
#include "xfs_format.h"
#include "xfs_log_format.h"
#include "xfs_trans_resv.h"
#include "xfs_mount.h"
#include "xfs_trace.h"
#include "xfs_sysctl.h"
#include "xfs_pwork.h"
/*
* Parallel Work Queue
* ===================
*
* Abstract away the details of running a large and "obviously" parallelizable
* task across multiple CPUs. Callers initialize the pwork control object with
* a desired level of parallelization and a work function. Next, they embed
* struct xfs_pwork in whatever structure they use to pass work context to a
* worker thread and queue that pwork. The work function will be passed the
* pwork item when it is run (from process context) and any returned error will
* be recorded in xfs_pwork_ctl.error. Work functions should check for errors
* and abort if necessary; the non-zeroness of xfs_pwork_ctl.error does not
* stop workqueue item processing.
*
* This is the rough equivalent of the xfsprogs workqueue code, though we can't
* reuse that name here.
*/
/* Invoke our caller's function. */
static void
xfs_pwork_work(
struct work_struct *work)
{
struct xfs_pwork *pwork;
struct xfs_pwork_ctl *pctl;
int error;
pwork = container_of(work, struct xfs_pwork, work);
pctl = pwork->pctl;
error = pctl->work_fn(pctl->mp, pwork);
if (error && !pctl->error)
pctl->error = error;
}
/*
* Set up control data for parallel work. @work_fn is the function that will
* be called. @tag will be written into the kernel threads. @nr_threads is
* the level of parallelism desired, or 0 for no limit.
*/
int
xfs_pwork_init(
struct xfs_mount *mp,
struct xfs_pwork_ctl *pctl,
xfs_pwork_work_fn work_fn,
const char *tag,
unsigned int nr_threads)
{
#ifdef DEBUG
if (xfs_globals.pwork_threads >= 0)
nr_threads = xfs_globals.pwork_threads;
#endif
trace_xfs_pwork_init(mp, nr_threads, current->pid);
pctl->wq = alloc_workqueue("%s-%d", WQ_FREEZABLE, nr_threads, tag,
current->pid);
if (!pctl->wq)
return -ENOMEM;
pctl->work_fn = work_fn;
pctl->error = 0;
pctl->mp = mp;
return 0;
}
/* Queue some parallel work. */
void
xfs_pwork_queue(
struct xfs_pwork_ctl *pctl,
struct xfs_pwork *pwork)
{
INIT_WORK(&pwork->work, xfs_pwork_work);
pwork->pctl = pctl;
queue_work(pctl->wq, &pwork->work);
}
/* Wait for the work to finish and tear down the control structure. */
int
xfs_pwork_destroy(
struct xfs_pwork_ctl *pctl)
{
destroy_workqueue(pctl->wq);
pctl->wq = NULL;
return pctl->error;
}
/*
* Return the amount of parallelism that the data device can handle, or 0 for
* no limit.
*/
unsigned int
xfs_pwork_guess_datadev_parallelism(
struct xfs_mount *mp)
{
struct xfs_buftarg *btp = mp->m_ddev_targp;
/*
* For now we'll go with the most conservative setting possible,
* which is two threads for an SSD and 1 thread everywhere else.
*/
return blk_queue_nonrot(btp->bt_bdev->bd_queue) ? 2 : 1;
}
/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
* Copyright (C) 2019 Oracle. All Rights Reserved.
* Author: Darrick J. Wong <darrick.wong@oracle.com>
*/
#ifndef __XFS_PWORK_H__
#define __XFS_PWORK_H__
struct xfs_pwork;
struct xfs_mount;
typedef int (*xfs_pwork_work_fn)(struct xfs_mount *mp, struct xfs_pwork *pwork);
/*
* Parallel work coordination structure.
*/
struct xfs_pwork_ctl {
struct workqueue_struct *wq;
struct xfs_mount *mp;
xfs_pwork_work_fn work_fn;
int error;
};
/*
* Embed this parallel work control item inside your own work structure,
* then queue work with it.
*/
struct xfs_pwork {
struct work_struct work;
struct xfs_pwork_ctl *pctl;
};
#define XFS_PWORK_SINGLE_THREADED { .pctl = NULL }
/* Have we been told to abort? */
static inline bool
xfs_pwork_ctl_want_abort(
struct xfs_pwork_ctl *pctl)
{
return pctl && pctl->error;
}
/* Have we been told to abort? */
static inline bool
xfs_pwork_want_abort(
struct xfs_pwork *pwork)
{
return xfs_pwork_ctl_want_abort(pwork->pctl);
}
int xfs_pwork_init(struct xfs_mount *mp, struct xfs_pwork_ctl *pctl,
xfs_pwork_work_fn work_fn, const char *tag,
unsigned int nr_threads);
void xfs_pwork_queue(struct xfs_pwork_ctl *pctl, struct xfs_pwork *pwork);
int xfs_pwork_destroy(struct xfs_pwork_ctl *pctl);
unsigned int xfs_pwork_guess_datadev_parallelism(struct xfs_mount *mp);
#endif /* __XFS_PWORK_H__ */
......@@ -1300,7 +1300,7 @@ xfs_qm_quotacheck(
flags |= XFS_PQUOTA_CHKD;
}
error = xfs_iwalk(mp, NULL, 0, xfs_qm_dqusage_adjust, 0, NULL);
error = xfs_iwalk_threaded(mp, 0, xfs_qm_dqusage_adjust, 0, NULL);
if (error)
goto error_return;
......
......@@ -82,6 +82,9 @@ enum {
extern xfs_param_t xfs_params;
struct xfs_globals {
#ifdef DEBUG
int pwork_threads; /* parallel workqueue threads */
#endif
int log_recovery_delay; /* log recovery delay (secs) */
int mount_delay; /* mount setup delay (secs) */
bool bug_on_assert; /* BUG() the kernel on assert failure */
......
......@@ -204,11 +204,51 @@ always_cow_show(
}
XFS_SYSFS_ATTR_RW(always_cow);
#ifdef DEBUG
/*
* Override how many threads the parallel work queue is allowed to create.
* This has to be a debug-only global (instead of an errortag) because one of
* the main users of parallel workqueues is mount time quotacheck.
*/
STATIC ssize_t
pwork_threads_store(
struct kobject *kobject,
const char *buf,
size_t count)
{
int ret;
int val;
ret = kstrtoint(buf, 0, &val);
if (ret)
return ret;
if (val < -1 || val > num_possible_cpus())
return -EINVAL;
xfs_globals.pwork_threads = val;
return count;
}
STATIC ssize_t
pwork_threads_show(
struct kobject *kobject,
char *buf)
{
return snprintf(buf, PAGE_SIZE, "%d\n", xfs_globals.pwork_threads);
}
XFS_SYSFS_ATTR_RW(pwork_threads);
#endif /* DEBUG */
static struct attribute *xfs_dbg_attrs[] = {
ATTR_LIST(bug_on_assert),
ATTR_LIST(log_recovery_delay),
ATTR_LIST(mount_delay),
ATTR_LIST(always_cow),
#ifdef DEBUG
ATTR_LIST(pwork_threads),
#endif
NULL,
};
......
......@@ -3557,6 +3557,24 @@ TRACE_EVENT(xfs_iwalk_ag_rec,
__entry->startino, __entry->freemask)
)
TRACE_EVENT(xfs_pwork_init,
TP_PROTO(struct xfs_mount *mp, unsigned int nr_threads, pid_t pid),
TP_ARGS(mp, nr_threads, pid),
TP_STRUCT__entry(
__field(dev_t, dev)
__field(unsigned int, nr_threads)
__field(pid_t, pid)
),
TP_fast_assign(
__entry->dev = mp->m_super->s_dev;
__entry->nr_threads = nr_threads;
__entry->pid = pid;
),
TP_printk("dev %d:%d nr_threads %u pid %u",
MAJOR(__entry->dev), MINOR(__entry->dev),
__entry->nr_threads, __entry->pid)
)
#endif /* _TRACE_XFS_H */
#undef TRACE_INCLUDE_PATH
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment