Commit 0f4897ae authored by Dmitriy Vyukov's avatar Dmitriy Vyukov

sync: improve race instrumentation of WaitGroup

Do not synchronize Add(1) with Wait().
Imitate read on first Add(1) and write on Wait(),
it allows to catch common misuses of WaitGroup:
- Add() called in the additional goroutine itself
- incorrect reuse of WaitGroup with multiple waiters

R=golang-dev, iant
CC=golang-dev
https://golang.org/cl/10093044
parent ed6dce6f
......@@ -102,22 +102,22 @@ func TestRaceWaitGroupWrongWait(t *testing.T) {
<-c
}
// A common WaitGroup misuse that can potentially be caught be the race detector.
// For this simple case we must emulate Add() as read on &wg and Wait() as write on &wg.
// However it will have false positives if there are several concurrent Wait() calls.
func TestRaceFailingWaitGroupWrongAdd(t *testing.T) {
func TestRaceWaitGroupWrongAdd(t *testing.T) {
c := make(chan bool, 2)
var wg sync.WaitGroup
go func() {
wg.Add(1)
time.Sleep(100 * time.Millisecond)
wg.Done()
c <- true
}()
go func() {
wg.Add(1)
time.Sleep(100 * time.Millisecond)
wg.Done()
c <- true
}()
time.Sleep(50 * time.Millisecond)
wg.Wait()
<-c
<-c
......@@ -158,6 +158,32 @@ func TestNoRaceWaitGroupMultipleWait2(t *testing.T) {
<-c
}
func TestNoRaceWaitGroupMultipleWait3(t *testing.T) {
const P = 3
var data [P]int
done := make(chan bool, P)
var wg sync.WaitGroup
wg.Add(P)
for p := 0; p < P; p++ {
go func(p int) {
data[p] = 42
wg.Done()
}(p)
}
for p := 0; p < P; p++ {
go func() {
wg.Wait()
for p1 := 0; p1 < P; p1++ {
_ = data[p1]
}
done <- true
}()
}
for p := 0; p < P; p++ {
<-done
}
}
// Correct usage but still a race
func TestRaceWaitGroup2(t *testing.T) {
var x int
......@@ -230,3 +256,97 @@ func TestNoRaceWaitGroupTransitive(t *testing.T) {
_ = x
_ = y
}
func TestNoRaceWaitGroupReuse(t *testing.T) {
const P = 3
var data [P]int
var wg sync.WaitGroup
for try := 0; try < 3; try++ {
wg.Add(P)
for p := 0; p < P; p++ {
go func(p int) {
data[p]++
wg.Done()
}(p)
}
wg.Wait()
for p := 0; p < P; p++ {
data[p]++
}
}
}
func TestNoRaceWaitGroupReuse2(t *testing.T) {
const P = 3
var data [P]int
var wg sync.WaitGroup
for try := 0; try < 3; try++ {
wg.Add(P)
for p := 0; p < P; p++ {
go func(p int) {
data[p]++
wg.Done()
}(p)
}
done := make(chan bool)
go func() {
wg.Wait()
for p := 0; p < P; p++ {
data[p]++
}
done <- true
}()
wg.Wait()
<-done
for p := 0; p < P; p++ {
data[p]++
}
}
}
func TestRaceWaitGroupReuse(t *testing.T) {
const P = 3
const T = 3
done := make(chan bool, T)
var wg sync.WaitGroup
for try := 0; try < T; try++ {
var data [P]int
wg.Add(P)
for p := 0; p < P; p++ {
go func(p int) {
time.Sleep(50 * time.Millisecond)
data[p]++
wg.Done()
}(p)
}
go func() {
wg.Wait()
for p := 0; p < P; p++ {
data[p]++
}
done <- true
}()
time.Sleep(100 * time.Millisecond)
wg.Wait()
}
for try := 0; try < T; try++ {
<-done
}
}
func TestNoRaceWaitGroupConcurrentAdd(t *testing.T) {
const P = 4
waiting := make(chan bool, P)
var wg sync.WaitGroup
for p := 0; p < P; p++ {
go func() {
wg.Add(1)
waiting <- true
wg.Done()
}()
}
for p := 0; p < P; p++ {
<-waiting
}
wg.Wait()
}
......@@ -32,3 +32,11 @@ func raceDisable() {
func raceEnable() {
runtime.RaceEnable()
}
func raceRead(addr unsafe.Pointer) {
runtime.RaceRead(addr)
}
func raceWrite(addr unsafe.Pointer) {
runtime.RaceWrite(addr)
}
......@@ -26,3 +26,9 @@ func raceDisable() {
func raceEnable() {
}
func raceRead(addr unsafe.Pointer) {
}
func raceWrite(addr unsafe.Pointer) {
}
......@@ -43,12 +43,23 @@ type WaitGroup struct {
// other event to be waited for. See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
if raceenabled {
_ = wg.m.state
raceReleaseMerge(unsafe.Pointer(wg))
_ = wg.m.state // trigger nil deref early
if delta < 0 {
// Synchronize decrements with Wait.
raceReleaseMerge(unsafe.Pointer(wg))
}
raceDisable()
defer raceEnable()
}
v := atomic.AddInt32(&wg.counter, int32(delta))
if raceenabled {
if delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
// several concurrent wg.counter transitions from 0.
raceRead(unsafe.Pointer(&wg.sema))
}
}
if v < 0 {
panic("sync: negative WaitGroup counter")
}
......@@ -72,7 +83,7 @@ func (wg *WaitGroup) Done() {
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
if raceenabled {
_ = wg.m.state
_ = wg.m.state // trigger nil deref early
raceDisable()
}
if atomic.LoadInt32(&wg.counter) == 0 {
......@@ -83,7 +94,14 @@ func (wg *WaitGroup) Wait() {
return
}
wg.m.Lock()
atomic.AddInt32(&wg.waiters, 1)
w := atomic.AddInt32(&wg.waiters, 1)
if raceenabled && w == 1 {
// Wait's must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As the consequence, can do the write only for the first waiter,
// otherwise concurrent Wait's will race with each other.
raceWrite(unsafe.Pointer(&wg.sema))
}
// This code is racing with the unlocked path in Add above.
// The code above modifies counter and then reads waiters.
// We must modify waiters and then read counter (the opposite order)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment