From b681a11d086e32e91ee8e61afc22a2c3c7870aba Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 10 Jun 2019 09:53:35 -0700 Subject: [PATCH] A few more improvements to the benchmark code. (#2840) * A few more improvements to the benchmark code. * In benchmain/main.go: * Define types for function arguments to make code more readable * Significantly simplify the code as a result of stats package refactor. * In benchresult/main.go * Simplify code as a result of stats package refactor. * In stats/stats.go * Define and expose featureIndex enum. * Refactor the types used to store features, results, stats etc. * Provide easy to use methods to add/modify/read/dump stats info. * Delete stats/util.go - dead code. --- benchmark/benchmain/main.go | 339 +++++++++----------- benchmark/benchresult/main.go | 79 ++--- benchmark/stats/stats.go | 588 ++++++++++++++++++++-------------- benchmark/stats/util.go | 208 ------------ 4 files changed, 543 insertions(+), 671 deletions(-) delete mode 100644 benchmark/stats/util.go diff --git a/benchmark/benchmain/main.go b/benchmark/benchmain/main.go index d08dbba7..4dddb291 100644 --- a/benchmark/benchmain/main.go +++ b/benchmark/benchmain/main.go @@ -55,7 +55,6 @@ import ( "strings" "sync" "sync/atomic" - "testing" "time" "google.golang.org/grpc" @@ -66,6 +65,7 @@ import ( "google.golang.org/grpc/benchmark/stats" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/test/bufconn" ) @@ -96,6 +96,8 @@ var ( cpuProfile = flag.String("cpuProfile", "", "Enables CPU profiling output to the filename provided") benchmarkResultFile = flag.String("resultFile", "", "Save the benchmark result into a binary file") useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O") + enableKeepalive = flag.Bool("enable_keepalive", false, "Enable client keepalive. \n"+ + "Keepalive.Time is set to 10s, Keepalive.Timeout is set to 1s, Keepalive.PermitWithoutStream is set to true.") ) const ( @@ -120,6 +122,8 @@ const ( networkLongHaul = "Longhaul" numStatsBuckets = 10 + warmupCallCount = 10 + warmuptime = time.Second ) var ( @@ -139,6 +143,8 @@ var ( networkModeWAN: latency.WAN, networkLongHaul: latency.Longhaul, } + keepaliveTime = 10 * time.Second // this is the minimum allowed + keepaliveTimeout = 1 * time.Second ) // runModes indicates the workloads to run. This is initialized with a call to @@ -169,76 +175,86 @@ func runModesFromWorkloads(workload string) runModes { return r } -func unaryBenchmark(startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchTime time.Duration, s *stats.Stats) uint64 { - caller, cleanup := makeFuncUnary(benchFeatures) +type startFunc func(mode string, bf stats.Features) +type stopFunc func(count uint64) +type ucStopFunc func(req uint64, resp uint64) +type rpcCallFunc func(pos int) +type rpcSendFunc func(pos int) +type rpcRecvFunc func(pos int) +type rpcCleanupFunc func() + +func unaryBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) { + caller, cleanup := makeFuncUnary(bf) defer cleanup() - return runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchTime, s) + runBenchmark(caller, start, stop, bf, s, workloadsUnary) } -func streamBenchmark(startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchTime time.Duration, s *stats.Stats) uint64 { - caller, cleanup := makeFuncStream(benchFeatures) +func streamBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) { + caller, cleanup := makeFuncStream(bf) defer cleanup() - return runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchTime, s) + runBenchmark(caller, start, stop, bf, s, workloadsStreaming) } -func unconstrainedStreamBenchmark(benchFeatures stats.Features, warmuptime, benchTime time.Duration) (uint64, uint64) { - var sender, recver func(int) - var cleanup func() - if benchFeatures.EnablePreloader { - sender, recver, cleanup = makeFuncUnconstrainedStreamPreloaded(benchFeatures) +func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Features, s *stats.Stats) { + var sender rpcSendFunc + var recver rpcRecvFunc + var cleanup rpcCleanupFunc + if bf.EnablePreloader { + sender, recver, cleanup = makeFuncUnconstrainedStreamPreloaded(bf) } else { - sender, recver, cleanup = makeFuncUnconstrainedStream(benchFeatures) + sender, recver, cleanup = makeFuncUnconstrainedStream(bf) } defer cleanup() - var ( - wg sync.WaitGroup - requestCount uint64 - responseCount uint64 - ) - wg.Add(2 * benchFeatures.MaxConcurrentCalls) - - // Resets the counters once warmed up + var req, resp uint64 go func() { + // Resets the counters once warmed up <-time.NewTimer(warmuptime).C - atomic.StoreUint64(&requestCount, 0) - atomic.StoreUint64(&responseCount, 0) + atomic.StoreUint64(&req, 0) + atomic.StoreUint64(&resp, 0) + start(workloadsUnconstrained, bf) }() - bmEnd := time.Now().Add(benchTime + warmuptime) - for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { + bmEnd := time.Now().Add(bf.BenchTime + warmuptime) + var wg sync.WaitGroup + wg.Add(2 * bf.MaxConcurrentCalls) + for i := 0; i < bf.MaxConcurrentCalls; i++ { go func(pos int) { + defer wg.Done() for { t := time.Now() if t.After(bmEnd) { - break + return } sender(pos) - atomic.AddUint64(&requestCount, 1) + atomic.AddUint64(&req, 1) } - wg.Done() }(i) go func(pos int) { + defer wg.Done() for { t := time.Now() if t.After(bmEnd) { - break + return } recver(pos) - atomic.AddUint64(&responseCount, 1) + atomic.AddUint64(&resp, 1) } - wg.Done() }(i) } wg.Wait() - return requestCount, responseCount + stop(req, resp) } -func makeClient(benchFeatures stats.Features) (testpb.BenchmarkServiceClient, func()) { - nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} +// makeClient returns a gRPC client for the grpc.testing.BenchmarkService +// service. The client is configured using the different options in the passed +// 'bf'. Also returns a cleanup function to close the client and release +// resources. +func makeClient(bf stats.Features) (testpb.BenchmarkServiceClient, func()) { + nw := &latency.Network{Kbps: bf.Kbps, Latency: bf.Latency, MTU: bf.MTU} opts := []grpc.DialOption{} sopts := []grpc.ServerOption{} - if benchFeatures.ModeCompressor == compModeNop { + if bf.ModeCompressor == compModeNop { sopts = append(sopts, grpc.RPCCompressor(nopCompressor{}), grpc.RPCDecompressor(nopDecompressor{}), @@ -248,7 +264,7 @@ func makeClient(benchFeatures stats.Features) (testpb.BenchmarkServiceClient, fu grpc.WithDecompressor(nopDecompressor{}), ) } - if benchFeatures.ModeCompressor == compModeGzip { + if bf.ModeCompressor == compModeGzip { sopts = append(sopts, grpc.RPCCompressor(grpc.NewGZIPCompressor()), grpc.RPCDecompressor(grpc.NewGZIPDecompressor()), @@ -258,11 +274,20 @@ func makeClient(benchFeatures stats.Features) (testpb.BenchmarkServiceClient, fu grpc.WithDecompressor(grpc.NewGZIPDecompressor()), ) } - sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) + if bf.EnableKeepalive { + opts = append(opts, + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: keepaliveTime, + Timeout: keepaliveTimeout, + PermitWithoutStream: true, + }), + ) + } + sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1))) opts = append(opts, grpc.WithInsecure()) var lis net.Listener - if benchFeatures.UseBufConn { + if bf.UseBufConn { bcLis := bufconn.Listen(256 * 1024) lis = bcLis opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) { @@ -289,18 +314,18 @@ func makeClient(benchFeatures stats.Features) (testpb.BenchmarkServiceClient, fu } } -func makeFuncUnary(benchFeatures stats.Features) (func(int), func()) { - tc, cleanup := makeClient(benchFeatures) +func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) { + tc, cleanup := makeClient(bf) return func(int) { - unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) + unaryCaller(tc, bf.ReqSizeBytes, bf.RespSizeBytes) }, cleanup } -func makeFuncStream(benchFeatures stats.Features) (func(int), func()) { - tc, cleanup := makeClient(benchFeatures) +func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) { + tc, cleanup := makeClient(bf) - streams := make([]testpb.BenchmarkService_StreamingCallClient, benchFeatures.MaxConcurrentCalls) - for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { + streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls) + for i := 0; i < bf.MaxConcurrentCalls; i++ { stream, err := tc.StreamingCall(context.Background()) if err != nil { grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) @@ -309,12 +334,12 @@ func makeFuncStream(benchFeatures stats.Features) (func(int), func()) { } return func(pos int) { - streamCaller(streams[pos], benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) + streamCaller(streams[pos], bf.ReqSizeBytes, bf.RespSizeBytes) }, cleanup } -func makeFuncUnconstrainedStreamPreloaded(benchFeatures stats.Features) (func(int), func(int), func()) { - streams, req, cleanup := setupUnconstrainedStream(benchFeatures) +func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) { + streams, req, cleanup := setupUnconstrainedStream(bf) preparedMsg := make([]*grpc.PreparedMsg, len(streams)) for i, stream := range streams { @@ -332,8 +357,8 @@ func makeFuncUnconstrainedStreamPreloaded(benchFeatures stats.Features) (func(in }, cleanup } -func makeFuncUnconstrainedStream(benchFeatures stats.Features) (func(int), func(int), func()) { - streams, req, cleanup := setupUnconstrainedStream(benchFeatures) +func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) { + streams, req, cleanup := setupUnconstrainedStream(bf) return func(pos int) { streams[pos].Send(req) @@ -342,11 +367,11 @@ func makeFuncUnconstrainedStream(benchFeatures stats.Features) (func(int), func( }, cleanup } -func setupUnconstrainedStream(benchFeatures stats.Features) ([]testpb.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, func()) { - tc, cleanup := makeClient(benchFeatures) +func setupUnconstrainedStream(bf stats.Features) ([]testpb.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) { + tc, cleanup := makeClient(bf) - streams := make([]testpb.BenchmarkService_StreamingCallClient, benchFeatures.MaxConcurrentCalls) - for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { + streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls) + for i := 0; i < bf.MaxConcurrentCalls; i++ { stream, err := tc.UnconstrainedStreamingCall(context.Background()) if err != nil { grpclog.Fatalf("%v.UnconstrainedStreamingCall(_) = _, %v", tc, err) @@ -354,16 +379,18 @@ func setupUnconstrainedStream(benchFeatures stats.Features) ([]testpb.BenchmarkS streams[i] = stream } - pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, benchFeatures.ReqSizeBytes) + pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, bf.ReqSizeBytes) req := &testpb.SimpleRequest{ ResponseType: pl.Type, - ResponseSize: int32(benchFeatures.RespSizeBytes), + ResponseSize: int32(bf.RespSizeBytes), Payload: pl, } return streams, req, cleanup } +// Makes a UnaryCall gRPC request using the given BenchmarkServiceClient and +// request and response sizes. func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) { if err := bm.DoUnaryCall(client, reqSize, respSize); err != nil { grpclog.Fatalf("DoUnaryCall failed: %v", err) @@ -376,41 +403,36 @@ func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, r } } -func runBenchmark(caller func(int), startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchTime time.Duration, s *stats.Stats) uint64 { +func runBenchmark(caller rpcCallFunc, start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats, mode string) { // Warm up connection. - for i := 0; i < 10; i++ { + for i := 0; i < warmupCallCount; i++ { caller(0) } + // Run benchmark. - startTimer() - var ( - mu sync.Mutex - wg sync.WaitGroup - ) - wg.Add(benchFeatures.MaxConcurrentCalls) - bmEnd := time.Now().Add(benchTime) + start(mode, bf) + var wg sync.WaitGroup + wg.Add(bf.MaxConcurrentCalls) + bmEnd := time.Now().Add(bf.BenchTime) var count uint64 - for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { + for i := 0; i < bf.MaxConcurrentCalls; i++ { go func(pos int) { + defer wg.Done() for { t := time.Now() if t.After(bmEnd) { - break + return } start := time.Now() caller(pos) elapse := time.Since(start) atomic.AddUint64(&count, 1) - mu.Lock() - s.Add(elapse) - mu.Unlock() + s.AddDuration(elapse) } - wg.Done() }(i) } wg.Wait() - stopTimer(count) - return count + stop(count) } // benchOpts represents all configurable options available while running this @@ -424,6 +446,7 @@ type benchOpts struct { networkMode string benchmarkResultFile string useBufconn bool + enableKeepalive bool features *featureOpts } @@ -432,39 +455,18 @@ type benchOpts struct { // features through command line flags. We generate all possible combinations // for the provided values and run the benchmarks for each combination. type featureOpts struct { - enableTrace []bool // Feature index 0 - readLatencies []time.Duration // Feature index 1 - readKbps []int // Feature index 2 - readMTU []int // Feature index 3 - maxConcurrentCalls []int // Feature index 4 - reqSizeBytes []int // Feature index 5 - respSizeBytes []int // Feature index 6 - compModes []string // Feature index 7 - enableChannelz []bool // Feature index 8 - enablePreloader []bool // Feature index 9 + enableTrace []bool + readLatencies []time.Duration + readKbps []int + readMTU []int + maxConcurrentCalls []int + reqSizeBytes []int + respSizeBytes []int + compModes []string + enableChannelz []bool + enablePreloader []bool } -// featureIndex is an enum for the different features that could be configured -// by the user through command line flags. -type featureIndex int - -const ( - enableTraceIndex featureIndex = iota - readLatenciesIndex - readKbpsIndex - readMTUIndex - maxConcurrentCallsIndex - reqSizeBytesIndex - respSizeBytesIndex - compModesIndex - enableChannelzIndex - enablePreloaderIndex - - // This is a place holder to indicate the total number of feature indices we - // have. Any new feature indices should be added above this. - maxFeatureIndex -) - // makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each // element of the slice (indexed by 'featuresIndex' enum) contains the number // of features to be exercised by the benchmark code. @@ -472,31 +474,31 @@ const ( // enableTrace feature, while index 1 contains the number of value of // readLatencies feature and so on. func makeFeaturesNum(b *benchOpts) []int { - featuresNum := make([]int, maxFeatureIndex) + featuresNum := make([]int, stats.MaxFeatureIndex) for i := 0; i < len(featuresNum); i++ { - switch featureIndex(i) { - case enableTraceIndex: + switch stats.FeatureIndex(i) { + case stats.EnableTraceIndex: featuresNum[i] = len(b.features.enableTrace) - case readLatenciesIndex: + case stats.ReadLatenciesIndex: featuresNum[i] = len(b.features.readLatencies) - case readKbpsIndex: + case stats.ReadKbpsIndex: featuresNum[i] = len(b.features.readKbps) - case readMTUIndex: + case stats.ReadMTUIndex: featuresNum[i] = len(b.features.readMTU) - case maxConcurrentCallsIndex: + case stats.MaxConcurrentCallsIndex: featuresNum[i] = len(b.features.maxConcurrentCalls) - case reqSizeBytesIndex: + case stats.ReqSizeBytesIndex: featuresNum[i] = len(b.features.reqSizeBytes) - case respSizeBytesIndex: + case stats.RespSizeBytesIndex: featuresNum[i] = len(b.features.respSizeBytes) - case compModesIndex: + case stats.CompModesIndex: featuresNum[i] = len(b.features.compModes) - case enableChannelzIndex: + case stats.EnableChannelzIndex: featuresNum[i] = len(b.features.enableChannelz) - case enablePreloaderIndex: + case stats.EnablePreloaderIndex: featuresNum[i] = len(b.features.enablePreloader) default: - log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, maxFeatureIndex) + log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex) } } return featuresNum @@ -537,26 +539,28 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features { // all options. var result []stats.Features var curPos []int - initialPos := make([]int, maxFeatureIndex) + initialPos := make([]int, stats.MaxFeatureIndex) for !reflect.DeepEqual(initialPos, curPos) { if curPos == nil { - curPos = make([]int, maxFeatureIndex) + curPos = make([]int, stats.MaxFeatureIndex) } result = append(result, stats.Features{ // These features stay the same for each iteration. - NetworkMode: b.networkMode, - UseBufConn: b.useBufconn, + NetworkMode: b.networkMode, + UseBufConn: b.useBufconn, + EnableKeepalive: b.enableKeepalive, + BenchTime: b.benchTime, // These features can potentially change for each iteration. - EnableTrace: b.features.enableTrace[curPos[enableTraceIndex]], - Latency: b.features.readLatencies[curPos[readLatenciesIndex]], - Kbps: b.features.readKbps[curPos[readKbpsIndex]], - Mtu: b.features.readMTU[curPos[readMTUIndex]], - MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[maxConcurrentCallsIndex]], - ReqSizeBytes: b.features.reqSizeBytes[curPos[reqSizeBytesIndex]], - RespSizeBytes: b.features.respSizeBytes[curPos[respSizeBytesIndex]], - ModeCompressor: b.features.compModes[curPos[compModesIndex]], - EnableChannelz: b.features.enableChannelz[curPos[enableChannelzIndex]], - EnablePreloader: b.features.enablePreloader[curPos[enablePreloaderIndex]], + EnableTrace: b.features.enableTrace[curPos[stats.EnableTraceIndex]], + Latency: b.features.readLatencies[curPos[stats.ReadLatenciesIndex]], + Kbps: b.features.readKbps[curPos[stats.ReadKbpsIndex]], + MTU: b.features.readMTU[curPos[stats.ReadMTUIndex]], + MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]], + ReqSizeBytes: b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]], + RespSizeBytes: b.features.respSizeBytes[curPos[stats.RespSizeBytesIndex]], + ModeCompressor: b.features.compModes[curPos[stats.CompModesIndex]], + EnableChannelz: b.features.enableChannelz[curPos[stats.EnableChannelzIndex]], + EnablePreloader: b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]], }) addOne(curPos, featuresNum) } @@ -597,6 +601,7 @@ func processFlags() *benchOpts { networkMode: *networkMode, benchmarkResultFile: *benchmarkResultFile, useBufconn: *useBufconn, + enableKeepalive: *enableKeepalive, features: &featureOpts{ enableTrace: setToggleMode(*traceMode), readLatencies: append([]time.Duration(nil), *readLatency...), @@ -648,76 +653,36 @@ func setCompressorMode(val string) []string { } } -func printThroughput(requestCount uint64, requestSize int, responseCount uint64, responseSize int, benchTime time.Duration) { - requestThroughput := float64(requestCount) * float64(requestSize) * 8 / benchTime.Seconds() - responseThroughput := float64(responseCount) * float64(responseSize) * 8 / benchTime.Seconds() - fmt.Printf("Number of requests: %v\tRequest throughput: %v bit/s\n", requestCount, requestThroughput) - fmt.Printf("Number of responses: %v\tResponse throughput: %v bit/s\n", responseCount, responseThroughput) - fmt.Println() -} - func main() { opts := processFlags() before(opts) - s := stats.NewStats(numStatsBuckets) - s.SortLatency() - var memStats runtime.MemStats - var results testing.BenchmarkResult - var startAllocs, startBytes uint64 - var startTime time.Time - var startTimer = func() { - runtime.ReadMemStats(&memStats) - startAllocs = memStats.Mallocs - startBytes = memStats.TotalAlloc - startTime = time.Now() - } - var stopTimer = func(count uint64) { - runtime.ReadMemStats(&memStats) - results = testing.BenchmarkResult{ - N: int(count), - T: time.Since(startTime), - Bytes: 0, - MemAllocs: memStats.Mallocs - startAllocs, - MemBytes: memStats.TotalAlloc - startBytes, - } - } - // Run benchmarks - resultSlice := []stats.BenchResults{} + s := stats.NewStats(numStatsBuckets) featuresNum := makeFeaturesNum(opts) - sharedPos := sharedFeatures(featuresNum) - for _, benchFeature := range opts.generateFeatures(featuresNum) { - grpc.EnableTracing = benchFeature.EnableTrace - if benchFeature.EnableChannelz { + sf := sharedFeatures(featuresNum) + + var ( + start = func(mode string, bf stats.Features) { s.StartRun(mode, bf, sf) } + stop = func(count uint64) { s.EndRun(count) } + ucStop = func(req uint64, resp uint64) { s.EndUnconstrainedRun(req, resp) } + ) + + for _, bf := range opts.generateFeatures(featuresNum) { + grpc.EnableTracing = bf.EnableTrace + if bf.EnableChannelz { channelz.TurnOn() } if opts.rModes.unary { - count := unaryBenchmark(startTimer, stopTimer, benchFeature, opts.benchTime, s) - s.SetBenchmarkResult("Unary", benchFeature, results.N, - results.AllocedBytesPerOp(), results.AllocsPerOp(), sharedPos) - fmt.Println(s.BenchString()) - fmt.Println(s.String()) - printThroughput(count, benchFeature.ReqSizeBytes, count, benchFeature.RespSizeBytes, opts.benchTime) - resultSlice = append(resultSlice, s.GetBenchmarkResults()) - s.Clear() + unaryBenchmark(start, stop, bf, s) } if opts.rModes.streaming { - count := streamBenchmark(startTimer, stopTimer, benchFeature, opts.benchTime, s) - s.SetBenchmarkResult("Stream", benchFeature, results.N, - results.AllocedBytesPerOp(), results.AllocsPerOp(), sharedPos) - fmt.Println(s.BenchString()) - fmt.Println(s.String()) - printThroughput(count, benchFeature.ReqSizeBytes, count, benchFeature.RespSizeBytes, opts.benchTime) - resultSlice = append(resultSlice, s.GetBenchmarkResults()) - s.Clear() + streamBenchmark(start, stop, bf, s) } if opts.rModes.unconstrained { - requestCount, responseCount := unconstrainedStreamBenchmark(benchFeature, time.Second, opts.benchTime) - fmt.Printf("Unconstrained Stream-%v\n", benchFeature) - printThroughput(requestCount, benchFeature.ReqSizeBytes, responseCount, benchFeature.RespSizeBytes, opts.benchTime) + unconstrainedStreamBenchmark(start, ucStop, bf, s) } } - after(opts, resultSlice) + after(opts, s.GetResults()) } func before(opts *benchOpts) { diff --git a/benchmark/benchresult/main.go b/benchmark/benchresult/main.go index 40226cff..ec27a830 100644 --- a/benchmark/benchresult/main.go +++ b/benchmark/benchresult/main.go @@ -32,14 +32,13 @@ import ( "fmt" "log" "os" - "strconv" "strings" "time" "google.golang.org/grpc/benchmark/stats" ) -func createMap(fileName string, m map[string]stats.BenchResults) { +func createMap(fileName string) map[string]stats.BenchResults { f, err := os.Open(fileName) if err != nil { log.Fatalf("Read file %s error: %s\n", fileName, err) @@ -50,18 +49,22 @@ func createMap(fileName string, m map[string]stats.BenchResults) { if err = decoder.Decode(&data); err != nil { log.Fatalf("Decode file %s error: %s\n", fileName, err) } + m := make(map[string]stats.BenchResults) for _, d := range data { m[d.RunMode+"-"+d.Features.String()] = d } + return m } -func intChange(title string, val1, val2 int64) string { - return fmt.Sprintf("%10s %12s %12s %8.2f%%\n", title, strconv.FormatInt(val1, 10), - strconv.FormatInt(val2, 10), float64(val2-val1)*100/float64(val1)) +func intChange(title string, val1, val2 uint64) string { + return fmt.Sprintf("%20s %12d %12d %8.2f%%\n", title, val1, val2, float64(int64(val2)-int64(val1))*100/float64(val1)) } -func timeChange(title int, val1, val2 time.Duration) string { - return fmt.Sprintf("%10s %12s %12s %8.2f%%\n", strconv.Itoa(title)+" latency", val1.String(), +func floatChange(title string, val1, val2 float64) string { + return fmt.Sprintf("%20s %12.2f %12.2f %8.2f%%\n", title, val1, val2, float64(int64(val2)-int64(val1))*100/float64(val1)) +} +func timeChange(title string, val1, val2 time.Duration) string { + return fmt.Sprintf("%20s %12s %12s %8.2f%%\n", title, val1.String(), val2.String(), float64(val2-val1)*100/float64(val1)) } @@ -69,30 +72,30 @@ func compareTwoMap(m1, m2 map[string]stats.BenchResults) { for k2, v2 := range m2 { if v1, ok := m1[k2]; ok { changes := k2 + "\n" - changes += fmt.Sprintf("%10s %12s %12s %8s\n", "Title", "Before", "After", "Percentage") - changes += intChange("Bytes/op", v1.AllocedBytesPerOp, v2.AllocedBytesPerOp) - changes += intChange("Allocs/op", v1.AllocsPerOp, v2.AllocsPerOp) - changes += timeChange(v1.Latency[1].Percent, v1.Latency[1].Value, v2.Latency[1].Value) - changes += timeChange(v1.Latency[2].Percent, v1.Latency[2].Value, v2.Latency[2].Value) + changes += fmt.Sprintf("%20s %12s %12s %8s\n", "Title", "Before", "After", "Percentage") + changes += intChange("TotalOps", v1.Data.TotalOps, v2.Data.TotalOps) + changes += intChange("SendOps", v1.Data.SendOps, v2.Data.SendOps) + changes += intChange("RecvOps", v1.Data.RecvOps, v2.Data.RecvOps) + changes += intChange("Bytes/op", v1.Data.AllocedBytes, v2.Data.AllocedBytes) + changes += intChange("Allocs/op", v1.Data.Allocs, v2.Data.Allocs) + changes += floatChange("ReqT/op", v1.Data.ReqT, v2.Data.ReqT) + changes += floatChange("RespT/op", v1.Data.RespT, v2.Data.RespT) + changes += timeChange("50th-Lat", v1.Data.Fiftieth, v2.Data.Fiftieth) + changes += timeChange("90th-Lat", v1.Data.Ninetieth, v2.Data.Ninetieth) + changes += timeChange("99th-Lat", v1.Data.NinetyNinth, v2.Data.NinetyNinth) + changes += timeChange("Avg-Lat", v1.Data.Average, v2.Data.Average) fmt.Printf("%s\n", changes) } } } func compareBenchmark(file1, file2 string) { - var BenchValueFile1 map[string]stats.BenchResults - var BenchValueFile2 map[string]stats.BenchResults - BenchValueFile1 = make(map[string]stats.BenchResults) - BenchValueFile2 = make(map[string]stats.BenchResults) - - createMap(file1, BenchValueFile1) - createMap(file2, BenchValueFile2) - - compareTwoMap(BenchValueFile1, BenchValueFile2) + compareTwoMap(createMap(file1), createMap(file2)) } -func printline(benchName, ltc50, ltc90, allocByte, allocsOp interface{}) { - fmt.Printf("%-80v%12v%12v%12v%12v\n", benchName, ltc50, ltc90, allocByte, allocsOp) +func printline(benchName, total, send, recv, allocB, allocN, reqT, respT, ltc50, ltc90, l99, lAvg interface{}) { + fmt.Printf("%-80v%12v%12v%12v%12v%12v%18v%18v%12v%12v%12v%12v\n", + benchName, total, send, recv, allocB, allocN, reqT, respT, ltc50, ltc90, l99, lAvg) } func formatBenchmark(fileName string) { @@ -101,26 +104,30 @@ func formatBenchmark(fileName string) { log.Fatalf("Read file %s error: %s\n", fileName, err) } defer f.Close() - var data []stats.BenchResults + var results []stats.BenchResults decoder := gob.NewDecoder(f) - if err = decoder.Decode(&data); err != nil { + if err = decoder.Decode(&results); err != nil { log.Fatalf("Decode file %s error: %s\n", fileName, err) } - if len(data) == 0 { - log.Fatalf("No data in file %s\n", fileName) + if len(results) == 0 { + log.Fatalf("No benchmark results in file %s\n", fileName) } - printPos := data[0].SharedPosion + fmt.Println("\nShared features:\n" + strings.Repeat("-", 20)) - fmt.Print(stats.PartialPrintString(printPos, data[0].Features, true)) + fmt.Print(results[0].Features.SharedFeatures(results[0].SharedFeatures)) fmt.Println(strings.Repeat("-", 35)) - for i := 0; i < len(data[0].SharedPosion); i++ { - printPos[i] = !printPos[i] + + wantFeatures := results[0].SharedFeatures + for i := 0; i < len(results[0].SharedFeatures); i++ { + wantFeatures[i] = !wantFeatures[i] } - printline("Name", "latency-50", "latency-90", "Alloc (B)", "Alloc (#)") - for _, d := range data { - name := d.RunMode + stats.PartialPrintString(printPos, d.Features, false) - printline(name, d.Latency[1].Value.String(), d.Latency[2].Value.String(), - d.AllocedBytesPerOp, d.AllocsPerOp) + + printline("Name", "TotalOps", "SendOps", "RecvOps", "Alloc (B)", "Alloc (#)", + "RequestT", "ResponseT", "L-50", "L-90", "L-99", "L-Avg") + for _, r := range results { + d := r.Data + printline(r.RunMode+r.Features.PrintableName(wantFeatures), d.TotalOps, d.SendOps, d.RecvOps, + d.AllocedBytes, d.Allocs, d.ReqT, d.RespT, d.Fiftieth, d.Ninetieth, d.NinetyNinth, d.Average) } } diff --git a/benchmark/stats/stats.go b/benchmark/stats/stats.go index 0968f962..70972cb8 100644 --- a/benchmark/stats/stats.go +++ b/benchmark/stats/stats.go @@ -16,293 +16,401 @@ * */ -// Package stats registers stats used for creating benchmarks +// Package stats tracks the statistics associated with benchmark runs. package stats import ( "bytes" "fmt" - "io" + "log" "math" + "runtime" "sort" "strconv" + "sync" "time" ) -// Features contains most fields for a benchmark +// FeatureIndex is an enum for features that usually differ across individual +// benchmark runs in a single execution. These are usually configured by the +// user through command line flags. +type FeatureIndex int + +// FeatureIndex enum values corresponding to individually settable features. +const ( + EnableTraceIndex FeatureIndex = iota + ReadLatenciesIndex + ReadKbpsIndex + ReadMTUIndex + MaxConcurrentCallsIndex + ReqSizeBytesIndex + RespSizeBytesIndex + CompModesIndex + EnableChannelzIndex + EnablePreloaderIndex + + // MaxFeatureIndex is a place holder to indicate the total number of feature + // indices we have. Any new feature indices should be added above this. + MaxFeatureIndex +) + +// Features represent configured options for a specific benchmark run. This is +// usually constructed from command line arguments passed by the caller. See +// benchmark/benchmain/main.go for defined command line flags. This is also +// part of the BenchResults struct which is serialized and written to a file. type Features struct { - NetworkMode string - UseBufConn bool - EnableTrace bool - Latency time.Duration - Kbps int - Mtu int + // Network mode used for this benchmark run. Could be one of Local, LAN, WAN + // or Longhaul. + NetworkMode string + // UseBufCon indicates whether an in-memory connection was used for this + // benchmark run instead of system network I/O. + UseBufConn bool + // EnableKeepalive indicates if keepalives were enabled on the connections + // used in this benchmark run. + EnableKeepalive bool + // BenchTime indicates the duration of the benchmark run. + BenchTime time.Duration + + // Features defined above are usually the same for all benchmark runs in a + // particular invocation, while the features defined below could vary from + // run to run based on the configured command line. These features have a + // corresponding featureIndex value which is used for a variety of reasons. + + // EnableTrace indicates if tracing was enabled. + EnableTrace bool + // Latency is the simulated one-way network latency used. + Latency time.Duration + // Kbps is the simulated network throughput used. + Kbps int + // MTU is the simulated network MTU used. + MTU int + // MaxConcurrentCalls is the number of concurrent RPCs made during this + // benchmark run. MaxConcurrentCalls int - ReqSizeBytes int - RespSizeBytes int - ModeCompressor string - EnableChannelz bool - EnablePreloader bool + // ReqSizeBytes is the request size in bytes used in this benchmark run. + ReqSizeBytes int + // RespSizeBytes is the response size in bytes used in this benchmark run. + RespSizeBytes int + // ModeCompressor represents the compressor mode used. + ModeCompressor string + // EnableChannelz indicates if channelz was turned on. + EnableChannelz bool + // EnablePreloader indicates if preloading was turned on. + EnablePreloader bool } -// String returns the textual output of the Features as string. +// String returns all the feature values as a string. func (f Features) String() string { - return fmt.Sprintf("traceMode_%t-latency_%s-kbps_%#v-MTU_%#v-maxConcurrentCalls_"+ - "%#v-reqSize_%#vB-respSize_%#vB-Compressor_%s-Preloader_%t", f.EnableTrace, - f.Latency.String(), f.Kbps, f.Mtu, f.MaxConcurrentCalls, f.ReqSizeBytes, f.RespSizeBytes, f.ModeCompressor, f.EnablePreloader) + return fmt.Sprintf("networkMode_%v-bufConn_%v-keepalive_%v-benchTime_%v-"+ + "trace_%v-latency_%v-kbps_%v-MTU_%v-maxConcurrentCalls_%v-"+ + "reqSize_%vB-respSize_%vB-compressor_%v-channelz_%v-preloader_%v", + f.NetworkMode, f.UseBufConn, f.EnableKeepalive, f.BenchTime, + f.EnableTrace, f.Latency, f.Kbps, f.MTU, f.MaxConcurrentCalls, + f.ReqSizeBytes, f.RespSizeBytes, f.ModeCompressor, f.EnableChannelz, f.EnablePreloader) } -// ConciseString returns the concise textual output of the Features as string, skipping -// setting with default value. -func (f Features) ConciseString() string { - noneEmptyPos := []bool{f.EnableTrace, f.Latency != 0, f.Kbps != 0, f.Mtu != 0, true, true, true, f.ModeCompressor != "off", f.EnableChannelz, f.EnablePreloader} - return PartialPrintString(noneEmptyPos, f, false) +// SharedFeatures returns the shared features as a pretty printable string. +// 'wantFeatures' is a bitmask of wanted features, indexed by FeaturesIndex. +func (f Features) SharedFeatures(wantFeatures []bool) string { + var b bytes.Buffer + if f.NetworkMode != "" { + b.WriteString(fmt.Sprintf("Network: %v\n", f.NetworkMode)) + } + if f.UseBufConn { + b.WriteString(fmt.Sprintf("UseBufConn: %v\n", f.UseBufConn)) + } + if f.EnableKeepalive { + b.WriteString(fmt.Sprintf("EnableKeepalive: %v\n", f.EnableKeepalive)) + } + b.WriteString(fmt.Sprintf("BenchTime: %v\n", f.BenchTime)) + f.partialString(&b, wantFeatures, ": ", "\n") + return b.String() } -// PartialPrintString can print certain features with different format. -func PartialPrintString(noneEmptyPos []bool, f Features, shared bool) string { - s := "" - var ( - prefix, suffix, linker string - isNetwork bool - ) - if shared { - suffix = "\n" - linker = ": " - } else { - prefix = "-" - linker = "_" - } - if noneEmptyPos[0] { - s += fmt.Sprintf("%sTrace%s%t%s", prefix, linker, f.EnableTrace, suffix) - } - if shared && f.NetworkMode != "" { - s += fmt.Sprintf("Network: %s \n", f.NetworkMode) - isNetwork = true - } - if !isNetwork { - if noneEmptyPos[1] { - s += fmt.Sprintf("%slatency%s%s%s", prefix, linker, f.Latency.String(), suffix) - } - if noneEmptyPos[2] { - s += fmt.Sprintf("%skbps%s%#v%s", prefix, linker, f.Kbps, suffix) - } - if noneEmptyPos[3] { - s += fmt.Sprintf("%sMTU%s%#v%s", prefix, linker, f.Mtu, suffix) +// PrintableName returns a one line name which includes the features specified +// by 'wantFeatures' which is a bitmask of wanted features, indexed by +// FeaturesIndex. +func (f Features) PrintableName(wantFeatures []bool) string { + var b bytes.Buffer + f.partialString(&b, wantFeatures, "_", "-") + return b.String() +} + +// partialString writes features specified by 'wantFeatures' to the provided +// bytes.Buffer. +func (f Features) partialString(b *bytes.Buffer, wantFeatures []bool, sep, delim string) { + for i, sf := range wantFeatures { + if sf { + switch FeatureIndex(i) { + case EnableTraceIndex: + b.WriteString(fmt.Sprintf("Trace%v%v%v", sep, f.EnableTrace, delim)) + case ReadLatenciesIndex: + b.WriteString(fmt.Sprintf("Latency%v%v%v", sep, f.Latency, delim)) + case ReadKbpsIndex: + b.WriteString(fmt.Sprintf("Kbps%v%v%v", sep, f.Kbps, delim)) + case ReadMTUIndex: + b.WriteString(fmt.Sprintf("MTU%v%v%v", sep, f.MTU, delim)) + case MaxConcurrentCallsIndex: + b.WriteString(fmt.Sprintf("Callers%v%v%v", sep, f.MaxConcurrentCalls, delim)) + case ReqSizeBytesIndex: + b.WriteString(fmt.Sprintf("ReqSize%v%vB%v", sep, f.ReqSizeBytes, delim)) + case RespSizeBytesIndex: + b.WriteString(fmt.Sprintf("RespSize%v%vB%v", sep, f.RespSizeBytes, delim)) + case CompModesIndex: + b.WriteString(fmt.Sprintf("Compressor%v%v%v", sep, f.ModeCompressor, delim)) + case EnableChannelzIndex: + b.WriteString(fmt.Sprintf("Channelz%v%v%v", sep, f.EnableChannelz, delim)) + case EnablePreloaderIndex: + b.WriteString(fmt.Sprintf("Preloader%v%v%v", sep, f.EnablePreloader, delim)) + default: + log.Fatalf("Unknown feature index %v. maxFeatureIndex is %v", i, MaxFeatureIndex) + } } } - if noneEmptyPos[4] { - s += fmt.Sprintf("%sCallers%s%#v%s", prefix, linker, f.MaxConcurrentCalls, suffix) - } - if noneEmptyPos[5] { - s += fmt.Sprintf("%sreqSize%s%#vB%s", prefix, linker, f.ReqSizeBytes, suffix) - } - if noneEmptyPos[6] { - s += fmt.Sprintf("%srespSize%s%#vB%s", prefix, linker, f.RespSizeBytes, suffix) - } - if noneEmptyPos[7] { - s += fmt.Sprintf("%sCompressor%s%s%s", prefix, linker, f.ModeCompressor, suffix) - } - if noneEmptyPos[8] { - s += fmt.Sprintf("%sChannelz%s%t%s", prefix, linker, f.EnableChannelz, suffix) - } - if noneEmptyPos[9] { - s += fmt.Sprintf("%sPreloader%s%t%s", prefix, linker, f.EnablePreloader, suffix) - } - return s } -type percentLatency struct { - Percent int - Value time.Duration -} - -// BenchResults records features and result of a benchmark. +// BenchResults records features and results of a benchmark run. A collection +// of these structs is usually serialized and written to a file after a +// benchmark execution, and could later be read for pretty-printing or +// comparison with other benchmark results. type BenchResults struct { - RunMode string - Features Features - Latency []percentLatency - Operations int - NsPerOp int64 - AllocedBytesPerOp int64 - AllocsPerOp int64 - SharedPosion []bool + // RunMode is the workload mode for this benchmark run. This could be unary, + // stream or unconstrained. + RunMode string + // Features represents the configured feature options for this run. + Features Features + // SharedFeatures represents the features which were shared across all + // benchmark runs during one execution. It is a slice indexed by + // 'FeaturesIndex' and a value of true indicates that the associated + // feature is shared across all runs. + SharedFeatures []bool + // Data contains the statistical data of interest from the benchmark run. + Data RunData } -// SetBenchmarkResult sets features of benchmark and basic results. -func (stats *Stats) SetBenchmarkResult(mode string, features Features, o int, allocdBytes, allocs int64, sharedPos []bool) { - stats.result.RunMode = mode - stats.result.Features = features - stats.result.Operations = o - stats.result.AllocedBytesPerOp = allocdBytes - stats.result.AllocsPerOp = allocs - stats.result.SharedPosion = sharedPos -} +// RunData contains statistical data of interest from a benchmark run. +type RunData struct { + // TotalOps is the number of operations executed during this benchmark run. + // Only makes sense for unary and streaming workloads. + TotalOps uint64 + // SendOps is the number of send operations executed during this benchmark + // run. Only makes sense for unconstrained workloads. + SendOps uint64 + // RecvOps is the number of receive operations executed during this benchmark + // run. Only makes sense for unconstrained workloads. + RecvOps uint64 + // AllocedBytes is the average memory allocation in bytes per operation. + AllocedBytes uint64 + // Allocs is the average number of memory allocations per operation. + Allocs uint64 + // ReqT is the average request throughput associated with this run. + ReqT float64 + // RespT is the average response throughput associated with this run. + RespT float64 -// GetBenchmarkResults returns the result of the benchmark including features and result. -func (stats *Stats) GetBenchmarkResults() BenchResults { - return stats.result -} + // We store different latencies associated with each run. These latencies are + // only computed for unary and stream workloads as they are not very useful + // for unconstrained workloads. -// BenchString output latency stats as the format as time + unit. -func (stats *Stats) BenchString() string { - stats.maybeUpdate() - s := stats.result - res := s.RunMode + "-" + s.Features.String() + ": \n" - if len(s.Latency) != 0 { - var statsUnit = s.Latency[0].Value - var timeUnit = fmt.Sprintf("%v", statsUnit)[1:] - for i := 1; i < len(s.Latency)-1; i++ { - res += fmt.Sprintf("%d_Latency: %s %s \t", s.Latency[i].Percent, - strconv.FormatFloat(float64(s.Latency[i].Value)/float64(statsUnit), 'f', 4, 64), timeUnit) - } - res += fmt.Sprintf("Avg latency: %s %s \t", - strconv.FormatFloat(float64(s.Latency[len(s.Latency)-1].Value)/float64(statsUnit), 'f', 4, 64), timeUnit) - } - res += fmt.Sprintf("Count: %v \t", s.Operations) - res += fmt.Sprintf("%v Bytes/op\t", s.AllocedBytesPerOp) - res += fmt.Sprintf("%v Allocs/op\t", s.AllocsPerOp) - - return res -} - -// Stats is a simple helper for gathering additional statistics like histogram -// during benchmarks. This is not thread safe. -type Stats struct { - numBuckets int - unit time.Duration - min, max int64 - histogram *Histogram - - durations durationSlice - dirty bool - - sortLatency bool - result BenchResults + // Fiftieth is the 50th percentile latency. + Fiftieth time.Duration + // Ninetieth is the 90th percentile latency. + Ninetieth time.Duration + // Ninetyninth is the 99th percentile latency. + NinetyNinth time.Duration + // Average is the average latency. + Average time.Duration } type durationSlice []time.Duration -// NewStats creates a new Stats instance. If numBuckets is not positive, -// the default value (16) will be used. +func (a durationSlice) Len() int { return len(a) } +func (a durationSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a durationSlice) Less(i, j int) bool { return a[i] < a[j] } + +// Stats is a helper for gathering statistics about individual benchmark runs. +type Stats struct { + mu sync.Mutex + numBuckets int + hw *histWrapper + results []BenchResults + startMS runtime.MemStats + stopMS runtime.MemStats +} + +type histWrapper struct { + unit time.Duration + histogram *Histogram + durations durationSlice +} + +// NewStats creates a new Stats instance. If numBuckets is not positive, the +// default value (16) will be used. func NewStats(numBuckets int) *Stats { if numBuckets <= 0 { numBuckets = 16 } - return &Stats{ - // Use one more bucket for the last unbounded bucket. - numBuckets: numBuckets + 1, - durations: make(durationSlice, 0, 100000), + // Use one more bucket for the last unbounded bucket. + s := &Stats{numBuckets: numBuckets + 1} + s.hw = &histWrapper{} + return s +} + +// StartRun is to be invoked to indicate the start of a new benchmark run. +func (s *Stats) StartRun(mode string, f Features, sf []bool) { + s.mu.Lock() + defer s.mu.Unlock() + + runtime.ReadMemStats(&s.startMS) + s.results = append(s.results, BenchResults{RunMode: mode, Features: f, SharedFeatures: sf}) +} + +// EndRun is to be invoked to indicate the end of the ongoing benchmark run. It +// computes a bunch of stats and dumps them to stdout. +func (s *Stats) EndRun(count uint64) { + s.mu.Lock() + defer s.mu.Unlock() + + runtime.ReadMemStats(&s.stopMS) + r := &s.results[len(s.results)-1] + r.Data = RunData{ + TotalOps: count, + AllocedBytes: s.stopMS.TotalAlloc - s.startMS.TotalAlloc, + Allocs: s.stopMS.Mallocs - s.startMS.Mallocs, + ReqT: float64(count) * float64(r.Features.ReqSizeBytes) * 8 / r.Features.BenchTime.Seconds(), + RespT: float64(count) * float64(r.Features.RespSizeBytes) * 8 / r.Features.BenchTime.Seconds(), } + s.computeLatencies(r) + s.dump(r) + s.hw = &histWrapper{} } -// Add adds an elapsed time per operation to the stats. -func (stats *Stats) Add(d time.Duration) { - stats.durations = append(stats.durations, d) - stats.dirty = true +// EndUnconstrainedRun is similar to EndRun, but is to be used for +// unconstrained workloads. +func (s *Stats) EndUnconstrainedRun(req uint64, resp uint64) { + s.mu.Lock() + defer s.mu.Unlock() + + runtime.ReadMemStats(&s.stopMS) + r := &s.results[len(s.results)-1] + r.Data = RunData{ + SendOps: req, + RecvOps: resp, + AllocedBytes: (s.stopMS.TotalAlloc - s.startMS.TotalAlloc) / ((req + resp) / 2), + Allocs: (s.stopMS.Mallocs - s.startMS.Mallocs) / ((req + resp) / 2), + ReqT: float64(req) * float64(r.Features.ReqSizeBytes) * 8 / r.Features.BenchTime.Seconds(), + RespT: float64(resp) * float64(r.Features.RespSizeBytes) * 8 / r.Features.BenchTime.Seconds(), + } + s.computeLatencies(r) + s.dump(r) + s.hw = &histWrapper{} } -// Clear resets the stats, removing all values. -func (stats *Stats) Clear() { - stats.durations = stats.durations[:0] - stats.histogram = nil - stats.dirty = false - stats.result = BenchResults{} +// AddDuration adds an elapsed duration per operation to the stats. This is +// used by unary and stream modes where request and response stats are equal. +func (s *Stats) AddDuration(d time.Duration) { + s.mu.Lock() + defer s.mu.Unlock() + + s.hw.durations = append(s.hw.durations, d) +} + +// GetResults returns the results from all benchmark runs. +func (s *Stats) GetResults() []BenchResults { + s.mu.Lock() + defer s.mu.Unlock() + + return s.results +} + +// computeLatencies computes percentile latencies based on durations stored in +// the stats object and updates the corresponding fields in the result object. +func (s *Stats) computeLatencies(result *BenchResults) { + if len(s.hw.durations) == 0 { + return + } + sort.Sort(s.hw.durations) + minDuration := int64(s.hw.durations[0]) + maxDuration := int64(s.hw.durations[len(s.hw.durations)-1]) + + // Use the largest unit that can represent the minimum time duration. + s.hw.unit = time.Nanosecond + for _, u := range []time.Duration{time.Microsecond, time.Millisecond, time.Second} { + if minDuration <= int64(u) { + break + } + s.hw.unit = u + } + + numBuckets := s.numBuckets + if n := int(maxDuration - minDuration + 1); n < numBuckets { + numBuckets = n + } + s.hw.histogram = NewHistogram(HistogramOptions{ + NumBuckets: numBuckets, + // max-min(lower bound of last bucket) = (1 + growthFactor)^(numBuckets-2) * baseBucketSize. + GrowthFactor: math.Pow(float64(maxDuration-minDuration), 1/float64(numBuckets-2)) - 1, + BaseBucketSize: 1.0, + MinValue: minDuration, + }) + for _, d := range s.hw.durations { + s.hw.histogram.Add(int64(d)) + } + result.Data.Fiftieth = s.hw.durations[max(s.hw.histogram.Count*int64(50)/100-1, 0)] + result.Data.Ninetieth = s.hw.durations[max(s.hw.histogram.Count*int64(90)/100-1, 0)] + result.Data.NinetyNinth = s.hw.durations[max(s.hw.histogram.Count*int64(99)/100-1, 0)] + result.Data.Average = time.Duration(float64(s.hw.histogram.Sum) / float64(s.hw.histogram.Count)) +} + +// dump returns a printable version. +func (s *Stats) dump(result *BenchResults) { + var b bytes.Buffer + // This prints the run mode and all features of the bench on a line. + b.WriteString(fmt.Sprintf("%s-%s:\n", result.RunMode, result.Features.String())) + + unit := s.hw.unit + tUnit := fmt.Sprintf("%v", unit)[1:] // stores one of s, ms, μs, ns + + if l := result.Data.Fiftieth; l != 0 { + b.WriteString(fmt.Sprintf("50_Latency: %s%s\t", strconv.FormatFloat(float64(l)/float64(unit), 'f', 4, 64), tUnit)) + } + if l := result.Data.Ninetieth; l != 0 { + b.WriteString(fmt.Sprintf("90_Latency: %s%s\t", strconv.FormatFloat(float64(l)/float64(unit), 'f', 4, 64), tUnit)) + } + if l := result.Data.NinetyNinth; l != 0 { + b.WriteString(fmt.Sprintf("99_Latency: %s%s\t", strconv.FormatFloat(float64(l)/float64(unit), 'f', 4, 64), tUnit)) + } + if l := result.Data.Average; l != 0 { + b.WriteString(fmt.Sprintf("Avg_Latency: %s%s\t", strconv.FormatFloat(float64(l)/float64(unit), 'f', 4, 64), tUnit)) + } + b.WriteString(fmt.Sprintf("Bytes/op: %v\t", result.Data.AllocedBytes)) + b.WriteString(fmt.Sprintf("Allocs/op: %v\t\n", result.Data.Allocs)) + + // This prints the histogram stats for the latency. + if s.hw.histogram == nil { + b.WriteString("Histogram (empty)\n") + } else { + b.WriteString(fmt.Sprintf("Histogram (unit: %s)\n", tUnit)) + s.hw.histogram.PrintWithUnit(&b, float64(unit)) + } + + // Print throughput data. + req := result.Data.SendOps + if req == 0 { + req = result.Data.TotalOps + } + resp := result.Data.RecvOps + if resp == 0 { + resp = result.Data.TotalOps + } + b.WriteString(fmt.Sprintf("Number of requests: %v\tRequest throughput: %v bit/s\n", req, result.Data.ReqT)) + b.WriteString(fmt.Sprintf("Number of responses: %v\tResponse throughput: %v bit/s\n", resp, result.Data.RespT)) + fmt.Println(b.String()) } -//Sort method for durations -func (a durationSlice) Len() int { return len(a) } -func (a durationSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a durationSlice) Less(i, j int) bool { return a[i] < a[j] } func max(a, b int64) int64 { if a > b { return a } return b } - -// maybeUpdate updates internal stat data if there was any newly added -// stats since this was updated. -func (stats *Stats) maybeUpdate() { - if !stats.dirty { - return - } - - if stats.sortLatency { - sort.Sort(stats.durations) - stats.min = int64(stats.durations[0]) - stats.max = int64(stats.durations[len(stats.durations)-1]) - } - - stats.min = math.MaxInt64 - stats.max = 0 - for _, d := range stats.durations { - if stats.min > int64(d) { - stats.min = int64(d) - } - if stats.max < int64(d) { - stats.max = int64(d) - } - } - - // Use the largest unit that can represent the minimum time duration. - stats.unit = time.Nanosecond - for _, u := range []time.Duration{time.Microsecond, time.Millisecond, time.Second} { - if stats.min <= int64(u) { - break - } - stats.unit = u - } - - numBuckets := stats.numBuckets - if n := int(stats.max - stats.min + 1); n < numBuckets { - numBuckets = n - } - stats.histogram = NewHistogram(HistogramOptions{ - NumBuckets: numBuckets, - // max-min(lower bound of last bucket) = (1 + growthFactor)^(numBuckets-2) * baseBucketSize. - GrowthFactor: math.Pow(float64(stats.max-stats.min), 1/float64(numBuckets-2)) - 1, - BaseBucketSize: 1.0, - MinValue: stats.min}) - - for _, d := range stats.durations { - stats.histogram.Add(int64(d)) - } - - stats.dirty = false - - if stats.durations.Len() != 0 { - var percentToObserve = []int{50, 90, 99} - // First data record min unit from the latency result. - stats.result.Latency = append(stats.result.Latency, percentLatency{Percent: -1, Value: stats.unit}) - for _, position := range percentToObserve { - stats.result.Latency = append(stats.result.Latency, percentLatency{Percent: position, Value: stats.durations[max(stats.histogram.Count*int64(position)/100-1, 0)]}) - } - // Last data record the average latency. - avg := float64(stats.histogram.Sum) / float64(stats.histogram.Count) - stats.result.Latency = append(stats.result.Latency, percentLatency{Percent: -1, Value: time.Duration(avg)}) - } -} - -// SortLatency blocks the output -func (stats *Stats) SortLatency() { - stats.sortLatency = true -} - -// Print writes textual output of the Stats. -func (stats *Stats) Print(w io.Writer) { - stats.maybeUpdate() - if stats.histogram == nil { - fmt.Fprint(w, "Histogram (empty)\n") - } else { - fmt.Fprintf(w, "Histogram (unit: %s)\n", fmt.Sprintf("%v", stats.unit)[1:]) - stats.histogram.PrintWithUnit(w, float64(stats.unit)) - } -} - -// String returns the textual output of the Stats as string. -func (stats *Stats) String() string { - var b bytes.Buffer - stats.Print(&b) - return b.String() -} diff --git a/benchmark/stats/util.go b/benchmark/stats/util.go deleted file mode 100644 index f3bb3a36..00000000 --- a/benchmark/stats/util.go +++ /dev/null @@ -1,208 +0,0 @@ -/* - * - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package stats - -import ( - "bufio" - "bytes" - "fmt" - "os" - "runtime" - "sort" - "strings" - "sync" - "testing" -) - -var ( - curB *testing.B - curBenchName string - curStats map[string]*Stats - - orgStdout *os.File - nextOutPos int - - injectCond *sync.Cond - injectDone chan struct{} -) - -// AddStats adds a new unnamed Stats instance to the current benchmark. You need -// to run benchmarks by calling RunTestMain() to inject the stats to the -// benchmark results. If numBuckets is not positive, the default value (16) will -// be used. Please note that this calls b.ResetTimer() since it may be blocked -// until the previous benchmark stats is printed out. So AddStats() should -// typically be called at the very beginning of each benchmark function. -func AddStats(b *testing.B, numBuckets int) *Stats { - return AddStatsWithName(b, "", numBuckets) -} - -// AddStatsWithName adds a new named Stats instance to the current benchmark. -// With this, you can add multiple stats in a single benchmark. You need -// to run benchmarks by calling RunTestMain() to inject the stats to the -// benchmark results. If numBuckets is not positive, the default value (16) will -// be used. Please note that this calls b.ResetTimer() since it may be blocked -// until the previous benchmark stats is printed out. So AddStatsWithName() -// should typically be called at the very beginning of each benchmark function. -func AddStatsWithName(b *testing.B, name string, numBuckets int) *Stats { - var benchName string - for i := 1; ; i++ { - pc, _, _, ok := runtime.Caller(i) - if !ok { - panic("benchmark function not found") - } - p := strings.Split(runtime.FuncForPC(pc).Name(), ".") - benchName = p[len(p)-1] - if strings.HasPrefix(benchName, "run") { - break - } - } - procs := runtime.GOMAXPROCS(-1) - if procs != 1 { - benchName = fmt.Sprintf("%s-%d", benchName, procs) - } - - stats := NewStats(numBuckets) - - if injectCond != nil { - // We need to wait until the previous benchmark stats is printed out. - injectCond.L.Lock() - for curB != nil && curBenchName != benchName { - injectCond.Wait() - } - - curB = b - curBenchName = benchName - curStats[name] = stats - - injectCond.L.Unlock() - } - - b.ResetTimer() - return stats -} - -// RunTestMain runs the tests with enabling injection of benchmark stats. It -// returns an exit code to pass to os.Exit. -func RunTestMain(m *testing.M) int { - startStatsInjector() - defer stopStatsInjector() - return m.Run() -} - -// startStatsInjector starts stats injection to benchmark results. -func startStatsInjector() { - orgStdout = os.Stdout - r, w, _ := os.Pipe() - os.Stdout = w - nextOutPos = 0 - - resetCurBenchStats() - - injectCond = sync.NewCond(&sync.Mutex{}) - injectDone = make(chan struct{}) - go func() { - defer close(injectDone) - - scanner := bufio.NewScanner(r) - scanner.Split(splitLines) - for scanner.Scan() { - injectStatsIfFinished(scanner.Text()) - } - if err := scanner.Err(); err != nil { - panic(err) - } - }() -} - -// stopStatsInjector stops stats injection and restores os.Stdout. -func stopStatsInjector() { - os.Stdout.Close() - <-injectDone - injectCond = nil - os.Stdout = orgStdout -} - -// splitLines is a split function for a bufio.Scanner that returns each line -// of text, teeing texts to the original stdout even before each line ends. -func splitLines(data []byte, eof bool) (advance int, token []byte, err error) { - if eof && len(data) == 0 { - return 0, nil, nil - } - - if i := bytes.IndexByte(data, '\n'); i >= 0 { - orgStdout.Write(data[nextOutPos : i+1]) - nextOutPos = 0 - return i + 1, data[0:i], nil - } - - orgStdout.Write(data[nextOutPos:]) - nextOutPos = len(data) - - if eof { - // This is a final, non-terminated line. Return it. - return len(data), data, nil - } - - return 0, nil, nil -} - -// injectStatsIfFinished prints out the stats if the current benchmark finishes. -func injectStatsIfFinished(line string) { - injectCond.L.Lock() - defer injectCond.L.Unlock() - // We assume that the benchmark results start with "Benchmark". - if curB == nil || !strings.HasPrefix(line, "Benchmark") { - return - } - - if !curB.Failed() { - // Output all stats in alphabetical order. - names := make([]string, 0, len(curStats)) - for name := range curStats { - names = append(names, name) - } - sort.Strings(names) - for _, name := range names { - stats := curStats[name] - // The output of stats starts with a header like "Histogram (unit: ms)" - // followed by statistical properties and the buckets. Add the stats name - // if it is a named stats and indent them as Go testing outputs. - lines := strings.Split(stats.String(), "\n") - if n := len(lines); n > 0 { - if name != "" { - name = ": " + name - } - fmt.Fprintf(orgStdout, "--- %s%s\n", lines[0], name) - for _, line := range lines[1 : n-1] { - fmt.Fprintf(orgStdout, "\t%s\n", line) - } - } - } - } - - resetCurBenchStats() - injectCond.Signal() -} - -// resetCurBenchStats resets the current benchmark stats. -func resetCurBenchStats() { - curB = nil - curBenchName = "" - curStats = make(map[string]*Stats) -}