return 1, io.EOF
}
+// transferBodyReader is an io.Reader that reads from tw.Body
+// and records any non-EOF error in tw.bodyReadError.
+// It is exactly 1 pointer wide to avoid allocations into interfaces.
+type transferBodyReader struct{ tw *transferWriter }
+
+func (br transferBodyReader) Read(p []byte) (n int, err error) {
+ n, err = br.tw.Body.Read(p)
+ if err != nil && err != io.EOF {
+ br.tw.bodyReadError = err
+ }
+ return
+}
+
// transferWriter inspects the fields of a user-supplied Request or Response,
// sanitizes them without changing the user object and provides methods for
// writing the respective header, body and trailer in wire format.
TransferEncoding []string
Trailer Header
IsResponse bool
+ bodyReadError error // any non-EOF error from reading Body
FlushHeaders bool // flush headers to network before body
ByteReadCh chan readResult // non-nil if probeRequestBody called
// Write body
if t.Body != nil {
+ var body = transferBodyReader{t}
if chunked(t.TransferEncoding) {
if bw, ok := w.(*bufio.Writer); ok && !t.IsResponse {
w = &internal.FlushAfterChunkWriter{Writer: bw}
}
cw := internal.NewChunkedWriter(w)
- _, err = io.Copy(cw, t.Body)
+ _, err = io.Copy(cw, body)
if err == nil {
err = cw.Close()
}
} else if t.ContentLength == -1 {
- ncopy, err = io.Copy(w, t.Body)
+ ncopy, err = io.Copy(w, body)
} else {
- ncopy, err = io.Copy(w, io.LimitReader(t.Body, t.ContentLength))
+ ncopy, err = io.Copy(w, io.LimitReader(body, t.ContentLength))
if err != nil {
return err
}
var nextra int64
- nextra, err = io.Copy(ioutil.Discard, t.Body)
+ nextra, err = io.Copy(ioutil.Discard, body)
ncopy += nextra
}
if err != nil {
}
// transportRequest is a wrapper around a *Request that adds
-// optional extra headers to write.
+// optional extra headers to write and stores any error to return
+// from roundTrip.
type transportRequest struct {
*Request // original request, not to be mutated
extra Header // extra headers to write, or nil
trace *httptrace.ClientTrace // optional
+
+ mu sync.Mutex // guards err
+ err error // first setError value for mapRoundTripError to consider
}
func (tr *transportRequest) extraHeaders() Header {
return tr.extra
}
+func (tr *transportRequest) setError(err error) {
+ tr.mu.Lock()
+ if tr.err == nil {
+ tr.err = err
+ }
+ tr.mu.Unlock()
+}
+
// RoundTrip implements the RoundTripper interface.
//
// For higher-level HTTP client support (such as handling of cookies
pc.close(errIdleConnTimeout)
}
-// mapRoundTripErrorFromReadLoop maps the provided readLoop error into
-// the error value that should be returned from persistConn.roundTrip.
+// mapRoundTripError returns the appropriate error value for
+// persistConn.roundTrip.
+//
+// The provided err is the first error that (*persistConn).roundTrip
+// happened to receive from its select statement.
//
// The startBytesWritten value should be the value of pc.nwrite before the roundTrip
// started writing the request.
-func (pc *persistConn) mapRoundTripErrorFromReadLoop(req *Request, startBytesWritten int64, err error) (out error) {
+func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error {
if err == nil {
return nil
}
- if err := pc.canceled(); err != nil {
- return err
+
+ // If the request was canceled, that's better than network
+ // failures that were likely the result of tearing down the
+ // connection.
+ if cerr := pc.canceled(); cerr != nil {
+ return cerr
+ }
+
+ // See if an error was set explicitly.
+ req.mu.Lock()
+ reqErr := req.err
+ req.mu.Unlock()
+ if reqErr != nil {
+ return reqErr
}
+
if err == errServerClosedIdle {
+ // Don't decorate
return err
}
+
if _, ok := err.(transportReadFromServerError); ok {
+ // Don't decorate
return err
}
if pc.isBroken() {
if pc.nwrite == startBytesWritten && req.outgoingLength() == 0 {
return nothingWrittenError{err}
}
+ return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %v", err)
}
return err
}
-// mapRoundTripErrorAfterClosed returns the error value to be propagated
-// up to Transport.RoundTrip method when persistConn.roundTrip sees
-// its pc.closech channel close, indicating the persistConn is dead.
-// (after closech is closed, pc.closed is valid).
-func (pc *persistConn) mapRoundTripErrorAfterClosed(req *Request, startBytesWritten int64) error {
- if err := pc.canceled(); err != nil {
- return err
- }
- err := pc.closed
- if err == errServerClosedIdle {
- // Don't decorate
- return err
- }
- if _, ok := err.(transportReadFromServerError); ok {
- // Don't decorate
- return err
- }
-
- // Wait for the writeLoop goroutine to terminated, and then
- // see if we actually managed to write anything. If not, we
- // can retry the request.
- <-pc.writeLoopDone
- if pc.nwrite == startBytesWritten && req.outgoingLength() == 0 {
- return nothingWrittenError{err}
- }
-
- return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %v", err)
-
-}
-
func (pc *persistConn) readLoop() {
closeErr := errReadLoopExiting // default value, if not changed below
defer func() {
case wr := <-pc.writech:
startBytesWritten := pc.nwrite
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
+ if bre, ok := err.(requestBodyReadError); ok {
+ err = bre.error
+ // Errors reading from the user's
+ // Request.Body are high priority.
+ // Set it here before sending on the
+ // channels below or calling
+ // pc.close() which tears town
+ // connections and causes other
+ // errors.
+ wr.req.setError(err)
+ }
if err == nil {
err = pc.bw.Flush()
}
gone := make(chan struct{})
defer close(gone)
+ defer func() {
+ if err != nil {
+ pc.t.setReqCanceler(req.Request, nil)
+ }
+ }()
+
+ const debugRoundTrip = false
+
// Write the request concurrently with waiting for a response,
// in case the server decides to reply before reading our full
// request body.
callerGone: gone,
}
- var re responseAndError
var respHeaderTimer <-chan time.Time
cancelChan := req.Request.Cancel
ctxDoneChan := req.Context().Done()
-WaitResponse:
for {
testHookWaitResLoop()
select {
case err := <-writeErrCh:
+ if debugRoundTrip {
+ req.logf("writeErrCh resv: %T/%#v", err, err)
+ }
if err != nil {
- if cerr := pc.canceled(); cerr != nil {
- err = cerr
- }
- re = responseAndError{err: err}
pc.close(fmt.Errorf("write error: %v", err))
- break WaitResponse
+ return nil, pc.mapRoundTripError(req, startBytesWritten, err)
}
if d := pc.t.ResponseHeaderTimeout; d > 0 {
+ if debugRoundTrip {
+ req.logf("starting timer for %v", d)
+ }
timer := time.NewTimer(d)
defer timer.Stop() // prevent leaks
respHeaderTimer = timer.C
}
case <-pc.closech:
- re = responseAndError{err: pc.mapRoundTripErrorAfterClosed(req.Request, startBytesWritten)}
- break WaitResponse
+ if debugRoundTrip {
+ req.logf("closech recv: %T %#v", pc.closed, pc.closed)
+ }
+ return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
case <-respHeaderTimer:
+ if debugRoundTrip {
+ req.logf("timeout waiting for response headers.")
+ }
pc.close(errTimeout)
- re = responseAndError{err: errTimeout}
- break WaitResponse
- case re = <-resc:
- re.err = pc.mapRoundTripErrorFromReadLoop(req.Request, startBytesWritten, re.err)
- break WaitResponse
+ return nil, errTimeout
+ case re := <-resc:
+ if (re.res == nil) == (re.err == nil) {
+ panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
+ }
+ if debugRoundTrip {
+ req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
+ }
+ if re.err != nil {
+ return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
+ }
+ return re.res, nil
case <-cancelChan:
pc.t.CancelRequest(req.Request)
cancelChan = nil
ctxDoneChan = nil
}
}
+}
- if re.err != nil {
- pc.t.setReqCanceler(req.Request, nil)
- }
- if (re.res == nil) == (re.err == nil) {
- panic("internal error: exactly one of res or err should be set")
+// tLogKey is a context WithValue key for test debugging contexts containing
+// a t.Logf func. See export_test.go's Request.WithT method.
+type tLogKey struct{}
+
+func (r *transportRequest) logf(format string, args ...interface{}) {
+ if logf, ok := r.Request.Context().Value(tLogKey{}).(func(string, ...interface{})); ok {
+ logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...)
}
- return re.res, re.err
}
// markReused marks this connection as having been successfully used for a