// does not return errors from method calls; those are passed through serialized
// responses.
func (ws *workerServer) serve(ctx context.Context) error {
- // This goroutine may stay blocked after serve returns because the underlying
- // read blocks, even after the file descriptor in this process is closed. The
- // pipe must be closed by the client, too.
- errC := make(chan error, 1)
- go func() {
- enc := json.NewEncoder(ws.fuzzOut)
- dec := json.NewDecoder(ws.fuzzIn)
- for {
- if ctx.Err() != nil {
- return
- }
-
- var c call
- if err := dec.Decode(&c); err == io.EOF {
- return
- } else if err != nil {
- errC <- err
- return
- }
- if ctx.Err() != nil {
- return
- }
-
- var resp interface{}
- switch {
- case c.Fuzz != nil:
- resp = ws.fuzz(ctx, *c.Fuzz)
- case c.Minimize != nil:
- resp = ws.minimize(ctx, *c.Minimize)
- case c.Ping != nil:
- resp = ws.ping(ctx, *c.Ping)
- default:
- errC <- errors.New("no arguments provided for any call")
- return
+ enc := json.NewEncoder(ws.fuzzOut)
+ dec := json.NewDecoder(&contextReader{ctx: ctx, r: ws.fuzzIn})
+ for {
+ var c call
+ if err := dec.Decode(&c); err != nil {
+ if err == io.EOF || err == ctx.Err() {
+ return nil
+ } else {
+ return err
}
+ }
- if err := enc.Encode(resp); err != nil {
- errC <- err
- return
- }
+ var resp interface{}
+ switch {
+ case c.Fuzz != nil:
+ resp = ws.fuzz(ctx, *c.Fuzz)
+ case c.Minimize != nil:
+ resp = ws.minimize(ctx, *c.Minimize)
+ case c.Ping != nil:
+ resp = ws.ping(ctx, *c.Ping)
+ default:
+ return errors.New("no arguments provided for any call")
}
- }()
- select {
- case <-ctx.Done():
- // Stop handling messages when ctx.Done() is closed. This normally happens
- // when the worker process receives a SIGINT signal, which on POSIX platforms
- // is sent to the process group when ^C is pressed.
- return ctx.Err()
- case err := <-errC:
- return err
+ if err := enc.Encode(resp); err != nil {
+ return err
+ }
}
}
// workerServer).
type workerClient struct {
workerComm
-
- mu sync.Mutex
- enc *json.Encoder
- dec *json.Decoder
+ mu sync.Mutex
}
func newWorkerClient(comm workerComm) *workerClient {
- return &workerClient{
- workerComm: comm,
- enc: json.NewEncoder(comm.fuzzIn),
- dec: json.NewDecoder(comm.fuzzOut),
- }
+ return &workerClient{workerComm: comm}
}
// Close shuts down the connection to the RPC server (the worker process) by
wc.memMu <- mem
c := call{Minimize: &args}
- err = wc.call(ctx, c, &resp)
+ err = wc.callLocked(ctx, c, &resp)
mem, ok = <-wc.memMu
if !ok {
return nil, minimizeResponse{}, errSharedMemClosed
wc.memMu <- mem
c := call{Fuzz: &args}
- err = wc.call(ctx, c, &resp)
+ err = wc.callLocked(ctx, c, &resp)
mem, ok = <-wc.memMu
if !ok {
return nil, fuzzResponse{}, errSharedMemClosed
// ping tells the worker to call the ping method. See workerServer.ping.
func (wc *workerClient) ping(ctx context.Context) error {
+ wc.mu.Lock()
+ defer wc.mu.Unlock()
c := call{Ping: &pingArgs{}}
var resp pingResponse
- return wc.call(ctx, c, &resp)
+ return wc.callLocked(ctx, c, &resp)
+}
+
+// callLocked sends an RPC from the coordinator to the worker process and waits
+// for the response. The callLocked may be cancelled with ctx.
+func (wc *workerClient) callLocked(ctx context.Context, c call, resp interface{}) (err error) {
+ enc := json.NewEncoder(wc.fuzzIn)
+ dec := json.NewDecoder(&contextReader{ctx: ctx, r: wc.fuzzOut})
+ if err := enc.Encode(c); err != nil {
+ return err
+ }
+ return dec.Decode(resp)
}
-// call sends an RPC from the coordinator to the worker process and waits for
-// the response. The call may be cancelled with ctx.
-func (wc *workerClient) call(ctx context.Context, c call, resp interface{}) (err error) {
- // This goroutine may stay blocked after call returns because the underlying
- // read blocks, even after the file descriptor in this process is closed. The
- // pipe must be closed by the server, too.
- errC := make(chan error, 1)
+// contextReader wraps a Reader with a Context. If the context is cancelled
+// while the underlying reader is blocked, Read returns immediately.
+//
+// This is useful for reading from a pipe. Closing a pipe file descriptor does
+// not unblock pending Reads on that file descriptor. All copies of the pipe's
+// other file descriptor (the write end) must be closed in all processes that
+// inherit it. This is difficult to do correctly in the situation we care about
+// (process group termination).
+type contextReader struct {
+ ctx context.Context
+ r io.Reader
+}
+
+func (cr *contextReader) Read(b []byte) (n int, err error) {
+ if err := cr.ctx.Err(); err != nil {
+ return 0, err
+ }
+ done := make(chan struct{})
+
+ // This goroutine may stay blocked after Read returns because the underlying
+ // read is blocked.
go func() {
- if err := wc.enc.Encode(c); err != nil {
- errC <- err
- return
- }
- errC <- wc.dec.Decode(resp)
+ n, err = cr.r.Read(b)
+ close(done)
}()
select {
- case <-ctx.Done():
- return ctx.Err()
- case err := <-errC:
- return err
+ case <-cr.ctx.Done():
+ return 0, cr.ctx.Err()
+ case <-done:
+ return n, err
}
}