From bef4cb475c0638ab5193f75f2683b35a7c7f6547 Mon Sep 17 00:00:00 2001
From: Brad Fitzpatrick
Date: Fri, 11 Jan 2013 10:03:43 -0800
Subject: [PATCH] net/http: buffer before chunking

This introduces a buffer between writing from a handler and writing
chunks. Further, it delays writing the header until the first full
chunk is ready. In the case where the first full chunk is also the
final chunk (for small responses), that means we can also compute a
Content-Length, which is a nice side effect for certain benchmarks.

Fixes #2357

R=golang-dev, dave, minux.ma, rsc, adg, balasanjay
CC=golang-dev
https://golang.org/cl/6964043
---
 src/pkg/net/http/header.go     |  10 +
 src/pkg/net/http/serve_test.go |  16 +-
 src/pkg/net/http/server.go     | 394 +++++++++++++++++++--------------
 3 files changed, 249 insertions(+), 171 deletions(-)

diff --git a/src/pkg/net/http/header.go b/src/pkg/net/http/header.go
index 91417366ae..f479b7b4eb 100644
--- a/src/pkg/net/http/header.go
+++ b/src/pkg/net/http/header.go
@@ -54,6 +54,16 @@ func (h Header) Write(w io.Writer) error {
 	return h.WriteSubset(w, nil)
 }
 
+func (h Header) clone() Header {
+	h2 := make(Header, len(h))
+	for k, vv := range h {
+		vv2 := make([]string, len(vv))
+		copy(vv2, vv)
+		h2[k] = vv2
+	}
+	return h2
+}
+
 var timeFormats = []string{
 	TimeFormat,
 	time.RFC850,
diff --git a/src/pkg/net/http/serve_test.go b/src/pkg/net/http/serve_test.go
index 1de4171239..96d442b623 100644
--- a/src/pkg/net/http/serve_test.go
+++ b/src/pkg/net/http/serve_test.go
@@ -484,6 +484,7 @@ func TestChunkedResponseHeaders(t *testing.T) {
 
 	ts := httptest.NewServer(HandlerFunc(func(w ResponseWriter, r *Request) {
 		w.Header().Set("Content-Length", "intentional gibberish") // we check that this is deleted
+		w.(Flusher).Flush()
 		fmt.Fprintf(w, "I am a chunked response.")
 	}))
 	defer ts.Close()
@@ -764,6 +765,7 @@ func TestServerUnreadRequestBodyLittle(t *testing.T) {
 			t.Errorf("on request, read buffer length is %d; expected about 100 KB", conn.readBuf.Len())
 		}
 		rw.WriteHeader(200)
+		rw.(Flusher).Flush()
 		if g, e := conn.readBuf.Len(), 0; g != e {
 			t.Errorf("after WriteHeader, read buffer length is %d; want %d", g, e)
 		}
@@ -796,14 +798,16 @@ func TestServerUnreadRequestBodyLarge(t *testing.T) {
 			t.Errorf("on request, read buffer length is %d; expected about 1MB", conn.readBuf.Len())
 		}
 		rw.WriteHeader(200)
+		rw.(Flusher).Flush()
 		if conn.readBuf.Len() < len(body)/2 {
 			t.Errorf("post-WriteHeader, read buffer length is %d; expected about 1MB", conn.readBuf.Len())
 		}
-		if c := rw.Header().Get("Connection"); c != "close" {
-			t.Errorf(`Connection header = %q; want "close"`, c)
-		}
 	}))
 	<-done
+
+	if res := conn.writeBuf.String(); !strings.Contains(res, "Connection: close") {
+		t.Errorf("Expected a Connection: close header; got response: %s", res)
+	}
 }
 
 func TestTimeoutHandler(t *testing.T) {
@@ -1144,17 +1148,13 @@ func TestClientWriteShutdown(t *testing.T) {
 // Tests that chunked server responses that write 1 byte at a time are
 // buffered before chunk headers are added, not after chunk headers.
 func TestServerBufferedChunking(t *testing.T) {
-	if true {
-		t.Logf("Skipping known broken test; see Issue 2357")
-		return
-	}
 	conn := new(testConn)
 	conn.readBuf.Write([]byte("GET / HTTP/1.1\r\n\r\n"))
 	done := make(chan bool)
 	ls := &oneConnListener{conn}
 	go Serve(ls, HandlerFunc(func(rw ResponseWriter, req *Request) {
 		defer close(done)
-		rw.Header().Set("Content-Type", "text/plain") // prevent sniffing, which buffers
+		rw.(Flusher).Flush() // force the Header to be sent, in chunking mode, not counting the length
 		rw.Write([]byte{'x'})
 		rw.Write([]byte{'y'})
 		rw.Write([]byte{'z'})
diff --git a/src/pkg/net/http/server.go b/src/pkg/net/http/server.go
index 721be80293..e7b868557d 100644
--- a/src/pkg/net/http/server.go
+++ b/src/pkg/net/http/server.go
@@ -113,7 +113,6 @@ type conn struct {
 	lr         *io.LimitedReader    // io.LimitReader(sr)
 	buf        *bufio.ReadWriter    // buffered(lr,rwc), reading from bufio->limitReader->sr->rwc
 	tlsState   *tls.ConnectionState // or nil when not using TLS
-	body       []byte
 
 	mu         sync.Mutex // guards the following
 	clientGone bool       // if client has disconnected mid-request
@@ -193,18 +192,85 @@ func (sr *switchReader) Read(p []byte) (n int, err error) {
 	return r.Read(p)
 }
 
+// This should be >= 512 bytes for DetectContentType,
+// but otherwise it's somewhat arbitrary.
+const bufferBeforeChunkingSize = 2048
+
+// chunkWriter writes to a response's conn buffer, and is the writer
+// wrapped by the response.bufw buffered writer.
+//
+// chunkWriter also is responsible for finalizing the Header, including
+// conditionally setting the Content-Type and setting a Content-Length
+// in cases where the handler's final output is smaller than the buffer
+// size. It also conditionally adds chunk headers, when in chunking mode.
+//
+// See the comment above (*response).Write for the entire write flow.
+type chunkWriter struct {
+	res         *response
+	header      Header // a deep copy of r.Header, once WriteHeader is called
+	wroteHeader bool   // whether the header's been sent
+
+	// set by the writeHeader method:
+	chunking bool // using chunked transfer encoding for reply body
+}
+
+var crlf = []byte("\r\n")
+
+func (cw *chunkWriter) Write(p []byte) (n int, err error) {
+	if !cw.wroteHeader {
+		cw.writeHeader(p)
+	}
+	if cw.chunking {
+		_, err = fmt.Fprintf(cw.res.conn.buf, "%x\r\n", len(p))
+		if err != nil {
+			return
+		}
+	}
+	n, err = cw.res.conn.buf.Write(p)
+	if cw.chunking && err == nil {
+		_, err = cw.res.conn.buf.Write(crlf)
+	}
+	return
+}
+
+func (cw *chunkWriter) flush() {
+	if !cw.wroteHeader {
+		cw.writeHeader(nil)
+	}
+	cw.res.conn.buf.Flush()
+}
+
+func (cw *chunkWriter) close() {
+	if !cw.wroteHeader {
+		cw.writeHeader(nil)
+	}
+	if cw.chunking {
+		// zero EOF chunk, trailer key/value pairs (currently
+		// unsupported in Go's server), followed by a blank
+		// line.
+		io.WriteString(cw.res.conn.buf, "0\r\n\r\n")
+	}
+}
+
 // A response represents the server side of an HTTP response.
 type response struct {
 	conn          *conn
 	req           *Request // request for this response
-	chunking      bool     // using chunked transfer encoding for reply body
-	wroteHeader   bool     // reply header has been written
+	wroteHeader   bool     // reply header has been (logically) written
 	wroteContinue bool     // 100 Continue response was written
-	header        Header   // reply header parameters
-	written       int64    // number of bytes written in body
-	contentLength int64    // explicitly-declared Content-Length; or -1
-	status        int      // status code passed to WriteHeader
-	needSniff     bool     // need to sniff to find Content-Type
+
+	w  *bufio.Writer // buffers output in chunks to chunkWriter
+	cw *chunkWriter
+
+	// handlerHeader is the Header that Handlers get access to,
+	// which may be retained and mutated even after WriteHeader.
+	// handlerHeader is copied into cw.header at WriteHeader
+	// time, and privately mutated thereafter.
+	handlerHeader Header
+
+	written       int64 // number of bytes written in body
+	contentLength int64 // explicitly-declared Content-Length; or -1
+	status        int   // status code passed to WriteHeader
 
 	// close connection after this reply. set on request and
 	// updated after response from handler if there's a
@@ -220,6 +286,8 @@ type response struct {
 	// subsequent requests on this connection and stop reading
 	// input from it.
 	requestBodyLimitHit bool
+
+	handlerDone bool // set true when the handler exits
 }
 
 // requestTooLarge is called by maxBytesReader when too much input has
@@ -232,27 +300,46 @@ func (w *response) requestTooLarge() {
 	}
 }
 
+// needsSniff returns whether a Content-Type still needs to be sniffed.
+func (w *response) needsSniff() bool {
+	return !w.cw.wroteHeader && w.handlerHeader.Get("Content-Type") == "" && w.written < sniffLen
+}
+
 type writerOnly struct {
 	io.Writer
 }
 
 func (w *response) ReadFrom(src io.Reader) (n int64, err error) {
-	// Call WriteHeader before checking w.chunking if it hasn't
-	// been called yet, since WriteHeader is what sets w.chunking.
 	if !w.wroteHeader {
 		w.WriteHeader(StatusOK)
 	}
-	if !w.chunking && w.bodyAllowed() && !w.needSniff {
-		w.Flush()
+
+	if w.needsSniff() {
+		n0, err := io.Copy(writerOnly{w}, io.LimitReader(src, sniffLen))
+		n += n0
+		if err != nil {
+			return n, err
+		}
+	}
+
+	w.w.Flush()  // get rid of any previous writes
+	w.cw.flush() // make sure Header is written; flush data to rwc
+
+	// Now that cw has been flushed, its chunking field is guaranteed initialized.
+	if !w.cw.chunking && w.bodyAllowed() {
 		if rf, ok := w.conn.rwc.(io.ReaderFrom); ok {
-			n, err = rf.ReadFrom(src)
-			w.written += n
-			return
+			n0, err := rf.ReadFrom(src)
+			n += n0
+			w.written += n0
+			return n, err
 		}
 	}
 
+	// Fall back to default io.Copy implementation.
 	// Use wrapper to hide w.ReadFrom from io.Copy.
-	return io.Copy(writerOnly{w}, src)
+	n0, err := io.Copy(writerOnly{w}, src)
+	n += n0
+	return n, err
 }
 
 // noLimit is an effective infinite upper bound for io.LimitedReader
@@ -272,7 +359,6 @@ func (srv *Server) newConn(rwc net.Conn) (c *conn, err error) {
 		c.rwc = newLoggingConn("server", c.rwc)
 	}
 	c.sr = switchReader{r: c.rwc}
-	c.body = make([]byte, sniffLen)
 	c.lr = io.LimitReader(&c.sr, noLimit).(*io.LimitedReader)
 	br := bufio.NewReader(c.lr)
 	bw := bufio.NewWriter(c.rwc)
@@ -343,17 +429,20 @@ func (c *conn) readRequest() (w *response, err error) {
 	req.RemoteAddr = c.remoteAddr
 	req.TLS = c.tlsState
 
-	w = new(response)
-	w.conn = c
-	w.req = req
-	w.header = make(Header)
-	w.contentLength = -1
-	c.body = c.body[:0]
+	w = &response{
+		conn:          c,
+		req:           req,
+		handlerHeader: make(Header),
+		contentLength: -1,
+		cw:            new(chunkWriter),
+	}
+	w.cw.res = w
+	w.w = bufio.NewWriterSize(w.cw, bufferBeforeChunkingSize)
 	return w, nil
 }
 
 func (w *response) Header() Header {
-	return w.header
+	return w.handlerHeader
 }
 
 // maxPostHandlerReadBytes is the max number of Request.Body bytes not
@@ -379,30 +468,68 @@ func (w *response) WriteHeader(code int) {
 	w.wroteHeader = true
 	w.status = code
 
-	// Check for a explicit (and valid) Content-Length header.
-	var hasCL bool
-	var contentLength int64
-	if clenStr := w.header.get("Content-Length"); clenStr != "" {
-		var err error
-		contentLength, err = strconv.ParseInt(clenStr, 10, 64)
-		if err == nil {
-			hasCL = true
+	w.cw.header = w.handlerHeader.clone()
+
+	if cl := w.cw.header.get("Content-Length"); cl != "" {
+		v, err := strconv.ParseInt(cl, 10, 64)
+		if err == nil && v >= 0 {
+			w.contentLength = v
 		} else {
-			log.Printf("http: invalid Content-Length of %q sent", clenStr)
-			w.header.Del("Content-Length")
+			log.Printf("http: invalid Content-Length of %q", cl)
+			w.cw.header.Del("Content-Length")
+		}
+	}
+}
+
+// writeHeader finalizes the header sent to the client and writes it
+// to cw.res.conn.buf.
+//
+// p is not written by writeHeader, but is the first chunk of the body
+// that will be written. It is sniffed for a Content-Type if none is
+// set explicitly. It's also used to set the Content-Length, if the
+// total body size was small and the handler has already finished
+// running.
+func (cw *chunkWriter) writeHeader(p []byte) {
+	if cw.wroteHeader {
+		return
+	}
+	cw.wroteHeader = true
+
+	w := cw.res
+	code := w.status
+	done := w.handlerDone
+
+	// If the handler is done but never sent a Content-Length
+	// response header and this is our first (and last) write, set
+	// it, even to zero. This helps HTTP/1.0 clients keep their
+	// "keep-alive" connections alive.
+	if done && cw.header.get("Content-Length") == "" && w.req.Method != "HEAD" {
+		w.contentLength = int64(len(p))
+		cw.header.Set("Content-Length", strconv.Itoa(len(p)))
+	}
+
+	// If this was an HTTP/1.0 request with keep-alive and we sent a
+	// Content-Length back, we can make this a keep-alive response ...
+	if w.req.wantsHttp10KeepAlive() {
+		sentLength := cw.header.get("Content-Length") != ""
+		if sentLength && cw.header.get("Connection") == "keep-alive" {
+			w.closeAfterReply = false
+		}
+	}
+	// Check for an explicit (and valid) Content-Length header.
+	hasCL := w.contentLength != -1
+
 	if w.req.wantsHttp10KeepAlive() && (w.req.Method == "HEAD" || hasCL) {
-		_, connectionHeaderSet := w.header["Connection"]
+		_, connectionHeaderSet := cw.header["Connection"]
 		if !connectionHeaderSet {
-			w.header.Set("Connection", "keep-alive")
+			cw.header.Set("Connection", "keep-alive")
 		}
 	} else if !w.req.ProtoAtLeast(1, 1) || w.req.wantsClose() {
 		w.closeAfterReply = true
 	}
 
-	if w.header.get("Connection") == "close" {
+	if cw.header.get("Connection") == "close" {
 		w.closeAfterReply = true
 	}
@@ -416,7 +543,7 @@ func (w *response) WriteHeader(code int) {
 			n, _ := io.CopyN(ioutil.Discard, w.req.Body, maxPostHandlerReadBytes+1)
 			if n >= maxPostHandlerReadBytes {
 				w.requestTooLarge()
-				w.header.Set("Connection", "close")
+				cw.header.Set("Connection", "close")
 			} else {
 				w.req.Body.Close()
 			}
@@ -426,69 +553,65 @@ func (w *response) WriteHeader(code int) {
 	if code == StatusNotModified {
 		// Must not have body.
 		for _, header := range []string{"Content-Type", "Content-Length", "Transfer-Encoding"} {
-			if w.header.get(header) != "" {
-				// TODO: return an error if WriteHeader gets a return parameter
-				// or set a flag on w to make future Writes() write an error page?
-				// for now just log and drop the header.
-				log.Printf("http: StatusNotModified response with header %q defined", header)
-				w.header.Del(header)
+			// RFC 2616 section 10.3.5: "the response MUST NOT include other entity-headers"
+			if cw.header.get(header) != "" {
+				cw.header.Del(header)
 			}
 		}
 	} else {
 		// If no content type, apply sniffing algorithm to body.
-		if w.header.get("Content-Type") == "" && w.req.Method != "HEAD" {
-			w.needSniff = true
+		if cw.header.get("Content-Type") == "" && w.req.Method != "HEAD" {
+			cw.header.Set("Content-Type", DetectContentType(p))
 		}
 	}
 
-	if _, ok := w.header["Date"]; !ok {
-		w.Header().Set("Date", time.Now().UTC().Format(TimeFormat))
+	if _, ok := cw.header["Date"]; !ok {
+		cw.header.Set("Date", time.Now().UTC().Format(TimeFormat))
 	}
 
-	te := w.header.get("Transfer-Encoding")
+	te := cw.header.get("Transfer-Encoding")
 	hasTE := te != ""
 	if hasCL && hasTE && te != "identity" {
 		// TODO: return an error if WriteHeader gets a return parameter
 		// For now just ignore the Content-Length.
 		log.Printf("http: WriteHeader called with both Transfer-Encoding of %q and a Content-Length of %d",
-			te, contentLength)
-		w.header.Del("Content-Length")
+			te, w.contentLength)
+		cw.header.Del("Content-Length")
 		hasCL = false
 	}
 
 	if w.req.Method == "HEAD" || code == StatusNotModified {
 		// do nothing
 	} else if code == StatusNoContent {
-		w.header.Del("Transfer-Encoding")
+		cw.header.Del("Transfer-Encoding")
 	} else if hasCL {
-		w.contentLength = contentLength
-		w.header.Del("Transfer-Encoding")
+		cw.header.Del("Transfer-Encoding")
 	} else if w.req.ProtoAtLeast(1, 1) {
 		// HTTP/1.1 or greater: use chunked transfer encoding
 		// to avoid closing the connection at EOF.
 		// TODO: this blows away any custom or stacked Transfer-Encoding they
 		// might have set. Deal with that as need arises once we have a valid
 		// use case.
-		w.chunking = true
-		w.header.Set("Transfer-Encoding", "chunked")
+		cw.chunking = true
+		cw.header.Set("Transfer-Encoding", "chunked")
 	} else {
 		// HTTP version < 1.1: cannot do chunked transfer
 		// encoding and we don't know the Content-Length so
 		// signal EOF by closing connection.
 		w.closeAfterReply = true
-		w.header.Del("Transfer-Encoding") // in case already set
+		cw.header.Del("Transfer-Encoding") // in case already set
 	}
 
 	// Cannot use Content-Length with non-identity Transfer-Encoding.
-	if w.chunking {
-		w.header.Del("Content-Length")
+	if cw.chunking {
+		cw.header.Del("Content-Length")
 	}
 
 	if !w.req.ProtoAtLeast(1, 0) {
 		return
 	}
 
-	if w.closeAfterReply && !hasToken(w.header.get("Connection"), "close") {
-		w.header.Set("Connection", "close")
+	if w.closeAfterReply && !hasToken(cw.header.get("Connection"), "close") {
+		cw.header.Set("Connection", "close")
 	}
 
 	proto := "HTTP/1.0"
@@ -501,37 +624,8 @@ func (w *response) WriteHeader(code int) {
 		text = "status code " + codestring
 	}
 	io.WriteString(w.conn.buf, proto+" "+codestring+" "+text+"\r\n")
-	w.header.Write(w.conn.buf)
-
-	// If we need to sniff the body, leave the header open.
-	// Otherwise, end it here.
-	if !w.needSniff {
-		io.WriteString(w.conn.buf, "\r\n")
-	}
-}
-
-// sniff uses the first block of written data,
-// stored in w.conn.body, to decide the Content-Type
-// for the HTTP body.
-func (w *response) sniff() {
-	if !w.needSniff {
-		return
-	}
-	w.needSniff = false
-
-	data := w.conn.body
-	fmt.Fprintf(w.conn.buf, "Content-Type: %s\r\n\r\n", DetectContentType(data))
-
-	if len(data) == 0 {
-		return
-	}
-	if w.chunking {
-		fmt.Fprintf(w.conn.buf, "%x\r\n", len(data))
-	}
-	_, err := w.conn.buf.Write(data)
-	if w.chunking && err == nil {
-		io.WriteString(w.conn.buf, "\r\n")
-	}
+	cw.header.Write(w.conn.buf)
+	w.conn.buf.Write(crlf)
 }
 
 // bodyAllowed returns true if a Write is allowed for this response type.
@@ -543,6 +637,38 @@ func (w *response) bodyAllowed() bool {
 	return w.status != StatusNotModified && w.req.Method != "HEAD"
 }
 
+// The Life Of A Write is like this:
+//
+// Handler starts. No header has been sent. The handler can either
+// write a header, or just start writing. Writing before sending a header
+// sends an implicitly empty 200 OK header.
+//
+// If the handler didn't declare a Content-Length up front, we either
+// go into chunking mode or, if the handler finishes running before
+// the chunking buffer size, we compute a Content-Length and send that
+// in the header instead.
+//
+// Likewise, if the handler didn't set a Content-Type, we sniff that
+// from the initial chunk of output.
+//
+// The Writers are wired together like:
+//
+// 1. *response (the ResponseWriter) ->
+// 2. (*response).w, a *bufio.Writer of bufferBeforeChunkingSize bytes
+// 3. chunkWriter.Writer (whose writeHeader finalizes Content-Length/Type)
+//    and which writes the chunk headers, if needed.
+// 4. conn.buf, a bufio.Writer of default (4kB) bytes
+// 5. the rwc, the net.Conn.
+//
+// TODO(bradfitz): short-circuit some of the buffering when the
+// initial header contains both a Content-Type and Content-Length.
+// Also short-circuit in (1) when the header's been sent and not in
+// chunking mode, writing directly to (4) instead, if (2) has no
+// buffered data. More generally, we could short-circuit from (1) to
+// (3) even in chunking mode if the write size from (1) is over some
+// threshold and nothing is in (2). The answer might be mostly making
+// bufferBeforeChunkingSize smaller and having bufio's fast-paths deal
+// with this instead.
 func (w *response) Write(data []byte) (n int, err error) {
 	if w.conn.hijacked() {
 		log.Print("http: response.Write on hijacked connection")
@@ -562,81 +688,20 @@ func (w *response) Write(data []byte) (n int, err error) {
 	if w.contentLength != -1 && w.written > w.contentLength {
 		return 0, ErrContentLength
 	}
-
-	var m int
-	if w.needSniff {
-		// We need to sniff the beginning of the output to
-		// determine the content type. Accumulate the
-		// initial writes in w.conn.body.
-		// Cap m so that append won't allocate.
-		m = cap(w.conn.body) - len(w.conn.body)
-		if m > len(data) {
-			m = len(data)
-		}
-		w.conn.body = append(w.conn.body, data[:m]...)
-		data = data[m:]
-		if len(data) == 0 {
-			// Copied everything into the buffer.
-			// Wait for next write.
-			return m, nil
-		}
-
-		// Filled the buffer; more data remains.
-		// Sniff the content (flushes the buffer)
-		// and then proceed with the remainder
-		// of the data as a normal Write.
-		// Calling sniff clears needSniff.
-		w.sniff()
-	}
-
-	// TODO(rsc): if chunking happened after the buffering,
-	// then there would be fewer chunk headers.
-	// On the other hand, it would make hijacking more difficult.
-	if w.chunking {
-		fmt.Fprintf(w.conn.buf, "%x\r\n", len(data))
-	}
-	n, err = w.conn.buf.Write(data)
-	if err == nil && w.chunking {
-		if n != len(data) {
-			err = io.ErrShortWrite
-		}
-		if err == nil {
-			io.WriteString(w.conn.buf, "\r\n")
-		}
-	}
-
-	return m + n, err
+	return w.w.Write(data)
 }
 
 func (w *response) finishRequest() {
-	// If the handler never wrote any bytes and never sent a Content-Length
-	// response header, set the length explicitly to zero. This helps
-	// HTTP/1.0 clients keep their "keep-alive" connections alive, and for
-	// HTTP/1.1 clients is just as good as the alternative: sending a
-	// chunked response and immediately sending the zero-length EOF chunk.
-	if w.written == 0 && w.header.get("Content-Length") == "" && w.req.Method != "HEAD" {
-		w.header.Set("Content-Length", "0")
-	}
-	// If this was an HTTP/1.0 request with keep-alive and we sent a
-	// Content-Length back, we can make this a keep-alive response ...
-	if w.req.wantsHttp10KeepAlive() {
-		sentLength := w.header.get("Content-Length") != ""
-		if sentLength && w.header.get("Connection") == "keep-alive" {
-			w.closeAfterReply = false
-		}
-	}
+	w.handlerDone = true
+
 	if !w.wroteHeader {
 		w.WriteHeader(StatusOK)
 	}
-	if w.needSniff {
-		w.sniff()
-	}
-	if w.chunking {
-		io.WriteString(w.conn.buf, "0\r\n")
-		// trailer key/value pairs, followed by blank line
-		io.WriteString(w.conn.buf, "\r\n")
-	}
+
+	w.w.Flush()
+	w.cw.close()
 	w.conn.buf.Flush()
+
 	// Close the body, unless we're about to close the whole TCP connection
 	// anyway.
 	if !w.closeAfterReply {
@@ -646,7 +711,7 @@ func (w *response) finishRequest() {
 		w.req.MultipartForm.RemoveAll()
 	}
 
-	if w.contentLength != -1 && w.contentLength != w.written {
+	if w.contentLength != -1 && w.bodyAllowed() && w.contentLength != w.written {
 		// Did not write enough. Avoid getting out of sync.
 		w.closeAfterReply = true
 	}
@@ -656,8 +721,8 @@ func (w *response) Flush() {
 	if !w.wroteHeader {
 		w.WriteHeader(StatusOK)
 	}
-	w.sniff()
-	w.conn.buf.Flush()
+	w.w.Flush()
+	w.cw.flush()
 }
 
 func (c *conn) finalFlush() {
@@ -809,6 +874,9 @@ func (w *response) sendExpectationFailed() {
 
 // Hijack implements the Hijacker.Hijack method. Our response is both a ResponseWriter
 // and a Hijacker.
 func (w *response) Hijack() (rwc net.Conn, buf *bufio.ReadWriter, err error) {
+	if w.wroteHeader {
+		w.cw.flush()
+	}
 	return w.conn.hijack()
 }
-- 
2.48.1
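A quick way to see the user-visible effect of this change is the small program below. It is an illustrative sketch against the exported net/http API, not part of the CL; the handlers and the fields printed were chosen only for the demo. A handler that returns before filling the 2048-byte buffer (bufferBeforeChunkingSize) now gets an explicit Content-Length, while a handler that calls Flush before the body is complete still goes out chunked, since the header has to be sent before the total size is known.

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

func main() {
	// Small response: the handler finishes well under the 2KB buffer,
	// so the server can emit a Content-Length and skip chunking.
	small := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "hello")
	}))
	defer small.Close()

	// Early Flush: the header must be written before the body size is
	// known, so the reply uses chunked transfer encoding instead.
	flushed := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.(http.Flusher).Flush()
		fmt.Fprint(w, "hello")
	}))
	defer flushed.Close()

	for _, url := range []string{small.URL, flushed.URL} {
		resp, err := http.Get(url)
		if err != nil {
			panic(err)
		}
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
		fmt.Printf("ContentLength=%d TransferEncoding=%v\n",
			resp.ContentLength, resp.TransferEncoding)
	}
}

With a Go release that includes this change, the first request should report ContentLength=5 with no Transfer-Encoding, and the second should report ContentLength=-1 with TransferEncoding=[chunked], mirroring what the updated TestServerBufferedChunking and TestChunkedResponseHeaders tests check on the server side.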