]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: adjust netpollWaiters after goroutines are ready
authorIan Lance Taylor <iant@golang.org>
Wed, 19 Jul 2023 23:09:35 +0000 (16:09 -0700)
committerGopher Robot <gobot@golang.org>
Thu, 20 Jul 2023 15:45:57 +0000 (15:45 +0000)
The runtime was adjusting netpollWaiters before the waiting
goroutines were marked as ready. This could cause the scheduler
to report a deadlock because there were no goroutines ready to run.
Keeping netpollWaiters non-zero ensures that at least one goroutine
will call netpoll(-1) from findRunnable.

This does mean that if a program has network activity for a while
and then never has it again, and also has no timers, then we can leave
an M stranded in a call to netpoll from which it will never return.
At least this won't be a common case. And it's not new; this has been
a potential problem for some time.

Fixes #61454

Change-Id: I17c7f891c2bb1262fda12c6929664e64686463c8
Reviewed-on: https://go-review.googlesource.com/c/go/+/511455
TryBot-Result: Gopher Robot <gobot@golang.org>
Run-TryBot: Ian Lance Taylor <iant@golang.org>
Reviewed-by: Michael Knyszek <mknyszek@google.com>
Auto-Submit: Ian Lance Taylor <iant@golang.org>
Reviewed-by: Heschi Kreinick <heschi@google.com>
src/runtime/netpoll.go
src/runtime/netpoll_aix.go
src/runtime/netpoll_epoll.go
src/runtime/netpoll_fake.go
src/runtime/netpoll_kqueue.go
src/runtime/netpoll_solaris.go
src/runtime/netpoll_stub.go
src/runtime/netpoll_wasip1.go
src/runtime/netpoll_windows.go
src/runtime/proc.go

index 6877b2c350f0456102b9065ee9724d4af1fd96ce..9c2e40ce8a569da34a7899d8e68b5f31ea83fdc7 100644 (file)
@@ -26,10 +26,12 @@ import (
 // func netpollclose(fd uintptr) int32
 //     Disable notifications for fd. Return an errno value.
 //
-// func netpoll(delta int64) gList
+// func netpoll(delta int64) (gList, int32)
 //     Poll the network. If delta < 0, block indefinitely. If delta == 0,
 //     poll without blocking. If delta > 0, block for up to delta nanoseconds.
-//     Return a list of goroutines built by calling netpollready.
+//     Return a list of goroutines built by calling netpollready,
+//     and a delta to add to netpollWaiters when all goroutines are ready.
+//     This will never return an empty list with a non-zero delta.
 //
 // func netpollBreak()
 //     Wake up the network poller, assumed to be blocked in netpoll.
@@ -426,12 +428,13 @@ func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
        }
        // If we set the new deadline in the past, unblock currently pending IO if any.
        // Note that pd.publishInfo has already been called, above, immediately after modifying rd and wd.
+       delta := int32(0)
        var rg, wg *g
        if pd.rd < 0 {
-               rg = netpollunblock(pd, 'r', false)
+               rg = netpollunblock(pd, 'r', false, &delta)
        }
        if pd.wd < 0 {
-               wg = netpollunblock(pd, 'w', false)
+               wg = netpollunblock(pd, 'w', false, &delta)
        }
        unlock(&pd.lock)
        if rg != nil {
@@ -440,6 +443,7 @@ func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
        if wg != nil {
                netpollgoready(wg, 3)
        }
+       netpollAdjustWaiters(delta)
 }
 
 //go:linkname poll_runtime_pollUnblock internal/poll.runtime_pollUnblock
