diff --git a/clientconn.go b/clientconn.go index 0e9b6fb4..55ce8e37 100644 --- a/clientconn.go +++ b/clientconn.go @@ -68,7 +68,7 @@ var ( // errCredentialsConflict indicates that grpc.WithTransportCredentials() // and grpc.WithInsecure() are both called for a connection. errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)") - // errNetworkIP indicates that the connection is down due to some network I/O error. + // errNetworkIO indicates that the connection is down due to some network I/O error. errNetworkIO = errors.New("grpc: failed with network I/O error") // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs. errConnDrain = errors.New("grpc: the connection is drained") @@ -631,13 +631,18 @@ func (ac *addrConn) transportMonitor() { } return case <-t.GoAway(): + // If GoAway happens without any network I/O error, ac is closed without shutting down the + // underlying transport (the transport will be closed when all the pending RPCs finished or + // failed.). + // If GoAway and some network I/O error happen concurrently, ac and its underlying transport + // are closed. + // In both cases, a new ac is created. select { case <-t.Error(): - t.Close() - return + ac.tearDown(errNetworkIO) default: + ac.tearDown(errConnDrain) } - ac.tearDown(errConnDrain) ac.cc.newAddrConn(ac.addr, true) return case <-t.Error(): @@ -646,7 +651,8 @@ func (ac *addrConn) transportMonitor() { t.Close() return case <-t.GoAway(): - t.Close() + ac.tearDown(errNetworkIO) + ac.cc.newAddrConn(ac.addr, true) return default: } diff --git a/test/end2end_test.go b/test/end2end_test.go index 57a47482..17c9e11d 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -710,50 +710,19 @@ func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - stream, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)) - if err != nil { - t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) - } - // Finish an RPC to make sure the connection is good. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, ", tc, err) } ch := make(chan struct{}) + // Close ClientConn and Server concurrently. go func() { te.srv.GracefulStop() close(ch) }() - // Loop until the server side GoAway signal is propagated to the client. - for { - ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond) - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil { - continue - } - break - } - // Stop the server and close all the connections. - te.srv.Stop() - respParam := []*testpb.ResponseParameters{ - { - Size: proto.Int32(1), - }, - } - payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100)) - if err != nil { - t.Fatal(err) - } - req := &testpb.StreamingOutputCallRequest{ - ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), - ResponseParameters: respParam, - Payload: payload, - } - if err := stream.Send(req); err == nil { - if _, err := stream.Recv(); err == nil { - t.Fatalf("%v.Recv() = _, %v, want _, ", stream, err) - } - } + go func() { + cc.Close() + }() <-ch - awaitNewConnLogOutput() } func TestConcurrentServerStopAndGoAway(t *testing.T) {