t.Fatalf("Got %v allocs, want %v", mallocs, maxMallocs)
}
}
+
+func TestTCPStress(t *testing.T) {
+ const conns = 2
+ const msgs = 1e4
+ const msgLen = 512
+
+ sendMsg := func(c Conn, buf []byte) bool {
+ n, err := c.Write(buf)
+ if n != len(buf) || err != nil {
+ t.Logf("Write failed: %v", err)
+ return false
+ }
+ return true
+ }
+ recvMsg := func(c Conn, buf []byte) bool {
+ for read := 0; read != len(buf); {
+ n, err := c.Read(buf)
+ read += n
+ if err != nil {
+ t.Logf("Read failed: %v", err)
+ return false
+ }
+ }
+ return true
+ }
+
+ ln, err := Listen("tcp", "127.0.0.1:0")
+ if err != nil {
+ t.Fatalf("Listen failed: %v", err)
+ }
+ defer ln.Close()
+ // Acceptor.
+ go func() {
+ for {
+ c, err := ln.Accept()
+ if err != nil {
+ break
+ }
+ // Server connection.
+ go func(c Conn) {
+ defer c.Close()
+ var buf [msgLen]byte
+ for m := 0; m < msgs; m++ {
+ if !recvMsg(c, buf[:]) || !sendMsg(c, buf[:]) {
+ break
+ }
+ }
+ }(c)
+ }
+ }()
+ done := make(chan bool)
+ for i := 0; i < conns; i++ {
+ // Client connection.
+ go func() {
+ defer func() {
+ done <- true
+ }()
+ c, err := Dial("tcp", ln.Addr().String())
+ if err != nil {
+ t.Logf("Dial failed: %v", err)
+ return
+ }
+ defer c.Close()
+ var buf [msgLen]byte
+ for m := 0; m < msgs; m++ {
+ if !sendMsg(c, buf[:]) || !recvMsg(c, buf[:]) {
+ break
+ }
+ }
+ }()
+ }
+ for i := 0; i < conns; i++ {
+ <-done
+ }
+}
M* midle; // idle m's waiting for work
int32 nmidle; // number of idle m's waiting for work
- int32 mlocked; // number of locked m's waiting for work
+ int32 nmidlelocked; // number of locked m's waiting for work
int32 mcount; // number of m's that have been created
P* pidle; // idle P's
static void startlockedm(G*);
static void sysmon(void);
static uint32 retake(int64);
-static void inclocked(int32);
+static void incidlelocked(int32);
static void checkdead(void);
static void exitsyscall0(G*);
static void park0(G*);
p = releasep();
handoffp(p);
}
- inclocked(1);
+ incidlelocked(1);
// Wait until another thread schedules lockedg again.
runtime·notesleep(&m->park);
runtime·noteclear(&m->park);
if(mp->nextp)
runtime·throw("startlockedm: m has p");
// directly handoff current P to the locked m
- inclocked(-1);
+ incidlelocked(-1);
p = releasep();
mp->nextp = p;
runtime·notewakeup(&mp->park);
p = releasep();
handoffp(p);
if(g->isbackground) // do not consider blocked scavenger for deadlock detection
- inclocked(1);
+ incidlelocked(1);
// Resave for traceback during blocked call.
save(runtime·getcallerpc(&dummy), runtime·getcallersp(&dummy));
m->locks++; // see comment in entersyscall
if(g->isbackground) // do not consider blocked scavenger for deadlock detection
- inclocked(-1);
+ incidlelocked(-1);
if(exitsyscallfast()) {
// There's a cpu for us, so we can run.
}
static void
-inclocked(int32 v)
+incidlelocked(int32 v)
{
runtime·lock(&runtime·sched);
- runtime·sched.mlocked += v;
+ runtime·sched.nmidlelocked += v;
if(v > 0)
checkdead();
runtime·unlock(&runtime·sched);
int32 run, grunning, s;
// -1 for sysmon
- run = runtime·sched.mcount - runtime·sched.nmidle - runtime·sched.mlocked - 1;
+ run = runtime·sched.mcount - runtime·sched.nmidle - runtime·sched.nmidlelocked - 1;
if(run > 0)
return;
if(run < 0) {
- runtime·printf("checkdead: nmidle=%d mlocked=%d mcount=%d\n",
- runtime·sched.nmidle, runtime·sched.mlocked, runtime·sched.mcount);
+ runtime·printf("checkdead: nmidle=%d nmidlelocked=%d mcount=%d\n",
+ runtime·sched.nmidle, runtime·sched.nmidlelocked, runtime·sched.mcount);
runtime·throw("checkdead: inconsistent counts");
}
grunning = 0;
if(lastpoll != 0 && lastpoll + 10*1000*1000 > now) {
runtime·cas64(&runtime·sched.lastpoll, lastpoll, now);
gp = runtime·netpoll(false); // non-blocking
- injectglist(gp);
+ if(gp) {
+ // Need to decrement number of idle locked M's
+ // (pretending that one more is running) before injectglist.
+ // Otherwise it can lead to the following situation:
+ // injectglist grabs all P's but before it starts M's to run the P's,
+ // another M returns from syscall, finishes running its G,
+ // observes that there is no work to do and no other running M's
+ // and reports deadlock.
+ incidlelocked(-1);
+ injectglist(gp);
+ incidlelocked(1);
+ }
}
// retake P's blocked in syscalls
// and preempt long running G's
if(p->runqhead == p->runqtail &&
runtime·atomicload(&runtime·sched.nmspinning) + runtime·atomicload(&runtime·sched.npidle) > 0)
continue;
- // Need to increment number of locked M's before the CAS.
+ // Need to decrement number of idle locked M's
+ // (pretending that one more is running) before the CAS.
// Otherwise the M from which we retake can exit the syscall,
// increment nmidle and report deadlock.
- inclocked(-1);
+ incidlelocked(-1);
if(runtime·cas(&p->status, s, Pidle)) {
n++;
handoffp(p);
}
- inclocked(1);
+ incidlelocked(1);
} else if(s == Prunning) {
// Preempt G if it's running for more than 10ms.
if(pd->when + 10*1000*1000 > now)