]> Cypherpunks repositories - gostls13.git/commitdiff
http: support for periodic flushing in ReverseProxy
authorBrad Fitzpatrick <bradfitz@golang.org>
Tue, 12 Jul 2011 03:56:21 +0000 (20:56 -0700)
committerBrad Fitzpatrick <bradfitz@golang.org>
Tue, 12 Jul 2011 03:56:21 +0000 (20:56 -0700)
Fixes #2012

R=golang-dev, dsymonds
CC=golang-dev
https://golang.org/cl/4662091

src/pkg/http/reverseproxy.go

index e4ce1e34c79b54081237817a5b1949fe21204407..015f87f246a4dd8e750752b449d71266ff0e0886 100644 (file)
@@ -10,7 +10,10 @@ import (
        "io"
        "log"
        "net"
+       "os"
        "strings"
+       "sync"
+       "time"
 )
 
 // ReverseProxy is an HTTP Handler that takes an incoming request and
@@ -26,6 +29,12 @@ type ReverseProxy struct {
        // The Transport used to perform proxy requests.
        // If nil, DefaultTransport is used.
        Transport RoundTripper
+
+       // FlushInterval specifies the flush interval, in
+       // nanoseconds, to flush to the client while
+       // coping the response body.
+       // If zero, no periodic flushing is done.
+       FlushInterval int64
 }
 
 func singleJoiningSlash(a, b string) string {
@@ -95,6 +104,55 @@ func (p *ReverseProxy) ServeHTTP(rw ResponseWriter, req *Request) {
        rw.WriteHeader(res.StatusCode)
 
        if res.Body != nil {
-               io.Copy(rw, res.Body)
+               var dst io.Writer = rw
+               if p.FlushInterval != 0 {
+                       if wf, ok := rw.(writeFlusher); ok {
+                               dst = &maxLatencyWriter{dst: wf, latency: p.FlushInterval}
+                       }
+               }
+               io.Copy(dst, res.Body)
+       }
+}
+
+type writeFlusher interface {
+       io.Writer
+       Flusher
+}
+
+type maxLatencyWriter struct {
+       dst     writeFlusher
+       latency int64 // nanos
+
+       lk   sync.Mutex // protects init of done, as well Write + Flush
+       done chan bool
+}
+
+func (m *maxLatencyWriter) Write(p []byte) (n int, err os.Error) {
+       m.lk.Lock()
+       defer m.lk.Unlock()
+       if m.done == nil {
+               m.done = make(chan bool)
+               go m.flushLoop()
+       }
+       n, err = m.dst.Write(p)
+       if err != nil {
+               m.done <- true
+       }
+       return
+}
+
+func (m *maxLatencyWriter) flushLoop() {
+       t := time.NewTicker(m.latency)
+       defer t.Stop()
+       for {
+               select {
+               case <-t.C:
+                       m.lk.Lock()
+                       m.dst.Flush()
+                       m.lk.Unlock()
+               case <-m.done:
+                       return
+               }
        }
+       panic("unreached")
 }