// A header is sent as a prefix to every transmission. It will be followed by
// a request structure, an error structure, or an arbitrary user payload structure.
type header struct {
- name string
- payloadType int
- seqNum int64
+ Name string
+ PayloadType int
+ SeqNum int64
}
// Sent with a header once per channel from importer to exporter to report
// that it wants to bind to a channel with the specified direction for count
// messages. If count is -1, it means unlimited.
type request struct {
- count int64
- dir Dir
+ Count int64
+ Dir Dir
}
// Sent with a header to report an error.
type error struct {
- error string
+ Error string
}
// Used to unify management of acknowledgements for import and export.
// Encode a header and payload onto the connection.
func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) os.Error {
ed.encLock.Lock()
- hdr.payloadType = payloadType
+ hdr.PayloadType = payloadType
err := ed.enc.Encode(hdr)
if err == nil {
if payload != nil {
func (client *expClient) sendError(hdr *header, err string) {
error := &error{err}
- expLog("sending error to client:", error.error)
+ expLog("sending error to client:", error.Error)
client.encode(hdr, payError, error) // ignore any encode error, hope client gets it
client.mu.Lock()
client.errored = true
func (client *expClient) getChan(hdr *header, dir Dir) *chanDir {
exp := client.exp
exp.mu.Lock()
- ech, ok := exp.chans[hdr.name]
+ ech, ok := exp.chans[hdr.Name]
exp.mu.Unlock()
if !ok {
- client.sendError(hdr, "no such channel: "+hdr.name)
+ client.sendError(hdr, "no such channel: "+hdr.Name)
return nil
}
if ech.dir != dir {
- client.sendError(hdr, "wrong direction for channel: "+hdr.name)
+ client.sendError(hdr, "wrong direction for channel: "+hdr.Name)
return nil
}
return ech
expLog("error decoding client header:", err)
break
}
- switch hdr.payloadType {
+ switch hdr.PayloadType {
case payRequest:
*req = request{}
if err := client.decode(reqValue); err != nil {
expLog("error decoding client request:", err)
break
}
- switch req.dir {
+ switch req.Dir {
case Recv:
- go client.serveRecv(*hdr, req.count)
+ go client.serveRecv(*hdr, req.Count)
case Send:
// Request to send is clear as a matter of protocol
// but not actually used by the implementation.
// The actual sends will have payload type payData.
// TODO: manage the count?
default:
- error.error = "request: can't handle channel direction"
- expLog(error.error, req.dir)
+ error.Error = "request: can't handle channel direction"
+ expLog(error.Error, req.Dir)
client.encode(hdr, payError, error)
}
case payData:
client.serveClosed(*hdr)
case payAck:
client.mu.Lock()
- if client.ackNum != hdr.seqNum-1 {
+ if client.ackNum != hdr.SeqNum-1 {
// Since the sequence number is incremented and the message is sent
// in a single instance of locking client.mu, the messages are guaranteed
// to be sent in order. Therefore receipt of acknowledgement N means
// all messages <=N have been seen by the recipient. We check anyway.
- expLog("sequence out of order:", client.ackNum, hdr.seqNum)
+ expLog("sequence out of order:", client.ackNum, hdr.SeqNum)
}
- if client.ackNum < hdr.seqNum { // If there has been an error, don't back up the count.
- client.ackNum = hdr.seqNum
+ if client.ackNum < hdr.SeqNum { // If there has been an error, don't back up the count.
+ client.ackNum = hdr.SeqNum
}
client.mu.Unlock()
default:
- log.Exit("netchan export: unknown payload type", hdr.payloadType)
+ log.Exit("netchan export: unknown payload type", hdr.PayloadType)
}
}
client.exp.delClient(client)
// number, not one beyond.
client.mu.Lock()
client.seqNum++
- hdr.seqNum = client.seqNum
+ hdr.SeqNum = client.seqNum
client.seqLock.Lock() // guarantee ordering of messages
client.mu.Unlock()
err := client.encode(&hdr, payData, val.Interface())
imp.shutdown()
return
}
- switch hdr.payloadType {
+ switch hdr.PayloadType {
case payData:
// done lower in loop
case payError:
impLog("error:", e)
return
}
- if err.error != "" {
- impLog("response error:", err.error)
- if sent := imp.errors <- os.ErrorString(err.error); !sent {
+ if err.Error != "" {
+ impLog("response error:", err.Error)
+ if sent := imp.errors <- os.ErrorString(err.Error); !sent {
imp.shutdown()
return
}
continue // errors are not acknowledged.
}
case payClosed:
- ich := imp.getChan(hdr.name)
+ ich := imp.getChan(hdr.Name)
if ich != nil {
ich.ch.Close()
}
continue // closes are not acknowledged.
default:
- impLog("unexpected payload type:", hdr.payloadType)
+ impLog("unexpected payload type:", hdr.PayloadType)
return
}
- ich := imp.getChan(hdr.name)
+ ich := imp.getChan(hdr.Name)
if ich == nil {
continue
}
return
}
// Acknowledge receipt
- ackHdr.name = hdr.name
- ackHdr.seqNum = hdr.seqNum
+ ackHdr.Name = hdr.Name
+ ackHdr.SeqNum = hdr.SeqNum
imp.encode(ackHdr, payAck, nil)
// Create a new value for each received item.
value := reflect.MakeZero(ich.ch.Type().(*reflect.ChanType).Elem())
}
imp.chans[name] = &chanDir{ch, dir}
// Tell the other side about this channel.
- hdr := &header{name: name}
- req := &request{count: int64(n), dir: dir}
+ hdr := &header{Name: name}
+ req := &request{Count: int64(n), Dir: dir}
if err = imp.encode(hdr, payRequest, req); err != nil {
impLog("request encode:", err)
return err