Commit 80832974 authored by Austin Clements's avatar Austin Clements

runtime: make rwmutex work on Ms instead of Gs

Currently runtime.rwmutex is written to block the calling goroutine
rather than the calling thread. However, rwmutex was intended to be
used in the scheduler, which means it needs to be a thread-level
synchronization primitive.

Hence, this modifies rwmutex to synchronize threads instead of
goroutines. This has the consequence of making it write-barrier-free,
which is also important for using it in the scheduler.

The implementation makes three changes: it replaces the "w" semaphore
with a mutex, since this was all it was being used for anyway; it
replaces "writerSem" with a single pending M that parks on its note;
and it replaces "readerSem" with a list of Ms that park on their notes
plus a pass count that together emulate a counting semaphore. I
model-checked the safety and liveness of this implementation through
>1 billion schedules.

For #20738.

Change-Id: I3cf5a18c266a96a3f38165083812803510217787
Reviewed-on: https://go-review.googlesource.com/47071
Run-TryBot: Austin Clements <austin@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: default avatarIan Lance Taylor <iant@golang.org>
parent 3ea53cb0
...@@ -105,7 +105,7 @@ The simplest is `mutex`, which is manipulated using `lock` and ...@@ -105,7 +105,7 @@ The simplest is `mutex`, which is manipulated using `lock` and
periods. Blocking on a `mutex` directly blocks the M, without periods. Blocking on a `mutex` directly blocks the M, without
interacting with the Go scheduler. This means it is safe to use from interacting with the Go scheduler. This means it is safe to use from
the lowest levels of the runtime, but also prevents any associated G the lowest levels of the runtime, but also prevents any associated G
and P from being rescheduled. and P from being rescheduled. `rwmutex` is similar.
For one-shot notifications, use `note`, which provides `notesleep` and For one-shot notifications, use `note`, which provides `notesleep` and
`notewakeup`. Unlike traditional UNIX `sleep`/`wakeup`, `note`s are `notewakeup`. Unlike traditional UNIX `sleep`/`wakeup`, `note`s are
...@@ -130,7 +130,7 @@ In summary, ...@@ -130,7 +130,7 @@ In summary,
<table> <table>
<tr><th></th><th colspan="3">Blocks</th></tr> <tr><th></th><th colspan="3">Blocks</th></tr>
<tr><th>Interface</th><th>G</th><th>M</th><th>P</th></tr> <tr><th>Interface</th><th>G</th><th>M</th><th>P</th></tr>
<tr><td>mutex</td><td>Y</td><td>Y</td><td>Y</td></tr> <tr><td>(rw)mutex</td><td>Y</td><td>Y</td><td>Y</td></tr>
<tr><td>note</td><td>Y</td><td>Y</td><td>Y/N</td></tr> <tr><td>note</td><td>Y</td><td>Y</td><td>Y/N</td></tr>
<tr><td>park</td><td>Y</td><td>N</td><td>N</td></tr> <tr><td>park</td><td>Y</td><td>N</td><td>N</td></tr>
</table> </table>
......
...@@ -354,10 +354,6 @@ type RWMutex struct { ...@@ -354,10 +354,6 @@ type RWMutex struct {
rw rwmutex rw rwmutex
} }
func (rw *RWMutex) Init() {
rw.rw.init()
}
func (rw *RWMutex) RLock() { func (rw *RWMutex) RLock() {
rw.rw.rlock() rw.rw.rlock()
} }
......
...@@ -13,28 +13,43 @@ import ( ...@@ -13,28 +13,43 @@ import (
// An rwmutex is a reader/writer mutual exclusion lock. // An rwmutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer. // The lock can be held by an arbitrary number of readers or a single writer.
// This is a variant of sync.RWMutex, for the runtime package. // This is a variant of sync.RWMutex, for the runtime package.
// This is less convenient than sync.RWMutex, because it must be // Like mutex, rwmutex blocks the calling M.
// initialized before use. Sorry. // It does not interact with the goroutine scheduler.
type rwmutex struct { type rwmutex struct {
w uint32 // semaphore for pending writers rLock mutex // protects readers, readerPass, writer
writerSem uint32 // semaphore for writers to wait for completing readers readers muintptr // list of pending readers
readerSem uint32 // semaphore for readers to wait for completing writers readerPass uint32 // number of pending readers to skip readers list
wLock mutex // serializes writers
writer muintptr // pending writer waiting for completing readers
readerCount uint32 // number of pending readers readerCount uint32 // number of pending readers
readerWait uint32 // number of departing readers readerWait uint32 // number of departing readers
} }
const rwmutexMaxReaders = 1 << 30 const rwmutexMaxReaders = 1 << 30
// init initializes rw. This must be called before any other methods.
func (rw *rwmutex) init() {
rw.w = 1
}
// rlock locks rw for reading. // rlock locks rw for reading.
func (rw *rwmutex) rlock() { func (rw *rwmutex) rlock() {
if int32(atomic.Xadd(&rw.readerCount, 1)) < 0 { if int32(atomic.Xadd(&rw.readerCount, 1)) < 0 {
// A writer is pending. // A writer is pending. Park on the reader queue.
semacquire(&rw.readerSem) systemstack(func() {
lock(&rw.rLock)
if rw.readerPass > 0 {
// Writer finished.
rw.readerPass -= 1
unlock(&rw.rLock)
} else {
// Queue this reader to be woken by
// the writer.
m := getg().m
m.schedlink = rw.readers
rw.readers.set(m)
unlock(&rw.rLock)
notesleep(&m.park)
noteclear(&m.park)
}
})
} }
} }
...@@ -47,7 +62,12 @@ func (rw *rwmutex) runlock() { ...@@ -47,7 +62,12 @@ func (rw *rwmutex) runlock() {
// A writer is pending. // A writer is pending.
if atomic.Xadd(&rw.readerWait, -1) == 0 { if atomic.Xadd(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer. // The last reader unblocks the writer.
semrelease(&rw.writerSem) lock(&rw.rLock)
w := rw.writer.ptr()
if w != nil {
notewakeup(&w.park)
}
unlock(&rw.rLock)
} }
} }
} }
...@@ -55,12 +75,22 @@ func (rw *rwmutex) runlock() { ...@@ -55,12 +75,22 @@ func (rw *rwmutex) runlock() {
// lock locks rw for writing. // lock locks rw for writing.
func (rw *rwmutex) lock() { func (rw *rwmutex) lock() {
// Resolve competition with other writers. // Resolve competition with other writers.
semacquire(&rw.w) lock(&rw.wLock)
m := getg().m
// Announce that there is a pending writer. // Announce that there is a pending writer.
r := int32(atomic.Xadd(&rw.readerCount, -rwmutexMaxReaders)) + rwmutexMaxReaders r := int32(atomic.Xadd(&rw.readerCount, -rwmutexMaxReaders)) + rwmutexMaxReaders
// Wait for any active readers to complete. // Wait for any active readers to complete.
lock(&rw.rLock)
if r != 0 && atomic.Xadd(&rw.readerWait, r) != 0 { if r != 0 && atomic.Xadd(&rw.readerWait, r) != 0 {
semacquire(&rw.writerSem) // Wait for reader to wake us up.
systemstack(func() {
rw.writer.set(m)
unlock(&rw.rLock)
notesleep(&m.park)
noteclear(&m.park)
})
} else {
unlock(&rw.rLock)
} }
} }
...@@ -71,10 +101,19 @@ func (rw *rwmutex) unlock() { ...@@ -71,10 +101,19 @@ func (rw *rwmutex) unlock() {
if r >= rwmutexMaxReaders { if r >= rwmutexMaxReaders {
throw("unlock of unlocked rwmutex") throw("unlock of unlocked rwmutex")
} }
// Unblock blocked readers, if any. // Unblock blocked readers.
for i := int32(0); i < r; i++ { lock(&rw.rLock)
semrelease(&rw.readerSem) for rw.readers.ptr() != nil {
reader := rw.readers.ptr()
rw.readers = reader.schedlink
reader.schedlink.set(nil)
notewakeup(&reader.park)
r -= 1
} }
// If r > 0, there are pending readers that aren't on the
// queue. Tell them to skip waiting.
rw.readerPass += uint32(r)
unlock(&rw.rLock)
// Allow other writers to proceed. // Allow other writers to proceed.
semrelease(&rw.w) unlock(&rw.wLock)
} }
...@@ -27,7 +27,6 @@ func parallelReader(m *RWMutex, clocked, cunlock, cdone chan bool) { ...@@ -27,7 +27,6 @@ func parallelReader(m *RWMutex, clocked, cunlock, cdone chan bool) {
func doTestParallelReaders(numReaders, gomaxprocs int) { func doTestParallelReaders(numReaders, gomaxprocs int) {
GOMAXPROCS(gomaxprocs) GOMAXPROCS(gomaxprocs)
var m RWMutex var m RWMutex
m.Init()
clocked := make(chan bool) clocked := make(chan bool)
cunlock := make(chan bool) cunlock := make(chan bool)
cdone := make(chan bool) cdone := make(chan bool)
...@@ -89,7 +88,6 @@ func HammerRWMutex(gomaxprocs, numReaders, num_iterations int) { ...@@ -89,7 +88,6 @@ func HammerRWMutex(gomaxprocs, numReaders, num_iterations int) {
// Number of active readers + 10000 * number of active writers. // Number of active readers + 10000 * number of active writers.
var activity int32 var activity int32
var rwm RWMutex var rwm RWMutex
rwm.Init()
cdone := make(chan bool) cdone := make(chan bool)
go writer(&rwm, num_iterations, &activity, cdone) go writer(&rwm, num_iterations, &activity, cdone)
var i int var i int
...@@ -131,7 +129,6 @@ func BenchmarkRWMutexUncontended(b *testing.B) { ...@@ -131,7 +129,6 @@ func BenchmarkRWMutexUncontended(b *testing.B) {
} }
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
var rwm PaddedRWMutex var rwm PaddedRWMutex
rwm.RWMutex.Init()
for pb.Next() { for pb.Next() {
rwm.RLock() rwm.RLock()
rwm.RLock() rwm.RLock()
...@@ -145,7 +142,6 @@ func BenchmarkRWMutexUncontended(b *testing.B) { ...@@ -145,7 +142,6 @@ func BenchmarkRWMutexUncontended(b *testing.B) {
func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) { func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) {
var rwm RWMutex var rwm RWMutex
rwm.Init()
b.RunParallel(func(pb *testing.PB) { b.RunParallel(func(pb *testing.PB) {
foo := 0 foo := 0
for pb.Next() { for pb.Next() {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment