From 97b8c7c5290351577a230e5845c7437d91b57a11 Mon Sep 17 00:00:00 2001
From: Adele Zhou <adelez@google.com>
Date: Tue, 27 Dec 2016 16:38:17 -0800
Subject: [PATCH 1/2] Set failfast to false for stress tests.

---
 interop/test_utils.go | 50 +++++++++++++++++++++----------------------
 stress/client/main.go | 22 +++++++++----------
 2 files changed, 36 insertions(+), 36 deletions(-)

diff --git a/interop/test_utils.go b/interop/test_utils.go
index e9d734e4..da33bacc 100644
--- a/interop/test_utils.go
+++ b/interop/test_utils.go
@@ -79,8 +79,8 @@ func clientNewPayload(t testpb.PayloadType, size int) *testpb.Payload {
 }
 
 // DoEmptyUnaryCall performs a unary RPC with empty request and response messages.
-func DoEmptyUnaryCall(tc testpb.TestServiceClient) {
-	reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{})
+func DoEmptyUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) {
+	reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, args...)
 	if err != nil {
 		grpclog.Fatal("/TestService/EmptyCall RPC failed: ", err)
 	}
@@ -90,14 +90,14 @@ func DoEmptyUnaryCall(tc testpb.TestServiceClient) {
 }
 
 // DoLargeUnaryCall performs a unary RPC with large payload in the request and response.
