import (
"log"
+ "io"
"net"
"os"
"reflect"
// but they must use different ports.
type Exporter struct {
*clientSet
- listener net.Listener
}
type expClient struct {
seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu
}
-func newClient(exp *Exporter, conn net.Conn) *expClient {
+func newClient(exp *Exporter, conn io.ReadWriter) *expClient {
client := new(expClient)
client.exp = exp
client.encDec = newEncDec(conn)
return n
}
-// Wait for incoming connections, start a new runner for each
-func (exp *Exporter) listen() {
+// Serve waits for incoming connections on the listener
+// and serves the Exporter's channels on each.
+// It blocks until the listener is closed.
+func (exp *Exporter) Serve(listener net.Listener) {
for {
- conn, err := exp.listener.Accept()
+ conn, err := listener.Accept()
if err != nil {
expLog("listen:", err)
break
}
- client := exp.addClient(conn)
- go client.run()
+ go exp.ServeConn(conn)
}
}
-// NewExporter creates a new Exporter to export channels
-// on the network and local address defined as in net.Listen.
-func NewExporter(network, localaddr string) (*Exporter, os.Error) {
- listener, err := net.Listen(network, localaddr)
- if err != nil {
- return nil, err
- }
+// ServeConn exports the Exporter's channels on conn.
+// It blocks until the connection is terminated.
+func (exp *Exporter) ServeConn(conn io.ReadWriter) {
+ exp.addClient(conn).run()
+}
+
+// NewExporter creates a new Exporter that exports a set of channels.
+func NewExporter() *Exporter {
e := &Exporter{
- listener: listener,
clientSet: &clientSet{
names: make(map[string]*chanDir),
clients: make(map[unackedCounter]bool),
},
}
- go e.listen()
- return e, nil
+ return e
+}
+
+// ListenAndServe exports the exporter's channels through the
+// given network and local address defined as in net.Listen.
+func (exp *Exporter) ListenAndServe(network, localaddr string) os.Error {
+ listener, err := net.Listen(network, localaddr)
+ if err != nil {
+ return err
+ }
+ go exp.Serve(listener)
+ return nil
}
// addClient creates a new expClient and records its existence
-func (exp *Exporter) addClient(conn net.Conn) *expClient {
+func (exp *Exporter) addClient(conn io.ReadWriter) *expClient {
client := newClient(exp, conn)
exp.mu.Lock()
exp.clients[client] = true
return exp.clientSet.sync(timeout)
}
-// Addr returns the Exporter's local network address.
-func (exp *Exporter) Addr() net.Addr { return exp.listener.Addr() }
-
func checkChan(chT interface{}, dir Dir) (*reflect.ChanValue, os.Error) {
chanType, ok := reflect.Typeof(chT).(*reflect.ChanType)
if !ok {
package netchan
import (
+ "io"
"log"
"net"
"os"
// importers, even from the same machine/network port.
type Importer struct {
*encDec
- conn net.Conn
chanLock sync.Mutex // protects access to channel map
names map[string]*netChan
chans map[int]*netChan
maxId int
}
-// NewImporter creates a new Importer object to import channels
-// from an Exporter at the network and remote address as defined in net.Dial.
-// The Exporter must be available and serving when the Importer is
-// created.
-func NewImporter(network, remoteaddr string) (*Importer, os.Error) {
- conn, err := net.Dial(network, "", remoteaddr)
- if err != nil {
- return nil, err
- }
+// NewImporter creates a new Importer object to import a set of channels
+// from the given connection. The Exporter must be available and serving when
+// the Importer is created.
+func NewImporter(conn io.ReadWriter) *Importer {
imp := new(Importer)
imp.encDec = newEncDec(conn)
- imp.conn = conn
imp.chans = make(map[int]*netChan)
imp.names = make(map[string]*netChan)
imp.errors = make(chan os.Error, 10)
go imp.run()
- return imp, nil
+ return imp
+}
+
+// Import imports a set of channels from the given network and address.
+func Import(network, remoteaddr string) (*Importer, os.Error) {
+ conn, err := net.Dial(network, "", remoteaddr)
+ if err != nil {
+ return nil, err
+ }
+ return NewImporter(conn), nil
}
// shutdown closes all channels for which we are receiving data from the remote side.
// the channel. Messages in flight for the channel may be dropped.
func (imp *Importer) Hangup(name string) os.Error {
imp.chanLock.Lock()
- nc, ok := imp.names[name]
- if ok {
- imp.names[name] = nil, false
- imp.chans[nc.id] = nil, false
- }
- imp.chanLock.Unlock()
- if !ok {
+ defer imp.chanLock.Unlock()
+ nc := imp.names[name]
+ if nc == nil {
return os.ErrorString("netchan import: hangup: no such channel: " + name)
}
+ imp.names[name] = nil, false
+ imp.chans[nc.id] = nil, false
nc.close()
return nil
}
package netchan
import (
+ "net"
"strings"
"testing"
"time"
}
func TestExportSendImportReceive(t *testing.T) {
- exp, err := NewExporter("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal("new exporter:", err)
- }
- imp, err := NewImporter("tcp", exp.Addr().String())
- if err != nil {
- t.Fatal("new importer:", err)
- }
+ exp, imp := pair(t)
exportSend(exp, count, t, nil)
importReceive(imp, t, nil)
}
func TestExportReceiveImportSend(t *testing.T) {
- exp, err := NewExporter("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal("new exporter:", err)
- }
- imp, err := NewImporter("tcp", exp.Addr().String())
- if err != nil {
- t.Fatal("new importer:", err)
- }
+ exp, imp := pair(t)
expDone := make(chan bool)
done := make(chan bool)
go func() {
}
func TestClosingExportSendImportReceive(t *testing.T) {
- exp, err := NewExporter("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal("new exporter:", err)
- }
- imp, err := NewImporter("tcp", exp.Addr().String())
- if err != nil {
- t.Fatal("new importer:", err)
- }
+ exp, imp := pair(t)
exportSend(exp, closeCount, t, nil)
importReceive(imp, t, nil)
}
func TestClosingImportSendExportReceive(t *testing.T) {
- exp, err := NewExporter("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal("new exporter:", err)
- }
- imp, err := NewImporter("tcp", exp.Addr().String())
- if err != nil {
- t.Fatal("new importer:", err)
- }
+ exp, imp := pair(t)
expDone := make(chan bool)
done := make(chan bool)
go func() {
}
func TestErrorForIllegalChannel(t *testing.T) {
- exp, err := NewExporter("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal("new exporter:", err)
- }
- imp, err := NewImporter("tcp", exp.Addr().String())
- if err != nil {
- t.Fatal("new importer:", err)
- }
+ exp, imp := pair(t)
// Now export a channel.
ch := make(chan int, 1)
- err = exp.Export("aChannel", ch, Send)
+ err := exp.Export("aChannel", ch, Send)
if err != nil {
t.Fatal("export:", err)
}
// Not a great test but it does at least invoke Drain.
func TestExportDrain(t *testing.T) {
- exp, err := NewExporter("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal("new exporter:", err)
- }
- imp, err := NewImporter("tcp", exp.Addr().String())
- if err != nil {
- t.Fatal("new importer:", err)
- }
+ exp, imp := pair(t)
done := make(chan bool)
go func() {
exportSend(exp, closeCount, t, nil)
// Not a great test but it does at least invoke Sync.
func TestExportSync(t *testing.T) {
- exp, err := NewExporter("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal("new exporter:", err)
- }
- imp, err := NewImporter("tcp", exp.Addr().String())
- if err != nil {
- t.Fatal("new importer:", err)
- }
+ exp, imp := pair(t)
done := make(chan bool)
exportSend(exp, closeCount, t, nil)
go importReceive(imp, t, done)
// Test hanging up the send side of an export.
// TODO: test hanging up the receive side of an export.
func TestExportHangup(t *testing.T) {
- exp, err := NewExporter("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal("new exporter:", err)
- }
- imp, err := NewImporter("tcp", exp.Addr().String())
- if err != nil {
- t.Fatal("new importer:", err)
- }
+ exp, imp := pair(t)
ech := make(chan int)
- err = exp.Export("exportedSend", ech, Send)
+ err := exp.Export("exportedSend", ech, Send)
if err != nil {
t.Fatal("export:", err)
}
// Test hanging up the send side of an import.
// TODO: test hanging up the receive side of an import.
func TestImportHangup(t *testing.T) {
- exp, err := NewExporter("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal("new exporter:", err)
- }
- imp, err := NewImporter("tcp", exp.Addr().String())
- if err != nil {
- t.Fatal("new importer:", err)
- }
+ exp, imp := pair(t)
ech := make(chan int)
- err = exp.Export("exportedRecv", ech, Recv)
+ err := exp.Export("exportedRecv", ech, Recv)
if err != nil {
t.Fatal("export:", err)
}
// This test checks that channel operations can proceed
// even when other concurrent operations are blocked.
func TestIndependentSends(t *testing.T) {
- exp, err := NewExporter("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal("new exporter:", err)
- }
- imp, err := NewImporter("tcp", exp.Addr().String())
- if err != nil {
- t.Fatal("new importer:", err)
- }
+ exp, imp := pair(t)
exportLoopback(exp, t)
}
func TestCrossConnect(t *testing.T) {
- e1, err := NewExporter("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal("new exporter:", err)
- }
- i1, err := NewImporter("tcp", e1.Addr().String())
- if err != nil {
- t.Fatal("new importer:", err)
- }
-
- e2, err := NewExporter("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal("new exporter:", err)
- }
- i2, err := NewImporter("tcp", e2.Addr().String())
- if err != nil {
- t.Fatal("new importer:", err)
- }
+ e1, i1 := pair(t)
+ e2, i2 := pair(t)
crossExport(e1, e2, t)
crossImport(i1, i2, t)
// test flow control from exporter to importer.
func TestExportFlowControl(t *testing.T) {
- exp, err := NewExporter("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal("new exporter:", err)
- }
- imp, err := NewImporter("tcp", exp.Addr().String())
- if err != nil {
- t.Fatal("new importer:", err)
- }
+ exp, imp := pair(t)
sendDone := make(chan bool, 1)
exportSend(exp, flowCount, t, sendDone)
ch := make(chan int)
- err = imp.ImportNValues("exportedSend", ch, Recv, 20, -1)
+ err := imp.ImportNValues("exportedSend", ch, Recv, 20, -1)
if err != nil {
t.Fatal("importReceive:", err)
}
// test flow control from importer to exporter.
func TestImportFlowControl(t *testing.T) {
- exp, err := NewExporter("tcp", "127.0.0.1:0")
- if err != nil {
- t.Fatal("new exporter:", err)
- }
- imp, err := NewImporter("tcp", exp.Addr().String())
- if err != nil {
- t.Fatal("new importer:", err)
- }
+ exp, imp := pair(t)
ch := make(chan int)
- err = exp.Export("exportedRecv", ch, Recv)
+ err := exp.Export("exportedRecv", ch, Recv)
if err != nil {
t.Fatal("importReceive:", err)
}
t.Fatalf("expected %d values; got %d", N, n)
}
}
+
+func pair(t *testing.T) (*Exporter, *Importer) {
+ c0, c1 := net.Pipe()
+ exp := NewExporter()
+ go exp.ServeConn(c0)
+ imp := NewImporter(c1)
+ return exp, imp
+}