// operation contains superset of data necessary to perform all async IO.
type operation struct {
// Used by IOCP interface, it must be first field
- // of the struct, as our code rely on it.
+ // of the struct, as our code relies on it.
o syscall.Overlapped
// fields used by runtime.netpoll
bufs []syscall.WSABuf
}
+func (o *operation) setEvent() {
+ h, err := windows.CreateEvent(nil, 0, 0, nil)
+ if err != nil {
+ // This shouldn't happen when all CreateEvent arguments are zero.
+ panic(err)
+ }
+ // Set the low bit so that the external IOCP doesn't receive the completion packet.
+ o.o.HEvent = h | 1
+}
+
func (o *operation) overlapped() *syscall.Overlapped {
if o.fd.isBlocking {
// Don't return the overlapped object if the file handle
// waitIO waits for the IO operation o to complete.
func waitIO(o *operation) error {
+ if o.fd.isBlocking {
+ panic("can't wait on blocking operations")
+ }
fd := o.fd
if !fd.pd.pollable() {
// The overlapped handle is not added to the runtime poller,
- // the only way to wait for the IO to complete is block.
- _, err := syscall.WaitForSingleObject(fd.Sysfd, syscall.INFINITE)
+ // the only way to wait for the IO to complete is block until
+ // the overlapped event is signaled.
+ _, err := syscall.WaitForSingleObject(o.o.HEvent, syscall.INFINITE)
return err
}
// Wait for our request to complete.
return 0, err
}
// Start IO.
+ if !fd.isBlocking && o.o.HEvent == 0 && !fd.pd.pollable() {
+ // If the handle is opened for overlapped IO but we can't
+ // use the runtime poller, then we need to use an
+ // event to wait for the IO to complete.
+ o.setEvent()
+ }
o.qty = 0
o.flags = 0
err = submit(o)
var waitErr error
- if err == syscall.ERROR_IO_PENDING || (err == nil && !o.fd.skipSyncNotif) {
+ // Blocking operations shouldn't return ERROR_IO_PENDING.
+ // Continue without waiting if that happens.
+ if !o.fd.isBlocking && (err == syscall.ERROR_IO_PENDING || (err == nil && !o.fd.skipSyncNotif)) {
// IO started asynchronously or completed synchronously but
// a sync notification is required. Wait for it to complete.
waitErr = waitIO(o)
// so it is safe to add handles owned by the caller.
fd.initIOErr = fd.pd.init(fd)
if fd.initIOErr != nil {
- // This can happen if the handle is already associated
- // with another IOCP or if the isBlocking flag is incorrect.
- // In both cases, fallback to synchronous IO.
- fd.isBlocking = true
- fd.skipSyncNotif = true
return
}
fd.rop.runtimeCtx = fd.pd.runtimeCtx
}
fd.isFile = fd.kind != kindNet
fd.isBlocking = !pollable
- fd.skipSyncNotif = fd.isBlocking
fd.rop.mode = 'r'
fd.wop.mode = 'w'
fd.rop.fd = fd
}
}
-func TestPipeExternalIOCP(t *testing.T) {
+func iocpAssociateFile(f *os.File, iocp syscall.Handle) error {
+ sc, err := f.SyscallConn()
+ if err != nil {
+ return err
+ }
+ var syserr error
+ err = sc.Control(func(fd uintptr) {
+ if _, err = windows.CreateIoCompletionPort(syscall.Handle(fd), iocp, 0, 0); err != nil {
+ syserr = err
+ }
+ })
+ if err == nil {
+ err = syserr
+ }
+ return err
+}
+
+func TestFileAssociatedWithExternalIOCP(t *testing.T) {
// Test that a caller can associate an overlapped handle to an external IOCP
- // even when the handle is also associated to a poll.FD. Also test that
- // the FD can still perform I/O after the association.
+ // after the handle has been passed to os.NewFile.
+ // Also test that the File can perform I/O after it is associated with the
+ // external IOCP and that those operations do not post to the external IOCP.
t.Parallel()
name := pipeName()
pipe := newMessagePipe(t, name, true)
- _ = newFileOverlapped(t, name, true) // Just open a pipe client
+ _ = newFileOverlapped(t, name, true) // just open a pipe client
+
+ // Use a file to exercise WriteAt.
+ file := newFileOverlapped(t, filepath.Join(t.TempDir(), "a"), true)
- sc, err := pipe.SyscallConn()
+ iocp, err := windows.CreateIoCompletionPort(syscall.InvalidHandle, 0, 0, 0)
if err != nil {
- t.Error(err)
- return
+ t.Fatal(err)
}
- if err := sc.Control(func(fd uintptr) {
- _, err := windows.CreateIoCompletionPort(syscall.Handle(fd), 0, 0, 1)
- if err != nil {
+ defer func() {
+ if iocp == syscall.InvalidHandle {
+ // Already closed at the end of the test.
+ return
+ }
+ if err := syscall.CloseHandle(iocp); err != nil {
t.Fatal(err)
}
- }); err != nil {
- t.Error(err)
+ }()
+
+ ch := make(chan error, 1)
+ go func() {
+ var bytes, key uint32
+ var overlapped *syscall.Overlapped
+ err := syscall.GetQueuedCompletionStatus(syscall.Handle(iocp), &bytes, &key, &overlapped, syscall.INFINITE)
+ ch <- err
+ }()
+
+ if err := iocpAssociateFile(pipe, iocp); err != nil {
+ t.Fatal(err)
+ }
+ if err := iocpAssociateFile(file, iocp); err != nil {
+ t.Fatal(err)
}
- _, err = pipe.Write([]byte("hello"))
- if err != nil {
+ if _, err := pipe.Write([]byte("hello")); err != nil {
+ t.Fatal(err)
+ }
+ if _, err := file.Write([]byte("hello")); err != nil {
t.Fatal(err)
}
+ if _, err := file.WriteAt([]byte("hello"), 0); err != nil {
+ t.Fatal(err)
+ }
+
+ // Wait fot he goroutine to call GetQueuedCompletionStatus.
+ time.Sleep(100 * time.Millisecond)
+
+ // Trigger ERROR_ABANDONED_WAIT_0.
+ if err := syscall.CloseHandle(iocp); err != nil {
+ t.Fatal(err)
+ }
+
+ // Wait for the completion to be posted to the IOCP.
+ err = <-ch
+ iocp = syscall.InvalidHandle
+ const ERROR_ABANDONED_WAIT_0 = syscall.Errno(735)
+ switch err {
+ case ERROR_ABANDONED_WAIT_0:
+ // This is what we expect.
+ case nil:
+ t.Error("unexpected queued completion")
+ default:
+ t.Error(err)
+ }
}