]> Cypherpunks repositories - gostls13.git/commitdiff
http: add pipelining to ClientConn, ServerConn
authorPetar Maymounkov <petarm@gmail.com>
Fri, 11 Feb 2011 20:05:47 +0000 (15:05 -0500)
committerRuss Cox <rsc@golang.org>
Fri, 11 Feb 2011 20:05:47 +0000 (15:05 -0500)
R=rsc, bradfitzwork
CC=golang-dev
https://golang.org/cl/4082044

src/pkg/http/persist.go
src/pkg/http/serve_test.go

index 8bfc09755898b98f95caf3a07a6330f757d1cc65..000a4200e59a8a4d7e8730ffe2e93948585346b1 100644 (file)
@@ -6,14 +6,17 @@ package http
 
 import (
        "bufio"
-       "container/list"
        "io"
        "net"
+       "net/textproto"
        "os"
        "sync"
 )
 
-var ErrPersistEOF = &ProtocolError{"persistent connection closed"}
+var (
+       ErrPersistEOF = &ProtocolError{"persistent connection closed"}
+       ErrPipeline   = &ProtocolError{"pipeline error"}
+)
 
 // A ServerConn reads requests and sends responses over an underlying
 // connection, until the HTTP keepalive logic commands an end. ServerConn
@@ -26,8 +29,10 @@ type ServerConn struct {
        r               *bufio.Reader
        clsd            bool     // indicates a graceful close
        re, we          os.Error // read/write errors
-       lastBody        io.ReadCloser
+       lastbody        io.ReadCloser
        nread, nwritten int
+       pipe            textproto.Pipeline
+       pipereq         map[*Request]uint
        lk              sync.Mutex // protected read/write to re,we
 }
 
@@ -37,7 +42,7 @@ func NewServerConn(c net.Conn, r *bufio.Reader) *ServerConn {
        if r == nil {
                r = bufio.NewReader(c)
        }
-       return &ServerConn{c: c, r: r}
+       return &ServerConn{c: c, r: r, pipereq: make(map[*Request]uint)}
 }
 
 // Close detaches the ServerConn and returns the underlying connection as well
