]> Cypherpunks repositories - gostls13.git/commitdiff
runtime: converge duplicate calls to netpollBreak into one
authorAndy Pan <panjf2000@gmail.com>
Fri, 27 Mar 2020 03:21:17 +0000 (03:21 +0000)
committerIan Lance Taylor <iant@golang.org>
Fri, 27 Mar 2020 17:14:16 +0000 (17:14 +0000)
There might be some concurrent (maybe not concurrent, just sequential but in a short time window) and duplicate calls to `netpollBreak`, trying to wake up a net-poller. If one has called `netpollBreak` and that waking event hasn't been received by epollwait/kevent/..., then the subsequent calls of `netpollBreak` ought to be ignored or in other words, these calls should be converged into one.

Benchmarks go1.13.5 darwin/amd64:

benchmark-func           time/op (old)  time/op (new)  delta
BenchmarkNetpollBreak-4  29668ns ±1%    3131ns ±2%     -89.45%

mem/B (old)  mem/B (new)  delta
154B ±13%    0B ±0%       -100%

Change-Id: I3cf757a5d6edc5a99adad7aea3baee4b7f2a8f5c
GitHub-Last-Rev: 15bcfbab8a5db51f65da01315a5880a5dbf9e028
GitHub-Pull-Request: golang/go#36294
Reviewed-on: https://go-review.googlesource.com/c/go/+/212737
Run-TryBot: Emmanuel Odeke <emm.odeke@gmail.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Ian Lance Taylor <iant@golang.org>
src/runtime/netpoll_epoll.go
src/runtime/netpoll_kqueue.go
src/runtime/netpoll_os_test.go [new file with mode: 0644]
src/runtime/netpoll_solaris.go
src/runtime/netpoll_windows.go

index b9dc18c939e9c3b644838a0207377d8d28c3f983..cc4c36f796370e94ee8685fc9a007e5fe3092c01 100644 (file)
@@ -6,7 +6,10 @@
 
 package runtime
 
-import "unsafe"
+import (
+       "runtime/internal/atomic"
+       "unsafe"
+)
 
 func epollcreate(size int32) int32
 func epollcreate1(flags int32) int32
