From 132e816734de8cb7d5c52ca3a5a707135fc81075 Mon Sep 17 00:00:00 2001
From: Russ Cox
A buffered channel can be used like a semaphore, for instance to
limit throughput. In this example, incoming requests are passed
-to
-Because data synchronization occurs on a receive from a channel
-(that is, the send "happens before" the receive; see
-The Go Memory Model),
-acquisition of the semaphore must be on a channel receive, not a send.
+Once
@@ -2993,10 +2985,10 @@ Here's an obvious solution, but beware it has a bug we'll fix subsequently:
handle
, which receives a value from the channel, processes
-the request, and then sends a value back to the channel
-to ready the "semaphore" for the next consumer.
+to handle
, which sends a value into the channel, processes
+the request, and then receives a value from the channel
+to ready the “semaphore” for the next consumer.
The capacity of the channel buffer limits the number of
-simultaneous calls to process
,
-so during initialization we prime the channel by filling it to capacity.
+simultaneous calls to process
.
var sem = make(chan int, MaxOutstanding)
func handle(r *Request) {
- <-sem // Wait for active queue to drain.
- process(r) // May take a long time.
- sem <- 1 // Done; enable next request to run.
-}
-
-func init() {
- for i := 0; i < MaxOutstanding; i++ {
- sem <- 1
- }
+ sem <- 1 // Wait for active queue to drain.
+ process(r) // May take a long time.
+ <-sem // Done; enable next request to run.
}
func Serve(queue chan *Request) {
@@ -2973,10 +2966,9 @@ func Serve(queue chan *Request) {
MaxOutstanding
handlers are executing process
,
+any more will block trying to send into the filled channel buffer,
+until one of the existing handlers finishes and receives from the buffer.
func Serve(queue chan *Request) {
for req := range queue {
- <-sem
+ sem <- 1
go func() {
process(req) // Buggy; see explanation below.
- sem <- 1
+ <-sem
}()
}
}
@@ -3014,10 +3006,10 @@ to the closure in the goroutine:
func Serve(queue chan *Request) {
for req := range queue {
- <-sem
+ sem <- 1
go func(req *Request) {
process(req)
- sem <- 1
+ <-sem
}(req)
}
}
@@ -3032,11 +3024,11 @@ name, as in this example:
func Serve(queue chan *Request) {
for req := range queue {
- <-sem
req := req // Create new instance of req for the goroutine.
+ sem <- 1
go func() {
process(req)
- sem <- 1
+ <-sem
}()
}
}
diff --git a/doc/go_mem.html b/doc/go_mem.html
index 3e769daeca..69e7c8ce75 100644
--- a/doc/go_mem.html
+++ b/doc/go_mem.html
@@ -274,6 +274,41 @@ then the program would not be guaranteed to print
crash, or do something else.)
+The kth receive on a channel with capacity C happens before
+the k+Cth send from that channel completes.
+
+
+
+This rule generalizes the previous rule to buffered channels.
+It allows a counting semaphore to be modeled by a buffered channel:
+the number of items in the channel corresponds to the semaphore count,
+the capacity of the channel corresponds to the semaphore maximum,
+sending an item acquires the semaphore, and receiving an item releases
+the semaphore.
+This is a common idiom for rate-limiting work.
+
+ +
+This program starts a goroutine for every entry in the work list, but the
+goroutines coordinate using the limit
channel to ensure
+that at most three are running work functions at a time.
+
+var limit = make(chan int, 3)
+
+func main() {
+	for _, w := range work {
+		go func() {
+			limit <- 1
+			w()
+			<-limit
+		}()
+	}
+	select{}
+}
+
+
-- 2.48.1