Commit 11d4616b authored by Linus Torvalds

futex: revert back to the explicit waiter counting code

Srikar Dronamraju reports that commit b0c29f79 ("futexes: Avoid
taking the hb->lock if there's nothing to wake up") causes Java threads
to get stuck on futexes when running specjbb on a power7 numa box.

The cause appears to be that the powerpc spinlocks aren't using the same
ticket lock model that we use on x86 (and other) architectures, which in
turn results in the "spin_is_locked()" test in hb_waiters_pending()
occasionally reporting an unlocked spinlock even when there are pending
waiters.

So this reinstates Davidlohr Bueso's original explicit waiter counting
code, which I had convinced Davidlohr to drop in favor of figuring out
the pending waiters by just using the existing state of the spinlock and
the wait queue.

Reported-and-tested-by: Srikar Dronamraju <srikar@linux.vnet.ibm.com>
Original-code-by: Davidlohr Bueso <davidlohr@hp.com>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
parent 477cc484
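
The revert matters because the waiter-counting scheme is what lets the wake-up path skip the bucket lock entirely. Roughly (a simplified sketch of the futex_wake() fast path that commit b0c29f79 introduced; not a verbatim quote, and the "_sketch" name is invented), the waker does:

static int futex_wake_sketch(struct futex_hash_bucket *hb)
{
        /* Lock-free exit: if nobody is queued or about to queue, do nothing. */
        if (!hb_waiters_pending(hb))
                return 0;

        spin_lock(&hb->lock);
        /* ... walk hb->chain and wake up matching futex_q entries ... */
        spin_unlock(&hb->lock);
        return 0;
}

So hb_waiters_pending() must never report "no waiters" while a task is between its futex value check and plist_add(); that is the property the spin_is_locked() heuristic failed to guarantee on powerpc.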
kernel/futex.c
@@ -234,6 +234,7 @@ static const struct futex_q futex_q_init = {
  * waiting on a futex.
  */
 struct futex_hash_bucket {
+        atomic_t waiters;
         spinlock_t lock;
         struct plist_head chain;
 } ____cacheline_aligned_in_smp;
@@ -253,22 +254,37 @@ static inline void futex_get_mm(union futex_key *key)
         smp_mb__after_atomic_inc();
 }
 
-static inline bool hb_waiters_pending(struct futex_hash_bucket *hb)
+/*
+ * Reflects a new waiter being added to the waitqueue.
+ */
+static inline void hb_waiters_inc(struct futex_hash_bucket *hb)
 {
 #ifdef CONFIG_SMP
+        atomic_inc(&hb->waiters);
         /*
-         * Tasks trying to enter the critical region are most likely
-         * potential waiters that will be added to the plist. Ensure
-         * that wakers won't miss to-be-slept tasks in the window between
-         * the wait call and the actual plist_add.
+         * Full barrier (A), see the ordering comment above.
         */
-        if (spin_is_locked(&hb->lock))
-                return true;
-        smp_rmb(); /* Make sure we check the lock state first */
+        smp_mb__after_atomic_inc();
+#endif
+}
+
+/*
+ * Reflects a waiter being removed from the waitqueue by wakeup
+ * paths.
+ */
+static inline void hb_waiters_dec(struct futex_hash_bucket *hb)
+{
+#ifdef CONFIG_SMP
+        atomic_dec(&hb->waiters);
+#endif
+}
 
-        return !plist_head_empty(&hb->chain);
+static inline int hb_waiters_pending(struct futex_hash_bucket *hb)
+{
+#ifdef CONFIG_SMP
+        return atomic_read(&hb->waiters);
 #else
-        return true;
+        return 1;
 #endif
 }
 
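
The "full barrier (A)" above pairs with a matching full barrier (B) on the waker side, taken after the user-space futex value is updated (see the ordering comment earlier in futex.c). A minimal user-space analogue of that pairing, written with C11 atomics and invented names, is:

#include <stdatomic.h>
#include <stdbool.h>

static atomic_int waiters;      /* stands in for hb->waiters    */
static atomic_int uval;         /* stands in for the futex word */

/* Waiter side: announce ourselves first, then re-check the futex word. */
static bool waiter_should_sleep(int expected)
{
        atomic_fetch_add(&waiters, 1);                  /* hb_waiters_inc()     */
        atomic_thread_fence(memory_order_seq_cst);      /* full barrier (A)     */
        return atomic_load(&uval) == expected;          /* value still "old"?   */
}

/* Waker side: update the futex word first, then check for waiters. */
static bool waker_must_take_lock(int newval)
{
        atomic_store(&uval, newval);                    /* user-space value change */
        atomic_thread_fence(memory_order_seq_cst);      /* full barrier (B)        */
        return atomic_load(&waiters) != 0;              /* hb_waiters_pending()    */
}

With both sides fully ordered, the waiter cannot miss the new value while the waker simultaneously misses the incremented count, so a sleeper is never stranded. An atomic_read() of a plain counter gives that guarantee on every architecture, which is exactly where the spin_is_locked() test fell short on powerpc.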
@@ -954,6 +970,7 @@ static void __unqueue_futex(struct futex_q *q)
 
         hb = container_of(q->lock_ptr, struct futex_hash_bucket, lock);
         plist_del(&q->list, &hb->chain);
+        hb_waiters_dec(hb);
 }
 
 /*
@@ -1257,7 +1274,9 @@ void requeue_futex(struct futex_q *q, struct futex_hash_bucket *hb1,
         */
        if (likely(&hb1->chain != &hb2->chain)) {
                plist_del(&q->list, &hb1->chain);
+               hb_waiters_dec(hb1);
                plist_add(&q->list, &hb2->chain);
+               hb_waiters_inc(hb2);
                q->lock_ptr = &hb2->lock;
        }
        get_futex_key_refs(key2);
@@ -1600,6 +1619,17 @@ static inline struct futex_hash_bucket *queue_lock(struct futex_q *q)
        struct futex_hash_bucket *hb;
 
        hb = hash_futex(&q->key);
+
+       /*
+        * Increment the counter before taking the lock so that
+        * a potential waker won't miss a to-be-slept task that is
+        * waiting for the spinlock. This is safe as all queue_lock()
+        * users end up calling queue_me(). Similarly, for housekeeping,
+        * decrement the counter at queue_unlock() when some error has
+        * occurred and we don't end up adding the task to the list.
+        */
+       hb_waiters_inc(hb);
+
        q->lock_ptr = &hb->lock;
 
        spin_lock(&hb->lock); /* implies MB (A) */
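
The counter stays balanced only because of the call pattern the comment above relies on. A simplified sketch of a typical queue_lock() user (modelled on futex_wait_setup(); not verbatim, and the "_sketch" name is invented):

static int futex_wait_setup_sketch(u32 __user *uaddr, u32 val,
                                   struct futex_q *q,
                                   struct futex_hash_bucket **hb)
{
        u32 uval;
        int ret;

        *hb = queue_lock(q);            /* hb_waiters_inc() + spin_lock() */

        ret = get_futex_value_locked(&uval, uaddr);
        if (ret)
                goto out_unlock;        /* fault: undo the waiter count */

        if (uval != val) {
                ret = -EWOULDBLOCK;     /* value changed: don't sleep */
                goto out_unlock;
        }

        return 0;                       /* success: caller goes on to queue_me() */

out_unlock:
        queue_unlock(*hb);              /* spin_unlock() + hb_waiters_dec() */
        return ret;
}

Either the task reaches queue_me() and stays on the plist, in which case the count is dropped later by __unqueue_futex() or one of the requeue paths, or the error path runs queue_unlock() and drops it immediately; every hb_waiters_inc() therefore has exactly one matching hb_waiters_dec().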
@@ -1611,6 +1641,7 @@ queue_unlock(struct futex_hash_bucket *hb)
        __releases(&hb->lock)
 {
        spin_unlock(&hb->lock);
+       hb_waiters_dec(hb);
 }
 
 /**
@@ -2342,6 +2373,7 @@ int handle_early_requeue_pi_wakeup(struct futex_hash_bucket *hb,
         * Unqueue the futex_q and determine which it was.
         */
        plist_del(&q->list, &hb->chain);
+       hb_waiters_dec(hb);
 
        /* Handle spurious wakeups gracefully */
        ret = -EWOULDBLOCK;
@@ -2875,6 +2907,7 @@ static int __init futex_init(void)
                futex_cmpxchg_enabled = 1;
 
        for (i = 0; i < futex_hashsize; i++) {
+               atomic_set(&futex_queues[i].waiters, 0);
                plist_head_init(&futex_queues[i].chain);
                spin_lock_init(&futex_queues[i].lock);
        }