Commit 63b05203 authored by Suparna Bhattacharya's avatar Suparna Bhattacharya Committed by Linus Torvalds

[PATCH] AIO: retry infrastructure fixes and enhancements

From: Daniel McNeil <daniel@osdl.org>
From: Chris Mason <mason@suse.com>

 AIO: retry infrastructure fixes and enhancements

 Reorganises, comments and fixes the AIO retry logic. Fixes 
 and enhancements include:

   - Split iocb setup and execution in io_submit
        (also fixes io_submit error reporting)
   - Use aio workqueue instead of keventd for retries
   - Default high level retry methods
   - Subtle use_mm/unuse_mm fix
   - Code commenting
   - Fix aio process hang on EINVAL (Daniel McNeil)
   - Hold the context lock across unuse_mm
   - Acquire task_lock in use_mm()
   - Allow fops to override the retry method with their own
   - Elevated ref count for AIO retries (Daniel McNeil)
   - set_fs needed when calling use_mm
   - Flush workqueue on __put_ioctx (Chris Mason)
   - Fix io_cancel to work with retries (Chris Mason)
   - Read-immediate option for socket/pipe retry support

 Note on default high-level retry methods support
 ================================================

 High-level retry methods allows an AIO request to be executed as a series of
 non-blocking iterations, where each iteration retries the remaining part of
 the request from where the last iteration left off, by reissuing the
 corresponding AIO fop routine with modified arguments representing the
 remaining I/O.  The retries are "kicked" via the AIO waitqueue callback
 aio_wake_function() which replaces the default wait queue entry used for
 blocking waits.

 The high level retry infrastructure is responsible for running the
 iterations in the mm context (address space) of the caller, and ensures that
 only one retry instance is active at a given time, thus relieving the fops
 themselves from having to deal with potential races of that sort.
