]> Cypherpunks repositories - gostls13.git/commitdiff
Publish types PipeReader and PipeWriter
authorRuss Cox <rsc@golang.org>
Sun, 7 Jun 2009 04:51:05 +0000 (21:51 -0700)
committerRuss Cox <rsc@golang.org>
Sun, 7 Jun 2009 04:51:05 +0000 (21:51 -0700)
to expose new CloseWithError methods.

R=r
DELTA=161  (72 added, 15 deleted, 74 changed)
OCL=29980
CL=30003

src/lib/io/pipe.go
src/lib/io/pipe_test.go

index 5f9e7a488c0766f35bc45eef65d1c000d052212a..1a443ddcec292a0cce4c8056eabf413b38ea2cd3 100644 (file)
@@ -21,7 +21,9 @@ type pipeReturn struct {
 // Shared pipe structure.
 type pipe struct {
        rclosed bool;           // Read end closed?
+       rerr os.Error;          // Error supplied to CloseReader
        wclosed bool;           // Write end closed?
+       werr os.Error;          // Error supplied to CloseWriter
        wpend []byte;           // Written data waiting to be read.
        wtot int;               // Bytes consumed so far in current write.
        cr chan []byte;         // Write sends data here...
@@ -39,7 +41,7 @@ func (p *pipe) Read(data []byte) (n int, err os.Error) {
                        p.wpend = <-p.cr;
                }
                if p.wpend == nil {
-                       return 0, nil;
+                       return 0, p.werr;
                }
                p.wtot = 0;
        }
@@ -70,7 +72,7 @@ func (p *pipe) Write(data []byte) (n int, err os.Error) {
                return 0, os.EINVAL;
        }
        if p.rclosed {
-               return 0, os.EPIPE;
+               return 0, p.rerr;
        }
 
        // Send data to reader.
@@ -81,29 +83,34 @@ func (p *pipe) Write(data []byte) (n int, err os.Error) {
        return res.n, res.err;
 }
 
-func (p *pipe) CloseReader() os.Error {
+func (p *pipe) CloseReader(rerr os.Error) os.Error {
        if p == nil || p.rclosed {
                return os.EINVAL;
        }
 
        // Stop any future writes.
        p.rclosed = true;
+       if rerr == nil {
+               rerr = os.EPIPE;
+       }
+       p.rerr = rerr;
 
        // Stop the current write.
        if !p.wclosed {
-               p.cw <- pipeReturn{p.wtot, os.EPIPE};
+               p.cw <- pipeReturn{p.wtot, rerr};
        }
 
        return nil;
 }
 
-func (p *pipe) CloseWriter() os.Error {
+func (p *pipe) CloseWriter(werr os.Error) os.Error {
        if p == nil || p.wclosed {
                return os.EINVAL;
        }
 
        // Stop any future reads.
        p.wclosed = true;
+       p.werr = werr;
 
        // Stop the current read.
        if !p.rclosed {
@@ -121,70 +128,98 @@ func (p *pipe) CloseWriter() os.Error {
 //  2.  Clients cannot use interface conversions on the
 //      read end to find the Write method, and vice versa.
 
-// Read half of pipe.
-type pipeRead struct {
+// A PipeReader is the read half of a pipe.
+type PipeReader struct {
        lock sync.Mutex;
        p *pipe;
 }
 
-func (r *pipeRead) Read(data []byte) (n int, err os.Error) {
+// Read implements the standard Read interface:
+// it reads data from the pipe, blocking until a writer
+// arrives or the write end is closed.
+// If the write end is closed with an error, that error is
+// returned as err; otherwise err is nil.
+func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
        r.lock.Lock();
        defer r.lock.Unlock();
 
        return r.p.Read(data);
 }
 
-func (r *pipeRead) Close() os.Error {
+// Close closes the reader; subsequent writes to the
+// write half of the pipe will return the error os.EPIPE.
+func (r *PipeReader) Close() os.Error {
+       r.lock.Lock();
+       defer r.lock.Unlock();
+
+       return r.p.CloseReader(nil);
+}
+
+// CloseWithError closes the reader; subsequent writes
+// to the write half of the pipe will return the error rerr.
+func (r *PipeReader) CloseWithError(rerr os.Error) os.Error {
        r.lock.Lock();
        defer r.lock.Unlock();
 
-       return r.p.CloseReader();
+       return r.p.CloseReader(rerr);
 }
 
-func (r *pipeRead) finish() {
+func (r *PipeReader) finish() {
        r.Close();
 }
 
 // Write half of pipe.
-type pipeWrite struct {
+type PipeWriter struct {
        lock sync.Mutex;
        p *pipe;
 }
 
-func (w *pipeWrite) Write(data []byte) (n int, err os.Error) {
+// Write implements the standard Write interface:
+// it writes data to the pipe, blocking until readers
+// have consumed all the data or the read end is closed.
+// If the read end is closed with an error, that err is
+// returned as err; otherwise err is os.EPIPE.
+func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
        w.lock.Lock();
        defer w.lock.Unlock();
 
        return w.p.Write(data);
 }
 
-func (w *pipeWrite) Close() os.Error {
+// Close closes the writer; subsequent reads from the
+// read half of the pipe will return no bytes and a nil error.
+func (w *PipeWriter) Close() os.Error {
+       w.lock.Lock();
+       defer w.lock.Unlock();
+
+       return w.p.CloseWriter(nil);
+}
+
+// CloseWithError closes the writer; subsequent reads from the
+// read half of the pipe will return no bytes and the error werr.
+func (w *PipeWriter) CloseWithError(werr os.Error) os.Error {
        w.lock.Lock();
        defer w.lock.Unlock();
 
-       return w.p.CloseWriter();
+       return w.p.CloseWriter(werr);
 }
 
-func (w *pipeWrite) finish() {
+func (w *PipeWriter) finish() {
        w.Close();
 }
 
 // Pipe creates a synchronous in-memory pipe.
-// Used to connect code expecting an io.Reader
+// It can be used to connect code expecting an io.Reader
 // with code expecting an io.Writer.
-//
-// Reads on one end are matched by writes on the other.
-// Writes don't complete until all the data has been
-// written or the read end is closed.  Reads return
-// any available data or block until the next write
-// or the write end is closed.
-func Pipe() (io.ReadCloser, io.WriteCloser) {
+// Reads on one end are matched with writes on the other,
+// copying data directly between the two; there is no internal buffering.
+func Pipe() (*PipeReader, *PipeWriter) {
        p := new(pipe);
        p.cr = make(chan []byte, 1);
        p.cw = make(chan pipeReturn, 1);
-       r := new(pipeRead);
+       r := new(PipeReader);
        r.p = p;
-       w := new(pipeWrite);
+       w := new(PipeWriter);
        w.p = p;
        return r, w;
 }
index 3df66962854f5170870ef88afe7c0c3c11fbd568..277f445250c81614e2cb059c940ab265c0d2160c 100644 (file)
@@ -5,6 +5,7 @@
 package io
 
 import (
+       "fmt";
        "io";
        "os";
        "testing";
@@ -132,72 +133,93 @@ func TestPipe3(t *testing.T) {
 
 // Test read after/before writer close.
 
-func delayClose(t *testing.T, cl Closer, ch chan int) {
-       time.Sleep(1000*1000);  // 1 ms
-       if err := cl.Close(); err != nil {
-               t.Errorf("delayClose: %v", err);
-       }
-       ch <- 0;
+type closer interface {
+       CloseWithError(os.Error) os.Error;
+       Close() os.Error;
 }
 
-func testPipeReadClose(t *testing.T, async bool) {
-       c := make(chan int, 1);
-       r, w := Pipe();
-       if async {
-               go delayClose(t, w, c);
-       } else {
-               delayClose(t, w, c);
-       }
-       var buf = make([]byte, 64);
-       n, err := r.Read(buf);
-       <-c;
-       if err != nil {
-               t.Errorf("read from closed pipe: %v", err);
-       }
-       if n != 0 {
-               t.Errorf("read on closed pipe returned %d", n);
-       }
-       if err = r.Close(); err != nil {
-               t.Errorf("r.Close: %v", err);
-       }
+type pipeTest struct {
+       async bool;
+       err os.Error;
+       closeWithError bool;
 }
 
-// Test write after/before reader close.
+func (p pipeTest) String() string {
+       return fmt.Sprintf("async=%v err=%v closeWithError=%v", p.async, p.err, p.closeWithError);
+}
 
-func testPipeWriteClose(t *testing.T, async bool) {
-       c := make(chan int, 1);
-       r, w := Pipe();
-       if async {
-               go delayClose(t, r, c);
+var pipeTests = []pipeTest {
+       pipeTest{ true, nil, false },
+       pipeTest{ true, nil, true },
+       pipeTest{ true, io.ErrShortWrite, true },
+       pipeTest{ false, nil, false },
+       pipeTest{ false, nil, true },
+       pipeTest{ false, io.ErrShortWrite, true },
+}
+
+func delayClose(t *testing.T, cl closer, ch chan int, tt pipeTest) {
+       time.Sleep(1e6);        // 1 ms
+       var err os.Error;
+       if tt.closeWithError {
+               err = cl.CloseWithError(tt.err);
        } else {
-               delayClose(t, r, c);
-       }
-       n, err := WriteString(w, "hello, world");
-       <-c;
-       if err != os.EPIPE {
-               t.Errorf("write on closed pipe: %v", err);
-       }
-       if n != 0 {
-               t.Errorf("write on closed pipe returned %d", n);
+               err = cl.Close();
        }
-       if err = w.Close(); err != nil {
-               t.Errorf("w.Close: %v", err);
+       if err != nil {
+               t.Errorf("delayClose: %v", err);
        }
+       ch <- 0;
 }
 
-func TestPipeReadCloseAsync(t *testing.T) {
-       testPipeReadClose(t, true);
-}
-
-func TestPipeReadCloseSync(t *testing.T) {
-       testPipeReadClose(t, false);
+func TestPipeReadClose(t *testing.T) {
+       for _, tt := range pipeTests {
+               c := make(chan int, 1);
+               r, w := Pipe();
+               if tt.async {
+                       go delayClose(t, w, c, tt);
+               } else {
+                       delayClose(t, w, c, tt);
+               }
+               var buf = make([]byte, 64);
+               n, err := r.Read(buf);
+               <-c;
+               if err != tt.err {
+                       t.Errorf("read from closed pipe: %v want %v", err, tt.err);
+               }
+               if n != 0 {
+                       t.Errorf("read on closed pipe returned %d", n);
+               }
+               if err = r.Close(); err != nil {
+                       t.Errorf("r.Close: %v", err);
+               }
+       }
 }
 
-func TestPipeWriteCloseAsync(t *testing.T) {
-       testPipeWriteClose(t, true);
-}
+// Test write after/before reader close.
 
-func TestPipeWriteCloseSync(t *testing.T) {
-       testPipeWriteClose(t, false);
+func TestPipeWriteClose(t *testing.T) {
+       for _, tt := range pipeTests {
+               c := make(chan int, 1);
+               r, w := Pipe();
+               if tt.async {
+                       go delayClose(t, r, c, tt);
+               } else {
+                       delayClose(t, r, c, tt);
+               }
+               n, err := WriteString(w, "hello, world");
+               <-c;
+               expect := tt.err;
+               if expect == nil {
+                       expect = os.EPIPE;
+               }
+               if err != expect {
+                       t.Errorf("write on closed pipe: %v want %v", err, expect);
+               }
+               if n != 0 {
+                       t.Errorf("write on closed pipe returned %d", n);
+               }
+               if err = w.Close(); err != nil {
+                       t.Errorf("w.Close: %v", err);
+               }
+       }
 }
-