]> Cypherpunks repositories - gostls13.git/commitdiff
netchan: provide a method (Importer.Errors()) to recover protocol errors.
authorRob Pike <r@golang.org>
Mon, 20 Sep 2010 05:28:38 +0000 (15:28 +1000)
committerRob Pike <r@golang.org>
Mon, 20 Sep 2010 05:28:38 +0000 (15:28 +1000)
R=rsc
CC=golang-dev
https://golang.org/cl/2229044

src/pkg/netchan/export.go
src/pkg/netchan/import.go
src/pkg/netchan/netchan_test.go

index d7dceead9900d46fc04140d5010c286cfd63e10a..73a070c95cec8edc6d5a46f8cf64ac6ca26fa7bd 100644 (file)
@@ -66,7 +66,7 @@ func newClient(exp *Exporter, conn net.Conn) *expClient {
 
 func (client *expClient) sendError(hdr *header, err string) {
        error := &error{err}
-       expLog("sending error to client", error.error)
+       expLog("sending error to client:", error.error)
        client.encode(hdr, payError, error) // ignore any encode error, hope client gets it
        client.mu.Lock()
        client.errored = true
index e6bf4cbb3283fd731c386911703eb83ba49e516c..bb19dd4702f129b32a9147bdbd4e8b73c2ff4e5f 100644 (file)
@@ -28,6 +28,7 @@ type Importer struct {
        conn     net.Conn
        chanLock sync.Mutex // protects access to channel map
        chans    map[string]*chanDir
+       errors   chan os.Error
 }
 
 // NewImporter creates a new Importer object to import channels
@@ -43,6 +44,7 @@ func NewImporter(network, remoteaddr string) (*Importer, os.Error) {
        imp.encDec = newEncDec(conn)
        imp.conn = conn
        imp.chans = make(map[string]*chanDir)
+       imp.errors = make(chan os.Error, 10)
        go imp.run()
        return imp, nil
 }
@@ -86,15 +88,18 @@ func (imp *Importer) run() {
                        }
                        if err.error != "" {
                                impLog("response error:", err.error)
-                               imp.shutdown()
-                               return
+                               if sent := imp.errors <- os.ErrorString(err.error); !sent {
+                                       imp.shutdown()
+                                       return
+                               }
+                               continue // errors are not acknowledged.
                        }
                case payClosed:
                        ich := imp.getChan(hdr.name)
                        if ich != nil {
                                ich.ch.Close()
                        }
-                       continue
+                       continue // closes are not acknowledged.
                default:
                        impLog("unexpected payload type:", hdr.payloadType)
                        return
@@ -132,6 +137,14 @@ func (imp *Importer) getChan(name string) *chanDir {
        return ich
 }
 
+// Errors returns a channel from which transmission and protocol errors
+// can be read. Clients of the importer are not required to read the error
+// channel for correct execution. However, if too many errors occur
+// without being read from the error channel, the importer will shut down.
+func (imp *Importer) Errors() chan os.Error {
+       return imp.errors
+}
+
 // Import imports a channel of the given type and specified direction.
 // It is equivalent to ImportNValues with a count of -1, meaning unbounded.
 func (imp *Importer) Import(name string, chT interface{}, dir Dir) os.Error {
index 42cb3d1ec1edc7ffce94b7433540d97e2132c38a..4240b07869cd856de30a0b8e347c8b48920a6a36 100644 (file)
@@ -4,7 +4,11 @@
 
 package netchan
 
-import "testing"
+import (
+       "strings"
+       "testing"
+       "time"
+)
 
 const count = 10     // number of items in most tests
 const closeCount = 5 // number of items when sender closes early
@@ -134,6 +138,45 @@ func TestClosingImportSendExportReceive(t *testing.T) {
        exportReceive(exp, t)
 }
 
+func TestErrorForIllegalChannel(t *testing.T) {
+       exp, err := NewExporter("tcp", "127.0.0.1:0")
+       if err != nil {
+               t.Fatal("new exporter:", err)
+       }
+       imp, err := NewImporter("tcp", exp.Addr().String())
+       if err != nil {
+               t.Fatal("new importer:", err)
+       }
+       // Now export a channel.
+       ch := make(chan int, 1)
+       err = exp.Export("aChannel", ch, Send)
+       if err != nil {
+               t.Fatal("export:", err)
+       }
+       ch <- 1234
+       close(ch)
+       // Now try to import a different channel.
+       ch = make(chan int)
+       err = imp.Import("notAChannel", ch, Recv)
+       if err != nil {
+               t.Fatal("import:", err)
+       }
+       // Expect an error now.  Start a timeout.
+       timeout := make(chan bool, 1) // buffered so closure will not hang around.
+       go func() {
+               time.Sleep(10e9) // very long, to give even really slow machines a chance.
+               timeout <- true
+       }()
+       select {
+       case err = <-imp.Errors():
+               if strings.Index(err.String(), "no such channel") < 0 {
+                       t.Errorf("wrong error for nonexistent channel:", err)
+               }
+       case <-timeout:
+               t.Error("import of nonexistent channel did not receive an error")
+       }
+}
+
 // Not a great test but it does at least invoke Drain.
 func TestExportDrain(t *testing.T) {
        exp, err := NewExporter("tcp", "127.0.0.1:0")