// Immutable; never nil.
server *Server
+ // cancelCtx cancels the connection-level context.
+ cancelCtx context.CancelFunc
+
// rwc is the underlying network connection.
// This is never wrapped by other types and is the value given out
// to CloseNotifier callers. It is usually of type *net.TCPConn or
// mu guards hijackedv, use of bufr, (*response).closeNotifyCh.
mu sync.Mutex
+ curReq atomic.Value // of *response (which has a Request in it)
+
// hijackedv is whether this connection has been hijacked
// by a Handler with the Hijacker interface.
// It is guarded by mu.
if c.hijackedv {
return nil, nil, ErrHijacked
}
+ c.r.abortPendingRead()
+
c.hijackedv = true
rwc = c.rwc
+ rwc.SetDeadline(time.Time{})
+
buf = bufio.NewReadWriter(c.bufr, bufio.NewWriter(rwc))
c.setState(rwc, StateHijacked)
return
dateBuf [len(TimeFormat)]byte
clenBuf [10]byte
- // closeNotifyCh is non-nil once CloseNotify is called.
- // Guarded by conn.mu
- closeNotifyCh <-chan bool
+ // closeNotifyCh is the channel returned by CloseNotify.
+ // TODO(bradfitz): this is currently (for Go 1.8) always
+ // non-nil. Make this lazily-created again as it used to be.
+ closeNotifyCh chan bool
+ didCloseNotify int32 // atomic (only 0->1 winner should send)
}
// atomicBool is an int32 meant to be accessed with sync/atomic
// operations as a boolean (0 == false, nonzero == true); its methods
// are not visible in this chunk — TODO confirm against the full file.
type atomicBool int32
// call blocked in a background goroutine to wait for activity and
// trigger a CloseNotifier channel.
type connReader struct {
	// conn is the owning connection; it also provides access to the
	// underlying net.Conn (conn.rwc) that all reads go through.
	conn *conn

	mu      sync.Mutex // guards following fields
	hasByte bool       // whether byteBuf holds a byte pulled off the wire by backgroundRead
	byteBuf [1]byte    // single-byte buffer filled by the background read
	bgErr   error      // non-nil means error happened on background read
	cond    *sync.Cond // lazily created by lock(); signaled when inRead becomes false
	inRead  bool       // whether a read (foreground or background) is in flight
	aborted bool       // set true before conn.rwc deadline is set to past
	remain  int64      // bytes remaining
}
// lock acquires cr.mu and lazily initializes cr.cond, the condition
// variable used to wait for an in-flight read to finish.
func (cr *connReader) lock() {
	cr.mu.Lock()
	if cr.cond == nil {
		cr.cond = sync.NewCond(&cr.mu)
	}
}
+
// unlock releases cr.mu.
func (cr *connReader) unlock() { cr.mu.Unlock() }
+
// startBackgroundRead marks a read as in flight and starts a goroutine
// doing a single 1-byte read on the connection, so the server notices
// client activity (or disconnect) while no Handler read is pending.
// It panics if a read is already in progress.
func (cr *connReader) startBackgroundRead() {
	cr.lock()
	defer cr.unlock()
	if cr.inRead {
		panic("invalid concurrent Body.Read call")
	}
	cr.inRead = true
	go cr.backgroundRead()
}
+
// backgroundRead is the body of the goroutine started by
// startBackgroundRead: one blocking 1-byte read from the connection,
// stashed in byteBuf for a later foreground Read to consume.
func (cr *connReader) backgroundRead() {
	n, err := cr.conn.rwc.Read(cr.byteBuf[:])
	cr.lock()
	if n == 1 {
		cr.hasByte = true
		// We were at EOF already (since we wouldn't be in a
		// background read otherwise), so this is a pipelined
		// HTTP request.
		cr.closeNotifyFromPipelinedRequest()
	}
	if ne, ok := err.(net.Error); ok && cr.aborted && ne.Timeout() {
		// Ignore this error. It's the expected error from
		// another goroutine calling abortPendingRead.
	} else if err != nil {
		cr.handleReadError(err)
	}
	cr.aborted = false
	cr.inRead = false
	cr.unlock()
	// Wake abortPendingRead, which may be waiting for inRead to clear.
	cr.cond.Broadcast()
}
+
// abortPendingRead aborts an in-flight read (if any) by moving the
// connection's read deadline into the past, then waits on cr.cond for
// the reading goroutine to observe the timeout and finish, and finally
// clears the deadline again. No-op when no read is in flight.
func (cr *connReader) abortPendingRead() {
	cr.lock()
	defer cr.unlock()
	if !cr.inRead {
		return
	}
	cr.aborted = true // tells backgroundRead to ignore the resulting timeout error
	cr.conn.rwc.SetReadDeadline(aLongTimeAgo)
	for cr.inRead {
		cr.cond.Wait()
	}
	cr.conn.rwc.SetReadDeadline(time.Time{})
}
// setReadLimit restricts subsequent reads to at most remain bytes.
func (cr *connReader) setReadLimit(remain int64) {
	cr.remain = remain
}

