From e2dd8ca946be884bb877e074a21727f1a685a706 Mon Sep 17 00:00:00 2001 From: Joe Tsai Date: Thu, 23 Feb 2017 17:13:09 -0800 Subject: [PATCH] net: implement deadline functionality on Pipe Implement deadline functionality on Pipe so that it properly implements the semantics of the Conn interface. This aids usages of Pipe (often in unit tests) with a more realistic and complete implementation. The new implementation avoids a dependency on a io.Pipe since it is impossible to keep the prior semantics of synchronous reads and writes while also trying to implement cancelation over an io.{Reader,Writer} that fundamentally has no cancelation support. The fact that net.Pipe is synchronous (and documented as such) is unfortunate because no realistic network connection is synchronous. Instead real networks introduces a read and write buffer of some sort. However, we do not change the semantics for backwards compatibility. The approach taken does not leave any long-running goroutines, meaning that tests that never call Close will not cause a resource leak. Fixes #18170 Change-Id: I5140b1f289a0a49fb2d485f031b5aa0ee99ecc30 Reviewed-on: https://go-review.googlesource.com/37402 Run-TryBot: Joe Tsai TryBot-Result: Gobot Gobot Reviewed-by: Ian Lance Taylor --- src/net/pipe.go | 240 ++++++++++++++++++++++++++++++++++++++----- src/net/pipe_test.go | 56 +++------- 2 files changed, 224 insertions(+), 72 deletions(-) diff --git a/src/net/pipe.go b/src/net/pipe.go index 37e552f54e..38d0f42a70 100644 --- a/src/net/pipe.go +++ b/src/net/pipe.go @@ -5,63 +5,247 @@ package net import ( - "errors" "io" + "sync" "time" ) +// pipeDeadline is an abstraction for handling timeouts. +type pipeDeadline struct { + mu sync.Mutex // Guards timer and cancel + timer *time.Timer + cancel chan struct{} // Must be non-nil +} + +func makePipeDeadline() pipeDeadline { + return pipeDeadline{cancel: make(chan struct{})} +} + +// set sets the point in time when the deadline will time out. +// A timeout event is signaled by closing the channel returned by waiter. +// Once a timeout has occurred, the deadline can be refreshed by specifying a +// t value in the future. +// +// A zero value for t prevents timeout. +func (d *pipeDeadline) set(t time.Time) { + d.mu.Lock() + defer d.mu.Unlock() + + if d.timer != nil && !d.timer.Stop() { + <-d.cancel // Wait for the timer callback to finish and close cancel + } + d.timer = nil + + // Time is zero, then there is no deadline. + closed := isClosedChan(d.cancel) + if t.IsZero() { + if closed { + d.cancel = make(chan struct{}) + } + return + } + + // Time in the future, setup a timer to cancel in the future. + if dur := time.Until(t); dur > 0 { + if closed { + d.cancel = make(chan struct{}) + } + d.timer = time.AfterFunc(dur, func() { + close(d.cancel) + }) + return + } + + // Time in the past, so close immediately. + if !closed { + close(d.cancel) + } +} + +// wait returns a channel that is closed when the deadline is exceeded. +func (d *pipeDeadline) wait() chan struct{} { + d.mu.Lock() + defer d.mu.Unlock() + return d.cancel +} + +func isClosedChan(c <-chan struct{}) bool { + select { + case <-c: + return true + default: + return false + } +} + +type pipeError struct { + errStr string + timeout bool +} + +func (pe pipeError) Error() string { return pe.errStr } +func (pe pipeError) Timeout() bool { return pe.timeout } +func (pe pipeError) Temporary() bool { return pe.timeout } + +var ( + errDeadline = pipeError{"deadline exceeded", true} + errClosed = pipeError{"closed connection", false} +) + +type pipeAddr struct{} + +func (pipeAddr) Network() string { return "pipe" } +func (pipeAddr) String() string { return "pipe" } + +type pipe struct { + wrMu sync.Mutex // Serialize Write operations + + // Used by local Read to interact with remote Write. + // Successful receive on rdRx is always followed by send on rdTx. + rdRx <-chan []byte + rdTx chan<- int + + // Used by local Write to interact with remote Read. + // Successful send on wrTx is always followed by receive on wrRx. + wrTx chan<- []byte + wrRx <-chan int + + once sync.Once // Protects closing localDone + localDone chan struct{} + remoteDone <-chan struct{} + + readDeadline pipeDeadline + writeDeadline pipeDeadline +} + // Pipe creates a synchronous, in-memory, full duplex // network connection; both ends implement the Conn interface. // Reads on one end are matched with writes on the other, // copying data directly between the two; there is no internal // buffering. func Pipe() (Conn, Conn) { - r1, w1 := io.Pipe() - r2, w2 := io.Pipe() + cb1 := make(chan []byte) + cb2 := make(chan []byte) + cn1 := make(chan int) + cn2 := make(chan int) + done1 := make(chan struct{}) + done2 := make(chan struct{}) - return &pipe{r1, w2}, &pipe{r2, w1} -} - -type pipe struct { - *io.PipeReader - *io.PipeWriter + p1 := &pipe{ + rdRx: cb1, rdTx: cn1, + wrTx: cb2, wrRx: cn2, + localDone: done1, remoteDone: done2, + readDeadline: makePipeDeadline(), + writeDeadline: makePipeDeadline(), + } + p2 := &pipe{ + rdRx: cb2, rdTx: cn2, + wrTx: cb1, wrRx: cn1, + localDone: done2, remoteDone: done1, + readDeadline: makePipeDeadline(), + writeDeadline: makePipeDeadline(), + } + return p1, p2 } -type pipeAddr int +func (*pipe) LocalAddr() Addr { return pipeAddr{} } +func (*pipe) RemoteAddr() Addr { return pipeAddr{} } -func (pipeAddr) Network() string { - return "pipe" +func (p *pipe) Read(b []byte) (int, error) { + n, err := p.read(b) + if err != nil && err != io.EOF { + err = &OpError{Op: "read", Net: "pipe", Err: err} + } + return n, err } -func (pipeAddr) String() string { - return "pipe" -} +func (p *pipe) read(b []byte) (n int, err error) { + switch { + case isClosedChan(p.localDone): + return 0, errClosed + case isClosedChan(p.remoteDone): + return 0, io.EOF + case isClosedChan(p.readDeadline.wait()): + return 0, errDeadline + } -func (p *pipe) Close() error { - err := p.PipeReader.Close() - err1 := p.PipeWriter.Close() - if err == nil { - err = err1 + select { + case bw := <-p.rdRx: + nr := copy(b, bw) + p.rdTx <- nr + return nr, nil + case <-p.localDone: + return 0, errClosed + case <-p.remoteDone: + return 0, io.EOF + case <-p.readDeadline.wait(): + return 0, errDeadline } - return err } -func (p *pipe) LocalAddr() Addr { - return pipeAddr(0) +func (p *pipe) Write(b []byte) (int, error) { + n, err := p.write(b) + if err != nil { + err = &OpError{Op: "write", Net: "pipe", Err: err} + } + return n, err } -func (p *pipe) RemoteAddr() Addr { - return pipeAddr(0) +func (p *pipe) write(b []byte) (n int, err error) { + switch { + case isClosedChan(p.localDone): + return 0, errClosed + case isClosedChan(p.remoteDone): + return 0, errClosed + case isClosedChan(p.writeDeadline.wait()): + return 0, errDeadline + } + + p.wrMu.Lock() // Ensure entirety of b is written together + defer p.wrMu.Unlock() + for once := true; once || len(b) > 0; once = false { + select { + case p.wrTx <- b: + nw := <-p.wrRx + b = b[nw:] + n += nw + case <-p.localDone: + return n, errClosed + case <-p.remoteDone: + return n, errClosed + case <-p.writeDeadline.wait(): + return n, errDeadline + } + } + return n, nil } func (p *pipe) SetDeadline(t time.Time) error { - return &OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} + if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { + return &OpError{Op: "set", Net: "pipe", Err: errClosed} + } + p.readDeadline.set(t) + p.writeDeadline.set(t) + return nil } func (p *pipe) SetReadDeadline(t time.Time) error { - return &OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} + if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { + return &OpError{Op: "set", Net: "pipe", Err: errClosed} + } + p.readDeadline.set(t) + return nil } func (p *pipe) SetWriteDeadline(t time.Time) error { - return &OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} + if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { + return &OpError{Op: "set", Net: "pipe", Err: errClosed} + } + p.writeDeadline.set(t) + return nil +} + +func (p *pipe) Close() error { + p.once.Do(func() { close(p.localDone) }) + return nil } diff --git a/src/net/pipe_test.go b/src/net/pipe_test.go index e3172d882f..382eca7520 100644 --- a/src/net/pipe_test.go +++ b/src/net/pipe_test.go @@ -2,54 +2,22 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -package net +package net_test import ( - "bytes" - "io" + "net" "testing" -) - -func checkPipeWrite(t *testing.T, w io.Writer, data []byte, c chan int) { - n, err := w.Write(data) - if err != nil { - t.Error(err) - } - if n != len(data) { - t.Errorf("short write: %d != %d", n, len(data)) - } - c <- 0 -} -func checkPipeRead(t *testing.T, r io.Reader, data []byte, wantErr error) { - buf := make([]byte, len(data)+10) - n, err := r.Read(buf) - if err != wantErr { - t.Error(err) - return - } - if n != len(data) || !bytes.Equal(buf[0:n], data) { - t.Errorf("bad read: got %q", buf[0:n]) - return - } -} + "golang_org/x/net/nettest" +) -// TestPipe tests a simple read/write/close sequence. -// Assumes that the underlying io.Pipe implementation -// is solid and we're just testing the net wrapping. func TestPipe(t *testing.T) { - c := make(chan int) - cli, srv := Pipe() - go checkPipeWrite(t, cli, []byte("hello, world"), c) - checkPipeRead(t, srv, []byte("hello, world"), nil) - <-c - go checkPipeWrite(t, srv, []byte("line 2"), c) - checkPipeRead(t, cli, []byte("line 2"), nil) - <-c - go checkPipeWrite(t, cli, []byte("a third line"), c) - checkPipeRead(t, srv, []byte("a third line"), nil) - <-c - go srv.Close() - checkPipeRead(t, cli, nil, io.EOF) - cli.Close() + nettest.TestConn(t, func() (c1, c2 net.Conn, stop func(), err error) { + c1, c2 = net.Pipe() + stop = func() { + c1.Close() + c2.Close() + } + return + }) } -- 2.48.1