diff --git a/Makefile b/Makefile index 03bb01f0..e9fceab4 100644 --- a/Makefile +++ b/Makefile @@ -9,6 +9,9 @@ updatedeps: testdeps: go get -d -v -t google.golang.org/grpc/... +benchdeps: testdeps + go get -d -v golang.org/x/perf/cmd/benchstat + updatetestdeps: go get -d -v -t -u -f google.golang.org/grpc/... @@ -32,6 +35,9 @@ test: testdeps testrace: testdeps go test -v -race -cpu 1,4 google.golang.org/grpc/... +benchmark: benchdeps + go test google.golang.org/grpc/benchmark/... -benchmem -bench=. | tee /tmp/tmp.result && benchstat /tmp/tmp.result && rm /tmp/tmp.result + clean: go clean -i google.golang.org/grpc/... @@ -49,4 +55,6 @@ coverage: testdeps test \ testrace \ clean \ - coverage + coverage \ + benchdeps \ + benchmark diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index 8a52de00..7afd49a9 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -25,10 +25,14 @@ import ( "fmt" "io" "net" + "sync" + "testing" + "time" "golang.org/x/net/context" "google.golang.org/grpc" testpb "google.golang.org/grpc/benchmark/grpc_testing" + "google.golang.org/grpc/benchmark/stats" "google.golang.org/grpc/grpclog" ) @@ -217,3 +221,109 @@ func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn { } return conn } + +func runUnary(b *testing.B, maxConcurrentCalls, reqSize, respSize int) { + s := stats.AddStats(b, 38) + b.StopTimer() + target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"}, grpc.MaxConcurrentStreams(uint32(maxConcurrentCalls+1))) + defer stopper() + conn := NewClientConn(target, grpc.WithInsecure()) + tc := testpb.NewBenchmarkServiceClient(conn) + + // Warm up connection. + for i := 0; i < 10; i++ { + unaryCaller(tc, reqSize, respSize) + } + ch := make(chan int, maxConcurrentCalls*4) + var ( + mu sync.Mutex + wg sync.WaitGroup + ) + wg.Add(maxConcurrentCalls) + + // Distribute the b.N calls over maxConcurrentCalls workers. + for i := 0; i < maxConcurrentCalls; i++ { + go func() { + for range ch { + start := time.Now() + unaryCaller(tc, reqSize, respSize) + elapse := time.Since(start) + mu.Lock() + s.Add(elapse) + mu.Unlock() + } + wg.Done() + }() + } + b.StartTimer() + for i := 0; i < b.N; i++ { + ch <- i + } + b.StopTimer() + close(ch) + wg.Wait() + conn.Close() +} + +func runStream(b *testing.B, maxConcurrentCalls, reqSize, respSize int) { + s := stats.AddStats(b, 38) + b.StopTimer() + target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"}, grpc.MaxConcurrentStreams(uint32(maxConcurrentCalls+1))) + defer stopper() + conn := NewClientConn(target, grpc.WithInsecure()) + tc := testpb.NewBenchmarkServiceClient(conn) + + // Warm up connection. + stream, err := tc.StreamingCall(context.Background()) + if err != nil { + b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) + } + for i := 0; i < 10; i++ { + streamCaller(stream, reqSize, respSize) + } + + ch := make(chan struct{}, maxConcurrentCalls*4) + var ( + mu sync.Mutex + wg sync.WaitGroup + ) + wg.Add(maxConcurrentCalls) + + // Distribute the b.N calls over maxConcurrentCalls workers. + for i := 0; i < maxConcurrentCalls; i++ { + stream, err := tc.StreamingCall(context.Background()) + if err != nil { + b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) + } + go func() { + for range ch { + start := time.Now() + streamCaller(stream, reqSize, respSize) + elapse := time.Since(start) + mu.Lock() + s.Add(elapse) + mu.Unlock() + } + wg.Done() + }() + } + b.StartTimer() + for i := 0; i < b.N; i++ { + ch <- struct{}{} + } + b.StopTimer() + close(ch) + wg.Wait() + conn.Close() +} +func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) { + if err := DoUnaryCall(client, reqSize, respSize); err != nil { + grpclog.Fatalf("DoUnaryCall failed: %v", err) + } +} + +func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) { + if err := DoStreamingRoundTrip(stream, reqSize, respSize); err != nil { + grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err) + } +} diff --git a/benchmark/benchmark16_test.go b/benchmark/benchmark16_test.go new file mode 100644 index 00000000..2ce668d4 --- /dev/null +++ b/benchmark/benchmark16_test.go @@ -0,0 +1,93 @@ +// +build go1.6,!go1.7 + +package benchmark + +import ( + "os" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/benchmark/stats" +) + +func BenchmarkClientStreamc1(b *testing.B) { + grpc.EnableTracing = true + runStream(b, 1, 1, 1) +} + +func BenchmarkClientStreamc8(b *testing.B) { + grpc.EnableTracing = true + runStream(b, 8, 1, 1) +} + +func BenchmarkClientStreamc64(b *testing.B) { + grpc.EnableTracing = true + runStream(b, 64, 1, 1) +} + +func BenchmarkClientStreamc512(b *testing.B) { + grpc.EnableTracing = true + runStream(b, 512, 1, 1) +} +func BenchmarkClientUnaryc1(b *testing.B) { + grpc.EnableTracing = true + runUnary(b, 1, 1, 1) +} + +func BenchmarkClientUnaryc8(b *testing.B) { + grpc.EnableTracing = true + runUnary(b, 8, 1, 1) +} + +func BenchmarkClientUnaryc64(b *testing.B) { + grpc.EnableTracing = true + runUnary(b, 64, 1, 1) +} + +func BenchmarkClientUnaryc512(b *testing.B) { + grpc.EnableTracing = true + runUnary(b, 512, 1, 1) +} + +func BenchmarkClientStreamNoTracec1(b *testing.B) { + grpc.EnableTracing = false + runStream(b, 1, 1, 1) +} + +func BenchmarkClientStreamNoTracec8(b *testing.B) { + grpc.EnableTracing = false + runStream(b, 8, 1, 1) +} + +func BenchmarkClientStreamNoTracec64(b *testing.B) { + grpc.EnableTracing = false + runStream(b, 64, 1, 1) +} + +func BenchmarkClientStreamNoTracec512(b *testing.B) { + grpc.EnableTracing = false + runStream(b, 512, 1, 1) +} +func BenchmarkClientUnaryNoTracec1(b *testing.B) { + grpc.EnableTracing = false + runUnary(b, 1, 1, 1) +} + +func BenchmarkClientUnaryNoTracec8(b *testing.B) { + grpc.EnableTracing = false + runUnary(b, 8, 1, 1) +} + +func BenchmarkClientUnaryNoTracec64(b *testing.B) { + grpc.EnableTracing = false + runUnary(b, 64, 1, 1) +} + +func BenchmarkClientUnaryNoTracec512(b *testing.B) { + grpc.EnableTracing = false + runUnary(b, 512, 1, 1) +} + +func TestMain(m *testing.M) { + os.Exit(stats.RunTestMain(m)) +} diff --git a/benchmark/benchmark17_test.go b/benchmark/benchmark17_test.go new file mode 100644 index 00000000..31f5e7c9 --- /dev/null +++ b/benchmark/benchmark17_test.go @@ -0,0 +1,44 @@ +// +build go1.7 + +package benchmark + +import ( + "fmt" + "os" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/benchmark/stats" +) + +func BenchmarkClient(b *testing.B) { + maxConcurrentCalls := []int{1, 8, 64, 512} + reqSizeBytes := []int{1, 1024} + reqspSizeBytes := []int{1, 1024} + for _, enableTracing := range []bool{true, false} { + grpc.EnableTracing = enableTracing + tracing := "Tracing" + if !enableTracing { + tracing = "noTrace" + } + for _, maxC := range maxConcurrentCalls { + for _, reqS := range reqSizeBytes { + for _, respS := range reqspSizeBytes { + b.Run(fmt.Sprintf("Unary-%s-maxConcurrentCalls_"+ + "%#v-reqSize_%#vB-respSize_%#vB", tracing, maxC, reqS, respS), func(b *testing.B) { + runUnary(b, maxC, reqS, respS) + }) + b.Run(fmt.Sprintf("Stream-%s-maxConcurrentCalls_"+ + "%#v-reqSize_%#vB-respSize_%#vB", tracing, maxC, reqS, respS), func(b *testing.B) { + runStream(b, maxC, reqS, respS) + }) + } + } + } + } + +} + +func TestMain(m *testing.M) { + os.Exit(stats.RunTestMain(m)) +} diff --git a/benchmark/benchmark_test.go b/benchmark/benchmark_test.go deleted file mode 100644 index 61c9fba0..00000000 --- a/benchmark/benchmark_test.go +++ /dev/null @@ -1,202 +0,0 @@ -package benchmark - -import ( - "os" - "sync" - "testing" - "time" - - "golang.org/x/net/context" - "google.golang.org/grpc" - testpb "google.golang.org/grpc/benchmark/grpc_testing" - "google.golang.org/grpc/benchmark/stats" - "google.golang.org/grpc/grpclog" -) - -func runUnary(b *testing.B, maxConcurrentCalls int) { - s := stats.AddStats(b, 38) - b.StopTimer() - target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"}, grpc.MaxConcurrentStreams(uint32(maxConcurrentCalls+1))) - defer stopper() - conn := NewClientConn(target, grpc.WithInsecure()) - tc := testpb.NewBenchmarkServiceClient(conn) - - // Warm up connection. - for i := 0; i < 10; i++ { - unaryCaller(tc) - } - ch := make(chan int, maxConcurrentCalls*4) - var ( - mu sync.Mutex - wg sync.WaitGroup - ) - wg.Add(maxConcurrentCalls) - - // Distribute the b.N calls over maxConcurrentCalls workers. - for i := 0; i < maxConcurrentCalls; i++ { - go func() { - for range ch { - start := time.Now() - unaryCaller(tc) - elapse := time.Since(start) - mu.Lock() - s.Add(elapse) - mu.Unlock() - } - wg.Done() - }() - } - b.StartTimer() - for i := 0; i < b.N; i++ { - ch <- i - } - b.StopTimer() - close(ch) - wg.Wait() - conn.Close() -} - -func runStream(b *testing.B, maxConcurrentCalls int) { - s := stats.AddStats(b, 38) - b.StopTimer() - target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"}, grpc.MaxConcurrentStreams(uint32(maxConcurrentCalls+1))) - defer stopper() - conn := NewClientConn(target, grpc.WithInsecure()) - tc := testpb.NewBenchmarkServiceClient(conn) - - // Warm up connection. - stream, err := tc.StreamingCall(context.Background()) - if err != nil { - b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) - } - for i := 0; i < 10; i++ { - streamCaller(stream) - } - - ch := make(chan struct{}, maxConcurrentCalls*4) - var ( - mu sync.Mutex - wg sync.WaitGroup - ) - wg.Add(maxConcurrentCalls) - - // Distribute the b.N calls over maxConcurrentCalls workers. - for i := 0; i < maxConcurrentCalls; i++ { - stream, err := tc.StreamingCall(context.Background()) - if err != nil { - b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) - } - go func() { - for range ch { - start := time.Now() - streamCaller(stream) - elapse := time.Since(start) - mu.Lock() - s.Add(elapse) - mu.Unlock() - } - wg.Done() - }() - } - b.StartTimer() - for i := 0; i < b.N; i++ { - ch <- struct{}{} - } - b.StopTimer() - close(ch) - wg.Wait() - conn.Close() -} -func unaryCaller(client testpb.BenchmarkServiceClient) { - if err := DoUnaryCall(client, 1, 1); err != nil { - grpclog.Fatalf("DoUnaryCall failed: %v", err) - } -} - -func streamCaller(stream testpb.BenchmarkService_StreamingCallClient) { - if err := DoStreamingRoundTrip(stream, 1, 1); err != nil { - grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err) - } -} - -func BenchmarkClientStreamc1(b *testing.B) { - grpc.EnableTracing = true - runStream(b, 1) -} - -func BenchmarkClientStreamc8(b *testing.B) { - grpc.EnableTracing = true - runStream(b, 8) -} - -func BenchmarkClientStreamc64(b *testing.B) { - grpc.EnableTracing = true - runStream(b, 64) -} - -func BenchmarkClientStreamc512(b *testing.B) { - grpc.EnableTracing = true - runStream(b, 512) -} -func BenchmarkClientUnaryc1(b *testing.B) { - grpc.EnableTracing = true - runUnary(b, 1) -} - -func BenchmarkClientUnaryc8(b *testing.B) { - grpc.EnableTracing = true - runUnary(b, 8) -} - -func BenchmarkClientUnaryc64(b *testing.B) { - grpc.EnableTracing = true - runUnary(b, 64) -} - -func BenchmarkClientUnaryc512(b *testing.B) { - grpc.EnableTracing = true - runUnary(b, 512) -} - -func BenchmarkClientStreamNoTracec1(b *testing.B) { - grpc.EnableTracing = false - runStream(b, 1) -} - -func BenchmarkClientStreamNoTracec8(b *testing.B) { - grpc.EnableTracing = false - runStream(b, 8) -} - -func BenchmarkClientStreamNoTracec64(b *testing.B) { - grpc.EnableTracing = false - runStream(b, 64) -} - -func BenchmarkClientStreamNoTracec512(b *testing.B) { - grpc.EnableTracing = false - runStream(b, 512) -} -func BenchmarkClientUnaryNoTracec1(b *testing.B) { - grpc.EnableTracing = false - runUnary(b, 1) -} - -func BenchmarkClientUnaryNoTracec8(b *testing.B) { - grpc.EnableTracing = false - runUnary(b, 8) -} - -func BenchmarkClientUnaryNoTracec64(b *testing.B) { - grpc.EnableTracing = false - runUnary(b, 64) -} - -func BenchmarkClientUnaryNoTracec512(b *testing.B) { - grpc.EnableTracing = false - runUnary(b, 512) -} - -func TestMain(m *testing.M) { - os.Exit(stats.RunTestMain(m)) -} diff --git a/benchmark/stats/util.go b/benchmark/stats/util.go index a9922f98..28d162fd 100644 --- a/benchmark/stats/util.go +++ b/benchmark/stats/util.go @@ -50,7 +50,7 @@ func AddStatsWithName(b *testing.B, name string, numBuckets int) *Stats { } p := strings.Split(runtime.FuncForPC(pc).Name(), ".") benchName = p[len(p)-1] - if strings.HasPrefix(benchName, "Benchmark") { + if strings.HasPrefix(benchName, "run") { break } } @@ -148,9 +148,8 @@ func splitLines(data []byte, eof bool) (advance int, token []byte, err error) { func injectStatsIfFinished(line string) { injectCond.L.Lock() defer injectCond.L.Unlock() - - // We assume that the benchmark results start with the benchmark name. - if curB == nil || !strings.HasPrefix(line, curBenchName) { + // We assume that the benchmark results start with "Benchmark". + if curB == nil || !strings.HasPrefix(line, "Benchmark") { return }