From: Jay Conrod Date: Mon, 21 Jun 2021 22:22:32 +0000 (-0700) Subject: [dev.fuzz] internal/fuzz: fix race in worker RPC logic X-Git-Tag: go1.18beta1~1282^2~38 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=3fa42437b5d6326aa1ae04726ad4319459280433;p=gostls13.git [dev.fuzz] internal/fuzz: fix race in worker RPC logic We want worker RPCs to return as soon as the context is cancelled, which happens if the user presses ^C, we hit the time limit, or another worker discovers a crasher. RPCs typically block when reading pipes: the server waits for call arguments from the client, and the client waits for results from the server. Since io.Reader.Read doesn't accept a context.Context and reads on pipe file descriptors are difficult to reliably unblock, we've done this by calling Read in a goroutine, and returning from the parent function when ctx.Done() is closed, even if the underlying goroutine isn't finished. In workerServer.serve, we also called the fuzz function in the same goroutine. This resulted in a bug: serve could return while the fuzz function was still running. The fuzz function could observe side effects from cleanup functions registered with F.Cleanup. This change refactors read cancellation logic into contextReader. Only the underlying Read is done in a goroutine. workerServe.serve won't return while the fuzz function is running. Fixes #46632 Change-Id: Id1ed31f6521155c7c8e76dd52a2d70aa93cab201 Reviewed-on: https://go-review.googlesource.com/c/go/+/329920 Trust: Jay Conrod Trust: Katie Hockman Run-TryBot: Jay Conrod TryBot-Result: Go Bot Reviewed-by: Katie Hockman --- diff --git a/src/internal/fuzz/worker.go b/src/internal/fuzz/worker.go index 34871024c3..e1fc999104 100644 --- a/src/internal/fuzz/worker.go +++ b/src/internal/fuzz/worker.go @@ -577,57 +577,33 @@ type workerServer struct { // does not return errors from method calls; those are passed through serialized // responses. func (ws *workerServer) serve(ctx context.Context) error { - // This goroutine may stay blocked after serve returns because the underlying - // read blocks, even after the file descriptor in this process is closed. The - // pipe must be closed by the client, too. - errC := make(chan error, 1) - go func() { - enc := json.NewEncoder(ws.fuzzOut) - dec := json.NewDecoder(ws.fuzzIn) - for { - if ctx.Err() != nil { - return - } - - var c call - if err := dec.Decode(&c); err == io.EOF { - return - } else if err != nil { - errC <- err - return - } - if ctx.Err() != nil { - return - } - - var resp interface{} - switch { - case c.Fuzz != nil: - resp = ws.fuzz(ctx, *c.Fuzz) - case c.Minimize != nil: - resp = ws.minimize(ctx, *c.Minimize) - case c.Ping != nil: - resp = ws.ping(ctx, *c.Ping) - default: - errC <- errors.New("no arguments provided for any call") - return + enc := json.NewEncoder(ws.fuzzOut) + dec := json.NewDecoder(&contextReader{ctx: ctx, r: ws.fuzzIn}) + for { + var c call + if err := dec.Decode(&c); err != nil { + if err == io.EOF || err == ctx.Err() { + return nil + } else { + return err } + } - if err := enc.Encode(resp); err != nil { - errC <- err - return - } + var resp interface{} + switch { + case c.Fuzz != nil: + resp = ws.fuzz(ctx, *c.Fuzz) + case c.Minimize != nil: + resp = ws.minimize(ctx, *c.Minimize) + case c.Ping != nil: + resp = ws.ping(ctx, *c.Ping) + default: + return errors.New("no arguments provided for any call") } - }() - select { - case <-ctx.Done(): - // Stop handling messages when ctx.Done() is closed. This normally happens - // when the worker process receives a SIGINT signal, which on POSIX platforms - // is sent to the process group when ^C is pressed. - return ctx.Err() - case err := <-errC: - return err + if err := enc.Encode(resp); err != nil { + return err + } } } @@ -871,18 +847,11 @@ func (ws *workerServer) ping(ctx context.Context, args pingArgs) pingResponse { // workerServer). type workerClient struct { workerComm - - mu sync.Mutex - enc *json.Encoder - dec *json.Decoder + mu sync.Mutex } func newWorkerClient(comm workerComm) *workerClient { - return &workerClient{ - workerComm: comm, - enc: json.NewEncoder(comm.fuzzIn), - dec: json.NewDecoder(comm.fuzzOut), - } + return &workerClient{workerComm: comm} } // Close shuts down the connection to the RPC server (the worker process) by @@ -932,7 +901,7 @@ func (wc *workerClient) minimize(ctx context.Context, valueIn []byte, args minim wc.memMu <- mem c := call{Minimize: &args} - err = wc.call(ctx, c, &resp) + err = wc.callLocked(ctx, c, &resp) mem, ok = <-wc.memMu if !ok { return nil, minimizeResponse{}, errSharedMemClosed @@ -958,7 +927,7 @@ func (wc *workerClient) fuzz(ctx context.Context, valueIn []byte, args fuzzArgs) wc.memMu <- mem c := call{Fuzz: &args} - err = wc.call(ctx, c, &resp) + err = wc.callLocked(ctx, c, &resp) mem, ok = <-wc.memMu if !ok { return nil, fuzzResponse{}, errSharedMemClosed @@ -972,30 +941,54 @@ func (wc *workerClient) fuzz(ctx context.Context, valueIn []byte, args fuzzArgs) // ping tells the worker to call the ping method. See workerServer.ping. func (wc *workerClient) ping(ctx context.Context) error { + wc.mu.Lock() + defer wc.mu.Unlock() c := call{Ping: &pingArgs{}} var resp pingResponse - return wc.call(ctx, c, &resp) + return wc.callLocked(ctx, c, &resp) +} + +// callLocked sends an RPC from the coordinator to the worker process and waits +// for the response. The callLocked may be cancelled with ctx. +func (wc *workerClient) callLocked(ctx context.Context, c call, resp interface{}) (err error) { + enc := json.NewEncoder(wc.fuzzIn) + dec := json.NewDecoder(&contextReader{ctx: ctx, r: wc.fuzzOut}) + if err := enc.Encode(c); err != nil { + return err + } + return dec.Decode(resp) } -// call sends an RPC from the coordinator to the worker process and waits for -// the response. The call may be cancelled with ctx. -func (wc *workerClient) call(ctx context.Context, c call, resp interface{}) (err error) { - // This goroutine may stay blocked after call returns because the underlying - // read blocks, even after the file descriptor in this process is closed. The - // pipe must be closed by the server, too. - errC := make(chan error, 1) +// contextReader wraps a Reader with a Context. If the context is cancelled +// while the underlying reader is blocked, Read returns immediately. +// +// This is useful for reading from a pipe. Closing a pipe file descriptor does +// not unblock pending Reads on that file descriptor. All copies of the pipe's +// other file descriptor (the write end) must be closed in all processes that +// inherit it. This is difficult to do correctly in the situation we care about +// (process group termination). +type contextReader struct { + ctx context.Context + r io.Reader +} + +func (cr *contextReader) Read(b []byte) (n int, err error) { + if err := cr.ctx.Err(); err != nil { + return 0, err + } + done := make(chan struct{}) + + // This goroutine may stay blocked after Read returns because the underlying + // read is blocked. go func() { - if err := wc.enc.Encode(c); err != nil { - errC <- err - return - } - errC <- wc.dec.Decode(resp) + n, err = cr.r.Read(b) + close(done) }() select { - case <-ctx.Done(): - return ctx.Err() - case err := <-errC: - return err + case <-cr.ctx.Done(): + return 0, cr.ctx.Err() + case <-done: + return n, err } }