}
pconn.br = bufio.NewReader(pconn.conn)
- pconn.cc = NewClientConn(conn, pconn.br)
+ pconn.bw = bufio.NewWriter(pconn.conn)
go pconn.readLoop()
return pconn, nil
}
t *Transport
cacheKey string // its connectMethod.String()
conn net.Conn
- cc *ClientConn
- br *bufio.Reader
+ br *bufio.Reader // from conn
+ bw *bufio.Writer // to conn
reqch chan requestAndChan // written by roundTrip(); read by readLoop()
isProxy bool
func (pc *persistConn) readLoop() {
alive := true
+ var lastbody io.ReadCloser // last response body, if any, read on this connection
+
for alive {
pb, err := pc.br.Peek(1)
if err != nil {
}
rc := <-pc.reqch
- resp, err := pc.cc.readUsing(rc.req, func(buf *bufio.Reader, forReq *Request) (*Response, error) {
- resp, err := ReadResponse(buf, forReq)
- if err != nil || resp.ContentLength == 0 {
- return resp, err
- }
+
+ // Advance past the previous response's body, if the
+ // caller hasn't done so.
+ if lastbody != nil {
+ lastbody.Close() // assumed idempotent
+ lastbody = nil
+ }
+ resp, err := ReadResponse(pc.br, rc.req)
+
+ if err == nil {
if rc.addedGzip && resp.Header.Get("Content-Encoding") == "gzip" {
resp.Header.Del("Content-Encoding")
resp.Header.Del("Content-Length")
resp.ContentLength = -1
- gzReader, err := gzip.NewReader(resp.Body)
+ gzReader, zerr := gzip.NewReader(resp.Body)
if err != nil {
pc.close()
- return nil, err
+ err = zerr
+ } else {
+ resp.Body = &readFirstCloseBoth{&discardOnCloseReadCloser{gzReader}, resp.Body}
}
- resp.Body = &readFirstCloseBoth{&discardOnCloseReadCloser{gzReader}, resp.Body}
}
resp.Body = &bodyEOFSignal{body: resp.Body}
- return resp, err
- })
+ }
- if err == ErrPersistEOF {
- // Succeeded, but we can't send any more
- // persistent connections on this again. We
- // hide this error to upstream callers.
- alive = false
- err = nil
- } else if err != nil || rc.req.Close {
+ if err != nil || resp.Close || rc.req.Close {
alive = false
}
var waitForBodyRead chan bool
if alive {
if hasBody {
+ lastbody = resp.Body
waitForBodyRead = make(chan bool)
resp.Body.(*bodyEOFSignal).fn = func() {
pc.t.putIdleConn(pc)
// loop, otherwise it might close the body
// before the client code has had a chance to
// read it (even though it'll just be 0, EOF).
- pc.cc.lk.Lock()
- pc.cc.lastbody = nil
- pc.cc.lk.Unlock()
+ lastbody = nil
pc.t.putIdleConn(pc)
}
pc.numExpectedResponses++
pc.lk.Unlock()
- pc.cc.writeReq = func(r *Request, w io.Writer) error {
- return r.write(w, pc.isProxy, req.extra)
- }
-
- err = pc.cc.Write(req.Request)
+ err = req.Request.write(pc.bw, pc.isProxy, req.extra)
if err != nil {
pc.close()
return
}
+ pc.bw.Flush()
ch := make(chan responseAndError, 1)
pc.reqch <- requestAndChan{req.Request, ch, requestedGzip}
pc.lk.Lock()
defer pc.lk.Unlock()
pc.broken = true
- pc.cc.Close()
pc.conn.Close()
pc.mutateHeaderFunc = nil
}