]> Cypherpunks repositories - gostls13.git/commitdiff
internal/poll: honor ERROR_OPERATION_ABORTED if pipe is not closed
authorqmuntal <quimmuntal@gmail.com>
Fri, 28 Mar 2025 19:38:34 +0000 (20:38 +0100)
committerQuim Muntal <quimmuntal@gmail.com>
Fri, 28 Mar 2025 21:14:48 +0000 (14:14 -0700)
FD.Read converts a syscall.ERROR_OPERATION_ABORTED error to
ErrFileClosing. It does that in case the pipe operation was aborted by
a CancelIoEx call in FD.Close.

It doesn't take into account that the operation might have been
aborted by a CancelIoEx call in external code. In that case, the
operation should return the error as is.

Change-Id: I75dcf0edaace8b57dc47b398ea591ca9f116112b
Reviewed-on: https://go-review.googlesource.com/c/go/+/661555
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Damien Neil <dneil@google.com>
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
src/internal/poll/fd_mutex.go
src/internal/poll/fd_windows.go
src/internal/poll/fd_windows_test.go

index 0a8ee6f0d49d5c205844bfd62771d6effa9a6e7f..4d194df1864dcc476c54a423b07e5b153eac8780 100644 (file)
@@ -250,3 +250,8 @@ func (fd *FD) writeUnlock() {
                fd.destroy()
        }
 }
+
+// closing returns true if fd is closing.
+func (fd *FD) closing() bool {
+       return atomic.LoadUint64(&fd.fdmu.state)&mutexClosed != 0
+}
index 81c8293911d4034b89f29e4a3c5b104d8bf7606f..1caa760349aa5db450d3e0aba6d0e205c26273e7 100644 (file)
@@ -461,10 +461,12 @@ func (fd *FD) Read(buf []byte) (int, error) {
                                // Returned by pipes when the other end is closed.
                                err = nil
                        case syscall.ERROR_OPERATION_ABORTED:
-                               // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
-                               // If the fd is a pipe and the Read was interrupted by CancelIoEx,
-                               // we assume it is interrupted by Close.
-                               err = ErrFileClosing
+                               if fd.closing() {
+                                       // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
+                                       // If the fd is a pipe and the Read was interrupted by CancelIoEx,
+                                       // we assume it is interrupted by Close.
+                                       err = ErrFileClosing
+                               }
                        }
                }
        case kindNet:
@@ -717,7 +719,7 @@ func (fd *FD) Write(buf []byte) (int, error) {
                                return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
                        })
                        fd.addOffset(n)
-                       if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED {
+                       if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED && fd.closing() {
                                // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
                                // If the fd is a pipe and the Write was interrupted by CancelIoEx,
                                // we assume it is interrupted by Close.
index f5fa4a26e351620cb47b4e2a6fa144bd1835db01..042bdf8bedeb858c8fe1ff0121c6bdf9b66834f9 100644 (file)
@@ -18,6 +18,7 @@ import (
        "sync/atomic"
        "syscall"
        "testing"
+       "time"
        "unsafe"
 )
 
@@ -339,7 +340,9 @@ func testPreadPwrite(t *testing.T, fdr, fdw *poll.FD) {
 }
 
 func TestFile(t *testing.T) {
+       t.Parallel()
        test := func(t *testing.T, r, w bool) {
+               t.Parallel()
                name := filepath.Join(t.TempDir(), "foo")
                rh := newFile(t, name, r)
                wh := newFile(t, name, w)
@@ -361,27 +364,33 @@ func TestFile(t *testing.T) {
 }
 
 func TestPipe(t *testing.T) {
+       t.Parallel()
        t.Run("overlapped", func(t *testing.T) {
+               t.Parallel()
                name, pipe := newPipe(t, true, false)
                file := newFile(t, name, true)
                testReadWrite(t, pipe, file)
        })
        t.Run("overlapped-write", func(t *testing.T) {
+               t.Parallel()
                name, pipe := newPipe(t, true, false)
                file := newFile(t, name, false)
                testReadWrite(t, file, pipe)
        })
        t.Run("overlapped-read", func(t *testing.T) {
+               t.Parallel()
                name, pipe := newPipe(t, false, false)
                file := newFile(t, name, true)
                testReadWrite(t, file, pipe)
        })
        t.Run("sync", func(t *testing.T) {
+               t.Parallel()
                name, pipe := newPipe(t, false, false)
                file := newFile(t, name, false)
                testReadWrite(t, file, pipe)
        })
        t.Run("anonymous", func(t *testing.T) {
+               t.Parallel()
                var r, w syscall.Handle
                if err := syscall.CreatePipe(&r, &w, nil, 0); err != nil {
                        t.Fatal(err)
@@ -402,6 +411,7 @@ func TestPipe(t *testing.T) {
 }
 
 func TestPipeWriteEOF(t *testing.T) {
+       t.Parallel()
        name, pipe := newPipe(t, false, true)
        file := newFile(t, name, false)
        read := make(chan struct{}, 1)
@@ -423,6 +433,39 @@ func TestPipeWriteEOF(t *testing.T) {
        }
 }
 
+func TestPipeCanceled(t *testing.T) {
+       t.Parallel()
+       name, _ := newPipe(t, true, false)
+       file := newFile(t, name, true)
+       ch := make(chan struct{}, 1)
+       go func() {
+               for {
+                       select {
+                       case <-ch:
+                               return
+                       default:
+                               syscall.CancelIo(syscall.Handle(file.Sysfd))
+                               time.Sleep(100 * time.Millisecond)
+                       }
+               }
+       }()
+       // Try to cancel for max 1 second.
+       // Canceling is normally really fast, but it can take an
+       // arbitrary amount of time on busy systems.
+       // If it takes too long, we skip the test.
+       file.SetReadDeadline(time.Now().Add(1 * time.Second))
+       var tmp [1]byte
+       // Read will block until the cancel is complete.
+       _, err := file.Read(tmp[:])
+       ch <- struct{}{}
+       if err == poll.ErrDeadlineExceeded {
+               t.Skip("took too long to cancel")
+       }
+       if err != syscall.ERROR_OPERATION_ABORTED {
+               t.Errorf("expected ERROR_OPERATION_ABORTED, got %v", err)
+       }
+}
+
 func BenchmarkReadOverlapped(b *testing.B) {
        benchmarkRead(b, true)
 }