Commit 92b817f8 authored by Andrew Morton's avatar Andrew Morton Committed by Linus Torvalds

[PATCH] flush_work_queue() fixes

The workqueue code currently has a notion of a per-cpu queue being "busy".
flush_scheduled_work()'s responsibility is to wait for a queue to be not busy.

Problem is, flush_scheduled_work() can easily hang up.

- The workqueue is deemed "busy" when there are pending delayed
  (timer-based) works.  But if someone repeatedly schedules new delayed work
  in the callback, the queue will never fall idle, and flush_scheduled_work()
  will not terminate.

- If someone reschedules work (not delayed work) in the work function, that
  too will cause the queue to never go idle, and flush_scheduled_work() will
  not terminate.

So what this patch does is:

- Create a new "cancel_delayed_work()" which will try to kill off any
  timer-based delayed works.

- Change flush_scheduled_work() so that it is immune to people re-adding
  work in the work callout handler.

  We can do this by recognising that the caller does *not* want to wait
  until the workqueue is "empty".  The caller merely wants to wait until all
  works which were pending at the time flush_scheduled_work() was called have
  completed.

  The patch uses a couple of sequence numbers for that.

So now, if someone wants to reliably remove delayed work they should do:


	/*
	 * Make sure that my work-callback will no longer schedule new work
	 */
	my_driver_is_shutting_down = 1;

	/*
	 * Kill off any pending delayed work
	 */
	cancel_delayed_work(&my_work);

	/*
	 * OK, there will be no new works scheduled.  But there may be one
	 * currently queued or in progress.  So wait for that to complete.
	 */
	flush_scheduled_work();


