// Shared pipe structure.
type pipe struct {
rclosed bool; // Read end closed?
+ rerr os.Error; // Error supplied to CloseReader
wclosed bool; // Write end closed?
+ werr os.Error; // Error supplied to CloseWriter
wpend []byte; // Written data waiting to be read.
wtot int; // Bytes consumed so far in current write.
cr chan []byte; // Write sends data here...
p.wpend = <-p.cr;
}
if p.wpend == nil {
- return 0, nil;
+ return 0, p.werr;
}
p.wtot = 0;
}
return 0, os.EINVAL;
}
if p.rclosed {
- return 0, os.EPIPE;
+ return 0, p.rerr;
}
// Send data to reader.
return res.n, res.err;
}
-func (p *pipe) CloseReader() os.Error {
+func (p *pipe) CloseReader(rerr os.Error) os.Error {
if p == nil || p.rclosed {
return os.EINVAL;
}
// Stop any future writes.
p.rclosed = true;
+ if rerr == nil {
+ rerr = os.EPIPE;
+ }
+ p.rerr = rerr;
// Stop the current write.
if !p.wclosed {
- p.cw <- pipeReturn{p.wtot, os.EPIPE};
+ p.cw <- pipeReturn{p.wtot, rerr};
}
return nil;
}
-func (p *pipe) CloseWriter() os.Error {
+func (p *pipe) CloseWriter(werr os.Error) os.Error {
if p == nil || p.wclosed {
return os.EINVAL;
}
// Stop any future reads.
p.wclosed = true;
+ p.werr = werr;
// Stop the current read.
if !p.rclosed {
// 2. Clients cannot use interface conversions on the
// read end to find the Write method, and vice versa.
-// Read half of pipe.
-type pipeRead struct {
+// A PipeReader is the read half of a pipe.
+type PipeReader struct {
lock sync.Mutex;
p *pipe;
}
-func (r *pipeRead) Read(data []byte) (n int, err os.Error) {
+// Read implements the standard Read interface:
+// it reads data from the pipe, blocking until a writer
+// arrives or the write end is closed.
+// If the write end is closed with an error, that error is
+// returned as err; otherwise err is nil.
+func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
r.lock.Lock();
defer r.lock.Unlock();
return r.p.Read(data);
}
-func (r *pipeRead) Close() os.Error {
+// Close closes the reader; subsequent writes to the
+// write half of the pipe will return the error os.EPIPE.
+func (r *PipeReader) Close() os.Error {
+ r.lock.Lock();
+ defer r.lock.Unlock();
+
+ return r.p.CloseReader(nil);
+}
+
+// CloseWithError closes the reader; subsequent writes
+// to the write half of the pipe will return the error rerr.
+func (r *PipeReader) CloseWithError(rerr os.Error) os.Error {
r.lock.Lock();
defer r.lock.Unlock();
- return r.p.CloseReader();
+ return r.p.CloseReader(rerr);
}
-func (r *pipeRead) finish() {
+func (r *PipeReader) finish() {
r.Close();
}
// Write half of pipe.
-type pipeWrite struct {
+type PipeWriter struct {
lock sync.Mutex;
p *pipe;
}
-func (w *pipeWrite) Write(data []byte) (n int, err os.Error) {
+// Write implements the standard Write interface:
+// it writes data to the pipe, blocking until readers
+// have consumed all the data or the read end is closed.
+// If the read end is closed with an error, that err is
+// returned as err; otherwise err is os.EPIPE.
+func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
w.lock.Lock();
defer w.lock.Unlock();
return w.p.Write(data);
}
-func (w *pipeWrite) Close() os.Error {
+// Close closes the writer; subsequent reads from the
+// read half of the pipe will return no bytes and a nil error.
+func (w *PipeWriter) Close() os.Error {
+ w.lock.Lock();
+ defer w.lock.Unlock();
+
+ return w.p.CloseWriter(nil);
+}
+
+// CloseWithError closes the writer; subsequent reads from the
+// read half of the pipe will return no bytes and the error werr.
+func (w *PipeWriter) CloseWithError(werr os.Error) os.Error {
w.lock.Lock();
defer w.lock.Unlock();
- return w.p.CloseWriter();
+ return w.p.CloseWriter(werr);
}
-func (w *pipeWrite) finish() {
+func (w *PipeWriter) finish() {
w.Close();
}
// Pipe creates a synchronous in-memory pipe.
-// Used to connect code expecting an io.Reader
+// It can be used to connect code expecting an io.Reader
// with code expecting an io.Writer.
-//
-// Reads on one end are matched by writes on the other.
-// Writes don't complete until all the data has been
-// written or the read end is closed. Reads return
-// any available data or block until the next write
-// or the write end is closed.
-func Pipe() (io.ReadCloser, io.WriteCloser) {
+// Reads on one end are matched with writes on the other,
+// copying data directly between the two; there is no internal buffering.
+func Pipe() (*PipeReader, *PipeWriter) {
p := new(pipe);
p.cr = make(chan []byte, 1);
p.cw = make(chan pipeReturn, 1);
- r := new(pipeRead);
+ r := new(PipeReader);
r.p = p;
- w := new(pipeWrite);
+ w := new(PipeWriter);
w.p = p;
return r, w;
}
package io
import (
+ "fmt";
"io";
"os";
"testing";
// Test read after/before writer close.
-func delayClose(t *testing.T, cl Closer, ch chan int) {
- time.Sleep(1000*1000); // 1 ms
- if err := cl.Close(); err != nil {
- t.Errorf("delayClose: %v", err);
- }
- ch <- 0;
+type closer interface {
+ CloseWithError(os.Error) os.Error;
+ Close() os.Error;
}
-func testPipeReadClose(t *testing.T, async bool) {
- c := make(chan int, 1);
- r, w := Pipe();
- if async {
- go delayClose(t, w, c);
- } else {
- delayClose(t, w, c);
- }
- var buf = make([]byte, 64);
- n, err := r.Read(buf);
- <-c;
- if err != nil {
- t.Errorf("read from closed pipe: %v", err);
- }
- if n != 0 {
- t.Errorf("read on closed pipe returned %d", n);
- }
- if err = r.Close(); err != nil {
- t.Errorf("r.Close: %v", err);
- }
+type pipeTest struct {
+ async bool;
+ err os.Error;
+ closeWithError bool;
}
-// Test write after/before reader close.
+func (p pipeTest) String() string {
+ return fmt.Sprintf("async=%v err=%v closeWithError=%v", p.async, p.err, p.closeWithError);
+}
-func testPipeWriteClose(t *testing.T, async bool) {
- c := make(chan int, 1);
- r, w := Pipe();
- if async {
- go delayClose(t, r, c);
+var pipeTests = []pipeTest {
+ pipeTest{ true, nil, false },
+ pipeTest{ true, nil, true },
+ pipeTest{ true, io.ErrShortWrite, true },
+ pipeTest{ false, nil, false },
+ pipeTest{ false, nil, true },
+ pipeTest{ false, io.ErrShortWrite, true },
+}
+
+func delayClose(t *testing.T, cl closer, ch chan int, tt pipeTest) {
+ time.Sleep(1e6); // 1 ms
+ var err os.Error;
+ if tt.closeWithError {
+ err = cl.CloseWithError(tt.err);
} else {
- delayClose(t, r, c);
- }
- n, err := WriteString(w, "hello, world");
- <-c;
- if err != os.EPIPE {
- t.Errorf("write on closed pipe: %v", err);
- }
- if n != 0 {
- t.Errorf("write on closed pipe returned %d", n);
+ err = cl.Close();
}
- if err = w.Close(); err != nil {
- t.Errorf("w.Close: %v", err);
+ if err != nil {
+ t.Errorf("delayClose: %v", err);
}
+ ch <- 0;
}
-func TestPipeReadCloseAsync(t *testing.T) {
- testPipeReadClose(t, true);
-}
-
-func TestPipeReadCloseSync(t *testing.T) {
- testPipeReadClose(t, false);
+func TestPipeReadClose(t *testing.T) {
+ for _, tt := range pipeTests {
+ c := make(chan int, 1);
+ r, w := Pipe();
+ if tt.async {
+ go delayClose(t, w, c, tt);
+ } else {
+ delayClose(t, w, c, tt);
+ }
+ var buf = make([]byte, 64);
+ n, err := r.Read(buf);
+ <-c;
+ if err != tt.err {
+ t.Errorf("read from closed pipe: %v want %v", err, tt.err);
+ }
+ if n != 0 {
+ t.Errorf("read on closed pipe returned %d", n);
+ }
+ if err = r.Close(); err != nil {
+ t.Errorf("r.Close: %v", err);
+ }
+ }
}
-func TestPipeWriteCloseAsync(t *testing.T) {
- testPipeWriteClose(t, true);
-}
+// Test write after/before reader close.
-func TestPipeWriteCloseSync(t *testing.T) {
- testPipeWriteClose(t, false);
+func TestPipeWriteClose(t *testing.T) {
+ for _, tt := range pipeTests {
+ c := make(chan int, 1);
+ r, w := Pipe();
+ if tt.async {
+ go delayClose(t, r, c, tt);
+ } else {
+ delayClose(t, r, c, tt);
+ }
+ n, err := WriteString(w, "hello, world");
+ <-c;
+ expect := tt.err;
+ if expect == nil {
+ expect = os.EPIPE;
+ }
+ if err != expect {
+ t.Errorf("write on closed pipe: %v want %v", err, expect);
+ }
+ if n != 0 {
+ t.Errorf("write on closed pipe returned %d", n);
+ }
+ if err = w.Close(); err != nil {
+ t.Errorf("w.Close: %v", err);
+ }
+ }
}
-