From 187957d37056592203fd758ae0245a28f4518122 Mon Sep 17 00:00:00 2001 From: Ian Lance Taylor Date: Tue, 17 Oct 2017 13:57:34 -0700 Subject: [PATCH] os: add deadline methods for File type Add SetDeadline, SetReadDeadline, and SetWriteDeadline methods to os.File, just as they exist today for the net package. Fixes #22114 Change-Id: I4d390d739169b991175baba676010897dc8568fa Reviewed-on: https://go-review.googlesource.com/71770 Run-TryBot: Ian Lance Taylor TryBot-Result: Gobot Gobot Reviewed-by: David Crawshaw Reviewed-by: Joe Tsai --- src/internal/poll/fd.go | 4 + src/internal/poll/fd_poll_runtime.go | 2 +- src/os/error.go | 25 ++ src/os/file.go | 45 ++ src/os/file_plan9.go | 25 ++ src/os/file_posix.go | 24 ++ src/os/file_unix.go | 25 +- src/os/pipe_bsd.go | 2 +- src/os/pipe_freebsd.go | 2 +- src/os/pipe_linux.go | 2 +- src/os/timeout_test.go | 589 +++++++++++++++++++++++++++ 11 files changed, 734 insertions(+), 11 deletions(-) create mode 100644 src/os/timeout_test.go diff --git a/src/internal/poll/fd.go b/src/internal/poll/fd.go index f1454dba90..2567746106 100644 --- a/src/internal/poll/fd.go +++ b/src/internal/poll/fd.go @@ -21,6 +21,10 @@ var ErrNetClosing = errors.New("use of closed network connection") // has been closed. var ErrFileClosing = errors.New("use of closed file") +// ErrNoDeadline is returned when a request is made to set a deadline +// on a file type that does not use the poller. +var ErrNoDeadline = errors.New("file type does not support deadline") + // Return the appropriate closing error based on isFile. func errClosing(isFile bool) error { if isFile { diff --git a/src/internal/poll/fd_poll_runtime.go b/src/internal/poll/fd_poll_runtime.go index 866f26f3fc..87a01a8b69 100644 --- a/src/internal/poll/fd_poll_runtime.go +++ b/src/internal/poll/fd_poll_runtime.go @@ -149,7 +149,7 @@ func setDeadlineImpl(fd *FD, t time.Time, mode int) error { } defer fd.decref() if fd.pd.runtimeCtx == 0 { - return errors.New("file type does not support deadlines") + return ErrNoDeadline } runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode) return nil diff --git a/src/os/error.go b/src/os/error.go index 7235bfb6d6..b4242a4829 100644 --- a/src/os/error.go +++ b/src/os/error.go @@ -6,6 +6,7 @@ package os import ( "errors" + "internal/poll" ) // Portable analogs of some common system call errors. @@ -15,8 +16,13 @@ var ( ErrExist = errors.New("file already exists") ErrNotExist = errors.New("file does not exist") ErrClosed = errors.New("file already closed") + ErrNoDeadline = poll.ErrNoDeadline ) +type timeout interface { + Timeout() bool +} + // PathError records an error and the operation and file path that caused it. type PathError struct { Op string @@ -26,6 +32,12 @@ type PathError struct { func (e *PathError) Error() string { return e.Op + " " + e.Path + ": " + e.Err.Error() } +// Timeout reports whether this error represents a timeout. +func (e *PathError) Timeout() bool { + t, ok := e.Err.(timeout) + return ok && t.Timeout() +} + // SyscallError records an error from a specific system call. type SyscallError struct { Syscall string @@ -34,6 +46,12 @@ type SyscallError struct { func (e *SyscallError) Error() string { return e.Syscall + ": " + e.Err.Error() } +// Timeout reports whether this error represents a timeout. +func (e *SyscallError) Timeout() bool { + t, ok := e.Err.(timeout) + return ok && t.Timeout() +} + // NewSyscallError returns, as an error, a new SyscallError // with the given system call name and error details. // As a convenience, if err is nil, NewSyscallError returns nil. @@ -65,6 +83,13 @@ func IsPermission(err error) bool { return isPermission(err) } +// IsTimeout returns a boolean indicating whether the error is known +// to report that a timeout occurred. +func IsTimeout(err error) bool { + terr, ok := underlyingError(err).(timeout) + return ok && terr.Timeout() +} + // underlyingError returns the underlying error for known os error types. func underlyingError(err error) error { switch err := err.(type) { diff --git a/src/os/file.go b/src/os/file.go index cf621d1c87..1fc4cf34da 100644 --- a/src/os/file.go +++ b/src/os/file.go @@ -41,6 +41,7 @@ import ( "internal/poll" "io" "syscall" + "time" ) // Name returns the name of the file as presented to Open. @@ -316,3 +317,47 @@ func Chmod(name string, mode FileMode) error { return chmod(name, mode) } // Chmod changes the mode of the file to mode. // If there is an error, it will be of type *PathError. func (f *File) Chmod(mode FileMode) error { return f.chmod(mode) } + +// SetDeadline sets the read and write deadlines for a File. +// It is equivalent to calling both SetReadDeadline and SetWriteDeadline. +// +// Only some kinds of files support setting a deadline. Calls to SetDeadline +// for files that do not support deadlines will return ErrNoDeadline. +// On most systems ordinary files do not support deadlines, but pipes do. +// +// A deadline is an absolute time after which I/O operations fail with an +// error instead of blocking. The deadline applies to all future and pending +// I/O, not just the immediately following call to Read or Write. +// After a deadline has been exceeded, the connection can be refreshed +// by setting a deadline in the future. +// +// An error returned after a timeout fails will implement the +// Timeout method, and calling the Timeout method will return true. +// The PathError and SyscallError types implement the Timeout method. +// In general, call IsTimeout to test whether an error indicates a timeout. +// +// An idle timeout can be implemented by repeatedly extending +// the deadline after successful Read or Write calls. +// +// A zero value for t means I/O operations will not time out. +func (f *File) SetDeadline(t time.Time) error { + return f.setDeadline(t) +} + +// SetReadDeadline sets the deadline for future Read calls and any +// currently-blocked Read call. +// A zero value for t means Read will not time out. +// Not all files support setting deadlines; see SetDeadline. +func (f *File) SetReadDeadline(t time.Time) error { + return f.setReadDeadline(t) +} + +// SetWriteDeadline sets the deadline for any future Write calls and any +// currently-blocked Write call. +// Even if Write times out, it may return n > 0, indicating that +// some of the data was successfully written. +// A zero value for t means Write will not time out. +// Not all files support setting deadlines; see SetDeadline. +func (f *File) SetWriteDeadline(t time.Time) error { + return f.setWriteDeadline(t) +} diff --git a/src/os/file_plan9.go b/src/os/file_plan9.go index 0f4a736c26..74c377127d 100644 --- a/src/os/file_plan9.go +++ b/src/os/file_plan9.go @@ -5,6 +5,7 @@ package os import ( + "internal/poll" "io" "runtime" "syscall" @@ -491,6 +492,30 @@ func (f *File) Chdir() error { return nil } +// setDeadline sets the read and write deadline. +func (f *File) setDeadline(time.Time) error { + if err := f.checkValid("SetDeadline"); err != nil { + return err + } + return poll.ErrNoDeadline +} + +// setReadDeadline sets the read deadline. +func (f *File) setReadDeadline(time.Time) error { + if err := f.checkValid("SetReadDeadline"); err != nil { + return err + } + return poll.ErrNoDeadline +} + +// setWriteDeadline sets the write deadline. +func (f *File) setWriteDeadline(time.Time) error { + if err := f.checkValid("SetWriteDeadline"); err != nil { + return err + } + return poll.ErrNoDeadline +} + // checkValid checks whether f is valid for use. // If not, it returns an appropriate error, perhaps incorporating the operation name op. func (f *File) checkValid(op string) error { diff --git a/src/os/file_posix.go b/src/os/file_posix.go index f38d43e43f..36f7b90e80 100644 --- a/src/os/file_posix.go +++ b/src/os/file_posix.go @@ -159,6 +159,30 @@ func (f *File) Chdir() error { return nil } +// setDeadline sets the read and write deadline. +func (f *File) setDeadline(t time.Time) error { + if err := f.checkValid("SetDeadline"); err != nil { + return err + } + return f.pfd.SetDeadline(t) +} + +// setReadDeadline sets the read deadline. +func (f *File) setReadDeadline(t time.Time) error { + if err := f.checkValid("SetReadDeadline"); err != nil { + return err + } + return f.pfd.SetReadDeadline(t) +} + +// setWriteDeadline sets the write deadline. +func (f *File) setWriteDeadline(t time.Time) error { + if err := f.checkValid("SetWriteDeadline"); err != nil { + return err + } + return f.pfd.SetWriteDeadline(t) +} + // checkValid checks whether f is valid for use. // If not, it returns an appropriate error, perhaps incorporating the operation name op. func (f *File) checkValid(op string) error { diff --git a/src/os/file_unix.go b/src/os/file_unix.go index 102cdfec50..84a2bb5f00 100644 --- a/src/os/file_unix.go +++ b/src/os/file_unix.go @@ -75,12 +75,22 @@ func (f *File) Fd() uintptr { // name. The returned value will be nil if fd is not a valid file // descriptor. func NewFile(fd uintptr, name string) *File { - return newFile(fd, name, false) + return newFile(fd, name, kindNewFile) } -// newFile is like NewFile, but if pollable is true it tries to add the -// file to the runtime poller. -func newFile(fd uintptr, name string, pollable bool) *File { +// newFileKind describes the kind of file to newFile. +type newFileKind int + +const ( + kindNewFile newFileKind = iota + kindOpenFile + kindPipe +) + +// newFile is like NewFile, but if called from OpenFile or Pipe +// (as passed in the kind parameter) it tries to add the file to +// the runtime poller. +func newFile(fd uintptr, name string, kind newFileKind) *File { fdi := int(fd) if fdi < 0 { return nil @@ -98,10 +108,11 @@ func newFile(fd uintptr, name string, pollable bool) *File { // Don't try to use kqueue with regular files on FreeBSD. // It crashes the system unpredictably while running all.bash. // Issue 19093. - if runtime.GOOS == "freebsd" { - pollable = false + if runtime.GOOS == "freebsd" && kind == kindOpenFile { + kind = kindNewFile } + pollable := kind == kindOpenFile || kind == kindPipe if err := f.pfd.Init("file", pollable); err != nil { // An error here indicates a failure to register // with the netpoll system. That can happen for @@ -183,7 +194,7 @@ func OpenFile(name string, flag int, perm FileMode) (*File, error) { syscall.CloseOnExec(r) } - return newFile(uintptr(r), name, true), nil + return newFile(uintptr(r), name, kindOpenFile), nil } // Close closes the File, rendering it unusable for I/O. diff --git a/src/os/pipe_bsd.go b/src/os/pipe_bsd.go index ffd201cf45..d16c2a6c0b 100644 --- a/src/os/pipe_bsd.go +++ b/src/os/pipe_bsd.go @@ -24,5 +24,5 @@ func Pipe() (r *File, w *File, err error) { syscall.CloseOnExec(p[1]) syscall.ForkLock.RUnlock() - return newFile(uintptr(p[0]), "|0", true), newFile(uintptr(p[1]), "|1", true), nil + return newFile(uintptr(p[0]), "|0", kindPipe), newFile(uintptr(p[1]), "|1", kindPipe), nil } diff --git a/src/os/pipe_freebsd.go b/src/os/pipe_freebsd.go index ea6622cd26..47983065d9 100644 --- a/src/os/pipe_freebsd.go +++ b/src/os/pipe_freebsd.go @@ -30,5 +30,5 @@ func Pipe() (r *File, w *File, err error) { syscall.ForkLock.RUnlock() } - return newFile(uintptr(p[0]), "|0", true), newFile(uintptr(p[1]), "|1", true), nil + return newFile(uintptr(p[0]), "|0", kindPipe), newFile(uintptr(p[1]), "|1", kindPipe), nil } diff --git a/src/os/pipe_linux.go b/src/os/pipe_linux.go index 96f2ce900c..acd7b88e1d 100644 --- a/src/os/pipe_linux.go +++ b/src/os/pipe_linux.go @@ -29,5 +29,5 @@ func Pipe() (r *File, w *File, err error) { return nil, nil, NewSyscallError("pipe2", e) } - return newFile(uintptr(p[0]), "|0", true), newFile(uintptr(p[1]), "|1", true), nil + return newFile(uintptr(p[0]), "|0", kindPipe), newFile(uintptr(p[1]), "|1", kindPipe), nil } diff --git a/src/os/timeout_test.go b/src/os/timeout_test.go new file mode 100644 index 0000000000..6f47ed04a9 --- /dev/null +++ b/src/os/timeout_test.go @@ -0,0 +1,589 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !nacl +// +build !plan9 +// +build !windows + +package os_test + +import ( + "fmt" + "internal/poll" + "io" + "io/ioutil" + "math/rand" + "os" + "runtime" + "sync" + "testing" + "time" +) + +func TestNonpollableDeadline(t *testing.T) { + // On BSD systems regular files seem to be pollable, + // so just run this test on Linux. + if runtime.GOOS != "linux" { + t.Skipf("skipping on %s", runtime.GOOS) + } + + f, err := ioutil.TempFile("", "ostest") + if err != nil { + t.Fatal(err) + } + defer os.Remove(f.Name()) + defer f.Close() + deadline := time.Now().Add(10 * time.Second) + if err := f.SetDeadline(deadline); err != os.ErrNoDeadline { + t.Errorf("SetDeadline on file returned %v, wanted %v", err, os.ErrNoDeadline) + } + if err := f.SetReadDeadline(deadline); err != os.ErrNoDeadline { + t.Errorf("SetReadDeadline on file returned %v, wanted %v", err, os.ErrNoDeadline) + } + if err := f.SetWriteDeadline(deadline); err != os.ErrNoDeadline { + t.Errorf("SetWriteDeadline on file returned %v, wanted %v", err, os.ErrNoDeadline) + } +} + +// noDeadline is a zero time.Time value, which cancels a deadline. +var noDeadline time.Time + +var readTimeoutTests = []struct { + timeout time.Duration + xerrs [2]error // expected errors in transition +}{ + // Tests that read deadlines work, even if there's data ready + // to be read. + {-5 * time.Second, [2]error{poll.ErrTimeout, poll.ErrTimeout}}, + + {50 * time.Millisecond, [2]error{nil, poll.ErrTimeout}}, +} + +func TestReadTimeout(t *testing.T) { + t.Parallel() + + r, w, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer r.Close() + defer w.Close() + + if _, err := w.Write([]byte("READ TIMEOUT TEST")); err != nil { + t.Fatal(err) + } + + for i, tt := range readTimeoutTests { + if err := r.SetReadDeadline(time.Now().Add(tt.timeout)); err != nil { + t.Fatalf("#%d: %v", i, err) + } + var b [1]byte + for j, xerr := range tt.xerrs { + for { + n, err := r.Read(b[:]) + if xerr != nil { + if !os.IsTimeout(err) { + t.Fatalf("#%d/%d: %v", i, j, err) + } + } + if err == nil { + time.Sleep(tt.timeout / 3) + continue + } + if n != 0 { + t.Fatalf("#%d/%d: read %d; want 0", i, j, n) + } + break + } + } + } +} + +func TestReadTimeoutMustNotReturn(t *testing.T) { + t.Parallel() + + r, w, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer r.Close() + defer w.Close() + + max := time.NewTimer(100 * time.Millisecond) + defer max.Stop() + ch := make(chan error) + go func() { + if err := r.SetDeadline(time.Now().Add(-5 * time.Second)); err != nil { + t.Error(err) + } + if err := r.SetWriteDeadline(time.Now().Add(-5 * time.Second)); err != nil { + t.Error(err) + } + if err := r.SetReadDeadline(noDeadline); err != nil { + t.Error(err) + } + var b [1]byte + _, err := r.Read(b[:]) + ch <- err + }() + + select { + case err := <-ch: + t.Fatalf("expected Read to not return, but it returned with %v", err) + case <-max.C: + w.Close() + err := <-ch // wait for tester goroutine to stop + if os.IsTimeout(err) { + t.Fatal(err) + } + } +} + +var writeTimeoutTests = []struct { + timeout time.Duration + xerrs [2]error // expected errors in transition +}{ + // Tests that write deadlines work, even if there's buffer + // space available to write. + {-5 * time.Second, [2]error{poll.ErrTimeout, poll.ErrTimeout}}, + + {10 * time.Millisecond, [2]error{nil, poll.ErrTimeout}}, +} + +func TestWriteTimeout(t *testing.T) { + t.Parallel() + + for i, tt := range writeTimeoutTests { + t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) { + r, w, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer r.Close() + defer w.Close() + + if err := w.SetWriteDeadline(time.Now().Add(tt.timeout)); err != nil { + t.Fatalf("%v", err) + } + for j, xerr := range tt.xerrs { + for { + n, err := w.Write([]byte("WRITE TIMEOUT TEST")) + if xerr != nil { + if !os.IsTimeout(err) { + t.Fatalf("%d: %v", j, err) + } + } + if err == nil { + time.Sleep(tt.timeout / 3) + continue + } + if n != 0 { + t.Fatalf("%d: wrote %d; want 0", j, n) + } + break + } + } + }) + } +} + +func TestWriteTimeoutMustNotReturn(t *testing.T) { + t.Parallel() + + r, w, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer r.Close() + defer w.Close() + + max := time.NewTimer(100 * time.Millisecond) + defer max.Stop() + ch := make(chan error) + go func() { + if err := w.SetDeadline(time.Now().Add(-5 * time.Second)); err != nil { + t.Error(err) + } + if err := w.SetReadDeadline(time.Now().Add(-5 * time.Second)); err != nil { + t.Error(err) + } + if err := w.SetWriteDeadline(noDeadline); err != nil { + t.Error(err) + } + var b [1]byte + for { + if _, err := w.Write(b[:]); err != nil { + ch <- err + break + } + } + }() + + select { + case err := <-ch: + t.Fatalf("expected Write to not return, but it returned with %v", err) + case <-max.C: + r.Close() + err := <-ch // wait for tester goroutine to stop + if os.IsTimeout(err) { + t.Fatal(err) + } + } +} + +func timeoutReader(r *os.File, d, min, max time.Duration, ch chan<- error) { + var err error + defer func() { ch <- err }() + + t0 := time.Now() + if err = r.SetReadDeadline(time.Now().Add(d)); err != nil { + return + } + b := make([]byte, 256) + var n int + n, err = r.Read(b) + t1 := time.Now() + if n != 0 || err == nil || !os.IsTimeout(err) { + err = fmt.Errorf("Read did not return (0, timeout): (%d, %v)", n, err) + return + } + if dt := t1.Sub(t0); min > dt || dt > max && !testing.Short() { + err = fmt.Errorf("Read took %s; expected %s", dt, d) + return + } +} + +func TestReadTimeoutFluctuation(t *testing.T) { + t.Parallel() + + r, w, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer r.Close() + defer w.Close() + + max := time.NewTimer(time.Second) + defer max.Stop() + ch := make(chan error) + go timeoutReader(r, 100*time.Millisecond, 50*time.Millisecond, 250*time.Millisecond, ch) + + select { + case <-max.C: + t.Fatal("Read took over 1s; expected 0.1s") + case err := <-ch: + if !os.IsTimeout(err) { + t.Fatal(err) + } + } +} + +func timeoutWriter(w *os.File, d, min, max time.Duration, ch chan<- error) { + var err error + defer func() { ch <- err }() + + t0 := time.Now() + if err = w.SetWriteDeadline(time.Now().Add(d)); err != nil { + return + } + var n int + for { + n, err = w.Write([]byte("TIMEOUT WRITER")) + if err != nil { + break + } + } + t1 := time.Now() + if err == nil || !os.IsTimeout(err) { + err = fmt.Errorf("Write did not return (any, timeout): (%d, %v)", n, err) + return + } + if dt := t1.Sub(t0); min > dt || dt > max && !testing.Short() { + err = fmt.Errorf("Write took %s; expected %s", dt, d) + return + } +} + +func TestWriteTimeoutFluctuation(t *testing.T) { + t.Parallel() + + r, w, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer r.Close() + defer w.Close() + + d := time.Second + max := time.NewTimer(d) + defer max.Stop() + ch := make(chan error) + go timeoutWriter(w, 100*time.Millisecond, 50*time.Millisecond, 250*time.Millisecond, ch) + + select { + case <-max.C: + t.Fatalf("Write took over %v; expected 0.1s", d) + case err := <-ch: + if !os.IsTimeout(err) { + t.Fatal(err) + } + } +} + +func TestVariousDeadlines(t *testing.T) { + t.Parallel() + testVariousDeadlines(t) +} + +func TestVariousDeadlines1Proc(t *testing.T) { + // Cannot use t.Parallel - modifies global GOMAXPROCS. + if testing.Short() { + t.Skip("skipping in short mode") + } + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1)) + testVariousDeadlines(t) +} + +func TestVariousDeadlines4Proc(t *testing.T) { + // Cannot use t.Parallel - modifies global GOMAXPROCS. + if testing.Short() { + t.Skip("skipping in short mode") + } + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(4)) + testVariousDeadlines(t) +} + +type neverEnding byte + +func (b neverEnding) Read(p []byte) (int, error) { + for i := range p { + p[i] = byte(b) + } + return len(p), nil +} + +func testVariousDeadlines(t *testing.T) { + type result struct { + n int64 + err error + d time.Duration + } + + handler := func(w *os.File, pasvch chan result) { + // The writer, with no timeouts of its own, + // sending bytes to clients as fast as it can. + t0 := time.Now() + n, err := io.Copy(w, neverEnding('a')) + dt := time.Since(t0) + pasvch <- result{n, err, dt} + } + + for _, timeout := range []time.Duration{ + 1 * time.Nanosecond, + 2 * time.Nanosecond, + 5 * time.Nanosecond, + 50 * time.Nanosecond, + 100 * time.Nanosecond, + 200 * time.Nanosecond, + 500 * time.Nanosecond, + 750 * time.Nanosecond, + 1 * time.Microsecond, + 5 * time.Microsecond, + 25 * time.Microsecond, + 250 * time.Microsecond, + 500 * time.Microsecond, + 1 * time.Millisecond, + 5 * time.Millisecond, + 100 * time.Millisecond, + 250 * time.Millisecond, + 500 * time.Millisecond, + 1 * time.Second, + } { + numRuns := 3 + if testing.Short() { + numRuns = 1 + if timeout > 500*time.Microsecond { + continue + } + } + for run := 0; run < numRuns; run++ { + t.Run(fmt.Sprintf("%v-%d", timeout, run+1), func(t *testing.T) { + r, w, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer r.Close() + defer w.Close() + + pasvch := make(chan result) + go handler(w, pasvch) + + tooLong := 5 * time.Second + max := time.NewTimer(tooLong) + defer max.Stop() + actvch := make(chan result) + go func() { + t0 := time.Now() + if err := r.SetDeadline(t0.Add(timeout)); err != nil { + t.Error(err) + } + n, err := io.Copy(ioutil.Discard, r) + dt := time.Since(t0) + r.Close() + actvch <- result{n, err, dt} + }() + + select { + case res := <-actvch: + if os.IsTimeout(res.err) { + t.Logf("good client timeout after %v, reading %d bytes", res.d, res.n) + } else { + t.Fatalf("client Copy = %d, %v; want timeout", res.n, res.err) + } + case <-max.C: + t.Fatalf("timeout (%v) waiting for client to timeout (%v) reading", tooLong, timeout) + } + + select { + case res := <-pasvch: + t.Logf("writer in %v wrote %d: %v", res.d, res.n, res.err) + case <-max.C: + t.Fatalf("timeout waiting for writer to finish writing") + } + }) + } + } +} + +func TestReadWriteDeadlineRace(t *testing.T) { + t.Parallel() + + N := 1000 + if testing.Short() { + N = 50 + } + + r, w, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer r.Close() + defer w.Close() + + var wg sync.WaitGroup + wg.Add(3) + go func() { + defer wg.Done() + tic := time.NewTicker(2 * time.Microsecond) + defer tic.Stop() + for i := 0; i < N; i++ { + if err := r.SetReadDeadline(time.Now().Add(2 * time.Microsecond)); err != nil { + break + } + if err := w.SetWriteDeadline(time.Now().Add(2 * time.Microsecond)); err != nil { + break + } + <-tic.C + } + }() + go func() { + defer wg.Done() + var b [1]byte + for i := 0; i < N; i++ { + _, err := r.Read(b[:]) + if err != nil && !os.IsTimeout(err) { + t.Error("Read returned non-timeout error", err) + } + } + }() + go func() { + defer wg.Done() + var b [1]byte + for i := 0; i < N; i++ { + _, err := w.Write(b[:]) + if err != nil && !os.IsTimeout(err) { + t.Error("Write returned non-timeout error", err) + } + } + }() + wg.Wait() // wait for tester goroutine to stop +} + +// TestRacyRead tests that it is safe to mutate the input Read buffer +// immediately after cancelation has occurred. +func TestRacyRead(t *testing.T) { + t.Parallel() + + r, w, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer r.Close() + defer w.Close() + + var wg sync.WaitGroup + defer wg.Wait() + + go io.Copy(w, rand.New(rand.NewSource(0))) + + r.SetReadDeadline(time.Now().Add(time.Millisecond)) + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + b1 := make([]byte, 1024) + b2 := make([]byte, 1024) + for j := 0; j < 100; j++ { + _, err := r.Read(b1) + copy(b1, b2) // Mutate b1 to trigger potential race + if err != nil { + if !os.IsTimeout(err) { + t.Error(err) + } + r.SetReadDeadline(time.Now().Add(time.Millisecond)) + } + } + }() + } +} + +// TestRacyWrite tests that it is safe to mutate the input Write buffer +// immediately after cancelation has occurred. +func TestRacyWrite(t *testing.T) { + t.Parallel() + + r, w, err := os.Pipe() + if err != nil { + t.Fatal(err) + } + defer r.Close() + defer w.Close() + + var wg sync.WaitGroup + defer wg.Wait() + + go io.Copy(ioutil.Discard, r) + + w.SetWriteDeadline(time.Now().Add(time.Millisecond)) + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + b1 := make([]byte, 1024) + b2 := make([]byte, 1024) + for j := 0; j < 100; j++ { + _, err := w.Write(b1) + copy(b1, b2) // Mutate b1 to trigger potential race + if err != nil { + if !os.IsTimeout(err) { + t.Error(err) + } + w.SetWriteDeadline(time.Now().Add(time.Millisecond)) + } + } + }() + } +} -- 2.48.1