}
}
+// waitIO waits for the IO operation o to complete.
+func waitIO(o *operation) error {
+ fd := o.fd
+ if !fd.pd.pollable() {
+ // The overlapped handle is not added to the runtime poller,
+ // the only way to wait for the IO to complete is block.
+ _, err := syscall.WaitForSingleObject(fd.Sysfd, syscall.INFINITE)
+ return err
+ }
+ // Wait for our request to complete.
+ err := fd.pd.wait(int(o.mode), fd.isFile)
+ switch err {
+ case nil, ErrNetClosing, ErrFileClosing, ErrDeadlineExceeded:
+ // No other error is expected.
+ default:
+ panic("unexpected runtime.netpoll error: " + err.Error())
+ }
+ return err
+}
+
+// cancelIO cancels the IO operation o and waits for it to complete.
+func cancelIO(o *operation) {
+ fd := o.fd
+ if !fd.pd.pollable() {
+ return
+ }
+ // Cancel our request.
+ err := syscall.CancelIoEx(fd.Sysfd, &o.o)
+ // Assuming ERROR_NOT_FOUND is returned, if IO is completed.
+ if err != nil && err != syscall.ERROR_NOT_FOUND {
+ // TODO(brainman): maybe do something else, but panic.
+ panic(err)
+ }
+ fd.pd.waitCanceled(int(o.mode))
+}
+
// execIO executes a single IO operation o.
// It supports both synchronous and asynchronous IO.
+// o.qty and o.flags are set to zero before calling submit
+// to avoid reusing the values from a previous call.
func execIO(o *operation, submit func(o *operation) error) (int, error) {
fd := o.fd
fd.initIO()
if err != nil {
return 0, err
}
- getOverlappedResult := func() (int, error) {
+ // Start IO.
+ o.qty = 0
+ o.flags = 0
+ err = submit(o)
+ var waitErr error
+ if err == syscall.ERROR_IO_PENDING || (err == nil && !o.fd.skipSyncNotif) {
+ // IO started asynchronously or completed synchronously but
+ // a sync notification is required. Wait for it to complete.
+ waitErr = waitIO(o)
+ if waitErr != nil {
+ // IO interrupted by "close" or "timeout".
+ cancelIO(o)
+ // We issued a cancellation request, but the IO operation may still succeeded
+ // before the cancellation request runs.
+ }
if fd.isFile {
err = windows.GetOverlappedResult(fd.Sysfd, &o.o, &o.qty, false)
} else {
err = windows.WSAGetOverlappedResult(fd.Sysfd, &o.o, &o.qty, false, &o.flags)
}
- switch err {
- case nil:
- return int(o.qty), nil
- case syscall.ERROR_HANDLE_EOF:
- // EOF reached.
- return int(o.qty), io.EOF
- case syscall.ERROR_MORE_DATA, windows.WSAEMSGSIZE:
- // More data available. Return back the size of received data.
- return int(o.qty), err
- default:
- return 0, err
- }
- }
- // Start IO.
- err = submit(o)
- if !fd.pd.pollable() {
- if err == syscall.ERROR_IO_PENDING {
- // The overlapped handle is not added to the runtime poller,
- // the only way to wait for the IO to complete is block.
- _, err = syscall.WaitForSingleObject(fd.Sysfd, syscall.INFINITE)
- if err == nil {
- return getOverlappedResult()
- }
- }
- if err != nil {
- return 0, err
- }
- return int(o.qty), nil
- }
- switch err {
- case nil:
- // IO completed immediately
- if o.fd.skipSyncNotif {
- // No completion message will follow, so return immediately.
- return int(o.qty), nil
- }
- // Need to get our completion message anyway.
- case syscall.ERROR_IO_PENDING:
- // IO started, and we have to wait for its completion.
- err = nil
- default:
- return 0, err
}
- // Wait for our request to complete.
- err = fd.pd.wait(int(o.mode), fd.isFile)
- if err == nil {
- // All is good. Extract our IO results and return.
- return getOverlappedResult()
- }
- // IO is interrupted by "close" or "timeout"
- netpollErr := err
- switch netpollErr {
- case ErrNetClosing, ErrFileClosing, ErrDeadlineExceeded:
- // will deal with those.
- default:
- panic("unexpected runtime.netpoll error: " + netpollErr.Error())
- }
- // Cancel our request.
- err = syscall.CancelIoEx(fd.Sysfd, &o.o)
- // Assuming ERROR_NOT_FOUND is returned, if IO is completed.
- if err != nil && err != syscall.ERROR_NOT_FOUND {
- // TODO(brainman): maybe do something else, but panic.
- panic(err)
- }
- // Wait for cancellation to complete.
- fd.pd.waitCanceled(int(o.mode))
- n, err := getOverlappedResult()
- if err != nil {
- if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled
- err = netpollErr
+ // ERROR_OPERATION_ABORTED may have been caused by us. In that case,
+ // map it to our own error. Don't do more than that, each submitted
+ // function may have its own meaning for each error.
+ if err == syscall.ERROR_OPERATION_ABORTED {
+ if waitErr != nil {
+ // IO canceled by the poller while waiting for completion.
+ err = waitErr
+ } else if fd.kind == kindPipe && fd.closing() {
+ // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
+ // If the fd is a pipe and the Write was interrupted by CancelIoEx,
+ // we assume it is interrupted by Close.
+ err = errClosing(fd.isFile)
}
- return n, err
}
- // We issued a cancellation request. But, it seems, IO operation succeeded
- // before the cancellation request run. We need to treat the IO operation as
- // succeeded (the bytes are actually sent/recv from network).
- return n, nil
+ return int(o.qty), err
}
// FD is a file descriptor. The net and os packages embed this type in
kindPipe
)
-// logInitFD is set by tests to enable file descriptor initialization logging.
-var logInitFD func(net int, fd *FD, err error)
-
func (fd *FD) initIO() error {
fd.initIOOnce.Do(func() {
if fd.initPollable {
fd.initPollable = false
}
}
- if logInitFD != nil {
- logInitFD(int(fd.kind), fd, fd.initIOErr)
- }
if !fd.initPollable {
// Handle opened for overlapped I/O (aka non-blocking) that are not added
// to the runtime poller need special handling when reading and writing.
- var info windows.FILE_MODE_INFORMATION
- if err := windows.NtQueryInformationFile(fd.Sysfd, &windows.IO_STATUS_BLOCK{}, unsafe.Pointer(&info), uint32(unsafe.Sizeof(info)), windows.FileModeInformation); err == nil {
- fd.isBlocking = info.Mode&(windows.FILE_SYNCHRONOUS_IO_ALERT|windows.FILE_SYNCHRONOUS_IO_NONALERT) != 0
- } else {
- // If we fail to get the file mode information, assume the file is blocking.
- fd.isBlocking = true
- }
+ // If we fail to get the file mode information, assume the file is blocking.
+ overlapped, _ := windows.IsNonblock(fd.Sysfd)
+ fd.isBlocking = !overlapped
+ fd.skipSyncNotif = true
} else {
fd.rop.runtimeCtx = fd.pd.runtimeCtx
fd.wop.runtimeCtx = fd.pd.runtimeCtx
err := syscall.SetFileCompletionNotificationModes(fd.Sysfd,
syscall.FILE_SKIP_SET_EVENT_ON_HANDLE|syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS,
)
- if err == nil {
- fd.skipSyncNotif = true
- }
+ fd.skipSyncNotif = err == nil
}
}
})
// handles and that cares about handle IOCP association errors.
// We can should do the IOCP association here.
return fd.initIO()
- } else {
- if logInitFD != nil {
- // For testing.
- logInitFD(int(fd.kind), fd, nil)
- }
}
return nil
}
return syscall.ReadFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
})
fd.addOffset(n)
- if err == syscall.ERROR_HANDLE_EOF {
+ switch err {
+ case syscall.ERROR_HANDLE_EOF:
err = io.EOF
- }
- if fd.kind == kindPipe && err != nil {
- switch err {
- case syscall.ERROR_BROKEN_PIPE:
- // Returned by pipes when the other end is closed.
- err = nil
- case syscall.ERROR_OPERATION_ABORTED:
- if fd.closing() {
- // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
- // If the fd is a pipe and the Read was interrupted by CancelIoEx,
- // we assume it is interrupted by Close.
- err = ErrFileClosing
- }
+ case syscall.ERROR_BROKEN_PIPE:
+ // ReadFile only documents ERROR_BROKEN_PIPE for pipes.
+ if fd.kind == kindPipe {
+ err = io.EOF
}
}
case kindNet:
n, err := execIO(o, func(o *operation) error {
return syscall.ReadFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, &o.o)
})
- if err != nil {
- if err == syscall.ERROR_HANDLE_EOF {
- err = io.EOF
- }
+ if err == syscall.ERROR_HANDLE_EOF {
+ err = io.EOF
}
if len(b) != 0 {
err = fd.eofError(n, err)
return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
})
fd.addOffset(n)
- if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED && fd.closing() {
- // Close uses CancelIoEx to interrupt concurrent I/O for pipes.
- // If the fd is a pipe and the Write was interrupted by CancelIoEx,
- // we assume it is interrupted by Close.
- err = ErrFileClosing
- }
case kindNet:
if race.Enabled {
race.ReleaseMerge(unsafe.Pointer(&ioSync))
// socket is readable. h/t https://stackoverflow.com/a/42019668/332798
o := &fd.rop
o.InitBuf(nil)
- if !fd.IsStream {
- o.flags |= windows.MSG_PEEK
- }
_, err := execIO(o, func(o *operation) error {
+ if !fd.IsStream {
+ o.flags |= windows.MSG_PEEK
+ }
return syscall.WSARecv(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
})
if err == windows.WSAEMSGSIZE {
import (
"bytes"
"errors"
- "fmt"
"internal/poll"
"internal/syscall/windows"
"io"
"unsafe"
)
-type loggedFD struct {
- Net int
- FD *poll.FD
- Err error
-}
-
-var (
- logMu sync.Mutex
- loggedFDs map[syscall.Handle]*loggedFD
-)
-
-func logFD(net int, fd *poll.FD, err error) {
- logMu.Lock()
- defer logMu.Unlock()
-
- loggedFDs[fd.Sysfd] = &loggedFD{
- Net: net,
- FD: fd,
- Err: err,
- }
-}
-
func init() {
- loggedFDs = make(map[syscall.Handle]*loggedFD)
- *poll.LogInitFD = logFD
-
poll.InitWSA()
}
-func findLoggedFD(h syscall.Handle) (lfd *loggedFD, found bool) {
- logMu.Lock()
- defer logMu.Unlock()
-
- lfd, found = loggedFDs[h]
- return lfd, found
-}
-
// checkFileIsNotPartOfNetpoll verifies that f is not managed by netpoll.
-// It returns error, if check fails.
-func checkFileIsNotPartOfNetpoll(f *os.File) error {
- lfd, found := findLoggedFD(syscall.Handle(f.Fd()))
- if !found {
- return fmt.Errorf("%v fd=%v: is not found in the log", f.Name(), f.Fd())
+func checkFileIsNotPartOfNetpoll(t *testing.T, f *os.File) {
+ t.Helper()
+ sc, err := f.SyscallConn()
+ if err != nil {
+ t.Fatal(err)
}
- if lfd.FD.IsPartOfNetpoll() {
- return fmt.Errorf("%v fd=%v: is part of netpoll, but should not be (logged: net=%v err=%v)", f.Name(), f.Fd(), lfd.Net, lfd.Err)
+ if err := sc.Control(func(fd uintptr) {
+ // Only try to associate the file with an IOCP if the handle is opened for overlapped I/O,
+ // else the association will always fail.
+ overlapped, err := windows.IsNonblock(syscall.Handle(fd))
+ if err != nil {
+ t.Fatalf("%v fd=%v: %v", f.Name(), fd, err)
+ }
+ if overlapped {
+ // If the file is part of netpoll, then associating it with another IOCP should fail.
+ if _, err := windows.CreateIoCompletionPort(syscall.Handle(fd), 0, 0, 1); err != nil {
+ t.Fatalf("%v fd=%v: is part of netpoll, but should not be: %v", f.Name(), fd, err)
+ }
+ }
+ }); err != nil {
+ t.Fatalf("%v fd=%v: is not initialized", f.Name(), f.Fd())
}
- return nil
}
func TestFileFdsAreInitialised(t *testing.T) {
+ t.Parallel()
exe, err := os.Executable()
if err != nil {
t.Fatal(err)
}
defer f.Close()
- err = checkFileIsNotPartOfNetpoll(f)
- if err != nil {
- t.Fatal(err)
- }
+ checkFileIsNotPartOfNetpoll(t, f)
}
func TestSerialFdsAreInitialised(t *testing.T) {
+ t.Parallel()
for _, name := range []string{"COM1", "COM2", "COM3", "COM4"} {
t.Run(name, func(t *testing.T) {
+ t.Parallel()
h, err := syscall.CreateFile(syscall.StringToUTF16Ptr(name),
syscall.GENERIC_READ|syscall.GENERIC_WRITE,
0,
f := os.NewFile(uintptr(h), name)
defer f.Close()
- err = checkFileIsNotPartOfNetpoll(f)
- if err != nil {
- t.Fatal(err)
- }
+ checkFileIsNotPartOfNetpoll(t, f)
})
}
}
func TestWSASocketConflict(t *testing.T) {
+ t.Parallel()
s, err := windows.WSASocket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP, nil, 0, windows.WSA_FLAG_OVERLAPPED)
if err != nil {
t.Fatal(err)
err := fd.Init(kind, pollable)
if overlapped && err != nil {
// Overlapped file handles should not error.
+ fd.Close()
t.Fatal(err)
}
+ t.Cleanup(func() {
+ fd.Close()
+ })
return &fd
}
if err != nil {
t.Fatal(err)
}
- t.Cleanup(func() {
- if err := syscall.CloseHandle(h); err != nil {
- t.Fatal(err)
- }
- })
typ, err := syscall.GetFileType(h)
if err != nil {
+ syscall.CloseHandle(h)
t.Fatal(err)
}
kind := "file"
if overlapped {
flags |= syscall.FILE_FLAG_OVERLAPPED
}
- typ := windows.PIPE_TYPE_BYTE
+ typ := windows.PIPE_TYPE_BYTE | windows.PIPE_READMODE_BYTE
if message {
- typ = windows.PIPE_TYPE_MESSAGE
+ typ = windows.PIPE_TYPE_MESSAGE | windows.PIPE_READMODE_MESSAGE
}
h, err := windows.CreateNamedPipe(wname, uint32(flags), uint32(typ), 1, 4096, 4096, 0, nil)
if err != nil {
t.Fatal(err)
}
- t.Cleanup(func() {
- if err := syscall.CloseHandle(h); err != nil {
- t.Fatal(err)
- }
- })
return newFD(t, h, "pipe", overlapped, pollable)
}
close(write)
}
+func testFileReadEOF(t *testing.T, f *poll.FD) {
+ end, err := f.Seek(0, io.SeekEnd)
+ if err != nil {
+ t.Fatal(err)
+ }
+ var buf [1]byte
+ n, err := f.Read(buf[:])
+ if err != nil && err != io.EOF {
+ t.Errorf("expected EOF, got %v", err)
+ }
+ if n != 0 {
+ t.Errorf("expected 0 bytes, got %d", n)
+ }
+
+ n, err = f.Pread(buf[:], end)
+ if err != nil && err != io.EOF {
+ t.Errorf("expected EOF, got %v", err)
+ }
+ if n != 0 {
+ t.Errorf("expected 0 bytes, got %d", n)
+ }
+}
+
func TestFile(t *testing.T) {
t.Parallel()
tests := []struct {
wh := newFile(t, name, tt.overlappedWrite, tt.pollable)
testReadWrite(t, rh, wh)
testPreadPwrite(t, rh, wh)
+ testFileReadEOF(t, rh)
})
}
}
})
}
-func TestPipeWriteEOF(t *testing.T) {
+func TestPipeMessageReadEOF(t *testing.T) {
t.Parallel()
name := pipeName()
- pipe := newMessagePipe(t, name, false, true)
- file := newFile(t, name, false, true)
- read := make(chan struct{}, 1)
- go func() {
- _, err := pipe.Write(nil)
- read <- struct{}{}
- if err != nil {
- t.Error(err)
- }
- }()
- <-read
+ pipe := newMessagePipe(t, name, true, true)
+ file := newFile(t, name, true, true)
+
+ _, err := pipe.Write(nil)
+ if err != nil {
+ t.Error(err)
+ }
+
+ var buf [10]byte
+ n, err := file.Read(buf[:])
+ if err != io.EOF {
+ t.Errorf("expected EOF, got %v", err)
+ }
+ if n != 0 {
+ t.Errorf("expected 0 bytes, got %d", n)
+ }
+}
+
+func TestPipeClosedEOF(t *testing.T) {
+ t.Parallel()
+ name := pipeName()
+ pipe := newBytePipe(t, name, true, false)
+ file := newFile(t, name, true, true)
+
+ pipe.Close()
+
var buf [10]byte
n, err := file.Read(buf[:])
if err != io.EOF {