]> Cypherpunks repositories - gostls13.git/commitdiff
pipe: implementation #3; this time for sure!
authorRuss Cox <rsc@golang.org>
Tue, 27 Apr 2010 17:17:17 +0000 (10:17 -0700)
committerRuss Cox <rsc@golang.org>
Tue, 27 Apr 2010 17:17:17 +0000 (10:17 -0700)
Added goroutine; got simpler.

Fixes deadlock when doing Read+Close
or Write+Close on same end.

R=r, cw
CC=golang-dev
https://golang.org/cl/994043

src/pkg/io/pipe.go
src/pkg/io/pipe_test.go

index fe7063446806647ac972f3c3c04b62343eaebab3..79221bd4978378d9b92e8628737482f141b81179 100644 (file)
@@ -9,114 +9,190 @@ package io
 
 import (
        "os"
+       "runtime"
        "sync"
 )
 
+type pipeResult struct {
+       n   int
+       err os.Error
+}
+
 // Shared pipe structure.
 type pipe struct {
-       rclosed bool        // Read end closed?
-       rerr    os.Error    // Error supplied to CloseReader
-       wclosed bool        // Write end closed?
-       werr    os.Error    // Error supplied to CloseWriter
-       wpend   []byte      // Written data waiting to be read.
-       wtot    int         // Bytes consumed so far in current write.
-       cw      chan []byte // Write sends data here...
-       cr      chan bool   // ... and reads a done notification from here.
-}
-
-func (p *pipe) Read(data []byte) (n int, err os.Error) {
-       if p.rclosed {
-               return 0, os.EINVAL
-       }
+       // Reader sends on cr1, receives on cr2.
+       // Writer does the same on cw1, cw2.
+       r1, w1 chan []byte
+       r2, w2 chan pipeResult
+
+       rclose chan os.Error // read close; error to return to writers
+       wclose chan os.Error // write close; error to return to readers
+
+       done chan int // read or write half is done
+}
 
