From: Rob Pike Date: Sat, 29 May 2010 05:32:29 +0000 (-0700) Subject: netchan: improve closing and shutdown. there's still more to do. X-Git-Tag: weekly.2010-06-09~65 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=752b47cfc59cee54dabe7aad6ce36b098e1aa27e;p=gostls13.git netchan: improve closing and shutdown. there's still more to do. Fixes #805. R=rsc CC=golang-dev https://golang.org/cl/1400041 --- diff --git a/src/pkg/netchan/export.go b/src/pkg/netchan/export.go index 89deb20ae2..ea1d63fb9e 100644 --- a/src/pkg/netchan/export.go +++ b/src/pkg/netchan/export.go @@ -15,11 +15,12 @@ use the channels in the usual way. Networked channels are not synchronized; they always behave - as if there is a buffer of at least one element between the - two machines. + as if they are buffered channels of at least one element. */ package netchan +// BUG: can't use range clause to receive when using ImportNValues with N non-zero. + import ( "log" "net" @@ -143,6 +144,10 @@ func (client *expClient) serveRecv(hdr header, count int) { } for { val := ech.ch.Recv() + if ech.ch.Closed() { + client.sendError(&hdr, os.EOF.String()) + break + } if err := client.encode(&hdr, payData, val.Interface()); err != nil { log.Stderr("error encoding client response:", err) client.sendError(&hdr, err.String()) diff --git a/src/pkg/netchan/import.go b/src/pkg/netchan/import.go index bde36f6152..454e265b21 100644 --- a/src/pkg/netchan/import.go +++ b/src/pkg/netchan/import.go @@ -49,6 +49,17 @@ func NewImporter(network, remoteaddr string) (*Importer, os.Error) { return imp, nil } +// shutdown closes all channels for which we are receiving data from the remote side. +func (imp *Importer) shutdown() { + imp.chanLock.Lock() + for _, ich := range imp.chans { + if ich.dir == Recv { + ich.ch.Close() + } + } + imp.chanLock.Unlock() +} + // Handle the data from a single imported data stream, which will // have the form // (response, data)* @@ -60,6 +71,7 @@ func (imp *Importer) run() { for { if e := imp.decode(hdr); e != nil { log.Stderr("importer header:", e) + imp.shutdown() return } switch hdr.payloadType { @@ -72,7 +84,7 @@ func (imp *Importer) run() { } if err.error != "" { log.Stderr("importer response error:", err.error) - // TODO: tear down connection + imp.shutdown() return } default: diff --git a/src/pkg/netchan/netchan_test.go b/src/pkg/netchan/netchan_test.go index bce37c8669..1981a00c9e 100644 --- a/src/pkg/netchan/netchan_test.go +++ b/src/pkg/netchan/netchan_test.go @@ -11,17 +11,19 @@ type value struct { s string } -const count = 10 +const count = 10 // number of items in most tests +const closeCount = 5 // number of items when sender closes early -func exportSend(exp *Exporter, t *testing.T) { +func exportSend(exp *Exporter, n int, t *testing.T) { ch := make(chan value) err := exp.Export("exportedSend", ch, Send, new(value)) if err != nil { t.Fatal("exportSend:", err) } - for i := 0; i < count; i++ { + for i := 0; i < n; i++ { ch <- value{23 + i, "hello"} } + close(ch) } func exportReceive(exp *Exporter, t *testing.T) { @@ -46,6 +48,12 @@ func importReceive(imp *Importer, t *testing.T) { } for i := 0; i < count; i++ { v := <-ch + if closed(ch) { + if i != closeCount { + t.Errorf("expected close at %d; got one at %d\n", count/2, i) + } + break + } if v.i != 23+i || v.s != "hello" { t.Errorf("importReceive: bad value: expected %d, hello; got %+v", 23+i, v) } @@ -72,7 +80,7 @@ func TestExportSendImportReceive(t *testing.T) { if err != nil { t.Fatal("new importer:", err) } - go exportSend(exp, t) + go exportSend(exp, count, t) importReceive(imp, t) } @@ -88,3 +96,16 @@ func TestExportReceiveImportSend(t *testing.T) { go importSend(imp, t) exportReceive(exp, t) } + +func TestClosingExportSendImportReceive(t *testing.T) { + exp, err := NewExporter("tcp", ":0") + if err != nil { + t.Fatal("new exporter:", err) + } + imp, err := NewImporter("tcp", exp.Addr().String()) + if err != nil { + t.Fatal("new importer:", err) + } + go exportSend(exp, closeCount, t) + importReceive(imp, t) +}