]> Cypherpunks.ru repositories - gostls13.git/commitdiff
runtime: change mutex profile to count every blocked goroutine
authorRuss Cox <rsc@golang.org>
Mon, 26 Jun 2023 21:12:44 +0000 (17:12 -0400)
committerGopher Robot <gobot@golang.org>
Thu, 17 Aug 2023 01:30:31 +0000 (01:30 +0000)
The pprof mutex profile was meant to match the Google C++ (now Abseil)
mutex profiler, originally designed and implemented by Mike Burrows.
When we worked on the Go version, pjw and I missed that C++ counts the
time each thread is blocked, even if multiple threads are blocked on a
mutex. That is, if 100 threads are blocked on the same mutex for the
same 10ms, that still counts as 1000ms of contention in C++. In Go, to
date, /debug/pprof/mutex has counted that as only 10ms of contention.
If 100 goroutines are blocked on one mutex and only 1 goroutine is
blocked on another mutex, we probably do want to see the first mutex
as being more contended, so the Abseil approach is the more useful one.

This CL adopts "contention scales with number of goroutines blocked",
to better match Abseil [1]. However, it still makes sure to attribute the
time to the unlock that caused the backup, not subsequent innocent
unlocks that were affected by the congestion. In this way it still gives
more accurate profiles than Abseil does.

[1] https://github.com/abseil/abseil-cpp/blob/lts_2023_01_25/absl/synchronization/mutex.cc#L2390

Fixes #61015.

Change-Id: I7eb9e706867ffa8c0abb5b26a1b448f6eba49331
Reviewed-on: https://go-review.googlesource.com/c/go/+/506415
Run-TryBot: Russ Cox <rsc@golang.org>
Auto-Submit: Russ Cox <rsc@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Michael Knyszek <mknyszek@google.com>
src/runtime/export_test.go
src/runtime/mprof.go
src/runtime/pprof/pprof_test.go
src/runtime/runtime2.go
src/runtime/sema.go

