]> Cypherpunks repositories - gostls13.git/commitdiff
internal/poll, os: cancel pending I/O when closing pipes on Windows
authorWèi Cōngruì <crvv.mail@gmail.com>
Mon, 4 Mar 2019 10:07:07 +0000 (10:07 +0000)
committerIan Lance Taylor <iant@golang.org>
Tue, 19 Mar 2019 04:20:08 +0000 (04:20 +0000)
When closing a pipe, use CancelIoEx to cancel pending I/O.
This makes concurrent Read and Write calls return os.ErrClosed.

This change also enables some pipe tests on Windows.

Fixes #28477
Fixes #25835

Change-Id: If52bb7d80895763488a61632e4682a78336e8420
Reviewed-on: https://go-review.googlesource.com/c/go/+/164721
Run-TryBot: Ian Lance Taylor <iant@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Ian Lance Taylor <iant@golang.org>
src/internal/poll/fd_windows.go
src/internal/poll/sendfile_windows.go
src/os/file_windows.go
src/os/pipe_test.go

index f666b061e2d5bdcd725b778ce5ffc0bac673e8aa..92bab5f9dd9724dd2c67d54e31fd0aa555212735 100644 (file)
@@ -342,6 +342,7 @@ const (
        kindFile
        kindConsole
        kindDir
+       kindPipe
 )
 
 // logInitFD is set by tests to enable file descriptor initialization logging.
