diff --git a/call.go b/call.go index ac2d99d6..5fe5af26 100644 --- a/call.go +++ b/call.go @@ -132,12 +132,12 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli Last: true, Delay: false, } - var put func() for { var ( err error t transport.ClientTransport stream *transport.Stream + put func() ) // TODO(zhaoq): Need a formal spec of retry strategy for non-failFast rpcs. callHdr := &transport.CallHdr{ @@ -149,6 +149,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } t, put, err = cc.getTransport(ctx) if err != nil { + // TODO(zhaoq): Probably revisit the error handling. if err == ErrClientConnClosing { return toRPCErr(err) } diff --git a/clientconn.go b/clientconn.go index 6e6cb38b..81c2c731 100644 --- a/clientconn.go +++ b/clientconn.go @@ -384,6 +384,7 @@ func (cc *ClientConn) newAddrConn(addr Address) error { } } } + // Insert ac into ac.cc.conns. This needs to be done before any getTransport(...) is called. ac.cc.mu.Lock() if ac.cc.conns == nil { ac.cc.mu.Unlock() @@ -620,11 +621,11 @@ func (ac *addrConn) resetTransport(closeTransport bool) error { ac.state = Ready ac.stateCV.Broadcast() ac.transport = newTransport - ac.down = ac.cc.balancer.Up(ac.addr) if ac.ready != nil { close(ac.ready) ac.ready = nil } + ac.down = ac.cc.balancer.Up(ac.addr) ac.mu.Unlock() return nil } diff --git a/test/end2end_test.go b/test/end2end_test.go index 70f85325..ecb1b527 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -340,7 +340,7 @@ func TestReconnectTimeout(t *testing.T) { // Block until reconnect times out. 
<-waitC if err := conn.Close(); err != nil { - t.Fatalf("%v.Close() = %v, want %v", conn, err, grpc.ErrClientConnClosing) + t.Fatalf("%v.Close() = %v, want <nil>", conn, err) } } @@ -586,6 +586,7 @@ func TestTimeoutOnDeadServer(t *testing.T) { func testTimeoutOnDeadServer(t *testing.T, e env) { te := newTest(t, e) + te.userAgent = testAppUA te.declareLogNoise( "transport: http2Client.notifyError got notified that the client transport was broken EOF", "grpc: Conn.transportMonitor exits due to: grpc: the client connection is closing", @@ -597,13 +598,16 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) + } te.srv.Stop() // Set -1 as the timeout to make sure if transportMonitor gets error // notification in time the failure path of the 1st invoke of // ClientConn.wait hits the deadline exceeded error. ctx, _ := context.WithTimeout(context.Background(), -1) if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { - t.Fatalf("TestService/EmptyCall(%v, _) = _, error %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded) + t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded) } awaitNewConnLogOutput() } diff --git a/transport/transport.go b/transport/transport.go index 230e215d..1c9af545 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -392,6 +392,8 @@ type ClientTransport interface { // is called only once. Close() error + // GracefulClose starts to tear down the transport. It stops accepting + // new RPCs and wait the completion of the pending RPCs. GracefulClose() error // Write sends the data for the given stream. A nil stream indicates