func (client *expClient) sendError(hdr *header, err string) {
error := &error{err}
- expLog("sending error to client", error.error)
+ expLog("sending error to client:", error.error)
client.encode(hdr, payError, error) // ignore any encode error, hope client gets it
client.mu.Lock()
client.errored = true
conn net.Conn
chanLock sync.Mutex // protects access to channel map
chans map[string]*chanDir
+ errors chan os.Error
}
// NewImporter creates a new Importer object to import channels
imp.encDec = newEncDec(conn)
imp.conn = conn
imp.chans = make(map[string]*chanDir)
+ imp.errors = make(chan os.Error, 10)
go imp.run()
return imp, nil
}
}
if err.error != "" {
impLog("response error:", err.error)
- imp.shutdown()
- return
+ if sent := imp.errors <- os.ErrorString(err.error); !sent {
+ imp.shutdown()
+ return
+ }
+ continue // errors are not acknowledged.
}
case payClosed:
ich := imp.getChan(hdr.name)
if ich != nil {
ich.ch.Close()
}
- continue
+ continue // closes are not acknowledged.
default:
impLog("unexpected payload type:", hdr.payloadType)
return
return ich
}
+// Errors returns a channel from which transmission and protocol errors
+// can be read. Clients of the importer are not required to read the error
+// channel for correct execution. However, if too many errors occur
+// without being read from the error channel, the importer will shut down.
+func (imp *Importer) Errors() chan os.Error {
+ return imp.errors
+}
+
// Import imports a channel of the given type and specified direction.
// It is equivalent to ImportNValues with a count of -1, meaning unbounded.
func (imp *Importer) Import(name string, chT interface{}, dir Dir) os.Error {
package netchan
-import "testing"
+import (
+ "strings"
+ "testing"
+ "time"
+)
const count = 10 // number of items in most tests
const closeCount = 5 // number of items when sender closes early
exportReceive(exp, t)
}
+func TestErrorForIllegalChannel(t *testing.T) {
+ exp, err := NewExporter("tcp", "127.0.0.1:0")
+ if err != nil {
+ t.Fatal("new exporter:", err)
+ }
+ imp, err := NewImporter("tcp", exp.Addr().String())
+ if err != nil {
+ t.Fatal("new importer:", err)
+ }
+ // Now export a channel.
+ ch := make(chan int, 1)
+ err = exp.Export("aChannel", ch, Send)
+ if err != nil {
+ t.Fatal("export:", err)
+ }
+ ch <- 1234
+ close(ch)
+ // Now try to import a different channel.
+ ch = make(chan int)
+ err = imp.Import("notAChannel", ch, Recv)
+ if err != nil {
+ t.Fatal("import:", err)
+ }
+ // Expect an error now. Start a timeout.
+ timeout := make(chan bool, 1) // buffered so closure will not hang around.
+ go func() {
+ time.Sleep(10e9) // very long, to give even really slow machines a chance.
+ timeout <- true
+ }()
+ select {
+ case err = <-imp.Errors():
+ if strings.Index(err.String(), "no such channel") < 0 {
+ t.Errorf("wrong error for nonexistent channel:", err)
+ }
+ case <-timeout:
+ t.Error("import of nonexistent channel did not receive an error")
+ }
+}
+
// Not a great test but it does at least invoke Drain.
func TestExportDrain(t *testing.T) {
exp, err := NewExporter("tcp", "127.0.0.1:0")