-func DoLargeUnaryCall(tc testpb.TestServiceClient) {
+func DoLargeUnaryCall(tc testpb.TestServiceClient, args ...grpc.CallOption) {
 	pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
 	req := &testpb.SimpleRequest{
 		ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
 		ResponseSize: proto.Int32(int32(largeRespSize)),
 		Payload:      pl,
 	}
-	reply, err := tc.UnaryCall(context.Background(), req)
+	reply, err := tc.UnaryCall(context.Background(), req, args...)
 	if err != nil {
 		grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
 	}
@@ -109,8 +109,8 @@ func DoLargeUnaryCall(tc testpb.TestServiceClient) {
 }
 
 // DoClientStreaming performs a client streaming RPC.
-func DoClientStreaming(tc testpb.TestServiceClient) {
-	stream, err := tc.StreamingInputCall(context.Background())
+func DoClientStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) {
+	stream, err := tc.StreamingInputCall(context.Background(), args...)
 	if err != nil {
 		grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
 	}
@@ -137,7 +137,7 @@ func DoClientStreaming(tc testpb.TestServiceClient) {
 }
 
 // DoServerStreaming performs a server streaming RPC.
-func DoServerStreaming(tc testpb.TestServiceClient) {
+func DoServerStreaming(tc testpb.TestServiceClient, args ...grpc.CallOption) {
 	respParam := make([]*testpb.ResponseParameters, len(respSizes))
 	for i, s := range respSizes {
 		respParam[i] = &testpb.ResponseParameters{
@@ -148,7 +148,7 @@ func DoServerStreaming(tc testpb.TestServiceClient) {
 		ResponseType:       testpb.PayloadType_COMPRESSABLE.Enum(),
 		ResponseParameters: respParam,
 	}
-	stream, err := tc.StreamingOutputCall(context.Background(), req)
+	stream, err := tc.StreamingOutputCall(context.Background(), req, args...)
 	if err != nil {
 		grpclog.Fatalf("%v.StreamingOutputCall(_) = _, %v", tc, err)
 	}
@@ -181,8 +181,8 @@ func DoServerStreaming(tc testpb.TestServiceClient) {
 }
 
 // DoPingPong performs ping-pong style bi-directional streaming RPC.
-func DoPingPong(tc testpb.TestServiceClient) {
-	stream, err := tc.FullDuplexCall(context.Background())
+func DoPingPong(tc testpb.TestServiceClient, args ...grpc.CallOption) {
+	stream, err := tc.FullDuplexCall(context.Background(), args...)
 	if err != nil {
 		grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
 	}
@@ -225,8 +225,8 @@ func DoPingPong(tc testpb.TestServiceClient) {
 }
 
 // DoEmptyStream sets up a bi-directional streaming with zero message.
-func DoEmptyStream(tc testpb.TestServiceClient) {
-	stream, err := tc.FullDuplexCall(context.Background())
+func DoEmptyStream(tc testpb.TestServiceClient, args ...grpc.CallOption) {
+	stream, err := tc.FullDuplexCall(context.Background(), args...)
 	if err != nil {
 		grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
 	}
@@ -239,9 +239,9 @@ func DoEmptyStream(tc testpb.TestServiceClient) {
 }
 
 // DoTimeoutOnSleepingServer performs an RPC on a sleep server which causes RPC timeout.
-func DoTimeoutOnSleepingServer(tc testpb.TestServiceClient) {
+func DoTimeoutOnSleepingServer(tc testpb.TestServiceClient, args ...grpc.CallOption) {
 	ctx, _ := context.WithTimeout(context.Background(), 1*time.Millisecond)
-	stream, err := tc.FullDuplexCall(ctx)
+	stream, err := tc.FullDuplexCall(ctx, args...)
 	if err != nil {
 		if grpc.Code(err) == codes.DeadlineExceeded {
 			return
@@ -415,9 +415,9 @@ var (
 )
 
 // DoCancelAfterBegin cancels the RPC after metadata has been sent but before payloads are sent.
-func DoCancelAfterBegin(tc testpb.TestServiceClient) {
+func DoCancelAfterBegin(tc testpb.TestServiceClient, args ...grpc.CallOption) {
 	ctx, cancel := context.WithCancel(metadata.NewContext(context.Background(), testMetadata))
-	stream, err := tc.StreamingInputCall(ctx)
+	stream, err := tc.StreamingInputCall(ctx, args...)
 	if err != nil {
 		grpclog.Fatalf("%v.StreamingInputCall(_) = _, %v", tc, err)
 	}
@@ -429,9 +429,9 @@ func DoCancelAfterBegin(tc testpb.TestServiceClient) {
 }
 
 // DoCancelAfterFirstResponse cancels the RPC after receiving the first message from the server.
-func DoCancelAfterFirstResponse(tc testpb.TestServiceClient) {
+func DoCancelAfterFirstResponse(tc testpb.TestServiceClient, args ...grpc.CallOption) {
 	ctx, cancel := context.WithCancel(context.Background())
-	stream, err := tc.FullDuplexCall(ctx)
+	stream, err := tc.FullDuplexCall(ctx, args...)
 	if err != nil {
 		grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
 	}
@@ -483,7 +483,7 @@ func validateMetadata(header, trailer metadata.MD) {
 }
 
 // DoCustomMetadata checks that metadata is echoed back to the client.
-func DoCustomMetadata(tc testpb.TestServiceClient) {
+func DoCustomMetadata(tc testpb.TestServiceClient, args ...grpc.CallOption) {
 	// Testing with UnaryCall.
 	pl := clientNewPayload(testpb.PayloadType_COMPRESSABLE, 1)
 	req := &testpb.SimpleRequest{
@@ -493,11 +493,11 @@ func DoCustomMetadata(tc testpb.TestServiceClient) {
 	}
 	ctx := metadata.NewContext(context.Background(), customMetadata)
 	var header, trailer metadata.MD
+	args = append(args, grpc.Header(&header), grpc.Trailer(&trailer))
 	reply, err := tc.UnaryCall(
 		ctx,
 		req,
-		grpc.Header(&header),
-		grpc.Trailer(&trailer),
+		args...,
 	)
 	if err != nil {
 		grpclog.Fatal("/TestService/UnaryCall RPC failed: ", err)
@@ -510,7 +510,7 @@ func DoCustomMetadata(tc testpb.TestServiceClient) {
 	validateMetadata(header, trailer)
 
 	// Testing with FullDuplex.
-	stream, err := tc.FullDuplexCall(ctx)
+	stream, err := tc.FullDuplexCall(ctx, args...)
 	if err != nil {
 		grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
 	}
@@ -545,7 +545,7 @@ func DoCustomMetadata(tc testpb.TestServiceClient) {
 }
 
 // DoStatusCodeAndMessage checks that the status code is propagated back to the client.
-func DoStatusCodeAndMessage(tc testpb.TestServiceClient) {
+func DoStatusCodeAndMessage(tc testpb.TestServiceClient, args ...grpc.CallOption) {
 	var code int32 = 2
 	msg := "test status message"
 	expectedErr := grpc.Errorf(codes.Code(code), msg)
@@ -557,11 +557,11 @@ func DoStatusCodeAndMessage(tc testpb.TestServiceClient) {
 	req := &testpb.SimpleRequest{
 		ResponseStatus: respStatus,
 	}
-	if _, err := tc.UnaryCall(context.Background(), req); err == nil || err.Error() != expectedErr.Error() {
+	if _, err := tc.UnaryCall(context.Background(), req, args...); err == nil || err.Error() != expectedErr.Error() {
 		grpclog.Fatalf("%v.UnaryCall(_, %v) = _, %v, want _, %v", tc, req, err, expectedErr)
 	}
 	// Test FullDuplexCall.
-	stream, err := tc.FullDuplexCall(context.Background())
+	stream, err := tc.FullDuplexCall(context.Background(), args...)
 	if err != nil {
 		grpclog.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
 	}
diff --git a/stress/client/main.go b/stress/client/main.go
index cac68d7a..eb51ce0b 100644
--- a/stress/client/main.go
+++ b/stress/client/main.go
@@ -230,27 +230,27 @@ func performRPCs(gauge *gauge, conn *grpc.ClientConn, selector *weightedRandomTe
 			test := selector.getNextTest()
 			switch test {
 			case "empty_unary":
-				interop.DoEmptyUnaryCall(client)
+				interop.DoEmptyUnaryCall(client, grpc.FailFast(false))
 			case "large_unary":
-				interop.DoLargeUnaryCall(client)
+				interop.DoLargeUnaryCall(client, grpc.FailFast(false))
 			case "client_streaming":
-				interop.DoClientStreaming(client)
+				interop.DoClientStreaming(client, grpc.FailFast(false))
 			case "server_streaming":
-				interop.DoServerStreaming(client)
+				interop.DoServerStreaming(client, grpc.FailFast(false))
 			case "ping_pong":
-				interop.DoPingPong(client)
+				interop.DoPingPong(client, grpc.FailFast(false))
 			case "empty_stream":
-				interop.DoEmptyStream(client)
+				interop.DoEmptyStream(client, grpc.FailFast(false))
 			case "timeout_on_sleeping_server":
-				interop.DoTimeoutOnSleepingServer(client)
+				interop.DoTimeoutOnSleepingServer(client, grpc.FailFast(false))
 			case "cancel_after_begin":
-				interop.DoCancelAfterBegin(client)
+				interop.DoCancelAfterBegin(client, grpc.FailFast(false))
 			case "cancel_after_first_response":
-				interop.DoCancelAfterFirstResponse(client)
+				interop.DoCancelAfterFirstResponse(client, grpc.FailFast(false))
 			case "status_code_and_message":
-				interop.DoStatusCodeAndMessage(client)
+				interop.DoStatusCodeAndMessage(client, grpc.FailFast(false))
 			case "custom_metadata":
-				interop.DoCustomMetadata(client)
+				interop.DoCustomMetadata(client, grpc.FailFast(false))
 			}
 			done <- true
 		}()

From dd6905b8b2db137d27108a4e91c543ada9c7dda7 Mon Sep 17 00:00:00 2001
From: Adele Zhou <adelez@google.com>
Date: Wed, 28 Dec 2016 11:57:08 -0800
Subject: [PATCH 2/2] Wait until all rpcs are done instead of a hard stop.

---
 stress/client/main.go | 61 ++++++++++++++++++++-----------------------
 1 file changed, 29 insertions(+), 32 deletions(-)

diff --git a/stress/client/main.go b/stress/client/main.go
index eb51ce0b..dff85ffe 100644
--- a/stress/client/main.go
+++ b/stress/client/main.go
@@ -225,41 +225,38 @@ func performRPCs(gauge *gauge, conn *grpc.ClientConn, selector *weightedRandomTe
 	var numCalls int64
 	startTime := time.Now()
 	for {
-		done := make(chan bool, 1)
-		go func() {
-			test := selector.getNextTest()
-			switch test {
-			case "empty_unary":
-				interop.DoEmptyUnaryCall(client, grpc.FailFast(false))
-			case "large_unary":
-				interop.DoLargeUnaryCall(client, grpc.FailFast(false))
-			case "client_streaming":
-				interop.DoClientStreaming(client, grpc.FailFast(false))
-			case "server_streaming":
-				interop.DoServerStreaming(client, grpc.FailFast(false))
-			case "ping_pong":
-				interop.DoPingPong(client, grpc.FailFast(false))
-			case "empty_stream":
-				interop.DoEmptyStream(client, grpc.FailFast(false))
-			case "timeout_on_sleeping_server":
-				interop.DoTimeoutOnSleepingServer(client, grpc.FailFast(false))
-			case "cancel_after_begin":
-				interop.DoCancelAfterBegin(client, grpc.FailFast(false))
-			case "cancel_after_first_response":
-				interop.DoCancelAfterFirstResponse(client, grpc.FailFast(false))
-			case "status_code_and_message":
-				interop.DoStatusCodeAndMessage(client, grpc.FailFast(false))
-			case "custom_metadata":
-				interop.DoCustomMetadata(client, grpc.FailFast(false))
-			}
-			done <- true
-		}()
+		test := selector.getNextTest()
+		switch test {
+		case "empty_unary":
+			interop.DoEmptyUnaryCall(client, grpc.FailFast(false))
+		case "large_unary":
+			interop.DoLargeUnaryCall(client, grpc.FailFast(false))
+		case "client_streaming":
+			interop.DoClientStreaming(client, grpc.FailFast(false))
+		case "server_streaming":
+			interop.DoServerStreaming(client, grpc.FailFast(false))
+		case "ping_pong":
+			interop.DoPingPong(client, grpc.FailFast(false))
+		case "empty_stream":
+			interop.DoEmptyStream(client, grpc.FailFast(false))
+		case "timeout_on_sleeping_server":
+			interop.DoTimeoutOnSleepingServer(client, grpc.FailFast(false))
+		case "cancel_after_begin":
+			interop.DoCancelAfterBegin(client, grpc.FailFast(false))
+		case "cancel_after_first_response":
+			interop.DoCancelAfterFirstResponse(client, grpc.FailFast(false))
+		case "status_code_and_message":
+			interop.DoStatusCodeAndMessage(client, grpc.FailFast(false))
+		case "custom_metadata":
+			interop.DoCustomMetadata(client, grpc.FailFast(false))
+		}
+		numCalls++
+		gauge.set(int64(float64(numCalls) / time.Since(startTime).Seconds()))
+
 		select {
 		case <-stop:
 			return
-		case <-done:
-			numCalls++
-			gauge.set(int64(float64(numCalls) / time.Since(startTime).Seconds()))
+		default:
 		}
 	}
 }