]> Cypherpunks repositories - gostls13.git/commitdiff
time: make After use fewer goroutines and host processes.
authorRoger Peppe <rogpeppe@gmail.com>
Mon, 6 Dec 2010 19:19:30 +0000 (14:19 -0500)
committerRob Pike <r@golang.org>
Mon, 6 Dec 2010 19:19:30 +0000 (14:19 -0500)
With credit to Gustavo Niemeyer, who hinted at this approach
in #go-nuts.

R=adg, rsc, niemeyer, r
CC=golang-dev
https://golang.org/cl/3416043

src/pkg/time/sleep.go
src/pkg/time/sleep_test.go

index 702ced1304afe66fc3eb18729f7731b09fb1f7b6..77b7b4a59354a4049b7e280136a58b24c0759d70 100644 (file)
@@ -7,8 +7,26 @@ package time
 import (
        "os"
        "syscall"
+       "sync"
+       "container/heap"
 )
 
+// The event type represents a single After event.
+type event struct {
+       t        int64        // The absolute time that the event should fire.
+       c        chan<- int64 // The channel to send on.
+       sleeping bool         // A sleeper is sleeping for this event.
+}
+
+type eventHeap []*event
+
+var events eventHeap
+var eventMutex sync.Mutex
+
+func init() {
+       events.Push(&event{1 << 62, nil, true}) // sentinel
+}
+
 // Sleep pauses the current goroutine for at least ns nanoseconds.
 // Higher resolution sleeping may be provided by syscall.Nanosleep 
 // on some operating systems.
@@ -17,18 +35,6 @@ func Sleep(ns int64) os.Error {
        return err
 }
 
-// After waits at least ns nanoseconds before sending the current time
-// on the returned channel.
-func After(ns int64) <-chan int64 {
-       t := Nanoseconds()
-       ch := make(chan int64, 1)
-       go func() {
-               t, _ = sleep(t, ns)
-               ch <- t
-       }()
-       return ch
-}
-
 // sleep takes the current time and a duration,
 // pauses for at least ns nanoseconds, and
 // returns the current time and an error.
@@ -44,3 +50,87 @@ func sleep(t, ns int64) (int64, os.Error) {
        }
        return t, nil
 }
+
+// After waits at least ns nanoseconds before sending the current time
+// on the returned channel.
+func After(ns int64) <-chan int64 {
+       c := make(chan int64, 1)
+       t := ns + Nanoseconds()
+       eventMutex.Lock()
+       t0 := events[0].t
+       heap.Push(events, &event{t, c, false})
+       if t < t0 {
+               go sleeper()
+       }
+       eventMutex.Unlock()
+       return c
+}
+
+// sleeper continually looks at the earliest event in the queue, marks it
+// as sleeping, waits until it happens, then removes any events
+// in the queue that are due. It stops when it finds an event that is
+// already marked as sleeping. When an event is inserted before the first item,
+// a new sleeper is started.
+//
+// Scheduling vagaries mean that sleepers may not wake up in
+// exactly the order of the events that they are waiting for,
+// but this does not matter as long as there are at least as
+// many sleepers as events marked sleeping (invariant). This ensures that
+// there is always a sleeper to service the remaining events.
+//
+// A sleeper will remove at least the event it has been waiting for
+// unless the event has already been removed by another sleeper.  Both
+// cases preserve the invariant described above.
+func sleeper() {
+       eventMutex.Lock()
+       e := events[0]
+       for !e.sleeping {
+               t := Nanoseconds()
+               if dt := e.t - t; dt > 0 {
+                       e.sleeping = true
+                       eventMutex.Unlock()
+                       if nt, err := sleep(t, dt); err != nil {
+                               // If sleep has encountered an error,
+                               // there's not much we can do. We pretend
+                               // that time really has advanced by the required
+                               // amount and lie to the rest of the system.
+                               t = e.t
+                       } else {
+                               t = nt
+                       }
+                       eventMutex.Lock()
+                       e = events[0]
+               }
+               for t >= e.t {
+                       e.c <- t
+                       heap.Pop(events)
+                       e = events[0]
+               }
+       }
+       eventMutex.Unlock()
+}
+
+func (eventHeap) Len() int {
+       return len(events)
+}
+
+func (eventHeap) Less(i, j int) bool {
+       return events[i].t < events[j].t
+}
+
+func (eventHeap) Swap(i, j int) {
+       events[i], events[j] = events[j], events[i]
+}
+
+func (eventHeap) Push(x interface{}) {
+       events = append(events, x.(*event))
+}
+
+func (eventHeap) Pop() interface{} {
+       // TODO: possibly shrink array.
+       n := len(events) - 1
+       e := events[n]
+       events[n] = nil
+       events = events[0:n]
+       return e
+}
index 4934a38691310db4502f7fe02ba4374f28b48036..9fd38d18d1a5a36c2e7eeec03610f7f74f1f3063 100644 (file)
@@ -8,6 +8,7 @@ import (
        "os"
        "syscall"
        "testing"
+       "sort"
        . "time"
 )
 
@@ -36,3 +37,60 @@ func TestAfter(t *testing.T) {
                t.Fatalf("After(%d) expect >= %d, got %d", delay, min, end)
        }
 }
+
+func TestAfterTick(t *testing.T) {
+       const (
+               Delta = 100 * 1e6
+               Count = 10
+       )
+       t0 := Nanoseconds()
+       for i := 0; i < Count; i++ {
+               <-After(Delta)
+       }
+       t1 := Nanoseconds()
+       ns := t1 - t0
+       target := int64(Delta * Count)
+       slop := target * 2 / 10
+       if ns < target-slop || ns > target+slop {
+               t.Fatalf("%d ticks of %g ns took %g ns, expected %g", Count, float64(Delta), float64(ns), float64(target))
+       }
+}
+
+var slots = []int{5, 3, 6, 6, 6, 1, 1, 2, 7, 9, 4, 8, 0}
+
+type afterResult struct {
+       slot int
+       t    int64
+}
+
+func await(slot int, result chan<- afterResult, ac <-chan int64) {
+       result <- afterResult{slot, <-ac}
+}
+
+func TestAfterQueuing(t *testing.T) {
+       const (
+               Delta = 100 * 1e6
+       )
+       // make the result channel buffered because we don't want
+       // to depend on channel queueing semantics that might
+       // possibly change in the future.
+       result := make(chan afterResult, len(slots))
+
+       t0 := Nanoseconds()
+       for _, slot := range slots {
+               go await(slot, result, After(int64(slot)*Delta))
+       }
+       sort.SortInts(slots)
+       for _, slot := range slots {
+               r := <-result
+               if r.slot != slot {
+                       t.Fatalf("after queue got slot %d, expected %d", r.slot, slot)
+               }
+               ns := r.t - t0
+               target := int64(slot * Delta)
+               slop := int64(Delta) / 10
+               if ns < target-slop || ns > target+slop {
+                       t.Fatalf("after queue slot %d arrived at %g, expected %g", slot, float64(ns), float64(target))
+               }
+       }
+}