]> Cypherpunks repositories - gostls13.git/commitdiff
os: use poll.fdMutex for Plan 9 files
authorIan Lance Taylor <iant@golang.org>
Tue, 4 Oct 2022 21:35:39 +0000 (14:35 -0700)
committerGopher Robot <gobot@golang.org>
Sat, 8 Oct 2022 03:57:40 +0000 (03:57 +0000)
This permits us to safely support concurrent access to files on Plan 9.
Concurrent access was already safe on other systems.

This does introduce a change: if one goroutine calls a blocking read
on a pipe, and another goroutine closes the pipe, then before this CL
the close would occur. Now the close will be delayed until the blocking
read completes.

Also add tests that concurrent I/O and Close on a pipe are OK.

For #50436
For #56043

Change-Id: I969c869ea3b8c5c2f2ef319e441a56a3c64e7bf5
Reviewed-on: https://go-review.googlesource.com/c/go/+/438347
Reviewed-by: Bryan Mills <bcmills@google.com>
Reviewed-by: Ian Lance Taylor <iant@google.com>
Reviewed-by: David du Colombier <0intro@gmail.com>
Run-TryBot: Ian Lance Taylor <iant@google.com>
Auto-Submit: Ian Lance Taylor <iant@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Rob Pike <r@golang.org>
src/internal/poll/export_test.go
src/internal/poll/fd_mutex_test.go
src/internal/poll/file_plan9.go [new file with mode: 0644]
src/os/file_mutex_plan9.go [new file with mode: 0644]
src/os/file_plan9.go
src/os/os_test.go
src/os/stat_plan9.go
src/os/types_plan9.go

index 02664d9ea30698a02d0a803c71925569b42c7daf..66d7c3274bed638dbd53d670e6b602881d4f29e9 100644 (file)
@@ -10,26 +10,26 @@ package poll
 
 var Consume = consume
 
