-On Windows, [NewFile] supports overlapped (a.k.a non-blocking) file handles even
-when the handle can't be added to the Go runtime I/O Completion Port (IOCP), normally
-because it is already attached to another IOCP. The I/O operations will be performed in
-a blocking manner instead of using the Go runtime IOCP.
-Particularly, this means that is now possible to reliably pass overlapped named pipes and
-sockets to a Go process standard input, output, and error.
\ No newline at end of file
+On Windows, [NewFile] now supports handles opened for asynchronous I/O (that is,
+[syscall.FILE_FLAG_OVERLAPPED] is specified in the [syscall.CreateFile] call).
+These handles are associated with the Go runtime's I/O completion port,
+which provides the following benefits for the resulting [File]:
+
+- I/O methods ([File.Read], [File.Write], [File.ReadAt], and [File.WriteAt]) do not block an OS thread.
+- Deadline methods ([File.SetDeadline], [File.SetReadDeadline], and [File.SetWriteDeadline]) are supported.
+
+This enhancement is especially beneficial for applications that communicate via named pipes on Windows.
+
+Note that a handle can only be associated with one completion port at a time.
+If the handle provided to [NewFile] is already associated with a completion port,
+the returned [File] is downgraded to synchronous I/O mode.
+In this case, I/O methods will block an OS thread, and the deadline methods have no effect.
isBlocking bool
// Initialization parameters.
- initIOOnce sync.Once
- initIOErr error // only used in the net package
- initPollable bool // value passed to [FD.Init]
+ initIOOnce sync.Once
+ initIOErr error // only used in the net package
}
// setOffset sets the offset fields of the overlapped object
)
func (fd *FD) initIO() error {
+ if fd.isBlocking {
+ return nil
+ }
fd.initIOOnce.Do(func() {
- if fd.initPollable {
- // The runtime poller will ignore I/O completion
- // notifications not initiated by this package,
- // so it is safe to add handles owned by the caller.
- //
- // If we could not add the handle to the runtime poller,
- // assume the handle hasn't been opened for overlapped I/O.
- fd.initIOErr = fd.pd.init(fd)
- if fd.initIOErr != nil {
- fd.initPollable = false
- }
- }
- if !fd.initPollable {
- // Handle opened for overlapped I/O (aka non-blocking) that are not added
- // to the runtime poller need special handling when reading and writing.
- // If we fail to get the file mode information, assume the file is blocking.
- overlapped, _ := windows.IsNonblock(fd.Sysfd)
- fd.isBlocking = !overlapped
+ // The runtime poller will ignore I/O completion
+ // notifications not initiated by this package,
+ // so it is safe to add handles owned by the caller.
+ fd.initIOErr = fd.pd.init(fd)
+ if fd.initIOErr != nil {
+ // This can happen if the handle is already associated
+ // with another IOCP or if the isBlocking flag is incorrect.
+ // In both cases, fallback to synchronous IO.
+ fd.isBlocking = true
fd.skipSyncNotif = true
- } else {
- fd.rop.runtimeCtx = fd.pd.runtimeCtx
- fd.wop.runtimeCtx = fd.pd.runtimeCtx
- if fd.kind != kindNet || socketCanUseSetFileCompletionNotificationModes {
- // Non-socket handles can use SetFileCompletionNotificationModes without problems.
- err := syscall.SetFileCompletionNotificationModes(fd.Sysfd,
- syscall.FILE_SKIP_SET_EVENT_ON_HANDLE|syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS,
- )
- fd.skipSyncNotif = err == nil
- }
+ return
+ }
+ fd.rop.runtimeCtx = fd.pd.runtimeCtx
+ fd.wop.runtimeCtx = fd.pd.runtimeCtx
+ if fd.kind != kindNet || socketCanUseSetFileCompletionNotificationModes {
+ // Non-socket handles can use SetFileCompletionNotificationModes without problems.
+ err := syscall.SetFileCompletionNotificationModes(fd.Sysfd,
+ syscall.FILE_SKIP_SET_EVENT_ON_HANDLE|syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS,
+ )
+ fd.skipSyncNotif = err == nil
}
})
return fd.initIOErr
// The net argument is a network name from the net package (e.g., "tcp"),
// or "file" or "console" or "dir".
// Set pollable to true if fd should be managed by runtime netpoll.
+// Pollable must be set to true for overlapped fds.
func (fd *FD) Init(net string, pollable bool) error {
if initErr != nil {
return initErr
fd.kind = kindNet
}
fd.isFile = fd.kind != kindNet
- fd.initPollable = pollable
+ fd.isBlocking = !pollable
+ fd.skipSyncNotif = fd.isBlocking
fd.rop.mode = 'r'
fd.wop.mode = 'w'
fd.rop.fd = fd
package poll_test
import (
- "bytes"
"errors"
"internal/poll"
"internal/syscall/windows"
"io"
"os"
"path/filepath"
- "strconv"
- "sync"
- "sync/atomic"
"syscall"
"testing"
- "time"
"unsafe"
)
SynRetrans uint8
}
-func newFD(t testing.TB, h syscall.Handle, kind string, overlapped, pollable bool) *poll.FD {
+func newFD(t testing.TB, h syscall.Handle, kind string, overlapped bool) *poll.FD {
fd := poll.FD{
Sysfd: h,
IsStream: true,
ZeroReadIsEOF: true,
}
- err := fd.Init(kind, pollable)
+ err := fd.Init(kind, overlapped)
if overlapped && err != nil {
// Overlapped file handles should not error.
fd.Close()
return &fd
}
-func newFile(t testing.TB, name string, overlapped, pollable bool) *poll.FD {
+func newFile(t testing.TB, name string, overlapped bool) *poll.FD {
namep, err := syscall.UTF16PtrFromString(name)
if err != nil {
t.Fatal(err)
if typ == syscall.FILE_TYPE_PIPE {
kind = "pipe"
}
- return newFD(t, h, kind, overlapped, pollable)
-}
-
-var currentProces = sync.OnceValue(func() string {
- // Convert the process ID to a string.
- return strconv.FormatUint(uint64(os.Getpid()), 10)
-})
-
-var pipeCounter atomic.Uint64
-
-func newBytePipe(t testing.TB, name string, overlapped, pollable bool) *poll.FD {
- return newPipe(t, name, false, overlapped, pollable)
-}
-
-func newMessagePipe(t testing.TB, name string, overlapped, pollable bool) *poll.FD {
- return newPipe(t, name, true, overlapped, pollable)
-}
-
-func pipeName() string {
- return `\\.\pipe\go-internal-poll-test-` + currentProces() + `-` + strconv.FormatUint(pipeCounter.Add(1), 10)
-}
-
-func newPipe(t testing.TB, name string, message, overlapped, pollable bool) *poll.FD {
- wname, err := syscall.UTF16PtrFromString(name)
- if err != nil {
- t.Fatal(err)
- }
- // Create the read handle.
- flags := windows.PIPE_ACCESS_DUPLEX
- if overlapped {
- flags |= syscall.FILE_FLAG_OVERLAPPED
- }
- typ := windows.PIPE_TYPE_BYTE | windows.PIPE_READMODE_BYTE
- if message {
- typ = windows.PIPE_TYPE_MESSAGE | windows.PIPE_READMODE_MESSAGE
- }
- h, err := windows.CreateNamedPipe(wname, uint32(flags), uint32(typ), 1, 4096, 4096, 0, nil)
- if err != nil {
- t.Fatal(err)
- }
- return newFD(t, h, "pipe", overlapped, pollable)
-}
-
-func testReadWrite(t *testing.T, fdr, fdw *poll.FD) {
- write := make(chan string, 1)
- read := make(chan struct{}, 1)
- go func() {
- for s := range write {
- n, err := fdw.Write([]byte(s))
- read <- struct{}{}
- if err != nil {
- t.Error(err)
- }
- if n != len(s) {
- t.Errorf("expected to write %d bytes, got %d", len(s), n)
- }
- }
- }()
- for i := range 10 {
- s := strconv.Itoa(i)
- write <- s
- <-read
- buf := make([]byte, len(s))
- _, err := io.ReadFull(fdr, buf)
- if err != nil {
- t.Fatalf("read failed: %v", err)
- }
- if !bytes.Equal(buf, []byte(s)) {
- t.Fatalf("expected %q, got %q", s, buf)
- }
- }
- close(read)
- close(write)
-}
-
-func testPreadPwrite(t *testing.T, fdr, fdw *poll.FD) {
- type op struct {
- s string
- off int64
- }
- write := make(chan op, 1)
- read := make(chan struct{}, 1)
- go func() {
- for o := range write {
- n, err := fdw.Pwrite([]byte(o.s), o.off)
- read <- struct{}{}
- if err != nil {
- t.Error(err)
- }
- if n != len(o.s) {
- t.Errorf("expected to write %d bytes, got %d", len(o.s), n)
- }
- }
- }()
- for i := range 10 {
- off := int64(i % 3) // exercise some back and forth
- s := strconv.Itoa(i)
- write <- op{s, off}
- <-read
- buf := make([]byte, len(s))
- n, err := fdr.Pread(buf, off)
- if err != nil {
- t.Fatal(err)
- }
- if n != len(s) {
- t.Fatalf("expected to read %d bytes, got %d", len(s), n)
- }
- if !bytes.Equal(buf, []byte(s)) {
- t.Fatalf("expected %q, got %q", s, buf)
- }
- }
- close(read)
- close(write)
-}
-
-func testFileReadEOF(t *testing.T, f *poll.FD) {
- end, err := f.Seek(0, io.SeekEnd)
- if err != nil {
- t.Fatal(err)
- }
- var buf [1]byte
- n, err := f.Read(buf[:])
- if err != nil && err != io.EOF {
- t.Errorf("expected EOF, got %v", err)
- }
- if n != 0 {
- t.Errorf("expected 0 bytes, got %d", n)
- }
-
- n, err = f.Pread(buf[:], end)
- if err != nil && err != io.EOF {
- t.Errorf("expected EOF, got %v", err)
- }
- if n != 0 {
- t.Errorf("expected 0 bytes, got %d", n)
- }
-}
-
-func TestFile(t *testing.T) {
- t.Parallel()
- tests := []struct {
- name string
- overlappedRead bool
- overlappedWrite bool
- pollable bool
- }{
- {"overlapped", true, true, true},
- {"overlapped-nonpollable", true, true, false},
- {"overlapped-read", true, false, true},
- {"overlapped-write", false, true, true},
- {"sync", false, false, false},
- {"sync-pollable", false, false, true},
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- t.Parallel()
- name := filepath.Join(t.TempDir(), "foo")
- rh := newFile(t, name, tt.overlappedRead, tt.pollable)
- wh := newFile(t, name, tt.overlappedWrite, tt.pollable)
- testReadWrite(t, rh, wh)
- testPreadPwrite(t, rh, wh)
- testFileReadEOF(t, rh)
- })
- }
-}
-
-func TestPipe(t *testing.T) {
- t.Parallel()
- tests := []struct {
- name string
- overlappedRead bool
- overlappedWrite bool
- pollable bool
- }{
- {"overlapped", true, true, true},
- {"overlapped-nonpollable", true, true, false},
- {"overlapped-write", false, true, true},
- {"overlapped-read", true, false, true},
- {"sync", false, false, false},
- {"sync-pollable", false, false, true},
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- t.Parallel()
- name := pipeName()
- pipe := newBytePipe(t, name, tt.overlappedWrite, tt.pollable)
- file := newFile(t, name, tt.overlappedRead, tt.pollable)
- testReadWrite(t, pipe, file)
- })
- }
- t.Run("anonymous", func(t *testing.T) {
- t.Parallel()
- var r, w syscall.Handle
- if err := syscall.CreatePipe(&r, &w, nil, 0); err != nil {
- t.Fatal(err)
- }
- defer func() {
- if err := syscall.CloseHandle(r); err != nil {
- t.Fatal(err)
- }
- if err := syscall.CloseHandle(w); err != nil {
- t.Fatal(err)
- }
- }()
- // CreatePipe always returns sync handles.
- fdr := newFD(t, r, "pipe", false, false)
- fdw := newFD(t, w, "file", false, false)
- testReadWrite(t, fdr, fdw)
- })
-}
-
-func TestPipeMessageReadEOF(t *testing.T) {
- t.Parallel()
- name := pipeName()
- pipe := newMessagePipe(t, name, true, true)
- file := newFile(t, name, true, true)
-
- _, err := pipe.Write(nil)
- if err != nil {
- t.Error(err)
- }
-
- var buf [10]byte
- n, err := file.Read(buf[:])
- if err != io.EOF {
- t.Errorf("expected EOF, got %v", err)
- }
- if n != 0 {
- t.Errorf("expected 0 bytes, got %d", n)
- }
-}
-
-func TestPipeClosedEOF(t *testing.T) {
- t.Parallel()
- name := pipeName()
- pipe := newBytePipe(t, name, true, false)
- file := newFile(t, name, true, true)
-
- pipe.Close()
-
- var buf [10]byte
- n, err := file.Read(buf[:])
- if err != io.EOF {
- t.Errorf("expected EOF, got %v", err)
- }
- if n != 0 {
- t.Errorf("expected 0 bytes, got %d", n)
- }
-}
-
-func TestPipeReadTimeout(t *testing.T) {
- t.Parallel()
- name := pipeName()
- _ = newBytePipe(t, name, true, true)
- file := newFile(t, name, true, true)
-
- err := file.SetReadDeadline(time.Now().Add(time.Millisecond))
- if err != nil {
- t.Fatal(err)
- }
-
- var buf [10]byte
- _, err = file.Read(buf[:])
- if err != poll.ErrDeadlineExceeded {
- t.Errorf("expected deadline exceeded, got %v", err)
- }
-}
-
-func TestPipeCanceled(t *testing.T) {
- t.Parallel()
- name := pipeName()
- _ = newBytePipe(t, name, true, true)
- file := newFile(t, name, true, true)
- ch := make(chan struct{}, 1)
- go func() {
- for {
- select {
- case <-ch:
- return
- default:
- syscall.CancelIo(syscall.Handle(file.Sysfd))
- time.Sleep(100 * time.Millisecond)
- }
- }
- }()
- // Try to cancel for max 1 second.
- // Canceling is normally really fast, but it can take an
- // arbitrary amount of time on busy systems.
- // If it takes too long, we skip the test.
- file.SetReadDeadline(time.Now().Add(1 * time.Second))
- var tmp [1]byte
- // Read will block until the cancel is complete.
- _, err := file.Read(tmp[:])
- ch <- struct{}{}
- if err == poll.ErrDeadlineExceeded {
- t.Skip("took too long to cancel")
- }
- if err != syscall.ERROR_OPERATION_ABORTED {
- t.Errorf("expected ERROR_OPERATION_ABORTED, got %v", err)
- }
-}
-
-func TestPipeExternalIOCP(t *testing.T) {
- // Test that a caller can associate an overlapped handle to an external IOCP
- // even when the handle is also associated to a poll.FD. Also test that
- // the FD can still perform I/O after the association.
- t.Parallel()
- name := pipeName()
- pipe := newMessagePipe(t, name, true, true)
- _ = newFile(t, name, true, true) // Just open a pipe client
-
- _, err := windows.CreateIoCompletionPort(syscall.Handle(pipe.Sysfd), 0, 0, 1)
- if err != nil {
- t.Fatal(err)
- }
-
- _, err = pipe.Write([]byte("hello"))
- if err != nil {
- t.Fatal(err)
- }
+ return newFD(t, h, kind, overlapped)
}
func BenchmarkReadOverlapped(b *testing.B) {
if err != nil {
b.Fatal(err)
}
- file := newFile(b, name, overlapped, true)
+ file := newFile(b, name, overlapped)
var buf [len(content)]byte
for b.Loop() {
_, err := io.ReadFull(file, buf[:])
return e.Err
}
+// NewFile returns a new [File] with the given file descriptor and name.
+// The returned value will be nil if fd is not a valid file descriptor.
+//
+// NewFile's behavior differs on some platforms:
+//
+// - On Unix, if fd is in non-blocking mode, NewFile will attempt to return a pollable file.
+// - On Windows, if fd is opened for asynchronous I/O (that is, [syscall.FILE_FLAG_OVERLAPPED]
+// has been specified in the [syscall.CreateFile] call), NewFile will attempt to return a pollable
+// file by associating fd with the Go runtime I/O completion port.
+// The I/O operations will be performed synchronously if the association fails.
+//
+// Only pollable files support [File.SetDeadline], [File.SetReadDeadline], and [File.SetWriteDeadline].
+//
+// After passing it to NewFile, fd may become invalid under the same conditions described
+// in the comments of [File.Fd], and the same constraints apply.
+func NewFile(fd uintptr, name string) *File {
+ return newFileFromNewFile(fd, name)
+}
+
// Read reads up to len(b) bytes from the File and stores them in b.
// It returns the number of bytes read and any error encountered.
// At end of file, Read returns 0, io.EOF.
return uintptr(f.sysfd)
}
-// NewFile returns a new File with the given file descriptor and
-// name. The returned value will be nil if fd is not a valid file
-// descriptor.
-func NewFile(fd uintptr, name string) *File {
+// newFileFromNewFile is called by [NewFile].
+func newFileFromNewFile(fd uintptr, name string) *File {
fdi := int(fd)
if fdi < 0 {
return nil
return uintptr(f.pfd.Sysfd)
}
-// NewFile returns a new File with the given file descriptor and
-// name. The returned value will be nil if fd is not a valid file
-// descriptor. On Unix systems, if the file descriptor is in
-// non-blocking mode, NewFile will attempt to return a pollable File
-// (one for which the SetDeadline methods work).
-//
-// After passing it to NewFile, fd may become invalid under the same
-// conditions described in the comments of the Fd method, and the same
-// constraints apply.
-func NewFile(fd uintptr, name string) *File {
+// newFileFromNewFile is called by [NewFile].
+func newFileFromNewFile(fd uintptr, name string) *File {
fdi := int(fd)
if fdi < 0 {
return nil
// newFile returns a new File with the given file handle and name.
// Unlike NewFile, it does not check that h is syscall.InvalidHandle.
-func newFile(h syscall.Handle, name string, kind string) *File {
+// If nonBlocking is true, it tries to add the file to the runtime poller.
+func newFile(h syscall.Handle, name string, kind string, nonBlocking bool) *File {
if kind == "file" {
t, err := syscall.GetFileType(h)
if err != nil || t == syscall.FILE_TYPE_CHAR {
// Ignore initialization errors.
// Assume any problems will show up in later I/O.
- f.pfd.Init(kind, false)
-
+ f.pfd.Init(kind, nonBlocking)
return f
}
// newConsoleFile creates new File that will be used as console.
func newConsoleFile(h syscall.Handle, name string) *File {
- return newFile(h, name, "console")
+ return newFile(h, name, "console", false)
}
-// NewFile returns a new File with the given file descriptor and
-// name. The returned value will be nil if fd is not a valid file
-// descriptor.
-func NewFile(fd uintptr, name string) *File {
+// newFileFromNewFile is called by [NewFile].
+func newFileFromNewFile(fd uintptr, name string) *File {
h := syscall.Handle(fd)
if h == syscall.InvalidHandle {
return nil
}
- return newFile(h, name, "file")
+ nonBlocking, _ := windows.IsNonblock(syscall.Handle(fd))
+ return newFile(h, name, "file", nonBlocking)
}
func epipecheck(file *File, e error) {
if err != nil {
return nil, &PathError{Op: "open", Path: name, Err: err}
}
- return newFile(r, name, "file"), nil
+ // syscall.Open always returns a non-blocking handle.
+ return newFile(r, name, "file", false), nil
}
func openDirNolog(name string) (*File, error) {
if e != nil {
return nil, nil, NewSyscallError("pipe", e)
}
- return newFile(p[0], "|0", "pipe"), newFile(p[1], "|1", "pipe"), nil
+ // syscall.Pipe always returns a non-blocking handle.
+ return newFile(p[0], "|0", "pipe", false), newFile(p[1], "|1", "pipe", false), nil
}
var useGetTempPath2 = sync.OnceValue(func() bool {
"sync/atomic"
"syscall"
"testing"
+ "time"
"unicode/utf16"
"unsafe"
)
}
}
-var currentProces = sync.OnceValue(func() string {
- // Convert the process ID to a string.
- return strconv.FormatUint(uint64(os.Getpid()), 10)
-})
-
-var pipeCounter atomic.Uint64
-
-func pipeName() string {
- return `\\.\pipe\go-os-test-` + currentProces() + `-` + strconv.FormatUint(pipeCounter.Add(1), 10)
-}
-
-func createPipe(t *testing.T, name string, inherit bool) *os.File {
- t.Helper()
- wname, err := syscall.UTF16PtrFromString(name)
- if err != nil {
- t.Fatal(err)
- }
- flags := windows.PIPE_ACCESS_DUPLEX | syscall.FILE_FLAG_OVERLAPPED
- typ := windows.PIPE_TYPE_BYTE
- sa := &syscall.SecurityAttributes{
- Length: uint32(unsafe.Sizeof(syscall.SecurityAttributes{})),
- }
- if inherit {
- sa.InheritHandle = 1
- }
- rh, err := windows.CreateNamedPipe(wname, uint32(flags), uint32(typ), 1, 4096, 4096, 0, sa)
- if err != nil {
- t.Fatal(err)
- }
- return os.NewFile(uintptr(rh), name)
-}
-
func TestStdinOverlappedPipe(t *testing.T) {
// Test that we can read from a named pipe open with FILE_FLAG_OVERLAPPED.
// See https://go.dev/issue/15388.
name := pipeName()
// Create the read handle inherited by the child process.
- r := createPipe(t, name, true)
+ r := newPipe(t, name, false, true)
defer r.Close()
// Create a write handle.
t.Fatalf("output %q does not contain %q", got, want)
}
}
+
+func newFileOverlapped(t testing.TB, name string, overlapped bool) *os.File {
+ namep, err := syscall.UTF16PtrFromString(name)
+ if err != nil {
+ t.Fatal(err)
+ }
+ flags := syscall.FILE_ATTRIBUTE_NORMAL
+ if overlapped {
+ flags |= syscall.FILE_FLAG_OVERLAPPED
+ }
+ h, err := syscall.CreateFile(namep,
+ syscall.GENERIC_READ|syscall.GENERIC_WRITE,
+ syscall.FILE_SHARE_WRITE|syscall.FILE_SHARE_READ,
+ nil, syscall.OPEN_ALWAYS, uint32(flags), 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+ f := os.NewFile(uintptr(h), name)
+ t.Cleanup(func() {
+ if err := f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
+ t.Fatal(err)
+ }
+ })
+ return f
+}
+
+var currentProcess = sync.OnceValue(func() string {
+ // Convert the process ID to a string.
+ return strconv.FormatUint(uint64(os.Getpid()), 10)
+})
+
+var pipeCounter atomic.Uint64
+
+func newBytePipe(t testing.TB, name string, overlapped bool) *os.File {
+ return newPipe(t, name, false, overlapped)
+}
+
+func newMessagePipe(t testing.TB, name string, overlapped bool) *os.File {
+ return newPipe(t, name, true, overlapped)
+}
+
+func pipeName() string {
+ return `\\.\pipe\go-os-test-` + currentProcess() + `-` + strconv.FormatUint(pipeCounter.Add(1), 10)
+}
+
+func newPipe(t testing.TB, name string, message, overlapped bool) *os.File {
+ wname, err := syscall.UTF16PtrFromString(name)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Create the read handle.
+ flags := windows.PIPE_ACCESS_DUPLEX
+ if overlapped {
+ flags |= syscall.FILE_FLAG_OVERLAPPED
+ }
+ typ := windows.PIPE_TYPE_BYTE | windows.PIPE_READMODE_BYTE
+ if message {
+ typ = windows.PIPE_TYPE_MESSAGE | windows.PIPE_READMODE_MESSAGE
+ }
+ h, err := windows.CreateNamedPipe(wname, uint32(flags), uint32(typ), 1, 4096, 4096, 0, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ f := os.NewFile(uintptr(h), name)
+ t.Cleanup(func() {
+ if err := f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
+ t.Fatal(err)
+ }
+ })
+ return f
+}
+
+func testReadWrite(t *testing.T, fdr, fdw *os.File) {
+ write := make(chan string, 1)
+ read := make(chan struct{}, 1)
+ go func() {
+ for s := range write {
+ n, err := fdw.Write([]byte(s))
+ read <- struct{}{}
+ if err != nil {
+ t.Error(err)
+ }
+ if n != len(s) {
+ t.Errorf("expected to write %d bytes, got %d", len(s), n)
+ }
+ }
+ }()
+ for i := range 10 {
+ s := strconv.Itoa(i)
+ write <- s
+ <-read
+ buf := make([]byte, len(s))
+ _, err := io.ReadFull(fdr, buf)
+ if err != nil {
+ t.Fatalf("read failed: %v", err)
+ }
+ if !bytes.Equal(buf, []byte(s)) {
+ t.Fatalf("expected %q, got %q", s, buf)
+ }
+ }
+ close(read)
+ close(write)
+}
+
+func testPreadPwrite(t *testing.T, fdr, fdw *os.File) {
+ type op struct {
+ s string
+ off int64
+ }
+ write := make(chan op, 1)
+ read := make(chan struct{}, 1)
+ go func() {
+ for o := range write {
+ n, err := fdw.WriteAt([]byte(o.s), o.off)
+ read <- struct{}{}
+ if err != nil {
+ t.Error(err)
+ }
+ if n != len(o.s) {
+ t.Errorf("expected to write %d bytes, got %d", len(o.s), n)
+ }
+ }
+ }()
+ for i := range 10 {
+ off := int64(i % 3) // exercise some back and forth
+ s := strconv.Itoa(i)
+ write <- op{s, off}
+ <-read
+ buf := make([]byte, len(s))
+ n, err := fdr.ReadAt(buf, off)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if n != len(s) {
+ t.Fatalf("expected to read %d bytes, got %d", len(s), n)
+ }
+ if !bytes.Equal(buf, []byte(s)) {
+ t.Fatalf("expected %q, got %q", s, buf)
+ }
+ }
+ close(read)
+ close(write)
+}
+
+func testFileReadEOF(t *testing.T, f *os.File) {
+ end, err := f.Seek(0, io.SeekEnd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ var buf [1]byte
+ n, err := f.Read(buf[:])
+ if err != nil && err != io.EOF {
+ t.Errorf("expected EOF, got %v", err)
+ }
+ if n != 0 {
+ t.Errorf("expected 0 bytes, got %d", n)
+ }
+
+ n, err = f.ReadAt(buf[:], end)
+ if err != nil && err != io.EOF {
+ t.Errorf("expected EOF, got %v", err)
+ }
+ if n != 0 {
+ t.Errorf("expected 0 bytes, got %d", n)
+ }
+}
+
+func TestFile(t *testing.T) {
+ t.Parallel()
+ tests := []struct {
+ name string
+ overlappedRead bool
+ overlappedWrite bool
+ }{
+ {"overlapped", true, true},
+ {"overlapped-read", true, false},
+ {"overlapped-write", false, true},
+ {"sync", false, false},
+ {"sync-pollable", false, false},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ t.Parallel()
+ name := filepath.Join(t.TempDir(), "foo")
+ rh := newFileOverlapped(t, name, tt.overlappedRead)
+ wh := newFileOverlapped(t, name, tt.overlappedWrite)
+ testReadWrite(t, rh, wh)
+ testPreadPwrite(t, rh, wh)
+ testFileReadEOF(t, rh)
+ })
+ }
+}
+
+func TestPipe(t *testing.T) {
+ t.Parallel()
+ r, w, err := os.Pipe()
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ if err := r.Close(); err != nil {
+ t.Fatal(err)
+ }
+ if err := w.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }()
+ testReadWrite(t, r, w)
+}
+
+func TestNamedPipe(t *testing.T) {
+ t.Parallel()
+ tests := []struct {
+ name string
+ overlappedRead bool
+ overlappedWrite bool
+ pollable bool
+ }{
+ {"overlapped", true, true, true},
+ {"overlapped-write", false, true, true},
+ {"overlapped-read", true, false, true},
+ {"sync", false, false, false},
+ {"sync-pollable", false, false, true},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ t.Parallel()
+ name := pipeName()
+ pipe := newBytePipe(t, name, tt.overlappedWrite)
+ file := newFileOverlapped(t, name, tt.overlappedRead)
+ testReadWrite(t, pipe, file)
+ })
+ }
+}
+
+func TestPipeMessageReadEOF(t *testing.T) {
+ t.Parallel()
+ name := pipeName()
+ pipe := newMessagePipe(t, name, true)
+ file := newFileOverlapped(t, name, true)
+
+ _, err := pipe.Write(nil)
+ if err != nil {
+ t.Error(err)
+ }
+
+ var buf [10]byte
+ n, err := file.Read(buf[:])
+ if err != io.EOF {
+ t.Errorf("expected EOF, got %v", err)
+ }
+ if n != 0 {
+ t.Errorf("expected 0 bytes, got %d", n)
+ }
+}
+
+func TestPipeClosedEOF(t *testing.T) {
+ t.Parallel()
+ name := pipeName()
+ pipe := newBytePipe(t, name, true)
+ file := newFileOverlapped(t, name, true)
+
+ pipe.Close()
+
+ var buf [10]byte
+ n, err := file.Read(buf[:])
+ if err != io.EOF {
+ t.Errorf("expected EOF, got %v", err)
+ }
+ if n != 0 {
+ t.Errorf("expected 0 bytes, got %d", n)
+ }
+}
+
+func TestPipeReadTimeout(t *testing.T) {
+ t.Parallel()
+ name := pipeName()
+ _ = newBytePipe(t, name, true)
+ file := newFileOverlapped(t, name, true)
+
+ err := file.SetReadDeadline(time.Now().Add(time.Millisecond))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var buf [10]byte
+ _, err = file.Read(buf[:])
+ if !errors.Is(err, os.ErrDeadlineExceeded) {
+ t.Errorf("expected deadline exceeded, got %v", err)
+ }
+}
+
+func TestPipeCanceled(t *testing.T) {
+ t.Parallel()
+ name := pipeName()
+ _ = newBytePipe(t, name, true)
+ file := newFileOverlapped(t, name, true)
+ ch := make(chan struct{}, 1)
+ go func() {
+ for {
+ select {
+ case <-ch:
+ return
+ default:
+ sc, err := file.SyscallConn()
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ if err := sc.Control(func(fd uintptr) {
+ syscall.CancelIo(syscall.Handle(fd))
+ }); err != nil {
+ t.Error(err)
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+ }
+ }()
+ // Try to cancel for max 1 second.
+ // Canceling is normally really fast, but it can take an
+ // arbitrary amount of time on busy systems.
+ // If it takes too long, we skip the test.
+ file.SetReadDeadline(time.Now().Add(1 * time.Second))
+ var tmp [1]byte
+ // Read will block until the cancel is complete.
+ _, err := file.Read(tmp[:])
+ ch <- struct{}{}
+ if errors.Is(err, os.ErrDeadlineExceeded) {
+ t.Skip("took too long to cancel")
+ }
+ if !errors.Is(err, syscall.ERROR_OPERATION_ABORTED) {
+ t.Errorf("expected ERROR_OPERATION_ABORTED, got %v", err)
+ }
+}
+
+func TestPipeExternalIOCP(t *testing.T) {
+ // Test that a caller can associate an overlapped handle to an external IOCP
+ // even when the handle is also associated to a poll.FD. Also test that
+ // the FD can still perform I/O after the association.
+ t.Parallel()
+ name := pipeName()
+ pipe := newMessagePipe(t, name, true)
+ _ = newFileOverlapped(t, name, true) // Just open a pipe client
+
+ sc, err := pipe.SyscallConn()
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ if err := sc.Control(func(fd uintptr) {
+ _, err := windows.CreateIoCompletionPort(syscall.Handle(fd), 0, 0, 1)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }); err != nil {
+ t.Error(err)
+ }
+
+ _, err = pipe.Write([]byte("hello"))
+ if err != nil {
+ t.Fatal(err)
+ }
+}
}
func newDirFile(fd syscall.Handle, name string) (*File, error) {
- return newFile(fd, name, "file"), nil
+ return newFile(fd, name, "file", false), nil
}
if err != nil {
return nil, &PathError{Op: "openat", Path: name, Err: err}
}
- return newFile(fd, joinPath(root.Name(), name), "file"), nil
+ // openat always returns a non-blocking handle.
+ return newFile(fd, joinPath(root.Name(), name), "file", false), nil
}
func openat(dirfd syscall.Handle, name string, flag int, perm FileMode) (syscall.Handle, error) {