]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: remove futile wakeups from trace
authorDmitry Vyukov <dvyukov@google.com>
Wed, 11 Mar 2015 15:29:12 +0000 (18:29 +0300)
committerDmitry Vyukov <dvyukov@google.com>
Tue, 17 Mar 2015 14:14:55 +0000 (14:14 +0000)
Channels and sync.Mutex'es allow another goroutine to acquire resource
ahead of an unblocked goroutine. This is good for performance, but
leads to futile wakeups (the unblocked goroutine needs to block again).
Futile wakeups caused user confusion during the very first evaluation
of tracing functionality on a real server (a goroutine as if acquires a mutex
in a loop, while there is no loop in user code).

This change detects futile wakeups on channels and emits a special event
to denote the fact. Later parser finds entire wakeup sequences
(unblock->start->block) and removes them.

sync.Mutex will be supported in a separate change.

Change-Id: Iaaaee9d5c0921afc62b449a97447445030ac19d3
Reviewed-on: https://go-review.googlesource.com/7380
Reviewed-by: Keith Randall <khr@golang.org>
src/internal/trace/parser.go
src/runtime/chan.go
src/runtime/pprof/trace_test.go
src/runtime/select.go
src/runtime/trace.go

index 3c93e78a61b435a7a0338e37f276f5b62f675f94..f1f709e4fdefac25579739a717626efb5f6da837 100644 (file)
@@ -56,7 +56,7 @@ const (
        SyscallP // depicts returns from syscalls
 )
 
