]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: initial scheduler changes for timers on P's
authorIan Lance Taylor <iant@golang.org>
Fri, 5 Apr 2019 23:24:14 +0000 (16:24 -0700)
committerIan Lance Taylor <iant@golang.org>
Mon, 21 Oct 2019 17:23:42 +0000 (17:23 +0000)
Add support to the main scheduler loop for handling timers on P's.
This is not used yet, as timers are not yet put on P's.

Updates #6239
Updates #27707

Change-Id: I6a359df408629f333a9232142ce19e8be8496dae
Reviewed-on: https://go-review.googlesource.com/c/go/+/171826
Run-TryBot: Ian Lance Taylor <iant@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Michael Knyszek <mknyszek@google.com>
src/runtime/proc.go
src/runtime/runtime2.go
src/runtime/time.go

index c419dee7710846109b0560b367d555e882627d41..1a51b1d83b03e368ec362df32abde2116f3a4e16 100644 (file)
@@ -2221,6 +2221,9 @@ top:
        if _p_.runSafePointFn != 0 {
                runSafePointFn()
        }
+
+       now, pollUntil, _ := checkTimers(_p_, 0)
+
        if fingwait && fingwake {
                if gp := wakefing(); gp != nil {
                        ready(gp, 0, true)
@@ -2266,12 +2269,7 @@ top:
 
        // Steal work from other P's.
        procs := uint32(gomaxprocs)
-       if atomic.Load(&sched.npidle) == procs-1 {
-               // Either GOMAXPROCS=1 or everybody, except for us, is idle already.
-               // New work can appear from returning syscall/cgocall, network or timers.
-               // Neither of that submits to local run queues, so no point in stealing.
-               goto stop
-       }
+       ranTimer := false
        // If number of spinning M's >= number of busy P's, block.
        // This is necessary to prevent excessive CPU consumption
        // when GOMAXPROCS>>1 but the program parallelism is low.
@@ -2288,11 +2286,48 @@ top:
                                goto top
                        }
                        stealRunNextG := i > 2 // first look for ready queues with more than 1 g
-                       if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
+                       p2 := allp[enum.position()]
+                       if _p_ == p2 {
+                               continue
+                       }
+                       if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
                                return gp, false
                        }
+
+                       // Consider stealing timers from p2.
+                       // This call to checkTimers is the only place where
+                       // we hold a lock on a different P's timers.
+                       // Lock contention can be a problem here, so avoid
+                       // grabbing the lock if p2 is running and not marked
+                       // for preemption. If p2 is running and not being
+                       // preempted we assume it will handle its own timers.
+                       if i > 2 && shouldStealTimers(p2) {
+                               tnow, w, ran := checkTimers(p2, now)
+                               now = tnow
+                               if w != 0 && (pollUntil == 0 || w < pollUntil) {
+                                       pollUntil = w
+                               }
+                               if ran {
+                                       // Running the timers may have
+                                       // made an arbitrary number of G's
+                                       // ready and added them to this P's
+                                       // local run queue. That invalidates
+                                       // the assumption of runqsteal
+                                       // that is always has room to add
+                                       // stolen G's. So check now if there
+                                       // is a local G to run.
+                                       if gp, inheritTime := runqget(_p_); gp != nil {
+                                               return gp, inheritTime
+                                       }
+                                       ranTimer = true
+                               }
+                       }
                }
        }
+       if ranTimer {
+               // Running a timer may have made some goroutine ready.
+               goto top
+       }
 
 stop:
 
@@ -2309,6 +2344,12 @@ stop:
                return gp, false
        }
 
+       delta := int64(-1)
+       if pollUntil != 0 {
+               // checkTimers ensures that polluntil > now.
+               delta = pollUntil - now
+       }
+
        // wasm only:
        // If a callback returned and no other goroutine is awake,
        // then pause execution until a callback was triggered.
@@ -2400,14 +2441,16 @@ stop:
        }
 
        // poll network
-       if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
+       if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
+               atomic.Store64(&sched.pollUntil, uint64(pollUntil))
                if _g_.m.p != 0 {
                        throw("findrunnable: netpoll with p")
                }
                if _g_.m.spinning {
                        throw("findrunnable: netpoll with spinning")
                }
-               list := netpoll(-1) // block until new work is available
+               list := netpoll(delta) // block until new work is available
+               atomic.Store64(&sched.pollUntil, 0)
                atomic.Store64(&sched.lastpoll, uint64(nanotime()))
                lock(&sched.lock)
                _p_ = pidleget()
@@ -2431,6 +2474,11 @@ stop:
                        }
                        goto top
                }
+       } else if pollUntil != 0 && netpollinited() {
+               pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
+               if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
+                       netpollBreak()
+               }
        }
        stopm()
        goto top
@@ -2457,6 +2505,22 @@ func pollWork() bool {
        return false
 }
 
