From 7db1564ba1229bc42919bb1f6d9c4186f3aa8678 Mon Sep 17 00:00:00 2001 From: ZhouyihaiDing Date: Thu, 24 Aug 2017 10:08:43 -0700 Subject: [PATCH] benchmark: add benchmain/main.go to run benchmark with flag set (#1352) --- benchmark/benchmain/main.go | 380 ++++++++++++++++++++++++++++++++++ benchmark/benchmark.go | 5 +- benchmark/benchmark16_test.go | 32 +-- 3 files changed, 399 insertions(+), 18 deletions(-) create mode 100644 benchmark/benchmain/main.go diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go new file mode 100644 index 00000000..0945d9c2 --- /dev/null +++ b/benchmark/benchmain/main.go @@ -0,0 +1,380 @@ +// +build go1.7 + +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package main + +import ( + "errors" + "flag" + "fmt" + "net" + "os" + "reflect" + "runtime" + "runtime/pprof" + "strconv" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + bm "google.golang.org/grpc/benchmark" + testpb "google.golang.org/grpc/benchmark/grpc_testing" + "google.golang.org/grpc/benchmark/latency" + "google.golang.org/grpc/benchmark/stats" + "google.golang.org/grpc/grpclog" +) + +var ( + // runMode{runUnary, runStream} + runMode = []bool{true, true} + enableTrace = []bool{false} + // When set the latency to 0 (no delay), the result is slower than the real result with no delay + // because latency simulation section has extra operations + ltc = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay. + kbps = []int{0, 10240} // if non-positive, infinite + mtu = []int{0} // if non-positive, infinite + maxConcurrentCalls = []int{1, 8, 64, 512} + reqSizeBytes = []int{1, 1024, 1024 * 1024} + respSizeBytes = []int{1, 1024, 1024 * 1024} + timeout = []time.Duration{1 * time.Second} + memProfile, cpuProfile string + memProfileRate int + enableCompressor = []bool{false} +) + +func unaryBenchmark(startTimer func(), stopTimer func(int32), benchFeatures bm.Features, timeout time.Duration, s *stats.Stats) { + caller, close := makeFuncUnary(benchFeatures) + defer close() + runBenchmark(caller, startTimer, stopTimer, benchFeatures, timeout, s) +} + +func streamBenchmark(startTimer func(), stopTimer func(int32), benchFeatures bm.Features, timeout time.Duration, s *stats.Stats) { + caller, close := makeFuncStream(benchFeatures) + defer close() + runBenchmark(caller, startTimer, stopTimer, benchFeatures, timeout, s) +} + +func makeFuncUnary(benchFeatures bm.Features) (func(int), func()) { + nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} + opts := []grpc.DialOption{} + sopts := []grpc.ServerOption{} + if benchFeatures.EnableCompressor { + sopts = append(sopts, + grpc.RPCCompressor(grpc.NewGZIPCompressor()), + grpc.RPCDecompressor(grpc.NewGZIPDecompressor()), + ) + opts = append(opts, + grpc.WithCompressor(grpc.NewGZIPCompressor()), + grpc.WithDecompressor(grpc.NewGZIPDecompressor()), + ) + } + sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) + opts = append(opts, grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) { + return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout) + })) + opts = append(opts, grpc.WithInsecure()) + + target, stopper := bm.StartServer(bm.ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, sopts...) + conn := bm.NewClientConn(target, opts...) + tc := testpb.NewBenchmarkServiceClient(conn) + return func(int) { + unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) + }, func() { + conn.Close() + stopper() + } +} + +func makeFuncStream(benchFeatures bm.Features) (func(int), func()) { + fmt.Println(benchFeatures) + nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} + opts := []grpc.DialOption{} + sopts := []grpc.ServerOption{} + if benchFeatures.EnableCompressor { + sopts = append(sopts, + grpc.RPCCompressor(grpc.NewGZIPCompressor()), + grpc.RPCDecompressor(grpc.NewGZIPDecompressor()), + ) + opts = append(opts, + grpc.WithCompressor(grpc.NewGZIPCompressor()), + grpc.WithDecompressor(grpc.NewGZIPDecompressor()), + ) + } + sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) + opts = append(opts, grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) { + return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout) + })) + opts = append(opts, grpc.WithInsecure()) + + target, stopper := bm.StartServer(bm.ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, sopts...) + conn := bm.NewClientConn(target, opts...) + tc := testpb.NewBenchmarkServiceClient(conn) + streams := make([]testpb.BenchmarkService_StreamingCallClient, benchFeatures.MaxConcurrentCalls) + for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { + stream, err := tc.StreamingCall(context.Background()) + if err != nil { + grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) + } + streams[i] = stream + } + return func(pos int) { + streamCaller(streams[pos], benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) + }, func() { + conn.Close() + stopper() + } +} + +func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) { + if err := bm.DoUnaryCall(client, reqSize, respSize); err != nil { + grpclog.Fatalf("DoUnaryCall failed: %v", err) + } +} + +func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) { + if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil { + grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err) + } +} + +func runBenchmark(caller func(int), startTimer func(), stopTimer func(int32), benchFeatures bm.Features, timeout time.Duration, s *stats.Stats) { + // Warm up connection. + for i := 0; i < 10; i++ { + caller(0) + } + // Run benchmark. + startTimer() + var ( + mu sync.Mutex + wg sync.WaitGroup + ) + wg.Add(benchFeatures.MaxConcurrentCalls) + bmEnd := time.Now().Add(timeout) + var count int32 + for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { + go func(pos int) { + for { + t := time.Now() + if t.After(bmEnd) { + break + } + start := time.Now() + caller(pos) + elapse := time.Since(start) + atomic.AddInt32(&count, 1) + mu.Lock() + s.Add(elapse) + mu.Unlock() + } + wg.Done() + }(i) + } + wg.Wait() + stopTimer(count) +} + +// Initiate main function to get settings of features. +func init() { + var runUnary, runStream bool + var traceMode, compressorMode bool + var readLatency, readTimeout string + var readKbps, readMtu, readMaxConcurrentCalls, readReqSizeBytes, readReqspSizeBytes intSliceType + flag.BoolVar(&runUnary, "runUnary", false, "runUnary") + flag.BoolVar(&runStream, "runStream", false, "runStream") + flag.BoolVar(&traceMode, "traceMode", false, "traceMode") + flag.StringVar(&readLatency, "latency", "", "latency") + flag.StringVar(&readTimeout, "timeout", "", "timeout") + flag.Var(&readKbps, "kbps", "kbps") + flag.Var(&readMtu, "mtu", "mtu") + flag.Var(&readMaxConcurrentCalls, "maxConcurrentCalls", "maxConcurrentCalls") + flag.Var(&readReqSizeBytes, "reqSizeBytes", "reqSizeBytes") + flag.Var(&readReqspSizeBytes, "reqspSizeBytes", "reqspSizeBytes") + flag.StringVar(&memProfile, "memProfile", "", "memProfile") + flag.IntVar(&memProfileRate, "memProfileRate", 0, "memProfileRate") + flag.StringVar(&cpuProfile, "cpuProfile", "", "cpuProfile") + flag.BoolVar(&compressorMode, "compressorMode", false, "compressorMode") + flag.Parse() + // If no flags related to mode are set, it runs both by default. + if runUnary || runStream { + runMode[0] = runUnary + runMode[1] = runStream + } + if traceMode { + enableTrace = []bool{true} + } + if compressorMode { + enableCompressor = []bool{true} + } + // Time input formats as (time + unit). + readTimeFromInput(<c, readLatency) + readTimeFromInput(&timeout, readTimeout) + readIntFromIntSlice(&kbps, readKbps) + readIntFromIntSlice(&mtu, readMtu) + readIntFromIntSlice(&maxConcurrentCalls, readMaxConcurrentCalls) + readIntFromIntSlice(&reqSizeBytes, readReqSizeBytes) + readIntFromIntSlice(&respSizeBytes, readReqspSizeBytes) +} + +type intSliceType []int + +func (intSlice *intSliceType) String() string { + return fmt.Sprintf("%v", *intSlice) +} + +func (intSlice *intSliceType) Set(value string) error { + if len(*intSlice) > 0 { + return errors.New("interval flag already set") + } + for _, num := range strings.Split(value, ",") { + next, err := strconv.Atoi(num) + if err != nil { + return err + } + *intSlice = append(*intSlice, next) + } + return nil +} + +func readIntFromIntSlice(values *[]int, replace intSliceType) { + // If not set replace in the flag, just return to run the default settings. + if len(replace) == 0 { + return + } + *values = replace +} + +func readTimeFromInput(values *[]time.Duration, replace string) { + if strings.Compare(replace, "") != 0 { + *values = []time.Duration{} + for _, ltc := range strings.Split(replace, ",") { + duration, err := time.ParseDuration(ltc) + if err != nil { + fmt.Println(err) + return + } + *values = append(*values, duration) + } + } +} + +func main() { + before() + featuresPos := make([]int, 8) + // 0:enableTracing 1:ltc 2:kbps 3:mtu 4:maxC 5:reqSize 6:respSize + featuresNum := []int{len(enableTrace), len(ltc), len(kbps), len(mtu), + len(maxConcurrentCalls), len(reqSizeBytes), len(respSizeBytes), len(enableCompressor)} + initalPos := make([]int, len(featuresPos)) + s := stats.NewStats(38) + var memStats runtime.MemStats + var results testing.BenchmarkResult + var startAllocs, startBytes uint64 + var startTime time.Time + start := true + var startTimer = func() { + runtime.ReadMemStats(&memStats) + startAllocs = memStats.Mallocs + startBytes = memStats.TotalAlloc + startTime = time.Now() + } + var stopTimer = func(count int32) { + runtime.ReadMemStats(&memStats) + results = testing.BenchmarkResult{N: int(count), T: time.Now().Sub(startTime), + Bytes: 0, MemAllocs: memStats.Mallocs - startAllocs, MemBytes: memStats.TotalAlloc - startBytes} + } + // Run benchmarks + for !reflect.DeepEqual(featuresPos, initalPos) || start { + start = false + tracing := "Trace" + if !enableTrace[featuresPos[0]] { + tracing = "noTrace" + } + benchFeature := bm.Features{ + EnableTrace: enableTrace[featuresPos[0]], + Latency: ltc[featuresPos[1]], + Kbps: kbps[featuresPos[2]], + Mtu: mtu[featuresPos[3]], + MaxConcurrentCalls: maxConcurrentCalls[featuresPos[4]], + ReqSizeBytes: reqSizeBytes[featuresPos[5]], + RespSizeBytes: respSizeBytes[featuresPos[6]], + EnableCompressor: enableCompressor[featuresPos[7]], + } + + grpc.EnableTracing = enableTrace[featuresPos[0]] + if runMode[0] { + fmt.Printf("Unary-%s-%s:\n", tracing, benchFeature.String()) + unaryBenchmark(startTimer, stopTimer, benchFeature, timeout[0], s) + fmt.Println(results.String(), results.MemString()) + fmt.Println(s.String()) + s.Clear() + } + + if runMode[1] { + fmt.Printf("Stream-%s-%s\n", tracing, benchFeature.String()) + streamBenchmark(startTimer, stopTimer, benchFeature, timeout[0], s) + fmt.Println(results.String(), results.MemString()) + fmt.Println(s.String()) + s.Clear() + } + bm.AddOne(featuresPos, featuresNum) + } + after() + +} + +func before() { + if memProfileRate > 0 { + runtime.MemProfileRate = memProfileRate + } + if cpuProfile != "" { + f, err := os.Create(cpuProfile) + if err != nil { + fmt.Fprintf(os.Stderr, "testing: %s\n", err) + return + } + if err := pprof.StartCPUProfile(f); err != nil { + fmt.Fprintf(os.Stderr, "testing: can't start cpu profile: %s\n", err) + f.Close() + return + } + } +} + +func after() { + if cpuProfile != "" { + pprof.StopCPUProfile() // flushes profile to disk + } + if memProfile != "" { + f, err := os.Create(memProfile) + if err != nil { + fmt.Fprintf(os.Stderr, "testing: %s\n", err) + os.Exit(2) + } + runtime.GC() // materialize all statistics + if err = pprof.WriteHeapProfile(f); err != nil { + fmt.Fprintf(os.Stderr, "testing: can't write %s: %s\n", memProfile, err) + os.Exit(2) + } + f.Close() + } +} diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index 4c758225..d4a4d766 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -48,12 +48,13 @@ type Features struct { MaxConcurrentCalls int ReqSizeBytes int RespSizeBytes int + EnableCompressor bool } func (f Features) String() string { return fmt.Sprintf("latency_%s-kbps_%#v-MTU_%#v-maxConcurrentCalls_"+ - "%#v-reqSize_%#vB-respSize_%#vB", - f.Latency.String(), f.Kbps, f.Mtu, f.MaxConcurrentCalls, f.ReqSizeBytes, f.RespSizeBytes) + "%#v-reqSize_%#vB-respSize_%#vB-Compressor_%t", + f.Latency.String(), f.Kbps, f.Mtu, f.MaxConcurrentCalls, f.ReqSizeBytes, f.RespSizeBytes, f.EnableCompressor) } // AddOne add 1 to the features slice diff --git a/benchmark/benchmark16_test.go b/benchmark/benchmark16_test.go index ba4797d6..4882ae18 100644 --- a/benchmark/benchmark16_test.go +++ b/benchmark/benchmark16_test.go @@ -30,80 +30,80 @@ import ( func BenchmarkClientStreamc1(b *testing.B) { grpc.EnableTracing = true - runStream(b, Features{true, 0, 0, 0, 1, 1, 1}) + runStream(b, Features{true, 0, 0, 0, 1, 1, 1, false}) } func BenchmarkClientStreamc8(b *testing.B) { grpc.EnableTracing = true - runStream(b, Features{true, 0, 0, 0, 8, 1, 1}) + runStream(b, Features{true, 0, 0, 0, 8, 1, 1, false}) } func BenchmarkClientStreamc64(b *testing.B) { grpc.EnableTracing = true - runStream(b, Features{true, 0, 0, 0, 64, 1, 1}) + runStream(b, Features{true, 0, 0, 0, 64, 1, 1, false}) } func BenchmarkClientStreamc512(b *testing.B) { grpc.EnableTracing = true - runStream(b, Features{true, 0, 0, 0, 512, 1, 1}) + runStream(b, Features{true, 0, 0, 0, 512, 1, 1, false}) } func BenchmarkClientUnaryc1(b *testing.B) { grpc.EnableTracing = true - runStream(b, Features{true, 0, 0, 0, 1, 1, 1}) + runStream(b, Features{true, 0, 0, 0, 1, 1, 1, false}) } func BenchmarkClientUnaryc8(b *testing.B) { grpc.EnableTracing = true - runStream(b, Features{true, 0, 0, 0, 8, 1, 1}) + runStream(b, Features{true, 0, 0, 0, 8, 1, 1, false}) } func BenchmarkClientUnaryc64(b *testing.B) { grpc.EnableTracing = true - runStream(b, Features{true, 0, 0, 0, 64, 1, 1}) + runStream(b, Features{true, 0, 0, 0, 64, 1, 1, false}) } func BenchmarkClientUnaryc512(b *testing.B) { grpc.EnableTracing = true - runStream(b, Features{true, 0, 0, 0, 512, 1, 1}) + runStream(b, Features{true, 0, 0, 0, 512, 1, 1, false}) } func BenchmarkClientStreamNoTracec1(b *testing.B) { grpc.EnableTracing = false - runStream(b, Features{false, 0, 0, 0, 1, 1, 1}) + runStream(b, Features{false, 0, 0, 0, 1, 1, 1, false}) } func BenchmarkClientStreamNoTracec8(b *testing.B) { grpc.EnableTracing = false - runStream(b, Features{false, 0, 0, 0, 8, 1, 1}) + runStream(b, Features{false, 0, 0, 0, 8, 1, 1, false}) } func BenchmarkClientStreamNoTracec64(b *testing.B) { grpc.EnableTracing = false - runStream(b, Features{false, 0, 0, 0, 64, 1, 1}) + runStream(b, Features{false, 0, 0, 0, 64, 1, 1, false}) } func BenchmarkClientStreamNoTracec512(b *testing.B) { grpc.EnableTracing = false - runStream(b, Features{false, 0, 0, 0, 512, 1, 1}) + runStream(b, Features{false, 0, 0, 0, 512, 1, 1, false}) } func BenchmarkClientUnaryNoTracec1(b *testing.B) { grpc.EnableTracing = false - runStream(b, Features{false, 0, 0, 0, 1, 1, 1}) + runStream(b, Features{false, 0, 0, 0, 1, 1, 1, false}) } func BenchmarkClientUnaryNoTracec8(b *testing.B) { grpc.EnableTracing = false - runStream(b, Features{false, 0, 0, 0, 8, 1, 1}) + runStream(b, Features{false, 0, 0, 0, 8, 1, 1, false}) } func BenchmarkClientUnaryNoTracec64(b *testing.B) { grpc.EnableTracing = false - runStream(b, Features{false, 0, 0, 0, 64, 1, 1}) + runStream(b, Features{false, 0, 0, 0, 64, 1, 1, false}) } func BenchmarkClientUnaryNoTracec512(b *testing.B) { grpc.EnableTracing = false - runStream(b, Features{false, 0, 0, 0, 512, 1, 1}) + runStream(b, Features{false, 0, 0, 0, 512, 1, 1, false}) } func TestMain(m *testing.M) {