inflow http2flow // guarded by cc.mu
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
readErr error // sticky read error; owned by transportResponseBody.Read
+ stopReqBody bool // stop writing req body; guarded by cc.mu
peerReset chan struct{} // closed on peer reset
resetErr error // populated before peerReset is closed
}
}
+func (cs *http2clientStream) abortRequestBodyWrite() {
+ cc := cs.cc
+ cc.mu.Lock()
+ cs.stopReqBody = true
+ cc.cond.Broadcast()
+ cc.mu.Unlock()
+}
+
type http2stickyErrWriter struct {
w io.Writer
err *error
return nil, werr
}
- var bodyCopyErrc chan error
- var gotResHeaders chan struct{} // closed on resheaders
+ var bodyCopyErrc chan error // result of body copy
if hasBody {
bodyCopyErrc = make(chan error, 1)
- gotResHeaders = make(chan struct{})
go func() {
- bodyCopyErrc <- cs.writeRequestBody(req.Body, gotResHeaders)
+ bodyCopyErrc <- cs.writeRequestBody(req.Body)
}()
}
for {
select {
case re := <-cs.resc:
- if gotResHeaders != nil {
- close(gotResHeaders)
+ res := re.res
+ if re.err != nil || res.StatusCode > 299 {
+
+ cs.abortRequestBodyWrite()
}
if re.err != nil {
return nil, re.err
}
- res := re.res
res.Request = req
res.TLS = cc.tlsState
return res, nil
}
}
-var http2errServerResponseBeforeRequestBody = errors.New("http2: server sent response while still writing request body")
+// errAbortReqBodyWrite is an internal error value.
+// It doesn't escape to callers.
+var http2errAbortReqBodyWrite = errors.New("http2: aborting request body write")
-func (cs *http2clientStream) writeRequestBody(body io.Reader, gotResHeaders <-chan struct{}) error {
+func (cs *http2clientStream) writeRequestBody(body io.ReadCloser) (err error) {
cc := cs.cc
sentEnd := false
buf := cc.frameScratchBuffer()
defer cc.putFrameScratchBuffer(buf)
- for !sentEnd {
- var sawEOF bool
- n, err := io.ReadFull(body, buf)
- if err == io.ErrUnexpectedEOF {
+ defer func() {
+
+ cerr := body.Close()
+ if err == nil {
+ err = cerr
+ }
+ }()
+
+ var sawEOF bool
+ for !sawEOF {
+ n, err := body.Read(buf)
+ if err == io.EOF {
sawEOF = true
err = nil
- } else if err == io.EOF {
- break
} else if err != nil {
return err
}
- toWrite := buf[:n]
- for len(toWrite) > 0 && err == nil {
+ remain := buf[:n]
+ for len(remain) > 0 && err == nil {
var allowed int32
- allowed, err = cs.awaitFlowControl(int32(len(toWrite)))
+ allowed, err = cs.awaitFlowControl(len(remain))
if err != nil {
return err
}
-
cc.wmu.Lock()
- select {
- case <-gotResHeaders:
- err = http2errServerResponseBeforeRequestBody
- case <-cs.peerReset:
- err = cs.resetErr
- default:
- data := toWrite[:allowed]
- toWrite = toWrite[allowed:]
- sentEnd = sawEOF && len(toWrite) == 0
- err = cc.fr.WriteData(cs.ID, sentEnd, data)
+ data := remain[:allowed]
+ remain = remain[allowed:]
+ sentEnd = sawEOF && len(remain) == 0
+ err = cc.fr.WriteData(cs.ID, sentEnd, data)
+ if err == nil {
+
+ err = cc.bw.Flush()
}
cc.wmu.Unlock()
}
}
}
- var err error
-
cc.wmu.Lock()
if !sentEnd {
err = cc.fr.WriteData(cs.ID, true, nil)
// control tokens from the server.
// It returns either the non-zero number of tokens taken or an error
// if the stream is dead.
-func (cs *http2clientStream) awaitFlowControl(maxBytes int32) (taken int32, err error) {
+func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc
cc.mu.Lock()
defer cc.mu.Unlock()
if cc.closed {
return 0, http2errClientConnClosed
}
+ if cs.stopReqBody {
+ return 0, http2errAbortReqBodyWrite
+ }
if err := cs.checkReset(); err != nil {
return 0, err
}
if a := cs.flow.available(); a > 0 {
take := a
- if take > maxBytes {
- take = maxBytes
+ if int(take) > maxBytes {
+
+ take = int32(maxBytes)
}
if take > int32(cc.maxFrameSize) {
take = int32(cc.maxFrameSize)
cs.resetErr = err
close(cs.peerReset)
cs.bufPipe.CloseWithError(err)
+ cs.cc.cond.Broadcast()
}
delete(rl.activeRes, cs.ID)
return nil