index a89220e0dd0a5986214cef4623433a3390337595..4168705f2a05135f925bdd253a55ebd55eeafb48 100644 (file)
@@ -1330,7 +1330,7 @@ func (t *SemTable) Enqueue(addr *uint32) {
 //
 // Returns true if there actually was a waiter to be dequeued.
 func (t *SemTable) Dequeue(addr *uint32) bool {
-       s, _ := t.semTable.rootFor(addr).dequeue(addr)
+       s, _, _ := t.semTable.rootFor(addr).dequeue(addr)
        if s != nil {
                releaseSudog(s)
                return true
index 308ebaebe6e84fac2e2f6f9e48bd5db132f5418a..45f107722aa4ace31d1e40dec6f8365ef9754cc2 100644 (file)
@@ -553,8 +553,6 @@ func mutexevent(cycles int64, skip int) {
                cycles = 0
        }
        rate := int64(atomic.Load64(&mutexprofilerate))
-       // TODO(pjw): measure impact of always calling fastrand vs using something
-       // like malloc.go:nextSample()
        if rate > 0 && int64(fastrand())%rate == 0 {
                saveblockevent(cycles, rate, skip+1, mutexProfile)
        }
index 1ade860441ca6aee49f36e287acb50c7f29e85cd..948024061524b2d5d97c6939c9155d967a4b5738 100644 (file)
@@ -1023,7 +1023,7 @@ func containsStack(got [][]string, want []string) bool {
 // awaitBlockedGoroutine spins on runtime.Gosched until a runtime stack dump
 // shows a goroutine in the given state with a stack frame in
 // runtime/pprof.<fName>.
-func awaitBlockedGoroutine(t *testing.T, state, fName string) {
+func awaitBlockedGoroutine(t *testing.T, state, fName string, count int) {
        re := fmt.Sprintf(`(?m)^goroutine \d+ \[%s\]:\n(?:.+\n\t.+\n)*runtime/pprof\.%s`, regexp.QuoteMeta(state), fName)
        r := regexp.MustCompile(re)
 
@@ -1047,7 +1047,7 @@ func awaitBlockedGoroutine(t *testing.T, state, fName string) {
                        buf = make([]byte, 2*len(buf))
                        continue
                }
-               if r.Match(buf[:n]) {
+               if len(r.FindAll(buf[:n], -1)) >= count {
                        return
                }
        }
@@ -1056,7 +1056,7 @@ func awaitBlockedGoroutine(t *testing.T, state, fName string) {
 func blockChanRecv(t *testing.T) {
        c := make(chan bool)
        go func() {
-               awaitBlockedGoroutine(t, "chan receive", "blockChanRecv")
+               awaitBlockedGoroutine(t, "chan receive", "blockChanRecv", 1)
                c <- true
        }()
        <-c
@@ -1065,7 +1065,7 @@ func blockChanRecv(t *testing.T) {
 func blockChanSend(t *testing.T) {
        c := make(chan bool)
        go func() {
-               awaitBlockedGoroutine(t, "chan send", "blockChanSend")
+               awaitBlockedGoroutine(t, "chan send", "blockChanSend", 1)
                <-c
        }()
        c <- true
@@ -1074,7 +1074,7 @@ func blockChanSend(t *testing.T) {
 func blockChanClose(t *testing.T) {
        c := make(chan bool)
        go func() {
-               awaitBlockedGoroutine(t, "chan receive", "blockChanClose")
+               awaitBlockedGoroutine(t, "chan receive", "blockChanClose", 1)
                close(c)
        }()
        <-c
@@ -1086,7 +1086,7 @@ func blockSelectRecvAsync(t *testing.T) {
        c2 := make(chan bool, 1)
        go func() {
                for i := 0; i < numTries; i++ {
-                       awaitBlockedGoroutine(t, "select", "blockSelectRecvAsync")
+                       awaitBlockedGoroutine(t, "select", "blockSelectRecvAsync", 1)
                        c <- true
                }
        }()
@@ -1102,7 +1102,7 @@ func blockSelectSendSync(t *testing.T) {
        c := make(chan bool)
        c2 := make(chan bool)
        go func() {
-               awaitBlockedGoroutine(t, "select", "blockSelectSendSync")
+               awaitBlockedGoroutine(t, "select", "blockSelectSendSync", 1)
                <-c
        }()
        select {
@@ -1115,7 +1115,7 @@ func blockMutex(t *testing.T) {
        var mu sync.Mutex
        mu.Lock()
        go func() {
-               awaitBlockedGoroutine(t, "sync.Mutex.Lock", "blockMutex")
+               awaitBlockedGoroutine(t, "sync.Mutex.Lock", "blockMutex", 1)
                mu.Unlock()
        }()
        // Note: Unlock releases mu before recording the mutex event,
@@ -1125,12 +1125,36 @@ func blockMutex(t *testing.T) {
        mu.Lock()
 }
 
+func blockMutexN(t *testing.T, n int, d time.Duration) {
+       var wg sync.WaitGroup
+       var mu sync.Mutex
+       mu.Lock()
+       go func() {
+               awaitBlockedGoroutine(t, "sync.Mutex.Lock", "blockMutex", n)
+               time.Sleep(d)
+               mu.Unlock()
+       }()
+       // Note: Unlock releases mu before recording the mutex event,
+       // so it's theoretically possible for this to proceed and
+       // capture the profile before the event is recorded. As long
+       // as this is blocked before the unlock happens, it's okay.
+       for i := 0; i < n; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       mu.Lock()
+                       mu.Unlock()
+               }()
+       }
+       wg.Wait()
+}
+
 func blockCond(t *testing.T) {
        var mu sync.Mutex
        c := sync.NewCond(&mu)
        mu.Lock()
        go func() {
-               awaitBlockedGoroutine(t, "sync.Cond.Wait", "blockCond")
+               awaitBlockedGoroutine(t, "sync.Cond.Wait", "blockCond", 1)
                mu.Lock()
                c.Signal()
                mu.Unlock()
@@ -1217,7 +1241,11 @@ func TestMutexProfile(t *testing.T) {
                t.Fatalf("need MutexProfileRate 0, got %d", old)
        }
 
-       blockMutex(t)
+       const (
+               N = 100
+               D = 100 * time.Millisecond
+       )
+       blockMutexN(t, N, D)
 
        t.Run("debug=1", func(t *testing.T) {
                var w strings.Builder
@@ -1230,15 +1258,11 @@ func TestMutexProfile(t *testing.T) {
                }
                prof = strings.Trim(prof, "\n")
                lines := strings.Split(prof, "\n")
-               if len(lines) != 6 {
-                       t.Errorf("expected 6 lines, got %d %q\n%s", len(lines), prof, prof)
-               }
                if len(lines) < 6 {
-                       return
+                       t.Fatalf("expected >=6 lines, got %d %q\n%s", len(lines), prof, prof)
                }
                // checking that the line is like "35258904 1 @ 0x48288d 0x47cd28 0x458931"
                r2 := `^\d+ \d+ @(?: 0x[[:xdigit:]]+)+`
-               //r2 := "^[0-9]+ 1 @ 0x[0-9a-f x]+$"
                if ok, err := regexp.MatchString(r2, lines[3]); err != nil || !ok {
                        t.Errorf("%q didn't match %q", lines[3], r2)
                }
@@ -1263,12 +1287,30 @@ func TestMutexProfile(t *testing.T) {
 
                stks := stacks(p)
                for _, want := range [][]string{
-                       {"sync.(*Mutex).Unlock", "runtime/pprof.blockMutex.func1"},
+                       {"sync.(*Mutex).Unlock", "runtime/pprof.blockMutexN.func1"},
                } {
                        if !containsStack(stks, want) {
                                t.Errorf("No matching stack entry for %+v", want)
                        }
                }
+
+               i := 0
+               for ; i < len(p.SampleType); i++ {
+                       if p.SampleType[i].Unit == "nanoseconds" {
+                               break
+                       }
+               }
+               if i >= len(p.SampleType) {
+                       t.Fatalf("profile did not contain nanoseconds sample")
+               }
+               total := int64(0)
+               for _, s := range p.Sample {
+                       total += s.Value[i]
+               }
+               d := time.Duration(total)
+               if d < N*D*9/10 || d > N*D*2 { // want N*D but allow [0.9,2.0]*that.
+                       t.Fatalf("profile samples total %v, want %v", d, N*D)
+               }
        })
 }
 
index 5017a7a80a681f363b412366d0d067ed9f16e71b..2a02e1fb3bfe2e405133dc68758304688c16dd74 100644 (file)
@@ -342,7 +342,7 @@ type gobuf struct {
        bp   uintptr // for framepointer-enabled architectures
 }
 
-// sudog represents a g in a wait list, such as for sending/receiving
+// sudog (pseudo-g) represents a g in a wait list, such as for sending/receiving
 // on a channel.
 //
 // sudog is necessary because the g ↔ synchronization object relation
@@ -382,6 +382,13 @@ type sudog struct {
        // because c was closed.
        success bool
 
+       // waiters is a count of semaRoot waiting list other than head of list,
+       // clamped to a uint16 to fit in unused space.
+       // Only meaningful at the head of the list.
+       // (If we wanted to be overly clever, we could store a high 16 bits
+       // in the second entry in the list.)
+       waiters uint16
+
        parent   *sudog // semaRoot binary tree
        waitlink *sudog // g.waiting list or semaRoot
        waittail *sudog // semaRoot
index d0a81170c3c25631287e640dad1c6f8ea2d303e0..3b6874ca1192c16c5f1e7333c991bf35bcb0b5d3 100644 (file)
@@ -191,7 +191,7 @@ func semrelease1(addr *uint32, handoff bool, skipframes int) {
                unlock(&root.lock)
                return
        }
-       s, t0 := root.dequeue(addr)
+       s, t0, tailtime := root.dequeue(addr)
        if s != nil {
                root.nwait.Add(-1)
        }
@@ -199,7 +199,28 @@ func semrelease1(addr *uint32, handoff bool, skipframes int) {
        if s != nil { // May be slow or even yield, so unlock first
                acquiretime := s.acquiretime
                if acquiretime != 0 {
-                       mutexevent(t0-acquiretime, 3+skipframes)
+                       // Charge contention that this (delayed) unlock caused.
+                       // If there are N more goroutines waiting beyond the
+                       // one that's waking up, charge their delay as well, so that
+                       // contention holding up many goroutines shows up as
+                       // more costly than contention holding up a single goroutine.
+                       // It would take O(N) time to calculate how long each goroutine
+                       // has been waiting, so instead we charge avg(head-wait, tail-wait)*N.
+                       // head-wait is the longest wait and tail-wait is the shortest.
+                       // (When we do a lifo insertion, we preserve this property by
+                       // copying the old head's acquiretime into the inserted new head.
+                       // In that case the overall average may be slightly high, but that's fine:
+                       // the average of the ends is only an approximation to the actual
+                       // average anyway.)
+                       // The root.dequeue above changed the head and tail acquiretime
+                       // to the current time, so the next unlock will not re-count this contention.
+                       dt0 := t0 - acquiretime
+                       dt := dt0
+                       if s.waiters != 0 {
+                               dtail := t0 - tailtime
+                               dt += (dtail + dt0) / 2 * int64(s.waiters)
+                       }
+                       mutexevent(dt, 3+skipframes)
                }
                if s.ticket != 0 {
                        throw("corrupted semaphore ticket")
@@ -248,6 +269,7 @@ func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
        s.elem = unsafe.Pointer(addr)
        s.next = nil
        s.prev = nil
+       s.waiters = 0
 
        var last *sudog
        pt := &root.treap
@@ -258,7 +280,7 @@ func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
                                // Substitute s in t's place in treap.
                                *pt = s
                                s.ticket = t.ticket
-                               s.acquiretime = t.acquiretime
+                               s.acquiretime = t.acquiretime // preserve head acquiretime as oldest time
                                s.parent = t.parent
                                s.prev = t.prev
                                s.next = t.next
@@ -274,6 +296,10 @@ func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
                                if s.waittail == nil {
                                        s.waittail = t
                                }
+                               s.waiters = t.waiters
+                               if s.waiters+1 != 0 {
+                                       s.waiters++
+                               }
                                t.parent = nil
                                t.prev = nil
                                t.next = nil
@@ -287,6 +313,9 @@ func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
                                }
                                t.waittail = s
                                s.waitlink = nil
+                               if t.waiters+1 != 0 {
+                                       t.waiters++
+                               }
                        }
                        return
                }
@@ -330,7 +359,10 @@ func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
 // in semaRoot blocked on addr.
 // If the sudog was being profiled, dequeue returns the time
 // at which it was woken up as now. Otherwise now is 0.
-func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) {
+// If there are additional entries in the wait list, dequeue
+// returns tailtime set to the last entry's acquiretime.
+// Otherwise tailtime is found.acquiretime.
+func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now, tailtime int64) {
        ps := &root.treap
        s := *ps
        for ; s != nil; s = *ps {
@@ -343,7 +375,7 @@ func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) {
                        ps = &s.next
                }
        }
-       return nil, 0
+       return nil, 0, 0
 
 Found:
        now = int64(0)
@@ -368,7 +400,16 @@ Found:
                } else {
                        t.waittail = nil
                }
+               t.waiters = s.waiters
+               if t.waiters > 1 {
+                       t.waiters--
+               }
+               // Set head and tail acquire time to 'now',
+               // because the caller will take care of charging
+               // the delays before now for all entries in the list.
                t.acquiretime = now
+               tailtime = s.waittail.acquiretime
+               s.waittail.acquiretime = now
                s.waitlink = nil
                s.waittail = nil
        } else {
@@ -390,13 +431,14 @@ Found:
                } else {
                        root.treap = nil
                }
+               tailtime = s.acquiretime
        }
        s.parent = nil
        s.elem = nil
        s.next = nil
        s.prev = nil
        s.ticket = 0
-       return s, now
+       return s, now, tailtime
 }
 
 // rotateLeft rotates the tree rooted at node x.