Commit 6b8268b1 authored by Jens Axboe's avatar Jens Axboe Committed by David Howells

SLOW_WORK: Add delayed_slow_work support

This adds support for starting slow work with a delay, similar
to the functionality we have for workqueues.
Signed-off-by: default avatarJens Axboe <jens.axboe@oracle.com>
Signed-off-by: default avatarDavid Howells <dhowells@redhat.com>
parent 01609502
...@@ -41,6 +41,13 @@ expand files, provided the time taken to do so isn't too long. ...@@ -41,6 +41,13 @@ expand files, provided the time taken to do so isn't too long.
Operations of both types may sleep during execution, thus tying up the thread Operations of both types may sleep during execution, thus tying up the thread
loaned to it. loaned to it.
A further class of work item is available, based on the slow work item class:
(*) Delayed slow work items.
These are slow work items that have a timer to defer queueing of the item for
a while.
THREAD-TO-CLASS ALLOCATION THREAD-TO-CLASS ALLOCATION
-------------------------- --------------------------
...@@ -93,6 +100,10 @@ Slow work items may then be set up by: ...@@ -93,6 +100,10 @@ Slow work items may then be set up by:
slow_work_init(&myitem, &myitem_ops); slow_work_init(&myitem, &myitem_ops);
or:
delayed_slow_work_init(&myitem, &myitem_ops);
or: or:
vslow_work_init(&myitem, &myitem_ops); vslow_work_init(&myitem, &myitem_ops);
...@@ -104,7 +115,9 @@ A suitably set up work item can then be enqueued for processing: ...@@ -104,7 +115,9 @@ A suitably set up work item can then be enqueued for processing:
int ret = slow_work_enqueue(&myitem); int ret = slow_work_enqueue(&myitem);
This will return a -ve error if the thread pool is unable to gain a reference This will return a -ve error if the thread pool is unable to gain a reference
on the item, 0 otherwise. on the item, 0 otherwise, or (for delayed work):
int ret = delayed_slow_work_enqueue(&myitem, my_jiffy_delay);
The items are reference counted, so there ought to be no need for a flush The items are reference counted, so there ought to be no need for a flush
...@@ -112,6 +125,7 @@ operation. But as the reference counting is optional, means to cancel ...@@ -112,6 +125,7 @@ operation. But as the reference counting is optional, means to cancel
existing work items are also included: existing work items are also included:
cancel_slow_work(&myitem); cancel_slow_work(&myitem);
cancel_delayed_slow_work(&myitem);
can be used to cancel pending work. The above cancel function waits for can be used to cancel pending work. The above cancel function waits for
existing work to have been executed (or prevent execution of them, depending existing work to have been executed (or prevent execution of them, depending
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#ifdef CONFIG_SLOW_WORK #ifdef CONFIG_SLOW_WORK
#include <linux/sysctl.h> #include <linux/sysctl.h>
#include <linux/timer.h>
struct slow_work; struct slow_work;
...@@ -52,10 +53,16 @@ struct slow_work { ...@@ -52,10 +53,16 @@ struct slow_work {
#define SLOW_WORK_ENQ_DEFERRED 2 /* item enqueue deferred */ #define SLOW_WORK_ENQ_DEFERRED 2 /* item enqueue deferred */
#define SLOW_WORK_VERY_SLOW 3 /* item is very slow */ #define SLOW_WORK_VERY_SLOW 3 /* item is very slow */
#define SLOW_WORK_CANCELLING 4 /* item is being cancelled, don't enqueue */ #define SLOW_WORK_CANCELLING 4 /* item is being cancelled, don't enqueue */
#define SLOW_WORK_DELAYED 5 /* item is struct delayed_slow_work with active timer */
const struct slow_work_ops *ops; /* operations table for this item */ const struct slow_work_ops *ops; /* operations table for this item */
struct list_head link; /* link in queue */ struct list_head link; /* link in queue */
}; };
struct delayed_slow_work {
struct slow_work work;
struct timer_list timer;
};
/** /**
* slow_work_init - Initialise a slow work item * slow_work_init - Initialise a slow work item
* @work: The work item to initialise * @work: The work item to initialise
...@@ -71,6 +78,20 @@ static inline void slow_work_init(struct slow_work *work, ...@@ -71,6 +78,20 @@ static inline void slow_work_init(struct slow_work *work,
INIT_LIST_HEAD(&work->link); INIT_LIST_HEAD(&work->link);
} }
/**
* slow_work_init - Initialise a delayed slow work item
* @work: The work item to initialise
* @ops: The operations to use to handle the slow work item
*
* Initialise a delayed slow work item.
*/
static inline void delayed_slow_work_init(struct delayed_slow_work *dwork,
const struct slow_work_ops *ops)
{
init_timer(&dwork->timer);
slow_work_init(&dwork->work, ops);
}
/** /**
* vslow_work_init - Initialise a very slow work item * vslow_work_init - Initialise a very slow work item
* @work: The work item to initialise * @work: The work item to initialise
...@@ -93,6 +114,14 @@ extern void slow_work_cancel(struct slow_work *work); ...@@ -93,6 +114,14 @@ extern void slow_work_cancel(struct slow_work *work);
extern int slow_work_register_user(struct module *owner); extern int slow_work_register_user(struct module *owner);
extern void slow_work_unregister_user(struct module *owner); extern void slow_work_unregister_user(struct module *owner);
extern int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
unsigned long delay);
static inline void delayed_slow_work_cancel(struct delayed_slow_work *dwork)
{
slow_work_cancel(&dwork->work);
}
#ifdef CONFIG_SYSCTL #ifdef CONFIG_SYSCTL
extern ctl_table slow_work_sysctls[]; extern ctl_table slow_work_sysctls[];
#endif #endif
......
...@@ -406,11 +406,40 @@ void slow_work_cancel(struct slow_work *work) ...@@ -406,11 +406,40 @@ void slow_work_cancel(struct slow_work *work)
bool wait = true, put = false; bool wait = true, put = false;
set_bit(SLOW_WORK_CANCELLING, &work->flags); set_bit(SLOW_WORK_CANCELLING, &work->flags);
smp_mb();
/* if the work item is a delayed work item with an active timer, we
* need to wait for the timer to finish _before_ getting the spinlock,
* lest we deadlock against the timer routine
*
* the timer routine will leave DELAYED set if it notices the
* CANCELLING flag in time
*/
if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
struct delayed_slow_work *dwork =
container_of(work, struct delayed_slow_work, work);
del_timer_sync(&dwork->timer);
}
spin_lock_irq(&slow_work_queue_lock); spin_lock_irq(&slow_work_queue_lock);
if (test_bit(SLOW_WORK_PENDING, &work->flags) && if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
!list_empty(&work->link)) { /* the timer routine aborted or never happened, so we are left
* holding the timer's reference on the item and should just
* drop the pending flag and wait for any ongoing execution to
* finish */
struct delayed_slow_work *dwork =
container_of(work, struct delayed_slow_work, work);
BUG_ON(timer_pending(&dwork->timer));
BUG_ON(!list_empty(&work->link));
clear_bit(SLOW_WORK_DELAYED, &work->flags);
put = true;
clear_bit(SLOW_WORK_PENDING, &work->flags);
} else if (test_bit(SLOW_WORK_PENDING, &work->flags) &&
!list_empty(&work->link)) {
/* the link in the pending queue holds a reference on the item /* the link in the pending queue holds a reference on the item
* that we will need to release */ * that we will need to release */
list_del_init(&work->link); list_del_init(&work->link);
...@@ -440,6 +469,102 @@ void slow_work_cancel(struct slow_work *work) ...@@ -440,6 +469,102 @@ void slow_work_cancel(struct slow_work *work)
} }
EXPORT_SYMBOL(slow_work_cancel); EXPORT_SYMBOL(slow_work_cancel);
/*
* Handle expiry of the delay timer, indicating that a delayed slow work item
* should now be queued if not cancelled
*/
static void delayed_slow_work_timer(unsigned long data)
{
struct slow_work *work = (struct slow_work *) data;
unsigned long flags;
bool queued = false, put = false;
spin_lock_irqsave(&slow_work_queue_lock, flags);
if (likely(!test_bit(SLOW_WORK_CANCELLING, &work->flags))) {
clear_bit(SLOW_WORK_DELAYED, &work->flags);
if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
/* we discard the reference the timer was holding in
* favour of the one the executor holds */
set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
put = true;
} else {
if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags))
list_add_tail(&work->link, &vslow_work_queue);
else
list_add_tail(&work->link, &slow_work_queue);
queued = true;
}
}
spin_unlock_irqrestore(&slow_work_queue_lock, flags);
if (put)
slow_work_put_ref(work);
if (queued)
wake_up(&slow_work_thread_wq);
}
/**
* delayed_slow_work_enqueue - Schedule a delayed slow work item for processing
* @dwork: The delayed work item to queue
* @delay: When to start executing the work, in jiffies from now
*
* This is similar to slow_work_enqueue(), but it adds a delay before the work
* is actually queued for processing.
*
* The item can have delayed processing requested on it whilst it is being
* executed. The delay will begin immediately, and if it expires before the
* item finishes executing, the item will be placed back on the queue when it
* has done executing.
*/
int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
unsigned long delay)
{
struct slow_work *work = &dwork->work;
unsigned long flags;
int ret;
if (delay == 0)
return slow_work_enqueue(&dwork->work);
BUG_ON(slow_work_user_count <= 0);
BUG_ON(!work);
BUG_ON(!work->ops);
if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
return -ECANCELED;
if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
spin_lock_irqsave(&slow_work_queue_lock, flags);
if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
goto cancelled;
/* the timer holds a reference whilst it is pending */
ret = work->ops->get_ref(work);
if (ret < 0)
goto cant_get_ref;
if (test_and_set_bit(SLOW_WORK_DELAYED, &work->flags))
BUG();
dwork->timer.expires = jiffies + delay;
dwork->timer.data = (unsigned long) work;
dwork->timer.function = delayed_slow_work_timer;
add_timer(&dwork->timer);
spin_unlock_irqrestore(&slow_work_queue_lock, flags);
}
return 0;
cancelled:
ret = -ECANCELED;
cant_get_ref:
spin_unlock_irqrestore(&slow_work_queue_lock, flags);
return ret;
}
EXPORT_SYMBOL(delayed_slow_work_enqueue);
/* /*
* Schedule a cull of the thread pool at some time in the near future * Schedule a cull of the thread pool at some time in the near future
*/ */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment