]> Cypherpunks repositories - gostls13.git/commitdiff
cmd/go/internal/par: add Queue as a simpler alternative to Work
authorBryan C. Mills <bcmills@google.com>
Mon, 10 Aug 2020 19:11:07 +0000 (15:11 -0400)
committerBryan C. Mills <bcmills@google.com>
Mon, 24 Aug 2020 21:08:46 +0000 (21:08 +0000)
par.Work performs two different tasks: deduplicating work (a task
which overlaps with par.Cache), and executing limited active work in
parallel. It also requires the caller to re-invoke Do whenever the
workqueue transititions from empty to non-empty.

The new par.Queue only performs the second of those two tasks, and
presents a simpler API: it starts and stops its own goroutines as
needed (indicating its idle state via a channel), rather than
expecting the caller to drive the transitions explicitly.

For #36460

Change-Id: I5c38657dda63ab55718497467d05d41744ff59f2
Reviewed-on: https://go-review.googlesource.com/c/go/+/247766
Run-TryBot: Bryan C. Mills <bcmills@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Jay Conrod <jayconrod@google.com>
src/cmd/go/internal/par/queue.go [new file with mode: 0644]
src/cmd/go/internal/par/queue_test.go [new file with mode: 0644]

diff --git a/src/cmd/go/internal/par/queue.go b/src/cmd/go/internal/par/queue.go
new file mode 100644 (file)
index 0000000..180bc75
--- /dev/null
@@ -0,0 +1,88 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package par
+
+import "fmt"
+
+// Queue manages a set of work items to be executed in parallel. The number of
+// active work items is limited, and excess items are queued sequentially.
+type Queue struct {
+       maxActive int
+       st        chan queueState
+}
+
+type queueState struct {
+       active  int // number of goroutines processing work; always nonzero when len(backlog) > 0
+       backlog []func()
+       idle    chan struct{} // if non-nil, closed when active becomes 0
+}
+
+// NewQueue returns a Queue that executes up to maxActive items in parallel.
+//
+// maxActive must be positive.
+func NewQueue(maxActive int) *Queue {
+       if maxActive < 1 {
+               panic(fmt.Sprintf("par.NewQueue called with nonpositive limit (%d)", maxActive))
+       }
+
+       q := &Queue{
+               maxActive: maxActive,
+               st:        make(chan queueState, 1),
+       }
+       q.st <- queueState{}
+       return q
+}
+
+// Add adds f as a work item in the queue.
+//
+// Add returns immediately, but the queue will be marked as non-idle until after
+// f (and any subsequently-added work) has completed.
+func (q *Queue) Add(f func()) {
+       st := <-q.st
+       if st.active == q.maxActive {
+               st.backlog = append(st.backlog, f)
+               q.st <- st
+               return
+       }
+       if st.active == 0 {
+               // Mark q as non-idle.
+               st.idle = nil
+       }
+       st.active++
+       q.st <- st
+
+       go func() {
+               for {
+                       f()
+
+                       st := <-q.st
+                       if len(st.backlog) == 0 {
+                               if st.active--; st.active == 0 && st.idle != nil {
+                                       close(st.idle)
+                               }
+                               q.st <- st
+                               return
+                       }
+                       f, st.backlog = st.backlog[0], st.backlog[1:]
+                       q.st <- st
+               }
+       }()
+}
+
+// Idle returns a channel that will be closed when q has no (active or enqueued)
+// work outstanding.
+func (q *Queue) Idle() <-chan struct{} {
+       st := <-q.st
+       defer func() { q.st <- st }()
+
+       if st.idle == nil {
+               st.idle = make(chan struct{})
+               if st.active == 0 {
+                       close(st.idle)
+               }
+       }
+
+       return st.idle
+}
diff --git a/src/cmd/go/internal/par/queue_test.go b/src/cmd/go/internal/par/queue_test.go
new file mode 100644 (file)
index 0000000..1331e65
--- /dev/null
@@ -0,0 +1,79 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package par
+
+import (
+       "sync"
+       "testing"
+)
+
+func TestQueueIdle(t *testing.T) {
+       q := NewQueue(1)
+       select {
+       case <-q.Idle():
+       default:
+               t.Errorf("NewQueue(1) is not initially idle.")
+       }
+
+       started := make(chan struct{})
+       unblock := make(chan struct{})
+       q.Add(func() {
+               close(started)
+               <-unblock
+       })
+
+       <-started
+       idle := q.Idle()
+       select {
+       case <-idle:
+               t.Errorf("NewQueue(1) is marked idle while processing work.")
+       default:
+       }
+
+       close(unblock)
+       <-idle // Should be closed as soon as the Add callback returns.
+}
+
+func TestQueueBacklog(t *testing.T) {
+       const (
+               maxActive = 2
+               totalWork = 3 * maxActive
+       )
+
+       q := NewQueue(maxActive)
+       t.Logf("q = NewQueue(%d)", maxActive)
+
+       var wg sync.WaitGroup
+       wg.Add(totalWork)
+       started := make([]chan struct{}, totalWork)
+       unblock := make(chan struct{})
+       for i := range started {
+               started[i] = make(chan struct{})
+               i := i
+               q.Add(func() {
+                       close(started[i])
+                       <-unblock
+                       wg.Done()
+               })
+       }
+
+       for i, c := range started {
+               if i < maxActive {
+                       <-c // Work item i should be started immediately.
+               } else {
+                       select {
+                       case <-c:
+                               t.Errorf("Work item %d started before previous items finished.", i)
+                       default:
+                       }
+               }
+       }
+
+       close(unblock)
+       for _, c := range started[maxActive:] {
+               <-c
+       }
+       wg.Wait()
+}