Commit 0556e262 authored by Dmitry Vyukov's avatar Dmitry Vyukov Committed by Russ Cox

sync: make Mutex more fair

Add new starvation mode for Mutex.
In starvation mode ownership is directly handed off from
unlocking goroutine to the next waiter. New arriving goroutines
don't compete for ownership.
Unfair wait time is now limited to 1ms.
Also fix a long standing bug that goroutines were requeued
at the tail of the wait queue. That lead to even more unfair
acquisition times with multiple waiters.

Performance of normal mode is not considerably affected.

Fixes #13086

On the provided in the issue lockskew program:

done in 1.207853ms
done in 1.177451ms
done in 1.184168ms
done in 1.198633ms
done in 1.185797ms
done in 1.182502ms
done in 1.316485ms
done in 1.211611ms
done in 1.182418ms

name                    old time/op  new time/op   delta
MutexUncontended-48     0.65ns ± 0%   0.65ns ± 1%     ~           (p=0.087 n=10+10)
Mutex-48                 112ns ± 1%    114ns ± 1%   +1.69%        (p=0.000 n=10+10)
MutexSlack-48            113ns ± 0%     87ns ± 1%  -22.65%         (p=0.000 n=8+10)
MutexWork-48             149ns ± 0%    145ns ± 0%   -2.48%         (p=0.000 n=9+10)
MutexWorkSlack-48        149ns ± 0%    122ns ± 3%  -18.26%         (p=0.000 n=6+10)
MutexNoSpin-48           103ns ± 4%    105ns ± 3%     ~           (p=0.089 n=10+10)
MutexSpin-48             490ns ± 4%    515ns ± 6%   +5.08%        (p=0.006 n=10+10)
Cond32-48               13.4µs ± 6%   13.1µs ± 5%   -2.75%        (p=0.023 n=10+10)
RWMutexWrite100-48      53.2ns ± 3%   41.2ns ± 3%  -22.57%        (p=0.000 n=10+10)
RWMutexWrite10-48       45.9ns ± 2%   43.9ns ± 2%   -4.38%        (p=0.000 n=10+10)
RWMutexWorkWrite100-48   122ns ± 2%    134ns ± 1%   +9.92%        (p=0.000 n=10+10)
RWMutexWorkWrite10-48    206ns ± 1%    188ns ± 1%   -8.52%         (p=0.000 n=8+10)
Cond32-24               12.1µs ± 3%   12.4µs ± 3%   +1.98%         (p=0.043 n=10+9)
MutexUncontended-24     0.74ns ± 1%   0.75ns ± 1%     ~           (p=0.650 n=10+10)
Mutex-24                 122ns ± 2%    124ns ± 1%   +1.31%        (p=0.007 n=10+10)
MutexSlack-24           96.9ns ± 2%  102.8ns ± 2%   +6.11%        (p=0.000 n=10+10)
MutexWork-24             146ns ± 1%    135ns ± 2%   -7.70%         (p=0.000 n=10+9)
MutexWorkSlack-24        135ns ± 1%    128ns ± 2%   -5.01%         (p=0.000 n=10+9)
MutexNoSpin-24           114ns ± 3%    110ns ± 4%   -3.84%        (p=0.000 n=10+10)
MutexSpin-24             482ns ± 4%    475ns ± 8%     ~           (p=0.286 n=10+10)
RWMutexWrite100-24      43.0ns ± 3%   43.1ns ± 2%     ~           (p=0.956 n=10+10)
RWMutexWrite10-24       43.4ns ± 1%   43.2ns ± 1%     ~            (p=0.085 n=10+9)
RWMutexWorkWrite100-24   130ns ± 3%    131ns ± 3%     ~           (p=0.747 n=10+10)
RWMutexWorkWrite10-24    191ns ± 1%    192ns ± 1%     ~           (p=0.210 n=10+10)
Cond32-12               11.5µs ± 2%   11.7µs ± 2%   +1.98%        (p=0.002 n=10+10)
MutexUncontended-12     1.48ns ± 0%   1.50ns ± 1%   +1.08%        (p=0.004 n=10+10)
Mutex-12                 141ns ± 1%    143ns ± 1%   +1.63%        (p=0.000 n=10+10)
MutexSlack-12            121ns ± 0%    119ns ± 0%   -1.65%          (p=0.001 n=8+9)
MutexWork-12             141ns ± 2%    150ns ± 3%   +6.36%         (p=0.000 n=9+10)
MutexWorkSlack-12        131ns ± 0%    138ns ± 0%   +5.73%         (p=0.000 n=9+10)
MutexNoSpin-12          87.0ns ± 1%   83.7ns ± 1%   -3.80%        (p=0.000 n=10+10)
MutexSpin-12             364ns ± 1%    377ns ± 1%   +3.77%        (p=0.000 n=10+10)
RWMutexWrite100-12      42.8ns ± 1%   43.9ns ± 1%   +2.41%         (p=0.000 n=8+10)
RWMutexWrite10-12       39.8ns ± 4%   39.3ns ± 1%     ~            (p=0.433 n=10+9)
RWMutexWorkWrite100-12   131ns ± 1%    131ns ± 0%     ~            (p=0.591 n=10+9)
RWMutexWorkWrite10-12    173ns ± 1%    174ns ± 0%     ~            (p=0.059 n=10+8)
Cond32-6                10.9µs ± 2%   10.9µs ± 2%     ~           (p=0.739 n=10+10)
MutexUncontended-6      2.97ns ± 0%   2.97ns ± 0%     ~     (all samples are equal)
Mutex-6                  122ns ± 6%    122ns ± 2%     ~           (p=0.668 n=10+10)
MutexSlack-6             149ns ± 3%    142ns ± 3%   -4.63%        (p=0.000 n=10+10)
MutexWork-6              136ns ± 3%    140ns ± 5%     ~           (p=0.077 n=10+10)
MutexWorkSlack-6         152ns ± 0%    138ns ± 2%   -9.21%         (p=0.000 n=6+10)
MutexNoSpin-6            150ns ± 1%    152ns ± 0%   +1.50%         (p=0.000 n=8+10)
MutexSpin-6              726ns ± 0%    730ns ± 1%     ~           (p=0.069 n=10+10)
RWMutexWrite100-6       40.6ns ± 1%   40.9ns ± 1%   +0.91%         (p=0.001 n=8+10)
RWMutexWrite10-6        37.1ns ± 0%   37.0ns ± 1%     ~            (p=0.386 n=9+10)
RWMutexWorkWrite100-6    133ns ± 1%    134ns ± 1%   +1.01%         (p=0.005 n=9+10)
RWMutexWorkWrite10-6     152ns ± 0%    152ns ± 0%     ~     (all samples are equal)
Cond32-2                7.86µs ± 2%   7.95µs ± 2%   +1.10%        (p=0.023 n=10+10)
MutexUncontended-2      8.10ns ± 0%   9.11ns ± 4%  +12.44%         (p=0.000 n=9+10)
Mutex-2                 32.9ns ± 9%   38.4ns ± 6%  +16.58%        (p=0.000 n=10+10)
MutexSlack-2            93.4ns ± 1%   98.5ns ± 2%   +5.39%         (p=0.000 n=10+9)
MutexWork-2             40.8ns ± 3%   43.8ns ± 7%   +7.38%         (p=0.000 n=10+9)
MutexWorkSlack-2        98.6ns ± 5%  108.2ns ± 2%   +9.80%         (p=0.000 n=10+8)
MutexNoSpin-2            399ns ± 1%    398ns ± 2%     ~             (p=0.463 n=8+9)
MutexSpin-2             1.99µs ± 3%   1.97µs ± 1%   -0.81%          (p=0.003 n=9+8)
RWMutexWrite100-2       37.6ns ± 5%   46.0ns ± 4%  +22.17%         (p=0.000 n=10+8)
RWMutexWrite10-2        50.1ns ± 6%   36.8ns ±12%  -26.46%         (p=0.000 n=9+10)
RWMutexWorkWrite100-2    136ns ± 0%    134ns ± 2%   -1.80%          (p=0.001 n=7+9)
RWMutexWorkWrite10-2     140ns ± 1%    138ns ± 1%   -1.50%        (p=0.000 n=10+10)
Cond32                  5.93µs ± 1%   5.91µs ± 0%     ~            (p=0.411 n=9+10)
MutexUncontended        15.9ns ± 0%   15.8ns ± 0%   -0.63%          (p=0.000 n=8+8)
Mutex                   15.9ns ± 0%   15.8ns ± 0%   -0.44%        (p=0.003 n=10+10)
MutexSlack              26.9ns ± 3%   26.7ns ± 2%     ~           (p=0.084 n=10+10)
MutexWork               47.8ns ± 0%   47.9ns ± 0%   +0.21%          (p=0.014 n=9+8)
MutexWorkSlack          54.9ns ± 3%   54.5ns ± 3%     ~           (p=0.254 n=10+10)
MutexNoSpin              786ns ± 2%    765ns ± 1%   -2.66%        (p=0.000 n=10+10)
MutexSpin               3.87µs ± 1%   3.83µs ± 0%   -0.85%          (p=0.005 n=9+8)
RWMutexWrite100         21.2ns ± 2%   21.0ns ± 1%   -0.88%         (p=0.018 n=10+9)
RWMutexWrite10          22.6ns ± 1%   22.6ns ± 0%     ~             (p=0.471 n=9+9)
RWMutexWorkWrite100      132ns ± 0%    132ns ± 0%     ~     (all samples are equal)
RWMutexWorkWrite10       124ns ± 0%    123ns ± 0%     ~           (p=0.656 n=10+10)

