From fbaf881cc62bd0e1f4c5e608217fd52106438dbb Mon Sep 17 00:00:00 2001
From: Russ Cox
Date: Fri, 28 Jun 2019 15:01:16 -0400
Subject: [PATCH] net/http: fix Transport.MaxConnsPerHost limits & idle pool
 races
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

There were at least three races in the implementation of the pool of
idle HTTP connections before this CL.

The first race is that HTTP/2 connections can be shared for many
requests, but each requesting goroutine would take the connection out
of the pool and then immediately return it before using it; this
created unnecessary, tiny little race windows during which another
goroutine might dial a second connection instead of reusing the first.
This CL changes the idle pool to just leave the HTTP/2 connection in
the pool permanently (until there is reason to close it), instead of
doing the take-it-out-put-it-back dance race.

The second race is that “is there an idle connection?” and “register
to wait for an idle connection” were implemented as two separate
steps, in different critical sections. So a client could end up
registered to wait for an idle connection and be waiting or perhaps
dialing, not having noticed the idle connection sitting in the pool
that arrived between the two steps.

The third race is that t.getIdleConnCh assumes that the inability to
send on the channel means the client doesn't need the result, when it
could mean that the client has not yet entered the select.

That is, the main dial does:

	idleConnCh := t.getIdleConnCh(cm)
	select {
	case v := <-dialc:
		...
	case pc := <-idleConnCh:
		...
	...
	}

But then tryPutIdleConn does:

	waitingDialer := t.idleConnCh[key] // what getIdleConnCh(cm) returned
	select {
	case waitingDialer <- pconn:
		// We're done ...
		return nil
	default:
		if waitingDialer != nil {
			// They had populated this, but their dial won
			// first, so we can clean up this map entry.
			delete(t.idleConnCh, key)
		}
	}

If the client has returned from getIdleConnCh but not yet reached the
select, tryPutIdleConn will be unable to do the send, incorrectly
conclude that the client does not care anymore, and put the connection
in the idle pool instead, again leaving the client dialing
unnecessarily while a connection sits in the idle pool.

(It's also odd that the success case does not clean up the map entry,
and also that the map has room for only a single waiting goroutine for
a given host.)
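Condensed into a standalone sketch (simplified pool, want, and conn
types standing in for the real Transport, wantConn, and persistConn;
not the code in this CL), the shape of the fix for the second and
third races is: do the “is there an idle connection?” check and the
“register to wait” step under one lock, and funnel every hand-off
through a tryDeliver method that an idle connection and a finished
dial can both race on safely:

	package main

	import (
		"fmt"
		"sync"
	)

	type conn struct{ id int }

	// want is one caller waiting for a connection.
	// ready is closed exactly once, when c has been set.
	type want struct {
		mu    sync.Mutex
		c     *conn
		ready chan struct{}
	}

	// tryDeliver hands c to the waiter unless it already has one.
	// An idle connection and a finished dial can both call this;
	// only the first delivery wins, so the loser keeps its conn.
	func (w *want) tryDeliver(c *conn) bool {
		w.mu.Lock()
		defer w.mu.Unlock()
		if w.c != nil {
			return false
		}
		w.c = c
		close(w.ready)
		return true
	}

	// pool guards the idle list and the waiter queue with one mutex, so
	// "check for an idle conn" and "register to wait" are a single step.
	type pool struct {
		mu      sync.Mutex
		idle    []*conn
		waiters []*want
	}

	// get hands an idle connection to w if one is available and
	// otherwise registers w as a waiter, all under one lock.
	func (p *pool) get(w *want) (delivered bool) {
		p.mu.Lock()
		defer p.mu.Unlock()
		if n := len(p.idle); n > 0 {
			c := p.idle[n-1]
			p.idle = p.idle[:n-1]
			return w.tryDeliver(c) // w is fresh, so this succeeds
		}
		p.waiters = append(p.waiters, w)
		return false
	}

	// put offers c to the first waiter that still wants it and parks c
	// in the idle list only if nobody does, so a connection can never
	// sit idle while a registered waiter is left dialing.
	func (p *pool) put(c *conn) {
		p.mu.Lock()
		defer p.mu.Unlock()
		for len(p.waiters) > 0 {
			w := p.waiters[0]
			p.waiters = p.waiters[1:]
			if w.tryDeliver(c) {
				return
			}
		}
		p.idle = append(p.idle, c)
	}

	func main() {
		p := &pool{}
		w := &want{ready: make(chan struct{})}
		if !p.get(w) {
			// No idle conn: a dial would start here. Meanwhile another
			// request finishes and returns its connection to the pool.
			p.put(&conn{id: 1})
		}
		<-w.ready
		fmt.Println("got conn", w.c.id)
	}

Because only one delivery can win, the old failure mode in which a
connection is parked in the idle pool while the registered client
keeps dialing cannot occur.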
None of these races mattered too much before Go 1.11: at most they
meant that connections were not reused quite as promptly as possible,
or a few more than necessary would be created. But Go 1.11 added
Transport.MaxConnsPerHost, which limited the number of connections
created for a given host. The default is 0 (unlimited), but if a user
did explicitly impose a low limit (2 is common), all these misplaced
conns could easily add up to the entire limit, causing a deadlock.
This was causing intermittent timeouts in TestTransportMaxConnsPerHost.

The addition of the MaxConnsPerHost support added its own races. For
example, here t.incHostConnCount could increment the count and return
a channel ready for receiving, and then the client would not receive
from it nor ever issue the decrement, because the select need not
evaluate these two cases in order:

	select {
	case <-t.incHostConnCount(cmKey):
		// count below conn per host limit; proceed
	case pc := <-t.getIdleConnCh(cm):
		if trace != nil && trace.GotConn != nil {
			trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
		}
		return pc, nil
	...
	}

Obviously, unmatched increments are another way to get to a deadlock.
TestTransportMaxConnsPerHost deadlocked approximately 100% of the time
with a small random sleep added between incHostConnCount and the
select:

	ch := t.incHostConnCount(cmKey)
	time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
	select {
	case <-ch:
		// count below conn per host limit; proceed
	case pc := <-t.getIdleConnCh(cm):
		...
	}

The limit also did not properly apply to HTTP/2, because of the
decrement being attached to the underlying net.Conn.Close and net/http
not having access to the underlying HTTP/2 conn. The alternate
decrements for HTTP/2 may also have introduced spurious decrements
(discussion in #29889). Perhaps those spurious decrements or other
races caused the other intermittent non-deadlock failures in
TestTransportMaxConnsPerHost, in which the HTTP/2 phase created too
many connections (#31982).

This CL replaces the buggy, racy code with new code that is hopefully
neither buggy nor racy.

Fixes #29889.
Fixes #31982.
Fixes #32336.

Change-Id: I0dfac3a6fe8a6cdf5f0853722781fe2ec071ac97
Reviewed-on: https://go-review.googlesource.com/c/go/+/184262
Run-TryBot: Russ Cox
TryBot-Result: Gobot Gobot
Reviewed-by: Bryan C. Mills
---
 src/net/http/export_test.go    |  28 +-
 src/net/http/serve_test.go     |  23 +-
 src/net/http/transport.go      | 614 +++++++++++++++++++--------------
 src/net/http/transport_test.go |  30 +-
 4 files changed, 412 insertions(+), 283 deletions(-)

diff --git a/src/net/http/export_test.go b/src/net/http/export_test.go
index f0dfa8cd33..d265cd3f72 100644
--- a/src/net/http/export_test.go
+++ b/src/net/http/export_test.go
@@ -166,30 +166,40 @@ func (t *Transport) IdleConnCountForTesting(scheme, addr string) int {
 	return 0
 }
 
-func (t *Transport) IdleConnChMapSizeForTesting() int {
+func (t *Transport) IdleConnWaitMapSizeForTesting() int {
 	t.idleMu.Lock()
 	defer t.idleMu.Unlock()
-	return len(t.idleConnCh)
+	return len(t.idleConnWait)
 }
 
 func (t *Transport) IsIdleForTesting() bool {
 	t.idleMu.Lock()
 	defer t.idleMu.Unlock()
-	return t.wantIdle
+	return t.closeIdle
 }
 
-func (t *Transport) RequestIdleConnChForTesting() {
-	t.getIdleConnCh(connectMethod{nil, "http", "example.com", false})
+func (t *Transport) QueueForIdleConnForTesting() {
+	t.queueForIdleConn(nil)
 }
 
+// PutIdleTestConn reports whether it was able to insert a fresh
+// persistConn for scheme, addr into the idle connection pool.
 func (t *Transport) PutIdleTestConn(scheme, addr string) bool {
 	c, _ := net.Pipe()
 	key := connectMethodKey{"", scheme, addr, false}
-	select {
-	case <-t.incHostConnCount(key):
-	default:
-		return false
+
+	if t.MaxConnsPerHost > 0 {
+		// Transport is tracking conns-per-host.
+		// Increment connection count to account
+		// for new persistConn created below.
+		t.connsPerHostMu.Lock()
+		if t.connsPerHost == nil {
+			t.connsPerHost = make(map[connectMethodKey]int)
+		}
+		t.connsPerHost[key]++
+		t.connsPerHostMu.Unlock()
 	}
+
 	return t.tryPutIdleConn(&persistConn{
 		t:        t,
 		conn:     c, // dummy
diff --git a/src/net/http/serve_test.go b/src/net/http/serve_test.go
index e7ed15c3aa..61adda2604 100644
--- a/src/net/http/serve_test.go
+++ b/src/net/http/serve_test.go
@@ -2407,6 +2407,7 @@ func TestTimeoutHandlerRace(t *testing.T) {
 }
 
 // See issues 8209 and 8414.
+// Both issues involved panics in the implementation of TimeoutHandler.
func TestTimeoutHandlerRaceHeader(t *testing.T) { setParallel(t) defer afterTest(t) @@ -2434,7 +2435,9 @@ func TestTimeoutHandlerRaceHeader(t *testing.T) { defer func() { <-gate }() res, err := c.Get(ts.URL) if err != nil { - t.Error(err) + // We see ECONNRESET from the connection occasionally, + // and that's OK: this test is checking that the server does not panic. + t.Log(err) return } defer res.Body.Close() @@ -5507,19 +5510,23 @@ func TestServerSetKeepAlivesEnabledClosesConns(t *testing.T) { if a1 != a2 { t.Fatal("expected first two requests on same connection") } - var idle0 int - if !waitCondition(2*time.Second, 10*time.Millisecond, func() bool { - idle0 = tr.IdleConnKeyCountForTesting() - return idle0 == 1 - }) { - t.Fatalf("idle count before SetKeepAlivesEnabled called = %v; want 1", idle0) + addr := strings.TrimPrefix(ts.URL, "http://") + + // The two requests should have used the same connection, + // and there should not have been a second connection that + // was created by racing dial against reuse. + // (The first get was completed when the second get started.) + n := tr.IdleConnCountForTesting("http", addr) + if n != 1 { + t.Fatalf("idle count for %q after 2 gets = %d, want 1", addr, n) } + // SetKeepAlivesEnabled should discard idle conns. ts.Config.SetKeepAlivesEnabled(false) var idle1 int if !waitCondition(2*time.Second, 10*time.Millisecond, func() bool { - idle1 = tr.IdleConnKeyCountForTesting() + idle1 = tr.IdleConnCountForTesting("http", addr) return idle1 == 0 }) { t.Fatalf("idle count after SetKeepAlivesEnabled called = %v; want 0", idle1) diff --git a/src/net/http/transport.go b/src/net/http/transport.go index 26f642aa7a..2f9bdc2700 100644 --- a/src/net/http/transport.go +++ b/src/net/http/transport.go @@ -57,15 +57,6 @@ var DefaultTransport RoundTripper = &Transport{ // MaxIdleConnsPerHost. const DefaultMaxIdleConnsPerHost = 2 -// connsPerHostClosedCh is a closed channel used by MaxConnsPerHost -// for the property that receives from a closed channel return the -// zero value. -var connsPerHostClosedCh = make(chan struct{}) - -func init() { - close(connsPerHostClosedCh) -} - // Transport is an implementation of RoundTripper that supports HTTP, // HTTPS, and HTTP proxies (for either HTTP or HTTPS with CONNECT). // @@ -102,11 +93,11 @@ func init() { // request is treated as idempotent but the header is not sent on the // wire. type Transport struct { - idleMu sync.Mutex - wantIdle bool // user has requested to close all idle conns - idleConn map[connectMethodKey][]*persistConn // most recently used at end - idleConnCh map[connectMethodKey]chan *persistConn - idleLRU connLRU + idleMu sync.Mutex + closeIdle bool // user has requested to close all idle conns + idleConn map[connectMethodKey][]*persistConn // most recently used at end + idleConnWait map[connectMethodKey]wantConnQueue // waiting getConns + idleLRU connLRU reqMu sync.Mutex reqCanceler map[*Request]func(error) @@ -114,9 +105,9 @@ type Transport struct { altMu sync.Mutex // guards changing altProto only altProto atomic.Value // of nil or map[string]RoundTripper, key is URI scheme - connCountMu sync.Mutex - connPerHostCount map[connectMethodKey]int - connPerHostAvailable map[connectMethodKey]chan struct{} + connsPerHostMu sync.Mutex + connsPerHost map[connectMethodKey]int + connsPerHostWait map[connectMethodKey]wantConnQueue // waiting getConns // Proxy specifies a function to return a proxy for a given // Request. 
If the function returns a non-nil error, the @@ -203,11 +194,6 @@ type Transport struct { // active, and idle states. On limit violation, dials will block. // // Zero means no limit. - // - // For HTTP/2, this currently only controls the number of new - // connections being created at a time, instead of the total - // number. In practice, hosts using HTTP/2 only have about one - // idle connection, though. MaxConnsPerHost int // IdleConnTimeout is the maximum amount of time an idle @@ -543,7 +529,6 @@ func (t *Transport) roundTrip(req *Request) (*Response, error) { var resp *Response if pconn.alt != nil { // HTTP/2 path. - t.putOrCloseIdleConn(pconn) t.setReqCanceler(req, nil) // not cancelable with CancelRequest resp, err = pconn.alt.RoundTrip(req) } else { @@ -554,7 +539,6 @@ func (t *Transport) roundTrip(req *Request) (*Response, error) { } if http2isNoCachedConnError(err) { t.removeIdleConn(pconn) - t.decHostConnCount(cm.key()) // clean up the persistent connection } else if !pconn.shouldRetryRequest(req, err) { // Issue 16465: return underlying net.Conn.Read error from peek, // as we've historically done. @@ -665,8 +649,7 @@ func (t *Transport) CloseIdleConnections() { t.idleMu.Lock() m := t.idleConn t.idleConn = nil - t.idleConnCh = nil - t.wantIdle = true + t.closeIdle = true // close newly idle connections t.idleLRU = connLRU{} t.idleMu.Unlock() for _, conns := range m { @@ -762,7 +745,7 @@ func (cm *connectMethod) proxyAuth() string { var ( errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled") errConnBroken = errors.New("http: putIdleConn: connection is in bad state") - errWantIdle = errors.New("http: putIdleConn: CloseIdleConnections was called") + errCloseIdle = errors.New("http: putIdleConn: CloseIdleConnections was called") errTooManyIdle = errors.New("http: putIdleConn: too many idle connections") errTooManyIdleHost = errors.New("http: putIdleConn: too many idle connections for host") errCloseIdleConns = errors.New("http: CloseIdleConnections called") @@ -821,29 +804,56 @@ func (t *Transport) tryPutIdleConn(pconn *persistConn) error { return errConnBroken } pconn.markReused() - key := pconn.cacheKey t.idleMu.Lock() defer t.idleMu.Unlock() - waitingDialer := t.idleConnCh[key] - select { - case waitingDialer <- pconn: - // We're done with this pconn and somebody else is - // currently waiting for a conn of this type (they're - // actively dialing, but this conn is ready - // first). Chrome calls this socket late binding. See - // https://insouciant.org/tech/connection-management-in-chromium/ + // HTTP/2 (pconn.alt != nil) connections do not come out of the idle list, + // because multiple goroutines can use them simultaneously. + // If this is an HTTP/2 connection being “returned,” we're done. + if pconn.alt != nil && t.idleLRU.m[pconn] != nil { return nil - default: - if waitingDialer != nil { - // They had populated this, but their dial won - // first, so we can clean up this map entry. - delete(t.idleConnCh, key) + } + + // Deliver pconn to goroutine waiting for idle connection, if any. + // (They may be actively dialing, but this conn is ready first. + // Chrome calls this socket late binding. + // See https://insouciant.org/tech/connection-management-in-chromium/.) + key := pconn.cacheKey + if q, ok := t.idleConnWait[key]; ok { + done := false + if pconn.alt == nil { + // HTTP/1. + // Loop over the waiting list until we find a w that isn't done already, and hand it pconn. 
+ for q.len() > 0 { + w := q.popFront() + if w.tryDeliver(pconn, nil) { + done = true + break + } + } + } else { + // HTTP/2. + // Can hand the same pconn to everyone in the waiting list, + // and we still won't be done: we want to put it in the idle + // list unconditionally, for any future clients too. + for q.len() > 0 { + w := q.popFront() + w.tryDeliver(pconn, nil) + } + } + if q.len() == 0 { + delete(t.idleConnWait, key) + } else { + t.idleConnWait[key] = q + } + if done { + return nil } } - if t.wantIdle { - return errWantIdle + + if t.closeIdle { + return errCloseIdle } if t.idleConn == nil { t.idleConn = make(map[connectMethodKey][]*persistConn) @@ -864,71 +874,86 @@ func (t *Transport) tryPutIdleConn(pconn *persistConn) error { oldest.close(errTooManyIdle) t.removeIdleConnLocked(oldest) } - if t.IdleConnTimeout > 0 { + + // Set idle timer, but only for HTTP/1 (pconn.alt == nil). + // The HTTP/2 implementation manages the idle timer itself + // (see idleConnTimeout in h2_bundle.go). + if t.IdleConnTimeout > 0 && pconn.alt == nil { if pconn.idleTimer != nil { pconn.idleTimer.Reset(t.IdleConnTimeout) } else { - // idleTimer does not apply to HTTP/2 - if pconn.alt == nil { - pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle) - } + pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle) } } pconn.idleAt = time.Now() return nil } -// getIdleConnCh returns a channel to receive and return idle -// persistent connection for the given connectMethod. -// It may return nil, if persistent connections are not being used. -func (t *Transport) getIdleConnCh(cm connectMethod) chan *persistConn { +// queueForIdleConn queues w to receive the next idle connection for w.cm. +// As an optimization hint to the caller, queueForIdleConn reports whether +// it successfully delivered an already-idle connection. +func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) { if t.DisableKeepAlives { - return nil + return false } - key := cm.key() + t.idleMu.Lock() defer t.idleMu.Unlock() - t.wantIdle = false - if t.idleConnCh == nil { - t.idleConnCh = make(map[connectMethodKey]chan *persistConn) - } - ch, ok := t.idleConnCh[key] - if !ok { - ch = make(chan *persistConn) - t.idleConnCh[key] = ch + + // Stop closing connections that become idle - we might want one. + // (That is, undo the effect of t.CloseIdleConnections.) + t.closeIdle = false + + if w == nil { + // Happens in test hook. + return false } - return ch -} -func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn, idleSince time.Time) { - key := cm.key() - t.idleMu.Lock() - defer t.idleMu.Unlock() - for { - pconns, ok := t.idleConn[key] - if !ok { - return nil, time.Time{} + // Look for most recently-used idle connection. + if list, ok := t.idleConn[w.key]; ok { + stop := false + delivered := false + for len(list) > 0 && !stop { + pconn := list[len(list)-1] + if pconn.isBroken() { + // persistConn.readLoop has marked the connection broken, + // but Transport.removeIdleConn has not yet removed it from the idle list. + // Drop on floor on behalf of Transport.removeIdleConn. + list = list[:len(list)-1] + continue + } + delivered = w.tryDeliver(pconn, nil) + if delivered { + if pconn.alt != nil { + // HTTP/2: multiple clients can share pconn. + // Leave it in the list. + } else { + // HTTP/1: only one client can use pconn. + // Remove it from the list. 
+ t.idleLRU.remove(pconn) + list = list[:len(list)-1] + } + } + stop = true } - if len(pconns) == 1 { - pconn = pconns[0] - delete(t.idleConn, key) + if len(list) > 0 { + t.idleConn[w.key] = list } else { - // 2 or more cached connections; use the most - // recently used one at the end. - pconn = pconns[len(pconns)-1] - t.idleConn[key] = pconns[:len(pconns)-1] + delete(t.idleConn, w.key) } - t.idleLRU.remove(pconn) - if pconn.isBroken() { - // There is a tiny window where this is - // possible, between the connecting dying and - // the persistConn readLoop calling - // Transport.removeIdleConn. Just skip it and - // carry on. - continue + if stop { + return delivered } - return pconn, pconn.idleAt } + + // Register to receive next connection that becomes idle. + if t.idleConnWait == nil { + t.idleConnWait = make(map[connectMethodKey]wantConnQueue) + } + q := t.idleConnWait[w.key] + q.pushBack(w) + t.idleConnWait[w.key] = q + return false } // removeIdleConn marks pconn as dead. @@ -1015,20 +1040,147 @@ func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, e return zeroDialer.DialContext(ctx, network, addr) } +// A wantConn records state about a wanted connection +// (that is, an active call to getConn). +// The conn may be gotten by dialing or by finding an idle connection, +// or a cancellation may make the conn no longer wanted. +// These three options are racing against each other and use +// wantConn to coordinate and agree about the winning outcome. +type wantConn struct { + cm connectMethod + key connectMethodKey // cm.key() + ctx context.Context // context for dial + ready chan struct{} // closed when pc, err pair is delivered + + // hooks for testing to know when dials are done + // beforeDial is called in the getConn goroutine when the dial is queued. + // afterDial is called when the dial is completed or cancelled. + beforeDial func() + afterDial func() + + mu sync.Mutex // protects pc, err, close(ready) + pc *persistConn + err error +} + +// waiting reports whether w is still waiting for an answer (connection or error). +func (w *wantConn) waiting() bool { + select { + case <-w.ready: + return false + default: + return true + } +} + +// tryDeliver attempts to deliver pc, err to w and reports whether it succeeded. +func (w *wantConn) tryDeliver(pc *persistConn, err error) bool { + w.mu.Lock() + defer w.mu.Unlock() + + if w.pc != nil || w.err != nil { + return false + } + + w.pc = pc + w.err = err + if w.pc == nil && w.err == nil { + panic("net/http: internal error: misuse of tryDeliver") + } + close(w.ready) + return true +} + +// cancel marks w as no longer wanting a result (for example, due to cancellation). +// If a connection has been delivered already, cancel returns it with t.putOrCloseIdleConn. +func (w *wantConn) cancel(t *Transport, err error) { + w.mu.Lock() + if w.pc == nil && w.err == nil { + close(w.ready) // catch misbehavior in future delivery + } + pc := w.pc + w.pc = nil + w.err = err + w.mu.Unlock() + + if pc != nil { + t.putOrCloseIdleConn(pc) + } +} + +// A wantConnQueue is a queue of wantConns. +type wantConnQueue struct { + // This is a queue, not a deque. + // It is split into two stages - head[headPos:] and tail. + // popFront is trivial (headPos++) on the first stage, and + // pushBack is trivial (append) on the second stage. + // If the first stage is empty, popFront can swap the + // first and second stages to remedy the situation. 
+ // + // This two-stage split is analogous to the use of two lists + // in Okasaki's purely functional queue but without the + // overhead of reversing the list when swapping stages. + head []*wantConn + headPos int + tail []*wantConn +} + +// len returns the number of items in the queue. +func (q *wantConnQueue) len() int { + return len(q.head) - q.headPos + len(q.tail) +} + +// pushBack adds w to the back of the queue. +func (q *wantConnQueue) pushBack(w *wantConn) { + q.tail = append(q.tail, w) +} + +// popFront removes and returns the w at the front of the queue. +func (q *wantConnQueue) popFront() *wantConn { + if q.headPos >= len(q.head) { + if len(q.tail) == 0 { + return nil + } + // Pick up tail as new head, clear tail. + q.head, q.headPos, q.tail = q.tail, 0, q.head[:0] + } + w := q.head[q.headPos] + q.head[q.headPos] = nil + q.headPos++ + return w +} + // getConn dials and creates a new persistConn to the target as // specified in the connectMethod. This includes doing a proxy CONNECT // and/or setting up TLS. If this doesn't return an error, the persistConn // is ready to write requests to. -func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) { +func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) { req := treq.Request trace := treq.trace ctx := req.Context() if trace != nil && trace.GetConn != nil { trace.GetConn(cm.addr()) } - if pc, idleSince := t.getIdleConn(cm); pc != nil { + + w := &wantConn{ + cm: cm, + key: cm.key(), + ctx: ctx, + ready: make(chan struct{}, 1), + beforeDial: testHookPrePendingDial, + afterDial: testHookPostPendingDial, + } + defer func() { + if err != nil { + w.cancel(t, err) + } + }() + + // Queue for idle connection. + if delivered := t.queueForIdleConn(w); delivered { + pc := w.pc if trace != nil && trace.GotConn != nil { - trace.GotConn(pc.gotIdleConnTrace(idleSince)) + trace.GotConn(pc.gotIdleConnTrace(pc.idleAt)) } // set request canceler to some non-nil function so we // can detect whether it was cleared between now and when @@ -1037,108 +1189,44 @@ func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistC return pc, nil } - type dialRes struct { - pc *persistConn - err error - } - dialc := make(chan dialRes) - cmKey := cm.key() - - // Copy these hooks so we don't race on the postPendingDial in - // the goroutine we launch. Issue 11136. - testHookPrePendingDial := testHookPrePendingDial - testHookPostPendingDial := testHookPostPendingDial - - handlePendingDial := func() { - testHookPrePendingDial() - go func() { - if v := <-dialc; v.err == nil { - t.putOrCloseIdleConn(v.pc) - } else { - t.decHostConnCount(cmKey) - } - testHookPostPendingDial() - }() - } - cancelc := make(chan error, 1) t.setReqCanceler(req, func(err error) { cancelc <- err }) - if t.MaxConnsPerHost > 0 { - select { - case <-t.incHostConnCount(cmKey): - // count below conn per host limit; proceed - case pc := <-t.getIdleConnCh(cm): - if trace != nil && trace.GotConn != nil { - trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()}) - } - return pc, nil - case <-req.Cancel: - return nil, errRequestCanceledConn - case <-req.Context().Done(): - return nil, req.Context().Err() - case err := <-cancelc: - if err == errRequestCanceled { - err = errRequestCanceledConn - } - return nil, err - } - } + // Queue for permission to dial. 
+ t.queueForDial(w) - go func() { - pc, err := t.dialConn(ctx, cm) - dialc <- dialRes{pc, err} - }() - - idleConnCh := t.getIdleConnCh(cm) + // Wait for completion or cancellation. select { - case v := <-dialc: - // Our dial finished. - if v.pc != nil { - if trace != nil && trace.GotConn != nil && v.pc.alt == nil { - trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn}) - } - return v.pc, nil - } - // Our dial failed. See why to return a nicer error - // value. - t.decHostConnCount(cmKey) - select { - case <-req.Cancel: - // It was an error due to cancellation, so prioritize that - // error value. (Issue 16049) - return nil, errRequestCanceledConn - case <-req.Context().Done(): - return nil, req.Context().Err() - case err := <-cancelc: - if err == errRequestCanceled { - err = errRequestCanceledConn + case <-w.ready: + // Trace success but only for HTTP/1. + // HTTP/2 calls trace.GotConn itself. + if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil { + trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()}) + } + if w.err != nil { + // If the request has been cancelled, that's probably + // what caused w.err; if so, prefer to return the + // cancellation error (see golang.org/issue/16049). + select { + case <-req.Cancel: + return nil, errRequestCanceledConn + case <-req.Context().Done(): + return nil, req.Context().Err() + case err := <-cancelc: + if err == errRequestCanceled { + err = errRequestCanceledConn + } + return nil, err + default: + // return below } - return nil, err - default: - // It wasn't an error due to cancellation, so - // return the original error message: - return nil, v.err - } - case pc := <-idleConnCh: - // Another request finished first and its net.Conn - // became available before our dial. Or somebody - // else's dial that they didn't use. - // But our dial is still going, so give it away - // when it finishes: - handlePendingDial() - if trace != nil && trace.GotConn != nil { - trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()}) } - return pc, nil + return w.pc, w.err case <-req.Cancel: - handlePendingDial() return nil, errRequestCanceledConn case <-req.Context().Done(): - handlePendingDial() return nil, req.Context().Err() case err := <-cancelc: - handlePendingDial() if err == errRequestCanceled { err = errRequestCanceledConn } @@ -1146,81 +1234,102 @@ func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistC } } -// incHostConnCount increments the count of connections for a -// given host. It returns an already-closed channel if the count -// is not at its limit; otherwise it returns a channel which is -// notified when the count is below the limit. -func (t *Transport) incHostConnCount(cmKey connectMethodKey) <-chan struct{} { +// queueForDial queues w to wait for permission to begin dialing. +// Once w receives permission to dial, it will do so in a separate goroutine. 
+func (t *Transport) queueForDial(w *wantConn) { + w.beforeDial() if t.MaxConnsPerHost <= 0 { - return connsPerHostClosedCh + go t.dialConnFor(w) + return } - t.connCountMu.Lock() - defer t.connCountMu.Unlock() - if t.connPerHostCount[cmKey] == t.MaxConnsPerHost { - if t.connPerHostAvailable == nil { - t.connPerHostAvailable = make(map[connectMethodKey]chan struct{}) - } - ch, ok := t.connPerHostAvailable[cmKey] - if !ok { - ch = make(chan struct{}) - t.connPerHostAvailable[cmKey] = ch + + t.connsPerHostMu.Lock() + defer t.connsPerHostMu.Unlock() + + if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost { + if t.connsPerHost == nil { + t.connsPerHost = make(map[connectMethodKey]int) } - return ch + t.connsPerHost[w.key] = n + 1 + go t.dialConnFor(w) + return + } + + if t.connsPerHostWait == nil { + t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue) + } + q := t.connsPerHostWait[w.key] + q.pushBack(w) + t.connsPerHostWait[w.key] = q +} + +// dialConnFor dials on behalf of w and delivers the result to w. +// dialConnFor has received permission to dial w.cm and is counted in t.connCount[w.cm.key()]. +// If the dial is cancelled or unsuccessful, dialConnFor decrements t.connCount[w.cm.key()]. +func (t *Transport) dialConnFor(w *wantConn) { + defer w.afterDial() + + pc, err := t.dialConn(w.ctx, w.cm) + delivered := w.tryDeliver(pc, err) + if err == nil && (!delivered || pc.alt != nil) { + // pconn was not passed to w, + // or it is HTTP/2 and can be shared. + // Add to the idle connection pool. + t.putOrCloseIdleConn(pc) } - if t.connPerHostCount == nil { - t.connPerHostCount = make(map[connectMethodKey]int) + if err != nil { + t.decConnsPerHost(w.key) } - t.connPerHostCount[cmKey]++ - // return a closed channel to avoid race: if decHostConnCount is called - // after incHostConnCount and during the nil check, decHostConnCount - // will delete the channel since it's not being listened on yet. - return connsPerHostClosedCh } -// decHostConnCount decrements the count of connections -// for a given host. -// See Transport.MaxConnsPerHost. -func (t *Transport) decHostConnCount(cmKey connectMethodKey) { +// decConnsPerHost decrements the per-host connection count for key, +// which may in turn give a different waiting goroutine permission to dial. +func (t *Transport) decConnsPerHost(key connectMethodKey) { if t.MaxConnsPerHost <= 0 { return } - t.connCountMu.Lock() - defer t.connCountMu.Unlock() - t.connPerHostCount[cmKey]-- - select { - case t.connPerHostAvailable[cmKey] <- struct{}{}: - default: - // close channel before deleting avoids getConn waiting forever in - // case getConn has reference to channel but hasn't started waiting. - // This could lead to more than MaxConnsPerHost in the unlikely case - // that > 1 go routine has fetched the channel but none started waiting. - if t.connPerHostAvailable[cmKey] != nil { - close(t.connPerHostAvailable[cmKey]) + + t.connsPerHostMu.Lock() + defer t.connsPerHostMu.Unlock() + n := t.connsPerHost[key] + if n == 0 { + // Shouldn't happen, but if it does, the counting is buggy and could + // easily lead to a silent deadlock, so report the problem loudly. + panic("net/http: internal error: connCount underflow") + } + + // Can we hand this count to a goroutine still waiting to dial? + // (Some goroutines on the wait list may have timed out or + // gotten a connection another way. If they're all gone, + // we don't want to kick off any spurious dial operations.) 
+ if q := t.connsPerHostWait[key]; q.len() > 0 { + done := false + for q.len() > 0 { + w := q.popFront() + if w.waiting() { + go t.dialConnFor(w) + done = true + break + } + } + if q.len() == 0 { + delete(t.connsPerHostWait, key) + } else { + // q is a value (like a slice), so we have to store + // the updated q back into the map. + t.connsPerHostWait[key] = q + } + if done { + return } - delete(t.connPerHostAvailable, cmKey) - } - if t.connPerHostCount[cmKey] == 0 { - delete(t.connPerHostCount, cmKey) } -} - -// connCloseListener wraps a connection, the transport that dialed it -// and the connected-to host key so the host connection count can be -// transparently decremented by whatever closes the embedded connection. -type connCloseListener struct { - net.Conn - t *Transport - cmKey connectMethodKey - didClose int32 -} -func (c *connCloseListener) Close() error { - if atomic.AddInt32(&c.didClose, 1) != 1 { - return nil + // Otherwise, decrement the recorded count. + if n--; n == 0 { + delete(t.connsPerHost, key) + } else { + t.connsPerHost[key] = n } - err := c.Conn.Close() - c.t.decHostConnCount(c.cmKey) - return err } // The connect method and the transport can both specify a TLS @@ -1283,8 +1392,8 @@ func (pconn *persistConn) addTLS(name string, trace *httptrace.ClientTrace) erro return nil } -func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) { - pconn := &persistConn{ +func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) { + pconn = &persistConn{ t: t, cacheKey: cm.key(), reqch: make(chan requestAndChan, 1), @@ -1423,9 +1532,6 @@ func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistCon } } - if t.MaxConnsPerHost > 0 { - pconn.conn = &connCloseListener{Conn: pconn.conn, t: t, cmKey: pconn.cacheKey} - } pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize()) pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize()) @@ -1631,7 +1737,7 @@ func (pc *persistConn) canceled() error { return pc.canceledErr } -// isReused reports whether this connection is in a known broken state. +// isReused reports whether this connection has been used before. func (pc *persistConn) isReused() bool { pc.mu.Lock() r := pc.reused @@ -2119,10 +2225,12 @@ func (pc *persistConn) wroteRequest() bool { // but the server has already replied. In this case, we don't // want to wait too long, and we want to return false so this // connection isn't re-used. + t := time.NewTimer(maxWriteWaitBeforeConnReuse) + defer t.Stop() select { case err := <-pc.writeErrCh: return err == nil - case <-time.After(maxWriteWaitBeforeConnReuse): + case <-t.C: return false } } @@ -2374,10 +2482,10 @@ func (pc *persistConn) closeLocked(err error) { pc.broken = true if pc.closed == nil { pc.closed = err - if pc.alt != nil { - // Clean up any host connection counting. - pc.t.decHostConnCount(pc.cacheKey) - } else { + pc.t.decConnsPerHost(pc.cacheKey) + // Close HTTP/1 (pc.alt == nil) connection. + // HTTP/2 closes its connection itself. 
+ if pc.alt == nil { if err != errCallerOwnsConn { pc.conn.Close() } diff --git a/src/net/http/transport_test.go b/src/net/http/transport_test.go index 2b58e1daec..ea01a2017e 100644 --- a/src/net/http/transport_test.go +++ b/src/net/http/transport_test.go @@ -655,13 +655,17 @@ func TestTransportMaxConnsPerHost(t *testing.T) { expected := int32(tr.MaxConnsPerHost) if dialCnt != expected { - t.Errorf("Too many dials (%s): %d", scheme, dialCnt) + t.Errorf("round 1: too many dials (%s): %d != %d", scheme, dialCnt, expected) } if gotConnCnt != expected { - t.Errorf("Too many get connections (%s): %d", scheme, gotConnCnt) + t.Errorf("round 1: too many get connections (%s): %d != %d", scheme, gotConnCnt, expected) } if ts.TLS != nil && tlsHandshakeCnt != expected { - t.Errorf("Too many tls handshakes (%s): %d", scheme, tlsHandshakeCnt) + t.Errorf("round 1: too many tls handshakes (%s): %d != %d", scheme, tlsHandshakeCnt, expected) + } + + if t.Failed() { + t.FailNow() } (<-connCh).Close() @@ -670,13 +674,13 @@ func TestTransportMaxConnsPerHost(t *testing.T) { doReq() expected++ if dialCnt != expected { - t.Errorf("Too many dials (%s): %d", scheme, dialCnt) + t.Errorf("round 2: too many dials (%s): %d", scheme, dialCnt) } if gotConnCnt != expected { - t.Errorf("Too many get connections (%s): %d", scheme, gotConnCnt) + t.Errorf("round 2: too many get connections (%s): %d != %d", scheme, gotConnCnt, expected) } if ts.TLS != nil && tlsHandshakeCnt != expected { - t.Errorf("Too many tls handshakes (%s): %d", scheme, tlsHandshakeCnt) + t.Errorf("round 2: too many tls handshakes (%s): %d != %d", scheme, tlsHandshakeCnt, expected) } } @@ -2795,8 +2799,8 @@ func TestIdleConnChannelLeak(t *testing.T) { <-didRead } - if got := tr.IdleConnChMapSizeForTesting(); got != 0 { - t.Fatalf("ForDisableKeepAlives = %v, map size = %d; want 0", disableKeep, got) + if got := tr.IdleConnWaitMapSizeForTesting(); got != 0 { + t.Fatalf("for DisableKeepAlives = %v, map size = %d; want 0", disableKeep, got) } } } @@ -3378,9 +3382,9 @@ func TestTransportCloseIdleConnsThenReturn(t *testing.T) { } wantIdle("after second put", 0) - tr.RequestIdleConnChForTesting() // should toggle the transport out of idle mode + tr.QueueForIdleConnForTesting() // should toggle the transport out of idle mode if tr.IsIdleForTesting() { - t.Error("shouldn't be idle after RequestIdleConnChForTesting") + t.Error("shouldn't be idle after QueueForIdleConnForTesting") } if !tr.PutIdleTestConn("http", "example.com") { t.Fatal("after re-activation") @@ -3802,8 +3806,8 @@ func TestNoCrashReturningTransportAltConn(t *testing.T) { ln := newLocalListener(t) defer ln.Close() - handledPendingDial := make(chan bool, 1) - SetPendingDialHooks(nil, func() { handledPendingDial <- true }) + var wg sync.WaitGroup + SetPendingDialHooks(func() { wg.Add(1) }, wg.Done) defer SetPendingDialHooks(nil, nil) testDone := make(chan struct{}) @@ -3873,7 +3877,7 @@ func TestNoCrashReturningTransportAltConn(t *testing.T) { doReturned <- true <-madeRoundTripper - <-handledPendingDial + wg.Wait() } func TestTransportReuseConnection_Gzip_Chunked(t *testing.T) { -- 2.50.0
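As a usage note for the behavior this patch repairs: the per-host
limit is opt-in and is set by the caller on the Transport. A minimal
client imposing the kind of low limit mentioned in the commit message
(2 per host) might look like the following sketch; the URL is a
placeholder and the surrounding program is illustrative only:

	package main

	import (
		"fmt"
		"net/http"
		"time"
	)

	func main() {
		// A low per-host limit like the "2 is common" case from the
		// commit message. The limit covers dialing, active, and idle
		// connections for each host.
		tr := &http.Transport{
			MaxConnsPerHost:     2,
			MaxIdleConnsPerHost: 2,
			IdleConnTimeout:     90 * time.Second,
		}
		client := &http.Client{Transport: tr, Timeout: 10 * time.Second}

		// https://example.com/ is a placeholder URL.
		resp, err := client.Get("https://example.com/")
		if err != nil {
			fmt.Println("request failed:", err)
			return
		}
		defer resp.Body.Close()
		fmt.Println("status:", resp.Status)
	}

Before this change, a burst of concurrent requests through such a
Transport could strand both connection slots and deadlock; with it,
excess requests queue for an idle connection or for permission to dial.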