]> Cypherpunks repositories - gostls13.git/commitdiff
net/http: Transport socket late binding
authorBrad Fitzpatrick <bradfitz@golang.org>
Fri, 8 Mar 2013 01:56:00 +0000 (17:56 -0800)
committerBrad Fitzpatrick <bradfitz@golang.org>
Fri, 8 Mar 2013 01:56:00 +0000 (17:56 -0800)
Implement what Chrome calls socket "late binding". See:
https://insouciant.org/tech/connection-management-in-chromium/

In a nutshell, if our HTTP client needs a TCP connection to a
remote host and there's not an idle one available, rather than
kick off a dial and wait for that specific dial, we instead
kick off a dial and wait for either our own dial to finish, or
any other TCP connection to that same host to become
available.

The implementation looks like a classic "Learning Go
Concurrency" slide.

Chrome's commit and numbers:
http://src.chromium.org/viewvc/chrome?view=rev&revision=36230

R=golang-dev, daniel.morsing, adg
CC=golang-dev
https://golang.org/cl/7587043

src/pkg/net/http/transport.go
src/pkg/net/http/transport_test.go

index 7bf08b8ae417b8fa9d02453b979aadb43ae1aabf..f3aaa79ccedf8c6d784478e2da4eae221c7f5f82 100644 (file)
@@ -41,12 +41,13 @@ const DefaultMaxIdleConnsPerHost = 2
 // https, and http proxies (for either http or https with CONNECT).
 // Transport can also cache connections for future re-use.
 type Transport struct {
-       idleMu   sync.Mutex
-       idleConn map[string][]*persistConn
-       reqMu    sync.Mutex
-       reqConn  map[*Request]*persistConn
-       altMu    sync.RWMutex
-       altProto map[string]RoundTripper // nil or map of URI scheme => RoundTripper
+       idleMu     sync.Mutex
+       idleConn   map[string][]*persistConn
+       idleConnCh map[string]chan *persistConn
+       reqMu      sync.Mutex
+       reqConn    map[*Request]*persistConn
+       altMu      sync.RWMutex
+       altProto   map[string]RoundTripper // nil or map of URI scheme => RoundTripper
 
        // TODO: tunable on global max cached connections
        // TODO: tunable on timeout on cached connections
@@ -279,6 +280,17 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
                max = DefaultMaxIdleConnsPerHost
        }
        t.idleMu.Lock()
+       select {
+       case t.idleConnCh[key] <- pconn:
+               // We're done with this pconn and somebody else is
+               // currently waiting for a conn of this type (they're
+               // actively dialing, but this conn is ready
+               // first). Chrome calls this socket late binding.  See
+               // https://insouciant.org/tech/connection-management-in-chromium/
+               t.idleMu.Unlock()
+               return true
+       default:
+       }
        if t.idleConn == nil {
                t.idleConn = make(map[string][]*persistConn)
        }
@@ -297,8 +309,23 @@ func (t *Transport) putIdleConn(pconn *persistConn) bool {
        return true
 }
 