@@ -57,10 +62,25 @@ func (sc *ServerConn) Close() (c net.Conn, r *bufio.Reader) {
 // Read returns the next request on the wire. An ErrPersistEOF is returned if
 // it is gracefully determined that there are no more requests (e.g. after the
 // first request on an HTTP/1.0 connection, or after a Connection:close on a
-// HTTP/1.1 connection). Read can be called concurrently with Write, but not
-// with another Read.
+// HTTP/1.1 connection).
 func (sc *ServerConn) Read() (req *Request, err os.Error) {
 
+       // Ensure ordered execution of Reads and Writes
+       id := sc.pipe.Next()
+       sc.pipe.StartRequest(id)
+       defer func() {
+               sc.pipe.EndRequest(id)
+               if req == nil {
+                       sc.pipe.StartResponse(id)
+                       sc.pipe.EndResponse(id)
+               } else {
+                       // Remember the pipeline id of this request
+                       sc.lk.Lock()
+                       sc.pipereq[req] = id
+                       sc.lk.Unlock()
+               }
+       }()
+
        sc.lk.Lock()
        if sc.we != nil { // no point receiving if write-side broken or closed
                defer sc.lk.Unlock()
@@ -73,12 +93,12 @@ func (sc *ServerConn) Read() (req *Request, err os.Error) {
        sc.lk.Unlock()
 
        // Make sure body is fully consumed, even if user does not call body.Close
-       if sc.lastBody != nil {
+       if sc.lastbody != nil {
                // body.Close is assumed to be idempotent and multiple calls to
                // it should return the error that its first invokation
                // returned.
-               err = sc.lastBody.Close()
-               sc.lastBody = nil
+               err = sc.lastbody.Close()
+               sc.lastbody = nil
                if err != nil {
                        sc.lk.Lock()
                        defer sc.lk.Unlock()
@@ -102,7 +122,7 @@ func (sc *ServerConn) Read() (req *Request, err os.Error) {
                        return
                }
        }
-       sc.lastBody = req.Body
+       sc.lastbody = req.Body
        sc.nread++
        if req.Close {
                sc.lk.Lock()
@@ -121,11 +141,24 @@ func (sc *ServerConn) Pending() int {
        return sc.nread - sc.nwritten
 }
 
-// Write writes a repsonse. To close the connection gracefully, set the
+// Write writes resp in response to req. To close the connection gracefully, set the
 // Response.Close field to true. Write should be considered operational until
 // it returns an error, regardless of any errors returned on the Read side.
-// Write can be called concurrently with Read, but not with another Write.
-func (sc *ServerConn) Write(resp *Response) os.Error {
+func (sc *ServerConn) Write(req *Request, resp *Response) os.Error {
+
+       // Retrieve the pipeline ID of this request/response pair
+       sc.lk.Lock()
+       id, ok := sc.pipereq[req]
+       sc.pipereq[req] = 0, false
+       if !ok {
+               sc.lk.Unlock()
+               return ErrPipeline
+       }
+       sc.lk.Unlock()
+
+       // Ensure pipeline order
+       sc.pipe.StartResponse(id)
+       defer sc.pipe.EndResponse(id)
 
        sc.lk.Lock()
        if sc.we != nil {
@@ -166,10 +199,11 @@ type ClientConn struct {
        c               net.Conn
        r               *bufio.Reader
        re, we          os.Error // read/write errors
-       lastBody        io.ReadCloser
+       lastbody        io.ReadCloser
        nread, nwritten int
-       reqm            list.List  // request methods in order of execution
-       lk              sync.Mutex // protects read/write to reqm,re,we
+       pipe            textproto.Pipeline
+       pipereq         map[*Request]uint
+       lk              sync.Mutex // protects read/write to re,we,pipereq,etc.
 }
 
 // NewClientConn returns a new ClientConn reading and writing c.  If r is not
@@ -178,7 +212,7 @@ func NewClientConn(c net.Conn, r *bufio.Reader) *ClientConn {
        if r == nil {
                r = bufio.NewReader(c)
        }
-       return &ClientConn{c: c, r: r}
+       return &ClientConn{c: c, r: r, pipereq: make(map[*Request]uint)}
 }
 
 // Close detaches the ClientConn and returns the underlying connection as well
@@ -191,7 +225,6 @@ func (cc *ClientConn) Close() (c net.Conn, r *bufio.Reader) {
        r = cc.r
        cc.c = nil
        cc.r = nil
-       cc.reqm.Init()
        cc.lk.Unlock()
        return
 }
@@ -201,8 +234,23 @@ func (cc *ClientConn) Close() (c net.Conn, r *bufio.Reader) {
 // keepalive connection is logically closed after this request and the opposing
 // server is informed. An ErrUnexpectedEOF indicates the remote closed the
 // underlying TCP connection, which is usually considered as graceful close.
-// Write can be called concurrently with Read, but not with another Write.
-func (cc *ClientConn) Write(req *Request) os.Error {
+func (cc *ClientConn) Write(req *Request) (err os.Error) {
+
+       // Ensure ordered execution of Writes
+       id := cc.pipe.Next()
+       cc.pipe.StartRequest(id)
+       defer func() {
+               cc.pipe.EndRequest(id)
+               if err != nil {
+                       cc.pipe.StartResponse(id)
+                       cc.pipe.EndResponse(id)
+               } else {
+                       // Remember the pipeline id of this request
+                       cc.lk.Lock()
+                       cc.pipereq[req] = id
+                       cc.lk.Unlock()
+               }
+       }()
 
        cc.lk.Lock()
        if cc.re != nil { // no point sending if read-side closed or broken
@@ -223,7 +271,7 @@ func (cc *ClientConn) Write(req *Request) os.Error {
                cc.lk.Unlock()
        }
 
-       err := req.Write(cc.c)
+       err = req.Write(cc.c)
        if err != nil {
                cc.lk.Lock()
                defer cc.lk.Unlock()
@@ -231,9 +279,6 @@ func (cc *ClientConn) Write(req *Request) os.Error {
                return err
        }
        cc.nwritten++
-       cc.lk.Lock()
-       cc.reqm.PushBack(req.Method)
-       cc.lk.Unlock()
 
        return nil
 }
@@ -250,7 +295,21 @@ func (cc *ClientConn) Pending() int {
 // returned together with an ErrPersistEOF, which means that the remote
 // requested that this be the last request serviced. Read can be called
 // concurrently with Write, but not with another Read.
-func (cc *ClientConn) Read() (resp *Response, err os.Error) {
+func (cc *ClientConn) Read(req *Request) (resp *Response, err os.Error) {
+
+       // Retrieve the pipeline ID of this request/response pair
+       cc.lk.Lock()
+       id, ok := cc.pipereq[req]
+       cc.pipereq[req] = 0, false
+       if !ok {
+               cc.lk.Unlock()
+               return nil, ErrPipeline
+       }
+       cc.lk.Unlock()
+
+       // Ensure pipeline order
+       cc.pipe.StartResponse(id)
+       defer cc.pipe.EndResponse(id)
 
        cc.lk.Lock()
        if cc.re != nil {
@@ -259,17 +318,13 @@ func (cc *ClientConn) Read() (resp *Response, err os.Error) {
        }
        cc.lk.Unlock()
 
-       if cc.nread >= cc.nwritten {
-               return nil, os.NewError("persist client pipe count")
-       }
-
        // Make sure body is fully consumed, even if user does not call body.Close
-       if cc.lastBody != nil {
+       if cc.lastbody != nil {
                // body.Close is assumed to be idempotent and multiple calls to
                // it should return the error that its first invokation
                // returned.
-               err = cc.lastBody.Close()
-               cc.lastBody = nil
+               err = cc.lastbody.Close()
+               cc.lastbody = nil
                if err != nil {
                        cc.lk.Lock()
                        defer cc.lk.Unlock()
@@ -278,18 +333,14 @@ func (cc *ClientConn) Read() (resp *Response, err os.Error) {
                }
        }
 
-       cc.lk.Lock()
-       m := cc.reqm.Front()
-       cc.reqm.Remove(m)
-       cc.lk.Unlock()
-       resp, err = ReadResponse(cc.r, m.Value.(string))
+       resp, err = ReadResponse(cc.r, req.Method)
        if err != nil {
                cc.lk.Lock()
                defer cc.lk.Unlock()
                cc.re = err
                return
        }
-       cc.lastBody = resp.Body
+       cc.lastbody = resp.Body
 
        cc.nread++
 
@@ -301,3 +352,12 @@ func (cc *ClientConn) Read() (resp *Response, err os.Error) {
        }
        return
 }
+
+// Do is convenience method that writes a request and reads a response.
+func (cc *ClientConn) Do(req *Request) (resp *Response, err os.Error) {
+       err = cc.Write(req)
+       if err != nil {
+               return
+       }
+       return cc.Read(req)
+}
index 80ad86290d4982928463a65ffc9377bccb301306..5594d512ad77ef915641646fcf04a9ab6ee5e0f8 100644 (file)
@@ -192,7 +192,7 @@ func TestHostHandlers(t *testing.T) {
                        t.Errorf("writing request: %v", err)
                        continue
                }
-               r, err := cc.Read()
+               r, err := cc.Read(&req)
                if err != nil {
                        t.Errorf("reading response: %v", err)
                        continue