From 964b6cf352291e015114ea67d3d60ab1fa8b8bea Mon Sep 17 00:00:00 2001 From: Rob Pike Date: Tue, 14 Jul 2009 20:47:39 -0700 Subject: [PATCH] add HTTP support R=rsc DELTA=159 (110 added, 29 deleted, 20 changed) OCL=31646 CL=31652 --- src/pkg/rpc/client.go | 20 +++++++++ src/pkg/rpc/server.go | 91 ++++++++++++++++++++++++++++---------- src/pkg/rpc/server_test.go | 52 ++++++++++++++-------- 3 files changed, 122 insertions(+), 41 deletions(-) diff --git a/src/pkg/rpc/client.go b/src/pkg/rpc/client.go index 196118834b..1bbe4241bc 100644 --- a/src/pkg/rpc/client.go +++ b/src/pkg/rpc/client.go @@ -8,6 +8,7 @@ import ( "gob"; "io"; "log"; + "net"; "os"; "rpc"; "sync"; @@ -94,6 +95,25 @@ func NewClient(conn io.ReadWriteCloser) *Client { return client; } +// Dial connects to an HTTP RPC server at the specified network address. +func DialHTTP(network, address string) (*Client, os.Error) { + conn, err := net.Dial(network, "", address); + if err != nil { + return nil, err + } + io.WriteString(conn, "GET " + rpcPath + " HTTP/1.0\n\n"); + return NewClient(conn), nil; +} + +// Dial connects to an RPC server at the specified network address. +func Dial(network, address string) (*Client, os.Error) { + conn, err := net.Dial(network, "", address); + if err != nil { + return nil, err + } + return NewClient(conn), nil; +} + // Go invokes the function asynchronously. It returns the Call structure representing // the invocation. func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call { diff --git a/src/pkg/rpc/server.go b/src/pkg/rpc/server.go index 304f1e2df2..6012613ce3 100644 --- a/src/pkg/rpc/server.go +++ b/src/pkg/rpc/server.go @@ -6,6 +6,7 @@ package rpc import ( "gob"; + "http"; "log"; "io"; "net"; @@ -17,8 +18,6 @@ import ( "utf8"; ) -import "fmt" // TODO DELETE - // Precompute the reflect type for os.Error. Can't use os.Error directly // because Typeof takes an empty interface value. This is annoying. var unusedError *os.Error; @@ -50,25 +49,22 @@ type Response struct { Error string; } -// Server represents the set of services available to an RPC client. -// The zero type for Server is ready to have services added. -type Server struct { +type serverType struct { serviceMap map[string] *service; } +// This variable is a global whose "public" methods are really private methods +// called from the global functions of this package: rpc.Add, rpc.ServeConn, etc. +// For example, rpc.Add() calls server.add(). +var server = &serverType{ make(map[string] *service) } + // Is this a publicly vislble - upper case - name? func isPublic(name string) bool { rune, wid_ := utf8.DecodeRuneInString(name); return unicode.IsUpper(rune) } -// Add publishes in the server the set of methods of the -// recevier value that satisfy the following conditions: -// - public method -// - two arguments, both pointers to structs -// - one return value of type os.Error -// It returns an error if the receiver is not suitable. -func (server *Server) Add(rcvr interface{}) os.Error { +func (server *serverType) add(rcvr interface{}) os.Error { if server.serviceMap == nil { server.serviceMap = make(map[string] *service); } @@ -80,7 +76,7 @@ func (server *Server) Add(rcvr interface{}) os.Error { log.Exit("rpc: no service name for type", s.typ.String()) } if !isPublic(sname) { - s := "rpc server.Add: type " + sname + " is not public"; + s := "rpc Add: type " + sname + " is not public"; log.Stderr(s); return os.ErrorString(s); } @@ -132,7 +128,7 @@ func (server *Server) Add(rcvr interface{}) os.Error { } if len(s.method) == 0 { - s := "rpc server.Add: type " + sname + " has no public methods of suitable type"; + s := "rpc Add: type " + sname + " has no public methods of suitable type"; log.Stderr(s); return os.ErrorString(s); } @@ -177,7 +173,7 @@ func (s *service) call(sending *sync.Mutex, function *reflect.FuncValue, req *Re s.sendResponse(sending, req, replyv.Interface(), enc, errmsg); } -func (server *Server) serve(conn io.ReadWriteCloser) { +func (server *serverType) serve(conn io.ReadWriteCloser) { dec := gob.NewDecoder(conn); enc := gob.NewEncoder(conn); sending := new(sync.Mutex); @@ -222,20 +218,69 @@ func (server *Server) serve(conn io.ReadWriteCloser) { conn.Close(); } +func (server *serverType) accept(lis net.Listener) { + for { + conn, addr, err := lis.Accept(); + if err != nil { + log.Exit("rpc.Serve: accept:", err.String()); // TODO(r): exit? + } + go server.serve(conn); + } +} + +// Add publishes in the server the set of methods of the +// receiver value that satisfy the following conditions: +// - public method +// - two arguments, both pointers to structs +// - one return value of type os.Error +// It returns an error if the receiver is not suitable. +func Add(rcvr interface{}) os.Error { + return server.add(rcvr) +} + // ServeConn runs the server on a single connection. When the connection // completes, service terminates. -func (server *Server) ServeConn(conn io.ReadWriteCloser) { +func ServeConn(conn io.ReadWriteCloser) { go server.serve(conn) } // Accept accepts connections on the listener and serves requests // for each incoming connection. -func (server *Server) Accept(lis net.Listener) { - for { - conn, addr, err := lis.Accept(); - if err != nil { - log.Exit("rpc.Serve: accept:", err.String()); // TODO(r): exit? - } - go server.ServeConn(conn); +func Accept(lis net.Listener) { + server.accept(lis) +} + +type bufRWC struct { + r io.Reader; + w io.Writer; + c io.Closer; +} + +func (b *bufRWC) Read(p []byte) (n int, err os.Error) { + return b.r.Read(p); +} + +func (b *bufRWC) Write(p []byte) (n int, err os.Error) { + return b.w.Write(p); +} + +func (b *bufRWC) Close() os.Error { + return b.c.Close(); +} + +func serveHTTP(c *http.Conn, req *http.Request) { + conn, buf, err := c.Hijack(); + if err != nil { + log.Stderr("rpc hijacking ", c.RemoteAddr, ": ", err.String()); + return; } + server.serve(&bufRWC{buf, conn, conn}); +} + +var rpcPath string = "/_goRPC_" + +// HandleHTTP registers an HTTP handler for RPC messages. +// It is still necessary to call http.Serve(). +func HandleHTTP() { + http.Handle(rpcPath, http.HandlerFunc(serveHTTP)); } diff --git a/src/pkg/rpc/server_test.go b/src/pkg/rpc/server_test.go index 51d024d76b..634de7e191 100644 --- a/src/pkg/rpc/server_test.go +++ b/src/pkg/rpc/server_test.go @@ -5,7 +5,6 @@ package rpc import ( - "fmt"; "gob"; "http"; "io"; @@ -19,6 +18,7 @@ import ( ) var serverAddr string +var httpServerAddr string const second = 1e9 @@ -56,30 +56,35 @@ func (t *Arith) Error(args *Args, reply *Reply) os.Error { } func startServer() { - server := new(Server); - server.Add(new(Arith)); + rpc.Add(new(Arith)); + l, e := net.Listen("tcp", ":0"); // any available address if e != nil { - log.Stderrf("net.Listen tcp :0: %v", e); - os.Exit(1); + log.Exitf("net.Listen tcp :0: %v", e); } serverAddr = l.Addr(); log.Stderr("Test RPC server listening on ", serverAddr); - go server.Accept(l); + go rpc.Accept(l); + + HandleHTTP(); + l, e = net.Listen("tcp", ":0"); // any available address + if e != nil { + log.Stderrf("net.Listen tcp :0: %v", e); + os.Exit(1); + } + httpServerAddr = l.Addr(); + log.Stderr("Test HTTP RPC server listening on ", httpServerAddr); + go http.Serve(l, nil); } func TestRPC(t *testing.T) { - var i int; - once.Do(startServer); - conn, err := net.Dial("tcp", "", serverAddr); + client, err := Dial("tcp", serverAddr); if err != nil { - t.Fatal("dialing:", err) + t.Fatal("dialing", err); } - client := NewClient(conn); - // Synchronous calls args := &Args{7,8}; reply := new(Reply); @@ -124,9 +129,24 @@ func TestRPC(t *testing.T) { } } -func TestCheckUnknownService(t *testing.T) { - var i int; +func TestHTTPRPC(t *testing.T) { + once.Do(startServer); + client, err := DialHTTP("tcp", httpServerAddr); + if err != nil { + t.Fatal("dialing", err); + } + + // Synchronous calls + args := &Args{7,8}; + reply := new(Reply); + err = client.Call("Arith.Add", args, reply); + if reply.C != args.A + args.B { + t.Errorf("Add: expected %d got %d", reply.C, args.A + args.B); + } +} + +func TestCheckUnknownService(t *testing.T) { once.Do(startServer); conn, err := net.Dial("tcp", "", serverAddr); @@ -147,8 +167,6 @@ func TestCheckUnknownService(t *testing.T) { } func TestCheckUnknownMethod(t *testing.T) { - var i int; - once.Do(startServer); conn, err := net.Dial("tcp", "", serverAddr); @@ -169,8 +187,6 @@ func TestCheckUnknownMethod(t *testing.T) { } func TestCheckBadType(t *testing.T) { - var i int; - once.Do(startServer); conn, err := net.Dial("tcp", "", serverAddr); -- 2.48.1