"os"
"strings"
"sync"
+ "time"
)
// DefaultTransport is the default implementation of Transport and is
pconn.close()
return false
}
+ for _, exist := range t.idleConn[key] {
+ if exist == pconn {
+ log.Fatalf("dup idle pconn %p in freelist", pconn)
+ }
+ }
t.idleConn[key] = append(t.idleConn[key], pconn)
return true
}
return
}
}
- return
+ panic("unreachable")
}
func (t *Transport) dial(network, addr string) (c net.Conn, err error) {
conn: conn,
reqch: make(chan requestAndChan, 50),
writech: make(chan writeRequest, 50),
+ closech: make(chan struct{}),
}
switch {
bw *bufio.Writer // to conn
reqch chan requestAndChan // written by roundTrip; read by readLoop
writech chan writeRequest // written by roundTrip; read by writeLoop
+ closech chan struct{} // broadcast close when readLoop (TCP connection) closes
isProxy bool
// mutateHeaderFunc is an optional func to modify extra
}
func (pc *persistConn) readLoop() {
+ defer close(pc.closech)
defer close(pc.writech)
alive := true
var lastbody io.ReadCloser // last response body, if any, read on this connection
lastbody.Close() // assumed idempotent
lastbody = nil
}
- resp, err := ReadResponse(pc.br, rc.req)
+
+ var resp *Response
+ if err == nil {
+ resp, err = ReadResponse(pc.br, rc.req)
+ }
if err != nil {
pc.close()
var waitForBodyRead chan bool
if hasBody {
lastbody = resp.Body
- waitForBodyRead = make(chan bool)
+ waitForBodyRead = make(chan bool, 1)
resp.Body.(*bodyEOFSignal).fn = func() {
if alive && !pc.t.putIdleConn(pc) {
alive = false
pc.reqch <- requestAndChan{req.Request, resc, requestedGzip}
var re responseAndError
+ var pconnDeadCh = pc.closech
+ var failTicker <-chan time.Time
WaitResponse:
for {
select {
re = responseAndError{nil, err}
break WaitResponse
}
+ case <-pconnDeadCh:
+ // The persist connection is dead. This shouldn't
+ // usually happen (only with Connection: close responses
+ // with no response bodies), but if it does happen it
+ // means either a) the remote server hung up on us
+ // prematurely, or b) the readLoop sent us a response &
+ // closed its closech at roughly the same time, and we
+ // selected this case first, in which case a response
+ // might still be coming soon.
+ //
+ // We can't avoid the select race in b) by using a unbuffered
+ // resc channel instead, because then goroutines can
+ // leak if we exit due to other errors.
+ pconnDeadCh = nil // avoid spinning
+ failTicker = time.After(100 * time.Millisecond) // arbitrary time to wait for resc
+ case <-failTicker:
+ re = responseAndError{nil, errors.New("net/http: transport closed before response was received")}
+ break WaitResponse
case re = <-resc:
break WaitResponse
}
body io.ReadCloser
fn func()
isClosed bool
+ once sync.Once
}
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
if es.isClosed && n > 0 {
panic("http: unexpected bodyEOFSignal Read after Close; see issue 1725")
}
- if err == io.EOF && es.fn != nil {
- es.fn()
- es.fn = nil
+ if err == io.EOF {
+ es.condfn()
}
return
}
}
es.isClosed = true
err = es.body.Close()
- if err == nil && es.fn != nil {
- es.fn()
- es.fn = nil
+ if err == nil {
+ es.condfn()
}
return
}
+func (es *bodyEOFSignal) condfn() {
+ if es.fn != nil {
+ es.once.Do(es.fn)
+ }
+}
+
type readFirstCloseBoth struct {
io.ReadCloser
io.Closer
}
}
+func TestTransportConcurrency(t *testing.T) {
+ const maxProcs = 16
+ const numReqs = 500
+ defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(maxProcs))
+ ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
+ fmt.Fprintf(w, "%v", r.FormValue("echo"))
+ }))
+ defer ts.Close()
+ tr := &Transport{}
+ c := &Client{Transport: tr}
+ reqs := make(chan string)
+ defer close(reqs)
+
+ var wg sync.WaitGroup
+ wg.Add(numReqs)
+ for i := 0; i < maxProcs*2; i++ {
+ go func() {
+ for req := range reqs {
+ res, err := c.Get(ts.URL + "/?echo=" + req)
+ if err != nil {
+ t.Errorf("error on req %s: %v", req, err)
+ wg.Done()
+ continue
+ }
+ all, err := ioutil.ReadAll(res.Body)
+ if err != nil {
+ t.Errorf("read error on req %s: %v", req, err)
+ wg.Done()
+ continue
+ }
+ if string(all) != req {
+ t.Errorf("body of req %s = %q; want %q", req, all, req)
+ }
+ wg.Done()
+ res.Body.Close()
+ }
+ }()
+ }
+ for i := 0; i < numReqs; i++ {
+ reqs <- fmt.Sprintf("request-%d", i)
+ }
+ wg.Wait()
+}
+
type fooProto struct{}
func (fooProto) RoundTrip(req *Request) (*Response, error) {