// There may be multiple outstanding Calls associated
// with a single Client.
type Client struct {
- mutex sync.Mutex // protects pending, seq
+ mutex sync.Mutex // protects pending, seq, request
sending sync.Mutex
+ request Request
seq uint64
codec ClientCodec
pending map[uint64]*Call
client.mutex.Unlock()
// Encode and send the request.
- request := new(Request)
client.sending.Lock()
defer client.sending.Unlock()
- request.Seq = c.seq
- request.ServiceMethod = c.ServiceMethod
- if err := client.codec.WriteRequest(request, c.Args); err != nil {
+ client.request.Seq = c.seq
+ client.request.ServiceMethod = c.ServiceMethod
+ if err := client.codec.WriteRequest(&client.request, c.Args); err != nil {
panic("rpc: client encode error: " + err.String())
}
}
func (client *Client) input() {
var err os.Error
+ var response Response
for err == nil {
- response := new(Response)
- err = client.codec.ReadResponseHeader(response)
+ response = Response{}
+ err = client.codec.ReadResponseHeader(&response)
if err != nil {
if err == os.EOF && !client.closing {
err = io.ErrUnexpectedEOF
if client.shutdown {
return ErrShutdown
}
- call := <-client.Go(serviceMethod, args, reply, nil).Done
+ call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Request struct {
- ServiceMethod string // format: "Service.Method"
- Seq uint64 // sequence number chosen by client
+ ServiceMethod string // format: "Service.Method"
+ Seq uint64 // sequence number chosen by client
+ next *Request // for free list in Server
}
// Response is a header written before every RPC return. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Response struct {
- ServiceMethod string // echoes that of the Request
- Seq uint64 // echoes that of the request
- Error string // error, if any.
+ ServiceMethod string // echoes that of the Request
+ Seq uint64 // echoes that of the request
+ Error string // error, if any.
+ next *Response // for free list in Server
}
// Server represents an RPC Server.
type Server struct {
sync.Mutex // protects the serviceMap
serviceMap map[string]*service
+ reqLock sync.Mutex // protects freeReq
+ freeReq *Request
+ respLock sync.Mutex // protects freeResp
+ freeResp *Response
}
// NewServer returns a new Server.
return v
}
-func sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
- resp := new(Response)
+func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
+ resp := server.getResponse()
// Encode the response header
resp.ServiceMethod = req.ServiceMethod
if errmsg != "" {
log.Println("rpc: writing response:", err)
}
sending.Unlock()
+ server.freeResponse(resp)
}
func (m *methodType) NumCalls() (n uint) {
return n
}
-func (s *service) call(sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
+func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
mtype.Lock()
mtype.numCalls++
mtype.Unlock()
if errInter != nil {
errmsg = errInter.(os.Error).String()
}
- sendResponse(sending, req, replyv.Interface(), codec, errmsg)
+ server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
+ server.freeRequest(req)
}
type gobServerCodec struct {
// send a response if we actually managed to read a header.
if req != nil {
- sendResponse(sending, req, invalidRequest, codec, err.String())
+ server.sendResponse(sending, req, invalidRequest, codec, err.String())
+ server.freeRequest(req)
}
continue
}
}
break
}
- sendResponse(sending, req, replyv.Interface(), codec, err.String())
+ server.sendResponse(sending, req, replyv.Interface(), codec, err.String())
continue
}
- go service.call(sending, mtype, req, argv, replyv, codec)
+ go service.call(server, sending, mtype, req, argv, replyv, codec)
}
codec.Close()
}
+
+func (server *Server) getRequest() *Request {
+ server.reqLock.Lock()
+ req := server.freeReq
+ if req == nil {
+ req = new(Request)
+ } else {
+ server.freeReq = req.next
+ *req = Request{}
+ }
+ server.reqLock.Unlock()
+ return req
+}
+
+func (server *Server) freeRequest(req *Request) {
+ server.reqLock.Lock()
+ req.next = server.freeReq
+ server.freeReq = req
+ server.reqLock.Unlock()
+}
+
+func (server *Server) getResponse() *Response {
+ server.respLock.Lock()
+ resp := server.freeResp
+ if resp == nil {
+ resp = new(Response)
+ } else {
+ server.freeResp = resp.next
+ *resp = Response{}
+ }
+ server.respLock.Unlock()
+ return resp
+}
+
+func (server *Server) freeResponse(resp *Response) {
+ server.respLock.Lock()
+ resp.next = server.freeResp
+ server.freeResp = resp
+ server.respLock.Unlock()
+}
+
func (server *Server) readRequest(codec ServerCodec) (req *Request, service *service, mtype *methodType, err os.Error) {
// Grab the request header.
- req = new(Request)
+ req = server.getRequest()
err = codec.ReadRequestHeader(req)
if err != nil {
req = nil