--- /dev/null
+pkg net/http, method (*ClientConn) Available() int #75772
+pkg net/http, method (*ClientConn) Close() error #75772
+pkg net/http, method (*ClientConn) Err() error #75772
+pkg net/http, method (*ClientConn) InFlight() int #75772
+pkg net/http, method (*ClientConn) Release() #75772
+pkg net/http, method (*ClientConn) Reserve() error #75772
+pkg net/http, method (*ClientConn) RoundTrip(*Request) (*Response, error) #75772
+pkg net/http, method (*ClientConn) SetStateHook(func(*ClientConn)) #75772
+pkg net/http, method (*Transport) NewClientConn(context.Context, string, string) (*ClientConn, error) #75772
+pkg net/http, type ClientConn struct #75772
--- /dev/null
+The new [Transport.NewClientConn] method returns a client connection
+to an HTTP server.
+Most users should continue to use [Transport.RoundTrip] to make requests,
+which manages a pool of connections.
+`NewClientConn` is useful for users who need to implement their own connection management.
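+
+For example, a minimal sketch of a helper that sends one request over a
+dedicated connection (the function and parameter names are illustrative;
+assumes the usual `context`, `io`, and `net/http` imports, and an address
+in "host:port" form):
+
+```go
+func fetchOnce(t *http.Transport, scheme, addr, url string) ([]byte, error) {
+	// Dial a dedicated connection; it is not added to t's pool.
+	cc, err := t.NewClientConn(context.Background(), scheme, addr)
+	if err != nil {
+		return nil, err
+	}
+	defer cc.Close() // the caller owns the connection
+
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		return nil, err
+	}
+	resp, err := cc.RoundTrip(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+	return io.ReadAll(resp.Body)
+}
+```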
--- /dev/null
+// Copyright 2025 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package http
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "net"
+ "net/http/httptrace"
+ "net/url"
+ "sync"
+)
+
+// A ClientConn is a client connection to an HTTP server.
+//
+// Unlike a [Transport], a ClientConn represents a single connection.
+// Most users should use a Transport rather than creating client connections directly.
+type ClientConn struct {
+ cc genericClientConn
+
+ stateHookMu sync.Mutex
+ userStateHook func(*ClientConn)
+ stateHookRunning bool
+ lastAvailable int
+ lastInFlight int
+ lastClosed bool
+}
+
+// newClientConner is the interface implemented by HTTP/2 transports to create new client conns.
+//
+// The http package (this package) needs a way to ask the http2 package to
+// create a client connection.
+//
+// Transport.TLSNextProto["h2"] contains a function which appears to do this,
+// but for historical reasons it does not: The TLSNextProto function adds a
+// *tls.Conn to the http2.Transport's connection pool and returns a RoundTripper
+// which is backed by that connection pool. NewClientConn needs a way to get a
+// single client connection out of the http2 package.
+//
+// The http2 package registers a RoundTripper with Transport.RegisterProtocol.
+// If this RoundTripper implements newClientConner, then Transport.NewClientConn will use
+// it to create new HTTP/2 client connections.
+type newClientConner interface {
+ // NewClientConn creates a new client connection from a net.Conn.
+ //
+ // The RoundTripper returned by NewClientConn must implement genericClientConn.
+ // (We don't define NewClientConn as returning genericClientConn,
+ // because either we'd need to make genericClientConn an exported type
+ // or define it as a type alias. Neither is particularly appealing.)
+ //
+ // The state hook passed here is the internal state hook
+ // (ClientConn.maybeRunStateHook). The internal state hook calls
+ // the user state hook (if any), which is set by the user with
+ // ClientConn.SetStateHook.
+ //
+ // The client connection should arrange to call the internal state hook
+ // when the connection closes, when requests complete, and when the
+ // connection concurrency limit changes.
+ //
+ // The client connection must call the internal state hook when the connection state
+ // changes asynchronously, such as when a request completes.
+ //
+ // The internal state hook need not be called after synchronous changes to the state:
+ // Close, Reserve, Release, and RoundTrip calls which don't start a request
+ // do not need to call the hook.
+ //
+ // The general idea is that if we call (for example) Close,
+ // we know that the connection state has probably changed and we
+ // don't need the state hook to tell us that.
+ // However, if the connection closes asynchronously
+ // (because, for example, the other end of the conn closed it),
+ // the state hook needs to inform us.
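+	//
+	// For example, an implementation might do the following when a
+	// request completes (a sketch; the helper names here are
+	// hypothetical, not the real http2 package internals):
+	//
+	//	func (c *conn) onRequestDone() {
+	//		c.releaseStreamSlot() // asynchronous state change,
+	//		c.internalStateHook() // so the hook must be called
+	//	}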
+ NewClientConn(nc net.Conn, internalStateHook func()) (RoundTripper, error)
+}
+
+// genericClientConn is an interface implemented by HTTP/2 client conns
+// returned from newClientConner.NewClientConn.
+//
+// See the newClientConner doc comment for more information.
+type genericClientConn interface {
+ Close() error
+ Err() error
+ RoundTrip(req *Request) (*Response, error)
+ Reserve() error
+ Release()
+ Available() int
+ InFlight() int
+}
+
+// NewClientConn creates a new client connection to the given address.
+//
+// If scheme is "http", the connection is unencrypted.
+// If scheme is "https", the connection uses TLS.
+//
+// The protocol used for the new connection is determined by the scheme,
+// Transport.Protocols configuration field, and protocols supported by the
+// server. See Transport.Protocols for more details.
+//
+// If Transport.Proxy is set and indicates that a request sent to the given
+// address should use a proxy, the new connection uses that proxy.
+//
+// NewClientConn always creates a new connection,
+// even if the Transport has an existing cached connection to the given host.
+//
+// The new connection is not added to the Transport's connection cache,
+// and will not be used by [Transport.RoundTrip].
+// It does not count against the MaxIdleConns and MaxConnsPerHost limits.
+//
+// The caller is responsible for closing the new connection.
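+//
+// For example (a sketch; error handling elided, address illustrative):
+//
+//	cc, err := t.NewClientConn(ctx, "https", "example.com:443")
+//	if err != nil {
+//		// ...
+//	}
+//	defer cc.Close()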
+func (t *Transport) NewClientConn(ctx context.Context, scheme, address string) (*ClientConn, error) {
+ t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
+
+ switch scheme {
+ case "http", "https":
+ default:
+ return nil, fmt.Errorf("net/http: invalid scheme %q", scheme)
+ }
+
+ host, port, err := net.SplitHostPort(address)
+ if err != nil {
+ return nil, err
+ }
+ if port == "" {
+ port = schemePort(scheme)
+ }
+
+ var proxyURL *url.URL
+ if t.Proxy != nil {
+		// Transport.Proxy takes a *Request, so create a fake one to pass to it.
+ req := &Request{
+ ctx: ctx,
+ Method: "GET",
+ URL: &url.URL{
+ Scheme: scheme,
+ Host: host,
+ Path: "/",
+ },
+ Proto: "HTTP/1.1",
+ ProtoMajor: 1,
+ ProtoMinor: 1,
+ Header: make(Header),
+ Body: NoBody,
+ Host: host,
+ }
+ var err error
+ proxyURL, err = t.Proxy(req)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ cm := connectMethod{
+ targetScheme: scheme,
+ targetAddr: net.JoinHostPort(host, port),
+ proxyURL: proxyURL,
+ }
+
+ // The state hook is a bit tricky:
+ // The persistConn has a state hook which calls ClientConn.maybeRunStateHook,
+ // which in turn calls the user-provided state hook (if any).
+ //
+ // ClientConn.maybeRunStateHook handles debouncing hook calls for both
+ // HTTP/1 and HTTP/2.
+ //
+ // Since there's no need to change the persistConn's hook, we set it at creation time.
+ cc := &ClientConn{}
+ const isClientConn = true
+ pconn, err := t.dialConn(ctx, cm, isClientConn, cc.maybeRunStateHook)
+ if err != nil {
+ return nil, err
+ }
+
+ // Note that cc.maybeRunStateHook may have been called
+ // in the short window between dialConn and now.
+ // This is fine.
+ cc.stateHookMu.Lock()
+ defer cc.stateHookMu.Unlock()
+ if pconn.alt != nil {
+ // If pconn.alt is set, this is a connection implemented in another package
+ // (probably x/net/http2) or the bundled copy in h2_bundle.go.
+ gc, ok := pconn.alt.(genericClientConn)
+ if !ok {
+ return nil, errors.New("http: NewClientConn returned something that is not a ClientConn")
+ }
+ cc.cc = gc
+ cc.lastAvailable = gc.Available()
+ } else {
+ // This is an HTTP/1 connection.
+ pconn.availch = make(chan struct{}, 1)
+ pconn.availch <- struct{}{}
+ cc.cc = http1ClientConn{pconn}
+ cc.lastAvailable = 1
+ }
+ return cc, nil
+}
+
+// Close closes the connection.
+// Outstanding RoundTrip calls are interrupted.
+func (cc *ClientConn) Close() error {
+ defer cc.maybeRunStateHook()
+ return cc.cc.Close()
+}
+
+// Err reports any fatal connection errors.
+// It returns nil if the connection is usable.
+// If it returns non-nil, the connection can no longer be used.
+func (cc *ClientConn) Err() error {
+ return cc.cc.Err()
+}
+
+func validateClientConnRequest(req *Request) error {
+ if req.URL == nil {
+ return errors.New("http: nil Request.URL")
+ }
+ if req.Header == nil {
+ return errors.New("http: nil Request.Header")
+ }
+ // Validate the outgoing headers.
+ if err := validateHeaders(req.Header); err != "" {
+ return fmt.Errorf("http: invalid header %s", err)
+ }
+ // Validate the outgoing trailers too.
+ if err := validateHeaders(req.Trailer); err != "" {
+ return fmt.Errorf("http: invalid trailer %s", err)
+ }
+ if req.Method != "" && !validMethod(req.Method) {
+ return fmt.Errorf("http: invalid method %q", req.Method)
+ }
+ if req.URL.Host == "" {
+ return errors.New("http: no Host in request URL")
+ }
+ return nil
+}
+
+// RoundTrip implements the [RoundTripper] interface.
+//
+// The request is sent on the client connection,
+// regardless of the URL being requested or any proxy settings.
+//
+// If the connection is at its concurrency limit,
+// RoundTrip waits for the connection to become available
+// before sending the request.
+func (cc *ClientConn) RoundTrip(req *Request) (*Response, error) {
+ defer cc.maybeRunStateHook()
+ if err := validateClientConnRequest(req); err != nil {
+ cc.Release()
+ return nil, err
+ }
+ return cc.cc.RoundTrip(req)
+}
+
+// Available reports the number of requests that may be sent
+// to the connection without blocking.
+// It returns 0 if the connection is closed.
+func (cc *ClientConn) Available() int {
+ return cc.cc.Available()
+}
+
+// InFlight reports the number of requests in flight,
+// including reserved requests.
+// It returns 0 if the connection is closed.
+func (cc *ClientConn) InFlight() int {
+ return cc.cc.InFlight()
+}
+
+// Reserve reserves a concurrency slot on the connection.
+// If Reserve returns nil, one additional RoundTrip call may be made
+// without waiting for an existing request to complete.
+//
+// The reserved concurrency slot is accounted as an in-flight request:
+// A successful call to Reserve will decrement the Available count
+// and increment the InFlight count.
+//
+// Each successful call to Reserve should be followed by exactly one call
+// to RoundTrip or Release, which will consume or release the reservation.
+//
+// If the connection is closed or at its concurrency limit,
+// Reserve returns an error.
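+//
+// A typical pattern (a sketch):
+//
+//	if err := cc.Reserve(); err != nil {
+//		return err // closed or at the concurrency limit
+//	}
+//	resp, err := cc.RoundTrip(req) // consumes the reservation
+//
+// If the reserved request will never be sent, call Release instead.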
+func (cc *ClientConn) Reserve() error {
+ defer cc.maybeRunStateHook()
+ return cc.cc.Reserve()
+}
+
+// Release releases an unused concurrency slot reserved by Reserve.
+// If there are no reserved concurrency slots, it has no effect.
+func (cc *ClientConn) Release() {
+ defer cc.maybeRunStateHook()
+ cc.cc.Release()
+}
+
+// shouldRunStateHook returns the user's state hook if we should call it,
+// or nil if we don't need to call it at this time.
+func (cc *ClientConn) shouldRunStateHook(stopRunning bool) func(*ClientConn) {
+ cc.stateHookMu.Lock()
+ defer cc.stateHookMu.Unlock()
+ if cc.cc == nil {
+ return nil
+ }
+ if stopRunning {
+ cc.stateHookRunning = false
+ }
+ if cc.userStateHook == nil {
+ return nil
+ }
+ if cc.stateHookRunning {
+ return nil
+ }
+ var (
+ available = cc.Available()
+ inFlight = cc.InFlight()
+ closed = cc.Err() != nil
+ )
+ var hook func(*ClientConn)
+ if available > cc.lastAvailable || inFlight < cc.lastInFlight || closed != cc.lastClosed {
+ hook = cc.userStateHook
+ cc.stateHookRunning = true
+ }
+ cc.lastAvailable = available
+ cc.lastInFlight = inFlight
+ cc.lastClosed = closed
+ return hook
+}
+
+func (cc *ClientConn) maybeRunStateHook() {
+ hook := cc.shouldRunStateHook(false)
+ if hook == nil {
+ return
+ }
+ // Run the hook synchronously.
+ //
+ // This means that if, for example, the user calls resp.Body.Close to finish a request,
+ // the Close call will synchronously run the hook, giving the hook the chance to
+ // return the ClientConn to a connection pool before the next request is made.
+ hook(cc)
+ // The connection state may have changed while the hook was running,
+ // in which case we need to run it again.
+ //
+ // If we do need to run the hook again, do so in a new goroutine to avoid blocking
+ // the current goroutine indefinitely.
+ hook = cc.shouldRunStateHook(true)
+ if hook != nil {
+ go func() {
+ for hook != nil {
+ hook(cc)
+ hook = cc.shouldRunStateHook(true)
+ }
+ }()
+ }
+}
+
+// SetStateHook arranges for f to be called when the state of the connection changes.
+// At most one call to f is made at a time.
+// If the connection's state has changed since it was created,
+// f is called immediately in a separate goroutine.
+// f may be called synchronously from RoundTrip or Response.Body.Close.
+//
+// If SetStateHook is called multiple times, the new hook replaces the old one.
+// If f is nil, no further calls will be made to f after SetStateHook returns.
+//
+// f is called when Available increases (more requests may be sent on the connection),
+// InFlight decreases (existing requests complete), or Err begins returning non-nil
+// (the connection is no longer usable).
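+//
+// For example, a hook used by a (hypothetical) connection pool might
+// return the connection to the pool when it becomes available and
+// discard it once it is no longer usable:
+//
+//	cc.SetStateHook(func(cc *ClientConn) {
+//		if cc.Err() != nil {
+//			pool.remove(cc) // hypothetical pool helpers
+//			return
+//		}
+//		if cc.Available() > 0 {
+//			pool.markIdle(cc)
+//		}
+//	})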
+func (cc *ClientConn) SetStateHook(f func(*ClientConn)) {
+ cc.stateHookMu.Lock()
+ cc.userStateHook = f
+ cc.stateHookMu.Unlock()
+ cc.maybeRunStateHook()
+}
+
+// http1ClientConn is a genericClientConn implementation backed by
+// an HTTP/1 *persistConn (pconn.alt is nil).
+type http1ClientConn struct {
+ pconn *persistConn
+}
+
+func (cc http1ClientConn) RoundTrip(req *Request) (*Response, error) {
+ ctx := req.Context()
+ trace := httptrace.ContextClientTrace(ctx)
+
+ // Convert Request.Cancel into context cancelation.
+ ctx, cancel := context.WithCancelCause(req.Context())
+ if req.Cancel != nil {
+ go awaitLegacyCancel(ctx, cancel, req)
+ }
+
+ treq := &transportRequest{Request: req, trace: trace, ctx: ctx, cancel: cancel}
+ resp, err := cc.pconn.roundTrip(treq)
+ if err != nil {
+ return nil, err
+ }
+ resp.Request = req
+ return resp, nil
+}
+
+func (cc http1ClientConn) Close() error {
+ cc.pconn.close(errors.New("ClientConn closed"))
+ return nil
+}
+
+func (cc http1ClientConn) Err() error {
+ select {
+ case <-cc.pconn.closech:
+ return cc.pconn.closed
+ default:
+ return nil
+ }
+}
+
+func (cc http1ClientConn) Available() int {
+ cc.pconn.mu.Lock()
+ defer cc.pconn.mu.Unlock()
+ if cc.pconn.closed != nil || cc.pconn.reserved || cc.pconn.inFlight {
+ return 0
+ }
+ return 1
+}
+
+func (cc http1ClientConn) InFlight() int {
+ cc.pconn.mu.Lock()
+ defer cc.pconn.mu.Unlock()
+ if cc.pconn.closed == nil && (cc.pconn.reserved || cc.pconn.inFlight) {
+ return 1
+ }
+ return 0
+}
+
+func (cc http1ClientConn) Reserve() error {
+ cc.pconn.mu.Lock()
+ defer cc.pconn.mu.Unlock()
+ if cc.pconn.closed != nil {
+ return cc.pconn.closed
+ }
+ select {
+ case <-cc.pconn.availch:
+ default:
+ return errors.New("connection is unavailable")
+ }
+ cc.pconn.reserved = true
+ return nil
+}
+
+func (cc http1ClientConn) Release() {
+ cc.pconn.mu.Lock()
+ defer cc.pconn.mu.Unlock()
+ if cc.pconn.reserved {
+ select {
+ case cc.pconn.availch <- struct{}{}:
+ default:
+ panic("cannot release reservation")
+ }
+ cc.pconn.reserved = false
+ }
+}
--- /dev/null
+// Copyright 2025 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package http_test
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "net/http"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "testing/synctest"
+)
+
+func TestTransportNewClientConnRoundTrip(t *testing.T) { run(t, testTransportNewClientConnRoundTrip) }
+func testTransportNewClientConnRoundTrip(t *testing.T, mode testMode) {
+ cst := newClientServerTest(t, mode, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ io.WriteString(w, req.Host)
+ }), optFakeNet)
+
+ scheme := mode.Scheme() // http or https
+ cc, err := cst.tr.NewClientConn(t.Context(), scheme, cst.ts.Listener.Addr().String())
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer cc.Close()
+
+ // Send requests for a couple different domains.
+ // All use the same connection.
+ for _, host := range []string{"example.tld", "go.dev"} {
+ req, _ := http.NewRequest("GET", fmt.Sprintf("%v://%v/", scheme, host), nil)
+ resp, err := cc.RoundTrip(req)
+ if err != nil {
+ t.Fatal(err)
+ }
+ got, _ := io.ReadAll(resp.Body)
+ if string(got) != host {
+ t.Errorf("got response body %q, want %v", got, host)
+ }
+ resp.Body.Close()
+
+ // CloseIdleConnections does not close connections created by NewClientConn.
+ cst.tr.CloseIdleConnections()
+ }
+
+ if err := cc.Err(); err != nil {
+ t.Errorf("before close: ClientConn.Err() = %v, want nil", err)
+ }
+
+ cc.Close()
+ if err := cc.Err(); err == nil {
+ t.Errorf("after close: ClientConn.Err() = nil, want error")
+ }
+
+ req, _ := http.NewRequest("GET", scheme+"://example.tld/", nil)
+ resp, err := cc.RoundTrip(req)
+ if err == nil {
+ resp.Body.Close()
+ t.Errorf("after close: cc.RoundTrip succeeded, want error")
+ }
+ t.Log(err)
+}
+
+func newClientConnTest(t testing.TB, mode testMode, h http.HandlerFunc, opts ...any) (*clientServerTest, *http.ClientConn) {
+ if h == nil {
+ h = func(w http.ResponseWriter, req *http.Request) {}
+ }
+ cst := newClientServerTest(t, mode, h, opts...)
+ cc, err := cst.tr.NewClientConn(t.Context(), mode.Scheme(), cst.ts.Listener.Addr().String())
+ if err != nil {
+ t.Fatal(err)
+ }
+ t.Cleanup(func() {
+ cc.Close()
+ })
+ synctest.Wait()
+ return cst, cc
+}
+
+// TestClientConnReserveAll reserves every concurrency slot on a connection.
+func TestClientConnReserveAll(t *testing.T) { runSynctest(t, testClientConnReserveAll) }
+func testClientConnReserveAll(t *testing.T, mode testMode) {
+ cst, cc := newClientConnTest(t, mode, nil, optFakeNet, func(s *http.Server) {
+ s.HTTP2 = &http.HTTP2Config{
+ MaxConcurrentStreams: 3,
+ }
+ })
+
+ want := 1
+ switch mode {
+ case http2Mode, http2UnencryptedMode:
+ want = cst.ts.Config.HTTP2.MaxConcurrentStreams
+ }
+ available := cc.Available()
+ if available != want {
+ t.Fatalf("cc.Available() = %v, want %v", available, want)
+ }
+
+ // Reserve every available concurrency slot on the connection.
+ for i := range available {
+ if err := cc.Reserve(); err != nil {
+ t.Fatalf("cc.Reserve() #%v = %v, want nil", i, err)
+ }
+ if got, want := cc.Available(), available-i-1; got != want {
+ t.Fatalf("cc.Available() = %v, want %v", got, want)
+ }
+ if got, want := cc.InFlight(), i+1; got != want {
+ t.Fatalf("cc.InFlight() = %v, want %v", got, want)
+ }
+ }
+
+ // The next reservation attempt should fail, since every slot is consumed.
+ if err := cc.Reserve(); err == nil {
+ t.Fatalf("cc.Reserve() = nil, want error")
+ }
+}
+
+// TestClientConnReserveParallel starts concurrent goroutines which reserve every
+// concurrency slot on a connection.
+func TestClientConnReserveParallel(t *testing.T) { runSynctest(t, testClientConnReserveParallel) }
+func testClientConnReserveParallel(t *testing.T, mode testMode) {
+ _, cc := newClientConnTest(t, mode, nil, optFakeNet, func(s *http.Server) {
+ s.HTTP2 = &http.HTTP2Config{
+ MaxConcurrentStreams: 3,
+ }
+ })
+ var (
+ wg sync.WaitGroup
+ mu sync.Mutex
+ success int
+ failure int
+ )
+ available := cc.Available()
+ const extra = 2
+ for range available + extra {
+ wg.Go(func() {
+ err := cc.Reserve()
+ mu.Lock()
+ defer mu.Unlock()
+ if err == nil {
+ success++
+ } else {
+ failure++
+ }
+ })
+ }
+ wg.Wait()
+
+ if got, want := success, available; got != want {
+ t.Errorf("%v successful reservations, want %v", got, want)
+ }
+ if got, want := failure, extra; got != want {
+ t.Errorf("%v failed reservations, want %v", got, want)
+ }
+}
+
+// TestClientConnReserveRelease repeatedly reserves and releases concurrency slots.
+func TestClientConnReserveRelease(t *testing.T) { runSynctest(t, testClientConnReserveRelease) }
+func testClientConnReserveRelease(t *testing.T, mode testMode) {
+ _, cc := newClientConnTest(t, mode, nil, optFakeNet, func(s *http.Server) {
+ s.HTTP2 = &http.HTTP2Config{
+ MaxConcurrentStreams: 3,
+ }
+ })
+
+ available := cc.Available()
+ for i := range 2 * available {
+ if err := cc.Reserve(); err != nil {
+ t.Fatalf("cc.Reserve() #%v = %v, want nil", i, err)
+ }
+ cc.Release()
+ }
+
+ if got, want := cc.Available(), available; got != want {
+ t.Fatalf("cc.Available() = %v, want %v", available, want)
+ }
+}
+
+// TestClientConnReserveAndConsume reserves a concurrency slot on a connection,
+// and then verifies that various events consume the reservation.
+func TestClientConnReserveAndConsume(t *testing.T) {
+ for _, test := range []struct {
+ name string
+ consume func(t *testing.T, cc *http.ClientConn, mode testMode)
+ handler func(w http.ResponseWriter, req *http.Request, donec chan struct{})
+ h1Closed bool
+ }{{
+ // Explicit release.
+ name: "release",
+ consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
+ cc.Release()
+ },
+ }, {
+ // Invalid request sent to RoundTrip.
+ name: "invalid field name",
+ consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
+ req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
+ req.Header["invalid field name"] = []string{"x"}
+ _, err := cc.RoundTrip(req)
+ if err == nil {
+ t.Fatalf("RoundTrip succeeded, want failure")
+ }
+ },
+ }, {
+ // Successful request/response cycle.
+ name: "body close",
+ consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
+ req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
+ resp, err := cc.RoundTrip(req)
+ if err != nil {
+ t.Fatalf("RoundTrip: %v", err)
+ }
+ resp.Body.Close()
+ },
+ }, {
+ // Request context canceled before headers received.
+ name: "cancel",
+ consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
+ ctx, cancel := context.WithCancel(t.Context())
+ go func() {
+ req, _ := http.NewRequestWithContext(ctx, "GET", mode.Scheme()+"://example.tld/", nil)
+ _, err := cc.RoundTrip(req)
+ if err == nil {
+ t.Errorf("RoundTrip succeeded, want failure")
+ }
+ }()
+ synctest.Wait()
+ cancel()
+ },
+ handler: func(w http.ResponseWriter, req *http.Request, donec chan struct{}) {
+ <-donec
+ },
+ // An HTTP/1 connection is closed after a request is canceled on it.
+ h1Closed: true,
+ }, {
+ // Response body closed before full response received.
+ name: "early body close",
+ consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
+ req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
+ resp, err := cc.RoundTrip(req)
+ if err != nil {
+ t.Fatalf("RoundTrip: %v", err)
+ }
+ t.Logf("%T", resp.Body)
+ resp.Body.Close()
+ },
+ handler: func(w http.ResponseWriter, req *http.Request, donec chan struct{}) {
+ w.WriteHeader(200)
+ http.NewResponseController(w).Flush()
+ <-donec
+ },
+ // An HTTP/1 connection is closed after a request is canceled on it.
+ h1Closed: true,
+ }} {
+ t.Run(test.name, func(t *testing.T) {
+ runSynctest(t, func(t *testing.T, mode testMode) {
+ donec := make(chan struct{})
+ defer close(donec)
+ handler := func(w http.ResponseWriter, req *http.Request) {
+ if test.handler != nil {
+ test.handler(w, req, donec)
+ }
+ }
+
+ _, cc := newClientConnTest(t, mode, handler, optFakeNet)
+ stateHookCalls := 0
+ cc.SetStateHook(func(cc *http.ClientConn) {
+ stateHookCalls++
+ })
+ synctest.Wait()
+ stateHookCalls = 0 // ignore any initial update call
+
+ avail := cc.Available()
+ if err := cc.Reserve(); err != nil {
+ t.Fatalf("cc.Reserve() = %v, want nil", err)
+ }
+ synctest.Wait()
+ if got, want := stateHookCalls, 0; got != want {
+ t.Errorf("connection state hook calls: %v, want %v", got, want)
+ }
+
+ test.consume(t, cc, mode)
+ synctest.Wait()
+
+ // State hook should be called, either to report the
+ // connection availability increasing or the connection closing.
+ if got, want := stateHookCalls, 1; got != want {
+ t.Errorf("connection state hook calls: %v, want %v", got, want)
+ }
+
+ if test.h1Closed && (mode == http1Mode || mode == https1Mode) {
+ if got, want := cc.Available(), 0; got != want {
+ t.Errorf("cc.Available() = %v, want %v", got, want)
+ }
+ if got, want := cc.InFlight(), 0; got != want {
+ t.Errorf("cc.InFlight() = %v, want %v", got, want)
+ }
+ if err := cc.Err(); err == nil {
+ t.Errorf("cc.Err() = nil, want closed connection")
+ }
+ } else {
+ if got, want := cc.Available(), avail; got != want {
+ t.Errorf("cc.Available() = %v, want %v", got, want)
+ }
+ if got, want := cc.InFlight(), 0; got != want {
+ t.Errorf("cc.InFlight() = %v, want %v", got, want)
+ }
+ if err := cc.Err(); err != nil {
+ t.Errorf("cc.Err() = %v, want nil", err)
+ }
+ }
+
+ if cc.Available() > 0 {
+ if err := cc.Reserve(); err != nil {
+ t.Errorf("cc.Reserve() = %v, want success", err)
+ }
+ }
+ })
+ })
+ }
+}
+
+// TestClientConnRoundTripBlocks verifies that RoundTrip blocks until a concurrency
+// slot is available on a connection.
+func TestClientConnRoundTripBlocks(t *testing.T) { runSynctest(t, testClientConnRoundTripBlocks) }
+func testClientConnRoundTripBlocks(t *testing.T, mode testMode) {
+ var handlerCalls atomic.Int64
+ requestc := make(chan struct{})
+ handler := func(w http.ResponseWriter, req *http.Request) {
+ handlerCalls.Add(1)
+ <-requestc
+ }
+ _, cc := newClientConnTest(t, mode, handler, optFakeNet, func(s *http.Server) {
+ s.HTTP2 = &http.HTTP2Config{
+ MaxConcurrentStreams: 3,
+ }
+ })
+
+ available := cc.Available()
+ var responses atomic.Int64
+ const extra = 2
+ for range available + extra {
+ go func() {
+ req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
+ resp, err := cc.RoundTrip(req)
+ responses.Add(1)
+ if err != nil {
+ t.Errorf("RoundTrip: %v", err)
+ return
+ }
+ resp.Body.Close()
+ }()
+ }
+
+ synctest.Wait()
+ if got, want := int(handlerCalls.Load()), available; got != want {
+ t.Errorf("got %v handler calls, want %v", got, want)
+ }
+ if got, want := int(responses.Load()), 0; got != want {
+ t.Errorf("got %v responses, want %v", got, want)
+ }
+
+ for i := range available + extra {
+ requestc <- struct{}{}
+ synctest.Wait()
+ if got, want := int(responses.Load()), i+1; got != want {
+ t.Errorf("got %v responses, want %v", got, want)
+ }
+ }
+}
http2UnencryptedMode = testMode("h2unencrypted") // HTTP/2
)
+func (m testMode) Scheme() string {
+ switch m {
+ case http1Mode, http2UnencryptedMode:
+ return "http"
+ case https1Mode, http2Mode:
+ return "https"
+ }
+ panic("unknown testMode")
+}
+
type testNotParallelOpt struct{}
var (
return errConnBroken
}
pconn.markReused()
+ if pconn.isClientConn {
+ // internalStateHook is always set for conns created by NewClientConn.
+ defer pconn.internalStateHook()
+ pconn.mu.Lock()
+ defer pconn.mu.Unlock()
+ if !pconn.inFlight {
+ panic("pconn is not in flight")
+ }
+ pconn.inFlight = false
+ select {
+ case pconn.availch <- struct{}{}:
+ default:
+ panic("unable to make pconn available")
+ }
+ return nil
+ }
t.idleMu.Lock()
defer t.idleMu.Unlock()
// removeIdleConn marks pconn as dead.
func (t *Transport) removeIdleConn(pconn *persistConn) bool {
+ if pconn.isClientConn {
+ return true
+ }
t.idleMu.Lock()
defer t.idleMu.Unlock()
return t.removeIdleConnLocked(pconn)
return
}
- pc, err := t.dialConn(ctx, w.cm)
+ const isClientConn = false
+ pc, err := t.dialConn(ctx, w.cm, isClientConn, nil)
delivered := w.tryDeliver(pc, err, time.Time{})
if err == nil && (!delivered || pc.alt != nil) {
// pconn was not passed to w,
var testHookProxyConnectTimeout = context.WithTimeout
-func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
+func (t *Transport) dialConn(ctx context.Context, cm connectMethod, isClientConn bool, internalStateHook func()) (pconn *persistConn, err error) {
pconn = &persistConn{
- t: t,
- cacheKey: cm.key(),
- reqch: make(chan requestAndChan, 1),
- writech: make(chan writeRequest, 1),
- closech: make(chan struct{}),
- writeErrCh: make(chan error, 1),
- writeLoopDone: make(chan struct{}),
+ t: t,
+ cacheKey: cm.key(),
+ reqch: make(chan requestAndChan, 1),
+ writech: make(chan writeRequest, 1),
+ closech: make(chan struct{}),
+ writeErrCh: make(chan error, 1),
+ writeLoopDone: make(chan struct{}),
+ isClientConn: isClientConn,
+ internalStateHook: internalStateHook,
}
trace := httptrace.ContextClientTrace(ctx)
wrapErr := func(err error) error {
t.Protocols != nil &&
t.Protocols.UnencryptedHTTP2() &&
!t.Protocols.HTTP1()
+
+ if isClientConn && (unencryptedHTTP2 || (pconn.tlsState != nil && pconn.tlsState.NegotiatedProtocol == "h2")) {
+ altProto, _ := t.altProto.Load().(map[string]RoundTripper)
+ h2, ok := altProto["https"].(newClientConner)
+ if !ok {
+ return nil, errors.New("http: HTTP/2 implementation does not support NewClientConn (update golang.org/x/net?)")
+ }
+ alt, err := h2.NewClientConn(pconn.conn, internalStateHook)
+ if err != nil {
+ pconn.conn.Close()
+ return nil, err
+ }
+ return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt, isClientConn: true}, nil
+ }
+
if unencryptedHTTP2 {
next, ok := t.TLSNextProto[nextProtoUnencryptedHTTP2]
if !ok {
// If it's non-nil, the rest of the fields are unused.
alt RoundTripper
- t *Transport
- cacheKey connectMethodKey
- conn net.Conn
- tlsState *tls.ConnectionState
- br *bufio.Reader // from conn
- bw *bufio.Writer // to conn
- nwrite int64 // bytes written
- reqch chan requestAndChan // written by roundTrip; read by readLoop
- writech chan writeRequest // written by roundTrip; read by writeLoop
- closech chan struct{} // closed when conn closed
- isProxy bool
- sawEOF bool // whether we've seen EOF from conn; owned by readLoop
- readLimit int64 // bytes allowed to be read; owned by readLoop
+ t *Transport
+ cacheKey connectMethodKey
+ conn net.Conn
+ tlsState *tls.ConnectionState
+ br *bufio.Reader // from conn
+ bw *bufio.Writer // to conn
+ nwrite int64 // bytes written
+ reqch chan requestAndChan // written by roundTrip; read by readLoop
+ writech chan writeRequest // written by roundTrip; read by writeLoop
+ closech chan struct{} // closed when conn closed
+ availch chan struct{} // ClientConn only: contains a value when conn is usable
+ isProxy bool
+ sawEOF bool // whether we've seen EOF from conn; owned by readLoop
+ isClientConn bool // whether this is a ClientConn (outside any pool)
+ readLimit int64 // bytes allowed to be read; owned by readLoop
// writeErrCh passes the request write error (usually nil)
// from the writeLoop goroutine to the readLoop which passes
// it off to the res.Body reader, which then uses it to decide
mu sync.Mutex // guards following fields
numExpectedResponses int
- closed error // set non-nil when conn is closed, before closech is closed
- canceledErr error // set non-nil if conn is canceled
- reused bool // whether conn has had successful request/response and is being reused.
+ closed error // set non-nil when conn is closed, before closech is closed
+ canceledErr error // set non-nil if conn is canceled
+ reused bool // whether conn has had successful request/response and is being reused.
+ reserved bool // ClientConn only: concurrency slot reserved
+ inFlight bool // ClientConn only: request is in flight
+ internalStateHook func() // ClientConn state hook
+
// mutateHeaderFunc is an optional func to modify extra
// headers on each outbound request before it's written. (the
// original Request given to RoundTrip is not modified)
defer func() {
pc.close(closeErr)
pc.t.removeIdleConn(pc)
+ if pc.internalStateHook != nil {
+ pc.internalStateHook()
+ }
}()
tryPutIdleConn := func(treq *transportRequest) bool {
testHookReadLoopBeforeNextRead = nop
)
+func (pc *persistConn) waitForAvailability(ctx context.Context) error {
+ select {
+ case <-pc.availch:
+ return nil
+ case <-pc.closech:
+ return pc.closed
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+}
+
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
testHookEnterRoundTrip()
+
pc.mu.Lock()
+ if pc.isClientConn {
+ if !pc.reserved {
+ pc.mu.Unlock()
+ if err := pc.waitForAvailability(req.ctx); err != nil {
+ return nil, err
+ }
+ pc.mu.Lock()
+ }
+ pc.reserved = false
+ pc.inFlight = true
+ }
pc.numExpectedResponses++
headerFn := pc.mutateHeaderFunc
pc.mu.Unlock()