]> Cypherpunks repositories - gostls13.git/commitdiff
[dev.fuzz] internal/fuzz: fix race in worker RPC logic
authorJay Conrod <jayconrod@google.com>
Mon, 21 Jun 2021 22:22:32 +0000 (15:22 -0700)
committerJay Conrod <jayconrod@google.com>
Wed, 23 Jun 2021 16:05:25 +0000 (16:05 +0000)
We want worker RPCs to return as soon as the context is cancelled,
which happens if the user presses ^C, we hit the time limit, or
another worker discovers a crasher. RPCs typically block when reading
pipes: the server waits for call arguments from the client, and the
client waits for results from the server.

Since io.Reader.Read doesn't accept a context.Context and reads on
pipe file descriptors are difficult to reliably unblock, we've done
this by calling Read in a goroutine, and returning from the parent
function when ctx.Done() is closed, even if the underlying goroutine
isn't finished.

In workerServer.serve, we also called the fuzz function in the same
goroutine. This resulted in a bug: serve could return while the fuzz
function was still running. The fuzz function could observe side
effects from cleanup functions registered with F.Cleanup.

This change refactors read cancellation logic into contextReader. Only
the underlying Read is done in a goroutine. workerServe.serve won't
return while the fuzz function is running.

Fixes #46632

Change-Id: Id1ed31f6521155c7c8e76dd52a2d70aa93cab201
Reviewed-on: https://go-review.googlesource.com/c/go/+/329920
Trust: Jay Conrod <jayconrod@google.com>
Trust: Katie Hockman <katie@golang.org>
Run-TryBot: Jay Conrod <jayconrod@google.com>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Katie Hockman <katie@golang.org>
src/internal/fuzz/worker.go