@@ -22,6 +25,8 @@ var (
        epfd int32 = -1 // epoll descriptor
 
        netpollBreakRd, netpollBreakWr uintptr // for netpollBreak
+
+       netpollWakeSig uintptr // used to avoid duplicate calls of netpollBreak
 )
 
 func netpollinit() {
@@ -74,20 +79,22 @@ func netpollarm(pd *pollDesc, mode int) {
 
 // netpollBreak interrupts an epollwait.
 func netpollBreak() {
-       for {
-               var b byte
-               n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
-               if n == 1 {
-                       break
-               }
-               if n == -_EINTR {
-                       continue
-               }
-               if n == -_EAGAIN {
-                       return
+       if atomic.Casuintptr(&netpollWakeSig, 0, 1) {
+               for {
+                       var b byte
+                       n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
+                       if n == 1 {
+                               break
+                       }
+                       if n == -_EINTR {
+                               continue
+                       }
+                       if n == -_EAGAIN {
+                               return
+                       }
+                       println("runtime: netpollBreak write failed with", -n)
+                       throw("runtime: netpollBreak write failed")
                }
-               println("runtime: netpollBreak write failed with", -n)
-               throw("runtime: netpollBreak write failed")
        }
 }
 
@@ -147,6 +154,7 @@ retry:
                                // if blocking.
                                var tmp [16]byte
                                read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
+                               atomic.Storeuintptr(&netpollWakeSig, 0)
                        }
                        continue
                }
index 39d402252d622da66cdbd63af4b60fe9243b47c3..2ff21d8fcba3028867c9b42b79eeb4617668bc79 100644 (file)
@@ -8,12 +8,17 @@ package runtime
 
 // Integrated network poller (kqueue-based implementation).
 
-import "unsafe"
+import (
+       "runtime/internal/atomic"
+       "unsafe"
+)
 
 var (
        kq int32 = -1
 
        netpollBreakRd, netpollBreakWr uintptr // for netpollBreak
+
+       netpollWakeSig uintptr // used to avoid duplicate calls of netpollBreak
 )
 
 func netpollinit() {
@@ -78,17 +83,19 @@ func netpollarm(pd *pollDesc, mode int) {
 
 // netpollBreak interrupts a kevent.
 func netpollBreak() {
-       for {
-               var b byte
-               n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
-               if n == 1 || n == -_EAGAIN {
-                       break
-               }
-               if n == -_EINTR {
-                       continue
+       if atomic.Casuintptr(&netpollWakeSig, 0, 1) {
+               for {
+                       var b byte
+                       n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
+                       if n == 1 || n == -_EAGAIN {
+                               break
+                       }
+                       if n == -_EINTR {
+                               continue
+                       }
+                       println("runtime: netpollBreak write failed with", -n)
+                       throw("runtime: netpollBreak write failed")
                }
-               println("runtime: netpollBreak write failed with", -n)
-               throw("runtime: netpollBreak write failed")
        }
 }
 
@@ -145,6 +152,7 @@ retry:
                                // if blocking.
                                var tmp [16]byte
                                read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
+                               atomic.Storeuintptr(&netpollWakeSig, 0)
                        }
                        continue
                }
diff --git a/src/runtime/netpoll_os_test.go b/src/runtime/netpoll_os_test.go
new file mode 100644 (file)
index 0000000..b96b9f3
--- /dev/null
@@ -0,0 +1,28 @@
+package runtime_test
+
+import (
+       "runtime"
+       "sync"
+       "testing"
+)
+
+var wg sync.WaitGroup
+
+func init() {
+       runtime.NetpollGenericInit()
+}
+
+func BenchmarkNetpollBreak(b *testing.B) {
+       b.StartTimer()
+       for i := 0; i < b.N; i++ {
+               for j := 0; j < 10; j++ {
+                       wg.Add(1)
+                       go func() {
+                               runtime.NetpollBreak()
+                               wg.Done()
+                       }()
+               }
+       }
+       wg.Wait()
+       b.StopTimer()
+}
index 15818cb4eabc21502d8097bfd99b50cabd99649c..34b3ee9308ea7ed4b1b232284dd04620f18c408e 100644 (file)
@@ -4,7 +4,10 @@
 
 package runtime
 
-import "unsafe"
+import (
+       "runtime/internal/atomic"
+       "unsafe"
+)
 
 // Solaris runtime-integrated network poller.
 //
@@ -85,6 +88,7 @@ var (
        libc_port_dissociate,
        libc_port_getn,
        libc_port_alert libcFunc
+       netpollWakeSig uintptr // used to avoid duplicate calls of netpollBreak
 )
 
 func errno() int32 {
@@ -187,15 +191,17 @@ func netpollarm(pd *pollDesc, mode int) {
 
 // netpollBreak interrupts a port_getn wait.
 func netpollBreak() {
-       // Use port_alert to put portfd into alert mode.
-       // This will wake up all threads sleeping in port_getn on portfd,
-       // and cause their calls to port_getn to return immediately.
-       // Further, until portfd is taken out of alert mode,
-       // all calls to port_getn will return immediately.
-       if port_alert(portfd, _PORT_ALERT_UPDATE, _POLLHUP, uintptr(unsafe.Pointer(&portfd))) < 0 {
-               if e := errno(); e != _EBUSY {
-                       println("runtime: port_alert failed with", e)
-                       throw("runtime: netpoll: port_alert failed")
+       if atomic.Casuintptr(&netpollWakeSig, 0, 1) {
+               // Use port_alert to put portfd into alert mode.
+               // This will wake up all threads sleeping in port_getn on portfd,
+               // and cause their calls to port_getn to return immediately.
+               // Further, until portfd is taken out of alert mode,
+               // all calls to port_getn will return immediately.
+               if port_alert(portfd, _PORT_ALERT_UPDATE, _POLLHUP, uintptr(unsafe.Pointer(&portfd))) < 0 {
+                       if e := errno(); e != _EBUSY {
+                               println("runtime: port_alert failed with", e)
+                               throw("runtime: netpoll: port_alert failed")
+                       }
                }
        }
 }
@@ -268,6 +274,7 @@ retry:
                                        println("runtime: port_alert failed with", e)
                                        throw("runtime: netpoll: port_alert failed")
                                }
+                               atomic.Storeuintptr(&netpollWakeSig, 0)
                        }
                        continue
                }
index 28b2f6ef3b1780c5ca5b6f1334b539682c57045b..56a0798559123e996a69700a59f87227860a2cf7 100644 (file)
@@ -5,6 +5,7 @@
 package runtime
 
 import (
+       "runtime/internal/atomic"
        "unsafe"
 )
 
@@ -31,7 +32,11 @@ type overlappedEntry struct {
        qty      uint32
 }
 
-var iocphandle uintptr = _INVALID_HANDLE_VALUE // completion port io handle
+var (
+       iocphandle uintptr = _INVALID_HANDLE_VALUE // completion port io handle
+
+       netpollWakeSig uintptr // used to avoid duplicate calls of netpollBreak
+)
 
 func netpollinit() {
        iocphandle = stdcall4(_CreateIoCompletionPort, _INVALID_HANDLE_VALUE, 0, 0, _DWORD_MAX)
@@ -62,9 +67,11 @@ func netpollarm(pd *pollDesc, mode int) {
 }
 
 func netpollBreak() {
-       if stdcall4(_PostQueuedCompletionStatus, iocphandle, 0, 0, 0) == 0 {
-               println("runtime: netpoll: PostQueuedCompletionStatus failed (errno=", getlasterror(), ")")
-               throw("runtime: netpoll: PostQueuedCompletionStatus failed")
+       if atomic.Casuintptr(&netpollWakeSig, 0, 1) {
+               if stdcall4(_PostQueuedCompletionStatus, iocphandle, 0, 0, 0) == 0 {
+                       println("runtime: netpoll: PostQueuedCompletionStatus failed (errno=", getlasterror(), ")")
+                       throw("runtime: netpoll: PostQueuedCompletionStatus failed")
+               }
        }
 }
 
@@ -126,6 +133,7 @@ func netpoll(delay int64) gList {
                        }
                        handlecompletion(&toRun, op, errno, qty)
                } else {
+                       atomic.Storeuintptr(&netpollWakeSig, 0)
                        if delay == 0 {
                                // Forward the notification to the
                                // blocked poller.