// setInfiniteReadLimit allows effectively unlimited further reads.
func (cr *connReader) setInfiniteReadLimit() {
	cr.remain = maxInt64
}

// hitReadLimit reports whether the current read limit is exhausted.
func (cr *connReader) hitReadLimit() bool {
	return cr.remain <= 0
}
// handleReadError is invoked when a read from the client fails: it
// cancels the connection-level context and fires CloseNotify for the
// current request, if any. The error value itself is not inspected.
//
// may be called from multiple goroutines.
func (cr *connReader) handleReadError(err error) {
	cr.conn.cancelCtx()
	cr.closeNotify()
}
+
// closeNotifyFromPipelinedRequest simply calls closeNotify; it adds no
// behavior of its own.
//
// This method wrapper is here for documentation. The callers are the
// cases where we send on the closenotify channel because of a
// pipelined HTTP request, per the previous Go behavior and
// documentation (that this "MAY" happen).
//
// TODO: consider changing this behavior and making context
// cancelation and closenotify work the same.
func (cr *connReader) closeNotifyFromPipelinedRequest() {
	cr.closeNotify()
}
+
+// may be called from multiple goroutines.
+func (cr *connReader) closeNotify() {
+ res, _ := cr.conn.curReq.Load().(*response)
+ if res != nil {
+ if atomic.CompareAndSwapInt32(&res.didCloseNotify, 0, 1) {
+ res.closeNotifyCh <- true
+ }
+ }
+}
+
// Read reads from the connection on behalf of the foreground
// request/body reader, enforcing the byte limit in cr.remain and
// coordinating with any background read: a buffered background byte is
// handed back first, and concurrent foreground reads panic.
func (cr *connReader) Read(p []byte) (n int, err error) {
	cr.lock()
	if cr.inRead {
		cr.unlock()
		panic("invalid concurrent Body.Read call")
	}
	if cr.hitReadLimit() {
		cr.unlock()
		return 0, io.EOF
	}
	if cr.bgErr != nil {
		err = cr.bgErr
		cr.unlock()
		return 0, err
	}
	if len(p) == 0 {
		cr.unlock()
		return 0, nil
	}
	if int64(len(p)) > cr.remain {
		p = p[:cr.remain]
	}
	if cr.hasByte {
		// Hand back the byte a background read already pulled off
		// the wire. NOTE(review): unlike the normal path below,
		// remain is not decremented here — TODO confirm intended.
		p[0] = cr.byteBuf[0]
		cr.hasByte = false
		cr.unlock()
		return 1, nil
	}
	cr.inRead = true
	cr.unlock()
	n, err = cr.conn.rwc.Read(p)

	cr.lock()
	cr.inRead = false
	if err != nil {
		cr.handleReadError(err)
	}
	cr.remain -= int64(n)
	cr.unlock()

	// Wake abortPendingRead, which may be waiting for inRead to clear.
	cr.cond.Broadcast()
	return n, err
}
var (
reqBody: req.Body,
handlerHeader: make(Header),
contentLength: -1,
+ closeNotifyCh: make(chan bool, 1),
// We populate these ahead of time so we're not
// reading from req.Header after their Handler starts
w.cw.close()
w.conn.bufw.Flush()
+ w.conn.r.abortPendingRead()
+
// Close the body (regardless of w.closeAfterReply) so we can
// re-use its bufio.Reader later safely.
w.reqBody.Close()
// HTTP/1.x from here on.
- c.r = &connReader{r: c.rwc}
- c.bufr = newBufioReader(c.r)
- c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)
-
ctx, cancelCtx := context.WithCancel(ctx)
+ c.cancelCtx = cancelCtx
defer cancelCtx()
+ c.r = &connReader{conn: c}
+ c.bufr = newBufioReader(c.r)
+ c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)
+
for {
w, err := c.readRequest(ctx)
if c.r.remain != c.server.initialReadLimitSize() {
return
}
+ c.curReq.Store(w)
+
+ if requestBodyRemains(req.Body) {
+ registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
+ } else {
+ if w.conn.bufr.Buffered() > 0 {
+ w.conn.r.closeNotifyFromPipelinedRequest()
+ }
+ w.conn.r.startBackgroundRead()
+ }
+
// HTTP cannot have multiple simultaneous active requests.[*]
// Until the server replies to this request, it can't read another,
// so we might as well run the handler in this goroutine.
// [*] Not strictly true: HTTP pipelining. We could let them all process
// in parallel even if their responses need to be serialized.
+ // But we're not going to implement HTTP pipelining because it
+ // was never deployed in the wild and the answer is HTTP/2.
serverHandler{c.server}.ServeHTTP(w, w.req)
w.cancelCtx()
if c.hijacked() {
return
}
c.setState(c.rwc, StateIdle)
+ c.curReq.Store((*response)(nil))
}
}
c.mu.Lock()
defer c.mu.Unlock()
- if w.closeNotifyCh != nil {
- return nil, nil, errors.New("http: Hijack is incompatible with use of CloseNotifier in same ServeHTTP call")
- }
-
// Release the bufioWriter that writes to the chunk writer, it is not
// used after a connection has been hijacked.
rwc, buf, err = c.hijackLocked()
if w.handlerDone.isSet() {
panic("net/http: CloseNotify called after ServeHTTP finished")
}
- c := w.conn
- c.mu.Lock()
- defer c.mu.Unlock()
-
- if w.closeNotifyCh != nil {
- return w.closeNotifyCh
- }
- ch := make(chan bool, 1)
- w.closeNotifyCh = ch
-
- if w.conn.hijackedv {
- // CloseNotify is undefined after a hijack, but we have
- // no place to return an error, so just return a channel,
- // even though it'll never receive a value.
- return ch
- }
-
- var once sync.Once
- notify := func() { once.Do(func() { ch <- true }) }
-
- if requestBodyRemains(w.reqBody) {
- // They're still consuming the request body, so we
- // shouldn't notify yet.
- registerOnHitEOF(w.reqBody, func() {
- c.mu.Lock()
- defer c.mu.Unlock()
- startCloseNotifyBackgroundRead(c, notify)
- })
- } else {
- startCloseNotifyBackgroundRead(c, notify)
- }
- return ch
-}
-
-// c.mu must be held.
-func startCloseNotifyBackgroundRead(c *conn, notify func()) {
- if c.bufr.Buffered() > 0 {
- // They've consumed the request body, so anything
- // remaining is a pipelined request, which we
- // document as firing on.
- notify()
- } else {
- c.r.startBackgroundRead(notify)
- }
+ return w.closeNotifyCh
}
func registerOnHitEOF(rc io.ReadCloser, fn func()) {
n, err = w.c.rwc.Write(p)
if err != nil && w.c.werr == nil {
w.c.werr = err
+ w.c.cancelCtx()
}
return
}