"io"
"log"
"net"
+ "os"
"strings"
+ "sync"
+ "time"
)
// ReverseProxy is an HTTP Handler that takes an incoming request and
// The Transport used to perform proxy requests.
// If nil, DefaultTransport is used.
Transport RoundTripper
+
+ // FlushInterval specifies the flush interval, in
+ // nanoseconds, to flush to the client while
+ // coping the response body.
+ // If zero, no periodic flushing is done.
+ FlushInterval int64
}
func singleJoiningSlash(a, b string) string {
rw.WriteHeader(res.StatusCode)
if res.Body != nil {
- io.Copy(rw, res.Body)
+ var dst io.Writer = rw
+ if p.FlushInterval != 0 {
+ if wf, ok := rw.(writeFlusher); ok {
+ dst = &maxLatencyWriter{dst: wf, latency: p.FlushInterval}
+ }
+ }
+ io.Copy(dst, res.Body)
+ }
+}
+
+type writeFlusher interface {
+ io.Writer
+ Flusher
+}
+
+type maxLatencyWriter struct {
+ dst writeFlusher
+ latency int64 // nanos
+
+ lk sync.Mutex // protects init of done, as well Write + Flush
+ done chan bool
+}
+
+func (m *maxLatencyWriter) Write(p []byte) (n int, err os.Error) {
+ m.lk.Lock()
+ defer m.lk.Unlock()
+ if m.done == nil {
+ m.done = make(chan bool)
+ go m.flushLoop()
+ }
+ n, err = m.dst.Write(p)
+ if err != nil {
+ m.done <- true
+ }
+ return
+}
+
+func (m *maxLatencyWriter) flushLoop() {
+ t := time.NewTicker(m.latency)
+ defer t.Stop()
+ for {
+ select {
+ case <-t.C:
+ m.lk.Lock()
+ m.dst.Flush()
+ m.lk.Unlock()
+ case <-m.done:
+ return
+ }
}
+ panic("unreached")
}