--- /dev/null
+# Copyright 2009 The Go Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file.
+
+include ../../Make.$(GOARCH)
+
+TARG=netchan
+GOFILES=\
+ common.go\
+ export.go\
+ import.go\
+
+include ../../Make.pkg
--- /dev/null
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package netchan
+
+import (
+ "gob"
+ "log"
+ "net"
+ "os"
+ "sync"
+)
+
+type Dir int
+
+const (
+ Recv Dir = iota
+ Send
+)
+
+// Mutex-protected encoder and decoder pair
+
+type encDec struct {
+ decLock sync.Mutex
+ dec *gob.Decoder
+ encLock sync.Mutex
+ enc *gob.Encoder
+}
+
+func newEncDec(conn net.Conn) *encDec {
+ return &encDec{
+ dec: gob.NewDecoder(conn),
+ enc: gob.NewEncoder(conn),
+ }
+}
+
+func (ed *encDec) decode(e interface{}) os.Error {
+ ed.decLock.Lock()
+ defer ed.decLock.Unlock()
+ err := ed.dec.Decode(e)
+ if err != nil {
+ log.Stderr("exporter decode:", err)
+ // TODO: tear down connection
+ return err
+ }
+ return nil
+}
+
+func (ed *encDec) encode(e0, e1 interface{}) os.Error {
+ ed.encLock.Lock()
+ defer ed.encLock.Unlock()
+ err := ed.enc.Encode(e0)
+ if err == nil && e1 != nil {
+ err = ed.enc.Encode(e1)
+ }
+ if err != nil {
+ log.Stderr("exporter encode:", err)
+ // TODO: tear down connection?
+ return err
+ }
+ return nil
+}
--- /dev/null
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+/*
+ The netchan package implements type-safe networked channels:
+ it allows the two ends of a channel to appear on different
+ computers connected by a network. It does this by transporting
+ data sent to a channel on one machine so it can be recovered
+ by a receive of a channel of the same type on the other.
+
+ An exporter publishes a set of channels by name. An importer
+ connects to the exporting machine and imports the channels
+ by name. After importing the channels, the two machines can
+ use the channels in the usual way.
+
+ Networked channels are not synchronized; they always behave
+ as if there is a buffer of at least one element between the
+ two machines.
+
+ TODO: at the moment, the exporting machine must send and
+ the importing machine must receive. This restriction will
+ be lifted soon.
+*/
+package netchan
+
+import (
+ "log"
+ "net"
+ "os"
+ "reflect"
+ "sync"
+)
+
+// Export
+
+// A channel and its associated information: a direction
+type exportChan struct {
+ ch *reflect.ChanValue
+ dir Dir
+}
+
+// An Exporter allows a set of channels to be published on a single
+// network port. A single machine may have multiple Exporters
+// but they must use different ports.
+type Exporter struct {
+ listener net.Listener
+ chanLock sync.Mutex // protects access to channel map
+ chans map[string]*exportChan
+}
+
+type expClient struct {
+ *encDec
+ exp *Exporter
+}
+
+func newClient(exp *Exporter, conn net.Conn) *expClient {
+ client := new(expClient)
+ client.exp = exp
+ client.encDec = newEncDec(conn)
+ return client
+
+}
+
+// TODO: ASSUMES EXPORT MEANS SEND
+
+// Sent once per channel from importer to exporter to report that it's listening to a channel
+type request struct {
+ name string
+ dir Dir
+ count int
+}
+
+// Reply to request, sent from exporter to importer on each send.
+type response struct {
+ name string
+ error string
+}
+
+// Wait for incoming connections, start a new runner for each
+func (exp *Exporter) listen() {
+ for {
+ conn, err := exp.listener.Accept()
+ if err != nil {
+ log.Stderr("exporter.listen:", err)
+ break
+ }
+ log.Stderr("accepted call from", conn.RemoteAddr())
+ client := newClient(exp, conn)
+ go client.run()
+ }
+}
+
+// Send a single client all its data. For each request, this will launch
+// a serveRecv goroutine to deliver the data for that channel.
+func (client *expClient) run() {
+ req := new(request)
+ for {
+ if err := client.decode(req); err != nil {
+ log.Stderr("error decoding client request:", err)
+ // TODO: tear down connection
+ break
+ }
+ log.Stderrf("export request: %+v", req)
+ if req.dir == Recv {
+ go client.serveRecv(req)
+ } else {
+ log.Stderr("export request: can't handle channel direction", req.dir)
+ resp := new(response)
+ resp.name = req.name
+ resp.error = "export request: can't handle channel direction"
+ client.encode(resp, nil)
+ break
+ }
+ }
+}
+
+// Send all the data on a single channel to a client asking for a Recv
+func (client *expClient) serveRecv(req *request) {
+ exp := client.exp
+ resp := new(response)
+ resp.name = req.name
+ var ok bool
+ exp.chanLock.Lock()
+ ech, ok := exp.chans[req.name]
+ exp.chanLock.Unlock()
+ if !ok {
+ resp.error = "no such channel: " + req.name
+ log.Stderr("export:", resp.error)
+ client.encode(resp, nil) // ignore any encode error, hope client gets it
+ return
+ }
+ for {
+ if ech.dir != Send {
+ log.Stderr("TODO: recv export unimplemented")
+ break
+ }
+ val := ech.ch.Recv()
+ if err := client.encode(resp, val.Interface()); err != nil {
+ log.Stderr("error encoding client response:", err)
+ break
+ }
+ if req.count > 0 {
+ req.count--
+ if req.count == 0 {
+ break
+ }
+ }
+ }
+}
+
+// NewExporter creates a new Exporter to export channels
+// on the network and local address defined as in net.Listen.
+func NewExporter(network, localaddr string) (*Exporter, os.Error) {
+ listener, err := net.Listen(network, localaddr)
+ if err != nil {
+ return nil, err
+ }
+ e := &Exporter{
+ listener: listener,
+ chans: make(map[string]*exportChan),
+ }
+ go e.listen()
+ return e, nil
+}
+
+// Addr returns the Exporter's local network address.
+func (exp *Exporter) Addr() net.Addr { return exp.listener.Addr() }
+
+func checkChan(chT interface{}, dir Dir) (*reflect.ChanValue, os.Error) {
+ chanType, ok := reflect.Typeof(chT).(*reflect.ChanType)
+ if !ok {
+ return nil, os.ErrorString("not a channel")
+ }
+ if dir != Send && dir != Recv {
+ return nil, os.ErrorString("unknown channel direction")
+ }
+ switch chanType.Dir() {
+ case reflect.BothDir:
+ case reflect.SendDir:
+ if dir != Recv {
+ return nil, os.ErrorString("to import/export with Send, must provide <-chan")
+ }
+ case reflect.RecvDir:
+ if dir != Send {
+ return nil, os.ErrorString("to import/export with Recv, must provide chan<-")
+ }
+ }
+ return reflect.NewValue(chT).(*reflect.ChanValue), nil
+}
+
+// Export exports a channel of a given type and specified direction. The
+// channel to be exported is provided in the call and may be of arbitrary
+// channel type.
+// Despite the literal signature, the effective signature is
+// Export(name string, chT chan T, dir Dir)
+// where T must be a struct, pointer to struct, etc.
+func (exp *Exporter) Export(name string, chT interface{}, dir Dir) os.Error {
+ ch, err := checkChan(chT, dir)
+ if err != nil {
+ return err
+ }
+ exp.chanLock.Lock()
+ defer exp.chanLock.Unlock()
+ _, present := exp.chans[name]
+ if present {
+ return os.ErrorString("channel name already being exported:" + name)
+ }
+ exp.chans[name] = &exportChan{ch, dir}
+ return nil
+}
--- /dev/null
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package netchan
+
+import (
+ "log"
+ "net"
+ "os"
+ "reflect"
+ "sync"
+)
+
+// Import
+
+// A channel and its associated information: a template value, direction and a count
+type importChan struct {
+ ch *reflect.ChanValue
+ dir Dir
+ ptr *reflect.PtrValue // a pointer value we can point at each new item
+ count int
+}
+
+// An Importer allows a set of channels to be imported from a single
+// remote machine/network port. A machine may have multiple
+// importers, even from the same machine/network port.
+type Importer struct {
+ *encDec
+ conn net.Conn
+ chanLock sync.Mutex // protects access to channel map
+ chans map[string]*importChan
+}
+
+// TODO: ASSUMES IMPORT MEANS RECEIVE
+
+// NewImporter creates a new Importer object to import channels
+// from an Exporter at the network and remote address as defined in net.Dial.
+// The Exporter must be available and serving when the Importer is
+// created.
+func NewImporter(network, remoteaddr string) (*Importer, os.Error) {
+ conn, err := net.Dial(network, "", remoteaddr)
+ if err != nil {
+ return nil, err
+ }
+ imp := new(Importer)
+ imp.encDec = newEncDec(conn)
+ imp.conn = conn
+ imp.chans = make(map[string]*importChan)
+ go imp.run()
+ return imp, nil
+}
+
+// Handle the data from a single imported data stream, which will
+// have the form
+// (response, data)*
+// The response identifies by name which channel is receiving data.
+// TODO: allow an importer to send.
+func (imp *Importer) run() {
+ // Loop on responses; requests are sent by ImportNValues()
+ resp := new(response)
+ for {
+ if err := imp.decode(resp); err != nil {
+ log.Stderr("importer response decode:", err)
+ break
+ }
+ if resp.error != "" {
+ log.Stderr("importer response error:", resp.error)
+ // TODO: tear down connection
+ break
+ }
+ imp.chanLock.Lock()
+ ich, ok := imp.chans[resp.name]
+ imp.chanLock.Unlock()
+ if !ok {
+ log.Stderr("unknown name in request:", resp.name)
+ break
+ }
+ if ich.dir != Recv {
+ log.Stderr("TODO: import send unimplemented")
+ break
+ }
+ // Create a new value for each received item.
+ val := reflect.MakeZero(ich.ptr.Type().(*reflect.PtrType).Elem())
+ ich.ptr.PointTo(val)
+ if err := imp.decode(ich.ptr.Interface()); err != nil {
+ log.Stderr("importer value decode:", err)
+ return
+ }
+ ich.ch.Send(val)
+ }
+}
+
+// Import imports a channel of the given type and specified direction.
+// It is equivalent to ImportNValues with a count of 0, meaning unbounded.
+func (imp *Importer) Import(name string, chT interface{}, dir Dir, pT interface{}) os.Error {
+ return imp.ImportNValues(name, chT, dir, pT, 0)
+}
+
+// ImportNValues imports a channel of the given type and specified direction
+// and then receives or transmits up to n values on that channel. A value of
+// n==0 implies an unbounded number of values. The channel to be bound to
+// the remote site's channel is provided in the call and may be of arbitrary
+// channel type.
+// Despite the literal signature, the effective signature is
+// ImportNValues(name string, chT chan T, dir Dir, pT T)
+// where T must be a struct, pointer to struct, etc. pT may be more indirect
+// than the value type of the channel (e.g. chan T, pT *T) but it must be a
+// pointer.
+// Example usage:
+// imp, err := NewImporter("tcp", "netchanserver.mydomain.com:1234")
+// if err != nil { log.Exit(err) }
+// ch := make(chan myType)
+// err := imp.ImportNValues("name", ch, Recv, new(myType), 1)
+// if err != nil { log.Exit(err) }
+// fmt.Printf("%+v\n", <-ch)
+// (TODO: Can we eliminate the need for pT?)
+func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, pT interface{}, n int) os.Error {
+ ch, err := checkChan(chT, dir)
+ if err != nil {
+ return err
+ }
+ // Make sure pT is a pointer (to a pointer...) to a struct.
+ rt := reflect.Typeof(pT)
+ if _, ok := rt.(*reflect.PtrType); !ok {
+ return os.ErrorString("not a pointer:" + rt.String())
+ }
+ if _, ok := reflect.Indirect(reflect.NewValue(pT)).(*reflect.StructValue); !ok {
+ return os.ErrorString("not a pointer to a struct:" + rt.String())
+ }
+ imp.chanLock.Lock()
+ defer imp.chanLock.Unlock()
+ _, present := imp.chans[name]
+ if present {
+ return os.ErrorString("channel name already being imported:" + name)
+ }
+ ptr := reflect.MakeZero(reflect.Typeof(pT)).(*reflect.PtrValue)
+ imp.chans[name] = &importChan{ch, dir, ptr, n}
+ // Tell the other side about this channel.
+ req := new(request)
+ req.name = name
+ req.dir = dir
+ req.count = n
+ if err := imp.encode(req, nil); err != nil {
+ log.Stderr("importer request encode:", err)
+ return err
+ }
+ return nil
+}
--- /dev/null
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package netchan
+
+import (
+ "fmt"
+ "testing"
+)
+
+type value struct {
+ i int
+ s string
+}
+
+func exportSend(exp *Exporter, t *testing.T) {
+ c := make(chan value)
+ err := exp.Export("name", c, Send)
+ if err != nil {
+ t.Fatal("export:", err)
+ }
+ c <- value{23, "hello"}
+}
+
+func importReceive(imp *Importer, t *testing.T) {
+ ch := make(chan value)
+ err := imp.ImportNValues("name", ch, Recv, new(value), 1)
+ if err != nil {
+ t.Fatal("import:", err)
+ }
+ v := <-ch
+ fmt.Printf("%v\n", v)
+ if v.i != 23 || v.s != "hello" {
+ t.Errorf("bad value: expected 23, hello; got %+v\n", v)
+ }
+}
+
+func TestBabyStep(t *testing.T) {
+ exp, err := NewExporter("tcp", ":0")
+ if err != nil {
+ t.Fatal("new exporter:", err)
+ }
+ go exportSend(exp, t)
+ imp, err := NewImporter("tcp", exp.Addr().String())
+ if err != nil {
+ t.Fatal("new importer:", err)
+ }
+ importReceive(imp, t)
+}