A few more improvements to the benchmark code. (#2840)

* A few more improvements to the benchmark code.

* In benchmain/main.go:
  * Define types for function arguments to make code more readable
  * Significantly simplify the code as a result of stats package refactor.
* In benchresult/main.go:
  * Simplify code as a result of stats package refactor.
* In stats/stats.go:
  * Define and expose featureIndex enum.
  * Refactor the types used to store features, results, stats etc.
  * Provide easy to use methods to add/modify/read/dump stats info.
* Delete stats/util.go - dead code.
This commit is contained in:
Easwar Swaminathan
2019-06-10 09:53:35 -07:00
committed by GitHub
parent f2967c2f83
commit b681a11d08
4 changed files with 543 additions and 671 deletions

View File

@ -55,7 +55,6 @@ import (
"strings" "strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"testing"
"time" "time"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -66,6 +65,7 @@ import (
"google.golang.org/grpc/benchmark/stats" "google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/test/bufconn" "google.golang.org/grpc/test/bufconn"
) )
@ -96,6 +96,8 @@ var (
cpuProfile = flag.String("cpuProfile", "", "Enables CPU profiling output to the filename provided") cpuProfile = flag.String("cpuProfile", "", "Enables CPU profiling output to the filename provided")
benchmarkResultFile = flag.String("resultFile", "", "Save the benchmark result into a binary file") benchmarkResultFile = flag.String("resultFile", "", "Save the benchmark result into a binary file")
useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O") useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O")
enableKeepalive = flag.Bool("enable_keepalive", false, "Enable client keepalive. \n"+
"Keepalive.Time is set to 10s, Keepalive.Timeout is set to 1s, Keepalive.PermitWithoutStream is set to true.")
) )
const ( const (
@ -120,6 +122,8 @@ const (
networkLongHaul = "Longhaul" networkLongHaul = "Longhaul"
numStatsBuckets = 10 numStatsBuckets = 10
warmupCallCount = 10
warmuptime = time.Second
) )
var ( var (
@ -139,6 +143,8 @@ var (
networkModeWAN: latency.WAN, networkModeWAN: latency.WAN,
networkLongHaul: latency.Longhaul, networkLongHaul: latency.Longhaul,
} }
keepaliveTime = 10 * time.Second // this is the minimum allowed
keepaliveTimeout = 1 * time.Second
) )
// runModes indicates the workloads to run. This is initialized with a call to // runModes indicates the workloads to run. This is initialized with a call to
@ -169,76 +175,86 @@ func runModesFromWorkloads(workload string) runModes {
return r return r
} }
func unaryBenchmark(startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchTime time.Duration, s *stats.Stats) uint64 { type startFunc func(mode string, bf stats.Features)
caller, cleanup := makeFuncUnary(benchFeatures) type stopFunc func(count uint64)
type ucStopFunc func(req uint64, resp uint64)
type rpcCallFunc func(pos int)
type rpcSendFunc func(pos int)
type rpcRecvFunc func(pos int)
type rpcCleanupFunc func()
func unaryBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
caller, cleanup := makeFuncUnary(bf)
defer cleanup() defer cleanup()
return runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchTime, s) runBenchmark(caller, start, stop, bf, s, workloadsUnary)
} }
func streamBenchmark(startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchTime time.Duration, s *stats.Stats) uint64 { func streamBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
caller, cleanup := makeFuncStream(benchFeatures) caller, cleanup := makeFuncStream(bf)
defer cleanup() defer cleanup()
return runBenchmark(caller, startTimer, stopTimer, benchFeatures, benchTime, s) runBenchmark(caller, start, stop, bf, s, workloadsStreaming)
} }
func unconstrainedStreamBenchmark(benchFeatures stats.Features, warmuptime, benchTime time.Duration) (uint64, uint64) { func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Features, s *stats.Stats) {
var sender, recver func(int) var sender rpcSendFunc
var cleanup func() var recver rpcRecvFunc
if benchFeatures.EnablePreloader { var cleanup rpcCleanupFunc
sender, recver, cleanup = makeFuncUnconstrainedStreamPreloaded(benchFeatures) if bf.EnablePreloader {
sender, recver, cleanup = makeFuncUnconstrainedStreamPreloaded(bf)
} else { } else {
sender, recver, cleanup = makeFuncUnconstrainedStream(benchFeatures) sender, recver, cleanup = makeFuncUnconstrainedStream(bf)
} }
defer cleanup() defer cleanup()
var ( var req, resp uint64
wg sync.WaitGroup
requestCount uint64
responseCount uint64
)
wg.Add(2 * benchFeatures.MaxConcurrentCalls)
// Resets the counters once warmed up
go func() { go func() {
// Resets the counters once warmed up
<-time.NewTimer(warmuptime).C <-time.NewTimer(warmuptime).C
atomic.StoreUint64(&requestCount, 0) atomic.StoreUint64(&req, 0)
atomic.StoreUint64(&responseCount, 0) atomic.StoreUint64(&resp, 0)
start(workloadsUnconstrained, bf)
}() }()
bmEnd := time.Now().Add(benchTime + warmuptime) bmEnd := time.Now().Add(bf.BenchTime + warmuptime)
for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { var wg sync.WaitGroup
wg.Add(2 * bf.MaxConcurrentCalls)
for i := 0; i < bf.MaxConcurrentCalls; i++ {
go func(pos int) { go func(pos int) {
defer wg.Done()
for { for {
t := time.Now() t := time.Now()
if t.After(bmEnd) { if t.After(bmEnd) {
break return
} }
sender(pos) sender(pos)
atomic.AddUint64(&requestCount, 1) atomic.AddUint64(&req, 1)
} }
wg.Done()
}(i) }(i)
go func(pos int) { go func(pos int) {
defer wg.Done()
for { for {
t := time.Now() t := time.Now()
if t.After(bmEnd) { if t.After(bmEnd) {
break return
} }
recver(pos) recver(pos)
atomic.AddUint64(&responseCount, 1) atomic.AddUint64(&resp, 1)
} }
wg.Done()
}(i) }(i)
} }
wg.Wait() wg.Wait()
return requestCount, responseCount stop(req, resp)
} }
func makeClient(benchFeatures stats.Features) (testpb.BenchmarkServiceClient, func()) { // makeClient returns a gRPC client for the grpc.testing.BenchmarkService
nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} // service. The client is configured using the different options in the passed
// 'bf'. Also returns a cleanup function to close the client and release
// resources.
func makeClient(bf stats.Features) (testpb.BenchmarkServiceClient, func()) {
nw := &latency.Network{Kbps: bf.Kbps, Latency: bf.Latency, MTU: bf.MTU}
opts := []grpc.DialOption{} opts := []grpc.DialOption{}
sopts := []grpc.ServerOption{} sopts := []grpc.ServerOption{}
if benchFeatures.ModeCompressor == compModeNop { if bf.ModeCompressor == compModeNop {
sopts = append(sopts, sopts = append(sopts,
grpc.RPCCompressor(nopCompressor{}), grpc.RPCCompressor(nopCompressor{}),
grpc.RPCDecompressor(nopDecompressor{}), grpc.RPCDecompressor(nopDecompressor{}),
@ -248,7 +264,7 @@ func makeClient(benchFeatures stats.Features) (testpb.BenchmarkServiceClient, fu
grpc.WithDecompressor(nopDecompressor{}), grpc.WithDecompressor(nopDecompressor{}),
) )
} }
if benchFeatures.ModeCompressor == compModeGzip { if bf.ModeCompressor == compModeGzip {
sopts = append(sopts, sopts = append(sopts,
grpc.RPCCompressor(grpc.NewGZIPCompressor()), grpc.RPCCompressor(grpc.NewGZIPCompressor()),
grpc.RPCDecompressor(grpc.NewGZIPDecompressor()), grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
@ -258,11 +274,20 @@ func makeClient(benchFeatures stats.Features) (testpb.BenchmarkServiceClient, fu
grpc.WithDecompressor(grpc.NewGZIPDecompressor()), grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
) )
} }
sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) if bf.EnableKeepalive {
opts = append(opts,
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: keepaliveTime,
Timeout: keepaliveTimeout,
PermitWithoutStream: true,
}),
)
}
sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1)))
opts = append(opts, grpc.WithInsecure()) opts = append(opts, grpc.WithInsecure())
var lis net.Listener var lis net.Listener
if benchFeatures.UseBufConn { if bf.UseBufConn {
bcLis := bufconn.Listen(256 * 1024) bcLis := bufconn.Listen(256 * 1024)
lis = bcLis lis = bcLis
opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) { opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
@ -289,18 +314,18 @@ func makeClient(benchFeatures stats.Features) (testpb.BenchmarkServiceClient, fu
} }
} }
func makeFuncUnary(benchFeatures stats.Features) (func(int), func()) { func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
tc, cleanup := makeClient(benchFeatures) tc, cleanup := makeClient(bf)
return func(int) { return func(int) {
unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) unaryCaller(tc, bf.ReqSizeBytes, bf.RespSizeBytes)
}, cleanup }, cleanup
} }
func makeFuncStream(benchFeatures stats.Features) (func(int), func()) { func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
tc, cleanup := makeClient(benchFeatures) tc, cleanup := makeClient(bf)
streams := make([]testpb.BenchmarkService_StreamingCallClient, benchFeatures.MaxConcurrentCalls) streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { for i := 0; i < bf.MaxConcurrentCalls; i++ {
stream, err := tc.StreamingCall(context.Background()) stream, err := tc.StreamingCall(context.Background())
if err != nil { if err != nil {
grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
@ -309,12 +334,12 @@ func makeFuncStream(benchFeatures stats.Features) (func(int), func()) {
} }
return func(pos int) { return func(pos int) {
streamCaller(streams[pos], benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) streamCaller(streams[pos], bf.ReqSizeBytes, bf.RespSizeBytes)
}, cleanup }, cleanup
} }
func makeFuncUnconstrainedStreamPreloaded(benchFeatures stats.Features) (func(int), func(int), func()) { func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
streams, req, cleanup := setupUnconstrainedStream(benchFeatures) streams, req, cleanup := setupUnconstrainedStream(bf)
preparedMsg := make([]*grpc.PreparedMsg, len(streams)) preparedMsg := make([]*grpc.PreparedMsg, len(streams))
for i, stream := range streams { for i, stream := range streams {
@ -332,8 +357,8 @@ func makeFuncUnconstrainedStreamPreloaded(benchFeatures stats.Features) (func(in
}, cleanup }, cleanup
} }
func makeFuncUnconstrainedStream(benchFeatures stats.Features) (func(int), func(int), func()) { func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
streams, req, cleanup := setupUnconstrainedStream(benchFeatures) streams, req, cleanup := setupUnconstrainedStream(bf)
return func(pos int) { return func(pos int) {
streams[pos].Send(req) streams[pos].Send(req)
@ -342,11 +367,11 @@ func makeFuncUnconstrainedStream(benchFeatures stats.Features) (func(int), func(
}, cleanup }, cleanup
} }
func setupUnconstrainedStream(benchFeatures stats.Features) ([]testpb.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, func()) { func setupUnconstrainedStream(bf stats.Features) ([]testpb.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
tc, cleanup := makeClient(benchFeatures) tc, cleanup := makeClient(bf)
streams := make([]testpb.BenchmarkService_StreamingCallClient, benchFeatures.MaxConcurrentCalls) streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { for i := 0; i < bf.MaxConcurrentCalls; i++ {
stream, err := tc.UnconstrainedStreamingCall(context.Background()) stream, err := tc.UnconstrainedStreamingCall(context.Background())
if err != nil { if err != nil {
grpclog.Fatalf("%v.UnconstrainedStreamingCall(_) = _, %v", tc, err) grpclog.Fatalf("%v.UnconstrainedStreamingCall(_) = _, %v", tc, err)
@ -354,16 +379,18 @@ func setupUnconstrainedStream(benchFeatures stats.Features) ([]testpb.BenchmarkS
streams[i] = stream streams[i] = stream
} }
pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, benchFeatures.ReqSizeBytes) pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, bf.ReqSizeBytes)
req := &testpb.SimpleRequest{ req := &testpb.SimpleRequest{
ResponseType: pl.Type, ResponseType: pl.Type,
ResponseSize: int32(benchFeatures.RespSizeBytes), ResponseSize: int32(bf.RespSizeBytes),
Payload: pl, Payload: pl,
} }
return streams, req, cleanup return streams, req, cleanup
} }
// Makes a UnaryCall gRPC request using the given BenchmarkServiceClient and
// request and response sizes.
func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) { func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
if err := bm.DoUnaryCall(client, reqSize, respSize); err != nil { if err := bm.DoUnaryCall(client, reqSize, respSize); err != nil {
grpclog.Fatalf("DoUnaryCall failed: %v", err) grpclog.Fatalf("DoUnaryCall failed: %v", err)
@ -376,41 +403,36 @@ func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, r
} }
} }
func runBenchmark(caller func(int), startTimer func(), stopTimer func(uint64), benchFeatures stats.Features, benchTime time.Duration, s *stats.Stats) uint64 { func runBenchmark(caller rpcCallFunc, start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats, mode string) {
// Warm up connection. // Warm up connection.
for i := 0; i < 10; i++ { for i := 0; i < warmupCallCount; i++ {
caller(0) caller(0)
} }
// Run benchmark. // Run benchmark.
startTimer() start(mode, bf)
var ( var wg sync.WaitGroup
mu sync.Mutex wg.Add(bf.MaxConcurrentCalls)
wg sync.WaitGroup bmEnd := time.Now().Add(bf.BenchTime)
)
wg.Add(benchFeatures.MaxConcurrentCalls)
bmEnd := time.Now().Add(benchTime)
var count uint64 var count uint64
for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { for i := 0; i < bf.MaxConcurrentCalls; i++ {
go func(pos int) { go func(pos int) {
defer wg.Done()
for { for {
t := time.Now() t := time.Now()
if t.After(bmEnd) { if t.After(bmEnd) {
break return
} }
start := time.Now() start := time.Now()
caller(pos) caller(pos)
elapse := time.Since(start) elapse := time.Since(start)
atomic.AddUint64(&count, 1) atomic.AddUint64(&count, 1)
mu.Lock() s.AddDuration(elapse)
s.Add(elapse)
mu.Unlock()
} }
wg.Done()
}(i) }(i)
} }
wg.Wait() wg.Wait()
stopTimer(count) stop(count)
return count
} }
// benchOpts represents all configurable options available while running this // benchOpts represents all configurable options available while running this
@ -424,6 +446,7 @@ type benchOpts struct {
networkMode string networkMode string
benchmarkResultFile string benchmarkResultFile string
useBufconn bool useBufconn bool
enableKeepalive bool
features *featureOpts features *featureOpts
} }
@ -432,39 +455,18 @@ type benchOpts struct {
// features through command line flags. We generate all possible combinations // features through command line flags. We generate all possible combinations
// for the provided values and run the benchmarks for each combination. // for the provided values and run the benchmarks for each combination.
type featureOpts struct { type featureOpts struct {
enableTrace []bool // Feature index 0 enableTrace []bool
readLatencies []time.Duration // Feature index 1 readLatencies []time.Duration
readKbps []int // Feature index 2 readKbps []int
readMTU []int // Feature index 3 readMTU []int
maxConcurrentCalls []int // Feature index 4 maxConcurrentCalls []int
reqSizeBytes []int // Feature index 5 reqSizeBytes []int
respSizeBytes []int // Feature index 6 respSizeBytes []int
compModes []string // Feature index 7 compModes []string
enableChannelz []bool // Feature index 8 enableChannelz []bool
enablePreloader []bool // Feature index 9 enablePreloader []bool
} }
// featureIndex is an enum for the different features that could be configured
// by the user through command line flags.
type featureIndex int
const (
enableTraceIndex featureIndex = iota
readLatenciesIndex
readKbpsIndex
readMTUIndex
maxConcurrentCallsIndex
reqSizeBytesIndex
respSizeBytesIndex
compModesIndex
enableChannelzIndex
enablePreloaderIndex
// This is a place holder to indicate the total number of feature indices we
// have. Any new feature indices should be added above this.
maxFeatureIndex
)
// makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each // makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each
// element of the slice (indexed by 'featureIndex' enum) contains the number // element of the slice (indexed by 'featureIndex' enum) contains the number
// of features to be exercised by the benchmark code. // of features to be exercised by the benchmark code.
@ -472,31 +474,31 @@ const (
// enableTrace feature, while index 1 contains the number of values of // enableTrace feature, while index 1 contains the number of values of
// readLatencies feature and so on. // readLatencies feature and so on.
func makeFeaturesNum(b *benchOpts) []int { func makeFeaturesNum(b *benchOpts) []int {
featuresNum := make([]int, maxFeatureIndex) featuresNum := make([]int, stats.MaxFeatureIndex)
for i := 0; i < len(featuresNum); i++ { for i := 0; i < len(featuresNum); i++ {
switch featureIndex(i) { switch stats.FeatureIndex(i) {
case enableTraceIndex: case stats.EnableTraceIndex:
featuresNum[i] = len(b.features.enableTrace) featuresNum[i] = len(b.features.enableTrace)
case readLatenciesIndex: case stats.ReadLatenciesIndex:
featuresNum[i] = len(b.features.readLatencies) featuresNum[i] = len(b.features.readLatencies)
case readKbpsIndex: case stats.ReadKbpsIndex:
featuresNum[i] = len(b.features.readKbps) featuresNum[i] = len(b.features.readKbps)
case readMTUIndex: case stats.ReadMTUIndex:
featuresNum[i] = len(b.features.readMTU) featuresNum[i] = len(b.features.readMTU)
case maxConcurrentCallsIndex: case stats.MaxConcurrentCallsIndex:
featuresNum[i] = len(b.features.maxConcurrentCalls) featuresNum[i] = len(b.features.maxConcurrentCalls)
case reqSizeBytesIndex: case stats.ReqSizeBytesIndex:
featuresNum[i] = len(b.features.reqSizeBytes) featuresNum[i] = len(b.features.reqSizeBytes)
case respSizeBytesIndex: case stats.RespSizeBytesIndex:
featuresNum[i] = len(b.features.respSizeBytes) featuresNum[i] = len(b.features.respSizeBytes)
case compModesIndex: case stats.CompModesIndex:
featuresNum[i] = len(b.features.compModes) featuresNum[i] = len(b.features.compModes)
case enableChannelzIndex: case stats.EnableChannelzIndex:
featuresNum[i] = len(b.features.enableChannelz) featuresNum[i] = len(b.features.enableChannelz)
case enablePreloaderIndex: case stats.EnablePreloaderIndex:
featuresNum[i] = len(b.features.enablePreloader) featuresNum[i] = len(b.features.enablePreloader)
default: default:
log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, maxFeatureIndex) log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex)
} }
} }
return featuresNum return featuresNum
@ -537,26 +539,28 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
// all options. // all options.
var result []stats.Features var result []stats.Features
var curPos []int var curPos []int
initialPos := make([]int, maxFeatureIndex) initialPos := make([]int, stats.MaxFeatureIndex)
for !reflect.DeepEqual(initialPos, curPos) { for !reflect.DeepEqual(initialPos, curPos) {
if curPos == nil { if curPos == nil {
curPos = make([]int, maxFeatureIndex) curPos = make([]int, stats.MaxFeatureIndex)
} }
result = append(result, stats.Features{ result = append(result, stats.Features{
// These features stay the same for each iteration. // These features stay the same for each iteration.
NetworkMode: b.networkMode, NetworkMode: b.networkMode,
UseBufConn: b.useBufconn, UseBufConn: b.useBufconn,
EnableKeepalive: b.enableKeepalive,
BenchTime: b.benchTime,
// These features can potentially change for each iteration. // These features can potentially change for each iteration.
EnableTrace: b.features.enableTrace[curPos[enableTraceIndex]], EnableTrace: b.features.enableTrace[curPos[stats.EnableTraceIndex]],
Latency: b.features.readLatencies[curPos[readLatenciesIndex]], Latency: b.features.readLatencies[curPos[stats.ReadLatenciesIndex]],
Kbps: b.features.readKbps[curPos[readKbpsIndex]], Kbps: b.features.readKbps[curPos[stats.ReadKbpsIndex]],
Mtu: b.features.readMTU[curPos[readMTUIndex]], MTU: b.features.readMTU[curPos[stats.ReadMTUIndex]],
MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[maxConcurrentCallsIndex]], MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]],
ReqSizeBytes: b.features.reqSizeBytes[curPos[reqSizeBytesIndex]], ReqSizeBytes: b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]],
RespSizeBytes: b.features.respSizeBytes[curPos[respSizeBytesIndex]], RespSizeBytes: b.features.respSizeBytes[curPos[stats.RespSizeBytesIndex]],
ModeCompressor: b.features.compModes[curPos[compModesIndex]], ModeCompressor: b.features.compModes[curPos[stats.CompModesIndex]],
EnableChannelz: b.features.enableChannelz[curPos[enableChannelzIndex]], EnableChannelz: b.features.enableChannelz[curPos[stats.EnableChannelzIndex]],
EnablePreloader: b.features.enablePreloader[curPos[enablePreloaderIndex]], EnablePreloader: b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]],
}) })
addOne(curPos, featuresNum) addOne(curPos, featuresNum)
} }
@ -597,6 +601,7 @@ func processFlags() *benchOpts {
networkMode: *networkMode, networkMode: *networkMode,
benchmarkResultFile: *benchmarkResultFile, benchmarkResultFile: *benchmarkResultFile,
useBufconn: *useBufconn, useBufconn: *useBufconn,
enableKeepalive: *enableKeepalive,
features: &featureOpts{ features: &featureOpts{
enableTrace: setToggleMode(*traceMode), enableTrace: setToggleMode(*traceMode),
readLatencies: append([]time.Duration(nil), *readLatency...), readLatencies: append([]time.Duration(nil), *readLatency...),
@ -648,76 +653,36 @@ func setCompressorMode(val string) []string {
} }
} }
func printThroughput(requestCount uint64, requestSize int, responseCount uint64, responseSize int, benchTime time.Duration) {
requestThroughput := float64(requestCount) * float64(requestSize) * 8 / benchTime.Seconds()
responseThroughput := float64(responseCount) * float64(responseSize) * 8 / benchTime.Seconds()
fmt.Printf("Number of requests: %v\tRequest throughput: %v bit/s\n", requestCount, requestThroughput)
fmt.Printf("Number of responses: %v\tResponse throughput: %v bit/s\n", responseCount, responseThroughput)
fmt.Println()
}
func main() { func main() {
opts := processFlags() opts := processFlags()
before(opts) before(opts)
s := stats.NewStats(numStatsBuckets)
s.SortLatency()
var memStats runtime.MemStats
var results testing.BenchmarkResult
var startAllocs, startBytes uint64
var startTime time.Time
var startTimer = func() {
runtime.ReadMemStats(&memStats)
startAllocs = memStats.Mallocs
startBytes = memStats.TotalAlloc
startTime = time.Now()
}
var stopTimer = func(count uint64) {
runtime.ReadMemStats(&memStats)
results = testing.BenchmarkResult{
N: int(count),
T: time.Since(startTime),
Bytes: 0,
MemAllocs: memStats.Mallocs - startAllocs,
MemBytes: memStats.TotalAlloc - startBytes,
}
}
// Run benchmarks s := stats.NewStats(numStatsBuckets)
resultSlice := []stats.BenchResults{}
featuresNum := makeFeaturesNum(opts) featuresNum := makeFeaturesNum(opts)
sharedPos := sharedFeatures(featuresNum) sf := sharedFeatures(featuresNum)
for _, benchFeature := range opts.generateFeatures(featuresNum) {
grpc.EnableTracing = benchFeature.EnableTrace var (
if benchFeature.EnableChannelz { start = func(mode string, bf stats.Features) { s.StartRun(mode, bf, sf) }
stop = func(count uint64) { s.EndRun(count) }
ucStop = func(req uint64, resp uint64) { s.EndUnconstrainedRun(req, resp) }
)
for _, bf := range opts.generateFeatures(featuresNum) {
grpc.EnableTracing = bf.EnableTrace
if bf.EnableChannelz {
channelz.TurnOn() channelz.TurnOn()
} }
if opts.rModes.unary { if opts.rModes.unary {
count := unaryBenchmark(startTimer, stopTimer, benchFeature, opts.benchTime, s) unaryBenchmark(start, stop, bf, s)
s.SetBenchmarkResult("Unary", benchFeature, results.N,
results.AllocedBytesPerOp(), results.AllocsPerOp(), sharedPos)
fmt.Println(s.BenchString())
fmt.Println(s.String())
printThroughput(count, benchFeature.ReqSizeBytes, count, benchFeature.RespSizeBytes, opts.benchTime)
resultSlice = append(resultSlice, s.GetBenchmarkResults())
s.Clear()
} }
if opts.rModes.streaming { if opts.rModes.streaming {
count := streamBenchmark(startTimer, stopTimer, benchFeature, opts.benchTime, s) streamBenchmark(start, stop, bf, s)
s.SetBenchmarkResult("Stream", benchFeature, results.N,
results.AllocedBytesPerOp(), results.AllocsPerOp(), sharedPos)
fmt.Println(s.BenchString())
fmt.Println(s.String())
printThroughput(count, benchFeature.ReqSizeBytes, count, benchFeature.RespSizeBytes, opts.benchTime)
resultSlice = append(resultSlice, s.GetBenchmarkResults())
s.Clear()
} }
if opts.rModes.unconstrained { if opts.rModes.unconstrained {
requestCount, responseCount := unconstrainedStreamBenchmark(benchFeature, time.Second, opts.benchTime) unconstrainedStreamBenchmark(start, ucStop, bf, s)
fmt.Printf("Unconstrained Stream-%v\n", benchFeature)
printThroughput(requestCount, benchFeature.ReqSizeBytes, responseCount, benchFeature.RespSizeBytes, opts.benchTime)
} }
} }
after(opts, resultSlice) after(opts, s.GetResults())
} }
func before(opts *benchOpts) { func before(opts *benchOpts) {

View File

@ -32,14 +32,13 @@ import (
"fmt" "fmt"
"log" "log"
"os" "os"
"strconv"
"strings" "strings"
"time" "time"
"google.golang.org/grpc/benchmark/stats" "google.golang.org/grpc/benchmark/stats"
) )
func createMap(fileName string, m map[string]stats.BenchResults) { func createMap(fileName string) map[string]stats.BenchResults {
f, err := os.Open(fileName) f, err := os.Open(fileName)
if err != nil { if err != nil {
log.Fatalf("Read file %s error: %s\n", fileName, err) log.Fatalf("Read file %s error: %s\n", fileName, err)
@ -50,18 +49,22 @@ func createMap(fileName string, m map[string]stats.BenchResults) {
if err = decoder.Decode(&data); err != nil { if err = decoder.Decode(&data); err != nil {
log.Fatalf("Decode file %s error: %s\n", fileName, err) log.Fatalf("Decode file %s error: %s\n", fileName, err)
} }
m := make(map[string]stats.BenchResults)
for _, d := range data { for _, d := range data {
m[d.RunMode+"-"+d.Features.String()] = d m[d.RunMode+"-"+d.Features.String()] = d
} }
return m
} }
func intChange(title string, val1, val2 int64) string { func intChange(title string, val1, val2 uint64) string {
return fmt.Sprintf("%10s %12s %12s %8.2f%%\n", title, strconv.FormatInt(val1, 10), return fmt.Sprintf("%20s %12d %12d %8.2f%%\n", title, val1, val2, float64(int64(val2)-int64(val1))*100/float64(val1))
strconv.FormatInt(val2, 10), float64(val2-val1)*100/float64(val1))
} }
func timeChange(title int, val1, val2 time.Duration) string { func floatChange(title string, val1, val2 float64) string {
return fmt.Sprintf("%10s %12s %12s %8.2f%%\n", strconv.Itoa(title)+" latency", val1.String(), return fmt.Sprintf("%20s %12.2f %12.2f %8.2f%%\n", title, val1, val2, float64(int64(val2)-int64(val1))*100/float64(val1))
}
func timeChange(title string, val1, val2 time.Duration) string {
return fmt.Sprintf("%20s %12s %12s %8.2f%%\n", title, val1.String(),
val2.String(), float64(val2-val1)*100/float64(val1)) val2.String(), float64(val2-val1)*100/float64(val1))
} }
@ -69,30 +72,30 @@ func compareTwoMap(m1, m2 map[string]stats.BenchResults) {
for k2, v2 := range m2 { for k2, v2 := range m2 {
if v1, ok := m1[k2]; ok { if v1, ok := m1[k2]; ok {
changes := k2 + "\n" changes := k2 + "\n"
changes += fmt.Sprintf("%10s %12s %12s %8s\n", "Title", "Before", "After", "Percentage") changes += fmt.Sprintf("%20s %12s %12s %8s\n", "Title", "Before", "After", "Percentage")
changes += intChange("Bytes/op", v1.AllocedBytesPerOp, v2.AllocedBytesPerOp) changes += intChange("TotalOps", v1.Data.TotalOps, v2.Data.TotalOps)
changes += intChange("Allocs/op", v1.AllocsPerOp, v2.AllocsPerOp) changes += intChange("SendOps", v1.Data.SendOps, v2.Data.SendOps)
changes += timeChange(v1.Latency[1].Percent, v1.Latency[1].Value, v2.Latency[1].Value) changes += intChange("RecvOps", v1.Data.RecvOps, v2.Data.RecvOps)
changes += timeChange(v1.Latency[2].Percent, v1.Latency[2].Value, v2.Latency[2].Value) changes += intChange("Bytes/op", v1.Data.AllocedBytes, v2.Data.AllocedBytes)
changes += intChange("Allocs/op", v1.Data.Allocs, v2.Data.Allocs)
changes += floatChange("ReqT/op", v1.Data.ReqT, v2.Data.ReqT)
changes += floatChange("RespT/op", v1.Data.RespT, v2.Data.RespT)
changes += timeChange("50th-Lat", v1.Data.Fiftieth, v2.Data.Fiftieth)
changes += timeChange("90th-Lat", v1.Data.Ninetieth, v2.Data.Ninetieth)
changes += timeChange("99th-Lat", v1.Data.NinetyNinth, v2.Data.NinetyNinth)
changes += timeChange("Avg-Lat", v1.Data.Average, v2.Data.Average)
fmt.Printf("%s\n", changes) fmt.Printf("%s\n", changes)
} }
} }
} }
func compareBenchmark(file1, file2 string) { func compareBenchmark(file1, file2 string) {
var BenchValueFile1 map[string]stats.BenchResults compareTwoMap(createMap(file1), createMap(file2))
var BenchValueFile2 map[string]stats.BenchResults
BenchValueFile1 = make(map[string]stats.BenchResults)
BenchValueFile2 = make(map[string]stats.BenchResults)
createMap(file1, BenchValueFile1)
createMap(file2, BenchValueFile2)
compareTwoMap(BenchValueFile1, BenchValueFile2)
} }
func printline(benchName, ltc50, ltc90, allocByte, allocsOp interface{}) { func printline(benchName, total, send, recv, allocB, allocN, reqT, respT, ltc50, ltc90, l99, lAvg interface{}) {
fmt.Printf("%-80v%12v%12v%12v%12v\n", benchName, ltc50, ltc90, allocByte, allocsOp) fmt.Printf("%-80v%12v%12v%12v%12v%12v%18v%18v%12v%12v%12v%12v\n",
benchName, total, send, recv, allocB, allocN, reqT, respT, ltc50, ltc90, l99, lAvg)
} }
func formatBenchmark(fileName string) { func formatBenchmark(fileName string) {
@ -101,26 +104,30 @@ func formatBenchmark(fileName string) {
log.Fatalf("Read file %s error: %s\n", fileName, err) log.Fatalf("Read file %s error: %s\n", fileName, err)
} }
defer f.Close() defer f.Close()
var data []stats.BenchResults var results []stats.BenchResults
decoder := gob.NewDecoder(f) decoder := gob.NewDecoder(f)
if err = decoder.Decode(&data); err != nil { if err = decoder.Decode(&results); err != nil {
log.Fatalf("Decode file %s error: %s\n", fileName, err) log.Fatalf("Decode file %s error: %s\n", fileName, err)
} }
if len(data) == 0 { if len(results) == 0 {
log.Fatalf("No data in file %s\n", fileName) log.Fatalf("No benchmark results in file %s\n", fileName)
} }
printPos := data[0].SharedPosion
fmt.Println("\nShared features:\n" + strings.Repeat("-", 20)) fmt.Println("\nShared features:\n" + strings.Repeat("-", 20))
fmt.Print(stats.PartialPrintString(printPos, data[0].Features, true)) fmt.Print(results[0].Features.SharedFeatures(results[0].SharedFeatures))
fmt.Println(strings.Repeat("-", 35)) fmt.Println(strings.Repeat("-", 35))
for i := 0; i < len(data[0].SharedPosion); i++ {
printPos[i] = !printPos[i] wantFeatures := results[0].SharedFeatures
for i := 0; i < len(results[0].SharedFeatures); i++ {
wantFeatures[i] = !wantFeatures[i]
} }
printline("Name", "latency-50", "latency-90", "Alloc (B)", "Alloc (#)")
for _, d := range data { printline("Name", "TotalOps", "SendOps", "RecvOps", "Alloc (B)", "Alloc (#)",
name := d.RunMode + stats.PartialPrintString(printPos, d.Features, false) "RequestT", "ResponseT", "L-50", "L-90", "L-99", "L-Avg")
printline(name, d.Latency[1].Value.String(), d.Latency[2].Value.String(), for _, r := range results {
d.AllocedBytesPerOp, d.AllocsPerOp) d := r.Data
printline(r.RunMode+r.Features.PrintableName(wantFeatures), d.TotalOps, d.SendOps, d.RecvOps,
d.AllocedBytes, d.Allocs, d.ReqT, d.RespT, d.Fiftieth, d.Ninetieth, d.NinetyNinth, d.Average)
} }
} }

View File

@ -16,293 +16,401 @@
* *
*/ */
// Package stats registers stats used for creating benchmarks // Package stats tracks the statistics associated with benchmark runs.
package stats package stats
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"io" "log"
"math" "math"
"runtime"
"sort" "sort"
"strconv" "strconv"
"sync"
"time" "time"
) )
// Features contains most fields for a benchmark // FeatureIndex is an enum for features that usually differ across individual
// benchmark runs in a single execution. These are usually configured by the
// user through command line flags.
type FeatureIndex int
// FeatureIndex enum values corresponding to individually settable features.
const (
EnableTraceIndex FeatureIndex = iota
ReadLatenciesIndex
ReadKbpsIndex
ReadMTUIndex
MaxConcurrentCallsIndex
ReqSizeBytesIndex
RespSizeBytesIndex
CompModesIndex
EnableChannelzIndex
EnablePreloaderIndex
// MaxFeatureIndex is a place holder to indicate the total number of feature
// indices we have. Any new feature indices should be added above this.
MaxFeatureIndex
)
// Features represent configured options for a specific benchmark run. This is
// usually constructed from command line arguments passed by the caller. See
// benchmark/benchmain/main.go for defined command line flags. This is also
// part of the BenchResults struct which is serialized and written to a file.
type Features struct { type Features struct {
NetworkMode string // Network mode used for this benchmark run. Could be one of Local, LAN, WAN
UseBufConn bool // or Longhaul.
EnableTrace bool NetworkMode string
Latency time.Duration // UseBufCon indicates whether an in-memory connection was used for this
Kbps int // benchmark run instead of system network I/O.
Mtu int UseBufConn bool
// EnableKeepalive indicates if keepalives were enabled on the connections
// used in this benchmark run.
EnableKeepalive bool
// BenchTime indicates the duration of the benchmark run.
BenchTime time.Duration
// Features defined above are usually the same for all benchmark runs in a
// particular invocation, while the features defined below could vary from
// run to run based on the configured command line. These features have a
// corresponding FeatureIndex value which is used for a variety of reasons.
// EnableTrace indicates if tracing was enabled.
EnableTrace bool
// Latency is the simulated one-way network latency used.
Latency time.Duration
// Kbps is the simulated network throughput used.
Kbps int
// MTU is the simulated network MTU used.
MTU int
// MaxConcurrentCalls is the number of concurrent RPCs made during this
// benchmark run.
MaxConcurrentCalls int MaxConcurrentCalls int
ReqSizeBytes int // ReqSizeBytes is the request size in bytes used in this benchmark run.
RespSizeBytes int ReqSizeBytes int
ModeCompressor string // RespSizeBytes is the response size in bytes used in this benchmark run.
EnableChannelz bool RespSizeBytes int
EnablePreloader bool // ModeCompressor represents the compressor mode used.
ModeCompressor string
// EnableChannelz indicates if channelz was turned on.
EnableChannelz bool
// EnablePreloader indicates if preloading was turned on.
EnablePreloader bool
} }
// String returns the textual output of the Features as string. // String returns all the feature values as a string.
func (f Features) String() string { func (f Features) String() string {
return fmt.Sprintf("traceMode_%t-latency_%s-kbps_%#v-MTU_%#v-maxConcurrentCalls_"+ return fmt.Sprintf("networkMode_%v-bufConn_%v-keepalive_%v-benchTime_%v-"+
"%#v-reqSize_%#vB-respSize_%#vB-Compressor_%s-Preloader_%t", f.EnableTrace, "trace_%v-latency_%v-kbps_%v-MTU_%v-maxConcurrentCalls_%v-"+
f.Latency.String(), f.Kbps, f.Mtu, f.MaxConcurrentCalls, f.ReqSizeBytes, f.RespSizeBytes, f.ModeCompressor, f.EnablePreloader) "reqSize_%vB-respSize_%vB-compressor_%v-channelz_%v-preloader_%v",
f.NetworkMode, f.UseBufConn, f.EnableKeepalive, f.BenchTime,
f.EnableTrace, f.Latency, f.Kbps, f.MTU, f.MaxConcurrentCalls,
f.ReqSizeBytes, f.RespSizeBytes, f.ModeCompressor, f.EnableChannelz, f.EnablePreloader)
} }
// ConciseString returns the concise textual output of the Features as string, skipping // SharedFeatures returns the shared features as a pretty printable string.
// setting with default value. // 'wantFeatures' is a bitmask of wanted features, indexed by FeatureIndex.
func (f Features) ConciseString() string { func (f Features) SharedFeatures(wantFeatures []bool) string {
noneEmptyPos := []bool{f.EnableTrace, f.Latency != 0, f.Kbps != 0, f.Mtu != 0, true, true, true, f.ModeCompressor != "off", f.EnableChannelz, f.EnablePreloader} var b bytes.Buffer
return PartialPrintString(noneEmptyPos, f, false) if f.NetworkMode != "" {
b.WriteString(fmt.Sprintf("Network: %v\n", f.NetworkMode))
}
if f.UseBufConn {
b.WriteString(fmt.Sprintf("UseBufConn: %v\n", f.UseBufConn))
}
if f.EnableKeepalive {
b.WriteString(fmt.Sprintf("EnableKeepalive: %v\n", f.EnableKeepalive))
}
b.WriteString(fmt.Sprintf("BenchTime: %v\n", f.BenchTime))
f.partialString(&b, wantFeatures, ": ", "\n")
return b.String()
} }
// PartialPrintString can print certain features with different format. // PrintableName returns a one line name which includes the features specified
func PartialPrintString(noneEmptyPos []bool, f Features, shared bool) string { // by 'wantFeatures' which is a bitmask of wanted features, indexed by
s := "" // FeaturesIndex.
var ( func (f Features) PrintableName(wantFeatures []bool) string {
prefix, suffix, linker string var b bytes.Buffer
isNetwork bool f.partialString(&b, wantFeatures, "_", "-")
) return b.String()
if shared { }
suffix = "\n"
linker = ": " // partialString writes features specified by 'wantFeatures' to the provided
} else { // bytes.Buffer.
prefix = "-" func (f Features) partialString(b *bytes.Buffer, wantFeatures []bool, sep, delim string) {
linker = "_" for i, sf := range wantFeatures {
} if sf {
if noneEmptyPos[0] { switch FeatureIndex(i) {
s += fmt.Sprintf("%sTrace%s%t%s", prefix, linker, f.EnableTrace, suffix) case EnableTraceIndex:
} b.WriteString(fmt.Sprintf("Trace%v%v%v", sep, f.EnableTrace, delim))
if shared && f.NetworkMode != "" { case ReadLatenciesIndex:
s += fmt.Sprintf("Network: %s \n", f.NetworkMode) b.WriteString(fmt.Sprintf("Latency%v%v%v", sep, f.Latency, delim))
isNetwork = true case ReadKbpsIndex:
} b.WriteString(fmt.Sprintf("Kbps%v%v%v", sep, f.Kbps, delim))
if !isNetwork { case ReadMTUIndex:
if noneEmptyPos[1] { b.WriteString(fmt.Sprintf("MTU%v%v%v", sep, f.MTU, delim))
s += fmt.Sprintf("%slatency%s%s%s", prefix, linker, f.Latency.String(), suffix) case MaxConcurrentCallsIndex:
} b.WriteString(fmt.Sprintf("Callers%v%v%v", sep, f.MaxConcurrentCalls, delim))
if noneEmptyPos[2] { case ReqSizeBytesIndex:
s += fmt.Sprintf("%skbps%s%#v%s", prefix, linker, f.Kbps, suffix) b.WriteString(fmt.Sprintf("ReqSize%v%vB%v", sep, f.ReqSizeBytes, delim))
} case RespSizeBytesIndex:
if noneEmptyPos[3] { b.WriteString(fmt.Sprintf("RespSize%v%vB%v", sep, f.RespSizeBytes, delim))
s += fmt.Sprintf("%sMTU%s%#v%s", prefix, linker, f.Mtu, suffix) case CompModesIndex:
b.WriteString(fmt.Sprintf("Compressor%v%v%v", sep, f.ModeCompressor, delim))
case EnableChannelzIndex:
b.WriteString(fmt.Sprintf("Channelz%v%v%v", sep, f.EnableChannelz, delim))
case EnablePreloaderIndex:
b.WriteString(fmt.Sprintf("Preloader%v%v%v", sep, f.EnablePreloader, delim))
default:
log.Fatalf("Unknown feature index %v. maxFeatureIndex is %v", i, MaxFeatureIndex)
}
} }
} }
if noneEmptyPos[4] {
s += fmt.Sprintf("%sCallers%s%#v%s", prefix, linker, f.MaxConcurrentCalls, suffix)
}
if noneEmptyPos[5] {
s += fmt.Sprintf("%sreqSize%s%#vB%s", prefix, linker, f.ReqSizeBytes, suffix)
}
if noneEmptyPos[6] {
s += fmt.Sprintf("%srespSize%s%#vB%s", prefix, linker, f.RespSizeBytes, suffix)
}
if noneEmptyPos[7] {
s += fmt.Sprintf("%sCompressor%s%s%s", prefix, linker, f.ModeCompressor, suffix)
}
if noneEmptyPos[8] {
s += fmt.Sprintf("%sChannelz%s%t%s", prefix, linker, f.EnableChannelz, suffix)
}
if noneEmptyPos[9] {
s += fmt.Sprintf("%sPreloader%s%t%s", prefix, linker, f.EnablePreloader, suffix)
}
return s
} }
type percentLatency struct { // BenchResults records features and results of a benchmark run. A collection
Percent int // of these structs is usually serialized and written to a file after a
Value time.Duration // benchmark execution, and could later be read for pretty-printing or
} // comparison with other benchmark results.
// BenchResults records features and result of a benchmark.
type BenchResults struct { type BenchResults struct {
RunMode string // RunMode is the workload mode for this benchmark run. This could be unary,
Features Features // stream or unconstrained.
Latency []percentLatency RunMode string
Operations int // Features represents the configured feature options for this run.
NsPerOp int64 Features Features
AllocedBytesPerOp int64 // SharedFeatures represents the features which were shared across all
AllocsPerOp int64 // benchmark runs during one execution. It is a slice indexed by
SharedPosion []bool // 'FeaturesIndex' and a value of true indicates that the associated
// feature is shared across all runs.
SharedFeatures []bool
// Data contains the statistical data of interest from the benchmark run.
Data RunData
} }
// SetBenchmarkResult sets features of benchmark and basic results. // RunData contains statistical data of interest from a benchmark run.
func (stats *Stats) SetBenchmarkResult(mode string, features Features, o int, allocdBytes, allocs int64, sharedPos []bool) { type RunData struct {
stats.result.RunMode = mode // TotalOps is the number of operations executed during this benchmark run.
stats.result.Features = features // Only makes sense for unary and streaming workloads.
stats.result.Operations = o TotalOps uint64
stats.result.AllocedBytesPerOp = allocdBytes // SendOps is the number of send operations executed during this benchmark
stats.result.AllocsPerOp = allocs // run. Only makes sense for unconstrained workloads.
stats.result.SharedPosion = sharedPos SendOps uint64
} // RecvOps is the number of receive operations executed during this benchmark
// run. Only makes sense for unconstrained workloads.
RecvOps uint64
// AllocedBytes is the average memory allocation in bytes per operation.
AllocedBytes uint64
// Allocs is the average number of memory allocations per operation.
Allocs uint64
// ReqT is the average request throughput associated with this run.
ReqT float64
// RespT is the average response throughput associated with this run.
RespT float64
// GetBenchmarkResults returns the result of the benchmark including features and result. // We store different latencies associated with each run. These latencies are
func (stats *Stats) GetBenchmarkResults() BenchResults { // only computed for unary and stream workloads as they are not very useful
return stats.result // for unconstrained workloads.
}
// BenchString output latency stats as the format as time + unit. // Fiftieth is the 50th percentile latency.
func (stats *Stats) BenchString() string { Fiftieth time.Duration
stats.maybeUpdate() // Ninetieth is the 90th percentile latency.
s := stats.result Ninetieth time.Duration
res := s.RunMode + "-" + s.Features.String() + ": \n" // Ninetyninth is the 99th percentile latency.
if len(s.Latency) != 0 { NinetyNinth time.Duration
var statsUnit = s.Latency[0].Value // Average is the average latency.
var timeUnit = fmt.Sprintf("%v", statsUnit)[1:] Average time.Duration
for i := 1; i < len(s.Latency)-1; i++ {
res += fmt.Sprintf("%d_Latency: %s %s \t", s.Latency[i].Percent,
strconv.FormatFloat(float64(s.Latency[i].Value)/float64(statsUnit), 'f', 4, 64), timeUnit)
}
res += fmt.Sprintf("Avg latency: %s %s \t",
strconv.FormatFloat(float64(s.Latency[len(s.Latency)-1].Value)/float64(statsUnit), 'f', 4, 64), timeUnit)
}
res += fmt.Sprintf("Count: %v \t", s.Operations)
res += fmt.Sprintf("%v Bytes/op\t", s.AllocedBytesPerOp)
res += fmt.Sprintf("%v Allocs/op\t", s.AllocsPerOp)
return res
}
// Stats is a simple helper for gathering additional statistics like histogram
// during benchmarks. This is not thread safe.
type Stats struct {
numBuckets int
unit time.Duration
min, max int64
histogram *Histogram
durations durationSlice
dirty bool
sortLatency bool
result BenchResults
} }
type durationSlice []time.Duration type durationSlice []time.Duration
// NewStats creates a new Stats instance. If numBuckets is not positive, func (a durationSlice) Len() int { return len(a) }
// the default value (16) will be used. func (a durationSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a durationSlice) Less(i, j int) bool { return a[i] < a[j] }
// Stats is a helper for gathering statistics about individual benchmark runs.
type Stats struct {
mu sync.Mutex
numBuckets int
hw *histWrapper
results []BenchResults
startMS runtime.MemStats
stopMS runtime.MemStats
}
type histWrapper struct {
unit time.Duration
histogram *Histogram
durations durationSlice
}
// NewStats creates a new Stats instance. If numBuckets is not positive, the
// default value (16) will be used.
func NewStats(numBuckets int) *Stats { func NewStats(numBuckets int) *Stats {
if numBuckets <= 0 { if numBuckets <= 0 {
numBuckets = 16 numBuckets = 16
} }
return &Stats{ // Use one more bucket for the last unbounded bucket.
// Use one more bucket for the last unbounded bucket. s := &Stats{numBuckets: numBuckets + 1}
numBuckets: numBuckets + 1, s.hw = &histWrapper{}
durations: make(durationSlice, 0, 100000), return s
}
// StartRun is to be invoked to indicate the start of a new benchmark run.
func (s *Stats) StartRun(mode string, f Features, sf []bool) {
s.mu.Lock()
defer s.mu.Unlock()
runtime.ReadMemStats(&s.startMS)
s.results = append(s.results, BenchResults{RunMode: mode, Features: f, SharedFeatures: sf})
}
// EndRun is to be invoked to indicate the end of the ongoing benchmark run. It
// computes a bunch of stats and dumps them to stdout.
func (s *Stats) EndRun(count uint64) {
s.mu.Lock()
defer s.mu.Unlock()
runtime.ReadMemStats(&s.stopMS)
r := &s.results[len(s.results)-1]
r.Data = RunData{
TotalOps: count,
AllocedBytes: s.stopMS.TotalAlloc - s.startMS.TotalAlloc,
Allocs: s.stopMS.Mallocs - s.startMS.Mallocs,
ReqT: float64(count) * float64(r.Features.ReqSizeBytes) * 8 / r.Features.BenchTime.Seconds(),
RespT: float64(count) * float64(r.Features.RespSizeBytes) * 8 / r.Features.BenchTime.Seconds(),
} }
s.computeLatencies(r)
s.dump(r)
s.hw = &histWrapper{}
} }
// Add adds an elapsed time per operation to the stats. // EndUnconstrainedRun is similar to EndRun, but is to be used for
func (stats *Stats) Add(d time.Duration) { // unconstrained workloads.
stats.durations = append(stats.durations, d) func (s *Stats) EndUnconstrainedRun(req uint64, resp uint64) {
stats.dirty = true s.mu.Lock()
defer s.mu.Unlock()
runtime.ReadMemStats(&s.stopMS)
r := &s.results[len(s.results)-1]
r.Data = RunData{
SendOps: req,
RecvOps: resp,
AllocedBytes: (s.stopMS.TotalAlloc - s.startMS.TotalAlloc) / ((req + resp) / 2),
Allocs: (s.stopMS.Mallocs - s.startMS.Mallocs) / ((req + resp) / 2),
ReqT: float64(req) * float64(r.Features.ReqSizeBytes) * 8 / r.Features.BenchTime.Seconds(),
RespT: float64(resp) * float64(r.Features.RespSizeBytes) * 8 / r.Features.BenchTime.Seconds(),
}
s.computeLatencies(r)
s.dump(r)
s.hw = &histWrapper{}
} }
// Clear resets the stats, removing all values. // AddDuration adds an elapsed duration per operation to the stats. This is
func (stats *Stats) Clear() { // used by unary and stream modes where request and response stats are equal.
stats.durations = stats.durations[:0] func (s *Stats) AddDuration(d time.Duration) {
stats.histogram = nil s.mu.Lock()
stats.dirty = false defer s.mu.Unlock()
stats.result = BenchResults{}
s.hw.durations = append(s.hw.durations, d)
}
// GetResults returns the results from all benchmark runs.
func (s *Stats) GetResults() []BenchResults {
s.mu.Lock()
defer s.mu.Unlock()
return s.results
}
// computeLatencies computes percentile latencies based on durations stored in
// the stats object and updates the corresponding fields in the result object.
func (s *Stats) computeLatencies(result *BenchResults) {
if len(s.hw.durations) == 0 {
return
}
sort.Sort(s.hw.durations)
minDuration := int64(s.hw.durations[0])
maxDuration := int64(s.hw.durations[len(s.hw.durations)-1])
// Use the largest unit that can represent the minimum time duration.
s.hw.unit = time.Nanosecond
for _, u := range []time.Duration{time.Microsecond, time.Millisecond, time.Second} {
if minDuration <= int64(u) {
break
}
s.hw.unit = u
}
numBuckets := s.numBuckets
if n := int(maxDuration - minDuration + 1); n < numBuckets {
numBuckets = n
}
s.hw.histogram = NewHistogram(HistogramOptions{
NumBuckets: numBuckets,
// max-min(lower bound of last bucket) = (1 + growthFactor)^(numBuckets-2) * baseBucketSize.
GrowthFactor: math.Pow(float64(maxDuration-minDuration), 1/float64(numBuckets-2)) - 1,
BaseBucketSize: 1.0,
MinValue: minDuration,
})
for _, d := range s.hw.durations {
s.hw.histogram.Add(int64(d))
}
result.Data.Fiftieth = s.hw.durations[max(s.hw.histogram.Count*int64(50)/100-1, 0)]
result.Data.Ninetieth = s.hw.durations[max(s.hw.histogram.Count*int64(90)/100-1, 0)]
result.Data.NinetyNinth = s.hw.durations[max(s.hw.histogram.Count*int64(99)/100-1, 0)]
result.Data.Average = time.Duration(float64(s.hw.histogram.Sum) / float64(s.hw.histogram.Count))
}
// dump prints a human-readable summary of the given result to stdout.
func (s *Stats) dump(result *BenchResults) {
var b bytes.Buffer
// This prints the run mode and all features of the bench on a line.
b.WriteString(fmt.Sprintf("%s-%s:\n", result.RunMode, result.Features.String()))
unit := s.hw.unit
tUnit := fmt.Sprintf("%v", unit)[1:] // stores one of s, ms, μs, ns
if l := result.Data.Fiftieth; l != 0 {
b.WriteString(fmt.Sprintf("50_Latency: %s%s\t", strconv.FormatFloat(float64(l)/float64(unit), 'f', 4, 64), tUnit))
}
if l := result.Data.Ninetieth; l != 0 {
b.WriteString(fmt.Sprintf("90_Latency: %s%s\t", strconv.FormatFloat(float64(l)/float64(unit), 'f', 4, 64), tUnit))
}
if l := result.Data.NinetyNinth; l != 0 {
b.WriteString(fmt.Sprintf("99_Latency: %s%s\t", strconv.FormatFloat(float64(l)/float64(unit), 'f', 4, 64), tUnit))
}
if l := result.Data.Average; l != 0 {
b.WriteString(fmt.Sprintf("Avg_Latency: %s%s\t", strconv.FormatFloat(float64(l)/float64(unit), 'f', 4, 64), tUnit))
}
b.WriteString(fmt.Sprintf("Bytes/op: %v\t", result.Data.AllocedBytes))
b.WriteString(fmt.Sprintf("Allocs/op: %v\t\n", result.Data.Allocs))
// This prints the histogram stats for the latency.
if s.hw.histogram == nil {
b.WriteString("Histogram (empty)\n")
} else {
b.WriteString(fmt.Sprintf("Histogram (unit: %s)\n", tUnit))
s.hw.histogram.PrintWithUnit(&b, float64(unit))
}
// Print throughput data.
req := result.Data.SendOps
if req == 0 {
req = result.Data.TotalOps
}
resp := result.Data.RecvOps
if resp == 0 {
resp = result.Data.TotalOps
}
b.WriteString(fmt.Sprintf("Number of requests: %v\tRequest throughput: %v bit/s\n", req, result.Data.ReqT))
b.WriteString(fmt.Sprintf("Number of responses: %v\tResponse throughput: %v bit/s\n", resp, result.Data.RespT))
fmt.Println(b.String())
} }
//Sort method for durations
func (a durationSlice) Len() int { return len(a) }
func (a durationSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a durationSlice) Less(i, j int) bool { return a[i] < a[j] }
func max(a, b int64) int64 { func max(a, b int64) int64 {
if a > b { if a > b {
return a return a
} }
return b return b
} }
// maybeUpdate updates internal stat data if there was any newly added
// stats since this was updated.
func (stats *Stats) maybeUpdate() {
if !stats.dirty {
return
}
if stats.sortLatency {
sort.Sort(stats.durations)
stats.min = int64(stats.durations[0])
stats.max = int64(stats.durations[len(stats.durations)-1])
}
stats.min = math.MaxInt64
stats.max = 0
for _, d := range stats.durations {
if stats.min > int64(d) {
stats.min = int64(d)
}
if stats.max < int64(d) {
stats.max = int64(d)
}
}
// Use the largest unit that can represent the minimum time duration.
stats.unit = time.Nanosecond
for _, u := range []time.Duration{time.Microsecond, time.Millisecond, time.Second} {
if stats.min <= int64(u) {
break
}
stats.unit = u
}
numBuckets := stats.numBuckets
if n := int(stats.max - stats.min + 1); n < numBuckets {
numBuckets = n
}
stats.histogram = NewHistogram(HistogramOptions{
NumBuckets: numBuckets,
// max-min(lower bound of last bucket) = (1 + growthFactor)^(numBuckets-2) * baseBucketSize.
GrowthFactor: math.Pow(float64(stats.max-stats.min), 1/float64(numBuckets-2)) - 1,
BaseBucketSize: 1.0,
MinValue: stats.min})
for _, d := range stats.durations {
stats.histogram.Add(int64(d))
}
stats.dirty = false
if stats.durations.Len() != 0 {
var percentToObserve = []int{50, 90, 99}
// First data record min unit from the latency result.
stats.result.Latency = append(stats.result.Latency, percentLatency{Percent: -1, Value: stats.unit})
for _, position := range percentToObserve {
stats.result.Latency = append(stats.result.Latency, percentLatency{Percent: position, Value: stats.durations[max(stats.histogram.Count*int64(position)/100-1, 0)]})
}
// Last data record the average latency.
avg := float64(stats.histogram.Sum) / float64(stats.histogram.Count)
stats.result.Latency = append(stats.result.Latency, percentLatency{Percent: -1, Value: time.Duration(avg)})
}
}
// SortLatency blocks the output
func (stats *Stats) SortLatency() {
stats.sortLatency = true
}
// Print writes textual output of the Stats.
func (stats *Stats) Print(w io.Writer) {
stats.maybeUpdate()
if stats.histogram == nil {
fmt.Fprint(w, "Histogram (empty)\n")
} else {
fmt.Fprintf(w, "Histogram (unit: %s)\n", fmt.Sprintf("%v", stats.unit)[1:])
stats.histogram.PrintWithUnit(w, float64(stats.unit))
}
}
// String returns the textual output of the Stats as string.
func (stats *Stats) String() string {
var b bytes.Buffer
stats.Print(&b)
return b.String()
}

