]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: add netpollBreak
authorIan Lance Taylor <iant@golang.org>
Fri, 5 Apr 2019 22:53:12 +0000 (15:53 -0700)
committerIan Lance Taylor <iant@golang.org>
Mon, 21 Oct 2019 16:37:45 +0000 (16:37 +0000)
The new netpollBreak function can be used to interrupt a blocking netpoll.
This function is not currently used; it will be used by later CLs.

Updates #27707

Change-Id: I5cb936609ba13c3c127ea1368a49194fc58c9f4d
Reviewed-on: https://go-review.googlesource.com/c/go/+/171824
Run-TryBot: Ian Lance Taylor <iant@golang.org>
Reviewed-by: Michael Knyszek <mknyszek@google.com>
34 files changed:
src/runtime/defs1_netbsd_386.go
src/runtime/defs1_netbsd_amd64.go
src/runtime/defs1_netbsd_arm.go
src/runtime/defs1_netbsd_arm64.go
src/runtime/defs1_solaris_amd64.go
src/runtime/defs_darwin.go
src/runtime/defs_darwin_386.go
src/runtime/defs_darwin_amd64.go
src/runtime/defs_darwin_arm.go
src/runtime/defs_darwin_arm64.go
src/runtime/defs_freebsd.go
src/runtime/defs_freebsd_386.go
src/runtime/defs_freebsd_amd64.go
src/runtime/defs_freebsd_arm.go
src/runtime/defs_netbsd.go
src/runtime/defs_openbsd.go
src/runtime/defs_openbsd_386.go
src/runtime/defs_openbsd_amd64.go
src/runtime/defs_openbsd_arm.go
src/runtime/defs_openbsd_arm64.go
src/runtime/defs_solaris.go
src/runtime/export_test.go
src/runtime/netpoll.go
src/runtime/netpoll_aix.go
src/runtime/netpoll_epoll.go
src/runtime/netpoll_fake.go
src/runtime/netpoll_kqueue.go
src/runtime/netpoll_solaris.go
src/runtime/netpoll_stub.go
src/runtime/netpoll_windows.go
src/runtime/os_netbsd.go
src/runtime/os_openbsd.go
src/runtime/os_windows.go
src/runtime/proc_test.go