@@ -453,8 +457,9 @@ func poll_runtime_pollUnblock(pd *pollDesc) {
        pd.wseq++
        var rg, wg *g
        pd.publishInfo()
-       rg = netpollunblock(pd, 'r', false)
-       wg = netpollunblock(pd, 'w', false)
+       delta := int32(0)
+       rg = netpollunblock(pd, 'r', false, &delta)
+       wg = netpollunblock(pd, 'w', false, &delta)
        if pd.rt.f != nil {
                deltimer(&pd.rt)
                pd.rt.f = nil
@@ -470,6 +475,7 @@ func poll_runtime_pollUnblock(pd *pollDesc) {
        if wg != nil {
                netpollgoready(wg, 3)
        }
+       netpollAdjustWaiters(delta)
 }
 
 // netpollready is called by the platform-specific netpoll function.
@@ -478,16 +484,19 @@ func poll_runtime_pollUnblock(pd *pollDesc) {
 // from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
 // whether the fd is ready for reading or writing or both.
 //
+// This returns a delta to apply to netpollWaiters.
+//
 // This may run while the world is stopped, so write barriers are not allowed.
 //
 //go:nowritebarrier
-func netpollready(toRun *gList, pd *pollDesc, mode int32) {
+func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
+       delta := int32(0)
        var rg, wg *g
        if mode == 'r' || mode == 'r'+'w' {
-               rg = netpollunblock(pd, 'r', true)
+               rg = netpollunblock(pd, 'r', true, &delta)
        }
        if mode == 'w' || mode == 'r'+'w' {
-               wg = netpollunblock(pd, 'w', true)
+               wg = netpollunblock(pd, 'w', true, &delta)
        }
        if rg != nil {
                toRun.push(rg)
@@ -495,6 +504,7 @@ func netpollready(toRun *gList, pd *pollDesc, mode int32) {
        if wg != nil {
                toRun.push(wg)
        }
+       return delta
 }
 
 func netpollcheckerr(pd *pollDesc, mode int32) int {
@@ -520,7 +530,7 @@ func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
                // Bump the count of goroutines waiting for the poller.
                // The scheduler uses this to decide whether to block
                // waiting for the poller if there is nothing else to do.
-               netpollWaiters.Add(1)
+               netpollAdjustWaiters(1)
        }
        return r
 }
@@ -570,7 +580,13 @@ func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
        return old == pdReady
 }
 
-func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
+// netpollunblock moves either pd.rg (if mode == 'r') or
+// pd.wg (if mode == 'w') into the pdReady state.
+// This returns any goroutine blocked on pd.{rg,wg}.
+// It adds any adjustment to netpollWaiters to *delta;
+// this adjustment should be applied after the goroutine has
+// been marked ready.
+func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {
        gpp := &pd.rg
        if mode == 'w' {
                gpp = &pd.wg
@@ -594,7 +610,7 @@ func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
                        if old == pdWait {
                                old = pdNil
                        } else if old != pdNil {
-                               netpollWaiters.Add(-1)
+                               *delta -= 1
                        }
                        return (*g)(unsafe.Pointer(old))
                }
@@ -614,6 +630,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
                unlock(&pd.lock)
                return
        }
+       delta := int32(0)
        var rg *g
        if read {
                if pd.rd <= 0 || pd.rt.f == nil {
@@ -621,7 +638,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
                }
                pd.rd = -1
                pd.publishInfo()
-               rg = netpollunblock(pd, 'r', false)
+               rg = netpollunblock(pd, 'r', false, &delta)
        }
        var wg *g
        if write {
@@ -630,7 +647,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
                }
                pd.wd = -1
                pd.publishInfo()
-               wg = netpollunblock(pd, 'w', false)
+               wg = netpollunblock(pd, 'w', false, &delta)
        }
        unlock(&pd.lock)
        if rg != nil {
@@ -639,6 +656,7 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
        if wg != nil {
                netpollgoready(wg, 0)
        }
+       netpollAdjustWaiters(delta)
 }
 
 func netpollDeadline(arg any, seq uintptr) {
@@ -653,6 +671,18 @@ func netpollWriteDeadline(arg any, seq uintptr) {
        netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
 }
 
+// netpollAnyWaiters reports whether any goroutines are waiting for I/O.
+func netpollAnyWaiters() bool {
+       return netpollWaiters.Load() > 0
+}
+
+// netpollAdjustWaiters adds delta to netpollWaiters.
+func netpollAdjustWaiters(delta int32) {
+       if delta != 0 {
+               netpollWaiters.Add(delta)
+       }
+}
+
 func (c *pollCache) alloc() *pollDesc {
        lock(&c.lock)
        if c.first == nil {
index fad976b9324f134e4c2dc695a91614471cee020e..a34b4d8bcfba88301fa84407bc8956c6ee0e1953 100644 (file)
@@ -154,13 +154,13 @@ func netpollBreak() {
 // delay > 0: block for up to that many nanoseconds
 //
 //go:nowritebarrierrec
-func netpoll(delay int64) gList {
+func netpoll(delay int64) (gList, int32) {
        var timeout uintptr
        if delay < 0 {
                timeout = ^uintptr(0)
        } else if delay == 0 {
                // TODO: call poll with timeout == 0
-               return gList{}
+               return gList{}, 0
        } else if delay < 1e6 {
                timeout = 1
        } else if delay < 1e15 {
@@ -186,7 +186,7 @@ retry:
                // If a timed sleep was interrupted, just return to
                // recalculate how long we should sleep now.
                if timeout > 0 {
-                       return gList{}
+                       return gList{}, 0
                }
                goto retry
        }
@@ -206,6 +206,7 @@ retry:
                n--
        }
        var toRun gList
+       delta := int32(0)
        for i := 1; i < len(pfds) && n > 0; i++ {
                pfd := &pfds[i]
 
@@ -220,10 +221,10 @@ retry:
                }
                if mode != 0 {
                        pds[i].setEventErr(pfd.revents == _POLLERR, 0)
-                       netpollready(&toRun, pds[i], mode)
+                       delta += netpollready(&toRun, pds[i], mode)
                        n--
                }
        }
        unlock(&mtxset)
-       return toRun
+       return toRun, delta
 }
index e29b64dc9c5d18611522cfa7f2d1ab550b132853..cda19fbc278790de815aad1616f33a76a65ec401 100644 (file)
@@ -95,9 +95,9 @@ func netpollBreak() {
 // delay < 0: blocks indefinitely
 // delay == 0: does not block, just polls
 // delay > 0: block for up to that many nanoseconds
-func netpoll(delay int64) gList {
+func netpoll(delay int64) (gList, int32) {
        if epfd == -1 {
-               return gList{}
+               return gList{}, 0
        }
        var waitms int32
        if delay < 0 {
@@ -124,11 +124,12 @@ retry:
                // If a timed sleep was interrupted, just return to
                // recalculate how long we should sleep now.
                if waitms > 0 {
-                       return gList{}
+                       return gList{}, 0
                }
                goto retry
        }
        var toRun gList
+       delta := int32(0)
        for i := int32(0); i < n; i++ {
                ev := events[i]
                if ev.Events == 0 {
@@ -164,9 +165,9 @@ retry:
                        tag := tp.tag()
                        if pd.fdseq.Load() == tag {
                                pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
-                               netpollready(&toRun, pd, mode)
+                               delta += netpollready(&toRun, pd, mode)
                        }
                }
        }
-       return toRun
+       return toRun, delta
 }
index 5319561779158d0d96bc3aa36799c2119af14417..41f86a85e3b2b2a946edd16c51242a404ef8657a 100644 (file)
@@ -30,6 +30,6 @@ func netpollarm(pd *pollDesc, mode int) {
 func netpollBreak() {
 }
 
-func netpoll(delay int64) gList {
-       return gList{}
+func netpoll(delay int64) (gList, int32) {
+       return gList{}, 0
 }
index 3af45e689261639053c8e9f2ffe07f142a1e72ef..33b9815965068d29fe4be2a862f1d46026e8a9a1 100644 (file)
@@ -118,9 +118,9 @@ func netpollBreak() {
 // delay < 0: blocks indefinitely
 // delay == 0: does not block, just polls
 // delay > 0: block for up to that many nanoseconds
-func netpoll(delay int64) gList {
+func netpoll(delay int64) (gList, int32) {
        if kq == -1 {
-               return gList{}
+               return gList{}, 0
        }
        var tp *timespec
        var ts timespec
@@ -147,11 +147,12 @@ retry:
                // If a timed sleep was interrupted, just return to
                // recalculate how long we should sleep now.
                if delay > 0 {
-                       return gList{}
+                       return gList{}, 0
                }
                goto retry
        }
        var toRun gList
+       delta := int32(0)
        for i := 0; i < int(n); i++ {
                ev := &events[i]
 
@@ -208,8 +209,8 @@ retry:
                                }
                        }
                        pd.setEventErr(ev.flags == _EV_ERROR, tag)
-                       netpollready(&toRun, pd, mode)
+                       delta += netpollready(&toRun, pd, mode)
                }
        }
-       return toRun
+       return toRun, delta
 }
index 13c7ffc2ca3a3721fe26bcc1ab66ff86ee40aa38..41f145c86692eedc4b0a52fc9c81569d01201644 100644 (file)
@@ -219,9 +219,9 @@ func netpollBreak() {
 // delay < 0: blocks indefinitely
 // delay == 0: does not block, just polls
 // delay > 0: block for up to that many nanoseconds
-func netpoll(delay int64) gList {
+func netpoll(delay int64) (gList, int32) {
        if portfd == -1 {
-               return gList{}
+               return gList{}, 0
        }
 
        var wait *timespec
@@ -259,12 +259,13 @@ retry:
                // If a timed sleep was interrupted and there are no events,
                // just return to recalculate how long we should sleep now.
                if delay > 0 {
-                       return gList{}
+                       return gList{}, 0
                }
                goto retry
        }
 
        var toRun gList
+       delta := int32(0)
        for i := 0; i < int(n); i++ {
                ev := &events[i]
 
@@ -324,9 +325,9 @@ retry:
                        // about the event port on SmartOS.
                        //
                        // See golang.org/x/issue/30840.
-                       netpollready(&toRun, pd, mode)
+                       delta += netpollready(&toRun, pd, mode)
                }
        }
 
-       return toRun
+       return toRun, delta
 }
index 14cf0c327fd7588122cb2153476cdd6e8e94abcf..d950661acf3dd12520ae8de3cf8080ac16aa5563 100644 (file)
@@ -9,7 +9,6 @@ package runtime
 import "runtime/internal/atomic"
 
 var netpollInited atomic.Uint32
-var netpollWaiters atomic.Uint32
 
 var netpollStubLock mutex
 var netpollNote note
@@ -34,7 +33,7 @@ func netpollBreak() {
 
 // Polls for ready network connections.
 // Returns list of goroutines that become runnable.
-func netpoll(delay int64) gList {
+func netpoll(delay int64) (gList, int32) {
        // Implementation for platforms that do not support
        // integrated network poller.
        if delay != 0 {
@@ -53,9 +52,16 @@ func netpoll(delay int64) gList {
                // (eg when running TestNetpollBreak).
                osyield()
        }
-       return gList{}
+       return gList{}, 0
 }
 
 func netpollinited() bool {
        return netpollInited.Load() != 0
 }
+
+func netpollAnyWaiters() bool {
+       return false
+}
+
+func netpollAdjustWaiters(delta int32) {
+}
index 677287b30f0ca7e6ccb385963542ab6c4be717ca..9903726809f8909dd75612d8d450aed0f44b3064 100644 (file)
@@ -184,7 +184,7 @@ func netpollclose(fd uintptr) int32 {
 
 func netpollBreak() {}
 
-func netpoll(delay int64) gList {
+func netpoll(delay int64) (gList, int32) {
        lock(&mtx)
 
        // If delay >= 0, we include a subscription of type Clock that we use as
@@ -201,7 +201,7 @@ func netpoll(delay int64) gList {
 
        if len(pollsubs) == 0 {
                unlock(&mtx)
-               return gList{}
+               return gList{}, 0
        }
 
        evts = evts[:len(pollsubs)]
@@ -221,12 +221,13 @@ retry:
                // recalculate how long we should sleep now.
                if delay > 0 {
                        unlock(&mtx)
-                       return gList{}
+                       return gList{}, 0
                }
                goto retry
        }
 
        var toRun gList
+       delta := int32(0)
        for i := 0; i < int(nevents); i++ {
                e := &evts[i]
                if e.typ == eventtypeClock {
@@ -245,10 +246,10 @@ retry:
                        pd := (*pollDesc)(unsafe.Pointer(uintptr(e.userdata)))
                        netpolldisarm(pd, mode)
                        pd.setEventErr(e.error != 0, 0)
-                       netpollready(&toRun, pd, mode)
+                       delta += netpollready(&toRun, pd, mode)
                }
        }
 
        unlock(&mtx)
-       return toRun
+       return toRun, delta
 }
index bb77d8d045efc51fb2ee61227383b4a1c8ec5b67..484a9e85b2d61113dc45247424ef3e149a69f777 100644 (file)
@@ -84,7 +84,7 @@ func netpollBreak() {
 // delay < 0: blocks indefinitely
 // delay == 0: does not block, just polls
 // delay > 0: block for up to that many nanoseconds
-func netpoll(delay int64) gList {
+func netpoll(delay int64) (gList, int32) {
        var entries [64]overlappedEntry
        var wait, qty, flags, n, i uint32
        var errno int32
@@ -94,7 +94,7 @@ func netpoll(delay int64) gList {
        mp := getg().m
 
        if iocphandle == _INVALID_HANDLE_VALUE {
-               return gList{}
+               return gList{}, 0
        }
        if delay < 0 {
                wait = _INFINITE
@@ -121,12 +121,13 @@ func netpoll(delay int64) gList {
                mp.blocked = false
                errno = int32(getlasterror())
                if errno == _WAIT_TIMEOUT {
-                       return gList{}
+                       return gList{}, 0
                }
                println("runtime: GetQueuedCompletionStatusEx failed (errno=", errno, ")")
                throw("runtime: netpoll failed")
        }
        mp.blocked = false
+       delta := int32(0)
        for i = 0; i < n; i++ {
                op = entries[i].op
                if op != nil && op.pd == entries[i].key {
@@ -135,7 +136,7 @@ func netpoll(delay int64) gList {
                        if stdcall5(_WSAGetOverlappedResult, op.pd.fd, uintptr(unsafe.Pointer(op)), uintptr(unsafe.Pointer(&qty)), 0, uintptr(unsafe.Pointer(&flags))) == 0 {
                                errno = int32(getlasterror())
                        }
-                       handlecompletion(&toRun, op, errno, qty)
+                       delta += handlecompletion(&toRun, op, errno, qty)
                } else {
                        netpollWakeSig.Store(0)
                        if delay == 0 {
@@ -145,10 +146,10 @@ func netpoll(delay int64) gList {
                        }
                }
        }
-       return toRun
+       return toRun, delta
 }
 
-func handlecompletion(toRun *gList, op *net_op, errno int32, qty uint32) {
+func handlecompletion(toRun *gList, op *net_op, errno int32, qty uint32) int32 {
        mode := op.mode
        if mode != 'r' && mode != 'w' {
                println("runtime: GetQueuedCompletionStatusEx returned invalid mode=", mode)
@@ -156,5 +157,5 @@ func handlecompletion(toRun *gList, op *net_op, errno int32, qty uint32) {
        }
        op.errno = errno
        op.qty = qty
-       netpollready(toRun, op.pd, mode)
+       return netpollready(toRun, op.pd, mode)
 }
index 9fd200ea329021ddd131e03afaae5855dcfc9edd..f4af26f172dd27bf344fd298c67628f418e0d3ad 100644 (file)
@@ -1435,8 +1435,9 @@ func startTheWorldWithSema() int64 {
 
        mp := acquirem() // disable preemption because it can be holding p in a local var
        if netpollinited() {
-               list := netpoll(0) // non-blocking
+               list, delta := netpoll(0) // non-blocking
                injectglist(&list)
+               netpollAdjustWaiters(delta)
        }
        lock(&sched.lock)
 
@@ -2974,10 +2975,11 @@ top:
        // blocked thread (e.g. it has already returned from netpoll, but does
        // not set lastpoll yet), this thread will do blocking netpoll below
        // anyway.
-       if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 {
-               if list := netpoll(0); !list.empty() { // non-blocking
+       if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 {
+               if list, delta := netpoll(0); !list.empty() { // non-blocking
                        gp := list.pop()
                        injectglist(&list)
+                       netpollAdjustWaiters(delta)
                        casgstatus(gp, _Gwaiting, _Grunnable)
                        if traceEnabled() {
                                traceGoUnpark(gp, 0)
@@ -3166,7 +3168,7 @@ top:
        }
 
        // Poll network until next timer.
-       if netpollinited() && (netpollWaiters.Load() > 0 || pollUntil != 0) && sched.lastpoll.Swap(0) != 0 {
+       if netpollinited() && (netpollAnyWaiters() || pollUntil != 0) && sched.lastpoll.Swap(0) != 0 {
                sched.pollUntil.Store(pollUntil)
                if mp.p != 0 {
                        throw("findrunnable: netpoll with p")
@@ -3188,7 +3190,7 @@ top:
                        // When using fake time, just poll.
                        delay = 0
                }
-               list := netpoll(delay) // block until new work is available
+               list, delta := netpoll(delay) // block until new work is available
                // Refresh now again, after potentially blocking.
                now = nanotime()
                sched.pollUntil.Store(0)
@@ -3204,11 +3206,13 @@ top:
                unlock(&sched.lock)
                if pp == nil {
                        injectglist(&list)
+                       netpollAdjustWaiters(delta)
                } else {
                        acquirep(pp)
                        if !list.empty() {
                                gp := list.pop()
                                injectglist(&list)
+                               netpollAdjustWaiters(delta)
                                casgstatus(gp, _Gwaiting, _Grunnable)
                                if traceEnabled() {
                                        traceGoUnpark(gp, 0)
@@ -3242,9 +3246,10 @@ func pollWork() bool {
        if !runqempty(p) {
                return true
        }
-       if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 {
-               if list := netpoll(0); !list.empty() {
+       if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 {
+               if list, delta := netpoll(0); !list.empty() {
                        injectglist(&list)
+                       netpollAdjustWaiters(delta)
                        return true
                }
        }
@@ -5596,7 +5601,7 @@ func sysmon() {
                lastpoll := sched.lastpoll.Load()
                if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
                        sched.lastpoll.CompareAndSwap(lastpoll, now)
-                       list := netpoll(0) // non-blocking - returns list of goroutines
+                       list, delta := netpoll(0) // non-blocking - returns list of goroutines
                        if !list.empty() {
                                // Need to decrement number of idle locked M's
                                // (pretending that one more is running) before injectglist.
@@ -5608,6 +5613,7 @@ func sysmon() {
                                incidlelocked(-1)
                                injectglist(&list)
                                incidlelocked(1)
+                               netpollAdjustWaiters(delta)
                        }
                }
                if GOOS == "netbsd" && needSysmonWorkaround {