Change-Id: I66412a3a0980df1233ad7a5a0cd9723b4274528b
Reviewed-on: https://go-review.googlesource.com/34310
Run-TryBot: Russ Cox <rsc@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: default avatarRuss Cox <rsc@golang.org>
parent 79f6a5c7
...@@ -953,7 +953,7 @@ func gcStart(mode gcMode, forceTrigger bool) { ...@@ -953,7 +953,7 @@ func gcStart(mode gcMode, forceTrigger bool) {
// another thread. // another thread.
useStartSema := mode == gcBackgroundMode useStartSema := mode == gcBackgroundMode
if useStartSema { if useStartSema {
semacquire(&work.startSema, 0) semacquire(&work.startSema)
// Re-check transition condition under transition lock. // Re-check transition condition under transition lock.
if !gcShouldStart(forceTrigger) { if !gcShouldStart(forceTrigger) {
semrelease(&work.startSema) semrelease(&work.startSema)
...@@ -977,7 +977,7 @@ func gcStart(mode gcMode, forceTrigger bool) { ...@@ -977,7 +977,7 @@ func gcStart(mode gcMode, forceTrigger bool) {
} }
// Ok, we're doing it! Stop everybody else // Ok, we're doing it! Stop everybody else
semacquire(&worldsema, 0) semacquire(&worldsema)
if trace.enabled { if trace.enabled {
traceGCStart() traceGCStart()
...@@ -1087,7 +1087,7 @@ func gcStart(mode gcMode, forceTrigger bool) { ...@@ -1087,7 +1087,7 @@ func gcStart(mode gcMode, forceTrigger bool) {
// by mark termination. // by mark termination.
func gcMarkDone() { func gcMarkDone() {
top: top:
semacquire(&work.markDoneSema, 0) semacquire(&work.markDoneSema)
// Re-check transition condition under transition lock. // Re-check transition condition under transition lock.
if !(gcphase == _GCmark && work.nwait == work.nproc && !gcMarkWorkAvailable(nil)) { if !(gcphase == _GCmark && work.nwait == work.nproc && !gcMarkWorkAvailable(nil)) {
......
...@@ -928,7 +928,7 @@ func restartg(gp *g) { ...@@ -928,7 +928,7 @@ func restartg(gp *g) {
// in panic or being exited, this may not reliably stop all // in panic or being exited, this may not reliably stop all
// goroutines. // goroutines.
func stopTheWorld(reason string) { func stopTheWorld(reason string) {
semacquire(&worldsema, 0) semacquire(&worldsema)
getg().m.preemptoff = reason getg().m.preemptoff = reason
systemstack(stopTheWorldWithSema) systemstack(stopTheWorldWithSema)
} }
......
...@@ -53,22 +53,22 @@ var semtable [semTabSize]struct { ...@@ -53,22 +53,22 @@ var semtable [semTabSize]struct {
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire //go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) { func sync_runtime_Semacquire(addr *uint32) {
semacquire(addr, semaBlockProfile) semacquire1(addr, false, semaBlockProfile)
} }
//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire //go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
func poll_runtime_Semacquire(addr *uint32) { func poll_runtime_Semacquire(addr *uint32) {
semacquire(addr, semaBlockProfile) semacquire1(addr, false, semaBlockProfile)
} }
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease //go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32) { func sync_runtime_Semrelease(addr *uint32, handoff bool) {
semrelease(addr) semrelease1(addr, handoff)
} }
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32) { func sync_runtime_SemacquireMutex(addr *uint32, lifo bool) {
semacquire(addr, semaBlockProfile|semaMutexProfile) semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile)
} }
//go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease //go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease
...@@ -91,7 +91,11 @@ const ( ...@@ -91,7 +91,11 @@ const (
) )
// Called from runtime. // Called from runtime.
func semacquire(addr *uint32, profile semaProfileFlags) { func semacquire(addr *uint32) {
semacquire1(addr, false, 0)
}
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags) {
gp := getg() gp := getg()
if gp != gp.m.curg { if gp != gp.m.curg {
throw("semacquire not on the G stack") throw("semacquire not on the G stack")
...@@ -113,6 +117,7 @@ func semacquire(addr *uint32, profile semaProfileFlags) { ...@@ -113,6 +117,7 @@ func semacquire(addr *uint32, profile semaProfileFlags) {
t0 := int64(0) t0 := int64(0)
s.releasetime = 0 s.releasetime = 0
s.acquiretime = 0 s.acquiretime = 0
s.ticket = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 { if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks() t0 = cputicks()
s.releasetime = -1 s.releasetime = -1
...@@ -135,9 +140,9 @@ func semacquire(addr *uint32, profile semaProfileFlags) { ...@@ -135,9 +140,9 @@ func semacquire(addr *uint32, profile semaProfileFlags) {
} }
// Any semrelease after the cansemacquire knows we're waiting // Any semrelease after the cansemacquire knows we're waiting
// (we set nwait above), so go to sleep. // (we set nwait above), so go to sleep.
root.queue(addr, s) root.queue(addr, s, lifo)
goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4) goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4)
if cansemacquire(addr) { if s.ticket != 0 || cansemacquire(addr) {
break break
} }
} }
...@@ -148,6 +153,10 @@ func semacquire(addr *uint32, profile semaProfileFlags) { ...@@ -148,6 +153,10 @@ func semacquire(addr *uint32, profile semaProfileFlags) {
} }
func semrelease(addr *uint32) { func semrelease(addr *uint32) {
semrelease1(addr, false)
}
func semrelease1(addr *uint32, handoff bool) {
root := semroot(addr) root := semroot(addr)
atomic.Xadd(addr, 1) atomic.Xadd(addr, 1)
...@@ -173,6 +182,12 @@ func semrelease(addr *uint32) { ...@@ -173,6 +182,12 @@ func semrelease(addr *uint32) {
unlock(&root.lock) unlock(&root.lock)
if s != nil { // May be slow, so unlock first if s != nil { // May be slow, so unlock first
acquiretime := s.acquiretime acquiretime := s.acquiretime
if s.ticket != 0 {
throw("corrupted semaphore ticket")
}
if handoff && cansemacquire(addr) {
s.ticket = 1
}
readyWithTime(s, 5) readyWithTime(s, 5)
if acquiretime != 0 { if acquiretime != 0 {
mutexevent(t0-acquiretime, 3) mutexevent(t0-acquiretime, 3)
...@@ -197,7 +212,7 @@ func cansemacquire(addr *uint32) bool { ...@@ -197,7 +212,7 @@ func cansemacquire(addr *uint32) bool {
} }
// queue adds s to the blocked goroutines in semaRoot. // queue adds s to the blocked goroutines in semaRoot.
func (root *semaRoot) queue(addr *uint32, s *sudog) { func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
s.g = getg() s.g = getg()
s.elem = unsafe.Pointer(addr) s.elem = unsafe.Pointer(addr)
s.next = nil s.next = nil
...@@ -207,14 +222,41 @@ func (root *semaRoot) queue(addr *uint32, s *sudog) { ...@@ -207,14 +222,41 @@ func (root *semaRoot) queue(addr *uint32, s *sudog) {
pt := &root.treap pt := &root.treap
for t := *pt; t != nil; t = *pt { for t := *pt; t != nil; t = *pt {
if t.elem == unsafe.Pointer(addr) { if t.elem == unsafe.Pointer(addr) {
// Already have addr in list; add s to end of per-addr list. // Already have addr in list.
if t.waittail == nil { if lifo {
t.waitlink = s // Substitute s in t's place in treap.
*pt = s
s.ticket = t.ticket
s.acquiretime = t.acquiretime
s.parent = t.parent
s.prev = t.prev
s.next = t.next
if s.prev != nil {
s.prev.parent = s
}
if s.next != nil {
s.next.parent = s
}
// Add t first in s's wait list.
s.waitlink = t
s.waittail = t.waittail
if s.waittail == nil {
s.waittail = t
}
t.parent = nil
t.prev = nil
t.next = nil
t.waittail = nil
} else { } else {
t.waittail.waitlink = s // Add s to end of t's wait list.
if t.waittail == nil {
t.waitlink = s
} else {
t.waittail.waitlink = s
}
t.waittail = s
s.waitlink = nil
} }
t.waittail = s
s.waitlink = nil
return return
} }
last = t last = t
...@@ -319,6 +361,7 @@ Found: ...@@ -319,6 +361,7 @@ Found:
s.elem = nil s.elem = nil
s.next = nil s.next = nil
s.prev = nil s.prev = nil
s.ticket = 0
return s, now return s, now
} }
...@@ -561,3 +604,8 @@ func notifyListCheck(sz uintptr) { ...@@ -561,3 +604,8 @@ func notifyListCheck(sz uintptr) {
throw("bad notifyList size") throw("bad notifyList size")
} }
} }
//go:linkname sync_nanotime sync.runtime_nanotime
func sync_nanotime() int64 {
return nanotime()
}
...@@ -313,7 +313,7 @@ func StopTrace() { ...@@ -313,7 +313,7 @@ func StopTrace() {
// The world is started but we've set trace.shutdown, so new tracing can't start. // The world is started but we've set trace.shutdown, so new tracing can't start.
// Wait for the trace reader to flush pending buffers and stop. // Wait for the trace reader to flush pending buffers and stop.
semacquire(&trace.shutdownSema, 0) semacquire(&trace.shutdownSema)
if raceenabled { if raceenabled {
raceacquire(unsafe.Pointer(&trace.shutdownSema)) raceacquire(unsafe.Pointer(&trace.shutdownSema))
} }
......
...@@ -37,7 +37,34 @@ type Locker interface { ...@@ -37,7 +37,34 @@ type Locker interface {
const ( const (
mutexLocked = 1 << iota // mutex is locked mutexLocked = 1 << iota // mutex is locked
mutexWoken mutexWoken
mutexStarving
mutexWaiterShift = iota mutexWaiterShift = iota
// Mutex fairness.
//
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
starvationThresholdNs = 1e6
) )
// Lock locks m. // Lock locks m.
...@@ -52,41 +79,86 @@ func (m *Mutex) Lock() { ...@@ -52,41 +79,86 @@ func (m *Mutex) Lock() {
return return
} }
var waitStartTime int64
starving := false
awoke := false awoke := false
iter := 0 iter := 0
old := m.state
for { for {
old := m.state // Don't spin in starvation mode, ownership is handed off to waiters
new := old | mutexLocked // so we won't be able to acquire the mutex anyway.
if old&mutexLocked != 0 { if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if runtime_canSpin(iter) { // Active spinning makes sense.
// Active spinning makes sense. // Try to set mutexWoken flag to inform Unlock
// Try to set mutexWoken flag to inform Unlock // to not wake other blocked goroutines.
// to not wake other blocked goroutines. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true
awoke = true
}
runtime_doSpin()
iter++
continue
} }
new = old + 1<<mutexWaiterShift runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
} }
if awoke { if awoke {
// The goroutine has been woken from sleep, // The goroutine has been woken from sleep,
// so we need to reset the flag in either case. // so we need to reset the flag in either case.
if new&mutexWoken == 0 { if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state") panic("sync: inconsistent mutex state")
} }
new &^= mutexWoken new &^= mutexWoken
} }
if atomic.CompareAndSwapInt32(&m.state, old, new) { if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&mutexLocked == 0 { if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// If this goroutine was woken and mutex is in starvation mode,
// ownership was handed off to us but mutex is in somewhat
// inconsistent state: mutexLocked is not set and we are still
// accounted as waiter. Fix that.
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
panic("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break break
} }
runtime_SemacquireMutex(&m.sema)
awoke = true awoke = true
iter = 0 iter = 0
} else {
old = m.state
} }
} }
...@@ -110,22 +182,33 @@ func (m *Mutex) Unlock() { ...@@ -110,22 +182,33 @@ func (m *Mutex) Unlock() {
// Fast path: drop lock bit. // Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked) new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 { if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex") panic("sync: unlock of unlocked mutex")
} }
if new&mutexStarving == 0 {
old := new old := new
for { for {
// If there are no waiters or a goroutine has already // If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone. // been woken or grabbed the lock, no need to wake anyone.
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { // In starvation mode ownership is directly handed off from unlocking
return // goroutine to the next waiter. We are not part of this chain,
} // since we did not observe mutexStarving when we unlocked the mutex above.
// Grab the right to wake someone. // So get off the way.
new = (old - 1<<mutexWaiterShift) | mutexWoken if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
if atomic.CompareAndSwapInt32(&m.state, old, new) { return
runtime_Semrelease(&m.sema) }
return // Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false)
return
}
old = m.state
} }
old = m.state } else {
// Starving mode: handoff mutex ownership to the next waiter.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true)
} }
} }
...@@ -15,12 +15,13 @@ import ( ...@@ -15,12 +15,13 @@ import (
"strings" "strings"
. "sync" . "sync"
"testing" "testing"
"time"
) )
func HammerSemaphore(s *uint32, loops int, cdone chan bool) { func HammerSemaphore(s *uint32, loops int, cdone chan bool) {
for i := 0; i < loops; i++ { for i := 0; i < loops; i++ {
Runtime_Semacquire(s) Runtime_Semacquire(s)
Runtime_Semrelease(s) Runtime_Semrelease(s, false)
} }
cdone <- true cdone <- true
} }
...@@ -174,6 +175,38 @@ func TestMutexMisuse(t *testing.T) { ...@@ -174,6 +175,38 @@ func TestMutexMisuse(t *testing.T) {
} }
} }
func TestMutexFairness(t *testing.T) {
var mu Mutex
stop := make(chan bool)
defer close(stop)
go func() {
for {
mu.Lock()
time.Sleep(100 * time.Microsecond)
mu.Unlock()
select {
case <-stop:
return
default:
}
}
}()
done := make(chan bool)
go func() {
for i := 0; i < 10; i++ {
time.Sleep(100 * time.Microsecond)
mu.Lock()
mu.Unlock()
}
done <- true
}()
select {
case <-done:
case <-time.After(10 * time.Second):
t.Fatalf("can't acquire Mutex in 10 seconds")
}
}
func BenchmarkMutexUncontended(b *testing.B) { func BenchmarkMutexUncontended(b *testing.B) {
type PaddedMutex struct { type PaddedMutex struct {
Mutex Mutex
......
...@@ -14,13 +14,15 @@ import "unsafe" ...@@ -14,13 +14,15 @@ import "unsafe"
func runtime_Semacquire(s *uint32) func runtime_Semacquire(s *uint32)
// SemacquireMutex is like Semacquire, but for profiling contended Mutexes. // SemacquireMutex is like Semacquire, but for profiling contended Mutexes.
func runtime_SemacquireMutex(*uint32) // If lifo is true, queue waiter at the head of wait queue.
func runtime_SemacquireMutex(s *uint32, lifo bool)
// Semrelease atomically increments *s and notifies a waiting goroutine // Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire. // if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization // It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly. // library and should not be used directly.
func runtime_Semrelease(s *uint32) // If handoff is true, pass count directly to the first waiter.
func runtime_Semrelease(s *uint32, handoff bool)
// Approximation of notifyList in runtime/sema.go. Size and alignment must // Approximation of notifyList in runtime/sema.go. Size and alignment must
// agree. // agree.
...@@ -57,3 +59,5 @@ func runtime_canSpin(i int) bool ...@@ -57,3 +59,5 @@ func runtime_canSpin(i int) bool
// runtime_doSpin does active spinning. // runtime_doSpin does active spinning.
func runtime_doSpin() func runtime_doSpin()
func runtime_nanotime() int64
...@@ -18,7 +18,7 @@ func BenchmarkSemaUncontended(b *testing.B) { ...@@ -18,7 +18,7 @@ func BenchmarkSemaUncontended(b *testing.B) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
sem := new(PaddedSem) sem := new(PaddedSem)
for pb.Next() { for pb.Next() {
Runtime_Semrelease(&sem.sem) Runtime_Semrelease(&sem.sem, false)
Runtime_Semacquire(&sem.sem) Runtime_Semacquire(&sem.sem)
} }
}) })
...@@ -44,7 +44,7 @@ func benchmarkSema(b *testing.B, block, work bool) { ...@@ -44,7 +44,7 @@ func benchmarkSema(b *testing.B, block, work bool) {
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
foo := 0 foo := 0
for pb.Next() { for pb.Next() {
Runtime_Semrelease(&sem) Runtime_Semrelease(&sem, false)
if work { if work {
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
foo *= 2 foo *= 2
...@@ -54,7 +54,7 @@ func benchmarkSema(b *testing.B, block, work bool) { ...@@ -54,7 +54,7 @@ func benchmarkSema(b *testing.B, block, work bool) {
Runtime_Semacquire(&sem) Runtime_Semacquire(&sem)
} }
_ = foo _ = foo
Runtime_Semrelease(&sem) Runtime_Semrelease(&sem, false)
}) })
} }
......
...@@ -66,7 +66,7 @@ func (rw *RWMutex) RUnlock() { ...@@ -66,7 +66,7 @@ func (rw *RWMutex) RUnlock() {
// A writer is pending. // A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 { if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer. // The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem) runtime_Semrelease(&rw.writerSem, false)
} }
} }
if race.Enabled { if race.Enabled {
...@@ -119,7 +119,7 @@ func (rw *RWMutex) Unlock() { ...@@ -119,7 +119,7 @@ func (rw *RWMutex) Unlock() {
} }
// Unblock blocked readers, if any. // Unblock blocked readers, if any.
for i := 0; i < int(r); i++ { for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem) runtime_Semrelease(&rw.readerSem, false)
} }
// Allow other writers to proceed. // Allow other writers to proceed.
rw.w.Unlock() rw.w.Unlock()
......
...@@ -91,7 +91,7 @@ func (wg *WaitGroup) Add(delta int) { ...@@ -91,7 +91,7 @@ func (wg *WaitGroup) Add(delta int) {
// Reset waiters count to 0. // Reset waiters count to 0.
*statep = 0 *statep = 0
for ; w != 0; w-- { for ; w != 0; w-- {
runtime_Semrelease(&wg.sema) runtime_Semrelease(&wg.sema, false)
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment