From 35f3b7053addf842690162d4d4937c0fbf438c50 Mon Sep 17 00:00:00 2001 From: Jay Conrod Date: Thu, 3 Dec 2020 13:05:14 -0500 Subject: [PATCH] [dev.fuzz] internal/fuzz: add mutex to workerClient This prevents workerClient.Close from closing fuzzIn while workerClient.fuzz is writing to it concurrently. It also prevents multiple callers from writing to fuzzIn concurrently, though there's nothing that does that yet. This should prevent most "broken pipe" errors, though they may still be possible if worker.stop is called and it needs to kill the process due to a timeout. In the future, we should detect and ignore those errors, but for now, they're useful for debugging. Also, improve documentation on workerClient and workerServer. Change-Id: Ie2c870392d5e91674d3b1e32b2fa4f9de9ac3eb0 Reviewed-on: https://go-review.googlesource.com/c/go/+/275173 Run-TryBot: Jay Conrod TryBot-Result: Go Bot Trust: Jay Conrod Reviewed-by: Katie Hockman --- src/internal/fuzz/worker.go | 47 +++++++++++++++++++++++++++++-------- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/src/internal/fuzz/worker.go b/src/internal/fuzz/worker.go index a194a5f9be..148cc6dae9 100644 --- a/src/internal/fuzz/worker.go +++ b/src/internal/fuzz/worker.go @@ -13,6 +13,7 @@ import ( "os" "os/exec" "runtime" + "sync" "time" ) @@ -102,6 +103,9 @@ func (w *worker) runFuzzing() error { // TODO(jayconrod): if we get an error here, something failed between // main and the call to testing.F.Fuzz. The error here won't // be useful. Collect stderr, clean it up, and return that. + // TODO(jayconrod): we can get EPIPE if w.stop is called concurrently + // and it kills the worker process. Suppress this message in + // that case. // TODO(jayconrod): what happens if testing.F.Fuzz is never called? // TODO(jayconrod): time out if the test process hangs. fmt.Fprintf(os.Stderr, "communicating with worker: %v\n", err) @@ -284,7 +288,7 @@ func RunFuzzWorker(fn func([]byte) error) error { if err != nil { return err } - srv := &workerServer{workerComm: comm, fn: fn} + srv := &workerServer{workerComm: comm, fuzzFn: fn} return srv.serve() } @@ -304,23 +308,36 @@ type fuzzResponse struct { Err string } -// workerComm holds objects needed for the worker client and server -// to communicate. +// workerComm holds pipes and shared memory used for communication +// between the coordinator process (client) and a worker process (server). type workerComm struct { fuzzIn, fuzzOut *os.File mem *sharedMem } -// workerServer is a minimalist RPC server, run in fuzz worker processes. +// workerServer is a minimalist RPC server, run by fuzz worker processes. +// It allows the coordinator process (using workerClient) to call methods in a +// worker process. This system allows the coordinator to run multiple worker +// processes in parallel and to collect inputs that caused crashes from shared +// memory after a worker process terminates unexpectedly. type workerServer struct { workerComm - fn func([]byte) error + + // fuzzFn runs the worker's fuzz function on the given input and returns + // an error if it finds a crasher (the process may also exit or crash). + fuzzFn func([]byte) error } -// serve deserializes and executes RPCs on a given pair of pipes. +// serve reads serialized RPC messages on fuzzIn. When serve receives a message, +// it calls the corresponding method, then sends the serialized result back +// on fuzzOut. +// +// serve handles RPC calls synchronously; it will not attempt to read a message +// until the previous call has finished. // -// serve returns errors communicating over the pipes. It does not return -// errors from methods; those are passed through response values. +// serve returns errors that occurred when communicating over pipes. serve +// does not return errors from method calls; those are passed through serialized +// responses. func (ws *workerServer) serve() error { enc := json.NewEncoder(ws.fuzzOut) dec := json.NewDecoder(ws.fuzzIn) @@ -358,7 +375,7 @@ func (ws *workerServer) fuzz(value []byte, args fuzzArgs) fuzzResponse { return fuzzResponse{} default: b := mutate(value) - if err := ws.fn(b); err != nil { + if err := ws.fuzzFn(b); err != nil { return fuzzResponse{Crasher: b, Err: err.Error()} } // TODO(jayconrod,katiehockman): return early if coverage is expanded @@ -366,9 +383,13 @@ func (ws *workerServer) fuzz(value []byte, args fuzzArgs) fuzzResponse { } } -// workerClient is a minimalist RPC client, run in the fuzz coordinator. +// workerClient is a minimalist RPC client. The coordinator process uses a +// workerClient to call methods in each worker process (handled by +// workerServer). type workerClient struct { workerComm + + mu sync.Mutex enc *json.Encoder dec *json.Decoder } @@ -385,6 +406,9 @@ func newWorkerClient(comm workerComm) *workerClient { // closing fuzz_in. Close drains fuzz_out (avoiding a SIGPIPE in the worker), // and closes it after the worker process closes the other end. func (wc *workerClient) Close() error { + wc.mu.Lock() + defer wc.mu.Unlock() + // Close fuzzIn. This signals to the server that there are no more calls, // and it should exit. if err := wc.fuzzIn.Close(); err != nil { @@ -403,6 +427,9 @@ func (wc *workerClient) Close() error { // fuzz tells the worker to call the fuzz method. See workerServer.fuzz. func (wc *workerClient) fuzz(value []byte, args fuzzArgs) (fuzzResponse, error) { + wc.mu.Lock() + defer wc.mu.Unlock() + wc.mem.setValue(value) c := call{Fuzz: &args} if err := wc.enc.Encode(c); err != nil { -- 2.48.1