index 34871024c3ba70c8aba53d58abc85978d6ecff15..e1fc999104bb79031ac17dbf8bd978bd865bee10 100644 (file)
@@ -577,57 +577,33 @@ type workerServer struct {
 // does not return errors from method calls; those are passed through serialized
 // responses.
 func (ws *workerServer) serve(ctx context.Context) error {
-       // This goroutine may stay blocked after serve returns because the underlying
-       // read blocks, even after the file descriptor in this process is closed. The
-       // pipe must be closed by the client, too.
-       errC := make(chan error, 1)
-       go func() {
-               enc := json.NewEncoder(ws.fuzzOut)
-               dec := json.NewDecoder(ws.fuzzIn)
-               for {
-                       if ctx.Err() != nil {
-                               return
-                       }
-
-                       var c call
-                       if err := dec.Decode(&c); err == io.EOF {
-                               return
-                       } else if err != nil {
-                               errC <- err
-                               return
-                       }
-                       if ctx.Err() != nil {
-                               return
-                       }
-
-                       var resp interface{}
-                       switch {
-                       case c.Fuzz != nil:
-                               resp = ws.fuzz(ctx, *c.Fuzz)
-                       case c.Minimize != nil:
-                               resp = ws.minimize(ctx, *c.Minimize)
-                       case c.Ping != nil:
-                               resp = ws.ping(ctx, *c.Ping)
-                       default:
-                               errC <- errors.New("no arguments provided for any call")
-                               return
+       enc := json.NewEncoder(ws.fuzzOut)
+       dec := json.NewDecoder(&contextReader{ctx: ctx, r: ws.fuzzIn})
+       for {
+               var c call
+               if err := dec.Decode(&c); err != nil {
+                       if err == io.EOF || err == ctx.Err() {
+                               return nil
+                       } else {
+                               return err
                        }
+               }
 
-                       if err := enc.Encode(resp); err != nil {
-                               errC <- err
-                               return
-                       }
+               var resp interface{}
+               switch {
+               case c.Fuzz != nil:
+                       resp = ws.fuzz(ctx, *c.Fuzz)
+               case c.Minimize != nil:
+                       resp = ws.minimize(ctx, *c.Minimize)
+               case c.Ping != nil:
+                       resp = ws.ping(ctx, *c.Ping)
+               default:
+                       return errors.New("no arguments provided for any call")
                }
-       }()
 
-       select {
-       case <-ctx.Done():
-               // Stop handling messages when ctx.Done() is closed. This normally happens
-               // when the worker process receives a SIGINT signal, which on POSIX platforms
-               // is sent to the process group when ^C is pressed.
-               return ctx.Err()
-       case err := <-errC:
-               return err
+               if err := enc.Encode(resp); err != nil {
+                       return err
+               }
        }
 }
 
@@ -871,18 +847,11 @@ func (ws *workerServer) ping(ctx context.Context, args pingArgs) pingResponse {
 // workerServer).
 type workerClient struct {
        workerComm
-
-       mu  sync.Mutex
-       enc *json.Encoder
-       dec *json.Decoder
+       mu sync.Mutex
 }
 
 func newWorkerClient(comm workerComm) *workerClient {
-       return &workerClient{
-               workerComm: comm,
-               enc:        json.NewEncoder(comm.fuzzIn),
-               dec:        json.NewDecoder(comm.fuzzOut),
-       }
+       return &workerClient{workerComm: comm}
 }
 
 // Close shuts down the connection to the RPC server (the worker process) by
@@ -932,7 +901,7 @@ func (wc *workerClient) minimize(ctx context.Context, valueIn []byte, args minim
        wc.memMu <- mem
 
        c := call{Minimize: &args}
-       err = wc.call(ctx, c, &resp)
+       err = wc.callLocked(ctx, c, &resp)
        mem, ok = <-wc.memMu
        if !ok {
                return nil, minimizeResponse{}, errSharedMemClosed
@@ -958,7 +927,7 @@ func (wc *workerClient) fuzz(ctx context.Context, valueIn []byte, args fuzzArgs)
        wc.memMu <- mem
 
        c := call{Fuzz: &args}
-       err = wc.call(ctx, c, &resp)
+       err = wc.callLocked(ctx, c, &resp)
        mem, ok = <-wc.memMu
        if !ok {
                return nil, fuzzResponse{}, errSharedMemClosed
@@ -972,30 +941,54 @@ func (wc *workerClient) fuzz(ctx context.Context, valueIn []byte, args fuzzArgs)
 
 // ping tells the worker to call the ping method. See workerServer.ping.
 func (wc *workerClient) ping(ctx context.Context) error {
+       wc.mu.Lock()
+       defer wc.mu.Unlock()
        c := call{Ping: &pingArgs{}}
        var resp pingResponse
-       return wc.call(ctx, c, &resp)
+       return wc.callLocked(ctx, c, &resp)
+}
+
+// callLocked sends an RPC from the coordinator to the worker process and waits
+// for the response. The callLocked may be cancelled with ctx.
+func (wc *workerClient) callLocked(ctx context.Context, c call, resp interface{}) (err error) {
+       enc := json.NewEncoder(wc.fuzzIn)
+       dec := json.NewDecoder(&contextReader{ctx: ctx, r: wc.fuzzOut})
+       if err := enc.Encode(c); err != nil {
+               return err
+       }
+       return dec.Decode(resp)
 }
 
-// call sends an RPC from the coordinator to the worker process and waits for
-// the response. The call may be cancelled with ctx.
-func (wc *workerClient) call(ctx context.Context, c call, resp interface{}) (err error) {
-       // This goroutine may stay blocked after call returns because the underlying
-       // read blocks, even after the file descriptor in this process is closed. The
-       // pipe must be closed by the server, too.
-       errC := make(chan error, 1)
+// contextReader wraps a Reader with a Context. If the context is cancelled
+// while the underlying reader is blocked, Read returns immediately.
+//
+// This is useful for reading from a pipe. Closing a pipe file descriptor does
+// not unblock pending Reads on that file descriptor. All copies of the pipe's
+// other file descriptor (the write end) must be closed in all processes that
+// inherit it. This is difficult to do correctly in the situation we care about
+// (process group termination).
+type contextReader struct {
+       ctx context.Context
+       r   io.Reader
+}
+
+func (cr *contextReader) Read(b []byte) (n int, err error) {
+       if err := cr.ctx.Err(); err != nil {
+               return 0, err
+       }
+       done := make(chan struct{})
+
+       // This goroutine may stay blocked after Read returns because the underlying
+       // read is blocked.
        go func() {
-               if err := wc.enc.Encode(c); err != nil {
-                       errC <- err
-                       return
-               }
-               errC <- wc.dec.Decode(resp)
+               n, err = cr.r.Read(b)
+               close(done)
        }()
 
        select {
-       case <-ctx.Done():
-               return ctx.Err()
-       case err := <-errC:
-               return err
+       case <-cr.ctx.Done():
+               return 0, cr.ctx.Err()
+       case <-done:
+               return n, err
        }
 }