diff --git a/interop/test_utils.go b/interop/test_utils.go index e9d734e4..da33bacc 100644 --- a/interop/test_utils.go +++ b/interop/test_utils.go @@ -79,8 +79,8 @@ func clientNewPayload(t testpb.PayloadType, size int) *testpb.Payload { } // DoEmptyUnaryCall performs a unary RPC with empty request and response messages. -func DoEmptyUnaryCall(tc testpb.TestServiceClient) { - reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}) +func DoEmptyUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) { + reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, args...) if err != nil { grpclog.Fatal("/TestService/EmptyCall RPC failed: ", err) } @@ -90,14 +90,14 @@ func DoEmptyUnaryCall(tc testpb.TestServiceClient) { } // DoLargeUnaryCall performs a unary RPC with large payload in the request and response. -func DoLargeUnaryCall(tc testpb.TestServiceClient) { +func DoLargeUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) { pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize) req := &testpb.SimpleRequest{ ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), ResponseSize: proto.Int32(int32(largeRespSize)), Payload: pl, } - reply, err := tc.UnaryCall(context.Background(), req) + reply, err := tc.UnaryCall(context.Background(), req, args...) if err != nil { grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err) } @@ -109,8 +109,8 @@ func DoLargeUnaryCall(tc testpb.TestServiceClient) { } // DoClientStreaming performs a client streaming RPC. -func DoClientStreaming(tc testpb.TestServiceClient) { - stream, err := tc.StreamingInputCall(context.Background()) +func DoClientStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) { + stream, err := tc.StreamingInputCall(context.Background(), args...) if err != nil { grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err) } @@ -137,7 +137,7 @@ func DoClientStreaming(tc testpb.TestServiceClient) { } // DoServerStreaming performs a server streaming RPC. -func DoServerStreaming(tc testpb.TestServiceClient) { +func DoServerStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) { respParam := make([]*testpb.ResponseParameters, len(respSizes)) for i, s := range respSizes { respParam[i] = &testpb.ResponseParameters{ @@ -148,7 +148,7 @@ func DoServerStreaming(tc testpb.TestServiceClient) { ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), ResponseParameters: respParam, } - stream, err := tc.StreamingOutputCall(context.Background(), req) + stream, err := tc.StreamingOutputCall(context.Background(), req, args...) if err != nil { grpclog.Fatalf("%v.StreamingOutputCall(_) = _, %v", tc, err) } @@ -181,8 +181,8 @@ func DoServerStreaming(tc testpb.TestServiceClient) { } // DoPingPong performs ping-pong style bi-directional streaming RPC. -func DoPingPong(tc testpb.TestServiceClient) { - stream, err := tc.FullDuplexCall(context.Background()) +func DoPingPong(tc testpb.TestServiceClient, args ...grpc.CallOption) { + stream, err := tc.FullDuplexCall(context.Background(), args...) if err != nil { grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err) } @@ -225,8 +225,8 @@ func DoPingPong(tc testpb.TestServiceClient) { } // DoEmptyStream sets up a bi-directional streaming with zero message. -func DoEmptyStream(tc testpb.TestServiceClient) { - stream, err := tc.FullDuplexCall(context.Background()) +func DoEmptyStream(tc testpb.TestServiceClient, args ...grpc.CallOption) { + stream, err := tc.FullDuplexCall(context.Background(), args...) if err != nil { grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err) } @@ -239,9 +239,9 @@ func DoEmptyStream(tc testpb.TestServiceClient) { } // DoTimeoutOnSleepingServer performs an RPC on a sleep server which causes RPC timeout. -func DoTimeoutOnSleepingServer(tc testpb.TestServiceClient) { +func DoTimeoutOnSleepingServer(tc testpb.TestServiceClient, args ...grpc.CallOption) { ctx, _ := context.WithTimeout(context.Background(), 1*time.Millisecond) - stream, err := tc.FullDuplexCall(ctx) + stream, err := tc.FullDuplexCall(ctx, args...) if err != nil { if grpc.Code(err) == codes.DeadlineExceeded { return @@ -415,9 +415,9 @@ var ( ) // DoCancelAfterBegin cancels the RPC after metadata has been sent but before payloads are sent. -func DoCancelAfterBegin(tc testpb.TestServiceClient) { +func DoCancelAfterBegin(tc testpb.TestServiceClient, args ...grpc.CallOption) { ctx, cancel := context.WithCancel(metadata.NewContext(context.Background(), testMetadata)) - stream, err := tc.StreamingInputCall(ctx) + stream, err := tc.StreamingInputCall(ctx, args...) if err != nil { grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err) } @@ -429,9 +429,9 @@ func DoCancelAfterBegin(tc testpb.TestServiceClient) { } // DoCancelAfterFirstResponse cancels the RPC after receiving the first message from the server. -func DoCancelAfterFirstResponse(tc testpb.TestServiceClient) { +func DoCancelAfterFirstResponse(tc testpb.TestServiceClient, args ...grpc.CallOption) { ctx, cancel := context.WithCancel(context.Background()) - stream, err := tc.FullDuplexCall(ctx) + stream, err := tc.FullDuplexCall(ctx, args...) if err != nil { grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err) } @@ -483,7 +483,7 @@ func validateMetadata(header, trailer metadata.MD) { } // DoCustomMetadata checks that metadata is echoed back to the client. -func DoCustomMetadata(tc testpb.TestServiceClient) { +func DoCustomMetadata(tc testpb.TestServiceClient, args ...grpc.CallOption) { // Testing with UnaryCall. pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, 1) req := &testpb.SimpleRequest{ @@ -493,11 +493,11 @@ func DoCustomMetadata(tc testpb.TestServiceClient) { } ctx := metadata.NewContext(context.Background(), customMetadata) var header, trailer metadata.MD + args = append(args, grpc.Header(&header), grpc.Trailer(&trailer)) reply, err := tc.UnaryCall( ctx, req, - grpc.Header(&header), - grpc.Trailer(&trailer), + args..., ) if err != nil { grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err) @@ -510,7 +510,7 @@ func DoCustomMetadata(tc testpb.TestServiceClient) { validateMetadata(header, trailer) // Testing with FullDuplex. - stream, err := tc.FullDuplexCall(ctx) + stream, err := tc.FullDuplexCall(ctx, args...) if err != nil { grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } @@ -545,7 +545,7 @@ func DoCustomMetadata(tc testpb.TestServiceClient) { } // DoStatusCodeAndMessage checks that the status code is propagated back to the client. -func DoStatusCodeAndMessage(tc testpb.TestServiceClient) { +func DoStatusCodeAndMessage(tc testpb.TestServiceClient, args ...grpc.CallOption) { var code int32 = 2 msg := "test status message" expectedErr := grpc.Errorf(codes.Code(code), msg) @@ -557,11 +557,11 @@ func DoStatusCodeAndMessage(tc testpb.TestServiceClient) { req := &testpb.SimpleRequest{ ResponseStatus: respStatus, } - if _, err := tc.UnaryCall(context.Background(), req); err == nil || err.Error() != expectedErr.Error() { + if _, err := tc.UnaryCall(context.Background(), req, args...); err == nil || err.Error() != expectedErr.Error() { grpclog.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr) } // Test FullDuplexCall. - stream, err := tc.FullDuplexCall(context.Background()) + stream, err := tc.FullDuplexCall(context.Background(), args...) if err != nil { grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } diff --git a/stress/client/main.go b/stress/client/main.go index cac68d7a..dff85ffe 100644 --- a/stress/client/main.go +++ b/stress/client/main.go @@ -225,41 +225,38 @@ func performRPCs(gauge *gauge, conn *grpc.ClientConn, selector *weightedRandomTe var numCalls int64 startTime := time.Now() for { - done := make(chan bool, 1) - go func() { - test := selector.getNextTest() - switch test { - case "empty_unary": - interop.DoEmptyUnaryCall(client) - case "large_unary": - interop.DoLargeUnaryCall(client) - case "client_streaming": - interop.DoClientStreaming(client) - case "server_streaming": - interop.DoServerStreaming(client) - case "ping_pong": - interop.DoPingPong(client) - case "empty_stream": - interop.DoEmptyStream(client) - case "timeout_on_sleeping_server": - interop.DoTimeoutOnSleepingServer(client) - case "cancel_after_begin": - interop.DoCancelAfterBegin(client) - case "cancel_after_first_response": - interop.DoCancelAfterFirstResponse(client) - case "status_code_and_message": - interop.DoStatusCodeAndMessage(client) - case "custom_metadata": - interop.DoCustomMetadata(client) - } - done <- true - }() + test := selector.getNextTest() + switch test { + case "empty_unary": + interop.DoEmptyUnaryCall(client, grpc.FailFast(false)) + case "large_unary": + interop.DoLargeUnaryCall(client, grpc.FailFast(false)) + case "client_streaming": + interop.DoClientStreaming(client, grpc.FailFast(false)) + case "server_streaming": + interop.DoServerStreaming(client, grpc.FailFast(false)) + case "ping_pong": + interop.DoPingPong(client, grpc.FailFast(false)) + case "empty_stream": + interop.DoEmptyStream(client, grpc.FailFast(false)) + case "timeout_on_sleeping_server": + interop.DoTimeoutOnSleepingServer(client, grpc.FailFast(false)) + case "cancel_after_begin": + interop.DoCancelAfterBegin(client, grpc.FailFast(false)) + case "cancel_after_first_response": + interop.DoCancelAfterFirstResponse(client, grpc.FailFast(false)) + case "status_code_and_message": + interop.DoStatusCodeAndMessage(client, grpc.FailFast(false)) + case "custom_metadata": + interop.DoCustomMetadata(client, grpc.FailFast(false)) + } + numCalls++ + gauge.set(int64(float64(numCalls) / time.Since(startTime).Seconds())) + select { case <-stop: return - case <-done: - numCalls++ - gauge.set(int64(float64(numCalls) / time.Since(startTime).Seconds())) + default: } } }