]> Cypherpunks repositories - gostls13.git/commitdiff
first part of networked channels.
authorRob Pike <r@golang.org>
Wed, 20 Jan 2010 03:12:29 +0000 (14:12 +1100)
committerRob Pike <r@golang.org>
Wed, 20 Jan 2010 03:12:29 +0000 (14:12 +1100)
limitations:
poor error handling
teardown not done
exporter must send, importer must receive
testing is rudimentary at best

R=rsc
CC=golang-dev
https://golang.org/cl/186234

src/pkg/netchan/Makefile [new file with mode: 0644]
src/pkg/netchan/common.go [new file with mode: 0644]
src/pkg/netchan/export.go [new file with mode: 0644]
src/pkg/netchan/import.go [new file with mode: 0644]
src/pkg/netchan/netchan_test.go [new file with mode: 0644]

diff --git a/src/pkg/netchan/Makefile b/src/pkg/netchan/Makefile
new file mode 100644 (file)
index 0000000..a8a5c6a
--- /dev/null
@@ -0,0 +1,13 @@
+# Copyright 2009 The Go Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file.
+
+include ../../Make.$(GOARCH)
+
+TARG=netchan
+GOFILES=\
+       common.go\
+       export.go\
+       import.go\
+
+include ../../Make.pkg
diff --git a/src/pkg/netchan/common.go b/src/pkg/netchan/common.go
new file mode 100644 (file)
index 0000000..a82bd07
--- /dev/null
@@ -0,0 +1,63 @@
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package netchan
+
+import (
+       "gob"
+       "log"
+       "net"
+       "os"
+       "sync"
+)
+
+type Dir int
+
+const (
+       Recv Dir = iota
+       Send
+)
+
+// Mutex-protected encoder and decoder pair
+
+type encDec struct {
+       decLock sync.Mutex
+       dec     *gob.Decoder
+       encLock sync.Mutex
+       enc     *gob.Encoder
+}
+
+func newEncDec(conn net.Conn) *encDec {
+       return &encDec{
+               dec: gob.NewDecoder(conn),
+               enc: gob.NewEncoder(conn),
+       }
+}
+
+func (ed *encDec) decode(e interface{}) os.Error {
+       ed.decLock.Lock()
+       defer ed.decLock.Unlock()
+       err := ed.dec.Decode(e)
+       if err != nil {
+               log.Stderr("exporter decode:", err)
+               // TODO: tear down connection
+               return err
+       }
+       return nil
+}
+
+func (ed *encDec) encode(e0, e1 interface{}) os.Error {
+       ed.encLock.Lock()
+       defer ed.encLock.Unlock()
+       err := ed.enc.Encode(e0)
+       if err == nil && e1 != nil {
+               err = ed.enc.Encode(e1)
+       }
+       if err != nil {
+               log.Stderr("exporter encode:", err)
+               // TODO: tear down connection?
+               return err
+       }
+       return nil
+}
diff --git a/src/pkg/netchan/export.go b/src/pkg/netchan/export.go
new file mode 100644 (file)
index 0000000..2bbebc9
--- /dev/null
@@ -0,0 +1,211 @@
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+/*
+       The netchan package implements type-safe networked channels:
+       it allows the two ends of a channel to appear on different
+       computers connected by a network.  It does this by transporting
+       data sent to a channel on one machine so it can be recovered
+       by a receive of a channel of the same type on the other.
+
+       An exporter publishes a set of channels by name.  An importer
+       connects to the exporting machine and imports the channels
+       by name. After importing the channels, the two machines can
+       use the channels in the usual way.
+
+       Networked channels are not synchronized; they always behave
+       as if there is a buffer of at least one element between the
+       two machines.
+
+       TODO: at the moment, the exporting machine must send and
+       the importing machine must receive.  This restriction will
+       be lifted soon.
+*/
+package netchan
+
+import (
+       "log"
+       "net"
+       "os"
+       "reflect"
+       "sync"
+)
+
+// Export
+
+// A channel and its associated information: a direction
+type exportChan struct {
+       ch  *reflect.ChanValue
+       dir Dir
+}
+
+// An Exporter allows a set of channels to be published on a single
+// network port.  A single machine may have multiple Exporters
+// but they must use different ports.
+type Exporter struct {
+       listener net.Listener
+       chanLock sync.Mutex // protects access to channel map
+       chans    map[string]*exportChan
+}
+
+type expClient struct {
+       *encDec
+       exp *Exporter
+}
+
+func newClient(exp *Exporter, conn net.Conn) *expClient {
+       client := new(expClient)
+       client.exp = exp
+       client.encDec = newEncDec(conn)
+       return client
+
+}
+
+// TODO: ASSUMES EXPORT MEANS SEND
+
+// Sent once per channel from importer to exporter to report that it's listening to a channel
+type request struct {
+       name  string
+       dir   Dir
+       count int
+}
+
+// Reply to request, sent from exporter to importer on each send.
+type response struct {
+       name  string
+       error string
+}
+
+// Wait for incoming connections, start a new runner for each
+func (exp *Exporter) listen() {
+       for {
+               conn, err := exp.listener.Accept()
+               if err != nil {
+                       log.Stderr("exporter.listen:", err)
+                       break
+               }
+               log.Stderr("accepted call from", conn.RemoteAddr())
+               client := newClient(exp, conn)
+               go client.run()
+       }
+}
+
+// Send a single client all its data.  For each request, this will launch
+// a serveRecv goroutine to deliver the data for that channel.
+func (client *expClient) run() {
+       req := new(request)
+       for {
+               if err := client.decode(req); err != nil {
+                       log.Stderr("error decoding client request:", err)
+                       // TODO: tear down connection
+                       break
+               }
+               log.Stderrf("export request: %+v", req)
+               if req.dir == Recv {
+                       go client.serveRecv(req)
+               } else {
+                       log.Stderr("export request: can't handle channel direction", req.dir)
+                       resp := new(response)
+                       resp.name = req.name
+                       resp.error = "export request: can't handle channel direction"
+                       client.encode(resp, nil)
+                       break
+               }
+       }
+}
+
+// Send all the data on a single channel to a client asking for a Recv
+func (client *expClient) serveRecv(req *request) {
+       exp := client.exp
+       resp := new(response)
+       resp.name = req.name
+       var ok bool
+       exp.chanLock.Lock()
+       ech, ok := exp.chans[req.name]
+       exp.chanLock.Unlock()
+       if !ok {
+               resp.error = "no such channel: " + req.name
+               log.Stderr("export:", resp.error)
+               client.encode(resp, nil) // ignore any encode error, hope client gets it
+               return
+       }
+       for {
+               if ech.dir != Send {
+                       log.Stderr("TODO: recv export unimplemented")
+                       break
+               }
+               val := ech.ch.Recv()
+               if err := client.encode(resp, val.Interface()); err != nil {
+                       log.Stderr("error encoding client response:", err)
+                       break
+               }
+               if req.count > 0 {
+                       req.count--
+                       if req.count == 0 {
+                               break
+                       }
+               }
+       }
+}
+
+// NewExporter creates a new Exporter to export channels
+// on the network and local address defined as in net.Listen.
+func NewExporter(network, localaddr string) (*Exporter, os.Error) {
+       listener, err := net.Listen(network, localaddr)
+       if err != nil {
+               return nil, err
+       }
+       e := &Exporter{
+               listener: listener,
+               chans: make(map[string]*exportChan),
+       }
+       go e.listen()
+       return e, nil
+}
+
+// Addr returns the Exporter's local network address.
+func (exp *Exporter) Addr() net.Addr { return exp.listener.Addr() }
+
+func checkChan(chT interface{}, dir Dir) (*reflect.ChanValue, os.Error) {
+       chanType, ok := reflect.Typeof(chT).(*reflect.ChanType)
+       if !ok {
+               return nil, os.ErrorString("not a channel")
+       }
+       if dir != Send && dir != Recv {
+               return nil, os.ErrorString("unknown channel direction")
+       }
+       switch chanType.Dir() {
+       case reflect.BothDir:
+       case reflect.SendDir:
+               if dir != Recv {
+                       return nil, os.ErrorString("to import/export with Send, must provide <-chan")
+               }
+       case reflect.RecvDir:
+               if dir != Send {
+                       return nil, os.ErrorString("to import/export with Recv, must provide chan<-")
+               }
+       }
+       return reflect.NewValue(chT).(*reflect.ChanValue), nil
+}
+
+// Export exports a channel of a given type and specified direction.  The
+// channel to be exported is provided in the call and may be of arbitrary
+// channel type.
+// Despite the literal signature, the effective signature is
+//     Export(name string, chT chan T, dir Dir)
+// where T must be a struct, pointer to struct, etc.
+func (exp *Exporter) Export(name string, chT interface{}, dir Dir) os.Error {
+       ch, err := checkChan(chT, dir)
+       if err != nil {
+               return err
+       }
+       exp.chanLock.Lock()
+       defer exp.chanLock.Unlock()
+       _, present := exp.chans[name]
+       if present {
+               return os.ErrorString("channel name already being exported:" + name)
+       }
+       exp.chans[name] = &exportChan{ch, dir}
+       return nil
+}
diff --git a/src/pkg/netchan/import.go b/src/pkg/netchan/import.go
new file mode 100644 (file)
index 0000000..263ee44
--- /dev/null
@@ -0,0 +1,149 @@
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package netchan
+
+import (
+       "log"
+       "net"
+       "os"
+       "reflect"
+       "sync"
+)
+
+// Import
+
+// A channel and its associated information: a template value, direction and a count
+type importChan struct {
+       ch    *reflect.ChanValue
+       dir   Dir
+       ptr   *reflect.PtrValue // a pointer value we can point at each new item
+       count int
+}
+
+// An Importer allows a set of channels to be imported from a single
+// remote machine/network port.  A machine may have multiple
+// importers, even from the same machine/network port.
+type Importer struct {
+       *encDec
+       conn     net.Conn
+       chanLock sync.Mutex // protects access to channel map
+       chans    map[string]*importChan
+}
+
+// TODO: ASSUMES IMPORT MEANS RECEIVE
+
+// NewImporter creates a new Importer object to import channels
+// from an Exporter at the network and remote address as defined in net.Dial.
+// The Exporter must be available and serving when the Importer is
+// created.
+func NewImporter(network, remoteaddr string) (*Importer, os.Error) {
+       conn, err := net.Dial(network, "", remoteaddr)
+       if err != nil {
+               return nil, err
+       }
+       imp := new(Importer)
+       imp.encDec = newEncDec(conn)
+       imp.conn = conn
+       imp.chans = make(map[string]*importChan)
+       go imp.run()
+       return imp, nil
+}
+
+// Handle the data from a single imported data stream, which will
+// have the form
+//     (response, data)*
+// The response identifies by name which channel is receiving data.
+// TODO: allow an importer to send.
+func (imp *Importer) run() {
+       // Loop on responses; requests are sent by ImportNValues()
+       resp := new(response)
+       for {
+               if err := imp.decode(resp); err != nil {
+                       log.Stderr("importer response decode:", err)
+                       break
+               }
+               if resp.error != "" {
+                       log.Stderr("importer response error:", resp.error)
+                       // TODO: tear down connection
+                       break
+               }
+               imp.chanLock.Lock()
+               ich, ok := imp.chans[resp.name]
+               imp.chanLock.Unlock()
+               if !ok {
+                       log.Stderr("unknown name in request:", resp.name)
+                       break
+               }
+               if ich.dir != Recv {
+                       log.Stderr("TODO: import send unimplemented")
+                       break
+               }
+               // Create a new value for each received item.
+               val := reflect.MakeZero(ich.ptr.Type().(*reflect.PtrType).Elem())
+               ich.ptr.PointTo(val)
+               if err := imp.decode(ich.ptr.Interface()); err != nil {
+                       log.Stderr("importer value decode:", err)
+                       return
+               }
+               ich.ch.Send(val)
+       }
+}
+
+// Import imports a channel of the given type and specified direction.
+// It is equivalent to ImportNValues with a count of 0, meaning unbounded.
+func (imp *Importer) Import(name string, chT interface{}, dir Dir, pT interface{}) os.Error {
+       return imp.ImportNValues(name, chT, dir, pT, 0)
+}
+
+// ImportNValues imports a channel of the given type and specified direction
+// and then receives or transmits up to n values on that channel.  A value of
+// n==0 implies an unbounded number of values.  The channel to be bound to
+// the remote site's channel is provided in the call and may be of arbitrary
+// channel type.
+// Despite the literal signature, the effective signature is
+//     ImportNValues(name string, chT chan T, dir Dir, pT T)
+// where T must be a struct, pointer to struct, etc.  pT may be more indirect
+// than the value type of the channel (e.g.  chan T, pT *T) but it must be a
+// pointer.
+// Example usage:
+//     imp, err := NewImporter("tcp", "netchanserver.mydomain.com:1234")
+//     if err != nil { log.Exit(err) }
+//     ch := make(chan myType)
+//     err := imp.ImportNValues("name", ch, Recv, new(myType), 1)
+//     if err != nil { log.Exit(err) }
+//     fmt.Printf("%+v\n", <-ch)
+// (TODO: Can we eliminate the need for pT?)
+func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, pT interface{}, n int) os.Error {
+       ch, err := checkChan(chT, dir)
+       if err != nil {
+               return err
+       }
+       // Make sure pT is a pointer (to a pointer...) to a struct.
+       rt := reflect.Typeof(pT)
+       if _, ok := rt.(*reflect.PtrType); !ok {
+               return os.ErrorString("not a pointer:" + rt.String())
+       }
+       if _, ok := reflect.Indirect(reflect.NewValue(pT)).(*reflect.StructValue); !ok {
+               return os.ErrorString("not a pointer to a struct:" + rt.String())
+       }
+       imp.chanLock.Lock()
+       defer imp.chanLock.Unlock()
+       _, present := imp.chans[name]
+       if present {
+               return os.ErrorString("channel name already being imported:" + name)
+       }
+       ptr := reflect.MakeZero(reflect.Typeof(pT)).(*reflect.PtrValue)
+       imp.chans[name] = &importChan{ch, dir, ptr, n}
+       // Tell the other side about this channel.
+       req := new(request)
+       req.name = name
+       req.dir = dir
+       req.count = n
+       if err := imp.encode(req, nil); err != nil {
+               log.Stderr("importer request encode:", err)
+               return err
+       }
+       return nil
+}
diff --git a/src/pkg/netchan/netchan_test.go b/src/pkg/netchan/netchan_test.go
new file mode 100644 (file)
index 0000000..b7fd100
--- /dev/null
@@ -0,0 +1,50 @@
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package netchan
+
+import (
+       "fmt"
+       "testing"
+)
+
+type value struct {
+       i int
+       s string
+}
+
+func exportSend(exp *Exporter, t *testing.T) {
+       c := make(chan value)
+       err := exp.Export("name", c, Send)
+       if err != nil {
+               t.Fatal("export:", err)
+       }
+       c <- value{23, "hello"}
+}
+
+func importReceive(imp *Importer, t *testing.T) {
+       ch := make(chan value)
+       err := imp.ImportNValues("name", ch, Recv, new(value), 1)
+       if err != nil {
+               t.Fatal("import:", err)
+       }
+       v := <-ch
+       fmt.Printf("%v\n", v)
+       if v.i != 23 || v.s != "hello" {
+               t.Errorf("bad value: expected 23, hello; got %+v\n", v)
+       }
+}
+
+func TestBabyStep(t *testing.T) {
+       exp, err := NewExporter("tcp", ":0")
+       if err != nil {
+               t.Fatal("new exporter:", err)
+       }
+       go exportSend(exp, t)
+       imp, err := NewImporter("tcp", exp.Addr().String())
+       if err != nil {
+               t.Fatal("new importer:", err)
+       }
+       importReceive(imp, t)
+}