-type FDMutex struct {
+type XFDMutex struct {
        fdMutex
 }
 
-func (mu *FDMutex) Incref() bool {
+func (mu *XFDMutex) Incref() bool {
        return mu.incref()
 }
 
-func (mu *FDMutex) IncrefAndClose() bool {
+func (mu *XFDMutex) IncrefAndClose() bool {
        return mu.increfAndClose()
 }
 
-func (mu *FDMutex) Decref() bool {
+func (mu *XFDMutex) Decref() bool {
        return mu.decref()
 }
 
-func (mu *FDMutex) RWLock(read bool) bool {
+func (mu *XFDMutex) RWLock(read bool) bool {
        return mu.rwlock(read)
 }
 
-func (mu *FDMutex) RWUnlock(read bool) bool {
+func (mu *XFDMutex) RWUnlock(read bool) bool {
        return mu.rwunlock(read)
 }
index 3029b9a6811f1fe79fa64dc3cad5980459dedd26..62f953192d751eeae80fc819443d5b8b8dc60d61 100644 (file)
@@ -14,7 +14,7 @@ import (
 )
 
 func TestMutexLock(t *testing.T) {
-       var mu FDMutex
+       var mu XFDMutex
 
        if !mu.Incref() {
                t.Fatal("broken")
@@ -39,7 +39,7 @@ func TestMutexLock(t *testing.T) {
 }
 
 func TestMutexClose(t *testing.T) {
-       var mu FDMutex
+       var mu XFDMutex
        if !mu.IncrefAndClose() {
                t.Fatal("broken")
        }
@@ -60,7 +60,7 @@ func TestMutexClose(t *testing.T) {
 
 func TestMutexCloseUnblock(t *testing.T) {
        c := make(chan bool, 4)
-       var mu FDMutex
+       var mu XFDMutex
        mu.RWLock(true)
        for i := 0; i < 4; i++ {
                go func() {
@@ -104,7 +104,7 @@ func TestMutexPanic(t *testing.T) {
                f()
        }
 
-       var mu FDMutex
+       var mu XFDMutex
        ensurePanics(func() { mu.Decref() })
        ensurePanics(func() { mu.RWUnlock(true) })
        ensurePanics(func() { mu.RWUnlock(false) })
@@ -137,7 +137,7 @@ func TestMutexOverflowPanic(t *testing.T) {
                }
        }()
 
-       var mu1 FDMutex
+       var mu1 XFDMutex
        for i := 0; i < 1<<21; i++ {
                mu1.Incref()
        }
@@ -152,7 +152,7 @@ func TestMutexStress(t *testing.T) {
        }
        defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P))
        done := make(chan bool, P)
-       var mu FDMutex
+       var mu XFDMutex
        var readState [2]uint64
        var writeState [2]uint64
        for p := 0; p < P; p++ {
diff --git a/src/internal/poll/file_plan9.go b/src/internal/poll/file_plan9.go
new file mode 100644 (file)
index 0000000..57dc0c6
--- /dev/null
@@ -0,0 +1,42 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package poll
+
+// Expose fdMutex for use by the os package on Plan 9.
+// On Plan 9 we don't want to use async I/O for file operations,
+// but we still want the locking semantics that fdMutex provides.
+
+// FDMutex is an exported fdMutex, only for Plan 9.
+type FDMutex struct {
+       fdmu fdMutex
+}
+
+func (fdmu *FDMutex) Incref() bool {
+       return fdmu.fdmu.incref()
+}
+
+func (fdmu *FDMutex) Decref() bool {
+       return fdmu.fdmu.decref()
+}
+
+func (fdmu *FDMutex) IncrefAndClose() bool {
+       return fdmu.fdmu.increfAndClose()
+}
+
+func (fdmu *FDMutex) ReadLock() bool {
+       return fdmu.fdmu.rwlock(true)
+}
+
+func (fdmu *FDMutex) ReadUnlock() bool {
+       return fdmu.fdmu.rwunlock(true)
+}
+
+func (fdmu *FDMutex) WriteLock() bool {
+       return fdmu.fdmu.rwlock(false)
+}
+
+func (fdmu *FDMutex) WriteUnlock() bool {
+       return fdmu.fdmu.rwunlock(false)
+}
diff --git a/src/os/file_mutex_plan9.go b/src/os/file_mutex_plan9.go
new file mode 100644 (file)
index 0000000..26bf5a7
--- /dev/null
@@ -0,0 +1,70 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package os
+
+// File locking support for Plan 9. This uses fdMutex from the
+// internal/poll package.
+
+// incref adds a reference to the file. It returns an error if the file
+// is already closed. This method is on File so that we can incorporate
+// a nil test.
+func (f *File) incref(op string) (err error) {
+       if f == nil {
+               return ErrInvalid
+       }
+       if !f.fdmu.Incref() {
+               err = ErrClosed
+               if op != "" {
+                       err = &PathError{Op: op, Path: f.name, Err: err}
+               }
+       }
+       return err
+}
+
+// decref removes a reference to the file. If this is the last
+// remaining reference, and the file has been marked to be closed,
+// then actually close it.
+func (file *file) decref() error {
+       if file.fdmu.Decref() {
+               return file.destroy()
+       }
+       return nil
+}
+
+// readLock adds a reference to the file and locks it for reading.
+// It returns an error if the file is already closed.
+func (file *file) readLock() error {
+       if !file.fdmu.ReadLock() {
+               return ErrClosed
+       }
+       return nil
+}
+
+// readUnlock removes a reference from the file and unlocks it for reading.
+// It also closes the file if it marked as closed and there is no remaining
+// reference.
+func (file *file) readUnlock() {
+       if file.fdmu.ReadUnlock() {
+               file.destroy()
+       }
+}
+
+// writeLock adds a reference to the file and locks it for writing.
+// It returns an error if the file is already closed.
+func (file *file) writeLock() error {
+       if !file.fdmu.WriteLock() {
+               return ErrClosed
+       }
+       return nil
+}
+
+// writeUnlock removes a reference from the file and unlocks it for writing.
+// It also closes the file if it is marked as closed and there is no remaining
+// reference.
+func (file *file) writeUnlock() {
+       if file.fdmu.WriteUnlock() {
+               file.destroy()
+       }
+}
index 93eb233e004a793fa3f56b5e9c04c5b6312f219f..7a4a562783ee109c810fc5d17be0c95a4112d9a9 100644 (file)
@@ -22,6 +22,7 @@ func fixLongPath(path string) string {
 // can overwrite this data, which could cause the finalizer
 // to close the wrong file descriptor.
 type file struct {
+       fdmu       poll.FDMutex
        fd         int
        name       string
        dirinfo    *dirInfo // nil unless directory being read
@@ -142,24 +143,35 @@ func openFileNolog(name string, flag int, perm FileMode) (*File, error) {
 // be canceled and return immediately with an ErrClosed error.
 // Close will return an error if it has already been called.
 func (f *File) Close() error {
-       if err := f.checkValid("close"); err != nil {
-               return err
+       if f == nil {
+               return ErrInvalid
        }
        return f.file.close()
 }
 
 func (file *file) close() error {
-       if file == nil || file.fd == badFd {
-               return ErrInvalid
+       if !file.fdmu.IncrefAndClose() {
+               return &PathError{Op: "close", Path: file.name, Err: ErrClosed}
        }
+
+       // At this point we should cancel any pending I/O.
+       // How do we do that on Plan 9?
+
+       err := file.decref()
+
+       // no need for a finalizer anymore
+       runtime.SetFinalizer(file, nil)
+       return err
+}
+
+// destroy actually closes the descriptor. This is called when
+// there are no remaining references, by the decref, readUnlock,
+// and writeUnlock methods.
+func (file *file) destroy() error {
        var err error
        if e := syscall.Close(file.fd); e != nil {
                err = &PathError{Op: "close", Path: file.name, Err: e}
        }
-       file.fd = badFd // so it can't be closed again
-
-       // no need for a finalizer anymore
-       runtime.SetFinalizer(file, nil)
        return err
 }
 
@@ -193,6 +205,12 @@ func (f *File) Truncate(size int64) error {
        if err != nil {
                return &PathError{Op: "truncate", Path: f.name, Err: err}
        }
+
+       if err := f.incref("truncate"); err != nil {
+               return err
+       }
+       defer f.decref()
+
        if err = syscall.Fwstat(f.fd, buf[:n]); err != nil {
                return &PathError{Op: "truncate", Path: f.name, Err: err}
        }
@@ -219,6 +237,12 @@ func (f *File) chmod(mode FileMode) error {
        if err != nil {
                return &PathError{Op: "chmod", Path: f.name, Err: err}
        }
+
+       if err := f.incref("chmod"); err != nil {
+               return err
+       }
+       defer f.decref()
+
        if err = syscall.Fwstat(f.fd, buf[:n]); err != nil {
                return &PathError{Op: "chmod", Path: f.name, Err: err}
        }
@@ -240,6 +264,12 @@ func (f *File) Sync() error {
        if err != nil {
                return &PathError{Op: "sync", Path: f.name, Err: err}
        }
+
+       if err := f.incref("sync"); err != nil {
+               return err
+       }
+       defer f.decref()
+
        if err = syscall.Fwstat(f.fd, buf[:n]); err != nil {
                return &PathError{Op: "sync", Path: f.name, Err: err}
        }
@@ -249,6 +279,10 @@ func (f *File) Sync() error {
 // read reads up to len(b) bytes from the File.
 // It returns the number of bytes read and an error, if any.
 func (f *File) read(b []byte) (n int, err error) {
+       if err := f.readLock(); err != nil {
+               return 0, err
+       }
+       defer f.readUnlock()
        n, e := fixCount(syscall.Read(f.fd, b))
        if n == 0 && len(b) > 0 && e == nil {
                return 0, io.EOF
@@ -260,6 +294,10 @@ func (f *File) read(b []byte) (n int, err error) {
 // It returns the number of bytes read and the error, if any.
 // EOF is signaled by a zero count with err set to nil.
 func (f *File) pread(b []byte, off int64) (n int, err error) {
+       if err := f.readLock(); err != nil {
+               return 0, err
+       }
+       defer f.readUnlock()
        n, e := fixCount(syscall.Pread(f.fd, b, off))
        if n == 0 && len(b) > 0 && e == nil {
                return 0, io.EOF
@@ -272,6 +310,10 @@ func (f *File) pread(b []byte, off int64) (n int, err error) {
 // Since Plan 9 preserves message boundaries, never allow
 // a zero-byte write.
 func (f *File) write(b []byte) (n int, err error) {
+       if err := f.writeLock(); err != nil {
+               return 0, err
+       }
+       defer f.writeUnlock()
        if len(b) == 0 {
                return 0, nil
        }
@@ -283,6 +325,10 @@ func (f *File) write(b []byte) (n int, err error) {
 // Since Plan 9 preserves message boundaries, never allow
 // a zero-byte write.
 func (f *File) pwrite(b []byte, off int64) (n int, err error) {
+       if err := f.writeLock(); err != nil {
+               return 0, err
+       }
+       defer f.writeUnlock()
        if len(b) == 0 {
                return 0, nil
        }
@@ -294,6 +340,10 @@ func (f *File) pwrite(b []byte, off int64) (n int, err error) {
 // relative to the current offset, and 2 means relative to the end.
 // It returns the new offset and an error, if any.
 func (f *File) seek(offset int64, whence int) (ret int64, err error) {
+       if err := f.incref(""); err != nil {
+               return 0, err
+       }
+       defer f.decref()
        if f.dirinfo != nil {
                // Free cached dirinfo, so we allocate a new one if we
                // access this file as a directory again. See #35767 and #37161.
@@ -493,9 +543,10 @@ func tempDir() string {
 // which must be a directory.
 // If there is an error, it will be of type *PathError.
 func (f *File) Chdir() error {
-       if err := f.checkValid("chdir"); err != nil {
+       if err := f.incref("chdir"); err != nil {
                return err
        }
+       defer f.decref()
        if e := syscall.Fchdir(f.fd); e != nil {
                return &PathError{Op: "chdir", Path: f.name, Err: e}
        }
@@ -526,16 +577,17 @@ func (f *File) setWriteDeadline(time.Time) error {
        return poll.ErrNoDeadline
 }
 
-// checkValid checks whether f is valid for use.
-// If not, it returns an appropriate error, perhaps incorporating the operation name op.
+// checkValid checks whether f is valid for use, but does not prepare
+// to actually use it. If f is not ready checkValid returns an appropriate
+// error, perhaps incorporating the operation name op.
 func (f *File) checkValid(op string) error {
        if f == nil {
                return ErrInvalid
        }
-       if f.fd == badFd {
-               return &PathError{Op: op, Path: f.name, Err: ErrClosed}
+       if err := f.incref(op); err != nil {
+               return err
        }
-       return nil
+       return f.decref()
 }
 
 type rawConn struct{}
index ff745983620274655b4762308ccff93d02b5a84c..550b7db5a3327e4c8f5026ff23950fa4d48b7824 100644 (file)
@@ -2799,3 +2799,115 @@ func TestWriteStringAlloc(t *testing.T) {
                t.Errorf("expected 0 allocs for File.WriteString, got %v", allocs)
        }
 }
+
+// Test that it's OK to have parallel I/O and Close on a pipe.
+func TestPipeIOCloseRace(t *testing.T) {
+       // Skip on wasm, which doesn't have pipes.
+       if runtime.GOOS == "js" {
+               t.Skip("skipping on js: no pipes")
+       }
+
+       r, w, err := Pipe()
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       var wg sync.WaitGroup
+       wg.Add(3)
+
+       go func() {
+               defer wg.Done()
+               for {
+                       n, err := w.Write([]byte("hi"))
+                       if err != nil {
+                               // We look at error strings as the
+                               // expected errors are OS-specific.
+                               switch {
+                               case errors.Is(err, ErrClosed),
+                                       strings.Contains(err.Error(), "broken pipe"),
+                                       strings.Contains(err.Error(), "pipe is being closed"),
+                                       strings.Contains(err.Error(), "hungup channel"):
+                                       // Ignore an expected error.
+                               default:
+                                       // Unexpected error.
+                                       t.Error(err)
+                               }
+                               return
+                       }
+                       if n != 2 {
+                               t.Errorf("wrote %d bytes, expected 2", n)
+                               return
+                       }
+               }
+       }()
+
+       go func() {
+               defer wg.Done()
+               for {
+                       var buf [2]byte
+                       n, err := r.Read(buf[:])
+                       if err != nil {
+                               if err != io.EOF && !errors.Is(err, ErrClosed) {
+                                       t.Error(err)
+                               }
+                               return
+                       }
+                       if n != 2 {
+                               t.Errorf("read %d bytes, want 2", n)
+                       }
+               }
+       }()
+
+       go func() {
+               defer wg.Done()
+
+               // Let the other goroutines start. This is just to get
+               // a better test, the test will still pass if they
+               // don't start.
+               time.Sleep(time.Millisecond)
+
+               if err := r.Close(); err != nil {
+                       t.Error(err)
+               }
+               if err := w.Close(); err != nil {
+                       t.Error(err)
+               }
+       }()
+
+       wg.Wait()
+}
+
+// Test that it's OK to call Close concurrently on a pipe.
+func TestPipeCloseRace(t *testing.T) {
+       // Skip on wasm, which doesn't have pipes.
+       if runtime.GOOS == "js" {
+               t.Skip("skipping on js: no pipes")
+       }
+
+       r, w, err := Pipe()
+       if err != nil {
+               t.Fatal(err)
+       }
+       var wg sync.WaitGroup
+       c := make(chan error, 4)
+       f := func() {
+               defer wg.Done()
+               c <- r.Close()
+               c <- w.Close()
+       }
+       wg.Add(2)
+       go f()
+       go f()
+       nils, errs := 0, 0
+       for i := 0; i < 4; i++ {
+               err := <-c
+               if err == nil {
+                       nils++
+               } else {
+                       errs++
+               }
+       }
+       if nils != 2 || errs != 2 {
+               t.Errorf("got nils %d errs %d, want 2 2", nils, errs)
+       }
+}
index e20accf191321cdc7d37e271e7f679742bb306f1..a5e9901379aeac52c68c94682c5f903945bac0e2 100644 (file)
@@ -56,7 +56,11 @@ func dirstat(arg any) (*syscall.Dir, error) {
                switch a := arg.(type) {
                case *File:
                        name = a.name
+                       if err := a.incref("fstat"); err != nil {
+                               return nil, err
+                       }
                        n, err = syscall.Fstat(a.fd, buf)
+                       a.decref()
                case string:
                        name = a
                        n, err = syscall.Stat(a, buf)
index ccf4fd932e7a53e4498a6a94fff69e330e3f70fb..adb40130855a82abab4c8392cc6da17c5348a928 100644 (file)
@@ -28,5 +28,3 @@ func sameFile(fs1, fs2 *fileStat) bool {
        b := fs2.sys.(*syscall.Dir)
        return a.Qid.Path == b.Qid.Path && a.Type == b.Type && a.Dev == b.Dev
 }
-
-const badFd = -1