+// wakeNetPoller wakes up the thread sleeping in the network poller,
+// if there is one, and if it isn't going to wake up anyhow before
+// the when argument.
+func wakeNetPoller(when int64) {
+       if atomic.Load64(&sched.lastpoll) == 0 {
+               // In findrunnable we ensure that when polling the pollUntil
+               // field is either zero or the time to which the current
+               // poll is expected to run. This can have a spurious wakeup
+               // but should never miss a wakeup.
+               pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
+               if pollerPollUntil == 0 || pollerPollUntil > when {
+                       netpollBreak()
+               }
+       }
+}
+
 func resetspinning() {
        _g_ := getg()
        if !_g_.m.spinning {
@@ -2525,10 +2589,20 @@ top:
                gcstopm()
                goto top
        }
-       if _g_.m.p.ptr().runSafePointFn != 0 {
+       pp := _g_.m.p.ptr()
+       if pp.runSafePointFn != 0 {
                runSafePointFn()
        }
 
+       // Sanity check: if we are spinning, the run queue should be empty.
+       // Check this before calling checkTimers, as that might call
+       // goready to put a ready goroutine on the local run queue.
+       if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
+               throw("schedule: spinning with local work")
+       }
+
+       checkTimers(pp, 0)
+
        var gp *g
        var inheritTime bool
 
@@ -2560,9 +2634,8 @@ top:
        }
        if gp == nil {
                gp, inheritTime = runqget(_g_.m.p.ptr())
-               if gp != nil && _g_.m.spinning {
-                       throw("schedule: spinning with local work")
-               }
+               // We can see gp != nil here even if the M is spinning,
+               // if checkTimers added a local goroutine via goready.
        }
        if gp == nil {
                gp, inheritTime = findrunnable() // blocks until work is available
@@ -2623,6 +2696,60 @@ func dropg() {
        setGNoWB(&_g_.m.curg, nil)
 }
 
+// checkTimers runs any timers for the P that are ready.
+// If now is not 0 it is the current time.
+// It returns the current time or 0 if it is not known,
+// and the time when the next timer should run or 0 if there is no next timer,
+// and reports whether it ran any timers.
+// If the time when the next timer should run is not 0,
+// it is always larger than the returned time.
+// We pass now in and out to avoid extra calls of nanotime.
+//go:yeswritebarrierrec
+func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
+       lock(&pp.timersLock)
+
+       adjusttimers(pp)
+
+       rnow = now
+       if len(pp.timers) > 0 {
+               if rnow == 0 {
+                       rnow = nanotime()
+               }
+               for len(pp.timers) > 0 {
+                       if tw := runtimer(pp, rnow); tw != 0 {
+                               if tw > 0 {
+                                       pollUntil = tw
+                               }
+                               break
+                       }
+                       ran = true
+               }
+       }
+
+       unlock(&pp.timersLock)
+
+       return rnow, pollUntil, ran
+}
+
+// shouldStealTimers reports whether we should try stealing the timers from p2.
+// We don't steal timers from a running P that is not marked for preemption,
+// on the assumption that it will run its own timers. This reduces
+// contention on the timers lock.
+func shouldStealTimers(p2 *p) bool {
+       if p2.status != _Prunning {
+               return true
+       }
+       mp := p2.m.ptr()
+       if mp == nil || mp.locks > 0 {
+               return false
+       }
+       gp := mp.curg
+       if gp == nil || gp.atomicstatus != _Grunning || !gp.preempt {
+               return false
+       }
+       return true
+}
+
 func parkunlock_c(gp *g, lock unsafe.Pointer) bool {
        unlock((*mutex)(lock))
        return true
@@ -4305,6 +4432,13 @@ func checkdead() {
                return
        }
 
+       // There are no goroutines running, so we can look at the P's.
+       for _, _p_ := range allp {
+               if len(_p_.timers) > 0 {
+                       return
+               }
+       }
+
        getg().m.throwing = -1 // do not dump full stacks
        throw("all goroutines are asleep - deadlock!")
 }
@@ -4392,6 +4526,12 @@ func sysmon() {
                                incidlelocked(1)
                        }
                }
+               if timeSleepUntil() < now {
+                       // There are timers that should have already run,
+                       // perhaps because there is an unpreemptible P.
+                       // Try to start an M to run them.
+                       startm(nil, false)
+               }
                // retake P's blocked in syscalls
                // and preempt long running G's
                if retake(now) != 0 {
index dd399e00a69d505f0adc61f417ad2d0d9739a38a..f44cd2fb1436c316107a5cb7688ac2eb6568d4f7 100644 (file)
@@ -598,13 +598,23 @@ type p struct {
 
        runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point
 
+       // Lock for timers. We normally access the timers while running
+       // on this P, but the scheduler can also do it from a different P.
+       timersLock mutex
+
+       // Actions to take at some time. This is used to implement the
+       // standard library's time package.
+       // Must hold timersLock to access.
+       timers []*timer
+
        pad cpu.CacheLinePad
 }
 
 type schedt struct {
        // accessed atomically. keep at top to ensure alignment on 32-bit systems.
-       goidgen  uint64
-       lastpoll uint64
+       goidgen   uint64
+       lastpoll  uint64 // time of last network poll, 0 if currently polling
+       pollUntil uint64 // time to which current poll is sleeping
 
        lock mutex
 
index 5521b8a807e4dd023b3e35aa253e0a6705427ced..1bbb5684cbc06681e858f82a6ef48e38a1dfc489 100644 (file)
@@ -325,6 +325,27 @@ func timerproc(tb *timersBucket) {
        }
 }
 
+// adjusttimers looks through the timers in the current P's heap for
+// any timers that have been modified to run earlier, and puts them in
+// the correct place in the heap.
+// The caller must have locked the timers for pp.
+func adjusttimers(pp *p) {
+       if len(pp.timers) == 0 {
+               return
+       }
+       throw("adjusttimers: not yet implemented")
+}
+
+// runtimer examines the first timer in timers. If it is ready based on now,
+// it runs the timer and removes or updates it.
+// Returns 0 if it ran a timer, -1 if there are no more timers, or the time
+// when the first timer should run.
+// The caller must have locked the timers for pp.
+func runtimer(pp *p, now int64) int64 {
+       throw("runtimer: not yet implemented")
+       return -1
+}
+
 func timejump() *g {
        if faketime == 0 {
                return nil