}
func (mu *XFDMutex) RWLock(read bool) bool {
- return mu.rwlock(read)
+ return mu.rwlock(read, waitLock)
}
func (mu *XFDMutex) RWUnlock(read bool) bool {
mutexWMask = (1<<20 - 1) << 43
)
+const (
+ readlock = true
+ writeLock = false
+ waitLock = true
+ tryLock = false
+)
+
const overflowMsg = "too many concurrent operations on a single file or socket (max 1048575)"
-// Read operations must do rwlock(true)/rwunlock(true).
+// Read operations must do rwlock(readlock, waitLock)/rwunlock(readlock).
//
-// Write operations must do rwlock(false)/rwunlock(false).
+// Write operations must do rwlock(writeLock, waitLock)/rwunlock(writeLock).
//
// Misc operations must do incref/decref.
// Misc operations include functions like setsockopt and setDeadline.
// lock adds a reference to mu and locks mu.
// It reports whether mu is available for reading or writing.
-func (mu *fdMutex) rwlock(read bool) bool {
+// If wait is false, lock fails immediately if mu is not available.
+func (mu *fdMutex) rwlock(read bool, wait bool) bool {
var mutexBit, mutexWait, mutexMask uint64
var mutexSema *uint32
if read {
}
} else {
// Wait for lock.
+ if !wait {
+ return false
+ }
new = old + mutexWait
if new&mutexMask == 0 {
panic(overflowMsg)
// readLock adds a reference to fd and locks fd for reading.
// It returns an error when fd cannot be used for reading.
func (fd *FD) readLock() error {
- if !fd.fdmu.rwlock(true) {
+ if !fd.fdmu.rwlock(readlock, waitLock) {
return errClosing(fd.isFile)
}
return nil
// It also closes fd when the state of fd is set to closed and there
// is no remaining reference.
func (fd *FD) readUnlock() {
- if fd.fdmu.rwunlock(true) {
+ if fd.fdmu.rwunlock(readlock) {
fd.destroy()
}
}
// writeLock adds a reference to fd and locks fd for writing.
// It returns an error when fd cannot be used for writing.
func (fd *FD) writeLock() error {
- if !fd.fdmu.rwlock(false) {
+ if !fd.fdmu.rwlock(writeLock, waitLock) {
return errClosing(fd.isFile)
}
return nil
// It also closes fd when the state of fd is set to closed and there
// is no remaining reference.
func (fd *FD) writeUnlock() {
- if fd.fdmu.rwunlock(false) {
+ if fd.fdmu.rwunlock(writeLock) {
fd.destroy()
}
}
// readWriteLock adds a reference to fd and locks fd for reading and writing.
// It returns an error when fd cannot be used for reading and writing.
func (fd *FD) readWriteLock() error {
- if !fd.fdmu.rwlock(true) {
+ if !fd.fdmu.rwlock(readlock, waitLock) {
return errClosing(fd.isFile)
}
- if !fd.fdmu.rwlock(false) {
- fd.fdmu.rwunlock(true) // unlock read lock acquired above
+ if !fd.fdmu.rwlock(writeLock, waitLock) {
+ fd.fdmu.rwunlock(readlock) // unlock read lock acquired above
return errClosing(fd.isFile)
}
return nil
}
+// tryReadWriteLock tries to add a reference to fd and lock fd for reading and writing.
+// It returns (false, nil) when fd is not available for reading and writing but is not closing.
+// It returns (false, errClosing) when fd is closing.
+func (fd *FD) tryReadWriteLock() (bool, error) {
+ if !fd.fdmu.rwlock(readlock, tryLock) {
+ if fd.closing() {
+ return false, errClosing(fd.isFile)
+ }
+ return false, nil
+ }
+ if !fd.fdmu.rwlock(writeLock, tryLock) {
+ fd.fdmu.rwunlock(readlock) // unlock read lock acquired above
+ if fd.closing() {
+ return false, errClosing(fd.isFile)
+ }
+ return false, nil
+ }
+ return true, nil
+}
+
// readWriteUnlock removes a reference from fd and unlocks fd for reading and writing.
// It also closes fd when the state of fd is set to closed and there
// is no remaining reference.
func (fd *FD) readWriteUnlock() {
- fd.fdmu.rwunlock(true)
- if fd.fdmu.rwunlock(false) {
+ fd.fdmu.rwunlock(readlock)
+ if fd.fdmu.rwunlock(writeLock) {
fd.destroy()
}
}
"io"
"runtime"
"sync"
- "sync/atomic"
"syscall"
"unicode/utf16"
"unicode/utf8"
// Whether FILE_FLAG_OVERLAPPED was not set when opening the file.
isBlocking bool
- disassociated atomic.Bool
+ disassociated bool
// readPinner and writePinner are automatically unpinned
// before execIO returns.
// pollable should be used instead of fd.pd.pollable(),
// as it is aware of the disassociated state.
func (fd *FD) pollable() bool {
- return fd.pd.pollable() && !fd.disassociated.Load()
+ return fd.pd.pollable() && !fd.disassociated
}
// fileKind describes the kind of file.
// DisassociateIOCP disassociates the file handle from the IOCP.
// The disassociate operation will not succeed if there is any
-// in-progress IO operation on the file handle.
+// in-progress I/O operation on the file handle.
func (fd *FD) DisassociateIOCP() error {
- if err := fd.incref(); err != nil {
+ // There is a small race window between execIO checking fd.disassociated and
+ // DisassociateIOCP setting it. NtSetInformationFile will fail anyway if
+ // there is any in-progress I/O operation, so just take a read-write lock
+ // to ensure there is no in-progress I/O and fail early if we can't get the lock.
+ if ok, err := fd.tryReadWriteLock(); err != nil || !ok {
+ if err == nil {
+ err = errors.New("can't disassociate the handle while there is in-progress I/O")
+ }
return err
}
- defer fd.decref()
+ defer fd.readWriteUnlock()
if fd.isBlocking || !fd.pollable() {
// Nothing to disassociate.
if err := windows.NtSetInformationFile(fd.Sysfd, &windows.IO_STATUS_BLOCK{}, unsafe.Pointer(&info), uint32(unsafe.Sizeof(info)), windows.FileReplaceCompletionInformation); err != nil {
return err
}
- fd.disassociated.Store(true)
+ // tryReadWriteLock means we have exclusive access to fd.
+ fd.disassociated = true
// Don't call fd.pd.close(), it would be too racy.
// There is no harm on leaving fd.pd open until Close is called.
return nil
}
func (fdmu *FDMutex) ReadLock() bool {
- return fdmu.fdmu.rwlock(true)
+ return fdmu.fdmu.rwlock(readlock, waitLock)
}
func (fdmu *FDMutex) ReadUnlock() bool {
- return fdmu.fdmu.rwunlock(true)
+ return fdmu.fdmu.rwunlock(readlock)
}
func (fdmu *FDMutex) WriteLock() bool {
- return fdmu.fdmu.rwlock(false)
+ return fdmu.fdmu.rwlock(writeLock, waitLock)
}
func (fdmu *FDMutex) WriteUnlock() bool {
- return fdmu.fdmu.rwunlock(false)
+ return fdmu.fdmu.rwunlock(writeLock)
}
name := pipeName()
// Create the read handle inherited by the child process.
- r := newPipe(t, name, false, true)
+ r := newPipe(t, name, 4096, false, true)
defer r.Close()
// Create a write handle.
var pipeCounter atomic.Uint64
func newBytePipe(t testing.TB, name string, overlapped bool) *os.File {
- return newPipe(t, name, false, overlapped)
+ return newPipe(t, name, 4096, false, overlapped)
}
func newMessagePipe(t testing.TB, name string, overlapped bool) *os.File {
- return newPipe(t, name, true, overlapped)
+ return newPipe(t, name, 4096, true, overlapped)
}
func pipeName() string {
return `\\.\pipe\go-os-test-` + currentProcess() + `-` + strconv.FormatUint(pipeCounter.Add(1), 10)
}
-func newPipe(t testing.TB, name string, message, overlapped bool) *os.File {
+func newPipe(t testing.TB, name string, bufSize uint32, message, overlapped bool) *os.File {
wname, err := syscall.UTF16PtrFromString(name)
if err != nil {
t.Fatal(err)
if message {
typ = windows.PIPE_TYPE_MESSAGE | windows.PIPE_READMODE_MESSAGE
}
- h, err := windows.CreateNamedPipe(wname, uint32(flags), uint32(typ), 1, 4096, 4096, 0, nil)
+ h, err := windows.CreateNamedPipe(wname, uint32(flags), uint32(typ), 1, bufSize, bufSize, 0, nil)
if err != nil {
t.Fatal(err)
}
}
}
+func TestFileFdWithConcurrentIO(t *testing.T) {
+ t.Parallel()
+ name := pipeName()
+ pipe := newPipe(t, name, 0, true, true) // unbuffered pipe so Write blocks
+ file := newFileOverlapped(t, name, true)
+ const writeSize = 2
+ var wg sync.WaitGroup
+ wg.Go(func() {
+ // Ensure the Write is pending.
+ var tmp [writeSize / 2]byte
+ if _, err := file.Read(tmp[:]); err != nil {
+ t.Error(err)
+ }
+ // Try to dissaciate the file from any IOCP.
+ pipe.Fd()
+ // Complete the Write.
+ if _, err := file.Read(tmp[:]); err != nil {
+ t.Error(err)
+ }
+ })
+ // Write will block until the goroutine reads all 2 bytes.
+ var tmp [writeSize]byte
+ n, err := pipe.Write(tmp[:])
+ if err != nil {
+ t.Fatal(err)
+ }
+ if n != writeSize {
+ t.Fatalf("expected to write %d bytes, got %d", writeSize, n)
+ }
+ wg.Wait()
+
+ // Verify that the pipe is still associated with the Go runtime IOCP
+ // by trying to associate it with a new IOCP, which should fail.
+ iocp, err := windows.CreateIoCompletionPort(syscall.InvalidHandle, 0, 0, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer syscall.CloseHandle(iocp)
+ sc, err := pipe.SyscallConn()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err := sc.Control(func(fd uintptr) {
+ _, err = windows.CreateIoCompletionPort(syscall.Handle(fd), iocp, 0, 0)
+ if err == nil {
+ t.Fatal("pipe should still be associated with the Go runtime IOCP")
+ }
+ }); err != nil {
+ t.Fatal(err)
+ }
+}
+
func TestSplitPath(t *testing.T) {
t.Parallel()
for _, tt := range []struct{ path, wantDir, wantBase string }{