diff --git a/test/end2end_test.go b/test/end2end_test.go index 4b3c1db6..cfdd6880 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -235,7 +235,7 @@ func TestReconnectTimeout(t *testing.T) { t.Fatalf("Failed to parse listener address: %v", err) } addr := "localhost:" + port - conn, err := grpc.Dial(addr, grpc.WithTimeout(5 * time.Second)) + conn, err := grpc.Dial(addr, grpc.WithTimeout(5*time.Second)) if err != nil { t.Fatalf("Failed to dial to the server %q: %v", addr, err) } @@ -263,7 +263,7 @@ func TestReconnectTimeout(t *testing.T) { } } -func setUp(useTLS bool, maxStream uint32) (s *grpc.Server, tc testpb.TestServiceClient) { +func setUp(useTLS bool, maxStream uint32) (s *grpc.Server, cc *grpc.ClientConn) { lis, err := net.Listen("tcp", ":0") if err != nil { log.Fatalf("Failed to listen: %v", err) @@ -284,25 +284,29 @@ func setUp(useTLS bool, maxStream uint32) (s *grpc.Server, tc testpb.TestService go s.Serve(lis) } addr := "localhost:" + port - var conn *grpc.ClientConn if useTLS { creds, err := credentials.NewClientTLSFromFile(tlsDir+"ca.pem", "x.test.youtube.com") if err != nil { log.Fatalf("Failed to create credentials %v", err) } - conn, err = grpc.Dial(addr, grpc.WithTransportCredentials(creds)) + cc, err = grpc.Dial(addr, grpc.WithTransportCredentials(creds)) } else { - conn, err = grpc.Dial(addr) + cc, err = grpc.Dial(addr) } if err != nil { log.Fatalf("Dial(%q) = %v", addr, err) } - tc = testpb.NewTestServiceClient(conn) return } +func tearDown(s *grpc.Server, cc *grpc.ClientConn) { + cc.Close() + s.Stop() +} + func TestTimeoutOnDeadServer(t *testing.T) { - s, tc := setUp(false, math.MaxUint32) + s, cc := setUp(false, math.MaxUint32) + tc := testpb.NewTestServiceClient(cc) s.Stop() // Set -1 as the timeout to make sure if transportMonitor gets error // notification in time the failure path of the 1st invoke of @@ -311,11 +315,13 @@ func TestTimeoutOnDeadServer(t *testing.T) { if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(%v, _) = _, error %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded) } + cc.Close() } func TestEmptyUnary(t *testing.T) { - s, tc := setUp(true, math.MaxUint32) - defer s.Stop() + s, cc := setUp(true, math.MaxUint32) + tc := testpb.NewTestServiceClient(cc) + defer tearDown(s, cc) reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}) if err != nil || !proto.Equal(&testpb.Empty{}, reply) { t.Fatalf("TestService/EmptyCall(_, _) = %v, %v, want %v, ", reply, err, &testpb.Empty{}) @@ -323,8 +329,9 @@ func TestEmptyUnary(t *testing.T) { } func TestFailedEmptyUnary(t *testing.T) { - s, tc := setUp(true, math.MaxUint32) - defer s.Stop() + s, cc := setUp(true, math.MaxUint32) + tc := testpb.NewTestServiceClient(cc) + defer tearDown(s, cc) ctx := metadata.NewContext(context.Background(), testMetadata) if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != grpc.Errorf(codes.DataLoss, "got extra metadata") { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, grpc.Errorf(codes.DataLoss, "got extra metadata")) @@ -332,8 +339,9 @@ func TestFailedEmptyUnary(t *testing.T) { } func TestLargeUnary(t *testing.T) { - s, tc := setUp(true, math.MaxUint32) - defer s.Stop() + s, cc := setUp(true, math.MaxUint32) + tc := testpb.NewTestServiceClient(cc) + defer tearDown(s, cc) argSize := 271828 respSize := 314159 req := &testpb.SimpleRequest{ @@ -353,8 +361,9 @@ func TestLargeUnary(t *testing.T) { } func TestMetadataUnaryRPC(t *testing.T) { - s, tc := setUp(true, math.MaxUint32) - defer s.Stop() + s, cc := setUp(true, math.MaxUint32) + tc := testpb.NewTestServiceClient(cc) + defer tearDown(s, cc) argSize := 2718 respSize := 314 req := &testpb.SimpleRequest{ @@ -400,8 +409,9 @@ func performOneRPC(t *testing.T, tc testpb.TestServiceClient, wg *sync.WaitGroup // TODO(zhaoq): Refactor to make this clearer and add more cases to test racy // and error-prone paths. func TestRetry(t *testing.T) { - s, tc := setUp(true, math.MaxUint32) - defer s.Stop() + s, cc := setUp(true, math.MaxUint32) + tc := testpb.NewTestServiceClient(cc) + defer tearDown(s, cc) var wg sync.WaitGroup wg.Add(1) go func() { @@ -423,8 +433,9 @@ func TestRetry(t *testing.T) { // TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism. func TestRPCTimeout(t *testing.T) { - s, tc := setUp(true, math.MaxUint32) - defer s.Stop() + s, cc := setUp(true, math.MaxUint32) + tc := testpb.NewTestServiceClient(cc) + defer tearDown(s, cc) argSize := 2718 respSize := 314 req := &testpb.SimpleRequest{ @@ -445,8 +456,9 @@ func TestRPCTimeout(t *testing.T) { } func TestCancel(t *testing.T) { - s, tc := setUp(true, math.MaxUint32) - defer s.Stop() + s, cc := setUp(true, math.MaxUint32) + tc := testpb.NewTestServiceClient(cc) + defer tearDown(s, cc) argSize := 2718 respSize := 314 req := &testpb.SimpleRequest{ @@ -470,8 +482,9 @@ var ( ) func TestPingPong(t *testing.T) { - s, tc := setUp(true, math.MaxUint32) - defer s.Stop() + s, cc := setUp(true, math.MaxUint32) + tc := testpb.NewTestServiceClient(cc) + defer tearDown(s, cc) stream, err := tc.FullDuplexCall(context.Background()) if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) @@ -514,8 +527,9 @@ func TestPingPong(t *testing.T) { } func TestMetadataStreamingRPC(t *testing.T) { - s, tc := setUp(true, math.MaxUint32) - defer s.Stop() + s, cc := setUp(true, math.MaxUint32) + tc := testpb.NewTestServiceClient(cc) + defer tearDown(s, cc) ctx := metadata.NewContext(context.Background(), testMetadata) stream, err := tc.FullDuplexCall(ctx) if err != nil { @@ -553,8 +567,7 @@ func TestMetadataStreamingRPC(t *testing.T) { stream.CloseSend() }() for { - _, err := stream.Recv() - if err != nil { + if _, err := stream.Recv(); err != nil { break } } @@ -565,8 +578,9 @@ func TestMetadataStreamingRPC(t *testing.T) { } func TestServerStreaming(t *testing.T) { - s, tc := setUp(true, math.MaxUint32) - defer s.Stop() + s, cc := setUp(true, math.MaxUint32) + tc := testpb.NewTestServiceClient(cc) + defer tearDown(s, cc) respParam := make([]*testpb.ResponseParameters, len(respSizes)) for i, s := range respSizes { respParam[i] = &testpb.ResponseParameters{ @@ -610,8 +624,9 @@ func TestServerStreaming(t *testing.T) { } func TestFailedServerStreaming(t *testing.T) { - s, tc := setUp(true, math.MaxUint32) - defer s.Stop() + s, cc := setUp(true, math.MaxUint32) + tc := testpb.NewTestServiceClient(cc) + defer tearDown(s, cc) respParam := make([]*testpb.ResponseParameters, len(respSizes)) for i, s := range respSizes { respParam[i] = &testpb.ResponseParameters{ @@ -633,8 +648,9 @@ func TestFailedServerStreaming(t *testing.T) { } func TestClientStreaming(t *testing.T) { - s, tc := setUp(true, math.MaxUint32) - defer s.Stop() + s, cc := setUp(true, math.MaxUint32) + tc := testpb.NewTestServiceClient(cc) + defer tearDown(s, cc) stream, err := tc.StreamingInputCall(context.Background()) if err != nil { t.Fatalf("%v.StreamingInputCall(_) = _, %v, want ", tc, err) @@ -661,8 +677,9 @@ func TestClientStreaming(t *testing.T) { func TestExceedMaxStreamsLimit(t *testing.T) { // Only allows 1 live stream per server transport. - s, tc := setUp(true, 1) - defer s.Stop() + s, cc := setUp(true, 1) + tc := testpb.NewTestServiceClient(cc) + defer tearDown(s, cc) var err error for { time.Sleep(2 * time.Millisecond)