View File

@ -1,208 +0,0 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package stats
import (
"bufio"
"bytes"
"fmt"
"os"
"runtime"
"sort"
"strings"
"sync"
"testing"
)
// Package-level state shared between AddStatsWithName and the stats injector
// goroutine started by startStatsInjector.
var (
// curB is the benchmark currently registered via AddStatsWithName; it is nil
// when no benchmark is being tracked.
curB *testing.B
// curBenchName is the name of the benchmark function that owns curB.
curBenchName string
// curStats holds the Stats registered for the current benchmark, keyed by
// the name passed to AddStatsWithName ("" for unnamed stats).
curStats map[string]*Stats
// orgStdout is the os.Stdout that was in place before startStatsInjector
// replaced it with the write end of a pipe.
orgStdout *os.File
// nextOutPos is the offset into the scanner's current data at which
// splitLines resumes copying bytes to orgStdout.
nextOutPos int
// injectCond is locked around accesses to curB, curBenchName and curStats,
// and is signalled when the current benchmark's stats have been reset. It is
// nil while stats injection is not running.
injectCond *sync.Cond
// injectDone is closed when the goroutine reading the stdout pipe exits.
injectDone chan struct{}
)
// AddStats registers a new unnamed Stats instance with the current benchmark
// and returns it. It is shorthand for AddStatsWithName with an empty name.
//
// Benchmarks must be run through RunTestMain() for the stats to be injected
// into the benchmark results. A non-positive numBuckets selects the default
// bucket count (16). Because this call may block until the previous
// benchmark's stats have been printed, it resets b's timer before returning;
// call it at the very beginning of each benchmark function.
func AddStats(b *testing.B, numBuckets int) *Stats {
	const unnamed = ""
	return AddStatsWithName(b, unnamed, numBuckets)
}
// AddStatsWithName registers a new Stats instance under the given name with
// the current benchmark and returns it. Using distinct names allows several
// Stats to be attached to a single benchmark.
//
// Benchmarks must be run through RunTestMain() for the stats to be injected
// into the benchmark results. A non-positive numBuckets selects the default
// bucket count (16). Because this call may block until the previous
// benchmark's stats have been printed, it resets b's timer before returning;
// call it at the very beginning of each benchmark function.
//
// It panics if no function whose name starts with "run" is found on the call
// stack.
func AddStatsWithName(b *testing.B, name string, numBuckets int) *Stats {
	// Walk up the call stack until we reach the benchmark function, which is
	// recognised by its unqualified name starting with "run". This must stay
	// inline: moving it into a helper would shift the Caller depth.
	var benchName string
	for skip := 1; !strings.HasPrefix(benchName, "run"); skip++ {
		pc, _, _, found := runtime.Caller(skip)
		if !found {
			panic("benchmark function not found")
		}
		qualified := runtime.FuncForPC(pc).Name()
		// Keep only the part after the last "." of the qualified name.
		benchName = qualified[strings.LastIndex(qualified, ".")+1:]
	}

	// Mirror the "-N" suffix used in benchmark names when GOMAXPROCS != 1.
	if procs := runtime.GOMAXPROCS(-1); procs != 1 {
		benchName = fmt.Sprintf("%s-%d", benchName, procs)
	}

	s := NewStats(numBuckets)
	if injectCond == nil {
		// Injection is not running; nothing to register.
		b.ResetTimer()
		return s
	}

	// We need to wait until the previous benchmark stats is printed out.
	injectCond.L.Lock()
	for !(curB == nil || curBenchName == benchName) {
		injectCond.Wait()
	}
	curB, curBenchName = b, benchName
	curStats[name] = s
	injectCond.L.Unlock()

	b.ResetTimer()
	return s
}
// RunTestMain runs the tests in m with benchmark stats injection enabled for
// the duration of the run, and returns the exit code that should be handed to
// os.Exit.
func RunTestMain(m *testing.M) int {
	startStatsInjector()
	// Always undo the stdout redirection, even if the run panics.
	defer func() {
		stopStatsInjector()
	}()
	exitCode := m.Run()
	return exitCode
}
// startStatsInjector starts stats injection to benchmark results.
//
// It replaces os.Stdout with the write end of a pipe and starts a goroutine
// that scans the read end line by line, teeing the text to the original
// stdout (via splitLines) and handing each line to injectStatsIfFinished.
// The goroutine closes injectDone when it exits.
//
// It panics if the pipe cannot be created; in that case os.Stdout is left
// untouched.
func startStatsInjector() {
	// Create the pipe before touching any global state so that a failure
	// cannot leave os.Stdout pointing at a nil file.
	r, w, err := os.Pipe()
	if err != nil {
		panic(err)
	}
	orgStdout = os.Stdout
	os.Stdout = w
	nextOutPos = 0
	resetCurBenchStats()
	injectCond = sync.NewCond(&sync.Mutex{})
	injectDone = make(chan struct{})
	go func() {
		defer close(injectDone)
		// Release the read end once scanning is over; it is not used
		// anywhere else.
		defer r.Close()
		scanner := bufio.NewScanner(r)
		scanner.Split(splitLines)
		for scanner.Scan() {
			injectStatsIfFinished(scanner.Text())
		}
		if err := scanner.Err(); err != nil {
			panic(err)
		}
	}()
}
// stopStatsInjector shuts down stats injection: it closes the pipe that is
// standing in for os.Stdout, waits for the injector goroutine to finish, and
// then puts the original os.Stdout back.
func stopStatsInjector() {
	// Closing the write end makes the scanner goroutine see EOF.
	pipeWriter := os.Stdout
	pipeWriter.Close()
	<-injectDone
	injectCond, os.Stdout = nil, orgStdout
}
// splitLines is a bufio.Scanner split function that yields one line of text
// per token (without the trailing newline). As a side effect it copies the
// text to the original stdout as soon as it arrives, even before the line is
// complete; nextOutPos remembers how much of the pending data has already
// been copied.
func splitLines(data []byte, eof bool) (advance int, token []byte, err error) {
	if len(data) == 0 && eof {
		return 0, nil, nil
	}

	nl := bytes.IndexByte(data, '\n')
	if nl < 0 {
		// No complete line yet: forward whatever is new and remember how far
		// we got.
		orgStdout.Write(data[nextOutPos:])
		nextOutPos = len(data)
		if !eof {
			return 0, nil, nil
		}
		// This is a final, non-terminated line. Return it.
		return len(data), data, nil
	}

	// A full line is available: forward the remainder including the newline.
	end := nl + 1
	orgStdout.Write(data[nextOutPos:end])
	nextOutPos = 0
	return end, data[:nl], nil
}
// injectStatsIfFinished prints out the stats if the current benchmark finishes.
//
// line is one line of captured stdout. If a benchmark is currently registered
// and line starts with "Benchmark", the benchmark is considered finished: its
// stats are written to the original stdout (unless the benchmark failed), the
// current benchmark state is reset, and one waiter on injectCond is woken.
// Any other line is ignored.
func injectStatsIfFinished(line string) {
	injectCond.L.Lock()
	defer injectCond.L.Unlock()
	// We assume that the benchmark results start with "Benchmark".
	if curB == nil || !strings.HasPrefix(line, "Benchmark") {
		return
	}
	if !curB.Failed() {
		// Output all stats in alphabetical order.
		names := make([]string, 0, len(curStats))
		for name := range curStats {
			names = append(names, name)
		}
		sort.Strings(names)
		for _, name := range names {
			// The output of stats starts with a header like "Histogram (unit: ms)"
			// followed by statistical properties and the buckets. Add the stats name
			// if it is a named stats and indent them as Go testing outputs.
			//
			// strings.Split always yields at least one element, so the header
			// is always present. The last element (what follows the final
			// newline) is dropped; when the output has no newline at all
			// there is simply no body, rather than an out-of-range slice.
			lines := strings.Split(curStats[name].String(), "\n")
			header, body := lines[0], lines[1:]
			if len(body) > 0 {
				body = body[:len(body)-1]
			}
			suffix := name
			if suffix != "" {
				suffix = ": " + suffix
			}
			fmt.Fprintf(orgStdout, "--- %s%s\n", header, suffix)
			for _, l := range body {
				fmt.Fprintf(orgStdout, "\t%s\n", l)
			}
		}
	}
	resetCurBenchStats()
	injectCond.Signal()
}
// resetCurBenchStats forgets the benchmark currently being tracked and
// replaces the stats registry with a fresh, empty map.
func resetCurBenchStats() {
	curB, curBenchName = nil, ""
	curStats = map[string]*Stats{}
}