-       // Wait for next write block if necessary.
-       if p.wpend == nil {
-               if !closed(p.cw) {
-                       p.wpend = <-p.cw
+func (p *pipe) run() {
+       var (
+               rb    []byte      // pending Read
+               wb    []byte      // pending Write
+               wn    int         // amount written so far from wb
+               rerr  os.Error    // if read end is closed, error to send to writers
+               werr  os.Error    // if write end is closed, error to send to readers
+               r1    chan []byte // p.cr1 or nil depending on whether Read is ok
+               w1    chan []byte // p.cw1 or nil depending on whether Write is ok
+               ndone int
+       )
+
+       // Read and Write are enabled at the start.
+       r1 = p.r1
+       w1 = p.w1
+
+       for {
+               select {
+               case <-p.done:
+                       if ndone++; ndone == 2 {
+                               // both reader and writer are gone
+                               return
+                       }
+                       continue
+               case rerr = <-p.rclose:
+                       if w1 == nil {
+                               // finish pending Write
+                               p.w2 <- pipeResult{wn, rerr}
+                               wn = 0
+                               w1 = p.w1 // allow another Write
+                       }
+                       if r1 == nil {
+                               // Close of read side during Read.
+                               // finish pending Read with os.EINVAL.
+                               p.r2 <- pipeResult{0, os.EINVAL}
+                               r1 = p.r1 // allow another Read
+                       }
+                       continue
+               case werr = <-p.wclose:
+                       if r1 == nil {
+                               // finish pending Read
+                               p.r2 <- pipeResult{0, werr}
+                               r1 = p.r1 // allow another Read
+                       }
+                       if w1 == nil {
+                               // Close of write side during Write.
+                               // finish pending Write with os.EINVAL.
+                               p.w2 <- pipeResult{wn, os.EINVAL}
+                               wn = 0
+                               w1 = p.w1 // allow another Write
+                       }
+                       continue
+               case rb = <-r1:
+                       if werr != nil {
+                               // write end is closed
+                               p.r2 <- pipeResult{0, werr}
+                               continue
+                       }
+                       r1 = nil // disable Read until this one is done
+               case wb = <-w1:
+                       if rerr != nil {
+                               // read end is closed
+                               p.w2 <- pipeResult{0, rerr}
+                               continue
+                       }
+                       w1 = nil // disable Write until this one is done
                }
-               if closed(p.cw) {
-                       return 0, p.werr
+
+               if r1 == nil && w1 == nil {
+                       // Have rb and wb.  Execute.
+                       n := copy(rb, wb)
+                       wn += n
+                       wb = wb[n:]
+
+                       // Finish Read.
+                       p.r2 <- pipeResult{n, nil}
+                       r1 = p.r1 // allow another Read
+
+                       // Maybe finish Write.
+                       if len(wb) == 0 {
+                               p.w2 <- pipeResult{wn, nil}
+                               wn = 0
+                               w1 = p.w1 // allow another Write
+                       }
                }
-               p.wtot = 0
        }
+}
+
+// Read/write halves of the pipe.
+// They are separate structures for two reasons:
+//  1.  If one end becomes garbage without being Closed,
+//      its finalizer can Close so that the other end
+//      does not hang indefinitely.
+//  2.  Clients cannot use interface conversions on the
+//      read end to find the Write method, and vice versa.
 
-       // Read from current write block.
-       n = copy(data, p.wpend)
-       p.wtot += n
-       p.wpend = p.wpend[n:]
+type pipeHalf struct {
+       c1     chan []byte
+       c2     chan pipeResult
+       cclose chan os.Error
+       done   chan int
 
-       // If write block is done, finish the write.
-       if len(p.wpend) == 0 {
-               p.wpend = nil
-               p.cr <- true
-               p.wtot = 0
-       }
+       lock   sync.Mutex
+       closed bool
 
-       return n, nil
+       io       sync.Mutex
+       ioclosed bool
 }
 
-func (p *pipe) Write(data []byte) (n int, err os.Error) {
-       if p.wclosed {
+func (p *pipeHalf) rw(data []byte) (n int, err os.Error) {
+       // Run i/o operation.
+       // Check ioclosed flag under lock to make sure we're still allowed to do i/o.
+       p.io.Lock()
+       defer p.io.Unlock()
+       if p.ioclosed {
                return 0, os.EINVAL
        }
-       if closed(p.cr) {
-               return 0, p.rerr
-       }
-
-       // Send write to reader.
-       p.cw <- data
-
-       // Wait for reader to finish copying it.
-       <-p.cr
-       if closed(p.cr) {
-               _, _ = <-p.cw // undo send if reader is gone
-               return 0, p.rerr
-       }
-       return len(data), nil
+       p.c1 <- data
+       res := <-p.c2
+       return res.n, res.err
 }
 
-func (p *pipe) CloseReader(rerr os.Error) os.Error {
-       if p.rclosed {
+func (p *pipeHalf) close(err os.Error) os.Error {
+       // Close pipe half.
+       // Only first call to close does anything.
+       p.lock.Lock()
+       if p.closed {
+               p.lock.Unlock()
                return os.EINVAL
        }
-       p.rclosed = true
+       p.closed = true
+       p.lock.Unlock()
 
-       // Wake up writes.
-       if rerr == nil {
-               rerr = os.EPIPE
-       }
-       p.rerr = rerr
-       close(p.cr)
-       return nil
-}
+       // First, send the close notification.
+       p.cclose <- err
 
-func (p *pipe) CloseWriter(werr os.Error) os.Error {
-       if p.wclosed {
-               return os.EINVAL
-       }
-       p.wclosed = true
+       // Runner is now responding to rw operations
+       // with os.EINVAL.  Cut off future rw operations
+       // by setting ioclosed flag.
+       p.io.Lock()
+       p.ioclosed = true
+       p.io.Unlock()
+
+       // With ioclosed set, there will be no more rw operations
+       // working on the channels.
+       // Tell the runner we won't be bothering it anymore.
+       p.done <- 1
+
+       // Successfully torn down; can disable finalizer.
+       runtime.SetFinalizer(p, nil)
 
-       // Wake up reads.
-       if werr == nil {
-               werr = os.EOF
-       }
-       p.werr = werr
-       close(p.cw)
        return nil
 }
 
-// Read/write halves of the pipe.
-// They are separate structures for two reasons:
-//  1.  If one end becomes garbage without being Closed,
-//      its finisher can Close so that the other end
-//      does not hang indefinitely.
-//  2.  Clients cannot use interface conversions on the
-//      read end to find the Write method, and vice versa.
+func (p *pipeHalf) finalizer() {
+       p.close(os.EINVAL)
+}
+
 
 // A PipeReader is the read half of a pipe.
 type PipeReader struct {
-       lock sync.Mutex
-       p    *pipe
+       pipeHalf
 }
 
 // Read implements the standard Read interface:
@@ -125,36 +201,27 @@ type PipeReader struct {
 // If the write end is closed with an error, that error is
 // returned as err; otherwise err is nil.
 func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
-       r.lock.Lock()
-       defer r.lock.Unlock()
-
-       return r.p.Read(data)
+       return r.rw(data)
 }
 
 // Close closes the reader; subsequent writes to the
 // write half of the pipe will return the error os.EPIPE.
 func (r *PipeReader) Close() os.Error {
-       r.lock.Lock()
-       defer r.lock.Unlock()
-
-       return r.p.CloseReader(nil)
+       return r.CloseWithError(nil)
 }
 
 // CloseWithError closes the reader; subsequent writes
-// to the write half of the pipe will return the error rerr.
-func (r *PipeReader) CloseWithError(rerr os.Error) os.Error {
-       r.lock.Lock()
-       defer r.lock.Unlock()
-
-       return r.p.CloseReader(rerr)
+// to the write half of the pipe will return the error err.
+func (r *PipeReader) CloseWithError(err os.Error) os.Error {
+       if err == nil {
+               err = os.EPIPE
+       }
+       return r.close(err)
 }
 
-func (r *PipeReader) finish() { r.Close() }
-
-// Write half of pipe.
+// A PipeWriter is the write half of a pipe.
 type PipeWriter struct {
-       lock sync.Mutex
-       p    *pipe
+       pipeHalf
 }
 
 // Write implements the standard Write interface:
@@ -163,32 +230,24 @@ type PipeWriter struct {
 // If the read end is closed with an error, that err is
 // returned as err; otherwise err is os.EPIPE.
 func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
-       w.lock.Lock()
-       defer w.lock.Unlock()
-
-       return w.p.Write(data)
+       return w.rw(data)
 }
 
 // Close closes the writer; subsequent reads from the
-// read half of the pipe will return no bytes and a nil error.
+// read half of the pipe will return no bytes and os.EOF.
 func (w *PipeWriter) Close() os.Error {
-       w.lock.Lock()
-       defer w.lock.Unlock()
-
-       return w.p.CloseWriter(nil)
+       return w.CloseWithError(nil)
 }
 
 // CloseWithError closes the writer; subsequent reads from the
-// read half of the pipe will return no bytes and the error werr.
-func (w *PipeWriter) CloseWithError(werr os.Error) os.Error {
-       w.lock.Lock()
-       defer w.lock.Unlock()
-
-       return w.p.CloseWriter(werr)
+// read half of the pipe will return no bytes and the error err.
+func (w *PipeWriter) CloseWithError(err os.Error) os.Error {
+       if err == nil {
+               err = os.EOF
+       }
+       return w.close(err)
 }
 
-func (w *PipeWriter) finish() { w.Close() }
-
 // Pipe creates a synchronous in-memory pipe.
 // It can be used to connect code expecting an io.Reader
 // with code expecting an io.Writer.
@@ -196,8 +255,39 @@ func (w *PipeWriter) finish() { w.Close() }
 // copying data directly between the two; there is no internal buffering.
 func Pipe() (*PipeReader, *PipeWriter) {
        p := &pipe{
-               cw: make(chan []byte, 1),
-               cr: make(chan bool, 1),
+               r1:     make(chan []byte),
+               r2:     make(chan pipeResult),
+               w1:     make(chan []byte),
+               w2:     make(chan pipeResult),
+               rclose: make(chan os.Error),
+               wclose: make(chan os.Error),
+               done:   make(chan int),
        }
-       return &PipeReader{p: p}, &PipeWriter{p: p}
+       go p.run()
+
+       // NOTE: Cannot use composite literal here:
+       //      pipeHalf{c1: p.cr1, c2: p.cr2, cclose: p.crclose, cdone: p.cdone}
+       // because this implicitly copies the pipeHalf, which copies the inner mutex.
+
+       r := new(PipeReader)
+       r.c1 = p.r1
+       r.c2 = p.r2
+       r.cclose = p.rclose
+       r.done = p.done
+       // TODO(rsc): Should be able to write
+       //      runtime.SetFinalizer(r, (*PipeReader).finalizer)
+       // but 6g doesn't see the finalizer method.
+       runtime.SetFinalizer(&r.pipeHalf, (*pipeHalf).finalizer)
+
+       w := new(PipeWriter)
+       w.c1 = p.w1
+       w.c2 = p.w2
+       w.cclose = p.wclose
+       w.done = p.done
+       // TODO(rsc): Should be able to write
+       //      runtime.SetFinalizer(w, (*PipeWriter).finalizer)
+       // but 6g doesn't see the finalizer method.
+       runtime.SetFinalizer(&w.pipeHalf, (*pipeHalf).finalizer)
+
+       return r, w
 }
index 27eb061d48f7fc67b27c94a940523e92acb74682..902d7a0a3fdd0449fc4a58247b45d63b493b1ca6 100644 (file)
@@ -207,6 +207,18 @@ func TestPipeReadClose(t *testing.T) {
        }
 }
 
+// Test close on Read side during Read.
+func TestPipeReadClose2(t *testing.T) {
+       c := make(chan int, 1)
+       r, _ := Pipe()
+       go delayClose(t, r, c, pipeTest{})
+       n, err := r.Read(make([]byte, 64))
+       <-c
+       if n != 0 || err != os.EINVAL {
+               t.Errorf("read from closed pipe: %v, %v want %v, %v", n, err, 0, os.EINVAL)
+       }
+}
+
 // Test write after/before reader close.
 
 func TestPipeWriteClose(t *testing.T) {