]> Cypherpunks repositories - gostls13.git/commitdiff
io: revised Pipe implementation
authorRuss Cox <rsc@golang.org>
Tue, 2 Feb 2010 01:43:15 +0000 (17:43 -0800)
committerRuss Cox <rsc@golang.org>
Tue, 2 Feb 2010 01:43:15 +0000 (17:43 -0800)
* renamed channels to say what gets sent
* use channel closed status instead of racy check of boolean

R=nigeltao_golang
CC=golang-dev
https://golang.org/cl/196065

src/pkg/io/pipe.go

index 909989ae6adc8d3cf14afb7f9cc223947e319407..fe7063446806647ac972f3c3c04b62343eaebab3 100644 (file)
@@ -2,8 +2,8 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-// Pipe adapter to connect code expecting an io.Read
-// with code expecting an io.Write.
+// Pipe adapter to connect code expecting an io.Reader
+// with code expecting an io.Writer.
 
 package io
 
@@ -12,54 +12,43 @@ import (
        "sync"
 )
 
-type pipeReturn struct {
-       n   int
-       err os.Error
-}
-
 // Shared pipe structure.
 type pipe struct {
-       rclosed bool            // Read end closed?
-       rerr    os.Error        // Error supplied to CloseReader
-       wclosed bool            // Write end closed?
-       werr    os.Error        // Error supplied to CloseWriter
-       wpend   []byte          // Written data waiting to be read.
-       wtot    int             // Bytes consumed so far in current write.
-       cr      chan []byte     // Write sends data here...
-       cw      chan pipeReturn // ... and reads the n, err back from here.
+       rclosed bool        // Read end closed?
+       rerr    os.Error    // Error supplied to CloseReader
+       wclosed bool        // Write end closed?
+       werr    os.Error    // Error supplied to CloseWriter
+       wpend   []byte      // Written data waiting to be read.
+       wtot    int         // Bytes consumed so far in current write.
+       cw      chan []byte // Write sends data here...
+       cr      chan bool   // ... and reads a done notification from here.
 }
 
 func (p *pipe) Read(data []byte) (n int, err os.Error) {
-       if p == nil || p.rclosed {
+       if p.rclosed {
                return 0, os.EINVAL
        }
 
        // Wait for next write block if necessary.
        if p.wpend == nil {
-               if !p.wclosed {
-                       p.wpend = <-p.cr
+               if !closed(p.cw) {
+                       p.wpend = <-p.cw
                }
-               if p.wclosed {
+               if closed(p.cw) {
                        return 0, p.werr
                }
                p.wtot = 0
        }
 
        // Read from current write block.
-       n = len(data)
-       if n > len(p.wpend) {
-               n = len(p.wpend)
-       }
-       for i := 0; i < n; i++ {
-               data[i] = p.wpend[i]
-       }
+       n = copy(data, p.wpend)
        p.wtot += n
        p.wpend = p.wpend[n:]
 
        // If write block is done, finish the write.
        if len(p.wpend) == 0 {
                p.wpend = nil
-               p.cw <- pipeReturn{p.wtot, nil}
+               p.cr <- true
                p.wtot = 0
        }
 
@@ -67,58 +56,52 @@ func (p *pipe) Read(data []byte) (n int, err os.Error) {
 }
 
 func (p *pipe) Write(data []byte) (n int, err os.Error) {
-       if p == nil || p.wclosed {
+       if p.wclosed {
                return 0, os.EINVAL
        }
-       if p.rclosed {
+       if closed(p.cr) {
                return 0, p.rerr
        }
 
-       // Send data to reader.
-       p.cr <- data
+       // Send write to reader.
+       p.cw <- data
 
        // Wait for reader to finish copying it.
-       res := <-p.cw
-       return res.n, res.err
+       <-p.cr
+       if closed(p.cr) {
+               _, _ = <-p.cw // undo send if reader is gone
+               return 0, p.rerr
+       }
+       return len(data), nil
 }
 
 func (p *pipe) CloseReader(rerr os.Error) os.Error {
-       if p == nil || p.rclosed {
+       if p.rclosed {
                return os.EINVAL
        }
-
-       // Stop any future writes.
        p.rclosed = true
+
+       // Wake up writes.
        if rerr == nil {
                rerr = os.EPIPE
        }
        p.rerr = rerr
-
-       // Stop the current write.
-       if !p.wclosed {
-               p.cw <- pipeReturn{p.wtot, rerr}
-       }
-
+       close(p.cr)
        return nil
 }
 
 func (p *pipe) CloseWriter(werr os.Error) os.Error {
-       if werr == nil {
-               werr = os.EOF
-       }
-       if p == nil || p.wclosed {
+       if p.wclosed {
                return os.EINVAL
        }
-
-       // Stop any future reads.
        p.wclosed = true
-       p.werr = werr
 
-       // Stop the current read.
-       if !p.rclosed {
-               p.cr <- nil
+       // Wake up reads.
+       if werr == nil {
+               werr = os.EOF
        }
-
+       p.werr = werr
+       close(p.cw)
        return nil
 }
 
@@ -212,12 +195,9 @@ func (w *PipeWriter) finish() { w.Close() }
 // Reads on one end are matched with writes on the other,
 // copying data directly between the two; there is no internal buffering.
 func Pipe() (*PipeReader, *PipeWriter) {
-       p := new(pipe)
-       p.cr = make(chan []byte, 1)
-       p.cw = make(chan pipeReturn, 1)
-       r := new(PipeReader)
-       r.p = p
-       w := new(PipeWriter)
-       w.p = p
-       return r, w
+       p := &pipe{
+               cw: make(chan []byte, 1),
+               cr: make(chan bool, 1),
+       }
+       return &PipeReader{p: p}, &PipeWriter{p: p}
 }