// Explore work graph in parallel in case reqs.Required
// does high-latency network operations.
- var work par.Work
+ var work par.Work[module.Version]
for _, target := range targets {
work.Add(target)
}
- work.Do(10, func(item any) {
- m := item.(module.Version)
+ work.Do(10, func(m module.Version) {
var required []module.Version
var err error
// Work manages a set of work items to be executed in parallel, at most once each.
// The items in the set must all be valid map keys.
-type Work struct {
- f func(any) // function to run for each item
- running int // total number of runners
+type Work[T comparable] struct {
+ f func(T) // function to run for each item
+ running int // total number of runners
mu sync.Mutex
- added map[any]bool // items added to set
- todo []any // items yet to be run
- wait sync.Cond // wait when todo is empty
- waiting int // number of runners waiting for todo
+ added map[T]bool // items added to set
+ todo []T // items yet to be run
+ wait sync.Cond // wait when todo is empty
+ waiting int // number of runners waiting for todo
}
-func (w *Work) init() {
+func (w *Work[T]) init() {
if w.added == nil {
- w.added = make(map[any]bool)
+ w.added = make(map[T]bool)
}
}
// Add adds item to the work set, if it hasn't already been added.
-func (w *Work) Add(item any) {
+func (w *Work[T]) Add(item T) {
w.mu.Lock()
w.init()
if !w.added[item] {
// before calling Do (or else Do returns immediately),
// but it is allowed for f(item) to add new items to the set.
// Do should only be used once on a given Work.
-func (w *Work) Do(n int, f func(item any)) {
+func (w *Work[T]) Do(n int, f func(item T)) {
if n < 1 {
panic("par.Work.Do: n < 1")
}
// runner executes work in w until both nothing is left to do
// and all the runners are waiting for work.
// (Then all the runners return.)
-func (w *Work) runner() {
+func (w *Work[T]) runner() {
for {
// Wait for something to do.
w.mu.Lock()
)
func TestWork(t *testing.T) {
- var w Work
+ var w Work[int]
const N = 10000
n := int32(0)
w.Add(N)
- w.Do(100, func(x any) {
+ w.Do(100, func(i int) {
atomic.AddInt32(&n, 1)
- i := x.(int)
if i >= 2 {
w.Add(i - 1)
w.Add(i - 2)
func TestWorkParallel(t *testing.T) {
for tries := 0; tries < 10; tries++ {
- var w Work
+ var w Work[int]
const N = 100
for i := 0; i < N; i++ {
w.Add(i)
}
start := time.Now()
var n int32
- w.Do(N, func(x any) {
+ w.Do(N, func(x int) {
time.Sleep(1 * time.Millisecond)
atomic.AddInt32(&n, +1)
})