// This file contains the implementation of Go channels.
+// Invariants:
+// At least one of c.sendq and c.recvq is empty.
+// For buffered channels, also:
+// c.qcount > 0 implies that c.recvq is empty.
+// c.qcount < c.dataqsiz implies that c.sendq is empty.
import "unsafe"
const (
}
lock(&c.lock)
+
if c.closed != 0 {
unlock(&c.lock)
panic("send on closed channel")
}
- if c.dataqsiz == 0 { // synchronous channel
- sg := c.recvq.dequeue()
- if sg != nil { // found a waiting receiver
- if raceenabled {
- racesync(c, sg)
- }
- unlock(&c.lock)
-
- recvg := sg.g
- if sg.elem != nil {
- syncsend(c, sg, ep)
- }
- recvg.param = unsafe.Pointer(sg)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
- }
- goready(recvg, 3)
- return true
- }
-
- if !block {
- unlock(&c.lock)
- return false
- }
-
- // no receiver available: block on this channel.
- gp := getg()
- mysg := acquireSudog()
- mysg.releasetime = 0
- if t0 != 0 {
- mysg.releasetime = -1
- }
- mysg.elem = ep
- mysg.waitlink = nil
- gp.waiting = mysg
- mysg.g = gp
- mysg.selectdone = nil
- gp.param = nil
- c.sendq.enqueue(mysg)
- goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
+ if sg := c.recvq.dequeue(); sg != nil {
+ // Found a waiting receiver. We pass the value we want to send
+ // directly to the receiver, bypassing the channel buffer (if any).
+ send(c, sg, ep, func() { unlock(&c.lock) })
+ return true
+ }
- // someone woke us up.
- if mysg != gp.waiting {
- throw("G waiting list is corrupted!")
- }
- gp.waiting = nil
- if gp.param == nil {
- if c.closed == 0 {
- throw("chansend: spurious wakeup")
- }
- panic("send on closed channel")
+ if c.qcount < c.dataqsiz {
+ // Space is available in the channel buffer. Enqueue the element to send.
+ qp := chanbuf(c, c.sendx)
+ if raceenabled {
+ raceacquire(qp)
+ racerelease(qp)
}
- gp.param = nil
- if mysg.releasetime > 0 {
- blockevent(int64(mysg.releasetime)-t0, 2)
+ typedmemmove(c.elemtype, qp, ep)
+ c.sendx++
+ if c.sendx == c.dataqsiz {
+ c.sendx = 0
}
- releaseSudog(mysg)
+ c.qcount++
+ unlock(&c.lock)
return true
}
- // asynchronous channel
- // wait for some space to write our data
- var t1 int64
- for futile := byte(0); c.qcount >= c.dataqsiz; futile = traceFutileWakeup {
- if !block {
- unlock(&c.lock)
- return false
- }
- gp := getg()
- mysg := acquireSudog()
- mysg.releasetime = 0
- if t0 != 0 {
- mysg.releasetime = -1
- }
- mysg.g = gp
- mysg.elem = nil
- mysg.selectdone = nil
- c.sendq.enqueue(mysg)
- goparkunlock(&c.lock, "chan send", traceEvGoBlockSend|futile, 3)
-
- // someone woke us up - try again
- if mysg.releasetime > 0 {
- t1 = mysg.releasetime
- }
- releaseSudog(mysg)
- lock(&c.lock)
- if c.closed != 0 {
- unlock(&c.lock)
- panic("send on closed channel")
- }
+ if !block {
+ unlock(&c.lock)
+ return false
}
- // write our data into the channel buffer
- if raceenabled {
- raceacquire(chanbuf(c, c.sendx))
- racerelease(chanbuf(c, c.sendx))
+ // Block on the channel. Some receiver will complete our operation for us.
+ gp := getg()
+ mysg := acquireSudog()
+ mysg.releasetime = 0
+ if t0 != 0 {
+ mysg.releasetime = -1
+ }
+ mysg.elem = ep
+ mysg.waitlink = nil
+ mysg.g = gp
+ mysg.selectdone = nil
+ gp.waiting = mysg
+ gp.param = nil
+ c.sendq.enqueue(mysg)
+ goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
+
+ // someone woke us up.
+ if mysg != gp.waiting {
+ throw("G waiting list is corrupted")
+ }
+ gp.waiting = nil
+ if gp.param == nil {
+ if c.closed == 0 {
+ throw("chansend: spurious wakeup")
+ }
+ panic("send on closed channel")
}
- typedmemmove(c.elemtype, chanbuf(c, c.sendx), ep)
- c.sendx++
- if c.sendx == c.dataqsiz {
- c.sendx = 0
+ gp.param = nil
+ if mysg.releasetime > 0 {
+ blockevent(int64(mysg.releasetime)-t0, 2)
}
- c.qcount++
+ releaseSudog(mysg)
+ return true
+}
- // wake up a waiting receiver
- sg := c.recvq.dequeue()
- if sg != nil {
- recvg := sg.g
- unlock(&c.lock)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
+// send processes a send operation on an empty channel c.
+// The value ep sent by the sender is copied to the receiver sg.
+// The receiver is then woken up to go on its merry way.
+// Channel c must be empty and locked. send unlocks c with unlockf.
+// sg must already be dequeued from c.
+// ep must be non-nil and point to the heap or the caller's stack.
+func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {
+ if raceenabled {
+ if c.dataqsiz == 0 {
+ racesync(c, sg)
+ } else {
+ // Pretend we go through the buffer, even though
+ // we copy directly. Note that we need to increment
+ // the head/tail locations only when raceenabled.
+ qp := chanbuf(c, c.recvx)
+ raceacquire(qp)
+ racerelease(qp)
+ raceacquireg(sg.g, qp)
+ racereleaseg(sg.g, qp)
+ c.recvx++
+ if c.recvx == c.dataqsiz {
+ c.recvx = 0
+ }
+ c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
- goready(recvg, 3)
- } else {
- unlock(&c.lock)
}
- if t1 > 0 {
- blockevent(t1-t0, 2)
+ unlockf()
+ if sg.elem != nil {
+ sendDirect(c.elemtype, sg.elem, ep)
+ sg.elem = nil
}
- return true
+ gp := sg.g
+ gp.param = unsafe.Pointer(sg)
+ if sg.releasetime != 0 {
+ sg.releasetime = cputicks()
+ }
+ goready(gp, 4)
}
-func syncsend(c *hchan, sg *sudog, elem unsafe.Pointer) {
- // Send on unbuffered channel is the only operation
+func sendDirect(t *_type, dst, src unsafe.Pointer) {
+ // Send on an unbuffered or empty-buffered channel is the only operation
// in the entire runtime where one goroutine
// writes to the stack of another goroutine. The GC assumes that
// stack writes only happen when the goroutine is running and are
// typedmemmove will call heapBitsBulkBarrier, but the target bytes
// are not in the heap, so that will not help. We arrange to call
// memmove and typeBitsBulkBarrier instead.
- memmove(sg.elem, elem, c.elemtype.size)
- typeBitsBulkBarrier(c.elemtype, uintptr(sg.elem), c.elemtype.size)
- sg.elem = nil
+ memmove(dst, src, t.size)
+ typeBitsBulkBarrier(t, uintptr(dst), t.size)
}
func closechan(c *hchan) {
if sg == nil {
break
}
- gp := sg.g
- sg.elem = nil
- gp.param = nil
+ if sg.elem != nil {
+ memclr(sg.elem, uintptr(c.elemsize))
+ sg.elem = nil
+ }
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
+ gp := sg.g
+ gp.param = nil
+ if raceenabled {
+ raceacquireg(gp, unsafe.Pointer(c))
+ }
goready(gp, 3)
}
- // release all writers
+ // release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
- gp := sg.g
sg.elem = nil
- gp.param = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
+ gp := sg.g
+ gp.param = nil
+ if raceenabled {
+ raceacquireg(gp, unsafe.Pointer(c))
+ }
goready(gp, 3)
}
unlock(&c.lock)
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
+// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
- // raceenabled: don't need to check ep, as it is always on the stack.
+ // raceenabled: don't need to check ep, as it is always on the stack
+ // or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
lock(&c.lock)
- if c.dataqsiz == 0 { // synchronous channel
- if c.closed != 0 {
- return recvclosed(c, ep)
- }
-
- sg := c.sendq.dequeue()
- if sg != nil {
- if raceenabled {
- racesync(c, sg)
- }
- unlock(&c.lock)
-
- if ep != nil {
- typedmemmove(c.elemtype, ep, sg.elem)
- }
- sg.elem = nil
- gp := sg.g
- gp.param = unsafe.Pointer(sg)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
- }
- goready(gp, 3)
- selected = true
- received = true
- return
- }
-
- if !block {
- unlock(&c.lock)
- return
- }
- // no sender available: block on this channel.
- gp := getg()
- mysg := acquireSudog()
- mysg.releasetime = 0
- if t0 != 0 {
- mysg.releasetime = -1
+ if c.closed != 0 && c.qcount == 0 {
+ if raceenabled {
+ raceacquire(unsafe.Pointer(c))
}
- mysg.elem = ep
- mysg.waitlink = nil
- gp.waiting = mysg
- mysg.g = gp
- mysg.selectdone = nil
- gp.param = nil
- c.recvq.enqueue(mysg)
- goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
-
- // someone woke us up
- if mysg != gp.waiting {
- throw("G waiting list is corrupted!")
- }
- gp.waiting = nil
- if mysg.releasetime > 0 {
- blockevent(mysg.releasetime-t0, 2)
- }
- haveData := gp.param != nil
- gp.param = nil
- releaseSudog(mysg)
-
- if haveData {
- // a sender sent us some data. It already wrote to ep.
- selected = true
- received = true
- return
- }
-
- lock(&c.lock)
- if c.closed == 0 {
- throw("chanrecv: spurious wakeup")
+ unlock(&c.lock)
+ if ep != nil {
+ memclr(ep, uintptr(c.elemsize))
}
- return recvclosed(c, ep)
+ return true, false
}
- // asynchronous channel
- // wait for some data to appear
- var t1 int64
- for futile := byte(0); c.qcount <= 0; futile = traceFutileWakeup {
- if c.closed != 0 {
- selected, received = recvclosed(c, ep)
- if t1 > 0 {
- blockevent(t1-t0, 2)
- }
- return
- }
-
- if !block {
- unlock(&c.lock)
- return
- }
+ if sg := c.sendq.dequeue(); sg != nil {
+ // Found a waiting sender. If buffer is size 0, receive value
+ // directly from sender. Otherwise, recieve from head of queue
+ // and add sender's value to the tail of the queue (both map to
+ // the same buffer slot because the queue is full).
+ recv(c, sg, ep, func() { unlock(&c.lock) })
+ return true, true
+ }
- // wait for someone to send an element
- gp := getg()
- mysg := acquireSudog()
- mysg.releasetime = 0
- if t0 != 0 {
- mysg.releasetime = -1
+ if c.qcount > 0 {
+ // Receive directly from queue
+ qp := chanbuf(c, c.recvx)
+ if raceenabled {
+ raceacquire(qp)
+ racerelease(qp)
}
- mysg.elem = nil
- mysg.g = gp
- mysg.selectdone = nil
-
- c.recvq.enqueue(mysg)
- goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv|futile, 3)
-
- // someone woke us up - try again
- if mysg.releasetime > 0 {
- t1 = mysg.releasetime
+ if ep != nil {
+ typedmemmove(c.elemtype, ep, qp)
}
- releaseSudog(mysg)
- lock(&c.lock)
- }
-
- if raceenabled {
- raceacquire(chanbuf(c, c.recvx))
- racerelease(chanbuf(c, c.recvx))
- }
- if ep != nil {
- typedmemmove(c.elemtype, ep, chanbuf(c, c.recvx))
- }
- memclr(chanbuf(c, c.recvx), uintptr(c.elemsize))
-
- c.recvx++
- if c.recvx == c.dataqsiz {
- c.recvx = 0
- }
- c.qcount--
-
- // ping a sender now that there is space
- sg := c.sendq.dequeue()
- if sg != nil {
- gp := sg.g
- unlock(&c.lock)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
+ memclr(qp, uintptr(c.elemsize))
+ c.recvx++
+ if c.recvx == c.dataqsiz {
+ c.recvx = 0
}
- goready(gp, 3)
- } else {
+ c.qcount--
unlock(&c.lock)
+ return true, true
}
- if t1 > 0 {
- blockevent(t1-t0, 2)
- }
- selected = true
- received = true
- return
+ if !block {
+ unlock(&c.lock)
+ return false, false
+ }
+
+ // no sender available: block on this channel.
+ gp := getg()
+ mysg := acquireSudog()
+ mysg.releasetime = 0
+ if t0 != 0 {
+ mysg.releasetime = -1
+ }
+ mysg.elem = ep
+ mysg.waitlink = nil
+ gp.waiting = mysg
+ mysg.g = gp
+ mysg.selectdone = nil
+ gp.param = nil
+ c.recvq.enqueue(mysg)
+ goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
+
+ // someone woke us up
+ if mysg != gp.waiting {
+ throw("G waiting list is corrupted")
+ }
+ gp.waiting = nil
+ if mysg.releasetime > 0 {
+ blockevent(mysg.releasetime-t0, 2)
+ }
+ closed := gp.param == nil
+ gp.param = nil
+ releaseSudog(mysg)
+ return true, !closed
}
-// recvclosed is a helper function for chanrecv. Handles cleanup
-// when the receiver encounters a closed channel.
-// Caller must hold c.lock, recvclosed will release the lock.
-func recvclosed(c *hchan, ep unsafe.Pointer) (selected, recevied bool) {
- if raceenabled {
- raceacquire(unsafe.Pointer(c))
+// recv processes a receive operation on a full channel c.
+// There are 2 parts:
+// 1) The value sent by the sender sg is put into the channel
+// and the sender is woken up to go on its merry way.
+// 2) The value received by the receiver (the current G) is
+// written to ep.
+// For synchronous channels, both values are the same.
+// For asynchronous channels, the receiver gets its data from
+// the channel buffer and the sender's data is put in the
+// channel buffer.
+// Channel c must be full and locked. recv unlocks c with unlockf.
+// sg must already be dequeued from c.
+// A non-nil ep must point to the heap or the caller's stack.
+func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) {
+ if c.dataqsiz == 0 {
+ if raceenabled {
+ racesync(c, sg)
+ }
+ unlockf()
+ if ep != nil {
+ // copy data from sender
+ // ep points to our own stack or heap, so nothing
+ // special (ala sendDirect) needed here.
+ typedmemmove(c.elemtype, ep, sg.elem)
+ }
+ } else {
+ // Queue is full. Take the item at the
+ // head of the queue. Make the sender enqueue
+ // its item at the tail of the queue. Since the
+ // queue is full, those are both the same slot.
+ qp := chanbuf(c, c.recvx)
+ if raceenabled {
+ raceacquire(qp)
+ racerelease(qp)
+ raceacquireg(sg.g, qp)
+ racereleaseg(sg.g, qp)
+ }
+ // copy data from queue to receiver
+ if ep != nil {
+ typedmemmove(c.elemtype, ep, qp)
+ }
+ // copy data from sender to queue
+ typedmemmove(c.elemtype, qp, sg.elem)
+ c.recvx++
+ if c.recvx == c.dataqsiz {
+ c.recvx = 0
+ }
+ c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
+ unlockf()
}
- unlock(&c.lock)
- if ep != nil {
- memclr(ep, uintptr(c.elemsize))
+ sg.elem = nil
+ gp := sg.g
+ gp.param = unsafe.Pointer(sg)
+ if sg.releasetime != 0 {
+ sg.releasetime = cputicks()
}
- return true, false
+ goready(gp, 4)
}
// compiler implements
k *scase
sglist *sudog
sgnext *sudog
- futile byte
+ qp unsafe.Pointer
)
loop:
switch cas.kind {
case caseRecv:
- if c.dataqsiz > 0 {
- if c.qcount > 0 {
- goto asyncrecv
- }
- } else {
- sg = c.sendq.dequeue()
- if sg != nil {
- goto syncrecv
- }
+ sg = c.sendq.dequeue()
+ if sg != nil {
+ goto recv
+ }
+ if c.qcount > 0 {
+ goto bufrecv
}
if c.closed != 0 {
goto rclose
if c.closed != 0 {
goto sclose
}
- if c.dataqsiz > 0 {
- if c.qcount < c.dataqsiz {
- goto asyncsend
- }
- } else {
- sg = c.recvq.dequeue()
- if sg != nil {
- goto syncsend
- }
+ sg = c.recvq.dequeue()
+ if sg != nil {
+ goto send
+ }
+ if c.qcount < c.dataqsiz {
+ goto bufsend
}
case caseDefault:
// pass 2 - enqueue on all chans
gp = getg()
done = 0
+ if gp.waiting != nil {
+ throw("gp.waiting != nil")
+ }
for i := 0; i < int(sel.ncase); i++ {
cas = &scases[pollorder[i]]
c = cas.c
// wait for someone to wake us up
gp.param = nil
- gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect|futile, 2)
+ gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect, 2)
// someone woke us up
sellock(sel)
}
if cas == nil {
- futile = traceFutileWakeup
+ // This can happen if we were woken up by a close().
+ // TODO: figure that out explicitly so we don't need this loop.
goto loop
}
c = cas.c
- if c.dataqsiz > 0 {
- throw("selectgo: shouldn't happen")
- }
-
if debugSelect {
print("wait-return: sel=", sel, " c=", c, " cas=", cas, " kind=", cas.kind, "\n")
}
selunlock(sel)
goto retc
-asyncrecv:
+bufrecv:
// can receive from buffer
if raceenabled {
if cas.elem != nil {
if cas.receivedp != nil {
*cas.receivedp = true
}
+ qp = chanbuf(c, c.recvx)
if cas.elem != nil {
- typedmemmove(c.elemtype, cas.elem, chanbuf(c, c.recvx))
+ typedmemmove(c.elemtype, cas.elem, qp)
}
- memclr(chanbuf(c, c.recvx), uintptr(c.elemsize))
+ memclr(qp, uintptr(c.elemsize))
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
- sg = c.sendq.dequeue()
- if sg != nil {
- gp = sg.g
- selunlock(sel)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
- }
- goready(gp, 3)
- } else {
- selunlock(sel)
- }
+ selunlock(sel)
goto retc
-asyncsend:
+bufsend:
// can send to buffer
if raceenabled {
raceacquire(chanbuf(c, c.sendx))
c.sendx = 0
}
c.qcount++
- sg = c.recvq.dequeue()
- if sg != nil {
- gp = sg.g
- selunlock(sel)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
- }
- goready(gp, 3)
- } else {
- selunlock(sel)
- }
+ selunlock(sel)
goto retc
-syncrecv:
+recv:
// can receive from sleeping sender (sg)
- if raceenabled {
- if cas.elem != nil {
- raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
- }
- racesync(c, sg)
- }
- if msanenabled && cas.elem != nil {
- msanwrite(cas.elem, c.elemtype.size)
- }
- selunlock(sel)
+ recv(c, sg, cas.elem, func() { selunlock(sel) })
if debugSelect {
print("syncrecv: sel=", sel, " c=", c, "\n")
}
if cas.receivedp != nil {
*cas.receivedp = true
}
- if cas.elem != nil {
- typedmemmove(c.elemtype, cas.elem, sg.elem)
- }
- sg.elem = nil
- gp = sg.g
- gp.param = unsafe.Pointer(sg)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
- }
- goready(gp, 3)
goto retc
rclose:
}
goto retc
-syncsend:
- // can send to sleeping receiver (sg)
+send:
+ // can send to a sleeping receiver (sg)
if raceenabled {
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
- racesync(c, sg)
}
if msanenabled {
msanread(cas.elem, c.elemtype.size)
}
- selunlock(sel)
+ send(c, sg, cas.elem, func() { selunlock(sel) })
if debugSelect {
print("syncsend: sel=", sel, " c=", c, "\n")
}
- if sg.elem != nil {
- syncsend(c, sg, cas.elem)
- }
- sg.elem = nil
- gp = sg.g
- gp.param = unsafe.Pointer(sg)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
- }
- goready(gp, 3)
+ goto retc
retc:
if cas.releasetime > 0 {