Signed-off-by: default avatarAndrew Morton <akpm@osdl.org>
Signed-off-by: default avatarLinus Torvalds <torvalds@osdl.org>
parent 86b9159a
...@@ -39,6 +39,9 @@ ...@@ -39,6 +39,9 @@
#define dprintk(x...) do { ; } while (0) #define dprintk(x...) do { ; } while (0)
#endif #endif
long aio_run = 0; /* for testing only */
long aio_wakeups = 0; /* for testing only */
/*------ sysctl variables----*/ /*------ sysctl variables----*/
atomic_t aio_nr = ATOMIC_INIT(0); /* current system wide number of aio requests */ atomic_t aio_nr = ATOMIC_INIT(0); /* current system wide number of aio requests */
unsigned aio_max_nr = 0x10000; /* system wide maximum number of aio requests */ unsigned aio_max_nr = 0x10000; /* system wide maximum number of aio requests */
...@@ -277,6 +280,7 @@ static void aio_cancel_all(struct kioctx *ctx) ...@@ -277,6 +280,7 @@ static void aio_cancel_all(struct kioctx *ctx)
struct kiocb *iocb = list_kiocb(pos); struct kiocb *iocb = list_kiocb(pos);
list_del_init(&iocb->ki_list); list_del_init(&iocb->ki_list);
cancel = iocb->ki_cancel; cancel = iocb->ki_cancel;
kiocbSetCancelled(iocb);
if (cancel) { if (cancel) {
iocb->ki_users++; iocb->ki_users++;
spin_unlock_irq(&ctx->ctx_lock); spin_unlock_irq(&ctx->ctx_lock);
...@@ -337,6 +341,11 @@ void fastcall exit_aio(struct mm_struct *mm) ...@@ -337,6 +341,11 @@ void fastcall exit_aio(struct mm_struct *mm)
aio_cancel_all(ctx); aio_cancel_all(ctx);
wait_for_all_aios(ctx); wait_for_all_aios(ctx);
/*
* this is an overkill, but ensures we don't leave
* the ctx on the aio_wq
*/
flush_workqueue(aio_wq);
if (1 != atomic_read(&ctx->users)) if (1 != atomic_read(&ctx->users))
printk(KERN_DEBUG printk(KERN_DEBUG
...@@ -359,6 +368,7 @@ void fastcall __put_ioctx(struct kioctx *ctx) ...@@ -359,6 +368,7 @@ void fastcall __put_ioctx(struct kioctx *ctx)
if (unlikely(ctx->reqs_active)) if (unlikely(ctx->reqs_active))
BUG(); BUG();
flush_workqueue(aio_wq);
aio_free_ring(ctx); aio_free_ring(ctx);
mmdrop(ctx->mm); mmdrop(ctx->mm);
ctx->mm = NULL; ctx->mm = NULL;
...@@ -398,6 +408,7 @@ static struct kiocb fastcall *__aio_get_req(struct kioctx *ctx) ...@@ -398,6 +408,7 @@ static struct kiocb fastcall *__aio_get_req(struct kioctx *ctx)
req->ki_obj.user = NULL; req->ki_obj.user = NULL;
req->ki_dtor = NULL; req->ki_dtor = NULL;
req->private = NULL; req->private = NULL;
INIT_LIST_HEAD(&req->ki_run_list);
/* Check if the completion queue has enough free space to /* Check if the completion queue has enough free space to
* accept an event from this io. * accept an event from this io.
...@@ -543,85 +554,323 @@ struct kioctx *lookup_ioctx(unsigned long ctx_id) ...@@ -543,85 +554,323 @@ struct kioctx *lookup_ioctx(unsigned long ctx_id)
return ioctx; return ioctx;
} }
/*
* use_mm
* Makes the calling kernel thread take on the specified
* mm context.
* Called by the retry thread execute retries within the
* iocb issuer's mm context, so that copy_from/to_user
* operations work seamlessly for aio.
* (Note: this routine is intended to be called only
* from a kernel thread context)
*/
static void use_mm(struct mm_struct *mm) static void use_mm(struct mm_struct *mm)
{ {
struct mm_struct *active_mm; struct mm_struct *active_mm;
struct task_struct *tsk = current;
task_lock(tsk);
active_mm = tsk->active_mm;
atomic_inc(&mm->mm_count); atomic_inc(&mm->mm_count);
task_lock(current); tsk->mm = mm;
active_mm = current->active_mm; tsk->active_mm = mm;
current->mm = mm; activate_mm(active_mm, mm);
if (mm != active_mm) { task_unlock(tsk);
current->active_mm = mm;
activate_mm(active_mm, mm);
}
task_unlock(current);
mmdrop(active_mm); mmdrop(active_mm);
} }
static void unuse_mm(struct mm_struct *mm) /*
* unuse_mm
* Reverses the effect of use_mm, i.e. releases the
* specified mm context which was earlier taken on
* by the calling kernel thread
* (Note: this routine is intended to be called only
* from a kernel thread context)
*
* Comments: Called with ctx->ctx_lock held. This nests
* task_lock instead ctx_lock.
*/
void unuse_mm(struct mm_struct *mm)
{ {
task_lock(current); struct task_struct *tsk = current;
current->mm = NULL;
task_unlock(current); task_lock(tsk);
tsk->mm = NULL;
/* active_mm is still 'mm' */ /* active_mm is still 'mm' */
enter_lazy_tlb(mm, current); enter_lazy_tlb(mm, tsk);
task_unlock(tsk);
} }
/* Run on kevent's context. FIXME: needs to be per-cpu and warn if an /*
* operation blocks. * Queue up a kiocb to be retried. Assumes that the kiocb
* has already been marked as kicked, and places it on
* the retry run list for the corresponding ioctx, if it
* isn't already queued. Returns 1 if it actually queued
* the kiocb (to tell the caller to activate the work
* queue to process it), or 0, if it found that it was
* already queued.
*
* Should be called with the spin lock iocb->ki_ctx->ctx_lock
* held
*/ */
static void aio_kick_handler(void *data) static inline int __queue_kicked_iocb(struct kiocb *iocb)
{ {
struct kioctx *ctx = data; struct kioctx *ctx = iocb->ki_ctx;
use_mm(ctx->mm); if (list_empty(&iocb->ki_run_list)) {
list_add_tail(&iocb->ki_run_list,
&ctx->run_list);
iocb->ki_queued++;
return 1;
}
return 0;
}
spin_lock_irq(&ctx->ctx_lock); /* aio_run_iocb
while (!list_empty(&ctx->run_list)) { * This is the core aio execution routine. It is
struct kiocb *iocb; * invoked both for initial i/o submission and
long ret; * subsequent retries via the aio_kick_handler.
* Expects to be invoked with iocb->ki_ctx->lock
* already held. The lock is released and reaquired
* as needed during processing.
*
* Calls the iocb retry method (already setup for the
* iocb on initial submission) for operation specific
* handling, but takes care of most of common retry
* execution details for a given iocb. The retry method
* needs to be non-blocking as far as possible, to avoid
* holding up other iocbs waiting to be serviced by the
* retry kernel thread.
*
* The trickier parts in this code have to do with
* ensuring that only one retry instance is in progress
* for a given iocb at any time. Providing that guarantee
* simplifies the coding of individual aio operations as
* it avoids various potential races.
*/
static ssize_t aio_run_iocb(struct kiocb *iocb)
{
struct kioctx *ctx = iocb->ki_ctx;
ssize_t (*retry)(struct kiocb *);
ssize_t ret;
iocb = list_entry(ctx->run_list.next, struct kiocb, if (iocb->ki_retried++ > 1024*1024) {
ki_run_list); printk("Maximal retry count. Bytes done %Zd\n",
list_del(&iocb->ki_run_list); iocb->ki_nbytes - iocb->ki_left);
iocb->ki_users ++; return -EAGAIN;
spin_unlock_irq(&ctx->ctx_lock); }
if (!(iocb->ki_retried & 0xff)) {
pr_debug("%ld retry: %d of %d (kick %ld, Q %ld run %ld, wake %ld)\n",
iocb->ki_retried,
iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes,
iocb->ki_kicked, iocb->ki_queued, aio_run, aio_wakeups);
}
if (!(retry = iocb->ki_retry)) {
printk("aio_run_iocb: iocb->ki_retry = NULL\n");
return 0;
}
/*
* We don't want the next retry iteration for this
* operation to start until this one has returned and
* updated the iocb state. However, wait_queue functions
* can trigger a kick_iocb from interrupt context in the
* meantime, indicating that data is available for the next
* iteration. We want to remember that and enable the
* next retry iteration _after_ we are through with
* this one.
*
* So, in order to be able to register a "kick", but
* prevent it from being queued now, we clear the kick
* flag, but make the kick code *think* that the iocb is
* still on the run list until we are actually done.
* When we are done with this iteration, we check if
* the iocb was kicked in the meantime and if so, queue
* it up afresh.
*/
kiocbClearKicked(iocb);
/*
* This is so that aio_complete knows it doesn't need to
* pull the iocb off the run list (We can't just call
* INIT_LIST_HEAD because we don't want a kick_iocb to
* queue this on the run list yet)
*/
iocb->ki_run_list.next = iocb->ki_run_list.prev = NULL;
spin_unlock_irq(&ctx->ctx_lock);
kiocbClearKicked(iocb); /* Quit retrying if the i/o has been cancelled */
ret = iocb->ki_retry(iocb); if (kiocbIsCancelled(iocb)) {
if (-EIOCBQUEUED != ret) { ret = -EINTR;
aio_complete(iocb, ret, 0);
/* must not access the iocb after this */
goto out;
}
/*
* Now we are all set to call the retry method in async
* context. By setting this thread's io_wait context
* to point to the wait queue entry inside the currently
* running iocb for the duration of the retry, we ensure
* that async notification wakeups are queued by the
* operation instead of blocking waits, and when notified,
* cause the iocb to be kicked for continuation (through
* the aio_wake_function callback).
*/
BUG_ON(current->io_wait != NULL);
current->io_wait = &iocb->ki_wait;
ret = retry(iocb);
current->io_wait = NULL;
if (-EIOCBRETRY != ret) {
if (-EIOCBQUEUED != ret) {
BUG_ON(!list_empty(&iocb->ki_wait.task_list));
aio_complete(iocb, ret, 0); aio_complete(iocb, ret, 0);
iocb = NULL; /* must not access the iocb after this */
} }
} else {
/*
* Issue an additional retry to avoid waiting forever if
* no waits were queued (e.g. in case of a short read).
*/
if (list_empty(&iocb->ki_wait.task_list))
kiocbSetKicked(iocb);
}
out:
spin_lock_irq(&ctx->ctx_lock);
spin_lock_irq(&ctx->ctx_lock); if (-EIOCBRETRY == ret) {
if (NULL != iocb) /*
__aio_put_req(ctx, iocb); * OK, now that we are done with this iteration
* and know that there is more left to go,
* this is where we let go so that a subsequent
* "kick" can start the next iteration
*/
/* will make __queue_kicked_iocb succeed from here on */
INIT_LIST_HEAD(&iocb->ki_run_list);
/* we must queue the next iteration ourselves, if it
* has already been kicked */
if (kiocbIsKicked(iocb)) {
__queue_kicked_iocb(iocb);
}
} }
return ret;
}
/*
* __aio_run_iocbs:
* Process all pending retries queued on the ioctx
* run list.
* Assumes it is operating within the aio issuer's mm
* context. Expects to be called with ctx->ctx_lock held
*/
static void __aio_run_iocbs(struct kioctx *ctx)
{
struct kiocb *iocb;
int count = 0;
while (!list_empty(&ctx->run_list)) {
iocb = list_entry(ctx->run_list.next, struct kiocb,
ki_run_list);
list_del(&iocb->ki_run_list);
/*
* Hold an extra reference while retrying i/o.
*/
iocb->ki_users++; /* grab extra reference */
aio_run_iocb(iocb);
if (__aio_put_req(ctx, iocb)) /* drop extra ref */
put_ioctx(ctx);
count++;
}
aio_run++;
}
/*
* aio_run_iocbs:
* Process all pending retries queued on the ioctx
* run list.
* Assumes it is operating within the aio issuer's mm
* context.
*/
static inline void aio_run_iocbs(struct kioctx *ctx)
{
spin_lock_irq(&ctx->ctx_lock);
__aio_run_iocbs(ctx);
spin_unlock_irq(&ctx->ctx_lock);
}
/*
* aio_kick_handler:
* Work queue handler triggered to process pending
* retries on an ioctx. Takes on the aio issuer's
* mm context before running the iocbs, so that
* copy_xxx_user operates on the issuer's address
* space.
* Run on aiod's context.
*/
static void aio_kick_handler(void *data)
{
struct kioctx *ctx = data;
mm_segment_t oldfs = get_fs();
set_fs(USER_DS);
use_mm(ctx->mm);
spin_lock_irq(&ctx->ctx_lock);
__aio_run_iocbs(ctx);
unuse_mm(ctx->mm);
spin_unlock_irq(&ctx->ctx_lock); spin_unlock_irq(&ctx->ctx_lock);
set_fs(oldfs);
}
/*
* Called by kick_iocb to queue the kiocb for retry
* and if required activate the aio work queue to process
* it
*/
void queue_kicked_iocb(struct kiocb *iocb)
{
struct kioctx *ctx = iocb->ki_ctx;
unsigned long flags;
int run = 0;
unuse_mm(ctx->mm); WARN_ON((!list_empty(&iocb->ki_wait.task_list)));
spin_lock_irqsave(&ctx->ctx_lock, flags);
run = __queue_kicked_iocb(iocb);
spin_unlock_irqrestore(&ctx->ctx_lock, flags);
if (run) {
queue_work(aio_wq, &ctx->wq);
aio_wakeups++;
}
} }
/*
* kick_iocb:
* Called typically from a wait queue callback context
* (aio_wake_function) to trigger a retry of the iocb.
* The retry is usually executed by aio workqueue
* threads (See aio_kick_handler).
*/
void fastcall kick_iocb(struct kiocb *iocb) void fastcall kick_iocb(struct kiocb *iocb)
{ {
struct kioctx *ctx = iocb->ki_ctx;
/* sync iocbs are easy: they can only ever be executing from a /* sync iocbs are easy: they can only ever be executing from a
* single context. */ * single context. */
if (is_sync_kiocb(iocb)) { if (is_sync_kiocb(iocb)) {
kiocbSetKicked(iocb); kiocbSetKicked(iocb);
wake_up_process(iocb->ki_obj.tsk); wake_up_process(iocb->ki_obj.tsk);
return; return;
} }
iocb->ki_kicked++;
/* If its already kicked we shouldn't queue it again */
if (!kiocbTryKick(iocb)) { if (!kiocbTryKick(iocb)) {
unsigned long flags; queue_kicked_iocb(iocb);
spin_lock_irqsave(&ctx->ctx_lock, flags);
list_add_tail(&iocb->ki_run_list, &ctx->run_list);
spin_unlock_irqrestore(&ctx->ctx_lock, flags);
queue_work(aio_wq, &ctx->wq);
} }
} }
EXPORT_SYMBOL(kick_iocb); EXPORT_SYMBOL(kick_iocb);
...@@ -675,6 +924,16 @@ int fastcall aio_complete(struct kiocb *iocb, long res, long res2) ...@@ -675,6 +924,16 @@ int fastcall aio_complete(struct kiocb *iocb, long res, long res2)
*/ */
spin_lock_irqsave(&ctx->ctx_lock, flags); spin_lock_irqsave(&ctx->ctx_lock, flags);
if (iocb->ki_run_list.prev && !list_empty(&iocb->ki_run_list))
list_del_init(&iocb->ki_run_list);
/*
* cancelled requests don't get events, userland was given one
* when the event got cancelled.
*/
if (kiocbIsCancelled(iocb))
goto put_rq;
ring = kmap_atomic(info->ring_pages[0], KM_IRQ1); ring = kmap_atomic(info->ring_pages[0], KM_IRQ1);
tail = info->tail; tail = info->tail;
...@@ -703,6 +962,11 @@ int fastcall aio_complete(struct kiocb *iocb, long res, long res2) ...@@ -703,6 +962,11 @@ int fastcall aio_complete(struct kiocb *iocb, long res, long res2)
pr_debug("added to ring %p at [%lu]\n", iocb, tail); pr_debug("added to ring %p at [%lu]\n", iocb, tail);
pr_debug("%ld retries: %d of %d (kicked %ld, Q %ld run %ld wake %ld)\n",
iocb->ki_retried,
iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes,
iocb->ki_kicked, iocb->ki_queued, aio_run, aio_wakeups);
put_rq:
/* everything turned out well, dispose of the aiocb. */ /* everything turned out well, dispose of the aiocb. */
ret = __aio_put_req(ctx, iocb); ret = __aio_put_req(ctx, iocb);
...@@ -809,6 +1073,7 @@ static int read_events(struct kioctx *ctx, ...@@ -809,6 +1073,7 @@ static int read_events(struct kioctx *ctx,
int i = 0; int i = 0;
struct io_event ent; struct io_event ent;
struct aio_timeout to; struct aio_timeout to;
int event_loop = 0; /* testing only */
/* needed to zero any padding within an entry (there shouldn't be /* needed to zero any padding within an entry (there shouldn't be
* any, but C is fun! * any, but C is fun!
...@@ -858,7 +1123,6 @@ static int read_events(struct kioctx *ctx, ...@@ -858,7 +1123,6 @@ static int read_events(struct kioctx *ctx,
add_wait_queue_exclusive(&ctx->wait, &wait); add_wait_queue_exclusive(&ctx->wait, &wait);
do { do {
set_task_state(tsk, TASK_INTERRUPTIBLE); set_task_state(tsk, TASK_INTERRUPTIBLE);
ret = aio_read_evt(ctx, &ent); ret = aio_read_evt(ctx, &ent);
if (ret) if (ret)
break; break;
...@@ -868,6 +1132,7 @@ static int read_events(struct kioctx *ctx, ...@@ -868,6 +1132,7 @@ static int read_events(struct kioctx *ctx,
if (to.timed_out) /* Only check after read evt */ if (to.timed_out) /* Only check after read evt */
break; break;
schedule(); schedule();
event_loop++;
if (signal_pending(tsk)) { if (signal_pending(tsk)) {
ret = -EINTR; ret = -EINTR;
break; break;
...@@ -895,6 +1160,9 @@ static int read_events(struct kioctx *ctx, ...@@ -895,6 +1160,9 @@ static int read_events(struct kioctx *ctx,
if (timeout) if (timeout)
clear_timeout(&to); clear_timeout(&to);
out: out:
pr_debug("event loop executed %d times\n", event_loop);
pr_debug("aio_run %ld\n", aio_run);
pr_debug("aio_wakeups %ld\n", aio_wakeups);
return i ? i : ret; return i ? i : ret;
} }
...@@ -962,7 +1230,7 @@ asmlinkage long sys_io_setup(unsigned nr_events, aio_context_t __user *ctxp) ...@@ -962,7 +1230,7 @@ asmlinkage long sys_io_setup(unsigned nr_events, aio_context_t __user *ctxp)
ret = put_user(ioctx->user_id, ctxp); ret = put_user(ioctx->user_id, ctxp);
if (!ret) if (!ret)
return 0; return 0;
get_ioctx(ioctx);
io_destroy(ioctx); io_destroy(ioctx);
} }
...@@ -987,13 +1255,181 @@ asmlinkage long sys_io_destroy(aio_context_t ctx) ...@@ -987,13 +1255,181 @@ asmlinkage long sys_io_destroy(aio_context_t ctx)
return -EINVAL; return -EINVAL;
} }
/*
* Default retry method for aio_read (also used for first time submit)
* Responsible for updating iocb state as retries progress
*/
static ssize_t aio_pread(struct kiocb *iocb)
{
struct file *file = iocb->ki_filp;
struct address_space *mapping = file->f_mapping;
struct inode *inode = mapping->host;
ssize_t ret = 0;
ret = file->f_op->aio_read(iocb, iocb->ki_buf,
iocb->ki_left, iocb->ki_pos);
/*
* Can't just depend on iocb->ki_left to determine
* whether we are done. This may have been a short read.
*/
if (ret > 0) {
iocb->ki_buf += ret;
iocb->ki_left -= ret;
/*
* For pipes and sockets we return once we have
* some data; for regular files we retry till we
* complete the entire read or find that we can't
* read any more data (e.g short reads).
*/
if (!S_ISFIFO(inode->i_mode) && !S_ISSOCK(inode->i_mode))
ret = -EIOCBRETRY;
}
/* This means we must have transferred all that we could */
/* No need to retry anymore */
if ((ret == 0) || (iocb->ki_left == 0))
ret = iocb->ki_nbytes - iocb->ki_left;
return ret;
}
/*
* Default retry method for aio_write (also used for first time submit)
* Responsible for updating iocb state as retries progress
*/
static ssize_t aio_pwrite(struct kiocb *iocb)
{
struct file *file = iocb->ki_filp;
ssize_t ret = 0;
ret = file->f_op->aio_write(iocb, iocb->ki_buf,
iocb->ki_left, iocb->ki_pos);
if (ret > 0) {
iocb->ki_buf += ret;
iocb->ki_left -= ret;
ret = -EIOCBRETRY;
}
/* This means we must have transferred all that we could */
/* No need to retry anymore */
if ((ret == 0) || (iocb->ki_left == 0))
ret = iocb->ki_nbytes - iocb->ki_left;
return ret;
}
static ssize_t aio_fdsync(struct kiocb *iocb)
{
struct file *file = iocb->ki_filp;
ssize_t ret = -EINVAL;
if (file->f_op->aio_fsync)
ret = file->f_op->aio_fsync(iocb, 1);
return ret;
}
static ssize_t aio_fsync(struct kiocb *iocb)
{
struct file *file = iocb->ki_filp;
ssize_t ret = -EINVAL;
if (file->f_op->aio_fsync)
ret = file->f_op->aio_fsync(iocb, 0);
return ret;
}
/*
* aio_setup_iocb:
* Performs the initial checks and aio retry method
* setup for the kiocb at the time of io submission.
*/
ssize_t aio_setup_iocb(struct kiocb *kiocb)
{
struct file *file = kiocb->ki_filp;
ssize_t ret = 0;
switch (kiocb->ki_opcode) {
case IOCB_CMD_PREAD:
ret = -EBADF;
if (unlikely(!(file->f_mode & FMODE_READ)))
break;
ret = -EFAULT;
if (unlikely(!access_ok(VERIFY_WRITE, kiocb->ki_buf,
kiocb->ki_left)))
break;
ret = -EINVAL;
if (file->f_op->aio_read)
kiocb->ki_retry = aio_pread;
break;
case IOCB_CMD_PWRITE:
ret = -EBADF;
if (unlikely(!(file->f_mode & FMODE_WRITE)))
break;
ret = -EFAULT;
if (unlikely(!access_ok(VERIFY_READ, kiocb->ki_buf,
kiocb->ki_left)))
break;
ret = -EINVAL;
if (file->f_op->aio_write)
kiocb->ki_retry = aio_pwrite;
break;
case IOCB_CMD_FDSYNC:
ret = -EINVAL;
if (file->f_op->aio_fsync)
kiocb->ki_retry = aio_fdsync;
break;
case IOCB_CMD_FSYNC:
ret = -EINVAL;
if (file->f_op->aio_fsync)
kiocb->ki_retry = aio_fsync;
break;
default:
dprintk("EINVAL: io_submit: no operation provided\n");
ret = -EINVAL;
}
if (!kiocb->ki_retry)
return ret;
return 0;
}
/*
* aio_wake_function:
* wait queue callback function for aio notification,
* Simply triggers a retry of the operation via kick_iocb.
*
* This callback is specified in the wait queue entry in
* a kiocb (current->io_wait points to this wait queue
* entry when an aio operation executes; it is used
* instead of a synchronous wait when an i/o blocking
* condition is encountered during aio).
*
* Note:
* This routine is executed with the wait queue lock held.
* Since kick_iocb acquires iocb->ctx->ctx_lock, it nests
* the ioctx lock inside the wait queue lock. This is safe
* because this callback isn't used for wait queues which
* are nested inside ioctx lock (i.e. ctx->wait)
*/
int aio_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait);
list_del_init(&wait->task_list);
kick_iocb(iocb);
return 1;
}
int fastcall io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, int fastcall io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
struct iocb *iocb) struct iocb *iocb)
{ {
struct kiocb *req; struct kiocb *req;
struct file *file; struct file *file;
ssize_t ret; ssize_t ret;
char __user *buf;
/* enforce forwards compatibility on users */ /* enforce forwards compatibility on users */
if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2 || if (unlikely(iocb->aio_reserved1 || iocb->aio_reserved2 ||
...@@ -1034,58 +1470,31 @@ int fastcall io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, ...@@ -1034,58 +1470,31 @@ int fastcall io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
req->ki_user_data = iocb->aio_data; req->ki_user_data = iocb->aio_data;
req->ki_pos = iocb->aio_offset; req->ki_pos = iocb->aio_offset;
buf = (char __user *)(unsigned long)iocb->aio_buf; req->ki_buf = (char *)(unsigned long)iocb->aio_buf;
req->ki_left = req->ki_nbytes = iocb->aio_nbytes;
req->ki_opcode = iocb->aio_lio_opcode;
init_waitqueue_func_entry(&req->ki_wait, aio_wake_function);
INIT_LIST_HEAD(&req->ki_wait.task_list);
req->ki_run_list.next = req->ki_run_list.prev = NULL;
req->ki_retry = NULL;
req->ki_retried = 0;
req->ki_kicked = 0;
req->ki_queued = 0;
aio_run = 0;
aio_wakeups = 0;
switch (iocb->aio_lio_opcode) { ret = aio_setup_iocb(req);
case IOCB_CMD_PREAD:
ret = -EBADF; if (ret)
if (unlikely(!(file->f_mode & FMODE_READ))) goto out_put_req;
goto out_put_req;
ret = -EFAULT;
if (unlikely(!access_ok(VERIFY_WRITE, buf, iocb->aio_nbytes)))
goto out_put_req;
ret = security_file_permission (file, MAY_READ);
if (ret)
goto out_put_req;
ret = -EINVAL;
if (file->f_op->aio_read)
ret = file->f_op->aio_read(req, buf,
iocb->aio_nbytes, req->ki_pos);
break;
case IOCB_CMD_PWRITE:
ret = -EBADF;
if (unlikely(!(file->f_mode & FMODE_WRITE)))
goto out_put_req;
ret = -EFAULT;
if (unlikely(!access_ok(VERIFY_READ, buf, iocb->aio_nbytes)))
goto out_put_req;
ret = security_file_permission (file, MAY_WRITE);
if (ret)
goto out_put_req;
ret = -EINVAL;
if (file->f_op->aio_write)
ret = file->f_op->aio_write(req, buf,
iocb->aio_nbytes, req->ki_pos);
break;
case IOCB_CMD_FDSYNC:
ret = -EINVAL;
if (file->f_op->aio_fsync)
ret = file->f_op->aio_fsync(req, 1);
break;
case IOCB_CMD_FSYNC:
ret = -EINVAL;
if (file->f_op->aio_fsync)
ret = file->f_op->aio_fsync(req, 0);
break;
default:
dprintk("EINVAL: io_submit: no operation provided\n");
ret = -EINVAL;
}
spin_lock_irq(&ctx->ctx_lock);
ret = aio_run_iocb(req);
spin_unlock_irq(&ctx->ctx_lock);
if (-EIOCBRETRY == ret)
queue_work(aio_wq, &ctx->wq);
aio_put_req(req); /* drop extra ref to req */ aio_put_req(req); /* drop extra ref to req */
if (likely(-EIOCBQUEUED == ret))
return 0;
aio_complete(req, ret, 0); /* will drop i/o ref to req */
return 0; return 0;
out_put_req: out_put_req:
...@@ -1201,6 +1610,7 @@ asmlinkage long sys_io_cancel(aio_context_t ctx_id, struct iocb __user *iocb, ...@@ -1201,6 +1610,7 @@ asmlinkage long sys_io_cancel(aio_context_t ctx_id, struct iocb __user *iocb,
if (kiocb && kiocb->ki_cancel) { if (kiocb && kiocb->ki_cancel) {
cancel = kiocb->ki_cancel; cancel = kiocb->ki_cancel;
kiocb->ki_users ++; kiocb->ki_users ++;
kiocbSetCancelled(kiocb);
} else } else
cancel = NULL; cancel = NULL;
spin_unlock_irq(&ctx->ctx_lock); spin_unlock_irq(&ctx->ctx_lock);
......
...@@ -52,7 +52,7 @@ struct kiocb { ...@@ -52,7 +52,7 @@ struct kiocb {
struct file *ki_filp; struct file *ki_filp;
struct kioctx *ki_ctx; /* may be NULL for sync ops */ struct kioctx *ki_ctx; /* may be NULL for sync ops */
int (*ki_cancel)(struct kiocb *, struct io_event *); int (*ki_cancel)(struct kiocb *, struct io_event *);
long (*ki_retry)(struct kiocb *); ssize_t (*ki_retry)(struct kiocb *);
void (*ki_dtor)(struct kiocb *); void (*ki_dtor)(struct kiocb *);
struct list_head ki_list; /* the aio core uses this struct list_head ki_list; /* the aio core uses this
...@@ -64,6 +64,16 @@ struct kiocb { ...@@ -64,6 +64,16 @@ struct kiocb {
} ki_obj; } ki_obj;
__u64 ki_user_data; /* user's data for completion */ __u64 ki_user_data; /* user's data for completion */
loff_t ki_pos; loff_t ki_pos;
/* State that we remember to be able to restart/retry */
unsigned short ki_opcode;
size_t ki_nbytes; /* copy of iocb->aio_nbytes */
char *ki_buf; /* remaining iocb->aio_buf */
size_t ki_left; /* remaining bytes */
wait_queue_t ki_wait;
long ki_retried; /* just for testing */
long ki_kicked; /* just for testing */
long ki_queued; /* just for testing */
void *private; void *private;
}; };
...@@ -79,6 +89,8 @@ struct kiocb { ...@@ -79,6 +89,8 @@ struct kiocb {
(x)->ki_cancel = NULL; \ (x)->ki_cancel = NULL; \
(x)->ki_dtor = NULL; \ (x)->ki_dtor = NULL; \
(x)->ki_obj.tsk = tsk; \ (x)->ki_obj.tsk = tsk; \
(x)->ki_user_data = 0; \
init_wait((&(x)->ki_wait)); \
} while (0) } while (0)
#define AIO_RING_MAGIC 0xa10a10a1 #define AIO_RING_MAGIC 0xa10a10a1
...@@ -161,6 +173,20 @@ int FASTCALL(io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb, ...@@ -161,6 +173,20 @@ int FASTCALL(io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
#define get_ioctx(kioctx) do { if (unlikely(atomic_read(&(kioctx)->users) <= 0)) BUG(); atomic_inc(&(kioctx)->users); } while (0) #define get_ioctx(kioctx) do { if (unlikely(atomic_read(&(kioctx)->users) <= 0)) BUG(); atomic_inc(&(kioctx)->users); } while (0)
#define put_ioctx(kioctx) do { if (unlikely(atomic_dec_and_test(&(kioctx)->users))) __put_ioctx(kioctx); else if (unlikely(atomic_read(&(kioctx)->users) < 0)) BUG(); } while (0) #define put_ioctx(kioctx) do { if (unlikely(atomic_dec_and_test(&(kioctx)->users))) __put_ioctx(kioctx); else if (unlikely(atomic_read(&(kioctx)->users) < 0)) BUG(); } while (0)
#define in_aio() !is_sync_wait(current->io_wait)
/* may be used for debugging */
#define warn_if_async() \
do { \
if (in_aio()) { \
printk(KERN_ERR "%s(%s:%d) called in async context!\n", \
__FUNCTION__, __FILE__, __LINE__); \
dump_stack(); \
} \
} while (0)
#define io_wait_to_kiocb(wait) container_of(wait, struct kiocb, ki_wait)
#define is_retried_kiocb(iocb) ((iocb)->ki_retried > 1)
#include <linux/aio_abi.h> #include <linux/aio_abi.h>
static inline struct kiocb *list_kiocb(struct list_head *h) static inline struct kiocb *list_kiocb(struct list_head *h)
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#define EBADTYPE 527 /* Type not supported by server */ #define EBADTYPE 527 /* Type not supported by server */
#define EJUKEBOX 528 /* Request initiated, but will not complete before timeout */ #define EJUKEBOX 528 /* Request initiated, but will not complete before timeout */
#define EIOCBQUEUED 529 /* iocb queued, will get completion event */ #define EIOCBQUEUED 529 /* iocb queued, will get completion event */
#define EIOCBRETRY 530 /* iocb queued, will trigger a retry */
#endif #endif
......
...@@ -561,7 +561,13 @@ struct task_struct { ...@@ -561,7 +561,13 @@ struct task_struct {
unsigned long ptrace_message; unsigned long ptrace_message;
siginfo_t *last_siginfo; /* For ptrace use. */ siginfo_t *last_siginfo; /* For ptrace use. */
/*
* current io wait handle: wait queue entry to use for io waits
* If this thread is processing aio, this points at the waitqueue
* inside the currently handled kiocb. It may be NULL (i.e. default
* to a stack based synchronous wait) if its doing sync IO.
*/
wait_queue_t *io_wait;
#ifdef CONFIG_NUMA #ifdef CONFIG_NUMA
struct mempolicy *mempolicy; struct mempolicy *mempolicy;
short il_next; /* could be shared with used_math */ short il_next; /* could be shared with used_math */
......
...@@ -80,6 +80,15 @@ static inline int waitqueue_active(wait_queue_head_t *q) ...@@ -80,6 +80,15 @@ static inline int waitqueue_active(wait_queue_head_t *q)
return !list_empty(&q->task_list); return !list_empty(&q->task_list);
} }
/*
* Used to distinguish between sync and async io wait context:
* sync i/o typically specifies a NULL wait queue entry or a wait
* queue entry bound to a task (current task) to wake up.
* aio specifies a wait queue entry with an async notification
* callback routine, not associated with any task.
*/
#define is_sync_wait(wait) (!(wait) || ((wait)->task))
extern void FASTCALL(add_wait_queue(wait_queue_head_t *q, wait_queue_t * wait)); extern void FASTCALL(add_wait_queue(wait_queue_head_t *q, wait_queue_t * wait));
extern void FASTCALL(add_wait_queue_exclusive(wait_queue_head_t *q, wait_queue_t * wait)); extern void FASTCALL(add_wait_queue_exclusive(wait_queue_head_t *q, wait_queue_t * wait));
extern void FASTCALL(remove_wait_queue(wait_queue_head_t *q, wait_queue_t * wait)); extern void FASTCALL(remove_wait_queue(wait_queue_head_t *q, wait_queue_t * wait));
......
...@@ -152,7 +152,12 @@ void fastcall prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int stat ...@@ -152,7 +152,12 @@ void fastcall prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int stat
spin_lock_irqsave(&q->lock, flags); spin_lock_irqsave(&q->lock, flags);
if (list_empty(&wait->task_list)) if (list_empty(&wait->task_list))
__add_wait_queue(q, wait); __add_wait_queue(q, wait);
set_current_state(state); /*
* don't alter the task state if this is just going to
* queue an async wait queue callback
*/
if (is_sync_wait(wait))
set_current_state(state);
spin_unlock_irqrestore(&q->lock, flags); spin_unlock_irqrestore(&q->lock, flags);
} }
...@@ -167,7 +172,12 @@ prepare_to_wait_exclusive(wait_queue_head_t *q, wait_queue_t *wait, int state) ...@@ -167,7 +172,12 @@ prepare_to_wait_exclusive(wait_queue_head_t *q, wait_queue_t *wait, int state)
spin_lock_irqsave(&q->lock, flags); spin_lock_irqsave(&q->lock, flags);
if (list_empty(&wait->task_list)) if (list_empty(&wait->task_list))
__add_wait_queue_tail(q, wait); __add_wait_queue_tail(q, wait);
set_current_state(state); /*
* don't alter the task state if this is just going to
* queue an async wait queue callback
*/
if (is_sync_wait(wait))
set_current_state(state);
spin_unlock_irqrestore(&q->lock, flags); spin_unlock_irqrestore(&q->lock, flags);
} }
...@@ -965,6 +975,7 @@ static task_t *copy_process(unsigned long clone_flags, ...@@ -965,6 +975,7 @@ static task_t *copy_process(unsigned long clone_flags,
p->start_time = get_jiffies_64(); p->start_time = get_jiffies_64();
p->security = NULL; p->security = NULL;
p->io_context = NULL; p->io_context = NULL;
p->io_wait = NULL;
p->audit_context = NULL; p->audit_context = NULL;
#ifdef CONFIG_NUMA #ifdef CONFIG_NUMA
p->mempolicy = mpol_copy(p->mempolicy); p->mempolicy = mpol_copy(p->mempolicy);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment