This new facility will be used by future CLs in this series.
Change the only blocking call to netpoll to do the right thing when
netpoll returns an empty list.
Updates #6239
Updates #27707
Change-Id: I58b3c2903eda61a3698b1a4729ed0e81382bb1ed
Reviewed-on: https://go-review.googlesource.com/c/go/+/171821
Run-TryBot: Ian Lance Taylor <iant@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Michael Knyszek <mknyszek@google.com>
Reviewed-by: Emmanuel Odeke <emm.odeke@gmail.com>
_EBADF = 0x9
_EFAULT = 0xe
_EAGAIN = 0xb
+ _ETIME = 0x3e
_ETIMEDOUT = 0x91
_EWOULDBLOCK = 0xb
_EINPROGRESS = 0x96
EBADF = C.EBADF
EFAULT = C.EFAULT
EAGAIN = C.EAGAIN
+ ETIME = C.ETIME
ETIMEDOUT = C.ETIMEDOUT
EWOULDBLOCK = C.EWOULDBLOCK
EINPROGRESS = C.EINPROGRESS
unlock(&mtxset)
}
+// netpoll checks for ready network connections.
+// Returns list of goroutines that become runnable.
+// delay < 0: blocks indefinitely
+// delay == 0: does not block, just polls
+// delay > 0: block for up to that many nanoseconds
//go:nowritebarrierrec
-func netpoll(block bool) gList {
- timeout := ^uintptr(0)
- if !block {
- timeout = 0
+func netpoll(delay int64) gList {
+ var timeout uintptr
+ if delay < 0 {
+ timeout = ^uintptr(0)
+ } else if delay == 0 {
+ // TODO: call poll with timeout == 0
return gList{}
+ } else if delay < 1e6 {
+ timeout = 1
+ } else if delay < 1e15 {
+ timeout = uintptr(delay / 1e6)
+ } else {
+ // An arbitrary cap on how long to wait for a timer.
+ // 1e9 ms == ~11.5 days.
+ timeout = 1e9
}
retry:
lock(&mtxpoll)
throw("poll failed")
}
unlock(&mtxset)
+ // If a timed sleep was interrupted, just return to
+ // recalculate how long we should sleep now.
+ if timeout > 0 {
+ return gList{}
+ }
goto retry
}
// Check if some descriptors need to be changed
}
}
unlock(&mtxset)
- if block && toRun.empty() {
- goto retry
- }
return toRun
}
throw("runtime: unused")
}
-// polls for ready network connections
-// returns list of goroutines that become runnable
-func netpoll(block bool) gList {
+// netpoll checks for ready network connections.
+// Returns list of goroutines that become runnable.
+// delay < 0: blocks indefinitely
+// delay == 0: does not block, just polls
+// delay > 0: block for up to that many nanoseconds
+func netpoll(delay int64) gList {
if epfd == -1 {
return gList{}
}
- waitms := int32(-1)
- if !block {
+ var waitms int32
+ if delay < 0 {
+ waitms = -1
+ } else if delay == 0 {
waitms = 0
+ } else if delay < 1e6 {
+ waitms = 1
+ } else if delay < 1e15 {
+ waitms = int32(delay / 1e6)
+ } else {
+ // An arbitrary cap on how long to wait for a timer.
+ // 1e9 ms == ~11.5 days.
+ waitms = 1e9
}
var events [128]epollevent
retry:
println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed")
}
+ // If a timed sleep was interrupted, just return to
+ // recalculate how long we should sleep now.
+ if waitms > 0 {
+ return gList{}
+ }
goto retry
}
var toRun gList
netpollready(&toRun, pd, mode)
}
}
- if block && toRun.empty() {
- goto retry
- }
return toRun
}
func netpollarm(pd *pollDesc, mode int) {
}
-func netpoll(block bool) gList {
+func netpoll(delay int64) gList {
return gList{}
}
throw("runtime: unused")
}
-// Polls for ready network connections.
+// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
-func netpoll(block bool) gList {
+// delay < 0: blocks indefinitely
+// delay == 0: does not block, just polls
+// delay > 0: block for up to that many nanoseconds
+func netpoll(delay int64) gList {
if kq == -1 {
return gList{}
}
var tp *timespec
var ts timespec
- if !block {
+ if delay < 0 {
+ tp = nil
+ } else if delay == 0 {
+ tp = &ts
+ } else {
+ ts.setNsec(delay)
+ if ts.tv_sec > 1e6 {
+ // Darwin returns EINVAL if the sleep time is too long.
+ ts.tv_sec = 1e6
+ }
tp = &ts
}
var events [64]keventt
println("runtime: kevent on fd", kq, "failed with", -n)
throw("runtime: netpoll failed")
}
+ // If a timed sleep was interrupted, just return to
+ // recalculate how long we should sleep now.
+ if delay > 0 {
+ return gList{}
+ }
goto retry
}
var toRun gList
netpollready(&toRun, pd, mode)
}
}
- if block && toRun.empty() {
- goto retry
- }
return toRun
}
unlock(&pd.lock)
}
-// polls for ready network connections
-// returns list of goroutines that become runnable
-func netpoll(block bool) gList {
+// netpoll checks for ready network connections.
+// Returns list of goroutines that become runnable.
+// delay < 0: blocks indefinitely
+// delay == 0: does not block, just polls
+// delay > 0: block for up to that many nanoseconds
+func netpoll(delay int64) gList {
if portfd == -1 {
return gList{}
}
var wait *timespec
- var zero timespec
- if !block {
- wait = &zero
+ var ts timespec
+ if delay < 0 {
+ wait = nil
+ } else if delay == 0 {
+ wait = &ts
+ } else {
+ ts.setNsec(delay)
+ if ts.tv_sec > 1e6 {
+ // An arbitrary cap on how long to wait for a timer.
+ // 1e6 s == ~11.5 days.
+ ts.tv_sec = 1e6
+ }
+ wait = &ts
}
var events [128]portevent
retry:
var n uint32 = 1
if port_getn(portfd, &events[0], uint32(len(events)), &n, wait) < 0 {
- if e := errno(); e != _EINTR {
+ if e := errno(); e != _EINTR && e != _ETIME {
print("runtime: port_getn on fd ", portfd, " failed (errno=", e, ")\n")
throw("runtime: netpoll failed")
}
+ // If a timed sleep was interrupted, just return to
+ // recalculate how long we should sleep now.
+ if delay > 0 {
+ return gList{}
+ }
goto retry
}
}
}
- if block && toRun.empty() {
- goto retry
- }
return toRun
}
// Polls for ready network connections.
// Returns list of goroutines that become runnable.
-func netpoll(block bool) gList {
+func netpoll(delay int64) gList {
// Implementation for platforms that do not support
// integrated network poller.
return gList{}
throw("runtime: unused")
}
-// Polls for completed network IO.
+// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
-func netpoll(block bool) gList {
+// delay < 0: blocks indefinitely
+// delay == 0: does not block, just polls
+// delay > 0: block for up to that many nanoseconds
+func netpoll(delay int64) gList {
var entries [64]overlappedEntry
var wait, qty, key, flags, n, i uint32
var errno int32
if iocphandle == _INVALID_HANDLE_VALUE {
return gList{}
}
- wait = 0
- if block {
+ if delay < 0 {
wait = _INFINITE
+ } else if delay == 0 {
+ wait = 0
+ } else if delay < 1e6 {
+ wait = 1
+ } else if delay < 1e15 {
+ wait = uint32(delay / 1e6)
+ } else {
+ // An arbitrary cap on how long to wait for a timer.
+ // 1e9 ms == ~11.5 days.
+ wait = 1e9
}
-retry:
+
if _GetQueuedCompletionStatusEx != nil {
n = uint32(len(entries) / int(gomaxprocs))
if n < 8 {
n = 8
}
- if block {
+ if delay != 0 {
mp.blocked = true
}
if stdcall6(_GetQueuedCompletionStatusEx, iocphandle, uintptr(unsafe.Pointer(&entries[0])), uintptr(n), uintptr(unsafe.Pointer(&n)), uintptr(wait), 0) == 0 {
mp.blocked = false
errno = int32(getlasterror())
- if !block && errno == _WAIT_TIMEOUT {
+ if errno == _WAIT_TIMEOUT {
return gList{}
}
println("runtime: GetQueuedCompletionStatusEx failed (errno=", errno, ")")
op = nil
errno = 0
qty = 0
- if block {
+ if delay != 0 {
mp.blocked = true
}
if stdcall5(_GetQueuedCompletionStatus, iocphandle, uintptr(unsafe.Pointer(&qty)), uintptr(unsafe.Pointer(&key)), uintptr(unsafe.Pointer(&op)), uintptr(wait)) == 0 {
mp.blocked = false
errno = int32(getlasterror())
- if !block && errno == _WAIT_TIMEOUT {
+ if errno == _WAIT_TIMEOUT {
return gList{}
}
if op == nil {
mp.blocked = false
handlecompletion(&toRun, op, errno, qty)
}
- if block && toRun.empty() {
- goto retry
- }
return toRun
}
func startTheWorldWithSema(emitTraceEvent bool) int64 {
mp := acquirem() // disable preemption because it can be holding p in a local var
if netpollinited() {
- list := netpoll(false) // non-blocking
+ list := netpoll(0) // non-blocking
injectglist(&list)
}
lock(&sched.lock)
// not set lastpoll yet), this thread will do blocking netpoll below
// anyway.
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
- if list := netpoll(false); !list.empty() { // non-blocking
+ if list := netpoll(0); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if _g_.m.spinning {
throw("findrunnable: netpoll with spinning")
}
- list := netpoll(true) // block until new work is available
+ list := netpoll(-1) // block until new work is available
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
- if !list.empty() {
- lock(&sched.lock)
- _p_ = pidleget()
- unlock(&sched.lock)
- if _p_ != nil {
- acquirep(_p_)
+ lock(&sched.lock)
+ _p_ = pidleget()
+ unlock(&sched.lock)
+ if _p_ == nil {
+ injectglist(&list)
+ } else {
+ acquirep(_p_)
+ if !list.empty() {
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
}
return gp, false
}
- injectglist(&list)
+ if wasSpinning {
+ _g_.m.spinning = true
+ atomic.Xadd(&sched.nmspinning, 1)
+ }
+ goto top
}
}
stopm()
return true
}
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && sched.lastpoll != 0 {
- if list := netpoll(false); !list.empty() {
+ if list := netpoll(0); !list.empty() {
injectglist(&list)
return true
}
now := nanotime()
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
- list := netpoll(false) // non-blocking - returns list of goroutines
+ list := netpoll(0) // non-blocking - returns list of goroutines
if !list.empty() {
// Need to decrement number of idle locked M's
// (pretending that one more is running) before injectglist.