]> Cypherpunks repositories - gostls13.git/commitdiff
[dev.fuzz] internal/fuzz: improve cancellation in worker event loops
authorJay Conrod <jayconrod@google.com>
Fri, 19 Mar 2021 19:11:29 +0000 (15:11 -0400)
committerJay Conrod <jayconrod@google.com>
Fri, 9 Apr 2021 19:50:15 +0000 (19:50 +0000)
worker.runFuzzing now accepts a Context, used for cancellation instead
of doneC (which is removed). This is passed down through workerClient
RPC methods (ping, fuzz).

workerClient RPC methods now wrap the call method, which handles
marshaling and cancellation.

Both workerClient.call and workerServer.serve should return quickly
when their contexts are cancelled. Turns out, closing the pipe won't
actually unblock a read on all platforms. Instead, we were falling
back to SIGKILL in worker.stop, which works but takes longer than
necessary.

Also fixed missing newline in log message.

Change-Id: I7b5ae54d6eb9afd6361a07759f049f048952e0cc
Reviewed-on: https://go-review.googlesource.com/c/go/+/303429
Trust: Jay Conrod <jayconrod@google.com>
Trust: Katie Hockman <katie@golang.org>
Run-TryBot: Jay Conrod <jayconrod@google.com>
Reviewed-by: Katie Hockman <katie@golang.org>
src/cmd/go/testdata/script/test_fuzz_io_error.txt [new file with mode: 0644]
src/internal/fuzz/fuzz.go
src/internal/fuzz/sys_windows.go
src/internal/fuzz/worker.go
src/testing/fuzz.go

