]> Cypherpunks.ru repositories - gostls13.git/blob - src/internal/trace/v2/reader.go
runtime: add execution tracer v2 behind GOEXPERIMENT=exectracer2
[gostls13.git] / src / internal / trace / v2 / reader.go
1 // Copyright 2023 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package trace
6
7 import (
8         "bufio"
9         "fmt"
10         "io"
11         "slices"
12         "strings"
13
14         "internal/trace/v2/event/go122"
15         "internal/trace/v2/version"
16 )
17
18 // Reader reads a byte stream, validates it, and produces trace events.
19 type Reader struct {
20         r           *bufio.Reader
21         lastTs      Time
22         gen         *generation
23         spill       *spilledBatch
24         frontier    []*batchCursor
25         cpuSamples  []cpuSample
26         order       ordering
27         emittedSync bool
28 }
29
30 // NewReader creates a new trace reader.
31 func NewReader(r io.Reader) (*Reader, error) {
32         br := bufio.NewReader(r)
33         v, err := version.ReadHeader(br)
34         if err != nil {
35                 return nil, err
36         }
37         if v != version.Go122 {
38                 return nil, fmt.Errorf("unknown or unsupported version go 1.%d", v)
39         }
40         return &Reader{
41                 r: br,
42                 order: ordering{
43                         mStates:     make(map[ThreadID]*mState),
44                         pStates:     make(map[ProcID]*pState),
45                         gStates:     make(map[GoID]*gState),
46                         activeTasks: make(map[TaskID]taskState),
47                 },
48                 // Don't emit a sync event when we first go to emit events.
49                 emittedSync: true,
50         }, nil
51 }
52
53 // ReadEvent reads a single event from the stream.
54 //
55 // If the stream has been exhausted, it returns an invalid
56 // event and io.EOF.
57 func (r *Reader) ReadEvent() (e Event, err error) {
58         // Go 1.22+ trace parsing algorithm.
59         //
60         // (1) Read in all the batches for the next generation from the stream.
61         //   (a) Use the size field in the header to quickly find all batches.
62         // (2) Parse out the strings, stacks, CPU samples, and timestamp conversion data.
63         // (3) Group each event batch by M, sorted by timestamp. (batchCursor contains the groups.)
64         // (4) Organize batchCursors in a min-heap, ordered by the timestamp of the next event for each M.
65         // (5) Try to advance the next event for the M at the top of the min-heap.
66         //   (a) On success, select that M.
67         //   (b) On failure, sort the min-heap and try to advance other Ms. Select the first M that advances.
68         //   (c) If there's nothing left to advance, goto (1).
69         // (6) Select the latest event for the selected M and get it ready to be returned.
70         // (7) Read the next event for the selected M and update the min-heap.
71         // (8) Return the selected event, goto (5) on the next call.
72
73         // Set us up to track the last timestamp and fix up
74         // the timestamp of any event that comes through.
75         defer func() {
76                 if err != nil {
77                         return
78                 }
79                 if err = e.validateTableIDs(); err != nil {
80                         return
81                 }
82                 if e.base.time <= r.lastTs {
83                         e.base.time = r.lastTs + 1
84                 }
85                 r.lastTs = e.base.time
86         }()
87
88         // Check if we need to refresh the generation.
89         if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
90                 if !r.emittedSync {
91                         r.emittedSync = true
92                         return syncEvent(r.gen.evTable, r.lastTs), nil
93                 }
94                 if r.gen != nil && r.spill == nil {
95                         // If we have a generation from the last read,
96                         // and there's nothing left in the frontier, and
97                         // there's no spilled batch, indicating that there's
98                         // no further generation, it means we're done.
99                         // Return io.EOF.
100                         return Event{}, io.EOF
101                 }
102                 // Read the next generation.
103                 r.gen, r.spill, err = readGeneration(r.r, r.spill)
104                 if err != nil {
105                         return Event{}, err
106                 }
107
108                 // Reset CPU samples cursor.
109                 r.cpuSamples = r.gen.cpuSamples
110
111                 // Reset frontier.
112                 for m, batches := range r.gen.batches {
113                         bc := &batchCursor{m: m}
114                         ok, err := bc.nextEvent(batches, r.gen.freq)
115                         if err != nil {
116                                 return Event{}, err
117                         }
118                         if !ok {
119                                 // Turns out there aren't actually any events in these batches.
120                                 continue
121                         }
122                         r.frontier = heapInsert(r.frontier, bc)
123                 }
124
125                 // Reset emittedSync.
126                 r.emittedSync = false
127         }
128         refresh := func(i int) error {
129                 bc := r.frontier[i]
130
131                 // Refresh the cursor's event.
132                 ok, err := bc.nextEvent(r.gen.batches[bc.m], r.gen.freq)
133                 if err != nil {
134                         return err
135                 }
136                 if ok {
137                         // If we successfully refreshed, update the heap.
138                         heapUpdate(r.frontier, i)
139                 } else {
140                         // There's nothing else to read. Delete this cursor from the frontier.
141                         r.frontier = heapRemove(r.frontier, i)
142                 }
143                 return nil
144         }
145         // Inject a CPU sample if it comes next.
146         if len(r.cpuSamples) != 0 {
147                 if len(r.frontier) == 0 || r.cpuSamples[0].time < r.frontier[0].ev.time {
148                         e := r.cpuSamples[0].asEvent(r.gen.evTable)
149                         r.cpuSamples = r.cpuSamples[1:]
150                         return e, nil
151                 }
152         }
153         // Try to advance the head of the frontier, which should have the minimum timestamp.
154         // This should be by far the most common case
155         bc := r.frontier[0]
156         if ctx, ok, err := r.order.advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); err != nil {
157                 return Event{}, err
158         } else if ok {
159                 e := Event{table: r.gen.evTable, ctx: ctx, base: bc.ev}
160                 return e, refresh(0)
161         }
162         // Sort the min-heap. A sorted min-heap is still a min-heap,
163         // but now we can iterate over the rest and try to advance in
164         // order. This path should be rare.
165         slices.SortFunc(r.frontier, (*batchCursor).compare)
166         // Try to advance the rest of the frontier, in timestamp order.
167         for i := 1; i < len(r.frontier); i++ {
168                 bc := r.frontier[i]
169                 if ctx, ok, err := r.order.advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); err != nil {
170                         return Event{}, err
171                 } else if ok {
172                         e := Event{table: r.gen.evTable, ctx: ctx, base: bc.ev}
173                         return e, refresh(i)
174                 }
175         }
176         return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
177 }
178
179 func dumpFrontier(frontier []*batchCursor) string {
180         var sb strings.Builder
181         for _, bc := range frontier {
182                 spec := go122.Specs()[bc.ev.typ]
183                 fmt.Fprintf(&sb, "M %d [%s time=%d", bc.m, spec.Name, bc.ev.time)
184                 for i, arg := range spec.Args[1:] {
185                         fmt.Fprintf(&sb, " %s=%d", arg, bc.ev.args[i])
186                 }
187                 fmt.Fprintf(&sb, "]\n")
188         }
189         return sb.String()
190 }