]> Cypherpunks repositories - gostls13.git/commitdiff
time: allow cancelling of After events.
authorRoger Peppe <rogpeppe@gmail.com>
Tue, 25 Jan 2011 20:25:48 +0000 (12:25 -0800)
committerRob Pike <r@golang.org>
Tue, 25 Jan 2011 20:25:48 +0000 (12:25 -0800)
Also simplify sleeper algorithm and poll
occasionally so redundant sleeper goroutines
will quit sooner.

R=r, niemeyer, r2
CC=golang-dev
https://golang.org/cl/4063043

src/pkg/time/sleep.go
src/pkg/time/sleep_test.go
src/pkg/time/tick_test.go

index 3538775adfb28c9dc0864182308a13296d1a9b28..833552d68412ca22fe0495a66223cd02f9c3d776 100644 (file)
@@ -11,20 +11,40 @@ import (
        "container/heap"
 )
 
-// The event type represents a single After or AfterFunc event.
-type event struct {
-       t        int64       // The absolute time that the event should fire.
-       f        func(int64) // The function to call when the event fires.
-       sleeping bool        // A sleeper is sleeping for this event.
+// The Timer type represents a single event.
+// When the Timer expires, the current time will be sent on C
+// unless the Timer represents an AfterFunc event.
+type Timer struct {
+       C <-chan int64
+       t int64       // The absolute time that the event should fire.
+       f func(int64) // The function to call when the event fires.
+       i int         // The event's index inside eventHeap.
 }
 
-type eventHeap []*event
+type timerHeap []*Timer
 
-var events eventHeap
-var eventMutex sync.Mutex
+// forever is the absolute time (in ns) of an event that is forever away.
+const forever = 1 << 62
+
+// maxSleepTime is the maximum length of time that a sleeper
+// sleeps for before checking if it is defunct.
+const maxSleepTime = 1e9
+
+var (
+       // timerMutex guards the variables inside this var group.
+       timerMutex sync.Mutex
+
+       // timers holds a binary heap of pending events, terminated with a sentinel.
+       timers timerHeap
+
+       // currentSleeper is an ever-incrementing counter which represents
+       // the current sleeper. It allows older sleepers to detect that they are
+       // defunct and exit.
+       currentSleeper int64
+)
 
 func init() {
-       events.Push(&event{1 << 62, nil, true}) // sentinel
+       timers.Push(&Timer{t: forever}) // sentinel
 }
 
 // Sleep pauses the current goroutine for at least ns nanoseconds.
@@ -51,101 +71,133 @@ func sleep(t, ns int64) (int64, os.Error) {
        return t, nil
 }
 
+// NewTimer creates a new Timer that will send
+// the current time on its channel after at least ns nanoseconds.
+func NewTimer(ns int64) *Timer {
+       c := make(chan int64, 1)
+       e := after(ns, func(t int64) { c <- t })
+       e.C = c
+       return e
+}
+
 // After waits at least ns nanoseconds before sending the current time
 // on the returned channel.
+// It is equivalent to NewTimer(ns).C.
 func After(ns int64) <-chan int64 {
-       c := make(chan int64, 1)
-       after(ns, func(t int64) { c <- t })
-       return c
+       return NewTimer(ns).C
 }
 
 // AfterFunc waits at least ns nanoseconds before calling f
-// in its own goroutine.
-func AfterFunc(ns int64, f func()) {
-       after(ns, func(_ int64) {
+// in its own goroutine. It returns a Timer that can
+// be used to cancel the call using its Stop method.
+func AfterFunc(ns int64, f func()) *Timer {
+       return after(ns, func(_ int64) {
                go f()
        })
 }
 
+// Stop prevents the Timer from firing.
+// It returns true if the call stops the timer, false if the timer has already
+// expired or stopped.
+func (e *Timer) Stop() (ok bool) {
+       timerMutex.Lock()
+       // Avoid removing the first event in the queue so that
+       // we don't start a new sleeper unnecessarily.
+       if e.i > 0 {
+               heap.Remove(timers, e.i)
+       }
+       ok = e.f != nil
+       e.f = nil
+       timerMutex.Unlock()
+       return
+}
+
 // after is the implementation of After and AfterFunc.
 // When the current time is after ns, it calls f with the current time.
 // It assumes that f will not block.
-func after(ns int64, f func(int64)) {
+func after(ns int64, f func(int64)) (e *Timer) {
+       now := Nanoseconds()
        t := Nanoseconds() + ns
-       eventMutex.Lock()
-       t0 := events[0].t
-       heap.Push(events, &event{t, f, false})
-       if t < t0 {
-               go sleeper()
+       if ns > 0 && t < now {
+               panic("time: time overflow")
        }
-       eventMutex.Unlock()
+       timerMutex.Lock()
+       t0 := timers[0].t
+       e = &Timer{nil, t, f, -1}
+       heap.Push(timers, e)
+       // Start a new sleeper if the new event is before
+       // the first event in the queue. If the length of time
+       // until the new event is at least maxSleepTime,
+       // then we're guaranteed that the sleeper will wake up
+       // in time to service it, so no new sleeper is needed.
+       if t0 > t && (t0 == forever || ns < maxSleepTime) {
+               currentSleeper++
+               go sleeper(currentSleeper)
+       }
+       timerMutex.Unlock()
+       return
 }
 
-// sleeper continually looks at the earliest event in the queue, marks it
-// as sleeping, waits until it happens, then removes any events
-// in the queue that are due. It stops when it finds an event that is
-// already marked as sleeping. When an event is inserted before the first item,
-// a new sleeper is started.
-//
-// Scheduling vagaries mean that sleepers may not wake up in
-// exactly the order of the events that they are waiting for,
-// but this does not matter as long as there are at least as
-// many sleepers as events marked sleeping (invariant). This ensures that
-// there is always a sleeper to service the remaining events.
-//
-// A sleeper will remove at least the event it has been waiting for
-// unless the event has already been removed by another sleeper.  Both
-// cases preserve the invariant described above.
-func sleeper() {
-       eventMutex.Lock()
-       e := events[0]
-       for !e.sleeping {
-               t := Nanoseconds()
+// sleeper continually looks at the earliest event in the queue, waits until it happens,
+// then removes any events in the queue that are due. It stops when the queue
+// is empty or when another sleeper has been started.
+func sleeper(sleeperId int64) {
+       timerMutex.Lock()
+       e := timers[0]
+       t := Nanoseconds()
+       for e.t != forever {
                if dt := e.t - t; dt > 0 {
-                       e.sleeping = true
-                       eventMutex.Unlock()
-                       if nt, err := sleep(t, dt); err != nil {
-                               // If sleep has encountered an error,
-                               // there's not much we can do. We pretend
-                               // that time really has advanced by the required
-                               // amount and lie to the rest of the system.
-                               t = e.t
-                       } else {
-                               t = nt
+                       if dt > maxSleepTime {
+                               dt = maxSleepTime
+                       }
+                       timerMutex.Unlock()
+                       syscall.Sleep(dt)
+                       timerMutex.Lock()
+                       if currentSleeper != sleeperId {
+                               // Another sleeper has been started, making this one redundant.
+                               break
                        }
-                       eventMutex.Lock()
-                       e = events[0]
                }
+               e = timers[0]
+               t = Nanoseconds()
                for t >= e.t {
-                       e.f(t)
-                       heap.Pop(events)
-                       e = events[0]
+                       if e.f != nil {
+                               e.f(t)
+                               e.f = nil
+                       }
+                       heap.Pop(timers)
+                       e = timers[0]
                }
        }
-       eventMutex.Unlock()
+       timerMutex.Unlock()
 }
 
-func (eventHeap) Len() int {
-       return len(events)
+func (timerHeap) Len() int {
+       return len(timers)
 }
 
-func (eventHeap) Less(i, j int) bool {
-       return events[i].t < events[j].t
+func (timerHeap) Less(i, j int) bool {
+       return timers[i].t < timers[j].t
 }
 
-func (eventHeap) Swap(i, j int) {
-       events[i], events[j] = events[j], events[i]
+func (timerHeap) Swap(i, j int) {
+       timers[i], timers[j] = timers[j], timers[i]
+       timers[i].i = i
+       timers[j].i = j
 }
 
-func (eventHeap) Push(x interface{}) {
-       events = append(events, x.(*event))
+func (timerHeap) Push(x interface{}) {
+       e := x.(*Timer)
+       e.i = len(timers)
+       timers = append(timers, e)
 }
 
-func (eventHeap) Pop() interface{} {
+func (timerHeap) Pop() interface{} {
        // TODO: possibly shrink array.
-       n := len(events) - 1
-       e := events[n]
-       events[n] = nil
-       events = events[0:n]
+       n := len(timers) - 1
+       e := timers[n]
+       timers[n] = nil
+       timers = timers[0:n]
+       e.i = -1
        return e
 }
index 9e36288f886306225914133c8dbcc9fbb774ca3e..4007db561a674e0e2e782ae3eb109e5968356072 100644 (file)
@@ -64,6 +64,18 @@ func BenchmarkAfterFunc(b *testing.B) {
        <-c
 }
 
+func BenchmarkAfter(b *testing.B) {
+       for i := 0; i < b.N; i++ {
+               <-After(1)
+       }
+}
+
+func BenchmarkStop(b *testing.B) {
+       for i := 0; i < b.N; i++ {
+               NewTimer(1e9).Stop()
+       }
+}
+
 func TestAfter(t *testing.T) {
        const delay = int64(100e6)
        start := Nanoseconds()
@@ -94,6 +106,30 @@ func TestAfterTick(t *testing.T) {
        }
 }
 
+func TestAfterStop(t *testing.T) {
+       const msec = 1e6
+       AfterFunc(100*msec, func() {})
+       t0 := NewTimer(50 * msec)
+       c1 := make(chan bool, 1)
+       t1 := AfterFunc(150*msec, func() { c1 <- true })
+       c2 := After(200 * msec)
+       if !t0.Stop() {
+               t.Fatalf("failed to stop event 0")
+       }
+       if !t1.Stop() {
+               t.Fatalf("failed to stop event 1")
+       }
+       <-c2
+       _, ok0 := <-t0.C
+       _, ok1 := <-c1
+       if ok0 || ok1 {
+               t.Fatalf("events were not stopped")
+       }
+       if t1.Stop() {
+               t.Fatalf("Stop returned true twice")
+       }
+}
+
 var slots = []int{5, 3, 6, 6, 6, 1, 1, 2, 7, 9, 4, 8, 0}
 
 type afterResult struct {
index 2a63a0f2b3bd5343d8447623662bf34736a74aec..f2304a14e33cb5e0e7c7a0192014a414dc3d1fc3 100644 (file)
@@ -43,3 +43,14 @@ func TestTeardown(t *testing.T) {
                ticker.Stop()
        }
 }
+
+func BenchmarkTicker(b *testing.B) {
+       ticker := NewTicker(1)
+       b.ResetTimer()
+       b.StartTimer()
+       for i := 0; i < b.N; i++ {
+               <-ticker.C
+       }
+       b.StopTimer()
+       ticker.Stop()
+}