@@ -364,6 +365,8 @@ func (fd *FD) Init(net string, pollable bool) (string, error) {
                fd.kind = kindConsole
        case "dir":
                fd.kind = kindDir
+       case "pipe":
+               fd.kind = kindPipe
        case "tcp", "tcp4", "tcp6",
                "udp", "udp4", "udp6",
                "ip", "ip4", "ip6",
@@ -461,6 +464,9 @@ func (fd *FD) Close() error {
        if !fd.fdmu.increfAndClose() {
                return errClosing(fd.isFile)
        }
+       if fd.kind == kindPipe {
+               syscall.CancelIoEx(fd.Sysfd, nil)
+       }
        // unblock pending reader and writer
        fd.pd.evict()
        err := fd.decref()
@@ -505,6 +511,12 @@ func (fd *FD) Read(buf []byte) (int, error) {
                        n, err = fd.readConsole(buf)
                default:
                        n, err = syscall.Read(fd.Sysfd, buf)
+                       if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED {
+                               // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
+                               // If the fd is a pipe and the Read was interrupted by CancelIoEx,
+                               // we assume it is interrupted by Close.
+                               err = ErrFileClosing
+                       }
                }
                if err != nil {
                        n = 0
@@ -692,6 +704,12 @@ func (fd *FD) Write(buf []byte) (int, error) {
                                n, err = fd.writeConsole(b)
                        default:
                                n, err = syscall.Write(fd.Sysfd, b)
+                               if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED {
+                                       // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
+                                       // If the fd is a pipe and the Write was interrupted by CancelIoEx,
+                                       // we assume it is interrupted by Close.
+                                       err = ErrFileClosing
+                               }
                        }
                        if err != nil {
                                n = 0
index 17a3681064b66df8da5a3f2f3a25f5fd2076c5b8..0fe9b9b420922f8acedf5a9dd5c801e675da9071 100644 (file)
@@ -8,12 +8,8 @@ import "syscall"
 
 // SendFile wraps the TransmitFile call.
 func SendFile(fd *FD, src syscall.Handle, n int64) (int64, error) {
-       ft, err := syscall.GetFileType(src)
-       if err != nil {
-               return 0, err
-       }
-       // TransmitFile does not work with pipes
-       if ft == syscall.FILE_TYPE_PIPE {
+       if fd.kind == kindPipe {
+               // TransmitFile does not work with pipes
                return 0, syscall.ESPIPE
        }
 
index b0206d9200c508a13eba334a93f41e10cb249549..f311ae11d9ac36179b5b250bd8fbd82b848b5f00 100644 (file)
@@ -42,6 +42,9 @@ func newFile(h syscall.Handle, name string, kind string) *File {
                if syscall.GetConsoleMode(h, &m) == nil {
                        kind = "console"
                }
+               if t, err := syscall.GetFileType(h); err == nil && t == syscall.FILE_TYPE_PIPE {
+                       kind = "pipe"
+               }
        }
 
        f := &File{&file{
@@ -315,7 +318,7 @@ func Pipe() (r *File, w *File, err error) {
        if e != nil {
                return nil, nil, NewSyscallError("pipe", e)
        }
-       return newFile(p[0], "|0", "file"), newFile(p[1], "|1", "file"), nil
+       return newFile(p[0], "|0", "pipe"), newFile(p[1], "|1", "pipe"), nil
 }
 
 func tempDir() string {
index 779b2bdf85b3b2d34b1232cad45f0425e6cea90d..4c53bc985d2de4ced6cdeb636b768c3829bbbc82 100644 (file)
@@ -3,7 +3,7 @@
 // license that can be found in the LICENSE file.
 
 // Test broken pipes on Unix systems.
-// +build !windows,!plan9,!nacl,!js
+// +build !plan9,!nacl,!js
 
 package os_test
 
@@ -35,6 +35,11 @@ func TestEPIPE(t *testing.T) {
                t.Fatal(err)
        }
 
+       expect := syscall.EPIPE
+       if runtime.GOOS == "windows" {
+               // 232 is Windows error code ERROR_NO_DATA, "The pipe is being closed".
+               expect = syscall.Errno(232)
+       }
        // Every time we write to the pipe we should get an EPIPE.
        for i := 0; i < 20; i++ {
                _, err = w.Write([]byte("hi"))
@@ -47,13 +52,17 @@ func TestEPIPE(t *testing.T) {
                if se, ok := err.(*os.SyscallError); ok {
                        err = se.Err
                }
-               if err != syscall.EPIPE {
-                       t.Errorf("iteration %d: got %v, expected EPIPE", i, err)
+               if err != expect {
+                       t.Errorf("iteration %d: got %v, expected %v", i, err, expect)
                }
        }
 }
 
 func TestStdPipe(t *testing.T) {
+       switch runtime.GOOS {
+       case "windows":
+               t.Skip("Windows doesn't support SIGPIPE")
+       }
        testenv.MustHaveExec(t)
        r, w, err := os.Pipe()
        if err != nil {
@@ -195,8 +204,12 @@ func TestClosedPipeRaceWrite(t *testing.T) {
 // for unsupported file type." Currently it returns EAGAIN; it is
 // possible that in the future it will simply wait for data.
 func TestReadNonblockingFd(t *testing.T) {
+       switch runtime.GOOS {
+       case "windows":
+               t.Skip("Windows doesn't support SetNonblock")
+       }
        if os.Getenv("GO_WANT_READ_NONBLOCKING_FD") == "1" {
-               fd := int(os.Stdin.Fd())
+               fd := syscallDescriptor(os.Stdin.Fd())
                syscall.SetNonblock(fd, true)
                defer syscall.SetNonblock(fd, false)
                _, err := os.Stdin.Read(make([]byte, 1))
@@ -226,7 +239,7 @@ func TestReadNonblockingFd(t *testing.T) {
 }
 
 func TestCloseWithBlockingReadByNewFile(t *testing.T) {
-       var p [2]int
+       var p [2]syscallDescriptor
        err := syscall.Pipe(p[:])
        if err != nil {
                t.Fatal(err)
@@ -276,8 +289,11 @@ func testCloseWithBlockingRead(t *testing.T, r, w *os.File) {
                if err == nil {
                        t.Error("I/O on closed pipe unexpectedly succeeded")
                }
-               if err != io.EOF {
-                       t.Errorf("got %v, expected io.EOF", err)
+               if pe, ok := err.(*os.PathError); ok {
+                       err = pe.Err
+               }
+               if err != io.EOF && err != os.ErrClosed {
+                       t.Errorf("got %v, expected EOF or closed", err)
                }
        }(c2)