]> Cypherpunks repositories - gostls13.git/commitdiff
internal/poll: assume we have CancelIoEX on Windows
authorIan Lance Taylor <iant@golang.org>
Tue, 24 Mar 2020 04:11:01 +0000 (21:11 -0700)
committerIan Lance Taylor <iant@golang.org>
Tue, 24 Mar 2020 20:19:40 +0000 (20:19 +0000)
As of the Go 1.11 release we require at least Windows 7, so CancelIoEx
is always available.  This lets us simplify the code to not require
dedicated threads to handle I/O requests.

Fixes #37956

Change-Id: If1dc4ac4acb61c43e4f2a9f26f225869050262a5
Reviewed-on: https://go-review.googlesource.com/c/go/+/225060
Run-TryBot: Ian Lance Taylor <iant@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Alex Brainman <alex.brainman@gmail.com>
src/internal/poll/fd_windows.go
src/internal/poll/sendfile_windows.go

index b330ae21a99ae6f0b646259ed3d00ebc7a09fa48..4b2623ea8f2ea13f58bd3885ecd2fe7bfcbc4da1 100644 (file)
@@ -9,7 +9,6 @@ import (
        "internal/race"
        "internal/syscall/windows"
        "io"
-       "runtime"
        "sync"
        "syscall"
        "unicode/utf16"
@@ -22,18 +21,6 @@ var (
        ioSync  uint64
 )
 
-// CancelIo Windows API cancels all outstanding IO for a particular
-// socket on current thread. To overcome that limitation, we run
-// special goroutine, locked to OS single thread, that both starts
-// and cancels IO. It means, there are 2 unavoidable thread switches
-// for every IO.
-// Some newer versions of Windows has new CancelIoEx API, that does
-// not have that limitation and can be used from any thread. This
-// package uses CancelIoEx API, if present, otherwise it fallback
-// to CancelIo.
-
-var canCancelIO bool // determines if CancelIoEx API is present
-
 // This package uses the SetFileCompletionNotificationModes Windows
 // API to skip calling GetQueuedCompletionStatus if an IO operation
 // completes synchronously. There is a known bug where
@@ -72,7 +59,6 @@ func init() {
        if e != nil {
                initErr = e
        }
-       canCancelIO = syscall.LoadCancelIoEx() == nil
        checkSetFileCompletionNotificationModes()
 }
 
@@ -90,7 +76,6 @@ type operation struct {
 
        // fields used only by net package
        fd     *FD
-       errc   chan error
        buf    syscall.WSABuf
        msg    windows.WSAMsg
        sa     syscall.Sockaddr
@@ -155,46 +140,15 @@ func (o *operation) InitMsg(p []byte, oob []byte) {
        }
 }
 
-// ioSrv executes net IO requests.
-type ioSrv struct {
-       req chan ioSrvReq
-}
-
-type ioSrvReq struct {
-       o      *operation
-       submit func(o *operation) error // if nil, cancel the operation
-}
-
-// ProcessRemoteIO will execute submit IO requests on behalf
-// of other goroutines, all on a single os thread, so it can
-// cancel them later. Results of all operations will be sent
-// back to their requesters via channel supplied in request.
-// It is used only when the CancelIoEx API is unavailable.
-func (s *ioSrv) ProcessRemoteIO() {
-       runtime.LockOSThread()
-       defer runtime.UnlockOSThread()
-       for r := range s.req {
-               if r.submit != nil {
-                       r.o.errc <- r.submit(r.o)
-               } else {
-                       r.o.errc <- syscall.CancelIo(r.o.fd.Sysfd)
-               }
-       }
-}
-
-// ExecIO executes a single IO operation o. It submits and cancels
+// execIO executes a single IO operation o. It submits and cancels
 // IO in the current thread for systems where Windows CancelIoEx API
 // is available. Alternatively, it passes the request onto
 // runtime netpoll and waits for completion or cancels request.
-func (s *ioSrv) ExecIO(o *operation, submit func(o *operation) error) (int, error) {
+func execIO(o *operation, submit func(o *operation) error) (int, error) {
        if o.fd.pd.runtimeCtx == 0 {
                return 0, errors.New("internal error: polling on unsupported descriptor type")
        }
 
-       if !canCancelIO {
-               onceStartServer.Do(startServer)
-       }
-
        fd := o.fd
        // Notify runtime netpoll about starting IO.
        err := fd.pd.prepare(int(o.mode), fd.isFile)
@@ -202,14 +156,7 @@ func (s *ioSrv) ExecIO(o *operation, submit func(o *operation) error) (int, erro
                return 0, err
        }
        // Start IO.
-       if canCancelIO {
-               err = submit(o)
-       } else {
-               // Send request to a special dedicated thread,
-               // so it can stop the IO with CancelIO later.
-               s.req <- ioSrvReq{o, submit}
-               err = <-o.errc
-       }
+       err = submit(o)
        switch err {
        case nil:
                // IO completed immediately
@@ -247,16 +194,11 @@ func (s *ioSrv) ExecIO(o *operation, submit func(o *operation) error) (int, erro
                panic("unexpected runtime.netpoll error: " + netpollErr.Error())
        }
        // Cancel our request.
-       if canCancelIO {
-               err := syscall.CancelIoEx(fd.Sysfd, &o.o)
-               // Assuming ERROR_NOT_FOUND is returned, if IO is completed.
-               if err != nil && err != syscall.ERROR_NOT_FOUND {
-                       // TODO(brainman): maybe do something else, but panic.
-                       panic(err)
-               }
-       } else {
-               s.req <- ioSrvReq{o, nil}
-               <-o.errc
+       err = syscall.CancelIoEx(fd.Sysfd, &o.o)
+       // Assuming ERROR_NOT_FOUND is returned, if IO is completed.
+       if err != nil && err != syscall.ERROR_NOT_FOUND {
+               // TODO(brainman): maybe do something else, but panic.
+               panic(err)
        }
        // Wait for cancellation to complete.
        fd.pd.waitCanceled(int(o.mode))
@@ -273,21 +215,6 @@ func (s *ioSrv) ExecIO(o *operation, submit func(o *operation) error) (int, erro
        return int(o.qty), nil
 }
 
-// Start helper goroutines.
-var rsrv, wsrv ioSrv
-var onceStartServer sync.Once
-
-func startServer() {
-       // This is called, once, when only the CancelIo API is available.
-       // Start two special goroutines, both locked to an OS thread,
-       // that start and cancel IO requests.
-       // One will process read requests, while the other will do writes.
-       rsrv.req = make(chan ioSrvReq)
-       go rsrv.ProcessRemoteIO()
-       wsrv.req = make(chan ioSrvReq)
-       go wsrv.ProcessRemoteIO()
-}
-
 // FD is a file descriptor. The net and os packages embed this type in
 // a larger type representing a network connection or OS file.
 type FD struct {
@@ -385,9 +312,9 @@ func (fd *FD) Init(net string, pollable bool) (string, error) {
                // if the user is doing their own overlapped I/O.
                // See issue #21172.
                //
-               // In general the code below avoids calling the ExecIO
-               // method for non-network sockets. If some method does
-               // somehow call ExecIO, then ExecIO, and therefore the
+               // In general the code below avoids calling the execIO
+               // function for non-network sockets. If some method does
+               // somehow call execIO, then execIO, and therefore the
                // calling method, will return an error, because
                // fd.pd.runtimeCtx will be 0.
                err = fd.pd.init(fd)
@@ -429,10 +356,6 @@ func (fd *FD) Init(net string, pollable bool) (string, error) {
        fd.wop.fd = fd
        fd.rop.runtimeCtx = fd.pd.runtimeCtx
        fd.wop.runtimeCtx = fd.pd.runtimeCtx
-       if !canCancelIO {
-               fd.rop.errc = make(chan error)
-               fd.wop.errc = make(chan error)
-       }
        return "", nil
 }
 
@@ -515,7 +438,7 @@ func (fd *FD) Read(buf []byte) (int, error) {
        } else {
                o := &fd.rop
                o.InitBuf(buf)
-               n, err = rsrv.ExecIO(o, func(o *operation) error {
+               n, err = execIO(o, func(o *operation) error {
                        return syscall.WSARecv(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
                })
                if race.Enabled {
@@ -655,7 +578,7 @@ func (fd *FD) ReadFrom(buf []byte) (int, syscall.Sockaddr, error) {
        defer fd.readUnlock()
        o := &fd.rop
        o.InitBuf(buf)
-       n, err := rsrv.ExecIO(o, func(o *operation) error {
+       n, err := execIO(o, func(o *operation) error {
                if o.rsa == nil {
                        o.rsa = new(syscall.RawSockaddrAny)
                }
@@ -711,7 +634,7 @@ func (fd *FD) Write(buf []byte) (int, error) {
                        }
                        o := &fd.wop
                        o.InitBuf(b)
-                       n, err = wsrv.ExecIO(o, func(o *operation) error {
+                       n, err = execIO(o, func(o *operation) error {
                                return syscall.WSASend(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil)
                        })
                }
@@ -820,7 +743,7 @@ func (fd *FD) Writev(buf *[][]byte) (int64, error) {
        }
        o := &fd.wop
        o.InitBufs(buf)
-       n, err := wsrv.ExecIO(o, func(o *operation) error {
+       n, err := execIO(o, func(o *operation) error {
                return syscall.WSASend(o.fd.Sysfd, &o.bufs[0], uint32(len(o.bufs)), &o.qty, 0, &o.o, nil)
        })
        o.ClearBufs()
@@ -841,7 +764,7 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
                o := &fd.wop
                o.InitBuf(buf)
                o.sa = sa
-               n, err := wsrv.ExecIO(o, func(o *operation) error {
+               n, err := execIO(o, func(o *operation) error {
                        return syscall.WSASendto(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil)
                })
                return n, err
@@ -856,7 +779,7 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
                o := &fd.wop
                o.InitBuf(b)
                o.sa = sa
-               n, err := wsrv.ExecIO(o, func(o *operation) error {
+               n, err := execIO(o, func(o *operation) error {
                        return syscall.WSASendto(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil)
                })
                ntotal += int(n)
@@ -874,7 +797,7 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
 func (fd *FD) ConnectEx(ra syscall.Sockaddr) error {
        o := &fd.wop
        o.sa = ra
-       _, err := wsrv.ExecIO(o, func(o *operation) error {
+       _, err := execIO(o, func(o *operation) error {
                return ConnectExFunc(o.fd.Sysfd, o.sa, nil, 0, nil, &o.o)
        })
        return err
@@ -884,7 +807,7 @@ func (fd *FD) acceptOne(s syscall.Handle, rawsa []syscall.RawSockaddrAny, o *ope
        // Submit accept request.
        o.handle = s
        o.rsan = int32(unsafe.Sizeof(rawsa[0]))
-       _, err := rsrv.ExecIO(o, func(o *operation) error {
+       _, err := execIO(o, func(o *operation) error {
                return AcceptFunc(o.fd.Sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o)
        })
        if err != nil {
@@ -1008,7 +931,7 @@ func (fd *FD) RawRead(f func(uintptr) bool) error {
                if !fd.IsStream {
                        o.flags |= windows.MSG_PEEK
                }
-               _, err := rsrv.ExecIO(o, func(o *operation) error {
+               _, err := execIO(o, func(o *operation) error {
                        return syscall.WSARecv(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
                })
                if err == windows.WSAEMSGSIZE {
@@ -1078,7 +1001,7 @@ func (fd *FD) ReadMsg(p []byte, oob []byte) (int, int, int, syscall.Sockaddr, er
        o.rsa = new(syscall.RawSockaddrAny)
        o.msg.Name = o.rsa
        o.msg.Namelen = int32(unsafe.Sizeof(*o.rsa))
-       n, err := rsrv.ExecIO(o, func(o *operation) error {
+       n, err := execIO(o, func(o *operation) error {
                return windows.WSARecvMsg(o.fd.Sysfd, &o.msg, &o.qty, &o.o, nil)
        })
        err = fd.eofError(n, err)
@@ -1110,7 +1033,7 @@ func (fd *FD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (int, int, err
                o.msg.Name = (*syscall.RawSockaddrAny)(rsa)
                o.msg.Namelen = len
        }
-       n, err := wsrv.ExecIO(o, func(o *operation) error {
+       n, err := execIO(o, func(o *operation) error {
                return windows.WSASendMsg(o.fd.Sysfd, &o.msg, 0, &o.qty, &o.o, nil)
        })
        return n, int(o.msg.Control.Len), err
index 5674af418952578f25551230ebe3cd42df2f8a45..50c3ee86c0fc0565da9244797a96ac6f2b3af860 100644 (file)
@@ -57,7 +57,7 @@ func SendFile(fd *FD, src syscall.Handle, n int64) (written int64, err error) {
                o.o.Offset = uint32(curpos)
                o.o.OffsetHigh = uint32(curpos >> 32)
 
-               nw, err := wsrv.ExecIO(o, func(o *operation) error {
+               nw, err := execIO(o, func(o *operation) error {
                        return syscall.TransmitFile(o.fd.Sysfd, o.handle, o.qty, 0, &o.o, nil, syscall.TF_WRITE_BEHIND)
                })
                if err != nil {