// awaitBlockedGoroutine spins on runtime.Gosched until a runtime stack dump
-// shows a goroutine in the given state with a stack frame in
-// runtime/pprof.<fName>.
+// shows at least count goroutines in the given state with a stack frame
+// in runtime/pprof.<fName>.
-func awaitBlockedGoroutine(t *testing.T, state, fName string) {
+func awaitBlockedGoroutine(t *testing.T, state, fName string, count int) {
re := fmt.Sprintf(`(?m)^goroutine \d+ \[%s\]:\n(?:.+\n\t.+\n)*runtime/pprof\.%s`, regexp.QuoteMeta(state), fName)
r := regexp.MustCompile(re)
buf = make([]byte, 2*len(buf))
continue
}
- if r.Match(buf[:n]) {
+ if len(r.FindAll(buf[:n], -1)) >= count {
return
}
}
func blockChanRecv(t *testing.T) {
c := make(chan bool)
go func() {
- awaitBlockedGoroutine(t, "chan receive", "blockChanRecv")
+ awaitBlockedGoroutine(t, "chan receive", "blockChanRecv", 1)
c <- true
}()
<-c
func blockChanSend(t *testing.T) {
c := make(chan bool)
go func() {
- awaitBlockedGoroutine(t, "chan send", "blockChanSend")
+ awaitBlockedGoroutine(t, "chan send", "blockChanSend", 1)
<-c
}()
c <- true
func blockChanClose(t *testing.T) {
c := make(chan bool)
go func() {
- awaitBlockedGoroutine(t, "chan receive", "blockChanClose")
+ awaitBlockedGoroutine(t, "chan receive", "blockChanClose", 1)
close(c)
}()
<-c
c2 := make(chan bool, 1)
go func() {
for i := 0; i < numTries; i++ {
- awaitBlockedGoroutine(t, "select", "blockSelectRecvAsync")
+ awaitBlockedGoroutine(t, "select", "blockSelectRecvAsync", 1)
c <- true
}
}()
c := make(chan bool)
c2 := make(chan bool)
go func() {
- awaitBlockedGoroutine(t, "select", "blockSelectSendSync")
+ awaitBlockedGoroutine(t, "select", "blockSelectSendSync", 1)
<-c
}()
select {
var mu sync.Mutex
mu.Lock()
go func() {
- awaitBlockedGoroutine(t, "sync.Mutex.Lock", "blockMutex")
+ awaitBlockedGoroutine(t, "sync.Mutex.Lock", "blockMutex", 1)
mu.Unlock()
}()
// Note: Unlock releases mu before recording the mutex event,
mu.Lock()
}
+// blockMutexN arranges for n goroutines to contend for mu: a helper
+// goroutine holds the lock until all n contenders are blocked in Lock,
+// keeps it held for d longer, then releases it. blockMutexN returns
+// only after every contender has acquired and released the mutex.
+func blockMutexN(t *testing.T, n int, d time.Duration) {
+ var wg sync.WaitGroup
+ var mu sync.Mutex
+ mu.Lock()
+ go func() {
+ // Passing "blockMutex" (without the N) still matches this
+ // function's stack frames: awaitBlockedGoroutine's regexp has
+ // no trailing anchor after fName, so "blockMutexN.funcX" matches.
+ awaitBlockedGoroutine(t, "sync.Mutex.Lock", "blockMutex", n)
+ time.Sleep(d)
+ mu.Unlock()
+ }()
+ // Note: Unlock releases mu before recording the mutex event,
+ // so it's theoretically possible for this to proceed and
+ // capture the profile before the event is recorded. As long
+ // as this is blocked before the unlock happens, it's okay.
+ for i := 0; i < n; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ mu.Lock()
+ mu.Unlock()
+ }()
+ }
+ wg.Wait()
+}
+
func blockCond(t *testing.T) {
var mu sync.Mutex
c := sync.NewCond(&mu)
mu.Lock()
go func() {
- awaitBlockedGoroutine(t, "sync.Cond.Wait", "blockCond")
+ awaitBlockedGoroutine(t, "sync.Cond.Wait", "blockCond", 1)
mu.Lock()
c.Signal()
mu.Unlock()
t.Fatalf("need MutexProfileRate 0, got %d", old)
}
- blockMutex(t)
+ const (
+ N = 100
+ D = 100 * time.Millisecond
+ )
+ blockMutexN(t, N, D)
t.Run("debug=1", func(t *testing.T) {
var w strings.Builder
}
prof = strings.Trim(prof, "\n")
lines := strings.Split(prof, "\n")
- if len(lines) != 6 {
- t.Errorf("expected 6 lines, got %d %q\n%s", len(lines), prof, prof)
- }
if len(lines) < 6 {
- return
+ t.Fatalf("expected >=6 lines, got %d %q\n%s", len(lines), prof, prof)
}
// checking that the line is like "35258904 1 @ 0x48288d 0x47cd28 0x458931"
r2 := `^\d+ \d+ @(?: 0x[[:xdigit:]]+)+`
- //r2 := "^[0-9]+ 1 @ 0x[0-9a-f x]+$"
if ok, err := regexp.MatchString(r2, lines[3]); err != nil || !ok {
t.Errorf("%q didn't match %q", lines[3], r2)
}
stks := stacks(p)
for _, want := range [][]string{
- {"sync.(*Mutex).Unlock", "runtime/pprof.blockMutex.func1"},
+ {"sync.(*Mutex).Unlock", "runtime/pprof.blockMutexN.func1"},
} {
if !containsStack(stks, want) {
t.Errorf("No matching stack entry for %+v", want)
}
}
+
+ i := 0
+ for ; i < len(p.SampleType); i++ {
+ if p.SampleType[i].Unit == "nanoseconds" {
+ break
+ }
+ }
+ if i >= len(p.SampleType) {
+ t.Fatalf("profile did not contain nanoseconds sample")
+ }
+ total := int64(0)
+ for _, s := range p.Sample {
+ total += s.Value[i]
+ }
+ d := time.Duration(total)
+ if d < N*D*9/10 || d > N*D*2 { // want N*D but allow [0.9,2.0]*that.
+ t.Fatalf("profile samples total %v, want %v", d, N*D)
+ }
})
}
unlock(&root.lock)
return
}
- s, t0 := root.dequeue(addr)
+ s, t0, tailtime := root.dequeue(addr)
if s != nil {
root.nwait.Add(-1)
}
if s != nil { // May be slow or even yield, so unlock first
acquiretime := s.acquiretime
if acquiretime != 0 {
- mutexevent(t0-acquiretime, 3+skipframes)
+ // Charge contention that this (delayed) unlock caused.
+ // If there are N more goroutines waiting beyond the
+ // one that's waking up, charge their delay as well, so that
+ // contention holding up many goroutines shows up as
+ // more costly than contention holding up a single goroutine.
+ // It would take O(N) time to calculate how long each goroutine
+ // has been waiting, so instead we charge avg(head-wait, tail-wait)*N.
+ // head-wait is the longest wait and tail-wait is the shortest.
+ // (When we do a lifo insertion, we preserve this property by
+ // copying the old head's acquiretime into the inserted new head.
+ // In that case the overall average may be slightly high, but that's fine:
+ // the average of the ends is only an approximation to the actual
+ // average anyway.)
+ // The root.dequeue above changed the head and tail acquiretime
+ // to the current time, so the next unlock will not re-count this contention.
+ dt0 := t0 - acquiretime
+ dt := dt0
+ if s.waiters != 0 {
+ dtail := t0 - tailtime
+ dt += (dtail + dt0) / 2 * int64(s.waiters)
+ }
+ mutexevent(dt, 3+skipframes)
}
if s.ticket != 0 {
throw("corrupted semaphore ticket")
s.elem = unsafe.Pointer(addr)
s.next = nil
s.prev = nil
+ s.waiters = 0
var last *sudog
pt := &root.treap
// Substitute s in t's place in treap.
*pt = s
s.ticket = t.ticket
- s.acquiretime = t.acquiretime
+ s.acquiretime = t.acquiretime // preserve head acquiretime as oldest time
s.parent = t.parent
s.prev = t.prev
s.next = t.next
if s.waittail == nil {
s.waittail = t
}
+ s.waiters = t.waiters
+ if s.waiters+1 != 0 {
+ s.waiters++
+ }
t.parent = nil
t.prev = nil
t.next = nil
}
t.waittail = s
s.waitlink = nil
+ if t.waiters+1 != 0 {
+ t.waiters++
+ }
}
return
}
// in semaRoot blocked on addr.
// If the sudog was being profiled, dequeue returns the time
// at which it was woken up as now. Otherwise now is 0.
-func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) {
+// If there are additional entries in the wait list, dequeue
+// returns tailtime set to the last entry's acquiretime.
+// Otherwise tailtime is found.acquiretime.
+func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now, tailtime int64) {
ps := &root.treap
s := *ps
for ; s != nil; s = *ps {
ps = &s.next
}
}
- return nil, 0
+ return nil, 0, 0
Found:
now = int64(0)
} else {
t.waittail = nil
}
+ t.waiters = s.waiters
+ if t.waiters > 1 {
+ t.waiters--
+ }
+ // Set head and tail acquire time to 'now',
+ // because the caller will take care of charging
+ // the delays before now for all entries in the list.
t.acquiretime = now
+ tailtime = s.waittail.acquiretime
+ s.waittail.acquiretime = now
s.waitlink = nil
s.waittail = nil
} else {
} else {
root.treap = nil
}
+ tailtime = s.acquiretime
}
s.parent = nil
s.elem = nil
s.next = nil
s.prev = nil
s.ticket = 0
- return s, now
+ return s, now, tailtime
}
// rotateLeft rotates the tree rooted at node x.