// - If the resulting value is zero or out of range, use a default.
type http2http2Config struct {
MaxConcurrentStreams uint32
+ StrictMaxConcurrentRequests bool
MaxDecoderHeaderTableSize uint32
MaxEncoderHeaderTableSize uint32
MaxReadFrameSize uint32
// (the net/http Transport).
func http2configFromTransport(h2 *http2Transport) http2http2Config {
conf := http2http2Config{
- MaxEncoderHeaderTableSize: h2.MaxEncoderHeaderTableSize,
- MaxDecoderHeaderTableSize: h2.MaxDecoderHeaderTableSize,
- MaxReadFrameSize: h2.MaxReadFrameSize,
- SendPingTimeout: h2.ReadIdleTimeout,
- PingTimeout: h2.PingTimeout,
- WriteByteTimeout: h2.WriteByteTimeout,
+ StrictMaxConcurrentRequests: h2.StrictMaxConcurrentStreams,
+ MaxEncoderHeaderTableSize: h2.MaxEncoderHeaderTableSize,
+ MaxDecoderHeaderTableSize: h2.MaxDecoderHeaderTableSize,
+ MaxReadFrameSize: h2.MaxReadFrameSize,
+ SendPingTimeout: h2.ReadIdleTimeout,
+ PingTimeout: h2.PingTimeout,
+ WriteByteTimeout: h2.WriteByteTimeout,
}
// Unlike most config fields, where out-of-range values revert to the default,
if h2.MaxConcurrentStreams != 0 {
conf.MaxConcurrentStreams = uint32(h2.MaxConcurrentStreams)
}
+ if http2http2ConfigStrictMaxConcurrentRequests(h2) {
+ conf.StrictMaxConcurrentRequests = true
+ }
if h2.MaxEncoderHeaderTableSize != 0 {
conf.MaxEncoderHeaderTableSize = uint32(h2.MaxEncoderHeaderTableSize)
}
}
}
+func http2http2ConfigStrictMaxConcurrentRequests(h2 *HTTP2Config) bool {
+ return h2.StrictMaxConcurrentRequests
+}
+
// Buffer chunks are allocated from a pool to reduce pressure on GC.
// The maximum wasted space per dataBuffer is 2x the largest size class,
// which happens when the dataBuffer has multiple chunks and there is
func (f *http2Framer) startWrite(ftype http2FrameType, flags http2Flags, streamID uint32) {
// Write the FrameHeader.
f.wbuf = append(f.wbuf[:0],
- 0, // 3 bytes of length, filled in in endWrite
+ 0, // 3 bytes of length, filled in endWrite
0,
0,
byte(ftype),
http2PriorityParam
}
+var http2defaultRFC9218Priority = http2PriorityParam{
+ incremental: 0,
+ urgency: 3,
+}
+
+// Note that HTTP/2 has had two different prioritization schemes, and
+// PriorityParam struct below is a superset of both schemes. The exported
+// symbols are from RFC 7540 and the non-exported ones are from RFC 9218.
+
// PriorityParam are the stream prioritzation parameters.
type http2PriorityParam struct {
// StreamDep is a 31-bit stream identifier for the
// the spec, "Add one to the value to obtain a weight between
// 1 and 256."
Weight uint8
+
+ // "The urgency (u) parameter value is Integer (see Section 3.3.1 of
+ // [STRUCTURED-FIELDS]), between 0 and 7 inclusive, in descending order of
+ // priority. The default is 3."
+ urgency uint8
+
+ // "The incremental (i) parameter value is Boolean (see Section 3.3.6 of
+ // [STRUCTURED-FIELDS]). It indicates if an HTTP response can be processed
+ // incrementally, i.e., provide some meaningful output as chunks of the
+ // response arrive."
+ //
+ // We use uint8 (i.e. 0 is false, 1 is true) instead of bool so we can
+ // avoid unnecessary type conversions and because either type takes 1 byte.
+ incremental uint8
}
func (p http2PriorityParam) IsZero() bool {
http2VerboseLogs bool
http2logFrameWrites bool
http2logFrameReads bool
- http2inTests bool
// Enabling extended CONNECT by causes browsers to attempt to use
// WebSockets-over-HTTP/2. This results in problems when the server's websocket
type http2serverInternalState struct {
mu sync.Mutex
activeConns map[*http2serverConn]struct{}
+
+ // Pool of error channels. This is per-Server rather than global
+ // because channels can't be reused across synctest bubbles.
+ errChanPool sync.Pool
}
func (s *http2serverInternalState) registerConn(sc *http2serverConn) {
s.mu.Unlock()
}
+// Global error channel pool used for uninitialized Servers.
+// We use a per-Server pool when possible to avoid using channels across synctest bubbles.
+var http2errChanPool = sync.Pool{
+ New: func() any { return make(chan error, 1) },
+}
+
+func (s *http2serverInternalState) getErrChan() chan error {
+ if s == nil {
+ return http2errChanPool.Get().(chan error) // Server used without calling ConfigureServer
+ }
+ return s.errChanPool.Get().(chan error)
+}
+
+func (s *http2serverInternalState) putErrChan(ch chan error) {
+ if s == nil {
+ http2errChanPool.Put(ch) // Server used without calling ConfigureServer
+ return
+ }
+ s.errChanPool.Put(ch)
+}
+
// ConfigureServer adds HTTP/2 support to a net/http Server.
//
// The configuration conf may be nil.
if conf == nil {
conf = new(http2Server)
}
- conf.state = &http2serverInternalState{activeConns: make(map[*http2serverConn]struct{})}
+ conf.state = &http2serverInternalState{
+ activeConns: make(map[*http2serverConn]struct{}),
+ errChanPool: sync.Pool{New: func() any { return make(chan error, 1) }},
+ }
if h1, h2 := s, conf; h2.IdleTimeout == 0 {
if h1.IdleTimeout != 0 {
h2.IdleTimeout = h1.IdleTimeout
}
}
-var http2errChanPool = sync.Pool{
- New: func() interface{} { return make(chan error, 1) },
-}
-
-func http2getErrChan() chan error {
- if http2inTests {
- // Channels cannot be reused across synctest tests.
- return make(chan error, 1)
- } else {
- return http2errChanPool.Get().(chan error)
- }
-}
-
-func http2putErrChan(ch chan error) {
- if !http2inTests {
- http2errChanPool.Put(ch)
- }
-}
-
var http2writeDataPool = sync.Pool{
New: func() interface{} { return new(http2writeData) },
}
// writeDataFromHandler writes DATA response frames from a handler on
// the given stream.
func (sc *http2serverConn) writeDataFromHandler(stream *http2stream, data []byte, endStream bool) error {
- ch := http2getErrChan()
+ ch := sc.srv.state.getErrChan()
writeArg := http2writeDataPool.Get().(*http2writeData)
*writeArg = http2writeData{stream.id, data, endStream}
err := sc.writeFrameFromHandler(http2FrameWriteRequest{
return http2errStreamClosed
}
}
- http2putErrChan(ch)
+ sc.srv.state.putErrChan(ch)
if frameWriteDone {
http2writeDataPool.Put(writeArg)
}
// waiting for this frame to be written, so an http.Flush mid-handler
// writes out the correct value of keys, before a handler later potentially
// mutates it.
- errc = http2getErrChan()
+ errc = sc.srv.state.getErrChan()
}
if err := sc.writeFrameFromHandler(http2FrameWriteRequest{
write: headerData,
if errc != nil {
select {
case err := <-errc:
- http2putErrChan(errc)
+ sc.srv.state.putErrChan(errc)
return err
case <-sc.doneServing:
return http2errClientDisconnected
method: opts.Method,
url: u,
header: http2cloneHeader(opts.Header),
- done: http2getErrChan(),
+ done: sc.srv.state.getErrChan(),
}
select {
case <-st.cw:
return http2errStreamClosed
case err := <-msg.done:
- http2putErrChan(msg.done)
+ sc.srv.state.putErrChan(msg.done)
return err
}
}
readIdleTimeout time.Duration
pingTimeout time.Duration
extendedConnectAllowed bool
+ strictMaxConcurrentStreams bool
// rstStreamPingsBlocked works around an unfortunate gRPC behavior.
// gRPC strictly limits the number of PING frames that it will receive.
initialWindowSize: 65535, // spec default
initialStreamRecvWindowSize: conf.MaxUploadBufferPerStream,
maxConcurrentStreams: http2initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
- peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
+ strictMaxConcurrentStreams: conf.StrictMaxConcurrentRequests,
+ peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
streams: make(map[uint32]*http2clientStream),
singleUse: singleUse,
seenSettingsChan: make(chan struct{}),
return
}
var maxConcurrentOkay bool
- if cc.t.StrictMaxConcurrentStreams {
+ if cc.strictMaxConcurrentStreams {
// We'll tell the caller we can take a new request to
// prevent the caller from dialing a new TCP
// connection, but then we'll block later before
// PusherID is zero if the stream was initiated by the client. Otherwise,
// PusherID names the stream that pushed the newly opened stream.
PusherID uint32
+ // priority is used to set the priority of the newly opened stream.
+ priority http2PriorityParam
}
// FrameWriteRequest is a request to write a frame.
}
// RFC 7540, Section 5.3.5: the default weight is 16.
-const http2priorityDefaultWeight = 15 // 16 = 15 + 1
+const http2priorityDefaultWeightRFC7540 = 15 // 16 = 15 + 1
// PriorityWriteSchedulerConfig configures a priorityWriteScheduler.
type http2PriorityWriteSchedulerConfig struct {
}
}
- ws := &http2priorityWriteScheduler{
- nodes: make(map[uint32]*http2priorityNode),
+ ws := &http2priorityWriteSchedulerRFC7540{
+ nodes: make(map[uint32]*http2priorityNodeRFC7540),
maxClosedNodesInTree: cfg.MaxClosedNodesInTree,
maxIdleNodesInTree: cfg.MaxIdleNodesInTree,
enableWriteThrottle: cfg.ThrottleOutOfOrderWrites,
return ws
}
-type http2priorityNodeState int
+type http2priorityNodeStateRFC7540 int
const (
- http2priorityNodeOpen http2priorityNodeState = iota
- http2priorityNodeClosed
- http2priorityNodeIdle
+ http2priorityNodeOpenRFC7540 http2priorityNodeStateRFC7540 = iota
+ http2priorityNodeClosedRFC7540
+ http2priorityNodeIdleRFC7540
)
-// priorityNode is a node in an HTTP/2 priority tree.
+// priorityNodeRFC7540 is a node in an HTTP/2 priority tree.
// Each node is associated with a single stream ID.
// See RFC 7540, Section 5.3.
-type http2priorityNode struct {
- q http2writeQueue // queue of pending frames to write
- id uint32 // id of the stream, or 0 for the root of the tree
- weight uint8 // the actual weight is weight+1, so the value is in [1,256]
- state http2priorityNodeState // open | closed | idle
- bytes int64 // number of bytes written by this node, or 0 if closed
- subtreeBytes int64 // sum(node.bytes) of all nodes in this subtree
+type http2priorityNodeRFC7540 struct {
+ q http2writeQueue // queue of pending frames to write
+ id uint32 // id of the stream, or 0 for the root of the tree
+ weight uint8 // the actual weight is weight+1, so the value is in [1,256]
+ state http2priorityNodeStateRFC7540 // open | closed | idle
+ bytes int64 // number of bytes written by this node, or 0 if closed
+ subtreeBytes int64 // sum(node.bytes) of all nodes in this subtree
// These links form the priority tree.
- parent *http2priorityNode
- kids *http2priorityNode // start of the kids list
- prev, next *http2priorityNode // doubly-linked list of siblings
+ parent *http2priorityNodeRFC7540
+ kids *http2priorityNodeRFC7540 // start of the kids list
+ prev, next *http2priorityNodeRFC7540 // doubly-linked list of siblings
}
-func (n *http2priorityNode) setParent(parent *http2priorityNode) {
+func (n *http2priorityNodeRFC7540) setParent(parent *http2priorityNodeRFC7540) {
if n == parent {
panic("setParent to self")
}
}
}
-func (n *http2priorityNode) addBytes(b int64) {
+func (n *http2priorityNodeRFC7540) addBytes(b int64) {
n.bytes += b
for ; n != nil; n = n.parent {
n.subtreeBytes += b
//
// f(n, openParent) takes two arguments: the node to visit, n, and a bool that is true
// if any ancestor p of n is still open (ignoring the root node).
-func (n *http2priorityNode) walkReadyInOrder(openParent bool, tmp *[]*http2priorityNode, f func(*http2priorityNode, bool) bool) bool {
+func (n *http2priorityNodeRFC7540) walkReadyInOrder(openParent bool, tmp *[]*http2priorityNodeRFC7540, f func(*http2priorityNodeRFC7540, bool) bool) bool {
if !n.q.empty() && f(n, openParent) {
return true
}
// Don't consider the root "open" when updating openParent since
// we can't send data frames on the root stream (only control frames).
if n.id != 0 {
- openParent = openParent || (n.state == http2priorityNodeOpen)
+ openParent = openParent || (n.state == http2priorityNodeOpenRFC7540)
}
// Common case: only one kid or all kids have the same weight.
*tmp = append(*tmp, n.kids)
n.kids.setParent(nil)
}
- sort.Sort(http2sortPriorityNodeSiblings(*tmp))
+ sort.Sort(http2sortPriorityNodeSiblingsRFC7540(*tmp))
for i := len(*tmp) - 1; i >= 0; i-- {
(*tmp)[i].setParent(n) // setParent inserts at the head of n.kids
}
return false
}
-type http2sortPriorityNodeSiblings []*http2priorityNode
+type http2sortPriorityNodeSiblingsRFC7540 []*http2priorityNodeRFC7540
-func (z http2sortPriorityNodeSiblings) Len() int { return len(z) }
+func (z http2sortPriorityNodeSiblingsRFC7540) Len() int { return len(z) }
-func (z http2sortPriorityNodeSiblings) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
+func (z http2sortPriorityNodeSiblingsRFC7540) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
-func (z http2sortPriorityNodeSiblings) Less(i, k int) bool {
+func (z http2sortPriorityNodeSiblingsRFC7540) Less(i, k int) bool {
// Prefer the subtree that has sent fewer bytes relative to its weight.
// See sections 5.3.2 and 5.3.4.
wi, bi := float64(z[i].weight+1), float64(z[i].subtreeBytes)
return bi/bk <= wi/wk
}
-type http2priorityWriteScheduler struct {
+type http2priorityWriteSchedulerRFC7540 struct {
// root is the root of the priority tree, where root.id = 0.
// The root queues control frames that are not associated with any stream.
- root http2priorityNode
+ root http2priorityNodeRFC7540
// nodes maps stream ids to priority tree nodes.
- nodes map[uint32]*http2priorityNode
+ nodes map[uint32]*http2priorityNodeRFC7540
// maxID is the maximum stream id in nodes.
maxID uint32
// lists of nodes that have been closed or are idle, but are kept in
// the tree for improved prioritization. When the lengths exceed either
// maxClosedNodesInTree or maxIdleNodesInTree, old nodes are discarded.
- closedNodes, idleNodes []*http2priorityNode
+ closedNodes, idleNodes []*http2priorityNodeRFC7540
// From the config.
maxClosedNodesInTree int
enableWriteThrottle bool
// tmp is scratch space for priorityNode.walkReadyInOrder to reduce allocations.
- tmp []*http2priorityNode
+ tmp []*http2priorityNodeRFC7540
// pool of empty queues for reuse.
queuePool http2writeQueuePool
}
-func (ws *http2priorityWriteScheduler) OpenStream(streamID uint32, options http2OpenStreamOptions) {
+func (ws *http2priorityWriteSchedulerRFC7540) OpenStream(streamID uint32, options http2OpenStreamOptions) {
// The stream may be currently idle but cannot be opened or closed.
if curr := ws.nodes[streamID]; curr != nil {
- if curr.state != http2priorityNodeIdle {
+ if curr.state != http2priorityNodeIdleRFC7540 {
panic(fmt.Sprintf("stream %d already opened", streamID))
}
- curr.state = http2priorityNodeOpen
+ curr.state = http2priorityNodeOpenRFC7540
return
}
if parent == nil {
parent = &ws.root
}
- n := &http2priorityNode{
+ n := &http2priorityNodeRFC7540{
q: *ws.queuePool.get(),
id: streamID,
- weight: http2priorityDefaultWeight,
- state: http2priorityNodeOpen,
+ weight: http2priorityDefaultWeightRFC7540,
+ state: http2priorityNodeOpenRFC7540,
}
n.setParent(parent)
ws.nodes[streamID] = n
}
}
-func (ws *http2priorityWriteScheduler) CloseStream(streamID uint32) {
+func (ws *http2priorityWriteSchedulerRFC7540) CloseStream(streamID uint32) {
if streamID == 0 {
panic("violation of WriteScheduler interface: cannot close stream 0")
}
if ws.nodes[streamID] == nil {
panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", streamID))
}
- if ws.nodes[streamID].state != http2priorityNodeOpen {
+ if ws.nodes[streamID].state != http2priorityNodeOpenRFC7540 {
panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", streamID))
}
n := ws.nodes[streamID]
- n.state = http2priorityNodeClosed
+ n.state = http2priorityNodeClosedRFC7540
n.addBytes(-n.bytes)
q := n.q
}
}
-func (ws *http2priorityWriteScheduler) AdjustStream(streamID uint32, priority http2PriorityParam) {
+func (ws *http2priorityWriteSchedulerRFC7540) AdjustStream(streamID uint32, priority http2PriorityParam) {
if streamID == 0 {
panic("adjustPriority on root")
}
return
}
ws.maxID = streamID
- n = &http2priorityNode{
+ n = &http2priorityNodeRFC7540{
q: *ws.queuePool.get(),
id: streamID,
- weight: http2priorityDefaultWeight,
- state: http2priorityNodeIdle,
+ weight: http2priorityDefaultWeightRFC7540,
+ state: http2priorityNodeIdleRFC7540,
}
n.setParent(&ws.root)
ws.nodes[streamID] = n
parent := ws.nodes[priority.StreamDep]
if parent == nil {
n.setParent(&ws.root)
- n.weight = http2priorityDefaultWeight
+ n.weight = http2priorityDefaultWeightRFC7540
return
}
n.weight = priority.Weight
}
-func (ws *http2priorityWriteScheduler) Push(wr http2FrameWriteRequest) {
- var n *http2priorityNode
+func (ws *http2priorityWriteSchedulerRFC7540) Push(wr http2FrameWriteRequest) {
+ var n *http2priorityNodeRFC7540
if wr.isControl() {
n = &ws.root
} else {
n.q.push(wr)
}
-func (ws *http2priorityWriteScheduler) Pop() (wr http2FrameWriteRequest, ok bool) {
- ws.root.walkReadyInOrder(false, &ws.tmp, func(n *http2priorityNode, openParent bool) bool {
+func (ws *http2priorityWriteSchedulerRFC7540) Pop() (wr http2FrameWriteRequest, ok bool) {
+ ws.root.walkReadyInOrder(false, &ws.tmp, func(n *http2priorityNodeRFC7540, openParent bool) bool {
limit := int32(math.MaxInt32)
if openParent {
limit = ws.writeThrottleLimit
return wr, ok
}
-func (ws *http2priorityWriteScheduler) addClosedOrIdleNode(list *[]*http2priorityNode, maxSize int, n *http2priorityNode) {
+func (ws *http2priorityWriteSchedulerRFC7540) addClosedOrIdleNode(list *[]*http2priorityNodeRFC7540, maxSize int, n *http2priorityNodeRFC7540) {
if maxSize == 0 {
return
}
*list = append(*list, n)
}
-func (ws *http2priorityWriteScheduler) removeNode(n *http2priorityNode) {
+func (ws *http2priorityWriteSchedulerRFC7540) removeNode(n *http2priorityNodeRFC7540) {
for n.kids != nil {
n.kids.setParent(n.parent)
}
delete(ws.nodes, n.id)
}
+type http2streamMetadata struct {
+ location *http2writeQueue
+ priority http2PriorityParam
+}
+
+type http2priorityWriteSchedulerRFC9218 struct {
+ // control contains control frames (SETTINGS, PING, etc.).
+ control http2writeQueue
+
+ // heads contain the head of a circular list of streams.
+ // We put these heads within a nested array that represents urgency and
+ // incremental, as defined in
+ // https://www.rfc-editor.org/rfc/rfc9218.html#name-priority-parameters.
+ // 8 represents u=0 up to u=7, and 2 represents i=false and i=true.
+ heads [8][2]*http2writeQueue
+
+ // streams contains a mapping between each stream ID and their metadata, so
+ // we can quickly locate them when needing to, for example, adjust their
+ // priority.
+ streams map[uint32]http2streamMetadata
+
+ // queuePool are empty queues for reuse.
+ queuePool http2writeQueuePool
+
+ // prioritizeIncremental is used to determine whether we should prioritize
+ // incremental streams or not, when urgency is the same in a given Pop()
+ // call.
+ prioritizeIncremental bool
+}
+
+func http2newPriorityWriteSchedulerRFC9128() http2WriteScheduler {
+ ws := &http2priorityWriteSchedulerRFC9218{
+ streams: make(map[uint32]http2streamMetadata),
+ }
+ return ws
+}
+
+func (ws *http2priorityWriteSchedulerRFC9218) OpenStream(streamID uint32, opt http2OpenStreamOptions) {
+ if ws.streams[streamID].location != nil {
+ panic(fmt.Errorf("stream %d already opened", streamID))
+ }
+ q := ws.queuePool.get()
+ ws.streams[streamID] = http2streamMetadata{
+ location: q,
+ priority: opt.priority,
+ }
+
+ u, i := opt.priority.urgency, opt.priority.incremental
+ if ws.heads[u][i] == nil {
+ ws.heads[u][i] = q
+ q.next = q
+ q.prev = q
+ } else {
+ // Queues are stored in a ring.
+ // Insert the new stream before ws.head, putting it at the end of the list.
+ q.prev = ws.heads[u][i].prev
+ q.next = ws.heads[u][i]
+ q.prev.next = q
+ q.next.prev = q
+ }
+}
+
+func (ws *http2priorityWriteSchedulerRFC9218) CloseStream(streamID uint32) {
+ metadata := ws.streams[streamID]
+ q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
+ if q == nil {
+ return
+ }
+ if q.next == q {
+ // This was the only open stream.
+ ws.heads[u][i] = nil
+ } else {
+ q.prev.next = q.next
+ q.next.prev = q.prev
+ if ws.heads[u][i] == q {
+ ws.heads[u][i] = q.next
+ }
+ }
+ delete(ws.streams, streamID)
+ ws.queuePool.put(q)
+}
+
+func (ws *http2priorityWriteSchedulerRFC9218) AdjustStream(streamID uint32, priority http2PriorityParam) {
+ metadata := ws.streams[streamID]
+ q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
+ if q == nil {
+ return
+ }
+
+ // Remove stream from current location.
+ if q.next == q {
+ // This was the only open stream.
+ ws.heads[u][i] = nil
+ } else {
+ q.prev.next = q.next
+ q.next.prev = q.prev
+ if ws.heads[u][i] == q {
+ ws.heads[u][i] = q.next
+ }
+ }
+
+ // Insert stream to the new queue.
+ u, i = priority.urgency, priority.incremental
+ if ws.heads[u][i] == nil {
+ ws.heads[u][i] = q
+ q.next = q
+ q.prev = q
+ } else {
+ // Queues are stored in a ring.
+ // Insert the new stream before ws.head, putting it at the end of the list.
+ q.prev = ws.heads[u][i].prev
+ q.next = ws.heads[u][i]
+ q.prev.next = q
+ q.next.prev = q
+ }
+}
+
+func (ws *http2priorityWriteSchedulerRFC9218) Push(wr http2FrameWriteRequest) {
+ if wr.isControl() {
+ ws.control.push(wr)
+ return
+ }
+ q := ws.streams[wr.StreamID()].location
+ if q == nil {
+ // This is a closed stream.
+ // wr should not be a HEADERS or DATA frame.
+ // We push the request onto the control queue.
+ if wr.DataSize() > 0 {
+ panic("add DATA on non-open stream")
+ }
+ ws.control.push(wr)
+ return
+ }
+ q.push(wr)
+}
+
+func (ws *http2priorityWriteSchedulerRFC9218) Pop() (http2FrameWriteRequest, bool) {
+ // Control and RST_STREAM frames first.
+ if !ws.control.empty() {
+ return ws.control.shift(), true
+ }
+
+ // On the next Pop(), we want to prioritize incremental if we prioritized
+ // non-incremental request of the same urgency this time. Vice-versa.
+ // i.e. when there are incremental and non-incremental requests at the same
+ // priority, we give 50% of our bandwidth to the incremental ones in
+ // aggregate and 50% to the first non-incremental one (since
+ // non-incremental streams do not use round-robin writes).
+ ws.prioritizeIncremental = !ws.prioritizeIncremental
+
+ // Always prioritize lowest u (i.e. highest urgency level).
+ for u := range ws.heads {
+ for i := range ws.heads[u] {
+ // When we want to prioritize incremental, we try to pop i=true
+ // first before i=false when u is the same.
+ if ws.prioritizeIncremental {
+ i = (i + 1) % 2
+ }
+ q := ws.heads[u][i]
+ if q == nil {
+ continue
+ }
+ for {
+ if wr, ok := q.consume(math.MaxInt32); ok {
+ if i == 1 {
+ // For incremental streams, we update head to q.next so
+ // we can round-robin between multiple streams that can
+ // immediately benefit from partial writes.
+ ws.heads[u][i] = q.next
+ } else {
+ // For non-incremental streams, we try to finish one to
+ // completion rather than doing round-robin. However,
+ // we update head here so that if q.consume() is !ok
+ // (e.g. the stream has no more frame to consume), head
+ // is updated to the next q that has frames to consume
+ // on future iterations. This way, we do not prioritize
+ // writing to unavailable stream on next Pop() calls,
+ // preventing head-of-line blocking.
+ ws.heads[u][i] = q
+ }
+ return wr, true
+ }
+ q = q.next
+ if q == ws.heads[u][i] {
+ break
+ }
+ }
+
+ }
+ }
+ return http2FrameWriteRequest{}, false
+}
+
// NewRandomWriteScheduler constructs a WriteScheduler that ignores HTTP/2
// priorities. Control frames like SETTINGS and PING are written before DATA
// frames, but if no control frames are queued and multiple streams have queued
}
// newRoundRobinWriteScheduler constructs a new write scheduler.
-// The round robin scheduler priorizes control frames
+// The round robin scheduler prioritizes control frames
// like SETTINGS and PING over DATA frames.
// When there are no control frames to send, it performs a round-robin
// selection from the ready streams.