From ee8ed34bcf2c530f6081379b057f29fc39d20dae Mon Sep 17 00:00:00 2001 From: apolcyn Date: Mon, 3 Apr 2017 15:53:03 -0700 Subject: [PATCH] get more metrics from go benchmark servers (#913) * add user and system cpu usage to go benchmarks * fix import and style issues * sample elapsed time diffs after merging histograms * style fixes and variables renames * add pprof server to benchmark workers to grab different profile stats * rename variables for consistency and default to no pprof --- benchmark/worker/benchmark_client.go | 32 +++++++++++++++++------- benchmark/worker/benchmark_server.go | 37 +++++++++++++++++++++------- benchmark/worker/main.go | 17 +++++++++++-- benchmark/worker/util.go | 15 +++++++++++ 4 files changed, 81 insertions(+), 20 deletions(-) diff --git a/benchmark/worker/benchmark_client.go b/benchmark/worker/benchmark_client.go index 199bbe1f..dfe8c8f0 100644 --- a/benchmark/worker/benchmark_client.go +++ b/benchmark/worker/benchmark_client.go @@ -37,6 +37,7 @@ import ( "math" "runtime" "sync" + "syscall" "time" "golang.org/x/net/context" @@ -85,6 +86,7 @@ type benchmarkClient struct { lastResetTime time.Time histogramOptions stats.HistogramOptions lockingHistograms []lockingHistogram + rusageLastReset *syscall.Rusage } func printClientConfig(config *testpb.ClientConfig) { @@ -226,6 +228,9 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) return nil, err } + rusage := new(syscall.Rusage) + syscall.Getrusage(syscall.RUSAGE_SELF, rusage) + rpcCountPerConn := int(config.OutstandingRpcsPerChannel) bc := &benchmarkClient{ histogramOptions: stats.HistogramOptions{ @@ -236,9 +241,10 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error) }, lockingHistograms: make([]lockingHistogram, rpcCountPerConn*len(conns), rpcCountPerConn*len(conns)), - stop: make(chan bool), - lastResetTime: time.Now(), - closeConns: closeConns, + stop: make(chan bool), + lastResetTime: time.Now(), + closeConns: closeConns, + rusageLastReset: rusage, } if err = performRPCs(config, conns, bc); err != nil { @@ -338,8 +344,9 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou // getStats returns the stats for benchmark client. // It resets lastResetTime and all histograms if argument reset is true. func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { - var timeElapsed float64 + var wallTimeElapsed, uTimeElapsed, sTimeElapsed float64 mergedHistogram := stats.NewHistogram(bc.histogramOptions) + latestRusage := new(syscall.Rusage) if reset { // Merging histogram may take some time. @@ -353,14 +360,21 @@ func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { mergedHistogram.Merge(toMerge[i]) } - timeElapsed = time.Since(bc.lastResetTime).Seconds() + wallTimeElapsed = time.Since(bc.lastResetTime).Seconds() + syscall.Getrusage(syscall.RUSAGE_SELF, latestRusage) + uTimeElapsed, sTimeElapsed = cpuTimeDiff(bc.rusageLastReset, latestRusage) + + bc.rusageLastReset = latestRusage bc.lastResetTime = time.Now() } else { // Merge only, not reset. for i := range bc.lockingHistograms { bc.lockingHistograms[i].mergeInto(mergedHistogram) } - timeElapsed = time.Since(bc.lastResetTime).Seconds() + + wallTimeElapsed = time.Since(bc.lastResetTime).Seconds() + syscall.Getrusage(syscall.RUSAGE_SELF, latestRusage) + uTimeElapsed, sTimeElapsed = cpuTimeDiff(bc.rusageLastReset, latestRusage) } b := make([]uint32, len(mergedHistogram.Buckets), len(mergedHistogram.Buckets)) @@ -376,9 +390,9 @@ func (bc *benchmarkClient) getStats(reset bool) *testpb.ClientStats { SumOfSquares: float64(mergedHistogram.SumOfSquares), Count: float64(mergedHistogram.Count), }, - TimeElapsed: timeElapsed, - TimeUser: 0, - TimeSystem: 0, + TimeElapsed: wallTimeElapsed, + TimeUser: uTimeElapsed, + TimeSystem: sTimeElapsed, } } diff --git a/benchmark/worker/benchmark_server.go b/benchmark/worker/benchmark_server.go index 667ef2c1..0d20581d 100644 --- a/benchmark/worker/benchmark_server.go +++ b/benchmark/worker/benchmark_server.go @@ -38,6 +38,7 @@ import ( "strconv" "strings" "sync" + "syscall" "time" "google.golang.org/grpc" @@ -55,11 +56,12 @@ var ( ) type benchmarkServer struct { - port int - cores int - closeFunc func() - mu sync.RWMutex - lastResetTime time.Time + port int + cores int + closeFunc func() + mu sync.RWMutex + lastResetTime time.Time + rusageLastReset *syscall.Rusage } func printServerConfig(config *testpb.ServerConfig) { @@ -156,18 +158,35 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma grpclog.Fatalf("failed to get port number from server address: %v", err) } - return &benchmarkServer{port: p, cores: numOfCores, closeFunc: closeFunc, lastResetTime: time.Now()}, nil + rusage := new(syscall.Rusage) + syscall.Getrusage(syscall.RUSAGE_SELF, rusage) + + return &benchmarkServer{ + port: p, + cores: numOfCores, + closeFunc: closeFunc, + lastResetTime: time.Now(), + rusageLastReset: rusage, + }, nil } // getStats returns the stats for benchmark server. // It resets lastResetTime if argument reset is true. func (bs *benchmarkServer) getStats(reset bool) *testpb.ServerStats { - // TODO wall time, sys time, user time. bs.mu.RLock() defer bs.mu.RUnlock() - timeElapsed := time.Since(bs.lastResetTime).Seconds() + wallTimeElapsed := time.Since(bs.lastResetTime).Seconds() + rusageLatest := new(syscall.Rusage) + syscall.Getrusage(syscall.RUSAGE_SELF, rusageLatest) + uTimeElapsed, sTimeElapsed := cpuTimeDiff(bs.rusageLastReset, rusageLatest) + if reset { bs.lastResetTime = time.Now() + bs.rusageLastReset = rusageLatest + } + return &testpb.ServerStats{ + TimeElapsed: wallTimeElapsed, + TimeUser: uTimeElapsed, + TimeSystem: sTimeElapsed, } - return &testpb.ServerStats{TimeElapsed: timeElapsed, TimeUser: 0, TimeSystem: 0} } diff --git a/benchmark/worker/main.go b/benchmark/worker/main.go index 17c52519..8a804062 100644 --- a/benchmark/worker/main.go +++ b/benchmark/worker/main.go @@ -38,6 +38,8 @@ import ( "fmt" "io" "net" + "net/http" + _ "net/http/pprof" "runtime" "strconv" "time" @@ -50,8 +52,10 @@ import ( ) var ( - driverPort = flag.Int("driver_port", 10000, "port for communication with driver") - serverPort = flag.Int("server_port", 0, "port for benchmark server if not specified by server config message") + driverPort = flag.Int("driver_port", 10000, "port for communication with driver") + serverPort = flag.Int("server_port", 0, "port for benchmark server if not specified by server config message") + pprofPort = flag.Int("pprof_port", -1, "Port for pprof debug server to listen on. Pprof server doesn't start if unset") + blockProfRate = flag.Int("block_prof_rate", 0, "fraction of goroutine blocking events to report in blocking profile") ) type byteBufCodec struct { @@ -227,5 +231,14 @@ func main() { s.Stop() }() + runtime.SetBlockProfileRate(*blockProfRate) + + if *pprofPort >= 0 { + go func() { + grpclog.Println("Starting pprof server on port " + strconv.Itoa(*pprofPort)) + grpclog.Println(http.ListenAndServe("localhost:"+strconv.Itoa(*pprofPort), nil)) + }() + } + s.Serve(lis) } diff --git a/benchmark/worker/util.go b/benchmark/worker/util.go index f0016ce4..6f9b2b03 100644 --- a/benchmark/worker/util.go +++ b/benchmark/worker/util.go @@ -36,6 +36,7 @@ import ( "log" "os" "path/filepath" + "syscall" ) // abs returns the absolute path the given relative file or directory path, @@ -52,6 +53,20 @@ func abs(rel string) string { return filepath.Join(v, rel) } +func cpuTimeDiff(first *syscall.Rusage, latest *syscall.Rusage) (float64, float64) { + var ( + utimeDiffs = latest.Utime.Sec - first.Utime.Sec + utimeDiffus = latest.Utime.Usec - first.Utime.Usec + stimeDiffs = latest.Stime.Sec - first.Stime.Sec + stimeDiffus = latest.Stime.Usec - first.Stime.Usec + ) + + uTimeElapsed := float64(utimeDiffs) + float64(utimeDiffus)*1.0e-6 + sTimeElapsed := float64(stimeDiffs) + float64(stimeDiffus)*1.0e-6 + + return uTimeElapsed, sTimeElapsed +} + func goPackagePath(pkg string) (path string, err error) { gp := os.Getenv("GOPATH") if gp == "" {