+func (t *Transport) getIdleConnCh(cm *connectMethod) chan *persistConn {
+       key := cm.key()
+       t.idleMu.Lock()
+       defer t.idleMu.Unlock()
+       if t.idleConnCh == nil {
+               t.idleConnCh = make(map[string]chan *persistConn)
+       }
+       ch, ok := t.idleConnCh[key]
+       if !ok {
+               ch = make(chan *persistConn)
+               t.idleConnCh[key] = ch
+       }
+       return ch
+}
+
 func (t *Transport) getIdleConn(cm *connectMethod) (pconn *persistConn) {
-       key := cm.String()
+       key := cm.key()
        t.idleMu.Lock()
        defer t.idleMu.Unlock()
        if t.idleConn == nil {
@@ -354,6 +381,37 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) {
                return pc, nil
        }
 
+       type dialRes struct {
+               pc  *persistConn
+               err error
+       }
+       dialc := make(chan dialRes)
+       go func() {
+               pc, err := t.dialConn(cm)
+               dialc <- dialRes{pc, err}
+       }()
+
+       idleConnCh := t.getIdleConnCh(cm)
+       select {
+       case v := <-dialc:
+               // Our dial finished.
+               return v.pc, v.err
+       case pc := <-idleConnCh:
+               // Another request finished first and its net.Conn
+               // became available before our dial. Or somebody
+               // else's dial that they didn't use.
+               // But our dial is still going, so give it away
+               // when it finishes:
+               go func() {
+                       if v := <-dialc; v.err == nil {
+                               t.putIdleConn(v.pc)
+                       }
+               }()
+               return pc, nil
+       }
+}
+
+func (t *Transport) dialConn(cm *connectMethod) (*persistConn, error) {
        conn, err := t.dial("tcp", cm.addr())
        if err != nil {
                if cm.proxyURL != nil {
@@ -366,7 +424,7 @@ func (t *Transport) getConn(cm *connectMethod) (*persistConn, error) {
 
        pconn := &persistConn{
                t:        t,
-               cacheKey: cm.String(),
+               cacheKey: cm.key(),
                conn:     conn,
                reqch:    make(chan requestAndChan, 50),
                writech:  make(chan writeRequest, 50),
@@ -516,6 +574,10 @@ type connectMethod struct {
        targetAddr   string   // Not used if proxy + http targetScheme (4th example in table)
 }
 
+func (ck *connectMethod) key() string {
+       return ck.String() // TODO: use a struct type instead
+}
+
 func (ck *connectMethod) String() string {
        proxyStr := ""
        targetAddr := ck.targetAddr
index feaa53d7a58181f0ccd06cf54cbc5de52d46500a..213c5198dd2bc70ee1bc3996cd690093ac38622c 100644 (file)
@@ -1324,6 +1324,64 @@ func TestTransportNoHost(t *testing.T) {
        }
 }
 
+func TestTransportSocketLateBinding(t *testing.T) {
+       defer checkLeakedTransports(t)
+
+       mux := NewServeMux()
+       fooGate := make(chan bool, 1)
+       mux.HandleFunc("/foo", func(w ResponseWriter, r *Request) {
+               w.Header().Set("foo-ipport", r.RemoteAddr)
+               w.(Flusher).Flush()
+               <-fooGate
+       })
+       mux.HandleFunc("/bar", func(w ResponseWriter, r *Request) {
+               w.Header().Set("bar-ipport", r.RemoteAddr)
+       })
+       ts := httptest.NewServer(mux)
+       defer ts.Close()
+
+       dialGate := make(chan bool, 1)
+       tr := &Transport{
+               Dial: func(n, addr string) (net.Conn, error) {
+                       <-dialGate
+                       return net.Dial(n, addr)
+               },
+               DisableKeepAlives: false,
+       }
+       defer tr.CloseIdleConnections()
+       c := &Client{
+               Transport: tr,
+       }
+
+       dialGate <- true // only allow one dial
+       fooRes, err := c.Get(ts.URL + "/foo")
+       if err != nil {
+               t.Fatal(err)
+       }
+       fooAddr := fooRes.Header.Get("foo-ipport")
+       if fooAddr == "" {
+               t.Fatal("No addr on /foo request")
+       }
+       time.AfterFunc(200*time.Millisecond, func() {
+               // let the foo response finish so we can use its
+               // connection for /bar
+               fooGate <- true
+               io.Copy(ioutil.Discard, fooRes.Body)
+               fooRes.Body.Close()
+       })
+
+       barRes, err := c.Get(ts.URL + "/bar")
+       if err != nil {
+               t.Fatal(err)
+       }
+       barAddr := barRes.Header.Get("bar-ipport")
+       if barAddr != fooAddr {
+               t.Fatalf("/foo came from conn %q; /bar came from %q instead", fooAddr, barAddr)
+       }
+       barRes.Body.Close()
+       dialGate <- true
+}
+
 type proxyFromEnvTest struct {
        req     string // URL to fetch; blank means "http://example.com"
        env     string