index da48cc84c27dd80fe7994a178a4340650e305f1e..a4548e6f0629bcb7b505c20c22c48bc25159fd8b 100644 (file)
@@ -6,6 +6,7 @@ package runtime
 const (
        _EINTR  = 0x4
        _EFAULT = 0xe
+       _EAGAIN = 0x23
        _ENOSYS = 0x4e
 
        _O_NONBLOCK = 0x4
index 0b25b8da7c44514452452d7eb7ebfa0101ad1b57..4b0e79ebb6fc4cf9fcab999fee104775e8fcda31 100644 (file)
@@ -6,6 +6,7 @@ package runtime
 const (
        _EINTR  = 0x4
        _EFAULT = 0xe
+       _EAGAIN = 0x23
        _ENOSYS = 0x4e
 
        _O_NONBLOCK = 0x4
index 4738b546d17f0ed49acdc2db11416959e2c9d1fb..2b5d5990d3618bfb978b329139ad274608d9551c 100644 (file)
@@ -6,6 +6,7 @@ package runtime
 const (
        _EINTR  = 0x4
        _EFAULT = 0xe
+       _EAGAIN = 0x23
        _ENOSYS = 0x4e
 
        _O_NONBLOCK = 0x4
index 14c07d17040db2ecf0c1abbd47e5d8a31b7ecbfb..740dc77658f9170f60409004a1f1d7c8293f9356 100644 (file)
@@ -6,6 +6,7 @@ package runtime
 const (
        _EINTR  = 0x4
        _EFAULT = 0xe
+       _EAGAIN = 0x23
        _ENOSYS = 0x4e
 
        _O_NONBLOCK = 0x4
index 14b5c7949e407bc029e5ba88c0afd5239d0c1459..ee6c45e5242b6d8b97313ceaef33c61767182643 100644 (file)
@@ -8,6 +8,7 @@ const (
        _EBADF       = 0x9
        _EFAULT      = 0xe
        _EAGAIN      = 0xb
+       _EBUSY       = 0x10
        _ETIME       = 0x3e
        _ETIMEDOUT   = 0x91
        _EWOULDBLOCK = 0xb
@@ -100,7 +101,9 @@ const (
        _POLLHUP = 0x10
        _POLLERR = 0x8
 
-       _PORT_SOURCE_FD = 0x4
+       _PORT_SOURCE_FD    = 0x4
+       _PORT_SOURCE_ALERT = 0x5
+       _PORT_ALERT_UPDATE = 0x2
 )
 
 type semt struct {
index 0cd133f6e02755c1fa74bda5367aa72fd3fb0f3c..de1489f032632b86140421ea09117a6c5731e8a8 100644 (file)
@@ -30,6 +30,7 @@ import "C"
 const (
        EINTR     = C.EINTR
        EFAULT    = C.EFAULT
+       EAGAIN    = C.EAGAIN
        ETIMEDOUT = C.ETIMEDOUT
 
        PROT_NONE  = C.PROT_NONE
index 83928e784151558a57f5e5a215e807e656f4e14b..a78f54bcf56c1de960fcd67cfe8c487843a869c9 100644 (file)
@@ -8,6 +8,7 @@ import "unsafe"
 const (
        _EINTR     = 0x4
        _EFAULT    = 0xe
+       _EAGAIN    = 0x23
        _ETIMEDOUT = 0x3c
 
        _PROT_NONE  = 0x0
index 45c34a8fc04fbd5f0d7cc72a92a3b43566d75a3a..cbc26bfcffa681305abb8ffaf70319a1470b2c68 100644 (file)
@@ -8,6 +8,7 @@ import "unsafe"
 const (
        _EINTR     = 0x4
        _EFAULT    = 0xe
+       _EAGAIN    = 0x23
        _ETIMEDOUT = 0x3c
 
        _PROT_NONE  = 0x0
index 5e2af978a7b4cccce7fdbf86edda3d4fa6eed64c..199886aad1f200c0f2154bf8620c6a0eda512a40 100644 (file)
@@ -10,6 +10,7 @@ import "unsafe"
 const (
        _EINTR     = 0x4
        _EFAULT    = 0xe
+       _EAGAIN    = 0x23
        _ETIMEDOUT = 0x3c
 
        _PROT_NONE  = 0x0
index f673eb7b24b031897196145398aea6b307645a09..2f466045d4f26978d1b1789e26e933b6ca1d5905 100644 (file)
@@ -8,6 +8,7 @@ import "unsafe"
 const (
        _EINTR     = 0x4
        _EFAULT    = 0xe
+       _EAGAIN    = 0x23
        _ETIMEDOUT = 0x3c
 
        _PROT_NONE  = 0x0
index 700e06eb8094d7a0dd68a1f4c49f4184ca07f541..e196dff07698d0c9f43f75a0f08c0b05e8f340d2 100644 (file)
@@ -47,6 +47,7 @@ const (
 const (
        EINTR  = C.EINTR
        EFAULT = C.EFAULT
+       EAGAIN = C.EAGAIN
        ENOSYS = C.ENOSYS
 
        O_NONBLOCK = C.O_NONBLOCK
index c113eee34c87503495fb848da66df974eee5eda4..6294fc32d4ccb96029dcb8a9c685388f0b435045 100644 (file)
@@ -15,6 +15,7 @@ const (
 const (
        _EINTR  = 0x4
        _EFAULT = 0xe
+       _EAGAIN = 0x23
        _ENOSYS = 0x4e
 
        _O_NONBLOCK = 0x4
index 9105cc392b1bb6c938095594cfb7aeb6fb9718f1..840c710eeb966388bc59744d6606b89e0264d98d 100644 (file)
@@ -15,6 +15,7 @@ const (
 const (
        _EINTR  = 0x4
        _EFAULT = 0xe
+       _EAGAIN = 0x23
        _ENOSYS = 0x4e
 
        _O_NONBLOCK = 0x4
index cf7ca696f4f7ddb0b0f032eb7f03819e90cda3c8..3307c8bbae3aa3c67c8030c28f39bebb670376de 100644 (file)
@@ -15,6 +15,7 @@ const (
 const (
        _EINTR  = 0x4
        _EFAULT = 0xe
+       _EAGAIN = 0x23
        _ENOSYS = 0x4e
 
        _O_NONBLOCK = 0x4
index 40eeb8c70f0fdca01017a8bf55af06ca2550a70a..3f5ce5adcac2ce3b4032e66d27c5b7d75f5d5a48 100644 (file)
@@ -32,6 +32,7 @@ import "C"
 const (
        EINTR  = C.EINTR
        EFAULT = C.EFAULT
+       EAGAIN = C.EAGAIN
        ENOSYS = C.ENOSYS
 
        O_NONBLOCK = C.O_NONBLOCK
index c425864b214fb3dfc65c10851c86762093a37bf5..4774e36c9234b29ab1f1f242b45c7c480581f43e 100644 (file)
@@ -28,6 +28,7 @@ import "C"
 const (
        EINTR  = C.EINTR
        EFAULT = C.EFAULT
+       EAGAIN = C.EAGAIN
        ENOSYS = C.ENOSYS
 
        O_NONBLOCK = C.O_NONBLOCK
index 0c89bf28ccf6398977f734eb70032411f651dce1..35f2e53fcf096e36093f8e8ef67c63c5ec1e3111 100644 (file)
@@ -8,6 +8,7 @@ import "unsafe"
 const (
        _EINTR  = 0x4
        _EFAULT = 0xe
+       _EAGAIN = 0x23
        _ENOSYS = 0x4e
 
        _O_NONBLOCK = 0x4
index 8d3523bf98cc6d0c607e73a7adf9ab201f008b95..c187a98ae0c0f1f823a094e4c21d43ce7c373780 100644 (file)
@@ -8,6 +8,7 @@ import "unsafe"
 const (
        _EINTR  = 0x4
        _EFAULT = 0xe
+       _EAGAIN = 0x23
        _ENOSYS = 0x4e
 
        _O_NONBLOCK = 0x4
index 8f5a5d6e9de16a7b6e813cff72cfb40667fb68e2..170bb3876c4abd7175ab195ff2828e6a2436a8ee 100644 (file)
@@ -8,6 +8,7 @@ import "unsafe"
 const (
        _EINTR  = 0x4
        _EFAULT = 0xe
+       _EAGAIN = 0x23
        _ENOSYS = 0x4e
 
        _O_NONBLOCK = 0x4
index c4ddefbd74d4e65c9186d5c2caa6cf64a8b42977..8b8d5cddf2eda9ff265f1d26a21599de41f0ca37 100644 (file)
@@ -5,6 +5,7 @@ import "unsafe"
 const (
        _EINTR  = 0x4
        _EFAULT = 0xe
+       _EAGAIN = 0x23
        _ENOSYS = 0x4e
 
        _O_NONBLOCK = 0x4
index b8ef12a1459d5e954a49832f34e0fb9779b38b5c..f42adebee37202b0bb37304c8bc37e5d40cd362b 100644 (file)
@@ -38,6 +38,7 @@ const (
        EBADF       = C.EBADF
        EFAULT      = C.EFAULT
        EAGAIN      = C.EAGAIN
+       EBUSY       = C.EBUSY
        ETIME       = C.ETIME
        ETIMEDOUT   = C.ETIMEDOUT
        EWOULDBLOCK = C.EWOULDBLOCK
@@ -129,7 +130,9 @@ const (
        POLLHUP = C.POLLHUP
        POLLERR = C.POLLERR
 
-       PORT_SOURCE_FD = C.PORT_SOURCE_FD
+       PORT_SOURCE_FD    = C.PORT_SOURCE_FD
+       PORT_SOURCE_ALERT = C.PORT_SOURCE_ALERT
+       PORT_ALERT_UPDATE = C.PORT_ALERT_UPDATE
 )
 
 type SemT C.sem_t
index e4a7faf96596067f922a6ea7e7ab73418a9d4776..42a456c7079bfb265fbc7a0a6d1eb5b9184717b9 100644 (file)
@@ -35,6 +35,9 @@ var Atoi = atoi
 var Atoi32 = atoi32
 
 var Nanotime = nanotime
+var Netpoll = netpoll
+var NetpollBreak = netpollBreak
+var Usleep = usleep
 
 var PhysHugePageSize = physHugePageSize
 
index adb072db38b6a985e1286857d3972694fa6fcb91..7d18dcaeeab3b1b70e210b02b15be5abda5e978c 100644 (file)
@@ -12,12 +12,26 @@ import (
 )
 
 // Integrated network poller (platform-independent part).
-// A particular implementation (epoll/kqueue) must define the following functions:
-// func netpollinit()                  // to initialize the poller
-// func netpollopen(fd uintptr, pd *pollDesc) int32    // to arm edge-triggered notifications
-// and associate fd with pd.
-// An implementation must call the following function to denote that the pd is ready.
-// func netpollready(gpp **g, pd *pollDesc, mode int32)
+// A particular implementation (epoll/kqueue/port/AIX/Windows)
+// must define the following functions:
+//
+// func netpollinit()
+//     Initialize the poller. Only called once.
+//
+// func netpollopen(fd uintptr, pd *pollDesc) int32
+//     Arm edge-triggered notifications for fd. The pd argument is to pass
+//     back to netpollready when fd is ready. Return an errno value.
+//
+// func netpoll(delta int64) gList
+//     Poll the network. If delta < 0, block indefinitely. If delta == 0,
+//     poll without blocking. If delta > 0, block for up to delta nanoseconds.
+//     Return a list of goroutines built by calling netpollready.
+//
+// func netpollBreak()
+//     Wake up the network poller, assumed to be blocked in netpoll.
+//
+// func netpollIsPollDescriptor(fd uintptr) bool
+//     Reports whether fd is a file descriptor used by the poller.
 
 // pollDesc contains 2 binary semaphores, rg and wg, to park reader and writer
 // goroutines respectively. The semaphore can be in the following states:
@@ -99,14 +113,7 @@ func netpollinited() bool {
 // poll_runtime_isPollServerDescriptor reports whether fd is a
 // descriptor being used by netpoll.
 func poll_runtime_isPollServerDescriptor(fd uintptr) bool {
-       fds := netpolldescriptor()
-       if GOOS != "aix" {
-               return fd == fds
-       } else {
-               // AIX have a pipe in its netpoll implementation.
-               // Therefore, two fd are returned by netpolldescriptor using a mask.
-               return fd == fds&0xFFFF || fd == (fds>>16)&0xFFFF
-       }
+       return netpollIsPollDescriptor(fd)
 }
 
 //go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
@@ -316,8 +323,13 @@ func poll_runtime_pollUnblock(pd *pollDesc) {
        }
 }
 
-// make pd ready, newly runnable goroutines (if any) are added to toRun.
-// May run during STW, so write barriers are not allowed.
+// netpollready is called by the platform-specific netpoll function.
+// It declares that the fd associated with pd is ready for I/O.
+// The toRun argument is used to build a list of goroutines to return
+// from netpoll. The mode argument is 'r', 'w', or 'r'+'w' to indicate
+// whether the fd is ready for reading or writing or both.
+//
+// This may run while the world is stopped, so write barriers are not allowed.
 //go:nowritebarrier
 func netpollready(toRun *gList, pd *pollDesc, mode int32) {
        var rg, wg *g
index 6feda27b80577b8318bcc05fd4013f94e27d1cb1..e1512f826cbc54e5620e7da0eed28ac83af95208 100644 (file)
@@ -63,12 +63,8 @@ func netpollinit() {
        pds[0] = nil
 }
 
-func netpolldescriptor() uintptr {
-       // Both fd must be returned
-       if rdwake > 0xFFFF || wrwake > 0xFFFF {
-               throw("netpolldescriptor: invalid fd number")
-       }
-       return uintptr(rdwake<<16 | wrwake)
+func netpollIsPollDescriptor(fd uintptr) bool {
+       return fd == uintptr(rdwake) || fd == uintptr(wrwake)
 }
 
 // netpollwakeup writes on wrwake to wakeup poll before any changes.
@@ -132,6 +128,11 @@ func netpollarm(pd *pollDesc, mode int) {
        unlock(&mtxset)
 }
 
+// netpollBreak interrupts an epollwait.
+func netpollBreak() {
+       netpollwakeup()
+}
+
 // netpoll checks for ready network connections.
 // Returns list of goroutines that become runnable.
 // delay < 0: blocks indefinitely
@@ -176,8 +177,13 @@ retry:
        }
        // Check if some descriptors need to be changed
        if n != 0 && pfds[0].revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 {
-               var b [1]byte
-               for read(rdwake, unsafe.Pointer(&b[0]), 1) == 1 {
+               if delay != 0 {
+                       // A netpollwakeup could be picked up by a
+                       // non-blocking poll. Only clear the wakeup
+                       // if blocking.
+                       var b [1]byte
+                       for read(rdwake, unsafe.Pointer(&b[0]), 1) == 1 {
+                       }
                }
                // Do not look at the other fds in this case as the mode may have changed
                // XXX only additions of flags are made, so maybe it is ok
index 73dfb4561ea68328a04e1217a1d8b3d231786f5f..b9dc18c939e9c3b644838a0207377d8d28c3f983 100644 (file)
@@ -20,24 +20,40 @@ func closeonexec(fd int32)
 
 var (
        epfd int32 = -1 // epoll descriptor
+
+       netpollBreakRd, netpollBreakWr uintptr // for netpollBreak
 )
 
 func netpollinit() {
        epfd = epollcreate1(_EPOLL_CLOEXEC)
-       if epfd >= 0 {
-               return
-       }
-       epfd = epollcreate(1024)
-       if epfd >= 0 {
+       if epfd < 0 {
+               epfd = epollcreate(1024)
+               if epfd < 0 {
+                       println("runtime: epollcreate failed with", -epfd)
+                       throw("runtime: netpollinit failed")
+               }
                closeonexec(epfd)
-               return
        }
-       println("runtime: epollcreate failed with", -epfd)
-       throw("runtime: netpollinit failed")
+       r, w, errno := nonblockingPipe()
+       if errno != 0 {
+               println("runtime: pipe failed with", -errno)
+               throw("runtime: pipe failed")
+       }
+       ev := epollevent{
+               events: _EPOLLIN,
+       }
+       *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
+       errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
+       if errno != 0 {
+               println("runtime: epollctl failed with", -errno)
+               throw("runtime: epollctl failed")
+       }
+       netpollBreakRd = uintptr(r)
+       netpollBreakWr = uintptr(w)
 }
 
-func netpolldescriptor() uintptr {
-       return uintptr(epfd)
+func netpollIsPollDescriptor(fd uintptr) bool {
+       return fd == uintptr(epfd) || fd == netpollBreakRd || fd == netpollBreakWr
 }
 
 func netpollopen(fd uintptr, pd *pollDesc) int32 {
@@ -56,6 +72,25 @@ func netpollarm(pd *pollDesc, mode int) {
        throw("runtime: unused")
 }
 
+// netpollBreak interrupts an epollwait.
+func netpollBreak() {
+       for {
+               var b byte
+               n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
+               if n == 1 {
+                       break
+               }
+               if n == -_EINTR {
+                       continue
+               }
+               if n == -_EAGAIN {
+                       return
+               }
+               println("runtime: netpollBreak write failed with", -n)
+               throw("runtime: netpollBreak write failed")
+       }
+}
+
 // netpoll checks for ready network connections.
 // Returns list of goroutines that become runnable.
 // delay < 0: blocks indefinitely
@@ -100,6 +135,22 @@ retry:
                if ev.events == 0 {
                        continue
                }
+
+               if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
+                       if ev.events != _EPOLLIN {
+                               println("runtime: netpoll: break fd ready for", ev.events)
+                               throw("runtime: netpoll: break fd ready for something unexpected")
+                       }
+                       if delay != 0 {
+                               // netpollBreak could be picked up by a
+                               // nonblocking poll. Only read the byte
+                               // if blocking.
+                               var tmp [16]byte
+                               read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
+                       }
+                       continue
+               }
+
                var mode int32
                if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
                        mode += 'r'
index 071d87ad50a62643fbfd9fb12dac60d60c70f856..b2af3b89b2c51c034d4e5271a5b2b11693e852e6 100644 (file)
@@ -12,8 +12,8 @@ package runtime
 func netpollinit() {
 }
 
-func netpolldescriptor() uintptr {
-       return ^uintptr(0)
+func netpollIsPollDescriptor(fd uintptr) bool {
+       return false
 }
 
 func netpollopen(fd uintptr, pd *pollDesc) int32 {
@@ -27,6 +27,9 @@ func netpollclose(fd uintptr) int32 {
 func netpollarm(pd *pollDesc, mode int) {
 }
 
+func netpollBreak() {
+}
+
 func netpoll(delay int64) gList {
        return gList{}
 }
index ce8da73d1e403cd2c98e4af86253e48632876b63..54586a393d65b0b649b7c9d55bf318fb7a7c2b27 100644 (file)
@@ -12,6 +12,8 @@ import "unsafe"
 
 var (
        kq int32 = -1
+
+       netpollBreakRd, netpollBreakWr uintptr // for netpollBreak
 )
 
 func netpollinit() {
@@ -21,10 +23,27 @@ func netpollinit() {
                throw("runtime: netpollinit failed")
        }
        closeonexec(kq)
+       r, w, errno := nonblockingPipe()
+       if errno != 0 {
+               println("runtime: pipe failed with", -errno)
+               throw("runtime: pipe failed")
+       }
+       ev := keventt{
+               filter: _EVFILT_READ,
+               flags:  _EV_ADD,
+       }
+       *(*uintptr)(unsafe.Pointer(&ev.ident)) = uintptr(r)
+       n := kevent(kq, &ev, 1, nil, 0, nil)
+       if n < 0 {
+               println("runtime: kevent failed with", -errno)
+               throw("runtime: kevent failed")
+       }
+       netpollBreakRd = uintptr(r)
+       netpollBreakWr = uintptr(w)
 }
 
-func netpolldescriptor() uintptr {
-       return uintptr(kq)
+func netpollIsPollDescriptor(fd uintptr) bool {
+       return fd == uintptr(kq) || fd == netpollBreakRd || fd == netpollBreakWr
 }
 
 func netpollopen(fd uintptr, pd *pollDesc) int32 {
@@ -57,6 +76,22 @@ func netpollarm(pd *pollDesc, mode int) {
        throw("runtime: unused")
 }
 
+// netpollBreak interrupts an epollwait.
+func netpollBreak() {
+       for {
+               var b byte
+               n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
+               if n == 1 || n == -_EAGAIN {
+                       break
+               }
+               if n == -_EINTR {
+                       continue
+               }
+               println("runtime: netpollBreak write failed with", -n)
+               throw("runtime: netpollBreak write failed")
+       }
+}
+
 // netpoll checks for ready network connections.
 // Returns list of goroutines that become runnable.
 // delay < 0: blocks indefinitely
@@ -98,6 +133,22 @@ retry:
        var toRun gList
        for i := 0; i < int(n); i++ {
                ev := &events[i]
+
+               if uintptr(ev.ident) == netpollBreakRd {
+                       if ev.filter != _EVFILT_READ {
+                               println("runtime: netpoll: break fd ready for", ev.filter)
+                               throw("runtime: netpoll: break fd ready for something unexpected")
+                       }
+                       if delay != 0 {
+                               // netpollBreak could be picked up by a
+                               // nonblocking poll. Only read the byte
+                               // if blocking.
+                               var tmp [16]byte
+                               read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
+                       }
+                       continue
+               }
+
                var mode int32
                switch ev.filter {
                case _EVFILT_READ:
index ad41ab5af2126b31db2b16bd88c1c405e583ee65..fac4829ed1bcbf7b9fe080d0f439fc565c204464 100644 (file)
@@ -71,17 +71,20 @@ import "unsafe"
 //go:cgo_import_dynamic libc_port_associate port_associate "libc.so"
 //go:cgo_import_dynamic libc_port_dissociate port_dissociate "libc.so"
 //go:cgo_import_dynamic libc_port_getn port_getn "libc.so"
+//go:cgo_import_dynamic libc_port_alert port_alert "libc.so"
 
 //go:linkname libc_port_create libc_port_create
 //go:linkname libc_port_associate libc_port_associate
 //go:linkname libc_port_dissociate libc_port_dissociate
 //go:linkname libc_port_getn libc_port_getn
+//go:linkname libc_port_alert libc_port_alert
 
 var (
        libc_port_create,
        libc_port_associate,
        libc_port_dissociate,
-       libc_port_getn libcFunc
+       libc_port_getn,
+       libc_port_alert libcFunc
 )
 
 func errno() int32 {
@@ -108,6 +111,10 @@ func port_getn(port int32, evs *portevent, max uint32, nget *uint32, timeout *ti
        return int32(sysvicall5(&libc_port_getn, uintptr(port), uintptr(unsafe.Pointer(evs)), uintptr(max), uintptr(unsafe.Pointer(nget)), uintptr(unsafe.Pointer(timeout))))
 }
 
+func port_alert(port int32, flags, events uint32, user uintptr) int32 {
+       return int32(sysvicall4(&libc_port_alert, uintptr(port), uintptr(flags), uintptr(events), user))
+}
+
 var portfd int32 = -1
 
 func netpollinit() {
@@ -121,8 +128,8 @@ func netpollinit() {
        throw("runtime: netpollinit failed")
 }
 
-func netpolldescriptor() uintptr {
-       return uintptr(portfd)
+func netpollIsPollDescriptor(fd uintptr) bool {
+       return fd == uintptr(portfd)
 }
 
 func netpollopen(fd uintptr, pd *pollDesc) int32 {
@@ -178,6 +185,21 @@ func netpollarm(pd *pollDesc, mode int) {
        unlock(&pd.lock)
 }
 
+// netpollBreak interrupts a port_getn wait.
+func netpollBreak() {
+       // Use port_alert to put portfd into alert mode.
+       // This will wake up all threads sleeping in port_getn on portfd,
+       // and cause their calls to port_getn to return immediately.
+       // Further, until portfd is taken out of alert mode,
+       // all calls to port_getn will return immediately.
+       if port_alert(portfd, _PORT_ALERT_UPDATE, _POLLHUP, uintptr(unsafe.Pointer(&portfd))) < 0 {
+               if e := errno(); e != _EBUSY {
+                       println("runtime: port_alert failed with", e)
+                       throw("runtime: netpoll: port_alert failed")
+               }
+       }
+}
+
 // netpoll checks for ready network connections.
 // Returns list of goroutines that become runnable.
 // delay < 0: blocks indefinitely
@@ -224,6 +246,24 @@ retry:
        for i := 0; i < int(n); i++ {
                ev := &events[i]
 
+               if ev.portev_source == _PORT_SOURCE_ALERT {
+                       if ev.portev_events != _POLLHUP || unsafe.Pointer(ev.portev_user) != unsafe.Pointer(&portfd) {
+                               throw("runtime: netpoll: bad port_alert wakeup")
+                       }
+                       if delay != 0 {
+                               // Now that a blocking call to netpoll
+                               // has seen the alert, take portfd
+                               // back out of alert mode.
+                               // See the comment in netpollBreak.
+                               if port_alert(portfd, 0, 0, 0) < 0 {
+                                       e := errno()
+                                       println("runtime: port_alert failed with", e)
+                                       throw("runtime: netpoll: port_alert failed")
+                               }
+                       }
+                       continue
+               }
+
                if ev.portev_events == 0 {
                        continue
                }
index 3437a274915a18e4ddba814f652765c8c33c1a16..00c06a440b7b0d9de8acfde8fe50b24a2496d27a 100644 (file)
@@ -6,13 +6,30 @@
 
 package runtime
 
+import "runtime/internal/atomic"
+
 var netpollWaiters uint32
 
+var netpollStubLock mutex
+var netpollNote note
+var netpollBroken uint32
+
+func netpollBreak() {
+       if atomic.Cas(&netpollBroken, 0, 1) {
+               notewakeup(&netpollNote)
+       }
+}
+
 // Polls for ready network connections.
 // Returns list of goroutines that become runnable.
 func netpoll(delay int64) gList {
        // Implementation for platforms that do not support
        // integrated network poller.
+       if delay != 0 {
+               noteclear(&netpollNote)
+               atomic.Store(&netpollBroken, 0)
+               notetsleep(&netpollNote, delay)
+       }
        return gList{}
 }
 
index fde413677a9fe73a701a97f023ef2ec83f89bdab..ced52cbd3a3aa9c76bd1806294209c5709f073f3 100644 (file)
@@ -41,8 +41,8 @@ func netpollinit() {
        }
 }
 
-func netpolldescriptor() uintptr {
-       return iocphandle
+func netpollIsPollDescriptor(fd uintptr) bool {
+       return fd == iocphandle
 }
 
 func netpollopen(fd uintptr, pd *pollDesc) int32 {
@@ -61,6 +61,13 @@ func netpollarm(pd *pollDesc, mode int) {
        throw("runtime: unused")
 }
 
+func netpollBreak() {
+       if stdcall4(_PostQueuedCompletionStatus, iocphandle, 0, 0, 0) == 0 {
+               println("runtime: netpoll: PostQueuedCompletionStatus failed (errno=", getlasterror(), ")")
+               throw("runtime: netpoll: PostQueuedCompletionStatus failed")
+       }
+}
+
 // netpoll checks for ready network connections.
 // Returns list of goroutines that become runnable.
 // delay < 0: blocks indefinitely
@@ -112,12 +119,20 @@ func netpoll(delay int64) gList {
                mp.blocked = false
                for i = 0; i < n; i++ {
                        op = entries[i].op
-                       errno = 0
-                       qty = 0
-                       if stdcall5(_WSAGetOverlappedResult, op.pd.fd, uintptr(unsafe.Pointer(op)), uintptr(unsafe.Pointer(&qty)), 0, uintptr(unsafe.Pointer(&flags))) == 0 {
-                               errno = int32(getlasterror())
+                       if op != nil {
+                               errno = 0
+                               qty = 0
+                               if stdcall5(_WSAGetOverlappedResult, op.pd.fd, uintptr(unsafe.Pointer(op)), uintptr(unsafe.Pointer(&qty)), 0, uintptr(unsafe.Pointer(&flags))) == 0 {
+                                       errno = int32(getlasterror())
+                               }
+                               handlecompletion(&toRun, op, errno, qty)
+                       } else {
+                               if delay == 0 {
+                                       // Forward the notification to the
+                                       // blocked poller.
+                                       netpollBreak()
+                               }
                        }
-                       handlecompletion(&toRun, op, errno, qty)
                }
        } else {
                op = nil
@@ -139,6 +154,14 @@ func netpoll(delay int64) gList {
                        // dequeued failed IO packet, so report that
                }
                mp.blocked = false
+               if op == nil {
+                       if delay == 0 {
+                               // Forward the notification to the
+                               // blocked poller.
+                               netpollBreak()
+                       }
+                       return gList{}
+               }
                handlecompletion(&toRun, op, errno, qty)
        }
        return toRun
index 17742207661497788b74a43a330880af5ba33c2d..3cb9411a9cd80d9a1e3c37678866f7c4f927fab7 100644 (file)
@@ -24,8 +24,6 @@ const (
 
        // From <sys/lwp.h>
        _LWP_DETACHED = 0x00000040
-
-       _EAGAIN = 35
 )
 
 type mOS struct {
index be887a549d7a4fdb4608e7a5a1bd64ec2f63c205..351a99f7e9f61cd0c2d2af186fb488fa1a9fbd3e 100644 (file)
@@ -65,7 +65,6 @@ func setNonblock(fd int32)
 
 const (
        _ESRCH       = 3
-       _EAGAIN      = 35
        _EWOULDBLOCK = _EAGAIN
        _ENOTSUP     = 91
 
index 34d0627fcbfe58c3faffd44b5f17e26ce678c636..764db6edb0c977eb46520cb4308902648c00985e 100644 (file)
@@ -34,6 +34,7 @@ const (
 //go:cgo_import_dynamic runtime._GetThreadContext GetThreadContext%2 "kernel32.dll"
 //go:cgo_import_dynamic runtime._LoadLibraryW LoadLibraryW%1 "kernel32.dll"
 //go:cgo_import_dynamic runtime._LoadLibraryA LoadLibraryA%1 "kernel32.dll"
+//go:cgo_import_dynamic runtime._PostQueuedCompletionStatus PostQueuedCompletionStatus%4 "kernel32.dll"
 //go:cgo_import_dynamic runtime._ResumeThread ResumeThread%1 "kernel32.dll"
 //go:cgo_import_dynamic runtime._SetConsoleCtrlHandler SetConsoleCtrlHandler%2 "kernel32.dll"
 //go:cgo_import_dynamic runtime._SetErrorMode SetErrorMode%1 "kernel32.dll"
@@ -80,6 +81,7 @@ var (
        _GetThreadContext,
        _LoadLibraryW,
        _LoadLibraryA,
+       _PostQueuedCompletionStatus,
        _QueryPerformanceCounter,
        _QueryPerformanceFrequency,
        _ResumeThread,
index 6e6272e80a2a57248e7b02453aadae01f1429f17..3a1bf91fa5d9ac069e20ee69a29b74f4ad3d2a4e 100644 (file)
@@ -981,3 +981,42 @@ func TestPreemptionAfterSyscall(t *testing.T) {
 func TestGetgThreadSwitch(t *testing.T) {
        runtime.RunGetgThreadSwitchTest()
 }
+
+// TestNetpollBreak tests that netpollBreak can break a netpoll.
+// This test is not particularly safe since the call to netpoll
+// will pick up any stray files that are ready, but it should work
+// OK as long it is not run in parallel.
+func TestNetpollBreak(t *testing.T) {
+       if runtime.GOMAXPROCS(0) == 1 {
+               t.Skip("skipping: GOMAXPROCS=1")
+       }
+
+       // Make sure that netpoll is initialized.
+       time.Sleep(1)
+
+       start := time.Now()
+       c := make(chan bool, 2)
+       go func() {
+               c <- true
+               runtime.Netpoll(10 * time.Second.Nanoseconds())
+               c <- true
+       }()
+       <-c
+       // Loop because the break might get eaten by the scheduler.
+       // Break twice to break both the netpoll we started and the
+       // scheduler netpoll.
+loop:
+       for {
+               runtime.Usleep(100)
+               runtime.NetpollBreak()
+               runtime.NetpollBreak()
+               select {
+               case <-c:
+                       break loop
+               default:
+               }
+       }
+       if dur := time.Since(start); dur > 5*time.Second {
+               t.Errorf("netpollBreak did not interrupt netpoll: slept for: %v", dur)
+       }
+}