]> Cypherpunks repositories - gostls13.git/commitdiff
io: reimplement Pipe
authorRuss Cox <rsc@golang.org>
Mon, 7 Mar 2011 15:37:28 +0000 (10:37 -0500)
committerRuss Cox <rsc@golang.org>
Mon, 7 Mar 2011 15:37:28 +0000 (10:37 -0500)
No known bugs in the current pipe,
but this one is simpler and easier to
understand.

R=iant
CC=golang-dev
https://golang.org/cl/4252057

src/pkg/io/pipe.go

index df76418b93da55d722fbd3090c14efcf3eadb545..00be8efa2e2058908ad29dc0171fc4d5af6f4536 100644 (file)
@@ -9,7 +9,6 @@ package io
 
 import (
        "os"
-       "runtime"
        "sync"
 )
 
@@ -18,208 +17,114 @@ type pipeResult struct {
        err os.Error
 }
 
-// Shared pipe structure.
+// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
 type pipe struct {
-       // Reader sends on cr1, receives on cr2.
-       // Writer does the same on cw1, cw2.
-       r1, w1 chan []byte
-       r2, w2 chan pipeResult
-
-       rclose chan os.Error // read close; error to return to writers
-       wclose chan os.Error // write close; error to return to readers
-
-       done chan int // read or write half is done
-}
-
-func (p *pipe) run() {
-       var (
-               rb    []byte      // pending Read
-               wb    []byte      // pending Write
-               wn    int         // amount written so far from wb
-               rerr  os.Error    // if read end is closed, error to send to writers
-               werr  os.Error    // if write end is closed, error to send to readers
-               r1    chan []byte // p.cr1 or nil depending on whether Read is ok
-               w1    chan []byte // p.cw1 or nil depending on whether Write is ok
-               ndone int
-       )
-
-       // Read and Write are enabled at the start.
-       r1 = p.r1
-       w1 = p.w1
-
+       rl    sync.Mutex // gates readers one at a time
+       wl    sync.Mutex // gates writers one at a time
+       l     sync.Mutex // protects remaining fields
+       data  []byte     // data remaining in pending write
+       rwait sync.Cond  // waiting reader
+       wwait sync.Cond  // waiting writer
+       rerr  os.Error   // if reader closed, error to give writes
+       werr  os.Error   // if writer closed, error to give reads
+}
+
+func (p *pipe) read(b []byte) (n int, err os.Error) {
+       // One reader at a time.
+       p.rl.Lock()
+       defer p.rl.Unlock()
+
+       p.l.Lock()
+       defer p.l.Unlock()
        for {
-               select {
-               case <-p.done:
-                       if ndone++; ndone == 2 {
-                               // both reader and writer are gone
-                               // close out any existing i/o
-                               if r1 == nil {
-                                       p.r2 <- pipeResult{0, os.EINVAL}
-                               }
-                               if w1 == nil {
-                                       p.w2 <- pipeResult{0, os.EINVAL}
-                               }
-                               return
-                       }
-                       continue
-               case rerr = <-p.rclose:
-                       if w1 == nil {
-                               // finish pending Write
-                               p.w2 <- pipeResult{wn, rerr}
-                               wn = 0
-                               w1 = p.w1 // allow another Write
-                       }
-                       if r1 == nil {
-                               // Close of read side during Read.
-                               // finish pending Read with os.EINVAL.
-                               p.r2 <- pipeResult{0, os.EINVAL}
-                               r1 = p.r1 // allow another Read
-                       }
-                       continue
-               case werr = <-p.wclose:
-                       if r1 == nil {
-                               // finish pending Read
-                               p.r2 <- pipeResult{0, werr}
-                               r1 = p.r1 // allow another Read
-                       }
-                       if w1 == nil {
-                               // Close of write side during Write.
-                               // finish pending Write with os.EINVAL.
-                               p.w2 <- pipeResult{wn, os.EINVAL}
-                               wn = 0
-                               w1 = p.w1 // allow another Write
-                       }
-                       continue
-               case rb = <-r1:
-                       if werr != nil {
-                               // write end is closed
-                               p.r2 <- pipeResult{0, werr}
-                               continue
-                       }
-                       if rerr != nil {
-                               // read end is closed
-                               p.r2 <- pipeResult{0, os.EINVAL}
-                               continue
-                       }
-                       r1 = nil // disable Read until this one is done
-               case wb = <-w1:
-                       if rerr != nil {
-                               // read end is closed
-                               p.w2 <- pipeResult{0, rerr}
-                               continue
-                       }
-                       if werr != nil {
-                               // write end is closed
-                               p.w2 <- pipeResult{0, os.EINVAL}
-                               continue
-                       }
-                       w1 = nil // disable Write until this one is done
+               if p.rerr != nil {
+                       return 0, os.EINVAL
                }
-
-               if r1 == nil && w1 == nil {
-                       // Have rb and wb.  Execute.
-                       n := copy(rb, wb)
-                       wn += n
-                       wb = wb[n:]
-
-                       // Finish Read.
-                       p.r2 <- pipeResult{n, nil}
-                       r1 = p.r1 // allow another Read
-
-                       // Maybe finish Write.
-                       if len(wb) == 0 {
-                               p.w2 <- pipeResult{wn, nil}
-                               wn = 0
-                               w1 = p.w1 // allow another Write
-                       }
+               if p.data != nil {
+                       break
                }
+               if p.werr != nil {
+                       return 0, p.werr
+               }
+               p.rwait.Wait()
+       }
+       n = copy(b, p.data)
+       p.data = p.data[n:]
+       if len(p.data) == 0 {
+               p.data = nil
+               p.wwait.Signal()
        }
+       return
 }
 
-// Read/write halves of the pipe.
-// They are separate structures for two reasons:
-//  1.  If one end becomes garbage without being Closed,
-//      its finalizer can Close so that the other end
-//      does not hang indefinitely.
-//  2.  Clients cannot use interface conversions on the
-//      read end to find the Write method, and vice versa.
+var zero [0]byte
 
-type pipeHalf struct {
-       c1     chan []byte
-       c2     chan pipeResult
-       cclose chan os.Error
-       done   chan int
-
-       lock   sync.Mutex
-       closed bool
+func (p *pipe) write(b []byte) (n int, err os.Error) {
+       // pipe uses nil to mean not available
+       if b == nil {
+               b = zero[:]
+       }
 
-       io       sync.Mutex
-       ioclosed bool
-}
+       // One writer at a time.
+       p.wl.Lock()
+       defer p.wl.Unlock()
 
-func (p *pipeHalf) rw(data []byte) (n int, err os.Error) {
-       // Run i/o operation.
-       // Check ioclosed flag under lock to make sure we're still allowed to do i/o.
-       p.io.Lock()
-       if p.ioclosed {
-               p.io.Unlock()
-               return 0, os.EINVAL
+       p.l.Lock()
+       defer p.l.Unlock()
+       p.data = b
+       p.rwait.Signal()
+       for {
+               if p.data == nil {
+                       break
+               }
+               if p.rerr != nil {
+                       err = p.rerr
+                       break
+               }
+               if p.werr != nil {
+                       err = os.EINVAL
+               }
+               p.wwait.Wait()
        }
-       p.io.Unlock()
-       p.c1 <- data
-       res := <-p.c2
-       return res.n, res.err
+       n = len(b) - len(p.data)
+       p.data = nil // in case of rerr or werr
+       return
 }
 
-func (p *pipeHalf) close(err os.Error) os.Error {
-       // Close pipe half.
-       // Only first call to close does anything.
-       p.lock.Lock()
-       if p.closed {
-               p.lock.Unlock()
-               return os.EINVAL
+func (p *pipe) rclose(err os.Error) {
+       if err == nil {
+               err = os.EPIPE
        }
-       p.closed = true
-       p.lock.Unlock()
-
-       // First, send the close notification.
-       p.cclose <- err
-
-       // Runner is now responding to rw operations
-       // with os.EINVAL.  Cut off future rw operations
-       // by setting ioclosed flag.
-       p.io.Lock()
-       p.ioclosed = true
-       p.io.Unlock()
-
-       // With ioclosed set, there will be no more rw operations
-       // working on the channels.
-       // Tell the runner we won't be bothering it anymore.
-       p.done <- 1
-
-       // Successfully torn down; can disable finalizer.
-       runtime.SetFinalizer(p, nil)
-
-       return nil
+       p.l.Lock()
+       defer p.l.Unlock()
+       p.rerr = err
+       p.rwait.Signal()
+       p.wwait.Signal()
 }
 
-func (p *pipeHalf) finalizer() {
-       p.close(os.EINVAL)
+func (p *pipe) wclose(err os.Error) {
+       if err == nil {
+               err = os.EOF
+       }
+       p.l.Lock()
+       defer p.l.Unlock()
+       p.werr = err
+       p.rwait.Signal()
+       p.wwait.Signal()
 }
 
-
 // A PipeReader is the read half of a pipe.
 type PipeReader struct {
-       pipeHalf
+       p *pipe
 }
 
 // Read implements the standard Read interface:
 // it reads data from the pipe, blocking until a writer
 // arrives or the write end is closed.
 // If the write end is closed with an error, that error is
-// returned as err; otherwise err is nil.
+// returned as err; otherwise err is os.EOF.
 func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
-       return r.rw(data)
+       return r.p.read(data)
 }
 
 // Close closes the reader; subsequent writes to the
@@ -231,15 +136,13 @@ func (r *PipeReader) Close() os.Error {
 // CloseWithError closes the reader; subsequent writes
 // to the write half of the pipe will return the error err.
 func (r *PipeReader) CloseWithError(err os.Error) os.Error {
-       if err == nil {
-               err = os.EPIPE
-       }
-       return r.close(err)
+       r.p.rclose(err)
+       return nil
 }
 
 // A PipeWriter is the write half of a pipe.
 type PipeWriter struct {
-       pipeHalf
+       p *pipe
 }
 
 // Write implements the standard Write interface:
@@ -248,7 +151,7 @@ type PipeWriter struct {
 // If the read end is closed with an error, that err is
 // returned as err; otherwise err is os.EPIPE.
 func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
-       return w.rw(data)
+       return w.p.write(data)
 }
 
 // Close closes the writer; subsequent reads from the
@@ -260,10 +163,8 @@ func (w *PipeWriter) Close() os.Error {
 // CloseWithError closes the writer; subsequent reads from the
 // read half of the pipe will return no bytes and the error err.
 func (w *PipeWriter) CloseWithError(err os.Error) os.Error {
-       if err == nil {
-               err = os.EOF
-       }
-       return w.close(err)
+       w.p.wclose(err)
+       return nil
 }
 
 // Pipe creates a synchronous in-memory pipe.
@@ -272,34 +173,10 @@ func (w *PipeWriter) CloseWithError(err os.Error) os.Error {
 // Reads on one end are matched with writes on the other,
 // copying data directly between the two; there is no internal buffering.
 func Pipe() (*PipeReader, *PipeWriter) {
-       p := &pipe{
-               r1:     make(chan []byte),
-               r2:     make(chan pipeResult),
-               w1:     make(chan []byte),
-               w2:     make(chan pipeResult),
-               rclose: make(chan os.Error),
-               wclose: make(chan os.Error),
-               done:   make(chan int),
-       }
-       go p.run()
-
-       // NOTE: Cannot use composite literal here:
-       //      pipeHalf{c1: p.cr1, c2: p.cr2, cclose: p.crclose, cdone: p.cdone}
-       // because this implicitly copies the pipeHalf, which copies the inner mutex.
-
-       r := new(PipeReader)
-       r.c1 = p.r1
-       r.c2 = p.r2
-       r.cclose = p.rclose
-       r.done = p.done
-       runtime.SetFinalizer(r, (*PipeReader).finalizer)
-
-       w := new(PipeWriter)
-       w.c1 = p.w1
-       w.c2 = p.w2
-       w.cclose = p.wclose
-       w.done = p.done
-       runtime.SetFinalizer(w, (*PipeWriter).finalizer)
-
+       p := new(pipe)
+       p.rwait.L = &p.l
+       p.wwait.L = &p.l
+       r := &PipeReader{p}
+       w := &PipeWriter{p}
        return r, w
 }