Merge pull request #1035 from adelez/no_failfast

Set failfast to false for stress tests. Also wait for all rpcs are done before ending the test.
This commit is contained in:
Menghan Li
2016-12-28 16:41:21 -08:00
committed by GitHub
2 changed files with 54 additions and 57 deletions

View File

@ -79,8 +79,8 @@ func clientNewPayload(t testpb.PayloadType, size int) *testpb.Payload {
} }
// DoEmptyUnaryCall performs a unary RPC with empty request and response messages. // DoEmptyUnaryCall performs a unary RPC with empty request and response messages.
func DoEmptyUnaryCall(tc testpb.TestServiceClient) { func DoEmptyUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) {
reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}) reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, args...)
if err != nil { if err != nil {
grpclog.Fatal("/TestService/EmptyCall RPC failed: ", err) grpclog.Fatal("/TestService/EmptyCall RPC failed: ", err)
} }
@ -90,14 +90,14 @@ func DoEmptyUnaryCall(tc testpb.TestServiceClient) {
} }
// DoLargeUnaryCall performs a unary RPC with large payload in the request and response. // DoLargeUnaryCall performs a unary RPC with large payload in the request and response.
func DoLargeUnaryCall(tc testpb.TestServiceClient) { func DoLargeUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) {
pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize) pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{ req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(int32(largeRespSize)), ResponseSize: proto.Int32(int32(largeRespSize)),
Payload: pl, Payload: pl,
} }
reply, err := tc.UnaryCall(context.Background(), req) reply, err := tc.UnaryCall(context.Background(), req, args...)
if err != nil { if err != nil {
grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err) grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
} }
@ -109,8 +109,8 @@ func DoLargeUnaryCall(tc testpb.TestServiceClient) {
} }
// DoClientStreaming performs a client streaming RPC. // DoClientStreaming performs a client streaming RPC.
func DoClientStreaming(tc testpb.TestServiceClient) { func DoClientStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) {
stream, err := tc.StreamingInputCall(context.Background()) stream, err := tc.StreamingInputCall(context.Background(), args...)
if err != nil { if err != nil {
grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err) grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
} }
@ -137,7 +137,7 @@ func DoClientStreaming(tc testpb.TestServiceClient) {
} }
// DoServerStreaming performs a server streaming RPC. // DoServerStreaming performs a server streaming RPC.
func DoServerStreaming(tc testpb.TestServiceClient) { func DoServerStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) {
respParam := make([]*testpb.ResponseParameters, len(respSizes)) respParam := make([]*testpb.ResponseParameters, len(respSizes))
for i, s := range respSizes { for i, s := range respSizes {
respParam[i] = &testpb.ResponseParameters{ respParam[i] = &testpb.ResponseParameters{
@ -148,7 +148,7 @@ func DoServerStreaming(tc testpb.TestServiceClient) {
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseParameters: respParam, ResponseParameters: respParam,
} }
stream, err := tc.StreamingOutputCall(context.Background(), req) stream, err := tc.StreamingOutputCall(context.Background(), req, args...)
if err != nil { if err != nil {
grpclog.Fatalf("%v.StreamingOutputCall(_) = _, %v", tc, err) grpclog.Fatalf("%v.StreamingOutputCall(_) = _, %v", tc, err)
} }
@ -181,8 +181,8 @@ func DoServerStreaming(tc testpb.TestServiceClient) {
} }
// DoPingPong performs ping-pong style bi-directional streaming RPC. // DoPingPong performs ping-pong style bi-directional streaming RPC.
func DoPingPong(tc testpb.TestServiceClient) { func DoPingPong(tc testpb.TestServiceClient, args ...grpc.CallOption) {
stream, err := tc.FullDuplexCall(context.Background()) stream, err := tc.FullDuplexCall(context.Background(), args...)
if err != nil { if err != nil {
grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err) grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
} }
@ -225,8 +225,8 @@ func DoPingPong(tc testpb.TestServiceClient) {
} }
// DoEmptyStream sets up a bi-directional streaming with zero message. // DoEmptyStream sets up a bi-directional streaming with zero message.
func DoEmptyStream(tc testpb.TestServiceClient) { func DoEmptyStream(tc testpb.TestServiceClient, args ...grpc.CallOption) {
stream, err := tc.FullDuplexCall(context.Background()) stream, err := tc.FullDuplexCall(context.Background(), args...)
if err != nil { if err != nil {
grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err) grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
} }
@ -239,9 +239,9 @@ func DoEmptyStream(tc testpb.TestServiceClient) {
} }
// DoTimeoutOnSleepingServer performs an RPC on a sleep server which causes RPC timeout. // DoTimeoutOnSleepingServer performs an RPC on a sleep server which causes RPC timeout.
func DoTimeoutOnSleepingServer(tc testpb.TestServiceClient) { func DoTimeoutOnSleepingServer(tc testpb.TestServiceClient, args ...grpc.CallOption) {
ctx, _ := context.WithTimeout(context.Background(), 1*time.Millisecond) ctx, _ := context.WithTimeout(context.Background(), 1*time.Millisecond)
stream, err := tc.FullDuplexCall(ctx) stream, err := tc.FullDuplexCall(ctx, args...)
if err != nil { if err != nil {
if grpc.Code(err) == codes.DeadlineExceeded { if grpc.Code(err) == codes.DeadlineExceeded {
return return
@ -415,9 +415,9 @@ var (
) )
// DoCancelAfterBegin cancels the RPC after metadata has been sent but before payloads are sent. // DoCancelAfterBegin cancels the RPC after metadata has been sent but before payloads are sent.
func DoCancelAfterBegin(tc testpb.TestServiceClient) { func DoCancelAfterBegin(tc testpb.TestServiceClient, args ...grpc.CallOption) {
ctx, cancel := context.WithCancel(metadata.NewContext(context.Background(), testMetadata)) ctx, cancel := context.WithCancel(metadata.NewContext(context.Background(), testMetadata))
stream, err := tc.StreamingInputCall(ctx) stream, err := tc.StreamingInputCall(ctx, args...)
if err != nil { if err != nil {
grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err) grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
} }
@ -429,9 +429,9 @@ func DoCancelAfterBegin(tc testpb.TestServiceClient) {
} }
// DoCancelAfterFirstResponse cancels the RPC after receiving the first message from the server. // DoCancelAfterFirstResponse cancels the RPC after receiving the first message from the server.
func DoCancelAfterFirstResponse(tc testpb.TestServiceClient) { func DoCancelAfterFirstResponse(tc testpb.TestServiceClient, args ...grpc.CallOption) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
stream, err := tc.FullDuplexCall(ctx) stream, err := tc.FullDuplexCall(ctx, args...)
if err != nil { if err != nil {
grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err) grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
} }
@ -483,7 +483,7 @@ func validateMetadata(header, trailer metadata.MD) {
} }
// DoCustomMetadata checks that metadata is echoed back to the client. // DoCustomMetadata checks that metadata is echoed back to the client.
func DoCustomMetadata(tc testpb.TestServiceClient) { func DoCustomMetadata(tc testpb.TestServiceClient, args ...grpc.CallOption) {
// Testing with UnaryCall. // Testing with UnaryCall.
pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, 1) pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, 1)
req := &testpb.SimpleRequest{ req := &testpb.SimpleRequest{
@ -493,11 +493,11 @@ func DoCustomMetadata(tc testpb.TestServiceClient) {
} }
ctx := metadata.NewContext(context.Background(), customMetadata) ctx := metadata.NewContext(context.Background(), customMetadata)
var header, trailer metadata.MD var header, trailer metadata.MD
args = append(args, grpc.Header(&header), grpc.Trailer(&trailer))
reply, err := tc.UnaryCall( reply, err := tc.UnaryCall(
ctx, ctx,
req, req,
grpc.Header(&header), args...,
grpc.Trailer(&trailer),
) )
if err != nil { if err != nil {
grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err) grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
@ -510,7 +510,7 @@ func DoCustomMetadata(tc testpb.TestServiceClient) {
validateMetadata(header, trailer) validateMetadata(header, trailer)
// Testing with FullDuplex. // Testing with FullDuplex.
stream, err := tc.FullDuplexCall(ctx) stream, err := tc.FullDuplexCall(ctx, args...)
if err != nil { if err != nil {
grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
} }
@ -545,7 +545,7 @@ func DoCustomMetadata(tc testpb.TestServiceClient) {
} }
// DoStatusCodeAndMessage checks that the status code is propagated back to the client. // DoStatusCodeAndMessage checks that the status code is propagated back to the client.
func DoStatusCodeAndMessage(tc testpb.TestServiceClient) { func DoStatusCodeAndMessage(tc testpb.TestServiceClient, args ...grpc.CallOption) {
var code int32 = 2 var code int32 = 2
msg := "test status message" msg := "test status message"
expectedErr := grpc.Errorf(codes.Code(code), msg) expectedErr := grpc.Errorf(codes.Code(code), msg)
@ -557,11 +557,11 @@ func DoStatusCodeAndMessage(tc testpb.TestServiceClient) {
req := &testpb.SimpleRequest{ req := &testpb.SimpleRequest{
ResponseStatus: respStatus, ResponseStatus: respStatus,
} }
if _, err := tc.UnaryCall(context.Background(), req); err == nil || err.Error() != expectedErr.Error() { if _, err := tc.UnaryCall(context.Background(), req, args...); err == nil || err.Error() != expectedErr.Error() {
grpclog.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr) grpclog.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr)
} }
// Test FullDuplexCall. // Test FullDuplexCall.
stream, err := tc.FullDuplexCall(context.Background()) stream, err := tc.FullDuplexCall(context.Background(), args...)
if err != nil { if err != nil {
grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
} }

View File

@ -225,41 +225,38 @@ func performRPCs(gauge *gauge, conn *grpc.ClientConn, selector *weightedRandomTe
var numCalls int64 var numCalls int64
startTime := time.Now() startTime := time.Now()
for { for {
done := make(chan bool, 1)
go func() {
test := selector.getNextTest() test := selector.getNextTest()
switch test { switch test {
case "empty_unary": case "empty_unary":
interop.DoEmptyUnaryCall(client) interop.DoEmptyUnaryCall(client, grpc.FailFast(false))
case "large_unary": case "large_unary":
interop.DoLargeUnaryCall(client) interop.DoLargeUnaryCall(client, grpc.FailFast(false))
case "client_streaming": case "client_streaming":
interop.DoClientStreaming(client) interop.DoClientStreaming(client, grpc.FailFast(false))
case "server_streaming": case "server_streaming":
interop.DoServerStreaming(client) interop.DoServerStreaming(client, grpc.FailFast(false))
case "ping_pong": case "ping_pong":
interop.DoPingPong(client) interop.DoPingPong(client, grpc.FailFast(false))
case "empty_stream": case "empty_stream":
interop.DoEmptyStream(client) interop.DoEmptyStream(client, grpc.FailFast(false))
case "timeout_on_sleeping_server": case "timeout_on_sleeping_server":
interop.DoTimeoutOnSleepingServer(client) interop.DoTimeoutOnSleepingServer(client, grpc.FailFast(false))
case "cancel_after_begin": case "cancel_after_begin":
interop.DoCancelAfterBegin(client) interop.DoCancelAfterBegin(client, grpc.FailFast(false))
case "cancel_after_first_response": case "cancel_after_first_response":
interop.DoCancelAfterFirstResponse(client) interop.DoCancelAfterFirstResponse(client, grpc.FailFast(false))
case "status_code_and_message": case "status_code_and_message":
interop.DoStatusCodeAndMessage(client) interop.DoStatusCodeAndMessage(client, grpc.FailFast(false))
case "custom_metadata": case "custom_metadata":
interop.DoCustomMetadata(client) interop.DoCustomMetadata(client, grpc.FailFast(false))
} }
done <- true numCalls++
}() gauge.set(int64(float64(numCalls) / time.Since(startTime).Seconds()))
select { select {
case <-stop: case <-stop:
return return
case <-done: default:
numCalls++
gauge.set(int64(float64(numCalls) / time.Since(startTime).Seconds()))
} }
} }
} }