From: Keith Randall Date: Fri, 6 Nov 2015 08:29:53 +0000 (+0000) Subject: Revert "runtime: simplify buffered channels." X-Git-Tag: go1.6beta1~605 X-Git-Url: http://www.git.cypherpunks.su/?a=commitdiff_plain;h=e9f90ba246dab09943a2578a77b017186e8cb661;p=gostls13.git Revert "runtime: simplify buffered channels." Revert for now until #13169 is understood. This reverts commit 8e496f1d6923172291658f0a785bdb47cc152325. Change-Id: Ib3eb2588824ef47a2b6eb9e377a24e5c817fcc81 Reviewed-on: https://go-review.googlesource.com/16716 Reviewed-by: Keith Randall --- diff --git a/src/runtime/chan.go b/src/runtime/chan.go index 966e4a9743..96ac306624 100644 --- a/src/runtime/chan.go +++ b/src/runtime/chan.go @@ -6,11 +6,6 @@ package runtime // This file contains the implementation of Go channels. -// Invariants: -// At least one of c.sendq and c.recvq is empty. -// For buffered channels, also: -// c.qcount > 0 implies that c.recvq is empty. -// c.qcount < c.dataqsiz implies that c.sendq is empty. import "unsafe" const ( @@ -158,117 +153,135 @@ func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uin } lock(&c.lock) - if c.closed != 0 { unlock(&c.lock) panic("send on closed channel") } - if sg := c.recvq.dequeue(); sg != nil { - // Found a waiting receiver. We pass the value we want to send - // directly to the receiver, bypassing the channel buffer (if any). - send(c, sg, ep, func() { unlock(&c.lock) }) - return true - } + if c.dataqsiz == 0 { // synchronous channel + sg := c.recvq.dequeue() + if sg != nil { // found a waiting receiver + if raceenabled { + racesync(c, sg) + } + unlock(&c.lock) - if c.qcount < c.dataqsiz { - // Space is available in the channel buffer. Enqueue the element to send. - qp := chanbuf(c, c.sendx) - if raceenabled { - raceacquire(qp) - racerelease(qp) + recvg := sg.g + if sg.elem != nil { + syncsend(c, sg, ep) + } + recvg.param = unsafe.Pointer(sg) + if sg.releasetime != 0 { + sg.releasetime = cputicks() + } + goready(recvg, 3) + return true } - typedmemmove(c.elemtype, qp, ep) - c.sendx++ - if c.sendx == c.dataqsiz { - c.sendx = 0 + + if !block { + unlock(&c.lock) + return false } - c.qcount++ - unlock(&c.lock) + + // no receiver available: block on this channel. + gp := getg() + mysg := acquireSudog() + mysg.releasetime = 0 + if t0 != 0 { + mysg.releasetime = -1 + } + mysg.elem = ep + mysg.waitlink = nil + gp.waiting = mysg + mysg.g = gp + mysg.selectdone = nil + gp.param = nil + c.sendq.enqueue(mysg) + goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3) + + // someone woke us up. + if mysg != gp.waiting { + throw("G waiting list is corrupted!") + } + gp.waiting = nil + if gp.param == nil { + if c.closed == 0 { + throw("chansend: spurious wakeup") + } + panic("send on closed channel") + } + gp.param = nil + if mysg.releasetime > 0 { + blockevent(int64(mysg.releasetime)-t0, 2) + } + releaseSudog(mysg) return true } - if !block { - unlock(&c.lock) - return false + // asynchronous channel + // wait for some space to write our data + var t1 int64 + for futile := byte(0); c.qcount >= c.dataqsiz; futile = traceFutileWakeup { + if !block { + unlock(&c.lock) + return false + } + gp := getg() + mysg := acquireSudog() + mysg.releasetime = 0 + if t0 != 0 { + mysg.releasetime = -1 + } + mysg.g = gp + mysg.elem = nil + mysg.selectdone = nil + c.sendq.enqueue(mysg) + goparkunlock(&c.lock, "chan send", traceEvGoBlockSend|futile, 3) + + // someone woke us up - try again + if mysg.releasetime > 0 { + t1 = mysg.releasetime + } + releaseSudog(mysg) + lock(&c.lock) + if c.closed != 0 { + unlock(&c.lock) + panic("send on closed channel") + } } - // Block on the channel. Some receiver will complete our operation for us. - gp := getg() - mysg := acquireSudog() - mysg.releasetime = 0 - if t0 != 0 { - mysg.releasetime = -1 - } - mysg.elem = ep - mysg.waitlink = nil - mysg.g = gp - mysg.selectdone = nil - gp.waiting = mysg - gp.param = nil - c.sendq.enqueue(mysg) - goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3) - - // someone woke us up. - if mysg != gp.waiting { - throw("G waiting list is corrupted") - } - gp.waiting = nil - if gp.param == nil { - if c.closed == 0 { - throw("chansend: spurious wakeup") - } - panic("send on closed channel") + // write our data into the channel buffer + if raceenabled { + raceacquire(chanbuf(c, c.sendx)) + racerelease(chanbuf(c, c.sendx)) } - gp.param = nil - if mysg.releasetime > 0 { - blockevent(int64(mysg.releasetime)-t0, 2) + typedmemmove(c.elemtype, chanbuf(c, c.sendx), ep) + c.sendx++ + if c.sendx == c.dataqsiz { + c.sendx = 0 } - releaseSudog(mysg) - return true -} + c.qcount++ -// send processes a send operation on an empty channel c. -// The value ep sent by the sender is copied to the receiver sg. -// The receiver is then woken up to go on its merry way. -// Channel c must be empty and locked. send unlocks c with unlockf. -// sg must already be dequeued from c. -// ep must be non-nil and point to the heap or the caller's stack. -func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) { - if raceenabled { - if c.dataqsiz == 0 { - racesync(c, sg) - } else { - // Pretend we go through the buffer, even though - // we copy directly. Note that we need to increment - // the head/tail locations only when raceenabled. - qp := chanbuf(c, c.recvx) - raceacquire(qp) - racerelease(qp) - raceacquireg(sg.g, qp) - racereleaseg(sg.g, qp) - c.recvx++ - if c.recvx == c.dataqsiz { - c.recvx = 0 - } - c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz + // wake up a waiting receiver + sg := c.recvq.dequeue() + if sg != nil { + recvg := sg.g + unlock(&c.lock) + if sg.releasetime != 0 { + sg.releasetime = cputicks() } + goready(recvg, 3) + } else { + unlock(&c.lock) } - unlockf() - if sg.elem != nil { - sendDirect(c.elemtype, sg.elem, ep) - sg.elem = nil - } - gp := sg.g - gp.param = unsafe.Pointer(sg) - if sg.releasetime != 0 { - sg.releasetime = cputicks() + if t1 > 0 { + blockevent(t1-t0, 2) } - goready(gp, 4) + return true } -func sendDirect(t *_type, dst, src unsafe.Pointer) { - // Send on an unbuffered or empty-buffered channel is the only operation +func syncsend(c *hchan, sg *sudog, elem unsafe.Pointer) { + // Send on unbuffered channel is the only operation // in the entire runtime where one goroutine // writes to the stack of another goroutine. The GC assumes that // stack writes only happen when the goroutine is running and are @@ -277,8 +290,9 @@ func sendDirect(t *_type, dst, src unsafe.Pointer) { // typedmemmove will call heapBitsBulkBarrier, but the target bytes // are not in the heap, so that will not help. We arrange to call // memmove and typeBitsBulkBarrier instead. - memmove(dst, src, t.size) - typeBitsBulkBarrier(t, uintptr(dst), t.size) + memmove(sg.elem, elem, c.elemtype.size) + typeBitsBulkBarrier(c.elemtype, uintptr(sg.elem), c.elemtype.size) + sg.elem = nil } func closechan(c *hchan) { @@ -306,36 +320,27 @@ func closechan(c *hchan) { if sg == nil { break } - if sg.elem != nil { - memclr(sg.elem, uintptr(c.elemsize)) - sg.elem = nil - } - if sg.releasetime != 0 { - sg.releasetime = cputicks() - } gp := sg.g + sg.elem = nil gp.param = nil - if raceenabled { - raceacquireg(gp, unsafe.Pointer(c)) + if sg.releasetime != 0 { + sg.releasetime = cputicks() } goready(gp, 3) } - // release all writers (they will panic) + // release all writers for { sg := c.sendq.dequeue() if sg == nil { break } + gp := sg.g sg.elem = nil + gp.param = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } - gp := sg.g - gp.param = nil - if raceenabled { - raceacquireg(gp, unsafe.Pointer(c)) - } goready(gp, 3) } unlock(&c.lock) @@ -358,10 +363,8 @@ func chanrecv2(t *chantype, c *hchan, elem unsafe.Pointer) (received bool) { // If block == false and no elements are available, returns (false, false). // Otherwise, if c is closed, zeros *ep and returns (true, false). // Otherwise, fills in *ep with an element and returns (true, true). -// A non-nil ep must point to the heap or the caller's stack. func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { - // raceenabled: don't need to check ep, as it is always on the stack - // or is new memory allocated by reflect. + // raceenabled: don't need to check ep, as it is always on the stack. if debugChan { print("chanrecv: chan=", c, "\n") @@ -399,139 +402,167 @@ func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, r } lock(&c.lock) + if c.dataqsiz == 0 { // synchronous channel + if c.closed != 0 { + return recvclosed(c, ep) + } + + sg := c.sendq.dequeue() + if sg != nil { + if raceenabled { + racesync(c, sg) + } + unlock(&c.lock) - if c.closed != 0 && c.qcount == 0 { - if raceenabled { - raceacquire(unsafe.Pointer(c)) + if ep != nil { + typedmemmove(c.elemtype, ep, sg.elem) + } + sg.elem = nil + gp := sg.g + gp.param = unsafe.Pointer(sg) + if sg.releasetime != 0 { + sg.releasetime = cputicks() + } + goready(gp, 3) + selected = true + received = true + return } - unlock(&c.lock) - if ep != nil { - memclr(ep, uintptr(c.elemsize)) + + if !block { + unlock(&c.lock) + return } - return true, false - } - if sg := c.sendq.dequeue(); sg != nil { - // Found a waiting sender. If buffer is size 0, receive value - // directly from sender. Otherwise, recieve from head of queue - // and add sender's value to the tail of the queue (both map to - // the same buffer slot because the queue is full). - recv(c, sg, ep, func() { unlock(&c.lock) }) - return true, true + // no sender available: block on this channel. + gp := getg() + mysg := acquireSudog() + mysg.releasetime = 0 + if t0 != 0 { + mysg.releasetime = -1 + } + mysg.elem = ep + mysg.waitlink = nil + gp.waiting = mysg + mysg.g = gp + mysg.selectdone = nil + gp.param = nil + c.recvq.enqueue(mysg) + goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3) + + // someone woke us up + if mysg != gp.waiting { + throw("G waiting list is corrupted!") + } + gp.waiting = nil + if mysg.releasetime > 0 { + blockevent(mysg.releasetime-t0, 2) + } + haveData := gp.param != nil + gp.param = nil + releaseSudog(mysg) + + if haveData { + // a sender sent us some data. It already wrote to ep. + selected = true + received = true + return + } + + lock(&c.lock) + if c.closed == 0 { + throw("chanrecv: spurious wakeup") + } + return recvclosed(c, ep) } - if c.qcount > 0 { - // Receive directly from queue - qp := chanbuf(c, c.recvx) - if raceenabled { - raceacquire(qp) - racerelease(qp) + // asynchronous channel + // wait for some data to appear + var t1 int64 + for futile := byte(0); c.qcount <= 0; futile = traceFutileWakeup { + if c.closed != 0 { + selected, received = recvclosed(c, ep) + if t1 > 0 { + blockevent(t1-t0, 2) + } + return + } + + if !block { + unlock(&c.lock) + return } - if ep != nil { - typedmemmove(c.elemtype, ep, qp) + + // wait for someone to send an element + gp := getg() + mysg := acquireSudog() + mysg.releasetime = 0 + if t0 != 0 { + mysg.releasetime = -1 } - memclr(qp, uintptr(c.elemsize)) - c.recvx++ - if c.recvx == c.dataqsiz { - c.recvx = 0 + mysg.elem = nil + mysg.g = gp + mysg.selectdone = nil + + c.recvq.enqueue(mysg) + goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv|futile, 3) + + // someone woke us up - try again + if mysg.releasetime > 0 { + t1 = mysg.releasetime } - c.qcount-- - unlock(&c.lock) - return true, true + releaseSudog(mysg) + lock(&c.lock) } - if !block { - unlock(&c.lock) - return false, false - } - - // no sender available: block on this channel. - gp := getg() - mysg := acquireSudog() - mysg.releasetime = 0 - if t0 != 0 { - mysg.releasetime = -1 - } - mysg.elem = ep - mysg.waitlink = nil - gp.waiting = mysg - mysg.g = gp - mysg.selectdone = nil - gp.param = nil - c.recvq.enqueue(mysg) - goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3) - - // someone woke us up - if mysg != gp.waiting { - throw("G waiting list is corrupted") - } - gp.waiting = nil - if mysg.releasetime > 0 { - blockevent(mysg.releasetime-t0, 2) - } - closed := gp.param == nil - gp.param = nil - releaseSudog(mysg) - return true, !closed -} + if raceenabled { + raceacquire(chanbuf(c, c.recvx)) + racerelease(chanbuf(c, c.recvx)) + } + if ep != nil { + typedmemmove(c.elemtype, ep, chanbuf(c, c.recvx)) + } + memclr(chanbuf(c, c.recvx), uintptr(c.elemsize)) -// recv processes a receive operation on a full channel c. -// There are 2 parts: -// 1) The value sent by the sender sg is put into the channel -// and the sender is woken up to go on its merry way. -// 2) The value received by the receiver (the current G) is -// written to ep. -// For synchronous channels, both values are the same. -// For asynchronous channels, the receiver gets its data from -// the channel buffer and the sender's data is put in the -// channel buffer. -// Channel c must be full and locked. recv unlocks c with unlockf. -// sg must already be dequeued from c. -// A non-nil ep must point to the heap or the caller's stack. -func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) { - if c.dataqsiz == 0 { - if raceenabled { - racesync(c, sg) - } - unlockf() - if ep != nil { - // copy data from sender - // ep points to our own stack or heap, so nothing - // special (ala sendDirect) needed here. - typedmemmove(c.elemtype, ep, sg.elem) + c.recvx++ + if c.recvx == c.dataqsiz { + c.recvx = 0 + } + c.qcount-- + + // ping a sender now that there is space + sg := c.sendq.dequeue() + if sg != nil { + gp := sg.g + unlock(&c.lock) + if sg.releasetime != 0 { + sg.releasetime = cputicks() } + goready(gp, 3) } else { - // Queue is full. Take the item at the - // head of the queue. Make the sender enqueue - // its item at the tail of the queue. Since the - // queue is full, those are both the same slot. - qp := chanbuf(c, c.recvx) - if raceenabled { - raceacquire(qp) - racerelease(qp) - raceacquireg(sg.g, qp) - racereleaseg(sg.g, qp) - } - // copy data from queue to receiver - if ep != nil { - typedmemmove(c.elemtype, ep, qp) - } - // copy data from sender to queue - typedmemmove(c.elemtype, qp, sg.elem) - c.recvx++ - if c.recvx == c.dataqsiz { - c.recvx = 0 - } - c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz - unlockf() + unlock(&c.lock) } - sg.elem = nil - gp := sg.g - gp.param = unsafe.Pointer(sg) - if sg.releasetime != 0 { - sg.releasetime = cputicks() + + if t1 > 0 { + blockevent(t1-t0, 2) + } + selected = true + received = true + return +} + +// recvclosed is a helper function for chanrecv. Handles cleanup +// when the receiver encounters a closed channel. +// Caller must hold c.lock, recvclosed will release the lock. +func recvclosed(c *hchan, ep unsafe.Pointer) (selected, recevied bool) { + if raceenabled { + raceacquire(unsafe.Pointer(c)) + } + unlock(&c.lock) + if ep != nil { + memclr(ep, uintptr(c.elemsize)) } - goready(gp, 4) + return true, false } // compiler implements diff --git a/src/runtime/select.go b/src/runtime/select.go index 508a19b630..8b6c3ed4c0 100644 --- a/src/runtime/select.go +++ b/src/runtime/select.go @@ -304,7 +304,7 @@ func selectgoImpl(sel *hselect) (uintptr, uint16) { k *scase sglist *sudog sgnext *sudog - qp unsafe.Pointer + futile byte ) loop: @@ -317,12 +317,15 @@ loop: switch cas.kind { case caseRecv: - sg = c.sendq.dequeue() - if sg != nil { - goto recv - } - if c.qcount > 0 { - goto bufrecv + if c.dataqsiz > 0 { + if c.qcount > 0 { + goto asyncrecv + } + } else { + sg = c.sendq.dequeue() + if sg != nil { + goto syncrecv + } } if c.closed != 0 { goto rclose @@ -335,12 +338,15 @@ loop: if c.closed != 0 { goto sclose } - sg = c.recvq.dequeue() - if sg != nil { - goto send - } - if c.qcount < c.dataqsiz { - goto bufsend + if c.dataqsiz > 0 { + if c.qcount < c.dataqsiz { + goto asyncsend + } + } else { + sg = c.recvq.dequeue() + if sg != nil { + goto syncsend + } } case caseDefault: @@ -357,9 +363,6 @@ loop: // pass 2 - enqueue on all chans gp = getg() done = 0 - if gp.waiting != nil { - throw("gp.waiting != nil") - } for i := 0; i < int(sel.ncase); i++ { cas = &scases[pollorder[i]] c = cas.c @@ -386,7 +389,7 @@ loop: // wait for someone to wake us up gp.param = nil - gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect, 2) + gopark(selparkcommit, unsafe.Pointer(sel), "select", traceEvGoBlockSelect|futile, 2) // someone woke us up sellock(sel) @@ -429,13 +432,16 @@ loop: } if cas == nil { - // This can happen if we were woken up by a close(). - // TODO: figure that out explicitly so we don't need this loop. + futile = traceFutileWakeup goto loop } c = cas.c + if c.dataqsiz > 0 { + throw("selectgo: shouldn't happen") + } + if debugSelect { print("wait-return: sel=", sel, " c=", c, " cas=", cas, " kind=", cas.kind, "\n") } @@ -464,7 +470,7 @@ loop: selunlock(sel) goto retc -bufrecv: +asyncrecv: // can receive from buffer if raceenabled { if cas.elem != nil { @@ -479,20 +485,29 @@ bufrecv: if cas.receivedp != nil { *cas.receivedp = true } - qp = chanbuf(c, c.recvx) if cas.elem != nil { - typedmemmove(c.elemtype, cas.elem, qp) + typedmemmove(c.elemtype, cas.elem, chanbuf(c, c.recvx)) } - memclr(qp, uintptr(c.elemsize)) + memclr(chanbuf(c, c.recvx), uintptr(c.elemsize)) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- - selunlock(sel) + sg = c.sendq.dequeue() + if sg != nil { + gp = sg.g + selunlock(sel) + if sg.releasetime != 0 { + sg.releasetime = cputicks() + } + goready(gp, 3) + } else { + selunlock(sel) + } goto retc -bufsend: +asyncsend: // can send to buffer if raceenabled { raceacquire(chanbuf(c, c.sendx)) @@ -508,18 +523,47 @@ bufsend: c.sendx = 0 } c.qcount++ - selunlock(sel) + sg = c.recvq.dequeue() + if sg != nil { + gp = sg.g + selunlock(sel) + if sg.releasetime != 0 { + sg.releasetime = cputicks() + } + goready(gp, 3) + } else { + selunlock(sel) + } goto retc -recv: +syncrecv: // can receive from sleeping sender (sg) - recv(c, sg, cas.elem, func() { selunlock(sel) }) + if raceenabled { + if cas.elem != nil { + raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc) + } + racesync(c, sg) + } + if msanenabled && cas.elem != nil { + msanwrite(cas.elem, c.elemtype.size) + } + selunlock(sel) if debugSelect { print("syncrecv: sel=", sel, " c=", c, "\n") } if cas.receivedp != nil { *cas.receivedp = true } + if cas.elem != nil { + typedmemmove(c.elemtype, cas.elem, sg.elem) + } + sg.elem = nil + gp = sg.g + gp.param = unsafe.Pointer(sg) + if sg.releasetime != 0 { + sg.releasetime = cputicks() + } + goready(gp, 3) goto retc rclose: @@ -536,19 +580,29 @@ rclose: } goto retc -send: - // can send to a sleeping receiver (sg) +syncsend: + // can send to sleeping receiver (sg) if raceenabled { raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc) + racesync(c, sg) } if msanenabled { msanread(cas.elem, c.elemtype.size) } - send(c, sg, cas.elem, func() { selunlock(sel) }) + selunlock(sel) if debugSelect { print("syncsend: sel=", sel, " c=", c, "\n") } - goto retc + if sg.elem != nil { + syncsend(c, sg, cas.elem) + } + sg.elem = nil + gp = sg.g + gp.param = unsafe.Pointer(sg) + if sg.releasetime != 0 { + sg.releasetime = cputicks() + } + goready(gp, 3) retc: if cas.releasetime > 0 {