]> Cypherpunks repositories - gostls13.git/commitdiff
old/netchan: delete as part of move to go.exp/old/netchan
authorRob Pike <r@golang.org>
Sat, 2 Mar 2013 19:45:22 +0000 (11:45 -0800)
committerRob Pike <r@golang.org>
Sat, 2 Mar 2013 19:45:22 +0000 (11:45 -0800)
R=golang-dev, minux.ma
CC=golang-dev
https://golang.org/cl/7450050

src/pkg/old/netchan/common.go [deleted file]
src/pkg/old/netchan/export.go [deleted file]
src/pkg/old/netchan/import.go [deleted file]
src/pkg/old/netchan/netchan_test.go [deleted file]

diff --git a/src/pkg/old/netchan/common.go b/src/pkg/old/netchan/common.go
deleted file mode 100644 (file)
index d0daf53..0000000
+++ /dev/null
@@ -1,338 +0,0 @@
-// Copyright 2010 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package netchan
-
-import (
-       "encoding/gob"
-       "errors"
-       "io"
-       "reflect"
-       "sync"
-       "time"
-)
-
-// The direction of a connection from the client's perspective.
-type Dir int
-
-const (
-       Recv Dir = iota
-       Send
-)
-
-func (dir Dir) String() string {
-       switch dir {
-       case Recv:
-               return "Recv"
-       case Send:
-               return "Send"
-       }
-       return "???"
-}
-
-// Payload types
-const (
-       payRequest = iota // request structure follows
-       payError          // error structure follows
-       payData           // user payload follows
-       payAck            // acknowledgement; no payload
-       payClosed         // channel is now closed
-       payAckSend        // payload has been delivered.
-)
-
-// A header is sent as a prefix to every transmission.  It will be followed by
-// a request structure, an error structure, or an arbitrary user payload structure.
-type header struct {
-       Id          int
-       PayloadType int
-       SeqNum      int64
-}
-
-// Sent with a header once per channel from importer to exporter to report
-// that it wants to bind to a channel with the specified direction for count
-// messages, with space for size buffered values. If count is -1, it means unlimited.
-type request struct {
-       Name  string
-       Count int64
-       Size  int
-       Dir   Dir
-}
-
-// Sent with a header to report an error.
-type error_ struct {
-       Error string
-}
-
-// Used to unify management of acknowledgements for import and export.
-type unackedCounter interface {
-       unackedCount() int64
-       ack() int64
-       seq() int64
-}
-
-// A channel and its direction.
-type chanDir struct {
-       ch  reflect.Value
-       dir Dir
-}
-
-// clientSet contains the objects and methods needed for tracking
-// clients of an exporter and draining outstanding messages.
-type clientSet struct {
-       mu      sync.Mutex // protects access to channel and client maps
-       names   map[string]*chanDir
-       clients map[unackedCounter]bool
-}
-
-// Mutex-protected encoder and decoder pair.
-type encDec struct {
-       decLock sync.Mutex
-       dec     *gob.Decoder
-       encLock sync.Mutex
-       enc     *gob.Encoder
-}
-
-func newEncDec(conn io.ReadWriter) *encDec {
-       return &encDec{
-               dec: gob.NewDecoder(conn),
-               enc: gob.NewEncoder(conn),
-       }
-}
-
-// Decode an item from the connection.
-func (ed *encDec) decode(value reflect.Value) error {
-       ed.decLock.Lock()
-       err := ed.dec.DecodeValue(value)
-       if err != nil {
-               // TODO: tear down connection?
-       }
-       ed.decLock.Unlock()
-       return err
-}
-
-// Encode a header and payload onto the connection.
-func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) error {
-       ed.encLock.Lock()
-       hdr.PayloadType = payloadType
-       err := ed.enc.Encode(hdr)
-       if err == nil {
-               if payload != nil {
-                       err = ed.enc.Encode(payload)
-               }
-       }
-       if err != nil {
-               // TODO: tear down connection if there is an error?
-       }
-       ed.encLock.Unlock()
-       return err
-}
-
-// See the comment for Exporter.Drain.
-func (cs *clientSet) drain(timeout time.Duration) error {
-       deadline := time.Now().Add(timeout)
-       for {
-               pending := false
-               cs.mu.Lock()
-               // Any messages waiting for a client?
-               for _, chDir := range cs.names {
-                       if chDir.ch.Len() > 0 {
-                               pending = true
-                       }
-               }
-               // Any unacknowledged messages?
-               for client := range cs.clients {
-                       n := client.unackedCount()
-                       if n > 0 { // Check for > rather than != just to be safe.
-                               pending = true
-                               break
-                       }
-               }
-               cs.mu.Unlock()
-               if !pending {
-                       break
-               }
-               if timeout > 0 && time.Now().After(deadline) {
-                       return errors.New("timeout")
-               }
-               time.Sleep(100 * time.Millisecond)
-       }
-       return nil
-}
-
-// See the comment for Exporter.Sync.
-func (cs *clientSet) sync(timeout time.Duration) error {
-       deadline := time.Now().Add(timeout)
-       // seq remembers the clients and their seqNum at point of entry.
-       seq := make(map[unackedCounter]int64)
-       cs.mu.Lock()
-       for client := range cs.clients {
-               seq[client] = client.seq()
-       }
-       cs.mu.Unlock()
-       for {
-               pending := false
-               cs.mu.Lock()
-               // Any unacknowledged messages?  Look only at clients that existed
-               // when we started and are still in this client set.
-               for client := range seq {
-                       if _, ok := cs.clients[client]; ok {
-                               if client.ack() < seq[client] {
-                                       pending = true
-                                       break
-                               }
-                       }
-               }
-               cs.mu.Unlock()
-               if !pending {
-                       break
-               }
-               if timeout > 0 && time.Now().After(deadline) {
-                       return errors.New("timeout")
-               }
-               time.Sleep(100 * time.Millisecond)
-       }
-       return nil
-}
-
-// A netChan represents a channel imported or exported
-// on a single connection. Flow is controlled by the receiving
-// side by sending payAckSend messages when values
-// are delivered into the local channel.
-type netChan struct {
-       *chanDir
-       name   string
-       id     int
-       size   int // buffer size of channel.
-       closed bool
-
-       // sender-specific state
-       ackCh chan bool // buffered with space for all the acks we need
-       space int       // available space.
-
-       // receiver-specific state
-       sendCh chan reflect.Value // buffered channel of values received from other end.
-       ed     *encDec            // so that we can send acks.
-       count  int64              // number of values still to receive.
-}
-
-// Create a new netChan with the given name (only used for
-// messages), id, direction, buffer size, and count.
-// The connection to the other side is represented by ed.
-func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan {
-       c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count}
-       if c.dir == Send {
-               c.ackCh = make(chan bool, size)
-               c.space = size
-       }
-       return c
-}
-
-// Close the channel.
-func (nch *netChan) close() {
-       if nch.closed {
-               return
-       }
-       if nch.dir == Recv {
-               if nch.sendCh != nil {
-                       // If the sender goroutine is active, close the channel to it.
-                       // It will close nch.ch when it can.
-                       close(nch.sendCh)
-               } else {
-                       nch.ch.Close()
-               }
-       } else {
-               nch.ch.Close()
-               close(nch.ackCh)
-       }
-       nch.closed = true
-}
-
-// Send message from remote side to local receiver.
-func (nch *netChan) send(val reflect.Value) {
-       if nch.dir != Recv {
-               panic("send on wrong direction of channel")
-       }
-       if nch.sendCh == nil {
-               // If possible, do local send directly and ack immediately.
-               if nch.ch.TrySend(val) {
-                       nch.sendAck()
-                       return
-               }
-               // Start sender goroutine to manage delayed delivery of values.
-               nch.sendCh = make(chan reflect.Value, nch.size)
-               go nch.sender()
-       }
-       select {
-       case nch.sendCh <- val:
-               // ok
-       default:
-               // TODO: should this be more resilient?
-               panic("netchan: remote sender sent more values than allowed")
-       }
-}
-
-// sendAck sends an acknowledgment that a message has left
-// the channel's buffer. If the messages remaining to be sent
-// will fit in the channel's buffer, then we don't
-// need to send an ack.
-func (nch *netChan) sendAck() {
-       if nch.count < 0 || nch.count > int64(nch.size) {
-               nch.ed.encode(&header{Id: nch.id}, payAckSend, nil)
-       }
-       if nch.count > 0 {
-               nch.count--
-       }
-}
-
-// The sender process forwards items from the sending queue
-// to the destination channel, acknowledging each item.
-func (nch *netChan) sender() {
-       if nch.dir != Recv {
-               panic("sender on wrong direction of channel")
-       }
-       // When Exporter.Hangup is called, the underlying channel is closed,
-       // and so we may get a "too many operations on closed channel" error
-       // if there are outstanding messages in sendCh.
-       // Make sure that this doesn't panic the whole program.
-       defer func() {
-               if r := recover(); r != nil {
-                       // TODO check that r is "too many operations", otherwise re-panic.
-               }
-       }()
-       for v := range nch.sendCh {
-               nch.ch.Send(v)
-               nch.sendAck()
-       }
-       nch.ch.Close()
-}
-
-// Receive value from local side for sending to remote side.
-func (nch *netChan) recv() (val reflect.Value, ok bool) {
-       if nch.dir != Send {
-               panic("recv on wrong direction of channel")
-       }
-
-       if nch.space == 0 {
-               // Wait for buffer space.
-               <-nch.ackCh
-               nch.space++
-       }
-       nch.space--
-       return nch.ch.Recv()
-}
-
-// acked is called when the remote side indicates that
-// a value has been delivered.
-func (nch *netChan) acked() {
-       if nch.dir != Send {
-               panic("recv on wrong direction of channel")
-       }
-       select {
-       case nch.ackCh <- true:
-               // ok
-       default:
-               // TODO: should this be more resilient?
-               panic("netchan: remote receiver sent too many acks")
-       }
-}
diff --git a/src/pkg/old/netchan/export.go b/src/pkg/old/netchan/export.go
deleted file mode 100644 (file)
index a65b260..0000000
+++ /dev/null
@@ -1,400 +0,0 @@
-// Copyright 2010 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-/*
-       Package netchan implements type-safe networked channels:
-       it allows the two ends of a channel to appear on different
-       computers connected by a network.  It does this by transporting
-       data sent to a channel on one machine so it can be recovered
-       by a receive of a channel of the same type on the other.
-
-       An exporter publishes a set of channels by name.  An importer
-       connects to the exporting machine and imports the channels
-       by name. After importing the channels, the two machines can
-       use the channels in the usual way.
-
-       Networked channels are not synchronized; they always behave
-       as if they are buffered channels of at least one element.
-*/
-package netchan
-
-// BUG: can't use range clause to receive when using ImportNValues to limit the count.
-
-import (
-       "errors"
-       "io"
-       "log"
-       "net"
-       "reflect"
-       "strconv"
-       "sync"
-       "time"
-)
-
-// Export
-
-// expLog is a logging convenience function.  The first argument must be a string.
-func expLog(args ...interface{}) {
-       args[0] = "netchan export: " + args[0].(string)
-       log.Print(args...)
-}
-
-// An Exporter allows a set of channels to be published on a single
-// network port.  A single machine may have multiple Exporters
-// but they must use different ports.
-type Exporter struct {
-       *clientSet
-}
-
-type expClient struct {
-       *encDec
-       exp     *Exporter
-       chans   map[int]*netChan // channels in use by client
-       mu      sync.Mutex       // protects remaining fields
-       errored bool             // client has been sent an error
-       seqNum  int64            // sequences messages sent to client; has value of highest sent
-       ackNum  int64            // highest sequence number acknowledged
-       seqLock sync.Mutex       // guarantees messages are in sequence, only locked under mu
-}
-
-func newClient(exp *Exporter, conn io.ReadWriter) *expClient {
-       client := new(expClient)
-       client.exp = exp
-       client.encDec = newEncDec(conn)
-       client.seqNum = 0
-       client.ackNum = 0
-       client.chans = make(map[int]*netChan)
-       return client
-}
-
-func (client *expClient) sendError(hdr *header, err string) {
-       error := &error_{err}
-       expLog("sending error to client:", error.Error)
-       client.encode(hdr, payError, error) // ignore any encode error, hope client gets it
-       client.mu.Lock()
-       client.errored = true
-       client.mu.Unlock()
-}
-
-func (client *expClient) newChan(hdr *header, dir Dir, name string, size int, count int64) *netChan {
-       exp := client.exp
-       exp.mu.Lock()
-       ech, ok := exp.names[name]
-       exp.mu.Unlock()
-       if !ok {
-               client.sendError(hdr, "no such channel: "+name)
-               return nil
-       }
-       if ech.dir != dir {
-               client.sendError(hdr, "wrong direction for channel: "+name)
-               return nil
-       }
-       nch := newNetChan(name, hdr.Id, ech, client.encDec, size, count)
-       client.chans[hdr.Id] = nch
-       return nch
-}
-
-func (client *expClient) getChan(hdr *header, dir Dir) *netChan {
-       nch := client.chans[hdr.Id]
-       if nch == nil {
-               return nil
-       }
-       if nch.dir != dir {
-               client.sendError(hdr, "wrong direction for channel: "+nch.name)
-       }
-       return nch
-}
-
-// The function run manages sends and receives for a single client.  For each
-// (client Recv) request, this will launch a serveRecv goroutine to deliver
-// the data for that channel, while (client Send) requests are handled as
-// data arrives from the client.
-func (client *expClient) run() {
-       hdr := new(header)
-       hdrValue := reflect.ValueOf(hdr)
-       req := new(request)
-       reqValue := reflect.ValueOf(req)
-       error := new(error_)
-       for {
-               *hdr = header{}
-               if err := client.decode(hdrValue); err != nil {
-                       if err != io.EOF {
-                               expLog("error decoding client header:", err)
-                       }
-                       break
-               }
-               switch hdr.PayloadType {
-               case payRequest:
-                       *req = request{}
-                       if err := client.decode(reqValue); err != nil {
-                               expLog("error decoding client request:", err)
-                               break
-                       }
-                       if req.Size < 1 {
-                               panic("netchan: remote requested " + strconv.Itoa(req.Size) + " values")
-                       }
-                       switch req.Dir {
-                       case Recv:
-                               // look up channel before calling serveRecv to
-                               // avoid a lock around client.chans.
-                               if nch := client.newChan(hdr, Send, req.Name, req.Size, req.Count); nch != nil {
-                                       go client.serveRecv(nch, *hdr, req.Count)
-                               }
-                       case Send:
-                               client.newChan(hdr, Recv, req.Name, req.Size, req.Count)
-                               // The actual sends will have payload type payData.
-                               // TODO: manage the count?
-                       default:
-                               error.Error = "request: can't handle channel direction"
-                               expLog(error.Error, req.Dir)
-                               client.encode(hdr, payError, error)
-                       }
-               case payData:
-                       client.serveSend(*hdr)
-               case payClosed:
-                       client.serveClosed(*hdr)
-               case payAck:
-                       client.mu.Lock()
-                       if client.ackNum != hdr.SeqNum-1 {
-                               // Since the sequence number is incremented and the message is sent
-                               // in a single instance of locking client.mu, the messages are guaranteed
-                               // to be sent in order.  Therefore receipt of acknowledgement N means
-                               // all messages <=N have been seen by the recipient.  We check anyway.
-                               expLog("sequence out of order:", client.ackNum, hdr.SeqNum)
-                       }
-                       if client.ackNum < hdr.SeqNum { // If there has been an error, don't back up the count.
-                               client.ackNum = hdr.SeqNum
-                       }
-                       client.mu.Unlock()
-               case payAckSend:
-                       if nch := client.getChan(hdr, Send); nch != nil {
-                               nch.acked()
-                       }
-               default:
-                       log.Fatal("netchan export: unknown payload type", hdr.PayloadType)
-               }
-       }
-       client.exp.delClient(client)
-}
-
-// Send all the data on a single channel to a client asking for a Recv.
-// The header is passed by value to avoid issues of overwriting.
-func (client *expClient) serveRecv(nch *netChan, hdr header, count int64) {
-       for {
-               val, ok := nch.recv()
-               if !ok {
-                       if err := client.encode(&hdr, payClosed, nil); err != nil {
-                               expLog("error encoding server closed message:", err)
-                       }
-                       break
-               }
-               // We hold the lock during transmission to guarantee messages are
-               // sent in sequence number order.  Also, we increment first so the
-               // value of client.SeqNum is the value of the highest used sequence
-               // number, not one beyond.
-               client.mu.Lock()
-               client.seqNum++
-               hdr.SeqNum = client.seqNum
-               client.seqLock.Lock() // guarantee ordering of messages
-               client.mu.Unlock()
-               err := client.encode(&hdr, payData, val.Interface())
-               client.seqLock.Unlock()
-               if err != nil {
-                       expLog("error encoding client response:", err)
-                       client.sendError(&hdr, err.Error())
-                       break
-               }
-               // Negative count means run forever.
-               if count >= 0 {
-                       if count--; count <= 0 {
-                               break
-                       }
-               }
-       }
-}
-
-// Receive and deliver locally one item from a client asking for a Send
-// The header is passed by value to avoid issues of overwriting.
-func (client *expClient) serveSend(hdr header) {
-       nch := client.getChan(&hdr, Recv)
-       if nch == nil {
-               return
-       }
-       // Create a new value for each received item.
-       val := reflect.New(nch.ch.Type().Elem()).Elem()
-       if err := client.decode(val); err != nil {
-               expLog("value decode:", err, "; type ", nch.ch.Type())
-               return
-       }
-       nch.send(val)
-}
-
-// Report that client has closed the channel that is sending to us.
-// The header is passed by value to avoid issues of overwriting.
-func (client *expClient) serveClosed(hdr header) {
-       nch := client.getChan(&hdr, Recv)
-       if nch == nil {
-               return
-       }
-       nch.close()
-}
-
-func (client *expClient) unackedCount() int64 {
-       client.mu.Lock()
-       n := client.seqNum - client.ackNum
-       client.mu.Unlock()
-       return n
-}
-
-func (client *expClient) seq() int64 {
-       client.mu.Lock()
-       n := client.seqNum
-       client.mu.Unlock()
-       return n
-}
-
-func (client *expClient) ack() int64 {
-       client.mu.Lock()
-       n := client.seqNum
-       client.mu.Unlock()
-       return n
-}
-
-// Serve waits for incoming connections on the listener
-// and serves the Exporter's channels on each.
-// It blocks until the listener is closed.
-func (exp *Exporter) Serve(listener net.Listener) {
-       for {
-               conn, err := listener.Accept()
-               if err != nil {
-                       expLog("listen:", err)
-                       break
-               }
-               go exp.ServeConn(conn)
-       }
-}
-
-// ServeConn exports the Exporter's channels on conn.
-// It blocks until the connection is terminated.
-func (exp *Exporter) ServeConn(conn io.ReadWriter) {
-       exp.addClient(conn).run()
-}
-
-// NewExporter creates a new Exporter that exports a set of channels.
-func NewExporter() *Exporter {
-       e := &Exporter{
-               clientSet: &clientSet{
-                       names:   make(map[string]*chanDir),
-                       clients: make(map[unackedCounter]bool),
-               },
-       }
-       return e
-}
-
-// ListenAndServe exports the exporter's channels through the
-// given network and local address defined as in net.Listen.
-func (exp *Exporter) ListenAndServe(network, localaddr string) error {
-       listener, err := net.Listen(network, localaddr)
-       if err != nil {
-               return err
-       }
-       go exp.Serve(listener)
-       return nil
-}
-
-// addClient creates a new expClient and records its existence
-func (exp *Exporter) addClient(conn io.ReadWriter) *expClient {
-       client := newClient(exp, conn)
-       exp.mu.Lock()
-       exp.clients[client] = true
-       exp.mu.Unlock()
-       return client
-}
-
-// delClient forgets the client existed
-func (exp *Exporter) delClient(client *expClient) {
-       exp.mu.Lock()
-       delete(exp.clients, client)
-       exp.mu.Unlock()
-}
-
-// Drain waits until all messages sent from this exporter/importer, including
-// those not yet sent to any client and possibly including those sent while
-// Drain was executing, have been received by the importer.  In short, it
-// waits until all the exporter's messages have been received by a client.
-// If the timeout is positive and Drain takes longer than that to complete,
-// an error is returned.
-func (exp *Exporter) Drain(timeout time.Duration) error {
-       // This wrapper function is here so the method's comment will appear in godoc.
-       return exp.clientSet.drain(timeout)
-}
-
-// Sync waits until all clients of the exporter have received the messages
-// that were sent at the time Sync was invoked.  Unlike Drain, it does not
-// wait for messages sent while it is running or messages that have not been
-// dispatched to any client.  If the timeout is positive and Sync takes longer
-// than that to complete, an error is returned.
-func (exp *Exporter) Sync(timeout time.Duration) error {
-       // This wrapper function is here so the method's comment will appear in godoc.
-       return exp.clientSet.sync(timeout)
-}
-
-func checkChan(chT interface{}, dir Dir) (reflect.Value, error) {
-       chanType := reflect.TypeOf(chT)
-       if chanType.Kind() != reflect.Chan {
-               return reflect.Value{}, errors.New("not a channel")
-       }
-       if dir != Send && dir != Recv {
-               return reflect.Value{}, errors.New("unknown channel direction")
-       }
-       switch chanType.ChanDir() {
-       case reflect.BothDir:
-       case reflect.SendDir:
-               if dir != Recv {
-                       return reflect.Value{}, errors.New("to import/export with Send, must provide <-chan")
-               }
-       case reflect.RecvDir:
-               if dir != Send {
-                       return reflect.Value{}, errors.New("to import/export with Recv, must provide chan<-")
-               }
-       }
-       return reflect.ValueOf(chT), nil
-}
-
-// Export exports a channel of a given type and specified direction.  The
-// channel to be exported is provided in the call and may be of arbitrary
-// channel type.
-// Despite the literal signature, the effective signature is
-//     Export(name string, chT chan T, dir Dir)
-func (exp *Exporter) Export(name string, chT interface{}, dir Dir) error {
-       ch, err := checkChan(chT, dir)
-       if err != nil {
-               return err
-       }
-       exp.mu.Lock()
-       defer exp.mu.Unlock()
-       _, present := exp.names[name]
-       if present {
-               return errors.New("channel name already being exported:" + name)
-       }
-       exp.names[name] = &chanDir{ch, dir}
-       return nil
-}
-
-// Hangup disassociates the named channel from the Exporter and closes
-// the channel.  Messages in flight for the channel may be dropped.
-func (exp *Exporter) Hangup(name string) error {
-       exp.mu.Lock()
-       chDir, ok := exp.names[name]
-       if ok {
-               delete(exp.names, name)
-       }
-       // TODO drop all instances of channel from client sets
-       exp.mu.Unlock()
-       if !ok {
-               return errors.New("netchan export: hangup: no such channel: " + name)
-       }
-       chDir.ch.Close()
-       return nil
-}
diff --git a/src/pkg/old/netchan/import.go b/src/pkg/old/netchan/import.go
deleted file mode 100644 (file)
index 50abaa9..0000000
+++ /dev/null
@@ -1,287 +0,0 @@
-// Copyright 2010 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package netchan
-
-import (
-       "errors"
-       "io"
-       "log"
-       "net"
-       "reflect"
-       "sync"
-       "time"
-)
-
-// Import
-
-// impLog is a logging convenience function.  The first argument must be a string.
-func impLog(args ...interface{}) {
-       args[0] = "netchan import: " + args[0].(string)
-       log.Print(args...)
-}
-
-// An Importer allows a set of channels to be imported from a single
-// remote machine/network port.  A machine may have multiple
-// importers, even from the same machine/network port.
-type Importer struct {
-       *encDec
-       chanLock sync.Mutex // protects access to channel map
-       names    map[string]*netChan
-       chans    map[int]*netChan
-       errors   chan error
-       maxId    int
-       mu       sync.Mutex // protects remaining fields
-       unacked  int64      // number of unacknowledged sends.
-       seqLock  sync.Mutex // guarantees messages are in sequence, only locked under mu
-}
-
-// NewImporter creates a new Importer object to import a set of channels
-// from the given connection. The Exporter must be available and serving when
-// the Importer is created.
-func NewImporter(conn io.ReadWriter) *Importer {
-       imp := new(Importer)
-       imp.encDec = newEncDec(conn)
-       imp.chans = make(map[int]*netChan)
-       imp.names = make(map[string]*netChan)
-       imp.errors = make(chan error, 10)
-       imp.unacked = 0
-       go imp.run()
-       return imp
-}
-
-// Import imports a set of channels from the given network and address.
-func Import(network, remoteaddr string) (*Importer, error) {
-       conn, err := net.Dial(network, remoteaddr)
-       if err != nil {
-               return nil, err
-       }
-       return NewImporter(conn), nil
-}
-
-// shutdown closes all channels for which we are receiving data from the remote side.
-func (imp *Importer) shutdown() {
-       imp.chanLock.Lock()
-       for _, ich := range imp.chans {
-               if ich.dir == Recv {
-                       ich.close()
-               }
-       }
-       imp.chanLock.Unlock()
-}
-
-// Handle the data from a single imported data stream, which will
-// have the form
-//     (response, data)*
-// The response identifies by name which channel is transmitting data.
-func (imp *Importer) run() {
-       // Loop on responses; requests are sent by ImportNValues()
-       hdr := new(header)
-       hdrValue := reflect.ValueOf(hdr)
-       ackHdr := new(header)
-       err := new(error_)
-       errValue := reflect.ValueOf(err)
-       for {
-               *hdr = header{}
-               if e := imp.decode(hdrValue); e != nil {
-                       if e != io.EOF {
-                               impLog("header:", e)
-                               imp.shutdown()
-                       }
-                       return
-               }
-               switch hdr.PayloadType {
-               case payData:
-                       // done lower in loop
-               case payError:
-                       if e := imp.decode(errValue); e != nil {
-                               impLog("error:", e)
-                               return
-                       }
-                       if err.Error != "" {
-                               impLog("response error:", err.Error)
-                               select {
-                               case imp.errors <- errors.New(err.Error):
-                                       continue // errors are not acknowledged
-                               default:
-                                       imp.shutdown()
-                                       return
-                               }
-                       }
-               case payClosed:
-                       nch := imp.getChan(hdr.Id, false)
-                       if nch != nil {
-                               nch.close()
-                       }
-                       continue // closes are not acknowledged.
-               case payAckSend:
-                       // we can receive spurious acks if the channel is
-                       // hung up, so we ask getChan to ignore any errors.
-                       nch := imp.getChan(hdr.Id, true)
-                       if nch != nil {
-                               nch.acked()
-                               imp.mu.Lock()
-                               imp.unacked--
-                               imp.mu.Unlock()
-                       }
-                       continue
-               default:
-                       impLog("unexpected payload type:", hdr.PayloadType)
-                       return
-               }
-               nch := imp.getChan(hdr.Id, false)
-               if nch == nil {
-                       continue
-               }
-               if nch.dir != Recv {
-                       impLog("cannot happen: receive from non-Recv channel")
-                       return
-               }
-               // Acknowledge receipt
-               ackHdr.Id = hdr.Id
-               ackHdr.SeqNum = hdr.SeqNum
-               imp.encode(ackHdr, payAck, nil)
-               // Create a new value for each received item.
-               value := reflect.New(nch.ch.Type().Elem()).Elem()
-               if e := imp.decode(value); e != nil {
-                       impLog("importer value decode:", e)
-                       return
-               }
-               nch.send(value)
-       }
-}
-
-func (imp *Importer) getChan(id int, errOk bool) *netChan {
-       imp.chanLock.Lock()
-       ich := imp.chans[id]
-       imp.chanLock.Unlock()
-       if ich == nil {
-               if !errOk {
-                       impLog("unknown id in netchan request: ", id)
-               }
-               return nil
-       }
-       return ich
-}
-
-// Errors returns a channel from which transmission and protocol errors
-// can be read. Clients of the importer are not required to read the error
-// channel for correct execution. However, if too many errors occur
-// without being read from the error channel, the importer will shut down.
-func (imp *Importer) Errors() chan error {
-       return imp.errors
-}
-
-// Import imports a channel of the given type, size and specified direction.
-// It is equivalent to ImportNValues with a count of -1, meaning unbounded.
-func (imp *Importer) Import(name string, chT interface{}, dir Dir, size int) error {
-       return imp.ImportNValues(name, chT, dir, size, -1)
-}
-
-// ImportNValues imports a channel of the given type and specified
-// direction and then receives or transmits up to n values on that
-// channel.  A value of n==-1 implies an unbounded number of values.  The
-// channel will have buffer space for size values, or 1 value if size < 1.
-// The channel to be bound to the remote site's channel is provided
-// in the call and may be of arbitrary channel type.
-// Despite the literal signature, the effective signature is
-//     ImportNValues(name string, chT chan T, dir Dir, size, n int) error
-// Example usage:
-//     imp, err := NewImporter("tcp", "netchanserver.mydomain.com:1234")
-//     if err != nil { log.Fatal(err) }
-//     ch := make(chan myType)
-//     err = imp.ImportNValues("name", ch, Recv, 1, 1)
-//     if err != nil { log.Fatal(err) }
-//     fmt.Printf("%+v\n", <-ch)
-func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size, n int) error {
-       ch, err := checkChan(chT, dir)
-       if err != nil {
-               return err
-       }
-       imp.chanLock.Lock()
-       defer imp.chanLock.Unlock()
-       _, present := imp.names[name]
-       if present {
-               return errors.New("channel name already being imported:" + name)
-       }
-       if size < 1 {
-               size = 1
-       }
-       id := imp.maxId
-       imp.maxId++
-       nch := newNetChan(name, id, &chanDir{ch, dir}, imp.encDec, size, int64(n))
-       imp.names[name] = nch
-       imp.chans[id] = nch
-       // Tell the other side about this channel.
-       hdr := &header{Id: id}
-       req := &request{Name: name, Count: int64(n), Dir: dir, Size: size}
-       if err = imp.encode(hdr, payRequest, req); err != nil {
-               impLog("request encode:", err)
-               return err
-       }
-       if dir == Send {
-               go func() {
-                       for i := 0; n == -1 || i < n; i++ {
-                               val, ok := nch.recv()
-                               if !ok {
-                                       if err = imp.encode(hdr, payClosed, nil); err != nil {
-                                               impLog("error encoding client closed message:", err)
-                                       }
-                                       return
-                               }
-                               // We hold the lock during transmission to guarantee messages are
-                               // sent in order.
-                               imp.mu.Lock()
-                               imp.unacked++
-                               imp.seqLock.Lock()
-                               imp.mu.Unlock()
-                               if err = imp.encode(hdr, payData, val.Interface()); err != nil {
-                                       impLog("error encoding client send:", err)
-                                       return
-                               }
-                               imp.seqLock.Unlock()
-                       }
-               }()
-       }
-       return nil
-}
-
-// Hangup disassociates the named channel from the Importer and closes
-// the channel.  Messages in flight for the channel may be dropped.
-func (imp *Importer) Hangup(name string) error {
-       imp.chanLock.Lock()
-       defer imp.chanLock.Unlock()
-       nc := imp.names[name]
-       if nc == nil {
-               return errors.New("netchan import: hangup: no such channel: " + name)
-       }
-       delete(imp.names, name)
-       delete(imp.chans, nc.id)
-       nc.close()
-       return nil
-}
-
-func (imp *Importer) unackedCount() int64 {
-       imp.mu.Lock()
-       n := imp.unacked
-       imp.mu.Unlock()
-       return n
-}
-
-// Drain waits until all messages sent from this exporter/importer, including
-// those not yet sent to any server and possibly including those sent while
-// Drain was executing, have been received by the exporter.  In short, it
-// waits until all the importer's messages have been received.
-// If the timeout (measured in nanoseconds) is positive and Drain takes
-// longer than that to complete, an error is returned.
-func (imp *Importer) Drain(timeout int64) error {
-       deadline := time.Now().Add(time.Duration(timeout))
-       for imp.unackedCount() > 0 {
-               if timeout > 0 && time.Now().After(deadline) {
-                       return errors.New("timeout")
-               }
-               time.Sleep(100 * time.Millisecond)
-       }
-       return nil
-}
diff --git a/src/pkg/old/netchan/netchan_test.go b/src/pkg/old/netchan/netchan_test.go
deleted file mode 100644 (file)
index 9a7c076..0000000
+++ /dev/null
@@ -1,447 +0,0 @@
-// Copyright 2010 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package netchan
-
-import (
-       "net"
-       "strings"
-       "testing"
-       "time"
-)
-
-const count = 10     // number of items in most tests
-const closeCount = 5 // number of items when sender closes early
-
-const base = 23
-
-func exportSend(exp *Exporter, n int, t *testing.T, done chan bool) {
-       ch := make(chan int)
-       err := exp.Export("exportedSend", ch, Send)
-       if err != nil {
-               t.Fatal("exportSend:", err)
-       }
-       go func() {
-               for i := 0; i < n; i++ {
-                       ch <- base + i
-               }
-               close(ch)
-               if done != nil {
-                       done <- true
-               }
-       }()
-}
-
-func exportReceive(exp *Exporter, t *testing.T, expDone chan bool) {
-       ch := make(chan int)
-       err := exp.Export("exportedRecv", ch, Recv)
-       expDone <- true
-       if err != nil {
-               t.Fatal("exportReceive:", err)
-       }
-       for i := 0; i < count; i++ {
-               v, ok := <-ch
-               if !ok {
-                       if i != closeCount {
-                               t.Errorf("exportReceive expected close at %d; got one at %d", closeCount, i)
-                       }
-                       break
-               }
-               if v != base+i {
-                       t.Errorf("export Receive: bad value: expected %d+%d=%d; got %d", base, i, base+i, v)
-               }
-       }
-}
-
-func importSend(imp *Importer, n int, t *testing.T, done chan bool) {
-       ch := make(chan int)
-       err := imp.ImportNValues("exportedRecv", ch, Send, 3, -1)
-       if err != nil {
-               t.Fatal("importSend:", err)
-       }
-       go func() {
-               for i := 0; i < n; i++ {
-                       ch <- base + i
-               }
-               close(ch)
-               if done != nil {
-                       done <- true
-               }
-       }()
-}
-
-func importReceive(imp *Importer, t *testing.T, done chan bool) {
-       ch := make(chan int)
-       err := imp.ImportNValues("exportedSend", ch, Recv, 3, count)
-       if err != nil {
-               t.Fatal("importReceive:", err)
-       }
-       for i := 0; i < count; i++ {
-               v, ok := <-ch
-               if !ok {
-                       if i != closeCount {
-                               t.Errorf("importReceive expected close at %d; got one at %d", closeCount, i)
-                       }
-                       break
-               }
-               if v != base+i {
-                       t.Errorf("importReceive: bad value: expected %d+%d=%d; got %+d", base, i, base+i, v)
-               }
-       }
-       if done != nil {
-               done <- true
-       }
-}
-
-func TestExportSendImportReceive(t *testing.T) {
-       exp, imp := pair(t)
-       exportSend(exp, count, t, nil)
-       importReceive(imp, t, nil)
-}
-
-func TestExportReceiveImportSend(t *testing.T) {
-       exp, imp := pair(t)
-       expDone := make(chan bool)
-       done := make(chan bool)
-       go func() {
-               exportReceive(exp, t, expDone)
-               done <- true
-       }()
-       <-expDone
-       importSend(imp, count, t, nil)
-       <-done
-}
-
-func TestClosingExportSendImportReceive(t *testing.T) {
-       exp, imp := pair(t)
-       exportSend(exp, closeCount, t, nil)
-       importReceive(imp, t, nil)
-}
-
-func TestClosingImportSendExportReceive(t *testing.T) {
-       exp, imp := pair(t)
-       expDone := make(chan bool)
-       done := make(chan bool)
-       go func() {
-               exportReceive(exp, t, expDone)
-               done <- true
-       }()
-       <-expDone
-       importSend(imp, closeCount, t, nil)
-       <-done
-}
-
-func TestErrorForIllegalChannel(t *testing.T) {
-       exp, imp := pair(t)
-       // Now export a channel.
-       ch := make(chan int, 1)
-       err := exp.Export("aChannel", ch, Send)
-       if err != nil {
-               t.Fatal("export:", err)
-       }
-       ch <- 1234
-       close(ch)
-       // Now try to import a different channel.
-       ch = make(chan int)
-       err = imp.Import("notAChannel", ch, Recv, 1)
-       if err != nil {
-               t.Fatal("import:", err)
-       }
-       // Expect an error now.  Start a timeout.
-       timeout := make(chan bool, 1) // buffered so closure will not hang around.
-       go func() {
-               time.Sleep(10 * time.Second) // very long, to give even really slow machines a chance.
-               timeout <- true
-       }()
-       select {
-       case err = <-imp.Errors():
-               if strings.Index(err.Error(), "no such channel") < 0 {
-                       t.Error("wrong error for nonexistent channel:", err)
-               }
-       case <-timeout:
-               t.Error("import of nonexistent channel did not receive an error")
-       }
-}
-
-// Not a great test but it does at least invoke Drain.
-func TestExportDrain(t *testing.T) {
-       exp, imp := pair(t)
-       done := make(chan bool)
-       go func() {
-               exportSend(exp, closeCount, t, nil)
-               done <- true
-       }()
-       <-done
-       go importReceive(imp, t, done)
-       exp.Drain(0)
-       <-done
-}
-
-// Not a great test but it does at least invoke Drain.
-func TestImportDrain(t *testing.T) {
-       exp, imp := pair(t)
-       expDone := make(chan bool)
-       go exportReceive(exp, t, expDone)
-       <-expDone
-       importSend(imp, closeCount, t, nil)
-       imp.Drain(0)
-}
-
-// Not a great test but it does at least invoke Sync.
-func TestExportSync(t *testing.T) {
-       exp, imp := pair(t)
-       done := make(chan bool)
-       exportSend(exp, closeCount, t, nil)
-       go importReceive(imp, t, done)
-       exp.Sync(0)
-       <-done
-}
-
-// Test hanging up the send side of an export.
-// TODO: test hanging up the receive side of an export.
-func TestExportHangup(t *testing.T) {
-       exp, imp := pair(t)
-       ech := make(chan int)
-       err := exp.Export("exportedSend", ech, Send)
-       if err != nil {
-               t.Fatal("export:", err)
-       }
-       // Prepare to receive two values. We'll actually deliver only one.
-       ich := make(chan int)
-       err = imp.ImportNValues("exportedSend", ich, Recv, 1, 2)
-       if err != nil {
-               t.Fatal("import exportedSend:", err)
-       }
-       // Send one value, receive it.
-       const Value = 1234
-       ech <- Value
-       v := <-ich
-       if v != Value {
-               t.Fatal("expected", Value, "got", v)
-       }
-       // Now hang up the channel.  Importer should see it close.
-       exp.Hangup("exportedSend")
-       v, ok := <-ich
-       if ok {
-               t.Fatal("expected channel to be closed; got value", v)
-       }
-}
-
-// Test hanging up the send side of an import.
-// TODO: test hanging up the receive side of an import.
-func TestImportHangup(t *testing.T) {
-       exp, imp := pair(t)
-       ech := make(chan int)
-       err := exp.Export("exportedRecv", ech, Recv)
-       if err != nil {
-               t.Fatal("export:", err)
-       }
-       // Prepare to Send two values. We'll actually deliver only one.
-       ich := make(chan int)
-       err = imp.ImportNValues("exportedRecv", ich, Send, 1, 2)
-       if err != nil {
-               t.Fatal("import exportedRecv:", err)
-       }
-       // Send one value, receive it.
-       const Value = 1234
-       ich <- Value
-       v := <-ech
-       if v != Value {
-               t.Fatal("expected", Value, "got", v)
-       }
-       // Now hang up the channel.  Exporter should see it close.
-       imp.Hangup("exportedRecv")
-       v, ok := <-ech
-       if ok {
-               t.Fatal("expected channel to be closed; got value", v)
-       }
-}
-
-// loop back exportedRecv to exportedSend,
-// but receive a value from ctlch before starting the loop.
-func exportLoopback(exp *Exporter, t *testing.T) {
-       inch := make(chan int)
-       if err := exp.Export("exportedRecv", inch, Recv); err != nil {
-               t.Fatal("exportRecv")
-       }
-
-       outch := make(chan int)
-       if err := exp.Export("exportedSend", outch, Send); err != nil {
-               t.Fatal("exportSend")
-       }
-
-       ctlch := make(chan int)
-       if err := exp.Export("exportedCtl", ctlch, Recv); err != nil {
-               t.Fatal("exportRecv")
-       }
-
-       go func() {
-               <-ctlch
-               for i := 0; i < count; i++ {
-                       x := <-inch
-                       if x != base+i {
-                               t.Errorf("exportLoopback expected %d; got %d", i, x)
-                       }
-                       outch <- x
-               }
-       }()
-}
-
-// This test checks that channel operations can proceed
-// even when other concurrent operations are blocked.
-func TestIndependentSends(t *testing.T) {
-       if testing.Short() {
-               t.Logf("disabled test during -short")
-               return
-       }
-       exp, imp := pair(t)
-
-       exportLoopback(exp, t)
-
-       importSend(imp, count, t, nil)
-       done := make(chan bool)
-       go importReceive(imp, t, done)
-
-       // wait for export side to try to deliver some values.
-       time.Sleep(250 * time.Millisecond)
-
-       ctlch := make(chan int)
-       if err := imp.ImportNValues("exportedCtl", ctlch, Send, 1, 1); err != nil {
-               t.Fatal("importSend:", err)
-       }
-       ctlch <- 0
-
-       <-done
-}
-
-// This test cross-connects a pair of exporter/importer pairs.
-type value struct {
-       I      int
-       Source string
-}
-
-func TestCrossConnect(t *testing.T) {
-       e1, i1 := pair(t)
-       e2, i2 := pair(t)
-
-       crossExport(e1, e2, t)
-       crossImport(i1, i2, t)
-}
-
-// Export side of cross-traffic.
-func crossExport(e1, e2 *Exporter, t *testing.T) {
-       s := make(chan value)
-       err := e1.Export("exportedSend", s, Send)
-       if err != nil {
-               t.Fatal("exportSend:", err)
-       }
-
-       r := make(chan value)
-       err = e2.Export("exportedReceive", r, Recv)
-       if err != nil {
-               t.Fatal("exportReceive:", err)
-       }
-
-       go crossLoop("export", s, r, t)
-}
-
-// Import side of cross-traffic.
-func crossImport(i1, i2 *Importer, t *testing.T) {
-       s := make(chan value)
-       err := i2.Import("exportedReceive", s, Send, 2)
-       if err != nil {
-               t.Fatal("import of exportedReceive:", err)
-       }
-
-       r := make(chan value)
-       err = i1.Import("exportedSend", r, Recv, 2)
-       if err != nil {
-               t.Fatal("import of exported Send:", err)
-       }
-
-       crossLoop("import", s, r, t)
-}
-
-// Cross-traffic: send and receive 'count' numbers.
-func crossLoop(name string, s, r chan value, t *testing.T) {
-       for si, ri := 0, 0; si < count && ri < count; {
-               select {
-               case s <- value{si, name}:
-                       si++
-               case v := <-r:
-                       if v.I != ri {
-                               t.Errorf("loop: bad value: expected %d, hello; got %+v", ri, v)
-                       }
-                       ri++
-               }
-       }
-}
-
-const flowCount = 100
-
-// test flow control from exporter to importer.
-func TestExportFlowControl(t *testing.T) {
-       if testing.Short() {
-               t.Logf("disabled test during -short")
-               return
-       }
-       exp, imp := pair(t)
-
-       sendDone := make(chan bool, 1)
-       exportSend(exp, flowCount, t, sendDone)
-
-       ch := make(chan int)
-       err := imp.ImportNValues("exportedSend", ch, Recv, 20, -1)
-       if err != nil {
-               t.Fatal("importReceive:", err)
-       }
-
-       testFlow(sendDone, ch, flowCount, t)
-}
-
-// test flow control from importer to exporter.
-func TestImportFlowControl(t *testing.T) {
-       if testing.Short() {
-               t.Logf("disabled test during -short")
-               return
-       }
-       exp, imp := pair(t)
-
-       ch := make(chan int)
-       err := exp.Export("exportedRecv", ch, Recv)
-       if err != nil {
-               t.Fatal("importReceive:", err)
-       }
-
-       sendDone := make(chan bool, 1)
-       importSend(imp, flowCount, t, sendDone)
-       testFlow(sendDone, ch, flowCount, t)
-}
-
-func testFlow(sendDone chan bool, ch <-chan int, N int, t *testing.T) {
-       go func() {
-               time.Sleep(500 * time.Millisecond)
-               sendDone <- false
-       }()
-
-       if <-sendDone {
-               t.Fatal("send did not block")
-       }
-       n := 0
-       for i := range ch {
-               t.Log("after blocking, got value ", i)
-               n++
-       }
-       if n != N {
-               t.Fatalf("expected %d values; got %d", N, n)
-       }
-}
-
-func pair(t *testing.T) (*Exporter, *Importer) {
-       c0, c1 := net.Pipe()
-       exp := NewExporter()
-       go exp.ServeConn(c0)
-       imp := NewImporter(c1)
-       return exp, imp
-}