--- /dev/null
+# Test that when the coordinator experiences an I/O error communicating
+# with a worker, the coordinator stops the worker and reports the error.
+# The coordinator should not record a crasher.
+#
+# We simulate an I/O error in the test by writing garbage to fuzz_out.
+# This is unlikely, but possible. It's difficult to simulate interruptions
+# due to ^C and EOF errors which are more common. We don't report those.
+[short] skip
+[!darwin] [!linux] [!windows] skip
+
+# If the I/O error occurs before F.Fuzz is called, the coordinator should
+# stop the worker and say that.
+! go test -fuzz=FuzzClosePipeBefore -parallel=1
+stdout '\s*fuzzing process terminated without fuzzing:'
+! stdout 'communicating with fuzzing process'
+! exists testdata
+
+# If the I/O error occurs after F.Fuzz is called (unlikely), just exit.
+# It's hard to distinguish this case from the worker being interrupted by ^C
+# or exiting with status 0 (which it should do when interrupted by ^C).
+! go test -fuzz=FuzzClosePipeAfter -parallel=1
+stdout '^\s*communicating with fuzzing process: invalid character ''!'' looking for beginning of value$'
+! exists testdata
+
+-- go.mod --
+module test
+
+go 1.17
+-- io_error_test.go --
+package io_error
+
+import (
+ "flag"
+ "testing"
+ "time"
+)
+
+func isWorker() bool {
+ f := flag.Lookup("test.fuzzworker")
+ if f == nil {
+ return false
+ }
+ get, ok := f.Value.(flag.Getter)
+ if !ok {
+ return false
+ }
+ return get.Get() == interface{}(true)
+}
+
+func FuzzClosePipeBefore(f *testing.F) {
+ if isWorker() {
+ sendGarbageToCoordinator(f)
+ time.Sleep(3600 * time.Second) // pause until coordinator terminates the process
+ }
+ f.Fuzz(func(*testing.T, []byte) {})
+}
+
+func FuzzClosePipeAfter(f *testing.F) {
+ f.Fuzz(func(t *testing.T, _ []byte) {
+ if isWorker() {
+ sendGarbageToCoordinator(t)
+ time.Sleep(3600 * time.Second) // pause until coordinator terminates the process
+ }
+ })
+}
+-- io_error_windows_test.go --
+package io_error
+
+import (
+ "fmt"
+ "os"
+ "testing"
+)
+
+func sendGarbageToCoordinator(tb testing.TB) {
+ v := os.Getenv("GO_TEST_FUZZ_WORKER_HANDLES")
+ var fuzzInFD, fuzzOutFD uintptr
+ if _, err := fmt.Sscanf(v, "%x,%x", &fuzzInFD, &fuzzOutFD); err != nil {
+ tb.Fatalf("parsing GO_TEST_FUZZ_WORKER_HANDLES: %v", err)
+ }
+ f := os.NewFile(fuzzOutFD, "fuzz_out")
+ if _, err := f.Write([]byte("!!")); err != nil {
+ tb.Fatalf("writing fuzz_out: %v", err)
+ }
+}
+-- io_error_notwindows_test.go --
+// +build !windows
+
+package io_error
+
+import (
+ "os"
+ "testing"
+)
+
+func sendGarbageToCoordinator(tb testing.TB) {
+ f := os.NewFile(4, "fuzz_out")
+ if _, err := f.Write([]byte("!!")); err != nil {
+ tb.Fatalf("writing fuzz_out: %v", err)
+ }
+}
memMu chan *sharedMem // mutex guarding shared memory with worker; persists across processes.
- cmd *exec.Cmd // current worker process
- client *workerClient // used to communicate with worker process
- waitErr error // last error returned by wait, set before termC is closed.
- termC chan struct{} // closed by wait when worker process terminates
+ cmd *exec.Cmd // current worker process
+ client *workerClient // used to communicate with worker process
+ waitErr error // last error returned by wait, set before termC is closed.
+ interrupted bool // true after stop interrupts a running worker.
+ termC chan struct{} // closed by wait when worker process terminates
}
// cleanup releases persistent resources associated with the worker.
return mem.Close()
}
-// runFuzzing runs the test binary to perform fuzzing.
+// coordinate runs the test binary to perform fuzzing.
//
-// This function loops until w.coordinator.doneC is closed or some
-// fatal error is encountered. It receives inputs from w.coordinator.inputC,
-// then passes those on to the worker process.
-func (w *worker) runFuzzing() error {
+// coordinate loops until ctx is cancelled or a fatal error is encountered. While
+// looping, coordinate receives inputs from w.coordinator.inputC, then passes
+// those on to the worker process.
+func (w *worker) coordinate(ctx context.Context) error {
// Start the process.
if err := w.start(); err != nil {
// We couldn't start the worker process. We can't do anything, and it's
return err
}
- // inputC is set to w.coordinator.inputC when the worker is able to process
- // input. It's nil at other times, so its case won't be selected in the
- // event loop below.
- var inputC chan CorpusEntry
-
- // A value is sent to fuzzC to tell the worker to prepare to process an input
- // by setting inputC.
- fuzzC := make(chan struct{}, 1)
-
// Send the worker a message to make sure it can respond.
// Errors that occur before we get a response likely indicate that
// the worker did not call F.Fuzz or called F.Fail first.
// We don't record crashers for these errors.
- pinged := false
- go func() {
- err := w.client.ping()
- if err != nil {
- w.stop() // trigger termC case below
- return
+ if err := w.client.ping(ctx); err != nil {
+ w.stop()
+ if ctx.Err() != nil {
+ return ctx.Err()
}
- pinged = true
- fuzzC <- struct{}{}
- }()
+ if isInterruptError(err) {
+ // User may have pressed ^C before worker responded.
+ return nil
+ }
+ return fmt.Errorf("fuzzing process terminated without fuzzing: %w", err)
+ // TODO(jayconrod,katiehockman): record and return stderr.
+ }
// Main event loop.
for {
select {
- case <-w.coordinator.doneC:
- // All workers were told to stop.
+ case <-ctx.Done():
+ // Worker was told to stop.
err := w.stop()
- if isInterruptError(err) {
- // Worker interrupted by SIGINT. This can happen if the worker receives
- // SIGINT before installing the signal handler. That's likely if
- // TestMain or the fuzz target setup takes a long time.
- return nil
+ if err != nil && !w.interrupted && !isInterruptError(err) {
+ return err
}
- return err
+ return ctx.Err()
case <-w.termC:
- // Worker process terminated unexpectedly.
- if !pinged {
- w.stop()
- return fmt.Errorf("worker terminated without fuzzing")
- // TODO(jayconrod,katiehockman): record and return stderr.
+ // Worker process terminated unexpectedly while waiting for input.
+ err := w.stop()
+ if w.interrupted {
+ panic("worker interrupted after unexpected termination")
}
- if isInterruptError(w.waitErr) {
- // Worker interrupted by SIGINT. See comment in doneC case.
- w.stop()
+ if err == nil || isInterruptError(err) {
+ // Worker stopped, either by exiting with status 0 or after being
+ // interrupted with a signal that was not sent by the coordinator.
+ //
+ // When the user presses ^C, on POSIX platforms, SIGINT is delivered to
+ // all processes in the group concurrently, and the worker may see it
+ // before the coordinator. The worker should exit 0 gracefully (in
+ // theory).
+ //
+ // This condition is probably intended by the user, so suppress
+ // the error.
return nil
}
- if exitErr, ok := w.waitErr.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode {
- w.stop()
- return fmt.Errorf("worker exited unexpectedly due to an internal failure")
- // TODO(jayconrod,katiehockman): record and return stderr.
- }
-
- // Unexpected termination. Inform the coordinator about the crash.
- // TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker.
- mem := <-w.memMu
- value := mem.valueCopy()
- w.memMu <- mem
- message := fmt.Sprintf("fuzzing process terminated unexpectedly: %v\n", w.waitErr)
- crasher := crasherEntry{
- CorpusEntry: CorpusEntry{Data: value},
- errMsg: message,
+ if exitErr, ok := err.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode {
+ // Worker exited with a code indicating F.Fuzz was not called correctly,
+ // for example, F.Fail was called first.
+ return fmt.Errorf("fuzzing process exited unexpectedly due to an internal failure: %w", err)
}
- w.coordinator.crasherC <- crasher
- return w.stop()
+ // Worker exited non-zero or was terminated by a non-interrupt signal
+ // (for example, SIGSEGV).
+ return fmt.Errorf("fuzzing process terminated unexpectedly: %w", err)
+ // TODO(jayconrod,katiehockman): record and return stderr.
- case input := <-inputC:
+ case input := <-w.coordinator.inputC:
// Received input from coordinator.
- inputC = nil // block new inputs until we finish with this one.
- go func() {
- args := fuzzArgs{Duration: workerFuzzDuration}
- value, resp, err := w.client.fuzz(input.Data, args)
- if err != nil {
- // Error communicating with worker.
- select {
- case <-w.termC:
- // Worker terminated, perhaps unexpectedly.
- // We expect I/O errors due to partially sent or received RPCs,
- // so ignore this error.
- case <-w.coordinator.doneC:
- // Timeout or interruption. Worker may also be interrupted.
- // Again, ignore I/O errors.
- default:
- // TODO(jayconrod): if we get an error here, something failed between
- // main and the call to testing.F.Fuzz. The error here won't
- // be useful. Collect stderr, clean it up, and return that.
- // TODO(jayconrod): we can get EPIPE if w.stop is called concurrently
- // and it kills the worker process. Suppress this message in
- // that case.
- fmt.Fprintf(os.Stderr, "communicating with worker: %v\n", err)
- }
- // TODO(jayconrod): what happens if testing.F.Fuzz is never called?
- // TODO(jayconrod): time out if the test process hangs.
- } else if resp.Crashed {
- // The worker found a crasher. Inform the coordinator.
- crasher := crasherEntry{
- CorpusEntry: CorpusEntry{Data: value},
- errMsg: resp.Err,
- }
- w.coordinator.crasherC <- crasher
- } else {
- // Inform the coordinator that fuzzing found something
- // interesting (i.e. new coverage).
- if resp.Interesting {
- w.coordinator.interestingC <- CorpusEntry{Data: value}
- }
-
- // Continue fuzzing.
- fuzzC <- struct{}{}
+ args := fuzzArgs{Duration: workerFuzzDuration}
+ value, resp, err := w.client.fuzz(ctx, input.Data, args)
+ if err != nil {
+ // Error communicating with worker.
+ w.stop()
+ if ctx.Err() != nil {
+ // Timeout or interruption.
+ return ctx.Err()
+ }
+ if w.interrupted {
+ // Communication error before we stopped the worker.
+ // Report an error, but don't record a crasher.
+ return fmt.Errorf("communicating with fuzzing process: %v", err)
+ }
+ if w.waitErr == nil || isInterruptError(w.waitErr) {
+ // Worker stopped, either by exiting with status 0 or after being
+ // interrupted with a signal (not sent by coordinator). See comment in
+ // termC case above.
+ //
+ // Since we expect I/O errors around interrupts, ignore this error.
+ return nil
}
- // TODO(jayconrod,katiehockman): gather statistics.
- }()
- case <-fuzzC:
- // Worker finished fuzzing and nothing new happened.
- inputC = w.coordinator.inputC // unblock new inputs
+ // Unexpected termination. Inform the coordinator about the crash.
+ // TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker.
+ mem := <-w.memMu
+ value := mem.valueCopy()
+ w.memMu <- mem
+ message := fmt.Sprintf("fuzzing process terminated unexpectedly: %v", w.waitErr)
+ crasher := crasherEntry{
+ CorpusEntry: CorpusEntry{Data: value},
+ errMsg: message,
+ }
+ w.coordinator.crasherC <- crasher
+ return w.waitErr
+ } else if resp.Crashed {
+ // The worker found a crasher. Inform the coordinator.
+ crasher := crasherEntry{
+ CorpusEntry: CorpusEntry{Data: value},
+ errMsg: resp.Err,
+ }
+ w.coordinator.crasherC <- crasher
+ } else if resp.Interesting {
+ // Inform the coordinator that fuzzing found something
+ // interesting (i.e. new coverage).
+ w.coordinator.interestingC <- CorpusEntry{Data: value}
+ }
+ // TODO(jayconrod,katiehockman): gather statistics.
}
}
}
panic("worker already started")
}
w.waitErr = nil
+ w.interrupted = false
w.termC = nil
cmd := exec.Command(w.binPath, w.args...)
case <-t.C:
// Timer fired before worker terminated.
+ w.interrupted = true
switch sig {
case os.Interrupt:
// Try to stop the worker with SIGINT and wait a little longer.
case nil:
// Still waiting. Print a message to let the user know why.
- fmt.Fprintf(os.Stderr, "go: waiting for fuzz worker to terminate...\n")
+ fmt.Fprintf(os.Stderr, "go: waiting for fuzzing process to terminate...\n")
}
}
}
// does not return errors from method calls; those are passed through serialized
// responses.
func (ws *workerServer) serve(ctx context.Context) error {
- // Stop handling messages when ctx.Done() is closed. This normally happens
- // when the worker process receives a SIGINT signal, which on POSIX platforms
- // is sent to the process group when ^C is pressed.
- //
- // Ordinarily, the coordinator process may stop a worker by closing fuzz_in.
- // We simulate that and interrupt a blocked read here.
- doneC := make(chan struct{})
- defer func() { close(doneC) }()
+ // This goroutine may stay blocked after serve returns because the underlying
+ // read blocks, even after the file descriptor in this process is closed. The
+ // pipe must be closed by the client, too.
+ errC := make(chan error, 1)
go func() {
- select {
- case <-ctx.Done():
- ws.fuzzIn.Close()
- case <-doneC:
- }
- }()
+ enc := json.NewEncoder(ws.fuzzOut)
+ dec := json.NewDecoder(ws.fuzzIn)
+ for {
+ if ctx.Err() != nil {
+ return
+ }
- enc := json.NewEncoder(ws.fuzzOut)
- dec := json.NewDecoder(ws.fuzzIn)
- for {
- var c call
- if err := dec.Decode(&c); err != nil {
+ var c call
+ if err := dec.Decode(&c); err == io.EOF {
+ return
+ } else if err != nil {
+ errC <- err
+ return
+ }
if ctx.Err() != nil {
- return ctx.Err()
- } else if err == io.EOF {
- return nil
- } else {
- return err
+ return
}
- }
- var resp interface{}
- switch {
- case c.Fuzz != nil:
- resp = ws.fuzz(ctx, *c.Fuzz)
- case c.Ping != nil:
- resp = ws.ping(ctx, *c.Ping)
- default:
- return errors.New("no arguments provided for any call")
- }
+ var resp interface{}
+ switch {
+ case c.Fuzz != nil:
+ resp = ws.fuzz(ctx, *c.Fuzz)
+ case c.Ping != nil:
+ resp = ws.ping(ctx, *c.Ping)
+ default:
+ errC <- errors.New("no arguments provided for any call")
+ return
+ }
- if err := enc.Encode(resp); err != nil {
- return err
+ if err := enc.Encode(resp); err != nil {
+ errC <- err
+ return
+ }
}
+ }()
+
+ select {
+ case <-ctx.Done():
+ // Stop handling messages when ctx.Done() is closed. This normally happens
+ // when the worker process receives a SIGINT signal, which on POSIX platforms
+ // is sent to the process group when ^C is pressed.
+ return ctx.Err()
+ case err := <-errC:
+ return err
}
}
var errSharedMemClosed = errors.New("internal error: shared memory was closed and unmapped")
// fuzz tells the worker to call the fuzz method. See workerServer.fuzz.
-func (wc *workerClient) fuzz(valueIn []byte, args fuzzArgs) (valueOut []byte, resp fuzzResponse, err error) {
+func (wc *workerClient) fuzz(ctx context.Context, valueIn []byte, args fuzzArgs) (valueOut []byte, resp fuzzResponse, err error) {
wc.mu.Lock()
defer wc.mu.Unlock()
wc.memMu <- mem
c := call{Fuzz: &args}
- if err := wc.enc.Encode(c); err != nil {
- return nil, fuzzResponse{}, err
- }
- err = wc.dec.Decode(&resp)
-
+ err = wc.call(ctx, c, &resp)
mem, ok = <-wc.memMu
if !ok {
return nil, fuzzResponse{}, errSharedMemClosed
}
// ping tells the worker to call the ping method. See workerServer.ping.
-func (wc *workerClient) ping() error {
+func (wc *workerClient) ping(ctx context.Context) error {
c := call{Ping: &pingArgs{}}
- if err := wc.enc.Encode(c); err != nil {
- return err
- }
var resp pingResponse
- if err := wc.dec.Decode(&resp); err != nil {
+ return wc.call(ctx, c, &resp)
+}
+
+// call sends an RPC from the coordinator to the worker process and waits for
+// the response. The call may be cancelled with ctx.
+func (wc *workerClient) call(ctx context.Context, c call, resp interface{}) (err error) {
+ // This goroutine may stay blocked after call returns because the underlying
+ // read blocks, even after the file descriptor in this process is closed. The
+ // pipe must be closed by the server, too.
+ errC := make(chan error, 1)
+ go func() {
+ if err := wc.enc.Encode(c); err != nil {
+ errC <- err
+ return
+ }
+ errC <- wc.dec.Decode(resp)
+ }()
+
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case err := <-errC:
return err
}
- return nil
}