import (
. "context"
"fmt"
+ "runtime"
+ "sync"
"testing"
+ "time"
)
-func BenchmarkContextCancelTree(b *testing.B) {
+func BenchmarkWithTimeout(b *testing.B) {
+ for concurrency := 40; concurrency <= 4e5; concurrency *= 100 {
+ name := fmt.Sprintf("concurrency=%d", concurrency)
+ b.Run(name, func(b *testing.B) {
+ benchmarkWithTimeout(b, concurrency)
+ })
+ }
+}
+
+func benchmarkWithTimeout(b *testing.B, concurrentContexts int) {
+ gomaxprocs := runtime.GOMAXPROCS(0)
+ perPContexts := concurrentContexts / gomaxprocs
+ root := Background()
+
+ // Generate concurrent contexts.
+ var wg sync.WaitGroup
+ ccf := make([][]CancelFunc, gomaxprocs)
+ for i := range ccf {
+ wg.Add(1)
+ go func(i int) {
+ defer wg.Done()
+ cf := make([]CancelFunc, perPContexts)
+ for j := range cf {
+ _, cf[j] = WithTimeout(root, time.Hour)
+ }
+ ccf[i] = cf
+ }(i)
+ }
+ wg.Wait()
+
+ b.ResetTimer()
+ b.RunParallel(func(pb *testing.PB) {
+ wcf := make([]CancelFunc, 10)
+ for pb.Next() {
+ for i := range wcf {
+ _, wcf[i] = WithTimeout(root, time.Hour)
+ }
+ for _, f := range wcf {
+ f()
+ }
+ }
+ })
+ b.StopTimer()
+
+ for _, cf := range ccf {
+ for _, f := range cf {
+ f()
+ }
+ }
+}
+
+func BenchmarkCancelTree(b *testing.B) {
depths := []int{1, 10, 100, 1000}
for _, d := range depths {
b.Run(fmt.Sprintf("depth=%d", d), func(b *testing.B) {
package runtime
-import "unsafe"
+import (
+ "runtime/internal/sys"
+ "unsafe"
+)
// Package time knows the layout of this structure.
// If this struct changes, adjust ../time/sleep.go:/runtimeTimer.
// For GOOS=nacl, package syscall knows the layout of this structure.
// If this struct changes, adjust ../syscall/net_nacl.go:/runtimeTimer.
type timer struct {
- i int // heap index
+ tb *timersBucket // the bucket the timer lives in
+ i int // heap index
// Timer wakes up at when, and then at when+period, ... (period > 0 only)
// each time calling f(arg, now) in the timer goroutine, so f must be
seq uintptr
}
-var timers struct {
+// timersLen is the length of timers array.
+//
+// Ideally, this would be set to GOMAXPROCS, but that would require
+// dynamic reallocation
+//
+// The current value is a compromise between memory usage and performance
+// that should cover the majority of GOMAXPROCS values used in the wild.
+const timersLen = 64
+
+// timers contains "per-P" timer heaps.
+//
+// Timers are queued into timersBucket associated with the current P,
+// so each P may work with its own timers independently of other P instances.
+//
+// Each timersBucket may be associated with multiple P
+// if GOMAXPROCS > timersLen.
+var timers [timersLen]struct {
+ timersBucket
+
+ // The padding should eliminate false sharing
+ // between timersBucket values.
+ pad [sys.CacheLineSize - unsafe.Sizeof(timersBucket{})%sys.CacheLineSize]byte
+}
+
+func (t *timer) assignBucket() *timersBucket {
+ id := uint8(getg().m.p.ptr().id) % timersLen
+ t.tb = &timers[id].timersBucket
+ return t.tb
+}
+
+type timersBucket struct {
lock mutex
gp *g
created bool
return
}
- t := getg().timer
+ gp := getg()
+ t := gp.timer
if t == nil {
t = new(timer)
- getg().timer = t
+ gp.timer = t
}
*t = timer{}
t.when = nanotime() + ns
t.f = goroutineReady
- t.arg = getg()
- lock(&timers.lock)
- addtimerLocked(t)
- goparkunlock(&timers.lock, "sleep", traceEvGoSleep, 2)
+ t.arg = gp
+ tb := t.assignBucket()
+ lock(&tb.lock)
+ tb.addtimerLocked(t)
+ goparkunlock(&tb.lock, "sleep", traceEvGoSleep, 2)
}
// startTimer adds t to the timer heap.
}
func addtimer(t *timer) {
- lock(&timers.lock)
- addtimerLocked(t)
- unlock(&timers.lock)
+ tb := t.assignBucket()
+ lock(&tb.lock)
+ tb.addtimerLocked(t)
+ unlock(&tb.lock)
}
// Add a timer to the heap and start or kick timerproc if the new timer is
// earlier than any of the others.
// Timers are locked.
-func addtimerLocked(t *timer) {
+func (tb *timersBucket) addtimerLocked(t *timer) {
// when must never be negative; otherwise timerproc will overflow
// during its delta calculation and never expire other runtime timers.
if t.when < 0 {
t.when = 1<<63 - 1
}
- t.i = len(timers.t)
- timers.t = append(timers.t, t)
- siftupTimer(t.i)
+ t.i = len(tb.t)
+ tb.t = append(tb.t, t)
+ tb.siftupTimer(t.i)
if t.i == 0 {
// siftup moved to top: new earliest deadline.
- if timers.sleeping {
- timers.sleeping = false
- notewakeup(&timers.waitnote)
+ if tb.sleeping {
+ tb.sleeping = false
+ notewakeup(&tb.waitnote)
}
- if timers.rescheduling {
- timers.rescheduling = false
- goready(timers.gp, 0)
+ if tb.rescheduling {
+ tb.rescheduling = false
+ goready(tb.gp, 0)
}
}
- if !timers.created {
- timers.created = true
- go timerproc()
+ if !tb.created {
+ tb.created = true
+ go timerproc(tb)
}
}
// Delete timer t from the heap.
// Do not need to update the timerproc: if it wakes up early, no big deal.
func deltimer(t *timer) bool {
- // Dereference t so that any panic happens before the lock is held.
- // Discard result, because t might be moving in the heap.
- _ = t.i
+ tb := t.tb
- lock(&timers.lock)
+ lock(&tb.lock)
// t may not be registered anymore and may have
// a bogus i (typically 0, if generated by Go).
// Verify it before proceeding.
i := t.i
- last := len(timers.t) - 1
- if i < 0 || i > last || timers.t[i] != t {
- unlock(&timers.lock)
+ last := len(tb.t) - 1
+ if i < 0 || i > last || tb.t[i] != t {
+ unlock(&tb.lock)
return false
}
if i != last {
- timers.t[i] = timers.t[last]
- timers.t[i].i = i
+ tb.t[i] = tb.t[last]
+ tb.t[i].i = i
}
- timers.t[last] = nil
- timers.t = timers.t[:last]
+ tb.t[last] = nil
+ tb.t = tb.t[:last]
if i != last {
- siftupTimer(i)
- siftdownTimer(i)
+ tb.siftupTimer(i)
+ tb.siftdownTimer(i)
}
- unlock(&timers.lock)
+ unlock(&tb.lock)
return true
}
// Timerproc runs the time-driven events.
-// It sleeps until the next event in the timers heap.
+// It sleeps until the next event in the tb heap.
// If addtimer inserts a new earlier event, it wakes timerproc early.
-func timerproc() {
- timers.gp = getg()
+func timerproc(tb *timersBucket) {
+ tb.gp = getg()
for {
- lock(&timers.lock)
- timers.sleeping = false
+ lock(&tb.lock)
+ tb.sleeping = false
now := nanotime()
delta := int64(-1)
for {
- if len(timers.t) == 0 {
+ if len(tb.t) == 0 {
delta = -1
break
}
- t := timers.t[0]
+ t := tb.t[0]
delta = t.when - now
if delta > 0 {
break
if t.period > 0 {
// leave in heap but adjust next time to fire
t.when += t.period * (1 + -delta/t.period)
- siftdownTimer(0)
+ tb.siftdownTimer(0)
} else {
// remove from heap
- last := len(timers.t) - 1
+ last := len(tb.t) - 1
if last > 0 {
- timers.t[0] = timers.t[last]
- timers.t[0].i = 0
+ tb.t[0] = tb.t[last]
+ tb.t[0].i = 0
}
- timers.t[last] = nil
- timers.t = timers.t[:last]
+ tb.t[last] = nil
+ tb.t = tb.t[:last]
if last > 0 {
- siftdownTimer(0)
+ tb.siftdownTimer(0)
}
t.i = -1 // mark as removed
}
f := t.f
arg := t.arg
seq := t.seq
- unlock(&timers.lock)
+ unlock(&tb.lock)
if raceenabled {
raceacquire(unsafe.Pointer(t))
}
f(arg, seq)
- lock(&timers.lock)
+ lock(&tb.lock)
}
if delta < 0 || faketime > 0 {
// No timers left - put goroutine to sleep.
- timers.rescheduling = true
- goparkunlock(&timers.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
+ tb.rescheduling = true
+ goparkunlock(&tb.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
continue
}
// At least one timer pending. Sleep until then.
- timers.sleeping = true
- timers.sleepUntil = now + delta
- noteclear(&timers.waitnote)
- unlock(&timers.lock)
- notetsleepg(&timers.waitnote, delta)
+ tb.sleeping = true
+ tb.sleepUntil = now + delta
+ noteclear(&tb.waitnote)
+ unlock(&tb.lock)
+ notetsleepg(&tb.waitnote, delta)
}
}
return nil
}
- lock(&timers.lock)
- if !timers.created || len(timers.t) == 0 {
- unlock(&timers.lock)
+ for i := range timers {
+ lock(&timers[i].lock)
+ }
+ gp := timejumpLocked()
+ for i := range timers {
+ unlock(&timers[i].lock)
+ }
+
+ return gp
+}
+
+func timejumpLocked() *g {
+ // Determine a timer bucket with minimum when.
+ var minT *timer
+ for i := range timers {
+ tb := &timers[i]
+ if !tb.created || len(tb.t) == 0 {
+ continue
+ }
+ t := tb.t[0]
+ if minT == nil || t.when < minT.when {
+ minT = t
+ }
+ }
+ if minT == nil || minT.when <= faketime {
+ return nil
+ }
+
+ faketime = minT.when
+ tb := minT.tb
+ if !tb.rescheduling {
return nil
}
+ tb.rescheduling = false
+ return tb.gp
+}
+
+func timeSleepUntil() int64 {
+ next := int64(1<<63 - 1)
- var gp *g
- if faketime < timers.t[0].when {
- faketime = timers.t[0].when
- if timers.rescheduling {
- timers.rescheduling = false
- gp = timers.gp
+ // Determine minimum sleepUntil across all the timer buckets.
+ //
+ // The function can not return a precise answer,
+ // as another timer may pop in as soon as timers have been unlocked.
+ // So lock the timers one by one instead of all at once.
+ for i := range timers {
+ tb := &timers[i]
+
+ lock(&tb.lock)
+ if tb.sleeping && tb.sleepUntil < next {
+ next = tb.sleepUntil
}
+ unlock(&tb.lock)
}
- unlock(&timers.lock)
- return gp
+
+ return next
}
// Heap maintenance algorithms.
-func siftupTimer(i int) {
- t := timers.t
+func (tb *timersBucket) siftupTimer(i int) {
+ t := tb.t
when := t[i].when
tmp := t[i]
for i > 0 {
}
}
-func siftdownTimer(i int) {
- t := timers.t
+func (tb *timersBucket) siftdownTimer(i int) {
+ t := tb.t
n := len(t)
when := t[i].when
tmp := t[i]
}
func benchmark(b *testing.B, bench func(n int)) {
- garbage := make([]*Timer, 1<<17)
- for i := 0; i < len(garbage); i++ {
- garbage[i] = AfterFunc(Hour, nil)
+
+ // Create equal number of garbage timers on each P before starting
+ // the benchmark.
+ var wg sync.WaitGroup
+ garbageAll := make([][]*Timer, runtime.GOMAXPROCS(0))
+ for i := range garbageAll {
+ wg.Add(1)
+ go func(i int) {
+ defer wg.Done()
+ garbage := make([]*Timer, 1<<15)
+ for j := range garbage {
+ garbage[j] = AfterFunc(Hour, nil)
+ }
+ garbageAll[i] = garbage
+ }(i)
}
- b.ResetTimer()
+ wg.Wait()
+ b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
bench(1000)
}
})
-
b.StopTimer()
- for i := 0; i < len(garbage); i++ {
- garbage[i].Stop()
+
+ for _, garbage := range garbageAll {
+ for _, t := range garbage {
+ t.Stop()
+ }
}
}
})
}
+func BenchmarkReset(b *testing.B) {
+ benchmark(b, func(n int) {
+ t := NewTimer(Hour)
+ for i := 0; i < n; i++ {
+ t.Reset(Hour)
+ }
+ t.Stop()
+ })
+}
+
+func BenchmarkSleep(b *testing.B) {
+ benchmark(b, func(n int) {
+ var wg sync.WaitGroup
+ wg.Add(n)
+ for i := 0; i < n; i++ {
+ go func() {
+ Sleep(Nanosecond)
+ wg.Done()
+ }()
+ }
+ wg.Wait()
+ })
+}
+
func TestAfter(t *testing.T) {
const delay = 100 * Millisecond
start := Now()