Merge pull request #215 from dsymonds/bench-fix

Fix data race in benchmark_test.go.
This commit is contained in:
Qi Zhao
2015-06-07 21:11:20 -07:00
2 changed files with 9 additions and 4 deletions

View File

@ -120,7 +120,7 @@ func DoUnaryCall(tc testpb.TestServiceClient, reqSize, respSize int) {
} }
} }
// DoStreamingRoundTrip performs a round trip for a single streaming rpc. // DoStreamingRoundTrip performs a round trip for a single streaming rpc.
func DoStreamingRoundTrip(tc testpb.TestServiceClient, stream testpb.TestService_StreamingCallClient, reqSize, respSize int) { func DoStreamingRoundTrip(tc testpb.TestServiceClient, stream testpb.TestService_StreamingCallClient, reqSize, respSize int) {
pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize) pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
req := &testpb.SimpleRequest{ req := &testpb.SimpleRequest{
@ -129,10 +129,10 @@ func DoStreamingRoundTrip(tc testpb.TestServiceClient, stream testpb.TestService
Payload: pl, Payload: pl,
} }
if err := stream.Send(req); err != nil { if err := stream.Send(req); err != nil {
grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) grpclog.Fatalf("StreamingCall(_).Send: %v", err)
} }
if _, err := stream.Recv(); err != nil { if _, err := stream.Recv(); err != nil {
grpclog.Fatal("%v.StreamingCall(_) = _, %v", tc, err) grpclog.Fatalf("StreamingCall(_).Recv: %v", err)
} }
} }

View File

@ -62,11 +62,12 @@ func runStream(b *testing.B, maxConcurrentCalls int) {
defer stopper() defer stopper()
conn := NewClientConn(target) conn := NewClientConn(target)
tc := testpb.NewTestServiceClient(conn) tc := testpb.NewTestServiceClient(conn)
// Warm up connection.
stream, err := tc.StreamingCall(context.Background()) stream, err := tc.StreamingCall(context.Background())
if err != nil { if err != nil {
grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
} }
// Warm up connection.
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
streamCaller(tc, stream) streamCaller(tc, stream)
} }
@ -81,6 +82,10 @@ func runStream(b *testing.B, maxConcurrentCalls int) {
// Distribute the b.N calls over maxConcurrentCalls workers. // Distribute the b.N calls over maxConcurrentCalls workers.
for i := 0; i < maxConcurrentCalls; i++ { for i := 0; i < maxConcurrentCalls; i++ {
go func() { go func() {
stream, err := tc.StreamingCall(context.Background())
if err != nil {
grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
}
for _ = range ch { for _ = range ch {
start := time.Now() start := time.Now()
streamCaller(tc, stream) streamCaller(tc, stream)