The new netpollBreak function can be used to interrupt a blocking netpoll.
This function is not currently used; it will be used by later CLs.
Updates #27707
Change-Id: I5cb936609ba13c3c127ea1368a49194fc58c9f4d
Reviewed-on: https://go-review.googlesource.com/c/go/+/171824
Run-TryBot: Ian Lance Taylor <iant@golang.org>
Reviewed-by: Michael Knyszek <mknyszek@google.com>
const (
_EINTR = 0x4
_EFAULT = 0xe
+ _EAGAIN = 0x23
_ENOSYS = 0x4e
_O_NONBLOCK = 0x4
const (
_EINTR = 0x4
_EFAULT = 0xe
+ _EAGAIN = 0x23
_ENOSYS = 0x4e
_O_NONBLOCK = 0x4
const (
_EINTR = 0x4
_EFAULT = 0xe
+ _EAGAIN = 0x23
_ENOSYS = 0x4e
_O_NONBLOCK = 0x4
const (
_EINTR = 0x4
_EFAULT = 0xe
+ _EAGAIN = 0x23
_ENOSYS = 0x4e
_O_NONBLOCK = 0x4
_EBADF = 0x9
_EFAULT = 0xe
_EAGAIN = 0xb
+ _EBUSY = 0x10
_ETIME = 0x3e
_ETIMEDOUT = 0x91
_EWOULDBLOCK = 0xb
_POLLHUP = 0x10
_POLLERR = 0x8
- _PORT_SOURCE_FD = 0x4
+ _PORT_SOURCE_FD = 0x4
+ _PORT_SOURCE_ALERT = 0x5
+ _PORT_ALERT_UPDATE = 0x2
)
type semt struct {
const (
EINTR = C.EINTR
EFAULT = C.EFAULT
+ EAGAIN = C.EAGAIN
ETIMEDOUT = C.ETIMEDOUT
PROT_NONE = C.PROT_NONE
const (
_EINTR = 0x4
_EFAULT = 0xe
+ _EAGAIN = 0x23
_ETIMEDOUT = 0x3c
_PROT_NONE = 0x0
const (
_EINTR = 0x4
_EFAULT = 0xe
+ _EAGAIN = 0x23
_ETIMEDOUT = 0x3c
_PROT_NONE = 0x0
const (
_EINTR = 0x4
_EFAULT = 0xe
+ _EAGAIN = 0x23
_ETIMEDOUT = 0x3c
_PROT_NONE = 0x0
const (
_EINTR = 0x4
_EFAULT = 0xe
+ _EAGAIN = 0x23
_ETIMEDOUT = 0x3c
_PROT_NONE = 0x0
const (
EINTR = C.EINTR
EFAULT = C.EFAULT
+ EAGAIN = C.EAGAIN
ENOSYS = C.ENOSYS
O_NONBLOCK = C.O_NONBLOCK
const (
_EINTR = 0x4
_EFAULT = 0xe
+ _EAGAIN = 0x23
_ENOSYS = 0x4e
_O_NONBLOCK = 0x4
const (
_EINTR = 0x4
_EFAULT = 0xe
+ _EAGAIN = 0x23
_ENOSYS = 0x4e
_O_NONBLOCK = 0x4
const (
_EINTR = 0x4
_EFAULT = 0xe
+ _EAGAIN = 0x23
_ENOSYS = 0x4e
_O_NONBLOCK = 0x4
const (
EINTR = C.EINTR
EFAULT = C.EFAULT
+ EAGAIN = C.EAGAIN
ENOSYS = C.ENOSYS
O_NONBLOCK = C.O_NONBLOCK
const (
EINTR = C.EINTR
EFAULT = C.EFAULT
+ EAGAIN = C.EAGAIN
ENOSYS = C.ENOSYS
O_NONBLOCK = C.O_NONBLOCK
const (
_EINTR = 0x4
_EFAULT = 0xe
+ _EAGAIN = 0x23
_ENOSYS = 0x4e
_O_NONBLOCK = 0x4
const (
_EINTR = 0x4
_EFAULT = 0xe
+ _EAGAIN = 0x23
_ENOSYS = 0x4e
_O_NONBLOCK = 0x4
const (
_EINTR = 0x4
_EFAULT = 0xe
+ _EAGAIN = 0x23
_ENOSYS = 0x4e
_O_NONBLOCK = 0x4
const (
_EINTR = 0x4
_EFAULT = 0xe
+ _EAGAIN = 0x23
_ENOSYS = 0x4e
_O_NONBLOCK = 0x4
EBADF = C.EBADF
EFAULT = C.EFAULT
EAGAIN = C.EAGAIN
+ EBUSY = C.EBUSY
ETIME = C.ETIME
ETIMEDOUT = C.ETIMEDOUT
EWOULDBLOCK = C.EWOULDBLOCK
POLLHUP = C.POLLHUP
POLLERR = C.POLLERR
- PORT_SOURCE_FD = C.PORT_SOURCE_FD
+ PORT_SOURCE_FD = C.PORT_SOURCE_FD
+ PORT_SOURCE_ALERT = C.PORT_SOURCE_ALERT
+ PORT_ALERT_UPDATE = C.PORT_ALERT_UPDATE
)
type SemT C.sem_t
var Atoi32 = atoi32
var Nanotime = nanotime
+var Netpoll = netpoll
+var NetpollBreak = netpollBreak
+var Usleep = usleep
var PhysHugePageSize = physHugePageSize
)
// Integrated network poller (platform-independent part).
-// A particular implementation (epoll/kqueue) must define the following functions:
-// func netpollinit() // to initialize the poller
-// func netpollopen(fd uintptr, pd *pollDesc) int32 // to arm edge-triggered notifications
-// and associate fd with pd.
-// An implementation must call the following function to denote that the pd is ready.
-// func netpollready(gpp **g, pd *pollDesc, mode int32)
+// A particular implementation (epoll/kqueue/port/AIX/Windows)
+// must define the following functions:
+//
+// func netpollinit()
+// Initialize the poller. Only called once.
+//
+// func netpollopen(fd uintptr, pd *pollDesc) int32
+// Arm edge-triggered notifications for fd. The pd argument is to pass
+// back to netpollready when fd is ready. Return an errno value.
+//
+// func netpoll(delta int64) gList
+// Poll the network. If delta < 0, block indefinitely. If delta == 0,
+// poll without blocking. If delta > 0, block for up to delta nanoseconds.
+// Return a list of goroutines built by calling netpollready.
+//
+// func netpollBreak()
+// Wake up the network poller, assumed to be blocked in netpoll.
+//
+// func netpollIsPollDescriptor(fd uintptr) bool
+// Reports whether fd is a file descriptor used by the poller.
// pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
// goroutines respectively. The semaphore can be in the following states:
// poll_runtime_isPollServerDescriptor reports whether fd is a
// descriptor being used by netpoll.
func poll_runtime_isPollServerDescriptor(fd uintptr) bool {
- fds := netpolldescriptor()
- if GOOS != "aix" {
- return fd == fds
- } else {
- // AIX have a pipe in its netpoll implementation.
- // Therefore, two fd are returned by netpolldescriptor using a mask.
- return fd == fds&0xFFFF || fd == (fds>>16)&0xFFFF
- }
+ return netpollIsPollDescriptor(fd)
}
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
}
}
-// make pd ready, newly runnable goroutines (if any) are added to toRun.
-// May run during STW, so write barriers are not allowed.
+// netpollready is called by the platform-specific netpoll function.
+// It declares that the fd associated with pd is ready for I/O.
+// The toRun argument is used to build a list of goroutines to return
+// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
+// whether the fd is ready for reading or writing or both.
+//
+// This may run while the world is stopped, so write barriers are not allowed.
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
pds[0] = nil
}
-func netpolldescriptor() uintptr {
- // Both fd must be returned
- if rdwake > 0xFFFF || wrwake > 0xFFFF {
- throw("netpolldescriptor: invalid fd number")
- }
- return uintptr(rdwake<<16 | wrwake)
+func netpollIsPollDescriptor(fd uintptr) bool {
+ return fd == uintptr(rdwake) || fd == uintptr(wrwake)
}
// netpollwakeup writes on wrwake to wakeup poll before any changes.
unlock(&mtxset)
}
+// netpollBreak interrupts an epollwait.
+func netpollBreak() {
+ netpollwakeup()
+}
+
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
}
// Check if some descriptors need to be changed
if n != 0 && pfds[0].revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 {
- var b [1]byte
- for read(rdwake, unsafe.Pointer(&b[0]), 1) == 1 {
+ if delay != 0 {
+ // A netpollwakeup could be picked up by a
+ // non-blocking poll. Only clear the wakeup
+ // if blocking.
+ var b [1]byte
+ for read(rdwake, unsafe.Pointer(&b[0]), 1) == 1 {
+ }
}
// Do not look at the other fds in this case as the mode may have changed
// XXX only additions of flags are made, so maybe it is ok
var (
epfd int32 = -1 // epoll descriptor
+
+ netpollBreakRd, netpollBreakWr uintptr // for netpollBreak
)
func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
- if epfd >= 0 {
- return
- }
- epfd = epollcreate(1024)
- if epfd >= 0 {
+ if epfd < 0 {
+ epfd = epollcreate(1024)
+ if epfd < 0 {
+ println("runtime: epollcreate failed with", -epfd)
+ throw("runtime: netpollinit failed")
+ }
closeonexec(epfd)
- return
}
- println("runtime: epollcreate failed with", -epfd)
- throw("runtime: netpollinit failed")
+ r, w, errno := nonblockingPipe()
+ if errno != 0 {
+ println("runtime: pipe failed with", -errno)
+ throw("runtime: pipe failed")
+ }
+ ev := epollevent{
+ events: _EPOLLIN,
+ }
+ *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
+ errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
+ if errno != 0 {
+ println("runtime: epollctl failed with", -errno)
+ throw("runtime: epollctl failed")
+ }
+ netpollBreakRd = uintptr(r)
+ netpollBreakWr = uintptr(w)
}
-func netpolldescriptor() uintptr {
- return uintptr(epfd)
+func netpollIsPollDescriptor(fd uintptr) bool {
+ return fd == uintptr(epfd) || fd == netpollBreakRd || fd == netpollBreakWr
}
func netpollopen(fd uintptr, pd *pollDesc) int32 {
throw("runtime: unused")
}
+// netpollBreak interrupts an epollwait.
+func netpollBreak() {
+ for {
+ var b byte
+ n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
+ if n == 1 {
+ break
+ }
+ if n == -_EINTR {
+ continue
+ }
+ if n == -_EAGAIN {
+ return
+ }
+ println("runtime: netpollBreak write failed with", -n)
+ throw("runtime: netpollBreak write failed")
+ }
+}
+
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
if ev.events == 0 {
continue
}
+
+ if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
+ if ev.events != _EPOLLIN {
+ println("runtime: netpoll: break fd ready for", ev.events)
+ throw("runtime: netpoll: break fd ready for something unexpected")
+ }
+ if delay != 0 {
+ // netpollBreak could be picked up by a
+ // nonblocking poll. Only read the byte
+ // if blocking.
+ var tmp [16]byte
+ read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
+ }
+ continue
+ }
+
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
func netpollinit() {
}
-func netpolldescriptor() uintptr {
- return ^uintptr(0)
+func netpollIsPollDescriptor(fd uintptr) bool {
+ return false
}
func netpollopen(fd uintptr, pd *pollDesc) int32 {
func netpollarm(pd *pollDesc, mode int) {
}
+func netpollBreak() {
+}
+
func netpoll(delay int64) gList {
return gList{}
}
var (
kq int32 = -1
+
+ netpollBreakRd, netpollBreakWr uintptr // for netpollBreak
)
func netpollinit() {
throw("runtime: netpollinit failed")
}
closeonexec(kq)
+ r, w, errno := nonblockingPipe()
+ if errno != 0 {
+ println("runtime: pipe failed with", -errno)
+ throw("runtime: pipe failed")
+ }
+ ev := keventt{
+ filter: _EVFILT_READ,
+ flags: _EV_ADD,
+ }
+ *(*uintptr)(unsafe.Pointer(&ev.ident)) = uintptr(r)
+ n := kevent(kq, &ev, 1, nil, 0, nil)
+ if n < 0 {
+ println("runtime: kevent failed with", -errno)
+ throw("runtime: kevent failed")
+ }
+ netpollBreakRd = uintptr(r)
+ netpollBreakWr = uintptr(w)
}
-func netpolldescriptor() uintptr {
- return uintptr(kq)
+func netpollIsPollDescriptor(fd uintptr) bool {
+ return fd == uintptr(kq) || fd == netpollBreakRd || fd == netpollBreakWr
}
func netpollopen(fd uintptr, pd *pollDesc) int32 {
throw("runtime: unused")
}
+// netpollBreak interrupts an epollwait.
+func netpollBreak() {
+ for {
+ var b byte
+ n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
+ if n == 1 || n == -_EAGAIN {
+ break
+ }
+ if n == -_EINTR {
+ continue
+ }
+ println("runtime: netpollBreak write failed with", -n)
+ throw("runtime: netpollBreak write failed")
+ }
+}
+
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
var toRun gList
for i := 0; i < int(n); i++ {
ev := &events[i]
+
+ if uintptr(ev.ident) == netpollBreakRd {
+ if ev.filter != _EVFILT_READ {
+ println("runtime: netpoll: break fd ready for", ev.filter)
+ throw("runtime: netpoll: break fd ready for something unexpected")
+ }
+ if delay != 0 {
+ // netpollBreak could be picked up by a
+ // nonblocking poll. Only read the byte
+ // if blocking.
+ var tmp [16]byte
+ read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
+ }
+ continue
+ }
+
var mode int32
switch ev.filter {
case _EVFILT_READ:
//go:cgo_import_dynamic libc_port_associate port_associate "libc.so"
//go:cgo_import_dynamic libc_port_dissociate port_dissociate "libc.so"
//go:cgo_import_dynamic libc_port_getn port_getn "libc.so"
+//go:cgo_import_dynamic libc_port_alert port_alert "libc.so"
//go:linkname libc_port_create libc_port_create
//go:linkname libc_port_associate libc_port_associate
//go:linkname libc_port_dissociate libc_port_dissociate
//go:linkname libc_port_getn libc_port_getn
+//go:linkname libc_port_alert libc_port_alert
var (
libc_port_create,
libc_port_associate,
libc_port_dissociate,
- libc_port_getn libcFunc
+ libc_port_getn,
+ libc_port_alert libcFunc
)
func errno() int32 {
return int32(sysvicall5(&libc_port_getn, uintptr(port), uintptr(unsafe.Pointer(evs)), uintptr(max), uintptr(unsafe.Pointer(nget)), uintptr(unsafe.Pointer(timeout))))
}
+func port_alert(port int32, flags, events uint32, user uintptr) int32 {
+ return int32(sysvicall4(&libc_port_alert, uintptr(port), uintptr(flags), uintptr(events), user))
+}
+
var portfd int32 = -1
func netpollinit() {
throw("runtime: netpollinit failed")
}
-func netpolldescriptor() uintptr {
- return uintptr(portfd)
+func netpollIsPollDescriptor(fd uintptr) bool {
+ return fd == uintptr(portfd)
}
func netpollopen(fd uintptr, pd *pollDesc) int32 {
unlock(&pd.lock)
}
+// netpollBreak interrupts a port_getn wait.
+func netpollBreak() {
+ // Use port_alert to put portfd into alert mode.
+ // This will wake up all threads sleeping in port_getn on portfd,
+ // and cause their calls to port_getn to return immediately.
+ // Further, until portfd is taken out of alert mode,
+ // all calls to port_getn will return immediately.
+ if port_alert(portfd, _PORT_ALERT_UPDATE, _POLLHUP, uintptr(unsafe.Pointer(&portfd))) < 0 {
+ if e := errno(); e != _EBUSY {
+ println("runtime: port_alert failed with", e)
+ throw("runtime: netpoll: port_alert failed")
+ }
+ }
+}
+
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
for i := 0; i < int(n); i++ {
ev := &events[i]
+ if ev.portev_source == _PORT_SOURCE_ALERT {
+ if ev.portev_events != _POLLHUP || unsafe.Pointer(ev.portev_user) != unsafe.Pointer(&portfd) {
+ throw("runtime: netpoll: bad port_alert wakeup")
+ }
+ if delay != 0 {
+ // Now that a blocking call to netpoll
+ // has seen the alert, take portfd
+ // back out of alert mode.
+ // See the comment in netpollBreak.
+ if port_alert(portfd, 0, 0, 0) < 0 {
+ e := errno()
+ println("runtime: port_alert failed with", e)
+ throw("runtime: netpoll: port_alert failed")
+ }
+ }
+ continue
+ }
+
if ev.portev_events == 0 {
continue
}
package runtime
+import "runtime/internal/atomic"
+
var netpollWaiters uint32
+var netpollStubLock mutex
+var netpollNote note
+var netpollBroken uint32
+
+func netpollBreak() {
+ if atomic.Cas(&netpollBroken, 0, 1) {
+ notewakeup(&netpollNote)
+ }
+}
+
// Polls for ready network connections.
// Returns list of goroutines that become runnable.
func netpoll(delay int64) gList {
// Implementation for platforms that do not support
// integrated network poller.
+ if delay != 0 {
+ noteclear(&netpollNote)
+ atomic.Store(&netpollBroken, 0)
+ notetsleep(&netpollNote, delay)
+ }
return gList{}
}
}
}
-func netpolldescriptor() uintptr {
- return iocphandle
+func netpollIsPollDescriptor(fd uintptr) bool {
+ return fd == iocphandle
}
func netpollopen(fd uintptr, pd *pollDesc) int32 {
throw("runtime: unused")
}
+func netpollBreak() {
+ if stdcall4(_PostQueuedCompletionStatus, iocphandle, 0, 0, 0) == 0 {
+ println("runtime: netpoll: PostQueuedCompletionStatus failed (errno=", getlasterror(), ")")
+ throw("runtime: netpoll: PostQueuedCompletionStatus failed")
+ }
+}
+
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
mp.blocked = false
for i = 0; i < n; i++ {
op = entries[i].op
- errno = 0
- qty = 0
- if stdcall5(_WSAGetOverlappedResult, op.pd.fd, uintptr(unsafe.Pointer(op)), uintptr(unsafe.Pointer(&qty)), 0, uintptr(unsafe.Pointer(&flags))) == 0 {
- errno = int32(getlasterror())
+ if op != nil {
+ errno = 0
+ qty = 0
+ if stdcall5(_WSAGetOverlappedResult, op.pd.fd, uintptr(unsafe.Pointer(op)), uintptr(unsafe.Pointer(&qty)), 0, uintptr(unsafe.Pointer(&flags))) == 0 {
+ errno = int32(getlasterror())
+ }
+ handlecompletion(&toRun, op, errno, qty)
+ } else {
+ if delay == 0 {
+ // Forward the notification to the
+ // blocked poller.
+ netpollBreak()
+ }
}
- handlecompletion(&toRun, op, errno, qty)
}
} else {
op = nil
// dequeued failed IO packet, so report that
}
mp.blocked = false
+ if op == nil {
+ if delay == 0 {
+ // Forward the notification to the
+ // blocked poller.
+ netpollBreak()
+ }
+ return gList{}
+ }
handlecompletion(&toRun, op, errno, qty)
}
return toRun
// From <sys/lwp.h>
_LWP_DETACHED = 0x00000040
-
- _EAGAIN = 35
)
type mOS struct {
const (
_ESRCH = 3
- _EAGAIN = 35
_EWOULDBLOCK = _EAGAIN
_ENOTSUP = 91
//go:cgo_import_dynamic runtime._GetThreadContext GetThreadContext%2 "kernel32.dll"
//go:cgo_import_dynamic runtime._LoadLibraryW LoadLibraryW%1 "kernel32.dll"
//go:cgo_import_dynamic runtime._LoadLibraryA LoadLibraryA%1 "kernel32.dll"
+//go:cgo_import_dynamic runtime._PostQueuedCompletionStatus PostQueuedCompletionStatus%4 "kernel32.dll"
//go:cgo_import_dynamic runtime._ResumeThread ResumeThread%1 "kernel32.dll"
//go:cgo_import_dynamic runtime._SetConsoleCtrlHandler SetConsoleCtrlHandler%2 "kernel32.dll"
//go:cgo_import_dynamic runtime._SetErrorMode SetErrorMode%1 "kernel32.dll"
_GetThreadContext,
_LoadLibraryW,
_LoadLibraryA,
+ _PostQueuedCompletionStatus,
_QueryPerformanceCounter,
_QueryPerformanceFrequency,
_ResumeThread,
func TestGetgThreadSwitch(t *testing.T) {
runtime.RunGetgThreadSwitchTest()
}
+
+// TestNetpollBreak tests that netpollBreak can break a netpoll.
+// This test is not particularly safe since the call to netpoll
+// will pick up any stray files that are ready, but it should work
+// OK as long it is not run in parallel.
+func TestNetpollBreak(t *testing.T) {
+ if runtime.GOMAXPROCS(0) == 1 {
+ t.Skip("skipping: GOMAXPROCS=1")
+ }
+
+ // Make sure that netpoll is initialized.
+ time.Sleep(1)
+
+ start := time.Now()
+ c := make(chan bool, 2)
+ go func() {
+ c <- true
+ runtime.Netpoll(10 * time.Second.Nanoseconds())
+ c <- true
+ }()
+ <-c
+ // Loop because the break might get eaten by the scheduler.
+ // Break twice to break both the netpoll we started and the
+ // scheduler netpoll.
+loop:
+ for {
+ runtime.Usleep(100)
+ runtime.NetpollBreak()
+ runtime.NetpollBreak()
+ select {
+ case <-c:
+ break loop
+ default:
+ }
+ }
+ if dur := time.Since(start); dur > 5*time.Second {
+ t.Errorf("netpollBreak did not interrupt netpoll: slept for: %v", dur)
+ }
+}