return (*LFNode)(unsafe.Pointer(lfstackpop(head)))
}
-type ParFor struct { // ParFor is the capitalized counterpart of parfor; the wrappers in this file convert *ParFor to *parfor through unsafe.Pointer.
- body func(*ParFor, uint32) // same position as parfor.body
- done uint32 // same position as parfor.done
- Nthr uint32 // same position as parfor.nthr
- thrseq uint32 // same position as parfor.thrseq
- Cnt uint32 // same position as parfor.cnt
- wait bool // same position as parfor.wait; parfor's later fields (thr, stats) have no counterpart here
-}
-
-func NewParFor(nthrmax uint32) *ParFor { // NewParFor returns a descriptor from parforalloc(nthrmax), viewed as *ParFor.
- var desc *ParFor
- systemstack(func() { // the allocation is performed through systemstack; the closure writes its result into desc
- desc = (*ParFor)(unsafe.Pointer(parforalloc(nthrmax))) // reinterpret the *parfor as *ParFor
- })
- return desc
-}
-
-func ParForSetup(desc *ParFor, nthr, n uint32, wait bool, body func(*ParFor, uint32)) { // ParForSetup forwards its arguments to parforsetup through systemstack.
- systemstack(func() {
- parforsetup((*parfor)(unsafe.Pointer(desc)), nthr, n, wait, // desc is reinterpreted as *parfor
- *(*func(*parfor, uint32))(unsafe.Pointer(&body))) // body is reinterpreted as func(*parfor, uint32) to match parforsetup's parameter type
- })
-}
-
-func ParForDo(desc *ParFor) { // ParForDo calls parfordo on desc through systemstack.
- systemstack(func() {
- parfordo((*parfor)(unsafe.Pointer(desc))) // desc is reinterpreted as *parfor
- })
-}
-
-func ParForIters(desc *ParFor, tid uint32) (uint32, uint32) { // ParForIters returns the two 32-bit halves (low, high) of thread tid's packed pos word.
- desc1 := (*parfor)(unsafe.Pointer(desc))
- pos := desc1.thr[tid].pos // plain, non-atomic read; tid is not range-checked here beyond the slice index
- return uint32(pos), uint32(pos >> 32) // parforsetup packs begin into the low half and end into the high half
-}
-
func GCMask(x interface{}) (ret []byte) {
systemstack(func() {
ret = getgcmask(x)
+++ /dev/null
-// Copyright 2012 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// Parallel for algorithm.
-
-package runtime
-
-import (
- "runtime/internal/atomic"
- "runtime/internal/sys"
-)
-
-// A parfor holds state for the parallel for operation: the body to run, the thread bookkeeping, the per-thread descriptors, and aggregated statistics.
-type parfor struct {
- body func(*parfor, uint32) // executed for each element
- done uint32 // number of idle threads
- nthr uint32 // total number of threads
- thrseq uint32 // thread id sequencer
- cnt uint32 // iteration space [0, cnt)
- wait bool // if true, wait until all threads finish processing,
- // otherwise parfor may return while other threads are still working
-
- thr []parforthread // thread descriptors
-
- // stats: totals of the per-thread counters, added in by each thread at the end of parfordo
- nsteal uint64 // successful steals
- nstealcnt uint64 // iterations obtained by stealing
- nprocyield uint64 // procyield backoff steps
- nosyield uint64 // osyield backoff steps
- nsleep uint64 // usleep backoff steps
-}
-
-// A parforthread holds state for a single thread in the parallel for.
-type parforthread struct {
- // the thread's iteration space [32lsb, 32msb): begin in the low 32 bits, end in the high 32 bits
- pos uint64
- // stats: updated by the owning thread in parfordo, then folded into the parfor totals and reset to zero
- nsteal uint64 // successful steals
- nstealcnt uint64 // iterations obtained by stealing
- nprocyield uint64 // procyield backoff steps
- nosyield uint64 // osyield backoff steps
- nsleep uint64 // usleep backoff steps
- pad [sys.CacheLineSize]byte // one cache line of padding; presumably keeps neighbouring descriptors from sharing a cache line (TODO confirm)
-}
-
-func parforalloc(nthrmax uint32) *parfor { // parforalloc returns a new parfor with nthrmax thread descriptors; all other fields are left at their zero values.
- return &parfor{
- thr: make([]parforthread, nthrmax), // len(thr) is the upper bound parforsetup enforces on nthr
- }
-}
-
-// parforsetup initializes desc for a parallel for operation with nthr
-// threads executing n jobs.
-//
-// On return the nthr threads are each expected to call parfordo(desc)
-// to run the operation. During those calls, for each i in [0, n), one
-// thread will be used to invoke body(desc, i).
-// If wait is true, no parfordo will return until all work has been completed.
-// If wait is false, parfordo may return when there is a small amount
-// of work left, under the assumption that another thread has that
-// work well in hand.
-func parforsetup(desc *parfor, nthr, n uint32, wait bool, body func(*parfor, uint32)) {
- if desc == nil || nthr == 0 || nthr > uint32(len(desc.thr)) || body == nil { // reject a nil descriptor or body, zero threads, or more threads than descriptors
- print("desc=", desc, " nthr=", nthr, " count=", n, " body=", body, "\n")
- throw("parfor: invalid args")
- }
-
- desc.body = body
- desc.done = 0
- desc.nthr = nthr
- desc.thrseq = 0
- desc.cnt = n
- desc.wait = wait
- desc.nsteal = 0
- desc.nstealcnt = 0
- desc.nprocyield = 0
- desc.nosyield = 0
- desc.nsleep = 0
-
- for i := range desc.thr { // covers every descriptor, including those at index >= nthr; parfordo only indexes thr with values below nthr
- begin := uint32(uint64(n) * uint64(i) / uint64(nthr)) // 64-bit arithmetic so n*i cannot overflow before the division
- end := uint32(uint64(n) * uint64(i+1) / uint64(nthr)) // equals the next descriptor's begin, so the ranges are contiguous
- desc.thr[i].pos = uint64(begin) | uint64(end)<<32 // pack as begin (low 32 bits) | end (high 32 bits)
- }
-}
-
-func parfordo(desc *parfor) { // parfordo claims a thread index, runs that thread's iterations of desc.body, then tries to steal work from other threads until the exit conditions below are met.
- // Obtain 0-based thread index.
- tid := atomic.Xadd(&desc.thrseq, 1) - 1
- if tid >= desc.nthr { // more parfordo calls than the nthr given to parforsetup
- print("tid=", tid, " nthr=", desc.nthr, "\n")
- throw("parfor: invalid tid")
- }
-
- // If single-threaded, just execute the for serially.
- body := desc.body
- if desc.nthr == 1 {
- for i := uint32(0); i < desc.cnt; i++ {
- body(desc, i)
- }
- return // the serial path returns without touching the stats counters
- }
-
- me := &desc.thr[tid]
- mypos := &me.pos
- for {
- for {
- // While there is local work,
- // bump low index and execute the iteration.
- pos := atomic.Xadd64(mypos, 1) // adding 1 to the packed word advances the low half (begin)
- begin := uint32(pos) - 1 // pos is the value after the add, so the claimed index is one less
- end := uint32(pos >> 32)
- if begin < end {
- body(desc, begin)
- continue
- }
- break
- }
-
- // Out of work, need to steal something.
- idle := false // true while this thread is counted in desc.done
- for try := uint32(0); ; try++ {
- // If we don't see any work for long enough,
- // increment the done counter...
- if try > desc.nthr*4 && !idle {
- idle = true
- atomic.Xadd(&desc.done, 1)
- }
-
- // ...if all threads have incremented the counter,
- // we are done.
- extra := uint32(0)
- if !idle {
- extra = 1 // count this thread as well if it has not registered as idle yet
- }
- if desc.done+extra == desc.nthr { // desc.done is read with a plain load here
- if !idle {
- atomic.Xadd(&desc.done, 1)
- }
- goto exit
- }
-
- // Choose a random victim for stealing.
- var begin, end uint32
- victim := fastrand1() % (desc.nthr - 1) // nthr >= 2 here because nthr == 1 returned above
- if victim >= tid {
- victim++ // skip over our own index so victim != tid
- }
- victimpos := &desc.thr[victim].pos
- for {
- // See if it has any work.
- pos := atomic.Load64(victimpos)
- begin = uint32(pos)
- end = uint32(pos >> 32)
- if begin+1 >= end { // victim has at most one iteration left: nothing to take
- end = 0
- begin = end // begin == end == 0 makes the begin < end test below fail
- break
- }
- if idle { // work was seen, so withdraw from the idle count before attempting the steal
- atomic.Xadd(&desc.done, -1)
- idle = false
- }
- begin2 := begin + (end-begin)/2 // split point: victim keeps [begin, begin2), we take [begin2, end)
- newpos := uint64(begin) | uint64(begin2)<<32
- if atomic.Cas64(victimpos, pos, newpos) { // fails if the victim's pos changed since the load; then retry
- begin = begin2
- break
- }
- }
- if begin < end {
- // Has successfully stolen some work.
- if idle {
- throw("parfor: should not be idle")
- }
- atomic.Store64(mypos, uint64(begin)|uint64(end)<<32) // install the stolen range as our own
- me.nsteal++
- me.nstealcnt += uint64(end) - uint64(begin)
- break // back to the outer loop to run the stolen iterations
- }
-
- // Backoff.
- if try < desc.nthr {
- // nothing
- } else if try < 4*desc.nthr {
- me.nprocyield++
- procyield(20)
- } else if !desc.wait {
- // If a caller asked not to wait for the others, exit now
- // (assume that most work is already done at this point).
- if !idle {
- atomic.Xadd(&desc.done, 1)
- }
- goto exit
- } else if try < 6*desc.nthr {
- me.nosyield++
- osyield()
- } else {
- me.nsleep++
- usleep(1)
- }
- }
- }
-
-exit: // fold this thread's counters into the descriptor totals, then zero the per-thread counters
- atomic.Xadd64(&desc.nsteal, int64(me.nsteal))
- atomic.Xadd64(&desc.nstealcnt, int64(me.nstealcnt))
- atomic.Xadd64(&desc.nprocyield, int64(me.nprocyield))
- atomic.Xadd64(&desc.nosyield, int64(me.nosyield))
- atomic.Xadd64(&desc.nsleep, int64(me.nsleep))
- me.nsteal = 0
- me.nstealcnt = 0
- me.nprocyield = 0
- me.nosyield = 0
- me.nsleep = 0
-}
+++ /dev/null
-// Copyright 2012 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// The race detector does not understand ParFor synchronization.
-// +build !race
-
-package runtime_test
-
-import (
- . "runtime"
- "testing"
-)
-
-// Simple serial sanity test for parallelfor: one thread (P = 1) applies x -> x*x+1 to each of N elements.
-func TestParFor(t *testing.T) {
- const P = 1
- const N = 20
- data := make([]uint64, N)
- for i := uint64(0); i < N; i++ {
- data[i] = i
- }
- desc := NewParFor(P)
- ParForSetup(desc, P, N, true, func(desc *ParFor, i uint32) {
- data[i] = data[i]*data[i] + 1
- })
- ParForDo(desc)
- for i := uint64(0); i < N; i++ {
- if data[i] != i*i+1 { // data[i] started as i, so each element must have been transformed exactly once
- t.Fatalf("Wrong element %d: %d", i, data[i])
- }
- }
-}
-
-// Test that nonblocking parallelfor does not block: with wait = false, P sequential ParForDo calls from one goroutine must all return and together process every element.
-func TestParFor2(t *testing.T) {
- const P = 7
- const N = 1003
- data := make([]uint64, N)
- for i := uint64(0); i < N; i++ {
- data[i] = i
- }
- desc := NewParFor(P)
- ParForSetup(desc, P, N, false, func(desc *ParFor, i uint32) {
- data[i] = data[i]*data[i] + 1
- })
- for p := 0; p < P; p++ { // each call takes the next thread index; the calls run one after another, not concurrently
- ParForDo(desc)
- }
- for i := uint64(0); i < N; i++ {
- if data[i] != i*i+1 { // data[i] started as i, so each element must have been transformed exactly once
- t.Fatalf("Wrong element %d: %d", i, data[i])
- }
- }
-}
-
-// Test that iterations are properly distributed: for every n in [0, N) and p in [1, P], the p initial ranges must start at 0, be contiguous, differ in size by at most one from the first range, and sum to n.
-func TestParForSetup(t *testing.T) {
- const P = 11
- const N = 101
- desc := NewParFor(P) // one descriptor is reused for every (n, p) combination
- for n := uint32(0); n < N; n++ {
- for p := uint32(1); p <= P; p++ {
- ParForSetup(desc, p, n, true, func(desc *ParFor, i uint32) {}) // body is a no-op; only the setup is under test
- sum := uint32(0) // total size of all ranges seen so far
- size0 := uint32(0) // size of thread 0's range
- end0 := uint32(0) // end of the previous thread's range
- for i := uint32(0); i < p; i++ {
- begin, end := ParForIters(desc, i)
- size := end - begin
- sum += size
- if i == 0 {
- size0 = size
- if begin != 0 {
- t.Fatalf("incorrect begin: %d (n=%d, p=%d)", begin, n, p)
- }
- } else {
- if size != size0 && size != size0+1 { // later ranges may be at most one larger than the first
- t.Fatalf("incorrect size: %d/%d (n=%d, p=%d)", size, size0, n, p)
- }
- if begin != end0 { // ranges must be contiguous
- t.Fatalf("incorrect begin/end: %d/%d (n=%d, p=%d)", begin, end0, n, p)
- }
- }
- end0 = end
- }
- if sum != n { // the ranges together must cover exactly n iterations
- t.Fatalf("incorrect sum: %d/%d (p=%d)", sum, n, p)
- }
- }
- }
-}
-
-// Test parallel parallelfor: P = GOMAXPROCS(-1) participants (P-1 goroutines plus the test goroutine) run ParForDo concurrently on one descriptor.
-func TestParForParallel(t *testing.T) {
- N := uint64(1e7)
- if testing.Short() {
- N /= 10 // smaller input in -short mode
- }
- data := make([]uint64, N)
- for i := uint64(0); i < N; i++ {
- data[i] = i
- }
- P := GOMAXPROCS(-1)
- c := make(chan bool, P) // buffered, so the worker goroutines' sends do not block
- desc := NewParFor(uint32(P))
- ParForSetup(desc, uint32(P), uint32(N), false, func(desc *ParFor, i uint32) {
- data[i] = data[i]*data[i] + 1
- })
- for p := 1; p < P; p++ { // start P-1 helpers; the test goroutine is the P-th participant
- go func() {
- ParForDo(desc)
- c <- true
- }()
- }
- ParForDo(desc)
- for p := 1; p < P; p++ { // wait = false lets ParForDo return early, so wait for every helper before checking data
- <-c
- }
- for i := uint64(0); i < N; i++ {
- if data[i] != i*i+1 { // data[i] started as i, so each element must have been transformed exactly once
- t.Fatalf("Wrong element %d: %d", i, data[i])
- }
- }
-
- data, desc = nil, nil // drop the references before forcing a collection
- GC() // NOTE(review): presumably checks that collecting the descriptor and data is safe — confirm intent
-}