diff --git a/src/cmd/go/testdata/script/test_fuzz_io_error.txt b/src/cmd/go/testdata/script/test_fuzz_io_error.txt
new file mode 100644 (file)
index 0000000..4c7ab4c
--- /dev/null
@@ -0,0 +1,101 @@
+# Test that when the coordinator experiences an I/O error communicating
+# with a worker, the coordinator stops the worker and reports the error.
+# The coordinator should not record a crasher.
+#
+# We simulate an I/O error in the test by writing garbage to fuzz_out.
+# This is unlikely, but possible. It's difficult to simulate interruptions
+# due to ^C and EOF errors which are more common. We don't report those.
+[short] skip
+[!darwin] [!linux] [!windows] skip
+
+# If the I/O error occurs before F.Fuzz is called, the coordinator should
+# stop the worker and say that.
+! go test -fuzz=FuzzClosePipeBefore -parallel=1
+stdout '\s*fuzzing process terminated without fuzzing:'
+! stdout 'communicating with fuzzing process'
+! exists testdata
+
+# If the I/O error occurs after F.Fuzz is called (unlikely), just exit.
+# It's hard to distinguish this case from the worker being interrupted by ^C
+# or exiting with status 0 (which it should do when interrupted by ^C).
+! go test -fuzz=FuzzClosePipeAfter -parallel=1
+stdout '^\s*communicating with fuzzing process: invalid character ''!'' looking for beginning of value$'
+! exists testdata
+
+-- go.mod --
+module test
+
+go 1.17
+-- io_error_test.go --
+package io_error
+
+import (
+       "flag"
+       "testing"
+       "time"
+)
+
+func isWorker() bool {
+       f := flag.Lookup("test.fuzzworker")
+       if f == nil {
+               return false
+       }
+       get, ok := f.Value.(flag.Getter)
+       if !ok {
+               return false
+       }
+       return get.Get() == interface{}(true)
+}
+
+func FuzzClosePipeBefore(f *testing.F) {
+       if isWorker() {
+               sendGarbageToCoordinator(f)
+               time.Sleep(3600 * time.Second) // pause until coordinator terminates the process
+       }
+       f.Fuzz(func(*testing.T, []byte) {})
+}
+
+func FuzzClosePipeAfter(f *testing.F) {
+       f.Fuzz(func(t *testing.T, _ []byte) {
+               if isWorker() {
+                       sendGarbageToCoordinator(t)
+                       time.Sleep(3600 * time.Second) // pause until coordinator terminates the process
+               }
+       })
+}
+-- io_error_windows_test.go --
+package io_error
+
+import (
+       "fmt"
+       "os"
+       "testing"
+)
+
+func sendGarbageToCoordinator(tb testing.TB) {
+       v := os.Getenv("GO_TEST_FUZZ_WORKER_HANDLES")
+       var fuzzInFD, fuzzOutFD uintptr
+       if _, err := fmt.Sscanf(v, "%x,%x", &fuzzInFD, &fuzzOutFD); err != nil {
+               tb.Fatalf("parsing GO_TEST_FUZZ_WORKER_HANDLES: %v", err)
+       }
+       f := os.NewFile(fuzzOutFD, "fuzz_out")
+       if _, err := f.Write([]byte("!!")); err != nil {
+               tb.Fatalf("writing fuzz_out: %v", err)
+       }
+}
+-- io_error_notwindows_test.go --
+// +build !windows
+
+package io_error
+
+import (
+       "os"
+       "testing"
+)
+
+func sendGarbageToCoordinator(tb testing.TB) {
+       f := os.NewFile(4, "fuzz_out")
+       if _, err := f.Write([]byte("!!")); err != nil {
+               tb.Fatalf("writing fuzz_out: %v", err)
+       }
+}
index 293cb48d4db637748a00190ab11de79ec605ebd1..5fa265f8c5b75400b1ab4aae9c5b9376c47b618b 100644 (file)
@@ -86,13 +86,13 @@ func CoordinateFuzzing(ctx context.Context, timeout time.Duration, parallel int,
        env := os.Environ() // same as self
 
        c := &coordinator{
-               doneC:        make(chan struct{}),
                inputC:       make(chan CorpusEntry),
                interestingC: make(chan CorpusEntry),
                crasherC:     make(chan crasherEntry),
        }
        errC := make(chan error)
 
+       // newWorker creates a worker but doesn't start it yet.
        newWorker := func() (*worker, error) {
                mem, err := sharedMemTempFile(workerSharedMemSize)
                if err != nil {
@@ -110,17 +110,30 @@ func CoordinateFuzzing(ctx context.Context, timeout time.Duration, parallel int,
                }, nil
        }
 
+       // fuzzCtx is used to stop workers, for example, after finding a crasher.
+       fuzzCtx, cancelWorkers := context.WithCancel(ctx)
+       defer cancelWorkers()
+       doneC := ctx.Done()
+
+       // stop is called when a worker encounters a fatal error.
        var fuzzErr error
        stopping := false
        stop := func(err error) {
-               if fuzzErr == nil || fuzzErr == ctx.Err() {
+               if err == fuzzCtx.Err() || isInterruptError(err) {
+                       // Suppress cancellation errors and terminations due to SIGINT.
+                       // The messages are not helpful since either the user triggered the error
+                       // (with ^C) or another more helpful message will be printed (a crasher).
+                       err = nil
+               }
+               if err != nil && (fuzzErr == nil || fuzzErr == ctx.Err()) {
                        fuzzErr = err
                }
                if stopping {
                        return
                }
                stopping = true
-               close(c.doneC)
+               cancelWorkers()
+               doneC = nil
        }
 
        // Start workers.
@@ -135,7 +148,7 @@ func CoordinateFuzzing(ctx context.Context, timeout time.Duration, parallel int,
        for i := range workers {
                w := workers[i]
                go func() {
-                       err := w.runFuzzing()
+                       err := w.coordinate(fuzzCtx)
                        cleanErr := w.cleanup()
                        if err == nil {
                                err = cleanErr
@@ -146,17 +159,14 @@ func CoordinateFuzzing(ctx context.Context, timeout time.Duration, parallel int,
 
        // Main event loop.
        // Do not return until all workers have terminated. We avoid a deadlock by
-       // receiving messages from workers even after closing c.doneC.
+       // receiving messages from workers even after ctx is cancelled.
        activeWorkers := len(workers)
        i := 0
        for {
                select {
-               case <-ctx.Done():
+               case <-doneC:
                        // Interrupted, cancelled, or timed out.
-                       // TODO(jayconrod,katiehockman): On Windows, ^C only interrupts 'go test',
-                       // not the coordinator or worker processes. 'go test' will stop running
-                       // actions, but it won't interrupt its child processes. This makes it
-                       // difficult to stop fuzzing on Windows without a timeout.
+                       // stop sets doneC to nil so we don't busy wait here.
                        stop(ctx.Err())
 
                case crasher := <-c.crasherC:
@@ -259,11 +269,6 @@ type crasherEntry struct {
 // coordinator holds channels that workers can use to communicate with
 // the coordinator.
 type coordinator struct {
-       // doneC is closed to indicate fuzzing is done and workers should stop.
-       // doneC may be closed due to a time limit expiring or a fatal error in
-       // a worker.
-       doneC chan struct{}
-
        // inputC is sent values to fuzz by the coordinator. Any worker may receive
        // values from this channel.
        inputC chan CorpusEntry
index e1734af53c628700fffcb0a916df5b399ea17c76..de6af81d9457da18776ead851bced70fa108e2e4 100644 (file)
@@ -135,6 +135,7 @@ func getWorkerComm() (comm workerComm, err error) {
 }
 
 func isInterruptError(err error) bool {
-       // TODO(jayconrod): implement
+       // On Windows, we can't tell whether the process was interrupted by the error
+       // returned by Wait. It looks like an ExitError with status 1.
        return false
 }
index 506a485f24c099f4309bb5eef52bb326d37441f5..2c4cc1f82bb44932b7929caac7e5966ff8fb309b 100644 (file)
@@ -51,10 +51,11 @@ type worker struct {
 
        memMu chan *sharedMem // mutex guarding shared memory with worker; persists across processes.
 
-       cmd     *exec.Cmd     // current worker process
-       client  *workerClient // used to communicate with worker process
-       waitErr error         // last error returned by wait, set before termC is closed.
-       termC   chan struct{} // closed by wait when worker process terminates
+       cmd         *exec.Cmd     // current worker process
+       client      *workerClient // used to communicate with worker process
+       waitErr     error         // last error returned by wait, set before termC is closed.
+       interrupted bool          // true after stop interrupts a running worker.
+       termC       chan struct{} // closed by wait when worker process terminates
 }
 
 // cleanup releases persistent resources associated with the worker.
@@ -67,12 +68,12 @@ func (w *worker) cleanup() error {
        return mem.Close()
 }
 
-// runFuzzing runs the test binary to perform fuzzing.
+// coordinate runs the test binary to perform fuzzing.
 //
-// This function loops until w.coordinator.doneC is closed or some
-// fatal error is encountered. It receives inputs from w.coordinator.inputC,
-// then passes those on to the worker process.
-func (w *worker) runFuzzing() error {
+// coordinate loops until ctx is cancelled or a fatal error is encountered. While
+// looping, coordinate receives inputs from w.coordinator.inputC, then passes
+// those on to the worker process.
+func (w *worker) coordinate(ctx context.Context) error {
        // Start the process.
        if err := w.start(); err != nil {
                // We couldn't start the worker process. We can't do anything, and it's
@@ -80,125 +81,113 @@ func (w *worker) runFuzzing() error {
                return err
        }
 
-       // inputC is set to w.coordinator.inputC when the worker is able to process
-       // input. It's nil at other times, so its case won't be selected in the
-       // event loop below.
-       var inputC chan CorpusEntry
-
-       // A value is sent to fuzzC to tell the worker to prepare to process an input
-       // by setting inputC.
-       fuzzC := make(chan struct{}, 1)
-
        // Send the worker a message to make sure it can respond.
        // Errors that occur before we get a response likely indicate that
        // the worker did not call F.Fuzz or called F.Fail first.
        // We don't record crashers for these errors.
-       pinged := false
-       go func() {
-               err := w.client.ping()
-               if err != nil {
-                       w.stop() // trigger termC case below
-                       return
+       if err := w.client.ping(ctx); err != nil {
+               w.stop()
+               if ctx.Err() != nil {
+                       return ctx.Err()
                }
-               pinged = true
-               fuzzC <- struct{}{}
-       }()
+               if isInterruptError(err) {
+                       // User may have pressed ^C before worker responded.
+                       return nil
+               }
+               return fmt.Errorf("fuzzing process terminated without fuzzing: %w", err)
+               // TODO(jayconrod,katiehockman): record and return stderr.
+       }
 
        // Main event loop.
        for {
                select {
-               case <-w.coordinator.doneC:
-                       // All workers were told to stop.
+               case <-ctx.Done():
+                       // Worker was told to stop.
                        err := w.stop()
-                       if isInterruptError(err) {
-                               // Worker interrupted by SIGINT. This can happen if the worker receives
-                               // SIGINT before installing the signal handler. That's likely if
-                               // TestMain or the fuzz target setup takes a long time.
-                               return nil
+                       if err != nil && !w.interrupted && !isInterruptError(err) {
+                               return err
                        }
-                       return err
+                       return ctx.Err()
 
                case <-w.termC:
-                       // Worker process terminated unexpectedly.
-                       if !pinged {
-                               w.stop()
-                               return fmt.Errorf("worker terminated without fuzzing")
-                               // TODO(jayconrod,katiehockman): record and return stderr.
+                       // Worker process terminated unexpectedly while waiting for input.
+                       err := w.stop()
+                       if w.interrupted {
+                               panic("worker interrupted after unexpected termination")
                        }
-                       if isInterruptError(w.waitErr) {
-                               // Worker interrupted by SIGINT. See comment in doneC case.
-                               w.stop()
+                       if err == nil || isInterruptError(err) {
+                               // Worker stopped, either by exiting with status 0 or after being
+                               // interrupted with a signal that was not sent by the coordinator.
+                               //
+                               // When the user presses ^C, on POSIX platforms, SIGINT is delivered to
+                               // all processes in the group concurrently, and the worker may see it
+                               // before the coordinator. The worker should exit 0 gracefully (in
+                               // theory).
+                               //
+                               // This condition is probably intended by the user, so suppress
+                               // the error.
                                return nil
                        }
-                       if exitErr, ok := w.waitErr.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode {
-                               w.stop()
-                               return fmt.Errorf("worker exited unexpectedly due to an internal failure")
-                               // TODO(jayconrod,katiehockman): record and return stderr.
-                       }
-
-                       // Unexpected termination. Inform the coordinator about the crash.
-                       // TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker.
-                       mem := <-w.memMu
-                       value := mem.valueCopy()
-                       w.memMu <- mem
-                       message := fmt.Sprintf("fuzzing process terminated unexpectedly: %v\n", w.waitErr)
-                       crasher := crasherEntry{
-                               CorpusEntry: CorpusEntry{Data: value},
-                               errMsg:      message,
+                       if exitErr, ok := err.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode {
+                               // Worker exited with a code indicating F.Fuzz was not called correctly,
+                               // for example, F.Fail was called first.
+                               return fmt.Errorf("fuzzing process exited unexpectedly due to an internal failure: %w", err)
                        }
-                       w.coordinator.crasherC <- crasher
-                       return w.stop()
+                       // Worker exited non-zero or was terminated by a non-interrupt signal
+                       // (for example, SIGSEGV).
+                       return fmt.Errorf("fuzzing process terminated unexpectedly: %w", err)
+                       // TODO(jayconrod,katiehockman): record and return stderr.
 
-               case input := <-inputC:
+               case input := <-w.coordinator.inputC:
                        // Received input from coordinator.
-                       inputC = nil // block new inputs until we finish with this one.
-                       go func() {
-                               args := fuzzArgs{Duration: workerFuzzDuration}
-                               value, resp, err := w.client.fuzz(input.Data, args)
-                               if err != nil {
-                                       // Error communicating with worker.
-                                       select {
-                                       case <-w.termC:
-                                               // Worker terminated, perhaps unexpectedly.
-                                               // We expect I/O errors due to partially sent or received RPCs,
-                                               // so ignore this error.
-                                       case <-w.coordinator.doneC:
-                                               // Timeout or interruption. Worker may also be interrupted.
-                                               // Again, ignore I/O errors.
-                                       default:
-                                               // TODO(jayconrod): if we get an error here, something failed between
-                                               // main and the call to testing.F.Fuzz. The error here won't
-                                               // be useful. Collect stderr, clean it up, and return that.
-                                               // TODO(jayconrod): we can get EPIPE if w.stop is called concurrently
-                                               // and it kills the worker process. Suppress this message in
-                                               // that case.
-                                               fmt.Fprintf(os.Stderr, "communicating with worker: %v\n", err)
-                                       }
-                                       // TODO(jayconrod): what happens if testing.F.Fuzz is never called?
-                                       // TODO(jayconrod): time out if the test process hangs.
-                               } else if resp.Crashed {
-                                       // The worker found a crasher. Inform the coordinator.
-                                       crasher := crasherEntry{
-                                               CorpusEntry: CorpusEntry{Data: value},
-                                               errMsg:      resp.Err,
-                                       }
-                                       w.coordinator.crasherC <- crasher
-                               } else {
-                                       // Inform the coordinator that fuzzing found something
-                                       // interesting (i.e. new coverage).
-                                       if resp.Interesting {
-                                               w.coordinator.interestingC <- CorpusEntry{Data: value}
-                                       }
-
-                                       // Continue fuzzing.
-                                       fuzzC <- struct{}{}
+                       args := fuzzArgs{Duration: workerFuzzDuration}
+                       value, resp, err := w.client.fuzz(ctx, input.Data, args)
+                       if err != nil {
+                               // Error communicating with worker.
+                               w.stop()
+                               if ctx.Err() != nil {
+                                       // Timeout or interruption.
+                                       return ctx.Err()
+                               }
+                               if w.interrupted {
+                                       // Communication error before we stopped the worker.
+                                       // Report an error, but don't record a crasher.
+                                       return fmt.Errorf("communicating with fuzzing process: %v", err)
+                               }
+                               if w.waitErr == nil || isInterruptError(w.waitErr) {
+                                       // Worker stopped, either by exiting with status 0 or after being
+                                       // interrupted with a signal (not sent by coordinator). See comment in
+                                       // termC case above.
+                                       //
+                                       // Since we expect I/O errors around interrupts, ignore this error.
+                                       return nil
                                }
-                               // TODO(jayconrod,katiehockman): gather statistics.
-                       }()
 
-               case <-fuzzC:
-                       // Worker finished fuzzing and nothing new happened.
-                       inputC = w.coordinator.inputC // unblock new inputs
+                               // Unexpected termination. Inform the coordinator about the crash.
+                               // TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker.
+                               mem := <-w.memMu
+                               value := mem.valueCopy()
+                               w.memMu <- mem
+                               message := fmt.Sprintf("fuzzing process terminated unexpectedly: %v", w.waitErr)
+                               crasher := crasherEntry{
+                                       CorpusEntry: CorpusEntry{Data: value},
+                                       errMsg:      message,
+                               }
+                               w.coordinator.crasherC <- crasher
+                               return w.waitErr
+                       } else if resp.Crashed {
+                               // The worker found a crasher. Inform the coordinator.
+                               crasher := crasherEntry{
+                                       CorpusEntry: CorpusEntry{Data: value},
+                                       errMsg:      resp.Err,
+                               }
+                               w.coordinator.crasherC <- crasher
+                       } else if resp.Interesting {
+                               // Inform the coordinator that fuzzing found something
+                               // interesting (i.e. new coverage).
+                               w.coordinator.interestingC <- CorpusEntry{Data: value}
+                       }
+                       // TODO(jayconrod,katiehockman): gather statistics.
                }
        }
 }
@@ -218,6 +207,7 @@ func (w *worker) start() (err error) {
                panic("worker already started")
        }
        w.waitErr = nil
+       w.interrupted = false
        w.termC = nil
 
        cmd := exec.Command(w.binPath, w.args...)
@@ -332,6 +322,7 @@ func (w *worker) stop() error {
 
                case <-t.C:
                        // Timer fired before worker terminated.
+                       w.interrupted = true
                        switch sig {
                        case os.Interrupt:
                                // Try to stop the worker with SIGINT and wait a little longer.
@@ -347,7 +338,7 @@ func (w *worker) stop() error {
 
                        case nil:
                                // Still waiting. Print a message to let the user know why.
-                               fmt.Fprintf(os.Stderr, "go: waiting for fuzz worker to terminate...\n")
+                               fmt.Fprintf(os.Stderr, "go: waiting for fuzzing process to terminate...\n")
                        }
                }
        }
@@ -446,49 +437,55 @@ type workerServer struct {
 // does not return errors from method calls; those are passed through serialized
 // responses.
 func (ws *workerServer) serve(ctx context.Context) error {
-       // Stop handling messages when ctx.Done() is closed. This normally happens
-       // when the worker process receives a SIGINT signal, which on POSIX platforms
-       // is sent to the process group when ^C is pressed.
-       //
-       // Ordinarily, the coordinator process may stop a worker by closing fuzz_in.
-       // We simulate that and interrupt a blocked read here.
-       doneC := make(chan struct{})
-       defer func() { close(doneC) }()
+       // This goroutine may stay blocked after serve returns because the underlying
+       // read blocks, even after the file descriptor in this process is closed. The
+       // pipe must be closed by the client, too.
+       errC := make(chan error, 1)
        go func() {
-               select {
-               case <-ctx.Done():
-                       ws.fuzzIn.Close()
-               case <-doneC:
-               }
-       }()
+               enc := json.NewEncoder(ws.fuzzOut)
+               dec := json.NewDecoder(ws.fuzzIn)
+               for {
+                       if ctx.Err() != nil {
+                               return
+                       }
 
-       enc := json.NewEncoder(ws.fuzzOut)
-       dec := json.NewDecoder(ws.fuzzIn)
-       for {
-               var c call
-               if err := dec.Decode(&c); err != nil {
+                       var c call
+                       if err := dec.Decode(&c); err == io.EOF {
+                               return
+                       } else if err != nil {
+                               errC <- err
+                               return
+                       }
                        if ctx.Err() != nil {
-                               return ctx.Err()
-                       } else if err == io.EOF {
-                               return nil
-                       } else {
-                               return err
+                               return
                        }
-               }
 
-               var resp interface{}
-               switch {
-               case c.Fuzz != nil:
-                       resp = ws.fuzz(ctx, *c.Fuzz)
-               case c.Ping != nil:
-                       resp = ws.ping(ctx, *c.Ping)
-               default:
-                       return errors.New("no arguments provided for any call")
-               }
+                       var resp interface{}
+                       switch {
+                       case c.Fuzz != nil:
+                               resp = ws.fuzz(ctx, *c.Fuzz)
+                       case c.Ping != nil:
+                               resp = ws.ping(ctx, *c.Ping)
+                       default:
+                               errC <- errors.New("no arguments provided for any call")
+                               return
+                       }
 
-               if err := enc.Encode(resp); err != nil {
-                       return err
+                       if err := enc.Encode(resp); err != nil {
+                               errC <- err
+                               return
+                       }
                }
+       }()
+
+       select {
+       case <-ctx.Done():
+               // Stop handling messages when ctx.Done() is closed. This normally happens
+               // when the worker process receives a SIGINT signal, which on POSIX platforms
+               // is sent to the process group when ^C is pressed.
+               return ctx.Err()
+       case err := <-errC:
+               return err
        }
 }
 
@@ -691,7 +688,7 @@ func (wc *workerClient) Close() error {
 var errSharedMemClosed = errors.New("internal error: shared memory was closed and unmapped")
 
 // fuzz tells the worker to call the fuzz method. See workerServer.fuzz.
-func (wc *workerClient) fuzz(valueIn []byte, args fuzzArgs) (valueOut []byte, resp fuzzResponse, err error) {
+func (wc *workerClient) fuzz(ctx context.Context, valueIn []byte, args fuzzArgs) (valueOut []byte, resp fuzzResponse, err error) {
        wc.mu.Lock()
        defer wc.mu.Unlock()
 
@@ -703,11 +700,7 @@ func (wc *workerClient) fuzz(valueIn []byte, args fuzzArgs) (valueOut []byte, re
        wc.memMu <- mem
 
        c := call{Fuzz: &args}
-       if err := wc.enc.Encode(c); err != nil {
-               return nil, fuzzResponse{}, err
-       }
-       err = wc.dec.Decode(&resp)
-
+       err = wc.call(ctx, c, &resp)
        mem, ok = <-wc.memMu
        if !ok {
                return nil, fuzzResponse{}, errSharedMemClosed
@@ -719,14 +712,31 @@ func (wc *workerClient) fuzz(valueIn []byte, args fuzzArgs) (valueOut []byte, re
 }
 
 // ping tells the worker to call the ping method. See workerServer.ping.
-func (wc *workerClient) ping() error {
+func (wc *workerClient) ping(ctx context.Context) error {
        c := call{Ping: &pingArgs{}}
-       if err := wc.enc.Encode(c); err != nil {
-               return err
-       }
        var resp pingResponse
-       if err := wc.dec.Decode(&resp); err != nil {
+       return wc.call(ctx, c, &resp)
+}
+
+// call sends an RPC from the coordinator to the worker process and waits for
+// the response. The call may be cancelled with ctx.
+func (wc *workerClient) call(ctx context.Context, c call, resp interface{}) (err error) {
+       // This goroutine may stay blocked after call returns because the underlying
+       // read blocks, even after the file descriptor in this process is closed. The
+       // pipe must be closed by the server, too.
+       errC := make(chan error, 1)
+       go func() {
+               if err := wc.enc.Encode(c); err != nil {
+                       errC <- err
+                       return
+               }
+               errC <- wc.dec.Decode(resp)
+       }()
+
+       select {
+       case <-ctx.Done():
+               return ctx.Err()
+       case err := <-errC:
                return err
        }
-       return nil
 }
index 2a0754fdd7b283e9092dd834a05b97a176622d84..73ac59cfb4c60c71ab3944bba1533ed7a5070c3e 100644 (file)
@@ -362,7 +362,7 @@ func (f *F) Fuzz(ff interface{}) {
                if err != nil {
                        f.result = FuzzResult{Error: err}
                        f.Fail()
-                       fmt.Fprintf(f.w, "%v", err)
+                       fmt.Fprintf(f.w, "%v\n", err)
                        if crashErr, ok := err.(fuzzCrashError); ok {
                                crashName := crashErr.CrashName()
                                fmt.Fprintf(f.w, "Crash written to %s\n", filepath.Join("testdata/corpus", f.name, crashName))