From 2e30218223a7bf2b560fbaf79bac8d80ea4ece1c Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Tue, 26 Apr 2016 15:43:04 -0700 Subject: [PATCH] net/http: remove idle transport connections from Transport when server closes Previously the Transport would cache idle connections from the Transport for later reuse, but if a peer server disconnected (e.g. idle timeout), we would not proactively remove the *persistConn from the Transport's idle list, leading to a waste of memory (potentially forever). Instead, when the persistConn's readLoop terminates, remote it from the idle list, if present. This also adds the beginning of accounting for the total number of idle connections, which will be needed for Transport.MaxIdleConns later. Updates #15461 Change-Id: Iab091f180f8dd1ee0d78f34b9705d68743b5557b Reviewed-on: https://go-review.googlesource.com/22492 Reviewed-by: Andrew Gerrand --- src/net/http/main_test.go | 14 +++++++++ src/net/http/transport.go | 52 ++++++++++++++++++++++++++++++---- src/net/http/transport_test.go | 48 +++++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 5 deletions(-) diff --git a/src/net/http/main_test.go b/src/net/http/main_test.go index 1163874ac2..d10fd89b54 100644 --- a/src/net/http/main_test.go +++ b/src/net/http/main_test.go @@ -120,3 +120,17 @@ func afterTest(t testing.TB) { } t.Errorf("Test appears to have leaked %s:\n%s", bad, stacks) } + +// waitCondition reports whether fn eventually returned true, +// checking immediately and then every checkEvery amount, +// until waitFor has elpased, at which point it returns false. +func waitCondition(waitFor, checkEvery time.Duration, fn func() bool) bool { + deadline := time.Now().Add(waitFor) + for time.Now().Before(deadline) { + if fn() { + return true + } + time.Sleep(checkEvery) + } + return false +} diff --git a/src/net/http/transport.go b/src/net/http/transport.go index 0568822737..3ccc6dd0df 100644 --- a/src/net/http/transport.go +++ b/src/net/http/transport.go @@ -65,6 +65,7 @@ const DefaultMaxIdleConnsPerHost = 2 type Transport struct { idleMu sync.Mutex wantIdle bool // user has requested to close all idle conns + idleCount int idleConn map[connectMethodKey][]*persistConn idleConnCh map[connectMethodKey]chan *persistConn @@ -166,7 +167,7 @@ type Transport struct { nextProtoOnce sync.Once h2transport *http2Transport // non-nil if http2 wired up - // TODO: tunable on global max cached connections + // TODO: MaxIdleConns tunable for global max cached connections (Issue 15461) // TODO: tunable on timeout on cached connections // TODO: tunable on max per-host TCP dials in flight (Issue 13957) } @@ -613,6 +614,7 @@ func (t *Transport) tryPutIdleConn(pconn *persistConn) error { } } t.idleConn[key] = append(t.idleConn[key], pconn) + t.idleCount++ return nil } @@ -638,13 +640,14 @@ func (t *Transport) getIdleConnCh(cm connectMethod) chan *persistConn { return ch } -func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn) { +func (t *Transport) getIdleConn(cm connectMethod) *persistConn { key := cm.key() t.idleMu.Lock() defer t.idleMu.Unlock() if t.idleConn == nil { return nil } + var pconn *persistConn for { pconns, ok := t.idleConn[key] if !ok { @@ -659,8 +662,44 @@ func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn) { pconn = pconns[len(pconns)-1] t.idleConn[key] = pconns[:len(pconns)-1] } - if !pconn.isBroken() { - return + t.idleCount-- + if pconn.isBroken() { + // There is a tiny window where this is + // possible, between the connecting dying and + // the persistConn readLoop calling + // Transport.removeIdleConn. Just skip it and + // carry on. + continue + } + return pconn + } +} + +// removeIdleConn marks pconn as dead. +func (t *Transport) removeIdleConn(pconn *persistConn) { + key := pconn.cacheKey + t.idleMu.Lock() + defer t.idleMu.Unlock() + + pconns, _ := t.idleConn[key] + switch len(pconns) { + case 0: + // Nothing + case 1: + if pconns[0] == pconn { + t.idleCount-- + delete(t.idleConn, key) + } + default: + // TODO(bradfitz): map into LRU element? + for i, v := range pconns { + if v != pconn { + continue + } + pconns[i] = pconns[len(pconns)-1] + t.idleConn[key] = pconns[:len(pconns)-1] + t.idleCount-- + break } } } @@ -1120,7 +1159,10 @@ func (pc *persistConn) cancelRequest() { func (pc *persistConn) readLoop() { closeErr := errReadLoopExiting // default value, if not changed below - defer func() { pc.close(closeErr) }() + defer func() { + pc.close(closeErr) + pc.t.removeIdleConn(pc) + }() tryPutIdleConn := func() bool { if err := pc.t.tryPutIdleConn(pc); err != nil { diff --git a/src/net/http/transport_test.go b/src/net/http/transport_test.go index 1aa26610b0..2e27cc1850 100644 --- a/src/net/http/transport_test.go +++ b/src/net/http/transport_test.go @@ -438,6 +438,54 @@ func TestTransportMaxPerHostIdleConns(t *testing.T) { } } +func TestTransportRemovesDeadIdleConnections(t *testing.T) { + defer afterTest(t) + ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) { + io.WriteString(w, r.RemoteAddr) + })) + defer ts.Close() + + tr := &Transport{} + defer tr.CloseIdleConnections() + c := &Client{Transport: tr} + + doReq := func(name string) string { + // Do a POST instead of a GET to prevent the Transport's + // idempotent request retry logic from kicking in... + res, err := c.Post(ts.URL, "", nil) + if err != nil { + t.Fatalf("%s: %v", name, err) + } + if res.StatusCode != 200 { + t.Fatalf("%s: %v", name, res.Status) + } + defer res.Body.Close() + slurp, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Fatalf("%s: %v", name, err) + } + return string(slurp) + } + + first := doReq("first") + keys1 := tr.IdleConnKeysForTesting() + + ts.CloseClientConnections() + + var keys2 []string + if !waitCondition(3*time.Second, 50*time.Millisecond, func() bool { + keys2 = tr.IdleConnKeysForTesting() + return len(keys2) == 0 + }) { + t.Fatalf("Transport didn't notice idle connection's death.\nbefore: %q\n after: %q\n", keys1, keys2) + } + + second := doReq("second") + if first == second { + t.Errorf("expected a different connection between requests. got %q both times", first) + } +} + func TestTransportServerClosingUnexpectedly(t *testing.T) { setParallel(t) defer afterTest(t) -- 2.48.1