Commit ca922b6d authored by Peter Weinberger

runtime: Profile goroutines holding contended mutexes.

runtime.SetMutexProfileFraction(n int) will capture, on average, 1/n of the
stack traces of goroutines holding contended mutexes if n > 0. From
runtime/pprof, pprof.Lookup("mutex").WriteTo writes the accumulated
stack traces to w (in essentially the same format that blocking
profiling uses).

Change-Id: Ie0b54fa4226853d99aa42c14cb529ae586a8335a
Reviewed-on: https://go-review.googlesource.com/29650
Reviewed-by: Austin Clements <austin@google.com>
parent b679665a
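
For context, here is a minimal sketch of how the new API fits together (a
hypothetical program, not part of this commit; the contended Mutex exists
only to generate samples):

	package main

	import (
		"os"
		"runtime"
		"runtime/pprof"
		"sync"
	)

	func main() {
		// Sample every contention event; the previous setting is returned.
		runtime.SetMutexProfileFraction(1)
		defer runtime.SetMutexProfileFraction(0) // disable sampling again

		// Generate some contention on a sync.Mutex.
		var mu sync.Mutex
		var wg sync.WaitGroup
		for i := 0; i < 10; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				mu.Lock()
				mu.Unlock()
			}()
		}
		wg.Wait()

		// Write the accumulated stack traces in the text format.
		pprof.Lookup("mutex").WriteTo(os.Stdout, 1)
	}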
@@ -238,6 +238,15 @@ profile the tests during execution:
To profile all memory allocations, use -test.memprofilerate=1
and pass --alloc_space flag to the pprof tool.
-mutexprofile mutex.out
Write a mutex contention profile to the specified file
when all tests are complete.
Writes test binary as -c would.
-mutexprofilefraction n
Sample 1 in n stack traces of goroutines holding a
contended mutex.
-outputdir directory
Place output files from profiling in the specified directory,
by default the directory in which "go test" is running.
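For example, a hypothetical invocation that collects a mutex profile from a
test run, sampling half of the contention events:

	go test -mutexprofile=mutex.out -mutexprofilefraction=2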
@@ -50,6 +50,8 @@ var testFlagDefn = []*testFlagSpec{
{name: "memprofilerate", passToTest: true},
{name: "blockprofile", passToTest: true},
{name: "blockprofilerate", passToTest: true},
{name: "mutexprofile", passToTest: true},
{name: "mutexprofilefraction", passToTest: true},
{name: "outputdir", passToTest: true},
{name: "parallel", passToTest: true},
{name: "run", passToTest: true},
@@ -152,7 +154,7 @@ func testFlags(args []string) (packageNames, passToTest []string) {
case "blockprofile", "cpuprofile", "memprofile":
testProfile = true
testNeedBinary = true
case "trace":
case "mutexprofile", "trace":
testProfile = true
case "coverpkg":
testCover = true
@@ -679,20 +679,32 @@ func scaleHeapSample(count, size, rate int64) (int64, int64) {
return int64(float64(count) * scale), int64(float64(size) * scale)
}
// parseContention parses a contentionz profile and returns a newly
// populated Profile.
func parseContention(b []byte) (p *Profile, err error) {
// parseContention parses a mutex or contention profile. There are 2 cases:
// "--- contentionz " for legacy C++ profiles (and backwards compatibility)
// "--- mutex:" or "--- contention:" for profiles generated by the Go runtime.
// This code converts the text output from the runtime into a *Profile. (In the
// future the runtime might write a serialized Profile directly, making this
// unnecessary.)
func parseContention(b []byte) (*Profile, error) {
r := bytes.NewBuffer(b)
l, err := r.ReadString('\n')
if err != nil {
return nil, errUnrecognized
}
if !strings.HasPrefix(l, "--- contention") {
return nil, errUnrecognized
if strings.HasPrefix(l, "--- contentionz ") {
return parseCppContention(r)
} else if strings.HasPrefix(l, "--- mutex:") {
return parseCppContention(r)
} else if strings.HasPrefix(l, "--- contention:") {
return parseCppContention(r)
}
return nil, errUnrecognized
}
p = &Profile{
// parseCppContention parses the output from synchronization_profiling.cc
// for backward compatibility, and the compatible (non-debug) block profile
// output from the Go runtime.
func parseCppContention(r *bytes.Buffer) (*Profile, error) {
p := &Profile{
PeriodType: &ValueType{Type: "contentions", Unit: "count"},
Period: 1,
SampleType: []*ValueType{
@@ -702,6 +714,8 @@ func parseContention(b []byte) (p *Profile, err error) {
}
var cpuHz int64
var l string
var err error
// Parse text of the form "attribute = value" before the samples.
const delimiter = "="
for {
@@ -932,7 +932,7 @@ func gcStart(mode gcMode, forceTrigger bool) {
// another thread.
useStartSema := mode == gcBackgroundMode
if useStartSema {
semacquire(&work.startSema, false)
semacquire(&work.startSema, 0)
// Re-check transition condition under transition lock.
if !gcShouldStart(forceTrigger) {
semrelease(&work.startSema)
@@ -953,7 +953,7 @@ func gcStart(mode gcMode, forceTrigger bool) {
}
// Ok, we're doing it! Stop everybody else
semacquire(&worldsema, false)
semacquire(&worldsema, 0)
if trace.enabled {
traceGCStart()
@@ -1063,7 +1063,7 @@ func gcStart(mode gcMode, forceTrigger bool) {
// by mark termination.
func gcMarkDone() {
top:
semacquire(&work.markDoneSema, false)
semacquire(&work.markDoneSema, 0)
// Re-check transition condition under transition lock.
if !(gcphase == _GCmark && work.nwait == work.nproc && !gcMarkWorkAvailable(nil)) {
@@ -22,6 +22,7 @@ const (
// profile types
memProfile bucketType = 1 + iota
blockProfile
mutexProfile
// size of bucket hash table
buckHashSize = 179999
@@ -47,7 +48,7 @@ type bucketType int
type bucket struct {
next *bucket
allnext *bucket
typ bucketType // memBucket or blockBucket
typ bucketType // memBucket or blockBucket (includes mutexProfile)
hash uintptr
size uintptr
nstk uintptr
@@ -87,7 +88,7 @@ type memRecord struct {
}
// A blockRecord is the bucket data for a bucket of type blockProfile,
// part of the blocking profile.
// which is used in blocking and mutex profiles.
type blockRecord struct {
count int64
cycles int64
@@ -96,6 +97,7 @@ type blockRecord struct {
var (
mbuckets *bucket // memory profile buckets
bbuckets *bucket // blocking profile buckets
xbuckets *bucket // mutex profile buckets
buckhash *[179999]*bucket
bucketmem uintptr
)
@@ -108,7 +110,7 @@ func newBucket(typ bucketType, nstk int) *bucket {
throw("invalid profile bucket type")
case memProfile:
size += unsafe.Sizeof(memRecord{})
case blockProfile:
case blockProfile, mutexProfile:
size += unsafe.Sizeof(blockRecord{})
}
@@ -136,7 +138,7 @@ func (b *bucket) mp() *memRecord {
// bp returns the blockRecord associated with the blockProfile bucket b.
func (b *bucket) bp() *blockRecord {
if b.typ != blockProfile {
if b.typ != blockProfile && b.typ != mutexProfile {
throw("bad use of bucket.bp")
}
data := add(unsafe.Pointer(b), unsafe.Sizeof(*b)+b.nstk*unsafe.Sizeof(uintptr(0)))
@@ -188,6 +190,9 @@ func stkbucket(typ bucketType, size uintptr, stk []uintptr, alloc bool) *bucket
if typ == memProfile {
b.allnext = mbuckets
mbuckets = b
} else if typ == mutexProfile {
b.allnext = xbuckets
xbuckets = b
} else {
b.allnext = bbuckets
bbuckets = b
@@ -292,10 +297,20 @@ func blockevent(cycles int64, skip int) {
if cycles <= 0 {
cycles = 1
}
if blocksampled(cycles) {
saveblockevent(cycles, skip+1, blockProfile, &blockprofilerate)
}
}
func blocksampled(cycles int64) bool {
rate := int64(atomic.Load64(&blockprofilerate))
if rate <= 0 || (rate > cycles && int64(fastrand())%rate > cycles) {
return
return false
}
return true
}
func saveblockevent(cycles int64, skip int, which bucketType, ratep *uint64) {
gp := getg()
var nstk int
var stk [maxStack]uintptr
@@ -305,12 +320,40 @@ func blockevent(cycles int64, skip int) {
nstk = gcallers(gp.m.curg, skip, stk[:])
}
lock(&proflock)
b := stkbucket(blockProfile, 0, stk[:nstk], true)
b := stkbucket(which, 0, stk[:nstk], true)
b.bp().count++
b.bp().cycles += cycles
unlock(&proflock)
}
var mutexprofilerate uint64 // fraction sampled
// SetMutexProfileFraction controls the fraction of mutex contention events
// that are reported in the mutex profile. On average 1/rate events are
// reported. The previous rate is returned.
//
// To turn off profiling entirely, pass rate 0.
// To just read the current rate, pass rate -1.
// (For n>1 the details of sampling may change.)
func SetMutexProfileFraction(rate int) int {
if rate < 0 {
return int(mutexprofilerate)
}
old := mutexprofilerate
atomic.Store64(&mutexprofilerate, uint64(rate))
return int(old)
}
//go:linkname mutexevent sync.event
func mutexevent(cycles int64, skip int) {
rate := int64(atomic.Load64(&mutexprofilerate))
// TODO(pjw): measure impact of always calling fastrand vs using something
// like malloc.go:nextSample()
if rate > 0 && int64(fastrand())%rate == 0 {
saveblockevent(cycles, skip+1, mutexProfile, &mutexprofilerate)
}
}
// Go interface to profile data.
// A StackRecord describes a single execution stack.
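
The predicate in mutexevent fires when fastrand()%rate == 0, i.e. with
probability roughly 1/rate, giving the advertised 1-in-rate sampling. A
throwaway sketch of that expectation (a standalone, assumed example, with
math/rand standing in for the runtime's fastrand):

	package main

	import (
		"fmt"
		"math/rand"
	)

	func main() {
		const rate = 5
		const trials = 1000000
		hits := 0
		for i := 0; i < trials; i++ {
			if rand.Int63()%rate == 0 { // same shape as mutexevent's test
				hits++
			}
		}
		fmt.Printf("sampled %.4f of events (expect ~%.4f)\n",
			float64(hits)/trials, 1.0/rate)
	}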
@@ -507,6 +550,35 @@ func BlockProfile(p []BlockProfileRecord) (n int, ok bool) {
return
}
// MutexProfile returns n, the number of records in the current mutex profile.
// If len(p) >= n, MutexProfile copies the profile into p and returns n, true.
// Otherwise, MutexProfile does not change p, and returns n, false.
//
// Most clients should use the runtime/pprof package
// instead of calling MutexProfile directly.
func MutexProfile(p []BlockProfileRecord) (n int, ok bool) {
lock(&proflock)
for b := xbuckets; b != nil; b = b.allnext {
n++
}
if n <= len(p) {
ok = true
for b := xbuckets; b != nil; b = b.allnext {
bp := b.bp()
r := &p[0]
r.Count = int64(bp.count)
r.Cycles = bp.cycles
i := copy(r.Stack0[:], b.stk())
for ; i < len(r.Stack0); i++ {
r.Stack0[i] = 0
}
p = p[1:]
}
}
unlock(&proflock)
return
}
// ThreadCreateProfile returns n, the number of records in the thread creation profile.
// If len(p) >= n, ThreadCreateProfile copies the profile into p and returns n, true.
// If len(p) < n, ThreadCreateProfile does not change p and returns n, false.
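A sketch of the n/ok retry protocol that MutexProfile documents (a
hypothetical caller; writeMutex below uses the same loop):

	package main

	import (
		"fmt"
		"runtime"
	)

	func main() {
		runtime.SetMutexProfileFraction(1)
		var p []runtime.BlockProfileRecord
		n, ok := runtime.MutexProfile(nil) // first call only learns n
		for !ok {
			// Allocate with headroom in case records appear between calls.
			p = make([]runtime.BlockProfileRecord, n+50)
			n, ok = runtime.MutexProfile(p)
		}
		p = p[:n]
		fmt.Printf("%d mutex profile records\n", len(p))
	}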
@@ -99,6 +99,7 @@ import (
// heap - a sampling of all heap allocations
// threadcreate - stack traces that led to the creation of new OS threads
// block - stack traces that led to blocking on synchronization primitives
// mutex - stack traces of holders of contended mutexes
//
// These predefined profiles maintain themselves and panic on an explicit
// Add or Remove method call.
@@ -152,6 +153,12 @@ var blockProfile = &Profile{
write: writeBlock,
}
var mutexProfile = &Profile{
name: "mutex",
count: countMutex,
write: writeMutex,
}
func lockProfiles() {
profiles.mu.Lock()
if profiles.m == nil {
@@ -161,6 +168,7 @@ func lockProfiles() {
"threadcreate": threadcreateProfile,
"heap": heapProfile,
"block": blockProfile,
"mutex": mutexProfile,
}
}
}
@@ -729,6 +737,12 @@ func countBlock() int {
return n
}
// countMutex returns the number of records in the mutex profile.
func countMutex() int {
n, _ := runtime.MutexProfile(nil)
return n
}
// writeBlock writes the current blocking profile to w.
func writeBlock(w io.Writer, debug int) error {
var p []runtime.BlockProfileRecord
@@ -772,4 +786,49 @@ func writeBlock(w io.Writer, debug int) error {
return b.Flush()
}
// writeMutex writes the current mutex profile to w.
func writeMutex(w io.Writer, debug int) error {
// TODO(pjw): too much common code with writeBlock. FIX!
var p []runtime.BlockProfileRecord
n, ok := runtime.MutexProfile(nil)
for {
p = make([]runtime.BlockProfileRecord, n+50)
n, ok = runtime.MutexProfile(p)
if ok {
p = p[:n]
break
}
}
sort.Slice(p, func(i, j int) bool { return p[i].Cycles > p[j].Cycles })
b := bufio.NewWriter(w)
var tw *tabwriter.Writer
w = b
if debug > 0 {
tw = tabwriter.NewWriter(w, 1, 8, 1, '\t', 0)
w = tw
}
fmt.Fprintf(w, "--- mutex:\n")
fmt.Fprintf(w, "cycles/second=%v\n", runtime_cyclesPerSecond())
fmt.Fprintf(w, "sampling period=%d\n", runtime.SetMutexProfileFraction(-1))
for i := range p {
r := &p[i]
fmt.Fprintf(w, "%v %v @", r.Cycles, r.Count)
for _, pc := range r.Stack() {
fmt.Fprintf(w, " %#x", pc)
}
fmt.Fprint(w, "\n")
if debug > 0 {
printStackRecord(w, r.Stack(), true)
}
}
if tw != nil {
tw.Flush()
}
return b.Flush()
}
func runtime_cyclesPerSecond() int64
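
Pieced together from the Fprintf calls above, the emitted text looks roughly
like this (values and symbolized frames are illustrative; the sample line is
borrowed from the test below). The trailing "# ..." lines come from
printStackRecord and appear only when debug > 0:

	--- mutex:
	cycles/second=2494255279
	sampling period=1
	35258904 1 @ 0x48288d 0x47cd28 0x458931
	#	0x48288c	sync.(*Mutex).Unlock+0x6c	.../src/sync/mutex.go:96
	#	0x47cd27	runtime/pprof_test.blockMutex+0x47	.../pprof_test.go:552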
@@ -592,6 +592,42 @@ func blockCond() {
mu.Unlock()
}
func TestMutexProfile(t *testing.T) {
old := runtime.SetMutexProfileFraction(1)
defer runtime.SetMutexProfileFraction(old)
if old != 0 {
t.Fatalf("need MutexProfileRate 0, got %d", old)
}
blockMutex()
var w bytes.Buffer
Lookup("mutex").WriteTo(&w, 1)
prof := w.String()
if !strings.HasPrefix(prof, "--- mutex:\ncycles/second=") {
t.Errorf("Bad profile header:\n%v", prof)
}
prof = strings.Trim(prof, "\n")
lines := strings.Split(prof, "\n")
if len(lines) != 6 {
t.Errorf("expected 6 lines, got %d %q\n%s", len(lines), prof, prof)
}
if len(lines) < 6 {
return
}
// checking that the line is like "35258904 1 @ 0x48288d 0x47cd28 0x458931"
r2 := `^\d+ 1 @(?: 0x[[:xdigit:]]+)+`
//r2 := "^[0-9]+ 1 @ 0x[0-9a-f x]+$"
if ok, err := regexp.MatchString(r2, lines[3]); err != nil || !ok {
t.Errorf("%q didn't match %q", lines[3], r2)
}
r3 := "^#.*runtime/pprof_test.blockMutex.*$"
if ok, err := regexp.MatchString(r3, lines[5]); err != nil || !ok {
t.Errorf("%q didn't match %q", lines[5], r3)
}
}
func func1(c chan int) { <-c }
func func2(c chan int) { <-c }
func func3(c chan int) { <-c }
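The six lines the test expects are writeMutex's debug=1 output: the
"--- mutex:", "cycles/second=", and "sampling period=" headers, the sample
line checked as lines[3], and the symbolized "# ..." frames appended by
printStackRecord, the last of which (lines[5]) should name blockMutex.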
@@ -923,7 +923,7 @@ func restartg(gp *g) {
// in panic or being exited, this may not reliably stop all
// goroutines.
func stopTheWorld(reason string) {
semacquire(&worldsema, false)
semacquire(&worldsema, 0)
getg().m.preemptoff = reason
systemstack(stopTheWorldWithSema)
}
@@ -946,7 +946,7 @@ var worldsema uint32 = 1
// preemption first and then should stopTheWorldWithSema on the system
// stack:
//
// semacquire(&worldsema, false)
// semacquire(&worldsema, 0)
// m.preemptoff = "reason"
// systemstack(stopTheWorldWithSema)
//
@@ -256,6 +256,7 @@ type sudog struct {
// The following fields are never accessed concurrently.
// waitlink is only accessed by g.
acquiretime int64
releasetime int64
ticket uint32
waitlink *sudog // g.waiting list
@@ -44,12 +44,12 @@ var semtable [semTabSize]struct {
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
semacquire(addr, true)
semacquire(addr, semaBlockProfile)
}
//go:linkname net_runtime_Semacquire net.runtime_Semacquire
func net_runtime_Semacquire(addr *uint32) {
semacquire(addr, true)
semacquire(addr, semaBlockProfile)
}
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
@@ -57,6 +57,11 @@ func sync_runtime_Semrelease(addr *uint32) {
semrelease(addr)
}
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32) {
semacquire(addr, semaBlockProfile|semaMutexProfile)
}
//go:linkname net_runtime_Semrelease net.runtime_Semrelease
func net_runtime_Semrelease(addr *uint32) {
semrelease(addr)
@@ -69,8 +74,15 @@ func readyWithTime(s *sudog, traceskip int) {
goready(s.g, traceskip)
}
type semaProfileFlags int
const (
semaBlockProfile semaProfileFlags = 1 << iota
semaMutexProfile
)
// Called from runtime.
func semacquire(addr *uint32, profile bool) {
func semacquire(addr *uint32, profile semaProfileFlags) {
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
@@ -91,10 +103,17 @@ func semacquire(addr *uint32, profile bool) {
root := semroot(addr)
t0 := int64(0)
s.releasetime = 0
if profile && blockprofilerate > 0 {
s.acquiretime = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
lock(&root.lock)
// Add ourselves to nwait to disable "easy case" in semrelease.
@@ -146,8 +165,19 @@ func semrelease(addr *uint32) {
break
}
}
unlock(&root.lock)
if s != nil {
if s.acquiretime != 0 {
t0 := cputicks()
for x := root.head; x != nil; x = x.next {
if x.elem == unsafe.Pointer(addr) {
x.acquiretime = t0
}
}
mutexevent(t0-s.acquiretime, 3)
}
}
unlock(&root.lock)
if s != nil { // May be slow, so unlock first
readyWithTime(s, 5)
}
}
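It is worth spelling out how the interval is attributed: semacquire stamps
s.acquiretime when a goroutine starts waiting on a mutex's semaphore, and
semrelease runs in the goroutine that held the mutex, so
mutexevent(t0-s.acquiretime, 3) charges the wait to the releaser's stack.
That is what makes this a profile of the holders of contended mutexes rather
than of the blocked waiters. Re-stamping the remaining waiters' acquiretime
to t0 means each later holder is charged only for the time it kept them
waiting, not for the whole queueing history.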
@@ -292,7 +292,7 @@ func StopTrace() {
// The world is started but we've set trace.shutdown, so new tracing can't start.
// Wait for the trace reader to flush pending buffers and stop.
semacquire(&trace.shutdownSema, false)
semacquire(&trace.shutdownSema, 0)
if raceenabled {
raceacquire(unsafe.Pointer(&trace.shutdownSema))
}
@@ -84,7 +84,7 @@ func (m *Mutex) Lock() {
if old&mutexLocked == 0 {
break
}
runtime_Semacquire(&m.sema)
runtime_SemacquireMutex(&m.sema)
awoke = true
iter = 0
}
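Only sync.Mutex switches to the new entry point: runtime_SemacquireMutex
passes semaBlockProfile|semaMutexProfile, while the plain Semacquire entry
points used elsewhere by sync and net are unchanged and continue to feed only
the block profile.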
@@ -66,6 +66,10 @@ func HammerMutex(m *Mutex, loops int, cdone chan bool) {
}
func TestMutex(t *testing.T) {
if n := runtime.SetMutexProfileFraction(1); n != 0 {
t.Logf("got mutexrate %d expected 0", n)
}
defer runtime.SetMutexProfileFraction(0)
m := new(Mutex)
c := make(chan bool)
for i := 0; i < 10; i++ {
@@ -13,6 +13,9 @@ import "unsafe"
// library and should not be used directly.
func runtime_Semacquire(s *uint32)
// SemacquireMutex is like Semacquire, but for profiling contended Mutexes.
func runtime_SemacquireMutex(*uint32)
// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
@@ -242,6 +242,8 @@ var (
cpuProfile = flag.String("test.cpuprofile", "", "write a cpu profile to `file`")
blockProfile = flag.String("test.blockprofile", "", "write a goroutine blocking profile to `file`")
blockProfileRate = flag.Int("test.blockprofilerate", 1, "set blocking profile `rate` (see runtime.SetBlockProfileRate)")
mutexProfile = flag.String("test.mutexprofile", "", "write a mutex contention profile to the named file after execution")
mutexProfileFraction = flag.Int("test.mutexprofilefraction", 1, "if >= 0, calls runtime.SetMutexProfileFraction()")
traceFile = flag.String("test.trace", "", "write an execution trace to `file`")
timeout = flag.Duration("test.timeout", 0, "fail test binary execution after duration `d` (0 means unlimited)")
cpuListStr = flag.String("test.cpu", "", "comma-separated `list` of cpu counts to run each test with")
@@ -874,6 +876,9 @@ func before() {
if *blockProfile != "" && *blockProfileRate >= 0 {
runtime.SetBlockProfileRate(*blockProfileRate)
}
if *mutexProfile != "" && *mutexProfileFraction >= 0 {
runtime.SetMutexProfileFraction(*mutexProfileFraction)
}
if *coverProfile != "" && cover.Mode == "" {
fmt.Fprintf(os.Stderr, "testing: cannot use -test.coverprofile because test binary was not built with coverage enabled\n")
os.Exit(2)
@@ -913,6 +918,18 @@ func after() {
}
f.Close()
}
if *mutexProfile != "" && *mutexProfileFraction >= 0 {
f, err := os.Create(toOutputDir(*mutexProfile))
if err != nil {
fmt.Fprintf(os.Stderr, "testing: %s\n", err)
os.Exit(2)
}
if err = pprof.Lookup("mutex").WriteTo(f, 0); err != nil {
fmt.Fprintf(os.Stderr, "testing: can't write %s: %s\n", *mutexProfile, err)
os.Exit(2)
}
f.Close()
}
if cover.Mode != "" {
coverReport()
}
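Note that after() writes the profile with debug=0, the compact form that
parseContention above accepts, whereas TestMutexProfile asked for debug=1 to
get the symbolized "# ..." lines.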