}
t.Errorf("Test appears to have leaked %s:\n%s", bad, stacks)
}
+
+// waitCondition reports whether fn eventually returned true,
+// checking immediately and then every checkEvery amount,
+// until waitFor has elpased, at which point it returns false.
+func waitCondition(waitFor, checkEvery time.Duration, fn func() bool) bool {
+ deadline := time.Now().Add(waitFor)
+ for time.Now().Before(deadline) {
+ if fn() {
+ return true
+ }
+ time.Sleep(checkEvery)
+ }
+ return false
+}
type Transport struct {
idleMu sync.Mutex
wantIdle bool // user has requested to close all idle conns
+ idleCount int
idleConn map[connectMethodKey][]*persistConn
idleConnCh map[connectMethodKey]chan *persistConn
nextProtoOnce sync.Once
h2transport *http2Transport // non-nil if http2 wired up
- // TODO: tunable on global max cached connections
+ // TODO: MaxIdleConns tunable for global max cached connections (Issue 15461)
// TODO: tunable on timeout on cached connections
// TODO: tunable on max per-host TCP dials in flight (Issue 13957)
}
}
}
t.idleConn[key] = append(t.idleConn[key], pconn)
+ t.idleCount++
return nil
}
return ch
}
-func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn) {
+func (t *Transport) getIdleConn(cm connectMethod) *persistConn {
key := cm.key()
t.idleMu.Lock()
defer t.idleMu.Unlock()
if t.idleConn == nil {
return nil
}
+ var pconn *persistConn
for {
pconns, ok := t.idleConn[key]
if !ok {
pconn = pconns[len(pconns)-1]
t.idleConn[key] = pconns[:len(pconns)-1]
}
- if !pconn.isBroken() {
- return
+ t.idleCount--
+ if pconn.isBroken() {
+ // There is a tiny window where this is
+ // possible, between the connecting dying and
+ // the persistConn readLoop calling
+ // Transport.removeIdleConn. Just skip it and
+ // carry on.
+ continue
+ }
+ return pconn
+ }
+}
+
+// removeIdleConn marks pconn as dead.
+func (t *Transport) removeIdleConn(pconn *persistConn) {
+ key := pconn.cacheKey
+ t.idleMu.Lock()
+ defer t.idleMu.Unlock()
+
+ pconns, _ := t.idleConn[key]
+ switch len(pconns) {
+ case 0:
+ // Nothing
+ case 1:
+ if pconns[0] == pconn {
+ t.idleCount--
+ delete(t.idleConn, key)
+ }
+ default:
+ // TODO(bradfitz): map into LRU element?
+ for i, v := range pconns {
+ if v != pconn {
+ continue
+ }
+ pconns[i] = pconns[len(pconns)-1]
+ t.idleConn[key] = pconns[:len(pconns)-1]
+ t.idleCount--
+ break
}
}
}
func (pc *persistConn) readLoop() {
closeErr := errReadLoopExiting // default value, if not changed below
- defer func() { pc.close(closeErr) }()
+ defer func() {
+ pc.close(closeErr)
+ pc.t.removeIdleConn(pc)
+ }()
tryPutIdleConn := func() bool {
if err := pc.t.tryPutIdleConn(pc); err != nil {
}
}
+func TestTransportRemovesDeadIdleConnections(t *testing.T) {
+ defer afterTest(t)
+ ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
+ io.WriteString(w, r.RemoteAddr)
+ }))
+ defer ts.Close()
+
+ tr := &Transport{}
+ defer tr.CloseIdleConnections()
+ c := &Client{Transport: tr}
+
+ doReq := func(name string) string {
+ // Do a POST instead of a GET to prevent the Transport's
+ // idempotent request retry logic from kicking in...
+ res, err := c.Post(ts.URL, "", nil)
+ if err != nil {
+ t.Fatalf("%s: %v", name, err)
+ }
+ if res.StatusCode != 200 {
+ t.Fatalf("%s: %v", name, res.Status)
+ }
+ defer res.Body.Close()
+ slurp, err := ioutil.ReadAll(res.Body)
+ if err != nil {
+ t.Fatalf("%s: %v", name, err)
+ }
+ return string(slurp)
+ }
+
+ first := doReq("first")
+ keys1 := tr.IdleConnKeysForTesting()
+
+ ts.CloseClientConnections()
+
+ var keys2 []string
+ if !waitCondition(3*time.Second, 50*time.Millisecond, func() bool {
+ keys2 = tr.IdleConnKeysForTesting()
+ return len(keys2) == 0
+ }) {
+ t.Fatalf("Transport didn't notice idle connection's death.\nbefore: %q\n after: %q\n", keys1, keys2)
+ }
+
+ second := doReq("second")
+ if first == second {
+ t.Errorf("expected a different connection between requests. got %q both times", first)
+ }
+}
+
func TestTransportServerClosingUnexpectedly(t *testing.T) {
setParallel(t)
defer afterTest(t)