The patch also changes the flush_workqueue() sleep to be uninterruptible.
We cannot legally bale out if a signal is delivered anyway.
parent ec7708a2
...@@ -63,5 +63,15 @@ extern int current_is_keventd(void); ...@@ -63,5 +63,15 @@ extern int current_is_keventd(void);
extern void init_workqueues(void); extern void init_workqueues(void);
/*
* Kill off a pending schedule_delayed_work(). Note that the work callback
* function may still be running on return from cancel_delayed_work(). Run
* flush_scheduled_work() to wait on it.
*/
static inline int cancel_delayed_work(struct work_struct *work)
{
return del_timer_sync(&work->timer);
}
#endif #endif
...@@ -27,13 +27,21 @@ ...@@ -27,13 +27,21 @@
#include <linux/slab.h> #include <linux/slab.h>
/* /*
* The per-CPU workqueue: * The per-CPU workqueue.
*
* The sequence counters are for flush_scheduled_work(). It wants to wait
* until until all currently-scheduled works are completed, but it doesn't
* want to be livelocked by new, incoming ones. So it waits until
* remove_sequence is >= the insert_sequence which pertained when
* flush_scheduled_work() was called.
*/ */
struct cpu_workqueue_struct { struct cpu_workqueue_struct {
spinlock_t lock; spinlock_t lock;
atomic_t nr_queued; long remove_sequence; /* Least-recently added (next to run) */
long insert_sequence; /* Next to add */
struct list_head worklist; struct list_head worklist;
wait_queue_head_t more_work; wait_queue_head_t more_work;
wait_queue_head_t work_done; wait_queue_head_t work_done;
...@@ -71,10 +79,9 @@ int queue_work(struct workqueue_struct *wq, struct work_struct *work) ...@@ -71,10 +79,9 @@ int queue_work(struct workqueue_struct *wq, struct work_struct *work)
spin_lock_irqsave(&cwq->lock, flags); spin_lock_irqsave(&cwq->lock, flags);
list_add_tail(&work->entry, &cwq->worklist); list_add_tail(&work->entry, &cwq->worklist);
atomic_inc(&cwq->nr_queued); cwq->insert_sequence++;
spin_unlock_irqrestore(&cwq->lock, flags);
wake_up(&cwq->more_work); wake_up(&cwq->more_work);
spin_unlock_irqrestore(&cwq->lock, flags);
ret = 1; ret = 1;
} }
put_cpu(); put_cpu();
...@@ -93,11 +100,13 @@ static void delayed_work_timer_fn(unsigned long __data) ...@@ -93,11 +100,13 @@ static void delayed_work_timer_fn(unsigned long __data)
*/ */
spin_lock_irqsave(&cwq->lock, flags); spin_lock_irqsave(&cwq->lock, flags);
list_add_tail(&work->entry, &cwq->worklist); list_add_tail(&work->entry, &cwq->worklist);
cwq->insert_sequence++;
wake_up(&cwq->more_work); wake_up(&cwq->more_work);
spin_unlock_irqrestore(&cwq->lock, flags); spin_unlock_irqrestore(&cwq->lock, flags);
} }
int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, unsigned long delay) int queue_delayed_work(struct workqueue_struct *wq,
struct work_struct *work, unsigned long delay)
{ {
int ret = 0, cpu = get_cpu(); int ret = 0, cpu = get_cpu();
struct timer_list *timer = &work->timer; struct timer_list *timer = &work->timer;
...@@ -107,18 +116,11 @@ int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, un ...@@ -107,18 +116,11 @@ int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, un
BUG_ON(timer_pending(timer)); BUG_ON(timer_pending(timer));
BUG_ON(!list_empty(&work->entry)); BUG_ON(!list_empty(&work->entry));
/*
* Increase nr_queued so that the flush function
* knows that there's something pending.
*/
atomic_inc(&cwq->nr_queued);
work->wq_data = cwq; work->wq_data = cwq;
timer->expires = jiffies + delay; timer->expires = jiffies + delay;
timer->data = (unsigned long)work; timer->data = (unsigned long)work;
timer->function = delayed_work_timer_fn; timer->function = delayed_work_timer_fn;
add_timer(timer); add_timer(timer);
ret = 1; ret = 1;
} }
put_cpu(); put_cpu();
...@@ -135,7 +137,8 @@ static inline void run_workqueue(struct cpu_workqueue_struct *cwq) ...@@ -135,7 +137,8 @@ static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
*/ */
spin_lock_irqsave(&cwq->lock, flags); spin_lock_irqsave(&cwq->lock, flags);
while (!list_empty(&cwq->worklist)) { while (!list_empty(&cwq->worklist)) {
struct work_struct *work = list_entry(cwq->worklist.next, struct work_struct, entry); struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
void (*f) (void *) = work->func; void (*f) (void *) = work->func;
void *data = work->data; void *data = work->data;
...@@ -146,14 +149,9 @@ static inline void run_workqueue(struct cpu_workqueue_struct *cwq) ...@@ -146,14 +149,9 @@ static inline void run_workqueue(struct cpu_workqueue_struct *cwq)
clear_bit(0, &work->pending); clear_bit(0, &work->pending);
f(data); f(data);
/*
* We only wake up 'work done' waiters (flush) when
* the last function has been fully processed.
*/
if (atomic_dec_and_test(&cwq->nr_queued))
wake_up(&cwq->work_done);
spin_lock_irqsave(&cwq->lock, flags); spin_lock_irqsave(&cwq->lock, flags);
cwq->remove_sequence++;
wake_up(&cwq->work_done);
} }
spin_unlock_irqrestore(&cwq->lock, flags); spin_unlock_irqrestore(&cwq->lock, flags);
} }
...@@ -223,37 +221,41 @@ static int worker_thread(void *__startup) ...@@ -223,37 +221,41 @@ static int worker_thread(void *__startup)
* Forces execution of the workqueue and blocks until its completion. * Forces execution of the workqueue and blocks until its completion.
* This is typically used in driver shutdown handlers. * This is typically used in driver shutdown handlers.
* *
* NOTE: if work is being added to the queue constantly by some other * This function will sample each workqueue's current insert_sequence number and
* context then this function might block indefinitely. * will sleep until the head sequence is greater than or equal to that. This
* means that we sleep until all works which were queued on entry have been
* handled, but we are not livelocked by new incoming ones.
*
* This function used to run the workqueues itself. Now we just wait for the
* helper threads to do it.
*/ */
void flush_workqueue(struct workqueue_struct *wq) void flush_workqueue(struct workqueue_struct *wq)
{ {
struct cpu_workqueue_struct *cwq; struct cpu_workqueue_struct *cwq;
int cpu; int cpu;
might_sleep();
for (cpu = 0; cpu < NR_CPUS; cpu++) { for (cpu = 0; cpu < NR_CPUS; cpu++) {
DEFINE_WAIT(wait);
long sequence_needed;
if (!cpu_online(cpu)) if (!cpu_online(cpu))
continue; continue;
cwq = wq->cpu_wq + cpu; cwq = wq->cpu_wq + cpu;
if (atomic_read(&cwq->nr_queued)) { spin_lock_irq(&cwq->lock);
DECLARE_WAITQUEUE(wait, current); sequence_needed = cwq->insert_sequence;
if (!list_empty(&cwq->worklist)) while (sequence_needed - cwq->remove_sequence > 0) {
run_workqueue(cwq); prepare_to_wait(&cwq->work_done, &wait,
TASK_UNINTERRUPTIBLE);
/* spin_unlock_irq(&cwq->lock);
* Wait for helper thread(s) to finish up
* the queue:
*/
set_task_state(current, TASK_INTERRUPTIBLE);
add_wait_queue(&cwq->work_done, &wait);
if (atomic_read(&cwq->nr_queued))
schedule(); schedule();
else spin_lock_irq(&cwq->lock);
set_task_state(current, TASK_RUNNING);
remove_wait_queue(&cwq->work_done, &wait);
} }
finish_wait(&cwq->work_done, &wait);
spin_unlock_irq(&cwq->lock);
} }
} }
...@@ -279,7 +281,8 @@ struct workqueue_struct *create_workqueue(const char *name) ...@@ -279,7 +281,8 @@ struct workqueue_struct *create_workqueue(const char *name)
spin_lock_init(&cwq->lock); spin_lock_init(&cwq->lock);
cwq->wq = wq; cwq->wq = wq;
cwq->thread = NULL; cwq->thread = NULL;
atomic_set(&cwq->nr_queued, 0); cwq->insert_sequence = 0;
cwq->remove_sequence = 0;
INIT_LIST_HEAD(&cwq->worklist); INIT_LIST_HEAD(&cwq->worklist);
init_waitqueue_head(&cwq->more_work); init_waitqueue_head(&cwq->more_work);
init_waitqueue_head(&cwq->work_done); init_waitqueue_head(&cwq->work_done);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment