From 95df156e6ac53f98efd6c57e4586c1dfb43066dd Mon Sep 17 00:00:00 2001 From: "Bryan C. Mills" Date: Mon, 10 Aug 2020 15:11:07 -0400 Subject: [PATCH] cmd/go/internal/par: add Queue as a simpler alternative to Work par.Work performs two different tasks: deduplicating work (a task which overlaps with par.Cache), and executing limited active work in parallel. It also requires the caller to re-invoke Do whenever the workqueue transititions from empty to non-empty. The new par.Queue only performs the second of those two tasks, and presents a simpler API: it starts and stops its own goroutines as needed (indicating its idle state via a channel), rather than expecting the caller to drive the transitions explicitly. For #36460 Change-Id: I5c38657dda63ab55718497467d05d41744ff59f2 Reviewed-on: https://go-review.googlesource.com/c/go/+/247766 Run-TryBot: Bryan C. Mills TryBot-Result: Gobot Gobot Reviewed-by: Jay Conrod --- src/cmd/go/internal/par/queue.go | 88 +++++++++++++++++++++++++++ src/cmd/go/internal/par/queue_test.go | 79 ++++++++++++++++++++++++ 2 files changed, 167 insertions(+) create mode 100644 src/cmd/go/internal/par/queue.go create mode 100644 src/cmd/go/internal/par/queue_test.go diff --git a/src/cmd/go/internal/par/queue.go b/src/cmd/go/internal/par/queue.go new file mode 100644 index 0000000000..180bc75e34 --- /dev/null +++ b/src/cmd/go/internal/par/queue.go @@ -0,0 +1,88 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package par + +import "fmt" + +// Queue manages a set of work items to be executed in parallel. The number of +// active work items is limited, and excess items are queued sequentially. +type Queue struct { + maxActive int + st chan queueState +} + +type queueState struct { + active int // number of goroutines processing work; always nonzero when len(backlog) > 0 + backlog []func() + idle chan struct{} // if non-nil, closed when active becomes 0 +} + +// NewQueue returns a Queue that executes up to maxActive items in parallel. +// +// maxActive must be positive. +func NewQueue(maxActive int) *Queue { + if maxActive < 1 { + panic(fmt.Sprintf("par.NewQueue called with nonpositive limit (%d)", maxActive)) + } + + q := &Queue{ + maxActive: maxActive, + st: make(chan queueState, 1), + } + q.st <- queueState{} + return q +} + +// Add adds f as a work item in the queue. +// +// Add returns immediately, but the queue will be marked as non-idle until after +// f (and any subsequently-added work) has completed. +func (q *Queue) Add(f func()) { + st := <-q.st + if st.active == q.maxActive { + st.backlog = append(st.backlog, f) + q.st <- st + return + } + if st.active == 0 { + // Mark q as non-idle. + st.idle = nil + } + st.active++ + q.st <- st + + go func() { + for { + f() + + st := <-q.st + if len(st.backlog) == 0 { + if st.active--; st.active == 0 && st.idle != nil { + close(st.idle) + } + q.st <- st + return + } + f, st.backlog = st.backlog[0], st.backlog[1:] + q.st <- st + } + }() +} + +// Idle returns a channel that will be closed when q has no (active or enqueued) +// work outstanding. +func (q *Queue) Idle() <-chan struct{} { + st := <-q.st + defer func() { q.st <- st }() + + if st.idle == nil { + st.idle = make(chan struct{}) + if st.active == 0 { + close(st.idle) + } + } + + return st.idle +} diff --git a/src/cmd/go/internal/par/queue_test.go b/src/cmd/go/internal/par/queue_test.go new file mode 100644 index 0000000000..1331e65f98 --- /dev/null +++ b/src/cmd/go/internal/par/queue_test.go @@ -0,0 +1,79 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package par + +import ( + "sync" + "testing" +) + +func TestQueueIdle(t *testing.T) { + q := NewQueue(1) + select { + case <-q.Idle(): + default: + t.Errorf("NewQueue(1) is not initially idle.") + } + + started := make(chan struct{}) + unblock := make(chan struct{}) + q.Add(func() { + close(started) + <-unblock + }) + + <-started + idle := q.Idle() + select { + case <-idle: + t.Errorf("NewQueue(1) is marked idle while processing work.") + default: + } + + close(unblock) + <-idle // Should be closed as soon as the Add callback returns. +} + +func TestQueueBacklog(t *testing.T) { + const ( + maxActive = 2 + totalWork = 3 * maxActive + ) + + q := NewQueue(maxActive) + t.Logf("q = NewQueue(%d)", maxActive) + + var wg sync.WaitGroup + wg.Add(totalWork) + started := make([]chan struct{}, totalWork) + unblock := make(chan struct{}) + for i := range started { + started[i] = make(chan struct{}) + i := i + q.Add(func() { + close(started[i]) + <-unblock + wg.Done() + }) + } + + for i, c := range started { + if i < maxActive { + <-c // Work item i should be started immediately. + } else { + select { + case <-c: + t.Errorf("Work item %d started before previous items finished.", i) + default: + } + } + } + + close(unblock) + for _, c := range started[maxActive:] { + <-c + } + wg.Wait() +} -- 2.50.0