From 5b1d52e81dd10e91d83e5bcf7986328a4ac7f69e Mon Sep 17 00:00:00 2001 From: Rob Pike Date: Mon, 27 Jul 2009 17:25:41 -0700 Subject: [PATCH] document rpc. R=rsc DELTA=160 (124 added, 0 deleted, 36 changed) OCL=32233 CL=32256 --- src/pkg/rpc/client.go | 25 +++--- src/pkg/rpc/server.go | 163 ++++++++++++++++++++++++++++++++----- src/pkg/rpc/server_test.go | 4 +- 3 files changed, 158 insertions(+), 34 deletions(-) diff --git a/src/pkg/rpc/client.go b/src/pkg/rpc/client.go index a76f2b65a9..92c283e6b8 100644 --- a/src/pkg/rpc/client.go +++ b/src/pkg/rpc/client.go @@ -17,7 +17,7 @@ import ( "sync"; ) -// Call represents an active RPC +// Call represents an active RPC. type Call struct { ServiceMethod string; // The name of the service and method to call. Args interface{}; // The argument to the function (*struct). @@ -28,8 +28,10 @@ type Call struct { } // Client represents an RPC Client. +// There may be multiple outstanding Calls associated +// with a single Client. type Client struct { - sync.Mutex; // protects pending, seq + mutex sync.Mutex; // protects pending, seq shutdown os.Error; // non-nil if the client is shut down sending sync.Mutex; seq uint64; @@ -41,17 +43,17 @@ type Client struct { func (client *Client) send(c *Call) { // Register this call. - client.Lock(); + client.mutex.Lock(); if client.shutdown != nil { c.Error = client.shutdown; - client.Unlock(); + client.mutex.Unlock(); doNotBlock := c.Done <- c; return; } c.seq = client.seq; client.seq++; client.pending[c.seq] = c; - client.Unlock(); + client.mutex.Unlock(); // Encode and send the request. request := new(Request); @@ -78,10 +80,10 @@ func (client *Client) input() { break } seq := response.Seq; - client.Lock(); + client.mutex.Lock(); c := client.pending[seq]; client.pending[seq] = c, false; - client.Unlock(); + client.mutex.Unlock(); err = client.dec.Decode(c.Reply); c.Error = os.ErrorString(response.Error); // We don't want to block here. It is the caller's responsibility to make @@ -89,13 +91,13 @@ func (client *Client) input() { doNotBlock := c.Done <- c; } // Terminate pending calls. - client.Lock(); + client.mutex.Lock(); client.shutdown = err; for seq, call := range client.pending { call.Error = err; doNotBlock := call.Done <- call; } - client.Unlock(); + client.mutex.Unlock(); log.Stderr("client protocol error:", err); } @@ -111,7 +113,7 @@ func NewClient(conn io.ReadWriteCloser) *Client { return client; } -// Dial connects to an HTTP RPC server at the specified network address. +// DialHTTP connects to an HTTP RPC server at the specified network address. func DialHTTP(network, address string) (*Client, os.Error) { conn, err := net.Dial(network, "", address); if err != nil { @@ -142,7 +144,8 @@ func Dial(network, address string) (*Client, os.Error) { } // Go invokes the function asynchronously. It returns the Call structure representing -// the invocation. +// the invocation. The done channel will signal when the call is complete by returning +// the same Call object. If done is nil, Go will allocate a new channel. func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call { c := new(Call); c.ServiceMethod = serviceMethod; diff --git a/src/pkg/rpc/server.go b/src/pkg/rpc/server.go index 78458e40bd..2aa775d889 100644 --- a/src/pkg/rpc/server.go +++ b/src/pkg/rpc/server.go @@ -2,6 +2,109 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. +/* + The rpc package provides access to the public methods of an object across a + network or other I/O connection. A server registers an object, making it visible + as a service with the name of the type of the object. After registration, public + methods of the object will be accessible remotely. A server may register multiple + objects (services) of different types but it is an error to register multiple + objects of the same type. + + Only methods that satisfy these criteria will be made available for remote access; + other methods will be ignored: + + - the method name is publicly visible, that is, begins with an upper case letter. + - the method has two arguments, both pointers to publicly visible structs. + - the method has return type os.Error. + + The method's first argument represents the arguments provided by the caller; the + second argument represents the result parameters to be returned to the caller. + The method's return value, if non-nil, is passed back as a string that the client + sees as an os.ErrorString. + + The server may handle requests on a single connection by calling ServeConn. More + typically it will create a network listener and call Accept or, for an HTTP + listener, HandleHTTP and http.Serve. + + A client wishing to use the service establishes a connection and then invokes + NewClient on the connection. The convenience function Dial (DialHTTP) performs + both steps for a raw network connection (an HTTP connection). The resulting + Client object has two methods, Call and Go, that specify the service and method to + call, a structure containing the arguments, and a structure to receive the result + parameters. + + Call waits for the remote call to complete; Go launches the call asynchronously + and returns a channel that will signal completion. + + Package "gob" is used to transport the data. + + Here is a simple example. A server wishes to export an object of type Arith: + + package server + + type Args struct { + A, B int + } + + type Reply struct { + C int + } + + type Arith int + + func (t *Arith) Multiply(args *Args, reply *Reply) os.Error { + reply.C = args.A * args.B; + return nil + } + + func (t *Arith) Divide(args *Args, reply *Reply) os.Error { + if args.B == 0 { + return os.ErrorString("divide by zero"); + } + reply.C = args.A / args.B; + return nil + } + + The server calls (for HTTP service): + + arith := new(Arith); + rpc.Register(arith); + rrpc.HandleHTTP(); + l, e := net.Listen("tcp", ":1234"); + if e != nil { + log.Exit("listen error:", e); + } + go http.Serve(l, nil); + + At this point, clients can see a service "Arith" with methods "Arith.Multiply" and + "Arith.Divide". To invoke one, a client first dials the server: + + client, err := rpc.DialHTTP("tcp", serverAddress + ":1234"); + if err != nil { + log.Exit("dialing:", err); + } + + Then it can make a remote call: + + // Synchronous call + args := &server.Args{7,8}; + reply := new(server.Reply); + err = client.Call("Arith.Multiply", args, reply); + if err != nil { + log.Exit("arith error:", err); + } + fmt.Printf("Arith: %d*%d=%d", args.A, args.B, reply.C); + + or + + // Asynchronous call + divCall := client.Go("Arith.Divide", args, reply, nil); + replyCall := <-divCall.Done; // will be equal to divCall + // check errors, print, etc. + + A server implementation will often provide a simple, type-safe wrapper for the + client. +*/ package rpc import ( @@ -36,17 +139,21 @@ type service struct { method map[string] *methodType; // registered methods } -// Request is a header written before every RPC call. +// Request is a header written before every RPC call. It is used internally +// but documented here as an aid to debugging, such as when analyzing +// network traffic. type Request struct { - ServiceMethod string; - Seq uint64; + ServiceMethod string; // format: "Service.Method" + Seq uint64; // sequence number chosen by client } -// Response is a header written before every RPC return. +// Response is a header written before every RPC return. It is used internally +// but documented here as an aid to debugging, such as when analyzing +// network traffic. type Response struct { - ServiceMethod string; - Seq uint64; - Error string; + ServiceMethod string; // echoes that of the Request + Seq uint64; // echoes that of the request + Error string; // error, if any. } type serverType struct { @@ -54,8 +161,8 @@ type serverType struct { } // This variable is a global whose "public" methods are really private methods -// called from the global functions of this package: rpc.Add, rpc.ServeConn, etc. -// For example, rpc.Add() calls server.add(). +// called from the global functions of this package: rpc.Register, rpc.ServeConn, etc. +// For example, rpc.Register() calls server.add(). var server = &serverType{ make(map[string] *service) } // Is this a publicly vislble - upper case - name? @@ -64,7 +171,7 @@ func isPublic(name string) bool { return unicode.IsUpper(rune) } -func (server *serverType) add(rcvr interface{}) os.Error { +func (server *serverType) register(rcvr interface{}) os.Error { if server.serviceMap == nil { server.serviceMap = make(map[string] *service); } @@ -76,10 +183,13 @@ func (server *serverType) add(rcvr interface{}) os.Error { log.Exit("rpc: no service name for type", s.typ.String()) } if !isPublic(sname) { - s := "rpc Add: type " + sname + " is not public"; + s := "rpc Register: type " + sname + " is not public"; log.Stderr(s); return os.ErrorString(s); } + if _, present := server.serviceMap[sname]; present { + return os.ErrorString("rpc: service already defined: " + sname); + } s.name = sname; s.method = make(map[string] *methodType); @@ -115,6 +225,14 @@ func (server *serverType) add(rcvr interface{}) os.Error { log.Stderr(mname, "reply type not a pointer to a struct:", replyType.String()); continue; } + if !isPublic(argType.Elem().Name()) { + log.Stderr(mname, "argument type not public:", argType.String()); + continue; + } + if !isPublic(replyType.Elem().Name()) { + log.Stderr(mname, "reply type not public:", replyType.String()); + continue; + } // Method needs one out: os.Error. if mtype.NumOut() != 1 { log.Stderr("method", mname, "has wrong number of outs:", mtype.NumOut()); @@ -128,7 +246,7 @@ func (server *serverType) add(rcvr interface{}) os.Error { } if len(s.method) == 0 { - s := "rpc Add: type " + sname + " has no public methods of suitable type"; + s := "rpc Register: type " + sname + " has no public methods of suitable type"; log.Stderr(s); return os.ErrorString(s); } @@ -136,7 +254,7 @@ func (server *serverType) add(rcvr interface{}) os.Error { return nil; } -// A value to be sent as a placeholder for the response when we receive invalid request. +// A value sent as a placeholder for the response when the server receives an invalid request. type InvalidRequest struct { marker int } @@ -234,24 +352,27 @@ func (server *serverType) accept(lis net.Listener) { } } -// Add publishes in the server the set of methods of the +// Register publishes in the server the set of methods of the // receiver value that satisfy the following conditions: // - public method -// - two arguments, both pointers to structs +// - two arguments, both pointers to public structs // - one return value of type os.Error -// It returns an error if the receiver is not suitable. -func Add(rcvr interface{}) os.Error { - return server.add(rcvr) +// It returns an error if the receiver is not public or has no +// suitable methods. +func Register(rcvr interface{}) os.Error { + return server.register(rcvr) } // ServeConn runs the server on a single connection. When the connection -// completes, service terminates. +// completes, service terminates. ServeConn blocks; the caller typically +// invokes it in a go statement. func ServeConn(conn io.ReadWriteCloser) { go server.input(conn) } // Accept accepts connections on the listener and serves requests -// for each incoming connection. +// for each incoming connection. Accept blocks; the caller typically +// invokes it in a go statement. func Accept(lis net.Listener) { server.accept(lis) } @@ -277,7 +398,7 @@ func serveHTTP(c *http.Conn, req *http.Request) { } // HandleHTTP registers an HTTP handler for RPC messages. -// It is still necessary to call http.Serve(). +// It is still necessary to invoke http.Serve(), typically in a go statement. func HandleHTTP() { http.Handle(rpcPath, http.HandlerFunc(serveHTTP)); } diff --git a/src/pkg/rpc/server_test.go b/src/pkg/rpc/server_test.go index 634de7e191..e21680eb6f 100644 --- a/src/pkg/rpc/server_test.go +++ b/src/pkg/rpc/server_test.go @@ -56,7 +56,7 @@ func (t *Arith) Error(args *Args, reply *Reply) os.Error { } func startServer() { - rpc.Add(new(Arith)); + Register(new(Arith)); l, e := net.Listen("tcp", ":0"); // any available address if e != nil { @@ -64,7 +64,7 @@ func startServer() { } serverAddr = l.Addr(); log.Stderr("Test RPC server listening on ", serverAddr); - go rpc.Accept(l); + go Accept(l); HandleHTTP(); l, e = net.Listen("tcp", ":0"); // any available address -- 2.48.1