return n
}
-func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
+func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
+ if wg != nil {
+ defer wg.Done()
+ }
mtype.Lock()
mtype.numCalls++
mtype.Unlock()
// decode requests and encode responses.
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
+ wg := new(sync.WaitGroup)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
if err != nil {
}
continue
}
- go service.call(server, sending, mtype, req, argv, replyv, codec)
+ wg.Add(1)
+ go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
}
+ // We've seen that there are no more requests.
+ // Wait for responses to be sent before closing codec.
+ wg.Wait()
codec.Close()
}
}
return err
}
- service.call(server, sending, mtype, req, argv, replyv, codec)
+ service.call(server, sending, nil, mtype, req, argv, replyv, codec)
return nil
}
panic("ERROR")
}
+func (t *Arith) SleepMilli(args *Args, reply *Reply) error {
+ time.Sleep(time.Duration(args.A) * time.Millisecond)
+ return nil
+}
+
type hidden int
func (t *hidden) Exported(args Args, reply *Reply) error {
newServer.Accept(l)
}
+func TestShutdown(t *testing.T) {
+ var l net.Listener
+ l, _ = listenTCP()
+ ch := make(chan net.Conn, 1)
+ go func() {
+ defer l.Close()
+ c, err := l.Accept()
+ if err != nil {
+ t.Error(err)
+ }
+ ch <- c
+ }()
+ c, err := net.Dial("tcp", l.Addr().String())
+ if err != nil {
+ t.Fatal(err)
+ }
+ c1 := <-ch
+ if c1 == nil {
+ t.Fatal(err)
+ }
+
+ newServer := NewServer()
+ newServer.Register(new(Arith))
+ go newServer.ServeConn(c1)
+
+ args := &Args{7, 8}
+ reply := new(Reply)
+ client := NewClient(c)
+ err = client.Call("Arith.Add", args, reply)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // On an unloaded system 10ms is usually enough to fail 100% of the time
+ // with a broken server. On a loaded system, a broken server might incorrectly
+ // be reported as passing, but we're OK with that kind of flakiness.
+ // If the code is correct, this test will never fail, regardless of timeout.
+ args.A = 10 // 10 ms
+ done := make(chan *Call, 1)
+ call := client.Go("Arith.SleepMilli", args, reply, done)
+ c.(*net.TCPConn).CloseWrite()
+ <-done
+ if call.Error != nil {
+ t.Fatal(err)
+ }
+}
+
func benchmarkEndToEnd(dial func() (*Client, error), b *testing.B) {
once.Do(startServer)
client, err := dial()