Commit 96f37eab authored by Kent Overstreet's avatar Kent Overstreet

bcachefs: factor out thread_with_file, thread_with_stdio

thread_with_stdio now knows how to handle input - fsck can now prompt to
fix errors.
Signed-off-by: default avatarKent Overstreet <kent.overstreet@linux.dev>
parent f60250de
...@@ -82,6 +82,7 @@ bcachefs-y := \ ...@@ -82,6 +82,7 @@ bcachefs-y := \
super-io.o \ super-io.o \
sysfs.o \ sysfs.o \
tests.o \ tests.o \
thread_with_file.o \
trace.o \ trace.o \
two_state_shared_lock.o \ two_state_shared_lock.o \
util.o \ util.o \
......
...@@ -464,6 +464,7 @@ enum bch_time_stats { ...@@ -464,6 +464,7 @@ enum bch_time_stats {
#include "replicas_types.h" #include "replicas_types.h"
#include "subvolume_types.h" #include "subvolume_types.h"
#include "super_types.h" #include "super_types.h"
#include "thread_with_file_types.h"
/* Number of nodes btree coalesce will try to coalesce at once */ /* Number of nodes btree coalesce will try to coalesce at once */
#define GC_MERGE_NODES 4U #define GC_MERGE_NODES 4U
...@@ -478,12 +479,6 @@ enum bch_time_stats { ...@@ -478,12 +479,6 @@ enum bch_time_stats {
struct btree; struct btree;
struct log_output {
spinlock_t lock;
wait_queue_head_t wait;
struct printbuf buf;
};
enum gc_phase { enum gc_phase {
GC_PHASE_NOT_RUNNING, GC_PHASE_NOT_RUNNING,
GC_PHASE_START, GC_PHASE_START,
...@@ -739,8 +734,8 @@ struct bch_fs { ...@@ -739,8 +734,8 @@ struct bch_fs {
struct super_block *vfs_sb; struct super_block *vfs_sb;
dev_t dev; dev_t dev;
char name[40]; char name[40];
struct log_output *output; struct stdio_redirect *stdio;
struct task_struct *output_filter; struct task_struct *stdio_filter;
/* ro/rw, add/remove/resize devices: */ /* ro/rw, add/remove/resize devices: */
struct rw_semaphore state_lock; struct rw_semaphore state_lock;
...@@ -1252,6 +1247,15 @@ static inline bool bch2_dev_exists2(const struct bch_fs *c, unsigned dev) ...@@ -1252,6 +1247,15 @@ static inline bool bch2_dev_exists2(const struct bch_fs *c, unsigned dev)
return dev < c->sb.nr_devices && c->devs[dev]; return dev < c->sb.nr_devices && c->devs[dev];
} }
static inline struct stdio_redirect *bch2_fs_stdio_redirect(struct bch_fs *c)
{
struct stdio_redirect *stdio = c->stdio;
if (c->stdio_filter && c->stdio_filter != current)
stdio = NULL;
return stdio;
}
#define BKEY_PADDED_ONSTACK(key, pad) \ #define BKEY_PADDED_ONSTACK(key, pad) \
struct { struct bkey_i key; __u64 key ## _pad[pad]; } struct { struct bkey_i key; __u64 key ## _pad[pad]; }
......
...@@ -11,16 +11,13 @@ ...@@ -11,16 +11,13 @@
#include "replicas.h" #include "replicas.h"
#include "super.h" #include "super.h"
#include "super-io.h" #include "super-io.h"
#include "thread_with_file.h"
#include <linux/anon_inodes.h>
#include <linux/cdev.h> #include <linux/cdev.h>
#include <linux/device.h> #include <linux/device.h>
#include <linux/file.h>
#include <linux/fs.h> #include <linux/fs.h>
#include <linux/ioctl.h> #include <linux/ioctl.h>
#include <linux/kthread.h>
#include <linux/major.h> #include <linux/major.h>
#include <linux/poll.h>
#include <linux/sched/task.h> #include <linux/sched/task.h>
#include <linux/slab.h> #include <linux/slab.h>
#include <linux/uaccess.h> #include <linux/uaccess.h>
...@@ -31,65 +28,6 @@ static int copy_to_user_errcode(void __user *to, const void *from, unsigned long ...@@ -31,65 +28,6 @@ static int copy_to_user_errcode(void __user *to, const void *from, unsigned long
return copy_to_user(to, from, n) ? -EFAULT : 0; return copy_to_user(to, from, n) ? -EFAULT : 0;
} }
struct thread_with_file {
struct task_struct *task;
int ret;
bool done;
};
static void thread_with_file_exit(struct thread_with_file *thr)
{
if (thr->task) {
kthread_stop(thr->task);
put_task_struct(thr->task);
}
}
__printf(4, 0)
static int run_thread_with_file(struct thread_with_file *thr,
const struct file_operations *fops,
int (*fn)(void *), const char *fmt, ...)
{
va_list args;
struct file *file = NULL;
int ret, fd = -1;
struct printbuf name = PRINTBUF;
unsigned fd_flags = O_RDONLY|O_CLOEXEC|O_NONBLOCK;
va_start(args, fmt);
prt_vprintf(&name, fmt, args);
va_end(args);
thr->ret = 0;
thr->task = kthread_create(fn, thr, name.buf);
ret = PTR_ERR_OR_ZERO(thr->task);
if (ret)
goto err;
ret = get_unused_fd_flags(fd_flags);
if (ret < 0)
goto err_stop_task;
fd = ret;
file = anon_inode_getfile(name.buf, fops, thr, fd_flags);
ret = PTR_ERR_OR_ZERO(file);
if (ret)
goto err_put_fd;
fd_install(fd, file);
get_task_struct(thr->task);
wake_up_process(thr->task);
printbuf_exit(&name);
return fd;
err_put_fd:
put_unused_fd(fd);
err_stop_task:
kthread_stop(thr->task);
err:
printbuf_exit(&name);
return ret;
}
/* returns with ref on ca->ref */ /* returns with ref on ca->ref */
static struct bch_dev *bch2_device_lookup(struct bch_fs *c, u64 dev, static struct bch_dev *bch2_device_lookup(struct bch_fs *c, u64 dev,
unsigned flags) unsigned flags)
...@@ -200,132 +138,33 @@ static long bch2_ioctl_incremental(struct bch_ioctl_incremental __user *user_arg ...@@ -200,132 +138,33 @@ static long bch2_ioctl_incremental(struct bch_ioctl_incremental __user *user_arg
#endif #endif
struct fsck_thread { struct fsck_thread {
struct thread_with_file thr; struct thread_with_stdio thr;
struct printbuf buf;
struct bch_fs *c; struct bch_fs *c;
char **devs; char **devs;
size_t nr_devs; size_t nr_devs;
struct bch_opts opts; struct bch_opts opts;
struct log_output output;
DARRAY(char) output2;
}; };
static void bch2_fsck_thread_free(struct fsck_thread *thr) static void bch2_fsck_thread_exit(struct thread_with_stdio *_thr)
{ {
thread_with_file_exit(&thr->thr); struct fsck_thread *thr = container_of(_thr, struct fsck_thread, thr);
if (thr->devs) if (thr->devs)
for (size_t i = 0; i < thr->nr_devs; i++) for (size_t i = 0; i < thr->nr_devs; i++)
kfree(thr->devs[i]); kfree(thr->devs[i]);
darray_exit(&thr->output2);
printbuf_exit(&thr->output.buf);
kfree(thr->devs); kfree(thr->devs);
kfree(thr); kfree(thr);
} }
static int bch2_fsck_thread_release(struct inode *inode, struct file *file)
{
struct fsck_thread *thr = container_of(file->private_data, struct fsck_thread, thr);
bch2_fsck_thread_free(thr);
return 0;
}
static bool fsck_thread_ready(struct fsck_thread *thr)
{
return thr->output.buf.pos ||
thr->output2.nr ||
thr->thr.done;
}
static ssize_t bch2_fsck_thread_read(struct file *file, char __user *buf,
size_t len, loff_t *ppos)
{
struct fsck_thread *thr = container_of(file->private_data, struct fsck_thread, thr);
size_t copied = 0, b;
int ret = 0;
if ((file->f_flags & O_NONBLOCK) &&
!fsck_thread_ready(thr))
return -EAGAIN;
ret = wait_event_interruptible(thr->output.wait,
fsck_thread_ready(thr));
if (ret)
return ret;
if (thr->thr.done)
return 0;
while (len) {
ret = darray_make_room(&thr->output2, thr->output.buf.pos);
if (ret)
break;
spin_lock_irq(&thr->output.lock);
b = min_t(size_t, darray_room(thr->output2), thr->output.buf.pos);
memcpy(&darray_top(thr->output2), thr->output.buf.buf, b);
memmove(thr->output.buf.buf,
thr->output.buf.buf + b,
thr->output.buf.pos - b);
thr->output2.nr += b;
thr->output.buf.pos -= b;
spin_unlock_irq(&thr->output.lock);
b = min(len, thr->output2.nr);
if (!b)
break;
b -= copy_to_user(buf, thr->output2.data, b);
if (!b) {
ret = -EFAULT;
break;
}
copied += b;
buf += b;
len -= b;
memmove(thr->output2.data,
thr->output2.data + b,
thr->output2.nr - b);
thr->output2.nr -= b;
}
return copied ?: ret;
}
static __poll_t bch2_fsck_thread_poll(struct file *file, struct poll_table_struct *wait)
{
struct fsck_thread *thr = container_of(file->private_data, struct fsck_thread, thr);
poll_wait(file, &thr->output.wait, wait);
return fsck_thread_ready(thr)
? EPOLLIN|EPOLLHUP
: 0;
}
static const struct file_operations fsck_thread_ops = {
.release = bch2_fsck_thread_release,
.read = bch2_fsck_thread_read,
.poll = bch2_fsck_thread_poll,
.llseek = no_llseek,
};
static int bch2_fsck_offline_thread_fn(void *arg) static int bch2_fsck_offline_thread_fn(void *arg)
{ {
struct fsck_thread *thr = container_of(arg, struct fsck_thread, thr); struct fsck_thread *thr = container_of(arg, struct fsck_thread, thr);
struct bch_fs *c = bch2_fs_open(thr->devs, thr->nr_devs, thr->opts); struct bch_fs *c = bch2_fs_open(thr->devs, thr->nr_devs, thr->opts);
thr->thr.ret = PTR_ERR_OR_ZERO(c); thr->thr.thr.ret = PTR_ERR_OR_ZERO(c);
if (!thr->thr.ret) if (!thr->thr.thr.ret)
bch2_fs_stop(c); bch2_fs_stop(c);
thr->thr.done = true; thread_with_stdio_done(&thr->thr);
wake_up(&thr->output.wait);
return 0; return 0;
} }
...@@ -354,11 +193,6 @@ static long bch2_ioctl_fsck_offline(struct bch_ioctl_fsck_offline __user *user_a ...@@ -354,11 +193,6 @@ static long bch2_ioctl_fsck_offline(struct bch_ioctl_fsck_offline __user *user_a
thr->opts = bch2_opts_empty(); thr->opts = bch2_opts_empty();
thr->nr_devs = arg.nr_devs; thr->nr_devs = arg.nr_devs;
thr->output.buf = PRINTBUF;
thr->output.buf.atomic++;
spin_lock_init(&thr->output.lock);
init_waitqueue_head(&thr->output.wait);
darray_init(&thr->output2);
if (copy_from_user(devs, &user_arg->devs[0], if (copy_from_user(devs, &user_arg->devs[0],
array_size(sizeof(user_arg->devs[0]), arg.nr_devs))) { array_size(sizeof(user_arg->devs[0]), arg.nr_devs))) {
...@@ -384,16 +218,15 @@ static long bch2_ioctl_fsck_offline(struct bch_ioctl_fsck_offline __user *user_a ...@@ -384,16 +218,15 @@ static long bch2_ioctl_fsck_offline(struct bch_ioctl_fsck_offline __user *user_a
goto err; goto err;
} }
opt_set(thr->opts, log_output, (u64)(unsigned long)&thr->output); opt_set(thr->opts, stdio, (u64)(unsigned long)&thr->thr.stdio);
ret = run_thread_with_file(&thr->thr, ret = bch2_run_thread_with_stdio(&thr->thr,
&fsck_thread_ops, bch2_fsck_thread_exit,
bch2_fsck_offline_thread_fn, bch2_fsck_offline_thread_fn);
"bch-fsck");
err: err:
if (ret < 0) { if (ret < 0) {
if (thr) if (thr)
bch2_fsck_thread_free(thr); bch2_fsck_thread_exit(&thr->thr);
pr_err("ret %s", bch2_err_str(ret)); pr_err("ret %s", bch2_err_str(ret));
} }
kfree(devs); kfree(devs);
...@@ -592,7 +425,7 @@ static int bch2_data_job_release(struct inode *inode, struct file *file) ...@@ -592,7 +425,7 @@ static int bch2_data_job_release(struct inode *inode, struct file *file)
{ {
struct bch_data_ctx *ctx = container_of(file->private_data, struct bch_data_ctx, thr); struct bch_data_ctx *ctx = container_of(file->private_data, struct bch_data_ctx, thr);
thread_with_file_exit(&ctx->thr); bch2_thread_with_file_exit(&ctx->thr);
kfree(ctx); kfree(ctx);
return 0; return 0;
} }
...@@ -642,10 +475,9 @@ static long bch2_ioctl_data(struct bch_fs *c, ...@@ -642,10 +475,9 @@ static long bch2_ioctl_data(struct bch_fs *c,
ctx->c = c; ctx->c = c;
ctx->arg = arg; ctx->arg = arg;
ret = run_thread_with_file(&ctx->thr, ret = bch2_run_thread_with_file(&ctx->thr,
&bcachefs_data_ops, &bcachefs_data_ops,
bch2_data_thread, bch2_data_thread);
"bch-data/%s", c->name);
if (ret < 0) if (ret < 0)
kfree(ctx); kfree(ctx);
return ret; return ret;
...@@ -936,8 +768,8 @@ static int bch2_fsck_online_thread_fn(void *arg) ...@@ -936,8 +768,8 @@ static int bch2_fsck_online_thread_fn(void *arg)
struct fsck_thread *thr = container_of(arg, struct fsck_thread, thr); struct fsck_thread *thr = container_of(arg, struct fsck_thread, thr);
struct bch_fs *c = thr->c; struct bch_fs *c = thr->c;
c->output_filter = current; c->stdio_filter = current;
c->output = &thr->output; c->stdio = &thr->thr.stdio;
/* /*
* XXX: can we figure out a way to do this without mucking with c->opts? * XXX: can we figure out a way to do this without mucking with c->opts?
...@@ -949,11 +781,10 @@ static int bch2_fsck_online_thread_fn(void *arg) ...@@ -949,11 +781,10 @@ static int bch2_fsck_online_thread_fn(void *arg)
c->curr_recovery_pass = BCH_RECOVERY_PASS_check_alloc_info; c->curr_recovery_pass = BCH_RECOVERY_PASS_check_alloc_info;
bch2_run_online_recovery_passes(c); bch2_run_online_recovery_passes(c);
c->output = NULL; c->stdio = NULL;
c->output_filter = NULL; c->stdio_filter = NULL;
thr->thr.done = true; thread_with_stdio_done(&thr->thr);
wake_up(&thr->output.wait);
up(&c->online_fsck_mutex); up(&c->online_fsck_mutex);
bch2_ro_ref_put(c); bch2_ro_ref_put(c);
...@@ -988,11 +819,6 @@ static long bch2_ioctl_fsck_online(struct bch_fs *c, ...@@ -988,11 +819,6 @@ static long bch2_ioctl_fsck_online(struct bch_fs *c,
thr->c = c; thr->c = c;
thr->opts = bch2_opts_empty(); thr->opts = bch2_opts_empty();
thr->output.buf = PRINTBUF;
thr->output.buf.atomic++;
spin_lock_init(&thr->output.lock);
init_waitqueue_head(&thr->output.wait);
darray_init(&thr->output2);
if (arg.opts) { if (arg.opts) {
char *optstr = strndup_user((char __user *)(unsigned long) arg.opts, 1 << 16); char *optstr = strndup_user((char __user *)(unsigned long) arg.opts, 1 << 16);
...@@ -1005,15 +831,14 @@ static long bch2_ioctl_fsck_online(struct bch_fs *c, ...@@ -1005,15 +831,14 @@ static long bch2_ioctl_fsck_online(struct bch_fs *c,
goto err; goto err;
} }
ret = run_thread_with_file(&thr->thr, ret = bch2_run_thread_with_stdio(&thr->thr,
&fsck_thread_ops, bch2_fsck_thread_exit,
bch2_fsck_online_thread_fn, bch2_fsck_online_thread_fn);
"bch-fsck");
err: err:
if (ret < 0) { if (ret < 0) {
bch_err_fn(c, ret); bch_err_fn(c, ret);
if (thr) if (thr)
bch2_fsck_thread_free(thr); bch2_fsck_thread_exit(&thr->thr);
up(&c->online_fsck_mutex); up(&c->online_fsck_mutex);
bch2_ro_ref_put(c); bch2_ro_ref_put(c);
} }
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#include "bcachefs.h" #include "bcachefs.h"
#include "error.h" #include "error.h"
#include "super.h" #include "super.h"
#include "thread_with_file.h"
#define FSCK_ERR_RATELIMIT_NR 10 #define FSCK_ERR_RATELIMIT_NR 10
...@@ -69,40 +70,66 @@ enum ask_yn { ...@@ -69,40 +70,66 @@ enum ask_yn {
YN_ALLYES, YN_ALLYES,
}; };
static enum ask_yn parse_yn_response(char *buf)
{
buf = strim(buf);
if (strlen(buf) == 1)
switch (buf[0]) {
case 'n':
return YN_NO;
case 'y':
return YN_YES;
case 'N':
return YN_ALLNO;
case 'Y':
return YN_ALLYES;
}
return -1;
}
#ifdef __KERNEL__ #ifdef __KERNEL__
#define bch2_fsck_ask_yn() YN_NO static enum ask_yn bch2_fsck_ask_yn(struct bch_fs *c)
{
struct stdio_redirect *stdio = c->stdio;
if (c->stdio_filter && c->stdio_filter != current)
stdio = NULL;
if (!stdio)
return YN_NO;
char buf[100];
int ret;
do {
bch2_print(c, " (y,n, or Y,N for all errors of this type) ");
int r = bch2_stdio_redirect_readline(stdio, buf, sizeof(buf) - 1);
if (r < 0)
return YN_NO;
buf[r] = '\0';
} while ((ret = parse_yn_response(buf)) < 0);
return ret;
}
#else #else
#include "tools-util.h" #include "tools-util.h"
enum ask_yn bch2_fsck_ask_yn(void) static enum ask_yn bch2_fsck_ask_yn(struct bch_fs *c)
{ {
char *buf = NULL; char *buf = NULL;
size_t buflen = 0; size_t buflen = 0;
bool ret; int ret;
while (true) { do {
fputs(" (y,n, or Y,N for all errors of this type) ", stdout); fputs(" (y,n, or Y,N for all errors of this type) ", stdout);
fflush(stdout); fflush(stdout);
if (getline(&buf, &buflen, stdin) < 0) if (getline(&buf, &buflen, stdin) < 0)
die("error reading from standard input"); die("error reading from standard input");
} while ((ret = parse_yn_response(buf)) < 0);
strim(buf);
if (strlen(buf) != 1)
continue;
switch (buf[0]) {
case 'n':
return YN_NO;
case 'y':
return YN_YES;
case 'N':
return YN_ALLNO;
case 'Y':
return YN_ALLYES;
}
}
free(buf); free(buf);
return ret; return ret;
...@@ -221,10 +248,13 @@ int bch2_fsck_err(struct bch_fs *c, ...@@ -221,10 +248,13 @@ int bch2_fsck_err(struct bch_fs *c,
int ask; int ask;
prt_str(out, ": fix?"); prt_str(out, ": fix?");
bch2_print_string_as_lines(KERN_ERR, out->buf); if (bch2_fs_stdio_redirect(c))
bch2_print(c, "%s", out->buf);
else
bch2_print_string_as_lines(KERN_ERR, out->buf);
print = false; print = false;
ask = bch2_fsck_ask_yn(); ask = bch2_fsck_ask_yn(c);
if (ask >= YN_ALLNO && s) if (ask >= YN_ALLNO && s)
s->fix = ask == YN_ALLNO s->fix = ask == YN_ALLNO
...@@ -253,8 +283,12 @@ int bch2_fsck_err(struct bch_fs *c, ...@@ -253,8 +283,12 @@ int bch2_fsck_err(struct bch_fs *c,
!(flags & FSCK_CAN_IGNORE))) !(flags & FSCK_CAN_IGNORE)))
ret = -BCH_ERR_fsck_errors_not_fixed; ret = -BCH_ERR_fsck_errors_not_fixed;
if (print) if (print) {
bch2_print_string_as_lines(KERN_ERR, out->buf); if (bch2_fs_stdio_redirect(c))
bch2_print(c, "%s\n", out->buf);
else
bch2_print_string_as_lines(KERN_ERR, out->buf);
}
if (!test_bit(BCH_FS_fsck_done, &c->flags) && if (!test_bit(BCH_FS_fsck_done, &c->flags) &&
(ret != -BCH_ERR_fsck_fix && (ret != -BCH_ERR_fsck_fix &&
......
...@@ -414,11 +414,11 @@ enum fsck_err_opts { ...@@ -414,11 +414,11 @@ enum fsck_err_opts {
OPT_BOOL(), \ OPT_BOOL(), \
BCH2_NO_SB_OPT, false, \ BCH2_NO_SB_OPT, false, \
NULL, "Allocate the buckets_nouse bitmap") \ NULL, "Allocate the buckets_nouse bitmap") \
x(log_output, u64, \ x(stdio, u64, \
0, \ 0, \
OPT_UINT(0, S64_MAX), \ OPT_UINT(0, S64_MAX), \
BCH2_NO_SB_OPT, false, \ BCH2_NO_SB_OPT, false, \
NULL, "Pointer to a struct log_output") \ NULL, "Pointer to a struct stdio_redirect") \
x(project, u8, \ x(project, u8, \
OPT_INODE, \ OPT_INODE, \
OPT_BOOL(), \ OPT_BOOL(), \
......
...@@ -88,14 +88,11 @@ const char * const bch2_fs_flag_strs[] = { ...@@ -88,14 +88,11 @@ const char * const bch2_fs_flag_strs[] = {
void __bch2_print(struct bch_fs *c, const char *fmt, ...) void __bch2_print(struct bch_fs *c, const char *fmt, ...)
{ {
struct log_output *output = c->output; struct stdio_redirect *stdio = bch2_fs_stdio_redirect(c);
va_list args;
if (c->output_filter && c->output_filter != current)
output = NULL;
va_list args;
va_start(args, fmt); va_start(args, fmt);
if (likely(!output)) { if (likely(!stdio)) {
vprintk(fmt, args); vprintk(fmt, args);
} else { } else {
unsigned long flags; unsigned long flags;
...@@ -103,11 +100,11 @@ void __bch2_print(struct bch_fs *c, const char *fmt, ...) ...@@ -103,11 +100,11 @@ void __bch2_print(struct bch_fs *c, const char *fmt, ...)
if (fmt[0] == KERN_SOH[0]) if (fmt[0] == KERN_SOH[0])
fmt += 2; fmt += 2;
spin_lock_irqsave(&output->lock, flags); spin_lock_irqsave(&stdio->output_lock, flags);
prt_vprintf(&output->buf, fmt, args); prt_vprintf(&stdio->output_buf, fmt, args);
spin_unlock_irqrestore(&output->lock, flags); spin_unlock_irqrestore(&stdio->output_lock, flags);
wake_up(&output->wait); wake_up(&stdio->output_wait);
} }
va_end(args); va_end(args);
} }
...@@ -724,7 +721,7 @@ static struct bch_fs *bch2_fs_alloc(struct bch_sb *sb, struct bch_opts opts) ...@@ -724,7 +721,7 @@ static struct bch_fs *bch2_fs_alloc(struct bch_sb *sb, struct bch_opts opts)
goto out; goto out;
} }
c->output = (void *)(unsigned long) opts.log_output; c->stdio = (void *)(unsigned long) opts.stdio;
__module_get(THIS_MODULE); __module_get(THIS_MODULE);
......
// SPDX-License-Identifier: GPL-2.0
#include "bcachefs.h"
#include "printbuf.h"
#include "thread_with_file.h"
#include <linux/anon_inodes.h>
#include <linux/file.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>
#include <linux/poll.h>
void bch2_thread_with_file_exit(struct thread_with_file *thr)
{
if (thr->task) {
kthread_stop(thr->task);
put_task_struct(thr->task);
}
}
int bch2_run_thread_with_file(struct thread_with_file *thr,
const struct file_operations *fops,
int (*fn)(void *))
{
struct file *file = NULL;
int ret, fd = -1;
unsigned fd_flags = O_CLOEXEC;
if (fops->read && fops->write)
fd_flags |= O_RDWR;
else if (fops->read)
fd_flags |= O_RDONLY;
else if (fops->write)
fd_flags |= O_WRONLY;
char name[TASK_COMM_LEN];
get_task_comm(name, current);
thr->ret = 0;
thr->task = kthread_create(fn, thr, "%s", name);
ret = PTR_ERR_OR_ZERO(thr->task);
if (ret)
return ret;
ret = get_unused_fd_flags(fd_flags);
if (ret < 0)
goto err;
fd = ret;
file = anon_inode_getfile(name, fops, thr, fd_flags);
ret = PTR_ERR_OR_ZERO(file);
if (ret)
goto err;
fd_install(fd, file);
get_task_struct(thr->task);
wake_up_process(thr->task);
return fd;
err:
if (fd >= 0)
put_unused_fd(fd);
if (thr->task)
kthread_stop(thr->task);
return ret;
}
static inline bool thread_with_stdio_has_output(struct thread_with_stdio *thr)
{
return thr->stdio.output_buf.pos ||
thr->output2.nr ||
thr->thr.done;
}
static ssize_t thread_with_stdio_read(struct file *file, char __user *buf,
size_t len, loff_t *ppos)
{
struct thread_with_stdio *thr =
container_of(file->private_data, struct thread_with_stdio, thr);
size_t copied = 0, b;
int ret = 0;
if ((file->f_flags & O_NONBLOCK) &&
!thread_with_stdio_has_output(thr))
return -EAGAIN;
ret = wait_event_interruptible(thr->stdio.output_wait,
thread_with_stdio_has_output(thr));
if (ret)
return ret;
if (thr->thr.done)
return 0;
while (len) {
ret = darray_make_room(&thr->output2, thr->stdio.output_buf.pos);
if (ret)
break;
spin_lock_irq(&thr->stdio.output_lock);
b = min_t(size_t, darray_room(thr->output2), thr->stdio.output_buf.pos);
memcpy(&darray_top(thr->output2), thr->stdio.output_buf.buf, b);
memmove(thr->stdio.output_buf.buf,
thr->stdio.output_buf.buf + b,
thr->stdio.output_buf.pos - b);
thr->output2.nr += b;
thr->stdio.output_buf.pos -= b;
spin_unlock_irq(&thr->stdio.output_lock);
b = min(len, thr->output2.nr);
if (!b)
break;
b -= copy_to_user(buf, thr->output2.data, b);
if (!b) {
ret = -EFAULT;
break;
}
copied += b;
buf += b;
len -= b;
memmove(thr->output2.data,
thr->output2.data + b,
thr->output2.nr - b);
thr->output2.nr -= b;
}
return copied ?: ret;
}
static int thread_with_stdio_release(struct inode *inode, struct file *file)
{
struct thread_with_stdio *thr =
container_of(file->private_data, struct thread_with_stdio, thr);
bch2_thread_with_file_exit(&thr->thr);
printbuf_exit(&thr->stdio.input_buf);
printbuf_exit(&thr->stdio.output_buf);
darray_exit(&thr->output2);
thr->exit(thr);
return 0;
}
#define WRITE_BUFFER 4096
static inline bool thread_with_stdio_has_input_space(struct thread_with_stdio *thr)
{
return thr->stdio.input_buf.pos < WRITE_BUFFER || thr->thr.done;
}
static ssize_t thread_with_stdio_write(struct file *file, const char __user *ubuf,
size_t len, loff_t *ppos)
{
struct thread_with_stdio *thr =
container_of(file->private_data, struct thread_with_stdio, thr);
struct printbuf *buf = &thr->stdio.input_buf;
size_t copied = 0;
ssize_t ret = 0;
while (len) {
if (thr->thr.done) {
ret = -EPIPE;
break;
}
size_t b = len - fault_in_readable(ubuf, len);
if (!b) {
ret = -EFAULT;
break;
}
spin_lock(&thr->stdio.input_lock);
if (buf->pos < WRITE_BUFFER)
bch2_printbuf_make_room(buf, min(b, WRITE_BUFFER - buf->pos));
b = min(len, printbuf_remaining_size(buf));
if (b && !copy_from_user_nofault(&buf->buf[buf->pos], ubuf, b)) {
ubuf += b;
len -= b;
copied += b;
buf->pos += b;
}
spin_unlock(&thr->stdio.input_lock);
if (b) {
wake_up(&thr->stdio.input_wait);
} else {
if ((file->f_flags & O_NONBLOCK)) {
ret = -EAGAIN;
break;
}
ret = wait_event_interruptible(thr->stdio.input_wait,
thread_with_stdio_has_input_space(thr));
if (ret)
break;
}
}
return copied ?: ret;
}
static __poll_t thread_with_stdio_poll(struct file *file, struct poll_table_struct *wait)
{
struct thread_with_stdio *thr =
container_of(file->private_data, struct thread_with_stdio, thr);
poll_wait(file, &thr->stdio.output_wait, wait);
poll_wait(file, &thr->stdio.input_wait, wait);
__poll_t mask = 0;
if (thread_with_stdio_has_output(thr))
mask |= EPOLLIN;
if (thread_with_stdio_has_input_space(thr))
mask |= EPOLLOUT;
if (thr->thr.done)
mask |= EPOLLHUP|EPOLLERR;
return mask;
}
static const struct file_operations thread_with_stdio_fops = {
.release = thread_with_stdio_release,
.read = thread_with_stdio_read,
.write = thread_with_stdio_write,
.poll = thread_with_stdio_poll,
.llseek = no_llseek,
};
int bch2_run_thread_with_stdio(struct thread_with_stdio *thr,
void (*exit)(struct thread_with_stdio *),
int (*fn)(void *))
{
thr->stdio.input_buf = PRINTBUF;
thr->stdio.input_buf.atomic++;
spin_lock_init(&thr->stdio.input_lock);
init_waitqueue_head(&thr->stdio.input_wait);
thr->stdio.output_buf = PRINTBUF;
thr->stdio.output_buf.atomic++;
spin_lock_init(&thr->stdio.output_lock);
init_waitqueue_head(&thr->stdio.output_wait);
darray_init(&thr->output2);
thr->exit = exit;
return bch2_run_thread_with_file(&thr->thr, &thread_with_stdio_fops, fn);
}
int bch2_stdio_redirect_read(struct stdio_redirect *stdio, char *buf, size_t len)
{
wait_event(stdio->input_wait,
stdio->input_buf.pos || stdio->done);
if (stdio->done)
return -1;
spin_lock(&stdio->input_lock);
int ret = min(len, stdio->input_buf.pos);
stdio->input_buf.pos -= ret;
memcpy(buf, stdio->input_buf.buf, ret);
memmove(stdio->input_buf.buf,
stdio->input_buf.buf + ret,
stdio->input_buf.pos);
spin_unlock(&stdio->input_lock);
wake_up(&stdio->input_wait);
return ret;
}
int bch2_stdio_redirect_readline(struct stdio_redirect *stdio, char *buf, size_t len)
{
wait_event(stdio->input_wait,
stdio->input_buf.pos || stdio->done);
if (stdio->done)
return -1;
spin_lock(&stdio->input_lock);
int ret = min(len, stdio->input_buf.pos);
char *n = memchr(stdio->input_buf.buf, '\n', ret);
if (n)
ret = min(ret, n + 1 - stdio->input_buf.buf);
stdio->input_buf.pos -= ret;
memcpy(buf, stdio->input_buf.buf, ret);
memmove(stdio->input_buf.buf,
stdio->input_buf.buf + ret,
stdio->input_buf.pos);
spin_unlock(&stdio->input_lock);
wake_up(&stdio->input_wait);
return ret;
}
/* SPDX-License-Identifier: GPL-2.0 */
#ifndef _BCACHEFS_THREAD_WITH_FILE_H
#define _BCACHEFS_THREAD_WITH_FILE_H
#include "thread_with_file_types.h"
struct task_struct;
struct thread_with_file {
struct task_struct *task;
int ret;
bool done;
};
void bch2_thread_with_file_exit(struct thread_with_file *);
int bch2_run_thread_with_file(struct thread_with_file *,
const struct file_operations *,
int (*fn)(void *));
struct thread_with_stdio {
struct thread_with_file thr;
struct stdio_redirect stdio;
DARRAY(char) output2;
void (*exit)(struct thread_with_stdio *);
};
static inline void thread_with_stdio_done(struct thread_with_stdio *thr)
{
thr->thr.done = true;
thr->stdio.done = true;
wake_up(&thr->stdio.input_wait);
wake_up(&thr->stdio.output_wait);
}
int bch2_run_thread_with_stdio(struct thread_with_stdio *,
void (*exit)(struct thread_with_stdio *),
int (*fn)(void *));
int bch2_stdio_redirect_read(struct stdio_redirect *, char *, size_t);
int bch2_stdio_redirect_readline(struct stdio_redirect *, char *, size_t);
#endif /* _BCACHEFS_THREAD_WITH_FILE_H */
/* SPDX-License-Identifier: GPL-2.0 */
#ifndef _BCACHEFS_THREAD_WITH_FILE_TYPES_H
#define _BCACHEFS_THREAD_WITH_FILE_TYPES_H
struct stdio_redirect {
spinlock_t output_lock;
wait_queue_head_t output_wait;
struct printbuf output_buf;
spinlock_t input_lock;
wait_queue_head_t input_wait;
struct printbuf input_buf;
bool done;
};
#endif /* _BCACHEFS_THREAD_WITH_FILE_TYPES_H */
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment