This permits us to safely support concurrent access to files on Plan 9.
Concurrent access was already safe on other systems.
This does introduce a change: if one goroutine calls a blocking read
on a pipe, and another goroutine closes the pipe, then before this CL
the close would occur. Now the close will be delayed until the blocking
read completes.
Also add tests that concurrent I/O and Close on a pipe are OK.
For #50436
For #56043
Change-Id: I969c869ea3b8c5c2f2ef319e441a56a3c64e7bf5
Reviewed-on: https://go-review.googlesource.com/c/go/+/438347
Reviewed-by: Bryan Mills <bcmills@google.com>
Reviewed-by: Ian Lance Taylor <iant@google.com>
Reviewed-by: David du Colombier <0intro@gmail.com>
Run-TryBot: Ian Lance Taylor <iant@google.com>
Auto-Submit: Ian Lance Taylor <iant@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Rob Pike <r@golang.org>
var Consume = consume
-type FDMutex struct {
+type XFDMutex struct {
fdMutex
}
-func (mu *FDMutex) Incref() bool {
+func (mu *XFDMutex) Incref() bool {
return mu.incref()
}
-func (mu *FDMutex) IncrefAndClose() bool {
+func (mu *XFDMutex) IncrefAndClose() bool {
return mu.increfAndClose()
}
-func (mu *FDMutex) Decref() bool {
+func (mu *XFDMutex) Decref() bool {
return mu.decref()
}
-func (mu *FDMutex) RWLock(read bool) bool {
+func (mu *XFDMutex) RWLock(read bool) bool {
return mu.rwlock(read)
}
-func (mu *FDMutex) RWUnlock(read bool) bool {
+func (mu *XFDMutex) RWUnlock(read bool) bool {
return mu.rwunlock(read)
}
)
func TestMutexLock(t *testing.T) {
- var mu FDMutex
+ var mu XFDMutex
if !mu.Incref() {
t.Fatal("broken")
}
func TestMutexClose(t *testing.T) {
- var mu FDMutex
+ var mu XFDMutex
if !mu.IncrefAndClose() {
t.Fatal("broken")
}
func TestMutexCloseUnblock(t *testing.T) {
c := make(chan bool, 4)
- var mu FDMutex
+ var mu XFDMutex
mu.RWLock(true)
for i := 0; i < 4; i++ {
go func() {
f()
}
- var mu FDMutex
+ var mu XFDMutex
ensurePanics(func() { mu.Decref() })
ensurePanics(func() { mu.RWUnlock(true) })
ensurePanics(func() { mu.RWUnlock(false) })
}
}()
- var mu1 FDMutex
+ var mu1 XFDMutex
for i := 0; i < 1<<21; i++ {
mu1.Incref()
}
}
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P))
done := make(chan bool, P)
- var mu FDMutex
+ var mu XFDMutex
var readState [2]uint64
var writeState [2]uint64
for p := 0; p < P; p++ {
--- /dev/null
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package poll
+
+// Expose fdMutex for use by the os package on Plan 9.
+// On Plan 9 we don't want to use async I/O for file operations,
+// but we still want the locking semantics that fdMutex provides.
+
+// FDMutex is an exported fdMutex, only for Plan 9.
+type FDMutex struct {
+ fdmu fdMutex
+}
+
+func (fdmu *FDMutex) Incref() bool {
+ return fdmu.fdmu.incref()
+}
+
+func (fdmu *FDMutex) Decref() bool {
+ return fdmu.fdmu.decref()
+}
+
+func (fdmu *FDMutex) IncrefAndClose() bool {
+ return fdmu.fdmu.increfAndClose()
+}
+
+func (fdmu *FDMutex) ReadLock() bool {
+ return fdmu.fdmu.rwlock(true)
+}
+
+func (fdmu *FDMutex) ReadUnlock() bool {
+ return fdmu.fdmu.rwunlock(true)
+}
+
+func (fdmu *FDMutex) WriteLock() bool {
+ return fdmu.fdmu.rwlock(false)
+}
+
+func (fdmu *FDMutex) WriteUnlock() bool {
+ return fdmu.fdmu.rwunlock(false)
+}
--- /dev/null
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package os
+
+// File locking support for Plan 9. This uses fdMutex from the
+// internal/poll package.
+
+// incref adds a reference to the file. It returns an error if the file
+// is already closed. This method is on File so that we can incorporate
+// a nil test.
+func (f *File) incref(op string) (err error) {
+ if f == nil {
+ return ErrInvalid
+ }
+ if !f.fdmu.Incref() {
+ err = ErrClosed
+ if op != "" {
+ err = &PathError{Op: op, Path: f.name, Err: err}
+ }
+ }
+ return err
+}
+
+// decref removes a reference to the file. If this is the last
+// remaining reference, and the file has been marked to be closed,
+// then actually close it.
+func (file *file) decref() error {
+ if file.fdmu.Decref() {
+ return file.destroy()
+ }
+ return nil
+}
+
+// readLock adds a reference to the file and locks it for reading.
+// It returns an error if the file is already closed.
+func (file *file) readLock() error {
+ if !file.fdmu.ReadLock() {
+ return ErrClosed
+ }
+ return nil
+}
+
+// readUnlock removes a reference from the file and unlocks it for reading.
+// It also closes the file if it marked as closed and there is no remaining
+// reference.
+func (file *file) readUnlock() {
+ if file.fdmu.ReadUnlock() {
+ file.destroy()
+ }
+}
+
+// writeLock adds a reference to the file and locks it for writing.
+// It returns an error if the file is already closed.
+func (file *file) writeLock() error {
+ if !file.fdmu.WriteLock() {
+ return ErrClosed
+ }
+ return nil
+}
+
+// writeUnlock removes a reference from the file and unlocks it for writing.
+// It also closes the file if it is marked as closed and there is no remaining
+// reference.
+func (file *file) writeUnlock() {
+ if file.fdmu.WriteUnlock() {
+ file.destroy()
+ }
+}
// can overwrite this data, which could cause the finalizer
// to close the wrong file descriptor.
type file struct {
+ fdmu poll.FDMutex
fd int
name string
dirinfo *dirInfo // nil unless directory being read
// be canceled and return immediately with an ErrClosed error.
// Close will return an error if it has already been called.
func (f *File) Close() error {
- if err := f.checkValid("close"); err != nil {
- return err
+ if f == nil {
+ return ErrInvalid
}
return f.file.close()
}
func (file *file) close() error {
- if file == nil || file.fd == badFd {
- return ErrInvalid
+ if !file.fdmu.IncrefAndClose() {
+ return &PathError{Op: "close", Path: file.name, Err: ErrClosed}
}
+
+ // At this point we should cancel any pending I/O.
+ // How do we do that on Plan 9?
+
+ err := file.decref()
+
+ // no need for a finalizer anymore
+ runtime.SetFinalizer(file, nil)
+ return err
+}
+
+// destroy actually closes the descriptor. This is called when
+// there are no remaining references, by the decref, readUnlock,
+// and writeUnlock methods.
+func (file *file) destroy() error {
var err error
if e := syscall.Close(file.fd); e != nil {
err = &PathError{Op: "close", Path: file.name, Err: e}
}
- file.fd = badFd // so it can't be closed again
-
- // no need for a finalizer anymore
- runtime.SetFinalizer(file, nil)
return err
}
if err != nil {
return &PathError{Op: "truncate", Path: f.name, Err: err}
}
+
+ if err := f.incref("truncate"); err != nil {
+ return err
+ }
+ defer f.decref()
+
if err = syscall.Fwstat(f.fd, buf[:n]); err != nil {
return &PathError{Op: "truncate", Path: f.name, Err: err}
}
if err != nil {
return &PathError{Op: "chmod", Path: f.name, Err: err}
}
+
+ if err := f.incref("chmod"); err != nil {
+ return err
+ }
+ defer f.decref()
+
if err = syscall.Fwstat(f.fd, buf[:n]); err != nil {
return &PathError{Op: "chmod", Path: f.name, Err: err}
}
if err != nil {
return &PathError{Op: "sync", Path: f.name, Err: err}
}
+
+ if err := f.incref("sync"); err != nil {
+ return err
+ }
+ defer f.decref()
+
if err = syscall.Fwstat(f.fd, buf[:n]); err != nil {
return &PathError{Op: "sync", Path: f.name, Err: err}
}
// read reads up to len(b) bytes from the File.
// It returns the number of bytes read and an error, if any.
func (f *File) read(b []byte) (n int, err error) {
+ if err := f.readLock(); err != nil {
+ return 0, err
+ }
+ defer f.readUnlock()
n, e := fixCount(syscall.Read(f.fd, b))
if n == 0 && len(b) > 0 && e == nil {
return 0, io.EOF
// It returns the number of bytes read and the error, if any.
// EOF is signaled by a zero count with err set to nil.
func (f *File) pread(b []byte, off int64) (n int, err error) {
+ if err := f.readLock(); err != nil {
+ return 0, err
+ }
+ defer f.readUnlock()
n, e := fixCount(syscall.Pread(f.fd, b, off))
if n == 0 && len(b) > 0 && e == nil {
return 0, io.EOF
// Since Plan 9 preserves message boundaries, never allow
// a zero-byte write.
func (f *File) write(b []byte) (n int, err error) {
+ if err := f.writeLock(); err != nil {
+ return 0, err
+ }
+ defer f.writeUnlock()
if len(b) == 0 {
return 0, nil
}
// Since Plan 9 preserves message boundaries, never allow
// a zero-byte write.
func (f *File) pwrite(b []byte, off int64) (n int, err error) {
+ if err := f.writeLock(); err != nil {
+ return 0, err
+ }
+ defer f.writeUnlock()
if len(b) == 0 {
return 0, nil
}
// relative to the current offset, and 2 means relative to the end.
// It returns the new offset and an error, if any.
func (f *File) seek(offset int64, whence int) (ret int64, err error) {
+ if err := f.incref(""); err != nil {
+ return 0, err
+ }
+ defer f.decref()
if f.dirinfo != nil {
// Free cached dirinfo, so we allocate a new one if we
// access this file as a directory again. See #35767 and #37161.
// which must be a directory.
// If there is an error, it will be of type *PathError.
func (f *File) Chdir() error {
- if err := f.checkValid("chdir"); err != nil {
+ if err := f.incref("chdir"); err != nil {
return err
}
+ defer f.decref()
if e := syscall.Fchdir(f.fd); e != nil {
return &PathError{Op: "chdir", Path: f.name, Err: e}
}
return poll.ErrNoDeadline
}
-// checkValid checks whether f is valid for use.
-// If not, it returns an appropriate error, perhaps incorporating the operation name op.
+// checkValid checks whether f is valid for use, but does not prepare
+// to actually use it. If f is not ready checkValid returns an appropriate
+// error, perhaps incorporating the operation name op.
func (f *File) checkValid(op string) error {
if f == nil {
return ErrInvalid
}
- if f.fd == badFd {
- return &PathError{Op: op, Path: f.name, Err: ErrClosed}
+ if err := f.incref(op); err != nil {
+ return err
}
- return nil
+ return f.decref()
}
type rawConn struct{}
t.Errorf("expected 0 allocs for File.WriteString, got %v", allocs)
}
}
+
+// Test that it's OK to have parallel I/O and Close on a pipe.
+func TestPipeIOCloseRace(t *testing.T) {
+ // Skip on wasm, which doesn't have pipes.
+ if runtime.GOOS == "js" {
+ t.Skip("skipping on js: no pipes")
+ }
+
+ r, w, err := Pipe()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ var wg sync.WaitGroup
+ wg.Add(3)
+
+ go func() {
+ defer wg.Done()
+ for {
+ n, err := w.Write([]byte("hi"))
+ if err != nil {
+ // We look at error strings as the
+ // expected errors are OS-specific.
+ switch {
+ case errors.Is(err, ErrClosed),
+ strings.Contains(err.Error(), "broken pipe"),
+ strings.Contains(err.Error(), "pipe is being closed"),
+ strings.Contains(err.Error(), "hungup channel"):
+ // Ignore an expected error.
+ default:
+ // Unexpected error.
+ t.Error(err)
+ }
+ return
+ }
+ if n != 2 {
+ t.Errorf("wrote %d bytes, expected 2", n)
+ return
+ }
+ }
+ }()
+
+ go func() {
+ defer wg.Done()
+ for {
+ var buf [2]byte
+ n, err := r.Read(buf[:])
+ if err != nil {
+ if err != io.EOF && !errors.Is(err, ErrClosed) {
+ t.Error(err)
+ }
+ return
+ }
+ if n != 2 {
+ t.Errorf("read %d bytes, want 2", n)
+ }
+ }
+ }()
+
+ go func() {
+ defer wg.Done()
+
+ // Let the other goroutines start. This is just to get
+ // a better test, the test will still pass if they
+ // don't start.
+ time.Sleep(time.Millisecond)
+
+ if err := r.Close(); err != nil {
+ t.Error(err)
+ }
+ if err := w.Close(); err != nil {
+ t.Error(err)
+ }
+ }()
+
+ wg.Wait()
+}
+
+// Test that it's OK to call Close concurrently on a pipe.
+func TestPipeCloseRace(t *testing.T) {
+ // Skip on wasm, which doesn't have pipes.
+ if runtime.GOOS == "js" {
+ t.Skip("skipping on js: no pipes")
+ }
+
+ r, w, err := Pipe()
+ if err != nil {
+ t.Fatal(err)
+ }
+ var wg sync.WaitGroup
+ c := make(chan error, 4)
+ f := func() {
+ defer wg.Done()
+ c <- r.Close()
+ c <- w.Close()
+ }
+ wg.Add(2)
+ go f()
+ go f()
+ nils, errs := 0, 0
+ for i := 0; i < 4; i++ {
+ err := <-c
+ if err == nil {
+ nils++
+ } else {
+ errs++
+ }
+ }
+ if nils != 2 || errs != 2 {
+ t.Errorf("got nils %d errs %d, want 2 2", nils, errs)
+ }
+}
switch a := arg.(type) {
case *File:
name = a.name
+ if err := a.incref("fstat"); err != nil {
+ return nil, err
+ }
n, err = syscall.Fstat(a.fd, buf)
+ a.decref()
case string:
name = a
n, err = syscall.Stat(a, buf)
b := fs2.sys.(*syscall.Dir)
return a.Qid.Path == b.Qid.Path && a.Type == b.Type && a.Dev == b.Dev
}
-
-const badFd = -1