-// parseTrace parses, post-processes and verifies the trace.
+// Parse parses, post-processes and verifies the trace.
 func Parse(r io.Reader) ([]*Event, error) {
        rawEvents, err := readTrace(r)
        if err != nil {
@@ -66,6 +66,10 @@ func Parse(r io.Reader) ([]*Event, error) {
        if err != nil {
                return nil, err
        }
+       events, err = removeFutile(events)
+       if err != nil {
+               return nil, err
+       }
        err = postProcessTrace(events)
        if err != nil {
                return nil, err
@@ -265,6 +269,61 @@ func parseEvents(rawEvents []rawEvent) (events []*Event, err error) {
        return
 }
 
+// removeFutile removes all constituents of futile wakeups (block, unblock, start).
+// For example, a goroutine was unblocked on a mutex, but another goroutine got
+// ahead and acquired the mutex before the first goroutine is scheduled,
+// so the first goroutine has to block again. Such wakeups happen on buffered
+// channels and sync.Mutex, but are generally not interesting for end user.
+func removeFutile(events []*Event) ([]*Event, error) {
+       // Two non-trivial aspects:
+       // 1. A goroutine can be preempted during a futile wakeup and migrate to another P.
+       //      We want to remove all of that.
+       // 2. Tracing can start in the middle of a futile wakeup.
+       //      That is, we can see a futile wakeup event w/o the actual wakeup before it.
+       // postProcessTrace runs after us and ensures that we leave the trace in a consistent state.
+
+       // Phase 1: determine futile wakeup sequences.
+       type G struct {
+               futile bool
+               wakeup []*Event // wakeup sequence (subject for removal)
+       }
+       gs := make(map[uint64]G)
+       futile := make(map[*Event]bool)
+       for _, ev := range events {
+               switch ev.Type {
+               case EvGoUnblock:
+                       g := gs[ev.Args[0]]
+                       g.wakeup = []*Event{ev}
+                       gs[ev.Args[0]] = g
+               case EvGoStart, EvGoPreempt, EvFutileWakeup:
+                       g := gs[ev.G]
+                       g.wakeup = append(g.wakeup, ev)
+                       if ev.Type == EvFutileWakeup {
+                               g.futile = true
+                       }
+                       gs[ev.G] = g
+               case EvGoBlock, EvGoBlockSend, EvGoBlockRecv, EvGoBlockSelect, EvGoBlockSync, EvGoBlockCond:
+                       g := gs[ev.G]
+                       if g.futile {
+                               futile[ev] = true
+                               for _, ev1 := range g.wakeup {
+                                       futile[ev1] = true
+                               }
+                       }
+                       delete(gs, ev.G)
+               }
+       }
+
+       // Phase 2: remove futile wakeup sequences.
+       newEvents := events[:0] // overwrite the original slice
+       for _, ev := range events {
+               if !futile[ev] {
+                       newEvents = append(newEvents, ev)
+               }
+       }
+       return newEvents, nil
+}
+
 // postProcessTrace does inter-event verification and information restoration.
 // The resulting trace is guaranteed to be consistent
 // (for example, a P does not run two Gs at the same time, or a G is indeed
@@ -618,7 +677,8 @@ const (
        EvHeapAlloc      = 33 // memstats.heap_alloc change [timestamp, heap_alloc]
        EvNextGC         = 34 // memstats.next_gc change [timestamp, next_gc]
        EvTimerGoroutine = 35 // denotes timer goroutine [timer goroutine id]
-       EvCount          = 36
+       EvFutileWakeup   = 36 // denotes that the revious wakeup of this goroutine was futile [timestamp]
+       EvCount          = 37
 )
 
 var EventDescriptions = [EvCount]struct {
@@ -662,4 +722,5 @@ var EventDescriptions = [EvCount]struct {
        EvHeapAlloc:      {"HeapAlloc", false, []string{"mem"}},
        EvNextGC:         {"NextGC", false, []string{"mem"}},
        EvTimerGoroutine: {"TimerGoroutine", false, []string{"g"}},
+       EvFutileWakeup:   {"FutileWakeup", false, []string{}},
 }
index ba226a1b42b6f9317deb52f07ad51129bba5b394..87e05bdf6cb239cc94d74fae9685e830a9f12503 100644 (file)
@@ -219,7 +219,7 @@ func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uin
        // asynchronous channel
        // wait for some space to write our data
        var t1 int64
-       for c.qcount >= c.dataqsiz {
+       for futile := byte(0); c.qcount >= c.dataqsiz; futile = traceFutileWakeup {
                if !block {
                        unlock(&c.lock)
                        return false
@@ -234,7 +234,7 @@ func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uin
                mysg.elem = nil
                mysg.selectdone = nil
                c.sendq.enqueue(mysg)
-               goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
+               goparkunlock(&c.lock, "chan send", traceEvGoBlockSend|futile, 3)
 
                // someone woke us up - try again
                if mysg.releasetime > 0 {
@@ -462,7 +462,7 @@ func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, r
        // asynchronous channel
        // wait for some data to appear
        var t1 int64
-       for c.qcount <= 0 {
+       for futile := byte(0); c.qcount <= 0; futile = traceFutileWakeup {
                if c.closed != 0 {
                        selected, received = recvclosed(c, ep)
                        if t1 > 0 {
@@ -488,7 +488,7 @@ func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, r
                mysg.selectdone = nil
 
                c.recvq.enqueue(mysg)
-               goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
+               goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv|futile, 3)
 
                // someone woke us up - try again
                if mysg.releasetime > 0 {
index 3753e3c4ddbd2a5bc1f097d122336531e69e5ef9..2b85e47993985231d29a8ac63f6ea7459807f506 100644 (file)
@@ -360,3 +360,91 @@ func TestTraceStressStartStop(t *testing.T) {
        }
        <-outerDone
 }
+
+func TestTraceFutileWakeup(t *testing.T) {
+       // The test generates a full-load of futile wakeups on channels,
+       // and ensures that the trace is consistent after their removal.
+       skipTraceTestsIfNeeded(t)
+       buf := new(bytes.Buffer)
+       if err := StartTrace(buf); err != nil {
+               t.Fatalf("failed to start tracing: %v", err)
+       }
+
+       defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(8))
+       c0 := make(chan int, 1)
+       c1 := make(chan int, 1)
+       c2 := make(chan int, 1)
+       const procs = 2
+       var done sync.WaitGroup
+       done.Add(4 * procs)
+       for p := 0; p < procs; p++ {
+               const iters = 1e3
+               go func() {
+                       for i := 0; i < iters; i++ {
+                               runtime.Gosched()
+                               c0 <- 0
+                       }
+                       done.Done()
+               }()
+               go func() {
+                       for i := 0; i < iters; i++ {
+                               runtime.Gosched()
+                               <-c0
+                       }
+                       done.Done()
+               }()
+               go func() {
+                       for i := 0; i < iters; i++ {
+                               runtime.Gosched()
+                               select {
+                               case c1 <- 0:
+                               case c2 <- 0:
+                               }
+                       }
+                       done.Done()
+               }()
+               go func() {
+                       for i := 0; i < iters; i++ {
+                               runtime.Gosched()
+                               select {
+                               case <-c1:
+                               case <-c2:
+                               }
+                       }
+                       done.Done()
+               }()
+       }
+       done.Wait()
+
+       StopTrace()
+       events, _, err := parseTrace(buf)
+       if err != nil {
+               t.Fatalf("failed to parse trace: %v", err)
+       }
+       // Check that (1) trace does not contain EvFutileWakeup events and
+       // (2) there are no consecutive EvGoBlock/EvGCStart/EvGoBlock events
+       // (we call runtime.Gosched between all operations, so these would be futile wakeups).
+       gs := make(map[uint64]int)
+       for _, ev := range events {
+               switch ev.Type {
+               case trace.EvFutileWakeup:
+                       t.Fatalf("found EvFutileWakeup event")
+               case trace.EvGoBlockSend, trace.EvGoBlockRecv, trace.EvGoBlockSelect:
+                       if gs[ev.G] == 2 {
+                               t.Fatalf("goroutine %v blocked on %v at %v right after start",
+                                       ev.G, trace.EventDescriptions[ev.Type].Name, ev.Ts)
+                       }
+                       if gs[ev.G] == 1 {
+                               t.Fatalf("goroutine %v blocked on %v at %v while blocked",
+                                       ev.G, trace.EventDescriptions[ev.Type].Name, ev.Ts)
+                       }
+                       gs[ev.G] = 1
+               case trace.EvGoStart:
+                       if gs[ev.G] == 1 {
+                               gs[ev.G] = 2
+                       }
+               default:
+                       delete(gs, ev.G)
+               }
+       }
+}
index 73fcb439f1d1a253f8a69e21cbbbfed534b74ea4..98ac5a3d61ff95e6efaa9e1532c13ef8d02b5469 100644 (file)
@@ -308,6 +308,7 @@ func selectgoImpl(sel *hselect) (uintptr, uint16) {
                k      *scase
                sglist *sudog
                sgnext *sudog
+               futile byte
        )
 
 loop:
@@ -392,7 +393,7 @@ loop:
 
        // wait for someone to wake us up
        gp.param = nil
-       gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect, 2)
+       gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect|futile, 2)
 
        // someone woke us up
        sellock(sel)
@@ -435,6 +436,7 @@ loop:
        }
 
        if cas == nil {
+               futile = traceFutileWakeup
                goto loop
        }
 
index eb3ceb2f38050288c39765b8f45898c44340c723..7c4d8d3c91e26aa12c7c1f3738ca118991866e3a 100644 (file)
@@ -52,7 +52,8 @@ const (
        traceEvHeapAlloc      = 33 // memstats.heap_alloc change [timestamp, heap_alloc]
        traceEvNextGC         = 34 // memstats.next_gc change [timestamp, next_gc]
        traceEvTimerGoroutine = 35 // denotes timer goroutine [timer goroutine id]
-       traceEvCount          = 36
+       traceEvFutileWakeup   = 36 // denotes that the previous wakeup of this goroutine was futile [timestamp]
+       traceEvCount          = 37
 )
 
 const (
@@ -71,6 +72,13 @@ const (
        traceBytesPerNumber = 10
        // Shift of the number of arguments in the first event byte.
        traceArgCountShift = 6
+       // Flag passed to traceGoPark to denote that the previous wakeup of this
+       // goroutine was futile. For example, a goroutine was unblocked on a mutex,
+       // but another goroutine got ahead and acquired the mutex before the first
+       // goroutine is scheduled, so the first goroutine has to block again.
+       // Such wakeups happen on buffered channels and sync.Mutex,
+       // but are generally not interesting for end user.
+       traceFutileWakeup byte = 128
 )
 
 // trace is global tracing context.
@@ -775,7 +783,10 @@ func traceGoPreempt() {
 }
 
 func traceGoPark(traceEv byte, skip int, gp *g) {
-       traceEvent(traceEv, skip)
+       if traceEv&traceFutileWakeup != 0 {
+               traceEvent(traceEvFutileWakeup, -1)
+       }
+       traceEvent(traceEv & ^traceFutileWakeup, skip)
 }
 
 func traceGoUnpark(gp *g, skip int) {